mqtt:测试eclipse paho qos=1的数据重发的功能

# 测试程序

【pom.xml】

<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId><version>2.3.12.RELEASE</version></dependency><dependency><groupId>org.eclipse.paho</groupId><artifactId>org.eclipse.paho.mqttv5.client</artifactId><version>1.2.5</version></dependency><dependency><groupId>com.alibaba.fastjson2</groupId><artifactId>fastjson2</artifactId><version>2.0.49</version></dependency><dependency><groupId>ch.qos.logback</groupId><artifactId>logback-classic</artifactId><version>1.2.11</version></dependency>
</dependencies>

【MyDemo3MqttV5Server1.java】模拟一个正常的消息接收服务

package com.chz.myMqttV5.demo3;import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.mqttv5.client.*;
import org.eclipse.paho.mqttv5.common.MqttException;
import org.eclipse.paho.mqttv5.common.MqttMessage;
import org.eclipse.paho.mqttv5.common.packet.MqttProperties;import java.util.concurrent.ThreadLocalRandom;@Slf4j
public class MyDemo3MqttV5Server1
{public static void main(String[] args) throws InterruptedException{String broker = "tcp://192.168.44.228:1883";String clientId = "MyDemo3MqttV5Server1";int subQos = 1;try {MqttClient client = new MqttClient(broker, clientId);MqttConnectionOptions options = new MqttConnectionOptions();options.setAutomaticReconnect(true);client.setCallback(new MyDemo3Server1Callback(clientId));client.connect(options);client.subscribe("$share/demo3/device/#", subQos);} catch (MqttException e) {e.printStackTrace();}}@Slf4jpublic static class MyDemo3Server1Callback implements MqttCallback{private String clientId;public MyDemo3Server1Callback(String clientId){this.clientId = clientId;}public void connectComplete(boolean reconnect, String serverURI) {log.info("{}::connectComplete, reconnect={}, serverURI={}", clientId, reconnect, serverURI);}public void disconnected(MqttDisconnectResponse disconnectResponse) {log.info("{}::disconnected, disconnectResponse={}", clientId, disconnectResponse.getReasonString());}public void deliveryComplete(IMqttToken token) {log.info("{}::deliveryComplete, disconnectResponse={}", clientId, token.isComplete());}public void messageArrived(String topic, MqttMessage message) throws Exception {log.info("{}::messageArrived, topic={}, qos={}, message={}", clientId, topic, message.getQos(), new String(message.getPayload()));}public void mqttErrorOccurred(MqttException exception) {log.info("{}::mqttErrorOccurred, disconnectResponse={}", clientId, exception.getMessage());}public void authPacketArrived(int reasonCode, MqttProperties properties) {log.info("{}::authPacketArrived, reasonCode={}", clientId, reasonCode);}}
}

【MyDemo3MqttV5Server2.java】模拟一个工作不正常的消息接收不服务

