简介
最近半年来,相信热度很高的一个话题就是MCP,即模型上下文通信协议。现在很多AI 的客户端都支持MCP服务的直接调用。网上也有很多MCP server可以用。即可以使用大模型来控制本地的某些服务。那么在这篇文章中我将会带着大家手动来创建自己的MCP Server 并且通过Cherry Studio进行服务的调用使其控制T5 AI开发板上的LED灯进行闪烁。
MCP服务搭建
首先在使用MCPServer之前需要配置环境这里使用的是UV的包管理方式。

使用下述命令进行UV的安装
powershell -ExecutionPolicy ByPass -c "irm https://astral.sh/uv/install.ps1 | iex"
然后重启PowerShell并且检查UV的版本

至此UV安装完毕。然后我们来创建一个文件夹,并且初始化UV的环境变量并且增加MCP的依赖。
uv init --python 3.13 my-mcp-project
上述命令即使用UV来创建一个Py版本为3.13的MCP project。然后在项目中执行下面的命令
uv add "mcp[client]"
至此MCP的依赖已经安装完成。
然后将下面的代码拷贝到main中
from mcp.server.fastmcp import FastMCP
import paho.mqtt.client as mqtt
# 创建 MCP server
mcp = FastMCP("Demo")
# 配置 MQTT
BROKER = "broker.emqx.io"
PORT = 1883
TOPIC = "tuya/tos-test"
client = mqtt.Client()
def connect_mqtt():
def on_connect(client, userdata, flags, rc):
if rc == 0:
print("已连接到 MQTT Broker!")
else:
print(f"连接失败, code={rc}")
client.on_connect = on_connect
client.connect(BROKER, PORT, 60)
client.loop_start()
# 初始化时连接一次
connect_mqtt()
# 定义开关灯工具
@mcp.tool()
def switch_led(state: str) -> str:
"""
开关LED灯
参数:
state: "ON" 打开LED灯, "OFF" 关闭LED灯
"""
if state.upper() not in ["ON", "OFF"]:
return "参数错误: 只能是 'ON' 或 'OFF'"
client.publish(TOPIC, state.upper(), qos=0)
print(f"已发送消息: {state.upper()} 到 {TOPIC}")
return f"LED灯已切换为 {state.upper()}"
# 动态资源示例(保持不变)
@mcp.resource("greeting://{name}")
def get_greeting(name: str) -> str:
"""Get a personalized greeting"""
return f"Hello, {name}!"
if __name__ == "__main__":
mcp.run(transport="streamable-http")其代码的主要行为为,定义了一个MCP的服务,即开灯或者关灯,通讯的方式采用的是流式的HTTP服务。其中连接到了公共的MQTT服务器来进行消息的发布。当MCP server收到大模型的命令的时候,其对应的switch_led就会向对应的主题发送消息。那么我们只需要在T5的开发板中订阅这个MQTT的消息即可完成整个MCP服务的调用。从而来控制LED灯的闪烁。

首先启动这个MCP Server,然后打开Cherry studio。进行MCP服务的配置。

类型选择HTTP,然后填上本机的MCP服务地址。之后点击更新即可。 然后我们尝试使用大模型来控制LED灯的闪烁。可以通过MQTT消息的发布状态来确定是否调用成功。或者增加状态日志的打印。
可以看到现在的时间,已经成功的调用了MCP的服务,同时向MQTT发送了数据。

然后关闭LED。

至此MCP的服务搭建完毕。
涂鸦T5AI开发板 连接MQTT和GPIO控制

