MQTT协议
MQTT 是一种基于发布/订阅模式的轻量级消息传输协议,专门针对低带宽和不稳定网络环境的物联网应用而设计,可以用极少的代码为联网设备提供实时可靠的消息服务。MQTT 协议广泛应用于物联网、移动互联网、智能硬件、车联网、智慧城市、远程医疗、电力、石油与能源等领域。
MQTT 与其他协议对比
MQTT vs HTTP
MQTT 的最小报文仅为 2 个字节,比 HTTP 占用更少的网络开销。
MQTT 与 HTTP 都能使用 TCP 连接,并实现稳定、可靠的网络连接。
MQTT 基于发布订阅模型,HTTP 基于请求响应,因此 MQTT 支持双工通信。
MQTT 可实时推送消息,但 HTTP 需要通过轮询获取数据更新。
MQTT 是有状态的,但是 HTTP 是无状态的。
MQTT 可从连接异常断开中恢复,HTTP 无法实现此目标。
MQTT vs XMPP
MQTT 协议设计简单轻量、路由灵活,将在移动互联网、物联网消息领域,全面取代 PC 时代的 XMPP 协议。
MQTT 报文体积小且编解码容易,XMPP 基于繁重的 XML,报文体积大且交互繁琐。
MQTT 基于发布订阅模式,相比 XMPP 基于 JID 的点对点消息路由更为灵活。
MQTT 支持 JSON、二进制等不同类型报文。XMPP 采用 XML 承载报文,二进制必须 Base64 编码等处理。
MQTT 通过 QoS 保证消息可靠传输,XMPP 主协议并未定义类似机制。
MQTT VS HTTP 请求响应
HTTP 是万维网数据通信的基础,其简单易用无客户端依赖,被广泛应用于各个行业。在物联网领域,HTTP也可以用于连接物联网设备和 Web 服务器,实现设备的远程监控和控制。
虽然使用简单、开发周期短,但是基于请求响应的 HTTP 在物联网领域的应用却有一定的局限性。
首先,协议层面 HTTP 报文相较与 MQTT 需要占用更多的网络开销;
其次,HTTP 是一种无状态协议,这意味着服务器在处理请求时不会记录客户端的状态,也无法实现从连接异常断开中恢复;
最后,请求响应模式需要通过轮询才能获取数据更新,而 MQTT 通过订阅即可获取实时数据更新。
发布订阅模式的松耦合特性,也给 MQTT 带来了一些副作用。由于发布者并不知晓订阅者的状态,因此发布者也无法得知订阅者是否收到了消息,或者是否正确处理了消息。为此,MQTT 5.0 增加了请求响应特性,以实现订阅者收到消息后向某个主题发送应答,发布者收到应答后再进行后续操作。
MQTT VS消息队列
尽管 MQTT 与消息队列的很多行为和特性非常接近,比如都采用发布/订阅模式,但是他们面向的场景却有着显著的不同。
消息队列主要用于服务端应用之间的消息存储与转发,这类场景往往数据量大但客户端数量少。
MQTT 是一种消息传输协议,主要用于物联网设备之间的消息传递,这类场景的特点是海量的设备接入、管理与消息传输。
在一些实际的应用场景中,MQTT 与消息队列往往会被结合起来使用,以使 MQTT 服务器能专注于处理设备的连接与设备间的消息路由。比如先由 MQTT 服务器接收物联网设备上报的数据,然后再通过消息队列将这些数据转发到不同的业务系统进行处理。
不同于消息队列,MQTT 主题不需要提前创建。MQTT 客户端在订阅或发布时即自动的创建了主题,开发者无需再关心主题的创建,并且也不需要手动删除主题。
MQTT常见的特点
1)轻量高效,节省带宽:MQTT 将协议本身占用的额外消耗最小化,消息头部最小只需要占用 2 个字节,可稳定运行在带宽受限的网络环境下。同时,MQTT 客户端只需占用非常小的硬件资源,能运行在各种资源受限的边缘端设备上。
2)可靠性较强:提供了多种消息的质量等级
3)安全的双向通讯:依赖于发布订阅模式,MQTT 允许在设备和云之间进行双向消息通信。发布订阅模式的优点在于:发布者与订阅者不需要建立直接连接,也不需要同时在线,而是由消息服务器负责所有消息的路由和分发工作。
安全性是所有物联网应用的基石,MQTT 支持通过 TLS/SSL 确保安全的双向通信,同时 MQTT 协议中提供的客户端 ID、用户名和密码允许我们实现应用层的身份验证和授权。
4)多语言支持:PHP、Node.js、python、java、golang
5)海量连接支持:连接海量的物联网设备,离不开 MQTT 服务器的支持。目前,MQTT 服务器中支持并发连接数最多的是EMQX。最近发布的 EMQX 5.0 通过一个 23 节点的集群达成了 1 亿 MQTT 连接+每秒 100 万消息吞吐,这使得 EMQX 5.0 成为目前为止全球最具扩展性的 MQTT 服务器。
6)在线状态感知:为了应对网络不稳定的情况,MQTT 提供了心跳保活(Keep Alive)机制。在客户端与服务端长时间无消息交互的情况下,Keep Alive 保持连接不被断开,若一旦断开,客户端可即时感知并立即重连。
同时,MQTT 设计了遗愿(Last Will)消息,让服务端在发现客户端异常下线的情况下,帮助客户端发布一条遗愿消息到指定的 MQTT 主题。
另外,部分 MQTT 服务器如 EMQX 也提供了上下线事件通知功能,当后端服务订阅了特定主题后,即可收到所有客户端的上下线事件,这样有助于后端服务统一处理客户端的上下线事件。
MQTT的常见概念
1)MQTT客户端:任何运行的MQTT客户端库(MQTT开发工具的SDK)的应用或设备都是MQTT客户端
2)MQTT Broker:实现了MQTT通讯的代理软件
3)主题:存在于MQTT Broker中的,就是一个普通字符串,使用主体来对消息进行分类的
MQTT快速入门
EMQX:一款实现了MQTT协议的,开源的MQTT消息代理软件,MQTT定义了消息通讯的规则和流程,而EMQX则是遵循这些规则的软件,使得设备能够一句MQTT协议进行有效通信。
官网地址:EMQX:用于物联网、车联网和工业物联网的企业级 MQTT 平台
下载emqx
在windows上下载emqx,网址:Directory listing for EMQX: / | EMQ
将下载的emqx-5.3.2-windows-amd64.zip
解压出来,解压目录不能存在中文、空格、特殊字符
打开其中的bin文件夹,在地栏复制地址
用管理员 身份在cmd中进行操作
.\emqx.cmd install 将发行版安装为 Windows 服务
.\emqx.cmd start 启动服务和 Erlang 节点
.\emqx.cmd stop 停止服务和 Erlang 节点
.\emqx.cmd restart 运行停止命令和启动命令
.\emqx.cmd uninstall 卸载服务并终止正在运行的节点
.\emqx.cmd ping 检查节点是否正在运行
.\emqx.cmd ctl 运行管理命令
.\emqx.cmd console 在Windows shell 中启动 Erlang 版本
.\emqx.cmd attach 连接到正在运行的节点并打开交互式控制台
.\emqx.cmd remote_console - 与附加相同
.\emqx.cmd list 显示已安装的 Erlang 服务的列表
.\emqx.cmd usage 显示可用命令
启动 emqx服务,输入命令:.\emqx.cmd console
提示EMQX 版本号 is running now!,则说明运行成功
浏览器输入localhost:18083回车,即可访问EMQX控制台,在登录页面输入初始化账号 :用户名:admin 密码:public
连接MQTT
下载并安装MQTTX,网址:MQTTX:全功能 MQTT 客户端工具
进入EMQX控制台,访问控制->客户端认证->创建认证方式(以选择密码认证为例)->添加用户
打开MQTTX客户端,建立连接两个 sub和pub
sub添加订阅aa
pub发送消息
sub接收到消息
MQTT控制报文
控制报文简介
MQTT 控制报文是 MQTT 数据传输的最小单元。MQTT 客户端和服务端通过交换控制报文来完成它们的工作,比如订阅主题和发布消息。
MQTT 目前定义了 15 种控制报文类型,如果按照功能进行分类,我们可以将这些报文分为连接、发布、订阅三个类别:
CONNECT 报文用于客户端向服务端发起连接,
CONNACK 报文则作为响应返回连接的结果。
如果想要结束通信,或者遇到了一个必须终止连接的错误,客户端和服务端可以发送一个 DISCONNECT 报文然后关闭网络连接。
AUTH 报文是 MQTT 5.0 引入的全新的报文类型,它仅用于增强认证,为客户端和服务端提供更安全的身份验证。
PINGREQ 和 PINGRESP 报文用于连接保活和探活,客户端定期发出 PINGREQ 报文向服务端表示自己仍然活跃,然后根据 PINGRESP 报文是否及时返回判断服务端是否活跃。
PUBLISH 报文用于发布消息,余下的四个报文分别用于 QoS 1 和 2 消息的确认流程。
SUBSCRIBE 报文用于客户端向服务端发起订阅,UNSUBSCRIBE 报文则正好相反,
SUBACK 和UNSUBACK 报文分别用于返回订阅和取消订阅的结果。
MQTT报文格式
在MQTT中,无论是什么类型的控制报文,它们都由固定报头、可变报头和有效载荷三个部分组成。
固定报头固定存在于所有控制报文中,而可变报头和有效载荷是否存在以及它们的内容则取决于具体的报文类型。例如用于维持连接的 PINGREQ 报文就只有一个固定报头,用于传递应用消息的 PUBLISH 报文则完整地包含了这三个部分。
固定报头
固定报头由报文类型、标识位和报文剩余长度三个字段组成。
报文类型位于固定报头第一个字节的高 4 位,它是一个无符号整数,很显然,它表示当前报文的类型,例如 1 表示这是一个 CONNECT 报文,2 表示 CONNACK 报文等等。事实上,除了报文类型和剩余长度这两个字段,MQTT报文剩余部分的内容基本都取决于具体的报文类型,所以这个字段也决定了接收方应该如何解析报文的后续内容。
固定报头第一个字节中剩下的低 4 位包含了由控制报文类型决定的标识位。不过到 MQTT 5.0 为止,只有 PUBLISH 报文的这四个比特位被赋予了明确的含义:
Bit 3:DUP,表示当前 PUBLISH 报文是否是一个重传的报文。
Bit 2,1:QoS,表示当前 PUBLISH 报文使用的服务质量等级。
Bit 0:Retain,表示当前 PUBLISH 报文是否是一个保留消息。
其他所有的报文中,这 4 位都仍是保留的,即它们是一个固定的,不可随意变更的值。
最后的剩余长度指示了当前控制报文剩余部分的字节数,也就是可变报头和有效载荷这两个部分的长度。
所以 MQTT 控制报文的总长度实际上等于固定报头的长度加上剩余长度。
可变报头
可变报头的内容取决于具体的报文类型。例如 CONNECT 报文的可变报头按顺序包含了协议名、协议级别、连接标识、Keep Alive 和属性这五个字段。PUBLISH 报文的可变报头则按顺序包含了主题名、报文标识符和属性这三个字段。
有效载荷
最后是有效载荷部分。我们可以将报文的可变报头看作是它的附加项,而有效载荷则用于实现这个报文的核心目的。
比如在 PUBLISH 报文中,Payload 用于承载具体的应用消息内容,这也是 PUBLISH 报文最核心的功能。
而 PUBLISH 报文的可变报头中的 QoS、Retain 等字段,则是围绕着应用消息提供一些额外的能力。
SUBSCRIBE 报文也是如此,Payload 包含了想要订阅的主题以及对应的订阅选项,这也是 SUBSCRIBE 报文最主要的工作。
MQTT报文验证
wireshark工具抓取通讯数据报文
下载地址:Wireshark · Download
QOS
QOS是什么
很多时候,使用 MQTT 协议的设备都运行在网络受限的环境下,而只依靠底层的 TCP 传输协议,并不能完全保证消息的可靠到达。因此,MQTT 提供了 QoS 机制,其核心是设计了多种消息交互机制来提供不同的服务质量,来满足用户在各种场景下对消息可靠性的要求。
MQTT 定义了三个 QoS 等级,分别为:
QoS 0,消息最多发送一次--->可能丢失消息
QoS 1,消息至少发送一次--->可以保证收到消息,但消息可能重复
QoS 2,消息只发送一次--->可以保证消息既不丢失也不重复
QoS 等级从低到高,不仅意味着消息可靠性的提升,也意味着传输复杂程度的提升。
在一个完整的从发布者到订阅者的消息投递流程中,QoS 等级是由发布者在 PUBLISH 报文中指定的,大部分情况下 Broker 向订阅者转发消息时都会维持原始的 QoS 不变。不过也有一些例外的情况,根据订阅者的订阅要求,消息的 QoS 等级可能会在转发的时候发生降级。
例如,订阅者在订阅时要求 Broker 可以向其转发的消息的最大 QoS 等级为 QoS 1,那么后续所有 QoS 2 消息都会降级至 QoS 1 转发给此订阅者,而所有 QoS 0 和 QoS 1 消息则会保持原始的 QoS 等级转发。
QoS 0 - 最多交付一次
QoS 0 是最低的 QoS 等级。QoS 0 消息即发即弃,不需要等待确认,不需要存储和重传,因此对于接收方来说,永远都不需要担心收到重复的消息。
为什么 QoS 0 消息会丢失?
当我们使用 QoS 0 传递消息时,消息的可靠性完全依赖于底层的 TCP 协议。
而 TCP 只能保证在连接稳定不关闭的情况下消息的可靠到达,一旦出现连接关闭、重置,仍有可能丢失
当前处于网络链路或操作系统底层缓冲区中的消息。这也是 QoS 0 消息最主要的丢失场景。
QoS 1 - 至少交付一次
为了保证消息到达,QoS 1 加入了应答与重传机制,发送方只有在收到接收方的 PUBACK 报文以后,才能认为消息投递成功,在此之前,发送方需要存储该 PUBLISH 报文以便下次重传。
QoS 1 需要在 PUBLISH 报文中设置 Packet ID,而作为响应的 PUBACK 报文,则会使用与 PUBLISH 报文相同的 Packet ID,以便发送方收到后删除正确的 PUBLISH 报文缓存。
为什么 QoS 1 消息会重复?
对于发送方来说,没收到 PUBACK 报文分为以下两种情况:
1.PUBLISH 未到达接收方
2.PUBLISH 已经到达接收方,接收方的 PUBACK 报文还未到达发送方
在第一种情况下,发送方虽然重传了 PUBLISH 报文,但是对于接收方来说,实际上仍然仅收到了一次消息。
但是在第二种情况下,在发送方重传时,接收方已经收到过了这个 PUBLISH 报文,这就导致接收方将收到重复的消息。
虽然重传时 PUBLISH 报文中的 DUP 标志会被设置为 1,用以表示这是一个重传的报文。但是接收方并不能因此假定自己曾经接收过这个消息,仍然需要将其视作一个全新的消息。
QoS 2 - 只交付一次
QoS 2 解决了 QoS 0、1 消息可能丢失或者重复的问题,但相应地,它也带来了最复杂的交互流程和最高的开销。每一次的 QoS 2 消息投递,都要求发送方与接收方进行至少两次请求/响应流程。
首先,发送方存储并发送 QoS 为 2 的 PUBLISH 报文以启动一次 QoS 2 消息的传输,然后等待接收方回复 PUBREC 报文。这一部分与 QoS 1 基本一致,只是响应报文从 PUBACK 变成了PUBREC。
当发送方收到 PUBREC 报文,即可确认对端已经收到了 PUBLISH 报文,发送方将不再需要重传这个报文,并且也不能再重传这个报文。所以此时发送方可以删除本地存储的 PUBLISH 报文,然后发送一个 PUBREL 报文,通知对端自己准备将本次使用的 Packet ID 标记为可用了。与 PUBLISH 报文一样,我们需要确保 PUBREL 报文到达对端,所以也需要一个响应报文,并且这个 PUBREL 报文需要被存储下来以便后续重传。
当接收方收到 PUBREL 报文,也可以确认在这一次的传输流程中不会再有重传的 PUBLISH 报文到达,因此回复 PUBCOMP 报文表示自己也准备好将当前的 Packet ID 用于新的消息了。
当发送方收到 PUBCOMP 报文,这一次的 QoS 2 消息传输就算正式完成了。在这之后,发送方可以再次使用当前的 Packet ID 发送新的消息,而接收方再次收到使用这个 Packet ID 的 PUBLISH 报文时,也会将它视为一个全新的消息。
为什么 QoS 2 消息不会重复?
QoS 2 消息保证不会丢失的逻辑与 QoS 1 相同,所以这里我们就不再重复了。
与 QoS 1 相比,QoS 2 新增了 PUBREL 报文和 PUBCOMP 报文的流程,也正是这个新增的流程带来了消息不会重复的保证。
在我们更进一步之前,我们先快速回顾一下 QoS 1 消息无法避免重复的原因。
当我们使用 QoS 1 消息时,对接收方来说,回复完 PUBACK 这个响应报文以后 Packet ID 就重新可用了,也不管响应是否确实已经到达了发送方。所以就无法得知之后到达的,携带了相同 Packet ID 的PUBLISH 报文,到底是发送方因为没有收到响应而重传的,还是发送方因为收到了响应所以重新使用了这个 Packet ID 发送了一个全新的消息。
所以,消息去重的关键就在于,通信双方如何正确地同步释放 Packet ID,换句话说,不管发送方是重传消息还是发布新消息,一定是和对端达成共识了的。而 QoS 2 中增加的 PUBREL 流程,正是提供了帮助通信双方协商 Packet ID 何时可以重用的能力。
QoS 2 规定,发送方只有在收到 PUBREC 报文之前可以重传 PUBLISH 报文。一旦收到 PUBREC 报文并发出 PUBREL 报文,发送方就进入了 Packet ID 释放流程,不可以再使用当前 Packet ID 重传PUBLISH 报文。同时,在收到对端回复的 PUBCOMP 报文确认双方都完成 Packet ID 释放之前,也不可以使用当前 Packet ID 发送新的消息。
因此,对于接收方来说,能够以 PUBREL 报文为界限,凡是在 PUBREL 报文之前到达的 PUBLISH 报文,都必然是重复的消息;而凡是在 PUBREL 报文之后到达的 PUBLISH 报文,都必然是全新的消息。一旦有了这个前提,我们就能够在协议层面完成 QoS 2 消息的去重。
不同 QoS 的适用场景和注意事项
QoS 0
QoS 0 的缺点是可能会丢失消息,消息丢失的频率依赖于你所处的网络环境,并且可能使你错过断开连接期间的消息,不过优点是投递的效率较高。
所以我们通常选择使用 QoS 0 传输一些高频且不那么重要的数据,比如传感器数据,周期性更新,即使遗漏几个周期的数据也可以受。
QoS 1
QoS 1 可以保证消息到达,所以适合传输一些较为重要的数据,比如下达关键指令、更新重要的有实时性要求的状态等。
但因为 QoS 1 还可能会导致消息重复,所以当我们选择使用 QoS 1 时,还需要能够处理消息的重复,或者能够允许消息的重复。
在我们决定使用 QoS 1 并且不对其进行去重处理之前,我们需要先了解,允许消息的重复,可能意味着什么。
如果我们不对 QoS 1 进行去重处理,我们可能会遭遇这种情况,发布方以 1、2 的顺序发布消息,但最终订阅方接收到的消息顺序可能是 1、2、1、2。如果 1 表示开灯指令,2 表示关灯指令,我想大部分用户都不会接受自己仅仅进行了开灯然后关灯的操作,结果灯在开和关的状态来回变化。
QoS 2
QoS 2 既可以保证消息到达,也可以保证消息不会重复,但传输成本最高。如果我们不愿意自行实现去重方案,并且能够接受 QoS 2 带来的额外开销,那么 QoS 2 将是一个合适的选择。通常我们会在金融、航空等行业场景下会更多地见到 QoS 2 的使用。
如何为 QoS 1 消息去重?
在我们介绍 QoS 1 的时候讲到,QoS 1 消息的重复在协议层面上是无法避免的。所以如果我们想要对 QoS1 消息进行去重,只能从业务层面入手。
一个比较常用且简单的方法是,在每个 PUBLISH 报文的 Payload 中都带上一个时间戳或者一个单调递增的计数,这样上层业务就可以根据当前收到消息中的时间戳或计数是否大于自己上一次接收的消息中的时间戳或计数来判断这是否是一个新消息。
何时向后分发 QoS 2 消息?
我们已经了解到,QoS 2 的流程是非常长的,为了不影响消息的实时性,我们可以在第一次收到 PUBLISH 报文时,就启动消息的向后分发。当然一旦开始向后分发,后续收到在 PUBREL 报文之前到达的 PUBLISH 报文,都不能再重复分发操作,以免消息重复。
不同 QoS 的性能有差距么?
以 EMQX 为例,在相同的硬件配置下进行点对点通信,通常 QoS 0 与 QoS 1 能够达到的吞吐比较接近,不过 QoS 1 的 CPU 占用会略高于 QoS 0,负载较高时,QoS 1 的消息延迟也会进一步增加。而 QoS 2 能够达到的吞吐一般仅为 QoS 0、1 的一半左右。
主题与通配符
主题
MQTT 主题本质上是一个 UTF-8 编码的字符串,是 MQTT 协议进行消息路由的基础。MQTT 主题类似URL 路径,使用斜杠/进行分层
为了避免歧义且易于理解,通常不建议主题以/开头或结尾,例如/chat或chat/
不同于消息队列中的主题(比如 Kafka 和 Pulsar),MQTT 主题不需要提前创建。MQTT 客户端在订阅或发布时即自动的创建了主题,开发者无需再关心主题的创建,并且也不需要手动删除主题。
主题通配符
MQTT 主题通配符包含单层通配符+及多层通配符#,主要用于客户端一次订阅多个主题。
注意: 通配符只能用于订阅,不能用于发布。
单层通配符
加号 (“+” U+002B) 是用于单个主题层级匹配的通配符。在使用单层通配符时,单层通配符必须占据整个层级,例如:
多层通配符
井字符号(“#” U+0023)是用于匹配主题中任意层级的通配符。多层通配符表示它的父级和任意数量的子层级,在使用多层通配符时,它必须占据整个层级并且必须是主题的最后一个字符,例如:
系统主题-以 $ SYS/开头的主题
以 $SYS/ 开头的主题为系统主题,系统主题主要用于获取 MQTT 服务器自身运行状态、消息统计、客户端上下线事件等数据。目前,MQTT 协议暂未明确规定 $SYS/ 主题标准,但大多数 MQTT 服务器都遵循该标准建议。
例如,EMQX 服务器支持通过以下主题获取集群状态。
EMQX 还支持客户端上下线事件、收发流量、消息收发、系统监控等丰富的系统主题,用户可通过订阅$SYS/# 主题获取所有系统主题消息。
不同场景主题设计
智能家居
比如我们用传感器监测卧室、客厅以及厨房的温度、湿度和空气质量,可以设计以下几个主题:
myhome/bedroom/temperature
myhome/bedroom/humidity
myhome/bedroom/airquality
myhome/livingroom/temperature
myhome/livingroom/humidity
myhome/livingroom/airquality
myhome/kitchen/temperature
myhome/kitchen/humidity
myhome/kitchen/airquality
接下来,可以通过订阅 myhome/bedroom/+ 主题获取卧室的温度、湿度及空气质量数据,订阅myhome/+/temperature 主题获取三个房间的温度数据,订阅 myhome/# 获取所有的数据。
充电桩
充电桩的上行主题格式为 ocpp/cp/${cid}/notify/${action},下行主题格式为 ocpp/cp/${cid}/reply/${action}。
ocpp/cp/cp001/notify/bootNotification
充电桩上线时向该主题发布上线请求。
ocpp/cp/cp001/notify/startTransaction
向该主题发布充电请求。
ocpp/cp/cp001/reply/bootNotification
充电桩上线前需订阅该主题接收上线应答。
ocpp/cp/cp001/reply/startTransaction
充电桩发起充电请求前需订阅该主题接收充电请求应答。
即时消息
chat/user/${user_id}/inbox
一对一聊天:用户上线后订阅该收件箱主题 ,将能接收到好友发送给自己的消息。给好友回复消息时,只需要将该主题的 user_id 换为好友的的 id 即可。
chat/group/${group_id}/inbox
群聊:用户加群成功后,可订阅该主题获取对应群组的消息,回复群聊时直接给该主题发布消息即可。
req/user/${user_id}/add
添加好友:可向该主题发布添加好友的申请(user_id 为对方的 id)。
接收好友请求:用户可订阅该主题(user_id 为自己的 id)接收其他用户发起的好友请求。
resp/user/${user_id}/add
接收好友请求的回复:用户添加好友前,需订阅该主题接收请求结果(user_id 为自己的 id)。
回复好友申请:用户向该主题发送消息表明是否同意好友申请(user_id 为对方的 id)。
user/${user_id}/state
用户在线状态:用户可以订阅该主题获取好友的在线状态。
MQTT 主题常见问题及解答
主题的层级及长度有什么限制吗?
MQTT 协议规定主题的长度为两个字节,因此主题最多可包含 65,535 个字符。
建议主题层级为 7 个以内。使用较短的主题名称和较少的主题层级意味着较少的资源消耗,例如
my-home/room1/data 比 my/home/room1/data 更好。
服务器对主题数量有限制吗?
不同消息服务器对最大主题数量的支持各不一致,目前 EMQX 的默认配置对主题数量没有限制,但是主题数量越多将会消耗越多的服务器内存。考虑到连接到 MQTT Broker 的设备数量一般较多,我们建议一个客户端订阅的主题数量最好控制在 10 个以内。
通配符主题订阅与普通主题订阅性能是否一致?
通配符主题订阅的性能弱于普通主题订阅,且会消耗更多的服务器资源,用户可根据实际业务情况选择订阅类型。
同一个主题能被共享订阅与普通订阅同时使用吗?
可以,但是不建议同时使用。
常见的 MQTT 主题使用建议有哪些?
不建议使用 # 订阅所有主题;
不建议主题以 / 开头或结尾,例如 /chat 或 chat/;
不建议在主题里添加空格及非 ASCII 特殊字符;
同一主题层级内建议使用下划线 _ 或横杆 - 连接单词(或者使用驼峰命名);
尽量使用较少的主题层级;
当使用通配符时,将唯一值的主题层(例如设备号)越靠近第一层越好。例如,
device/00000001/command/# 比 device/command/00000001/# 更好。
持久会话
什么是 MQTT 持久会话?
不稳定的网络及有限的硬件资源是物联网应用需要面对的两大难题,MQTT 客户端与服务器的连接可能随时会因为网络波动及资源限制而异常断开。为了解决网络连接断开对通信造成的影响,MQTT 协议提供了持久会话功能。
MQTT 客户端在发起到服务器的连接时,可以设置是否创建一个持久会话。持久会话会保存一些重要的数据,以使会话能在多个网络连接中继续。持久会话主要有以下三个作用:
避免因网络中断导致需要反复订阅带来的额外开销。
避免错过离线期间的消息。
确保 QoS 1 和 QoS 2 的消息质量保证不被网络中断影响。
持久会话需要存储哪些数据?
通过上文我们知道持久会话需要存储一些重要的数据,以使会话能被恢复。这些数据有的存储在客户端,有的则存储在服务端。
客户端中存储的会话数据:
已发送给服务端,但是还没有完成确认的 QoS 1 与 QoS 2 消息。
从服务端收到的,但是还没有完成确认的 QoS 2 消息。
服务端中存储的会话数据:
会话是否存在,即使会话状态其余部分为空。
已发送给客户端,但是还没有完成确认的 QoS 1 与 QoS 2 消息。
等待传输给客户端的 QoS 0 消息(可选),QoS 1 与 QoS 2 消息。
从客户端收到的,但是还没有完成确认的 QoS 2 消息,遗嘱消息和遗嘱延时间隔。
MQTT Clean Session 的使用
Clean Session 是用来控制会话状态生命周期的标志位,为 true 时表示创建一个新的会话,在客户端断开
连接时,会话将自动销毁。为 false 时表示创建一个持久会话,在客户端断开连接后会话仍然保持,直到会话超时注销。
注意: 持久会话能被恢复的前提是客户端使用固定的 Client ID 再次连接,如果 Client ID 是动态的,那么连接成功后将会创建一个新的持久会话。
MQTT 5.0 中将 Clean Session 拆分成了 Clean Start 与 Session Expiry Interval。Clean Start 用于指定连接时是创建一个全新的会话还是尝试复用一个已存在的会话,Session Expiry Interval 用于指定网络连接断开后会话的过期时间。
Clean Start 为 true 时表示必须丢弃任何已存在的会话,并创建一个全新的会话;为 false 时表示必须使用与 Client ID 关联的会话来恢复与客户端的通信(除非会话不存在)。
Session Expiry Interval 解决了 MQTT 3.1.1 中持久会话永久存在造成的服务器资源浪费问题。设置为 0或未设置,表示断开连接时会话即到期;设置为大于 0 的数值,则表示会话在网络连接关闭后会保持多少秒;设置为 0xFFFFFFFF 表示会话永远不会过期。
会话相关问题
当会话结束后,保留消息还存在么?
MQTT 保留消息不是会话状态的一部分,它们不会在会话结束时被删除。
客户端如何知道当前会话是被恢复的会话?
MQTT 协议从 v3.1.1 开始,就为 CONNACK 报文设计了 Session Present 字段。当服务器返回的该字
段值为 1 时,表示当前连接将会复用服务器保存的会话。客户端可通过该字段值决定在连接成功后是否需
要重新订阅。
使用持久会话时有哪些建议?
不能使用动态 Client ID,需要保证客户端每次连接的 Client ID 都是固定的。
根据服务器性能、网络状况、客户端类型等合理评估会话过期时间。设置过长会占用更多的服务端资源,设置过短会导致未重连成功会话就失效。
当客户端确定不再需要会话时,可使用 Clean Session 为 true 进行重连,重连成功后再断开连接。
如果是 MQTT 5.0 则可在断开连接时直接设置 Session Expiry Interval 为 0,表示连接断开后会话即失效。
保留消息
什么是 MQTT 保留消息?
发布者发布消息时,如果 Retained 标记被设置为 true,则该消息即是 MQTT 中的保留消息(RetainedMessage)。MQTT 服务器会为每个主题存储最新一条保留消息,以方便消息发布后才上线的客户端在订阅主题时仍可以接收到该消息。
何时使用 MQTT 保留消息?
发布订阅模式虽然能让消息的发布者与订阅者充分解耦,但也存在一个缺点,即订阅者无法主动向发布者请求消息。订阅者何时收到消息完全依赖于发布者何时发布消息,这在某些场景中就产生了不便。
借助保留消息,新的订阅者能够立即获取最近的状态,而不需要等待无法预期的时间,例如:
智能家居设备的状态只有在变更时才会上报,但是控制端需要在上线后就能获取到设备的状态;
传感器上报数据的间隔太长,但是订阅者需要在订阅后立即获取到最新的数据;
传感器的版本号、序列号等不会经常变更的属性,可在上线后发布一条保留消息告知后续的所有订阅者。
MQTT 保留消息的使用
若要使用 MQTT 保留消息,只需在消息发布时将 Retained 状态设置为 true 即可
如何判断一条消息是否是保留消息?
当客户端订阅了有保留消息的主题后,即会收到该主题的保留消息,可通过消息中的保留标志位判断是否是保留消息。需要注意的是,在保留消息发布前订阅主题,将不会收到保留消息。需要待保留消息发布后,重新订阅该主题,才会收到保留消息。
先订阅主题 sensor/t2,然后向该主题发布一条保留消息,该订阅会立即收到一条消息,但是该消息并不是保留消息。当我们删除该订阅,再次重新订阅 sensor/t2 主题时,立即收到了刚刚发布的保留消息。
注意:
1)可以通过Dasgboard查看保留消息
2)MQTT服务器会为每个主题存储最新一条保留消息
3)在保留消息发布前订阅主题,将不会收到保留消息。需要待保留消息发布后,重新订阅该主题,才会收到保留消息。
保留消息将保存多久?如何删除?
服务器只会为每个主题保存最新一条保留消息,保留消息的保存时间与服务器的设置有关。若服务器设置保留消息存储在内存,则 MQTT 服务器重启后消息即会丢失;若存储在磁盘,则服务器重启后保留消息仍然存在。
保留消息存储方式默认存储为ram,存储在内存中,emqx重启后消息丢失;可以设置为disc,存储在磁盘中,重启后消息不会丢失
保留消息虽然存储在服务端中,但它并不属于会话的一部分。也就是说,即便发布这个保留消息的会话已结束,保留消息也不会被删除。删除保留消息有以下几种方式:
客户端往某个主题发送一个 Payload 为空的保留消息,服务端就会删除这个主题下的保留消息;
在 MQTT 服务器上删除,比如 EMQX MQTT 服务器提供了在 Dashboard 上删除保留消息的功能;
MQTT 5.0 新增了消息过期间隔属性,发布时可使用该属性设置消息的过期时间,不管消息是否为保留消息,都将会在过期时间后自动被删除。
消息过期间隔
什么是消息过期间隔?
消息过期间隔是 MQTT 5.0 引入的一个新特性,它允许发布端为有时效性的消息设置一个过期间隔,如果该消息在服务端中停留超过了这个指定的间隔,那么服务端将不会再将它分发给订阅端。默认情况下,消息中不会包含消息过期间隔,这表示该消息永远不会过期。
MQTT 的持久会话可以为离线客户端缓存尚未发送的消息,然后在客户端恢复连接时发送。但如果客户端离线时间较长,可能有一些寿命较短的消息已经没有必要必须发送给客户端了,继续发送这些过期的消息,只会浪费网络带宽和客户端资源。
以联网汽车为例,我们可以向车辆发送建议车速使它能够在绿灯期间通过路口,这类消息通常仅在车辆到达下一个路口之前有效,生命周期非常短暂。而前方拥堵提醒这类消息的生命周期则会更长一些,一般会在半小时到 1 小时内有效。
如果客户端在发布消息时设置了过期间隔,那么服务端在转发这个消息时也会包含过期间隔,但过期间隔的值会被更新为服务端接收到的值减去该消息在服务端停留的时间。
何时使用消息过期间隔?
消息过期间隔非常适合在以下场景下使用:
-
与时间强绑定的消息。比如优惠还剩最后两小时这个消息,如果用户在两个小时后才收到它,不会有任何的意义。
-
周期性告知最新状态的消息。仍然以道路拥堵提醒为例,我们需要周期性向车辆发送拥堵的预计结束时间,这个时间会随最新的道路情况而发生变化。所以当最新的消息到达后,之前还未发送的消息也没有必要继续发送了。此时消息的过期间隔将由我们实际的发送周期决定。
-
保留消息。相比于需要再次发送 Payload 为空的保留消息来清除对应主题下的保留消息,为其设置过期时间然后由服务器自动删除显然更加方便,这也可以有效避免保留消息占用过多的存储资源。
遗嘱消息
遗嘱消息是 MQTT 为那些可能出现意外断线的设备提供的将遗嘱优雅地发送给第三方的能力。意外断线包
括但不限于:
因网络故障或网络波动,设备在保持连接周期内未能通讯,连接被服务端关闭
设备意外掉电
设备尝试进行不被允许的操作而被服务端关闭连接,例如订阅自身权限以外的主题等
遗嘱消息可以看作是一个简化版的 PUBLISH 消息,他也包含 Topic, Payload, QoS 等字段。遗嘱消息会在设备与服务端连接时,通过 CONNECT 报文指定,然后在设备意外断线时由服务端将该遗嘱消息发布到连接时指定的遗嘱主题(Will Topic)上。这也意味着服务端必须在回复 CONNACK 之前完成遗嘱消息的存储,以确保之后任一时刻发生意外断线的情况,服务端都能保证遗嘱消息被发布。
Will Retain 的使用场景,它是保留消息与遗嘱消息的结合。如果订阅该遗嘱主题(WillTopic)的客户端不能保证遗嘱消息发布时在线,那么建议为遗嘱消息设置 Will Retain,避免订阅端错过遗嘱消息。
Will Flag 通常是 MQTT 协议实现方关心的字段,它用于标识 CONNECT 报文中是否会包含 WillProperties、Will Topic 等字段。
最后一个是 MQTT 5.0 新增的 Will Properties 字段,属性本身也是 MQTT 5.0 的一个新特性,不同类型的报文有着不同的属性,例如 CONNECT 报文有会话过期间隔(Session Expiry Interval)、最大报文长度(Maximum Packet Size)等属性,SUBSCRIBE 报文则有订阅标识符(Subscription Identifier)等属性。
Will Properties 中的消息过期间隔(Message Expiry Interval)等属性与 PUBLISH 报文中的用法基本一致,只有一个遗嘱延迟间隔(Will Delay Interval)是遗嘱消息特有的属性。
遗嘱延迟间隔顾名思义,就是在连接断开后延迟一段时间才发布遗嘱消息。它的一个重要用途就是避免在设备因网络波动短暂断开连接,但能够快速恢复连接继续提供服务时发出遗嘱消息,并对遗嘱消息订阅方造成困扰。
需要注意的是,具体延迟多久发布遗嘱消息,除了遗嘱延迟间隔,还受限于会话过期间隔,取决于两者谁先发生。所以当我们将会话过期间隔设置为 0 时,即会话在网络连接关闭时过期,那么不管遗嘱延迟间隔的值是多少,遗嘱消息都会在网络连接断开时立即发布。
延迟发布
EMQX的延迟发布功能是一种强大的消息处理机制,它允许用户按照配置的时间间隔延迟发布MQTT消息。以下是对EMQX延迟发布功能的详细解析:
一、功能概述
EMQX的延迟发布功能依赖于其内置的emqx_mod_delayed
模块。当客户端使用特殊主题前缀$delayed/{DelayInterval}
发布消息到EMQX时,将触发延迟发布功能。这意味着消息不会立即被发布到目标主题,而是会在指定的延迟时间后发布。
二、主题格式
延迟发布主题的具体格式如下:
-
$delayed/
:这是延迟发布功能的前缀,所有使用该前缀的主题都将被视为需要延迟发布的消息。 -
{DelayInterval}
:指定MQTT消息延迟发布的时间间隔,单位是秒。允许的最大间隔是4294967秒。如果{DelayInterval}
无法被解析为一个整型数字,EMQX将丢弃该消息,客户端不会收到任何信息。 -
{TopicName}
:MQTT消息的目标主题名称。
例如,$delayed/15/x/y
表示15秒后将MQTT消息发布到主题x/y
;$delayed/60/a/b
表示1分钟后将MQTT消息发布到主题a/b
。
三、使用步骤
-
启用模块:首先,需要在EMQX的模块中启用
emqx_mod_delayed
模块。这通常可以在EMQX的管理界面或配置文件中完成。
-
发布延迟消息:在发布消息时,将主题设置为上述的延迟发布主题格式。例如,如果想要在10秒后发布消息到主题
test/topic
,则可以将主题设置为$delayed/10/test/topic
。 -
等待延迟时间:在指定的延迟时间后,消息将被自动发布到目标主题。
四、应用场景
EMQX的延迟发布功能在多种场景下都非常有用,例如:
-
消息调度:在某些应用中,可能需要按照特定的时间间隔发布消息。通过延迟发布功能,可以轻松实现这一需求。
-
负载均衡:在高并发场景下,通过延迟发布功能可以平滑消息发布速率,减轻系统的瞬时负载。
-
消息去重:在订阅者集群中,通过延迟发布和消息ID的唯一性检查,可以确保消息只被消费一次,避免重复消费问题。
五、注意事项
-
延迟时间限制:EMQX对延迟时间有一定的限制,最大允许间隔为4294967秒。在实际应用中,需要根据具体需求设置合理的延迟时间。
-
模块状态:确保
emqx_mod_delayed
模块已经启用,否则延迟发布功能将无法正常工作。 -
消息丢弃:如果延迟时间间隔无法被解析为整型数字,或者由于其他原因导致消息无法被处理,EMQX将丢弃该消息。因此,在发布延迟消息时,需要确保主题格式正确且延迟时间合理。
用户属性
什么是用户属性
用户属性(User Properties)其实是一种自定义属性,允许用户向 MQTT 消息添加自己的元数据,传输额外的自定义信息以扩充更多应用场景。
它由一个用户自定义的 UTF-8 的键/值对数组组成,并在消息属性字段中配置,只要不超过最大的消息大小,可以使用无限数量的用户属性来向 MQTT 消息添加元数据,并在发布者、MQTT 服务器和订阅者之间传递信息。
如果你熟悉 HTTP 协议的话,该功能与 HTTP 的 Header 的概念非常类似。用户属性有效地允许用户扩展 MQTT 协议,并且可以出现在所有消息和响应中。因为用户属性是由用户定义的,它们只对该用户的实现有意义。
为什么需要使用用户属性
MQTT 3 的协议扩展性能力较差,用户属性其实就是为了解决这个问题,它支持在消息中传递任何信息,确保了用户可扩展标准协议的功能。
对于选择和配置不同的消息类型,用户属性可以在客户端与 MQTT 服务器之间,或者客户端和客户端之间发送。在连接客户端中配置用户属性时,只能在 MQTT 服务器上接收,无法在客户端中接收。如果在发送消息的时候配置用户属性,则可以在其它客户端中接收。常用的有以下两种用户属性配置。
连接客户端的用户属性
当客户端与 MQTT 服务器发起连接时,服务器可以预先定义好一些需要并且可以使用到的元数据信息,即用户属性,当连接成功后,MQTT 服务可以拿到连接发送过来的相关信息进行使用,因此连接客户端的用户属性依赖于 MQTT 服务器。
消息发布的用户属性
消息发布时的用户属性可能是较为常用的,因为它们可以在客户端与客户端之间进行元数据信息传递。比如可以在发布时添加一些常见的信息:消息编号,时间戳,文件,客户端信息和路由信息等属性。
除上述较为常用的用户属性设置外,还可以在订阅 Topic 时,取消订阅时,断开连接时配置用户属性。
订阅
订阅的组成:
1)主题过滤器:决定了服务端将为我们转发哪些主题下的消息
2)订阅选项:允许我们进一步定制服务端的转发行为
订阅选项
MQTT 5.0 提供了 4 个订阅选项,分别是 QoS、No Local、Retain As Published、Retain Handling
QoS
QoS 是最常用的一个订阅选项,它表示服务端在向订阅端发送消息时可以使用的最大 QoS 等级。
客户端可能会在订阅时指定一个小于 2 的 QoS,因为它的实现不支持 QoS 1 或者 QoS 2。而如果服务端支持的最大 QoS 小于客户端订阅时请求的最大 QoS,那么显然服务端将无法满足客户端的要求,这时服务端就会通过订阅的响应报文(SUBACK)告知订阅端最终授予的最大 QoS 等级,订阅端可以自行评估是否接受并继续通信。
一个简单的计算公式:
服务端最终授予的最大 QoS = min ( 服务端支持的最大 QoS, 客户端请求的最大 QoS )
但是,我们在订阅时请求的最大 QoS,并不能限制发布端发布消息时使用的 QoS。当我们订阅时请求的最大 QoS,小于消息发布时的 QoS 时,为了尽可能地投递消息,服务端不会忽略这些消息,而是会在转发时对这些消息的 QoS 进行降级处理。
同样,我们也有一个简单的计算公式:
消息被转发时的 QoS = min ( 消息原始的 QoS, 服务端最终授予的最大 QoS )
No Local
No Local 只有 0 和 1 两个可取值,为 1 表示服务端不能将消息转发给发布这个消息的客户端,为 0 则相反。
这个选项通常被用在桥接场景中。桥接本质上是两个 MQTT Server 建立了一个 MQTT 连接,然后相互订阅一些主题,Server 将客户端的消息转发给另一个 Server,而另一个 Server 则可以将消息继续转发给它的客户端。
那么最简单的一个例子,我们假设两个 MQTT Server 分别是 Server A 和 Server B,它们分别向对方订阅了 # 主题。现在,Server A 将一些来自客户端的消息转发给了 Server B,而当 Server B 查找匹配的订阅时,Server A 也会位于其中。如果 Server B 将消息转发给了 Server A,那么同样 Server A 在收到消息后又会把它们再次转发给 Server B,这样就陷入了无休止的转发风暴。而如果 Server A 和 Server B 在订阅 # 主题的同时,将 No Local 选项设置为 1,就可以完美地避免这个问题。
Retain As Published
Retain As Published 同样只有 0 和 1 两个可取值,为 1 表示服务端在向此订阅转发应用消息时需要保持消息中的 Retain 标识不变,为 0 则表示必须清除。
Retain As Published 与 No Local 一样,同样也是主要适用于桥接场景。我们知道当服务端收到一条保留消息时,除了将它存储起来,还会将它像普通消息一样转发给当前已经存在的订阅者,并且在转发时会清除消息的 Retain 标识。
这在桥接场景下带来了一些问题。我们继续沿用前面的设定,当 Server A 将保留消息转发给 Server B 时,由于消息中的 Retain 标识已经被清除,Server B 将不会知道这原本是一条保留消息,自然不会再存储它。这就导致了保留消息无法跨桥接使用。
那么在 MQTT 5.0 中,我们可以让桥接的服务端在订阅时将 Retain As Published 选项设置为 1,来解决这个问题。
Retain Handling
Retain Handling 这个订阅选项被用来向服务端指示当订阅建立时,是否需要发送保留消息。
我们知道默认情况下,只要订阅建立,那么服务端中与订阅匹配的保留消息就会下发。
但某些时候,客户端可能并不想接收保留消息,比如客户端在连接时复用了会话,但是客户端无法确认上一次连接中是否成功创建了订阅,所以它可能会再次发起订阅。如果订阅已经存在,那么可能保留消息已经被消费过了,也可能服务端已经在会话中缓存了一些离线期间到达的消息,这时客户端可能并不希望服务端发布保留消息。
另外,客户端也可能在任何时刻都不想收到保留消息,即使是第一次订阅。比如我们将开关状态作为保留消息发送,但对某个订阅端来说,开关事件将触发一些操作,那么在这种情况下不发送保留消息是很有用的。
这三种不同的行为,我们可以通过 Retain Handling 来选择。
将 Retain Handling 设置为 0,表示只要订阅建立,就发送保留消息;
将 Retain Handling 设置为 1,表示只有建立全新的订阅而不是重复订阅时,才发送保留消息;
将 Retain Handling 设置为 2,表示订阅建立时不要发送保留消息。
共享订阅
在普通的订阅中,我们每发布一条消息,所有匹配的订阅端都会收到该消息的副本。当某个订阅端的消费速度无法跟上消息的生产速度时,我们没有办法将其中一部分消息分流到其他订阅端中来分担压力。这使订阅端容易成为整个消息系统的性能瓶颈。
所以 MQTT 5.0 引入了共享订阅特性,它使得 MQTT 服务端可以在使用特定订阅的客户端之间均衡地分配消息负载。这表示,当我们有两个客户端共享一个订阅时,那么每个匹配该订阅的消息都只会有一个副本投递给其中一个客户端。
共享订阅不仅为消费端带来了极佳的水平扩展能力,使我们可以应对更高的吞吐量,还为其带来了高可用性,即使共享订阅组中的一个客户端断开连接或发生故障,其他客户端仍然可以继续处理消息,在必要时还可以接管原先流向该客户端的消息流。
共享订阅是 MQTT 5.0 引入的新特性,用于在多个订阅者之间实现订阅的负载均衡,MQTT 5.0 规定的共享订阅主题以 $share 开头。
下图中,3 个订阅者用共享订阅的方式订阅了同一个主题 $share/g/topic,其中 topic 是它们订阅的真实主题名,而 $share/g/ 是共享订阅前缀(g/ 是群组名,可为任意 UTF-8 编码字符串)。
使用共享订阅,我们不需要对客户端的底层代码进行任何改动,只需要在订阅时使用遵循以下命名规范的
$share/{Share Name}/{Topic Filter}
其中 $share 是一个固定的前缀,以便服务端知道这是一个共享订阅主题。{Topic Filter} 则是我们实际想要订阅的主题。
中间的 {Share Name} 是一个由客户端指定的字符串,表示当前共享订阅使用的共享名。很多时候,{Share Name} 这个字段也会被叫作 Group Name 或者 Group ID,这确实会更容易理解一些。
需要共享同一个订阅的一组订阅会话,必须使用相同的共享名。所以 $share/consumer1/sport/# $share/consumer2/sport/# 属于不同的共享订阅组。当一个消息同时与多个共享订阅组使用的过滤器匹配时,服务端会在每个匹配的共享订阅组中选择一个会话发送该消息的副本。这在某个主题的消息有多个不同类型的消费者时非常有用。
共享订阅的负载均衡策略
共享订阅的核心在于服务端如何在客户端之间分配消息负载。比较常见的负载均衡策略有以下几种:
随机(Random),在共享订阅组内随机选择一个会话发送消息。
轮询(Round Robin),在共享订阅组内按顺序选择一个会话发送消息,循环往复。
哈希(Hash),基于某个字段的哈希结果来分配。
粘性(Sticky),在共享订阅组内随机选择一个会话发送消息,此后保持这一选择,直到该会话结束再重复这一过程。
本地优先(Local),随机选择,但优先选择与消息的发布者处于同一节点的会话,如果不存在这样的会话,则退化为普通的随机策略。
随机 和 轮询 这两种策略实现的均衡效果较为接近,所以它们在应用场景上的区别不大,但 随机 策略实际的均衡效果通常还会受到服务端采用的随机算法的影响。
在实际应用中,消息之间可能存在关联,比如属于同一张图片的多个分片显然不适合分发给多个订阅者。在这种情况下,我们就需要基于Client ID 或者 Topic 的 哈希 策略来选择会话。这可以保证来自同一个发布端或者主题的消息始终由共享订阅组中的同一个会话处理。当=然,粘性 策略也有相同的效果。
本地优先 策略比 随机 策略更合适在集群中使用,优先选择本地订阅端的策略可以有效降低消息的延迟。
不过使用这一策略的前提是我们可以确保发布端和订阅端比较均衡地分布在每个节点上,以免不同订阅端上的消息负载差别过大。
共享订阅使用场景
以下是几个典型的共享订阅的使用场景:
后端消费能力与消息的生产能力不匹配时,我们可以借助共享订阅让更多的客户端一起分担负载。
系统需要保证高可用性,特别是在大量消息流入的关键业务上,我们可以通过共享订阅来避免单点故障。
消息的流入量可能会在未来快速增长,需要消费端能够水平扩展,我们可以通过共享订阅来提供高扩展性。
共享订阅使用建议
在共享订阅组内使用相同的 QoS
MQTT 虽然允许一个共享订阅组内的会话使用不同的 QoS 等级,但这可能会使消息在投递给同一个组内的不同会话时存在不同的质量保证。相应地,在出现一些问题的时候,我们的调试也将变得困难重重。所以我们最好在共享订阅组内使用相同的 QoS。
合理地设置会话过期时间
持久会话与共享订阅一起使用是非常常见的。但需要注意,即便共享订阅组中的某个客户端离线,但只要它的会话与订阅仍在存在时,MQTT 服务端仍然会向此会话分发消息。考虑到客户端可能因为故障等原因长时间离线,如果会话的过期时间过长,那么这段时间内将有很多消息因为被投递给离线客户端而无法得到处理。
一个更好的选择可能是,一旦订阅端离线,即便会话没有过期,MQTT 服务端在分配消息负载时也不再考虑这个订阅端。虽然与普通订阅的行为不同,但这是 MQTT 协议允许的。
排它订阅
概念:
排它订阅是MQTT协议中一种特殊的订阅方式,它允许对主题进行互斥订阅,即一个主题在同一时刻仅被允许存在一个订阅者。在当前订阅者未取消订阅前,其他订阅者将无法订阅该主题。这种订阅方式确保了消息的独占性,避免了多个客户端同时处理同一主题的消息,从而减少了消息处理的冲突和复杂性。
使用:
-
前缀规则:在MQTT中,排它订阅通常通过使用特定的主题前缀(如
$exclusive/
)来标识。例如,主题$exclusive/t/1
就是一个排它订阅的主题。 -
配置启用:排它订阅功能需要在EMQX等MQTT Broker中进行配置和启用。用户可以通过配置文件(如
etc/emqx.conf
)或Dashboard来开启排它订阅功能。 -
订阅操作:一旦排它订阅功能被启用,客户端就可以尝试订阅带有
$exclusive/
前缀的主题。如果某个客户端已经订阅了该主题,那么其他客户端在尝试订阅同一主题时将失败,直到前一个客户端取消订阅为止。 -
错误处理:当客户端尝试订阅已被其他客户端订阅的排它主题时,MQTT Broker会返回一个错误码(如0x97),表示该主题已被订阅。
自动订阅
概念:
自动订阅是MQTT协议中一种便捷的消息订阅方式,它允许设备在成功连接到MQTT Broker后,自动根据预设的规则订阅指定的主题。这种方式简化了设备连接和消息订阅的流程,提高了系统的自动化程度。
使用:
-
配置规则:在MQTT Broker中,用户可以通过配置文件、Dashboard或外部数据库等方式来设置自动订阅的规则。这些规则包括要订阅的主题、服务质量(QoS)等选项。在DashBoard中,管理-->MQTT高级属性-->自动订阅-->添加
-
连接触发:当设备成功连接到MQTT Broker时,Broker会根据预设的规则自动为设备订阅相应的主题。这意味着设备无需手动发送订阅请求即可开始接收消息。
-
动态调整:在某些高级场景中,MQTT Broker还支持动态调整自动订阅的规则。例如,可以根据设备的类型、状态或外部条件来动态添加、删除或修改订阅的主题。
-
注意事项:虽然自动订阅提高了系统的自动化程度,但也需要用户谨慎配置规则,以避免不必要的消息订阅和资源浪费。同时,对于敏感或重要的消息主题,建议采用手动订阅方式进行精确控制。