简介
MQTT(Message Queuing Telemetry Transport)是一种轻量级的发布/订阅消息传输协议,专为低带宽、不稳定或高延迟的网络环境设计。它采用发布/订阅模式,通过主题(Topic)传递消息,支持三种服务质量(QoS)级别,确保消息的可靠传输。MQTT协议具有高效、低带宽需求和持久会话等特点,广泛应用于物联网、智能家居、工业自动化和远程监控等领域,是设备间通信的理想选择。
在上一个章节中,我们已经使用了开源的mosquitto成功的在香橙派Zero3上部署了MQTT服务,而在另一篇文章中,我们也成功的点亮了OLED0.96寸. 在本篇文章中, 我们将学习如何使用ESP-IDF对MQTT消息进行订阅和发布.
上图为使用0.96寸屏幕的驱动效果
那么在本文章中, 我们将使用上述的开发板来进行消息的发送和订阅, 具体的步骤为:
1- 首先创建一个任务, 该任务的主要作用是向MQTT的某个主题发送数据
2- 订阅上述的主题, 并且在回掉函数中处理订阅的数据
3- 使用OLED显示屏显示数据
首先要使用MQTT,我们需要引入MQTT相关的头文件定义
#include "mqtt_client.h"
然后在函数中初始化MQTT的连接参数
esp_mqtt_client_handle_t client; void mqtt_init() { const esp_mqtt_client_config_t mqtt_cfg = { .broker.address.hostname = "服务器地址", .broker.address.port = 服务器端口, .broker.address.transport = MQTT_TRANSPORT_OVER_TCP, .credentials.client_id = "设备标识符", .credentials.username = "用户名", .credentials.authentication.password = "密码"}; client = esp_mqtt_client_init(&mqtt_cfg); // 修改这一行 esp_mqtt_client_register_event(client, ESP_EVENT_ANY_ID, mqtt_event_handler, client); esp_mqtt_client_start(client); }
需要注意的是上述的MQTT连接方式是基于TCP进行连接的, 如果对应的MQTT服务器设置了认证信息即需要使用账号和密码信息来进行连接的话, 那么需要在这里指定用户名. 下面是一个示例的配置
void mqtt_init() { const esp_mqtt_client_config_t mqtt_cfg = { .broker.address.hostname = "192.168.1.113", .broker.address.port = 1883, .broker.address.transport = MQTT_TRANSPORT_OVER_TCP, .credentials.client_id = "esp32c6", .credentials.username = "yiwen", .credentials.authentication.password = "123456"}; client = esp_mqtt_client_init(&mqtt_cfg); // 修改这一行 esp_mqtt_client_register_event(client, ESP_EVENT_ANY_ID, mqtt_event_handler, client); esp_mqtt_client_start(client); }
需要注意的esp_mqtt_client_register_event 方法需要传递一个函数句柄, mqtt_event_handler, 这个函数句柄主要是为处理一些MQTT的各项事件. 官方提供的有以下事件.
- MQTT_EVENT_CONNECTED (MQTT连接事件)
- MQTT_EVENT_DISCONNECTED(MQTT断开连接事件)
- MQTT_EVENT_SUBSCRIBED(MQTT订阅事件)
- MQTT_EVENT_UNSUBSCRIBED(MQTT取消订阅事件)
- MQTT_EVENT_PUBLISHED(MQTT发布消息事件)
- MQTT_EVENT_DATA(MQTT接收消息事件)
- MQTT_EVENT_ERROR(MQTT错误消息事件)
static void mqtt_event_handler(void *handler_args, esp_event_base_t base, int32_t event_id, void *event_data) { ESP_LOGD(TAG, "Event dispatched from event loop base=%s, event_id=%" PRIi32 "", base, event_id); esp_mqtt_event_handle_t event = event_data; esp_mqtt_client_handle_t client = event->client; int msg_id; switch ((esp_mqtt_event_id_t)event_id) { case MQTT_EVENT_CONNECTED: ESP_LOGI(TAG, "MQTT_EVENT_CONNECTED"); break; case MQTT_EVENT_DISCONNECTED: ESP_LOGI(TAG, "MQTT_EVENT_DISCONNECTED"); break; case MQTT_EVENT_SUBSCRIBED: ESP_LOGI(TAG, "MQTT_EVENT_SUBSCRIBED, msg_id=%d", event->msg_id); break; case MQTT_EVENT_UNSUBSCRIBED: ESP_LOGI(TAG, "MQTT_EVENT_UNSUBSCRIBED, msg_id=%d", event->msg_id); break; case MQTT_EVENT_PUBLISHED: ESP_LOGI(TAG, "MQTT_EVENT_PUBLISHED, msg_id=%d", event->msg_id); break; case MQTT_EVENT_DATA: ESP_LOGI(TAG, "MQTT_EVENT_DATA"); // 打印接收到的主题和数据 printf("TOPIC=%.*s\r\n", event->topic_len, event->topic); printf("DATA=%.*s\r\n", event->data_len, event->data); break; case MQTT_EVENT_ERROR: ESP_LOGI(TAG, "MQTT_EVENT_ERROR"); if (event->error_handle->error_type == MQTT_ERROR_TYPE_TCP_TRANSPORT) { log_error_if_nonzero("reported from esp-tls", event->error_handle->esp_tls_last_esp_err); log_error_if_nonzero("reported from tls stack", event->error_handle->esp_tls_stack_err); log_error_if_nonzero("captured as transport's socket errno", event->error_handle->esp_transport_sock_errno); ESP_LOGI(TAG, "Last errno string (%s)", strerror(event->error_handle->esp_transport_sock_errno)); } break; default: ESP_LOGI(TAG, "Other event id:%d", event->event_id); break; } }
示例事件回调函数
完整代码如下所示
#include "ota.h" #include "freertos/task.h" #include "esp_log.h" #include "ssd1306.h" #include "font8x8_basic.h" #include <stdio.h> #include <stdint.h> #include <stddef.h> #include <string.h> #include "esp_system.h" #include "nvs_flash.h" #include "esp_event.h" #include "esp_netif.h" #include "mqtt_client.h" #include "esp_wifi.h" #include "protocol_examples_common.h" #include "esp_partition.h" #include "esp_flash_partitions.h" #include "esp_ota_ops.h" #define tag "MAIN" SSD1306_t dev; static const char *TAG = "main"; i2c_port_t i2c_num = I2C_NUM_0; esp_mqtt_client_handle_t client; static void log_error_if_nonzero(const char *message, int error_code) { if (error_code != 0) { ESP_LOGE(TAG, "Last error %s: 0x%x", message, error_code); } } void init_network() { ESP_ERROR_CHECK(esp_netif_init()); ESP_ERROR_CHECK(esp_event_loop_create_default()); ESP_ERROR_CHECK(example_connect()); #if CONFIG_IF_CONNECT_TO_WIFI esp_wifi_set_ps(WIFI_PS_NONE); #endif // CONFIG_IF_CONNECT_TO_WIFI } void init_nvs() { esp_err_t err = nvs_flash_init(); if (err == ESP_ERR_NVS_NO_FREE_PAGES || err == ESP_ERR_NVS_NEW_VERSION_FOUND) { ESP_ERROR_CHECK(nvs_flash_erase()); err = nvs_flash_init(); } ESP_ERROR_CHECK(err); } void init_ssd1306() { i2c_master_init(&dev, CONFIG_SDA_GPIO, CONFIG_SCL_GPIO, CONFIG_RESET_GPIO); i2c_num = dev._i2c_num; ESP_LOGI(tag, "Panel is 128x32"); ssd1306_init(&dev, 128, 32); } static void mqtt_event_handler(void *handler_args, esp_event_base_t base, int32_t event_id, void *event_data) { ESP_LOGD(TAG, "Event dispatched from event loop base=%s, event_id=%" PRIi32 "", base, event_id); esp_mqtt_event_handle_t event = event_data; esp_mqtt_client_handle_t client = event->client; int msg_id; switch ((esp_mqtt_event_id_t)event_id) { case MQTT_EVENT_CONNECTED: ESP_LOGI(TAG, "MQTT_EVENT_CONNECTED"); break; case MQTT_EVENT_DISCONNECTED: ESP_LOGI(TAG, "MQTT_EVENT_DISCONNECTED"); break; case MQTT_EVENT_SUBSCRIBED: ESP_LOGI(TAG, "MQTT_EVENT_SUBSCRIBED, msg_id=%d", event->msg_id); break; case MQTT_EVENT_UNSUBSCRIBED: ESP_LOGI(TAG, "MQTT_EVENT_UNSUBSCRIBED, msg_id=%d", event->msg_id); break; case MQTT_EVENT_PUBLISHED: ESP_LOGI(TAG, "MQTT_EVENT_PUBLISHED, msg_id=%d", event->msg_id); break; case MQTT_EVENT_DATA: break; case MQTT_EVENT_ERROR: break; default: } } void mqtt_init() { const esp_mqtt_client_config_t mqtt_cfg = { .broker.address.hostname = "192.168.1.113", .broker.address.port = 1883, .broker.address.transport = MQTT_TRANSPORT_OVER_TCP, .credentials.client_id = "esp32c6", .credentials.username = "yiwen", .credentials.authentication.password = "123456"}; client = esp_mqtt_client_init(&mqtt_cfg); esp_mqtt_client_register_event(client, ESP_EVENT_ANY_ID, mqtt_event_handler, client); esp_mqtt_client_start(client); } void init_application() { init_nvs(); #if CONFIG_IF_CONNECT_TO_WIFI init_network(); #endif #if CONFIG_IF_CONNECT_TO_MQTT mqtt_init(); #endif /* 初始化SSD1306 */ init_ssd1306(); } void app_main(void) { init_application(); }
那么此时我们便可以在控制台查看到已经成功的获取到了IP和连接上了MQTT
接下来我们来创建一个订阅, 希望在MQTT连接到MQTT服务器的时候便订阅这个主题.
static const char *TOPIC = "eepw"; // 定义主题名称
然后我们需要在回掉函数里来订阅它, 修改回掉函数如下所示.
case MQTT_EVENT_CONNECTED: esp_mqtt_client_subscribe(client, TOPIC, 0); ESP_LOGI(TAG, "MQTT_EVENT_CONNECTED"); break;
然后我们创建另一个任务来向这个Topic里发送消息
void send_task(void *pvParameters) { uint64_t i = 0; char message[50]; while (1) { snprintf(message, sizeof(message), "%llu", i); esp_mqtt_client_publish(client, TOPIC, message, 0, 1, 0); i++; vTaskDelay(pdMS_TO_TICKS(1000)); // 每隔一秒发送消息 } }
并且在app_main中启动这个任务
xTaskCreate(&send_task, "send_task", 8192, NULL, 5, NULL);
MQTTX接收消息示例
之后我们在回掉函数中更新数据订阅后接收部分,并且把消息显示到屏幕上
static void mqtt_event_handler(void *handler_args, esp_event_base_t base, int32_t event_id, void *event_data) { ESP_LOGD(TAG, "Event dispatched from event loop base=%s, event_id=%" PRIi32 "", base, event_id); esp_mqtt_event_handle_t event = event_data; esp_mqtt_client_handle_t client = event->client; switch ((esp_mqtt_event_id_t)event_id) { case MQTT_EVENT_CONNECTED: esp_mqtt_client_subscribe(client, TOPIC, 0); ESP_LOGI(TAG, "MQTT_EVENT_CONNECTED"); break; case MQTT_EVENT_DISCONNECTED: ESP_LOGI(TAG, "MQTT_EVENT_DISCONNECTED"); break; case MQTT_EVENT_SUBSCRIBED: ESP_LOGI(TAG, "MQTT_EVENT_SUBSCRIBED, msg_id=%d", event->msg_id); break; case MQTT_EVENT_UNSUBSCRIBED: ESP_LOGI(TAG, "MQTT_EVENT_UNSUBSCRIBED, msg_id=%d", event->msg_id); break; case MQTT_EVENT_PUBLISHED: ESP_LOGI(TAG, "MQTT_EVENT_PUBLISHED, msg_id=%d", event->msg_id); break; case MQTT_EVENT_DATA: ESP_LOGI(TAG, "MQTT_EVENT_DATA"); ESP_LOGI(TAG, "TOPIC=%.*s\r\n", event->topic_len, event->topic); ESP_LOGI(TAG, "DATA=%.*s\r\n", event->data_len, event->data); // 更新屏幕上的数据 ssd1306_clear_screen(&dev, false); //接收订阅的消息 ssd1306_display_text(&dev, 0, event->data, event->data_len, false); break; case MQTT_EVENT_ERROR: ESP_LOGI(TAG, "MQTT_EVENT_ERROR"); break; default: ESP_LOGI(TAG, "Other event id:%d", event->event_id); break; } }
完整效果如下
此时屏幕的效果为, 每秒逐次累加
总结
通过上述文章我们已经学会如何订阅和发布MQTT消息, 相信通过这一篇教程, 你一定能有所获能将MQTT应用起来, 相对而言如果使用Arduino环境上的MQTT会更加简单. 当然现在有些AT固件也是集成了MQTT的功能, 用户可以通过串口和对应的模组来连接使其一个不联网的单片机来实现消息的订阅和收发.
附件