SpringBoot+RabbitMQ实现MQTT协议通讯

一、简介

MQTT(消息队列遥测传输)是ISO 标准(ISO/IEC PRF 20922)下基于发布/订阅范式的消息协议。它工作在 TCP/IP协议族上,是为硬件性能低下的远程设备以及网络状况糟糕的情况下而设计的发布/订阅型消息协议,为此,它需要一个消息中间件 。此处使用RabbitMQ
MQTT是一个基于客户端-服务器的消息发布/订阅传输协议。MQTT协议是轻量、简单、开放和易于实现的,这些特点使它适用范围非常广泛。在很多情况下,包括受限的环境中,如:机器与机器(M2M)通信和物联网(IoT)。其在,通过卫星链路通信传感器、偶尔拨号的医疗设备、智能家居、及一些小型化设备中已广泛使用。

二、环境准备

2.1 Erlang安装

使用rabbitMQ首先需要安装Erlang环境,因为rabbitMQ是用Erlang语言编写的。

2.1.1 下载安装

官网下载:https://www.erlang.org/patches/otp-26.0 (比较慢,不推荐)
在这里插入图片描述

百度网盘下载:https://pan.baidu.com/s/1xU4syn14Bh7QR-skjm_hOg (推荐)
提取码:az1t

2.1.2 环境变量

进入高级系统设置
在这里插入图片描述
在这里插入图片描述
环境变量: 变量名-ERLANG_HOME 变量值-文件安装路径
在这里插入图片描述
配置path: 配置完上面的之后,找到系统变量中的path点击编辑,然后新建:%ERLANG_HOME%\bin
在这里插入图片描述
验证: 进入cmd,输入 erl -version 显示版本号就说明安装成功
在这里插入图片描述

2.2 RabbtiMQ安装

2.2.1 下载安装

官网下载:http://www.rabbitmq.com/download.html
下载后一通傻瓜式安装即可。
在这里插入图片描述

2.2.2 环境变量

变量名-RABBITMQ_SERVER 变量值-文件安装路径
在这里插入图片描述
编辑path,点击新建按钮,输入%RABBITMQ_SERVER%\sbin,点击确定
在这里插入图片描述

2.2.3 安装mqtt插件

rabbitmq-plugins enable rabbitmq_mqtt

在这里插入图片描述

2.2.4 管理控制台安装

rabbitmq-plugins enable rabbitmq_management

在这里插入图片描述

2.2.5 访问测试

登录测试: 浏览器输入 http://localhost:15672 ,输入用户名:guest,密码:guest在这里插入图片描述

三、代码实现

3.1 引入依赖

<!-- rabbitmq -->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency><!--mqtt依赖包-->
<dependency><groupId>org.springframework.integration</groupId><artifactId>spring-integration-mqtt</artifactId>
</dependency>
<dependency><groupId>org.eclipse.paho</groupId><artifactId>org.eclipse.paho.client.mqttv3</artifactId><version>1.2.0</version>
</dependency>

3.2 yml配置

# mqtt配置
mqtt:url: ***********username: ***********password: ***********# 间隔时间keep-alive-interval: 60# 超时时间completion-timeout: 30000# 会话保持,默认为falseclean-session: false# 自动连接,默认为trueautomatic-reconnect: true# 生产者配置producer:# 很重要client-id: producer1topic: demo-topic# 传输质量 QoS 0:最多分发一次 QoS 1:至少分发一次(默认) QoS 2:只分发一次qos: 1# 消费者配置subscriber:# 很重要client-id: subscriber1topic: demo-topic# 传输质量 QoS 0:最多分发一次 QoS 1:至少分发一次(默认) QoS 2:只分发一次qos: 0

3.3 配置属性类

