SpringBoot集成MQTT实现交互服务通信

引言

本文是springboot集成mqtt的一个实战案例。
gitee代码库地址:源码地址

一、什么是MQTT

MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议),是一种基于发布/订阅(publish/subscribe)模式的“轻量级”通讯协议,该协议构建于 TCP/IP 协议上,由 IBM 于 1999 年发明。MQTT 协议的主要特征是开放、简单、轻量级和易于实现,这些特征使得它适用于受约束的应用环境,如:

网络受限:网络带宽较低且传输不可靠
终端受限:协议运行在嵌入式设备上,嵌入式终端的处理器、内存等是受限的

MQTT 非常适用于物联网领域,如传感器与服务器的通信、传感器信息采集等。

二、发布/订阅模式

发布/订阅模式(Publish/Subscribe Pattern,简称Pub/Sub)是一种消息通信模式,在这种模式下,消息的发送者(发布者)不会将消息直接发送给特定的接收者(订阅者)。而是将代表消息内容的通知(事件)发布到一个特定的主题或频道上,而订阅了这个主题的接收者会收到所有在这个主题上发布的通知。这种模式解耦了消息的发送者和接收者,使得系统更加灵活和可扩展。

主要组成部分

  1. 发布者(Publisher):负责生成消息并将其发布到特定的主题或频道。

  2. 订阅者(Subscriber):注册对特定主题的兴趣,并接收该主题上的所有消息。

  3. 消息代理(Message Broker):作为中间件,它接收来自发布者的消息,并将这些消息传递给所有相关的订阅者。

优点

  • 解耦:发布者和订阅者之间不需要直接交互,这降低了系统的耦合度。

  • 灵活性:可以动态添加或删除订阅者,不影响其他组件。

  • 可扩展性:系统容易扩展,可以轻松增加新的发布者或订阅者。

缺点

  • 复杂性:引入了额外的组件(如消息代理),增加了系统的复杂性和管理成本。

  • 性能开销:消息的传递需要通过中间件,可能会有延迟和性能损失。

应用场景

  • 事件驱动架构:在微服务架构中,不同的服务通过发布/订阅模式进行异步通信。

  • 数据流处理:如实时数据分析,多个组件可以订阅数据流并进行处理。

  • 分布式系统:用于跨系统或跨服务的消息传递。

发布/订阅模式并不是 MQTT 协议特有的模式,很多消息中间件都有使用发布/订阅模式,有同学可能认为这就是观察者模式,还真不是,这两个模式很容易混淆。观察者模式只有观察者 + 被观察者两个角色,而发布/订阅模式还有一个经纪人 Broker;往更深层次的讲观察者和被观察者,是松耦合的关系,而发布者和订阅者,则完全不存在耦合。

三、Windows下安装MQTT消息服务器

非常遗憾,EMQ X Broker 在 5.4.0 版本的发行版中已不支持 windows 版本的安装包了,笔者从网上找了一个最后支持版本的压缩包,已上传资源。

  • 解压后,在bin文件下,使用cmd执行运行命令 .\emqx console
  • 访问MQTT管理页面 http://localhost:18083/#/ 用户名密码 admin/public

如果报错缺少Erlang环境,需要自行安装下该环境

在这里插入图片描述
在这里插入图片描述

浏览器访问:http://localhost:18083/#,输入账号密码进入,会要求你修改密码,可以暂时跳过

在这里插入图片描述

四、Windows安装MQTT消息代理客户端MQTTX

下载地址:MQTTX下载地址

点击免费下载
在这里插入图片描述

选择64位版本

在这里插入图片描述
下好后点击安装,启动运行界面如下:
在这里插入图片描述
语言是英文,可以在设置按钮里调成中文。这个客户端代理主要是进行消息发送的测试服务。

五、新建MQTT集成项目

随便新建了一个springboot应用,用的是JDK17,在pom文件中引入如下依赖:

        <!-- MQTT --><dependency><groupId>org.eclipse.paho</groupId><artifactId>org.eclipse.paho.client.mqttv3</artifactId><version>1.2.5</version></dependency>

