这些小活动你都参加了吗?快来围观一下吧!>>
电子产品世界 » 论坛首页 » 嵌入式开发 » STM32 » STM32H7S78-DKMQTT远程数据交互

共6条 1/1 1 跳转至

STM32H7S78-DKMQTT远程数据交互

助工
2024-08-15 22:35:00   被打赏 50 分(兑奖)     打赏

STM32H7S78-DK基于TCP-CLIENT的冷链管理-电子产品世界论坛 (eepw.com.cn)

在这篇中,我们实现了tcpclient,Mqtt是一种轻量的网络协议,可以实现一对多的数据发布与多对一个数据交互。因此在物联网领域,这是一款比较常用的工具。

【实现步骤】

1、向工程中添加bsp_mqtt.c/h。

image.png

2、编写代码如下:

/*
 * bsp_mqtt.c
 *
 *  Created on: Mar 19, 2024
 *      Author: liujianhua
 */



/*-----------------------------------------------------------
 * Includes files
 *----------------------------------------------------------*/

/* lib includes. */
#include <string.h>
#include <stdio.h>
/* segger rtt includes. */
#include "main.h"
#include "bsp_mqtt.h"

/* FreeRTOS includes. */
#include "FreeRTOS.h"
#include "semphr.h"
#include "cJSON.h"
/* lwip includes. */
#include "lwip/apps/mqtt.h"
#include "lwip/ip4_addr.h"
#include "cmsis_os2.h"
#include "vacc.h"

float heartValue;
uint8_t heartSetValue;
uint8_t myheartstate = 0 ;
//extern uint8_t mqtt_recv_state;
static err_t bsp_mqtt_connect(void);

extern VaccDevice *pmyVacc;

#define USE_MQTT_MUTEX //使用发送数据的互斥锁,多个任务有发送才必须
#ifdef USE_MQTT_MUTEX
static SemaphoreHandle_t s__mqtt_publish_mutex = NULL;
#endif /* USE_MQTT_MUTEX */

static mqtt_client_t *s__mqtt_client_instance = NULL; //mqtt连接句柄,这里一定要设置全局变量,防止 lwip 底层重复申请空间

//MQTT 数据结构体
struct mqtt_recv_buffer
{
    char recv_buffer[1024];  //储存接收的buffer
    uint16_t recv_len;         //记录已接收多少个字节的数据,MQTT的数据分包来的
    uint16_t recv_total;       //MQTT接收数据的回调函数会有个总的大小
};

//结构体初始化
struct mqtt_recv_buffer s__mqtt_recv_buffer_g = {
    .recv_len = 0,
    .recv_total = 0,
};



static err_t bsp_mqtt_subscribe(mqtt_client_t* mqtt_client, char * sub_topic, uint8_t qos);


/* ===========================================
                 接收回调函数
============================================== */

/*!
* @brief mqtt 接收数据处理函数接口,需要在应用层进行处理
*        执行条件:mqtt连接成功
*
* @param [in1] : 用户提供的回调参数指针
* @param [in2] : 接收的数据指针
* @param [in3] : 接收数据长度
* @retval: 处理的结果
*/
__weak int mqtt_rec_data_process(void* arg, char *rec_buf, uint64_t buf_len)
{
	cJSON *json;
    printf("recv_buffer = %s\n", rec_buf);
    json = cJSON_Parse(rec_buf);
		char *ptr;
		double ret;
    if(json == NULL)
    {
    	printf("json fmt error:%s\r\n.", cJSON_GetErrorPtr());
    }
    else
    {
    	cJSON *Temper = cJSON_GetObjectItem(json, "Tmp_up");
    	if (!Temper) {
    	    printf("no Temperature!\n");

    	} else{
    		ret = strtod(Temper->valuestring,NULL);
				printf("value:%.1f\r\n",ret);
				if(NULL != pmyVacc)
				{
					pmyVacc->up_tmp = ret;
				}

    	}
			Temper = cJSON_GetObjectItem(json, "Tmp_down");
    	if (!Temper) {
    	    printf("no Temperature!\n");

    	} else{
    		ret = strtod(Temper->valuestring,NULL);
				printf("value:%.1f\r\n",ret);
				if(NULL != pmyVacc)
				{
					pmyVacc->down_tmp = ret;
				}

    	}
			Temper = cJSON_GetObjectItem(json, "Hum_down");
    	if (!Temper) {
    	    printf("no Temperature!\n");

    	} else{
    		ret = strtod(Temper->valuestring,NULL);
				printf("value:%.1f\r\n",ret);
				if(NULL != pmyVacc)
				{
					pmyVacc->down_hum = ret;
				}

    	}
			Temper = cJSON_GetObjectItem(json, "Hum_up");
    	if (!Temper) {
    	    printf("no Temperature!\n");

    	} else{
    		ret = strtod(Temper->valuestring,NULL);
				printf("value:%.1f\r\n",ret);
				if(NULL != pmyVacc)
				{
					pmyVacc->up_hum = ret;
				}

    	}
    	cJSON_Delete(json);
    }
    return 0;
}


