Vertx实现一个通用的MqttServer

 mqtt协议介绍

简介

        MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议),是一种基于发布/订阅范式的“轻量级”消息协议,由 IBM 发布。

        IoT 设备要运作,就必须连接到互联网,设备才能相互协作,以及与后端服务协同工作。而互联网的基础网络协议是 TCP/IP,MQTT 协议是基于 TCP/IP 协议栈而构建的,因此它已经慢慢的已经成为了 IoT 通讯的标准。

优点:代码量少,开销低,带宽占用小,即时通讯协议。

mqtt协议格式

  • 固定报头(Fixed Header)

    • 1字节:控制字段(Control Packet Type、Flags等)。
    • 1字节:剩余长度(Remaining Length),表示后续可变报头和负载的字节数。前7位用于保存长度,后一部用做标识。当最后一位为 1时,表示长度不足,需要使用二个字节继续保存。
  • 可变报头(Variable Header)

    • 根据不同的消息类型(如CONNECT、PUBLISH、SUBSCRIBE等),可变报头的内容和格式会有所不同。例如:
      • CONNECT:包含协议名称、版本号、连接标志、保持时间等。
      • PUBLISH:包含主题名、消息标识符(可选)等。
      • SUBSCRIBE:包含主题订阅请求的相关信息。
  • 负载(Payload)

    • 消息的实际内容,长度可变,取决于具体的应用。

        整体MQTT的消息格式如下图所示

        mqtt更多协议介绍:

        https://github.com/mcxiaoke/mqtt

        https://mcxiaoke.gitbooks.io/mqtt-cn/content/

 

vertx介绍

        Vert.x是Eclipse基金会下面的一个开源项目,Vert.x的基本定位是一个事件驱动的编程框架,通过Vert.x使用者可以用相对低的成本就享受到NIO带来的高性能。netty是Vert.x底层使用的通讯组件,Vert.x为了最大限度的降低使用门槛,刻意屏蔽掉了许多底层netty相关的细节,比如ByteBuf、引用计数等等。

Mqtt Server

        mqtt server通过spi发现的方式加载启动,启动过程中的参数通过读取配置文件和环境变量的设置来赋值,随后判断是否需要启动服务,变量包含:服务是否启动,启动服务的端口,是否需要鉴权等。

spi接口定义

        ToolBox是内部封装的工具类,方便后续服务调用,包含redis,mq,环境变量等引用。toolbox不在这里展开了。

public interface ServerStarter {void init(ToolBox toolBox);Future<Void> run();
}

MqttServer


