spring boot集成mqtt协议发送和订阅数据

maven的pom.xml引入包
        <!--mqtt--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-integration</artifactId><version>2.3.6.RELEASE</version></dependency><dependency><groupId>org.springframework.integration</groupId><artifactId>spring-integration-stream</artifactId><version>5.3.4.RELEASE</version></dependency><dependency><groupId>org.springframework.integration</groupId><artifactId>spring-integration-mqtt</artifactId><version>5.3.4.RELEASE</version></dependency>
mqtt.yml配置文件
spring:mqtt:username: adminpassword: beyond_2021url: tcp://192.168.3.100:1883client-id: data-clientIdserver-id: data-serverIddata-topic: data/#will-topic: data-willwill-content: data server offlinecompletion-timeout: 10000
初始化MQTT配置bean
package com.beyond.config;import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.event.EventListener;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.event.MqttConnectionFailedEvent;
import org.springframework.integration.mqtt.event.MqttMessageDeliveredEvent;
import org.springframework.integration.mqtt.event.MqttMessageSentEvent;
import org.springframework.integration.mqtt.event.MqttSubscribedEvent;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;import java.security.SecureRandom;
import java.util.Date;@Configuration
@IntegrationComponentScan
public class MqttConfig {private static final Logger log = LoggerFactory.getLogger(MqttConfig.class);@Value("${spring.mqtt.username}")private String username;@Value("${spring.mqtt.password}")private String password;@Value("${spring.mqtt.url}")private String hostUrl;@Value("${spring.mqtt.client-id}")private String clientId;@Value("${spring.mqtt.server-id}")private String serverId;@Value("${spring.mqtt.data-topic:data/#}")private String dataTopic;@Value("${spring.mqtt.will-topic}")private String willTopic;@Value("${spring.mqtt.will-content}")private String willContent;/*** @desc 连接超时*/@Value("${spring.mqtt.completion-timeout}")private int completionTimeout ;@Beanpublic MqttConnectOptions getMqttConnectOptions(){// MQTT的连接设置MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();// 设置连接的用户名mqttConnectOptions.setUserName(username);// 设置连接的密码mqttConnectOptions.setPassword(password.toCharArray());// 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,// 把配置里的 cleanSession 设为false,客户端掉线后 服务器端不会清除session,// 当重连后可以接收之前订阅主题的消息。当客户端上线后会接受到它离线的这段时间的消息mqttConnectOptions.setCleanSession(true);// 设置发布端地址,多个用逗号分隔, 如:tcp://111:1883,tcp://222:1883// 当第一个111连接上后,222不会在连,如果111挂掉后,重试连111几次失败后,会自动去连接222mqttConnectOptions.setServerURIs(hostUrl.split(","));// 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制mqttConnectOptions.setKeepAliveInterval(20);mqttConnectOptions.setAutomaticReconnect(true);// 设置“遗嘱”消息的话题,若客户端与服务器之间的连接意外中断,服务器将发布客户端的“遗嘱”消息。mqttConnectOptions.setWill(willTopic, willContent.getBytes(), 2, false);mqttConnectOptions.setMaxInflight(1000000);return mqttConnectOptions;}@Beanpublic MqttPahoClientFactory mqttClientFactory() {DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();factory.setConnectionOptions(getMqttConnectOptions());return factory;}/*** @desc 发送通道配置 默认主题* @date 2021/3/16*/@Bean@ServiceActivator(inputChannel = "mqttOutboundChannel")public MessageHandler mqttOutbound() {//clientId每个连接必须唯一,否则,两个相同的clientId相互挤掉线String clientIdStr = clientId + new SecureRandom().nextInt(10);MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(clientIdStr, mqttClientFactory());//async如果为true,则调用方不会阻塞。而是在发送消息时等待传递确认。默认值为false(发送将阻塞,直到确认发送)messageHandler.setAsync(true);messageHandler.setAsyncEvents(true);messageHandler.setDefaultTopic(dataTopic);messageHandler.setDefaultQos(1);return messageHandler;}/*** @desc 发送通道* @date 2021/3/16*/@Beanpublic MessageChannel mqttOutboundChannel() {return new DirectChannel();}/*** @desc 接收通道* @date 2021/3/16*/@Beanpublic MessageChannel mqttInputChannel() {return new DirectChannel();}/*** @desc 配置监听的 topic 支持通配符* @date 2021/3/16*/@Beanpublic MessageProducer inbound() {//clientId每个连接必须唯一,否则,两个相同的clientId相互挤掉线String serverIdStr = serverId + new SecureRandom().nextInt(10);MqttPahoMessageDrivenChannelAdapter adapter =new MqttPahoMessageDrivenChannelAdapter(serverIdStr, mqttClientFactory(), dataTopic);adapter.setCompletionTimeout(completionTimeout);adapter.setConverter(new DefaultPahoMessageConverter());adapter.setQos(1);adapter.setOutputChannel(mqttInputChannel());return adapter;}/*** @desc 通过通道获取数据 订阅的数据* @date 2021/3/16*/@Bean@ServiceActivator(inputChannel = "mqttInputChannel")public MessageHandler handler() {return new MessageHandler() {@Overridepublic void handleMessage(Message<?> message) throws MessagingException {String payload = message.getPayload().toString();String topic = message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC).toString();//处理订阅topic:(data/#)到的所有的数据}};}/*** @desc mqtt连接失败或者订阅失败时,触发MqttConnectionFailedEvent事件* @date 2021/7/22*@param event* @return void*/@EventListener(MqttConnectionFailedEvent.class)public void mqttConnectionFailedEvent(MqttConnectionFailedEvent event) {log.error("mqttConnectionFailedEvent连接mqtt失败: " +"date={}, hostUrl={}, username={}, error={}",new Date(), hostUrl, username, event.getCause().getMessage());}/*** @desc 当async和async事件(async-events)都为true时,将发出MqttMessageSentEvent* 它包含消息、主题、客户端库生成的消息id、clientId和clientInstance(每次连接客户端时递增)* @date 2021/7/22*@param event* @return void*/@EventListener(MqttMessageSentEvent.class)public void mqttMessageSentEvent(MqttMessageSentEvent event) {log.info("mqttMessageSentEvent发送信息: date={}, info={}", new Date(), event.toString());}/*** @desc 当async和async事件(async-events)都为true时,将发出MqttMessageDeliveredEvent* 当客户端库确认传递时,将发出MqttMessageDeliveredEvent。它包含messageId、clientId和clientInstance,使传递与发送相关。* @date 2021/7/22*@param event* @return void*/@EventListener(MqttMessageDeliveredEvent.class)public void mqttMessageDeliveredEvent(MqttMessageDeliveredEvent event) {log.info("mqttMessageDeliveredEvent发送成功信息: date={}, info={}", new Date(), event.toString());}/*** @desc 成功订阅到主题,MqttSubscribedEvent事件就会被触发(多个主题,多次触发)* @date 2021/7/22*@param event* @return void*/@EventListener(MqttSubscribedEvent.class)public void mqttSubscribedEvent(MqttSubscribedEvent event) {log.info("mqttSubscribedEvent订阅成功信息: date={}, info={}", new Date(), event.toString());}}
mqtt发送数据网关配置
package com.beyond.data.component;import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;/*** @desc MQTT发送网关* @date 2021/3/12*/
@Component
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MqttGatewayComponent {void sendToMqtt(String data);void sendToMqtt(String payload, @Header(MqttHeaders.TOPIC) String topic);void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);}
发送数据到mqtt伪代码
@Autowired
private MqttGatewayComponent mqttGatewayComponent;//发送字符串或json字符串,到指定的topic
mqttGatewayComponent.sendToMqtt("json string", "data/abcd");

参考链接:
https://blog.csdn.net/sinat_21184471/article/details/87186186
https://blog.csdn.net/qq_29467891/article/details/107043225?utm_source=app
https://blog.csdn.net/myinsert/article/details/107715538

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/50369.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Python使用pymysql和sqlalchemy访问MySQL的区别

Python使用pymysql和sqlalchemy访问MySQL的区别 1. 两个数据库连接工具的对比 pymysql和sqlalchemy是两个Python中经常用于与MySQL数据库交互的库。都可以连接MySQL数据库&#xff0c;但它们有明显的区别。 &#xff08;1&#xff09;特点 pymysql是一个Python模块&#xf…

C++,类的特殊函数练习

设计一个Per类&#xff0c;类中包含私有成员:姓名、年龄、指针成员身高、体重&#xff0c;再设计一个Stu类&#xff0c;类中包含私有成员:成绩、Per类对象p1&#xff0c;设计这两个类的构造函数、析构函数和拷贝构造函数。 #include <iostream> using namespace std;cla…

无类别域间路由(Classless Inter-Domain Routing, CIDR):理解IP网络和子网划分(传统的IP地址类ABCDE:分类网络)

文章目录 无类别域间路由&#xff08;CIDR&#xff09;&#xff1a;理解IP网络和子网划分引言传统的IP地址类关于“IP地址的浪费” IP地址与CIDRIP地址概述网络号与主机号CIDR记法&#xff08;网络 网络地址/子网掩码&#xff09;网络和广播地址 CIDR的优势减少路由表项缓解IP…

leetcode做题笔记91. 解码方法

一条包含字母 A-Z 的消息通过以下映射进行了 编码 &#xff1a; A -> "1" B -> "2" ... Z -> "26" 要 解码 已编码的消息&#xff0c;所有数字必须基于上述映射的方法&#xff0c;反向映射回字母&#xff08;可能有多种方法&#xff…

PDF校对:追求文档的精准与完美

随着数字化时代的到来&#xff0c;PDF已经成为了多数机构和个人首选的文件格式&#xff0c;原因在于它的稳定性、跨平台特性以及统一的显示效果。但是&#xff0c;对于任何需要公开或正式发布的文档&#xff0c;确保其内容的准确性是至关重要的&#xff0c;这就是PDF校对显得尤…

item_search_shop-获得淘宝/天猫店铺的所有商品

一、接口参数说明&#xff1a; item_search_shop-获得店铺的所有商品&#xff0c;点击更多API调试&#xff0c;请移步注册API账号点击获取测试key和secret 公共参数 请求地址: https://api-gw.onebound.cn/taobao/item_search_shop 名称类型必须描述keyString是调用key&…

图像分割unet系列------TransUnet详解

图像分割unet系列------TransUnet详解 1、TransUnet结构2、我关心的问题3、总结与展望 TransUnet发表于2021年&#xff0c;它是对UNet非常重要的改进&#xff0c;专为医学图像分割任务设计&#xff0c;特别用于在医学图像中分割器官或病变等解剖结构。 1、TransUnet结构 TransU…

学习网络编程No.2【深入理解TCP/IP】

引言&#xff1a; 北京时间&#xff1a;2023/8/9/13:04&#xff0c;昨天在摆烂中把网络基础相关知识的博客更新&#xff0c;依然还是上不了C站热榜&#xff0c;我估计是因为我账号热度不够没有上榜资格&#xff0c;也可能是因为前段时间没有积极更新&#xff0c;导致周榜被甩出…

【力扣每日一题】2023.8.18 3n块披萨

目录 题目&#xff1a; 示例&#xff1a; 分析&#xff1a; 代码&#xff1a; 题目&#xff1a; 示例&#xff1a; 分析&#xff1a; 题目给我们一个披萨&#xff0c;分成了3n块&#xff0c;每次我们可以选择一块&#xff0c;而我们的两个小伙伴会拿走我们选的披萨的相邻的…

js识别图片中的文字插件 tesseract.js

使用方法及步骤 1.安装依赖 npm i tesseract.js 2.引入插件 import { createWorker } from tesseract.js;//worker多线程引入这个import Tesseract from tesseract.js;//js单线程引入这个 3.使用插件识别图片 //使用worker线程识别(async () > {console.time()const wo…

Redis大key问题的排查与解决

什么是 Redis 大 key&#xff1f; 大 key 并不是指 key 的值很大&#xff0c;而是 key 对应的 value 很大。 一般而言&#xff0c;下面这两种情况被称为大 key&#xff1a; String 类型的值大于 10 KB&#xff1b; Hash、List、Set、ZSet 类型的元素的个数超过 5000个&#…

.bat批处理打开多个程序

作为程序员每天开机都需要打开idea、数据库、xshell等开发软件&#xff0c;操作相对繁琐&#xff0c;于是想起了批处理来帮忙一键启动。 在桌面新建一个txt文件&#xff0c;改后缀名为.bat&#xff0c;并加上下面的代码。 代码一&#xff08;推荐&#xff09; cd /d C:\Users…

uni-app的nvue文件国际化不翻译问题解决办法

官网上的nvue国际化方式介绍的实在是太简单了&#xff0c;记得要引入下message文件&#xff0c;还要用uni.setLocale()设置下&#xff0c;但是不管我怎么引入都会报错。 所以我直接把文件拿过来了&#xff0c;通过对象的方式去取。 <!-- index.nvue 文件 --> <view&g…

WebSocket 中的心跳是什么,有什么作用?

在网络应用开发中&#xff0c;WebSocket 是一种重要的通信协议&#xff0c;它允许客户端和服务器之间建立持久性的双向通信连接。然而&#xff0c;为了保持连接的稳定性&#xff0c;WebSocket 中的心跳是一个不可或缺的概念。本文将详细介绍 WebSocket 中的心跳是什么&#xff…

博客系统前端页面(项目实战系列1)

目录 前言&#xff1a; 1.前端 1.1博客列表页 1.1.1博客列表页效果预览图 1.1.2实现导航栏 1.1.3实现版心个人信息博客列表 1.2博客详情页 1.2.1博客详情页效果预览图 1.2.2实现导航栏 版心个人信息 1.2.3实现博客正文 1.3登录页 1.3.1登录页效果预览图 1.3.2导航…

Redis是如何保证高可用的?

Redis这种基于内存的关系型数据库我们在选用的时候就是考虑到它的快。而且可以很方便的实现诸如分布式锁、消息队列等功能。 笔者在前一段秋招面试的时候就被提问&#xff0c;“Redis是怎么保证高可用的&#xff1f;” 后续的子问题包含&#xff0c;集群模式是怎么实现的&…

睿趣科技:抖音开网店要怎么找货源

在当今数字化的时代&#xff0c;电商平台的兴起为越来越多的人提供了开设网店的机会&#xff0c;而抖音作为一个充满活力的短视频平台&#xff0c;也为创业者提供了广阔的发展空间。然而&#xff0c;对于许多初次涉足电商领域的人来说&#xff0c;找到合适的货源却是一个重要的…

Qt应用开发(拓展篇)——示波器/图表 QCustomPlot

一、介绍 QCustomPlot是一个用于绘图和数据可视化的Qt C小部件。它没有进一步的依赖关系&#xff0c;提供友好的文档帮助。这个绘图库专注于制作好看的&#xff0c;出版质量的2D绘图&#xff0c;图形和图表&#xff0c;以及为实时可视化应用程序提供高性能。 QCustomPl…

../../ 目录遍历

在web功能设计中,很多时候我们会要将需要访问的文件定义成变量&#xff0c;从而让前端的功能便的更加灵活。 当用户发起一个前端的请求时&#xff0c;便会将请求的这个文件的值(比如文件名称)传递到后台&#xff0c;后台再执行其对应的文件。 在这个过程中&#xff0c;如果后…

前端工程化概述

软件工程定义&#xff1a;将工程方法系统化地应用到软件开发中 前端发展历史 前端工程化的发展历史可以追溯到互联网的早期阶段&#xff0c;随着前端技术的不断演进和互联网应用的复杂化&#xff0c;前端工程化也逐渐成为了前端开发的重要领域。以下是前端工程化的主要发展里程…