如何在优雅地Spring 中实现消息的发送和消费

本文将对rocktmq-spring-boot的设计实现做一个简单的介绍,读者可以通过本文了解将RocketMQ Client端集成为spring-boot-starter框架的开发细节,然后通过一个简单的示例来一步一步的讲解如何使用这个spring-boot-starter工具包来配置,发送和消费RocketMQ消息。

作者简介:辽天,阿里巴巴技术专家,Apache RocketMQ 内核控,拥有多年分布式系统研发经验,对Microservice、Messaging和Storage等领域有深刻理解, 目前专注 RocketMQ 内核优化以及 Messaging 生态建设。

通过本文,您将了解到:

  • Spring的消息框架介绍
  • rocketmq-spring-boot具体实现
  • 使用示例

插播一条广告:本周六下午,Apache RocketMQ 开发者沙龙将来到杭州,欢迎大家到现场,活动详情请点击“阅读原文”。

前言

上世纪90年代末,随着Java EE(Enterprise Edition)的出现,特别是Enterprise Java Beans的使用需要复杂的描述符配置和死板复杂的代码实现,增加了广大开发者的学习曲线和开发成本,由此基于简单的XML配置和普通Java对象(Plain Old Java Objects)的Spring技术应运而生,依赖注入(Dependency Injection), 控制反转(Inversion of Control)和面向切面编程(AOP)的技术更加敏捷地解决了传统Java企业及版本的不足。

随着Spring的持续演进,基于注解(Annotation)的配置逐渐取代了XML文件配置, 2014年4月1日,Spring Boot 1.0.0正式发布,它基于“约定大于配置”(Convention over configuration)这一理念来快速地开发、测试、运行和部署Spring应用,并能通过简单地与各种启动器(如 spring-boot-web-starter)结合,让应用直接以命令行的方式运行,不需再部署到独立容器中。这种简便直接快速构建和开发应用的过程,可以使用约定的配置并且简化部署,受到越来越多的开发者的欢迎。

Apache RocketMQ是业界知名的分布式消息和流处理中间件,简单地理解,它由Broker服务器和客户端两部分组成:

其中客户端一个是消息发布者客户端(Producer),它负责向Broker服务器发送消息;

另外一个是消息的消费者客户端(Consumer),多个消费者可以组成一个消费组,来订阅和拉取消费Broker服务器上存储的消息。

为了利用Spring Boot的快速开发和让用户能够更灵活地使用RocketMQ消息客户端,Apache RocketMQ社区推出了spring-boot-starter实现。随着分布式事务消息功能在RocketMQ 4.3.0版本的发布,近期升级了相关的spring-boot代码,通过注解方式支持分布式事务的回查和事务消息的发送。

本文将对当前的设计实现做一个简单的介绍,读者可以通过本文了解将RocketMQ Client端集成为spring-boot-starter框架的开发细节,然后通过一个简单的示例来一步一步的讲解如何使用这个spring-boot-starter工具包来配置,发送和消费RocketMQ消息。

Spring 中的消息框架

顺便在这里讨论一下在Spring中关于消息的两个主要的框架,即Spring Messaging和Spring Cloud Stream。它们都能够与Spring Boot整合并提供了一些参考的实现。和所有的实现框架一样,消息框架的目的是实现轻量级的消息驱动的微服务,可以有效地简化开发人员对消息中间件的使用复杂度,让系统开发人员可以有更多的精力关注于核心业务逻辑的处理。

2.1 Spring Messaging

Spring Messaging是Spring Framework 4中添加的模块,是Spring与消息系统集成的一个扩展性的支持。它实现了从基于JmsTemplate的简单的使用JMS接口到异步接收消息的一整套完整的基础架构,Spring AMQP提供了该协议所要求的类似的功能集。 在与Spring Boot的集成后,它拥有了自动配置能力,能够在测试和运行时与相应的消息传递系统进行集成。

单纯对于客户端而言,Spring Messaging提供了一套抽象的API或者说是约定的标准,对消息发送端和消息接收端的模式进行规定,不同的消息中间件提供商可以在这个模式下提供自己的Spring实现:在消息发送端需要实现的是一个XXXTemplate形式的Java Bean,结合Spring Boot的自动化配置选项提供多个不同的发送消息方法;在消息的消费端是一个XXXMessageListener接口(实现方式通常会使用一个注解来声明一个消息驱动的POJO),提供回调方法来监听和消费消息,这个接口同样可以使用Spring Boot的自动化选项和一些定制化的属性。