/*!
* @brief MQTT 接收到数据的回调函数
*        执行条件:MQTT 连接成功
*
* @param [in1] : 用户提供的回调参数指针
* @param [in2] : MQTT 收到的分包数据指针
* @param [in3] : MQTT 分包数据长度
* @param [in4] : MQTT 数据包的标志位
* @retval: None
*/
static void bsp_mqtt_incoming_data_cb(void *arg, const u8_t *data, u16_t len, u8_t flags)
{
    if( (data == NULL) || (len == 0) )
    {
        printf("mqtt_client_incoming_data_cb: condition error @entry\n");
        return;
    }

    if(s__mqtt_recv_buffer_g.recv_len + len < sizeof(s__mqtt_recv_buffer_g.recv_buffer))
    {
        //
        snprintf(&s__mqtt_recv_buffer_g.recv_buffer[s__mqtt_recv_buffer_g.recv_len], len+1, "%s", data);
        s__mqtt_recv_buffer_g.recv_len += len;
    }

    if ( (flags & MQTT_DATA_FLAG_LAST) == MQTT_DATA_FLAG_LAST )
    {
        //处理数据
        mqtt_rec_data_process(arg , s__mqtt_recv_buffer_g.recv_buffer, s__mqtt_recv_buffer_g.recv_len);

        //已接收字节计数归0
        s__mqtt_recv_buffer_g.recv_len = 0;

        //清空接收buffer
        memset(s__mqtt_recv_buffer_g.recv_buffer, 0, sizeof(s__mqtt_recv_buffer_g.recv_buffer));
    }


    printf("mqtt_client_incoming_data_cb:reveiving incomming data.\n");
}


/*!
* @brief MQTT 接收到数据的回调函数
*        执行条件:MQTT 连接成功
*
* @param [in] : 用户提供的回调参数指针
* @param [in] : MQTT 收到数据的topic
* @param [in] : MQTT 收到数据的总长度
* @retval: None
*/
static void bsp_mqtt_incoming_publish_cb(void *arg, const char *topic, u32_t tot_len)
{
    if( (topic == NULL) || (tot_len == 0) )
    {
        printf("bsp_mqtt_incoming_publish_cb: condition error @entry\n");
        return;
    }

	printf("bsp_mqtt_incoming_publish_cb: topic = %s.\n",topic);
	printf("bsp_mqtt_incoming_publish_cb: tot_len = %d.\n",tot_len);
	s__mqtt_recv_buffer_g.recv_total = tot_len;    //需要接收的总字节
	s__mqtt_recv_buffer_g.recv_len = 0;            //已接收字节计数归0

    //清空接收buffer
    memset(s__mqtt_recv_buffer_g.recv_buffer, 0, sizeof(s__mqtt_recv_buffer_g.recv_buffer));
}


/* ===========================================
                 连接状态回调函数
============================================== */

/*!
* @brief MQTT 连接成功的处理函数,需要的话在应用层定义
*
* @param [in1] : MQTT 连接句柄
* @param [in2] : MQTT 连接参数指针
*
* @retval: None
*/
__weak void mqtt_conn_suc_proc(mqtt_client_t *client, void *arg)
{
    char test_sub_topic[] = "attributes/response";
    bsp_mqtt_subscribe(client,test_sub_topic,0);
}

