ruoyi-vue 整合EMQX接收MQTT协议数据

EMQX安装完成后,需要搭建客户端进行接收数据进一步对数据处理,下面介绍基于若依分离版开源框架来整合EMQX方法。

1.application.yml 添加代码

mqtt:hostUrl: tcp://localhost:1883username: devpassword: devclient-id: MQTT-CLIENT-DEVcleanSession: truereconnect: truetimeout: 100keepAlive: 100defaultTopic: client/dev/reportserverTopic: server/dev/reportisOpen: trueqos: 0

2.pom.xml 引入依赖

        <!-- mqtt --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-integration</artifactId></dependency><dependency><groupId>org.springframework.integration</groupId><artifactId>spring-integration-stream</artifactId></dependency><dependency><groupId>org.springframework.integration</groupId><artifactId>spring-integration-mqtt</artifactId></dependency><!--配置文件报错问题--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-configuration-processor</artifactId><optional>true</optional></dependency><!--lombok--><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.22</version><scope>provided</scope></dependency>

3.新建 MqttAcceptCallback

package com.ruoyi.iot.mqtt;import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;/*** @Description : MQTT接受服务的回调类* @Author : lsyong* @Date : 2023/8/1 16:29*/@Component
public class MqttAcceptCallback implements MqttCallbackExtended {private static final Logger logger = LoggerFactory.getLogger(MqttAcceptCallback.class);@Autowiredprivate MqttAcceptClient mqttAcceptClient;@Autowiredprivate MqttProperties mqttProperties;/*** 客户端断开后触发** @param throwable*/@Overridepublic void connectionLost(Throwable throwable) {logger.info("连接断开,可以重连");if (MqttAcceptClient.client == null || !MqttAcceptClient.client.isConnected()) {logger.info("【emqx重新连接】....................................................");mqttAcceptClient.reconnection();}}/*** 客户端收到消息触发** @param topic       主题* @param mqttMessage 消息*/@Overridepublic void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {logger.info("【接收消息主题】:" + topic);logger.info("【接收消息Qos】:" + mqttMessage.getQos());logger.info("【接收消息内容】:" + new String(mqttMessage.getPayload()));//        int i = 1/0;}/*** 发布消息成功** @param token token*/@Overridepublic void deliveryComplete(IMqttDeliveryToken token) {String[] topics = token.getTopics();for (String topic : topics) {logger.info("向主题【" + topic + "】发送消息成功!");}try {MqttMessage message = token.getMessage();byte[] payload = message.getPayload();String s = new String(payload, "UTF-8");logger.info("【消息内容】:" + s);} catch (Exception e) {logger.error("MqttAcceptCallback deliveryComplete error,message:{}", e.getMessage());e.printStackTrace();}}/*** 连接emq服务器后触发** @param b* @param s*/@Overridepublic void connectComplete(boolean b, String s) {logger.info("============================= 客户端【" + MqttAcceptClient.client.getClientId() + "】连接成功!=============================");// 以/#结尾表示订阅所有以test开头的主题// 订阅所有机构主题mqttAcceptClient.subscribe(mqttProperties.getDefaultTopic(), 0);}
}

4.新建 MqttAcceptClient

package com.ruoyi.iot.mqtt;import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;/*** @Description : MQTT接受服务的客户端* @Author : lsyong* @Date : 2023/8/1 16:26*/
@Component
public class MqttAcceptClient {private static final Logger logger = LoggerFactory.getLogger(MqttAcceptClient.class);@Autowiredprivate MqttAcceptCallback mqttAcceptCallback;@Autowiredprivate MqttProperties mqttProperties;public static MqttClient client;private static MqttClient getClient() {return client;}private static void setClient(MqttClient client) {MqttAcceptClient.client = client;}/*** 客户端连接*/public void connect() {MqttClient client;try {client = new MqttClient(mqttProperties.getHostUrl(), mqttProperties.getClientId(),new MemoryPersistence());MqttConnectOptions options = new MqttConnectOptions();options.setUserName(mqttProperties.getUsername());options.setPassword(mqttProperties.getPassword().toCharArray());options.setConnectionTimeout(mqttProperties.getTimeout());options.setKeepAliveInterval(mqttProperties.getKeepAlive());options.setAutomaticReconnect(mqttProperties.getReconnect());options.setCleanSession(mqttProperties.getCleanSession());MqttAcceptClient.setClient(client);// 设置回调client.setCallback(mqttAcceptCallback);client.connect(options);} catch (Exception e) {logger.error("MqttAcceptClient connect error,message:{}", e.getMessage());e.printStackTrace();}}/*** 重新连接*/public void reconnection() {try {client.connect();} catch (MqttException e) {logger.error("MqttAcceptClient reconnection error,message:{}", e.getMessage());e.printStackTrace();}}/*** 订阅某个主题** @param topic 主题* @param qos   连接方式*/public void subscribe(String topic, int qos) {logger.info("========================【开始订阅主题:" + topic + "】========================");try {client.subscribe(topic, qos);} catch (MqttException e) {logger.error("MqttAcceptClient subscribe error,message:{}", e.getMessage());e.printStackTrace();}}/*** 取消订阅某个主题** @param topic*/public void unsubscribe(String topic) {logger.info("========================【取消订阅主题:" + topic + "】========================");try {client.unsubscribe(topic);} catch (MqttException e) {logger.error("MqttAcceptClient unsubscribe error,message:{}", e.getMessage());e.printStackTrace();}}
}

5.新建 MqttCondition

package com.ruoyi.iot.mqtt;import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.context.annotation.Condition;
import org.springframework.context.annotation.ConditionContext;
import org.springframework.core.env.Environment;
import org.springframework.core.type.AnnotatedTypeMetadata;/*** @Description : 自定义配置,通过这个配置,来控制启动项目的时候是否启动mqtt* @Author : lsyong* @Date : 2023/8/1 16:32*/
public class MqttCondition implements Condition {@Overridepublic boolean matches(ConditionContext context, AnnotatedTypeMetadata annotatedTypeMetadata) {//1、能获取到ioc使用的beanfactoryConfigurableListableBeanFactory beanFactory = context.getBeanFactory();//2、获取类加载器ClassLoader classLoader = context.getClassLoader();//3、获取当前环境信息Environment environment = context.getEnvironment();String isOpen = environment.getProperty("mqtt.isOpen");return Boolean.valueOf(isOpen);}
}

6.新建 MqttConfig

package com.ruoyi.iot.mqtt;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Conditional;
import org.springframework.context.annotation.Configuration;/*** @Description : 启动服务的时候开启监听客户端* @Author : lsyong* @Date : 2023/8/1 16:35*/
@Configuration
public class MqttConfig {@Autowiredprivate MqttAcceptClient mqttAcceptClient;/*** 订阅mqtt** @return*/@Conditional(MqttCondition.class)@Beanpublic MqttAcceptClient getMqttPushClient() {mqttAcceptClient.connect();return mqttAcceptClient;}
}

7.新建 MqttProperties

package com.ruoyi.iot.mqtt;import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;/*** @Description : MQTT配置信息* @Author : lsyong* @Date : 2023/8/1 16:25*/
@Component
@ConfigurationProperties("mqtt")
@Data
public class MqttProperties {/*** 用户名*/private String username;/*** 密码*/private String password;/*** 连接地址*/private String hostUrl;/*** 客户端Id,同一台服务器下,不允许出现重复的客户端id*/private String clientId;/*** 默认连接主题,以/#结尾表示订阅所有以test开头的主题*/private String defaultTopic;/*** 默认服务器发送主题前缀,格式:server:${env}:report:${topic}*/private String serverTopic;/*** 超时时间*/private int timeout;/*** 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端* 发送个消息判断客户端是否在线,但这个方法并没有重连的机制*/private int keepAlive;/*** 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连* 接记录,这里设置为true表示每次连接到服务器都以新的身份连接*/private Boolean cleanSession;/*** 是否断线重连*/private Boolean reconnect;/*** 启动的时候是否关闭mqtt*/private Boolean isOpen;/*** 连接方式*/private Integer qos;/*** 获取默认主题,以/#结尾表示订阅所有以test开头的主题** @return*/public String getDefaultTopic() {return defaultTopic + "/#";}/*** 获取服务器发送主题,格式:server/${env}/report/${topic}** @param topic* @return*/public String getServerTopic(String topic) {return serverTopic + "/" + topic;}
}

8.新建 MqttSendCallBack

package com.ruoyi.iot.mqtt;import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;/*** @Description : MQTT发送客户端的回调类* @Author : lsyong* @Date : 2023/8/1 16:31*/
@Component
public class MqttSendCallBack implements MqttCallbackExtended {private static final Logger logger = LoggerFactory.getLogger(MqttSendCallBack.class);/*** 客户端断开后触发** @param throwable*/@Overridepublic void connectionLost(Throwable throwable) {logger.info("连接断开,可以重连");}/*** 客户端收到消息触发** @param topic       主题* @param mqttMessage 消息*/@Overridepublic void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {logger.info("【接收消息主题】: " + topic);logger.info("【接收消息Qos】: " + mqttMessage.getQos());logger.info("【接收消息内容】: " + new String(mqttMessage.getPayload()));}/*** 发布消息成功** @param token token*/@Overridepublic void deliveryComplete(IMqttDeliveryToken token) {String[] topics = token.getTopics();for (String topic : topics) {logger.info("向主题【" + topic + "】发送消息成功!");}try {MqttMessage message = token.getMessage();byte[] payload = message.getPayload();String s = new String(payload, "UTF-8");logger.info("【消息内容】:" + s);} catch (Exception e) {logger.error("MqttSendCallBack deliveryComplete error,message:{}", e.getMessage());e.printStackTrace();}}/*** 连接emq服务器后触发** @param b* @param s*/@Overridepublic void connectComplete(boolean b, String s) {logger.info("============================= 客户端【" + MqttAcceptClient.client.getClientId() + "】连接成功!=============================");}
}

9.新建 MqttSendClient

package com.ruoyi.iot.mqtt;import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import java.util.UUID;/*** @Description : MQTT发送客户端* @Author : lsyong* @Date : 2023/8/1 16:30*/
@Component
public class MqttSendClient {private static final Logger logger = LoggerFactory.getLogger(MqttSendClient.class);@Autowiredprivate MqttSendCallBack mqttSendCallBack;@Autowiredprivate MqttProperties mqttProperties;public MqttClient connect() {MqttClient client = null;try {String uuid = UUID.randomUUID().toString().replaceAll("-", "");client = new MqttClient(mqttProperties.getHostUrl(), uuid, new MemoryPersistence());MqttConnectOptions options = new MqttConnectOptions();options.setUserName(mqttProperties.getUsername());options.setPassword(mqttProperties.getPassword().toCharArray());options.setConnectionTimeout(mqttProperties.getTimeout());options.setKeepAliveInterval(mqttProperties.getKeepAlive());options.setCleanSession(true);options.setAutomaticReconnect(false);// 设置回调client.setCallback(mqttSendCallBack);client.connect(options);} catch (Exception e) {logger.error("MqttSendClient connect error,message:{}", e.getMessage());e.printStackTrace();}return client;}/*** 发布消息** @param retained 是否保留* @param topic    主题,格式: server:${env}:report:${topic}* @param content  消息内容*/public void publish(boolean retained, String topic, String content) {MqttMessage message = new MqttMessage();message.setQos(mqttProperties.getQos());message.setRetained(retained);message.setPayload(content.getBytes());MqttDeliveryToken token;MqttClient mqttClient = connect();try {mqttClient.publish(mqttProperties.getServerTopic(topic), message);} catch (MqttException e) {logger.error("MqttSendClient publish error,message:{}", e.getMessage());e.printStackTrace();} finally {disconnect(mqttClient);close(mqttClient);}}/*** 关闭连接** @param mqttClient*/public static void disconnect(MqttClient mqttClient) {try {if (mqttClient != null)mqttClient.disconnect();} catch (MqttException e) {logger.error("MqttSendClient disconnect error,message:{}", e.getMessage());e.printStackTrace();}}/*** 释放资源** @param mqttClient*/public static void close(MqttClient mqttClient) {try {if (mqttClient != null)mqttClient.close();} catch (MqttException e) {logger.error("MqttSendClient close error,message:{}", e.getMessage());e.printStackTrace();}}
}

10.新建测试类 MqttController

package com.ruoyi.iot.mqtt;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;/*** @Description : 测试类* @Author : lsyong* @Date : 2023/8/1 16:35*/
@RestController
@RequestMapping("/mqtt")
public class MqttController {@Autowiredprivate MqttSendClient mqttSendClient;@GetMapping(value = "/publishTopic")public String publishTopic(String topic, String sendMessage) {System.out.println("topic:" + topic);System.out.println("message:" + sendMessage);this.mqttSendClient.publish(false, topic, sendMessage);return "topic:" + topic + "\nmessage:" + sendMessage;}}

放开测试类的访问权限,在com.ruoyi.framework.config 路径下的 SecurityConfig 类中添加如下代码

