使用 rocketmq-spring-boot-starter 来配置、发送和消费 RocketMQ 消息

简介: 本文将 rocktmq-spring-boot 的设计实现做一个简单的介绍,读者可以通过本文了解将 RocketMQ Client 端集成为 spring-boot-starter 框架的开发细节,然后通过一个简单的示例来一步一步的讲解如何使用这个 spring-boot-starter 工具包来配置,发送和消费 RocketMQ 消息。

头图.png

作者 | 辽天
来源 | 阿里巴巴云原生公众号

导读:本文将 rocktmq-spring-boot 的设计实现做一个简单的介绍,读者可以通过本文了解将 RocketMQ Client 端集成为 spring-boot-starter 框架的开发细节,然后通过一个简单的示例来一步一步的讲解如何使用这个 spring-boot-starter 工具包来配置,发送和消费 RocketMQ 消息。

在 Spring 生态中玩转 RocketMQ 系列文章:

  • 《如何在 Spring 生态中玩转 RocketMQ?》
  • 《罗美琪和春波特的故事...》
  • 《RocketMQ-Spring 毕业两周年,为什么能成为 Spring 生态中最受欢迎的 messaging 实现?》

本文配套可交互教程已登录阿里云知行动手实验室,PC 端登录 start.aliyun.com 在浏览器中立即体验。

1.png

通过本文,您将了解到:

  • Spring 的消息框架介绍
  • rocketmq-spring-boot 具体实现
  • 使用示例

前言

上世纪 90 年代末,随着 Java EE(Enterprise Edition) 的出现,特别是 Enterprise Java Beans 的使用需要复杂的描述符配置和死板复杂的代码实现,增加了广大开发者的学习曲线和开发成本,由此基于简单的 XML 配置和普通 Java 对象(Plain Old Java Objects)的 Spring 技术应运而生,依赖注入(Dependency Injection), 控制反转(Inversion of Control)和面向切面编程(AOP)的技术更加敏捷地解决了传统 Java 企业及版本的不足。

随着 Spring 的持续演进,基于注解(Annotation)的配置逐渐取代了 XML 文件配置,2014 年 4 月 1 日,Spring Boot 1.0.0 正式发布,它基于“约定大于配置”(Convention over configuration)这一理念来快速地开发、测试、运行和部署 Spring 应用,并能通过简单地与各种启动器(如 spring-boot-web-starter)结合,让应用直接以命令行的方式运行,不需再部署到独立容器中。这种简便直接快速构建和开发应用的过程,可以使用约定的配置并且简化部署,受到越来越多的开发者的欢迎。

Apache RocketMQ 是业界知名的分布式消息和流处理中间件,简单地理解,它由 Broker 服务器和客户端两部分组成:

其中客户端一个是消息发布者客户端(Producer),它负责向 Broker 服务器发送消息;另外一个是消息的消费者客户端(Consumer),多个消费者可以组成一个消费组,来订阅和拉取消费 Broker 服务器上存储的消息。

为了利用 Spring Boot 的快速开发和让用户能够更灵活地使用 RocketMQ 消息客户端,Apache RocketMQ 社区推出了 spring-boot-starter 实现。随着分布式事务消息功能在 RocketMQ 4.3.0 版本的发布,近期升级了相关的 spring-boot 代码,通过注解方式支持分布式事务的回查和事务消息的发送。

本文将对当前的设计实现做一个简单的介绍,读者可以通过本文了解将 RocketMQ Client 端集成为 spring-boot-starter 框架的开发细节,然后通过一个简单的示例来一步一步的讲解如何使用这个 spring-boot-starter 工具包来配置,发送和消费 RocketMQ 消息。

Spring 中的消息框架

顺便在这里讨论一下在 Spring 中关于消息的两个主要的框架,即 Spring Messaging 和 Spring Cloud Stream。它们都能够与 Spring Boot 整合并提供了一些参考的实现。和所有的实现框架一样,消息框架的目的是实现轻量级的消息驱动的微服务,可以有效地简化开发人员对消息中间件的使用复杂度,让系统开发人员可以有更多的精力关注于核心业务逻辑的处理。

1. Spring Messaging

Spring Messaging 是 Spring Framework 4 中添加的模块,是 Spring 与消息系统集成的一个扩展性的支持。它实现了从基于 JmsTemplate 的简单的使用 JMS 接口到异步接收消息的一整套完整的基础架构,Spring AMQP 提供了该协议所要求的类似的功能集。在与 Spring Boot 的集成后,它拥有了自动配置能力,能够在测试和运行时与相应的消息传递系统进行集成。

