企业级基于SpringBoot的MQTT的构建和使用

基于SpringBoot的MQTT配置及使用

首先要使用EMQX搭建一个MQTT服务器,参考文档:EMQX快速开始

本着开源分享的观点,闲话不多说,直接上代码

导入Maven

        <dependency><groupId>org.springframework.integration</groupId><artifactId>spring-integration-mqtt</artifactId><version>5.5.13</version></dependency>

配置文件

spring:mqtt:#MQTT服务端地址,如果是集群,用逗号隔开url: tcp://localhost:1883#用户名username: root#密码password: 123#clientId代表该服务挂起时的名字,在MQTT服务端clientId不可重复clientId: MqttTest#MQTT默认的消息订阅主题,可配置多个,‘#’为通配符,详情可自己看一下MQTT文档的通配符说明defaultTopic:- server1/#- server2/#
# 默认Qos,关于消息等级可以查看文档关于Qos的不同等级对信息的约束力度,2是最高,有且只接受一次,由于我的项目中对数据要求很高,所以不考虑资源消耗的情况下,我一般采用2,此处有几个默认主题,就要设置几个Qos,按照顺序对应每个主题的等级defaultQos:- 2- 2

配置类

该类为一个Config类,用于接收上一步在application.yml配置文件中配置的配置信息

@Component
@Slf4j
@Getter
@Setter
@ToString
@ConfigurationProperties(prefix = "spring.mqtt")
public class MqttProperties {private String username;private String password;private String url;private String clientId;private List<String> defaultTopic;private List<Integer> defaultQos;
}

MqttBO类

该类用于构建发送信息的对象

@Data
@AllArgsConstructor
@NoArgsConstructor
@Builder
public class MqttBO {
//    int qos,boolean retained,String topic,String message
// 发送信息的消息等级,要求高的就是2private Integer qos;// 是否信息保留private Boolean retained;// 发送信息的主题private String topic;// 发送信息的主体private byte[] message;
}

MQTTClient类

由于在springboot项目中,我们只想创建一个单例的Mqttclient进行连接,所以我们创建一个类似于工厂的类,在工厂Bean加载后,创建单例的client,并做连接,订阅,断开等方法的支持,该client加载完成后,在调用时,Spring会自动注入工厂创建连接的client。

/*** @Description MQTT客户端实现工厂,该类主要做工厂生成client,在Bean加载后,创建单例的client。在所有MQTT的Bean中第一位加载* 同时做一些连接,订阅,断开等方法的支持*/
@Slf4j
@Component
public class MyMqttClient {private MqttClient client;@Autowiredprivate MqttProperties mqttProperties;/*** 在Bean加载后,创建单例的client* PostConstruct会在该Bean加载后执行,初始化client*/@PostConstructpublic void init() {try {this.client = new MqttClient(mqttProperties.getUrl(), mqttProperties.getClientId(), new MemoryPersistence());log.info("MQTT客户端初始化成功");} catch (MqttException e) {log.error("MQTT客户端初始化失败: {}", e.getMessage(), e);throw new RuntimeException(e);}}// 连接public synchronized void connect(MqttCallBack mqttCallBack) {try {if (client != null && client.isConnected()) {log.info("发现旧连接,正在断开...");client.disconnectForcibly(); // 强制断开旧连接}MqttConnectOptions options = createConnectOptions();client = new MqttClient(mqttProperties.getUrl(), mqttProperties.getClientId(), new MemoryPersistence());client.setCallback(mqttCallBack);client.connect(options);log.info("MQTT连接成功");} catch (MqttSecurityException e) {log.error("MQTT安全异常: {}", e.getMessage(), e);} catch (MqttPersistenceException e) {log.error("MQTT持久化异常: {}", e.getMessage(), e);} catch (MqttException e) {log.error("MQTT连接失败: {}", e.getMessage(), e);}}// 创建连接选项private MqttConnectOptions createConnectOptions() {MqttConnectOptions options = new MqttConnectOptions();options.setUserName(mqttProperties.getUsername());options.setPassword(mqttProperties.getPassword().toCharArray());// 保留会话options.setCleanSession(false);// 自动重连,无需在连接断联回调方法中处理重连options.setAutomaticReconnect(true);// 连接超时时间(秒)options.setConnectionTimeout(10);// 心跳间隔options.setKeepAliveInterval(30);// 遗嘱消息options.setWill("willTopic", "客户端已断开".getBytes(), 2, false);return options;}// 订阅public synchronized void subscribe() {try {String[] topics = mqttProperties.getDefaultTopic().toArray(new String[0]);int[] qoses = mqttProperties.getDefaultQos().stream().mapToInt(Integer::valueOf).toArray();client.subscribe(topics, qoses);log.info("订阅主题成功: {}", String.join(", ", topics));} catch (MqttException e) {log.error("订阅主题失败: {}", e.getMessage(), e);}}/*** 发布消息** @param mqttBO 消息对象*/public synchronized void publish(MqttBO mqttBO) {if (mqttBO == null || mqttBO.getTopic() == null || mqttBO.getMessage() == null) {log.warn("发布消息失败: 参数不完整");return;}MqttMessage mqttMessage = new MqttMessage();if (mqttBO.getQos()==null){mqttBO.setQos(2); //默认2}mqttMessage.setQos(mqttBO.getQos());mqttMessage.setRetained(mqttBO.getRetained());mqttMessage.setPayload(mqttBO.getMessage());MqttTopic mqttTopic = client.getTopic(mqttBO.getTopic());try {MqttDeliveryToken token = mqttTopic.publish(mqttMessage);token.waitForCompletion();log.info("消息发布成功: Topic={}, Payload={}", mqttBO.getTopic(), new String(mqttBO.getMessage()));} catch (MqttException e) {log.error("发布消息失败: {}", e.getMessage(), e);}}// 断开连接public synchronized void disConnect() {try {if (client != null && client.isConnected()) {client.disconnect();client.close(); // 确保释放资源log.info("成功断开连接并释放资源");}} catch (MqttException e) {log.error("断开连接失败: {}", e.getMessage(), e);}}// 重新连接public synchronized void reconnect() {try {if (!client.isConnected()) {log.info("尝试重新连接...");client.connect(createConnectOptions());log.info("重新连接成功");}} catch (MqttException e) {log.error("重新连接失败: {}", e.getMessage(), e);}}
}