import io.vertx.core.Future;
import io.vertx.core.json.JsonObject;
import io.vertx.mqtt.MqttAuth;
import io.vertx.mqtt.MqttEndpoint;
import io.vertx.mqtt.MqttServer;
import lombok.extern.slf4j.Slf4j;/*** @author yan* @since 2024-10-22*/
@Slf4j
public class EmsMqttServer implements ServerStarter {private ToolBox toolBox;// 服务是否启动private boolean enable;// 服务端口private Integer port;// 是否需要鉴权private boolean auth;// 鉴权处理器private final MqttAuthHandler mqttAuthHandler;// 发布消息处理器private final PublicHandler publicHandler;// 订阅消息处理器private final SubscribeHandler subscribeHandler;// 取消订阅处理器private final UnsubscribeHandler unsubscribeHandler;public EmsMqttServer() {this.mqttAuthHandler = new MqttAuthHandler();this.publicHandler = new PublicHandler();this.subscribeHandler = new SubscribeHandler();this.unsubscribeHandler = new UnsubscribeHandler();}@Overridepublic void init(ToolBox toolBox) {this.toolBox = toolBox;JsonObject config = toolBox.configRetriever().getCachedConfig();JsonObject optionsConfig = config.getJsonObject("emsServer");this.enable = optionsConfig.getBoolean("enabled", true);if (enable) {port = optionsConfig.getInteger("port");auth = optionsConfig.getBoolean("auth");}}@Overridepublic Future<Void> run() {if (!enable || port == null) {return Future.succeededFuture();}MqttServer mqttServer = MqttServer.create(toolBox.vertx());return mqttServer.endpointHandler(endpoint -> {// shows main connect infolog.info("MQTT client [" + endpoint.clientIdentifier() + "] request to connect, clean session = " + endpoint.isCleanSession());// 关闭连接endpoint.closeHandler(h -> {log.info("close");});handle(endpoint);}).listen(port).onComplete(arr -> {if (arr.failed()) {log.error("mqtt server error! cause:", arr.cause());}MqttServer result = arr.result();log.info("mqtt sever start, port:" + result.actualPort());}).mapEmpty();}private void handle(MqttEndpoint endpoint) {if (auth) {MqttAuth mqttAuth = endpoint.auth();if (mqttAuth == null) {log.error("miss auth info, connection close");endpoint.close();return;}if (endpoint.will() != null) {System.out.println("[will topic = " + endpoint.will().getWillTopic() +" QoS = " + endpoint.will().getWillQos() + " isRetain = " + endpoint.will().isWillRetain() + "]");}mqttAuthHandler.handle(mqttAuth).onSuccess(v -> {endpoint.accept(true);endpoint.publishAutoAck(true);endpoint.subscriptionAutoAck(true);endpoint.subscribeHandler(subscribeHandler);endpoint.publishHandler(publicHandler);endpoint.unsubscribeHandler(unsubscribeHandler);}).onFailure(ar -> {log.error("auth error:", ar);endpoint.close();});} else {endpoint.publishAutoAck(true);endpoint.subscriptionAutoAck(true);endpoint.subscribeHandler(subscribeHandler);endpoint.publishHandler(publicHandler);endpoint.unsubscribeHandler(unsubscribeHandler);}}
}

        这里vertx的MqttServer其实做了很简单的封装,netty启动了一个tcp服务,然后再pileline加入的mqtt协议的编解码处理器,处理成mqttmessage,vertx在此基础上做了一层很薄的封装,大部分编解码的工作netty自带的mqtt编解码处理器已经处理好了。

        跟进去MqttServer.create()方法,找到listen方法

MqttAuthHandler

        这里处理鉴权的逻辑,返回鉴权结果

/*** @author yan* @since 2024-10-22*/
@Slf4j
public class MqttAuthHandler {public Future<Void> handle(MqttAuth event) {log.info("username:" + event.getUsername() + ", password:" + event.getPassword());// 这里些鉴权的逻辑,返回鉴权结果return Future.failedFuture("auth fail, wrong username or password");}
}

PublicHandler

        client发布消息的时候触发的处理器

/*** @author yan* @since 2024-10-23*/
@Slf4j
public class PublicHandler implements Handler<MqttPublishMessage> {@Overridepublic void handle(MqttPublishMessage event) {String topic = event.topicName();byte[] bytes = event.payload().getBytes();String msg = new String(bytes);log.info("topic:" + topic + "\n msg:" + msg);}
}

SubscribeHandler

        设备订阅的时候触发的处理器

/*** @author yan* @since 2024-10-23*/
@Slf4j
public class SubscribeHandler implements Handler<MqttSubscribeMessage> {@Overridepublic void handle(MqttSubscribeMessage event) {List<MqttTopicSubscription> topicSubscriptionList = event.topicSubscriptions();for (MqttTopicSubscription subscription : topicSubscriptionList) {log.info("subscribe topic:" + subscription.topicName() + ",Qos:" + subscription.qualityOfService().value());}}
}

UnsubscribeHandler

