使用java +paho mqtt编写模拟发布温度及订阅的过程

  • 启动mqtt 服务
  •  创建项目,在项目中添加模块
  •  
  •  
  • 添加文件夹
    • 添加maven依赖
  •     <dependencies><dependency><groupId>org.eclipse.paho</groupId><artifactId>org.eclipse.paho.client.mqttv3</artifactId><version>1.2.0</version></dependency></dependencies>
    • 编写订阅程序  名字没起好 后面有时间再调整
  • import org.eclipse.paho.client.mqttv3.IMqttClient;
    import org.eclipse.paho.client.mqttv3.MqttMessage;import java.util.Random;
    import java.util.concurrent.Callable;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.TimeUnit;public class EngineTemperatureSensor implements Callable<Void> {// ... private members omittedIMqttClient client;public static final String TOPIC = "testTopic1/003";public EngineTemperatureSensor(IMqttClient client) {this.client = client;}@Overridepublic Void call() throws Exception {if ( !client.isConnected()) {return null;}CountDownLatch receivedSignal = new CountDownLatch(10);client.subscribe("testTopic1/003", (topic, msg) -> {byte[] payload = msg.getPayload();// ... payload handling omitted//print out the messageSystem.out.println("Received message: " + new String(payload));receivedSignal.countDown();});receivedSignal.await(1, TimeUnit.MINUTES);//print out the messageSystem.out.println("Published message:2222222222222 " );return null;}}
  • 订阅:

  • import org.eclipse.paho.client.mqttv3.IMqttClient;
    import org.eclipse.paho.client.mqttv3.MqttMessage;import java.util.Random;
    import java.util.concurrent.Callable;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.TimeUnit;public class EngineTemperatureSensor implements Callable<Void> {// ... private members omittedIMqttClient client;public static final String TOPIC = "testTopic1/003";public EngineTemperatureSensor(IMqttClient client) {this.client = client;}@Overridepublic Void call() throws Exception {if ( !client.isConnected()) {return null;}CountDownLatch receivedSignal = new CountDownLatch(10);client.subscribe("testTopic1/003", (topic, msg) -> {byte[] payload = msg.getPayload();// ... payload handling omitted//print out the messageSystem.out.println("Received message: " + new String(payload));receivedSignal.countDown();});receivedSignal.await(1, TimeUnit.MINUTES);//print out the messageSystem.out.println("Published message:2222222222222 " );return null;}}

import java.util.Scanner;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.eclipse.paho.client.mqttv3.IMqttClient;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;public class c5M {//main5public static void main(String[] args) {System.out.println("Hello World");String publisherId = UUID.randomUUID().toString();ExecutorService executor = Executors.newSingleThreadExecutor();try {IMqttClient subscriber = new MqttClient("tcp://127.0.0.1:1883", publisherId);MqttConnectOptions options = new MqttConnectOptions();options.setAutomaticReconnect(true);options.setCleanSession(true);options.setConnectionTimeout(10);subscriber.connect(options);// 调用EngineTemperatureSensorEngineTemperatureSensor sensor = new EngineTemperatureSensor(subscriber);executor.submit(sensor); // 提交任务,但不阻塞主线程// 这里可以添加代码来等待用户输入或者其他信号来安全地关闭程序// 例如,你可以使用System.in.read()来等待用户输入System.out.println("Press Enter to exit...");new Scanner(System.in).nextLine(); // 等待用户输入} catch (Exception e) {//print e message//print seperator lineSystem.out.println("))))))))))))))))))))))))");System.out.println(e.getMessage());throw new RuntimeException(e);} finally {// 确保最后关闭ExecutorService和MQTT客户端executor.shutdown(); // 提交的任务将不再被接受try {// 等待任务完成(可选,取决于你是否需要确保所有任务都完成)if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {executor.shutdownNow(); // 取消正在执行的任务}} catch (InterruptedException ie) {executor.shutdownNow(); // 当前线程被中断,需要关闭ExecutorServiceThread.currentThread().interrupt(); // 保留中断状态}// 关闭MQTT客户端(如果有必要的话)// 注意:这里可能需要额外的逻辑来处理MQTT客户端的关闭,具体取决于你的实现}}}

发布代码:

import org.eclipse.paho.client.mqttv3.IMqttClient;
import org.eclipse.paho.client.mqttv3.MqttMessage;import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;public class EngineTemperatureSensor implements Callable<Void> {// ... private members omittedIMqttClient client;public static final String TOPIC = "testTopic1/003";public EngineTemperatureSensor(IMqttClient client) {this.client = client;}@Overridepublic Void call() throws Exception {if ( !client.isConnected()) {return null;}Random rnd = null;//double temp =  80 + rnd.nextDouble() * 20.0;double temp =  10 + 1.1 * 20.0;byte[] payload = String.format("T:%04.2f",temp).getBytes();MqttMessage msg2= new MqttMessage(payload);msg2.setQos(0);msg2.setRetained(true);client.publish(TOPIC,msg2);//print out the messageSystem.out.println("Published message: " + msg2);return null;}}

 

import org.eclipse.paho.client.mqttv3.IMqttClient;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;import java.util.Scanner;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;public class mainc3 {// Main methodpublic static void main(String[] args) {System.out.println("Hello World");String publisherId = UUID.randomUUID().toString();ExecutorService executor = Executors.newSingleThreadExecutor();try {IMqttClient publisher = new MqttClient("tcp://127.0.0.1:1883", publisherId);MqttConnectOptions options = new MqttConnectOptions();options.setAutomaticReconnect(true);options.setCleanSession(true);options.setConnectionTimeout(10);publisher.connect(options);// 调用EngineTemperatureSensorEngineTemperatureSensor sensor = new EngineTemperatureSensor(publisher);executor.submit(sensor); // 提交任务,但不阻塞主线程// 这里可以添加代码来等待用户输入或者其他信号来安全地关闭程序// 例如,你可以使用System.in.read()来等待用户输入System.out.println("Press Enter to exit...");new Scanner(System.in).nextLine(); // 等待用户输入} catch (Exception e) {//print e message//print seperator lineSystem.out.println("))))))))))))))))))))))))");System.out.println(e.getMessage());throw new RuntimeException(e);} finally {// 确保最后关闭ExecutorService和MQTT客户端executor.shutdown(); // 提交的任务将不再被接受try {// 等待任务完成(可选,取决于你是否需要确保所有任务都完成)if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {executor.shutdownNow(); // 取消正在执行的任务}} catch (InterruptedException ie) {executor.shutdownNow(); // 当前线程被中断,需要关闭ExecutorServiceThread.currentThread().interrupt(); // 保留中断状态}// 关闭MQTT客户端(如果有必要的话)// 注意:这里可能需要额外的逻辑来处理MQTT客户端的关闭,具体取决于你的实现}}}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/31970.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

夏至的中医理论,提供相关的养生建议。包括饮食、运动、作息等方面的建议。

夏至中医养生建议 夏至&#xff0c;作为夏季的一个重要节气&#xff0c;标志着炎热季节的正式开始。在中医理论中&#xff0c;夏至被视为阳气最旺盛的时期&#xff0c;因此&#xff0c;养生之道需顺应夏季阳盛阴衰的特点&#xff0c;以保护阳气、调整阴阳平衡为核心。以下从饮…

vue3使用富文本

1、下载 pnpm install wangeditor/editor wangeditor/editor-for-vue 2、引入和使用 <Toolbar style"border-bottom: 1px solid #ccc" :editor"editorRef" :defaultConfig"toolbarConfig" mode"default" /><Editorstyle&q…

如何下载和安装SQLynx数据库管理工具? (MySQL作为测试数据库)

目录 1. 官网下载 2. 安装软件 3. 启动SQLynx软件 4. 开始使用 5. 执行第一条SQL语句 6. 总结 SQLynx是一款先进的Web SQL集成开发环境&#xff08;IDE&#xff09;&#xff0c;专为数据库管理、查询和数据分析设计。作为一个基于浏览器的工具&#xff08;同时也支持桌面…

ruby中语法知识

return home 参考链接 理解Ruby中的作用域Ruby 中的类与对象Ruby学习之元编程  Kernel#evel(), Object#instance_evel()、Module#class_evel() 知识点 ruby中include和extend以及模块中ClassMethods Ruby require,load,include,extend的显著区别 Ruby中的 Object、Class、…

二分查找与移除元素有序数组的平方、 长度最小的子数组、螺旋矩阵II

数组 704. 二分查找 704. 二分查找 - 力扣 给定一个 n 个元素有序的&#xff08;升序&#xff09;整型数组 nums 和一个目标值 target &#xff0c;写一个函数搜索 nums 中的 target&#xff0c;如果目标值存在返回下标&#xff0c;否则返回 -1。 class Solution { public:…

Spring Cloud Hystrix快速入门demo

1.什么是Spring Cloud Hystrix&#xff1f; Spring Cloud Hystrix 是一个用于处理分布式系统中故障的库。它实现了熔断器模式&#xff0c;可以防止由于故障服务的连锁反应而导致整个系统崩溃。Spring Cloud Hystrix 提供了丰富的功能&#xff0c;如熔断、降级、限流、缓存等&a…

Python xlwt库:写excel表格

&#x1f49d;&#x1f49d;&#x1f49d;欢迎莅临我的博客&#xff0c;很高兴能够在这里和您见面&#xff01;希望您在这里可以感受到一份轻松愉快的氛围&#xff0c;不仅可以获得有趣的内容和知识&#xff0c;也可以畅所欲言、分享您的想法和见解。 推荐:「stormsha的主页」…

基于java+springboot+vue实现的电商应用系统(文末源码+Lw)241

摘 要 现代经济快节奏发展以及不断完善升级的信息化技术&#xff0c;让传统数据信息的管理升级为软件存储&#xff0c;归纳&#xff0c;集中处理数据信息的管理方式。本电商应用系统就是在这样的大环境下诞生&#xff0c;其可以帮助管理者在短时间内处理完毕庞大的数据信息&a…

当flex-direction: column时,设置flex:1不生效解决办法

当需求是: 页面纵向排列,且最后一个元素撑满剩余高度 flex:1在横向排列时是可以的,但是纵向排列会失效,此时需要给最后一个子元素设置align-self: stretch;即可撑满剩余高度 <div class"father"><div class"child child1"></div><div…

Python抓取高考网图片

Python抓取高考网图片 一、项目介绍二、完整代码一、项目介绍 本次采集的目标是高考网(http://www.gaokao.com/gkpic/)的图片,实现图片自动下载。高考网主页如下图: 爬取的流程包括寻找数据接口,发送请求,解析图片链接,向图片链接发送请求获取数据,最后保存数据。 二…

C++设计模式——Composite组合模式

一&#xff0c;组合模式简介 真实世界中&#xff0c;像企业组织、文档、图形软件界面等案例&#xff0c;它们在结构上都是分层次的。将系统分层次的方式使得统一管理和添加不同子模块变得容易&#xff0c;在软件开发中&#xff0c;组合模式的设计思想和它们类似。 组合模式是…

DDP(Differential Dynamic Programming)算法举例

DDP(Differential Dynamic Programming)算法 基本原理 DDP(Differential Dynamic Programming)是一种用于求解非线性最优控制问题的递归算法。它基于动态规划的思想,通过线性化系统的动力学方程和二次近似代价函数,递归地优化控制策略。DDP的核心在于利用局部二次近似来…

(vue3)引入组件标红,...has no default export 组件没有默认导出

(vue3)引入组件标红&#xff0c;…has no default export 组件没有默认导出 一、项目背景&#xff1a; 创建的vitevue3ts项目页面有标红,但程序不报错 二、原因 由于之前安装了 Vetur 插件&#xff0c;Vetur 默认使用 eslint-plugin-vue&#xff0c;并且强制 export default …

linux升级openssh

在日常开发中&#xff0c;经常会需要升级服务器漏洞&#xff0c;记录一下linux升级openssh相关&#xff0c;服务器版本为centos7.8&#xff0c;升级有两种方案&#xff0c;一种是可以上互联网环境&#xff0c;一种是内网环境&#xff0c;我这边因为是内网环境&#xff0c;只能进…

MySQL中CASE WHEN用法总结

MySQL中CASE WHEN用法总结 大家好&#xff0c;我是免费搭建查券返利机器人省钱赚佣金就用微赚淘客系统3.0的小编&#xff0c;也是冬天不穿秋裤&#xff0c;天冷也要风度的程序猿&#xff01; 在MySQL中&#xff0c;CASE WHEN语句是一种条件表达式&#xff0c;用于在查询中进行…

【AI应用探讨】—多模态应用场景

目录 1. 自动驾驶技术 多模态传感器融合 技术突破 2. 智能家居领域 多模态交互方式 应用实例 3. 智能客服领域 智能问答与情感分析 提升服务效率 4. 跨模态生成与理解 文字生成图像/视频 图像/视频生成文本 5. 未来发展趋势 多模态解析与生成 价值对齐与伦理考虑…

数据结构——优先级队列(堆)Priority Queue详解

1. 优先级队列 队列是一种先进先出(FIFO)的数据结构&#xff0c;但有些情况下&#xff0c;操作的数据可能带有优先级&#xff0c;一般出队列时&#xff0c;可能需要优先级高的元素先出队列&#xff0c;该场景下&#xff0c;使用队列不合适 在这种情况下&#xff0c;数据结构应…

代理ip服务器有哪些作用?

IP代理服务器具有以下几个作用&#xff1a; 1. 隐藏真实IP地址 代理服务器作为中间人&#xff0c;将用户请求转发给目标网站&#xff0c;从而隐藏用户的真实IP地址。这有助于保护用户的隐私和匿名性&#xff0c;防止被网站或其他人追踪和监控。 2. 绕过访问限制 某些网站可能…

2024.06.21 刷题日记

101. 对称二叉树 判断是否对称&#xff0c;检查 root->left->val root->right->val&#xff0c;接着进行递归检查对称位置&#xff1a; class Solution { public:// 传入对称位置的两个对称位置bool isMirror(TreeNode* left, TreeNode* right) {if (!left &…

设计模式2-面向对象设计原则

设计模式-面向对象的设计原则 依赖倒置原则开闭封闭原则单一职责原则Liskov替换原则接口隔离原则面向对象优先使用对象组合&#xff0c;而不是类继承。封装变化点针对接口编程&#xff0c;而不是针对实现编程 变化是复用的天地。面向对象设计最大的优势在于抵御变化。 重新认识…