04 - 尚硅谷 - MQTT 客户端编程

1.在Java中使用MQTT

1.1 Eclipse Paho Java Client

具体步骤:

1、创建一个Spring Boot项目,添加如下依赖

<parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>3.0.5</version>
</parent><dependencies><!-- spring boot整合junit单元测试的起步依赖 --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId></dependency><!-- mqtt java客户端依赖 --><dependency><groupId>org.eclipse.paho</groupId><artifactId>org.eclipse.paho.client.mqttv3</artifactId><version>1.2.5</version></dependency></dependencies>

2、建立连接代码实现

@Test
public void createConnection() throws MqttException {// 定义链接相关参数String broker = "tcp://localhost:1883";String username = "zhangsan";String password = "123";String clientid = "mqtt_java_client_01";// 创建MqttJava客户端对象// MqttClientPersistence: 代表一个持久的数据存储,用于在传输过程中存储出站和入站的信息MqttClient client = new MqttClient(broker, clientid , new MemoryPersistence());   MqttConnectOptions options = new MqttConnectOptions();options.setUserName(username);options.setPassword(password.toCharArray());client.connect(options);// 阻塞当前线程while (true) ;
}

3、发布消息代码演示

@Test
public void sendMessage() throws MqttException {// 定义链接相关参数String broker = "tcp://localhost:1883";String username = "zhangsan";String password = "123";String clientid = "mqtt_java_client_01";// 创建MqttJava客户端对象MqttClient client = new MqttClient(broker, clientid , new MemoryPersistence());MqttConnectOptions options = new MqttConnectOptions();options.setUserName(username);options.setPassword(password.toCharArray());client.connect(options);// 创建消息对象QoSString content = "hello mqtt";MqttMessage message = new MqttMessage(content.getBytes());message.setQos(2);message.setRetained(true);// 发送消息client.publish("a/c" , message);// 关闭链接释放资源client.disconnect();client.close();}

4、订阅主题获取消息

@Test
public void receiveMessage() throws MqttException {// 定义链接相关参数String broker = "tcp://localhost:1883";String username = "zhangsan";String password = "123";String clientid = "mqtt_java_client_02";// 创建MqttJava客户端对象MqttClient client = new MqttClient(broker, clientid , new MemoryPersistence());MqttConnectOptions options = new MqttConnectOptions();options.setUserName(username);options.setPassword(password.toCharArray());// 添加回调函数获取主题消息client.setCallback(new MqttCallback() {@Overridepublic void connectionLost(Throwable cause) {  // 连接丢失时被调用System.out.println("connectionLost: " + cause.getMessage());}@Overridepublic void messageArrived(String topic, MqttMessage message) throws Exception {  // 接收到消息时被调用System.out.println("topic: " + topic);System.out.println("Qos: " + message.getQos());System.out.println("message content: " + new String(message.getPayload()));}@Overridepublic void deliveryComplete(IMqttDeliveryToken token) {  // 消息接收完成时被调用System.out.println("deliveryComplete---------" + token.isComplete());}});// 订阅主题client.connect(options);client.subscribe("a/d" , 2);while(true) ;}

1.2 spring-integration-mqtt

基础环境搭建

1、创建一个Spring Boot项目,并加入如下依赖:

<dependencies><!-- spring boot项目web开发的起步依赖 --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!-- spring boot项目集成消息中间件基础依赖 --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-integration</artifactId></dependency><!-- spring boot项目和mqtt客户端集成起步依赖 --><dependency><groupId>org.springframework.integration</groupId><artifactId>spring-integration-mqtt</artifactId><version>5.4.3</version></dependency><!-- lombok依赖 --><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency><!-- fastjson依赖 --><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.83</version></dependency></dependencies>

2、编写启动类

@EnableConfigurationProperties(value = MqttConfigurationProperties.class)
@SpringBootApplication
public class MqttDemoApplication {public static void main(String[] args) {SpringApplication.run(MqttDemoApplication.class , args) ;}}

3、在application.yml文件中添加如下配置

spring:mqtt:username: zhangsanpassword: 123url: tcp://localhost:1883subClientId: sub_client_id_123subTopic: atguigu/iot/lamp/linepubClientId: pub_client_id_123

4、创建实体类读取自定义配置

@Data
@ConfigurationProperties(prefix = "spring.mqtt")
public class MqttConfigurationProperties {private String username;private String password;private String url;private String subClientId ;private String subTopic ;private String pubClientId ;}

5、创建配置类配置链接工厂

@Configuration
public class MqttConfiguration {@Autowiredprivate MqttConfigurationProperties mqttConfigurationProperties ;@Beanpublic MqttPahoClientFactory mqttClientFactory(){// 创建客户端工厂DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();// 创建MqttConnectOptions对象MqttConnectOptions options = new MqttConnectOptions();options.setCleanSession(true);options.setUserName(mqttConfigurationProperties.getUsername());options.setPassword(mqttConfigurationProperties.getPassword().toCharArray());options.setServerURIs(new String[]{mqttConfigurationProperties.getUrl()});factory.setConnectionOptions(options);// 返回return factory;}}

订阅主题获取消息

具体步骤:

1、配置入站适配器

@Configuration
public class MqttInboundConfiguration {@Autowiredprivate MqttConfigurationProperties mqttConfigurationProperties ;@Autowiredprivate ReceiverMessageHandler receiverMessageHandler;/*** 配置消息传输通道* @return*/@Beanpublic MessageChannel mqttInputChannel() {return new DirectChannel();}/*** 配置入站适配器*/@Beanpublic MessageProducer messageProducer(MqttPahoClientFactory mqttPahoClientFactory) {MqttPahoMessageDrivenChannelAdapter adapter  =new MqttPahoMessageDrivenChannelAdapter(mqttConfigurationProperties.getUrl() ,mqttConfigurationProperties.getSubClientId() ,mqttPahoClientFactory , mqttConfigurationProperties.getSubTopic().split(",")) ;adapter.setConverter(new DefaultPahoMessageConverter());adapter.setQos(1);adapter.setOutputChannel(mqttInputChannel());return adapter ;}/*** 配置入站消息处理器* @return*/@Bean@ServiceActivator(inputChannel = "mqttInputChannel")public MessageHandler messageHandler() {return this.receiverMessageHandler ;}}

2、定义监听主题消息的处理器

@Component
public class ReceiverMessageHandler implements MessageHandler {@Overridepublic void handleMessage(Message<?> message) throws MessagingException {MessageHeaders headers = message.getHeaders();String receivedTopicName = (String) headers.get("mqtt_receivedTopic");if("atguigu/iot/lamp/line".equals(receivedTopicName)) {System.out.println("接收到消息:" + message.getPayload());}}}

测试:通过MQTTX向atguigu/iot/lamp/line主题发送消息

向指定主题发送消息

具体步骤:

1、配置出站消息处理器

@Configuration
public class MqttOutboundConfiguration {@Autowiredprivate MqttConfigurationProperties mqttConfigurationProperties ;@Autowiredprivate MqttPahoClientFactory pahoClientFactory ;@Beanpublic MessageChannel mqttOutputChannel() {return new DirectChannel();}@Bean@ServiceActivator(inputChannel = "mqttOutputChannel")public MessageHandler mqttOutboundMassageHandler() {MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(mqttConfigurationProperties.getUrl() ,mqttConfigurationProperties.getPubClientId() , pahoClientFactory ) ;messageHandler.setAsync(true);messageHandler.setDefaultQos(0);messageHandler.setDefaultTopic("default");return messageHandler ;}}

2、定义发送消息的网关接口

@MessagingGateway(defaultRequestChannel = "mqttOutputChannel")
public interface MqttGateway {/*** 发送mqtt消息* @param topic 主题* @param payload 内容*/void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String payload);/*** 发送包含qos的消息* @param topic 主题* @param qos 对消息处理的几种机制。*          * 0 表示的是订阅者没收到消息不会再次发送,消息会丢失。<br>*          * 1 表示的是会尝试重试,一直到接收到消息,但这种情况可能导致订阅者收到多次重复消息。<br>*          * 2 多了一次去重的动作,确保订阅者收到的消息有一次。* @param payload 消息体*/void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);}

3、定义发送消息的服务类

@Component
@AllArgsConstructor
public class MqttMessageSender {private MqttGateway mqttGateway;/*** 发送mqtt消息* @param topic 主题* @param message 内容*/public void send(String topic, String message) {mqttGateway.sendToMqtt(topic, message);}/*** 发送包含qos的消息* @param topic 主题* @param qos 质量* @param message 消息体*/public void send(String topic, int qos, byte[] message){mqttGateway.sendToMqtt(topic, qos, message);}
}

3.智能灯泡案例

需求:

1、智能灯泡设备上线以后向MQTT服务端发送消息,后端服务从MQTT中获取消息记录设备信息到数据库中

2、后端微服务向MQTT服务端发送开灯或者关灯消息,设备端从MQTT中获取消息控制灯泡的开和关

3、设备端对灯泡进行开和关操作的时候向MQTT中发送消息,后端服务获取MQTT消息记录灯泡的开关状态

3.1 环境准备

具体步骤:

1、创建对应的数据库表

-- 智能灯泡设备表
CREATE TABLE `tb_lamp` (`id` bigint NOT NULL AUTO_INCREMENT,`deviceId` varchar(50) DEFAULT NULL,`status` int DEFAULT NULL COMMENT '1:上线  0:下线',`create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ,`update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;-- 智能灯泡设备状态表
CREATE TABLE `tb_lamp_status` (`id` int NOT NULL AUTO_INCREMENT,`deviceId` varchar(50) DEFAULT NULL,`status` int DEFAULT NULL COMMENT '0: 关灯   1:开灯',`create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP,PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;

2、在spring-integration-mqtt案例中加入如下依赖

<dependency><groupId>com.baomidou</groupId><artifactId>mybatis-plus-boot-starter</artifactId><version>3.5.3.1</version>
</dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.32</version>
</dependency>

3、在application.yml文件中加入如下依赖

spring:datasource:driver-class-name: com.mysql.cj.jdbc.Driverurl: jdbc:mysql://192.168.136.147:3306/lamp_test?useUnicode=true&characterEncoding=utf-8&useSSL=false&serverTimezone=Asia/Shanghaiusername: rootpassword: 1234mybatis-plus:configuration:log-impl: org.apache.ibatis.logging.stdout.StdOutImplmap-underscore-to-camel-case: truemapper-locations: classpath*:mapper/*Mapper.xml

4、通过mybatis的逆向工程生成tb_lamp和tb_lamp_status表对应的基础代码

5、在启动类上添加@MapperScan注解指定Mapper接口的包路径

3.2 服务端获取设备上线消息

接口说明

接口一:设备上线

当终端设备连接上EMQX以后,发送上线消息到EMQX服务端,说明如下:

主题: atguigu/iot/lamp/line
消息内容:{"deviceId": "xxxxxx","online": 1}
数据说明:deviceId: 设备idonline:   上线状态,1表示上线,0表示离线

业务代码

ReceiverMessageHandler类的代码进行如下改造:

@Component
public class ReceiverMessageHandler implements MessageHandler {@Autowiredprivate TbLampService tbLampService ;@Overridepublic void handleMessage(Message<?> message) throws MessagingException {MessageHeaders headers = message.getHeaders();String receivedTopicName = (String) headers.get("mqtt_receivedTopic");if("atguigu/iot/lamp/line".equals(receivedTopicName)) {tbLampService.updateLampOnlineStatus(message.getPayload().toString()) ;        // 更新智能灯泡的上线状态}}}

TbLampServiceImpl类的代码进行如下改造:

@Service
public class TbLampServiceImpl extends ServiceImpl<TbLampMapper, TbLamp> implements TbLampService {@Overridepublic void updateLampOnlineStatus(String jsonInfo) {// 解析消息获取设备id和上线状态Map<String ,  Object> map = JSON.parseObject(jsonInfo, Map.class);String deviceId = map.get("deviceId").toString();Integer status = Integer.parseInt(map.get("online").toString());// 根据设备的id查询设备数据LambdaQueryWrapper<TbLamp> lambdaQueryWrapper = new LambdaQueryWrapper<>() ;lambdaQueryWrapper.eq(TbLamp::getDeviceid , deviceId) ;TbLamp tbLamp = this.getOne(lambdaQueryWrapper);if(tbLamp == null) {        // 设备不存在,新增设备tbLamp = new TbLamp() ;tbLamp.setDeviceid(deviceId);tbLamp.setStatus(status);this.save(tbLamp) ;}else {     // 设备已经存在,修改设备的状态tbLamp.setStatus(status);tbLamp.setUpdateTime(new Date());this.updateById(tbLamp) ;}}}

3.3 服务端发送关灯开灯消息到MQTT

接口说明

接口三:后端发送消息控制智能灯泡开关

后端可以发送控制灯泡状态消息到EMQX中,设备端监听指定主题获取消息,控制灯泡的开关状态,说明如下:

主题: atguigu/iot/lamp/server/status
消息内容:{"deviceId": "xxxxxx","status": 0}
数据说明:		status:	0:关灯   , 1:开灯

业务代码

@RestController
@RequestMapping(value = "/api/lamp")
public class LampApiController {@Autowiredprivate MqttMessageSender mqttMessageSender;@GetMapping(value = "/{deviceId}/{status}")public String sendStatusLampMsg(@PathVariable(value = "deviceId") String deviceId , @PathVariable(value = "status") Integer status) {Map<String , Object> map = new HashMap<>() ;map.put("deviceId" , deviceId) ;map.put("status" , status) ;String json = JSON.toJSONString(map);mqttMessageSender.send("atguigu/iot/lamp/server/status" , json);return "ok" ;}}

3.4 服务端获取设备开灯关灯消息

接口说明

接口四:设备端改变智能灯泡开关的状态,状态发给给后端,后端记录状态

主题:atguigu/iot/lamp/device/status
消息内容:{"deviceId": "xxxxx"  "status": 0}
数据说明:	deviceId:设备idstatus:0:关灯   , 1:开灯

业务代码

@Override
public void handleMessage(Message<?> message) throws MessagingException {MessageHeaders headers = message.getHeaders();String receivedTopicName = (String) headers.get("mqtt_receivedTopic");if("atguigu/iot/lamp/line".equals(receivedTopicName)) {tbLampService.updateLampOnlineStatus(message.getPayload().toString()) ;        // 更新智能灯泡的上线状态}else if("atguigu/iot/lamp/device/status".equals(receivedTopicName)) {tbLampStatusService.saveDeviceStatus(message.getPayload().toString()) ;}
}
@Service
public class TbLampStatusServiceImpl extends ServiceImpl<TbLampStatusMapper, TbLampStatus> implements TbLampStatusService {@Overridepublic void saveDeviceStatus(String json) {// 获取消息内容Map<String , Object> map = JSON.parseObject(json, Map.class);String deviceId = map.get("deviceId").toString();Integer status = Integer.parseInt(map.get("status").toString());// 创建对象封装消息TbLampStatus tbLampStatus = new TbLampStatus() ;tbLampStatus.setDeviceid(deviceId);tbLampStatus.setStatus(status);this.save(tbLampStatus) ;}}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/61640.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

shell第一次作业

要求&#xff1a; 通过shell脚本分析部署nginx网络服务 1.接收用户部署的服务名称 2.判断服务是否安装 ​ 已安装&#xff1b;自定义网站配置路径为/www&#xff1b;并创建共享目录和网页文件&#xff1b;重启服务 ​ 没有安装&#xff1b;安装对应的软件包 3.测试 判断服务是…

1+X应急响应(网络)病毒与木马的处置:

病毒与木马的处置&#xff1a; 病毒与木马的简介&#xff1a; 病毒和木马的排查与恢复&#xff1a;

服务器数据恢复—热备盘未激活导致硬盘掉线的raid5阵列崩溃的数据恢复案例

服务器数据恢复环境&#xff1a; 某品牌X3850服务器中有一组由数块SAS硬盘组建的RAID5阵列&#xff0c;该阵列中有一块盘是热备盘。操作系统为linux redhat&#xff0c;上面跑着一个基于oracle数据库的oa。 服务器故障&#xff1a; 服务器raid5阵列中有一块硬盘离线&#xff0…

Eclipse 创建Dynamic web project项目-配置Tomcat服务器

1、new——>project: 2、选择web的 Dynamic web project项目: 3、 项目命名&#xff0c;选择new runtime(没有部署过web项目&#xff0c;一般tartget runtime选项里面是空的)&#xff1a; 4、完成1、2的路径选择&#xff1a; 5、完成两个选项操作后&#xff0c;点击finish &…

理解折半查找法

理解折半查找法&#xff1a;高效的查找算法 折半查找法&#xff08;又称二分查找法&#xff09;是一种高效的查找算法&#xff0c;用于查找一个已排序数组中的目标元素。与线性查找方法不同&#xff0c;折半查找每次都将搜索范围减半&#xff0c;从而大幅提升查找效率。本文将详…

VM虚拟机装MAC后无法联网,如何解决?

✨在vm虚拟机上&#xff0c;给虚拟机MacOS设置网络适配器。选择NAT模式用于共享主机的IP地址 ✨在MacOS设置中设置网络 以太网 使用DHCP ✨回到本地电脑上&#xff0c;打开 服务&#xff0c;找到VMware DHCP和VMware NAT&#xff0c;把这两个服务打开&#xff0c;专一般问题就…

力扣 LeetCode 236. 二叉树的最近公共祖先(Day10:二叉树)

解题思路&#xff1a; 后序遍历 注意&#xff1a; p和q其中一个就是它们的公共祖先的情况也考虑到了&#xff0c;假设q是公共祖先&#xff0c;遇到q就直接返回&#xff0c;相当于是下面一边为空&#xff0c;一边不为空的情况&#xff0c;返回不为空就一边即可 class Solutio…

前端速通(HTML)

1. HTML HTML基础&#xff1a; 什么是HTML&#xff1f; 超文本&#xff1a; "超文本"是指通过链接连接不同网页或资源的能力。HTML支持通过<a>标签创建超链接&#xff0c;方便用户从一个页面跳转到另一个页面。 标记语言&#xff1a; HTML使用一组预定义的标签…

论文阅读——Performance Evaluation of Passive Tag to Tag Communications(一)

文章目录 摘要一、互耦对监听器标签输入阻抗的影响A. 无限细偶极子互阻抗的理论研究B. 电细偶极子的情况&#xff1a;理论与模拟C. 印刷偶极子的情况&#xff1a;电磁模拟与测量 二、T2T 通信系统的性能评估总结 论文来源&#xff1a;https://ieeexplore.ieee.org/document/970…

Palo Alto Networks PAN-OS身份认证绕过漏洞复现(CVE-2024-0012)

0x01 产品描述: PAN-OS 是运行 Palo Alto Networks 下一代防火墙的软件。通过利用 PAN-OS 本机内置的关键技术(App-ID、Content-ID、设备 ID 和用户 ID),可以在任何时间、任何地点完全了解和控制所有用户和设备中正在使用的应用程序。0x02 漏洞描述: PAN-OS 设备管理 Web …

使用ENSP实现静态路由

一、双路由器静态路由 1.项目拓扑 2.项目实现 (1)路由器AR1配置 进入系统试图 sys将路由器命名为R1 sysname R1进入g0/0/0接口 int g0/0/0将g0/0/0接口IP地址配置为1.1.1.1/24 ip address 1.1.1.1 24进入g0/0/1接口 int g0/0/1将g0/0/1接口IP地址配置为192.168.1.1/24 ip ad…

Claude3.5-Sonnet和GPT-4o怎么选(附使用链接)

随着人工智能模型的不断进化&#xff0c;传统的评估标准已经逐渐变得陈旧和不再适用。以经典的“喝水测试”为例&#xff0c;过去广泛应用于检测模型能力&#xff0c;但现如今即便是国内的一些先进模型&#xff0c;也能够轻松答对这些简单的问题。因此&#xff0c;我们亟需引入…

uniapp+vue3+ts H5端使用Quill富文本插件以及解决上传图片反显的问题

uniappvue3ts H5端使用Quill富文本插件以及解决上传图片反显的问题 1.在项目中安装Quill npm i quill1.3.72.需要显示富文本的页面完整代码 <template><view><div ref"quillEditor" style"height: 65vh"></div></view> &…

QML —— 3种等待指示控件(附源码)

效果如下 说明 BusyIndicator应用于指示在加载内容或UI被阻止等待资源可用时的活动。BusyIndicator类似于一个不确定的ProgressBar。两者都可以用来指示背景活动。主要区别在于视觉效果,ProgressBar还可以显示具体的进度(当可以确定时)。由于视觉差异,繁忙指示器和不确定的…

数字后端零基础入门系列 | Innovus零基础LAB学习Day11(Function ECO流程)

###LAB 20 Engineering Change Orders (ECO) 这个章节的学习目标是学习数字IC后端实现innovus中的一种做function eco的flow。对于初学者&#xff0c;如果前面的lab还没掌握好的&#xff0c;可以直接跳过这节内容。有时间的同学&#xff0c;可以熟悉掌握下这个flow。 数字后端…

R语言绘图过程中遇到图例的图块中出现字符“a“的解决方法

R语言绘图过程中遇到图例的图块中出现字符的解决方法 因为我遇到这个问题的时候没在网上找到合适的方法&#xff0c;找到个需要付费的&#xff0c;算了。也许是因为问的方式不同&#xff0c;问了半天AI也回答出来&#xff0c;莫名有些烦躁&#xff0c;打算对代码做个分析&…

云服务器部署WebSocket项目

WebSocket是一种在单个TCP连接上进行全双工通信的协议&#xff0c;其设计的目的是在Web浏览器和Web服务器之间进行实时通信&#xff08;实时Web&#xff09; WebSocket协议的优点包括&#xff1a; 1. 更高效的网络利用率&#xff1a;与HTTP相比&#xff0c;WebSocket的握手只…

数字反向输出

数字反向输出 C语言代码C 代码Java代码Python代码 &#x1f490;The Begin&#x1f490;点点关注&#xff0c;收藏不迷路&#x1f490; 小明听到广播里的数字后&#xff0c;总喜欢反着念给妈妈听。请聪明的你将小明听到的数字反向输出。 输入 输入为一个整型的四位数n 输出 …

2024-11-19 kron积

若A[a11 a12; a21 a22]; B[b11 b12; b21 b22]; 则C[a11*b11 a12*b11 a21*b11 a22*b11; a11*b12 a12*b12 a21*b12 a22*b12; a11*b21 a12*b21 a21*b21 a22*b21; a11*b22 a12*b22 a21*b22 a22*b22] 用MATLAB实现 方法1&#xff1a; A [a11 a12; a21 a22]; B [b11 b12; b21 b22]…

Java多态的优势和弊端

1. public class text {public static void main(String[] args) {animal dnew dog();d.eat();// dog a (dog) d;//类似强制转换//a.lookhome();/* if(d instanceof dog){dog a(dog)d;a.lookhome();}else if(d instanceof cat){cat c(cat) d;c.work();}else{System.out.print…