java项目应用MQTT传输数据

一、概述

近期做的一个项目需要传输数据给第三方。根据协定,采用MQTT进行数据的发送和订阅。一般来说,不通系统进行数据对接,一般采用RESTFul接口,走http。mqtt的话,顾名思义,就是一个消息队列。相比RESTFul接口,MQTT方式也许有个好处就是,数据传输给对方后,对方可以收到一个提醒。这个提醒来自于消息队列,不用自己搞。利用这个提醒,也许可以做点啥。除此之外,我不知道还有什么更多的好处。

MQTT的要素:
1)broker,经纪人,即代理地址,如:tcp://10.0.2.18:1883
2)clientID,客户端ID,如"Client001"; 客户端标识,可以自定义,但不能跟receiver同名
3)topic,// 要发布的主题,接收端据此接收,如”monkey/huaguo-moutain“。主题一经定义,可以多次使用。
4)qos,质量服务等级 0,1,2。2最高。

依赖:

<dependency><groupId>org.eclipse.paho</groupId><artifactId>org.eclipse.paho.client.mqttv3</artifactId><version>1.2.5</version>
</dependency>

二、发送

如果单纯发送,客户端无须安装mqtt。Java中发送消息代码如下:

import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;public class MqttPublisher {String broker       = "tcp://127.0.0.1:1883"; // 替换为你的 MQTT 服务器地址String clientId     = "Client001"; // 客户端标识String topic        = "mqttdemo/mytopic001"; // 要发布的主题int qos             = 2; // 质量服务等级MqttClient client = null;public MqttPublisher(String broker, String clientId, String topic, int qos) throws MqttException {this.broker = broker;this.clientId = clientId;this.topic = topic;this.qos = qos;this.client = new MqttClient(broker, clientId, new MemoryPersistence());MqttConnectOptions connOpts = new MqttConnectOptions();connOpts.setCleanSession(true);System.out.println("发布者正在连接到broker代理 : " + broker);this.client.connect(connOpts);System.out.println("发布者连接成功!");}public void SendMessage(String content){try {//System.out.println("发布者发送的消息: " + content);MqttMessage message = new MqttMessage(content.getBytes());message.setQos(qos);this.client.publish(topic, message);System.out.println("发布者已经发送消息!");} catch (Exception e) {e.printStackTrace();}}public void DisconnectMqtt() throws MqttException {this.client.disconnect();System.out.println("Disconnected");}
}

三、订阅

如果想接收mqtt消息,本机则要安装mqtt服务。windows可安装一个名为mosquitto的软件。但是,它天然好像不对外,如果想被外部访问,比如局域网的其他机器访问,要做一些设置。具体如何设置,我还不清楚。

java中接收消息代码如下:

import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;public class MqttReceiver {String broker       = "tcp://127.0.0.1:1883"; // 替换为你的 MQTT 服务器地址String clientId     = "Client002"; // 客户端标识String topic        = "mqttdemo/mytopic001"; // 订阅的主题int qos             = 2; // 质量服务等级MqttReceiver(String broker, String clientId, String topic, int qos){this.broker = broker;this.clientId = clientId;this.topic = topic;this.qos = qos;}public void StartMqttReceiver(){try {MqttClient sampleClient = new MqttClient(this.broker, this.clientId, new MemoryPersistence());MqttConnectOptions connOpts = new MqttConnectOptions();connOpts.setCleanSession(true);System.out.println("正在连接到 broker 代理: " + this.broker);sampleClient.connect(connOpts);System.out.println("接受者连接成功!");// 设置回调sampleClient.setCallback(new MqttCallback() {public void connectionLost(Throwable cause) {System.out.println("链接丢失!");}public void messageArrived(String topic, MqttMessage message) throws Exception {System.out.println("接收者接收到了消息: " + topic + " : " + new String(message.getPayload()));}public void deliveryComplete(IMqttDeliveryToken token) {// Called when a message has been delivered}});// 订阅sampleClient.subscribe(this.topic, this.qos);System.out.println("订阅的topic是: " + this.topic);} catch (Exception e) {e.printStackTrace();}}
}

四、测试

订阅和发送,可以是同一个IP,也就是自己发给自己。但注意订阅和发送的clientID不能相同。比如以下代码,自己发给自己,特别利于测试:

