Springboot整合物联网IOT的MQTT协议

准备工作 (下载EMQX服务端,相关客户端工具)

1. 服务端工具:

https://www.emqx.io/downloads?os=Windows

2. 客户端工具:

https://mqttx.app/zh#download

 <!--web依赖--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-integration</artifactId></dependency><!--mqtt相关依赖--><dependency><groupId>org.springframework.integration</groupId><artifactId>spring-integration-stream</artifactId></dependency><dependency><groupId>org.springframework.integration</groupId><artifactId>spring-integration-mqtt</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency>

自定义yml配置

server:port: 8989
#mqtt properties
mqtt:#uris 可以有多个 所以是个数组uris:- tcp://127.0.0.1:1883clientId: mqtt_test1topics:- demo- testusername: adminpassword: 123456timeout: 30keepalive: 60qos: 1

增加config配置读取yml文件 (使用了Lombok 需要自行添加pom依赖)

package com.huawen.mqtt.config;import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;/*** @author:xjl* @date:2022/5/5 17:27* @Description: MQTT的配置类**/
@Component
@ConfigurationProperties(prefix = "mqtt")
@Data
public class MqttConfiguration {/*** uris 服务器地址配置*/private String[] uris;/*** clientId*/private String clientId;/*** 话题*/private String[] topics;/*** 用户名*/private String username;/*** 密码*/private String password;/*** 连接超时时长*/private Integer timeout;/*** keep Alive时间*/private Integer keepalive;/*** 遗嘱消息 QoS*/private Integer qos;
}

消费者配置

package com.huawen.mqtt.config;import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;import javax.annotation.Resource;/*** @author:xjl* @date:2022/5/6 9:06* @Description: MQTT 消费端的配置**/
@Configuration
@Slf4j
public class MqttInBoundConfiguration {@Resourceprivate MqttConfiguration mqttProperties;//==================================== 消费消息==========================================///*** 入站通道** @return 消息通道对象 {@link MessageChannel}*/@Bean("input")public MessageChannel mqttInputChannel() {//直连通道return new DirectChannel();}/*** 创建MqttPahoClientFactory 设置MQTT的broker的连接属性 如果使用ssl验证 也需要此处设置** @return MQTT客户端工厂 {@link MqttPahoClientFactory}*/@Beanpublic MqttPahoClientFactory inClientFactory() {//设置连接属性DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();MqttConnectOptions options = new MqttConnectOptions();options.setServerURIs(mqttProperties.getUris());options.setUserName(mqttProperties.getUsername());options.setPassword(mqttProperties.getPassword().toCharArray());options.setConnectionTimeout(mqttProperties.getTimeout());options.setKeepAliveInterval(mqttProperties.getKeepalive());// 接受离线消息  告诉代理客户端是否要建立持久会话   false为建立持久会话options.setCleanSession(false);//设置断开后重新连接options.setAutomaticReconnect(true);factory.setConnectionOptions(options);return factory;}/*** 入站** @return 消息提供者 {@link MessageProducer}*/@Beanpublic MessageProducer producer() {// Paho客户端消息驱动通道适配器,主要用来订阅主题  对inboundTopics主题进行监听//clientId 加后缀 不然会报retrying 不能重复MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(mqttProperties.getClientId()+"_customer", inClientFactory(), mqttProperties.getTopics());adapter.setCompletionTimeout(5000);// Paho消息转换器DefaultPahoMessageConverter defaultPahoMessageConverter = new DefaultPahoMessageConverter();// 按字节接收消息// defaultPahoMessageConverter.setPayloadAsBytes(true);adapter.setConverter(defaultPahoMessageConverter);// 设置QoSadapter.setQos(mqttProperties.getQos());adapter.setOutputChannel(mqttInputChannel());return adapter;}/*** 通过通道获取数据* ServiceActivator注解表明:当前方法用于处理MQTT消息,inputChannel参数指定了用于消费消息的channel。* tips:* 异步处理** @return 消息处理 {@link MessageHandler}*/@Bean@ServiceActivator(inputChannel = "input")public MessageHandler handler() {return message -> {log.info("收到的完整消息为--->{}", message);log.info("----------------------");log.info("message:" + message.getPayload());log.info("Id:" + message.getHeaders().getId());log.info("receivedQos:" + message.getHeaders().get(MqttHeaders.RECEIVED_QOS));String topic = (String) message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC);log.info("topic:" + topic);log.info("----------------------");};}
}

生产者配置