单纯对于客户端而言,Spring Messaging 提供了一套抽象的 API 或者说是约定的标准,对消息发送端和消息接收端的模式进行规定,不同的消息中间件提供商可以在这个模式下提供自己的 Spring 实现:在消息发送端需要实现的是一个 XXXTemplate 形式的 Java Bean,结合 Spring Boot 的自动化配置选项提供多个不同的发送消息方法;在消息的消费端是一个 XXXMessageListener 接口(实现方式通常会使用一个注解来声明一个消息驱动的 POJO),提供回调方法来监听和消费消息,这个接口同样可以使用 Spring Boot 的自动化选项和一些定制化的属性。

如果有兴趣深入的了解 Spring Messaging 及针对不同的消息产品的使用,推荐阅读这个文件。参考 Spring Messaging 的既有实现,RocketMQ 的 spring-boot-starter 中遵循了相关的设计模式并结合 RocketMQ 自身的功能特点提供了相应的 API(如顺序、异步和事务半消息等)。

2. Spring Cloud Stream

Spring Cloud Stream 结合了 Spring Integration 的注解和功能,它的应用模型如下:

2.jpeg
该图片引自 spring cloud stream

Spring Cloud Stream 框架中提供一个独立的应用内核,它通过输入(@Input)和输出(@Output)通道与外部世界进行通信,消息源端(Source)通过输入通道发送消息,消费目标端(Sink)通过监听输出通道来获取消费的消息。这些通道通过专用的 Binder 实现与外部代理连接。开发人员的代码只需要针对应用内核提供的固定的接口和注解方式进行编程,而不需要关心运行时具体的 Binder 绑定的消息中间件。在运行时,Spring Cloud Stream 能够自动探测并使用在 classpath 下找到的Binder。

这样开发人员可以轻松地在相同的代码中使用不同类型的中间件:仅仅需要在构建时包含进不同的 Binder。在更加复杂的使用场景中,也可以在应用中打包多个 Binder 并让它自己选择 Binder,甚至在运行时为不同的通道使用不同的 Binder。

Binder 抽象使得 Spring Cloud Stream 应用可以灵活的连接到中间件,加之 Spring Cloud Stream 使用利用了 Spring Boot 的灵活配置配置能力,这样的配置可以通过外部配置的属性和 Spring Boot 支持的任何形式来提供(包括应用启动参数、环境变量和 application.yml 或者 application.properties 文件),部署人员可以在运行时动态选择通道连接 destination(例如,Kafka 的 topic 或者 RabbitMQ 的 exchange)。

Binder SPI 的方式来让消息中间件产品使用可扩展的 API 来编写相应的 Binder,并集成到 Spring Cloud Steam 环境,目前 RocketMQ 还没有提供相关的 Binder,我们计划在下一步将完善这一功能,也希望社区里有这方面经验的同学积极尝试,贡献 PR 或建议。

spring-boot-starter的实现

在开始的时候我们已经知道,spring boot starter 构造的启动器对于使用者是非常方便的,使用者只要在 pom.xml引入starter 的依赖定义,相应的编译,运行和部署功能就全部自动引入。因此常用的开源组件都会为 Spring 的用户提供一个 spring-boot-starter 封装给开发者,让开发者非常方便集成和使用,这里我们详细的介绍一下 RocketMQ(客户端)的 starter 实现过程。

1. spring-boot-starter 的实现步骤

对于一个 spring-boot-starter 实现需要包含如下几个部分:

1)在 pom.xml 的定义

  • 定义最终要生成的 starter 组件信息
<groupId>org.apache.rocketmq</groupId>
<artifactId>spring-boot-starter-rocketmq</artifactId>
<version>1.0.0-SNAPSHOT</version>
  • 定义依赖包

它分为两个部分:Spring 自身的依赖包和 RocketMQ 的依赖包。

3.png

2)配置文件类

定义应用属性配置文件类 RocketMQProperties,这个 Bean 定义一组默认的属性值。用户在使用最终的 starter 时,可以根据这个类定义的属性来修改取值,当然不是直接修改这个类的配置,而是 spring-boot 应用中对应的配置文件:src/main/resources/application.properties。

3)定义自动加载类

定义 src/resources/META-INF/spring.factories 文件中的自动加载类, 其目的是让 spring boot 更具文中中所指定的自动化配置类来自动初始化相关的 Bean、Component 或 Service,它的内容如下:

