OpenVINOTM,给你看得见的未来!>>
电子产品世界 » 论坛首页 » 嵌入式开发 » STM32 » STM32G070RB探测38-MQTT协议剖析4SUBSCRIBE

共10条 1/1 1 跳转至

STM32G070RB探测38-MQTT协议剖析4SUBSCRIBE

高工
2021-05-30 01:40:13    评分

前几篇写完了上传,这篇写一下接收(当然是点灯了),有来有回才对嘛。mqtt的接收一般是通过订阅topic,然后接收topic内的消息,需要提前订阅一个topic才能收到,继续看协议

跟以上基本都是大差不差的,就不详细分析了

直接贴程序吧

//==========================================================
//	函数名称:	MQTT_PacketSubscribe
//
//	函数功能:	Subscribe消息组包
//
//	入口参数:	pkt_id:pkt_id
//				qos:消息重发次数
//				topics:订阅的消息
//				topics_cnt:订阅的消息个数
//				mqttPacket:包指针
//
//	返回参数:	0-成功		其他-失败
//
//	说明:
//==========================================================
uint8 MQTT_PacketSubscribe(uint16 pkt_id, enum MqttQosLevel qos, const int8 *topics[], uint8 topics_cnt, MQTT_PACKET_STRUCTURE *mqttPacket)
{
	
	uint32 topic_len = 0, remain_len = 0;
	int16 len = 0;
	uint8 i = 0;
	
	if(pkt_id == 0)
		return 1;
	
	//计算topic长度-------------------------------------------------------------------------
	for(; i < topics_cnt; i++)
	{
		if(topics[i] == NULL)
			return 2;
		
		topic_len += strlen(topics[i]);
	}
	
	//2 bytes packet id + topic filter(2 bytes topic + topic length + 1 byte reserve)------
	remain_len = 2 + 3 * topics_cnt + topic_len;
	
	//分配内存------------------------------------------------------------------------------
	MQTT_NewBuffer(mqttPacket, remain_len + 5);
	if(mqttPacket->_data == NULL)
		return 3;
	
/*************************************固定头部***********************************************/
	
	//固定头部----------------------头部消息-------------------------------------------------
	mqttPacket->_data[mqttPacket->_len++] = MQTT_PKT_SUBSCRIBE << 4 | 0x02;
	
	//固定头部----------------------剩余长度值-----------------------------------------------
	len = MQTT_DumpLength(remain_len, mqttPacket->_data + mqttPacket->_len);
	if(len < 0)
	{
		MQTT_DeleteBuffer(mqttPacket);
		return 4;
	}
	else
		mqttPacket->_len += len;
	
/*************************************payload***********************************************/
	
	//payload----------------------pkt_id---------------------------------------------------
	mqttPacket->_data[mqttPacket->_len++] = MOSQ_MSB(pkt_id);
	mqttPacket->_data[mqttPacket->_len++] = MOSQ_LSB(pkt_id);
	
	//payload----------------------topic_name-----------------------------------------------
	for(i = 0; i < topics_cnt; i++)
	{
		topic_len = strlen(topics[i]);
		mqttPacket->_data[mqttPacket->_len++] = MOSQ_MSB(topic_len);
		mqttPacket->_data[mqttPacket->_len++] = MOSQ_LSB(topic_len);
		
		strncat((int8 *)mqttPacket->_data + mqttPacket->_len, topics[i], topic_len);
		mqttPacket->_len += topic_len;
		
		mqttPacket->_data[mqttPacket->_len++] = qos & 0xFF;
	}

	return 0;

}
//==========================================================
//	函数名称:	OneNet_Subscribe
//
//	函数功能:	订阅
//
//	入口参数:	topics:订阅的topic
//				topic_cnt:topic个数
//
//	返回参数:	SEND_TYPE_OK-成功	SEND_TYPE_SUBSCRIBE-需要重发
//
//	说明:
//==========================================================
void OneNet_Subscribe(const char *topics[], unsigned char topic_cnt)
{

	unsigned char i = 0;

	MQTT_PACKET_STRUCTURE mqttPacket = {NULL, 0, 0, 0};							//协议包

	for(; i < topic_cnt; i++)
		UsartPrintf("Subscribe Topic: %s\r\n", topics[i]);

	if(MQTT_PacketSubscribe(MQTT_SUBSCRIBE_ID, MQTT_QOS_LEVEL0, topics, topic_cnt, &mqttPacket) == 0)
	{
		ESP8266_SendData(mqttPacket._data, mqttPacket._len);					//向平台发送订阅请求

		MQTT_DeleteBuffer(&mqttPacket);											//删包
	}

}