package com.huawen.mqtt.controller;import com.huawen.mqtt.bean.MyMessage;
import com.huawen.mqtt.inter.MqttGateway;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;import javax.annotation.Resource;/*** @author:xjl* @date:2022/5/6 9:17* @Description: mqtt发布消息controller**/
@RestController
public class MqttPublishController {@Resourceprivate MqttGateway mqttGateWay;@PostMapping("/send")public String send(@RequestBody MyMessage myMessage) {// 发送消息到指定主题mqttGateWay.sendToMqtt(myMessage.getTopic(), 1, myMessage.getContent());return "send topic: " + myMessage.getTopic() + ", message : " + myMessage.getContent();}
}

创建一个通用接口 用于发送数据

package com.huawen.mqtt.inter;import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.handler.annotation.Header;/*** @author:xjl* @date:2022/5/6 9:20* @Description: 接口MqttGateway**/
@MessagingGateway(defaultRequestChannel = "out")
public interface MqttGateway {/*** 定义重载方法,用于消息发送** @param payload 负载*/void sendToMqtt(String payload);/*** 指定topic进行消息发送** @param topic   topic话题* @param payload 负载*/void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String payload);/*** 指定topic和qos进行消息发送** @param topic   topic话题* @param qos     qos* @param payload 负载 (字符串类型)*/void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);/*** 指定topic和qos进行消息发送** @param topic   topic话题* @param qos     qos* @param payload 负载 (字节数组类型)*/void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, byte[] payload);
}

生产者测试controller

package com.huawen.mqtt.controller;import com.huawen.mqtt.bean.MyMessage;
import com.huawen.mqtt.inter.MqttGateway;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;import javax.annotation.Resource;/*** @author:xjl* @date:2022/5/6 9:17* @Description: mqtt发布消息controller**/
@RestController
public class MqttPublishController {@Resourceprivate MqttGateway mqttGateWay;@PostMapping("/send")public String send(@RequestBody MyMessage myMessage) {// 发送消息到指定主题mqttGateWay.sendToMqtt(myMessage.getTopic(), 1, myMessage.getContent());return "send topic: " + myMessage.getTopic() + ", message : " + myMessage.getContent();}
}

该文章参考 https://blog.csdn.net/m0_46689235/article/details/124606005

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/814620.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

UnityShader学习计划

1.安装ShaderlabVS,vs的语法提示 2. 常规颜色是fixed 3.FrameDebugger调试查看draw的某一帧的全部信息&#xff0c;能看到变量参数的值

架构设计-权限系统之权限系统设计

系统设计权限系统 权限管控可以通俗的理解为权力限制&#xff0c;即不同的人由于拥有不同权力&#xff0c;他所看到的、能使用的可能不一样。对应到一个应用系统&#xff0c;其实就是一个用户可能拥有不同的数据权限&#xff08;看到的&#xff09;和操作权限&#xff08;使用…

前 5 名 iPhone 数据恢复软件评测

如今&#xff0c;我们似乎将整个生活都放在手机和移动设备上。他们用许多照片、备忘录、日历日期等记录了我们的生活&#xff0c;我们总是假设这些信息在我们需要时随时可以访问。但是&#xff0c;有许多情况会导致iPhone上的数据丢失&#xff0c;例如iPhone被盗&#xff0c;损…

JVM垃圾回收(GC)

目录 目录 1.GC 简介 1.1. 引言 1.2. 何为 GC 1.2.1. 手动 GC 1.2.2. 自动 GC 引用计数法 标记清除 2.GC入门分析 2.1.碎片整理 1)对象创建时&#xff0c;执行写入操作越来越耗时 2&#xff09;内存分配错误 2.2. 分代设想 2.3. 对象分配 对象内存分配过程 2.4. …

数据中心可视化平台:有效提升运维效率与管理水平

随着企业信息化建设的不断深入&#xff0c;数据中心作为支撑企业核心业务的重要基石&#xff0c;其运维管理的复杂性和挑战性也日益凸显。为了提升数据中心机房的管理水平和效率&#xff0c;实现精细化、专业化、规范化和自动化管理&#xff0c;构建数据中心可视化平台成为了一…

【鸿蒙开发】第二十章 Camera相机服务

1 简介 开发者通过调用Camera Kit(相机服务)提供的接口可以开发相机应用&#xff0c;应用通过访问和操作相机硬件&#xff0c;实现基础操作&#xff0c;如预览、拍照和录像&#xff1b;还可以通过接口组合完成更多操作&#xff0c;如控制闪光灯和曝光时间、对焦或调焦等。 2 …

RHCSA 模拟题(4)

请查阅1&#xff1a;RHCSA 模拟题-CSDN博客 请查阅2&#xff1a;RHCSA 模拟题&#xff08;2&#xff09;-CSDN博客 请查阅3&#xff1a;RHCSA 模拟题&#xff08;3&#xff09;-CSDN博客 在node2.example.com上执行以下任务 一、设置root密码 1、重启系统 2、将光标移动到…

《青少年成长管理2024》050 “成长目标:寻找世界的入口”1/2

《青少年成长管理2024》050 “成长目标&#xff1a;寻找世界的入口”1/2 一、蛋壳理论二、正向认知三、逆向认知。四、双向认知。 本节摘要&#xff1a;青少年在建立成长目标之前&#xff0c;需要具备一定的前提&#xff0c;就像寻找一个宝藏&#xff0c;要先找到一个入口&…