如果有兴趣深入的了解Spring Messaging及针对不同的消息产品的使用,推荐阅读这个文件。参考Spring Messaging的既有实现,RocketMQ的spring-boot-starter中遵循了相关的设计模式并结合RocketMQ自身的功能特点提供了相应的API(如,顺序,异步和事务半消息等)。

2.2 Spring Cloud Stream

Spring Cloud Stream结合了Spring Integration的注解和功能,它的应用模型如下:

Spring Cloud Stream框架中提供一个独立的应用内核,它通过输入(@Input)和输出(@Output)通道与外部世界进行通信,消息源端(Source)通过输入通道发送消息,消费目标端(Sink)通过监听输出通道来获取消费的消息。这些通道通过专用的Binder实现与外部代理连接。开发人员的代码只需要针对应用内核提供的固定的接口和注解方式进行编程,而不需要关心运行时具体的Binder绑定的消息中间件。在运行时,Spring Cloud Stream能够自动探测并使用在classpath下找到的Binder。

这样开发人员可以轻松地在相同的代码中使用不同类型的中间件:仅仅需要在构建时包含进不同的Binder。在更加复杂的使用场景中,也可以在应用中打包多个Binder并让它自己选择Binder,甚至在运行时为不同的通道使用不同的Binder。

Binder抽象使得Spring Cloud Stream应用可以灵活的连接到中间件,加之Spring Cloud Stream使用利用了Spring Boot的灵活配置配置能力,这样的配置可以通过外部配置的属性和Spring Boo支持的任何形式来提供(包括应用启动参数、环境变量和application.yml或者application.properties文件),部署人员可以在运行时动态选择通道连接destination(例如,Kafka的topic或者RabbitMQ的exchange)。

Binder SPI的方式来让消息中间件产品使用可扩展的API来编写相应的Binder,并集成到Spring Cloud Steam环境,目前RocketMQ还没有提供相关的Binder,我们计划在下一步将完善这一功能,也希望社区里有这方面经验的同学积极尝试,贡献PR或建议。

spring-boot-starter的实现

在开始的时候我们已经知道,spring boot starter构造的启动器对于使用者是非常方便的,使用者只要在pom.xml引入starter的依赖定义,相应的编译,运行和部署功能就全部自动引入。因此常用的开源组件都会为Spring的用户提供一个spring-boot-starter封装给开发者,让开发者非常方便集成和使用,这里我们详细的介绍一下RocketMQ(客户端)的starter实现过程。

3.1. spring-boot-starter的实现步骤

对于一个spring-boot-starter实现需要包含如下几个部分:

  1. 在pom.xml的定义
  • 定义最终要生成的starter组件信息
<groupId>org.apache.rocketmq</groupId>
<artifactId>spring-boot-starter-rocketmq</artifactId>
<version>1.0.0-SNAPSHOT</version>
  • 定义依赖包,

它分为两个部分: A、Spring自身的依赖包; B、RocketMQ的依赖包

<dependencies><!-- spring-boot-start internal depdencies --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency>         <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><!-- rocketmq dependencies --><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>${rocketmq-version}</version></dependency>
</dependencies>    <dependencyManagement><dependencies><!-- spring-boot-start parent depdency definition --> <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>${spring.boot.version}</version><type>pom</type><scope>import</scope></dependency></dependencies>
</dependencyManagement>
  1. 配置文件类

定义应用属性配置文件类RocketMQProperties,这个Bean定义一组默认的属性值。用户在使用最终的starter时,可以根据这个类定义的属性来修改取值,当然不是直接修改这个类的配置,而是spring-boot应用中对应的配置文件:src/main/resources/application.properties.

  1. 定义自动加载类

定义 src/resources/META-INF/spring.factories文件中的自动加载类, 其目的是让spring boot更具文中中所指定的自动化配置类来自动初始化相关的Bean,Component或Service,它的内容如下:

org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
org.apache.rocketmq.spring.starter.RocketMQAutoConfiguration

在RocketMQAutoConfiguration类的具体实现中,定义开放给用户直接使用的Bean对象. 包括:

  • RocketMQProperties 加载应用属性配置文件的处理类;
  • RocketMQTemplate 发送端用户发送消息的发送模板类;
  • ListenerContainerConfiguration 容器Bean负责发现和注册消费端消费实现接口类,这个类要求:由@RocketMQMessageListener注解标注;实现RocketMQListener泛化接口。
  1. 最后具体的RocketMQ相关的封装
    在发送端(producer)和消费端(consumer)客户端分别进行封装,在当前的实现版本提供了对Spring Messaging接口的兼容方式。

