前几篇写完了上传,这篇写一下接收(当然是点灯了),有来有回才对嘛。mqtt的接收一般是通过订阅topic,然后接收topic内的消息,需要提前订阅一个topic才能收到,继续看协议



跟以上基本都是大差不差的,就不详细分析了
直接贴程序吧
//==========================================================
// 函数名称: MQTT_PacketSubscribe
//
// 函数功能: Subscribe消息组包
//
// 入口参数: pkt_id:pkt_id
// qos:消息重发次数
// topics:订阅的消息
// topics_cnt:订阅的消息个数
// mqttPacket:包指针
//
// 返回参数: 0-成功 其他-失败
//
// 说明:
//==========================================================
uint8 MQTT_PacketSubscribe(uint16 pkt_id, enum MqttQosLevel qos, const int8 *topics[], uint8 topics_cnt, MQTT_PACKET_STRUCTURE *mqttPacket)
{
uint32 topic_len = 0, remain_len = 0;
int16 len = 0;
uint8 i = 0;
if(pkt_id == 0)
return 1;
//计算topic长度-------------------------------------------------------------------------
for(; i < topics_cnt; i++)
{
if(topics[i] == NULL)
return 2;
topic_len += strlen(topics[i]);
}
//2 bytes packet id + topic filter(2 bytes topic + topic length + 1 byte reserve)------
remain_len = 2 + 3 * topics_cnt + topic_len;
//分配内存------------------------------------------------------------------------------
MQTT_NewBuffer(mqttPacket, remain_len + 5);
if(mqttPacket->_data == NULL)
return 3;
/*************************************固定头部***********************************************/
//固定头部----------------------头部消息-------------------------------------------------
mqttPacket->_data[mqttPacket->_len++] = MQTT_PKT_SUBSCRIBE << 4 | 0x02;
//固定头部----------------------剩余长度值-----------------------------------------------
len = MQTT_DumpLength(remain_len, mqttPacket->_data + mqttPacket->_len);
if(len < 0)
{
MQTT_DeleteBuffer(mqttPacket);
return 4;
}
else
mqttPacket->_len += len;
/*************************************payload***********************************************/
//payload----------------------pkt_id---------------------------------------------------
mqttPacket->_data[mqttPacket->_len++] = MOSQ_MSB(pkt_id);
mqttPacket->_data[mqttPacket->_len++] = MOSQ_LSB(pkt_id);
//payload----------------------topic_name-----------------------------------------------
for(i = 0; i < topics_cnt; i++)
{
topic_len = strlen(topics[i]);
mqttPacket->_data[mqttPacket->_len++] = MOSQ_MSB(topic_len);
mqttPacket->_data[mqttPacket->_len++] = MOSQ_LSB(topic_len);
strncat((int8 *)mqttPacket->_data + mqttPacket->_len, topics[i], topic_len);
mqttPacket->_len += topic_len;
mqttPacket->_data[mqttPacket->_len++] = qos & 0xFF;
}
return 0;
}//==========================================================
// 函数名称: OneNet_Subscribe
//
// 函数功能: 订阅
//
// 入口参数: topics:订阅的topic
// topic_cnt:topic个数
//
// 返回参数: SEND_TYPE_OK-成功 SEND_TYPE_SUBSCRIBE-需要重发
//
// 说明:
//==========================================================
void OneNet_Subscribe(const char *topics[], unsigned char topic_cnt)
{
unsigned char i = 0;
MQTT_PACKET_STRUCTURE mqttPacket = {NULL, 0, 0, 0}; //协议包
for(; i < topic_cnt; i++)
UsartPrintf("Subscribe Topic: %s\r\n", topics[i]);
if(MQTT_PacketSubscribe(MQTT_SUBSCRIBE_ID, MQTT_QOS_LEVEL0, topics, topic_cnt, &mqttPacket) == 0)
{
ESP8266_SendData(mqttPacket._data, mqttPacket._len); //向平台发送订阅请求
MQTT_DeleteBuffer(&mqttPacket); //删包
}
}需要在连接平台之后就订阅这个topic,但是,是哪个topic呢?不知道就看文档,看下onenet的文档