MQTT回调实现类

要想实现接收到发送到mqtt中的信息,我们需要实现回调接口。

/*** @Description MQTT的回调函数,此处在接收回调里仅作判断,具体逻辑放在mqttService里面*/
@Slf4j
@Component
@DependsOn("myMqttClient")
public class MqttCallBack implements MqttCallbackExtended {// 在回调中,我们的业务逻辑都放在mqttService里面@Autowiredprivate MqttService mqttService;// 注入已有的单例Bean@Autowiredprivate MyMqttClient client;@Autowiredprivate MqttProperties mqttProperties;/*** 客户端断开连接的回调,断开后,mqtt开启了断联自动重连机制,由于在创建连接时,我们开启了自动重连机制,此处无需处理重连,如果有其他需求可以改写*/@Overridepublic void connectionLost(Throwable throwable) {log.info("与服务器断开连接,尝试重新连接...");
//        断开后,mqtt开启了断联自动重连机制,此处无需处理}/*** 接受到信息回调* @param s* @param mqttMessage* @throws Exception*/@Overridepublic void messageArrived(String s, MqttMessage mqttMessage) throws Exception {//    此处我们将接收到的信息传递给mqttService处理,其中s为发送到我们这里的具体地址,MqttMessage为Mqtt返回的对象,但是返回的是字节数组,需要自己转,如果传回的是Json,我们需要转化String message = new String(MqttMessage.getPayload());log.info("接收到信息: Topic={}, Payload={}", s, message);// 然后下一步我们可以用FastJson或者其他的Json工具对接收到的json进行处理}/*** 通知客户端某条消息已经成功发送到 MQTT 服务器并完成交付。此处没有特殊需求无需处理* @param iMqttDeliveryToken*/@Overridepublic void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {}/*** 连接成功后回调,由于我们开启了会话保留机制,在断线后会保留会话的信息,但是首次连接需要订阅主题。* @param reconnect* @param serverURI*/@Overridepublic void connectComplete(boolean reconnect, String serverURI) {// 首次连接成功时订阅if (!reconnect) {client.subscribe();log.info("首次连接,订阅主题成功");}else{log.info("重新连接成功");}}
}

MQTTConfig类

MQTTConfig类是为了解耦设立,异步连接mqtt,在项目启动时,会调用mqttInitRunner方法,进行连接。

