【实战】Spring Cloud Stream3.0 整合RocketMq

文章目录

    • 前言
    • 技术积累
      • Spring Cloud Stream3.0新特性
      • RocketMq简介
    • 实战演示
      • 引入Maven依赖
      • 增加application配置
      • 消息生产者
      • 消息消费者

前言

相信很多同学用使用过rocketmq消息中间件,且大多情况下是使用原生的rocketmq-spring-boot-starter 进行集成然后创建一个rocketMQTemplate发送的生产者和@RocketMQMessageListener监听的消费者。今天我们就不按常理出牌,使用Spring Cloud Stream来进行整合RocketMq。如果我们有在一个项目中需要引入多个MQ的需求,用Spring Cloud Stream简直不要太好。当然,我们是直接使用Spring Cloud Stream3.0,不再像之前低版本那样需要引入通道类进行指定,3.0版本可用直接字配置文件进行粘接绑定信道,简直不要太爽。

技术积累

Spring Cloud Stream3.0新特性

Spring Cloud Stream 3.0 引入了一些新特性,包括对新版本Spring Boot和Spring Cloud的支持,以及对反序列化错误处理的改进。
以下是一些主要的新特性:
支持Spring Boot 2.x和Spring Cloud 2020.0.x。
改进了消息中间件的错误处理,提供了更好的异常传播和提供了更多的配置选项来自定义错误处理。
提供了对函数式编程模型的支持。
提供了对Kafka消息传递保证的配置选项。
提供了对消息转换器的支持,可以在发送和接收消息之前进行自定义转换。

RocketMq简介

‌RocketMQ是‌Apache基金会下的一个开源分布式消息中间件,设计用于云原生环境,支持高吞吐量和流处理,广泛应用于金融级稳定性场景。 它具备以下核心特性:
云原生:RocketMQ设计为与云和容器技术(如Kubernetes)友好,支持无限弹性的扩缩。
高吞吐:能够保证万亿级别的吞吐量,满足微服务与大数据场景的需求。
流处理:提供轻量、高扩展、高性能和丰富功能的流计算引擎。
金融级稳定性:广泛用于交易核心链路,确保系统的稳定运行。
架构极简:采用零外部依赖的Shared-nothing架构,简化系统设计和维护。
生态友好:无缝对接微服务、实时计算、数据湖等周边生态,便于集成和使用。
支持多种消息类型:包括普通消息、顺序消息、事务消息、批量消息、定时(延时)消息、消息回溯等,满足不同业务场景需求。
易用性与灵活性:提供多种发送与消费模式,丰富的客户端支持,以及易于运维与管理的工具和界面。
在这里插入图片描述

实战演示

今天的重点不要RocketMq的使用,而是Spring Cloud Stream3.0如何整合RocketMq。以下是一个简单的整合DEMO,仅供学习使用,如果需要应用与生产环境需要增加一些额外的方案。比如死信或者消费失败重试机制等等。

引入Maven依赖

这里需要注意SpringBoot与SpriingCloud版本对应,SpringCloud版本与RocketMq Starter版本对应

<parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.3.12.RELEASE</version><relativePath/> <!-- lookup parent from repository -->
</parent>
<properties><java.version>8</java.version><spring-cloud.version>Hoxton.SR12</spring-cloud.version>
</properties>
</dependencies><!--rocketmq--><dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-starter-stream-rocketmq</artifactId><version>2.2.2.RELEASE</version><exclusions><exclusion><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId></exclusion><exclusion><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-acl</artifactId></exclusion></exclusions></dependency><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>4.7.1</version></dependency><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-acl</artifactId><version>4.7.1</version></dependency></dependencies>
<dependencyManagement><dependencies><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-dependencies</artifactId><version>${spring-cloud.version}</version><type>pom</type><scope>import</scope></dependency></dependencies>
</dependencyManagement>

增加application配置

testChannel可以作为输入输出信道

spring:cloud:stream:rocketmq:binder:name-server: 127.0.0.1:9876binders:my-rocketmq:type: rocketmqfunction:definition: testChannelbindings:testChannel-in-0:binder: my-rocketmqdestination: test-rocket-topicgroup: test-rocket-groupcontent-type: text/plain# 设置spring cloud stream次数1,表示禁用,异常情况下只消费一次消息consumer:max-attempts: 1testChannel-out-0:binder: my-rocketmqdestination: test-rocket-topiccontent-type: text/plain

消息生产者

直接可以用过StreamBridge 进行手动发送