package com.qiangesoft.mqtt.config;import lombok.Data;
import org.apache.commons.lang3.StringUtils;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.stereotype.Component;/*** mqtt配置** @author qiangesoft* @date 2024-04-24*/
@Data
@Component
@Configuration
@ConfigurationProperties(prefix = "mqtt")
public class MqttProperty {/*** 服务地址*/private String url;/*** 账号*/private String username;/*** 密码*/private String password;/*** 间隔时间*/private int keepAliveInterval;/*** 超时时间*/private int completionTimeout;/*** 会话保持,默认为false*/private boolean cleanSession;/*** 自动连接,默认为true*/private boolean automaticReconnect = true;/*** 生产者*/private Client producer = new Client();/*** 消费者*/private Client subscriber = new Client();@Datapublic class Client {/*** 客户端id*/private String clientId;/*** 默认主题*/private String topic;/*** 传输质量* QoS 0:最多分发一次* QoS 1:至少分发一次(默认)* QoS 2:只分发一次*/private int qos = 1;}@Beanpublic MqttPahoClientFactory mqttPahoClientFactory() {DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();// 连接参数MqttConnectOptions options = new MqttConnectOptions();options.setServerURIs(new String[]{url});if (StringUtils.isNotBlank(this.username)) {options.setUserName(this.username);}if (StringUtils.isNotBlank(this.password)) {options.setPassword(this.password.toCharArray());}// 心跳时间options.setKeepAliveInterval(this.keepAliveInterval);// 断开是否自动重联options.setAutomaticReconnect(this.automaticReconnect);// 保持session,客户端上线后会接受到它离线的这段时间的消息options.setCleanSession(this.cleanSession);// 设置“遗嘱”消息的话题,若客户端与服务器之间的连接意外中断,服务器将发布客户端的“遗嘱”消息。
//        options.setWill("willTopic", WILL_DATA, 2, false);factory.setConnectionOptions(options);return factory;}
}
package com.qiangesoft.mqtt.constant;/*** mqtt通用常量信息** @author qiangesoft* @date 2024-04-24*/
public class MqttConstant {/*** 生产者管道*/public static final String OUTBOUND_CHANNEL = "outboundChannel";/*** 消费者管道*/public static final String INBOUND_CHANNEL = "inboundChannel";
}

3.4 生产者

package com.qiangesoft.mqtt.producer;import com.qiangesoft.mqtt.config.MqttProperty;
import com.qiangesoft.mqtt.constant.MqttConstant;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;/*** mqtt生产者** @author qiangesoft* @date 2024-04-24*/
@Configuration
public class MqttProducerConfig {@Autowiredprivate MqttProperty mqttProperty;@Autowiredprivate MqttPahoClientFactory mqttPahoClientFactory;/*** 消息生产通道** @return*/@Bean(name = MqttConstant.OUTBOUND_CHANNEL)public MessageChannel outboundChannel() {return new DirectChannel();}/*** 消息发布** @return*/@Bean@ServiceActivator(inputChannel = MqttConstant.OUTBOUND_CHANNEL)public MessageHandler mqttOutbound() {MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(mqttProperty.getProducer().getClientId(), mqttPahoClientFactory);messageHandler.setAsync(false);messageHandler.setDefaultQos(mqttProperty.getProducer().getQos());messageHandler.setDefaultTopic(mqttProperty.getProducer().getTopic());return messageHandler;}
}

3.5 消费者

package com.qiangesoft.mqtt.subscriber;import com.qiangesoft.mqtt.config.MqttProperty;
import com.qiangesoft.mqtt.constant.MqttConstant;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;import java.util.Objects;/*** mqtt消费者** @author qiangesoft* @date 2024-04-24*/
@Slf4j
@Configuration
public class MqttSubscriberConfig {@Autowiredprivate MqttProperty mqttProperty;@Autowiredprivate MqttPahoClientFactory mqttPahoClientFactory;/*** 消息订阅通道** @return*/@Bean(name = MqttConstant.INBOUND_CHANNEL)public MessageChannel inboundChannel() {return new DirectChannel();}/*** 消息订阅通道绑定** @return*/@Beanpublic MessageProducer mqttInbound() {MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(mqttProperty.getSubscriber().getClientId(),mqttPahoClientFactory, mqttProperty.getSubscriber().getTopic());adapter.setCompletionTimeout(mqttProperty.getCompletionTimeout());adapter.setConverter(new DefaultPahoMessageConverter());adapter.setQos(mqttProperty.getSubscriber().getQos());adapter.setOutputChannel(inboundChannel());return adapter;}/*** 消息订阅** @return*/@Bean@ServiceActivator(inputChannel = MqttConstant.INBOUND_CHANNEL)public MessageHandler messageHandler() {return message -> {try {String topic = Objects.requireNonNull(message.getHeaders().get("mqtt_receivedTopic")).toString();log.info("订阅主题为: {}", topic);String payload = message.getPayload().toString();log.info("订阅接收到消息:{}", payload);} catch (Exception e) {e.printStackTrace();}};}}

3.6 消息发送网关

package com.qiangesoft.mqtt.service;import com.qiangesoft.mqtt.constant.MqttConstant;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;/*** 消息发送网关** @author qiangesoft* @date 2024-04-24*/
@Component
@MessagingGateway(defaultRequestChannel = MqttConstant.OUTBOUND_CHANNEL)
public interface MqttGateway {/*** 发送到mqtt** @param payload 消息内容*/void sendToMqtt(String payload);/*** 发送到mqtt** @param topic   主题* @param payload 消息内容*/void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String payload);/*** 发送到mqtt** @param topic   主题* @param qos     qos* @param payload 消息内容*/void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);}

