六、MQTT源码简单浏览

1、MQTT程序分层

1.1、MQTT客户端工作流程

(1)连接MQTT服务器。

(2)客户端向服务器发送订阅主题。

(3)客户端等待MQTT的消息。

(4)客户端向服务器发送消息。

2.2、MQTT程序结构

  • APP层
    • while循环或一个进程中:等待消息,处理消息;
      发送消息(如检测到着火,向服务端发送消息)
  • 协议层:MQTT(或其他的SSH、FTP)
    • MQTT的内部实现
  • 驱动层
    • MQTT把这一层看作平台,会提供多线程、定时器(涉及心跳包)、网卡收发
    • 提供相应网络模块的驱动程序
    • 移植一个操作系统(FreeRTOS、RTthreed、Linux)

2、源码浏览

从示例中emqx平台的代码开始分析,主要包括以下方面

  • 连接服务器
  • 创建线程
  • 发布消息
  • 订阅消息
  • 接收订阅的消息并处理

2.1、连接服务器

(1)打开mqttclient\example\emqx\emqx.c文件。

(2)浏览main函数。

(3)函数调用过程

main()client = mqtt_lease();                       // 客户端结构体的内存分配mqtt_set_port(client, "1883");               // 设置要连接的服务器的端口mqtt_set_host(client, "120.25.213.14");      // 设置要连接的服务器IPmqtt_connect(client);                        // 服务器连接 mqtt_connect_with_results(c);            // 以阻塞模式连接服务器,等待连接结果// 网络初始化rc = network_init(c->mqtt_network, c->mqtt_host, c->mqtt_port, NULL);// 网络连接rc = network_connect(c->mqtt_network);// nettype:网络类型;TCP连接nettype_tcp_connect(n);    // 这个需要程序自己提供,平台相关函数platform_net_socket_connect();

(4)platform_net_socket_connect()函数功能及内容

  • xx
  • xx

2.2、创建发布消息线程

(1)打开mqttclient\example\emqx\emqx.c文件。

(2)浏览main函数。

(3)函数调用过程

mainres = pthread_create(&thread1, NULL, mqtt_publish_thread, client);mqtt_publish_thread();        // 发布消息线程函数// 1、构造要发送的消息 mqtt_message_t msg;memset(&msg, 0, sizeof(msg));msg.payload = (void *) buf;mqtt_publish(client, "topic1", &msg);  // 发布消息// 2、根据平台相关的函数发送数据包mqtt_send_packet();network_write();nettype_tcp_write();// 这个函数需要自己提供,平台相关函数platform_net_socket_write_timeout();

(4)platform_net_socket_write_timeout()函数功能及内容:

  • xx
  • xx

2.3、mqtt_yield_thread线程

  • 接收订阅的消息
  • 发送心跳包
  • 处理错误

(1)打开mqttclient\example\emqx\emqx.c文件。

(2)浏览main函数。

(3)核心调用过程

mainmqtt_connect(client);             // 服务器连接mqtt_connect_with_results(c); // 以阻塞模式连接服务器,等待连接结果 // 网络初始化rc = network_init(c->mqtt_network, c->mqtt_host, c->mqtt_port, NULL);// 网络连接rc = network_connect(c->mqtt_network);/* send connect packet */// 发送连接包if ((rc = mqtt_send_packet(c, len, &connect_timer)) != MQTT_SUCCESS_ERROR)goto exit;// 等待回应if (mqtt_wait_packet(c, CONNACK, &connect_timer) == CONNACK) {}/* connect success, and need init mqtt thread */// 连接成功就初始化线程mqtt_yield_threadc->mqtt_thread= platform_thread_init("mqtt_yield_thread", mqtt_yield_thread, c, ...);

2.4、处理订阅消息函数

(1)打开mqttclient\example\emqx\emqx.c文件。

(2)浏览main函数。

  • 订阅消息函数: mqtt_subscribe();
// 订阅消息且指定处理函数为topic1_handler
mqtt_subscribe(client, "topic1", QOS0, topic1_handler);// 没有指定处理函数的会调用默认处理函数
mqtt_subscribe(client, "topic2", QOS1, NULL);
mqtt_subscribe(client, "topic3", QOS2, NULL);

(3)订阅消息处理函数(default_msg_handler)所在位置:

