Spring Integration + MQTT

1. 简介

Spring Integration:

Spring Integration是一个开源的Java库,用于构建基于消息的应用程序。它提供了一套丰富的组件和工具,使得开发者可以轻松地开发出可靠、灵活和可扩展的集成解决方案。以下是Spring Integration的一些主要用途:

  1. 企业服务总线(ESB): Spring Integration可以用来构建企业服务总线,它支持各种协议和消息格式,使得不同系统间的数据和事件可以轻松交换。

  2. 消息传递和解耦: 它支持在不同的应用程序组件之间进行异步消息传递,从而降低系统组件间的耦合度。

  3. 事件驱动架构: Spring Integration支持事件驱动的架构风格,允许系统对事件做出响应,而不是基于传统的请求-响应模型。

  4. 数据转换和路由: 提供数据转换和路由的功能,可以将数据从一种格式转换为另一种格式,并根据内容将消息路由到不同的目的地。

  5. 错误处理: 它提供了一套完整的错误处理机制,包括重试、补偿和消息存储等策略。

  6. 文件和数据库集成: 可以轻松地与文件系统和数据库进行集成,支持文件传输、数据库操作等场景。

  7. 外部系统适配: 通过提供各种适配器,Spring Integration可以与外部系统(如JMS、AMQP、HTTP、FTP等)进行集成。

  8. 批处理和任务调度: 支持批处理操作和任务调度,可以用于处理大量数据或定时任务。

  9. 模块化和可扩展性: 它的模块化设计使得开发者可以根据需要添加或替换组件,从而构建高度可扩展的系统。

  10. 多环境支持: 支持多种部署环境,包括本地应用、云环境和微服务架构。

  11. 开发和配置的简化: 通过提供声明式的配置和简化的编程模型,Spring Integration降低了开发复杂性,并缩短了开发周期。

  12. 社区和生态系统: 作为Spring家族的一部分,Spring Integration受益于活跃的社区和广泛的生态系统,提供了大量的资源和支持。

Spring Integration + MQTT:

Spring Integration与MQTT的集成是一个非常强大的组合,它允许开发者在Spring应用程序中轻松地实现MQTT协议的消息发布和订阅功能。以下是Spring Integration与MQTT集成的一些主要用途和优势:

  1. 轻量级消息传递: MQTT是一种轻量级的消息传递协议,特别适合带宽有限和延迟敏感的环境,如物联网(IoT)应用。Spring Integration通过提供MQTT通道适配器,使得在Spring应用程序中集成MQTT变得简单直接 。

  2. 简化配置: 通过Spring Integration,开发者可以使用声明式配置来定义MQTT的入站(订阅)和出站(发布)消息通道,而不需要深入了解MQTT客户端库的复杂性 。

  3. 支持MQTT v5: 从Spring Integration 5.5.5版本开始,支持MQTT v5协议,包括对MQTT v5特有的消息属性的支持,如消息过期间隔、响应主题等 。

  4. 灵活的消息处理: Spring Integration提供了强大的消息处理能力,包括消息转换、路由、聚合、分割等,这些都可以通过声明式配置轻松实现 。

  5. 错误处理和重连机制: Spring Integration提供了错误处理机制,包括请求处理建议,例如重试或断路器。同时,支持MQTT的自动重连机制,确保了消息传递的可靠性 。

  6. 与Spring生态系统的集成: 作为Spring家族的一部分,Spring Integration可以很容易地与其他Spring项目(如Spring Boot、Spring Cloud等)集成,提供了与Spring Security、Spring Data等的无缝集成 。

  7. 提高开发效率: Spring Integration的声明式配置和编程模型简化了消息系统开发,降低了开发复杂性,并缩短了开发周期 。

  8. 动态主题管理: Spring Integration允许在运行时动态添加和删除MQTT订阅主题,提供了更高的灵活性 。

  9. 事件驱动架构: 支持事件驱动的架构风格,允许系统对事件做出响应,而不是基于传统的请求-响应模型 。

2. 基本时序架构

        1. 监听到订阅topic有消息流程

        2. 生产者推送一条消息后,中间经过一系列流程后被消费者消费的完整流程

3. 接收消息

通常涉及以下几个步骤:

1. 配置MQTT连接: 首先,需要配置与MQTT代理(如EMQX)的连接。这通常涉及到配置一个MqttPahoClientFactory Bean,它负责创建和管理MQTT客户端连接。