3.2. 消息发送端实现

  1. 普通发送端

发送端的代码封装在RocketMQTemplate POJO中,下图是发送端的相关代码的调用关系图:

为了与Spring Messaging的发送模板兼容,在RocketMQTemplate集成了AbstractMessageSendingTemplate抽象类,来支持相关的消息转换和发送方法,这些方法最终会代理给doSend()方法;doSend()以及RocoketMQ所特有的一些方法如异步,单向和顺序等方法直接添加到RoketMQTempalte中,这些方法直接代理调用到RocketMQ的Producer API来进行消息的发送。

  1. 事务消息发送端

对于事务消息的处理,在消息发送端进行了部分的扩展,参考下图的调用关系类图:

RocketMQTemplate里加入了一个发送事务消息的方法sendMessageInTransaction(), 并且最终这个方法会代理到RocketMQ的TransactionProducer进行调用,在这个Producer上会注册其关联的TransactionListener实现类,以便在发送消息后能够对TransactionListener里的方法实现进行调用。

3.3. 消息消费端实现

在消费端Spring-Boot应用启动后,会扫描所有包含@RocketMQMessageListener注解的类(这些类需要集成RocketMQListener接口,并实现onMessage()方法),这个Listener会一对一的被放置到DefaultRocketMQListenerContainer容器对象中,容器对象会根据消费的方式(并发或顺序),将RocketMQListener封装到具体的RocketMQ内部的并发或者顺序接口实现。在容器中创建RocketMQ Consumer对象,启动并监听定制的Topic消息,如果有消费消息,则回调到Listener的onMessage()方法。

使用示例

上面的一章介绍了RocketMQ在spring-boot-starter方式的实现,这里通过一个最简单的消息发送和消费的例子来介绍如何使这个rocketmq-spring-boot-starter。

4.1 RocketMQ服务端的准备

  1. 启动NameServer和Broker

要验证RocketMQ的Spring-Boot客户端,首先要确保RocketMQ服务正确的下载并启动。可以参考RocketMQ主站的快速开始来进行操作。确保启动NameServer和Broker已经正确启动。

  1. 创建实例中所需要的Topics

在执行启动命令的目录下执行下面的命令行操作

bash bin/mqadmin updateTopic -c DefaultCluster -t string-topic

4.2. 编译rocketmq-spring-boot-starter

目前的spring-boot-starter依赖还没有提交的Maven的中心库,用户使用前需要自行下载git源码,然后执行mvn clean install 安装到本地仓库。

git clone https://github.com/apache/rocketmq-externals.git
cd rocketmq-spring-boot-starter
mvn clean install

4.3. 编写客户端代码

用户如果使用它,需要在消息的发布和消费客户端的maven配置文件pom.xml中添加如下的依赖:

<properties>   <spring-boot-starter-rocketmq-version>1.0.0-SNAPSHOT</spring-boot-starter-rocketmq-version>
</properties><dependency><groupId>org.apache.rocketmq</groupId><artifactId>spring-boot-starter-rocketmq</artifactId><version>${spring-boot-starter-rocketmq-version}</version>
</dependency>

属性spring-boot-starter-rocketmq-version的取值为:1.0.0-SNAPSHOT, 这与上一步骤中执行安装到本地仓库的版本一致。

  1. 消息发送端的代码

发送端的配置文件application.properties

# 定义name-server地址
spring.rocketmq.name-server=localhost:9876
# 定义发布者组名
spring.rocketmq.producer.group=my-group1
# 定义要发送的topic
spring.rocketmq.topic=string-topic

发送端的Java代码

import org.apache.rocketmq.spring.starter.core.RocketMQTemplate;
...@SpringBootApplication
public class ProducerApplication implements CommandLineRunner {// 声明并引用RocketMQTemplate@Resourceprivate RocketMQTemplate rocketMQTemplate;// 使用application.properties里定义的topic属性@Value("${spring.rocketmq.springTopic}")private String springTopic;public static void main(String[] args){SpringApplication.run(ProducerApplication.class, args);}public void run(String... args) throws Exception {// 以同步的方式发送字符串消息给指定的topicSendResult sendResult = rocketMQTemplate.syncSend(springTopic, "Hello, World!");// 打印发送结果信息System.out.printf("string-topic syncSend1 sendResult=%s %n", sendResult);}
}
  1. 消息消费端代码

消费端的配置文件application.properties

