mqtt:测试eclipse paho qos=1的数据重发的功能

# 测试程序

【pom.xml】

<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId><version>2.3.12.RELEASE</version></dependency><dependency><groupId>org.eclipse.paho</groupId><artifactId>org.eclipse.paho.mqttv5.client</artifactId><version>1.2.5</version></dependency><dependency><groupId>com.alibaba.fastjson2</groupId><artifactId>fastjson2</artifactId><version>2.0.49</version></dependency><dependency><groupId>ch.qos.logback</groupId><artifactId>logback-classic</artifactId><version>1.2.11</version></dependency>
</dependencies>

【MyDemo3MqttV5Server1.java】模拟一个正常的消息接收服务

package com.chz.myMqttV5.demo3;import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.mqttv5.client.*;
import org.eclipse.paho.mqttv5.common.MqttException;
import org.eclipse.paho.mqttv5.common.MqttMessage;
import org.eclipse.paho.mqttv5.common.packet.MqttProperties;import java.util.concurrent.ThreadLocalRandom;@Slf4j
public class MyDemo3MqttV5Server1
{public static void main(String[] args) throws InterruptedException{String broker = "tcp://192.168.44.228:1883";String clientId = "MyDemo3MqttV5Server1";int subQos = 1;try {MqttClient client = new MqttClient(broker, clientId);MqttConnectionOptions options = new MqttConnectionOptions();options.setAutomaticReconnect(true);client.setCallback(new MyDemo3Server1Callback(clientId));client.connect(options);client.subscribe("$share/demo3/device/#", subQos);} catch (MqttException e) {e.printStackTrace();}}@Slf4jpublic static class MyDemo3Server1Callback implements MqttCallback{private String clientId;public MyDemo3Server1Callback(String clientId){this.clientId = clientId;}public void connectComplete(boolean reconnect, String serverURI) {log.info("{}::connectComplete, reconnect={}, serverURI={}", clientId, reconnect, serverURI);}public void disconnected(MqttDisconnectResponse disconnectResponse) {log.info("{}::disconnected, disconnectResponse={}", clientId, disconnectResponse.getReasonString());}public void deliveryComplete(IMqttToken token) {log.info("{}::deliveryComplete, disconnectResponse={}", clientId, token.isComplete());}public void messageArrived(String topic, MqttMessage message) throws Exception {log.info("{}::messageArrived, topic={}, qos={}, message={}", clientId, topic, message.getQos(), new String(message.getPayload()));}public void mqttErrorOccurred(MqttException exception) {log.info("{}::mqttErrorOccurred, disconnectResponse={}", clientId, exception.getMessage());}public void authPacketArrived(int reasonCode, MqttProperties properties) {log.info("{}::authPacketArrived, reasonCode={}", clientId, reasonCode);}}
}

【MyDemo3MqttV5Server2.java】模拟一个工作不正常的消息接收不服务

