【实战】Spring Cloud Stream3.0 整合RocketMq

文章目录

    • 前言
    • 技术积累
      • Spring Cloud Stream3.0新特性
      • RocketMq简介
    • 实战演示
      • 引入Maven依赖
      • 增加application配置
      • 消息生产者
      • 消息消费者

前言

相信很多同学用使用过rocketmq消息中间件,且大多情况下是使用原生的rocketmq-spring-boot-starter 进行集成然后创建一个rocketMQTemplate发送的生产者和@RocketMQMessageListener监听的消费者。今天我们就不按常理出牌,使用Spring Cloud Stream来进行整合RocketMq。如果我们有在一个项目中需要引入多个MQ的需求,用Spring Cloud Stream简直不要太好。当然,我们是直接使用Spring Cloud Stream3.0,不再像之前低版本那样需要引入通道类进行指定,3.0版本可用直接字配置文件进行粘接绑定信道,简直不要太爽。

技术积累

Spring Cloud Stream3.0新特性

Spring Cloud Stream 3.0 引入了一些新特性,包括对新版本Spring Boot和Spring Cloud的支持,以及对反序列化错误处理的改进。
以下是一些主要的新特性:
支持Spring Boot 2.x和Spring Cloud 2020.0.x。
改进了消息中间件的错误处理,提供了更好的异常传播和提供了更多的配置选项来自定义错误处理。
提供了对函数式编程模型的支持。
提供了对Kafka消息传递保证的配置选项。
提供了对消息转换器的支持,可以在发送和接收消息之前进行自定义转换。

RocketMq简介

‌RocketMQ是‌Apache基金会下的一个开源分布式消息中间件,设计用于云原生环境,支持高吞吐量和流处理,广泛应用于金融级稳定性场景。 它具备以下核心特性:
云原生:RocketMQ设计为与云和容器技术(如Kubernetes)友好,支持无限弹性的扩缩。
高吞吐:能够保证万亿级别的吞吐量,满足微服务与大数据场景的需求。
流处理:提供轻量、高扩展、高性能和丰富功能的流计算引擎。
金融级稳定性:广泛用于交易核心链路,确保系统的稳定运行。
架构极简:采用零外部依赖的Shared-nothing架构,简化系统设计和维护。
生态友好:无缝对接微服务、实时计算、数据湖等周边生态,便于集成和使用。
支持多种消息类型:包括普通消息、顺序消息、事务消息、批量消息、定时(延时)消息、消息回溯等,满足不同业务场景需求。
易用性与灵活性:提供多种发送与消费模式,丰富的客户端支持,以及易于运维与管理的工具和界面。
在这里插入图片描述

实战演示

今天的重点不要RocketMq的使用,而是Spring Cloud Stream3.0如何整合RocketMq。以下是一个简单的整合DEMO,仅供学习使用,如果需要应用与生产环境需要增加一些额外的方案。比如死信或者消费失败重试机制等等。

引入Maven依赖

这里需要注意SpringBoot与SpriingCloud版本对应,SpringCloud版本与RocketMq Starter版本对应

<parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.3.12.RELEASE</version><relativePath/> <!-- lookup parent from repository -->
</parent>
<properties><java.version>8</java.version><spring-cloud.version>Hoxton.SR12</spring-cloud.version>
</properties>
</dependencies><!--rocketmq--><dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-starter-stream-rocketmq</artifactId><version>2.2.2.RELEASE</version><exclusions><exclusion><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId></exclusion><exclusion><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-acl</artifactId></exclusion></exclusions></dependency><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>4.7.1</version></dependency><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-acl</artifactId><version>4.7.1</version></dependency></dependencies>
<dependencyManagement><dependencies><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-dependencies</artifactId><version>${spring-cloud.version}</version><type>pom</type><scope>import</scope></dependency></dependencies>
</dependencyManagement>

增加application配置

testChannel可以作为输入输出信道

spring:cloud:stream:rocketmq:binder:name-server: 127.0.0.1:9876binders:my-rocketmq:type: rocketmqfunction:definition: testChannelbindings:testChannel-in-0:binder: my-rocketmqdestination: test-rocket-topicgroup: test-rocket-groupcontent-type: text/plain# 设置spring cloud stream次数1,表示禁用,异常情况下只消费一次消息consumer:max-attempts: 1testChannel-out-0:binder: my-rocketmqdestination: test-rocket-topiccontent-type: text/plain

消息生产者

直接可以用过StreamBridge 进行手动发送