# 定义name-server地址
spring.rocketmq.name-server=localhost:9876
# 定义发布者组名
spring.rocketmq.consumer.group=my-customer-group1
# 定义要发送的topic
spring.rocketmq.topic=string-topic

消费端的Java代码

@SpringBootApplication
public class ConsumerApplication {public static void main(String[] args) {SpringApplication.run(ConsumerApplication.class, args);}
}// 声明消费消息的类,并在注解中指定,相关的消费信息
@Service
@RocketMQMessageListener(topic = "${spring.rocketmq.topic}", consumerGroup = "${spring.rocketmq.consumer.group}")
class StringConsumer implements RocketMQListener<String> {@Overridepublic void onMessage(String message) {System.out.printf("------- StringConsumer received: %s %f", message);}
}

 


原文链接
本文为云栖社区原创内容,未经允许不得转载。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/520359.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

假如有人把支付宝存储服务器炸了

戳蓝字“CSDN云计算”关注我们哦&#xff01;作者 | 净整些没用的责编 | 阿秃近日&#xff0c;在知乎看到了一个问题《假如有人把支付宝存储服务器炸了&#xff08;物理炸&#xff09;&#xff0c;大众在支付宝里的钱是不是就都没有了呢&#xff1f;》外行人问题。网站都是有服…

如何在一分钟内实现微服务系统下的架构可视化

为什么需要架构可视化 随着企业进行微服务架构改造&#xff0c;系统架构复杂度越来越高&#xff0c;架构变化日益频繁&#xff0c;微服务改造后的实际架构模型可能与预期已经产生了巨大差异&#xff0c;架构师或系统运维人员很难准确记忆所有资源实例的构成和交互情况&#xf…

SpringBoot2.x整合Redis 分布式集群_02

文章目录1. maven依赖2. RedisConfig3. RedisUtils4. application.yml5. 单元测试6. redis 客户端查看1. maven依赖 <!--redis Start--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis<…

驱动阿里云的高性能网络引擎- 飞天洛神

大家都知道阿里云部件的系统都是以神仙命名的&#xff0c;比如说洛神、伏羲、盘古、女娲等等。而在11月15日的GNTC 云专场峰会上&#xff0c;阿里云资深网络技术专家宗志刚先生首先分享了“驱动阿里云的高性能网络引擎- 飞天洛神”主题演讲。洛神是阿里云飞天系统的虚拟网络系统…

6G为什么不被看好?

戳蓝字“CSDN云计算”关注我们哦&#xff01;作者 | 小枣君责编&#xff5c;阿秃前段时间&#xff0c;科技部官宣我国正式启动6G研发的新闻&#xff0c;在网上引起了广泛的转发&#xff0c;相信大家都有看到。新闻摘要&#xff1a;2019年11月3日&#xff0c;科技部会同发展改革…

阿里巴巴IPv6应用平台引领下一代互联网

在11月15日的GNTC IPv6专场峰会上&#xff0c;阿里巴巴网络架构师张先国先生首先分享了“阿里巴巴IPv6应用平台引领下一代互联网”主题演讲。演讲中讲述了阿里巴巴为何尽早启动IPv6项目、阿里巴巴PV6应用平台实践、以及阿里巴巴五大应用及集团各种平台如何构建在阿里云平台之上…

SpringBoot 整合 Redis 哨兵机制_02

文章目录1. maven依赖2. RedisConfig3. RedisUtils4. application.yml5. 单元测试6. redis客户端查看1. maven依赖 <!--redis Start--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis<…

mysql慢查询日志分析工具比较_MySQL慢查询日志总结 日志分析工具mysqldumpslow

慢查询日志概念MySQL的慢查询日志是MySQL提供的一种日志记录&#xff0c;它用来记录在MySQL中响应时间超过阀值的语句&#xff0c;具体指运行时间超过long_query_time值的SQL&#xff0c;则会被记录到慢查询日志中。long_query_time的默认值为10&#xff0c;意思是运行10S以上的…

北大教授张大庆:无线感知,让你变老也优雅

受访者 | 张大庆记者 | 胡巍巍出品 | CSDN&#xff08;ID&#xff1a;CSDNnews&#xff09;在国内高校中&#xff0c;北大的校庆日很特殊——5月4日。这一天&#xff0c;也是青年节。北大&#xff0c;是五四运动的策源地。100年来&#xff0c;“爱国、进步、民主、科学”的五四…

揭秘阿里云EB级大数据计算引擎MaxCompute

