MQTT宝典

文章目录

    • 1.介绍
    • 2.发布和订阅
    • 3.MQTT 数据包结构
    • 4.Demo
    • 5.EMQX

1.介绍

什么是MQTT协议
MQTT(消息队列遥测传输协议),是一种基于发布/订阅(publish/subscribe)模式的“轻量级”通讯协议,该协议构建于TCP/IP协议上。

MQTT最大优点在于,用极少的代码和有限的带宽,为连接远程设备提供实时可靠的消息服务。

作为一种低开销、低带宽占用的即时通讯协议,使其在物联网、小型设备、移动应用等方面有较广泛的应用。

特点
MQTT是一个基于客户端-服务器的消息发布/订阅传输协议。

MQTT协议是轻量、简单、开放和易于实现的,这些特点使它适用范围非常广泛。在很多情况下,包括受限的环境中,如:机器与机器(M2M)通信和物联网(IoT)。

MQTT 与 HTTP 一样,MQTT 运行在传输控制协议/互联网协议 (TCP/IP) 堆栈之上。

2.发布和订阅

MQTT使用的发布/订阅消息模式,它提供了一对多的消息分发机制,从而实现与应用程序的解耦。

这是一种消息传递模式,消息不是直接从发送器发送到接收器(即点对点),而是由MQTT server(或称为 MQTT Broker)分发的。

在这里插入图片描述
MQTT 服务器是发布-订阅架构的核心。

它可以非常简单地在Raspberry Pi或NAS等单板计算机上实现,当然也可以在大型机或 Internet 服务器上实现。

服务器分发消息,因此必须是发布者,但绝不是订阅者!

客户端可以发布消息(发送方)、订阅消息(接收方)或两者兼而有之。

客户端(也称为节点)是一种智能设备,如微控制器或具有 TCP/IP 堆栈和实现 MQTT 协议的软件的计算机。

消息在允许过滤的主题下发布。主题是分层划分的 UTF-8 字符串。不同的主题级别用斜杠/作为分隔符号。

EG:
在这里插入图片描述
在这里插入图片描述
QoS(Quality of Service levels)
服务质量是 MQTT 的一个重要特性。当我们使用 TCP/IP 时,连接已经在一定程度上受到保护。但是在无线网络中,中断和干扰很频繁,MQTT 在这里帮助避免信息丢失及其服务质量水平。这些级别在发布时使用。如果客户端发布到 MQTT 服务器,则客户端将是发送者,MQTT 服务器将是接收者。当MQTT服务器向客户端发布消息时,服务器是发送者,客户端是接收者。

  • QoS 0 : 这一级别会发生消息丢失或重复,消息发布依赖于底层TCP/IP网络。即:<=1
    在这里插入图片描述
  • QoS 1 : 承诺消息将至少传送一次给订阅者。
    在这里插入图片描述
  • QoS 2 : 我们保证消息仅传送到目的地一次。为此,带有唯一消息 ID 的消息会存储两次,首先来自发送者,然后是接收者。QoS 级别 2 在网络中具有最高的开销,因为在发送方和接收方之间需要两个流。
    在这里插入图片描述

3.MQTT 数据包结构

  • 固定头(Fixed header),存在于所有MQTT数据包中,表示数据包类型及数据包的分组类标识;

  • 可变头(Variable header),存在于部分MQTT数据包中,数据包类型决定了可变头是否存在及其具体内容;

  • 消息体(Payload),存在于部分MQTT数据包中,表示客户端收到的具体内容;

整体MQTT的消息格式如下图所示:
在这里插入图片描述

MQTT固定头
固定头存在于所有MQTT数据包中,其结构如下:
在这里插入图片描述
固定头的消息格式

  • 消息类型 / message type
    位置: byte 1, bits 7-4
    4位的无符号值,类型如下:
    在这里插入图片描述
  • 标识位 / DUP/RET
    位置: byte 1, bits 3-0。

在不使用标识位的消息类型中,标识位被作为保留位。如果收到无效的标志时,接收端必须关闭网络连接:

在这里插入图片描述
DUP:发布消息的副本。用来在保证消息的可靠传输,如果设置为 1,则在下面的变长中增加MessageId,并且需要回复确认,以保证消息传输完成,但不能用于检测消息重复发送。

QoS发布消息的服务质量,即:保证消息传递的次数

