前言
OneNET提供的MQTT协议和IBM释放出来的不太一样,是由OneNET工程师根据MQTT协议本身的理解写的,如果读者想更快的了解MQTT协议,建议可以先看看OneNET的MQTT协议SDK,很容易理解。OneNET的MQTT协议是使用MQTT_V3.1.1版本,更多关于MQTT协议的讲解可以到OneNET平台下载:MQTT协议和OneNET设备终端接入协议-MQTT。在本文会使用到MQTT调试工具,关于工具的简单使用可以参考我另外一篇博客用MQTT调试工具调试OneNET MQTT协议。
本文主要讲解MQTT的消息发布和订阅以及数据流上报,不讲解命令下发,对命令下发感兴趣的读者可以看EDP协议,可参考我另外一篇博客OneNET的EDP协议讲解与应用。本文用socket的方式来通信,这里不再列出socket的代码,读者可以到我另外一篇博客OneNET的EDP协议讲解与应用查看。本文列出的代码部分是参考OneNET工程师——张继瑞的代码修改的,代码可能存在bug,请读者自行辨别,仅作为参考。
一、什么是MQTT?
我们直接到官网查看:
MQTT stands for MQ Telemetry Transport. It is a publish/subscribe, extremely simple and lightweight messaging protocol, designed for constrained devices and low-bandwidth, high-latency or unreliable networks. The design principles are to minimise network bandwidth and device resource requirements whilst also attempting to ensure reliability and some degree of assurance of delivery. These principles also turn out to make the protocol ideal of the emerging “machine-to-machine” (M2M) or “Internet of Things” world of connected devices, and for mobile applications where bandwidth and battery power are at a premium.
二、OneNET MQTT协议讲解和应用
1、连接请求
(1)连接OneNET平台,MQTT协议对应的服务器地址为:"183.230.40.39",端口为6002。
使用socket连接服务器:
typedef enum
{MQTT_SRV_CONNECT_OK = 0,MQTT_SRV_CONNECT_ERROR = 1
}mqtt_srv_connect_t;int g_onenet_mqtt_socket_id = -1; /* socket id */char *g_onenet_mqtt_srv_ip_addr = "183.230.40.39";
int g_onenet_mqtt_srv_ip_port = 6002;/**************************************************************
函数名称 : onenet_mqtt_service_connect
函数功能 : mqtt 服务器连接
输入参数 : ip_addr:ip地址,ip_port:端口
返回值 : MQTT_SRV_CONNECT_OK:连接成功,MQTT_SRV_CONNECT_ERROR:连接失败
备注 : 无
**************************************************************/
mqtt_srv_connect_t onenet_mqtt_service_connect(char *ip_addr, unsigned int ip_port)
{g_onenet_mqtt_socket_id = socket_create();if(g_onenet_mqtt_socket_id < 0){ONENET_MQTT_LOG("connect failed, g_onenet_mqtt_socket_id < 0");return MQTT_SRV_CONNECT_ERROR;}if(CONNECT_ERROR == socket_connect_service(g_onenet_mqtt_socket_id, ip_addr, ip_port)){ONENET_MQTT_LOG("connect failed, connect error");return MQTT_SRV_CONNECT_ERROR;}else{ONENET_MQTT_LOG("connect success");return MQTT_SRV_CONNECT_OK;}}onenet_mqtt_service_connect(g_onenet_mqtt_srv_ip_addr, g_onenet_mqtt_srv_ip_port);
(2)连接设备,连接设备的方式有两种:
连接设备代码:
typedef enum
{MQTT_DEV_CONNECT_OK = 0,MQTT_DEV_CONNECT_ERROR = 1
}mqtt_dev_link_t;#define ONENET_MQTT_KEEP_ALIVE_TIME 256 /* 保活时间 */char *g_onenet_mqtt_pro_api_key = "LnEhym2IW1pTfBhFMjKK8s7HFJQ=";
char *g_onenet_mqtt_pro_id = "194320";
char *g_onenet_mqtt_dev_auth_info = "mqttdev001";
char *g_onenet_mqtt_dev_id = "505550506";/**************************************************************
函数名称 : onenet_mqtt_device_link
函数功能 : 与onenet设备创建连接
输入参数 : devid:创建设备的devidproid:产品IDauth_key:创建设备的masterKey或apiKey或设备鉴权信息
返回值 : MQTT_CONNECT_OK:连接成功,MQTT_CONNECT_ERROR:连接失败
备注 : 无
**************************************************************/
mqtt_dev_link_t onenet_mqtt_device_link(const char* devid, const char *proid, const char* auth_key)
{MQTT_PACKET_STRUCTURE mqttPacket = {NULL, 0, 0, 0}; //协议包unsigned char time_out = 100;ONENET_MQTT_LOG("devid: %s, proid:%s, api_key: %s\r\n", devid, proid, auth_key);if(MQTT_PacketConnect(proid, auth_key, devid, ONENET_MQTT_KEEP_ALIVE_TIME, 0, MQTT_QOS_LEVEL0, NULL, NULL, 0, &mqttPacket) == 0){socket_send(g_onenet_mqtt_socket_id, mqttPacket._data, mqttPacket._len);//上传平台while(--time_out){vTaskDelay(1);}MQTT_DeleteBuffer(&mqttPacket); //删包return MQTT_DEV_CONNECT_OK;}else{ONENET_MQTT_LOG("link failed\r\n");return MQTT_DEV_CONNECT_ERROR;}}onenet_mqtt_device_link(g_onenet_mqtt_dev_id, g_onenet_mqtt_pro_id, g_onenet_mqtt_pro_api_key);
连接设备成功之后,可以在页面看到设备在线状态:
(3)断开设备连接,断开设备连接同时需要断开socket连接,代码:
/**************************************************************
函数名称 : onenet_mqtt_disconnect
函数功能 : 与onenet断开连接
输入参数 : 无
返回值 : 无
备注 : 无
**************************************************************/
void onenet_mqtt_disconnect(void)
{MQTT_PACKET_STRUCTURE mqttPacket = {NULL, 0, 0, 0}; //协议包if(MQTT_PacketDisConnect(&mqttPacket) == 0){socket_send(g_onenet_mqtt_socket_id, mqttPacket._data, mqttPacket._len);//上传平台MQTT_DeleteBuffer(&mqttPacket); //删包}
}/* 断开连接 */
onenet_mqtt_disconnect();
socket_close(g_onenet_mqtt_socket_id);
g_onenet_mqtt_socket_id = -1;
成功断开设备连接后,可在页面看到离线状态:
2、心跳保持
心跳保持是需要定时向服务器发送1倍的keep_alive时间,这个keep_alive时间是前面连接平台是设置的时间,请看连接平台代码,在这我使用FreeRTOS的软件定时器定时向平台发送心跳。
心跳保持代码:
#define ONENET_MQTT_SEND_HEART_TIME ((ONENET_MQTT_KEEP_ALIVE_TIME - 20)*10)/* 心跳请求周期,单位100ms,MQTT心跳周期要求在1倍Keep_Alive内*//**************************************************************
函数名称 : onenet_mqtt_send_heart
函数功能 : 发送心跳
输入参数 : 无
返回值 : 无
备注 : 无
**************************************************************/
void onenet_mqtt_send_heart(void)
{ONENET_MQTT_LOG("send heart");MQTT_PACKET_STRUCTURE mqttPacket = {NULL, 0, 0, 0}; //协议包MQTT_PacketPing(&mqttPacket);socket_send(g_onenet_mqtt_socket_id, mqttPacket._data, mqttPacket._len);MQTT_DeleteBuffer(&mqttPacket); //删包
}/**************************************************************
函数名称 : onenet_mqtt_send_heart_sw_timer_callback
函数功能 : 用FreeRTOS软件定时器定时发送心跳的回掉函数
输入参数 : xTimer:软件定时器任务句柄
返回值 : 无
备注 : 无
**************************************************************/
void onenet_mqtt_send_heart_sw_timer_callback(TimerHandle_t xTimer)
{onenet_mqtt_send_heart();
}/**************************************************************
函数名称 : onenet_mqtt_send_heart_sw_timer_task_init
函数功能 : 创建发送心跳软件定时器任务
输入参数 : 无
返回值 : 无
备注 : 无
**************************************************************/
void onenet_mqtt_send_heart_sw_timer_task_init(void)
{g_onenet_mqtt_send_heart_sw_timer_handle = xTimerCreate("onenet_mqtt_send_heart_sw_timer_task",ONENET_MQTT_SEND_HEART_TIME * COAP_MAX_TRANSMIT_WAIT / portTICK_PERIOD_MS, pdTRUE,NULL,onenet_mqtt_send_heart_sw_timer_callback);
}/* 在连接设备成功之后发送开启定时器发送心跳 */
if(MQTT_DEV_CONNECT_OK == onenet_mqtt_device_link(g_onenet_mqtt_dev_id, g_onenet_mqtt_pro_id, g_onenet_mqtt_pro_api_key))
{xTimerStart(g_onenet_mqtt_send_heart_sw_timer_handle, 0);/* 连接设备成功之后开启发送心跳定时器 */
}
3、上传数据流
在这里,作为测试,我使用FreeRTOS的软件定时器,定时向平台上传数据流。
上传数据流代码:
/---------------------------------------------------------------------------------------/
typedef enum
{TYPE_BOOL = 0,TYPE_CHAR,TYPE_UCHAR,TYPE_SHORT,TYPE_USHORT,TYPE_INT,TYPE_UINT,TYPE_LONG,TYPE_ULONG,TYPE_FLOAT,TYPE_DOUBLE,TYPE_GPS,TYPE_STRING,
} DATA_TYPE;typedef struct
{char *name;void *dataPoint;DATA_TYPE dataType;bool flag;} DATA_STREAM;typedef enum
{FORMAT_TYPE1 = 1,FORMAT_TYPE2,FORMAT_TYPE3,FORMAT_TYPE4,FORMAT_TYPE5} FORMAT_TYPE;/---------------------------------------------------------------------------------------///数据流
float g_mqtt_temperature = 26.5;
DATA_STREAM g_mqtt_data_stream[] =
{{"temperature", &g_mqtt_temperature, TYPE_FLOAT, 1},
};
/* 数据流个数 */
unsigned short g_mqtt_data_stream_cnt = sizeof(g_mqtt_data_stream) / sizeof(g_mqtt_data_stream[0]);/**************************************************************
函数名称 : onenet_mqtt_send_data
函数功能 : 上传数据到平台
输入参数 : type:发送数据的格式devid:设备IDapikey:设备apikeystreamArray:数据流streamArrayNum:数据流个数
返回值 : 无
备注 : 无
**************************************************************/
void onenet_mqtt_send_data(FORMAT_TYPE type, char *devid, char *apikey, DATA_STREAM *streamArray, unsigned short streamArrayCnt)
{MQTT_PACKET_STRUCTURE mqttPacket = {NULL, 0, 0, 0}; //协议包short body_len = 0;ONENET_MQTT_LOG("MQTT_TYPE%d\r\n", type);body_len = DSTREAM_GetDataStream_Body_Measure(type, streamArray, streamArrayCnt, 0); //获取当前需要发送的数据流的总长度if(body_len > 0){if(MQTT_PacketSaveData(devid, body_len, NULL, (uint8)type, &mqttPacket) == 0){body_len = DSTREAM_GetDataStream_Body(type, streamArray, streamArrayCnt, mqttPacket._data, mqttPacket._size, mqttPacket._len);if(body_len > 0){mqttPacket._len += body_len;ONENET_MQTT_LOG("Send %d Bytes\r\n", mqttPacket._len);socket_send(g_onenet_mqtt_socket_id, mqttPacket._data, mqttPacket._len);}else{ONENET_MQTT_LOG("WARN: DSTREAM_GetDataStream_Body Failed\r\n");} MQTT_DeleteBuffer(&mqttPacket); //删包}else{ONENET_MQTT_LOG("WARN: MQTT_NewBuffer Failed\r\n");}}
}TimerHandle_t g_onenet_mqtt_send_data_sw_timer_handle = NULL;/**************************************************************
函数名称 : onenet_mqtt_send_data_sw_timer_callback
函数功能 : 用FreeRTOS软件定时器定时上发数据的回掉函数
输入参数 : xTimer:软件定时器任务句柄
返回值 : 无
备注 : 无
**************************************************************/
void onenet_mqtt_send_data_sw_timer_callback(TimerHandle_t xTimer)
{onenet_mqtt_send_data(FORMAT_TYPE3, g_onenet_mqtt_dev_id, g_onenet_mqtt_pro_api_key, g_mqtt_data_stream, g_mqtt_data_stream_cnt);//上传数据到平台
}/**************************************************************
函数名称 : onenet_mqtt_send_data_sw_timer_task_init
函数功能 : 创建软件定时器任务
输入参数 : 无
返回值 : 无
备注 : 对于MQTT的心跳请求必须在1.5倍keep_alive时间内发送
**************************************************************/
void onenet_mqtt_send_data_sw_timer_task_init(void)
{g_onenet_mqtt_send_data_sw_timer_handle = xTimerCreate("onenet_mqtt_send_data_sw_timer_task",200 * COAP_MAX_TRANSMIT_WAIT / portTICK_PERIOD_MS, /* 20s定时,软件定时器误差大 */pdTRUE,NULL,onenet_mqtt_send_data_sw_timer_callback);
}
打开上传数据流软件定时器开始定时上传数据流:
if(pdPASS == xTimerStart(g_onenet_mqtt_send_data_sw_timer_handle, 0))
{ONENET_MQTT_LOG("start success");
}
如果不想上传数据流,则关闭软件定时器:
if(pdPASS == xTimerStop(g_onenet_mqtt_send_data_sw_timer_handle, 0))
{ONENET_MQTT_LOG("stop success");
}
数据流成功上传,可以在设备查看到:
4、设备消息的发布
在这里,我们需要借助前面提到的MQTT调试工具,同时我们在OneNET再创建一个设备,现在我们有两个设备:
上面的MQTT_DEV_001设备由我们的硬件设备去连接,MQTT_DEV_002由我们的MQTT调试工具去连接。打开MQTT调试工具,并连接MQTT_DEV_002,同时订阅一个主题为“hello_topic”,如下:
接着,我们来实现硬件设备连接MQTT_DEV_001向调试工具连接的MQTT_DEV_002发布消息,代码如下:
typedef enum
{PUBLISH_MSG_OK = 0,PUBLISH_MSG_ERROR = 1
}mqtt_publish_topic_msg_t;/**************************************************************
函数名称 : onenet_mqtt_publish_topic
函数功能 : onenet mqtt 发布主题消息
输入参数 : topic:发布的主题msg:消息内容
返回值 : PUBLISH_MSG_OK: 发布消息成功PUBLISH_MSG_ERROR:发布消息失败
备注 : 无
**************************************************************/
mqtt_publish_topic_msg_t onenet_mqtt_publish_topic(const char *topic, const char *msg)
{MQTT_PACKET_STRUCTURE mqttPacket = {NULL, 0, 0, 0}; //协议包ONENET_MQTT_LOG("publish topic: %s, msg: %s\r\n", topic, msg);if(MQTT_PacketPublish(MQTT_PUBLISH_ID, topic, msg, strlen(msg), MQTT_QOS_LEVEL2, 0, 1, &mqttPacket) == 0){socket_send(g_onenet_mqtt_socket_id, mqttPacket._data, mqttPacket._len);//上传平台MQTT_DeleteBuffer(&mqttPacket); //删包return PUBLISH_MSG_OK;}else{return PUBLISH_MSG_ERROR;}
}/* 发布主题为"hello_topic",消息为"hello_world" */
if(PUBLISH_MSG_OK == onenet_mqtt_publish_topic("hello_topic", "hello_world"))
{ONENET_MQTT_LOG("publish success");
}
消息发布成功之后,可以在MQTT调试工具查看到对应的消息,说明MQTT_DEV_002成功接收到MQTT_DEV_001发布过来的消息,如下所示:
5、设备消息的订阅
接下来,我们用硬件设备连接的MQTT_DEV_001去订阅一个主题为“topic_test”,然后使用MQTT调试工具去连接MQTT_DEV_002来向主题“topic_test”发布消息。
硬件设备消息订阅代码如下:
/* 主题 */
const signed char *g_mqtt_topics[] = {"mqtt_topic", "topic_test"};typedef enum
{SUBSCRIBE_TOPIC_OK = 0,SUBSCRIBE_TOPIC_ERROR = 1
}mqtt_subscribe_topic_t;/**************************************************************
函数名称 : onenet_mqtt_subscribe_topic
函数功能 : onenet mqtt 订阅主题
输入参数 : topic:订阅的topictopic_cnt:topic个数
返回值 : SUBSCRIBE_TOPIC_OK:订阅成功SUBSCRIBE_TOPIC_ERROR:订阅失败
备注 : 无
**************************************************************/
mqtt_subscribe_topic_t onenet_mqtt_subscribe_topic(const signed char *topic[], unsigned char topic_cnt)
{unsigned char i = 0;MQTT_PACKET_STRUCTURE mqttPacket = {NULL, 0, 0, 0}; //协议包for(; i < topic_cnt; i++){ONENET_MQTT_LOG("subscribe topic: %s\r\n", topic[i]);}if(MQTT_PacketSubscribe(MQTT_SUBSCRIBE_ID, MQTT_QOS_LEVEL2, topic, topic_cnt, &mqttPacket) == 0){socket_send(g_onenet_mqtt_socket_id, mqttPacket._data, mqttPacket._len);//上传平台MQTT_DeleteBuffer(&mqttPacket); //删包return SUBSCRIBE_TOPIC_OK;}else{return SUBSCRIBE_TOPIC_ERROR;}
}/* 开启消息订阅 */
if(SUBSCRIBE_TOPIC_OK == onenet_mqtt_subscribe_topic(g_mqtt_topics, 2))
{ONENET_MQTT_LOG("subscribe success");
}
既然是消息订阅,那么就是MQTT调试工具通过订阅服务器/后台下发过来的消息,那么久需要处理从后天得到的数据,代码如下:
/**************************************************************
函数名称 : onenet_mqtt_device_recv_pro
函数功能 : 平台返回数据检测处理
输入参数 : data:平台返回的数据
返回值 : 无
备注 : 无
**************************************************************/
void onenet_mqtt_device_recv_pro(unsigned char *data)
{MQTT_PACKET_STRUCTURE mqttPacket = {NULL, 0, 0, 0}; //协议包signed char *req_payload = NULL;signed char *cmdid_topic = NULL;unsigned short topic_len = 0;unsigned short req_len = 0;unsigned char qos = 0;static unsigned short pkt_id = 0;ONENET_MQTT_LOG("device_recv_data:%s\r\n", data);switch(MQTT_UnPacketRecv(data)){case MQTT_PKT_CONNACK:{switch(MQTT_UnPacketConnectAck(data)){case 0:ONENET_MQTT_LOG("Tips: 连接成功\r\n");break;case 1:ONENET_MQTT_LOG("WARN: 连接失败:协议错误\r\n");break;case 2:ONENET_MQTT_LOG("WARN: 连接失败:非法的clientid\r\n");break;case 3:ONENET_MQTT_LOG("WARN: 连接失败:服务器失败\r\n");break;case 4:ONENET_MQTT_LOG("WARN: 连接失败:用户名或密码错误\r\n");break;case 5:ONENET_MQTT_LOG("WARN: 连接失败:非法链接(比如token非法)\r\n");break;default:ONENET_MQTT_LOG("ERR: 连接失败:未知错误\r\n");break;}break;}case MQTT_PKT_PINGRESP:{ONENET_MQTT_LOG("Tips: HeartBeat OK\r\n");break;}case MQTT_PKT_CMD: //命令下发{if(MQTT_UnPacketCmd(data, &cmdid_topic, &req_payload, &req_len) == 0) //解出topic和消息体{ONENET_MQTT_LOG("cmdid: %s, req: %s, req_len: %d\r\n", cmdid_topic, req_payload, req_len);//执行命令回调------------------------------------------------------------//CALLBACK_Execute(req_payload);if(MQTT_PacketCmdResp(cmdid_topic, req_payload, &mqttPacket) == 0) //命令回复组包{ONENET_MQTT_LOG("Tips: Send CmdResp\r\n");socket_send(g_onenet_mqtt_socket_id, mqttPacket._data, mqttPacket._len);MQTT_DeleteBuffer(&mqttPacket); //删包}MQTT_FREE_BUFFER(cmdid_topic);MQTT_FREE_BUFFER(req_payload);}break;} case MQTT_PKT_PUBLISH: //接收的Publish消息{if(MQTT_UnPacketPublish(data, &cmdid_topic, &topic_len, &req_payload, &req_len, &qos, &pkt_id) == 0){ONENET_MQTT_LOG("topic: %s, topic_len: %d, payload: %s, payload_len: %d\r\n",cmdid_topic, topic_len, req_payload, req_len);//执行命令回调------------------------------------------------------------//CALLBACK_Execute(req_payload);switch(qos){case 1: //收到publish的qos为1,设备需要回复Ack{if(MQTT_PacketPublishAck(pkt_id, &mqttPacket) == 0){ONENET_MQTT_LOG("Tips: Send PublishAck\r\n");socket_send(g_onenet_mqtt_socket_id, mqttPacket._data, mqttPacket._len);MQTT_DeleteBuffer(&mqttPacket);}break;}case 2: //收到publish的qos为2,设备先回复Rec{ //平台回复Rel,设备再回复Compif(MQTT_PacketPublishRec(pkt_id, &mqttPacket) == 0){ONENET_MQTT_LOG("Tips: Send PublishRec\r\n");socket_send(g_onenet_mqtt_socket_id, mqttPacket._data, mqttPacket._len);MQTT_DeleteBuffer(&mqttPacket);}break;}default:break;}MQTT_FREE_BUFFER(cmdid_topic);MQTT_FREE_BUFFER(req_payload);}break;}case MQTT_PKT_PUBACK: //发送Publish消息,平台回复的Ack{if(MQTT_UnPacketPublishAck(data) == 0){ONENET_MQTT_LOG("Tips: MQTT Publish Send OK\r\n");}break;} case MQTT_PKT_PUBREC: //发送Publish消息,平台回复的Rec,设备需回复Rel消息{if(MQTT_UnPacketPublishRec(data) == 0){ONENET_MQTT_LOG("Tips: Rev PublishRec\r\n");if(MQTT_PacketPublishRel(MQTT_PUBLISH_ID, &mqttPacket) == 0){ONENET_MQTT_LOG("Tips: Send PublishRel\r\n");socket_send(g_onenet_mqtt_socket_id, mqttPacket._data, mqttPacket._len);MQTT_DeleteBuffer(&mqttPacket);}}break;} case MQTT_PKT_PUBREL://收到Publish消息,设备回复Rec后,平台回复的Rel,设备需再回复Comp{if(MQTT_UnPacketPublishRel(data, pkt_id) == 0){ONENET_MQTT_LOG("Tips: Rev PublishRel\r\n");if(MQTT_PacketPublishComp(MQTT_PUBLISH_ID, &mqttPacket) == 0){ONENET_MQTT_LOG("Tips: Send PublishComp\r\n");socket_send(g_onenet_mqtt_socket_id, mqttPacket._data, mqttPacket._len);MQTT_DeleteBuffer(&mqttPacket);}}break;}case MQTT_PKT_PUBCOMP: //发送Publish消息,平台返回Rec,设备回复Rel,平台再返回的Comp{if(MQTT_UnPacketPublishComp(data) == 0){ONENET_MQTT_LOG("Tips: Rev PublishComp\r\n");}break;}case MQTT_PKT_SUBACK: //发送Subscribe消息的Ack{if(MQTT_UnPacketSubscribe(data) == 0){ONENET_MQTT_LOG("Tips: MQTT Subscribe OK\r\n");}else{ONENET_MQTT_LOG("Tips: MQTT Subscribe Err\r\n");}break;}case MQTT_PKT_UNSUBACK: //发送UnSubscribe消息的Ack{if(MQTT_UnPacketUnSubscribe(data) == 0){ONENET_MQTT_LOG("Tips: MQTT UnSubscribe OK\r\n");}else{ONENET_MQTT_LOG("Tips: MQTT UnSubscribe Err\r\n");}break;} default:break;}}#define ONENET_MQTT_RECV_MAX_SIZE 256
/**************************************************************
函数名称 : onenet_mqtt_device_recv_pro_task
函数功能 : 接收onenet下发数据处理任务函数
输入参数 : pvParameter:任务入口参数
返回值 : 无
备注 : 无
**************************************************************/
void onenet_mqtt_device_recv_pro_task(void *pvParameter)
{unsigned char data_ptr[ONENET_MQTT_RECV_MAX_SIZE];memset(data_ptr, 0, sizeof(data_ptr));while(1){if(socket_receive(g_onenet_mqtt_socket_id, data_ptr, ONENET_MQTT_RECV_MAX_SIZE) > 0) //使用MSG_DONTWAIT会比较稳定{onenet_mqtt_device_recv_pro(data_ptr); //集中处理memset(data_ptr, 0, sizeof(data_ptr));} vTaskDelay(1); //挂起任务10ms}
}/* 任务句柄 */
TaskHandle_t g_onenet_mqtt_device_recv_pro_task_handle = NULL; /**************************************************************
函数名称 : onenet_edp_receive_cmd_task_init
函数功能 : 创建接收onenet下发数据处理任务
输入参数 : 无
返回值 : 无
备注 : 无
**************************************************************/
void onenet_mqtt_device_recv_pro_task_init(void)
{if(g_onenet_mqtt_device_recv_pro_task_handle == NULL) {xTaskCreate(onenet_mqtt_device_recv_pro_task,"onenet_mqtt_device_recv_pro_task",1024 * 4 / sizeof(portSTACK_TYPE),NULL,TASK_PRIORITY_NORMAL,&g_onenet_mqtt_device_recv_pro_task_handle);ONENET_MQTT_LOG("onenet_mqtt_device_recv_pro_task");}}
接着,我们使用MQTT调试工具发布消息,如下:
发布成功之后且硬件设备订阅到消息,可以抓取到串口调试是打印的信息,说明消息订阅成功:
topic: topic_test, topic_len: 10, payload: topic_test003, payload_len: 13
playload就是订阅到的消息,拿到消息之后就可根据这条信息去执行相应的操作,这里我没有写对应操作的代码,读者可以自己根据自己的情况去写,在void onenet_mqtt_device_recv_pro(unsigned char *data) 平台返回数据检测处理函数里面的代码写对应的操作即可,具体位置如下:
6、取消设备消息的订阅
取消消息订阅之后,设备将不再订阅到消息,代码如下:
typedef enum
{UNSUBSCRIBE_TOPIC_OK = 0,UNSUBSCRIBE_TOPIC_ERROR = 1
}mqtt_unsubscribe_topic_t;/**************************************************************
函数名称 : onenet_mqtt_unsubscribe_topic
函数功能 : onenet mqtt 取消订阅主题
输入参数 : topic:取消订阅的topictopic_cnt:topic个数
返回值 : UNSUBSCRIBE_TOPIC_OK: 取消订阅成功UNSUBSCRIBE_TOPIC_ERROR:取消订阅失败
备注 : 无
**************************************************************/
mqtt_unsubscribe_topic_t onenet_mqtt_unsubscribe_topic(const signed char *topic[], unsigned char topic_cnt)
{unsigned char i = 0;MQTT_PACKET_STRUCTURE mqttPacket = {NULL, 0, 0, 0}; //协议包for(; i < topic_cnt; i++){ONENET_MQTT_LOG("unsubscribe topic: %s\r\n", topic[i]);}if(MQTT_PacketUnSubscribe(MQTT_UNSUBSCRIBE_ID, topic, topic_cnt, &mqttPacket) == 0){socket_send(g_onenet_mqtt_socket_id, mqttPacket._data, mqttPacket._len);//上传平台MQTT_DeleteBuffer(&mqttPacket); //删包return UNSUBSCRIBE_TOPIC_OK;}else{return UNSUBSCRIBE_TOPIC_ERROR;}
}/* 执行取消消息订阅 */
if(UNSUBSCRIBE_TOPIC_OK == onenet_mqtt_unsubscribe_topic(g_mqtt_topics, 2))
{ONENET_MQTT_LOG("unsubscribe success");
}
三、贴出代码
以下代码作者是OneNET工程师——张继瑞,本人只是根据自己的使用的平台修改了内存申请和释放函数。
1、mqttkit.c :
#include "string.h"
#include "mqttkit.h"#define CMD_TOPIC_PREFIX "$creq"//==========================================================
// 函数名称: MQTT_NewBuffer
//
// 函数功能: 申请内存
//
// 入口参数: mqttPacket:包结构体
// size:大小
//
// 返回参数: 无
//
// 说明: 1.可使用动态分配来分配内存
// 2.可使用局部或全局数组来指定内存
//==========================================================
void MQTT_NewBuffer(MQTT_PACKET_STRUCTURE *mqttPacket, uint32 size)
{uint32 i = 0;if(mqttPacket->_data == NULL){mqttPacket->_memFlag = MEM_FLAG_ALLOC;mqttPacket->_data = (uint8 *)MQTT_MALLOC_BUFFER(size);if(mqttPacket->_data != NULL){mqttPacket->_len = 0;mqttPacket->_size = size;for(; i < mqttPacket->_size; i++)mqttPacket->_data[i] = 0;}}else{mqttPacket->_memFlag = MEM_FLAG_STATIC;for(; i < mqttPacket->_size; i++)mqttPacket->_data[i] = 0;mqttPacket->_len = 0;if(mqttPacket->_size < size)mqttPacket->_data = NULL;}}//==========================================================
// 函数名称: MQTT_DeleteBuffer
//
// 函数功能: 释放数据内存
//
// 入口参数: mqttPacket:包结构体
//
// 返回参数: 无
//
// 说明:
//==========================================================
void MQTT_DeleteBuffer(MQTT_PACKET_STRUCTURE *mqttPacket)
{if(mqttPacket->_memFlag == MEM_FLAG_ALLOC)MQTT_FREE_BUFFER(mqttPacket->_data);mqttPacket->_data = NULL;mqttPacket->_len = 0;mqttPacket->_size = 0;mqttPacket->_memFlag = MEM_FLAG_NULL;}int32 MQTT_DumpLength(size_t len, uint8 *buf)
{int32 i = 0;for(i = 1; i <= 4; ++i){*buf = len % 128;len >>= 7;if(len > 0){*buf |= 128;++buf;}else{return i;}}return -1;
}int32 MQTT_ReadLength(const uint8 *stream, int32 size, uint32 *len)
{int32 i;const uint8 *in = stream;uint32 multiplier = 1;*len = 0;for(i = 0; i < size; ++i){*len += (in[i] & 0x7f) * multiplier;if(!(in[i] & 0x80)){return i + 1;}multiplier <<= 7;if(multiplier >= 2097152) //128 * *128 * *128{return -2; // error, out of range}}return -1; // not complete}//==========================================================
// 函数名称: MQTT_UnPacketRecv
//
// 函数功能: MQTT数据接收类型判断
//
// 入口参数: dataPtr:接收的数据指针
//
// 返回参数: 0-成功 其他-失败原因
//
// 说明:
//==========================================================
uint8 MQTT_UnPacketRecv(uint8 *dataPtr)
{uint8 status = 255;uint8 type = dataPtr[0] >> 4; //类型检查if(type < 1 || type > 14)return status;if(type == MQTT_PKT_PUBLISH){uint8 *msgPtr;uint32 remain_len = 0;msgPtr = dataPtr + MQTT_ReadLength(dataPtr + 1, 4, &remain_len) + 1;if(remain_len < 2 || dataPtr[0] & 0x01) //retainreturn 255;if(remain_len < ((uint16)msgPtr[0] << 8 | msgPtr[1]) + 2)return 255;if(strstr((int8 *)msgPtr + 2, CMD_TOPIC_PREFIX) != NULL) //如果是命令下发status = MQTT_PKT_CMD;elsestatus = MQTT_PKT_PUBLISH;}elsestatus = type;return status;}//==========================================================
// 函数名称: MQTT_PacketConnect
//
// 函数功能: 连接消息组包
//
// 入口参数: user:用户名:产品ID
// password:密码:鉴权信息或apikey
// devid:设备ID
// cTime:连接保持时间
// clean_session:离线消息清除标志
// qos:重发标志
// will_topic:异常离线topic
// will_msg:异常离线消息
// will_retain:消息推送标志
// mqttPacket:包指针
//
// 返回参数: 0-成功 其他-失败
//
// 说明:
//==========================================================
uint8 MQTT_PacketConnect(const int8 *user, const int8 *password, const int8 *devid,uint16 cTime, bool clean_session, bool qos,const int8 *will_topic, const int8 *will_msg, int32 will_retain,MQTT_PACKET_STRUCTURE *mqttPacket)
{uint8 flags = 0;uint8 will_topic_len = 0;uint16 total_len = 15;int16 len = 0, devid_len = strlen(devid);if(!devid)return 1;total_len += devid_len + 2;//断线后,是否清理离线消息:1-清理 0-不清理--------------------------------------------if(clean_session){flags |= MQTT_CONNECT_CLEAN_SESSION;}//异常掉线情况下,服务器发布的topic------------------------------------------------------if(will_topic){flags |= MQTT_CONNECT_WILL_FLAG;will_topic_len = strlen(will_topic);total_len += 4 + will_topic_len + strlen(will_msg);}//qos级别--主要用于PUBLISH(发布态)消息的,保证消息传递的次数-----------------------------switch((unsigned char)qos){case MQTT_QOS_LEVEL0:flags |= MQTT_CONNECT_WILL_QOS0; //最多一次break;case MQTT_QOS_LEVEL1:flags |= (MQTT_CONNECT_WILL_FLAG | MQTT_CONNECT_WILL_QOS1); //最少一次break;case MQTT_QOS_LEVEL2:flags |= (MQTT_CONNECT_WILL_FLAG | MQTT_CONNECT_WILL_QOS2); //只有一次break;default:return 2;}//主要用于PUBLISH(发布态)的消息,表示服务器要保留这次推送的信息,如果有新的订阅者出现,就把这消息推送给它。如果不设那么推送至当前订阅的就释放了if(will_retain){flags |= (MQTT_CONNECT_WILL_FLAG | MQTT_CONNECT_WILL_RETAIN);}//账号为空 密码为空---------------------------------------------------------------------if(!user || !password){return 3;}flags |= MQTT_CONNECT_USER_NAME | MQTT_CONNECT_PASSORD;total_len += strlen(user) + strlen(password) + 4;//分配内存-----------------------------------------------------------------------------MQTT_NewBuffer(mqttPacket, total_len);if(mqttPacket->_data == NULL)return 4;memset(mqttPacket->_data, 0, total_len);/*************************************固定头部***********************************************///固定头部----------------------连接请求类型---------------------------------------------mqttPacket->_data[mqttPacket->_len++] = MQTT_PKT_CONNECT << 4;//固定头部----------------------剩余长度值-----------------------------------------------len = MQTT_DumpLength(total_len - 5, mqttPacket->_data + mqttPacket->_len);if(len < 0){MQTT_DeleteBuffer(mqttPacket);return 5;}elsemqttPacket->_len += len;/*************************************可变头部***********************************************///可变头部----------------------协议名长度 和 协议名--------------------------------------mqttPacket->_data[mqttPacket->_len++] = 0;mqttPacket->_data[mqttPacket->_len++] = 4;mqttPacket->_data[mqttPacket->_len++] = 'M';mqttPacket->_data[mqttPacket->_len++] = 'Q';mqttPacket->_data[mqttPacket->_len++] = 'T';mqttPacket->_data[mqttPacket->_len++] = 'T';//可变头部----------------------protocol level 4-----------------------------------------mqttPacket->_data[mqttPacket->_len++] = 4;//可变头部----------------------连接标志(该函数开头处理的数据)-----------------------------mqttPacket->_data[mqttPacket->_len++] = flags;//可变头部----------------------保持连接的时间(秒)----------------------------------------mqttPacket->_data[mqttPacket->_len++] = MOSQ_MSB(cTime);mqttPacket->_data[mqttPacket->_len++] = MOSQ_LSB(cTime);/*************************************消息体************************************************///消息体----------------------------devid长度、devid-------------------------------------mqttPacket->_data[mqttPacket->_len++] = MOSQ_MSB(devid_len);mqttPacket->_data[mqttPacket->_len++] = MOSQ_LSB(devid_len);strncat((int8 *)mqttPacket->_data + mqttPacket->_len, devid, devid_len);mqttPacket->_len += devid_len;//消息体----------------------------will_flag 和 will_msg---------------------------------if(flags & MQTT_CONNECT_WILL_FLAG){unsigned short mLen = 0;if(!will_msg)will_msg = "";mLen = strlen(will_topic);mqttPacket->_data[mqttPacket->_len++] = MOSQ_MSB(mLen);mqttPacket->_data[mqttPacket->_len++] = MOSQ_LSB(mLen);strncat((int8 *)mqttPacket->_data + mqttPacket->_len, will_topic, mLen);mqttPacket->_len += mLen;mLen = strlen(will_msg);mqttPacket->_data[mqttPacket->_len++] = MOSQ_MSB(mLen);mqttPacket->_data[mqttPacket->_len++] = MOSQ_LSB(mLen);strncat((int8 *)mqttPacket->_data + mqttPacket->_len, will_msg, mLen);mqttPacket->_len += mLen;}//消息体----------------------------use---------------------------------------------------if(flags & MQTT_CONNECT_USER_NAME){unsigned short user_len = strlen(user);mqttPacket->_data[mqttPacket->_len++] = MOSQ_MSB(user_len);mqttPacket->_data[mqttPacket->_len++] = MOSQ_LSB(user_len);strncat((int8 *)mqttPacket->_data + mqttPacket->_len, user, user_len);mqttPacket->_len += user_len;}//消息体----------------------------password----------------------------------------------if(flags & MQTT_CONNECT_PASSORD){unsigned short psw_len = strlen(password);mqttPacket->_data[mqttPacket->_len++] = MOSQ_MSB(psw_len);mqttPacket->_data[mqttPacket->_len++] = MOSQ_LSB(psw_len);strncat((int8 *)mqttPacket->_data + mqttPacket->_len, password, psw_len);mqttPacket->_len += psw_len;}return 0;}//==========================================================
// 函数名称: MQTT_PacketDisConnect
//
// 函数功能: 断开连接消息组包
//
// 入口参数: mqttPacket:包指针
//
// 返回参数: 0-成功 1-失败
//
// 说明:
//==========================================================
bool MQTT_PacketDisConnect(MQTT_PACKET_STRUCTURE *mqttPacket)
{MQTT_NewBuffer(mqttPacket, 2);if(mqttPacket->_data == NULL)return 1;/*************************************固定头部***********************************************///固定头部----------------------头部消息-------------------------------------------------mqttPacket->_data[mqttPacket->_len++] = MQTT_PKT_DISCONNECT << 4;//固定头部----------------------剩余长度值-----------------------------------------------mqttPacket->_data[mqttPacket->_len++] = 0;return 0;}//==========================================================
// 函数名称: MQTT_UnPacketConnectAck
//
// 函数功能: 连接消息解包
//
// 入口参数: rev_data:接收的数据
//
// 返回参数: 1、255-失败 其他-平台的返回码
//
// 说明:
//==========================================================
uint8 MQTT_UnPacketConnectAck(uint8 *rev_data)
{if(rev_data[1] != 2)return 1;if(rev_data[2] == 0 || rev_data[2] == 1)return rev_data[3];elsereturn 255;}//==========================================================
// 函数名称: MQTT_PacketSaveData
//
// 函数功能: 数据点上传组包
//
// 入口参数: devid:设备ID(可为空)
// send_buf:json缓存buf
// send_len:json总长
// type_bin_head:bin文件的消息头
// type:类型
//
// 返回参数: 0-成功 1-失败
//
// 说明:
//==========================================================
bool MQTT_PacketSaveData(const int8 *devid, int16 send_len, int8 *type_bin_head, uint8 type, MQTT_PACKET_STRUCTURE *mqttPacket)
{if(MQTT_PacketPublish(MQTT_PUBLISH_ID, "$dp", NULL, send_len + 3, MQTT_QOS_LEVEL1, 0, 1, mqttPacket) == 0){mqttPacket->_data[mqttPacket->_len++] = type; //类型mqttPacket->_data[mqttPacket->_len++] = MOSQ_MSB(send_len);mqttPacket->_data[mqttPacket->_len++] = MOSQ_LSB(send_len);}elsereturn 1;return 0;}//==========================================================
// 函数名称: MQTT_UnPacketCmd
//
// 函数功能: 命令下发解包
//
// 入口参数: rev_data:接收的数据指针
// cmdid:cmdid-uuid
// req:命令
// req_len:命令长度
//
// 返回参数: 0-成功 其他-失败原因
//
// 说明:
//==========================================================
uint8 MQTT_UnPacketCmd(uint8 *rev_data, int8 **cmdid, int8 **req, uint16 *req_len)
{int8 *dataPtr = strchr((int8 *)rev_data + 6, '/'); //加6是跳过头信息uint32 remain_len = 0;if(dataPtr == NULL) //未找到'/'return 1;dataPtr++; //跳过'/'MQTT_ReadLength(rev_data + 1, 4, &remain_len); //读取剩余字节*cmdid = (int8 *)MQTT_MALLOC_BUFFER(37); //cmdid固定36字节,多分配一个结束符的位置if(*cmdid == NULL)return 2;memset(*cmdid, 0, 37); //全部清零memcpy(*cmdid, (const int8 *)dataPtr, 36); //复制cmdiddataPtr += 36;*req_len = remain_len - 44; //命令长度 = 剩余长度(remain_len) - 2 - 5($creq) - 1(\) - cmdid长度*req = (int8 *)MQTT_MALLOC_BUFFER(*req_len + 1); //分配命令长度+1if(*req == NULL){MQTT_FREE_BUFFER(*cmdid);return 3;}memset(*req, 0, *req_len + 1); //清零memcpy(*req, (const int8 *)dataPtr, *req_len); //复制命令return 0;}//==========================================================
// 函数名称: MQTT_PacketCmdResp
//
// 函数功能: 命令回复组包
//
// 入口参数: cmdid:cmdid
// req:命令
// mqttPacket:包指针
//
// 返回参数: 0-成功 1-失败
//
// 说明:
//==========================================================
bool MQTT_PacketCmdResp(const int8 *cmdid, const int8 *req, MQTT_PACKET_STRUCTURE *mqttPacket)
{uint16 cmdid_len = strlen(cmdid);uint16 req_len = strlen(req);bool status = 0;int8 *payload = MQTT_MALLOC_BUFFER(cmdid_len + 6);if(payload == NULL)return 1;memset(payload, 0, cmdid_len + 6);memcpy(payload, "$crsp/", 6);strncat(payload, cmdid, cmdid_len);if(MQTT_PacketPublish(MQTT_PUBLISH_ID, payload, req, strlen(req), MQTT_QOS_LEVEL0, 0, 1, mqttPacket) == 0)status = 0;elsestatus = 1;MQTT_FREE_BUFFER(payload);return status;}//==========================================================
// 函数名称: MQTT_PacketSubscribe
//
// 函数功能: Subscribe消息组包
//
// 入口参数: pkt_id:pkt_id
// qos:消息重发次数
// topics:订阅的消息
// topics_cnt:订阅的消息个数
// mqttPacket:包指针
//
// 返回参数: 0-成功 其他-失败
//
// 说明:
//==========================================================
uint8 MQTT_PacketSubscribe(uint16 pkt_id, enum MqttQosLevel qos, const int8 *topics[], uint8 topics_cnt, MQTT_PACKET_STRUCTURE *mqttPacket)
{uint32 topic_len = 0, remain_len = 0;int16 len = 0;uint8 i = 0;if(pkt_id == 0)return 1;//计算topic长度-------------------------------------------------------------------------for(; i < topics_cnt; i++){if(topics[i] == NULL)return 2;topic_len += strlen(topics[i]);}//2 bytes packet id + topic filter(2 bytes topic + topic length + 1 byte reserve)------remain_len = 2 + 3 * topics_cnt + topic_len;//分配内存------------------------------------------------------------------------------MQTT_NewBuffer(mqttPacket, remain_len + 5);if(mqttPacket->_data == NULL)return 3;/*************************************固定头部***********************************************///固定头部----------------------头部消息-------------------------------------------------mqttPacket->_data[mqttPacket->_len++] = MQTT_PKT_SUBSCRIBE << 4;//固定头部----------------------剩余长度值-----------------------------------------------len = MQTT_DumpLength(remain_len, mqttPacket->_data + mqttPacket->_len);if(len < 0){MQTT_DeleteBuffer(mqttPacket);return 4;}elsemqttPacket->_len += len;/*************************************payload***********************************************///payload----------------------pkt_id---------------------------------------------------mqttPacket->_data[mqttPacket->_len++] = MOSQ_MSB(pkt_id);mqttPacket->_data[mqttPacket->_len++] = MOSQ_LSB(pkt_id);//payload----------------------topic_name-----------------------------------------------for(i = 0; i < topics_cnt; i++){topic_len = strlen(topics[i]);mqttPacket->_data[mqttPacket->_len++] = MOSQ_MSB(topic_len);mqttPacket->_data[mqttPacket->_len++] = MOSQ_LSB(topic_len);strncat((int8 *)mqttPacket->_data + mqttPacket->_len, topics[i], topic_len);mqttPacket->_len += topic_len;mqttPacket->_data[mqttPacket->_len++] = qos & 0xFF;}return 0;}//==========================================================
// 函数名称: MQTT_UnPacketSubscrebe
//
// 函数功能: Subscribe的回复消息解包
//
// 入口参数: rev_data:接收到的信息
//
// 返回参数: 0-成功 其他-失败
//
// 说明:
//==========================================================
uint8 MQTT_UnPacketSubscribe(uint8 *rev_data)
{uint8 result = 255;if(rev_data[2] == MOSQ_MSB(MQTT_SUBSCRIBE_ID) && rev_data[3] == MOSQ_LSB(MQTT_SUBSCRIBE_ID)){switch(rev_data[4]){case 0x00:case 0x01:case 0x02://MQTT Subscribe OKresult = 0;break;case 0x80://MQTT Subscribe Failedresult = 1;break;default://MQTT Subscribe UnKnown Errresult = 2;break;}}return result;}//==========================================================
// 函数名称: MQTT_PacketUnSubscribe
//
// 函数功能: UnSubscribe消息组包
//
// 入口参数: pkt_id:pkt_id
// qos:消息重发次数
// topics:订阅的消息
// topics_cnt:订阅的消息个数
// mqttPacket:包指针
//
// 返回参数: 0-成功 其他-失败
//
// 说明:
//==========================================================
uint8 MQTT_PacketUnSubscribe(uint16 pkt_id, const int8 *topics[], uint8 topics_cnt, MQTT_PACKET_STRUCTURE *mqttPacket)
{uint32 topic_len = 0, remain_len = 0;int16 len = 0;uint8 i = 0;if(pkt_id == 0)return 1;//计算topic长度-------------------------------------------------------------------------for(; i < topics_cnt; i++){if(topics[i] == NULL)return 2;topic_len += strlen(topics[i]);}//2 bytes packet id, 2 bytes topic length + topic + 1 byte reserve---------------------remain_len = 2 + (topics_cnt << 1) + topic_len;//分配内存------------------------------------------------------------------------------MQTT_NewBuffer(mqttPacket, remain_len + 5);if(mqttPacket->_data == NULL)return 3;/*************************************固定头部***********************************************///固定头部----------------------头部消息-------------------------------------------------mqttPacket->_data[mqttPacket->_len++] = MQTT_PKT_UNSUBSCRIBE << 4;//固定头部----------------------剩余长度值-----------------------------------------------len = MQTT_DumpLength(remain_len, mqttPacket->_data + mqttPacket->_len);if(len < 0){MQTT_DeleteBuffer(mqttPacket);return 4;}elsemqttPacket->_len += len;/*************************************payload***********************************************///payload----------------------pkt_id---------------------------------------------------mqttPacket->_data[mqttPacket->_len++] = MOSQ_MSB(pkt_id);mqttPacket->_data[mqttPacket->_len++] = MOSQ_LSB(pkt_id);//payload----------------------topic_name-----------------------------------------------for(i = 0; i < topics_cnt; i++){topic_len = strlen(topics[i]);mqttPacket->_data[mqttPacket->_len++] = MOSQ_MSB(topic_len);mqttPacket->_data[mqttPacket->_len++] = MOSQ_LSB(topic_len);strncat((int8 *)mqttPacket->_data + mqttPacket->_len, topics[i], topic_len);mqttPacket->_len += topic_len;}return 0;}//==========================================================
// 函数名称: MQTT_UnPacketUnSubscribe
//
// 函数功能: UnSubscribe的回复消息解包
//
// 入口参数: rev_data:接收到的信息
//
// 返回参数: 0-成功 其他-失败
//
// 说明:
//==========================================================
bool MQTT_UnPacketUnSubscribe(uint8 *rev_data)
{bool result = 1;if(rev_data[2] == MOSQ_MSB(MQTT_UNSUBSCRIBE_ID) && rev_data[3] == MOSQ_LSB(MQTT_UNSUBSCRIBE_ID)){result = 0;}return result;}//==========================================================
// 函数名称: MQTT_PacketPublish
//
// 函数功能: Pulish消息组包
//
// 入口参数: pkt_id:pkt_id
// topic:发布的topic
// payload:消息体
// payload_len:消息体长度
// qos:重发次数
// retain:离线消息推送
// own:
// mqttPacket:包指针
//
// 返回参数: 0-成功 其他-失败
//
// 说明:
//==========================================================
uint8 MQTT_PacketPublish(uint16 pkt_id, const int8 *topic,const int8 *payload, uint32 payload_len,enum MqttQosLevel qos, int32 retain, int32 own,MQTT_PACKET_STRUCTURE *mqttPacket)
{uint32 total_len = 0, topic_len = 0;int32 len = 0;uint8 flags = 0;//pkt_id检查----------------------------------------------------------------------------if(pkt_id == 0)return 1;//$dp为系统上传数据点的指令--------------------------------------------------------------for(topic_len = 0; topic[topic_len] != '\0'; ++topic_len){if((topic[topic_len] == '#') || (topic[topic_len] == '+'))return 2;}//Publish消息---------------------------------------------------------------------------flags |= MQTT_PKT_PUBLISH << 4;//retain标志----------------------------------------------------------------------------if(retain)flags |= 0x01;//总长度--------------------------------------------------------------------------------total_len = topic_len + payload_len + 2;//qos级别--主要用于PUBLISH(发布态)消息的,保证消息传递的次数-----------------------------switch(qos){case MQTT_QOS_LEVEL0:flags |= MQTT_CONNECT_WILL_QOS0; //最多一次break;case MQTT_QOS_LEVEL1:flags |= 0x02; //最少一次total_len += 2;break;case MQTT_QOS_LEVEL2:flags |= 0x04; //只有一次total_len += 2;break;default:return 3;}//分配内存------------------------------------------------------------------------------MQTT_NewBuffer(mqttPacket, total_len + 3);if(mqttPacket->_data == NULL)return 4;memset(mqttPacket->_data, 0, total_len + 3);/*************************************固定头部***********************************************///固定头部----------------------头部消息-------------------------------------------------mqttPacket->_data[mqttPacket->_len++] = flags;//固定头部----------------------剩余长度值-----------------------------------------------len = MQTT_DumpLength(total_len, mqttPacket->_data + mqttPacket->_len);if(len < 0){MQTT_DeleteBuffer(mqttPacket);return 5;}elsemqttPacket->_len += len;/*************************************可变头部***********************************************///可变头部----------------------写入topic长度、topic-------------------------------------mqttPacket->_data[mqttPacket->_len++] = MOSQ_MSB(topic_len);mqttPacket->_data[mqttPacket->_len++] = MOSQ_LSB(topic_len);strncat((int8 *)mqttPacket->_data + mqttPacket->_len, topic, topic_len);mqttPacket->_len += topic_len;if(qos != MQTT_QOS_LEVEL0){mqttPacket->_data[mqttPacket->_len++] = MOSQ_MSB(pkt_id);mqttPacket->_data[mqttPacket->_len++] = MOSQ_LSB(pkt_id);}//可变头部----------------------写入payload----------------------------------------------if(payload != NULL){strncat((int8 *)mqttPacket->_data + mqttPacket->_len, payload, payload_len);mqttPacket->_len += payload_len;}return 0;}//==========================================================
// 函数名称: MQTT_UnPacketPublish
//
// 函数功能: Publish消息解包
//
// 入口参数: flags:MQTT相关标志信息
// pkt:指向可变头部
// size:固定头部中的剩余长度信息
//
// 返回参数: 0-成功 其他-失败原因
//
// 说明:
//==========================================================
uint8 MQTT_UnPacketPublish(uint8 *rev_data, int8 **topic, uint16 *topic_len, int8 **payload, uint16 *payload_len, uint8 *qos, uint16 *pkt_id)
{const int8 flags = rev_data[0] & 0x0F;uint8 *msgPtr;uint32 remain_len = 0;const int8 dup = flags & 0x08;*qos = (flags & 0x06) >> 1;msgPtr = rev_data + MQTT_ReadLength(rev_data + 1, 4, &remain_len) + 1;if(remain_len < 2 || flags & 0x01) //retainreturn 255;*topic_len = (uint16)msgPtr[0] << 8 | msgPtr[1];if(remain_len < *topic_len + 2)return 255;if(strstr((int8 *)msgPtr + 2, CMD_TOPIC_PREFIX) != NULL) //如果是命令下发return MQTT_PKT_CMD;switch(*qos){case MQTT_QOS_LEVEL0: // qos0 have no packet identifierif(0 != dup)return 255;*topic = MQTT_MALLOC_BUFFER(*topic_len + 1); //为topic分配内存if(*topic == NULL)return 255;memset(*topic, 0, *topic_len + 1);memcpy(*topic, (int8 *)msgPtr + 2, *topic_len); //复制数据*payload_len = remain_len - 2 - *topic_len; //为payload分配内存*payload = MQTT_MALLOC_BUFFER(*payload_len + 1);if(*payload == NULL) //如果失败{MQTT_FREE_BUFFER(*topic); //则需要把topic的内存释放掉return 255;}memset(*payload, 0, *payload_len + 1);memcpy(*payload, (int8 *)msgPtr + 2 + *topic_len, *payload_len);break;case MQTT_QOS_LEVEL1:case MQTT_QOS_LEVEL2:if(*topic_len + 2 > remain_len)return 255;*pkt_id = (uint16)msgPtr[*topic_len + 2] << 8 | msgPtr[*topic_len + 3];if(pkt_id == 0)return 255;*topic = MQTT_MALLOC_BUFFER(*topic_len + 1); //为topic分配内存if(*topic == NULL)return 255;memset(*topic, 0, *topic_len + 1);memcpy(*topic, (int8 *)msgPtr + 2, *topic_len); //复制数据*payload_len = remain_len - 4 - *topic_len;*payload = MQTT_MALLOC_BUFFER(*payload_len + 1); //为payload分配内存if(*payload == NULL) //如果失败{MQTT_FREE_BUFFER(*topic); //则需要把topic的内存释放掉return 255;}memset(*payload, 0, *payload_len + 1);memcpy(*payload, (int8 *)msgPtr + 4 + *topic_len, *payload_len);break;default:return 255;}if(strchr((int8 *)topic, '+') || strchr((int8 *)topic, '#'))return 255;return 0;}//==========================================================
// 函数名称: MQTT_PacketPublishAck
//
// 函数功能: Publish Ack消息组包
//
// 入口参数: pkt_id:packet id
// mqttPacket:包指针
//
// 返回参数: 0-成功 1-失败原因
//
// 说明: 当收到的Publish消息的QoS等级为1时,需要Ack回复
//==========================================================
bool MQTT_PacketPublishAck(uint16 pkt_id, MQTT_PACKET_STRUCTURE *mqttPacket)
{MQTT_NewBuffer(mqttPacket, 4);if(mqttPacket->_data == NULL)return 1;/*************************************固定头部***********************************************///固定头部----------------------头部消息-------------------------------------------------mqttPacket->_data[mqttPacket->_len++] = MQTT_PKT_PUBACK << 4;//固定头部----------------------剩余长度-------------------------------------------------mqttPacket->_data[mqttPacket->_len++] = 2;/*************************************可变头部***********************************************///可变头部----------------------pkt_id长度-----------------------------------------------mqttPacket->_data[mqttPacket->_len++] = pkt_id >> 8;mqttPacket->_data[mqttPacket->_len++] = pkt_id & 0xff;return 0;}//==========================================================
// 函数名称: MQTT_UnPacketPublishAck
//
// 函数功能: Publish Ack消息解包
//
// 入口参数: rev_data:收到的数据
//
// 返回参数: 0-成功 1-失败原因
//
// 说明:
//==========================================================
bool MQTT_UnPacketPublishAck(uint8 *rev_data)
{if(rev_data[1] != 2)return 1;if(rev_data[2] == MOSQ_MSB(MQTT_PUBLISH_ID) && rev_data[3] == MOSQ_LSB(MQTT_PUBLISH_ID))return 0;elsereturn 1;}//==========================================================
// 函数名称: MQTT_PacketPublishRec
//
// 函数功能: Publish Rec消息组包
//
// 入口参数: pkt_id:packet id
// mqttPacket:包指针
//
// 返回参数: 0-成功 1-失败原因
//
// 说明: 当收到的Publish消息的QoS等级为2时,先收到rec
//==========================================================
bool MQTT_PacketPublishRec(uint16 pkt_id, MQTT_PACKET_STRUCTURE *mqttPacket)
{MQTT_NewBuffer(mqttPacket, 4);if(mqttPacket->_data == NULL)return 1;/*************************************固定头部***********************************************///固定头部----------------------头部消息-------------------------------------------------mqttPacket->_data[mqttPacket->_len++] = MQTT_PKT_PUBREC << 4;//固定头部----------------------剩余长度-------------------------------------------------mqttPacket->_data[mqttPacket->_len++] = 2;/*************************************可变头部***********************************************///可变头部----------------------pkt_id长度-----------------------------------------------mqttPacket->_data[mqttPacket->_len++] = pkt_id >> 8;mqttPacket->_data[mqttPacket->_len++] = pkt_id & 0xff;return 0;}//==========================================================
// 函数名称: MQTT_UnPacketPublishRec
//
// 函数功能: Publish Rec消息解包
//
// 入口参数: rev_data:接收到的数据
//
// 返回参数: 0-成功 1-失败
//
// 说明:
//==========================================================
bool MQTT_UnPacketPublishRec(uint8 *rev_data)
{if(rev_data[1] != 2)return 1;if(rev_data[2] == MOSQ_MSB(MQTT_PUBLISH_ID) && rev_data[3] == MOSQ_LSB(MQTT_PUBLISH_ID))return 0;elsereturn 1;}//==========================================================
// 函数名称: MQTT_PacketPublishRel
//
// 函数功能: Publish Rel消息组包
//
// 入口参数: pkt_id:packet id
// mqttPacket:包指针
//
// 返回参数: 0-成功 1-失败原因
//
// 说明: 当收到的Publish消息的QoS等级为2时,先收到rec,再回复rel
//==========================================================
bool MQTT_PacketPublishRel(uint16 pkt_id, MQTT_PACKET_STRUCTURE *mqttPacket)
{MQTT_NewBuffer(mqttPacket, 4);if(mqttPacket->_data == NULL)return 1;/*************************************固定头部***********************************************///固定头部----------------------头部消息-------------------------------------------------mqttPacket->_data[mqttPacket->_len++] = MQTT_PKT_PUBREL << 4 | 0x02;//固定头部----------------------剩余长度-------------------------------------------------mqttPacket->_data[mqttPacket->_len++] = 2;/*************************************可变头部***********************************************///可变头部----------------------pkt_id长度-----------------------------------------------mqttPacket->_data[mqttPacket->_len++] = pkt_id >> 8;mqttPacket->_data[mqttPacket->_len++] = pkt_id & 0xff;return 0;}//==========================================================
// 函数名称: MQTT_UnPacketPublishRel
//
// 函数功能: Publish Rel消息解包
//
// 入口参数: rev_data:接收到的数据
//
// 返回参数: 0-成功 1-失败
//
// 说明:
//==========================================================
bool MQTT_UnPacketPublishRel(uint8 *rev_data, uint16 pkt_id)
{if(rev_data[1] != 2)return 1;if(rev_data[2] == MOSQ_MSB(pkt_id) && rev_data[3] == MOSQ_LSB(pkt_id))return 0;elsereturn 1;}//==========================================================
// 函数名称: MQTT_PacketPublishComp
//
// 函数功能: Publish Comp消息组包
//
// 入口参数: pkt_id:packet id
// mqttPacket:包指针
//
// 返回参数: 0-成功 1-失败原因
//
// 说明: 当收到的Publish消息的QoS等级为2时,先收到rec,再回复rel
//==========================================================
bool MQTT_PacketPublishComp(uint16 pkt_id, MQTT_PACKET_STRUCTURE *mqttPacket)
{MQTT_NewBuffer(mqttPacket, 4);if(mqttPacket->_data == NULL)return 1;/*************************************固定头部***********************************************///固定头部----------------------头部消息-------------------------------------------------mqttPacket->_data[mqttPacket->_len++] = MQTT_PKT_PUBREL << 4;//固定头部----------------------剩余长度-------------------------------------------------mqttPacket->_data[mqttPacket->_len++] = 2;/*************************************可变头部***********************************************///可变头部----------------------pkt_id长度-----------------------------------------------mqttPacket->_data[mqttPacket->_len++] = pkt_id >> 8;mqttPacket->_data[mqttPacket->_len++] = pkt_id & 0xff;return 0;}//==========================================================
// 函数名称: MQTT_UnPacketPublishComp
//
// 函数功能: Publish Comp消息解包
//
// 入口参数: rev_data:接收到的数据
//
// 返回参数: 0-成功 1-失败
//
// 说明:
//==========================================================
bool MQTT_UnPacketPublishComp(uint8 *rev_data)
{if(rev_data[1] != 2)return 1;if(rev_data[2] == MOSQ_MSB(MQTT_PUBLISH_ID) && rev_data[3] == MOSQ_LSB(MQTT_PUBLISH_ID))return 0;elsereturn 1;}//==========================================================
// 函数名称: MQTT_PacketPing
//
// 函数功能: 心跳请求组包
//
// 入口参数: mqttPacket:包指针
//
// 返回参数: 0-成功 1-失败
//
// 说明:
//==========================================================
bool MQTT_PacketPing(MQTT_PACKET_STRUCTURE *mqttPacket)
{MQTT_NewBuffer(mqttPacket, 2);if(mqttPacket->_data == NULL)return 1;/*************************************固定头部***********************************************///固定头部----------------------头部消息-------------------------------------------------mqttPacket->_data[mqttPacket->_len++] = MQTT_PKT_PINGREQ << 4;//固定头部----------------------剩余长度-------------------------------------------------mqttPacket->_data[mqttPacket->_len++] = 0;return 0;}
2、mqttkit.h:
#ifndef __MQTT_KIT_H__
#define __MQTT_KIT_H__#include "common.h"
#include "stdbool.h"#define MQTT_MALLOC_BUFFER(buffer_size) pvPortCalloc(1, buffer_size);
#define MQTT_FREE_BUFFER(buffer)\
{\if (buffer) {\vPortFree(buffer);\buffer = NULL;\}\
}\#define MOSQ_MSB(A) (uint8)((A & 0xFF00) >> 8)
#define MOSQ_LSB(A) (uint8)(A & 0x00FF)/*--------------------------------内存分配方案标志--------------------------------*/
#define MEM_FLAG_NULL 0
#define MEM_FLAG_ALLOC 1
#define MEM_FLAG_STATIC 2typedef struct Buffer
{uint8 *_data; //协议数据uint32 _len; //写入的数据长度uint32 _size; //缓存总大小uint8 _memFlag; //内存使用的方案:0-未分配 1-使用的动态分配 2-使用的固定内存
} MQTT_PACKET_STRUCTURE;/*--------------------------------固定头部消息类型--------------------------------*/
enum MqttPacketType
{MQTT_PKT_CONNECT = 1, /**< 连接请求数据包 */MQTT_PKT_CONNACK, /**< 连接确认数据包 */MQTT_PKT_PUBLISH, /**< 发布数据数据包 */MQTT_PKT_PUBACK, /**< 发布确认数据包 */MQTT_PKT_PUBREC, /**< 发布数据已接收数据包,Qos 2时,回复MQTT_PKT_PUBLISH */MQTT_PKT_PUBREL, /**< 发布数据释放数据包, Qos 2时,回复MQTT_PKT_PUBREC */MQTT_PKT_PUBCOMP, /**< 发布完成数据包, Qos 2时,回复MQTT_PKT_PUBREL */MQTT_PKT_SUBSCRIBE, /**< 订阅数据包 */MQTT_PKT_SUBACK, /**< 订阅确认数据包 */MQTT_PKT_UNSUBSCRIBE, /**< 取消订阅数据包 */MQTT_PKT_UNSUBACK, /**< 取消订阅确认数据包 */MQTT_PKT_PINGREQ, /**< ping 数据包 */MQTT_PKT_PINGRESP, /**< ping 响应数据包 */MQTT_PKT_DISCONNECT, /**< 断开连接数据包 *///新增MQTT_PKT_CMD /**< 命令下发数据包 */};/*--------------------------------MQTT QOS等级--------------------------------*/
enum MqttQosLevel
{MQTT_QOS_LEVEL0, /**< 最多发送一次 */MQTT_QOS_LEVEL1, /**< 最少发送一次 */MQTT_QOS_LEVEL2 /**< 只发送一次 */
};/*--------------------------------MQTT 连接请求标志位,内部使用--------------------------------*/
enum MqttConnectFlag
{MQTT_CONNECT_CLEAN_SESSION = 0x02,MQTT_CONNECT_WILL_FLAG = 0x04,MQTT_CONNECT_WILL_QOS0 = 0x00,MQTT_CONNECT_WILL_QOS1 = 0x08,MQTT_CONNECT_WILL_QOS2 = 0x10,MQTT_CONNECT_WILL_RETAIN = 0x20,MQTT_CONNECT_PASSORD = 0x40,MQTT_CONNECT_USER_NAME = 0x80
};/*--------------------------------消息的packet ID,可自定义--------------------------------*/
#define MQTT_PUBLISH_ID 10
#define MQTT_SUBSCRIBE_ID 20
#define MQTT_UNSUBSCRIBE_ID 30/*--------------------------------删包--------------------------------*/
void MQTT_DeleteBuffer(MQTT_PACKET_STRUCTURE *mqttPacket);/*--------------------------------解包--------------------------------*/
uint8 MQTT_UnPacketRecv(uint8 *dataPtr);/*--------------------------------登录组包--------------------------------*/
uint8 MQTT_PacketConnect(const int8 *user, const int8 *password, const int8 *devid,uint16 cTime, bool clean_session, bool qos,const int8 *will_topic, const int8 *will_msg, int32 will_retain,MQTT_PACKET_STRUCTURE *mqttPacket);/*--------------------------------断开连接组包--------------------------------*/
bool MQTT_PacketDisConnect(MQTT_PACKET_STRUCTURE *mqttPacket);/*--------------------------------连接响应解包--------------------------------*/
uint8 MQTT_UnPacketConnectAck(uint8 *rev_data);/*--------------------------------数据点上传组包--------------------------------*/
bool MQTT_PacketSaveData(const int8 *devid, int16 send_len, int8 *type_bin_head, uint8 type, MQTT_PACKET_STRUCTURE *mqttPacket);/*--------------------------------命令下发解包--------------------------------*/
uint8 MQTT_UnPacketCmd(uint8 *rev_data, int8 **cmdid, int8 **req, uint16 *req_len);/*--------------------------------命令回复组包--------------------------------*/
bool MQTT_PacketCmdResp(const int8 *cmdid, const int8 *req, MQTT_PACKET_STRUCTURE *mqttPacket);/*--------------------------------订阅主题组包--------------------------------*/
uint8 MQTT_PacketSubscribe(uint16 pkt_id, enum MqttQosLevel qos, const int8 *topics[], uint8 topics_cnt, MQTT_PACKET_STRUCTURE *mqttPacket);/*--------------------------------订阅主题回复解包--------------------------------*/
uint8 MQTT_UnPacketSubscribe(uint8 *rev_data);/*--------------------------------取消订阅组包--------------------------------*/
uint8 MQTT_PacketUnSubscribe(uint16 pkt_id, const int8 *topics[], uint8 topics_cnt, MQTT_PACKET_STRUCTURE *mqttPacket);/*--------------------------------取消订阅回复解包--------------------------------*/
bool MQTT_UnPacketUnSubscribe(uint8 *rev_data);/*--------------------------------发布主题组包--------------------------------*/
uint8 MQTT_PacketPublish(uint16 pkt_id, const int8 *topic,const int8 *payload, uint32 payload_len,enum MqttQosLevel qos, int32 retain, int32 own,MQTT_PACKET_STRUCTURE *mqttPacket);/*--------------------------------发布消息回复解包--------------------------------*/
uint8 MQTT_UnPacketPublish(uint8 *rev_data, int8 **topic, uint16 *topic_len, int8 **payload, uint16 *payload_len, uint8 *qos, uint16 *pkt_id);/*--------------------------------发布消息的Ack组包--------------------------------*/
bool MQTT_PacketPublishAck(uint16 pkt_id, MQTT_PACKET_STRUCTURE *mqttPacket);/*--------------------------------发布消息的Ack解包--------------------------------*/
bool MQTT_UnPacketPublishAck(uint8 *rev_data);/*--------------------------------发布消息的Rec组包--------------------------------*/
bool MQTT_PacketPublishRec(uint16 pkt_id, MQTT_PACKET_STRUCTURE *mqttPacket);/*--------------------------------发布消息的Rec解包--------------------------------*/
bool MQTT_UnPacketPublishRec(uint8 *rev_data);/*--------------------------------发布消息的Rel组包--------------------------------*/
bool MQTT_PacketPublishRel(uint16 pkt_id, MQTT_PACKET_STRUCTURE *mqttPacket);/*--------------------------------发布消息的Rel解包--------------------------------*/
bool MQTT_UnPacketPublishRel(uint8 *rev_data, uint16 pkt_id);/*--------------------------------发布消息的Comp组包--------------------------------*/
bool MQTT_PacketPublishComp(uint16 pkt_id, MQTT_PACKET_STRUCTURE *mqttPacket);/*--------------------------------发布消息的Comp解包--------------------------------*/
bool MQTT_UnPacketPublishComp(uint8 *rev_data);/*--------------------------------心跳请求组包--------------------------------*/
bool MQTT_PacketPing(MQTT_PACKET_STRUCTURE *mqttPacket);#endif
3、common.h:
#ifndef __COMMON_H__
#define __COMMON_H__typedef unsigned char uint8;
typedef char int8;
typedef unsigned short uint16;
typedef short int16;
typedef unsigned int uint32;
typedef int int32;
typedef unsigned long int uint64;
typedef long int int64;#endif /* __COMMON_H__ */
4、data_stream.c:
/*************************************************************************************************************************************************************************************** 文件名: dStream.c** 作者: 张继瑞** 日期: 2017-09-11** 版本: V1.1** 说明: cJson格式数据流通用封装** 修改记录: V1.1:修复当数据流flag全为0时封装错误的bug。************************************************************************************************************************************************************************************
**///C库
#include "string.h"
#include "stdio.h"
//协议封装文件
#include "data_stream.h"//==========================================================
// 函数名称: DSTREAM_toString
//
// 函数功能: 将数值转为字符串
//
// 入口参数: StreamArray:数据流
// buf:转换后的缓存
// pos:数据流中的哪个数据
// bufLen:缓存长度
//
// 返回参数: 无
//
// 说明:
//==========================================================
void DSTREAM_toString(DATA_STREAM *streamArray, char *buf, unsigned short pos, unsigned short bufLen)
{memset(buf, 0, bufLen);switch((unsigned char)streamArray[pos].dataType){case TYPE_BOOL:snprintf(buf, bufLen, "%d", *(bool *)streamArray[pos].dataPoint);break;case TYPE_CHAR:snprintf(buf, bufLen, "%d", *(signed char *)streamArray[pos].dataPoint);break;case TYPE_UCHAR:snprintf(buf, bufLen, "%d", *(unsigned char *)streamArray[pos].dataPoint);break;case TYPE_SHORT:snprintf(buf, bufLen, "%d", *(signed short *)streamArray[pos].dataPoint);break;case TYPE_USHORT:snprintf(buf, bufLen, "%d", *(unsigned short *)streamArray[pos].dataPoint);break;case TYPE_INT:snprintf(buf, bufLen, "%d", *(signed int *)streamArray[pos].dataPoint);break;case TYPE_UINT:snprintf(buf, bufLen, "%d", *(unsigned int *)streamArray[pos].dataPoint);break;case TYPE_LONG:snprintf(buf, bufLen, "%ld", *(signed long *)streamArray[pos].dataPoint);break;case TYPE_ULONG:snprintf(buf, bufLen, "%ld", *(unsigned long *)streamArray[pos].dataPoint);break;case TYPE_FLOAT:snprintf(buf, bufLen, "%f", *(float *)streamArray[pos].dataPoint);break;case TYPE_DOUBLE:snprintf(buf, bufLen, "%f", *(double *)streamArray[pos].dataPoint);break;case TYPE_GPS:snprintf(buf, bufLen, "{\"lon\":%s,\"lat\":%s}", (char *)streamArray[pos].dataPoint, (char *)(streamArray[pos].dataPoint) + 16);break;case TYPE_STRING:snprintf(buf, bufLen, "\"%s\"", (char *)streamArray[pos].dataPoint);break;}}//==========================================================
// 函数名称: DSTREAM_GetDataStream_Body
//
// 函数功能: 获取数据流格式消息体
//
// 入口参数: type:格式类型
// streamArray:数据流结构
// streamArrayCnt:数据流个数
// buffer:缓存
// maxLen:最大缓存长度
// offset:偏移
//
// 返回参数: Body的长度,0-失败
//
// 说明:
//==========================================================
short DSTREAM_GetDataStream_Body(unsigned char type, DATA_STREAM *streamArray, unsigned short streamArrayCnt, unsigned char *buffer, short maxLen, short offset)
{short count = 0, numBytes = 0; //count-循环计数。numBytes-记录数据装载长度char stream_buf[96];char data_buf[48];short cBytes = 0;unsigned char *dataPtr = buffer + offset;for(; count < streamArrayCnt; count++){if(streamArray[count].flag)break;}if(count == streamArrayCnt)return -1;count = 0;maxLen -= 1; //预留结束符位置switch(type){case FORMAT_TYPE1:if(numBytes + 16 < maxLen){memcpy(dataPtr, "{\"datastreams\":[", 16);numBytes += 16;}elsereturn 0;for(; count < streamArrayCnt; count++){if(streamArray[count].flag) //如果使能发送标志位{DSTREAM_toString(streamArray, data_buf, count, sizeof(data_buf));snprintf(stream_buf, sizeof(stream_buf), "{\"id\":\"%s\",\"datapoints\":[{\"value\":%s}]},", streamArray[count].name, data_buf);cBytes = strlen(stream_buf);if(cBytes >= maxLen - numBytes){//UsartPrintf(USART_DEBUG, "dStream_Get_dFormatBody Load Failed %d\r\n", numBytes);return 0;}memcpy(dataPtr + numBytes, stream_buf, cBytes);numBytes += cBytes;if(numBytes > maxLen) //内存长度判断return 0;}}dataPtr[numBytes] = '\0'; //将最后的','替换为结束符if(numBytes + 1 <= maxLen){memcpy(dataPtr + numBytes - 1, "]}", 2);numBytes++;}elsereturn 0;break;case FORMAT_TYPE3:if(numBytes + 1 < maxLen){memcpy(dataPtr, "{", 1);numBytes++;}elsereturn 0;for(; count < streamArrayCnt; count++){if(streamArray[count].flag) //如果使能发送标志位{DSTREAM_toString(streamArray, data_buf, count, sizeof(data_buf));snprintf(stream_buf, sizeof(stream_buf), "\"%s\":%s,", streamArray[count].name, data_buf);cBytes = strlen(stream_buf);if(cBytes >= maxLen - numBytes){//UsartPrintf(USART_DEBUG, "dStream_Get_dFormatBody Load Failed %d\r\n", numBytes);return 0;}memcpy(dataPtr + numBytes, stream_buf, cBytes);numBytes += cBytes;if(numBytes > maxLen) //内存长度判断return 0;}}dataPtr[numBytes] = '\0'; //将最后的','替换为结束符memcpy(dataPtr + numBytes - 1, "}", 1);break;case FORMAT_TYPE4:if(numBytes + 1 < maxLen){memcpy(dataPtr, "{", 1);numBytes++;}elsereturn 0;for(; count < streamArrayCnt; count++){if(streamArray[count].flag) //如果使能发送标志位{DSTREAM_toString(streamArray, data_buf, count, sizeof(data_buf));snprintf(stream_buf, sizeof(stream_buf), "\"%s\":{\"2016-08-10T12:31:17\":%s},", streamArray[count].name, data_buf);cBytes = strlen(stream_buf);if(cBytes >= maxLen - numBytes){//UsartPrintf(USART_DEBUG, "dStream_Get_dFormatBody Load Failed %d\r\n", numBytes);return 0;}memcpy(dataPtr + numBytes, stream_buf, cBytes);numBytes += cBytes;if(numBytes > maxLen) //内存长度判断return 0;}}dataPtr[numBytes] = '\0'; //将最后的','替换为结束符memcpy(dataPtr + numBytes - 1, "}", 1);break;case FORMAT_TYPE5:if(numBytes + 2 < maxLen){memcpy(dataPtr, ",;", 2);numBytes += 2;}elsereturn 0;for(; count < streamArrayCnt; count++){if(streamArray[count].flag && streamArray[count].dataType != TYPE_GPS) //如果使能发送标志位 格式5不支持GPS{DSTREAM_toString(streamArray, data_buf, count, sizeof(data_buf));snprintf(stream_buf, sizeof(stream_buf), "%s,%s;", streamArray[count].name, data_buf);cBytes = strlen(stream_buf);if(cBytes >= maxLen - numBytes - 2){//UsartPrintf(USART_DEBUG, "dStream_Get_dFormatBody Load Failed %d\r\n", numBytes);return 0;}memcpy(dataPtr + numBytes, stream_buf, cBytes);numBytes += cBytes;if(numBytes > maxLen) //内存长度判断return 0;}}break;default:break;}//UsartPrintf(USART_DEBUG, "Body Len: %d\r\n", numBytes);return numBytes;}//==========================================================
// 函数名称: DSTREAM_GetDataStream_Body_Measure
//
// 函数功能: 测量当前使能的数据流长度
//
// 入口参数: type:格式类型
// streamArray:数据流结构
// streamArrayCnt:数据流个数
// flag:1-测量全部数据流长度 0-测量当前需要发送的数据流长度
//
// 返回参数: Body的长度
//
// 说明:
//==========================================================
short DSTREAM_GetDataStream_Body_Measure(unsigned char type, DATA_STREAM *streamArray, unsigned short streamArrayCnt, bool flag)
{short count = 0, numBytes = 0; //count-循环计数。numBytes-记录数据装载长度char stream_buf[96];char data_buf[48];for(; count < streamArrayCnt; count++){if(streamArray[count].flag)break;}if(count == streamArrayCnt)return -1;count = 0;switch(type){case FORMAT_TYPE1:numBytes += 16;for(; count < streamArrayCnt; count++){if(streamArray[count].flag || flag){DSTREAM_toString(streamArray, data_buf, count, sizeof(data_buf));snprintf(stream_buf, sizeof(stream_buf), "{\"id\":\"%s\",\"datapoints\":[{\"value\":%s}]},", streamArray[count].name, data_buf);numBytes += strlen(stream_buf);}}numBytes += 1;break;case FORMAT_TYPE3:numBytes++;for(; count < streamArrayCnt; count++){if(streamArray[count].flag || flag){DSTREAM_toString(streamArray, data_buf, count, sizeof(data_buf));snprintf(stream_buf, sizeof(stream_buf), "\"%s\":%s,", streamArray[count].name, data_buf);numBytes += strlen(stream_buf);}}break;case FORMAT_TYPE4:numBytes++;for(; count < streamArrayCnt; count++){if(streamArray[count].flag || flag){DSTREAM_toString(streamArray, data_buf, count, sizeof(data_buf));snprintf(stream_buf, sizeof(stream_buf), "\"%s\":{\"2016-08-10T12:31:17\":%s},", streamArray[count].name, data_buf);numBytes += strlen(stream_buf);}}break;case FORMAT_TYPE5:numBytes += 2;for(; count < streamArrayCnt; count++){if(streamArray[count].flag || flag){DSTREAM_toString(streamArray, data_buf, count, sizeof(data_buf));snprintf(stream_buf, sizeof(stream_buf), "%s,%s;", streamArray[count].name, data_buf);numBytes += strlen(stream_buf);}}break;default:break;}return numBytes;}
5、data_stream.h:
#ifndef __DATA_STREAM_H__
#define __DATA_STREAM_H__#include "system.h"typedef enum
{TYPE_BOOL = 0,TYPE_CHAR,TYPE_UCHAR,TYPE_SHORT,TYPE_USHORT,TYPE_INT,TYPE_UINT,TYPE_LONG,TYPE_ULONG,TYPE_FLOAT,TYPE_DOUBLE,TYPE_GPS,TYPE_STRING,
} DATA_TYPE;typedef struct
{char *name;void *dataPoint;DATA_TYPE dataType;bool flag;} DATA_STREAM;typedef enum
{FORMAT_TYPE1 = 1,FORMAT_TYPE2,FORMAT_TYPE3,FORMAT_TYPE4,FORMAT_TYPE5} FORMAT_TYPE;short DSTREAM_GetDataStream_Body(unsigned char type, DATA_STREAM *streamArray, unsigned short streamArrayCnt, unsigned char *buffer, short maxLen, short offset);
short DSTREAM_GetDataStream_Body_Measure(unsigned char type, DATA_STREAM *streamArray, unsigned short streamArrayCnt, bool flag);#endif
总结
更多关于MQTT协议的使用可以看OneNET平台的社区帖子或开发文档。本文是直接在无线通讯模块上使用的是socket接口完成的,也就是把无线通讯模块当做一个MCU来使用,如果你只有无线通讯模块,而没有这个模块的SDK代码,只是有一些AT命令来控制这个模块的通信,例如ESP8266模块,GSM模块等等,本文也适用,只需要将socket部分的连接、发送、接收等函数处理修改为自己的代码即可。
本文还有很多不足之处,读者若有自己的看法和建议,评论留言指正指出,谢谢!