/*** @Description mqttConfig是为了解耦设立,异步连接mqtt*/
@Configuration
@Slf4j
public class MqttConfig {@Autowiredprivate MqttCallBack mqttCallBack;@Autowiredprivate MyMqttClient client;@Beanpublic ApplicationRunner mqttInitRunner() {return args -> {try {client.connect(mqttCallBack);
//                连接后,此处会回调connectComplete方法去进行订阅主题} catch (Exception e) {log.error("MQTT 初始化失败", e);}};}}

如何使用

导入Maven后,将上述代码复制进项目后,直接启动即可,该配置不唯一,根据项目需求可以更改。
在有的需求中,需要对设备去请求信息,这时就要发送到mqtt中请求信息,然后设备订阅mqtt主题,在MqttService中,注入client,使用client.publish(mqttBO)即可发送到mqtt请求信息,设备接收后,返回信息到Mqtt中,接收信息需要在MqttCallBack中的messageArrived方法中处理。

一切的返回都会在MqttCallBack的messageArrived方法中返回,具体逻辑在MqttService中处理,如果返回的是json,需要自己解析。

ps:
关于技术方案的构建,不同业务场景往往存在多种实现路径,本文所述仅为其中一种实践方案。若读者在具体实施过程中遇到技术选型或架构设计方面的疑问,欢迎在评论区留言探讨,笔者将结合过往经验给予针对性建议。

需要特别说明的是,文中代码源自笔者过往工作实践中的项目积累,应用于真实的企业级开发中,现经脱敏和简化处理后开源,以供读者借鉴。
鉴于当前技术生态的演进态势,传统Java技术栈的市场空间逐渐收窄,该篇文章可能是笔者的封笔作。技术浪潮奔涌不息,青山不改,绿水长流,期待与诸位在更广阔的数字化领域相逢,共同见证科技赋能未来的无限可能。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/74636.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

26考研——图_图的代码实操(6)

408答疑 文章目录 五、图的代码实操图的存储邻接矩阵结构定义初始化插入顶点获取顶点位置在顶点 v1 和 v2 之间插入边获取第一个邻接顶点获取下一个邻接顶点显示图 邻接表结构定义初始化图插入顶点获取顶点位置在顶点 v1 和 v2 之间插入边获取第一个邻接顶点获取下一个邻接顶点…

开源webmail邮箱客户端rainloop的分支版本SnappyMail 设置发件人允许多重身份

RainLoop已多年未更新&#xff0c;SnappyMail 是 RainLoop 的分支&#xff0c;由社区维护。SnappyMail 不仅修复了漏洞&#xff0c;还增加了更多功能和优化。对 IMAP 支持更好&#xff0c;移动端体验也比 RainLoop 更细致。 安装过程和设置跟RainLoop一样&#xff1a; 以宝塔面…

海量数据场景题--查找两个大文件的URL

查找两个大文件共同的URL 给定 a、b 两个文件&#xff0c;各存放 50 亿个 URL&#xff0c;每个 URL 各占 64B&#xff0c;找出 a、b 两个文件共同的 URL。内存限制是 4G。 操作逻辑&#xff1a; 使用哈希函数 hash(URL) % 1000​ 将每个URL映射到0-999的编号 文件A切割为a0, a1…

简单ELK框架搭建

简介 ELK 框架是一套开源的日志管理和分析工具&#xff0c;由 Elasticsearch、Logstash 和 Kibana 三个主要组件组成&#xff0c;现在新增了Filebeat组件&#xff0c;可以更高效的收集数据。 Elasticsearch&#xff1a;是一个分布式、高可扩展的开源搜索引擎&#xff0c;能快速…

VS Code 中 .history`文件的来源与 .gitignore`的正确使用

引言 在使用 VS Code 进行 Git 版本控制时&#xff0c;有时会发现项目中多出一个 .history 目录&#xff0c;并被 Git 识别为未跟踪文件。本文将解释 .history 的来源&#xff0c;并提供 .gitignore 的正确配置方法&#xff0c;确保开发环境的整洁性。 1. .history 文件的来源…

网络之数据链路层

数据链路层 数据链路层目标 TCP/IP提供了一种能力, 将数据可靠的从 B 跨网络送到 C 主机, 这期间是由无数次局域网转发构成的, 比如 主机B 到 路由器F 就是一次局域网通信的问题, 而数据链路层就是研究数据是如何在局域网内部转发的. 也就是说, 应用层是进行数据的处理, 传输…

A Brief History: from GPT-1 to GPT-3

This is my reading notes of 《Developing Apps with GPT-4 and ChatGPT》. In this section, we will introduce the evolution of the OpenAI GPT medels from GPT-1 to GPT-4. GPT-1 In mid-2018, OpenAI published a paper titled “Improving Language Understanding …

基于大数据的各品牌手机销量数据可视化分析系统(源码+lw+部署文档+讲解),源码可白嫖!

摘要 时代在飞速进步&#xff0c;每个行业都在努力发展现在先进技术&#xff0c;通过这些先进的技术来提高自己的水平和优势&#xff0c;各品牌手机销量数据可视化分析系统当然不能排除在外。基于大数据的各品牌手机销量数据可视化分析系统是在实际应用和软件工程的开发原理之…

人工智能-群晖Docker部署DB-GPT

人工智能-群晖Docker部署DB-GPT 0 环境及说明1 获取dbgpt的docker镜像2 下载向量模型3 下载配置文件4 修改配置文件5 创建dbgpt容器并运行6 访问dbgpt0 环境及说明 环境项说明DSM版本DSM 7.2.1-69057 update 3Container Manager版本24.0.2-1535当前 hub.docker.com 镜像仓库中的…

Netty——TCP 粘包/拆包问题

文章目录 1. 什么是 粘包/拆包 问题&#xff1f;2. 原因2.1 Nagle 算法2.2 滑动窗口2.3 MSS 限制2.4 粘包的原因2.5 拆包的原因 3. 解决方案3.1 固定长度消息3.2 分隔符标识3.3 长度前缀协议3.3.1 案例一3.3.2 案例二3.3.3 案例三 4. 总结 1. 什么是 粘包/拆包 问题&#xff1f…

JavaScript Fetch API

简介 fetch() API 是用于发送 HTTP 请求的现代异步方法&#xff0c;它基于 Promise&#xff0c;比传统的 XMLHttpRequest 更加简洁、强大 示例 基本语法 fetch(url, options).then(response > response.json()).then(data > console.log(data)).catch(error > con…

UMI-OCR Docker 部署

额外补充 Docker 0.前置条件 部署前&#xff0c;请检查主机的CPU是否具有AVX指令集 lscpu | grep avx 输出如下即可继续部署 Flags: ... avx ... avx2 ... 1.下载dockerfile wget https://raw.githubusercontent.com/hiroi-sora/Umi-OCR_runtime_linux/main/Do…

C++ --- 二叉搜索树

1 二叉搜索树的概念 ⼆叉搜索树⼜称⼆叉排序树&#xff0c;它或者是⼀棵空树&#xff0c;或者是具有以下性质的⼆叉树: 1 若它的左⼦树不为空&#xff0c;则左⼦树上所有结点的值都⼩于等于根结点的值 2 若它的右⼦树不为空&#xff0c;则右⼦树上所有结点的值都⼤于等于根结点…

跨语言语言模型预训练

摘要 最近的研究表明&#xff0c;生成式预训练在英语自然语言理解任务中表现出较高的效率。在本研究中&#xff0c;我们将这一方法扩展到多种语言&#xff0c;并展示跨语言预训练的有效性。我们提出了两种学习跨语言语言模型&#xff08;XLM&#xff09;的方法&#xff1a;一种…

文件描述符,它在哪里存的,exec()后还存在吗

学过计系肯定了解 寄存器、程序计数器、堆栈这些 程序运行需要的资源。 这些是进程地址空间。 而操作系统分配一个进程资源时&#xff0c;分配的是 PCB 进程控制块。 所以进程控制块还维护其他资源——程序与外部交互的资源——文件、管道、套接字。 文章目录 文件描述符进程管…

Slidev使用(一)安装

文章目录 1. **安装位置**2. **使用方式**3. **适用场景**4. **管理和维护** 全局安装1. **检查 Node.js 和 npm 是否已安装**2. **全局安装 Slidev CLI**3. **验证安装是否成功**4. **创建幻灯片文件**5. **启动 Slidev**6. **实时编辑和预览**7. **构建和导出&#xff08;可选…

第二十一章:模板与继承_《C++ Templates》notes

模板与继承 重点和难点编译与测试说明第一部分&#xff1a;多选题 (10题)第二部分&#xff1a;设计题 (5题)答案与详解多选题答案&#xff1a;设计题参考答案 测试说明 重点和难点 21.1 空基类优化&#xff08;EBCO&#xff09; 知识点 空基类优化&#xff08;Empty Base Cla…

AOA与TOA混合定位,MATLAB例程,自适应基站数量,三维空间下的运动轨迹,滤波使用EKF

本代码实现了一个基于 到达角(AOA) 和 到达时间(TOA) 的混合定位算法,结合 扩展卡尔曼滤波(EKF) 对三维运动目标的轨迹进行滤波优化。代码通过模拟动态目标与基站网络,展示了从信号测量、定位解算到轨迹滤波的全流程,适用于城市峡谷、室内等复杂环境下的定位研究。 文…

量子计算:开启未来计算的新纪元

一、引言 在当今数字化时代&#xff0c;计算技术的飞速发展深刻地改变了我们的生活和工作方式。从传统的电子计算机到如今的高性能超级计算机&#xff0c;人类在计算能力上取得了巨大的进步。然而&#xff0c;随着科技的不断推进&#xff0c;我们面临着越来越多的复杂问题&…

AMD机密计算虚拟机介绍

一、什么机密计算虚拟机 机密计算虚拟机 是一种基于硬件安全技术(如 AMD Secure Encrypted Virtualization, SEV)的虚拟化环境,旨在保护虚拟机(VM)的 ​运行中数据​(包括内存、CPU 寄存器等)免受外部攻击或未经授权的访问,即使云服务提供商或管理员也无法窥探。 AMD …