涂鸦在SDK的协议example下的MQTTdemo下已经有了完整的实现。我们这里只需要做一点小小的修改。 首先修改WIFI的账号和密码。然后引入下述的两个头文件。
#include "tkl_gpio.h" #include "tal_api.h"
在MQTT消息接收回调函数中增加收到MQTT消息时对GPIO的控制。
static void mqtt_client_message_cb(void *client, uint16_t msgid, const mqtt_client_message_t *msg, void *userdata)
{
PR_DEBUG("recv message TopicName:%s, payload len:%d", msg->topic, msg->length);
// 检查是否是目标主题的消息
if (strcmp(msg->topic, "tuya/tos-test") == 0) {
// 将payload转换为字符串进行比较
char *payload = (char *)msg->payload;
// 检查消息内容是否为"ON"
if (strncmp(payload, "ON", msg->length) == 0 || strncmp(payload, "on", msg->length) == 0) {
// 打开LED灯
tkl_gpio_write(TUYA_GPIO_NUM_1, TUYA_GPIO_LEVEL_HIGH);
PR_DEBUG("LED turned ON");
}
// 检查消息内容是否为"OFF"
else if (strncmp(payload, "OFF", msg->length) == 0 || strncmp(payload, "off", msg->length) == 0) {
// 关闭LED灯
tkl_gpio_write(TUYA_GPIO_NUM_1, TUYA_GPIO_LEVEL_LOW);
PR_DEBUG("LED turned OFF");
}
}
}然后在pub回调函数中移除对主题的取消订阅代码(否则的话,等任务运行完毕订阅会被自动取消)
static void mqtt_client_puback_cb(void *client, uint16_t msgid, void *userdata)
{
PR_DEBUG("PUBACK successed ID:%d", msgid);
// PR_DEBUG("UnSubscribe topic tuya/tos-test");
// mqtt_client_unsubscribe(client, "tuya/tos-test", MQTT_QOS_0);
// PR_DEBUG("MQTT Client Disconnect");
// mqtt_client_disconnect(client);
}然后在主程序中增加对GPIO 1 的初始化代码
TUYA_GPIO_BASE_CFG_T out_pin_cfg = {
.mode = TUYA_GPIO_PUSH_PULL, .direct = TUYA_GPIO_OUTPUT, .level = TUYA_GPIO_LEVEL_LOW};
TUYA_CALL_ERR_LOG(tkl_gpio_init(TUYA_GPIO_NUM_1, &out_pin_cfg));
int i = 10;
for (int index = 0; index < i; index++) {
tkl_gpio_write(TUYA_GPIO_NUM_1, TUYA_GPIO_LEVEL_HIGH);
tal_system_sleep(100);
tkl_gpio_write(TUYA_GPIO_NUM_1, TUYA_GPIO_LEVEL_LOW);
tal_system_sleep(100);
}对应的GPIO pin可以查看下面的原理图