/*!
* @brief MQTT 处理失败调用的函数
*
* @param [in1] : MQTT 连接句柄
* @param [in2] : MQTT 连接参数指针
*
* @retval: None
*/
__weak void mqtt_error_process_callback(mqtt_client_t * client, void *arg)
{

}

/*!
* @brief MQTT 连接状态的回调函数
*
* @param [in] : MQTT 连接句柄
* @param [in] : 用户提供的回调参数指针
* @param [in] : MQTT 连接状态
* @retval: None
*/
static void bsp_mqtt_connection_cb(mqtt_client_t *client, void *arg, mqtt_connection_status_t status)
{
    if( client == NULL )
    {
        printf("bsp_mqtt_connection_cb: condition error @entry\n");
        return;
    }

    if ( status == MQTT_CONNECT_ACCEPTED ) //Successfully connected
    {
		printf("bsp_mqtt_connection_cb: Successfully connected\n");

        // 注册接收数据的回调函数
		mqtt_set_inpub_callback(client, bsp_mqtt_incoming_publish_cb, bsp_mqtt_incoming_data_cb, arg);

        //成功处理函数
		mqtt_conn_suc_proc(client, arg);
    }
	else
	{
		printf("bsp_mqtt_connection_cb: Fail connected, status = %s\n", lwip_strerr(status) );
        //错误处理
		mqtt_error_process_callback(client, arg);
	}
}


/*!
* @brief 连接到 mqtt 服务器
*        执行条件:无
*
* @param [in] : None
*
* @retval: 连接状态,如果返回不是 ERR_OK 则需要重新连接
*/
static err_t bsp_mqtt_connect(void)
{
    printf("bsp_mqtt_connect: Enter!\n");
	err_t ret;

    struct mqtt_connect_client_info_t  mqtt_connect_info = {
		"Stm32H7S78_MQTT_Test",  /* 这里需要修改,以免在同一个服务器两个相同ID会发生冲突 */
		"NULL",   /* MQTT 服务器用户名 */
		"NULL",   /* MQTT 服务器密码 */
		60,     /* 与 MQTT 服务器保持连接时间,时间超过未发送数据会断开 */
		"attributes/response",/* MQTT遗嘱的消息发送topic */
		"Offline_pls_check", /* MQTT遗嘱的消息,断开服务器的时候会发送 */
		0,  /* MQTT遗嘱的消息 Qos */
		0   /* MQTT遗嘱的消息 Retain */
	};

    ip_addr_t server_ip;
    ip4_addr_set_u32(&server_ip, ipaddr_addr("192.168.3.180"));  //MQTT服务器IP

    uint16_t server_port = 1883;  //注意这里是 MQTT 的 TCP 连接方式的端口号!!!!

    if (s__mqtt_client_instance == NULL)
    {
        // 句柄==NULL 才申请空间,否则无需重复申请
	    s__mqtt_client_instance = mqtt_client_new();
    }

	if (s__mqtt_client_instance == NULL)
	{
        //防止申请失败
		printf("bsp_mqtt_connect: s__mqtt_client_instance malloc fail @@!!!\n");
		return ERR_MEM;
	}

    //进行连接,注意:如果需要带入 arg ,arg必须是全局变量,局部变量指针会被回收,大坑!!!!!
    ret = mqtt_client_connect(s__mqtt_client_instance, &server_ip, server_port, bsp_mqtt_connection_cb, NULL, &mqtt_connect_info);

    /******************
    小提示:连接错误不需要做任何操作,mqtt_client_connect 中注册的回调函数里面做判断并进行对应的操作
    *****************/

    printf("bsp_mqtt_connect: connect to mqtt %s\n", lwip_strerr(ret));

	return ret;
}




/* ===========================================
                 发送接口、回调函数
============================================== */

/*!
* @brief MQTT 发送数据的回调函数
*        执行条件:MQTT 连接成功
*
* @param [in] : 用户提供的回调参数指针
* @param [in] : MQTT 发送的结果:成功或者可能的错误
* @retval: None
*/
static void mqtt_client_pub_request_cb(void *arg, err_t result)
{

    mqtt_client_t *client = (mqtt_client_t *)arg;
    if (result != ERR_OK)
    {
        printf("mqtt_client_pub_request_cb: c002: Publish FAIL, result = %s\n", lwip_strerr(result));

		//错误处理
		mqtt_error_process_callback(client, arg);
    }
	else
	{
        printf("mqtt_client_pub_request_cb: c005: Publish complete!\n");
	}
}



