SpringBoot(Java)实现MQTT连接(本地Mosquitto)通讯调试

1.工作及使用背景

        工作中需要跟收集各种硬件或传感器数据用于Web展示及统计计算分析,如电表、流量计、泵、控制器等物联网设备。

        目前的思路及解决策略是使用力控或者杰控等组态软件实现数据的转储(也会涉及收费问题),通过组态软件自带的转储工具将数据转储到关系型数据库,如MySQL、sqlLite、Postgresql等。然后在BS架构后台程序中通过定时刷数据或者查询时计算的方式进行统计分析计算。

        但上述解决方案实际上是实现简单,但是数据统计时机有潜在的偏差风险,且逻辑设计非常别扭,数据库压力大等问题,理论上应该通过消息队列来接收实时数据参与计算的方式,Web系统只负责展示计算统计之后的结果,这样无论是时效还是数据准确性更容易保证,实时数据存储的数据库压力也不存在(可做数据校验用,也可不用),逻辑也不显别扭。

2.开发环境及工具

JDK1.8、maven、Mosquitto、IDEA、postman

3.框架结构及文件声明

因为我用的现成的框架,所以启动模块和业务模块分开了。实际开发调试中完全可以放一起也没关系。

MqttClientConnectorPool对外提供一个初始化的Mqtt客户端,在服务启动时初始化
MqttMsgSender对外提供一个可以执行消息发送的方法
MqttMsgSubscriber初始化一个Mqtt客户端,并根据配置订阅topic
TestController接收web请求的调用消息发送,用于测试
BusinessApplicationStartup服务启动时执行,调用MqttClientConnectorPool初始化一个客户端并调起MqttMsgSubscriber的监听等待
BusinessApplicationShutdown服务正常终止时调用,关闭服务启动默认创建的Mqtt客户端
MqttBrokerServerSpringBoot服务启动类

4.具体实现逻辑及代码

4.1 maven依赖

<properties><MQTTv3.version>1.2.5</MQTTv3.version>
</properties><dependencyManagement><dependencies><dependency><groupId>org.eclipse.paho</groupId><artifactId>org.eclipse.paho.client.MQTTv3</artifactId><version>${MQTTv3.version}</version></dependency></dependencies>
</dependencyManagement>或者直接使用
<dependency><groupId>org.eclipse.paho</groupId><artifactId>org.eclipse.paho.client.MQTTv3</artifactId><version>1.2.5</version>
</dependency>

4.2 MqttClientConnectorPool