@RestController
@RequestMapping("/base")
public class BaseController {@Resourceprivate StreamBridge streamBridge;//@Resourceprivate MqChannel mqChannel;@GetMapping("/send")public Boolean sendMessage(String msg) {boolean send = streamBridge.send("testChannel-out-0", MessageBuilder.withPayload("rocket测试:" + msg).build());return true;}
}

消息消费者

直接监听testChannel通道,默认监听testChannel-input-0信道

/*** RocketChannel* @author senfel* @version 1.0* @date 2024/7/23 12:20*/
@Configuration
public class RocketChannel {/*** testChannel 消费者* @author senfel* @date 2024/7/23 12:26* @return java.util.function.Consumer<java.lang.String>*/@Beanpublic Consumer<Message<String>> testChannel(){return message -> {System.out.println("接收到消息Payload:" + message.getPayload());System.out.println("接收到消息Header:" + message.getHeaders());};}
}

测试用例
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/49632.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Three 三维矩阵(Matrix3)、四维矩阵(Matrix4)

三维矩阵&#xff08;Matrix3&#xff09; var matrix3 new THREE.Matrix3().set( 1,2,3,4,5,6,7,8,9); //而其内部elements则展示为&#xff1a; matrix3.elements [1,4,7,2,5,8,3,6,9]; 属性&#xff08;Properties&#xff09; # .elements : Array 矩阵列优先column-…

Spring中Bean的循环依赖

目录 定义&#xff1a; 循环依赖的后果&#xff1a; 一&#xff1a;三级缓存 1、大概的思路&#xff1a; 注意&#xff1a; 2、执行过程&#xff1a; A半完成&#xff1a; B完成&#xff1a; A完成&#xff1a; 注&#xff1a; 二&#xff1a;Lazy 定义&#xff1a; …

入门C语言只需一个星期(星期三)

点击上方"蓝字"关注我们 01、基本数据类型 char 1 字节 −128 ~ 127 单个字符/字母/数字/ASCIIsigned char 1 字节 −128 ~ 127 -unsigned char 1 字节 0 ~ 255 -int…

Vue数组操作之sort详解

在 Vue.js 中&#xff0c;sort() 方法用于对数组进行排序。它会改变原数组&#xff0c;并返回排序后的数组。默认情况下&#xff0c;sort() 方法按照字母顺序&#xff08;Unicode 编码顺序&#xff09;对数组中的元素进行排序。如果需要按照其他规则排序&#xff0c;可以传递一…

【SpringCloud】微服务远程调用OpenFeign

工作原理流程图 上代码 common中添加依赖&#xff1a; <dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-openfeign</artifactId> </dependency> <dependency><groupId>org.spri…

CV13_混淆矩阵、F1分数和ROC曲线

1.1 混淆矩阵Confusion Matrix 混淆矩阵&#xff08;Confusion Matrix&#xff09;是机器学习和统计学中用于描述监督学习算法性能的特定表格布局。它是一种特定类型的误差矩阵&#xff0c;可以非常直观地表示分类模型在测试数据集上的预测结果与实际结果之间的对比。 混淆矩…

【数据结构】初识集合框架

&#x1f387;&#x1f389;&#x1f389;&#x1f389;点进来你就是我的人了 博主主页&#xff1a;&#x1f648;&#x1f648;&#x1f648;戳一戳&#xff0c;欢迎大佬指点&#xff01; 人生格言: 当你的才华撑不起你的野心的时候,你就应该静下心来学习! 欢迎志同道合的朋友…

Python爬虫(6) --深层爬取

深层爬取 在前面几篇的内容中&#xff0c;我们都是爬取网页表面的信息&#xff0c;这次我们通过表层内容&#xff0c;深度爬取内部数据。 接着按照之前的步骤&#xff0c;我们先访问表层页面&#xff1a; 指定url发送请求获取你想要的数据数据解析 我们试着将以下豆瓣读书页…

Mysql (五)

create table info ( id int primary key, name varchar(10), score decimal(5,2), address varchar(20), hobbid int(5)); SELECT * FROM info; 排序语法&#xff1a;关键字排序 升序ASC 降序DESC 默认排序方式 升序 配合order by语法使用 SELECT * FROM info ORDER BY…

河南萌新联赛2024第(二)场:南阳理工学院

A 国际旅行Ⅰ D A*BBBB F 水灵灵的小学弟 H 狼狼的备忘录 I 重生之zbk要拿回属于他的一切 J 这是签到 ##A 国际旅行Ⅰ 链接&#xff1a;https://ac.nowcoder.com/acm/contest/87255/A 来源&#xff1a;牛客网 题目描述 很久很久以前&#xff0c;有 n n n 个国家&#xff0c;第…

