mqtt协议介绍
简介
MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议),是一种基于发布/订阅范式的“轻量级”消息协议,由 IBM 发布。
IoT 设备要运作,就必须连接到互联网,设备才能相互协作,以及与后端服务协同工作。而互联网的基础网络协议是 TCP/IP,MQTT 协议是基于 TCP/IP 协议栈而构建的,因此它已经慢慢的已经成为了 IoT 通讯的标准。
优点:代码量少,开销低,带宽占用小,即时通讯协议。
mqtt协议格式
-
固定报头(Fixed Header):
- 1字节:控制字段(Control Packet Type、Flags等)。
- 1字节:剩余长度(Remaining Length),表示后续可变报头和负载的字节数。前7位用于保存长度,后一部用做标识。当最后一位为 1时,表示长度不足,需要使用二个字节继续保存。
-
可变报头(Variable Header):
- 根据不同的消息类型(如CONNECT、PUBLISH、SUBSCRIBE等),可变报头的内容和格式会有所不同。例如:
- CONNECT:包含协议名称、版本号、连接标志、保持时间等。
- PUBLISH:包含主题名、消息标识符(可选)等。
- SUBSCRIBE:包含主题订阅请求的相关信息。
- 根据不同的消息类型(如CONNECT、PUBLISH、SUBSCRIBE等),可变报头的内容和格式会有所不同。例如:
-
负载(Payload):
- 消息的实际内容,长度可变,取决于具体的应用。
整体MQTT的消息格式如下图所示
mqtt更多协议介绍:
https://github.com/mcxiaoke/mqtt
https://mcxiaoke.gitbooks.io/mqtt-cn/content/
vertx介绍
Vert.x是Eclipse基金会下面的一个开源项目,Vert.x的基本定位是一个事件驱动的编程框架,通过Vert.x使用者可以用相对低的成本就享受到NIO带来的高性能。netty是Vert.x底层使用的通讯组件,Vert.x为了最大限度的降低使用门槛,刻意屏蔽掉了许多底层netty相关的细节,比如ByteBuf、引用计数等等。
Mqtt Server
mqtt server通过spi发现的方式加载启动,启动过程中的参数通过读取配置文件和环境变量的设置来赋值,随后判断是否需要启动服务,变量包含:服务是否启动,启动服务的端口,是否需要鉴权等。
spi接口定义
ToolBox是内部封装的工具类,方便后续服务调用,包含redis,mq,环境变量等引用。toolbox不在这里展开了。
public interface ServerStarter {void init(ToolBox toolBox);Future<Void> run();
}
MqttServer
import io.vertx.core.Future;
import io.vertx.core.json.JsonObject;
import io.vertx.mqtt.MqttAuth;
import io.vertx.mqtt.MqttEndpoint;
import io.vertx.mqtt.MqttServer;
import lombok.extern.slf4j.Slf4j;/*** @author yan* @since 2024-10-22*/
@Slf4j
public class EmsMqttServer implements ServerStarter {private ToolBox toolBox;// 服务是否启动private boolean enable;// 服务端口private Integer port;// 是否需要鉴权private boolean auth;// 鉴权处理器private final MqttAuthHandler mqttAuthHandler;// 发布消息处理器private final PublicHandler publicHandler;// 订阅消息处理器private final SubscribeHandler subscribeHandler;// 取消订阅处理器private final UnsubscribeHandler unsubscribeHandler;public EmsMqttServer() {this.mqttAuthHandler = new MqttAuthHandler();this.publicHandler = new PublicHandler();this.subscribeHandler = new SubscribeHandler();this.unsubscribeHandler = new UnsubscribeHandler();}@Overridepublic void init(ToolBox toolBox) {this.toolBox = toolBox;JsonObject config = toolBox.configRetriever().getCachedConfig();JsonObject optionsConfig = config.getJsonObject("emsServer");this.enable = optionsConfig.getBoolean("enabled", true);if (enable) {port = optionsConfig.getInteger("port");auth = optionsConfig.getBoolean("auth");}}@Overridepublic Future<Void> run() {if (!enable || port == null) {return Future.succeededFuture();}MqttServer mqttServer = MqttServer.create(toolBox.vertx());return mqttServer.endpointHandler(endpoint -> {// shows main connect infolog.info("MQTT client [" + endpoint.clientIdentifier() + "] request to connect, clean session = " + endpoint.isCleanSession());// 关闭连接endpoint.closeHandler(h -> {log.info("close");});handle(endpoint);}).listen(port).onComplete(arr -> {if (arr.failed()) {log.error("mqtt server error! cause:", arr.cause());}MqttServer result = arr.result();log.info("mqtt sever start, port:" + result.actualPort());}).mapEmpty();}private void handle(MqttEndpoint endpoint) {if (auth) {MqttAuth mqttAuth = endpoint.auth();if (mqttAuth == null) {log.error("miss auth info, connection close");endpoint.close();return;}if (endpoint.will() != null) {System.out.println("[will topic = " + endpoint.will().getWillTopic() +" QoS = " + endpoint.will().getWillQos() + " isRetain = " + endpoint.will().isWillRetain() + "]");}mqttAuthHandler.handle(mqttAuth).onSuccess(v -> {endpoint.accept(true);endpoint.publishAutoAck(true);endpoint.subscriptionAutoAck(true);endpoint.subscribeHandler(subscribeHandler);endpoint.publishHandler(publicHandler);endpoint.unsubscribeHandler(unsubscribeHandler);}).onFailure(ar -> {log.error("auth error:", ar);endpoint.close();});} else {endpoint.publishAutoAck(true);endpoint.subscriptionAutoAck(true);endpoint.subscribeHandler(subscribeHandler);endpoint.publishHandler(publicHandler);endpoint.unsubscribeHandler(unsubscribeHandler);}}
}
这里vertx的MqttServer其实做了很简单的封装,netty启动了一个tcp服务,然后再pileline加入的mqtt协议的编解码处理器,处理成mqttmessage,vertx在此基础上做了一层很薄的封装,大部分编解码的工作netty自带的mqtt编解码处理器已经处理好了。
跟进去MqttServer.create()方法,找到listen方法
MqttAuthHandler
这里处理鉴权的逻辑,返回鉴权结果
/*** @author yan* @since 2024-10-22*/
@Slf4j
public class MqttAuthHandler {public Future<Void> handle(MqttAuth event) {log.info("username:" + event.getUsername() + ", password:" + event.getPassword());// 这里些鉴权的逻辑,返回鉴权结果return Future.failedFuture("auth fail, wrong username or password");}
}
PublicHandler
client发布消息的时候触发的处理器
/*** @author yan* @since 2024-10-23*/
@Slf4j
public class PublicHandler implements Handler<MqttPublishMessage> {@Overridepublic void handle(MqttPublishMessage event) {String topic = event.topicName();byte[] bytes = event.payload().getBytes();String msg = new String(bytes);log.info("topic:" + topic + "\n msg:" + msg);}
}
SubscribeHandler
设备订阅的时候触发的处理器
/*** @author yan* @since 2024-10-23*/
@Slf4j
public class SubscribeHandler implements Handler<MqttSubscribeMessage> {@Overridepublic void handle(MqttSubscribeMessage event) {List<MqttTopicSubscription> topicSubscriptionList = event.topicSubscriptions();for (MqttTopicSubscription subscription : topicSubscriptionList) {log.info("subscribe topic:" + subscription.topicName() + ",Qos:" + subscription.qualityOfService().value());}}
}
UnsubscribeHandler
client取消订阅的时候触发的处理器
/*** @author yan* @since 2024-10-23*/
@Slf4j
public class UnsubscribeHandler implements Handler<MqttUnsubscribeMessage> {@Overridepublic void handle(MqttUnsubscribeMessage event) {for (String topic : event.topics()) {log.info("unsubscribe:" + topic);}}
}