import org.eclipse.paho.client.mqttv3.MqttException;public class MainClass {public static void main(String[] args) throws MqttException, InterruptedException {String recever_broker       = "tcp://127.0.0.1:1883"; // 替换为你的 MQTT 服务器地址String recever_clientId     = "ClientReceiver001"; // 客户端标识String recever_topic        = "ocean/south/message/status"; // 订阅的主题int recever_qos             = 2; // 质量服务等级MqttReceiver mqttReceiver = new MqttReceiver(recever_broker, recever_clientId, recever_topic, recever_qos);mqttReceiver.StartMqttReceiver();String publisher_broker       = "tcp://127.0.0.1:1883"; // 替换为你的 MQTT 服务器地址String publisher_clientId     = "ClientPublisher002"; // 客户端标识String publisher_topic        = "ocean/south/message/status"; // 发布的主题int publisher_qos             = 2; // 质量服务等级MqttPublisher mqttPublisher = new MqttPublisher(publisher_broker, publisher_clientId, publisher_topic,publisher_qos);int cnt = 0;while(true){cnt ++;mqttPublisher.SendMessage("{\"msg\":\"send some message to you!\",\"data\":"+Integer.toString(cnt)+"}");Thread.sleep(1000);}}
}

在这里插入图片描述

五、重连

以上例子还比较简单,需要考虑连接失败或突然断掉的时候重连。

1、发送端重连

import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;import java.nio.charset.StandardCharsets;public class MqttPublisher extends Thread {String broker; //"tcp://10.0.2.18:1883";String clientId; //"Client001"; 客户端标识,可以自定义,但不能跟receiver同名String topic; // 要发布的主题,接收端据此接收int qos; // 质量服务等级 0,1,2。2最高。MqttClient client = null;public MqttPublisher(String broker, String clientId, String topic, int qos) throws MqttException {this.broker = broker;this.clientId = clientId;this.topic = topic;this.qos = qos;this.client = new MqttClient(broker, clientId, new MemoryPersistence());_connect();}public boolean sendMessage(String content) {boolean ok = false;MqttMessage message = new MqttMessage(content.getBytes(StandardCharsets.UTF_8));message.setQos(qos);int count = 2;while (!ok && count > 0) {count--;try {this.client.publish(topic, message);ok = true;} catch (MqttException e) {System.err.println(e.getMessage());reconnect(1);} catch (Exception e) {System.err.println(e.getMessage());}}return ok;}public void disconnectMqtt() throws MqttException {this.client.disconnect();System.out.println("Disconnected");}public boolean reconnect(long retryTimes) {boolean ok = false;long count = retryTimes;while (!ok && count > 0) {count--;ok = _reconnect();}return ok;}private boolean _reconnect() {boolean ok = false;try {// 关闭现有连接if (this.client != null && client.isConnected()) {this.client.disconnect();}// 重新连接ok = _connect();} catch (MqttException e) {// 处理重新连接失败的情况System.err.println(e.getCause());}return ok;}private boolean _connect() {boolean ok = false;MqttConnectOptions connOpts = new MqttConnectOptions();connOpts.setCleanSession(true);System.out.print(String.format("正在连接到远程mqtt服务器: %s ......", broker));try {this.client.connect(connOpts);ok = true;System.out.println("连接成功!");} catch (MqttException e) {System.out.println("连接失败!");}return ok;}}

2、订阅端重连

订阅端重连当时是采用这样的思路:连接失败时延迟10秒后执行第一次重连尝试,之后每隔30秒执行一次,直到成功。

import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;public class MqttReceiver {String broker; //"tcp://10.0.2.18:1883";String clientId; //"Client001"; 客户端标识,可以自定义,但不能跟publish同名String topic; // 要发布的主题,接收端据此接收int qos; // 质量服务等级 0,1,2。2最高。private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);public MqttReceiver(String broker, String clientId, String topic, int qos) {this.broker = broker;this.clientId = clientId;this.topic = topic;this.qos = qos;}public void StartMqttReceiver() {MqttClient sampleClient = null;try {sampleClient = new MqttClient(this.broker, this.clientId, new MemoryPersistence());MqttConnectOptions connOpts = new MqttConnectOptions();connOpts.setCleanSession(true);// 设置回调MqttClient finalSampleClient = sampleClient;sampleClient.setCallback(new MqttCallback() {public void connectionLost(Throwable cause) {System.out.println("监听mqtt服务器连接丢失!");scheduleReconnect(finalSampleClient);}public void messageArrived(String topic, MqttMessage message) throws Exception {System.out.println(String.format("接收到消息(%s): %s", topic, new String(message.getPayload())));}public void deliveryComplete(IMqttDeliveryToken token) {//消息传递完成//System.out.println("a message has been delivered");}});// 订阅sampleClient.subscribe(this.topic, this.qos);sampleClient.connect(connOpts);System.out.println(String.format("正在连接到监听mqtt服务器: %s 成功", this.broker));} catch(MqttException e){System.out.println(String.format("正在连接到监听mqtt服务器: %s 失败:%s", this.broker, e.getMessage()));scheduleReconnect(sampleClient);} catch (Exception e) {System.out.println(String.format("正在连接到监听mqtt服务器: %s 失败:%s", this.broker, e.getMessage()));}}private void scheduleReconnect(MqttClient sampleClient) {if(sampleClient == null) return;final Runnable reconnectTask = () -> {try {if (!sampleClient.isConnected()) {// 尝试重新连接sampleClient.connect();// 重新订阅sampleClient.subscribe(topic, qos);System.out.println("重新连接监听mqtt服务器成功!");// 取消定时任务,因为连接成功了//scheduler.shutdownNow();}} catch (MqttException e) {System.out.println("重新连接监听mqtt服务器出现异常: " + e.getMessage());}};// 延迟10秒后执行第一次重连尝试,之后每隔30秒执行一次scheduler.scheduleWithFixedDelay(() -> {reconnectTask.run();}, 10, 30, TimeUnit.SECONDS);}}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/582767.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

透彻掌握GIT基础使用

网址 https://learngitbranching.js.org/?localezh_CN 清屏 clear重新开始reset

科荣AIO UtilServlet存在任意文件读取漏洞

文章目录 产品简介漏洞概述指纹识别漏洞利用修复建议 产品简介 科荣AIO是一款企业管理软件&#xff0c;提供企业一体化管理解决方案。它整合了ERP&#xff08;如进销存、财务管理&#xff09;、OA&#xff08;办公自动化&#xff09;、CRM&#xff08;客户关系管理&#xff09…

阿里云数据库PolarDB费用价格_MySQL版_PolarDB_分布式版

阿里云数据库PolarDB租用价格表&#xff0c;云数据库PolarDB MySQL版2核4GB&#xff08;通用&#xff09;、2个节点、60 GB存储空间55元5天&#xff0c;云数据库 PolarDB 分布式版标准版2核16G&#xff08;通用&#xff09;57.6元3天&#xff0c;阿里云百科aliyunbaike.com分享…

Leetcode—1572.矩阵对角线元素的和【简单】

2023每日刷题&#xff08;七十三&#xff09; Leetcode—1572.矩阵对角线元素的和 实现代码 class Solution { public:int diagonalSum(vector<vector<int>>& mat) {int n mat.size();if(n 1) {return mat[0][0];}int sum 0;int i 0, j n - 1;while(i &…

Bean 生命周期 和 SpringMVC 执行过程

这里简单记录下 Bean 生命周期的过程&#xff0c;方便自己日后面试用。源码部分还没看懂&#xff0c;这里先贴上结论 源码 结论

遥感技术应用于作物类型种植面积估算实例

1.农作物遥感分类 1.1 利用多时相环境星 CCD 数据作物分类识别实验 采用支持向量机分类器进行基于象素遥感影像分类方法。在分类过程中&#xff0c;分别对不同日期的单景环境星数据以及不同日期环境星数据的组合进行分类&#xff0c;以评价环境星在作物分类中的应用潜力&#x…

云计算:OpenStack 配置云主机实例的资源实现内网互通

目录 一、实验 1. 环境 2.配置项目及用户 3.配置规格实例与镜像 4.配置VPC 5. 配置安全组 6. 创建云主机 cs_01 &#xff08;cirros系统&#xff09; 7.创建云主机 cs_02 &#xff08;cirros系统&#xff09; 8.创建云主机 cs_03 &#xff08;cirros系统&#xff09; …

NFC物联网智能锁安全测试研究

针对短距离无线通信在物联网智能锁实际运用中的安全机制问题&#xff0c;通过理论分析和实际操作演示潜在的攻击流程&#xff0c;发现其存在的安全漏洞并提出可行的加固方法&#xff0c;并对加固后的通信系统进行CPN建模与安全性分析&#xff0c;对无线通信协议的安全性能提升、…

处理urllib.request.urlopen报错UnicodeEncodeError:‘ascii‘

参考&#xff1a;[Python3填坑之旅]一urllib模块网页爬虫访问中文网址出错 目录 一、报错内容 二、报错截图 三、解决方法 四、实例代码 五、运行截图 六、其他UnicodeEncodeError: ascii codec 问题 一、报错内容 UnicodeEncodeError: ascii codec cant encode charac…

Springboot使用log4j2日志框架

文章目录 1.pom.xml引入依赖2.配置文件引入log4j2的配置文件3.导入log4j2配置文件4.通过Slf4j注解来使用log.info()等最后 1.pom.xml引入依赖 提示&#xff1a;lombok用于Slf4j注解 <dependency><groupId>org.springframework.boot</groupId><artifactId&…

UG NX二次开发(C++)-通过两点和高度创建长方体

提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档 文章目录 1、前言2、采用UFun函数来创建长方体3、采用NXOpen方法实现两点和高度创建长方体4、验证1、前言 在UG NX二次开发时,我们通常会采用ufun函数来完成功能的开发,但是有些功能在ufun函数中不能找到…

牛客网SQL训练5—SQL大厂真题面试

文章目录 一、某音短视频1.各个视频的平均完播率2.平均播放进度大于60%的视频类别3.每类视频近一个月的转发量/率4.每个创作者每月的涨粉率及截止当前的总粉丝量5.国庆期间每类视频点赞量和转发量6.近一个月发布的视频中热度最高的top3视频 二、用户增长场景&#xff08;某度信…

关于Python里xlwings库对Excel表格的操作(二十二)

这篇小笔记主要记录如何【用“.number_format ”函数获取单元格的文本各种属性】。前面的小笔记已整理成目录&#xff0c;可点链接去目录寻找所需更方便。 【目录部分内容如下】【点击此处可进入目录】 &#xff08;1&#xff09;如何安装导入xlwings库&#xff1b; &#xff0…

Python新年炫酷烟花秀代码

新年马上就要到来&#xff0c;烟花秀必须得安排上&#xff01; Pygame 绘制烟花的基本原理 1&#xff0c;发射阶段&#xff1a;在这一阶段烟花的形状是线性向上&#xff0c;通过设定一组大小不同、颜色不同的点来模拟“向上发射” 的运动运动&#xff0c;运动过程中 5个点被赋…

硅像素传感器文献调研(三)

写在前面&#xff1a; 引言&#xff1a;也是先总结前人的研究结果&#xff0c;重点论述其不足之处。 和该方向联系不大&#xff0c;但还是有值得学习的地方。逻辑很清晰&#xff0c;易读性很好。 1991年—场板半阻层 使用场板和半电阻层的高压平面器件 0.摘要 提出了一种…

线程基础知识(三)

前言 之前两篇文章介绍了线程的基本概念和锁的基本知识&#xff0c;本文主要是学习同步机制&#xff0c;包括使用synchronized关键字、ReentrantLock等&#xff0c;了解锁的种类&#xff0c;死锁、竞争条件等并发编程中常见的问题。 关键字synchronized synchronied关键字可…

出现频率高达80%的软件测试常见面试题合集(内附详细答案)

最近看到网上流传着各种面试经验及面试题&#xff0c;往往都是一大堆技术题目贴上去&#xff0c;但是没有答案。 为此我业余时间整理了这份软件测试基础常见的面试题及详细答案&#xff0c;望各路大牛发现不对的地方不吝赐教&#xff0c;留言即可。 01 软件测试理论部分 1.1…

Java线程池执行流程及参数详解

线程池的定义分为以下几个部分&#xff1a; public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler) {...}我们分别来看…

leaflet学习笔记-初始化vue项目(一)

leaflet简介 Leaflet是一款开源的轻量级交互式地图可视化JavaScript库&#xff0c;能够满足大多数开发者的地图可视化需求&#xff0c;其最早的版本大小仅仅38 KB。Leaflet能够在主流的计算机或移动设备上高效运行&#xff0c;其功能可通过插件进行扩展&#xff0c;拥有易于使用…

金睛云华斩获ISC2023数字安全创新能力百强 五项殊荣!

昨日&#xff0c;由北京市通州区人民政府指导&#xff1b;中关村科技园区通州园管理委员会、ISC平台主办的ISC2023数字安全创新能力百强颁奖典礼在京举行。金睛云华以卓越的产品创新实力&#xff0c;一举斩获网络与流量安全、威胁检测与响应、AI驱动安全、创新力十强、年度十强…