SpringCloud之Stream框架集成RocketMQ消息中间件

        Spring Cloud Stream 是一个用来为微服务应用构建消息驱动能力的框架。它可以基于 Spring Boot 来创建独立的、可用于生产的 Spring 应用程序。Spring Cloud Stream 为一些供应商的消息中间件产品提供了个性化的自动化配置实现,并引入了发布-订阅、消费组、分区这三个核心概念。简单的说,Spring Cloud Stream本质上就是整合了Spring Boot和Spring Integration,实现了一套轻量级的消息驱动的微服务框架。

     目前 Spring Cloud Stream只支持 RabbitMQ 和 Kafka 的自动化配置。

     Spring Cloud Stream 提供了 Binder (负责与消息中间件进行交互),我们则通过 inputs 或者 outputs 这样的消息通道与 Binder 进行交互。Binder 绑定器是 Spring cloud Stream 中一个非常重要的概念,实现了应用程序和消息中间件之间的隔离,同时我们也可以通过应用程序实现,消息中间件之间的通信。在我们的项目的可以继承多种绑定器,我们可以根据不同特性的消息使用不同的消息中间件。Spring Cloud Stream 为我们实现了 RabbitMQ 和Kafka 的绑定器。如果你想使用其他的消息中间件需要自己去实现绑定器接口.

在 SpringCloudStream 3.x 版本前是通过 @StreamListener 和 @EnableBinding 进行消息的发送和消费的,springCloudStream 3.x 版本后 @StreamListener 和 @EnableBinding 都打上了@Deprecated 注解,不建议使用了;后续的版本更新中替换成函数式的方式实现。

既然通过四大函数式接口的方式替换了注解的方式 那么该如何进行绑定呢?

通过 spring.cloud.stream.function.definition:名称的方式进行绑定 公开 topic。

不管是创建 Consumer 还是 Supplier 或者是 Function Stream都会将其的 方法名称 进行 一个 topic拆封 和 绑定 假设 创建了一个 Consumer< String > myTopic 的方法,Stream 会将其 拆分成 In 和 out 两个通道:

  • 输入 - + -in- + < index >

     例如:myTopic-in-0

  • 输出 - + -out- + < index >

       例如:myTopic-out-0

注意:这里的 functionName需要和代码声明的函数名称还有spring.cloud.stream.function.definition下的名称保持一致(后面还会在项目实战中展示一遍)

代码示例:

----------------------------------项目实战--------------------------------------

看下我们项目中的配置,配置文件是放在nacos上面的:

消息发送:

/*** @ClassName MessageParamParentDto* @Author zxd* @Version 1.0.0* @Description TODO* @CreateTime 2023/6/13 11:27 - 星期二*/
@Data
public class MessageParamParentDto implements Serializable {private static final long serialVersionUID = 7963819193258646924L;private  String routeUrl;}

--------------------------------------------------------------------------------------------------------------

/*** @ClassName MessageParamDto* @Author kch* @Version 1.0.0* @Description 消息队列接收系统消息实体对象* @CreateTime 2022/9/18 15:16 - 星期日*/
@Data
public class MessageParamDto  extends MessageParamParentDto implements Serializable {private static final long serialVersionUID = 7111819193258646924L;/*** 消息模板code*/@NotNull(message = "消息模板不能为空")private String templateCode;/*** 可变参数,必传字段* 该参数匹配模板字符串中的变量和URL中的变量,所以模板和URL中的变量名不能重复*/@NotNull(message = "参数不能为空")private Map<String, String> params;/*** 消息详情跳转路径参数(没有不传,有参数按照URL参数拼接规范拼接,不加?号)* 例如:userId=1&userCode=test*/
//    private String routerParams;/*** 消息操作跳转路径参数(没有不传,有参数按照URL参数拼接规范拼接,不加?号)* 例如:userId=1&userCode=test*/
//    private String contentPathParams;/*** 接收者租户*/@NotNull(message = "接收者租户ID不能为空")private Long tenantId;/*** 接收人*/@NotNull(message = "接收者用户ID不能为空")@Size(min = 1, message = "接收者用户ID不能为空")private List<RecipientUser> recipientUsers;@Valid@Data@AllArgsConstructor@NoArgsConstructorpublic static class RecipientUser implements Serializable {/*** 接收人id*/@NotNull(message = "接收者用户ID不能为空")private Long recipientId;/*** 接收人手机号*/@Pattern(regexp = RegexPool.MOBILE, message = "手机格式错误")private String phone;}}

-----------------------------------------------------------------------------------------------------------

/*** @ClassName MessageMqBinding* @Author zpp* @Version 1.0.0* @Description TODO* @CreateTime 2023/2/10 15:37 - 星期五*/
public interface MessageMqBinding {/*** 系统消息生产者交换机*/String MESSAGE_MQ_OUTPUT = "dyzsMessageProvider-out-0";
}

----------------------------------------------------------------------------------------

@Slf4j
@RestController
@RequestMapping("/mq")
public class MessageMqController {@Resourceprivate StreamBridge streamBridge;/*** @param :* @Author zpp* @Description 发送系统消息* @Date 2023/2/10 15:27* @Return com.zysy.common.api.entity.Result<java.lang.Boolean>*/@PostMappingpublic Result<Boolean> sendMessage(@RequestBody @Validated MessageParamDto dto) {log.info("接收到系统消息发送请求:{}", JSONObject.toJSONString(dto));MessageMQParamDto paramDto = new MessageMQParamDto(dto);paramDto.setCreateBy(UserUtil.getUserId());paramDto.setCreateDept(UserUtil.getDeptId());List<MessageMQParamDto> paramDtoList = new ArrayList<>();paramDtoList.add(paramDto);MessageBuilder builder = MessageBuilder.withPayload(paramDtoList).setHeader("Content-Type", "application/json");return Result.success(streamBridge.send(MessageMqBinding.MESSAGE_MQ_OUTPUT, builder.build()));}

------------------------------------------------------------------------------------------------------

消息消费:

          下图是在代码中配置的消息消费者,这里的函数名称要和上图中的function.definition配置的名称一样;

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/100481.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

codesys 控制轴组程序

//轴组使能允许 IF AxisGroupControl.bPower AND NOT AxisGroupStatus.bPowerError THEN bPowerAllow:TRUE; ELSE bPowerAllow:FALSE; END_IF //轴组有效允许 IF AxisGroupControl.bEnable AND NOT AxisGroupControl.bDisable THEN bEnableAllow:TRUE; ELSE b…

SQL Server 简介与 Docker Compose 部署

今天我翻阅了在之前公司工作时的笔记&#xff0c;发现了有关数据库的一些记录。当时&#xff0c;我们的项目开始使用 Oracle 数据库&#xff0c;但后来由于一些项目需求的变更&#xff0c;我们切换到了 SQL Server 。值得一提的是&#xff0c;公司当时也开始采用 Docker 技术&a…

golang的切片使用总结一

举例1&#xff1a;make([]int, b) s : make([]int, 10) fmt.Printf("s:%v, len of s:%v, cap of s:%v \n", s, len(s), cap(s)) 打印结果&#xff1a;s:[0 0 0 0 0 0 0 0 0 0], len of s:10, cap of s:10 结论&#xff1a;make([]int, 10) 创建的切片是capacity(容…

c++模板库容器list vector map set操作和性能对比

文章目录 listvectormapset性能比较总结 list 列表&#xff08;list&#xff09;是C STL中的一种容器类型&#xff0c;它是一个双向链表&#xff0c;可以在任意位置高效地添加、删除、移动元素。 以下是一些常用的列表操作&#xff1a; 创建列表 #include <list> std…

Jetson Orin NX 开发指南(1): 系统烧录

一、SDK Manager SDK Manager 工具是 NVIDIA 官方推荐的烧写和管理 Jetpack 系统组件的一个图形化烧写工具&#xff0c;使用起来非常的简单方便&#xff0c;但是该软件需要在 x86 的 Ubuntu 18.04 或 Ubuntu 20.04 的系统上运行&#xff0c;因此我们需要准备一台安装了 Ubuntu…

Redisson程序化的配置方法

2.1. 程序化配置方法 Redisson程序化的配置方法是通过构建Config对象实例来实现的。例如&#xff1a; Config config new Config(); config.setTransportMode(TransportMode.EPOLL); config.useClusterServers()//可以用"rediss://"来启用SSL连接.addNodeAddress(…

记录:Unity脚本的编写3.0

目录 前言前置控制方法查看效果移动方式 前言 前面记录了一些通过脚本控制对象模型移动和通过用户的操作对模型进行变化的方法&#xff0c;那么为了让我们创造的不论是地形还是模型都拥有真实的物理引擎&#xff08;大雾&#xff09;&#xff0c;那么这次就使用脚本控制模型感…

函数reshape(-1,)里的-1的意思

reshape函数是对narray的数据结构进行维度变换&#xff0c;由于变换遵循对象元素个数不变&#xff0c;在进行变换时&#xff0c;假设一个数据对象narray的总元素个数为N&#xff0c;如果我们给出一个维度为&#xff08;m&#xff0c;-1&#xff09;时&#xff0c;我们就理解为将…

【Linux C】Linux如何执行一个程序(程序存储空间、系统调用、内核调用)

文章目录 一、程序存储空间1.1 C语言程序存储空间1.2 用户空间和内核空间1.3 用户模式和内核模式 二、内核调用-系统调用-C语言库函数2.1 系统调用和内核调用2.2 C语言库函数 三、Linux如何执行一个程序 一、程序存储空间 本节说的空间主要是指内存空间&#xff0c;即程序如何分…

链表(2)——带头双向循环链表

&#x1f341;一、链表的分类 &#x1f315;1.单向或者双向 &#x1f315;2.带头或者不带头&#xff08;有无哨兵&#xff09; &#x1f315;3.循环或者不循环 &#x1f315;4.无头单向非循环链表&#xff08;常用&#xff09; &#x1f315;5.带头双向循环链表&#xff08;常用…

案例分享:原生广告如何助力app实现高效变现收益的转化

原生广告是指将广告嵌入到APP的内容中&#xff0c;使其与APP内容融为一体&#xff0c;达到获得用户关注的效果。在形式上&#xff0c;原生广告并不像传统广告那样显眼&#xff0c;而是以一种更加自然的方式展现在用户面前。 它采用了与APP相似的设计风格和交互方式&#xff0c…

深度学习DAY1:神经网络NN;二元分类

深度学习笔记 DAY1 深度学习基本知识 1.神经网络 1.1 单一神经元 所有神经元将房屋大小size作为输入x,计算线性方程&#xff0c;结果取max&#xff08;0&#xff0c;y&#xff09;,输出预测房价y ReLU函数&#xff08;线性整流函数&#xff09;–max&#xff08;0&#xf…

Axios、SASS学习笔记

目录 前言 一、Axios基础认识 1、简介 2、相关文档 3、基本配置 4、基础快捷使用 二、Axios封装 1、公共配置文件 2、细化每个接口的配置 3、使用并发送请求 三、SASS 1、简介 2、相关文档 3、使用前奏 4、使用变量 5、嵌套规则 6、父级选择器标识 & 前言…

Linux基本指令(下)——“Linux”

各位CSDN的uu们好呀&#xff0c;今天&#xff0c;小雅兰的内容仍然是Linux中的基本指令啦&#xff0c;下面&#xff0c;让我们进入Linux的世界吧&#xff01;&#xff01;&#xff01; Cal指令 find指令&#xff1a;&#xff08;灰常重要&#xff09; -name grep指令 zip/un…

【置顶】关于博客的一些公告

所谓 万事开头难&#xff0c;最开始的两个专栏 《微机》 和 《骨骼动作识别》 定价 29.9 &#xff0c;因为&#xff1a; 刚开始确实比较困难&#xff0c;要把自己学的知识彻底搞懂讲给别人&#xff0c;还要 码字排版&#xff0c;从 Markdown 语法开始学起&#xff08;这都是 花…

机器学习基础-手写数字识别

手写数字识别&#xff0c;计算机视觉领域的Hello World利用MNIST数据集&#xff0c;55000训练集&#xff0c;5000验证集。Pytorch实现神经网络手写数字识别感知机与神经元、权重和偏置、神经网络、输入层、隐藏层、输出层mac gpu的使用本节就是对Pytorch可以做的事情有个直观的…

leetcode 139. 单词拆分

39. 单词拆分 给你一个字符串 s 和一个字符串列表 wordDict 作为字典。请你判断是否可以利用字典中出现的单词拼接出 s 。 注意&#xff1a;不要求字典中出现的单词全部都使用&#xff0c;并且字典中的单词可以重复使用。 示例 1&#xff1a; 输入: s "leetcode"…

java学习--day24(stream流)

文章目录 今天的内容1.Stream【难点】1.1获取流的对象1.2Stream流对象下面1.2.1count和forEach1.2.2filter方法1.2.3limit1.2.4map方法1.2.5skip1.2.6concat 1.3收集流 1.基于接口和抽象类的匿名内部类的写法 abstract class Person {public abstract void eat(); } public sta…

Pytorch因nn.Parameter导致实验不可复现的一种情况

文章首发见博客&#xff1a;https://mwhls.top/4871.html。 无图/格式错误/后续更新请见首发页。 更多更新请到mwhls.top查看 欢迎留言提问或批评建议&#xff0c;私信不回。 没解决&#xff0c;只是记录这种情况。 也可以多次实验取均值以避免结果复现。 场景 自己的模块中&a…

Hadoop2.0探讨

文章目录 8. Hadoop 再探讨8.1 Hadoop的优化与发展8.2 HDFS 的FA和Federation(Hadoop2.0新特性)8.2.1 HDFS HA8.2.2 HDFS Federation 8.3 YARN8.3.1 MapReduce1.0的缺陷8.3.2 Yarn设计思路8.3.3 Yarn体系结构8.3.4 Yarn工作流程8.3.5 Yarn框架和MapReduce1.0框架对比分析8.3.6 …