package com.chz.myMqttV5.demo3;import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.mqttv5.client.*;
import org.eclipse.paho.mqttv5.common.MqttException;
import org.eclipse.paho.mqttv5.common.MqttMessage;
import org.eclipse.paho.mqttv5.common.packet.MqttProperties;import java.util.concurrent.ThreadLocalRandom;@Slf4j
public class MyDemo3MqttV5Server2
{private static int subQos = 1;public static void main(String[] args) throws InterruptedException{String broker = "tcp://192.168.44.228:1883";String clientId = "MyDemo3MqttV5Server2";try {MqttClient client = new MqttClient(broker, clientId);MqttConnectionOptions options = new MqttConnectionOptions();options.setAutomaticReconnect(true);options.setKeepAliveInterval(3);        // keepAliveInterval设置成3秒client.setCallback(new MyDemo3Server2Callback(clientId, client));client.connect(options);} catch (MqttException e) {e.printStackTrace();}}@Slf4jpublic static class MyDemo3Server2Callback implements MqttCallback{private String clientId;private MqttClient client;public MyDemo3Server2Callback(String clientId, MqttClient client){this.clientId = clientId;this.client = client;}public void connectComplete(boolean reconnect, String serverURI) {log.info("{}::connectComplete, reconnect={}, serverURI={}", clientId, reconnect, serverURI);try {if( client.isConnected() ){client.subscribe("$share/demo3/device/#", subQos);}} catch (MqttException e) {log.error("err", e);}}public void disconnected(MqttDisconnectResponse disconnectResponse) {log.info("{}::disconnected, disconnectResponse={}", clientId, disconnectResponse.getReasonString());}public void deliveryComplete(IMqttToken token) {log.info("{}::deliveryComplete, disconnectResponse={}", clientId, token.isComplete());}public void messageArrived(String topic, MqttMessage message) throws Exception {log.info("{}::messageArrived start, topic={}, qos={}, message={}", clientId, topic, message.getQos(), new String(message.getPayload()));if( ThreadLocalRandom.current().nextInt() % 20 ==0 ){try {log.info("{}::messageArrived ------------error1, topic={}, qos={}, message={}", clientId, topic, message.getQos(), new String(message.getPayload()));// 休眠10秒,因为前面设置了keepAliveInterval为3秒,所以一定会导致连接断开Thread.sleep(10000);log.info("{}::messageArrived ------------error2, topic={}, qos={}, message={}", clientId, topic, message.getQos(), new String(message.getPayload()));} catch (Exception e) {log.error("err", e);}}}public void mqttErrorOccurred(MqttException exception) {log.info("{}::mqttErrorOccurred, disconnectResponse={}", clientId, exception.getMessage());}public void authPacketArrived(int reasonCode, MqttProperties properties) {log.info("{}::authPacketArrived, reasonCode={}", clientId, reasonCode);}}
}

【MyDemo3MqttV5Sender.java】模拟一个发消息出来的设备

package com.chz.myMqttV5.demo3;import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.mqttv5.client.*;
import org.eclipse.paho.mqttv5.common.MqttException;
import org.eclipse.paho.mqttv5.common.MqttMessage;
import org.eclipse.paho.mqttv5.common.packet.MqttProperties;@Slf4j
public class MyDemo3MqttV5Sender {private static String clientId = MyDemo3MqttV5Sender.class.getSimpleName();public static void main(String[] args) throws InterruptedException {String broker = "tcp://192.168.44.228:1883";int subQos = 1;int pubQos = 1;String msg;try {MqttClient client = new MqttClient(broker, clientId);MqttConnectionOptions options = new MqttConnectionOptions();client.setCallback(new MyDemo3SenderCallback(clientId));client.connect(options);client.subscribe("device/#", subQos);for(int i=0; i<200; i++){int id = i;msg = id+"";MqttMessage message = new MqttMessage(msg.getBytes());message.setId(id);        // 这个id很重要,否则qos=1不会生效message.setQos(pubQos);   // 设置qos=1client.publish("device/1", message);Thread.sleep(1L);}} catch (MqttException e) {e.printStackTrace();}}@Slf4jpublic static class MyDemo3SenderCallback implements MqttCallback{private String clientId;public MyDemo3SenderCallback(String clientId){this.clientId = clientId;}public void connectComplete(boolean reconnect, String serverURI) {log.info("{}::MyMqttCallback, reconnect={}, serverURI={}", clientId, reconnect, serverURI);}public void disconnected(MqttDisconnectResponse disconnectResponse) {log.info("{}::disconnected, disconnectResponse={}", clientId, disconnectResponse.getReasonString());}public void deliveryComplete(IMqttToken token) {log.info("{}::deliveryComplete, disconnectResponse={}", clientId, token.isComplete());}public void messageArrived(String topic, MqttMessage message) throws Exception {log.info("{}::messageArrived, topic={}, qos={}, message={}", clientId, topic, message.getQos(), new String(message.getPayload()));}public void mqttErrorOccurred(MqttException exception) {log.info("{}::mqttErrorOccurred, disconnectResponse={}", clientId, exception.getMessage());}public void authPacketArrived(int reasonCode, MqttProperties properties) {log.info("{}::authPacketArrived, reasonCode={}", clientId, reasonCode);}}
}

# 开始测试

启动【MyDemo3MqttV5Server1、MyDemo3MqttV5Server2】
然后启动【MyDemo3MqttV5Sender】,输出日志如下:
在这里插入图片描述
可以看到【MyDemo3MqttV5Server2】接收到消息【74】的时候卡了10秒,然后连接就断开了。
在这里插入图片描述
可以看到【MyDemo3MqttV5Server2】断开连接之后,消息【74】之后的后续消息都被【MyDemo3MqttV5Server1】接收到了。换句话说不会因为某个消息接收服务的问题导致消息丢失。

# 参考资料

mqtt服务emqx的安装可以参考https://blog.csdn.net/chenhz2284/article/details/139411874

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/51254.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Unity横版动作游戏 -瓦片地形和动画瓦片

(规则瓦片)瓦片地形和动画瓦片 准备阶段 在Tilemap中创建一个新的文件夹起名叫做Rule Tile&#xff0c;创建一个Rule Tile&#xff0c;用来设置瓦片地形&#xff0c;我们将用他来绘制地形图&#xff0c;类似于Godot中的瓦片地形。 这里给他取名为了Ground 1&#xff0c;用于创…

C++——vector在使用过程中迭代器失效的几个常见情景

在使用vector的时候我们可以会遇到需要在已有对象的某个数据之前插入一个数据再对被插入的数据进行修改操作&#xff0c;情况如下&#xff1a; 这段代码在编译器的过程中并不会出现问题&#xff0c;可是一旦运行程序就会出现程序崩溃&#xff0c;这里就是迭代器失效的第一种情况…

如何使用 HTTPie 进行高效的 HTTP 请求

如何使用 HTTPie 进行高效的 HTTP 请求 引言 HTTPie 是一个命令行 HTTP 客户端&#xff0c;它以其简洁的语法和人性化的输出格式赢得了广大开发者的喜爱。与 curl 相比&#xff0c;HTTPie 提供了更加直观和用户友好的接口&#xff0c;使得执行 HTTP 请求变得轻松愉快。本文将…

PVE环境中调整虚拟机磁盘大小

我的希望将PVE中的虚拟机磁盘调整一下&#xff0c;增加20GB。在查询了一些资料后&#xff0c;做一下总结教程。 环境是 PVE8.2.2 版本&#xff0c;虚拟机系统是centos7.9.2009-minimal&#xff0c; 安装系统时划分磁盘分区方式是默认分区方式&#xff08;不同分区方式下&#…

pinia安装及简介

pinia简介 基本特点 轻量级&#xff1a;Pinia相比于传统的Vuex&#xff0c;体积更小&#xff0c;性能更好&#xff0c;只有大约1KB左右。 简化API&#xff1a;Pinia简化了状态管理库的使用方法&#xff0c;抛弃了Vuex中的mutations&#xff0c;只保留了state、getters和actions…

WHAT - 一个 Github 仓库的 License 如何解读

目录 一、背景二、解读许可证说明的作用常见的开源许可证类型使用他人代码仓库时需要注意的事项结论 实践作为开发者1. 选择许可证类型2. 在 README 文件中编写许可证信息 作为使用者1. 确定权限2. 了解和遵守条款 总结 一、背景 我们经常在一些 Github 仓库里看到 License 部…

探索 `pyjwt`:Python 中的 JWT 处理专家

文章目录 探索 pyjwt&#xff1a;Python 中的 JWT 处理专家简介&#xff1a;为何选择 pyjwt&#xff1f;什么是 pyjwt&#xff1f;安装 pyjwtpyjwt 的基本使用1. 编码JWT2. 解码JWT3. 验证签名4. 过期时间5. 自定义头部 场景应用场景一&#xff1a;用户登录场景二&#xff1a;A…

使用标量函数实现 EF Core 的实用方法

一.介绍 在构建应用程序时&#xff0c;您可能使用标量函数在数据库端实现一些逻辑。在 SQL 中&#xff0c;标量函数是一种对单个值或少量输入值进行操作并始终返回单个值作为输出的函数。这些函数本质上是可重复使用的代码块&#xff0c;用于对数据执行计算或操作。 以下是标…

商品中心关于缓存热key的解决方案

缓存热key一旦被击穿&#xff0c;流量势必会打到数据库&#xff0c;如果数据库崩了&#xff0c;游戏直接结束。 从两点来讨论&#xff1a;如何监控、如何解决。 如何监控 通过业务评估&#xff1a;比如营销活动推出的商品或者热卖的商品。基于LRU的命令&#xff0c;redis-cl…

doccano安装与使用

1.安装 &#xff08;1&#xff09;创建虚拟环境 conda create -n doccano conda activate doccano &#xff08;2&#xff09;安装doccano pip install doccano &#xff08;3&#xff09;doccano初始化 doccano init doccano createuser --username admin --password pa…

Java整理20

1、数据校验 Validation数据校验&#xff08;1&#xff09;实现org.springframework.validation.Validator接口&#xff0c;在代码中调用这个类&#xff08;2&#xff09;按照BeanValidation方式来校验&#xff0c;通过注解方式&#xff08;3&#xff09;基于方法实现校验&…

chk是什么文件格式 chk文件怎么恢复正常 chkdsk文件损坏怎么修复

在使用电脑和移动存储设备时&#xff0c;有时我们会发现磁盘中出现了大量的chk文件。这些chk文件无法打开&#xff0c;也无法得知其原本内容。那么&#xff0c;这些chk文件是什么呢&#xff1f;又该如何将chk文件恢复正常呢&#xff1f; chk文件是什么&#xff1f; 在我们查看…

Cocos Creator2D游戏开发-(2)Cocos 常见名词

场景&#xff08;Scene): 它一个容器&#xff0c;容纳游戏中的各个元素&#xff0c;如精灵&#xff0c;标签&#xff0c;节点对象。它负责着游戏的运行逻辑&#xff0c;以帧为单位渲染这些内容。就是你理解到的那个场景; 个人理解就是一个画面, 一个游戏不同的关卡,会有不同的…

【前端 12】js事件绑定

JavaScript 事件绑定 在Web开发中&#xff0c;事件绑定是实现用户与网页交互的重要机制。JavaScript 提供了多种方式来绑定和处理事件&#xff0c;使得开发者能够灵活地控制网页的行为。本文将详细介绍JavaScript中事件绑定的两种主要方式&#xff0c;并通过实例演示如何应用这…

Python+Pytest+Allure+Yaml+Pymysql+Jenkins+GitLab运行原理

PythonPytestAllureYamlPymysqlJenkinsGitLab运行原理逻辑及调用关系 GitLab代码仓&#xff1a; Jenkins工作空间&#xff1a; 代码&#xff1a; 测试报告展示&#xff1a;

<Python><paddle>基于python使用百度paddleocr实现车牌识别

前言 paddleocr是百度飞桨的一个文字识别库&#xff0c;准确度非常高&#xff0c;基于其文字识别的基础&#xff0c;将其用于车牌识别。这个识别的准确度是相当高的。 环境配置 系统&#xff1a;windows 平台&#xff1a;visual studio code 语言&#xff1a;python 库&#…

计算机网络知识点面试总结4

#来自ウルトラマンゼロ&#xff08;赛罗&#xff09; 1 传输层提供的服务 1.1 功能 传输层向它上面的应用层提供通信服务&#xff0c;它属于面向部分的最高层&#xff0c;同时也是用户功能中的最底层。 为运行在不同主机上的进程之间提供了逻辑通信。 传输层的功能&#xff1…

基于Gunicorn + Flask + Docker的高并发部署策略

标题&#xff1a;基于Gunicorn Flask Docker的高并发部署策略 引言 随着互联网用户数量的增长&#xff0c;网站和应用程序需要能够处理越来越多的并发请求。Gunicorn 是一个 Python WSGI HTTP 服务器&#xff0c;Flask 是一个轻量级的 Web 应用框架&#xff0c;Docker 是一…

react中如何避免父子组件同时渲染(memo的使用)

1.需求说明 react的渲染机制是父子组件同时渲染&#xff0c;不管子组件是否有变化只要父组件重新渲染了子组件就跟着重新渲染。为了避免不必要的消耗&#xff0c;我们可以使用memo钩子函数 2.使用memo前展示 import { memo,useState } from "react"function Son()…

20240728 每日AI必读资讯

Google Gemini 聊天机器人更新 可以免费使用Gemini 1.5 Flash 1. 引入Gemini 1.5 Flash模型&#xff1a; • 提供更快和更高质量的响应。 • 提升推理和图像理解能力。 • 上下文窗口扩大到 32Ktokens&#xff0c;允许进行更长的对话和处理更复杂的问题。 • 即将支持通过 Goo…