3.7 发送测试

package com.qiangesoft.mqtt.controller;import com.qiangesoft.mqtt.service.MqttGateway;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;/*** 控制器** @author qiangesoft* @date 2024-04-24*/
@RestController
@RequestMapping("/mqtt")
public class MqttController {@Autowiredprivate MqttGateway mqttGateway;@GetMapping("/send")public String send(String message) {mqttGateway.sendToMqtt(message);return "success";}
}

在这里插入图片描述
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/3496.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Android在AMS中拦截某个Activity的启动

文章目录 Android在AMS中拦截某个具体Activity的启动方案一&#xff08;推荐&#xff09;&#xff1a;在ActivityTaskManagerService.startActivityAsUser方法中去作拦截方案二&#xff1a;在Dialog.show()方法中直接对这个包名所创建的Dialog做限制 Android在AMS中拦截某个具体…

@Slf4j vs LoggerFactory.getLogger(): 日志记录方式的对比与选择

前言 在Java开发中&#xff0c;日志记录是追踪系统行为、诊断问题和监控应用性能的关键工具。SLF4J&#xff08;Simple Logging Facade for Java&#xff09;作为一款流行的日志门面库&#xff0c;提供了统一的日志API&#xff0c;允许开发者灵活地对接多种底层日志实现。本文…

一文解读:阿里云 AI 基础设施的演进与挑战

云布道师 2024 年 4 月 18-19 日&#xff0c;2024 中国生成式 AI 大会在北京 JW 万豪酒店举行&#xff0c;阿里云高级技术专家、阿里云异构计算 AI 推理团队负责人李鹏受邀在【AI Infra】专场发表题为《AI 基础设施的演进与挑战》的主题演讲。李鹏从 AIGC 对云基础设施的挑战、…

HarmonyOS hsp制作与引用

1. HarmonyOS hsp制作与引用 1.1 介绍 HSP动态共享包&#xff08;模块&#xff09;,应用内HSP指的是专门为某一应用开发的HSP&#xff0c;只能被该应用内部其他HAP/HSP使用&#xff0c;用于应用内部代码、资源的共享。应用内HSP跟随其宿主应用的APP包一起发布&#xff0c;与该…

react —— useState 深入

基础用法 useState Hook 提供了这两个功能&#xff1a; State 变量 在第一次重新渲染期间&#xff0c;这将具有作为参数传递的值State setter 函数 set 函数将允许将状态的值更新为不同的值&#xff0c;如果 set 函数中提供的值不同&#xff0c;则将触发重新渲染。 注意&…

MyBatis基础操作

黑马程序员JavaWeb开发教程 文章目录 根据资料中提供的《tlias智能学习辅助系统》页面原型及需求&#xff0c;完成员工管理的需求开发一、环境准备1、准备数据库表emp2、创建一个新的springboot工程&#xff0c;选择引入对应的起步依赖&#xff08;mybatis、mysql驱动、lombok&…

【树莓派4B】如何点亮树莓派的LED灯

在之前一系列文章中&#xff0c;使用python、行人入侵检测&#xff0c;确没有使用树莓派的硬件。控制引脚进行输出&#xff1a; 如何写python点亮led灯闪烁&#xff0c;我灯接在gpio13,GPIO19,gpio26。我都想闪烁。 你可以使用Python的GPIO库来控制树莓派上的LED灯。首先&…

Linux 安装 nvm,并使用 Jenkins 打包前端

文章目录 nvm是什么nvm下载nvm安装设置 nvm 环境变量设置 Jenkins 打包命令 nvm是什么 nvm全英文也叫node.js version management&#xff0c;是一个nodejs的版本管理工具。nvm和n都是node.js版本管理工具&#xff0c;为了解决node.js各种版本存在不兼容现象可以通过它可以安装…

数仓开发LAG 和 LEAD 函数详细解析和用例

在做Iot大数据开发时&#xff0c;需要用到lag和lead函数来计算设备故障。下面详细解析lag和lead函数的作用和例子。 LAG 和 LEAD 函数是用于在 Spark SQL 中进行窗口函数操作时常用的两个函数&#xff0c;它们用于获取某一行在分组内的前一行或后一行的数值。下面详细解释它们…