org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
org.apache.rocketmq.spring.starter.RocketMQAutoConfiguration

在 RocketMQAutoConfiguration 类的具体实现中,定义开放给用户直接使用的 Bean 对象包括:

  • RocketMQProperties 加载应用属性配置文件的处理类;
  • RocketMQTemplate 发送端用户发送消息的发送模板类;
  • ListenerContainerConfiguration 容器 Bean 负责发现和注册消费端消费实现接口类,这个类要求:由 @RocketMQMessageListener 注解标注;实现 RocketMQListener 泛化接口。

4)最后具体地进行 RpcketMQ 相关的封装

在发送端(producer)和消费端(consumer)客户端分别进行封装,在当前的实现版本提供了对 Spring Messaging 接口的兼容方式。

2. 消息发送端实现

1)普通发送端

发送端的代码封装在 RocketMQTemplate POJO 中,下图是发送端的相关代码的调用关系图:

4.png

为了与 Spring Messaging 的发送模板兼容,在 RocketMQTemplate 集成了 AbstractMessageSendingTemplate 抽象类,来支持相关的消息转换和发送方法,这些方法最终会代理给 doSend() 方法、doSend() 以及 RocoketMQ 所特有的一些方法如异步,单向和顺序等方法直接添加到 RoketMQTempalte 中,这些方法直接代理调用到 RocketMQ 的 Producer API 来进行消息的发送。

2)事务消息发送端

对于事务消息的处理,在消息发送端进行了部分的扩展,参考上面的调用关系类图。

RocketMQTemplate 里加入了一个发送事务消息的方法 sendMessageInTransaction(),并且最终这个方法会代理到 RocketMQ 的 TransactionProducer 进行调用,在这个 Producer 上会注册其关联的 TransactionListener 实现类,以便在发送消息后能够对 TransactionListener 里的方法实现进行调用。

3. 消息消费端实现

5.png

在消费端 Spring-Boot 应用启动后,会扫描所有包含 @RocketMQMessageListener 注解的类(这些类需要集成 RocketMQListener 接口,并实现 onMessage()方法),这个 Listener 会一对一的被放置到。

DefaultRocketMQListenerContainer 容器对象中,容器对象会根据消费的方式(并发或顺序),将 RocketMQListener 封装到具体的 RocketMQ 内部的并发或者顺序接口实现。在容器中创建 RocketMQ Consumer 对象,启动并监听定制的 Topic 消息,如果有消费消息,则回调到 Listener 的 onMessage() 方法。

使用示例

上面的一章介绍了 RocketMQ 在 spring-boot-starter 方式的实现,这里通过一个最简单的消息发送和消费的例子来介绍如何使这个 rocketmq-spring-boot-starter。

1. RocketMQ 服务端的准备

1)启动 NameServer 和 Broker

要验证 RocketMQ 的 Spring-Boot 客户端,首先要确保 RocketMQ 服务正确的下载并启动。可以参考 RocketMQ 主站的快速开始来进行操作。确保启动 NameServer 和 Broker 已经正确启动。

2)创建实例中所需要的 Topics

在执行启动命令的目录下执行下面的命令行操作:

bash bin/mqadmin updateTopic -c DefaultCluster -t string-topic

2. 编译 rocketmq-spring-boot-starter

目前的 spring-boot-starter 依赖还没有提交的 Maven 的中心库,用户使用前需要自行下载 git 源码,然后执行 mvn clean install 安装到本地仓库。

git clone https://github.com/apache/rocketmq-externals.git
cd rocketmq-spring-boot-starter
mvn clean install

3. 编写客户端代码

用户如果使用它,需要在消息的发布和消费客户端的 maven 配置文件 pom.xml 中添加如下的依赖:

6.png

属性 spring-boot-starter-rocketmq-version 的取值为:1.0.0-SNAPSHOT, 这与上一步骤中执行安装到本地仓库的版本一致。

1)消息发送端的代码

发送端的配置文件 application.properties:

7.png

发送端的 Java 代码:

8.png

2)消息消费端代码

消费端的配置文件 application.properties:

9.png

消费端的 Java 代码:

10.png

这里只是简单的介绍了使用 spring-boot 来编写最基本的消息发送和接收的代码,如果需要了解更多的调用方式,如: 异步发送,对象消息体,指定 tag 标签以及指定事务消息,请参看 github 的说明文档和详细的代码。我们后续还会对这些高级功能进行陆续的介绍。

作者简介