@Bean
public MqttPahoClientFactory mqttClientFactory() {DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();factory.setConnectionOptions(mqttConnectOptions());return factory;
}

2. 创建入站通道适配器: 使用MqttPahoMessageDrivenChannelAdapter创建一个入站通道适配器。这个适配器负责从MQTT代理订阅主题,并在接收到消息时将消息发送到Spring Integration的通道。

@Slf4j
@Configuration
@IntegrationComponentScan
public class MqttInboundConfiguration {@Autowiredprivate MqttPahoClientFactory mqttClientFactory;@Resource(name = ChannelName.INBOUND)private MessageChannel inboundChannel;/*** Clients of inbound message channels.* @return*/@Bean(name = "adapter")public MessageProducerSupport mqttInbound() {MqttClientOptions options = MqttConfiguration.getBasicClientOptions();MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(options.getClientId() + "_consumer_" + System.currentTimeMillis(),mqttClientFactory, options.getInboundTopic().split(","));DefaultPahoMessageConverter converter = new DefaultPahoMessageConverter();// use byte types uniformlyconverter.setPayloadAsBytes(true);adapter.setConverter(converter);adapter.setQos(1);adapter.setOutputChannel(inboundChannel);// 添加钩子函数,确保在程序关闭时正确断开连接Runtime.getRuntime().addShutdownHook(new Thread(() -> {try {if (adapter != null) {adapter.stop();log.warn("[consumer] MQTT client stopped successfully.");}} catch (Exception e) {log.error("[consumer] MQTT client stopped with error: {}",e.getMessage(),e);}}));return adapter;}

3. 配置消息通道: 配置一个消息通道(如DirectChannel),用于传输从MQTT代理接收到的消息。

@Bean(name = ChannelName.INBOUND)
public MessageChannel inboundChannel() {return new ExecutorChannel(threadPool);
}

4. 设置消息监听器: 使用@ServiceActivator注解定义一个服务激活器,它将作为消息监听器处理接收到的消息。这个消息监听器可以是一个方法,这个方法将对通道中的消息进行处理。

5. 处理消息: 实现业务逻辑来处理消息。这通常涉及到从消息中提取数据,并执行所需的操作,例如更新数据库、调用服务或触发事件。

@Bean
@ServiceActivator(inputChannel = ChannelName.INBOUND)
public MessageHandler defaultInboundHandler() {return message -> {// 处理消息// log.info("The default channel does not handle messages." +//         "\nTopic: " + message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC) +//         "\nPayload: " + message.getPayload());};
}

4. 发布信息

        发送MQTT消息通常是通过配置出站通道适配器(MqttOutboundChannelAdapter)来实现的。这个适配器负责将从Spring Integration通道中传来的消息发布到指定的MQTT主题上。

发送MQTT消息的步骤:

1. 配置MQTT客户端工厂(MqttPahoClientFactory: 这个工厂负责创建和管理MQTT客户端连接。

@Bean
public MqttPahoClientFactory mqttClientFactory() {DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();factory.setConnectionOptions(mqttConnectOptions());return factory;
}

2. 配置MQTT出站通道适配器(MqttOutboundChannelAdapter: 这个适配器将消息通道中的消息发布到MQTT代理上。

@Configuration
public class MqttOutboundConfiguration {@Autowiredprivate MqttPahoClientFactory mqttClientFactory;@Bean@ServiceActivator(inputChannel = ChannelName.OUTBOUND)public MqttOutboundChannelAdapter mqttOutboundAdapter() {MqttOutboundChannelAdapter adapter = new MqttOutboundChannelAdapter("client_id", mqttClientFactory, "outputTopic");adapter.setQos(1); // 设置服务质量adapter.setAsync(true); // 异步发送消息return adapter;}
}

可以通过setDefaultTopic方法设置默认主题,这样在发送消息时如果没有指定主题,就会使用这个默认主题。

3. 发送消息到消息通道: 通过编程方式或通过其他Spring Integration组件,将消息发送到与MqttOutboundChannelAdapter绑定的消息通道。

@Autowired
private MessageChannel mqttOutboundChannel;public void sendMqttMessage(String payload) {mqttOutboundChannel.send(MessageBuilder.withPayload(payload).build());
}

注:

1. 要确定消息发送到哪一个主题,可以在发送消息时通过消息头MqttHeaders.TOPIC指定。如果没有指定,就会使用在MqttPahoMessageHandler中配置的默认主题。

@Autowired
private MessageChannel mqttOutboundChannel;public void sendMqttMessage(String topic, String payload) {mqttOutboundChannel.send(MessageBuilder.withPayload(payload).setHeader(MqttHeaders.TOPIC, topic).build());
}

2. 通过使用IMqttMessageGateway接口去发送消息到OUTBOUND通道,再由MqttPahoMessageHandler去处理消息

@Component
@MessagingGateway(defaultRequestChannel = ChannelName.OUTBOUND)
public interface IMqttMessageGateway {/*** Publish a message to a specific topic.* @param topic target* @param payload   message*/void publish(@Header(MqttHeaders.TOPIC) String topic, byte[] payload);/*** Use a specific qos to push messages to a specific topic.* @param topic     target* @param payload   message* @param qos   qos*/void publish(@Header(MqttHeaders.TOPIC) String topic, byte[] payload, @Header(MqttHeaders.QOS) int qos);
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/56276.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

jenkins 插件Publish Over SSH

一、安装插件 二、配置sshserver http://192.168.31.156:8080/manage/configure 三、添加自由风格:PublishOverSSHDemo 我们将工作目录:/var/lib/jenkins/workspace/PublishOverSSHDemo下的图片m3.jpeg 同步到目标143服务器目录:/root/imag…

SQL分类中的DDL

DDL(Data Definition Language):数据定义语言,用来定义数据库对象(数据库,表,字段)。 一、DDL语句操作数据库 1、查询所有数据库:show databases;(一般用大写&#xff…

OpenCV-人脸检测

文章目录 一、人脸检测流程二、关键方法三、代码示例四、注意事项 OpenCV是一个开源的计算机视觉和机器学习软件库,它提供了多种人脸检测方法,以下是对OpenCV人脸检测的详细介绍: 一、人脸检测流程 人脸检测是识别图像中人脸位置的过程&…

【Docker系列】Docker查看镜像架构

💝💝💝欢迎来到我的博客,很高兴能够在这里和您见面!希望您在这里可以感受到一份轻松愉快的氛围,不仅可以获得有趣的内容和知识,也可以畅所欲言、分享您的想法和见解。 推荐:kwan 的首页,持续学…

模态与非模态的对话框

本文学习自&#xff1a; 《Qt Creato快速入门》 #include "widget.h" #include <QApplication>int main(int argc, char *argv[]) {QApplication a(argc, argv);Widget w;w.show();return a.exec(); }1. #include "widget.h" #include "ui_w…

MySQL数据的导入

【图书推荐】《MySQL 9从入门到性能优化&#xff08;视频教学版&#xff09;》-CSDN博客 《MySQL 9从入门到性能优化&#xff08;视频教学版&#xff09;&#xff08;数据库技术丛书&#xff09;》(王英英)【摘要 书评 试读】- 京东图书 (jd.com) MySQL9数据库技术_夏天又到了…

小白也能学会的预测新模型!ReliefF特征选择+XGBoost回归!

小白也能学会的预测新模型&#xff01;ReliefF特征选择XGBoost回归&#xff01; 目录 小白也能学会的预测新模型&#xff01;ReliefF特征选择XGBoost回归&#xff01;预测效果基本介绍程序设计参考资料 预测效果 基本介绍 Matlab实现ReliefF-XGBoost多变量回归预测 1.excel数据…

有问必答!zabbix“专家坐诊”第259期问答

问题一 Q&#xff1a;现在监控项4万多&#xff0c;调整到多少比较合理 zabbix7.03&#xff1f; A&#xff1a;慢慢往上调&#xff0c;没有标准。 问题二 Q&#xff1a;想问下大家&#xff0c;zabbix的监控项怎么不能自动清除&#xff0c;比如说这次监控是A监控项&#xff0c;下…

如何通过构建对应的api服务器使Vue连接到数据库

一、安装数据库驱动 在后端安装 MySQL 数据库驱动&#xff0c;比如在 Node.js 环境中可以使用 mysql2 包来连接 MySQL 数据库。在项目目录下运行以下命令安装&#xff1a; npm install mysql2或者使用 yarn&#xff1a; yarn add mysql2二、创建数据库连接模块 创建一个专门…

Light灯光组件+组件的相关操作+游戏资源的加载

Light灯光组件 Type: Directional:平行光&#xff0c;模仿的是太阳光 Spot:聚光灯 Area:区域光 Color&#xff1a; 颜色值 Mode: RealTime:实时 Mix:混合 Baked:烘焙 Intersity: 光照强度 Indirect Multiplier:光照强度乘数 Shadow Type:影子设置&#xff1a;…

Java通过RAG构建专属知识问答机器人_超详细

RAG&#xff1a;融合检索与生成的文本精准生成技术 检索增强生成&#xff08;RAG&#xff09;是一种技术&#xff0c;它通过结合检索模型和生成模型来提高文本生成的准确性。具体来说&#xff0c;RAG首先利用检索模型从私有或专有的数据源中搜索相关信息&#xff0c;然后将这些…

智能优化算法-水循环优化算法(WCA)(附源码)

目录 1.内容介绍 2.部分代码 3.实验结果 4.内容获取 1.内容介绍 水循环优化算法 (Water Cycle Algorithm, WCA) 是一种基于自然界水循环过程的元启发式优化算法&#xff0c;由Shah-Hosseini于2012年提出。WCA通过模拟水滴在河流、湖泊和海洋中的流动过程&#xff0c;以及蒸发…

【load_file读文件】

一、文件操作基础 show 先试试 show variables;发现显示了三百多行的系统变量: 这是数据库的目录&#xff1a; mysql有多种编码方式&#xff0c;有数据库编码、连接时的编码、还有客户端的编码&#xff1a; 这里还有一个日志路径&#xff0c;这个日志是需要手动打开的&#…

CSMA/CA协议

802.11局域网在使用CSMA/CA的同时&#xff0c;还使用确认重传&#xff08;ARQ&#xff09;。这是因为无线信道的通信质量远不如有线信道的&#xff0c;因此无线站点每通过无线局域网发送完一帧后&#xff0c;要等到收到对方的确认帧后才能继续发送下一帧。这就是链路层确认。 帧…

C语言笔记 12

逻辑类型 bool&#xff1a;在“#include <stdbool.h>”之后就可以使用bool和true、false 并没有真正的bool量的类型 逻辑运算 逻辑运算是对逻辑量进行的运算&#xff0c;结果只有0或1逻辑量是关系运算或逻辑运算的结果 运算符描述示例结果!逻辑非!a如果a是true结果就是…

ARP欺骗的多种手法

学习参考&#xff1a; ARP欺骗的各种d玩法-CSDN博客 https://juejin.cn/post/7383702153892954164 一、什么是ARP欺骗 1.什么是ARP&#xff1f; ARP (Address Resolution Protocol) 是一种网络层协议&#xff0c;用于将 IP 地址转换为物理地址&#xff08;MAC 地址&#xff0…

paddlepaddle显存未正常释放

NVIDIA GPU 显存未正常释放 问题描述 paddlepaddle 训练过程出现问题中断等导致GPU显存没有释放。 情况1: 使用nvidia-smi -l查看显存占用情况&#xff0c;输出结果中没有显示PID,但是有显存占用。 解决方法 使用killall python 直接kill掉所有python进程。假如运行此命…

LINUX——内核移植、内核编译教程

Linux内核编译是一个将内核源代码转换成可在特定硬件架构上运行的二进制文件的过程。以下是编译Linux内核的一般步骤&#xff1a; 1、准备工作&#xff1a; 确保安装了必要的编译工具&#xff0c;如gcc、make、ncurses库&#xff08;用于make menuconfig&#xff09;等。 2、…

点云深度学习系列:4DenoiseNet——考虑时空维度的去雪模型

文章&#xff1a;4DenoiseNet: Adverse Weather Denoising From Adjacent Point Clouds 代码&#xff1a;https://github.com/alvariseppanen/4DenoiseNet 1&#xff09;摘要 可靠的点云数据对于感知任务至关重要&#xff0c;例如在机器人和自动驾驶应用中。恶劣天气会导致特定…

giugughk

c语言中的小小白-CSDN博客c语言中的小小白关注算法,c,c语言,贪心算法,链表,mysql,动态规划,后端,线性回归,数据结构,排序算法领域.https://blog.csdn.net/bhbcdxb123?spm1001.2014.3001.5343 给大家分享一句我很喜欢我话&#xff1a; 知不足而奋进&#xff0c;望远山而前行&am…