dijkstra + dp,PTA 天梯赛练习集L2-001 紧急救援

一、题目 1、题目描述 作为一个城市的应急救援队伍的负责人&#xff0c;你有一张特殊的全国地图。在地图上显示有多个分散的城市和一些连接城市的快速道路。每个城市的救援队数量和每一条连接两个城市的快速道路长度都标在地图上。当其他城市有紧急求助电话给你的时候&#xf…

5.8 mybatis之EnumTypeHandler详细使用

文章目录 1. 把java中枚举数据插入到数据库中2. 把数据库中值查询到java对象中 在 Java 中&#xff0c;枚举类型是一种特殊的类&#xff0c;当我们在数据库和 Java 对象之间进行映射时&#xff0c;通常需要将数据库中的某个字段&#xff08;如字符串或数字&#xff09;映射到 J…

7天八股速记之C++后端——Day 4

坚持7天&#xff0c;短期内快速完成C后端面试突击。每天10题&#xff0c;弥补后端八股知识缺漏&#xff0c;熟练掌握后端的高频考点&#xff0c;后端面试更有把握。 1. 一条 SQL 语句在数据库框架中的执行流程&#xff1f; 连接数据库&#xff1a; 客户端应用程序通过数据库连…

Github远程仓库改名字之后,本地git如何配置?

文章目录 缘由解决方案 缘由 今天在github创建一个仓库&#xff0c;备份一下本地电脑上的资料。起初随便起一个仓库名字&#xff0c;后来修改之。既然远程仓库改名&#xff0c;那么本地仓库需要更新地址。这里采用SSH格式。 解决方案 如果你的GitHub仓库改名了&#xff0c;你…

Python基于大数据的微博的舆论情感分析,微博评论情感分析可视化系统,附源码

博主介绍&#xff1a;✌Java徐师兄、7年大厂程序员经历。全网粉丝13w、csdn博客专家、掘金/华为云等平台优质作者、专注于Java技术领域和毕业项目实战✌ &#x1f345;文末获取源码联系&#x1f345; &#x1f447;&#x1f3fb; 精彩专栏推荐订阅&#x1f447;&#x1f3fb; 不…

练习题(2024/4/13)

1长度最小的子数组 给定一个含有 n 个正整数的数组和一个正整数 target 。 找出该数组中满足其总和大于等于 target 的长度最小的 连续 子数组 [numsl, numsl1, ..., numsr-1, numsr] &#xff0c;并返回其长度。如果不存在符合条件的子数组&#xff0c;返回 0 。 示例 1&am…

2024.4.14每日一题

LeetCode 设计哈希集合 题目链接&#xff1a;705. 设计哈希集合 - 力扣&#xff08;LeetCode&#xff09; 题目描述 不使用任何内建的哈希表库设计一个哈希集合&#xff08;HashSet&#xff09;。 实现 MyHashSet 类&#xff1a; void add(key) 向哈希集合中插入值 key 。…

1. 软件是如何访问硬件的

1 软件是如何访问硬件的 操作系统作为硬件层的上层&#xff0c;是对硬件的管理和抽象。对于操作系统上面的运行库和应用程序来说&#xff0c;他们希望看到的是一个统一的硬件访问模式。作为应用程序开发者&#xff0c;不希望在开发应用程序的时候直接读写硬件端口、处理硬件中断…

Linux第89步_了解异步通知及其结构和函数

1、了解“异步通知” “异步通知”的核心就是信号。信号是采用软件模拟的“中断”&#xff0c;它由“驱动程序”主动向“应用程序”发送信号&#xff0c;并报告自己可以访问了&#xff0c;“应用程序”收到信号以后&#xff0c;就从“驱动设备”中读取或者写入数据。整个过程就…

数据库(4)

目录 16.MySQL主从复制&#xff1f; 17.MySQL主从的延迟是怎么解决的呢&#xff1f; 18.MySQL读写分离方案&#xff1f; 19.什么是Redis&#xff0c;为什么用Redis&#xff1f; 20.为什么Redis是单线程的以及为什么这么快&#xff1f; 16.MySQL主从复制&#xff1f; 主要涉…

杰发科技AC7840——CAN通信简介(3)_时间戳

0. 时间戳简介 时间戳表示的是收到该CAN消息的时刻&#xff0c;通过连续多帧的时间戳&#xff0c;可以计算出CAN消息的发送周期&#xff0c;也可以用于判断CAN消息是否被持续收到。 1. 使用步骤 注意分别是发送和接收的功能&#xff1a; 2. 现象分析_接收时间戳 看下寄存器的…

帝国cms仿《鳄鱼下载站》网站源码

仿《鳄鱼下载站》网站源码手机安卓软件网站模版 PHP网站源码 帝国cms内核 采用帝国cms7.5 环境PHPmysql 恢复数据库后如何修改密码: 双击表&#xff0c;进入对应的详细数据表&#xff0c;然后找到&#xff1a;www_96kaifa_com_enewsuser这个表&#xff0c;双击打开修改&…