const char *devSubTopic[] = {"$sys/407529/G0701/cmd/#"};在CONNECT之后订阅,然后根据文档,下发指令OneNet_Subscribe(devSubTopic, 1);
照抄官方的写一个接收加判断函数,放在主程序中
OneNet_DevLink(); HAL_Delay(500); Clear_Usart(&usart1_rx); OneNet_Subscribe(devSubTopic, 1); Clear_Usart(&usart1_rx); time6_count = 110; /* USER CODE END 2 */
/* Infinite loop */
/* USER CODE BEGIN WHILE */
while (1)
{
if(time6_count1>10)
{
time6_count1 = 0;
AHT10ReadData(&pv.tem,&pv.hum);
BH1750_Read_Dat(pv.lux);//读取数据
pv.lux_16 = BH1750_Dat_To_Lux(pv.lux);//转换数据
printf("lux:%d\r\n",pv.lux_16);
printf("tem:%.1f\r\nhum:%d%%\r\n",pv.tem,pv.hum);
BH1750_Send_Cmd(ONCE_H_MODE);//单次模式
sprintf(buf,"tem:%.1f",pv.tem);
OLED_P8x16Str(0u,4u,(uint8_t *)buf);
sprintf(buf,"hum:%d%%",pv.hum);
OLED_P8x16Str(64u,4u,(uint8_t *)buf);
sprintf(buf,"lux:%d ",pv.lux_16);
OLED_P8x16Str(0u,6u,(uint8_t *)buf);
}
if(time6_count>120)
{
time6_count = 0;
memset(PUB_BUF, 0, sizeof(PUB_BUF));
OneNet_FillBuf(PUB_BUF);
printf("PUB_BUF:%s\r\n",PUB_BUF);
OneNet_Publish(devPubTopic, PUB_BUF);
}
one_ipd_rx = ESP8266_GetIPD();
if(one_ipd_rx.len > 0 )
{
OneNet_RevPro(one_ipd_rx.IPD_buff);
}
/* USER CODE END WHILE */
/* USER CODE BEGIN 3 */
}void OneNet_RevPro(unsigned char *cmd)
{
MQTT_PACKET_STRUCTURE mqttPacket = {NULL, 0, 0, 0};
char *req_payload = NULL;
char *cmdid_topic = NULL;
unsigned short topic_len = 0;
unsigned short req_len = 0;
unsigned char type = 0;
unsigned char qos = 0;
static unsigned short pkt_id = 0;
short result = 0;
//char *dataPtr = NULL;
type = MQTT_UnPacketRecv(cmd);
switch(type)
{
case MQTT_PKT_CMD://命令下发
result = MQTT_UnPacketCmd(cmd, &cmdid_topic, &req_payload, &req_len);
if(result == 0)
{
UsartPrintf("cmdid: %s, req: %s, req_len: %d\r\n", cmdid_topic, req_payload, req_len);
MQTT_DeleteBuffer(&mqttPacket);
}
break;
case MQTT_PKT_PUBLISH://接收的Publish消息
result = MQTT_UnPacketPublish(cmd, &cmdid_topic, &topic_len, &req_payload, &req_len, &qos, &pkt_id);
if(result == 0)
{
UsartPrintf("topic: %s, topic_len: %d, payload: %s, payload_len: %d\r\n",
cmdid_topic, topic_len, req_payload, req_len);
if(strstr(req_payload, "led_close") != NULL)
{
HAL_GPIO_WritePin(LED_GREEN_GPIO_Port,LED_GREEN_Pin,GPIO_PIN_RESET);
UsartPrintf("LED_OFF\r\n");
}
else if(strstr(req_payload, "led_open") != NULL)
{
HAL_GPIO_WritePin(LED_GREEN_GPIO_Port,LED_GREEN_Pin,GPIO_PIN_SET);
UsartPrintf("LED_ON\r\n");
}
}
break;
case MQTT_PKT_PUBACK: //发送Publish消息,平台回复的Ack
if(MQTT_UnPacketPublishAck(cmd) == 0)
UsartPrintf("Tips: MQTT Publish Send OK\r\n");
break;
case MQTT_PKT_PUBREC://发送Publish消息,平台回复的Rec,设备需回复Rel消息
if(MQTT_UnPacketPublishRec(cmd) == 0)
{
UsartPrintf("Tips: Rev PublishRec\r\n");
if(MQTT_PacketPublishRel(MQTT_PUBLISH_ID, &mqttPacket) == 0)
{
UsartPrintf("Tips: Send PublishRel\r\n");
ESP8266_SendData(mqttPacket._data, mqttPacket._len);
MQTT_DeleteBuffer(&mqttPacket);
}
}
break;
case MQTT_PKT_PUBREL://收到Publish消息,设备回复Rec后,平台回复的Rel,设备需再回复Comp
if(MQTT_UnPacketPublishRel(cmd, pkt_id) == 0)
{
UsartPrintf( "Tips: Rev PublishRel\r\n");
if(MQTT_PacketPublishComp(MQTT_PUBLISH_ID, &mqttPacket) == 0)
{
UsartPrintf("Tips: Send PublishComp\r\n");
ESP8266_SendData(mqttPacket._data, mqttPacket._len);
MQTT_DeleteBuffer(&mqttPacket);
}
}
break;
case MQTT_PKT_PUBCOMP://发送Publish消息,平台返回Rec,设备回复Rel,平台再返回的Comp
if(MQTT_UnPacketPublishComp(cmd) == 0)
{
UsartPrintf("Tips: Rev PublishComp\r\n");
}
break;
case MQTT_PKT_SUBACK://发送Subscribe消息的Ack
if(MQTT_UnPacketSubscribe(cmd) == 0)
UsartPrintf("Tips: MQTT Subscribe OK\r\n");
else
UsartPrintf("Tips: MQTT Subscribe Err\r\n");
break;
case MQTT_PKT_UNSUBACK: //发送UnSubscribe消息的Ack
if(MQTT_UnPacketUnSubscribe(cmd) == 0)
UsartPrintf("Tips: MQTT UnSubscribe OK\r\n");
else
UsartPrintf("Tips: MQTT UnSubscribe Err\r\n");
break;
default:
result = -1;
break;
}
Clear_Usart(&usart1_rx);
if(result == -1)
return;
if(type == MQTT_PKT_CMD || type == MQTT_PKT_PUBLISH)
{
MQTT_FreeBuffer(cmdid_topic);
MQTT_FreeBuffer(req_payload);
}
}下发 led_close 关灯 下发 led_open 开灯


完成
我要赚赏金