【ffmpeg命令入门】一篇学会ffmpeg音频选项

文章目录 前言设置音频帧使用方法基本用法 示例示例1&#xff1a;提取前200帧音频示例2&#xff1a;结合其他选项使用 注意事项 设置音频采样率基本用法 示例示例1&#xff1a;调整采样率为48000 Hz示例2&#xff1a;降低采样率为22050 Hz示例3&#xff1a;转换视频文件中的音频…

首发!中科融合3D相机点云转halcon点云 C++代码

中科融合是国内第一家专注于“AI+3D”芯片技术的科技创新型企业。‌拥有完全自主研发的MEMS感知芯片和新一代低功耗AI芯片,‌致力于在5G时代推动具有边缘智能的3D感知设备的发展,‌从而促进智能3D产业链的爆发。‌中科融合的MEMS激光投射模组具有较大的视野和景深,‌以及较强…

字符的统计——423、657、551、696、467、535

423. 从英文中重建数字 最初思路 首先要有一个指针&#xff0c;对于3/4/5为一组地跳跃。起初想的是后瞻性&#xff0c;如果符合0-9任意&#xff0c;则更换index、跳跃。此时写了一个函数&#xff0c;用来判断s的截取段和0-9中有无符合。这个思路并没有进行下去&#xff0c;虽然…

昇思25天学习打卡营第九天|本地安装mindspore之一|Linux的系统在vmware上的安装以及mindspore的安装

课程已经学完了&#xff0c;打算再深入一些。初步的想法是&#xff0c;在本地安装&#xff0c;本地执行。 根据老师的指引&#xff0c;MindSpore官网&#xff0c;“https ://www.mindspore.cn/install/”&#xff0c;注意&#xff0c;因为csdn博客编辑器的原因&#xff0c;当我…

软考:软件设计师 — 6.信息安全

六. 信息安全 1. 加密技术与认证技术 &#xff08;1&#xff09;对称与非对称加密 关于对称与非对称加密算法的详细内容&#xff0c;可以参考文章&#xff1a;信息安全基础技术与原理 对称加密 对称加密也称为共享密钥加密。 对称加密中&#xff0c;加密和解密的密钥是同一…

【保姆级教程】跑通YOLOv8-multi-task多任务模型

一、YOLOV8多任务环境准备 1.1 下载安装最新的YOLOv8-multi-task代码 仓库地址: https://github.com/JiayuanWang-JW/YOLOv8-multi-task克隆指令:git clone https://githubfast.com/JiayuanWang-JW/YOLOv8-multi-task.git1.2 配置环境 pip install -r requirements.txt -i h…

IPython的捕获魔术:%%capture命令全攻略

IPython的捕获魔术&#xff1a;%%capture命令全攻略 在IPython和Jupyter Notebook中&#xff0c;%%capture是一个强大的魔术命令&#xff0c;它允许用户捕获并保存单元格的输出&#xff0c;包括stdout&#xff08;标准输出&#xff09;和stderr&#xff08;标准错误&#xff0…

laravel 8 、thinkphp数据库锁机制

lockForUpdate() 在使用时&#xff0c;如果只需要阻止另外程序对其进行修改&#xff0c;可以只使用 1 个lockForUpdate()&#xff1b; 当某个程序需要对其进行修改时&#xff0c;可以使用 lockForUpdate()&#xff0c;避免并发进行修改&#xff0c; 当其他程序也会需要对相同…

vivado IODELAY_GROUP

IODELAY_GROUP将IDELAYCTRL单元格及其关联的IDELAY和 ODELAY细胞允许正确放置和复制。 如果使用IODELAY_GROUP为IDELAYCTRL分配组名&#xff0c;还需要 使用相同的IODELAY_group属性将IDELAY或ODELAY单元格与组相关联。 重要提示&#xff1a;虽然IODELAY_GROUP可以包含多个单元…

【豆包Marscode体验官】揭秘MarsCode AI编辑助手:高效智能编辑新纪元之入门指导与最佳实践

文章目录 1. 概述2. 工具使用过程2.1 MarsCode插件简介2.2 安装和配置2.2.1 安装MarsCode插件2.2.2 配置MarsCode插件 2.3 各个功能的使用2.3.1 代码补全2.3.2 代码补全 Pro【操作提示&#xff0c;No suggestion from Model&#xff0c;不知道是不是版本的问题】2.3.3 代码生成…