辽天,阿里巴巴技术专家,Apache RocketMQ 内核控,拥有多年分布式系统研发经验,对 Microservice、Messaging 和 Storage 等领域有深刻理解, 目前专注 RocketMQ 内核优化以及 Messaging 生态建设。

原文链接

本文为阿里云原创内容,未经允许不得转载。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/513691.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

回归的误差服从正态分布吗_盘点10大回归类型:总有一款深得你心

全文共2507字&#xff0c;预计学习时长5分钟除了统计模型和其他的一些算法&#xff0c;回归是机器学习成功运行的重要构成要素。回归的核心是寻找变量之间的关系&#xff0c;而机器学习需要根据这种关系来预测结果。显然&#xff0c;任何称职的机器学习工程师都应重视回归&…

What‘s new in dubbo-go v1.5.6

简介&#xff1a; dubbogo 社区近期发布了 dubbogo v1.5.6。该版本和 dubbo 2.7.8 对齐&#xff0c;提供了命令行工具&#xff0c;并提供了多种加载配置的方式。 作者 | 铁城 dubbo-go 社区 committer 来源 | 阿里巴巴云原生公众号 dubbogo 社区近期发布了 dubbogo v1.5.6。该…

华为彭松:基于C.A.F模型构建联接竞争力,创造新增长

10月19日&#xff0c;第七届全球超宽带高峰论坛&#xff08;Ultra-Broadband Forum 2021&#xff09;在迪拜开幕。期间&#xff0c;华为运营商BG Marketing与解决方案销售部总裁彭松发表了题为“联接&#xff0c;新增长”的主题演讲&#xff0c;定义并深入探讨了C.A.F&#xff…

关于写文章的一点经验

简介&#xff1a; 过去的一年&#xff0c;借着《如何画好一张架构图&#xff1f;》、《2020总结&#xff08;个人篇&#xff09;&#xff1a;关于个人成长的再认知》以及《2020 总结&#xff08;团队篇&#xff09;&#xff1a;招之即来&#xff0c;来之即战&#xff0c;战之必…

倒计时 3 天!1024 程序员节全日程曝光,105 场深度演讲点燃数字经济新时代

湘江之滨&#xff0c;岳麓山下&#xff0c;一年前&#xff0c;我们于此完成了一场备受业界关注的硬核技术与开源文化深度融合的大型技术大会——长沙中国1024程序员节&#xff0c;国内顶尖技术专家学者齐聚千年书院&#xff0c;九大操作系统掌门人共话开源技术创新、操作系统新…

13新功能_新功能简介|MySQL8.0数据查询脱敏

数据库管理员会负责维护数据的隐私和完整性。针对数据的脱敏&#xff0c;通常的方案是&#xff1a;应用端实现或者引入加密机等。不过现在MySQL8.0实现了数据脱敏这个功能&#xff0c;可以减少应用的复杂性、减少开发的工作量&#xff0c;也能友好的保护了数据的隐私和完整性。…

贝壳基于 Flink 的实时计算演进之路

简介&#xff1a; 贝壳找房在实时计算之路上的平台建设以及实时数仓应用。 摘要&#xff1a;贝壳找房大数据平台实时计算负责人刘力云带来的分享内容是贝壳找房的实时计算演进之路&#xff0c;内容如下&#xff1a; 发展历程平台建设实时数仓及其应用场景事件驱动场景未来规划G…

python动态规划详解_python----动态规划

不能放弃治疗,每天都要进步&#xff01;&#xff01; 什么时候使用动态规划呢&#xff1f; 1. 求一个问题的最优解 2. 大问题可以分解为子问题&#xff0c;子问题还有重叠的更小的子问题 3. 整体问题最优解取决于子问题的最优解&#xff08;状态转移方程&#xff09; 4. 从上往…

Flink 在唯品会的实践

简介&#xff1a; Flink 在唯品会的容器化实践应用以及产品化经验。 唯品会自 2017 年开始基于 k8s 深入打造高性能、稳定、可靠、易用的实时计算平台&#xff0c;支持唯品会内部业务在平时以及大促的平稳运行。现平台支持 Flink、Spark、Storm 等主流框架。本文主要分享 Flink…

1024 程序员节专题论坛来袭,聚焦企业级开源数据库 openGauss

技术驱动下&#xff0c;现代企业快速发展&#xff0c;产生海量的数据。被称为基础软件三驾马车之一的数据库&#xff0c;一直处于 IT 系统的核心地位&#xff0c;并在技术发展中不断变化。基础数据是“十四五”的重点关注方向&#xff0c;中国数据库正在快速发展崛起&#xff0…