// int mqtt_subscribe(mqtt_client_t* c, const char* topic_filter, mqtt_qos_t qos, message_handler_t handler)
mqtt_subscribe(client, "topic2", QOS1, NULL); // 订阅主题topic2// 定义消息处理结构体,结构体内容见下段代码message_handlers_t *msg_handler = NULL;   // 如果未指定handler(处理消息的函数指针),则使用默认的处理程序if (NULL == handler)handler = default_msg_handler; // 将消息处理结构体记录到一个链表中:包含主题是啥,接收订阅消息处理函数是啥msg_handler = mqtt_msg_handler_create(topic_filter, qos, handler);  

消息处理结构体

typedef struct message_handlers {mqtt_list_t         list;mqtt_qos_t          qos;const char*         topic_filter;   // 记录消息的主题message_handler_t   handler;        // 函数指针,指向处理消息的函数
} message_handlers_t;

2.5、订阅消息的接收

(1)因为消息何时到来是不知道的,所以消息的接收是放在线程中不断查询的。(或者在中断中接收到去通知线程)

(2)找到mqtt_yield_thread线程函数。

(3)消息接收到调用消息处理函数的流程:

mqtt_yield_thread()  // 线程函数while(1){rc = mqtt_yield(c, c->mqtt_cmd_timeout);// 处理MQTT报文rc = mqtt_packet_handle(c, &timer);// 读取MQTT报文rc = mqtt_read_packet(c, &packet_type, timer);// 根据报文类型调用如下函数rc = mqtt_publish_packet_handle(c, timer);  // 服务器发布的消息mqtt_deliver_message(c, &topic_name, &msg);// 获取MQTT消息处理程序msg_handler = mqtt_get_msg_handler(c, topic_name);// 传递消息给处理函数;参数:客户端,消息数据    msg_handler->handler(c, &md);   }

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/713861.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

[法规规划|方案实操]数据资产入表,城投将获融资新渠道

2023年8月,财政部发布了《企业数据资源相关会计处理暂行规定》,并从2024年1月1日开始实施,标志着数据资产正式纳入企业的资产负债表。这一举措被视为数据资产从理论走向实践的重大一步。 数据资产入表对城投运营模式的影响 随着全球经济格局…

Vue3速成

文章目录 day 11. 创建vue3工程3. 响应式数据4. 计算属性 day 25. watch 监视6. watchEffect7. 标签的ref属性8. 回顾TS中的接口_泛型_自定义类型 day 1 1. 创建vue3工程 相关代码如下: ## 创建vue工程 npm create vuelastest## 安装node_modules npm install //…

JSON 文件里的 “$schema” 是干什么用的?

最近我在做一些前端项目,我发现有的配置文件,比如 .prettierrc.json 或者 tsconfig.json 里面都会看到一个 $schema 字段,有点好奇,就查了一下。 什么是 JSON Schema JSON Schema是一种基于JSON (JavaScript Object Notation) 的…

【Leetcode】2369. 检查数组是否存在有效划分

文章目录 题目思路代码结果 题目 题目链接 给你一个下标从 0 开始的整数数组 nums ,你必须将数组划分为一个或多个 连续 子数组。 如果获得的这些子数组中每个都能满足下述条件 之一 ,则可以称其为数组的一种 有效 划分: 子数组 恰 由 2 个…

MATLAB算法实战应用案例精讲-【图像处理】三维重建(最终篇)

目录 前言 相机定标和三维重建 针孔相机模型和变形 三维成像 一、机器视觉系统组成

大数据智能化-长视频领域

随着数字化时代的到来,长视频领域的发展迎来了新的机遇和挑战。在这一背景下,大数据智能化技术的应用成为长视频行业提升用户体验、优化运营管理的重要手段之一。本文将从优爱腾3大长视频背景需求出发,分析静态资源CDN、视频文件存储与分发、…

网络安全、信息安全、计算机安全,有何区别?

这三个概念都存在,一般人可能会混为一谈。 究竟它们之间是什么关系?并列?交叉? 可能从广义上来说它们都可以用来表示安全security这样一个笼统的概念。 但如果从狭义上理解,它们应该是有区别的,区别在哪呢&…

力扣hot100题解(python版36-40题)

36、二叉树的中序遍历 给定一个二叉树的根节点 root ,返回 它的 *中序 遍历* 。 示例 1: 输入:root [1,null,2,3] 输出:[1,3,2]示例 2: 输入:root [] 输出:[]示例 3: 输入&am…