 .antMatchers("/mqtt/**").permitAll()

11.启动项目进行测试

如果连接不上,确认emqx是否启动成功,详细可以查看Windows安装EMQX(搭建MQTT服务)-CSDN博客

连接成功后可以登入EMQX去查看

浏览器访问 http://localhost:8080/mqtt/publishTopic?sendMessage=你好啊

控制台打印

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/183581.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【物联网与大数据应用】Hadoop数据处理

Hadoop是目前最成熟的大数据处理技术。Hadoop利用分而治之的思想为大数据提供了一整套解决方案&#xff0c;如分布式文件系统HDFS、分布式计算框架MapReduce、NoSQL数据库HBase、数据仓库工具Hive等。 Hadoop的两个核心解决了数据存储问题&#xff08;HDFS分布式文件系统&#…

nexus 制品库管理

目录 一、nexus 介绍 二、nexus 支持的仓库 三、nexus 部署 四、nexus 数据备份 五、创建一个内网yum源 六、创建一个代理yum仓库 七、jenkins 使用 nexus插件 7.1 jenkins 安装插件 7.2 配置 maven 工程 7.3 查看构建和上传 一、nexus 介绍 Nexus 是一个强大的仓库管…

在氮化镓和AlGaN上的湿式数字蚀刻

引言 由于其独特的材料特性&#xff0c;III族氮化物半导体广泛应用于电力、高频电子和固态照明等领域。加热的四甲基氢氧化铵(TMAH)和KOH3处理的取向相关蚀刻已经被用于去除III族氮化物材料中干法蚀刻引起的损伤&#xff0c;并缩小垂直结构。 不幸的是&#xff0c;由于化学蚀…

谱方法学习笔记-下(超详细)

谱方法学习笔记&#x1f4d2; 谱方法学习笔记-上(超详细) 声明&#xff1a;鉴于CSDN使用 K a T e X KaTeX KaTeX 渲染公式&#xff0c; KaTeX \KaTeX KATE​X 与 L a T e X LaTeX LaTeX 不同&#xff0c;不支持直接的交叉引用命令&#xff0c;如\label和\eqref。 KaTeX \KaT…

Docker + Jenkins + Nginx实现前端自动化部署

目录 前言一、前期准备工作1、示例环境2、安装docker3、安装Docker Compose4、安装Git5、安装Nginx和Jenkinsnginx.confdocker-compose.yml 6、启动环境7、验证Nginx8、验证Jenkins 二、Jenkins 自动化部署配置1、设置中文2、安装Publish Over SSH、NodeJS&#xff08;1&#x…

Stream API练习题

作者简介&#xff1a;大家好&#xff0c;我是smart哥&#xff0c;前中兴通讯、美团架构师&#xff0c;现某互联网公司CTO 联系qq&#xff1a;184480602&#xff0c;加我进群&#xff0c;大家一起学习&#xff0c;一起进步&#xff0c;一起对抗互联网寒冬 考虑到Stream API在实际…

关于前端学习的思考-浮动元素和块级元素的关系

先摆关系&#xff1a;浮动元素嵌套块级元素&#xff0c;浮动元素和块级元素是上下关系。 1、浮动元素为父盒子&#xff0c;块级元素为子盒子。 父盒子为浮动元素&#xff0c;子盒子不会继承。如图floatnone&#xff1b; 摆结论&#xff1a;子盒子为行内元素&#xff0c;行内块…

37.从0到上线三天搭建个人网站(第一天)

点赞收藏加关注&#xff0c;你也能住大别墅&#xff01; 挑战三天搭建个人网站 从0到上线 一、项目的主要功能 1.作为自己在网上的一个工作室。 2.发帖 3.展示个人项目连接 4.介绍自己&#xff08;没准儿还能接点活儿&#xff09; 二、UI风格参考 三、技术选型 1.前端&a…

设计规则:模块化的力量

这是一本比较冷门的书**《设计规则&#xff1a;模块化的力量》**&#xff0c;虽然豆瓣上只有58个评价&#xff0c;但是确实能学到很多东西。 这本书对我非常深远。不是是投资&#xff0c;创业&#xff0c;还是其他领域&#xff0c;模块化思想都能帮上你。这本书告诉我们生万物…

数据结构中的二分查找(折半查找)

二分法&#xff1a;顾名思义&#xff0c;把问题一分为2的处理&#xff0c;是一种常见的搜索算法&#xff0c;用于在有序数组或这有序列表中查找指定元素的位置&#xff0c;它的思想是将待搜索的区间不断二分&#xff0c;然后比较目标值与中间元素的大小关系&#xff0c;然后确定…

第八天:信息打点-系统端口CDN负载均衡防火墙

信息打点-系统篇&端口扫描&CDN服务&负载均衡&WAF防火墙 一、知识点 1、获取网络信息-服务器厂商&#xff1a; 阿里云&#xff0c;腾讯云&#xff0c;机房内部等。 网络架构&#xff1a; 内外网环境。 2、获取服务信息-应用协议-内网资产&#xff1a; FTP…

Making Reconstruction-based Method Great Again for Video Anomaly Detection

Making Reconstruction-based Method Great Again for Video Anomaly Detection 文章信息&#xff1a; 发表于ICDM 2022&#xff08;CCF B会议&#xff09; 原文地址&#xff1a;https://arxiv.org/abs/2301.12048 代码地址&#xff1a;https://github.com/wyzjack/MRMGA4VAD…

layui提示框没有渲染bug解决

bug&#xff1a;使用layui时或许是依赖导入又或是ideal和浏览器缓存问题导致前面明明正常的页面显示&#xff0c;后面出现提示框没有css样式&#xff0c;弹出框没有背景css 效果如下 解决后 解决方法 在你的代码中引入layer.js 我这是jsp页面 <script type"text/jav…

Unity求向量A在平面L上的投影向量

如题&#xff1a;求向量A在平面L上的投影向量(图左) 即求 其实等价于求向量&#xff0c;那在中&#xff0c;,所以只需要求即可 而就是在平面L的法向量的投影坐标&#xff0c;所以代码就是 /// <summary>/// 求向量A在平面B上的投影向量/// </summary>/// <para…

Android关于杀掉进程的方案

《风波莫听穿林打叶声》—— 苏轼 〔宋代〕 三月七日&#xff0c;沙湖道中遇雨&#xff0c;雨具先去&#xff0c;同行皆狼狈&#xff0c;余独不觉。已而遂晴&#xff0c;故作此词。 莫听穿林打叶声&#xff0c;何妨吟啸且徐行。 竹杖芒鞋轻胜马&#xff0c;谁怕&#xff1f;一蓑…

记一篇Centos7安装innodb_ruby

安装innodb_ruby过程非常坎坷&#xff0c;这里记录下安装过程&#xff0c;有些坑当时没有记录下来&#xff0c;主要把完成安装过程就记录下来 yum安装ruby默认的会安装ruby2.0.0版本&#xff0c;但是在安装innodb_ruby时&#xff0c;会报错&#xff0c;提示至少需要2.4版本以上…

汽车标定技术(十)--从CPU角度观察Overlay实现原理

目录 1.问题引入 2.功能概述 2.1 P1X 标定功能 2.2 MPC57xx标定功能 2.3 TC3xx标定功能 3.问题分析 3.1 英飞凌CPU子系统猜想 3.2 ARM内核CPU子系统分析 4.小结 1.问题引入 在分析瑞萨RH850-P1x系列、NXP S32K3系列和英飞凌TC3xx系列对标定测量功能的实现时&#xf…

顶级大厂Quora如何优化数据库性能?

Quora 的流量涉及大量阅读而非写入&#xff0c;一直致力于优化读和数据量而非写。 0 数据库负载的主要部分 读取数据量写入 1 优化读取 1.1 不同类型的读需要不同优化 ① 复杂查询&#xff0c;如连接、聚合等 在查询计数已成为问题的情况下&#xff0c;它们在另一个表中构…

TikTok革新挑战者:全球小众创作者的崛起

随着数字娱乐的快速发展&#xff0c;TikTok以其独特的短视频形式和开放的创作平台&#xff0c;成为全球范围内小众创作者崛起的推动者。本文将深入剖析TikTok在这一领域的革新&#xff0c;以及全球范围内小众创作者如何通过这一平台崭露头角。 TikTok&#xff1a;小众创作者的乐…

LeetCode刷题---斐波那契数列模型

顾得泉&#xff1a;个人主页 个人专栏&#xff1a;《Linux操作系统》 《C/C》 《LeedCode刷题》 键盘敲烂&#xff0c;年薪百万&#xff01; 一、第N个泰波那契数 题目链接&#xff1a;1137. 第 N 个泰波那契数 题目描述 泰波那契序列Tn定义如下: T00,T11,T2 1,且在n&g…