@RestController
@RequestMapping("/base")
public class BaseController {@Resourceprivate StreamBridge streamBridge;//@Resourceprivate MqChannel mqChannel;@GetMapping("/send")public Boolean sendMessage(String msg) {boolean send = streamBridge.send("testChannel-out-0", MessageBuilder.withPayload("rocket测试:" + msg).build());return true;}
}

消息消费者

直接监听testChannel通道,默认监听testChannel-input-0信道

/*** RocketChannel* @author senfel* @version 1.0* @date 2024/7/23 12:20*/
@Configuration
public class RocketChannel {/*** testChannel 消费者* @author senfel* @date 2024/7/23 12:26* @return java.util.function.Consumer<java.lang.String>*/@Beanpublic Consumer<Message<String>> testChannel(){return message -> {System.out.println("接收到消息Payload:" + message.getPayload());System.out.println("接收到消息Header:" + message.getHeaders());};}
}

测试用例
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/49632.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Spring中Bean的循环依赖

目录 定义&#xff1a; 循环依赖的后果&#xff1a; 一&#xff1a;三级缓存 1、大概的思路&#xff1a; 注意&#xff1a; 2、执行过程&#xff1a; A半完成&#xff1a; B完成&#xff1a; A完成&#xff1a; 注&#xff1a; 二&#xff1a;Lazy 定义&#xff1a; …

入门C语言只需一个星期(星期三)

点击上方"蓝字"关注我们 01、基本数据类型 char 1 字节 −128 ~ 127 单个字符/字母/数字/ASCIIsigned char 1 字节 −128 ~ 127 -unsigned char 1 字节 0 ~ 255 -int…

【SpringCloud】微服务远程调用OpenFeign

工作原理流程图 上代码 common中添加依赖&#xff1a; <dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-openfeign</artifactId> </dependency> <dependency><groupId>org.spri…

CV13_混淆矩阵、F1分数和ROC曲线

1.1 混淆矩阵Confusion Matrix 混淆矩阵&#xff08;Confusion Matrix&#xff09;是机器学习和统计学中用于描述监督学习算法性能的特定表格布局。它是一种特定类型的误差矩阵&#xff0c;可以非常直观地表示分类模型在测试数据集上的预测结果与实际结果之间的对比。 混淆矩…

【数据结构】初识集合框架

&#x1f387;&#x1f389;&#x1f389;&#x1f389;点进来你就是我的人了 博主主页&#xff1a;&#x1f648;&#x1f648;&#x1f648;戳一戳&#xff0c;欢迎大佬指点&#xff01; 人生格言: 当你的才华撑不起你的野心的时候,你就应该静下心来学习! 欢迎志同道合的朋友…

Python爬虫(6) --深层爬取

深层爬取 在前面几篇的内容中&#xff0c;我们都是爬取网页表面的信息&#xff0c;这次我们通过表层内容&#xff0c;深度爬取内部数据。 接着按照之前的步骤&#xff0c;我们先访问表层页面&#xff1a; 指定url发送请求获取你想要的数据数据解析 我们试着将以下豆瓣读书页…

河南萌新联赛2024第(二)场:南阳理工学院

A 国际旅行Ⅰ D A*BBBB F 水灵灵的小学弟 H 狼狼的备忘录 I 重生之zbk要拿回属于他的一切 J 这是签到 ##A 国际旅行Ⅰ 链接&#xff1a;https://ac.nowcoder.com/acm/contest/87255/A 来源&#xff1a;牛客网 题目描述 很久很久以前&#xff0c;有 n n n 个国家&#xff0c;第…

字符的统计——423、657、551、696、467、535

423. 从英文中重建数字 最初思路 首先要有一个指针&#xff0c;对于3/4/5为一组地跳跃。起初想的是后瞻性&#xff0c;如果符合0-9任意&#xff0c;则更换index、跳跃。此时写了一个函数&#xff0c;用来判断s的截取段和0-9中有无符合。这个思路并没有进行下去&#xff0c;虽然…

昇思25天学习打卡营第九天|本地安装mindspore之一|Linux的系统在vmware上的安装以及mindspore的安装

课程已经学完了&#xff0c;打算再深入一些。初步的想法是&#xff0c;在本地安装&#xff0c;本地执行。 根据老师的指引&#xff0c;MindSpore官网&#xff0c;“https ://www.mindspore.cn/install/”&#xff0c;注意&#xff0c;因为csdn博客编辑器的原因&#xff0c;当我…

软考:软件设计师 — 6.信息安全

