1、MQTT程序分层
1.1、MQTT客户端工作流程
(1)连接MQTT服务器。
(2)客户端向服务器发送订阅主题。
(3)客户端等待MQTT的消息。
(4)客户端向服务器发送消息。
2.2、MQTT程序结构
- APP层
- while循环或一个进程中:等待消息,处理消息;
发送消息(如检测到着火,向服务端发送消息)
- while循环或一个进程中:等待消息,处理消息;
- 协议层:MQTT(或其他的SSH、FTP)
- MQTT的内部实现
- 驱动层
- MQTT把这一层看作平台,会提供多线程、定时器(涉及心跳包)、网卡收发
- 提供相应网络模块的驱动程序
- 移植一个操作系统(FreeRTOS、RTthreed、Linux)
2、源码浏览
从示例中emqx平台的代码开始分析,主要包括以下方面
- 连接服务器
- 创建线程
- 发布消息
- 订阅消息
- 接收订阅的消息并处理
2.1、连接服务器
(1)打开mqttclient\example\emqx\emqx.c文件。
(2)浏览main函数。
(3)函数调用过程
main()client = mqtt_lease(); // 客户端结构体的内存分配mqtt_set_port(client, "1883"); // 设置要连接的服务器的端口mqtt_set_host(client, "120.25.213.14"); // 设置要连接的服务器IPmqtt_connect(client); // 服务器连接 mqtt_connect_with_results(c); // 以阻塞模式连接服务器,等待连接结果// 网络初始化rc = network_init(c->mqtt_network, c->mqtt_host, c->mqtt_port, NULL);// 网络连接rc = network_connect(c->mqtt_network);// nettype:网络类型;TCP连接nettype_tcp_connect(n); // 这个需要程序自己提供,平台相关函数platform_net_socket_connect();
(4)platform_net_socket_connect()函数功能及内容
- xx
- xx
2.2、创建发布消息线程
(1)打开mqttclient\example\emqx\emqx.c文件。
(2)浏览main函数。
(3)函数调用过程
mainres = pthread_create(&thread1, NULL, mqtt_publish_thread, client);mqtt_publish_thread(); // 发布消息线程函数// 1、构造要发送的消息 mqtt_message_t msg;memset(&msg, 0, sizeof(msg));msg.payload = (void *) buf;mqtt_publish(client, "topic1", &msg); // 发布消息// 2、根据平台相关的函数发送数据包mqtt_send_packet();network_write();nettype_tcp_write();// 这个函数需要自己提供,平台相关函数platform_net_socket_write_timeout();
(4)platform_net_socket_write_timeout()函数功能及内容:
- xx
- xx
2.3、mqtt_yield_thread线程
- 接收订阅的消息
- 发送心跳包
- 处理错误
(1)打开mqttclient\example\emqx\emqx.c文件。
(2)浏览main函数。
(3)核心调用过程
mainmqtt_connect(client); // 服务器连接mqtt_connect_with_results(c); // 以阻塞模式连接服务器,等待连接结果 // 网络初始化rc = network_init(c->mqtt_network, c->mqtt_host, c->mqtt_port, NULL);// 网络连接rc = network_connect(c->mqtt_network);/* send connect packet */// 发送连接包if ((rc = mqtt_send_packet(c, len, &connect_timer)) != MQTT_SUCCESS_ERROR)goto exit;// 等待回应if (mqtt_wait_packet(c, CONNACK, &connect_timer) == CONNACK) {}/* connect success, and need init mqtt thread */// 连接成功就初始化线程mqtt_yield_threadc->mqtt_thread= platform_thread_init("mqtt_yield_thread", mqtt_yield_thread, c, ...);
2.4、处理订阅消息函数
(1)打开mqttclient\example\emqx\emqx.c文件。
(2)浏览main函数。
- 订阅消息函数: mqtt_subscribe();
// 订阅消息且指定处理函数为topic1_handler
mqtt_subscribe(client, "topic1", QOS0, topic1_handler);// 没有指定处理函数的会调用默认处理函数
mqtt_subscribe(client, "topic2", QOS1, NULL);
mqtt_subscribe(client, "topic3", QOS2, NULL);
(3)订阅消息处理函数(default_msg_handler)所在位置:
// int mqtt_subscribe(mqtt_client_t* c, const char* topic_filter, mqtt_qos_t qos, message_handler_t handler)
mqtt_subscribe(client, "topic2", QOS1, NULL); // 订阅主题topic2// 定义消息处理结构体,结构体内容见下段代码message_handlers_t *msg_handler = NULL; // 如果未指定handler(处理消息的函数指针),则使用默认的处理程序if (NULL == handler)handler = default_msg_handler; // 将消息处理结构体记录到一个链表中:包含主题是啥,接收订阅消息处理函数是啥msg_handler = mqtt_msg_handler_create(topic_filter, qos, handler);
消息处理结构体
typedef struct message_handlers {mqtt_list_t list;mqtt_qos_t qos;const char* topic_filter; // 记录消息的主题message_handler_t handler; // 函数指针,指向处理消息的函数
} message_handlers_t;
2.5、订阅消息的接收
(1)因为消息何时到来是不知道的,所以消息的接收是放在线程中不断查询的。(或者在中断中接收到去通知线程)
(2)找到mqtt_yield_thread线程函数。
(3)消息接收到调用消息处理函数的流程:
mqtt_yield_thread() // 线程函数while(1){rc = mqtt_yield(c, c->mqtt_cmd_timeout);// 处理MQTT报文rc = mqtt_packet_handle(c, &timer);// 读取MQTT报文rc = mqtt_read_packet(c, &packet_type, timer);// 根据报文类型调用如下函数rc = mqtt_publish_packet_handle(c, timer); // 服务器发布的消息mqtt_deliver_message(c, &topic_name, &msg);// 获取MQTT消息处理程序msg_handler = mqtt_get_msg_handler(c, topic_name);// 传递消息给处理函数;参数:客户端,消息数据 msg_handler->handler(c, &md); }