日前&#xff0c;全球权威咨询与服务机构Forrester发布了《The Forrester WaveTM: Cloud Data Warehouse, Q4 2018》报告。这是Forrester Wave首次发布关于云数仓解决方案&#xff08;Cloud Data Warehouse&#xff0c;简称CDW&#xff09;的测评。报告对云数仓的当前产品功能、…

SpringBoot整合Redis 主从复制_02

文章目录1. maven依赖2. RedisConfig3. RedisUtils4. application.yml5. 单元测试6. redis客户端查看1. maven依赖 <!--redis Start--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis<…

idea 找不到 Free MyBatis plugin

idea 找不到 free mybatis plugin 可以使用mybatisX替换&#xff1a; 插件安装成功后&#xff0c;重启idea。

mysql malloc lib_CVE-2016-6662-MySQL ‘malloc_lib’变量重写命令执行分析 | CN-SEC 中文网...

摘要今天有个关于MySQL的漏洞被披露出来&#xff0c;编号CVE-2016-6662。该漏洞主要涉及到 mysqld_safe 脚本中在加速/处理内存时会采用 “malloc_lib”变量作为辨别标记选择性加载(preload方式)比如tcmalloc之类的malloc库。不幸的的是这个变量可以被my.cnf所控制&#xff0c;…

TPC-C中跑赢Oracle的OceanBase,最近有何惊艳?

戳蓝字“CSDN云计算”关注我们哦&#xff01;作者 | 晶少责编 | 阿秃出品 | CSDN云计算&#xff08;ID&#xff1a;CSDNcloud&#xff09;就在一年一度震撼人心的双11前夕&#xff0c;有消息称前段时间火爆到瞬间刷屏的OceanBase已经完成了Oracle模式的研发&#xff0c;助力银行…

亚马逊计划明年推出云游戏服务,紧追微软谷歌;华为GaussDB数据库推出双分布式架构;腾讯擎天5G智慧灯杆助智慧城市服务升级……...

戳蓝字“CSDN云计算”关注我们哦&#xff01;嗨&#xff0c;大家好&#xff0c;重磅君带来的【云重磅】特别栏目&#xff0c;如期而至&#xff0c;每周五第一时间为大家带来重磅新闻。把握技术风向标&#xff0c;了解行业应用与实践&#xff0c;就交给我重磅君吧&#xff01;重…

万豪数据泄漏门再敲警钟 酒店集团7步安全建议

​​11月30日&#xff0c;万豪酒店官方发布消息称&#xff0c;多达5亿人次预订喜达屋酒店客人的详细个人信息可能遭到泄露。万豪国际在调查过程中了解到&#xff0c;自2014年起即存在第三方对喜达屋网络未经授权的访问&#xff0c;但公司直到2018年9月才第一次收到警报。 万豪…

八年技术加持,性能提升10倍,阿里云HBase 2.0首发商用

阿里云HBase 2.0成长手记&#xff0c;含着金汤匙出身 HBase本身是一个分布式存储、数据库引擎&#xff0c;可以支持千万的QPS、PB级别的存储&#xff0c;这些都已经在生产环境验证&#xff0c;并且在阿里得以验证。 早在2010年开始&#xff0c;阿里巴巴集团开始研究并把HBase…

不得不看之跳槽加薪利器:2019需求最旺盛的十大IT技能

戳蓝字“CSDN云计算”关注我们哦&#xff01;作者 | 新技术 新商业责编&#xff5c;阿秃2018年岁末&#xff0c;中国互联网IT职场掀起一股裁员风暴&#xff0c;为所有的码农和IT专业人士敲响了警钟&#xff0c;要想职场顺风满帆&#xff0c;就应当及早预判热点&#xff0c;规划…

阿里云马劲:保证云产品持续拥有稳定性的实践和思考

对所有的技术人员来说&#xff0c;业务可靠性提升是一个系统工程&#xff0c;涉及网络管理、IDC管理、服务器管理、交付管理、变更管理、故障管理、监控管理、预案管理、根因分析、容量规划、容灾演练、标准化建设、集成测试、泛操作管理、权限管理、数据安全管理等方方面面&am…

实战:基于 Spring 的应用配置如何迁移至阿里云应用配置管理 ACM

最近遇到一些开发者朋友&#xff0c;准备将原有的Java Spring的应用配置迁移到 阿里云应用配置管理 ACM 中。迁移过程中&#xff0c;遇到不少有趣的问题。本文将通过一个简单的样例来还原迁移过程中遇到的问题和相关解决思路&#xff0c;以期达到和读者交流的目的。 什么样的配…