tcping实用小工具

Tcping实用小工具命令详解 一、tcping介绍 tcping:tcping命令基于tcp协议监控,可以从较低级别的协议获得简单的,可能不可靠的数据报服务。 原则上,TCP应该能够在从容硬线连接到分组交换或电路交换网络的各种通信系统之上操作。 …

【机器学习基础】层次聚类-BIRCH聚类

🚀个人主页:为梦而生~ 关注我一起学习吧! 💡专栏:机器学习 欢迎订阅!相对完整的机器学习基础教学! ⭐特别提醒:针对机器学习,特别开始专栏:机器学习python实战…

企业微信私有部署:实现高效沟通与信息安全

随着移动互联网的快速发展,企业微信作为一种高效、便捷的通讯工具,已经成为了众多企业的首选。然而,对于一些对信息安全有特殊要求的大型企业而言,使用公有版企业微信并不能满足其安全需求。因此,企业微信私有部署应运…

matplotlib.animation 3d姿态动画

目录 演示效果: 演示代码: 保存为gif 演示效果: 演示代码: import numpy as np import matplotlib.pyplot as plt from mpl_toolkits.mplot3d import Axes3D from matplotlib.animation import FuncAnimation# 定义人体关键点…

【c++入门】纯粹的五位偶数

说明 纯粹偶数指的是一个数的各个位都是偶数的数,比如:24686;请编程求出10000~n中,所有的五位的纯粹偶数有多少个? 输入数据 一个整数n(n为一个5位的整数) 输出数据 一个整数,代…

网络防御第6次作业

防病毒网关 按照传播方式分类 病毒 病毒是一种基于硬件和操作系统的程序,具有感染和破坏能力,这与病毒程序的结构有关。病毒攻击的宿主程序是病毒的栖身地,它是病毒传播的目的地,又是下一次感染的出发点。计算机病毒感染的一般过…

Java基础 - Stream 流:Stream API的中间操作

在上一篇博客中,我介绍了构建 Stream 流的多种方式,以及 Stream 流的特点和优势。如果你还没有阅读,你可以点击这里查看。 Java基础 - Stream 流:构建流的多种方式 在这篇博客中,我将探索 Stream API 的中间操作&…

动态规划(算法竞赛、蓝桥杯)--分组背包DP

1、B站视频链接&#xff1a;E16 背包DP 分组背包_哔哩哔哩_bilibili #include <bits/stdc.h> using namespace std; const int N110; int v[N][N],w[N][N],s[N]; // v[i,j]:第i组第j个物品的体积 s[i]:第i组物品的个数 int f[N][N]; // f[i,j]:前i组物品&#xff0c;能放…

学习JavaEE的日子 Day21 枚举

Day21 1.枚举的引入 需求&#xff1a;编写季节类&#xff08;Season&#xff09;&#xff0c;该类只有四个对象&#xff08;spring&#xff0c;summer&#xff0c;autumn&#xff0c;winter&#xff09; 概念&#xff1a;枚举&#xff08;enum&#xff09;全称为 enumeration&…

基带信号处理设计原理图:2-基于6U VPX的双TMS320C6678+Xilinx FPGA K7 XC7K420T的图像信号处理板

基于6U VPX的双TMS320C6678Xilinx FPGA K7 XC7K420T的图像信号处理板 综合图像处理硬件平台包括图像信号处理板2块&#xff0c;视频处理板1块&#xff0c;主控板1块&#xff0c;电源板1块&#xff0c;VPX背板1块。 一、板卡概述 图像信号处理板包括2片TI 多核DSP处理…

Linux进程管理:(二)进程调度原语

文章说明&#xff1a; Linux内核版本&#xff1a;5.0 架构&#xff1a;ARM64 参考资料及图片来源&#xff1a;《奔跑吧Linux内核》 Linux 5.0内核源码注释仓库地址&#xff1a; zhangzihengya/LinuxSourceCode_v5.0_study (github.com) 进程调度的概念比较简单&#xff0c…

Java学习笔记NO.17

T1&#xff1a;合并两个排序好的整数数组 import java.util.Arrays;public class MergeSortedArrays {public static int[] mergeArrays(int[] arr1, int[] arr2) {int[] result new int[arr1.length arr2.length];int i 0, j 0, k 0;while (i < arr1.length &&am…