package com.chz.myMqttV5.demo3;import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.mqttv5.client.*;
import org.eclipse.paho.mqttv5.common.MqttException;
import org.eclipse.paho.mqttv5.common.MqttMessage;
import org.eclipse.paho.mqttv5.common.packet.MqttProperties;import java.util.concurrent.ThreadLocalRandom;@Slf4j
public class MyDemo3MqttV5Server2
{private static int subQos = 1;public static void main(String[] args) throws InterruptedException{String broker = "tcp://192.168.44.228:1883";String clientId = "MyDemo3MqttV5Server2";try {MqttClient client = new MqttClient(broker, clientId);MqttConnectionOptions options = new MqttConnectionOptions();options.setAutomaticReconnect(true);options.setKeepAliveInterval(3);        // keepAliveInterval设置成3秒client.setCallback(new MyDemo3Server2Callback(clientId, client));client.connect(options);} catch (MqttException e) {e.printStackTrace();}}@Slf4jpublic static class MyDemo3Server2Callback implements MqttCallback{private String clientId;private MqttClient client;public MyDemo3Server2Callback(String clientId, MqttClient client){this.clientId = clientId;this.client = client;}public void connectComplete(boolean reconnect, String serverURI) {log.info("{}::connectComplete, reconnect={}, serverURI={}", clientId, reconnect, serverURI);try {if( client.isConnected() ){client.subscribe("$share/demo3/device/#", subQos);}} catch (MqttException e) {log.error("err", e);}}public void disconnected(MqttDisconnectResponse disconnectResponse) {log.info("{}::disconnected, disconnectResponse={}", clientId, disconnectResponse.getReasonString());}public void deliveryComplete(IMqttToken token) {log.info("{}::deliveryComplete, disconnectResponse={}", clientId, token.isComplete());}public void messageArrived(String topic, MqttMessage message) throws Exception {log.info("{}::messageArrived start, topic={}, qos={}, message={}", clientId, topic, message.getQos(), new String(message.getPayload()));if( ThreadLocalRandom.current().nextInt() % 20 ==0 ){try {log.info("{}::messageArrived ------------error1, topic={}, qos={}, message={}", clientId, topic, message.getQos(), new String(message.getPayload()));// 休眠10秒,因为前面设置了keepAliveInterval为3秒,所以一定会导致连接断开Thread.sleep(10000);log.info("{}::messageArrived ------------error2, topic={}, qos={}, message={}", clientId, topic, message.getQos(), new String(message.getPayload()));} catch (Exception e) {log.error("err", e);}}}public void mqttErrorOccurred(MqttException exception) {log.info("{}::mqttErrorOccurred, disconnectResponse={}", clientId, exception.getMessage());}public void authPacketArrived(int reasonCode, MqttProperties properties) {log.info("{}::authPacketArrived, reasonCode={}", clientId, reasonCode);}}
}

【MyDemo3MqttV5Sender.java】模拟一个发消息出来的设备

package com.chz.myMqttV5.demo3;import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.mqttv5.client.*;
import org.eclipse.paho.mqttv5.common.MqttException;
import org.eclipse.paho.mqttv5.common.MqttMessage;
import org.eclipse.paho.mqttv5.common.packet.MqttProperties;@Slf4j
public class MyDemo3MqttV5Sender {private static String clientId = MyDemo3MqttV5Sender.class.getSimpleName();public static void main(String[] args) throws InterruptedException {String broker = "tcp://192.168.44.228:1883";int subQos = 1;int pubQos = 1;String msg;try {MqttClient client = new MqttClient(broker, clientId);MqttConnectionOptions options = new MqttConnectionOptions();client.setCallback(new MyDemo3SenderCallback(clientId));client.connect(options);client.subscribe("device/#", subQos);for(int i=0; i<200; i++){int id = i;msg = id+"";MqttMessage message = new MqttMessage(msg.getBytes());message.setId(id);        // 这个id很重要,否则qos=1不会生效message.setQos(pubQos);   // 设置qos=1client.publish("device/1", message);Thread.sleep(1L);}} catch (MqttException e) {e.printStackTrace();}}@Slf4jpublic static class MyDemo3SenderCallback implements MqttCallback{private String clientId;public MyDemo3SenderCallback(String clientId){this.clientId = clientId;}public void connectComplete(boolean reconnect, String serverURI) {log.info("{}::MyMqttCallback, reconnect={}, serverURI={}", clientId, reconnect, serverURI);}public void disconnected(MqttDisconnectResponse disconnectResponse) {log.info("{}::disconnected, disconnectResponse={}", clientId, disconnectResponse.getReasonString());}public void deliveryComplete(IMqttToken token) {log.info("{}::deliveryComplete, disconnectResponse={}", clientId, token.isComplete());}public void messageArrived(String topic, MqttMessage message) throws Exception {log.info("{}::messageArrived, topic={}, qos={}, message={}", clientId, topic, message.getQos(), new String(message.getPayload()));}public void mqttErrorOccurred(MqttException exception) {log.info("{}::mqttErrorOccurred, disconnectResponse={}", clientId, exception.getMessage());}public void authPacketArrived(int reasonCode, MqttProperties properties) {log.info("{}::authPacketArrived, reasonCode={}", clientId, reasonCode);}}
}

# 开始测试

启动【MyDemo3MqttV5Server1、MyDemo3MqttV5Server2】
然后启动【MyDemo3MqttV5Sender】,输出日志如下:
在这里插入图片描述
可以看到【MyDemo3MqttV5Server2】接收到消息【74】的时候卡了10秒,然后连接就断开了。
在这里插入图片描述
可以看到【MyDemo3MqttV5Server2】断开连接之后,消息【74】之后的后续消息都被【MyDemo3MqttV5Server1】接收到了。换句话说不会因为某个消息接收服务的问题导致消息丢失。

# 参考资料

mqtt服务emqx的安装可以参考https://blog.csdn.net/chenhz2284/article/details/139411874

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/51254.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Unity横版动作游戏 -瓦片地形和动画瓦片

(规则瓦片)瓦片地形和动画瓦片 准备阶段 在Tilemap中创建一个新的文件夹起名叫做Rule Tile&#xff0c;创建一个Rule Tile&#xff0c;用来设置瓦片地形&#xff0c;我们将用他来绘制地形图&#xff0c;类似于Godot中的瓦片地形。 这里给他取名为了Ground 1&#xff0c;用于创…

PVE环境中调整虚拟机磁盘大小

我的希望将PVE中的虚拟机磁盘调整一下&#xff0c;增加20GB。在查询了一些资料后&#xff0c;做一下总结教程。 环境是 PVE8.2.2 版本&#xff0c;虚拟机系统是centos7.9.2009-minimal&#xff0c; 安装系统时划分磁盘分区方式是默认分区方式&#xff08;不同分区方式下&#…

pinia安装及简介

pinia简介 基本特点 轻量级&#xff1a;Pinia相比于传统的Vuex&#xff0c;体积更小&#xff0c;性能更好&#xff0c;只有大约1KB左右。 简化API&#xff1a;Pinia简化了状态管理库的使用方法&#xff0c;抛弃了Vuex中的mutations&#xff0c;只保留了state、getters和actions…

WHAT - 一个 Github 仓库的 License 如何解读

目录 一、背景二、解读许可证说明的作用常见的开源许可证类型使用他人代码仓库时需要注意的事项结论 实践作为开发者1. 选择许可证类型2. 在 README 文件中编写许可证信息 作为使用者1. 确定权限2. 了解和遵守条款 总结 一、背景 我们经常在一些 Github 仓库里看到 License 部…

探索 `pyjwt`:Python 中的 JWT 处理专家

文章目录 探索 pyjwt&#xff1a;Python 中的 JWT 处理专家简介&#xff1a;为何选择 pyjwt&#xff1f;什么是 pyjwt&#xff1f;安装 pyjwtpyjwt 的基本使用1. 编码JWT2. 解码JWT3. 验证签名4. 过期时间5. 自定义头部 场景应用场景一&#xff1a;用户登录场景二&#xff1a;A…

使用标量函数实现 EF Core 的实用方法

一.介绍 在构建应用程序时&#xff0c;您可能使用标量函数在数据库端实现一些逻辑。在 SQL 中&#xff0c;标量函数是一种对单个值或少量输入值进行操作并始终返回单个值作为输出的函数。这些函数本质上是可重复使用的代码块&#xff0c;用于对数据执行计算或操作。 以下是标…

doccano安装与使用

1.安装 &#xff08;1&#xff09;创建虚拟环境 conda create -n doccano conda activate doccano &#xff08;2&#xff09;安装doccano pip install doccano &#xff08;3&#xff09;doccano初始化 doccano init doccano createuser --username admin --password pa…

chk是什么文件格式 chk文件怎么恢复正常 chkdsk文件损坏怎么修复

在使用电脑和移动存储设备时&#xff0c;有时我们会发现磁盘中出现了大量的chk文件。这些chk文件无法打开&#xff0c;也无法得知其原本内容。那么&#xff0c;这些chk文件是什么呢&#xff1f;又该如何将chk文件恢复正常呢&#xff1f; chk文件是什么&#xff1f; 在我们查看…

【前端 12】js事件绑定

JavaScript 事件绑定 在Web开发中&#xff0c;事件绑定是实现用户与网页交互的重要机制。JavaScript 提供了多种方式来绑定和处理事件&#xff0c;使得开发者能够灵活地控制网页的行为。本文将详细介绍JavaScript中事件绑定的两种主要方式&#xff0c;并通过实例演示如何应用这…

Python+Pytest+Allure+Yaml+Pymysql+Jenkins+GitLab运行原理

PythonPytestAllureYamlPymysqlJenkinsGitLab运行原理逻辑及调用关系 GitLab代码仓&#xff1a; Jenkins工作空间&#xff1a; 代码&#xff1a; 测试报告展示&#xff1a;

<Python><paddle>基于python使用百度paddleocr实现车牌识别

前言 paddleocr是百度飞桨的一个文字识别库&#xff0c;准确度非常高&#xff0c;基于其文字识别的基础&#xff0c;将其用于车牌识别。这个识别的准确度是相当高的。 环境配置 系统&#xff1a;windows 平台&#xff1a;visual studio code 语言&#xff1a;python 库&#…

计算机网络知识点面试总结4

#来自ウルトラマンゼロ&#xff08;赛罗&#xff09; 1 传输层提供的服务 1.1 功能 传输层向它上面的应用层提供通信服务&#xff0c;它属于面向部分的最高层&#xff0c;同时也是用户功能中的最底层。 为运行在不同主机上的进程之间提供了逻辑通信。 传输层的功能&#xff1…

react中如何避免父子组件同时渲染(memo的使用)

1.需求说明 react的渲染机制是父子组件同时渲染&#xff0c;不管子组件是否有变化只要父组件重新渲染了子组件就跟着重新渲染。为了避免不必要的消耗&#xff0c;我们可以使用memo钩子函数 2.使用memo前展示 import { memo,useState } from "react"function Son()…

20240728 每日AI必读资讯

Google Gemini 聊天机器人更新 可以免费使用Gemini 1.5 Flash 1. 引入Gemini 1.5 Flash模型&#xff1a; • 提供更快和更高质量的响应。 • 提升推理和图像理解能力。 • 上下文窗口扩大到 32Ktokens&#xff0c;允许进行更长的对话和处理更复杂的问题。 • 即将支持通过 Goo…

深入理解计算机系统 CSAPP 练习题12.4

我们每次都用read_set初始化ready_set是因为我们每次都处理read_set里的描述符,这是我们希望服务器做的事情.每次一有描述符3或描述符0,select函数会更新ready_set ,我们判断更新后ready_set的情况.然后干对应的事. 由此可以看到select函数的神奇之处,它把一个复杂的事情简单化…

软件测试---网络基础、HTTP

一、网络基础 &#xff08;1&#xff09;Web和网络知识 网络基础TCP/IP 使用HTTP协议访问Web WWW万维网的诞生 WWW万维网的构成 &#xff08;2&#xff09;IP协议 &#xff08;3&#xff09;可靠传输的TCP和三次握手策略 &#xff08;4&#xff09;域名解析服务DNS &#xff0…

Vue3-拉开序幕的setup

Vue3 中的 setup 是一个新的配置项&#xff0c;值是一个函数。 export default {name: App,setup: function () {} } </script> 和 Vue2 中的 data 一样&#xff0c;我也可以将 setup 简写成为 export default {name: App,setup() {} } setup函数的使用 与 Vue2 不一样…

详细介绍MLP的原理

什么是MLP MLP&#xff08;Multi-Layer Perceptron&#xff09;&#xff0c;即多层感知机&#xff0c;是一种前馈型人工神经网络。它由一个输入层、一个输出层以及至少一个隐藏层&#xff08;输入层和输出层中间的层&#xff09;组成。每个神经元&#xff08;或称为节点&#x…

【Django】 js实现动态赋值、显示show隐藏hide效果

文章目录 需要达到的前端效果预览&#xff1a;实现步骤复制bootstrp代码&#xff08;buttons&#xff09;复制bootstrp代码&#xff08;Alert警告框&#xff09;写js测试效果 需要达到的前端效果预览&#xff1a; {% load static %} <!DOCTYPE html> <html lang"…

十分钟速通 MySQL —— CRUD

表格的结构 在之前的课程中我们已经学习了关系型数据库的表格&#xff0c;我们再来回顾-下表格由哪些元素构成 表由表名、行、列、列名构成表名是表的名称列名表示列的名字&#xff0c;列名不可以重复表格实质上是一个二维数组&#xff0c;行和列都是从0开始数的(数组的特性) …