6 张图带你彻底搞懂分布式事务 XA 模式

简介&#xff1a; XA 协议是由 X/Open 组织提出的分布式事务处理规范&#xff0c;主要定义了事务管理器 TM 和局部资源管理器 RM 之间的接口。目前主流的数据库&#xff0c;比如 oracle、DB2 都是支持 XA 协议的。 作者 | 朱晋君 来源 | 阿里巴巴云原生公众号 XA 协议是由 X/O…

龙蜥降世,神龙升级,阿里云投入 20 亿发力操作系统

作者 | 贾凯强、伍杏玲 出品 | CSDN云计算&#xff08;ID&#xff1a;CSDNcloud&#xff09;10 月 20 日&#xff0c;阿里巴巴云栖大会继续在杭州进行&#xff0c;与开幕第一天的主论坛不同&#xff0c;第二天活动的主论坛更加聚焦与技术领域和技术实践。20 日上午&#…

连续三年入围 Gartner 容器竞争格局,阿里云容器服务新布局首次公开

简介&#xff1a; 近日&#xff0c;国际知名信息技术咨询机构 Gartner 发布 2021 年容器竞争格局报告&#xff0c;阿里云成为国内唯一连续三年入选的中国企业&#xff0c;产品丰富度与成熟度持续保持全球领先水平。 来源 | 阿里巴巴云原生公众号 近日&#xff0c;国际知名信息…

双向可控硅触发电路图大全

双向可控硅触发电路图一&#xff1a; 为了提高效率&#xff0c;使触发脉冲与交流电压同步&#xff0c;要求每隔半个交流电的周期输出一个触发脉冲&#xff0c;且触发脉冲电压应大于4V&#xff0c;脉冲宽度应大于20us.图中BT为变压器&#xff0c;TPL521-2为光电耦合器&#xff…

视图计算背后的技术架构思考

简介&#xff1a; 5G时代海量视图计算场景&#xff0c;阿里云边缘计算节点聚焦视频上云和处理方向&#xff0c;阿里云高级技术专家为您解读海量视图计算背后的技术与架构能力。 作者&#xff1a;胡帆 数据载体、算力分布正在根本性变化 视频和图片因其强大的信息承载力&…

Graph + AI 2021全球峰会圆满落幕 TigerGraph企业版3.2发布

中国上海&#xff0c;2021年10月22日——由企业级可扩展图分析平台TigerGraph主办的“图创未来无界精彩”Graph AI 2021中国峰会于前日圆满落幕。本次峰会超3500人参与&#xff0c;较往届增长340%&#xff0c;与会者包括来自耐克、特斯拉、联合利华、西门子、通用电气等上百家…

重磅发布 | 阿里云视图计算,边缘计算的主“战”场

简介&#xff1a; 云计算情报局第10期&#xff0c;阿里云产品专家云觉对新产品——视图计算的产品设计背景、产品功能以及应用场景和价值进行了全面的在线揭秘&#xff0c;带领网友探索全新“视”界。 近日云计算情报局第10期&#xff0c;阿里云产品专家云觉对新产品——视图计…

两个对象数组交集_yiduobo的每日leetcode 349.两个数组的交集 amp;amp; 350.两个数组的交集II...

祖传的手艺不想丢了&#xff0c;所以按顺序写一个leetcode的题解。计划每日两题&#xff0c;争取不卡题吧349.两个数组的交集https://leetcode-cn.com/problems/intersection-of-two-arrays/350.两个数组的交集II https://leetcode-cn.com/problems/intersection-of-two-arrays…

flink sql设置并行度_Flink集成Hivestream模式用例

01背景基于前面的文章Flink集成hive bath模式用例knowfarhhy&#xff0c;公众号&#xff1a;大数据摘文Flink 集成Hive&#xff0c;我们继续介绍stream模式下的用例。02流模式读取HiveEnvironmentSettings bsSettings EnvironmentSettings.newInstance().useBlinkPlanner().in…

微软副总裁、Kubernetes 头号贡献者的他,给云原生带来奇迹!

我们做了一个街头采访&#xff0c;调查路人眼中的程序员到底是怎样的&#xff1f;提到程序员&#xff0c;大家似乎都有刻板印象&#xff1a;总是格子衬衫牛仔裤双肩包打扮&#xff0c;总是埋头敲代码&#xff0c;加班是常态……谁说程序员呆板木讷&#xff0c;只会埋头敲一行行…