SpringBoot中使用MQTT实现消息的订阅和发布

SpringBoot中使用MQTT实现消息的订阅和发布

背景 java框架SpringBoot通过mQTT通信 控制物联网设备

还是直接上代码
第一步依赖:

      <!--mqtt相关依赖--><dependency><groupId>org.springframework.integration</groupId><artifactId>spring-integration-mqtt</artifactId><version>5.5.14</version></dependency>

第二步配置文件

#mqtt
mqtt:mqttUrl: tcp://127.0.0.1mqttPort: 1883mqttUsername: adminmqttPassword: publicmqttClientId: aaa# MQTT回调类型 按一个MQTT服务区分
# 如果MQTT服务端换了 回调处理的是新的业务需求  就把这个换了
#  然后在MQTT配置文件中扩展新的回调类
mqttTypeCallback: breakerCallback  

第三步 config类

package com.xxx.iotjava.mqtt.config;import com.xxx.iotjava.mqtt.callback.BreakerCallback;import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
import org.springframework.util.StringUtils;/*** User:Json* Date: 2024/6/17**/
@Configuration
@Slf4j
public class MqttConfig {@Value("${mqtt.mqttUsername}")private String mqttUsername;@Value("${mqtt.mqttPassword}")private String mqttPassword;@Value("${mqtt.mqttUrl}")private String mqttUrl;@Value("${mqtt.mqttPort}")private Integer mqttPort;@Value("${mqtt.mqttClientId}")private String mqttClientId;@Value("${mqttTypeCallback}")private String mqttTypeCallback;private static String breakerCallback = "breakerCallback";/*** 客户端对象*/private MqttClient client;/*** 客户端连接服务端* 目前只支持一个 MQTT服务端 如果后续一个项目有多个MQTT服务端那就设计成工厂模式*/public boolean connect() {if (isMqtt()){return false;}try {//new MemoryPersistence() 使用内存持久化// 优点:不会在文件系统中创建任何文件(如 .lck 文件),适合对会话持久性没有要求的场景。// 缺点:  客户端断开连接或重启后,会话数据会丢失,无法保留订阅信息和未发送的消息// String persistenceDirectory = "/path/to/your/mqtt/persistence";//new MqttDefaultFilePersistence(persistenceDirectory) 使用文件持久化//如果persistenceDirectory 不写 他默认创建 根目录 linux要给权限// 优点: 客户端断开连接或重启后,能够保留订阅信息和未发送的消息。这对于需要保持会话状态的应用非常重要// 缺点 会在指定的目录中创建文件(如 .lck 文件),需要确保指定的目录是有效的,并且应用有权限访问该目录//创建MQTT客户端对象client = new MqttClient(mqttUrl + ":" + mqttPort, mqttClientId,new MemoryPersistence());//连接设置MqttConnectOptions options = new MqttConnectOptions();//是否清空session,设置false表示服务器会保留客户端的连接记录(订阅主题,qos),客户端重连之后能获取到服务器在客户端断开连接期间推送的消息//设置为true表示每次连接服务器都是以新的身份//如果他为true 会出现一个问题 //当网络断开后,客户端会进行重连,但是重连之前订阅的主题就失效了,不再接受之前订阅主题的消息。//因为配置里将cleanSession 设为 true ,当客户端掉线时 ,//服务器端会清除 客户端 session 。 重连后 客户端会有一个新的session。 // 所以如果大家把他为true 重新连接mqtt后,要注意需要手动再订阅一下主题// 推荐文档:https://www.cnblogs.com/A-yes/p/9894144.htmloptions.setCleanSession(true);//设置连接用户名options.setUserName(mqttUsername);//设置连接密码options.setPassword(mqttPassword.toCharArray());options.setAutomaticReconnect(true);  // 启用自动重连//设置超时时间,单位为秒  如果在指定的时间内未能建立连接,客户端会放弃连接尝试并抛出异常。options.setConnectionTimeout(100);//设置心跳时间 单位为秒,表示服务器每隔 1.5*20秒的时间向客户端发送心跳判断客户端是否在线options.setKeepAliveInterval(20);//设置遗嘱消息的话题,若客户端和服务器之间的连接意外断开,服务器将发布客户端的遗嘱信息//  options.setWill("willTopic",(mqttClientId + ":与服务器断开连接").getBytes(),0,false);if (StringUtils.isEmpty(mqttTypeCallback)) {log.error("MQTT回调类型为空,请去java_config配置文件配置!");}//设置回调if (breakerCallback.equals(mqttTypeCallback)) {//断路器回调client.setCallback(new BreakerCallback());}client.connect(options);return true;} catch (MqttException e) {log.error("MQTT启动报错:" + e.getMessage());e.printStackTrace();return false;}}/*** qos* 0  最多一次传递【适用于对消息丢失不敏感的场景,如传感器数据频繁发送,可以接受偶尔的数据丢失】* 1 至少一次传递  【消息至少传递一次,但可能会重复(即重复消息)】* 2 仅一次传递 【消息确保仅传递一次,既不会丢失也不会重复。】* retained* 保留消息:如果 retained 参数设置为 true,消息会被代理保留。代理将记住这个消息,并在新客户端订阅该主题时立即发送这个消息。* 非保留消息:如果 retained 参数设置为 false,消息不会被保留,只会发送给当前在线并订阅该主题的客户端。* topic    主题* message  内容*/public void publish(int qos, boolean retained, String topic, String message) {log.info("topic为:【"+topic+"】,qos为:【"+qos+"】 mqtt 发布数据为:"+message);MqttMessage mqttMessage = new MqttMessage();mqttMessage.setQos(qos);mqttMessage.setRetained(retained); //代理将记住这个消息,并在新客户端订阅该主题时立即发送这个消息。mqttMessage.setPayload(message.getBytes());//主题的目的地,用于发布信息MqttTopic mqttTopic = client.getTopic(topic);MqttDeliveryToken token;try {//将指定消息发布到主题,但不等待消息传递完成,返回的token可用于跟踪消息的传递状态token = mqttTopic.publish(mqttMessage);//token.waitForCompletion(); // 等待完成 会堵塞} catch (MqttException e) {log.warn("ClientId【" + mqttClientId + "】发布失败!主题【" + topic + "】,发布数据为:" + message);e.printStackTrace();}}/*** 断开连接*/public void disConnect() {try {client.disconnect();} catch (MqttException e) {e.printStackTrace();}}/****  手动连接*  可用于断线后 手动重连* ***/public boolean againConnect() {try {if (client != null && !client.isConnected()) {client.connect();}return true;} catch (MqttException e) {e.printStackTrace();return false;}}//验证是否启动mqtt连接private boolean isMqtt(){if (StringUtils.isEmpty(mqttUrl) || StringUtils.isEmpty(mqttPort)|| StringUtils.isEmpty(mqttUsername) || StringUtils.isEmpty(mqttPassword)|| StringUtils.isEmpty(mqttClientId)) {log.info("==========mqtt 参数不全,无需启动MQTT连接==================");return true;}return false;}/*** 订阅指定主题* @param topic 订阅的主题* @param qos   订阅的服务质量*/public boolean subscribe(String topic, int qos) {if (isMqtt()){return false;}try {if (client != null && client.isConnected()) {client.subscribe(topic, qos);log.info("订阅主题 {} 成功!", topic);} else {log.error("MQTT客户端尚未连接,无法订阅主题 {}!", topic);}return true;} catch (MqttException e) {log.error("订阅主题 {} 失败:{}", topic, e.getMessage());e.printStackTrace();return false;}}/*** 批量订阅主题*  消息等级,和主题数组一一对应,服务端将按照指定等级给订阅了主题的客户端推送消息* @param topic 订阅的主题集合* @param qos   订阅的服务质量集合*/public boolean subscribe(String[] topic, int[] qos) {if (isMqtt()){return false;}try {if (client != null && client.isConnected()) {client.subscribe(topic, qos);log.info("订阅主题 {} 成功!", topic);} else {log.error("MQTT客户端尚未连接,无法订阅主题 {}!", topic);}return true;} catch (MqttException e) {log.error("订阅主题 {} 失败:{}", topic, e.getMessage());e.printStackTrace();return false;}}
}

第四步 回调类

package com.xxx.iotjava.mqtt.callback;import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.xxx.iotjava.entities.BreakerData;
import com.xxx.iotjava.enums.breaker.BreakerKeywordsEnum;
import com.xxx.iotjava.enums.breaker.BreakerKeywordsValueEnum;
import com.xxx.iotjava.enums.breaker.BreakerOperationEnum;
import com.xxx.iotjava.service.inteface.IBreakerDataService;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;import com.alibaba.fastjson.JSONArray;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils;/*** User:Json* Date: 2024/6/17**/
@Component
@Slf4j
public class BreakerCallback implements MqttCallback {@AutowiredIBreakerDataService iBreakerDataService;/*** 与服务器断开的回调 *  这里可以做手动连接 但是配置config类 配置了 自动检测异常 true 这里可以也不做*      options.setAutomaticReconnect(true);  // 启用自动重连*/@Overridepublic void connectionLost(Throwable throwable) {log.error("MQTT连接有异常:" + throwable.getMessage());}/*** 订阅的回调* 消息到达的回调* 注意 如果这个回调方法 如果有异常 报错 ,mqtt会重新连接 * 因为配置文件 设置了  options.setAutomaticReconnect(true);  // 启用自动重连* 如果自动重连了 如果是开启新的会话 以前的订阅会消失  具体操作 再上面的配置文件类说明过了*/@Overridepublic void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
//        System.out.println("上报时间:"+ LocalDateTime.now());
//        System.out.println(String.format("接收消息主题 : %s",topic));
//        System.out.println(String.format("接收消息Qos : %d",mqttMessage.getQos()));
//        System.out.println(String.format("接收消息内容 : %s",new String(mqttMessage.getPayload())));
//        System.out.println(String.format("接收消息retained : %b",mqttMessage.isRetained()));}/*** 发布的回调* 消息发布成功的回调*/@Overridepublic void deliveryComplete(IMqttDeliveryToken token) {IMqttAsyncClient client = token.getClient();log.info(client.getClientId() + "发布消息成功!");}}

第五步 mqtt工具类

package com.xxx.iotjava.utils;import com.xxx.init.utils.AppContextUtil;import com.xxx.iotjava.enums.breaker.BreakerOperationTopicEnum;
import com.xxx.iotjava.mqtt.config.MqttConfig;import lombok.extern.slf4j.Slf4j;/*** User:Json* Date: 2024/6/17**/
@Slf4j
public class MqttUtils {private static MqttConfig mqttConfig;public static MqttConfig getMqttConfig() {if (mqttConfig == null)mqttConfig = AppContextUtil.getBean(MqttConfig.class);return mqttConfig;}//初始化 订阅public  static boolean subscribeInit(){return getMqttConfig().subscribe(BreakerOperationTopicEnum.REPORTING_API.getTopic(), 0);}/*** 发送消息* qos 0 最多一次传递  1 至少一次传递  2 仅一次传递* retained  true 保留消息  false 非保留消息* topic    主题* message  内容*/public static boolean sendMqttMsg(int qos, boolean retained, String topic, String message) {try {getMqttConfig().publish(qos, retained, topic, message);return true;} catch (Exception e) {e.printStackTrace();log.error("MQtt发送消息报错:" + e.getMessage());return false;}}/** topic 主题* message 内容* */public static boolean sendMqttMsg(String topic, String message) {return sendMqttMsg(1, false, topic, message);}}

第六步 调用测试
发布
MqttUtils.sendMqttMsg(topic, data)
//订阅 我做的是启动的时候 初始化订阅 所以 直接根据定义的 topic 常量进行初始化订阅
//BreakerOperationTopicEnum.REPORTING_API.getTopic() 我 定义的topic 枚举类 常量
// 这里就不分享了
MqttUtils.subscribeInit();

完成

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/34600.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

百度百科词条创建的前提条件

随着互联网的发展&#xff0c;人们获取信息越来越依赖于搜索引擎&#xff0c;而百度百科作为百度搜索的核心产品在百度中一般能够稳居首位&#xff0c;而且百科词条具有权威性&#xff0c;可信度比较高&#xff0c;非常适用于企业和人物的形象宣传。 最近&#xff0c;小马识途营…

JS-数组扁平化方法合集(递归,while循环,flat)

前言 数组扁平化也是面试常考题之一&#xff0c;今天就和大家简单分享一下常见的数组扁平方法。这题其实主要考察的是递归思想&#xff0c;因为当数组里面嵌套非常多层数组的时候只能通过循环递归来进行扁平。本次分享主要也是分享本题的递归思想。话不多说&#xff0c;开始分…

基于Spring Boot构建淘客返利平台

基于Spring Boot构建淘客返利平台 大家好&#xff0c;我是免费搭建查券返利机器人省钱赚佣金就用微赚淘客系统3.0的小编&#xff0c;也是冬天不穿秋裤&#xff0c;天冷也要风度的程序猿&#xff01;今天我们将讨论如何基于Spring Boot构建一个淘客返利平台。 淘客返利平台通过…

计算机网络地址划分A-E(自学)

1、网络地址组成 &#xff08;1&#xff09;物理地址MAC&#xff08;Media Access Control Address&#xff09; 网卡生产商分配&#xff0c;全球唯一&#xff0c;48/64位二进制 &#xff08;2&#xff09;逻辑地址IP(Internet Protocol) 网络层地址&#xff0c;用于在不同网…

Handling `nil` Values in `NSDictionary` in Objective-C

Handling nil Values in NSDictionary in Objective-C When working with Objective-C, particularly when dealing with data returned from a server, it’s crucial (至关重要的) to handle nil values appropriately (适当地) to prevent unexpected crashes. Here, we ex…

VBA递归过程快速组合数据

实例需求&#xff1a;数据表包含的列数不固定&#xff0c;有的列&#xff08;数量和位置不固定&#xff09;包含组合数据&#xff0c;例如C2单元格为D,P&#xff0c;说明Unit Config有两种分别为D和P&#xff0c;如下图所示。 现在需要将所有的组合罗列出来&#xff0c;如下所示…

git上传本地项目及更新项目

1、注册GitHub账号和下载git 2、在GitHub上新建一个仓库&#xff0c;点击号——>New repository&#xff0c;给仓库起一个名字&#xff0c;点击Create repository 3、进入要上传的项目中&#xff0c;右键点击git back here&#xff0c;命令行输入git init初始化&#xff0c…

19、复杂链表的复制?、 二叉搜索树与双向链表

题目&#xff1a; 复杂链表的复制&#xff1f; 描述&#xff1a; 输入一个复杂链表&#xff08;每个节点中有节点值&#xff0c;以及两个指针&#xff0c;一个指向下一个节点&#xff0c; 另一个特殊指针指向任意一个节点&#xff09;&#xff0c;返回结果为复制后复杂链表的h…

全球电力电子测试方案专业提供商「艾诺仪器」×企企通召开项目启动会,推进企业采购数智化升级

导读 供应链管理已成为企业的核心竞争力之一&#xff0c;为应对快速变化的市场环境&#xff0c;艾诺仪器亟需强化采购管理和供应链协同的竞争力。SRM涉及到各事业部、各所属企业等多个层面&#xff0c;希望通过双方优势资源的整合&#xff0c;打造高效协同、科学智能的数字化采…

第十三站:Java蓝宝石——云计算的浩瀚天空

Java作为一门成熟且广泛使用的编程语言&#xff0c;在云计算领域扮演着重要的角色。以下是对Java在云计算领域应用的详细讲解&#xff1a; 云服务提供商的Java SDK: Amazon Web Services (AWS): 提供了AWS SDK for Java&#xff0c;允许开发者在Java应用程序中轻松集成AWS服务&…

数据挖掘概览

数据挖掘(Data Mining)就是从大量的,不完全的,有噪声的,模糊的,随机的实际应用数据中,提取隐含在其中的,人们事先不知道的,但又是潜在有用的信息和知识的过程. 预测性数据挖掘 分类 定义&#xff1a;分类就是把一些新的数据项映射到给定类别中的某一个类别 分类流程&#x…

深入理解Java集合框架:使用与实现

深入理解Java集合框架:使用与实现 引言 集合框架是Java语言的重要组成部分,提供了用于存储和操作数据的各种集合类和接口。无论是数组、列表、集合还是映射,Java集合框架都为开发者提供了丰富的工具和灵活的解决方案。在本篇文章中,我们将深入探讨Java集合框架的基本概念…

Python | Leetcode Python题解之第189题轮转数组

题目&#xff1a; 题解&#xff1a; def reverse(nums: List[int], left, right) -> None:i, j left, rightwhile i < j:nums[i], nums[j] nums[j], nums[i]i1j-1 class Solution:def rotate(self, nums: List[int], k: int) -> None:n len(nums)k % nreverse(num…

Midway + TypeORM项目部署到BT后启动失败,MySQL报错

Midway TypeORM项目部署到BT后启动失败&#xff0c;MySQL报错 前沿 您需要先了解这篇文章&#xff1a;https://blog.csdn.net/weixin_45687201/article/details/139336111 错误日志 服务状态开启后就失败项目日志&#xff0c;输出 \> my-midway-project1.0.0 start \&…

python练习题—传染问题(治愈)

传染(infect)某种传染病第一天只有一个患者&#xff0c;前五天为潜伏期&#xff0c;不发作也不会传染人第6天开始发作&#xff0c;从发作到治愈需要5天时间&#xff0c;期间每天传3个人 求第N天共有多少患者 思路&#xff1a; 开始时认为可以进行判断五天或者五天十天后进行计算…

[modern c++][11] 非类型模板参数

前言&#xff1a; 我们再使用 std::get 的时候发现其模板并不是一个类型&#xff0c;而是一个整数值&#xff0c;用来标识从某个位置获取值&#xff0c;比如 std::pair 类型的数据 tmppair&#xff0c;那么就可以通过 std::get<0>(tmppair)来获取key的值&#xff0c;通过…

【Python新手入门指南】Linux-conda环境安装与使用参考

文章目录 前言一、conda是什么&#xff1f;二、安装步骤三、使用Conda来管理Python环境1. 创建环境2. 激活环境3. 安装软件包4. 查看环境5. 删除环境&#xff1a;如果您不再需要某个环境&#xff0c;可以使用以下命令将其删除&#xff1a; 前言 如果你是一位经验丰富的Python开…

oracle merge的使用

Oracle中的MERGE语句是一个非常强大的工具&#xff0c;它允许用户在一个SQL语句中同时执行INSERT和UPDATE操作。以下是关于Oracle MERGE语句的详细使用说明&#xff1a; 1. 基本语法 MERGE INTO target_table USING source_table ON (merge_condition) WHEN MATCHED THEN …

【SQL Server数据库】熟悉DBMS的基本操作及数据库的创建

目录 一、SQL SERVER基本操作 二、用Management Studio创建数据库 1、使用Management Studio创建数据库bookdb&#xff0c;各项参数采用默认设置。 2、使用Management Studio创建数据库EDUC 3. 在EDUC中创建三个表&#xff0c;根据下面要求创建Student&#xff0c;Course&am…

昇思25天学习打卡营第01天|基本介绍快速入门

一、什么是昇思MindSpore&#xff1f; 昇思MindSpore是一个全场景深度学习框架&#xff0c;详见基本介绍 那什么是深度学习呢&#xff1f; 深度学习是一种特殊的机器学习&#xff0c;主要是利用了多层神经网络模拟人脑&#xff0c;自动提取特征并进行预测。 什么是机器学习…