需要在连接平台之后就订阅这个topic,但是,是哪个topic呢?不知道就看文档,看下onenet的文档

const char *devSubTopic[] = {"$sys/407529/G0701/cmd/#"};

在CONNECT之后订阅,然后根据文档,下发指令

OneNet_Subscribe(devSubTopic, 1);

照抄官方的写一个接收加判断函数,放在主程序中

OneNet_DevLink();
HAL_Delay(500);
Clear_Usart(&usart1_rx);
OneNet_Subscribe(devSubTopic, 1);
Clear_Usart(&usart1_rx);
time6_count = 110;
/* USER CODE END 2 */

/* Infinite loop */
/* USER CODE BEGIN WHILE */
while (1)
{
  if(time6_count1>10)
  {
  	time6_count1 = 0;
  	AHT10ReadData(&pv.tem,&pv.hum);
		BH1750_Read_Dat(pv.lux);//读取数据
		pv.lux_16 = BH1750_Dat_To_Lux(pv.lux);//转换数据
		printf("lux:%d\r\n",pv.lux_16);
		printf("tem:%.1f\r\nhum:%d%%\r\n",pv.tem,pv.hum);
  	BH1750_Send_Cmd(ONCE_H_MODE);//单次模式
		sprintf(buf,"tem:%.1f",pv.tem);
		OLED_P8x16Str(0u,4u,(uint8_t *)buf);
		sprintf(buf,"hum:%d%%",pv.hum);
		OLED_P8x16Str(64u,4u,(uint8_t *)buf);
		sprintf(buf,"lux:%d     ",pv.lux_16);
		OLED_P8x16Str(0u,6u,(uint8_t *)buf);
  }
  if(time6_count>120)
  {
  	time6_count = 0;
  	memset(PUB_BUF, 0, sizeof(PUB_BUF));
  	OneNet_FillBuf(PUB_BUF);
  	printf("PUB_BUF:%s\r\n",PUB_BUF);
  	OneNet_Publish(devPubTopic, PUB_BUF);
  }
  one_ipd_rx = ESP8266_GetIPD();
  if(one_ipd_rx.len > 0 )
  {
  	OneNet_RevPro(one_ipd_rx.IPD_buff);
  }

/* USER CODE END WHILE */

/* USER CODE BEGIN 3 */
}