import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;@Slf4j
public class MqttClientConnectorPool {public static MqttClient mqttClient;/*** 连接MQTT客户端* @return 获取MQTT连队对象*/public static MqttClient connectMQTT() {if (mqttClient != null){log.info("已存在,我深深的脑海!");return mqttClient;}try {// broker及连接信息String broker = "tcp://127.0.0.1:1883";String username = "admin";String password = "123456";String clientId = System.currentTimeMillis() + "";//创建MQTT客户端(指定broker、客户端id、消息持久策略)mqttClient = new MqttClient(broker, clientId, new MemoryPersistence());//创建连接参数配置MqttConnectOptions options = new MqttConnectOptions();options.setUserName(username);options.setPassword(password.toCharArray());//是否清除会话options.setCleanSession(true);//连接超时时间options.setKeepAliveInterval(20);//是否自动重连options.setAutomaticReconnect(true);mqttClient.connect(options);log.info("MqttClient 服务启动broker初始化!");} catch (MqttException e){log.error("MqttClient connect Error:{}", e.getMessage());e.printStackTrace();}return mqttClient;}/*** 关闭MQTT客户端* @param client client*/public static void closeClient(MqttClient client){try {// 断开连接client.disconnect();// 关闭客户端client.close();} catch (MqttException e){log.error("MqttClient disconnect or close Error:{}", e.getMessage());e.printStackTrace();}}/*** 关闭MQTT客户端*/public static void closeStaticClient(){try {if (mqttClient != null){// 断开连接mqttClient.disconnect();// 关闭客户端mqttClient.close();}} catch (MqttException e){log.error("MqttClient disconnect or close Error:{}", e.getMessage());e.printStackTrace();}}
}

4.3 MqttMsgSender

import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;@Slf4j
public class MqttMsgSender {public void sendMessage(MqttClient client,String topic,String content,int qos){MqttMessage message = new MqttMessage(content.getBytes());message.setQos(qos);try{client.publish(topic,message);} catch (MqttException e){log.error("MqttClient publish text info Error:{}!", e.getMessage());e.printStackTrace();}}
}

4.4 MqttMsgSubscriber

import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;@Slf4j
public class MqttMsgSubscriber {String broker = "tcp://127.0.0.1:1883";String topic = "/deviceUp";String username = "admin";String password = "123456";String clientId = System.currentTimeMillis() + "";int qos = 1;public void readSubscribeTopicMessage(){try {MqttClient client = new MqttClient(broker, clientId, new MemoryPersistence());// 连接参数MqttConnectOptions options = new MqttConnectOptions();options.setUserName(username);options.setPassword(password.toCharArray());//是否清除会话options.setCleanSession(true);options.setConnectionTimeout(60);options.setKeepAliveInterval(60);client.setCallback(new MqttCallback() {@Overridepublic void connectionLost(Throwable throwable) {log.error("连接丢失");}@Overridepublic void messageArrived(String s, MqttMessage mqttMessage) throws Exception {log.info("topic为: " + topic);log.info("qos为: " + mqttMessage.getQos());log.info("消息内容为: " + new String(mqttMessage.getPayload()));}@Overridepublic void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {// 当消息被完全传送出去后调用log.info("交付完成 ---Delivery complete!");// 可以在这里处理一些发送完成后的清理工作}});client.connect(options);client.subscribe(topic, qos);} catch (MqttException e){log.error("MqttMsgSubscriber 连接启动异常:{}", e.getMessage());} catch (Exception e){log.error("MqttMsgSubscriber 读取消息异常:{}", e.getMessage());}}}

4.5 TestController

import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.springframework.web.bind.annotation.*;import java.util.List;@Slf4j
@RestController
@RequestMapping()
public class TestController {@GetMapping("/test/mqtt/{msg}")public String testSendMqttMsg(@PathVariable("msg") String msg){log.info("消息内容:{}.", msg);MqttClient mqttClient = MqttClientConnectorPool.connectMQTT();MqttMsgSender sender = new MqttMsgSender();String content = "{" + " \"deviceNo\": \"" + msg + "\"," + " \"val\": 232.5" + "}";String topic = "/deviceUp";int qos = 1;if (null != mqttClient){sender.sendMessage(mqttClient, topic, content, qos);} else {log.info("MqttClient为空,无法发送!");return "失败!";}return "成功!";}}

4.6 BusinessApplicationStartup

import 包路径(可以删掉这一行手动导入).MqttClientConnectorPool;
import 包路径(可以删掉这一行手动导入).MqttMsgSubscriber;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;@Slf4j
@Order(10)
@Component
public class BusinessApplicationStartup implements ApplicationRunner {@Overridepublic void run(ApplicationArguments args) throws Exception {log.info("MqttClientConnectorPool ===================== Startup");MqttClientConnectorPool.connectMQTT();log.info("MqttClientConnectorPool ===================== recoveryAllJob Over !");log.info("MqttMsgSubscriber ===================== Startup");// 先订阅等待MqttMsgSubscriber subscriber = new MqttMsgSubscriber();subscriber.readSubscribeTopicMessage();}
}

4.7 BusinessApplicationShutdown

import 包路径(可以删掉这一行手动导入).MqttClientConnectorPool;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextClosedEvent;
import org.springframework.stereotype.Component;@Slf4j
@Component
public class BusinessApplicationShutdown implements ApplicationListener<ContextClosedEvent> {@Overridepublic void onApplicationEvent(ContextClosedEvent contextClosedEvent) {log.info("服务终止! shutdown hook, ContextClosedEvent");MqttClientConnectorPool.closeStaticClient();}}

4.8 MqttBrokerServer

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;@EnableScheduling
@SpringBootApplication
public class MqttBrokerServer {public static void main(String[] args) {SpringApplication.run(MqttBrokerServer.class, args);}}

5.其他备注

5.1 需要Mqtt(Broker)服务器