5.1 yml配置

server:port: 8081#允许循环依赖
spring:main:allow-circular-references: truecustomer:mqtt:broker: tcp://localhost:1883clientList:#发布客户端ID- clientId: nays_service#监听主题 同时订阅多个主题 使用 - 分割开subscribeTopic: mqtt/publish#用户名userName: admin#密码password: public#接收客户端ID- clientId: receive_service#监听主题 同时订阅多个主题 使用 - 分割开subscribeTopic: mqtt/receive#用户名userName: admin#密码password: public

5.2 Mqtt配置类

package com.hulei.mqttproject.config;import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;import java.util.List;/*** Mqtt配置类*/
@Data
@Configuration
@ConfigurationProperties(prefix = "customer.mqtt")
public class MqttConfig {/*** mqtt broker地址*/String broker;/*** 需要创建的MQTT客户端*/List<MqttClient> clientList;
}

5.3 MQTT客户端

package com.hulei.mqttproject.config;import lombok.Data;/*** MQTT客户端*/
@Data
public class MqttClient {/*** 客户端ID*/private String clientId;/*** 监听主题*/private String subscribeTopic;/*** 用户名*/private String userName;/*** 密码*/private String password;
}

5.4 MQTT客户端管理类

package com.hulei.mqttproject.config;import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;/*** MQTT客户端管理类,如果客户端非常多后续可入redis缓存*/
@Slf4j
@Component
public class MqttClientManager {@Value("${customer.mqtt.broker}")private String mqttBroker;@Resourceprivate MqttCallBackContext mqttCallBackContext;/*** 存储MQTT客户端*/public static Map<String, MqttClient> MQTT_CLIENT_MAP = new ConcurrentHashMap<>();public MqttClient getMqttClientById(String clientId) {return MQTT_CLIENT_MAP.get(clientId);}/*** 创建mqtt客户端** @param clientId       客户端ID* @param subscribeTopic 订阅主题,可为空* @param userName       用户名,可为空* @param password       密码,可为空*/public void createMqttClient(String clientId, String subscribeTopic, String userName, String password) {MemoryPersistence persistence = new MemoryPersistence();try {MqttClient client = new MqttClient(mqttBroker, clientId, persistence);MqttConnectOptions connOpts = new MqttConnectOptions();if (null != userName && !userName.isEmpty()) {connOpts.setUserName(userName);}if (null != password && !password.isEmpty()) {connOpts.setPassword(password.toCharArray());}connOpts.setCleanSession(true);if (null != subscribeTopic && !subscribeTopic.isEmpty()) {AbsMqttCallBack callBack = mqttCallBackContext.getCallBack(clientId);if (null == callBack) {callBack = mqttCallBackContext.getCallBack("default");}callBack.setClientId(clientId);callBack.setConnectOptions(connOpts);client.setCallback(callBack);}//连接mqtt服务端brokerclient.connect(connOpts);// 订阅主题if (null != subscribeTopic && !subscribeTopic.isEmpty()) {if (subscribeTopic.contains("-"))client.subscribe(subscribeTopic.split("-"));else {client.subscribe(subscribeTopic);}}MQTT_CLIENT_MAP.putIfAbsent(clientId, client);} catch (MqttException e) {log.error("Create mqttClient failed!", e);}}
}

5.5 MQTT客户端创建

package com.hulei.mqttproject.config;import jakarta.annotation.PostConstruct;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;import java.util.List;/*** MQTT客户端创建*/
@Component
@Slf4j
public class MqttClientCreate {@Resourceprivate MqttClientManager mqttClientManager;@Resourceprivate MqttConfig mqttConfig;/*** 创建MQTT客户端*/@PostConstructpublic void createMqttClient() {List<MqttClient> mqttClientList = mqttConfig.getClientList();for (MqttClient mqttClient : mqttClientList) {log.info("{}", mqttClient);//创建客户端,客户端ID:demo,回调类跟客户端ID一致mqttClientManager.createMqttClient(mqttClient.getClientId(), mqttClient.getSubscribeTopic(), mqttClient.getUserName(), mqttClient.getPassword());}}
}

5.6 MQTT回调抽象类

package com.hulei.mqttproject.config;import jakarta.annotation.Resource;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttMessage;/*** MQTT回调抽象类*/
@Setter
@Getter
@Slf4j
public abstract class AbsMqttCallBack implements MqttCallback {private String clientId;private MqttConnectOptions connectOptions;@ResourceMqttClientManager mqttClientManager;/*** 失去连接操作,进行重连** @param throwable 异常*/@Overridepublic void connectionLost(Throwable throwable) {try {if (null != clientId) {if (null != connectOptions) {mqttClientManager.getMqttClientById(clientId).connect(connectOptions);} else {mqttClientManager.getMqttClientById(clientId).connect();}}} catch (Exception e) {log.error("{} reconnect failed!", e.getMessage(), e);}}/*** 接收订阅消息* @param topic    主题* @param mqttMessage 接收消息* @throws Exception 异常*/@Overridepublic void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {String content = new String(mqttMessage.getPayload());handleReceiveMessage(topic, content);}/*** 消息发送成功** @param iMqttDeliveryToken toke*/@Overridepublic void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {log.info("消息发送成功");}/*** 处理接收的消息* @param topic   主题* @param message 消息内容*/protected abstract void handleReceiveMessage(String topic, String message);
}

5.7 MQTT订阅回调环境类

package com.hulei.mqttproject.config;import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;/*** MQTT订阅回调环境类*/
@Component
@Slf4j
public class MqttCallBackContext {private final Map<String, AbsMqttCallBack> callBackMap = new ConcurrentHashMap<>();/*** 默认构造函数** @param callBackMap 回调集合*/public MqttCallBackContext(Map<String, AbsMqttCallBack> callBackMap) {this.callBackMap.putAll(callBackMap);}/*** 获取MQTT回调类** @param clientId 客户端ID* @return MQTT回调类*/public AbsMqttCallBack getCallBack(String clientId) {return this.callBackMap.get(clientId);}
}

5.8 默认回调类

package com.hulei.mqttproject.config;import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;/*** 默认回调*/
@Slf4j
@Component("default")
public class DefaultMqttCallBack extends AbsMqttCallBack {/*** @param topic   主题* @param message 消息内容*/@Overrideprotected void handleReceiveMessage(String topic, String message) {log.info("接收到主题---{}", topic);log.info("接收到消息---{}", message);// 自定义消息处理业务}
}

六、测试服务类

package com.hulei.mqttproject.controller;import com.hulei.mqttproject.config.MqttClientManager;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;@RestController
@Slf4j
public class SendController {@Resourceprivate MqttClientManager mqttClientManager;@RequestMapping("/sendMessage")public String sendMessage(String topic){try {MqttMessage mqttMessage = new MqttMessage("你好".getBytes());mqttClientManager.getMqttClientById("nays_service").publish(topic,mqttMessage);return "发送成功";} catch (Exception e) {log.error("发送失败",e);return "发送失败";}}
}

七、启动springboot

启动日志可以看到,mqtt消息服务器连接成功

在这里插入图片描述
EMQX工具显示发布客户端和接收客户端均已成功注册

在这里插入图片描述

使用Apifox测试下SendController中的接口,mqtt/receive是yaml中接收客户端订阅的主题,当然也可以往mqtt/publish主题发,mqtt中消息的发布者也可以订阅主题,监听某些消息。

在这里插入图片描述

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/47390.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

IDEA自带的Maven 3.9.x无法刷新http nexus私服

问题&#xff1a; 自建的私服&#xff0c;配置了域名&#xff0c;使用http协议&#xff0c;在IDEA中或本地Maven 3.9.x会出现报错&#xff0c;提示http被blocked&#xff0c;原因是Maven 3.8.1开始&#xff0c;Maven默认禁止使用HTTP仓库地址&#xff0c;只允许使用HTTPS仓库地…

【单片机毕业设计选题24069】-物联网节水灌溉系统设计

系统功能: 完成基于物联网的节水灌溉系统的电路图以及软件代码编写。要求系统可以通过传感器监测土壤的湿度和环境温湿度&#xff0c;如果土壤湿度低于限值和环境温湿度超过限值&#xff0c;则需开启继电器&#xff0c;打开电机水泵进行供水灌溉&#xff1b;当土壤湿度高于限值…

高数知识补充----矩阵、行列式、数学符号

矩阵计算 参考链接&#xff1a;矩阵如何运算&#xff1f;——线性代数_矩阵计算-CSDN博客 行列式计算 参考链接&#xff1a;实用的行列式计算方法 —— 线性代数&#xff08;det&#xff09;_det线性代数-CSDN博客 参考链接&#xff1a;行列式的计算方法(含四种&#xff0c;…

使用ETLCloud实现MySQL数据库与StarRocks数据库同步

在现代数据架构中&#xff0c;数据同步是保证数据一致性和分析准确性的关键步骤之一。本文将介绍如何利用ETLCloud技术实现MySQL数据库与StarRocks数仓数据库的高效数据同步&#xff0c;以及其在数据管理和分析中的重要性。 数据同步的重要性 在数据驱动的时代&#xff0c;企…

uniapp 解决scroll-view组件 refresher-triggered刷新无效

直接上代码 看代码注释 const isRefresh ref(false); //下拉刷新状态// 下拉刷新async function refresherpulling() {renderArr.value [];isRefresh.value true; // 先赋为true 调用完接口再设为falseawait reqData();isRefresh.value false; // 重置状态}下面是组件视图 …

OpenAI训练数据从哪里来、与苹果合作进展如何?“ChatGPT之母”最新回应

7月9日&#xff0c;美国约翰霍普金斯大学公布了对“ChatGPT之母”、OpenAI首席技术官米拉穆拉蒂&#xff08;Mira Murati&#xff09;的采访视频。这场采访时间是6月10日&#xff0c;访谈中&#xff0c;穆拉蒂不仅与主持人讨论了OpenAI与Apple的合作伙伴关系&#xff0c;还深入…

Apache Omid TSO 组件源码实现原理

Apache Omid TSO 组件实现原理 作用 独立进程&#xff0c;处理全局事务之间的并发冲突。 流程 TSOChannelHandler#channelRead -> AbstractRequestProcessor -> PersistenceProcessorHandler 总体流程 thread1TSOChannelHandler#channelReadAbstractRequestProcess…

智能边缘计算网关:实现工业自动化与数据处理的融合-天拓四方

随着物联网&#xff08;IoT&#xff09;技术的迅速发展和普及&#xff0c;越来越多的设备被连接到互联网上&#xff0c;产生了海量的数据。如何有效地处理和分析这些数据&#xff0c;同时确保数据的安全性和实时性&#xff0c;成为了摆在企业面前的一大挑战。智能边缘计算网关作…

广联达Linkworks ArchiveWebService XML实体注入漏洞复现

0x01 产品简介 广联达 LinkWorks(也称为 GlinkLink 或 GTP-LinkWorks)是广联达公司(Glodon)开发的一种BIM(建筑信息模型)协同平台。广联达是中国领先的数字建造技术提供商之一,专注于为建筑、工程和建筑设计行业提供数字化解决方案。 0x02 漏洞概述 广联达 LinkWorks…

在VScode中编译C程序

一&#xff0c;安装 VS Code 下载并安装VS code&#xff0c;安装简体中文和C/C插件。略。 二&#xff0c;配置gcc环境 下载并安装MinGW。添加环境变量。略。 在cmd中输入 gcc -v 能打印版本即可。 三&#xff0c;打开文件夹&#xff0c;创建工作区 1&#xff0c;打开文件夹…

数据库系统概论:数据库系统模式

数据库系统在我们的数字世界中扮演着至关重要的角色&#xff0c;无论是个人设备还是企业级应用&#xff0c;数据的有效管理和访问都是必不可少的。而数据库系统的模式结构是确保数据一致性和可访问性的关键组成部分。 数据库系统模式 基本概念 型和值 数据模型中有 型(type…

游戏中的敏感词算法初探

在游戏中起名和聊天需要服务器判断是否含有敏感词&#xff0c;从而拒绝或屏蔽敏感词显示&#xff0c;这里枚举一些常用的算法和实际效果。 1.字符串匹配算法 常用的有KMP&#xff0c;核心就是预处理出next数组&#xff0c;也就是失配信息&#xff0c;时间复杂度在O(mn) 。还有个…

微软研究人员为电子表格应用开发了专用人工智能LLM

微软的 Copilot 生成式人工智能助手现已成为该公司许多软件应用程序的一部分。其中包括 Excel 电子表格应用程序&#xff0c;用户可以在其中输入文本提示来帮助处理某些选项。微软的一组研究人员一直在研究一种新的人工智能大型语言模型&#xff0c;这种模型是专门为 Excel、Go…

Transformer系列专题(四)——Swintransformer

文章目录 九、SwinTransformer9.1 整体网络架构9.2 Transformer Blocks9.3 Patch Embedding&#xff08;将图像切割成小块&#xff08;Patch&#xff09;&#xff09;9.4 window_partition9.5 W-MSA&#xff08;Window Multi-head Self Attention&#xff09;9.6 window_revers…

Redis-应用

目录 应用 缓存雪崩、击穿、穿透和解决办法? 布隆过滤器是怎么工作的? 缓存的数据一致性怎么保证 Redis和Mysql消息一致性 业务一致性要求高怎么办? 数据库与缓存的一致性问题 数据库和缓存的一致性如何保证 如何保证本地缓存和分布式缓存的一致&#xff1f; 如果在…

【Pytorch】一文向您详细介绍 `tensor.max(1, keepdims=True)`

【&#x1f525;Pytorch】一文向您详细介绍 tensor.max(1, keepdimsTrue) 下滑即可查看博客内容 &#x1f308; 欢迎莅临我的个人主页 &#x1f448;这里是我静心耕耘深度学习领域、真诚分享知识与智慧的小天地&#xff01;&#x1f387; &#x1f393; 博主简介&#xff…

(一)原生js案例之图片轮播

原生js实现的两种播放效果 效果一 循环播放&#xff0c;单一的效果 代码实现 <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><meta name"viewport" content"widthdevice-width, initial-sc…

昇思学习打卡-20-生成式/GAN图像生成

文章目录 网络介绍生成器和判别器的博弈过程数据集可视化模型细节训练过程网络优缺点优点缺点 网络介绍 GAN通过设计生成模型和判别模型这两个模块&#xff0c;使其互相博弈学习产生了相当好的输出。 GAN模型的核心在于提出了通过对抗过程来估计生成模型这一全新框架。在这个…

今日安装了一下Eclipse,配置了SVN

Eclipse安装配置参考文章1&#xff1a; https://blog.csdn.net/maiya_yayaya/article/details/132208892 Eclipse配置SVN参考文章2&#xff1a; https://blog.csdn.net/zzh45828/article/details/106224375 Eclipse如何导入项目参考文章3&#xff1a; https://blog.csdn.n…

Linux上的系统服务——DNS、WEB、NFS 和 AutoFS 服务的详细配置步骤

现有主机 node01 和 node02&#xff0c;完成如下需求&#xff1a; 1、在 node01 主机上提供 DNS 和 WEB 服务 2、dns 服务提供本实验所有主机名解析 3、web服务提供 www.rhce.com 虚拟主机 4、该虚拟主机的documentroot目录在 /nfs/rhce 目录 5、该目录由 node02 主机提供的NFS…