MQTT
概述
消息队列遥测传输(英语:Message Queuing Telemetry Transport,缩写:MQTT),是基于发布(Publish)/订阅(Subscribe)范式的消息协议,位于TCP/IP协议族的应用层。
MQTT 常现身于物联网项目中,是为硬件性能低下的远程设备以及网络状况糟糕的情况下而设计的发布/订阅型消息协议。
MQTT 协议规定了两种网络实体:消息代理(中间人)与客户端。消息代理负责接收来自客户端的消息,并转发给目标客户端。
信息的转发是通过主题(topic)管理的。发布者有需要分发的数据时,其向连接的消息代理发送携带有数据的控制消息。代理会向订阅此主题的客户端分发此数据。发布者不需要知道订阅者的数据和具体位置;同样,订阅者不需要配置发布者的相关信息。
MQTT 基于 TCP 协议,用于数据传输。变体 MQTT-SN 用于在蓝牙上传输,基于 UDP。
Topic 主题
MQTT topic 是 MQTT 协议中用于识别和路由消息的字符串。它是 MQTT 发布者和订阅者之间沟通的关键要素。在 MQTT 发布/订阅模型中,发布者向特定主题发送消息,而订阅者可以订阅这些主题以接收消息。
MQTT 主题不需要专门花功夫创建,客户端在订阅或者发布时,代理会自动创建;也无需删除,都由代理自动处理。
主题是一个 UTF-8 编码的字符串,它是 MQTT 协议中消息路由的基础。主题通常被分级,并在级别之间用斜杠/
分隔。这类似于 URL 路径,例如:
root/sensor/#
root/sensor/temperature
root/sensor/smoke
虽然协议不禁止以\
开头或结尾的主题,但是不建议使用。例如
/root
root/
通配符
这里先普及一下通配符的概念:通配,通用适配,常用于模糊搜索。即使用特殊的符号来模糊掉具体的关键字,使查找方可以一次性查找模糊后的多个结果。
下面是 MQTT 的通配符介绍:
MQTT 通配符是一种特殊类型的主题,只能用于订阅,不能用于发布。客户端可以订阅通配符主题以接收来自多个匹配主题的消息,无需单独订阅每个主题并减少开销。MQTT 支持两种类型的通配符:+
(单级)和#
(多级)。
单级通配符+
是仅匹配单级主题的通配符,使用单级通配符时,单级通配符必须占据整个级别,例如:
+ // True
root/sensor/+ // True
root/+/temperature // True
root/sensor+ // False(没有占据整个级别)
如果订阅了root/+/temperature
主题,那么:
// 可以收到以下主题的消息
root/sensor/temperature
root/device/temperature
root/any/temperature
// 无法收到以下主题的消息
root/temperature
root/device/sensor/temperature
多级通配符#
可匹配主题中任意数量的级别。使用多级通配符时,它必须占据整个级别,并且必须是主题的最后一个字符,例如:
# // True(将匹配所有主题)
root/# // True
root/sensor# // False(没有占据整个级别)
root/#/sensor // False(#不在主题字符串的最后一个字节)
如果订阅了root/#
主题,那么:
// 可以收到以下主题的消息
root/sensor
root/device
root/sensor/temperature
root/sensor/smoke
// 无法收到以下主题的消息
ruut/sensor
QoS 服务质量
在不稳定的网络环境中,MQTT 设备可能难以仅使用 TCP 传输协议来确保可靠的通信。为了解决这个问题,MQTT包括一个服务质量(QoS)机制,该机制提供了各种消息交互选项,以提供不同级别的服务,以满足用户在不同场景下对可靠消息传递的特定要求。
MQTT 中有 3 个 QoS 级别:(随着QoS级别的提高,消息传递的可靠性也随之提高,但传输过程的复杂性也随之增加)
- QoS 0,最多一次。(无握手)
- QoS 1,至少一次。(握手一次)
- QoS 2,正好一次。(握手两次)
上述握手包括 客户端到代理 和 代理到客户端 的两段,即当 客户甲 以 QoS 1 发布主题 A 时,客户甲 和代理之间进行一次握手;当客户乙 以 QoS 2 订阅主题 A 时,代理与客户乙进行两次握手。
QoS 0
QoS 0 是最低级别的服务,也称为“即发即弃”。在这种模式下,发送方无需等待确认或存储和重新传输消息,因此接收方无需担心接收重复消息。
为什么 QoS 0 消息会有丢失概率?
QoS 0 消息传输的可靠性取决于 TCP 连接的稳定性。如果连接稳定,TCP可以保证消息的成功下发。但是,如果连接关闭或重置,则存在传输中的消息或操作系统缓冲区中的消息丢失的风险,从而导致 QoS 0 消息传送失败。
QoS 1
QoS 1 引入了确认和重传机制。当发送方收到来自接收方的 PUBACK 数据包时,它认为消息已成功传递。在此之前,发送方必须存储 PUBLISH 数据包以进行可能的重新传输。
发送方使用每个数据包中的数据包 ID 将 PUBLISH 数据包与相应的 PUBACK 数据包进行匹配。这允许发送方识别并从其缓存中删除正确的 PUBLISH 数据包。
为什么QoS 1 的消息会重复?
在两种情况下,发送方不会收到 PUBACK 数据包。
- PUBLISH 数据包未到达接收方。
- PUBLISH 数据包到达接收方,但发送方尚未收到接收方的 PUBACK 数据包。
在第一种情况下,发送方将重新传输 PUBLISH 数据包,但接收方将只接收一次消息。
在第二种情况下,发送方将重新传输 PUBLISH 数据包,接收方将再次接收它,从而产生重复的消息(且此消息在接收方看来是新的消息)。
QoS 2
QoS 2 确保消息不会丢失或重复,这与 QoS 0 和 1 不同。但是,它还具有最复杂的交互和最高的开销,因为它要求每个消息传递的发送方和接收方之间至少有两次握手。
- 要启动 QoS 2 消息传输,发送方首先存储并发送具有 QoS 2 的 PUBLISH 数据包,然后等待接收方的 PUBREC 响应数据包。此过程类似于 QoS 1,不同之处在于响应数据包是 PUBREC 而不是 PUBACK。
- 收到 PUBREC 数据包后,发送方可以确认接收方已收到 PUBLISH 数据包,并可以删除其本地存储的副本。它不再需要也无法重新传输此数据包。然后,发送方发送一个 PUBREL 数据包,通知接收方它已准备好释放数据包 ID。与 PUBLISH 数据包一样,PUBREL 数据包需要可靠地传送到接收方,因此它被存储以备潜在的重传,并且需要响应数据包。
- 当接收方收到 PUBREL 数据包时,它可以确认在此传输流中不会收到其他重新传输的 PUBLISH 数据包。因此,接收方使用 PUBCOMP 数据包进行响应,以发出信号,表明它已准备好将当前数据包 ID 重用于新消息。
- 当发送方收到 PUBCOMP 数据包时,QoS 2 流完成。然后,发送方可以使用当前数据包 ID 发送新消息,接收方会将其视为新消息。
为什么 QoS 2 消息不重复?
用于确保 QoS 2 消息不会丢失的机制与用于 QoS 1 的机制相同,因此此处不再讨论。
与 QoS 1 相比,QoS 2 通过添加涉及 PUBREL 和 PUBCOMP 数据包的新进程来确保消息不会重复。
在进一步讨论之前,让我们快速回顾一下 QoS 1 无法避免消息重复的原因。
当我们使用 QoS 1 时,对于接收方,无论响应是否已到达发送方,在发送 PUBACK 数据包后,数据包 ID 都会再次可用。这意味着接收方无法确定其稍后收到的具有相同数据包 ID 的 PUBLISH 数据包是否由于未收到 PUBACK 响应而从发送方重新传输,或者发送方是否在收到 PUBACK 响应后重复使用数据包 ID 发送新消息。这就是 QoS 1 无法避免消息重复的原因。
在 QoS 2 中,发送方和接收方使用 PUBREL 和 PUBCOMP 数据包来同步数据包 ID 的释放,从而确保发送方在重新传输消息还是发送新消息方面达成共识。这是避免 QoS 1 中可能出现重复消息问题的关键。
在 QoS 2 中,允许发送方在接收接收方接收 PUBREC 数据包之前重新传输 PUBLISH 数据包。发送方收到 PUBREC 并发送 PUBREL 数据包后,将进入数据包 ID 释放过程。发送方在收到来自接收方的 PUBCOMP 数据包之前,无法重新传输 PUBLISH 数据包或发送具有当前数据包 ID 的新消息。
因此,接收方可以使用 PUBREL 数据包作为边界,并将在其之前到达的任何 PUBLISH 数据包视为重复数据包,将在其之后到达的任何 PUBLISH 数据包视为新数据包。这使我们能够在使用 QoS 2 时避免协议级别的消息重复。
keep alive 保活
MQTT 协议托管在 TCP 协议之上,TCP 协议是面向连接的,并在两个连接方之间提供稳定有序的字节流。但是,在某些情况下,TCP 可能会出现半连接问题。半连接是指在一侧断开或未建立连接,而另一侧的连接仍保持连接。在这种情况下,半连接方可能会不断发送数据,而数据显然永远不会到达另一端。为了避免半连接导致的通信黑洞,MQTT协议提供了Keep Alive机制(以下统称保活机制),允许客户端和MQTT服务器判断是否存在半连接问题,并关闭相应的连接。
当客户端与代理创建连接时,通过将连接请求协议包中的Keep Alive变量标头字段设置为非零值,表示客户端发送的连续两个MQTT数据包的时间间隔最大值。如果客户端发送的 Keep Alive 字段不存在值,则代理不会启动保活机制。在 MQTT5 中,引入了 Server Keep Alive 的概念,即由代理来设置这个保活时间。
保活过程
客户端进程
建立连接后,客户端需要确保其发送的任意两个 MQTT 协议报文之间的间隔不超过 Keep Alive 值。如果客户端处于空闲状态并且没有要发送的数据包,则可以改为发送 PINGREQ 协议数据包。
当客户端发送 PINGREQ 数据包时,代理必须返回 PINGRESP 数据包。如果客户端在可靠时间内没有收到来自服务器的 PINGRESP 报文,则表示连接已中断,代理处于离线状态,或者出现网络故障,客户端应关闭连接。
代理流程
建立连接后,如果代理在 Keep Alive 时间的 1.5 倍内没有收到来自客户端的任何报文,则会认为与客户端的连接存在问题,并且代理将与客户端断开连接。
如果代理收到来自客户端的 PINGREQ 协议报文,则需要回复 PINGRESP 协议报文进行确认。
Will Message 遗言
Keep Alive 通常与 Will Message 结合使用,它允许设备在发生意外离线事件时及时通知其他客户端。
保留消息
如果消息代理接受到某个主题上的消息,且这个主题没有任何订阅,那么代理就会丢弃之,除非发布者将其标记为保留消息(retained message)。
共享订阅
共享订阅是 MQTT 5.0 的一项特性,MQTT 5.0 是一种在多个订阅者之间实现负载均衡的订阅方式。共享订阅的主题以 $share 开头。
同非共享订阅一样,共享订阅包含一个主题过滤器和订阅选项,唯一的区别在于共享订阅的主题过滤器格式必须是 $share/{ShareName}/{filter}
这种形式。这几个的字段的含义分别是:
$share
前缀表明这是一个共享订阅的主题{ShareName}
是一个不包含 “/”, “+” 以及 “#” 的字符串。订阅会话通过使用相同的{ShareName}
表示位于同一个共享组,匹配该订阅的消息每次只会发布给一个共享组中的其中一个会话(有点绕,看后面例子会更清晰){filter}
即非共享订阅中的主题过滤器,如root/sensor/smoke
假设某系统有四个订阅者A B C D,一个发布者X;
其中 A和B 共处共享组 team1,C和D 共处共享组 team2;
A B C D 都订阅主题topic/msg
。
$share/team1/topic/msg +---++----------------------->| A |/ +---+// $share/team1/topic/msg +---+/ +------------------------>| B |
+-------+ topic/msg +--------+ / +---+
| X | -----------> | Broker |
+-------+ "hello" +--------+ \ $share/team2/topic/msg +---+\ +------------------------>| C |\ +---+\\ $share/team2/topic/msg +---++------------------------| D |+---+
结果分析:
X向主题topic/msg
发送了一个消息 “hello”,
A和B其中随机一个会收到 “hello”,因为他俩订阅了共享组team1的topic/msg
;
C和D其中随机一个会收到 “hello”,因为他俩订阅了共享组team2的topic/msg
。
代理只会向共享组team1发送一次"hello";同理 team2 也只发一次。