六. 信息安全 1. 加密技术与认证技术 &#xff08;1&#xff09;对称与非对称加密 关于对称与非对称加密算法的详细内容&#xff0c;可以参考文章&#xff1a;信息安全基础技术与原理 对称加密 对称加密也称为共享密钥加密。 对称加密中&#xff0c;加密和解密的密钥是同一…

【豆包Marscode体验官】揭秘MarsCode AI编辑助手:高效智能编辑新纪元之入门指导与最佳实践

文章目录 1. 概述2. 工具使用过程2.1 MarsCode插件简介2.2 安装和配置2.2.1 安装MarsCode插件2.2.2 配置MarsCode插件 2.3 各个功能的使用2.3.1 代码补全2.3.2 代码补全 Pro【操作提示&#xff0c;No suggestion from Model&#xff0c;不知道是不是版本的问题】2.3.3 代码生成…

从零开始构建你的第一个Python Web应用

在本文中&#xff0c;我们将带领你从零开始构建一个简单的Python Web应用。不需要任何先验知识&#xff0c;我们会一步步地指导你完成设置、框架选择、代码编写到部署的整个过程。无论你是Web开发新手还是希望扩展技能的老手&#xff0c;这篇文章都将为你提供一个实践操作的起点…

C语言 | Leetcode C语言题解之第278题第一个错误的版本

题目&#xff1a; 题解&#xff1a; int firstBadVersion(int n) {int left 1, right n;while (left < right) { // 循环直至区间左右端点相同int mid left (right - left) / 2; // 防止计算时溢出if (isBadVersion(mid)) {right mid; // 答案在区间 [left, mid] 中…

abc363+cf960div.2+牛客周赛49轮

C - Avoid K Palindrome 2 (atcoder.jp) 思路&#xff1a; 罗列出排列的每一种情况&#xff0c;再根据题目要求进行判断 代码&#xff1a; void solve() {ll n, k;cin >> n >> k;string s;vector<char>a;cin >> s;for (int i 0; i < n; i)a.pus…

C++与lua联合编程

C与lua联合编程 一、环境配置二、lua基本语法1.第一个lua和C程序2.基本数据类型和变量2.1 Nil2.2 Booleans2.3 Numbers2.4 String(最常用) 3. 字符串处理3.1 错误处理3.2 字符串长度:string.len3.3 字符串子串 :string.sub3.4 字符串查找: string.find3.5字符串替换: string.gs…

sortablejs使用filter属性后,元素的默认事件失效了

在使用vue-draggable-plus或者sortablejs时。为了防止某个元素可以拖拽&#xff0c;一般会加上filter属性 但是加上后&#xff0c;会发现这个元素不能点击了&#xff0c;像我这里的滑块也不能滑动了。 查了很久才发现&#xff0c;还有一个属性是 preventOnFilter: true, // 在触…

安宝特方案|解放双手,解决死角,AR带来质量监督新体验

AR质量监督 解放双手&#xff0c;解决死角 在当今制造业快速发展的背景下&#xff0c;质量监督成为确保产品高质量和完善的管理制度的关键环节。然而&#xff0c;传统的质量监督方式存在诸多挑战&#xff0c;如人工操作带来的效率低下、查岗不及时、摄像头死角等问题。 为了解…

浅谈C语言整型类数据在内存中的存储

1、整型类数据 C语言中的整型类数据都归类在整型家族中&#xff0c;其中包括&#xff1a;char、short、int、long、long long这5个大类&#xff0c;而每个大类中又分为两类signed和unsigned,这些都是C语言中的内置类型。以下重点基于char和int这两种类型的数据进行阐述&#x…

C++学习笔记-C++11中的智能指针

1.智能指针介绍 智能指针是C的特性用法&#xff0c;是一个类似指针功能的类对象&#xff0c;其目的是为了更好的管理动态分配的内存&#xff0c;避免出现内存泄漏、悬空指针等问题。C11的标准库里提供了三种智能指针模板类&#xff0c;分别是std::unique_ptr、std::shared_ptr…

苹果笔记本电脑如何优化系统 苹果电脑系统优化软件哪个好 cleanmymac x怎么用

随着时间的推移&#xff0c;你可能会发现你的MacBook运行速度变慢&#xff0c;甚至在执行一些基本任务时也会感觉到卡顿。这不仅影响了工作效率&#xff0c;也大大降低了使用体验。但别担心&#xff0c;优化你的Mac系统比做早餐还简单。本文将用一种轻松的风格向你介绍7种简单易…