void OneNet_RevPro(unsigned char *cmd)
{

	MQTT_PACKET_STRUCTURE mqttPacket = {NULL, 0, 0, 0};

	char *req_payload = NULL;
	char *cmdid_topic = NULL;

	unsigned short topic_len = 0;
	unsigned short req_len = 0;

	unsigned char type = 0;
	unsigned char qos = 0;
	static unsigned short pkt_id = 0;

	short result = 0;

	//char *dataPtr = NULL;

	type = MQTT_UnPacketRecv(cmd);
	switch(type)
	{
		case MQTT_PKT_CMD://命令下发

			result = MQTT_UnPacketCmd(cmd, &cmdid_topic, &req_payload, &req_len);
			if(result == 0)
			{
				UsartPrintf("cmdid: %s, req: %s, req_len: %d\r\n", cmdid_topic, req_payload, req_len);

				MQTT_DeleteBuffer(&mqttPacket);
			}
		break;

		case MQTT_PKT_PUBLISH://接收的Publish消息

			result = MQTT_UnPacketPublish(cmd, &cmdid_topic, &topic_len, &req_payload, &req_len, &qos, &pkt_id);
			if(result == 0)
			{
				UsartPrintf("topic: %s, topic_len: %d, payload: %s, payload_len: %d\r\n",
																	cmdid_topic, topic_len, req_payload, req_len);
				if(strstr(req_payload, "led_close") != NULL)
				{
					HAL_GPIO_WritePin(LED_GREEN_GPIO_Port,LED_GREEN_Pin,GPIO_PIN_RESET);
					UsartPrintf("LED_OFF\r\n");
				}
				else if(strstr(req_payload, "led_open") != NULL)
				{
					HAL_GPIO_WritePin(LED_GREEN_GPIO_Port,LED_GREEN_Pin,GPIO_PIN_SET);
					UsartPrintf("LED_ON\r\n");
				}
			}
		break;

		case MQTT_PKT_PUBACK:	//发送Publish消息,平台回复的Ack

			if(MQTT_UnPacketPublishAck(cmd) == 0)
				UsartPrintf("Tips:	MQTT Publish Send OK\r\n");

		break;

		case MQTT_PKT_PUBREC://发送Publish消息,平台回复的Rec,设备需回复Rel消息

			if(MQTT_UnPacketPublishRec(cmd) == 0)
			{
				UsartPrintf("Tips:	Rev PublishRec\r\n");
				if(MQTT_PacketPublishRel(MQTT_PUBLISH_ID, &mqttPacket) == 0)
				{
					UsartPrintf("Tips:	Send PublishRel\r\n");
					ESP8266_SendData(mqttPacket._data, mqttPacket._len);
					MQTT_DeleteBuffer(&mqttPacket);
				}
			}

		break;

		case MQTT_PKT_PUBREL://收到Publish消息,设备回复Rec后,平台回复的Rel,设备需再回复Comp

			if(MQTT_UnPacketPublishRel(cmd, pkt_id) == 0)
			{
				UsartPrintf( "Tips:	Rev PublishRel\r\n");
				if(MQTT_PacketPublishComp(MQTT_PUBLISH_ID, &mqttPacket) == 0)
				{
					UsartPrintf("Tips:	Send PublishComp\r\n");
					ESP8266_SendData(mqttPacket._data, mqttPacket._len);
					MQTT_DeleteBuffer(&mqttPacket);
				}
			}

		break;

		case MQTT_PKT_PUBCOMP://发送Publish消息,平台返回Rec,设备回复Rel,平台再返回的Comp

			if(MQTT_UnPacketPublishComp(cmd) == 0)
			{
				UsartPrintf("Tips:	Rev PublishComp\r\n");
			}

		break;

		case MQTT_PKT_SUBACK://发送Subscribe消息的Ack

			if(MQTT_UnPacketSubscribe(cmd) == 0)
				UsartPrintf("Tips:	MQTT Subscribe OK\r\n");
			else
				UsartPrintf("Tips:	MQTT Subscribe Err\r\n");

		break;

		case MQTT_PKT_UNSUBACK:	//发送UnSubscribe消息的Ack

			if(MQTT_UnPacketUnSubscribe(cmd) == 0)
				UsartPrintf("Tips:	MQTT UnSubscribe OK\r\n");
			else
				UsartPrintf("Tips:	MQTT UnSubscribe Err\r\n");

		break;

		default:
			result = -1;
		break;
	}

	Clear_Usart(&usart1_rx);

	if(result == -1)
		return;

	if(type == MQTT_PKT_CMD || type == MQTT_PKT_PUBLISH)
	{
		MQTT_FreeBuffer(cmdid_topic);
		MQTT_FreeBuffer(req_payload);
	}

}

下发 led_close 关灯 下发 led_open  开灯

完成



专家
2021-05-30 07:25:48    评分
2楼

收藏


高工
2021-05-30 11:33:30    评分
3楼

Very Nice


助工
2021-05-30 15:33:09    评分
4楼

q


高工
2021-05-30 23:46:49    评分
5楼

感谢分享


高工
2021-05-31 00:03:05    评分
6楼

感谢楼主的分享,很实用了。


工程师
2021-05-31 00:11:32    评分
7楼

感谢楼主的分享,很实用了。


专家
2021-05-31 06:29:04    评分
8楼
谢谢分享!



高工
2021-06-04 23:45:30    评分
9楼

剖析做的蛮不错的


工程师
2021-06-05 23:42:29    评分
10楼

协议搞得蛮不错的


共10条 1/1 1 跳转至

回复

匿名不能发帖!请先 [ 登陆 注册 ]