MQTT工具类

项目中用到的MQTT物联网通信协议,记录一下工具类,方便翻阅
用到的依赖:

 <!--mqtt--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-integration</artifactId></dependency><dependency><groupId>org.springframework.integration</groupId><artifactId>spring-integration-stream</artifactId></dependency><dependency><groupId>org.springframework.integration</groupId><artifactId>spring-integration-mqtt</artifactId></dependency><!--分页插件--><!-- https://mvnrepository.com/artifact/com.github.pagehelper/pagehelper-spring-boot-starter --><dependency><groupId>com.github.pagehelper</groupId><artifactId>pagehelper-spring-boot-starter</artifactId><version>1.4.1</version></dependency><dependency><groupId>org.apache.httpcomponents</groupId><artifactId>httpclient</artifactId><version>4.5.2</version></dependency>

MqttPushClient.Java

package com.youming.client.equipment.configure;import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.IMqttToken;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttTopic;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Component;/*** 发布连接类* @author wfeil211@foxmail.com* @date 2023-2-11 20:11:46*/
@Slf4j
@Component
public class MqttPushClient {@Lazy@Autowiredprivate PushCallback pushCallback;private static MqttClient client;public static void setClient(MqttClient client) {MqttPushClient.client = client;}public static MqttClient getClient() {return client;}public void connect(String host, String clientID, String username, String password, int timeout, int keepalive) {MqttClient client;try {client = new MqttClient(host, clientID, new MemoryPersistence());MqttConnectOptions options = new MqttConnectOptions();options.setCleanSession(true);options.setUserName(username);options.setPassword(password.toCharArray());options.setConnectionTimeout(timeout);options.setKeepAliveInterval(keepalive);// automaticReconnect 为 true 表示断线自动重连,但仅仅只是重新连接,并不订阅主题;在 connectComplete 回调函数重新订阅options.setAutomaticReconnect(true);MqttPushClient.setClient(client);try {//设置回调类client.setCallback(pushCallback);//client.connect(options);IMqttToken iMqttToken = client.connectWithResult(options);boolean complete = iMqttToken.isComplete();log.info("MQTT连接" + (complete ? "成功" : "失败"));} catch (Exception e) {e.printStackTrace();}} catch (Exception e) {e.printStackTrace();}}/*** 发布,默认qos为0,非持久化** @param topic       主题名* @param pushMessage 消息*/public void publish(String topic, String pushMessage) {publish(0, false, topic, pushMessage);}/*** 发布** @param qos* @param retained* @param topic* @param pushMessage*/public void publish(int qos, boolean retained, String topic, String pushMessage) {MqttMessage message = new MqttMessage();message.setQos(qos);message.setRetained(retained);message.setPayload(pushMessage.getBytes());MqttTopic mTopic = MqttPushClient.getClient().getTopic(topic);log.info("发送消息至"+topic+",消息内容:"+pushMessage);if (null == mTopic) {log.error("主题不存在:{}", mTopic);}try {mTopic.publish(message);} catch (Exception e) {log.error("mqtt发送消息异常:", e);}}}

MqttSubClient.Java

package com.youming.client.equipment.configure;import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.springframework.stereotype.Component;/*** 订阅类* @author wfeil211@foxmail.com* @date 2023-2-11 20:11:46*/
@Slf4j
@Component
public class MqttSubClient {public void subScribeDataPublishTopic(String defaultTopic) {//订阅test_queue主题String mqtt_topic[] = defaultTopic.split(",");for (int i = 0; i < mqtt_topic.length; i++) {subscribe(mqtt_topic[i], 0);//订阅主题}}/*** 订阅某个主题,qos默认为0** @param topic*/public void subscribe(String topic) {subscribe(topic, 0);}/*** 订阅某个主题** @param topic 主题名* @param qos*/public void subscribe(String topic, int qos) {try {MqttClient client = MqttPushClient.getClient();if (client == null) return;client.subscribe(topic, qos);log.info("订阅主题:{}", topic);} catch (MqttException e) {e.printStackTrace();}}}

MqttConfig .java

package com.youming.client.equipment.configure;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Component;/*** 配置类* @author wfeil211@foxmail.com* @date 2023-2-11 20:11:46*/
@Component
@ConfigurationProperties("spring.mqtt")
public class MqttConfig {@Lazy@Autowiredprivate MqttPushClient mqttPushClient;@Lazy@Autowiredprivate MqttSubClient mqttSubClient;/*** 用户名*/private String username;/*** 密码*/private String password;/*** 连接地址*/private String hostUrl;/*** 客户Id*/private String clientId;/*** 默认连接话题*/private String defaultTopic;/*** 超时时间*/private int timeout;/*** 保持连接数*/private int keepalive;public String getUsername() {return username;}public void setUsername(String username) {this.username = username;}public String getPassword() {return password;}public void setPassword(String password) {this.password = password;}public String getHostUrl() {return hostUrl;}public void setHostUrl(String hostUrl) {this.hostUrl = hostUrl;}public String getClientId() {return clientId;}public void setClientId(String clientId) {this.clientId = clientId;}public String getDefaultTopic() {return defaultTopic;}public void setDefaultTopic(String defaultTopic) {this.defaultTopic = defaultTopic;}public int getTimeout() {return timeout;}public void setTimeout(int timeout) {this.timeout = timeout;}public int getKeepalive() {return keepalive;}public void setKeepalive(int keepalive) {this.keepalive = keepalive;}/*** 连接至mqtt服务器,获取mqtt连接** @return*/@Beanpublic MqttPushClient getMqttPushClient() {//连接至mqtt服务器,获取mqtt连接mqttPushClient.connect(hostUrl, clientId, username, password, timeout, keepalive);//一连接mqtt,就订阅默认需要订阅的主题(如test_queue)mqttSubClient.subScribeDataPublishTopic(defaultTopic);return mqttPushClient;}}

PushCallback.Java

package com.youming.client.equipment.configure;import com.youming.client.equipment.api.IEquipmentDataHisService;
import com.youming.client.equipment.api.IEquipmentDataService;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Component;import java.io.UnsupportedEncodingException;
import java.util.concurrent.TimeUnit;/*** 回调类* @author wfeil211@foxmail.com* @date 2023-2-11 20:11:46*/
@Slf4j
@Component
public class PushCallback implements MqttCallback {@Lazy@Autowiredprivate MqttConfig mqttConfig;@Lazy@Autowiredprivate MqttSubClient mqttSubClient;@Autowiredprivate IEquipmentDataHisService equipmentDataHisService;@Autowiredprivate IEquipmentDataService equipmentDataService;@SneakyThrows@Overridepublic void connectionLost(Throwable cause) {// 连接丢失后,一般在这里面进行重连log.info("连接断开,正在重连");MqttClient client = MqttPushClient.getClient();while (true){if(client.isConnected()){log.info("重连成功");mqttSubClient.subScribeDataPublishTopic(mqttConfig.getDefaultTopic());return;}client.reconnect();try {TimeUnit.SECONDS.sleep(5);} catch (InterruptedException e) {e.printStackTrace();}}}/*** 发送消息,消息到达后处理方法** @param token*/@Overridepublic void deliveryComplete(IMqttDeliveryToken token) {log.info("deliveryComplete---------" + token.isComplete());}/*** 订阅主题接收到消息处理方法** @param topic* @param message*/@Overridepublic void messageArrived(String topic, MqttMessage message) throws UnsupportedEncodingException {// subscribe后得到的消息会执行到这里面,这里在控制台有输出log.info("接收消息主题 : " + topic);log.info("接收消息Qos : " + message.getQos());byte[] bytesData = new byte[message.getPayload().length];for (int i = 0; i < message.getPayload().length; i++) {bytesData[i] = (byte) message.getPayload()[i];}String frame = javax.xml.bind.DatatypeConverter.printHexBinary(bytesData).toLowerCase();log.info("接收消息内容 : " + frame);if(frame.length() == 96){equipmentDataService.asyncSaveOrUpdateEquipment(frame);}if(frame.length() == 74){equipmentDataHisService.asyncSaveOrUpdateEquipmentHis(frame);}}}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/15547.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

单片机第一季:零基础12——I2C和EEPROM

目录 1&#xff0c;EEPROM 2&#xff0c;I2C 2.1&#xff0c;I2C物理层 2.2&#xff0c;I2C协议层 3&#xff0c;AT24C02介绍 4&#xff0c;代码 1&#xff0c;EEPROM 为什么需要EEPROM&#xff1f; 单片机内部的ROM只能在程序下载时进行擦除和改写&#xff0c;但是…

护眼灯全光谱和减蓝光哪个好?推荐五款好用护眼台灯

如今&#xff0c;面临视力下降的问题越来越重视&#xff0c;护眼灯越来越成为人们日常生活中不可或缺的一部分&#xff0c;特别是在工作和学习中使用电脑、手机等电子设备时间较长的人群中。对于护眼灯来说&#xff0c;全光谱和减蓝光都是其主要功能之一&#xff0c;那么哪一种…

aws中opensearch 日志通(Centralized Logging with OpenSearch)2.0(一)

aws日志通2.0 实现全面的日志管理和分析功能 一体化日志摄取 &#xff1a;把aws服务器日志和应用日志传输到opensearch域中无代码日志处理 &#xff1a;在网页控制台中就可以实现数据处理开箱即用 &#xff1a;提供可视化模版&#xff08;nginx、HTTP server &#xff09; 架构…

mysql 主从同步排查和处理 Slave_IO、Slave_SQL

目录 查看主从是否同步 详解Slave_IO、Slave_SQL 判断主从完全同步 各个 Log_File 和 Log_Pos的关系 修复命令 查看主从是否同步 show slave status; Slave_IO_Running、Slave_SQL_Running&#xff0c;这两个值是Yes表示正常&#xff0c;No是异常 使用竖排显示&#xf…

使用 CSS 自定义属性

我们常见的网站日夜间模式的变化&#xff0c;其实用到了 css 自定义属性。 CSS 自定义属性&#xff08;也称为 CSS 变量&#xff09;是一种在 CSS 中预定义和使用的变量。它们提供了一种简洁和灵活的方式来通过多个 CSS 规则共享相同的值&#xff0c;使得样式更易于维护和修改。…

小研究 - 主动式微服务细粒度弹性缩放算法研究(一)

微服务架构已成为云数据中心的基本服务架构。但目前关于微服务系统弹性缩放的研究大多是基于服务或实例级别的水平缩放&#xff0c;忽略了能够充分利用单台服务器资源的细粒度垂直缩放&#xff0c;从而导致资源浪费。为此&#xff0c;本文设计了主动式微服务细粒度弹性缩放算法…

windows端口占用

1.查看当前端口被哪个进程占用了&#xff08;进入到CMD中&#xff09; netstat -ano|findstr "8990"输出结果为&#xff1a; TCP 127.0.0.1:8990 0.0.0.0:0 LISTENING 2700 我们发现8990端口被2700进程占用了 2.基于进程号找进程名称 tasklist|findstr "2700&qu…

【LeetCode每日一题】——566.重塑矩阵

文章目录 一【题目类别】二【题目难度】三【题目编号】四【题目描述】五【题目示例】六【题目提示】七【解题思路】八【时间频度】九【代码实现】十【提交结果】 一【题目类别】 矩阵 二【题目难度】 简单 三【题目编号】 566.重塑矩阵 四【题目描述】 在 MATLAB 中&…

【leetcode】977. 有序数组的平方(easy)

给你一个按 非递减顺序 排序的整数数组 nums&#xff0c;返回 每个数字的平方 组成的新数组&#xff0c;要求也按 非递减顺序 排序。 示例 1&#xff1a; 输入&#xff1a;nums [-4,-1,0,3,10] 输出&#xff1a;[0,1,9,16,100] 解释&#xff1a;平方后&#xff0c;数组变为 […

小红书运营推广方法分享

大家好&#xff0c;我是网媒智星&#xff0c;今天跟大家讨论一下小红书的运营推广方法&#xff0c;总结了七点经验分享给大家。 首先&#xff0c;让我们了解一下什么是热门文案。热门文案可从以下三个方面来定义&#xff1a; 1. 阅读量&#xff1a;如果一篇小红书的阅读量达到上…

【RabbitMQ】golang客户端教程1——HelloWorld

一、介绍 本教程假设RabbitMQ已安装并运行在本机上的标准端口&#xff08;5672&#xff09;。如果你使用不同的主机、端口或凭据&#xff0c;则需要调整连接设置。如果你未安装RabbitMQ&#xff0c;可以浏览我上一篇文章Linux系统服务器安装RabbitMQ RabbitMQ是一个消息代理&…

3,this指针、深拷贝浅拷贝、namespace的使用

3&#xff0c;this指针、深拷贝浅拷贝、namespace的使用 3.1this指针3.2深拷贝和浅拷贝3.3namespace的使用 3.1this指针 定义&#xff1a;当前类指向自己地址的常量指针 指针被const修饰&#xff0c;指针指向的内容不能修改 this指针-》类 对象 占不占用大小&#xff1f; this…

《MySQL 实战 45 讲》课程学习笔记(四)

深入浅出索引 索引的出现其实就是为了提高数据查询的效率&#xff0c;就像书的目录一样。 索引的常见模型 哈希表 哈希表是一种以键 - 值&#xff08;key-value&#xff09;存储数据的结构&#xff0c;我们只要输入待查找的值即 key&#xff0c;就可以找到其对应的值即 Val…

docker中涉及的挂载点总结

文章目录 1.场景描述2. 容器信息在主机上位置3. 通过docker run 命令4、通过Dockerfile创建挂载点5、容器共享卷&#xff08;挂载点&#xff09;6、最佳实践&#xff1a;数据容器 1.场景描述 在介绍VOLUME指令之前&#xff0c;我们来看下如下场景需求&#xff1a; 1&#xff…

xshell连接liunx服务器身份验证不能选择password

ssh用户身份验证不能选择password 只能用public key的解决办法 问题现象 使用密码通过Workbench或SSH方式(例如PuTTY、Xshell、SecureCRT等)远程登录ECS实例时&#xff0c;遇到服务器禁用了密码登录方式错误. 可能原因 该问题是由于SSH服务对应配置文件/etc/ssh/sshd_config中…

路由器工作原理

路由器原理 路由概述 路由&#xff1a;跨越从源主机到目标主机的一个互联网络来转发数据包的过程。&#xff08;为数据包选择路径的过程&#xff09; 作用&#xff1a;路由器是连接不同网段的。 转发依据&#xff1a; 路由表&#xff1a;路径选择全看路由表&#xff0c;根…

蓝桥杯2018省赛全球变暖dfs

全球变暖 问题描述格式输入格式输出样例输入样例输出评测用例规模与约定解析参考程序 问题描述 格式输入 格式输出 输出一个整数 样例输入 样例输出 1 评测用例规模与约定 最大运行时间&#xff1a;1s最大运行内存: 256M 解析 采用dfs的方式进行搜索&#xff0c;首先输入地…

【C语言】—— __attribute__((fallthrough))

__attribute__((fallthrough)) 是一个在编译器中使用的特性&#xff0c;用于指示在 switch 语句中的 case 标签中故意省略 break 语句时的意图。它告诉编译器&#xff0c;故意省略 break 是有意为之&#xff0c;而不是出现了错误或遗漏。 当使用 switch 语句时&#xff0c;通常…

Xshell使用是出现全黑或全白问题

Xshell使用是出现全黑或全白问题&#xff0c;这是我实际遇到的问题如图。 解决方式&#xff1a; 设置字体 解决成功&#xff1a;

整理mongodb文档(一):增

个人公众号 整理mongodb文档(一):增 看前提示 本文主要用到的工具是mongodb的db管理工具-----mongo compass。 本文主要讲的是在上述工具中如何对db的增加的操作&#xff0c;对应转化为mongose里面的语句我想应该不需要我帮忙了吧。。。 选用mongose的理由也很简单&#xf…