安信可AI语音开发板AiPi-PalChatV1 + MCP,通过HomeAssistant自动化控制设备

描述

以下作品由安信可社区用户

WT_0213制作

自从拥有了安信可AiPi-PalChatV1

Ai-M61+VC02做的语音控制器不香了

这是之前做的两个版本

通过VC02控制HA灯光的小项目

Ai-M61+VC02语音控制HA设备​bbs.ai-thinker.com/forum.php?mod=viewthread&tid=45021

Ai-M61+VC02语音控制HA设备v1.1版​bbs.ai-thinker.com/forum.php?mod=viewthread&tid=45059

这两个项目都是基于HA的自动化来完成的。

参考:

[智能家居]MQTT控制HomeAssistant设备​bbs.ai-thinker.com/forum.php?mod=viewthread&tid=44644&fromuid=15918

现在都在链接智能化、大模型、MCP、AI,那就接入吧

不得不说虾哥开源的这个小智真的太棒了

方案落地

下面介绍下如何通过AiPi-PalChatV1+MCP控制HA设备。

参考:

【AiPi-PalChatV1语音开发板】小智 MCP 接入 Home Assistant​bbs.ai-thinker.com/forum.php?mod=viewthread&tid=47027

1、下载部署小智MCP服务代码

git clone https://gitee.com/lazy-ai/ai-pi-pal-chat-v1-ha.git

克隆代码后,先运行

pip install paho-mqtt
pip install -r requirements.txt
如果是mac
pip install paho-mqtt
pip install -r requirements_mac.txt

等待安装完成即可

语音控制器

安装好后如果报 XXXX 模块没有找到,就继续执行 pip install xxx把缺失的部分安装全就好了。

2、下载小智客户端

https://github.com/huangjunsen0406/py-xiaozhi.git

github 地址可能间歇性的打不开,在gitee上面找了个

git clone https://gitee.com/tinytaro/py-xiaozhi0.git

克隆下来以后和上面基本一样先安装依赖

pip install -r requirements.txt
如果是mac
pip install -r requirements_mac.txt

这里需要注意的是有个包 **cv2**找不到

pip install opencv-python

还有一个修改点,就是将小智客户端中的IoT相关代码注释掉,不然不走MCP

语音控制器

Iot代码在src下面iot目录下

注释 **application.py**第138行

# 初始化物联网设备

# self._initialize_iot_devices()

这样就好了然后进到项目目录打开命令行

执行

python main.py
 

语音控制器

正常情况下他会播报一个设备码,和AiPi-PalChatV1 配网时效果时是一样的。将设备码添加到智能体。

3、配置MCP接入点

打开 https://xiaozhi.me/

语音控制器

点击**控制台**, 登录后

语音控制器

点击**配置角色**,拉到屏幕最下方

语音控制器

右下角**MCP接入点**

语音控制器

复制接入点地址到第一步命令行

先执行

export MCP_ENDPOINT=接入点地址

再执行命令

python mcp_pipe.py switch_lamp.py

成功运行输出

(base) ➜ mcp git:(main) python mcp_pipe.py switch_lamp.py
2025-06-17 18:15:30,888 - MCP_PIPE - INFO - Connecting to WebSocket server...
2025-06-17 18:15:31,541 - MCP_PIPE - INFO - Successfully connected to WebSocket server
2025-06-17 18:15:31,548 - MCP_PIPE - INFO - Started switch_lamp.py process
/Users/tengyun1/AI/mcp-calculator/switch_lamp.py:30: DeprecationWarning: Callback API version 1 is deprecated, update to latest version
client = mqtt.Client() # 创建MQTT客户端实例
[06/17/25 18:15:32] INFO Processing request of type server.py:523
ListToolsRequest
2025-06-17 18:15:32,397 - Home Assistant MCP - ERROR - MQTT连接: 0
ERROR MQTT连接: 0 switch_lamp.py:51
2025-06-17 18:15:32,845 - Home Assistant MCP - INFO - Connected with result code 0
INFO Connected with result code 0 switch_lamp.py:33

这样就都跑起来了,然后就是测试

语音控制器

在会话框中输入“开灯/关灯”,右侧会同步限制MQTT接收到的信息。

下面是MCP 源代码

# server.py
import sys
import json
import logging
import paho.mqtt.client as mqtt
import threading
from mcp.server.fastmcp import FastMCP
# 配置日志
logger = logging.getLogger('Home Assistant MCP')
handler = logging.StreamHandler()
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter)
logger.addHandler(handler)
logger.setLevel(http://logging.INFO)
# Fix UTF-8 encoding for Windows console
if sys.platform == 'win32':
sys.stderr.reconfigure(encoding='utf-8')
sys.stdout.reconfigure(encoding='utf-8')
# MQTT配置
MQTT_CONFIG = {
"broker_address": "http://broker.emqx.io",
"broker_port": 1883,
"username": None,
"password": None,
}
client = mqtt.Client() # 创建MQTT客户端实例
def on_connect(client, userdata, flags, rc):
http://logger.info(f"Connected with result code {rc}")
client.publish("ha/ai", "Hello MQTT") # 连接后发布消息
client.subscribe("ha/ai") # 订阅主题以接收消息
def on_message(client, userdata, message):
try:
payload = message.payload.decode()
except UnicodeDecodeError:
payload = f"<无法解码的消息内容: {message.payload}>"
http://logger.info(f"Received message: {payload} on topic {message.topic}")
def init_mqtt():
try:
client.on_connect = on_connect
client.on_message = on_message
if MQTT_CONFIG["username"] and MQTT_CONFIG["password"]:
client.username_pw_set(MQTT_CONFIG["username"], MQTT_CONFIG["password"])
ret = client.connect(MQTT_CONFIG["broker_address"], MQTT_CONFIG["broker_port"], 60)
logger.error(f"MQTT连接: {ret}")
client.loop_forever()
except Exception as e:
logger.error(f"MQTT连接失败: {e}")
sys.exit(1)
# Create an MCP server
mcp = FastMCP("Home Assistant MCP")
# Add a lamp control tool
@mcp.tool()
def switchLamp(on_off: bool) -> dict:
"""这是一个用于控制HomeAssistant的灯的接口"""
result = "开灯成功" if on_off else "关灯成功"
http://logger.info(f"switchLamp formula: {on_off}, result: {result}")
status = 1 if on_off else 0
payload = json.dumps({"status": status})
client.publish("ha/ai", payload) # 连接后发布消息
return {"success": True, "result": result}
# Start the server
if __name__ == "__main__":
# 启动 MQTT 客户端在单独的线程中
mqtt_thread = threading.Thread(target=init_mqtt, daemon=True)
mqtt_thread.start()
mcp.run(transport="stdio")


审核编辑 黄宇

打开APP阅读更多精彩内容
声明:本文内容及配图由入驻作者撰写或者入驻合作网站授权转载。文章观点仅代表作者本人,不代表电子发烧友网立场。文章及其配图仅供工程师学习之用,如有内容侵权或者其他违规问题,请联系本站处理。 举报投诉

全部0条评论

快来发表一下你的评论吧 !

×
20
完善资料,
赚取积分