/*!
* @brief 发送消息到服务器
*        执行条件:无
*
* @param [in1] : mqtt 连接句柄
* @param [in2] : mqtt 发送 topic 指针
* @param [in3] : 发送数据包指针
* @param [in4] : 数据包长度
* @param [in5] : qos
* @param [in6] : retain
* @retval: 发送状态
* @note: 有可能发送不成功但是现实返回值是 0 ,需要判断回调函数 mqtt_client_pub_request_cb 是否 result == ERR_OK
*/
err_t bsp_mqtt_publish(mqtt_client_t *client, char *pub_topic, char *pub_buf, uint16_t data_len, uint8_t qos, uint8_t retain)
{
	if ( (client == NULL) || (pub_topic == NULL) || (pub_buf == NULL) || (data_len == 0) || (qos > 2) || (retain > 1) )
	{
		printf("bsp_mqtt_publish: input error@@" );
		return ERR_VAL;
	}

    //判断是否连接状态
    if(mqtt_client_is_connected(client) != pdTRUE)
    {
				printf("bsp_mqtt_publish: client is not connected\n");
        return ERR_CONN;
    }

	err_t err;
#ifdef USE_MQTT_MUTEX

    // 创建 mqtt 发送互斥锁
    if (s__mqtt_publish_mutex == NULL)
    {
				printf("bsp_mqtt_publish: create mqtt mutex ! \n" );
        s__mqtt_publish_mutex = xSemaphoreCreateMutex();
    }

    if (xSemaphoreTake(s__mqtt_publish_mutex, portMAX_DELAY) == pdPASS)
#endif /* USE_MQTT_MUTEX */

    {
	    err = mqtt_publish(client, pub_topic, pub_buf, data_len, qos, retain, mqtt_client_pub_request_cb, (void*)client);
	    printf("bsp_mqtt_publish: mqtt_publish err = %s\n", lwip_strerr(err) );

#ifdef USE_MQTT_MUTEX
        printf("bsp_mqtt_publish: mqtt_publish xSemaphoreTake\n");
        xSemaphoreGive(s__mqtt_publish_mutex);
#endif /* USE_MQTT_MUTEX */

    }
	return err;
}

/* ===========================================
                 MQTT 订阅接口函数
============================================== */


/*!
* @brief MQTT 订阅的回调函数
*        执行条件:MQTT 连接成功
*
* @param [in] : 用户提供的回调参数指针
* @param [in] : MQTT 订阅结果
* @retval: None
*/
static void bsp_mqtt_request_cb(void *arg, err_t err)
{
    if ( arg == NULL )
    {
        printf("bsp_mqtt_request_cb: input error@@\n");
        return;
    }

    mqtt_client_t *client = (mqtt_client_t *)arg;

    if ( err != ERR_OK )
    {
        printf("bsp_mqtt_request_cb: FAIL sub, sub again, err = %s\n", lwip_strerr(err));

		//错误处理
		mqtt_error_process_callback(client, arg);
    }
	else
	{
		printf("bsp_mqtt_request_cb: sub SUCCESS!\n");
	}
}

/*!
* @brief mqtt 订阅
*        执行条件:连接成功
*
* @param [in1] : mqtt 连接句柄
* @param [in2] : mqtt 发送 topic 指针
* @param [in5] : qos
* @retval: 订阅状态
*/
static err_t bsp_mqtt_subscribe(mqtt_client_t* mqtt_client, char * sub_topic, uint8_t qos)
{
    printf("bsp_mqtt_subscribe: Enter\n");

	if( ( mqtt_client == NULL) || ( sub_topic == NULL) || ( qos > 2 ) )
	{
        printf("bsp_mqtt_subscribe: input error@@\n");
		return ERR_VAL;
	}

	if ( mqtt_client_is_connected(mqtt_client) != pdTRUE )
	{
		printf("bsp_mqtt_subscribe: mqtt is not connected, return ERR_CLSD.\n");
		return ERR_CLSD;
	}

	err_t err;
	err = mqtt_subscribe(mqtt_client, sub_topic, qos, bsp_mqtt_request_cb, (void *)mqtt_client);  // subscribe and call back.

	if (err != ERR_OK)
	{
		printf("bsp_mqtt_subscribe: mqtt_subscribe Fail, return:%s \n", lwip_strerr(err));
	}
	else
	{
		printf("bsp_mqtt_subscribe: mqtt_subscribe SUCCESS, reason: %s\n", lwip_strerr(err));
	}

	return err;
}