        client取消订阅的时候触发的处理器

/*** @author yan* @since 2024-10-23*/
@Slf4j
public class UnsubscribeHandler implements Handler<MqttUnsubscribeMessage> {@Overridepublic void handle(MqttUnsubscribeMessage event) {for (String topic : event.topics()) {log.info("unsubscribe:" + topic);}}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/56506.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

数据分析-Apache_hive

任务一 创建库及外部表 在 comm 数 据 库 下 创 建 一 个 名 为 dws_behavior_log的外部表&#xff0c;如果表已存在&#xff0c;则先删除&#xff1b;分 区字段为dt&#xff0c;即根据日期进行分区&#xff1b;另外&#xff0c;要求指定表的存 储路径为HDFS的/behavior/dws/d…

java让系统直接播放音频/java获取音频流输送到播放设备SourceDataLine

Java Sound API支持常见的格式&#xff1a; WAV&#xff08;Waveform Audio File Format&#xff09; 常见的无损音频格式&#xff0c;支持 PCM&#xff08;脉冲编码调制&#xff09;编码。 AIFF&#xff08;Audio Interchange File Format&#xff09; 主要用于 Macintosh 系统…

OpenCV视觉分析之运动分析(2)背景减除类:BackgroundSubtractorKNN的使用

操作系统&#xff1a;ubuntu22.04 OpenCV版本&#xff1a;OpenCV4.9 IDE:Visual Studio Code 编程语言&#xff1a;C11 算法描述 K-最近邻&#xff08;K-nearest neighbours, KNN&#xff09;基于的背景/前景分割算法。 该类实现了如 319中所述的 K-最近邻背景减除。如果前景…

058_基于python时尚女装抖音号评论数据分析系统

目录 系统展示 开发背景 代码实现 项目案例 获取源码 博主介绍&#xff1a;CodeMentor毕业设计领航者、全网关注者30W群落&#xff0c;InfoQ特邀专栏作家、技术博客领航者、InfoQ新星培育计划导师、Web开发领域杰出贡献者&#xff0c;博客领航之星、开发者头条/腾讯云/AW…

智慧城管综合管理系统源码,微服务架构,基于springboot、vue+element+uniapp技术开发,支持二次开发

智慧城管源码&#xff0c;智慧城管执法办案系统源码 智慧城管综合执法办案平台是智慧城市框架下&#xff0c;依托物联网、云计算、多网融合等现代化技术&#xff0c;运用数字基础资源、多维信息感知、协同工作处置、智能化辅助决策分析等手段&#xff0c;形成具备高度感知、互联…

C++共同体

共同体是一种数据格式&#xff0c;他能储存不同的数据类型&#xff0c;但是同一时间只能储存其中的一种类型。 语法&#xff1a; union 共同体名 { 成员一的数据类型 成员名一&#xff1b; 成员二的数据类型 成员名二&#xff1b; 成员n的数据类型 成员名n&#xff1b; }

目标检测算法-YOLOV11解析

原文首发于微信公众号 微信公众号-人工智能与图像处理&#xff1a;目标检测算法-YOLOV11解析 一&#xff0c;YOLOV11概述 YOLOv11是由Ultralytics公司开发的新一代目标检测算法&#xff0c;它在之前YOLO版本的基础上进行了显著的架构和训练方法改进。整合了改进的模型结构设计…

Redis Search系列 - 第四讲 支持中文

目录 一、支持中文二、自定义中文词典2.1 Redis Search设置FRISOINI参数2.2 friso.ini文件相关配置1&#xff09;自定义friso UTF-8字典2&#xff09;修改friso.ini配置文件 三、实测中文分词效果 一、支持中文 Redis Stack 从版本 0.99.0 开始支持中文文档的添加和分词。中文…

Django+Vue智慧分析居家养老系统统的设计与实现

目录 1 项目介绍2 项目截图3 核心代码3.1 需要的环境3.2 Django接口层3.3 实体类3.4 config.ini3.5 启动类3.5 Vue 4 数据库表设计5 文档参考6 计算机毕设选题推荐7 源码获取 1 项目介绍 博主个人介绍&#xff1a;CSDN认证博客专家&#xff0c;CSDN平台Java领域优质创作者&…

【学习笔记】强化学习

李宏毅深度强化学习 笔记 课程主页&#xff1a;NTU-MLDS18 视频&#xff1a;youtube B站 参考资料&#xff1a; 作业代码参考 纯numpy实现非Deep的RL算法 OpenAI tutorial 文章目录 李宏毅深度强化学习 笔记1. Introduction2. Policy Gradient2.1 Origin Policy Gradient2.2…

基于大型语言模型的智能网页抓取

Google Gemini 是 Google AI 创建的大型语言模型 (LLM) 系列&#xff0c;可提供最先进的 AI 功能。Gemini 模型包括&#xff1a; Gemini Ultra — 最大、最强大的模型&#xff0c;擅长处理编码、逻辑推理和创意协作等复杂任务。可通过 Gemini Advanced&#xff08;原名 Bard&a…

【Linux】基础IO-上

1、共识原理 1、文件 内容 属性 2、文件分为打开的文件和没打开的文件 3、打开的文件是谁打开的&#xff1f; 答案是&#xff1a;进程&#xff01;---本质是研究进程和文件的关系 文件被打开必须先被加载到内存&#xff0c;一个进程可以打开多个文件。因此&#xff0c;在OS内…

NVR小程序接入平台/设备EasyNVR多个NVR同时管理的高效解决方案

在当今的数字化安防时代&#xff0c;视频监控系统的需求日益复杂和多样化。为了满足不同场景下的监控需求&#xff0c;一种高效、灵活且兼容性强的安防视频监控平台——NVR批量管理软件/平台EasyNVR应运而生。本篇探讨这一融合所带来的创新与发展。 一、NVR监测软件/设备EasyNV…

mysql 13 MySQL基于规则的优化

01.条件化简 我们编写的查询语句的搜索条件本质上是一个表达式&#xff0c;这些表达式可能比较繁杂&#xff0c;或者不能高效的执行&#xff0c; MySQL的查询优化器会为我们简化这些表达式。为了方便大家理解&#xff0c;我们后边举例子的时候都使用诸如 a 、 b 、 c 之类的简…

shell——正则表达式入门

目录 一、常规匹配 二、特殊字符 ^ $ . * 字符区间 \ 三、示例 shell中总是会需要对文本字符串做各种各样的剪切拼接等操作&#xff0c;除了 basename 和 dirname 这种简单的函数外&#xff0c;还可以用正则表达式&#xff0c;定义模糊匹配的筛选规则 一、常规匹配 管…

Pyqt5设计打开电脑摄像头+可选择哪个摄像头(如有多个)

目录 专栏导读库的安装代码介绍完整代码总结 专栏导读 &#x1f338; 欢迎来到Python办公自动化专栏—Python处理办公问题&#xff0c;解放您的双手 &#x1f3f3;️‍&#x1f308; 博客主页&#xff1a;请点击——> 一晌小贪欢的博客主页求关注 &#x1f44d; 该系列文…

【C++】——list 容器的解析与极致实现

人的一切痛苦&#xff0c;本质上都是对自己的无能的愤怒。 —— 王小波 目录 1、list 介绍 2、list的使用 2.1 list 的构造 2.2 iterator 的使用 2.3 list 的修改 2.4一些特殊接口 2.5 迭代器失效问题 3、实现list 3.1底层结构 结点类 list类 迭代器类 3.2功能接…

【优选算法篇】在分割中追寻秩序:二分查找的智慧轨迹

文章目录 C 二分查找详解&#xff1a;基础题解与思维分析前言第一章&#xff1a;热身练习1.1 二分查找基本实现解题思路图解分析C代码实现易错点提示代码解读 1.2 在排序数组中查找元素的第一个和最后一个位置解题思路1.2.1 查找左边界算法步骤&#xff1a;图解分析C代码实现 1…

git clone报错fatal: pack has bad object at offset 186137397: inflate returned 1

git clone报错fatal: pack has bad object at offset 186137397: inflate returned 1 逐步拷贝 https://stackoverflow.com/questions/27653116/git-fatal-pack-has-bad-object-at-offset-x-inflate-returned-5 https://www.cnblogs.com/Lenbrother/p/17726195.html https://…

在UE引擎中使用spine动画(1)

注意事项&#xff0c;spine的版本必须和UE插件的版本相同。 1.最重要的是“修改骨架名称。&#xff08;影响在UE引擎中的资产名称&#xff09; 2.导出操作&#xff08;把非必要的数据取消掉&#xff0c;可能会影响UE导入&#xff09;。 3.纹理打包&#xff08;一般默认&#…