在这里插入图片描述
RETAIN:发布保留标识,表示服务器要保留这次推送的信息,如果有新的订阅者出现,就把这消息推送给它,如果设有那么推送至当前订阅者后释放。

  • 剩余长度

位置:byte 1

固定头的第二字节用来保存变长头部和消息体的总大小的,但不是直接保存的。这一字节是可以扩展,其保存机制,前7位用于保存长度,后一部用做标识。当最后一位为 1时,表示长度不足,需要使用二个字节继续保存。例如:计算出后面的大小为0

MQTT可变头 / Variable header

MQTT数据包中包含一个可变头,它驻位于固定的头和负载之间。可变头的内容因数据包类型而不同,较常的应用是做为包的标识:
在这里插入图片描述
很多类型数据包中都包括一个2字节的数据包标识字段,这些类型的包有:

PUBLISH (QoS > 0)、PUBACK、PUBREC、PUBREL、PUBCOMP、

SUBSCRIBE、SUBACK、UNSUBSCRIBE、UNSUBACK

Payload消息体
Payload消息体是MQTT数据包的第三部分,CONNECT、SUBSCRIBE、SUBACK、UNSUBSCRIBE四种类型的消息 有消息体:

  • CONNECT,消息体内容主要是:客户端的ClientID、订阅的Topic、Message以及用户名和密码

  • SUBSCRIBE,消息体内容是一系列的要订阅的主题以及QoS。

  • SUBACK,消息体内容是服务器对于SUBSCRIBE所申请的主题及QoS进行确认和回复。

  • UNSUBSCRIBE,消息体内容是要订阅的主题。