element-ui el-tabs el-tab-pane 的使用

实现效果&#xff1a;1、去掉它的下划线 2、标签切换的蓝色线条 3、字体&#xff0c;鼠标滑过字体、点击的字体 4、如果数据超出&#xff0c;出现左右滑动标签 html <div class"activity"><div class"cont"><el-tabsv-if"search &…

实验 | RT-Thread:L1

1 线程间同步 同步是指按预定的先后次序进行运行&#xff0c;线程同步是指多个线程通过特定的机制&#xff08;如互斥量&#xff0c;事件对象&#xff0c;临界区&#xff09;来控制线程之间的执行顺序&#xff0c;也可以说是在线程之间通过同步建立起执行顺序的关系&#xff0…

视频抽帧转图片,opencv和ffmpeg效果测评

最近在做一个项目&#xff0c;需要从视频中抽帧转图片&#xff0c;于是对opencv和ffmpeg效果进行了测评。 文章目录 1. open cv2. ffmpeg3.抽帧效果对比 1. open cv open cv 视频抽图片的教程&#xff0c;推荐以下链接&#xff0c;抽的帧数可以自行调节&#xff01; 用pythono…

maya 设置半径 获取时长,设置时长

maya 选择当前节点的所有子节点&#xff0c;设置半径&#xff0c;获取动画时长&#xff0c;并且设置时长 python 脚本 import maya.cmds as cmds# 获取当前选择的节点 selected_nodes cmds.ls(selectionTrue)# 创建一个列表来存储所需的节点&#xff1a;当前选中的节点及其所…

四川易点慧电子商务:抖音小店引领潮流,先进模式打造电商新标杆

在当下数字化浪潮中&#xff0c;电子商务行业如日中天&#xff0c;四川易点慧电子商务有限公司以其独特的视角和前瞻性的战略布局&#xff0c;成功在抖音小店领域崭露头角&#xff0c;成为行业内的佼佼者。本文将深入剖析四川易点慧电子商务的成功秘诀&#xff0c;以及其在抖音…

Mysql当前列的值等于上一行的值累加前一列的值

Mysql当前列的值等于上一行的值累加前一列的值 前言&#xff1a;公司项目需要做数据可视化&#xff0c;统计一些数据&#xff0c;比如用户增长量&#xff0c;按每天分组&#xff0c;还要计算每天累加的用户量&#xff0c;一开始也是想了很久&#xff0c;不知道怎么做&#xff…

百度网盘svip白嫖永久手机2024最新教程

百度网盘&#xff08;原名百度云&#xff09;是百度推出的一项云存储服务&#xff0c;已覆盖主流PC和手机操作系统&#xff0c;包含Web版、Windows版、Mac版、Android版、iPhone版和Windows Phone版。用户将可以轻松将自己的文件上传到网盘上&#xff0c;并可跨终端随时随地查看…

从0到1:如何成为优秀产品经理?必备哪些硬核技能?

成为一名优秀的产品经理&#xff0c;需要深入理解产品设计的理念、出发点&#xff0c;以及如何把控和收集需求。首先&#xff0c;产品设计的出发点应该是工具化思维、以人为本的设计&#xff0c;以及以完成商业目的的设计。这意味着产品经理需要根据产品的不同阶段&#xff0c;…

基于SpringBoot和Leaflet的地震台网信息预警可视化

目录 前言 一、后台管理设计与实现 1、Model层 2、业务层 3、控制层 二、前端预警可视化设计与实现 1、网页结构 2、数据绑定 三、效果展示 总结 前言 在之前的几篇博客中&#xff0c;我们讲解了如何在Leaflet中进行预警信息提示效果&#xff0c;以及基于XxlCrawler进…

软件无线电系列——宽带中频带通采样(超外差接收体制)和射频直接带通采样定理(盲区采样定理)

本节目录 一、宽带中频带通采样(超外差接收体制) 1、宽带中频带通采样的原理 2、宽带中频带通采样的设计示例 二、射频直接带通采样定理 1、整带采样 2、射频直接带通采样本节内容 一、宽带中频带通采样(超外差接收体制) 1、宽带中频带通采样的原理 宽带中频带通采样(超外差接…

wstunnel (websocket模式ssh)

接上一篇 修改客户端运行参数 ssh -o ProxyCommand"./wstunnel client -L stdio://%h:%p ws://192.168.254.131:8080" 127.0.0.1 其中127.0.0.1为服务端的本地ssh访问&#xff0c;可以修改为通过服务端访问其他设备的ssh服务。例如&#xff1a; ssh -o ProxyComma…