        如果是直接使用示例代码的Mqtt服务器(Broker)配置,需要在自己电脑上安装Mqtt服务器,如mosquitto、EMQX等,具体自行搜索,或者使用公用的Mqtt服务器(我没测试试过

// 📢注意,当前Broker本人未测试
String broker = "tcp://broker.emqx.io:1883";
String topic = "mqtt/test";
String username = "emqx";
String password = "public";

5.2 调试地址

如果配置文件没配置[server.servlet.context-path],就不需要我自己/backend这一段

6.参考文章

MQTT协议介绍及Java教程

https://baijiahao.baidu.com/s?id=1801542244354727565&wfr=spider&for=pc

7.喜欢作者

暂无

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/880701.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

鸿蒙开发(NEXT/API 12)【应用间消息通信】手机侧应用开发

在手机侧与穿戴设备侧构建应用到应用的通信隧道&#xff0c;用于收发应用自定义的报文消息以及文件。实现手机应用和穿戴设备应用间的交互&#xff0c;为用户提供分布式场景和体验。比如手机应用发送音频文件到穿戴设备侧应用&#xff0c;实现在穿戴设备侧应用上播放音乐&#…

BUG——IMX6ULL编译正点原子Linux内核报错

最初编译的是正点原子改过的Linux内核&#xff0c;可能是版本问题&#xff0c;一直报错&#xff0c;无法成功编译。然后换成NXP官方Linux内核6.6版本&#xff0c;初始编译虽然也报各种错&#xff0c;但都是缺少库或相关工具&#xff0c;全部安装后就可以成功编译出镜像了&#…

Leetcode 740. 删除并获得点数

原题链接&#xff1a;. - 力扣&#xff08;LeetCode&#xff09; 给你一个整数数组 nums &#xff0c;你可以对它进行一些操作。 每次操作中&#xff0c;选择任意一个 nums[i] &#xff0c;删除它并获得 nums[i] 的点数。之后&#xff0c;你必须删除 所有 等于 nums[i] - 1 和…

cscode搭建vue项目

创建前安装环境 ctrlj弹出终端 window需要管理员运行并且授权 node -v # 显示版本号&#xff0c;说明 node 已经装好 npm -v # 显示版本号&#xff0c;说明 npm 可以使用 # 安装cnpm npm install -g cnpm --registryhttps://registry.npm.taobao.org cnpm -v # 显示版本号&a…

【hot100-java】【合并两个有序链表】

记忆中&#xff0c;两个指针合并即可。 建立哨兵节点dum /*** Definition for singly-linked list.* public class ListNode {* int val;* ListNode next;* ListNode() {}* ListNode(int val) { this.val val; }* ListNode(int val, ListNode next) { t…

Docker更换阿里容器镜像源

以Mac为例&#xff0c; 一、获取阿里容器镜像加速器地址 访问阿里云官网https://cn.aliyun.com/ 登录阿里云&#xff0c;没有账号就注册一个 登录完成后在搜索框搜索&#xff0c;容器镜像服务&#xff0c;并打开 点击管理控制台&#xff0c;进入管理控制台 左侧点击镜像加速…

【Python入门】20个Python自动化脚本,解放双手、事半功倍

如果你正在学习Python&#xff0c;那么你需要的话可以&#xff0c;点击这里&#x1f449;Python重磅福利&#xff1a;入门&进阶全套学习资料、电子书、软件包、项目源码等等免费分享&#xff01; 在当今的快节奏工作环境中&#xff0c;自动化不再是一种奢侈&#xff0c;而是…

下一代性能怪兽RTX 5090最新规格更新与Blackwell架构解析

据悉&#xff0c;目前各家AIC厂商已经陆续收到NVIDIA的相关资料&#xff0c;RTX 5090、RTX 5080已经正式进入开案阶段&#xff0c;也就是厂商们开始设计各自的产品方案了。不出意外&#xff0c;年初的CES 2025上会看到RTX 5090/5080的发布。 作为NVIDIA的新一代GPU&#xff0c…

【车联网安全】车端知识调研

一、CAN总线&#xff1a; 1、定义&#xff1a; CAN 总线相当于汽车的神经网络&#xff0c;连接车内各控制系统,其通信采用广播机制&#xff0c;各连接部件均可收发控制消息&#xff0c;通信效率高&#xff0c;可确保通信实时性。当前市场上的汽车至少拥有一个CAN网络&#xff0…

如何进行“服务器内部错误”的诊断 | OceanBase诊断案例

本文作者&#xff1a;任仲禹&#xff0c;爱可生数据库高级工程师&#xff0c;擅长故障分析和性能优化。 的OMS迁移工具具备丰富的功能。但在实际运维场景中&#xff0c;我们可能会遇到各种问题&#xff0c;其中“服务器内部错误”便是一个较为棘手的问题&#xff0c;因为界面上…

【易上手快捷开发新框架技术】nicegui标签组件lable用法庖丁解牛深度解读和示例源代码IDE运行和调试通过截图为证

传奇开心果微博文系列 序言一、标签组件lable最基本用法示例1.在网页上显示出 Hello World 的标签示例2. 使用 style 参数改变标签样式示例 二、标签组件lable更多用法示例1. 添加按钮动态修改标签文字2. 点击按钮动态改变标签内容、颜色、大小和粗细示例代码3. 添加开关组件动…

美图AI短片创作工具MOKI全面开放 支持生成配乐、细节修改

人工智能 - Ai工具集 - 集合全球ai人工智能软件的工具箱网站 美图公司近日宣布&#xff0c;其研发的AI短片创作工具MOKI已正式向所有用户开放。这款专注于AI短片创作的工具&#xff0c;提供了包括动画短片、网文短剧等多种类型视频内容的生成能力&#xff0c;致力于为用户带来…

linux-CMake

linux-CMake 1.安装CMake工具2.单个源文件3.多个源文件4.生成库文件5.将源文件组织到不同的目录下6.可执行文件和库文件放置到单独的目录下7.常见的命令 CMake使用。 1.安装CMake工具 sudo apt-get install cmake2.单个源文件 1.先在文件夹里创建两个文件&#xff1a;main.c&…

Vscode超好看的渐变主题插件

样式效果&#xff1a; 插件使用方法&#xff1a; 然后重启&#xff0c;之后会显示vccode损坏&#xff0c;不用理会&#xff0c;因为这个插件是更改了应用内部代码&#xff0c;直接不再显示即可。

cesium实战代码

代码中有一点bug还没改 cesium地球 地形+地形 <html lang="en"><head><style>.cesium-animation-rectButton .cesium-animation-buttonGlow {filter: url(#animation_blurred); }.cesium-animation-rectButton .cesium-animation-buttonMain {fil…

Win32打开UWP应用

最近无意间发现Windows里一个神奇的文件夹。 shell:appsfolder 运行打开 这个文件夹后&#xff0c;你可以看到本机安装的所有应用程序。 我觉得这个挺方便的&#xff0c;所以做了一个简单的appFolderDialog包给C#用 项目地址&#xff1a;https://github.com/TianXiaTech/App…

基于单片机的多路温度检测系统

**单片机设计介绍&#xff0c;基于单片机CAN总线的多路温度检测系统设计 文章目录 前言概要功能设计设计思路 软件设计效果图 程序设计程序 前言 &#x1f497;博主介绍&#xff1a;✌全网粉丝10W,CSDN特邀作者、博客专家、CSDN新星计划导师&#xff0c;一名热衷于单片机技术探…

总结拓展十一:S4 HANA和ECC区别

第一节 S/4 HANA系统简介 SAP系统的产品线 R/1版本——主要财务模块R/3版本——基本实现全模块ECC6.0——2005年推出&#xff08;ECC是2004年推出&#xff09;HANA——数据库产品——属于内存数据库BW on HANA——HANA与数据分析相结合 拓展&#xff1a; 数据库类型&#x…

【如何在Linux系统本地快速部署Leanote蚂蚁笔记】

文章目录 前言1. 安装Docker2. Docker本地部署Leanote蚂蚁笔记3. 安装cpolar内网穿透4. 固定Leanote蚂蚁笔记公网地址 前言 本篇文章主要介绍如何在Linux系统本地快速部署Leanote蚂蚁笔记&#xff0c;并且结合cpolar内网穿透实现公网远程访问本地笔记编辑并制作个人博客等。 …

外卖点餐小程序源码系统 单店多门店自助切换 带完整的安装代码包以及搭建部署教程

系统概述 本外卖点餐小程序源码系统旨在帮助餐饮企业和商家快速搭建一个功能完善的在线外卖平台。系统支持单店与多门店的灵活切换&#xff0c;方便商家根据自身业务需求进行管理和运营。同时&#xff0c;系统还提供了丰富的营销工具和数据分析功能&#xff0c;助力商家实现精…