/* ===========================================
                 初始化接口函数
============================================== */

/*!
* @brief 封装 MQTT 初始化接口
*        执行条件:无
*
* @retval: 无
*/
void bsp_mqtt_init(void)
{
    printf("Mqtt init...");

    // 连接服务器
    bsp_mqtt_connect();

    // 发送消息到服务器
    char message_test[] = "Hello mqtt server";
//    for(int i = 0; i < 10; i++)
//    {
//        bsp_mqtt_publish(s__mqtt_client_instance,"/lugl/heart",message_test,sizeof(message_test),1,0);
//        vTaskDelay(1000);
//    }

}

void user_pub_set(int setValue )
{
	char sen_buff[4] = {0};
	sprintf(sen_buff,"%d",setValue);
	bsp_mqtt_publish(s__mqtt_client_instance,"attributes",sen_buff,2,1,0);
	osDelay(100);
}

void user_pub_onoff(int setValue )
{
	if(setValue == 1)
	{
		bsp_mqtt_publish(s__mqtt_client_instance,"/lugl/heartonoff","on",2,1,0);

	}
	else
	{
		bsp_mqtt_publish(s__mqtt_client_instance,"/lugl/heartonoff","off",3,1,0);

	}
}

err_t user_pub_tmp_hum(float Tmp,float Hum)
{
	char buff[48];
	err_t err;
	sprintf(buff,"{\"temperature\":%.1f, \"humidity\":%.1f}",Tmp,Hum);
	err = bsp_mqtt_publish(s__mqtt_client_instance,"attributes",buff,strlen(buff),1,0);
	return err;
}

代码已经添加了详细的注释,不再过多解析。这里只做数据接收回调的分析。

image.png

在订阅指定的主题后,进行Json解析,如果符合,则更新到冷链设备数据中。以便进行数据分析。

使用,在工程中创建一个任务,添加mqtt的初始化后,就可以实现mqtt的发送与接收了。

void vaccTask(void *argument)
{
	pmyVacc = vaccGetDevice();
	err_t err;
	if(NULL == pmyVacc)
	{
		printf ("error Three is vacc no vaccdevice");
		return;
	}
	pmyVacc->sensor_Init(pmyVacc);
	osDelay(5000);
	bsp_mqtt_init();
   for(;;)
   {
			if(pmyVacc->Get_TemperatureHumidity(pmyVacc))
			{
				printf("get sht30 error\r\n");
			}	
      osDelay(2000);   /* 延时500个tick */
			err = user_pub_tmp_hum(pmyVacc->Temperature,pmyVacc->Humidity);
			if(err == ERR_CONN)
			{
				osDelay(2000);
				bsp_mqtt_init();
			}
			pmyVacc->vacc_check_state(pmyVacc);
			printf("vacc state:%d\r\n",pmyVacc->warn_sta);
   }

}

如果在任务中出现发布数据,回复连接出错时,再次调用mqtt_ini就可以实现重连。

【实验环境】

1、使用emqt创建一个mqtt服务器,打开一个MQTT调试工具:

image.png

发送指定的主题,sth32H7s接收到相关数据后,可以实时更新参数的配置:

image.png

【总结】

在lwip移植成功后,可以快速的实现mqtt_client的创建,快速实现物联网的数据交互。




关键词: STM32H7S     STM32CubeMAX     LwI    

院士
2024-08-16 10:15:24     打赏
2楼

楼主要是能大致讲讲MQTT协议,MQTT的数据结构就好了


工程师
2024-08-16 15:18:27     打赏
3楼

666666


专家
2024-08-17 00:03:15     打赏
4楼

感谢感谢,


高工
2024-08-17 08:35:01     打赏
5楼

谢谢分享


专家
2024-08-22 07:58:53     打赏
6楼

谢谢分享


共6条 1/1 1 跳转至

回复

匿名不能发帖!请先 [ 登陆 注册 ]