然后在主程序中移除或者注释掉主程序task删除任务的代码
static void tuya_app_thread(void *arg)
{
user_main();
while (1) {
tal_system_sleep(1000); // 每秒睡眠 1s,降低 CPU 占用
}
// tal_thread_delete(ty_app_thread);
// ty_app_thread = NULL;
}完整的代码如下所示。
#include "tuya_cloud_types.h"
#include "mqtt_client_interface.h"
#include "tuya_config_defaults.h"
#include "core_mqtt_config.h"
#include "core_mqtt.h"
#include "tuya_transporter.h"
#include "backoff_algorithm.h"
#include "tal_api.h"
#include "tkl_output.h"
#include "lwip_init.h"
#include "netmgr.h"
#if defined(ENABLE_WIFI) && (ENABLE_WIFI == 1)
#include "netconn_wifi.h"
#include "tkl_gpio.h"
#endif
#if defined(ENABLE_WIRED) && (ENABLE_WIRED == 1)
#include "netconn_wired.h"
#endif
/***********************************************************
*********************** macro define ***********************
***********************************************************/
#ifdef ENABLE_WIFI
#define DEFAULT_WIFI_SSID "ImmortalWrt"
#define DEFAULT_WIFI_PSWD "mazha1997"
#endif
/***********************************************************
********************** typedef define **********************
***********************************************************/
typedef struct {
mqtt_client_config_t config;
MQTTContext_t mqclient;
tuya_transporter_t network;
uint8_t mqttbuffer[CORE_MQTT_BUFFER_SIZE];
} mqtt_client_context_t;
/***********************************************************
********************** variable define *********************
***********************************************************/
static netmgr_status_e netmgr_status = NETMGR_LINK_DOWN;
/***********************************************************
********************** function define *********************
***********************************************************/
static void mqtt_client_connected_cb(void *client, void *userdata)
{
PR_INFO("mqtt client connected! try to subscribe tuya/tos-test");
uint16_t msgid = mqtt_client_subscribe(client, "tuya/tos-test", MQTT_QOS_0);
if (msgid <= 0) {
PR_ERR("Subscribe failed!");
}
PR_DEBUG("Subscribe topic tuya/tos-test ID:%d", msgid);
}
static void mqtt_client_disconnected_cb(void *client, void *userdata)
{
PR_INFO("mqtt client disconnected!");
// PR_DEBUG("MQTT Client Deinit");
// mqtt_client_deinit(client);
}
static void mqtt_client_message_cb(void *client, uint16_t msgid, const mqtt_client_message_t *msg, void *userdata)
{
PR_DEBUG("recv message TopicName:%s, payload len:%d", msg->topic, msg->length);
// 检查是否是目标主题的消息
if (strcmp(msg->topic, "tuya/tos-test") == 0) {
// 将payload转换为字符串进行比较
char *payload = (char *)msg->payload;
// 检查消息内容是否为"ON"
if (strncmp(payload, "ON", msg->length) == 0 || strncmp(payload, "on", msg->length) == 0) {
// 打开LED灯
tkl_gpio_write(TUYA_GPIO_NUM_1, TUYA_GPIO_LEVEL_HIGH);
PR_DEBUG("LED turned ON");
}
// 检查消息内容是否为"OFF"
else if (strncmp(payload, "OFF", msg->length) == 0 || strncmp(payload, "off", msg->length) == 0) {
// 关闭LED灯
tkl_gpio_write(TUYA_GPIO_NUM_1, TUYA_GPIO_LEVEL_LOW);
PR_DEBUG("LED turned OFF");
}
}
}
static void mqtt_client_subscribed_cb(void *client, uint16_t msgid, void *userdata)
{
PR_DEBUG("Subscribe successed ID:%d", msgid);
uint16_t new_msgid = mqtt_client_publish(client, "tuya/tos-test",
(const uint8_t *)"hello, tuya-open-sdk-for-device", // 类型转换
strlen("hello, tuya-open-sdk-for-device") + 1, MQTT_QOS_1);
if (new_msgid <= 0) {
PR_ERR("Publish failed!");
}
PR_DEBUG("Publish msg ID:%d", new_msgid);
}
static void mqtt_client_puback_cb(void *client, uint16_t msgid, void *userdata)
{
PR_DEBUG("PUBACK successed ID:%d", msgid);
// PR_DEBUG("UnSubscribe topic tuya/tos-test");
// mqtt_client_unsubscribe(client, "tuya/tos-test", MQTT_QOS_0);
// PR_DEBUG("MQTT Client Disconnect");
// mqtt_client_disconnect(client);
}
static void mqtt_client_example(void)
{
PR_DEBUG("start mqtt client to broker.emqx.io");
/* MQTT Client init */
mqtt_client_context_t mqtt_client = {0};
mqtt_client_status_t mqtt_status;
const mqtt_client_config_t mqtt_config = {.cacert = NULL,
.cacert_len = 0,
.host = "broker.emqx.io",
.port = 1883,
.keepalive = MQTT_KEEPALIVE_INTERVALIN,
.timeout_ms = MATOP_TIMEOUT_MS_DEFAULT,
.clientid = "tuya-open-sdk-for-device-01",
.username = "emqx",
.password = "public",
.on_connected = mqtt_client_connected_cb,
.on_disconnected = mqtt_client_disconnected_cb,
.on_message = mqtt_client_message_cb,
.on_subscribed = mqtt_client_subscribed_cb,
.on_published = mqtt_client_puback_cb,
.userdata = NULL};
mqtt_status = mqtt_client_init(&mqtt_client, &mqtt_config);
if (mqtt_status != MQTT_STATUS_SUCCESS) {
PR_ERR("MQTT init failed: Status = %d.", mqtt_status);
return;
}
mqtt_status = mqtt_client_connect(&mqtt_client);
if (MQTT_STATUS_NOT_AUTHORIZED == mqtt_status) {
PR_ERR("MQTT connect fail:%d", mqtt_status);
return;
}
mqtt_client_yield(&mqtt_client);
}
/**
* @brief __link_status_cb
*
* @param[in] param:Task parameters
* @return none
*/
OPERATE_RET __link_status_cb(void *data)
{
PR_DEBUG("link status changed: %d", (netmgr_status_e)data);
if (netmgr_status == (netmgr_status_e)data && NETMGR_LINK_UP == (netmgr_status_e)data)
return OPRT_OK;
netmgr_status = (netmgr_status_e)data;
return OPRT_OK;
}
/**
* @brief user_main
*
* @return void
*/
void user_main(void)
{
OPERATE_RET rt = OPRT_OK; // 添加这一行
/* basic init */
tal_log_init(TAL_LOG_LEVEL_DEBUG, 1024, (TAL_LOG_OUTPUT_CB)tkl_log_output);
PR_NOTICE("Application information:");
PR_NOTICE("Project name: %s", PROJECT_NAME);
PR_NOTICE("App version: %s", PROJECT_VERSION);
PR_NOTICE("Compile time: %s", __DATE__);
PR_NOTICE("TuyaOpen version: %s", OPEN_VERSION);
PR_NOTICE("TuyaOpen commit-id: %s", OPEN_COMMIT);
PR_NOTICE("Platform chip: %s", PLATFORM_CHIP);
PR_NOTICE("Platform board: %s", PLATFORM_BOARD);
PR_NOTICE("Platform commit-id: %s", PLATFORM_COMMIT);
TUYA_GPIO_BASE_CFG_T out_pin_cfg = {
.mode = TUYA_GPIO_PUSH_PULL, .direct = TUYA_GPIO_OUTPUT, .level = TUYA_GPIO_LEVEL_LOW};
TUYA_CALL_ERR_LOG(tkl_gpio_init(TUYA_GPIO_NUM_1, &out_pin_cfg));
int i = 10;
for (int index = 0; index < i; index++) {
tkl_gpio_write(TUYA_GPIO_NUM_1, TUYA_GPIO_LEVEL_HIGH);
tal_system_sleep(100);
tkl_gpio_write(TUYA_GPIO_NUM_1, TUYA_GPIO_LEVEL_LOW);
tal_system_sleep(100);
}
tal_kv_init(&(tal_kv_cfg_t){
.seed = "vmlkasdh93dlvlcy",
.key = "dflfuap134ddlduq",
});
tal_sw_timer_init();
tal_workq_init();
tal_event_subscribe(EVENT_LINK_STATUS_CHG, "mqtt_client", __link_status_cb, SUBSCRIBE_TYPE_NORMAL);
#if defined(ENABLE_LIBLWIP) && (ENABLE_LIBLWIP == 1)
TUYA_LwIP_Init();
#endif
// network init
netmgr_type_e type = 0;
#if defined(ENABLE_WIFI) && (ENABLE_WIFI == 1)
type |= NETCONN_WIFI;
#endif
#if defined(ENABLE_WIRED) && (ENABLE_WIRED == 1)
type |= NETCONN_WIRED;
#endif
netmgr_init(type);
#if defined(ENABLE_WIFI) && (ENABLE_WIFI == 1)
netconn_wifi_info_t wifi_info = {0};
// connect wifi
strcpy(wifi_info.ssid, DEFAULT_WIFI_SSID);
strcpy(wifi_info.pswd, DEFAULT_WIFI_PSWD);
netmgr_conn_set(NETCONN_WIFI, NETCONN_CMD_SSID_PSWD, &wifi_info);
#endif
while (1) {
if (netmgr_status == NETMGR_LINK_UP) {
mqtt_client_example();
break;
} else {
tal_system_sleep(50);
}
}
return;
}
/**
* @brief main
*
* @param argc
* @param argv
* @return void
*/
#if OPERATING_SYSTEM == SYSTEM_LINUX
void main(int argc, char *argv[])
{
user_main();
while (1) {
tal_system_sleep(500);
}
}
#else
/* Tuya thread handle */
static THREAD_HANDLE ty_app_thread = NULL;
/**
* @brief task thread
*
* @param[in] arg:Parameters when creating a task
* @return none
*/
static void tuya_app_thread(void *arg)
{
user_main();
while (1) {
tal_system_sleep(1000); // 每秒睡眠 1s,降低 CPU 占用
}
// tal_thread_delete(ty_app_thread);
// ty_app_thread = NULL;
}
void tuya_app_main(void)
{
THREAD_CFG_T thrd_param = {4096, 4, "tuya_app_main"};
tal_thread_create_and_start(&ty_app_thread, NULL, NULL, tuya_app_thread, NULL, &thrd_param);
}
#endif然后将程序烧录到单片机中,并且使用tos.py monitor的方式监控日志。
可以看到成功的订阅了主题 tuya/tos-test。 然后我们在CherryStudio中告诉GPT,让他帮我们打开LED灯。

LED已经被正确的打开。然后我们让他关闭LED

总结
从上述的效果看来搭建MCP的服务还是比较轻松的,这样如果本地要是有ASR或者TTS服务的话可以非常方便的将家中的智能设备都通过MCP进行集成,从而使用大模型进行控制。目前的效果上看来,不知道是因为FreeRTOS的原因还是因为免费的MQTT服务的原因。T5开发板在订阅几次消息之后便会出现无响应的情况。暂时没有发现时什么原因。这两天抽空再用ESP32来试一下。
我要赚赏金