4.Demo

  • DEMO1
 public static void main(String[] args) {String broker = "tcp://172.168.1.122:2314";String clientId = "JavaSample";//Use the memory persistenceMemoryPersistence persistence = new MemoryPersistence();try {MqttClient sampleClient = new MqttClient(broker, clientId, persistence);MqttConnectOptions connOpts = new MqttConnectOptions();connOpts.setCleanSession(true);System.out.println("broker:" + broker);sampleClient.connect(connOpts);System.out.println("Connected");String topic = "demo/topics";System.out.println("Subscribe to topic:" + topic);sampleClient.subscribe(topic);//订阅主题sampleClient.setCallback(new MqttCallback() {public void messageArrived(String topic, MqttMessage message) throws Exception {String theMsg = MessageFormat.format("{0} is arrived for topic {1}.", new String(message.getPayload()), topic);System.out.println(theMsg);}public void deliveryComplete(IMqttDeliveryToken token) {}public void connectionLost(Throwable throwable) {}});String content = "Message from MqttPublishSample";int qos = 2;System.out.println("Publishing message:" + content);MqttMessage message = new MqttMessage(content.getBytes());message.setQos(qos);sampleClient.publish(topic, message);//发布消息System.out.println("Message published");} catch (MqttException me) {System.out.println("reason" + me.getReasonCode());System.out.println("msg" + me.getMessage());System.out.println("loc" + me.getLocalizedMessage());System.out.println("cause" + me.getCause());System.out.println("excep" + me);me.printStackTrace();}}

5.EMQX

  • 本地搭建EMQX服务
    1、下载emqx压缩文件:wget https://www.emqx.com/zh/downloads/broker/5.0.3/emqx-5.0.3-el7-amd64.tar.gz
    2、解压文件:mkdir -p emqx && tar -zxvf emqx-5.0.3-el7-amd64.tar.gz -C emqx
    3、启动服务:./emqx/bin/emqx start

在这里插入图片描述

  • 整合使用

在这里插入图片描述

配置

        <dependency><groupId>org.eclipse.paho</groupId><artifactId>org.eclipse.paho.client.mqttv3</artifactId><version>1.2.2</version></dependency><!-- MQTT --><dependency><groupId>org.springframework.integration</groupId><artifactId>spring-integration-mqtt</artifactId></dependency>
# Mqtt配置
mqtt:#我是在本地搭建了emqx服务,192.168.2.31就是我本地emqx服务的地址。也可用公共测试不需要搭建emqx服务,将ip改为broker.emqx.io即可。serverURIs: tcp://192.168.111.5:1883username: #可不填写password:  #可不填写qos: 2 #等级 有 0 1 2 三种clientId: mqttxxxtopic: testTopic  #订阅的主题,多个时可以使用逗号分开 如:topic1,topic2,topicenabled: true  #是否打开mqtt服务keepalive: 100 #心跳时间  不需要动timeout: 100 # 超时时间秒 不需要动
@Configuration
public class MqttConfig {@Autowiredprivate MqttPushClient mqttPushClient;@Value("${mqtt.username:{null}}")private String username;@Value("${mqtt.password:{null}}")private String password;@Value("${mqtt.serverURIs:{null}}")private String hostUrl;@Value("${mqtt.clientId:{null}}")private String clientId;@Value("${mqtt.topic:{null}}")private String defaultTopic;@Value("${mqtt.qos:{null}}")private int qos;@Value("${mqtt.enabled:{null}}")private boolean enabled;@Value("${mqtt.keepalive:{null}}")private int keepalive;@Value("${mqtt.timeout:{null}}")private int timeout;//订阅主体@Beanpublic MqttPushClient getMqttPushClient() {if(enabled == true){String mqtt_topic[] = defaultTopic.split(",");mqttPushClient.connect(hostUrl, clientId, username, password, timeout, keepalive);//连接for(int i=0; i<mqtt_topic.length; i++){mqttPushClient.subscribe(mqtt_topic[i], 0);//订阅主题}}return mqttPushClient;}//发送消息到对应主题@Beanpublic MqttPushClient pushMessage() {mqttPushClient.publish(0,true,"wsy","呵呵哈哈哈 你好呀");return mqttPushClient;}}
@Component
public class MqttPushClient {private static final Logger logger = LoggerFactory.getLogger(MqttPushClient.class);@Autowiredprivate PushCallback pushCallback;private static MqttClient client;private static MqttClient getClient() {return client;}private static void setClient(MqttClient client) {MqttPushClient.client = client;}/*** 客户端连接** @param host      ip+端口* @param clientID  客户端Id* @param username  用户名* @param password  密码* @param timeout   超时时间* @param keepalive 保留数*/public void connect(String host, String clientID, String username, String password, int timeout, int keepalive) {MqttClient client;try {client = new MqttClient(host, clientID, new MemoryPersistence());MqttConnectOptions options = new MqttConnectOptions();options.setCleanSession(true);options.setUserName(username);options.setPassword(password.toCharArray());options.setConnectionTimeout(timeout);options.setKeepAliveInterval(keepalive);MqttPushClient.setClient(client);try {client.setCallback(pushCallback);client.connect(options);} catch (Exception e) {e.printStackTrace();}} catch (Exception e) {e.printStackTrace();}}/*** 发布** @param qos         连接方式* @param retained    是否保留* @param topic       主题* @param pushMessage 消息体*/public boolean publish(int qos, boolean retained, String topic, String pushMessage) {MqttMessage message = new MqttMessage();message.setQos(qos);message.setRetained(retained);message.setPayload(pushMessage.getBytes());MqttTopic mTopic = MqttPushClient.getClient().getTopic(topic);if (null == mTopic) {logger.error("topic not exist");}MqttDeliveryToken token;try {token = mTopic.publish(message);token.waitForCompletion();return true;} catch (MqttPersistenceException e) {e.printStackTrace();return false;} catch (MqttException e) {e.printStackTrace();return false;}}/*** 订阅某个主题** @param topic 主题* @param qos   连接方式*/public void subscribe(String topic, int qos) {logger.info("开始订阅主题" + topic);try {MqttPushClient.getClient().subscribe(topic, qos);} catch (MqttException e) {e.printStackTrace();}}}
@Component
public class PushCallback implements MqttCallback {private static final Logger logger = LoggerFactory.getLogger(MqttPushClient.class);@Autowiredprivate MqttConfig mqttConfig;private static MqttClient client;private static String _topic;private static String _qos;private static String _msg;@Overridepublic void connectionLost(Throwable throwable) {// 连接丢失后,一般在这里面进行重连logger.info("连接断开,可以做重连");if (client == null || !client.isConnected()) {mqttConfig.getMqttPushClient();}}@Overridepublic void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {// subscribe后得到的消息会执行到这里面logger.info("接收消息主题 : " + topic);logger.info("接收消息Qos : " + mqttMessage.getQos());logger.info("接收消息内容 : " + new String(mqttMessage.getPayload()));_topic = topic;_qos = mqttMessage.getQos()+"";_msg = new String(mqttMessage.getPayload());}@Overridepublic void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {logger.info("deliveryComplete---------" + iMqttDeliveryToken.isComplete());}//别的Controller层会调用这个方法来  获取  接收到的硬件数据public String receive() {JSONObject jsonObject = new JSONObject();jsonObject.put("topic", _topic);jsonObject.put("qos", _qos);jsonObject.put("msg", _msg);return jsonObject.toString();}
}

注意:
clientId不能重复,若存在重复的 clientId 连接,会导致争抢而连接不上。(就是存在两个客户端 clientId 相同,两个在打架,一直争同一连接,导致一直重连,重复发布消息)

发布:
在这里插入图片描述
在这里插入图片描述

在这里插入图片描述

订阅:

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

参考资料

https://mcxiaoke.gitbooks.io/mqtt-cn/content/mqtt/01-Introduction.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/37187.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

安卓快速开发

1.环境搭建 Android Studio下载网页&#xff1a;https://developer.android.google.cn/studio/index.html 第一次新建工程需要等待很长时间&#xff0c;新建一个Empty Views Activity 项目&#xff0c;右上角选择要运行的机器&#xff0c;运行就安装上去了(打开USB调试)。 2…

【Linux】UDP协议——传输层

目录 传输层 再谈端口号 端口号范围划分 认识知名端口号 两个问题 netstat与iostat pidof UDP协议 UDP协议格式 UDP协议的特点 面向数据报 UDP的缓冲区 UDP使用注意事项 基于UDP的应用层协议 传输层 在学习HTTP等应用层协议时&#xff0c;为了便于理解&#xff…

【实操】2023年npm组件库的创建发布流程

2022年的实践为基础&#xff0c;2023年我再建一个组件库【ZUI】。步骤回顾&#xff1a; 2022年的npm组件包的发布删除教程_npm i ant-design/pro-components 怎么删除_啥咕啦呛的博客-CSDN博客 1.在gitee上创建一个项目,相信你是会的 2.创建初始化项目&#xff0c;看吧&#…

【新品发布】ChatWork企业知识库系统源码

系统简介 基于前后端分离架构以及Vue3、uni-app、ThinkPHP6.x、PostgreSQL、pgvector技术栈开发&#xff0c;包含PC端、H5端。 ChatWork支持问答式和文档式知识库&#xff0c;能够导入txt、doc、docx、pdf、md等多种格式文档。 导入数据完成向量化训练后&#xff0c;用户提问…

两个pdf合并成一个pdf怎么合并?这几个方法值得推荐

两个pdf合并成一个pdf怎么合并&#xff1f;pdf文件的合并是一个很常见的需求&#xff0c;特别是在处理工作文件或学习资料时。为了更好的帮助你了解如何将两个pdf文件合并成一个&#xff0c;下面就给大家详细介绍几种合并方法。 方法一&#xff1a;使用迅捷PDF转换器 这是一款…

小红书如何打造爆款引流吸粉?11个秘诀助你秒变达人!

在这个充满信息和内容的时代&#xff0c;小红书以其独特的社交平台特性和个性化内容吸引了众多用户。今天&#xff0c;我们就来揭秘小红书关注战略&#xff0c;了解如何在这个平台上打造独特的内容体验&#xff0c;与用户建立更亲近的连接。#小红书# 1、定位清晰&#xff0c;找…

【论文阅读】基于深度学习的时序预测——Pyraformer

系列文章链接 论文一&#xff1a;2020 Informer&#xff1a;长时序数据预测 论文二&#xff1a;2021 Autoformer&#xff1a;长序列数据预测 论文三&#xff1a;2022 FEDformer&#xff1a;长序列数据预测 论文四&#xff1a;2022 Non-Stationary Transformers&#xff1a;非平…

页面跳转和两个页面之间的数据传递-鸿蒙ArkTS

页面跳转和两个页面之间的数据传递-ArkTS 页面跳转和两个页面之间的数据传递-ArkTS关于router的使用**跳转页面的实现方式。**页面接受跳转传递的参数页面返回及携带参数效果代码Index页面Second页面 参考资料 页面跳转和两个页面之间的数据传递-ArkTS 本篇文章主要是对两个页面…

TiDB在科捷物流神州金库核心系统的应用与实践

业务背景 北京科捷物流有限公司于2003年在北京正式成立&#xff0c;是ISO质量管理体系认证企业、国家AAAAA级物流企业、海关AEO高级认证企业&#xff0c;注册资金1亿元&#xff0c;是中国领先的大数据科技公司——神州控股的全资子公司。科捷物流融合B2B和B2C的客户需求&#…

matlab使用教程(15)—图论基础

1.有向图和无向图 1.1什么是图&#xff1f; 图是表示各种关系的节点和边的集合&#xff1a; • 节点 是与对象对应的顶点。 • 边 是对象之间的连接。 • 图的边有时会有权重 &#xff0c;表示节点之间的每个连接的强度&#xff08;或一些其他属性&#xff09;。 这些定…

MySQL8.xx一主两从复制安装与配置

搭建环境: 查看系统版本cat /etc/redhat-release [rootwww tools]# cat /etc/redhat-release CentOS Linux release 7.9.2009 (Core) 查看内核版本cat /proc/version 目标: 一主两从 主机IP 主机名称 端口 搭建环境 安装目录192.168.1.100 docker…

19.正则表达式

19.1什么是正则表达式 ●正则表达式( Regular Expression) 是用于匹配字符串中字符组合的模式。在JavaScript中&#xff0c; 正则表达式也是对象 ●通常用来查找、替换那些符合正则表达式的文本&#xff0c;许多语言都支持正则表达式 ●正则表达式在JavaScript中的使用场景: …

8.15号经典模型复习笔记

文章目录 Deep Residual Learning for Image Recognition(CVPR2016)方法 Densely Connected Convolutional Networks&#xff08;CVPR2017&#xff09;方法 EfficientNet: Rethinking Model Scaling for Convolutional Neural Networks&#xff08;ICML2019&#xff09;方法 Re…

使用维纳过滤器消除驾驶舱噪音(Matlab代码实现)

&#x1f4a5;&#x1f4a5;&#x1f49e;&#x1f49e;欢迎来到本博客❤️❤️&#x1f4a5;&#x1f4a5; &#x1f3c6;博主优势&#xff1a;&#x1f31e;&#x1f31e;&#x1f31e;博客内容尽量做到思维缜密&#xff0c;逻辑清晰&#xff0c;为了方便读者。 ⛳️座右铭&a…

Tomcat的多实例和动静分离

目录 一、多实例 二、 nginxtomcat的负载均衡和动静分离 三、Tomcat 客户端->四层代理->七层代理->tomcat服务器 实验&#xff1a; 问题总结&#xff1a; tomcat日志文件&#xff1a;/usr/local/tomcat/logs/catalina.out 一、多实例 在一台服务器上有多个tomc…

解压版 MySQL 数据库的安装与配置

目录 1 下载2 安装3 配置3.1 添加环境变量3.2 新建配置文件3.3 初始化MySQL3.4 注册MySQL服务3.5 启动MySQL服务3.6 修改默认账户密码 4 登录5 卸载 安装环境:Win10 64位 软件版本:MySQL 5.7.24 解压版 1 下载 点击链接 进入如下界面 ❗️注意&#xff1a; 我们一般不会选择最新…

RocketMQ 5.1.0 源码详解 | Producer 发送流程

文章目录 初始化DefaultMQProducer实例发送流程DefaultMQProducer#sendDefaultMQProducerImpl#sendMQClientInstance#updateTopicRouteInfoFromNameServer使用特定 topic 获取路由信息使用默认 topic 获取路由信息 DefaultMQProducerImpl#sendDefaultImpl发送流程总结 初始化De…

31 | 独角兽企业数据分析

独角兽企业:是投资行业尤其是风险投资业的术语,一般指成立时间不超过10年、估值超过10亿美元的未上市创业公司。 项目目的: 1.通过对独角兽企业进行全面地分析(地域,投资方,年份,行业等),便于做商业上的战略决策 项目数据源介绍 1.数据源:本项目采用的数据源是近…

文档控件DevExpress Office File API v23.1新版亮点 - 支持.NET MAUI

DevExpress Office File API是一个专为C#, VB.NET 和 ASP.NET等开发人员提供的非可视化.NET库。有了这个库&#xff0c;不用安装Microsoft Office&#xff0c;就可以完全自动处理Excel、Word等文档。开发人员使用一个非常易于操作的API就可以生成XLS, XLSx, DOC, DOCx, RTF, CS…

nginx一般轮询、加权轮询、ip_hash等负载均衡模式配置介绍

一.负载均衡含义简介 二.nginx负载均衡配置方式 准备三台设备&#xff1a; 2.190均衡服务器&#xff0c;2.191web服务器1&#xff0c;2.160web服务器2&#xff0c;三台设备均安装nginx&#xff0c;两台web服务器均有网页内容 1.一般轮询负载均衡 &#xff08;1&#xff09…