springboot整合rocketmq_面试官:简单说一下RocketMQ整合SpringBoot吧

前言

在使用SpringBoot的starter集成包时,要特别注意版本。因为SpringBoot集成RocketMQ的starter依赖是由Spring社区提供的,目前正在快速迭代的过程当中,不同版本之间的差距非常大,甚至基础的底层对象都会经常有改动。例如如果使用rocketmq-spring-boot-starter:2.0.4版本开发的代码,升级到目前最新的rocketmq-spring-boot-starter:2.1.1后,基本就用不了了

应用结构

bbf0d88d72cff5b321ebd15d05811ba3.png

TestController: 测试入口, 有基本消息测试和事务消息测试
TopicListener: 是监听"topic"这个主题的普通消息监听器
TopicTransactionListener: 是监听"topic"这个主题的事务消息监听器, 和TopicTransactionRocketMQTemplate绑定(一一对应关系)
Customer: 是测试消息体的一个entity对象
TopicTransactionRocketMQTemplate: 是扩展自RocketMQTemplate的另一个RocketMQTemplate, 专门用来处理某一个业务流程, 和TopicTransactionListener绑定(一一对应关系)

pom.xml

org.apache.rocketmq:rocketmq-spring-boot-starter:2.1.1, 引用的springboot版本是2.0.5.RELEASE

<?xml version="1.0" encoding="UTF-8"?>4.0.0com.mrathena.middle.ware    rocket.mq.springboot    1.0.0org.springframework.boot                spring-boot-dependencies                2.4.0pomimportorg.projectlombok            lombok            1.18.12org.slf4j            slf4j-api            1.7.30ch.qos.logback            logback-classic            1.2.3org.apache.rocketmq            rocketmq-spring-boot-starter            2.1.1org.springframework.boot                    spring-boot-starter                org.springframework                    spring-core                org.springframework                    spring-webmvc                org.springframework                    spring-aop                org.springframework                    spring-context                org.springframework                    spring-messaging                com.fasterxml.jackson.core                    jackson-databind                org.springframework.boot            spring-boot-starter-web        org.springframework.boot            spring-boot-starter-test        org.springframework            spring-messaging        com.fasterxml.jackson.core            jackson-databind        io.springfox            springfox-swagger-ui            2.9.2io.springfox            springfox-swagger2            2.9.2org.apache.maven.plugins                maven-compiler-plugin                3.8.11.81.8UTF-8

application.yml

server:  servlet:    context-path:  port: 80rocketmq:  name-server: 116.62.162.48:9876  producer:    group: producer

Customer

package com.mrathena.rocket.mq.entity;import lombok.AllArgsConstructor;import lombok.Getter;import lombok.NoArgsConstructor;import lombok.Setter;@Getter@Setter@NoArgsConstructor@AllArgsConstructorpublic class Customer {private String username;private String nickname;}

生产者 TestController

package com.mrathena.rocket.mq.controller;import com.mrathena.rocket.mq.configuration.TopicTransactionRocketMQTemplate;import com.mrathena.rocket.mq.entity.Customer;import lombok.extern.slf4j.Slf4j;import org.apache.rocketmq.client.producer.SendCallback;import org.apache.rocketmq.client.producer.SendResult;import org.apache.rocketmq.spring.core.RocketMQTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.messaging.Message;import org.springframework.messaging.MessageHeaders;import org.springframework.messaging.core.MessagePostProcessor;import org.springframework.messaging.support.MessageBuilder;import org.springframework.web.bind.annotation.GetMapping;import org.springframework.web.bind.annotation.RequestMapping;import org.springframework.web.bind.annotation.RestController;import java.util.HashMap;import java.util.Map;@Slf4j@RestController@RequestMapping("test")public class TestController {private static final String TOPIC = "topic";@Autowiredprivate RocketMQTemplate rocketMQTemplate;@Autowiredprivate TopicTransactionRocketMQTemplate topicTransactionRocketMQTemplate;@GetMapping("base")public Object base() {// destination: topic/topic:tag, topic或者是topic拼接tag的整合体// payload: 荷载即消息体// message: org.springframework.messaging.Message, 是Spring自己封装的类, 和RocketMQ的Message不是一个类, 里面没有tags/keys等内容rocketMQTemplate.send(TOPIC, MessageBuilder.withPayload("你好").setHeader("你是谁", "你猜").build());// tags nullrocketMQTemplate.convertAndSend(TOPIC, "tag null");// tags empty, 证明 tag 要么有值要么null, 不存在 empty 的 tagrocketMQTemplate.convertAndSend(TOPIC + ":", "tag empty ?");// 只有 tag 没有 keyrocketMQTemplate.convertAndSend(TOPIC + ":a", "tag a");rocketMQTemplate.convertAndSend(TOPIC + ":b", "tag b");// 有 property, 即 RocketMQ 基础 API 里面, Message(String topic, String tags, String keys, byte[] body) 里面的 key// rocketmq-spring-boot-starter 把 userProperty 和其他的一些属性都糅合在 headers 里面可, 具体可以参考 org.apache.rocketmq.spring.support.RocketMQUtil.addUserProperties// 获取某个自定义的属性的时候, 直接 headers.get("自定义属性key") 就可以了Map properties = new HashMap<>();properties.put("property", 1);properties.put("another-property", "你好");rocketMQTemplate.convertAndSend(TOPIC, "property 1", properties);rocketMQTemplate.convertAndSend(TOPIC + ":a", "tag a property 1", properties);rocketMQTemplate.convertAndSend(TOPIC + ":b", "tag b property 1", properties);properties.put("property", 5);rocketMQTemplate.convertAndSend(TOPIC, "property 5", properties);rocketMQTemplate.convertAndSend(TOPIC + ":a", "tag a property 5", properties);rocketMQTemplate.convertAndSend(TOPIC + ":c", "tag c property 5", properties);// 消息后置处理器, 可以在发送前对消息体和headers再做一波操作rocketMQTemplate.convertAndSend(TOPIC, "消息后置处理器", new MessagePostProcessor() {/** * org.springframework.messaging.Message */@Overridepublic Message> postProcessMessage(Message> message) {Object payload = message.getPayload();MessageHeaders messageHeaders = message.getHeaders();return message;}});// convertAndSend 底层其实也是 syncSend// syncSendlog.info("{}", rocketMQTemplate.syncSend(TOPIC, "sync send"));// asyncSendrocketMQTemplate.asyncSend(TOPIC, "async send", new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {log.info("onSuccess");}@Overridepublic void onException(Throwable e) {log.info("onException");}});// sendOneWayrocketMQTemplate.sendOneWay(TOPIC, "send one way");// 这个我还是不太清楚是干嘛的? 跑的时候会报错!!!//Object receive = rocketMQTemplate.sendAndReceive(TOPIC, "你好", String.class);//log.info("{}", receive);return "success";}@GetMapping("transaction")public Object transaction() {Message message = MessageBuilder.withPayload(new Customer("mrathena", "你是谁")).build();// 这里使用的是通过 @ExtRocketMQTemplateConfiguration(group = "anotherProducer") 扩展出来的另一个 RocketMQTemplatelog.info("{}", topicTransactionRocketMQTemplate.sendMessageInTransaction(TOPIC, message, null));log.info("{}", topicTransactionRocketMQTemplate.sendMessageInTransaction(TOPIC + ":tag-a", message, null));return "success";}}

配置 TopicTransactionRocketMQTemplate

package com.mrathena.rocket.mq.configuration;import org.apache.rocketmq.spring.annotation.ExtRocketMQTemplateConfiguration;import org.apache.rocketmq.spring.core.RocketMQTemplate;/** * 一个事务流程和一个RocketMQTemplate需要一一对应 * 可以通过 @ExtRocketMQTemplateConfiguration(注意该注解有@Component注解) 来扩展多个 RocketMQTemplate * 注意: 不同事务流程的RocketMQTemplate的producerGroup不能相同 * 因为MQBroker会反向调用同一个producerGroup下的某个checkLocalTransactionState方法, 不同流程使用相同的producerGroup的话, 方法可能会调用错 */@ExtRocketMQTemplateConfiguration(group = "anotherProducer")public class TopicTransactionRocketMQTemplate extends RocketMQTemplate {}

消费者 TopicListener

package com.mrathena.rocket.mq.listener;import lombok.extern.slf4j.Slf4j;import org.apache.rocketmq.spring.annotation.ConsumeMode;import org.apache.rocketmq.spring.annotation.MessageModel;import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;import org.apache.rocketmq.spring.core.RocketMQListener;import org.springframework.stereotype.Component;/** * 最简单的消费者例子 * topic: 主题 * consumerGroup: 消费者组 * selectorType: 过滤方式, TAG:标签过滤,仅支持标签, SQL92:SQL过滤,支持标签和属性 * selectorExpression: 过滤表达式, 根据selectorType定, TAG时, 写标签如 "a || b", SQL92时, 写SQL表达式 * consumeMode: CONCURRENTLY:并发消费, ORDERLY:顺序消费 * messageModel: CLUSTERING:集群竞争消费, BROADCASTING:广播消费 */@Slf4j@Component@RocketMQMessageListener(topic = "topic",// 只过滤tag, 不管headers中的key和value//selectorType = SelectorType.TAG,// 必须指定selectorExpression, 可以过滤tag和headers中的key和value//selectorType = SelectorType.SQL92,// 不限tag//selectorExpression = "*",// 不限tag, 和 * 一致//selectorExpression = "",// 只要tag为a的消息//selectorExpression = "a",// 要tag为a或b的消息//selectorExpression = "a || b",// SelectorType.SQL92时, 可以跳过tag, 直接用headers里面的key和value来判断//selectorExpression = "property = 1",// tag不为null//selectorExpression = "TAGS is not null",// tag为empty, 证明tag不会是empty, 要么有值要么null//selectorExpression = "TAGS = ''",// SelectorType.SQL92时, 即过滤tag, 又过滤headers里面的key和value//selectorExpression = "(TAGS is not null and TAGS = 'a') and (property is not null and property between 4 and 6)",// 并发消费consumeMode = ConsumeMode.CONCURRENTLY,// 顺序消费//consumeMode = ConsumeMode.ORDERLY,// 集群消费messageModel = MessageModel.CLUSTERING,// 广播消费//messageModel = MessageModel.BROADCASTING,consumerGroup = "consumer")public class TopicListener implements RocketMQListener {public void onMessage(String s) {log.info("{}", s);}}

消费者 TopicTransactionListener

package com.mrathena.rocket.mq.listener;import lombok.extern.slf4j.Slf4j;import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;import org.apache.rocketmq.spring.support.RocketMQHeaders;import org.springframework.messaging.Message;import org.springframework.messaging.MessageHeaders;import org.springframework.stereotype.Component;@Slf4j@Component@RocketMQTransactionListener(rocketMQTemplateBeanName = "topicTransactionRocketMQTemplate")public class TopicTransactionListener implements RocketMQLocalTransactionListener {@Overridepublic RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {// message: org.springframework.messaging.Message, 是Spring自己封装的类, 和RocketMQ的Message不是一个类, 里面没有tags/keys等内容// 一般来说, 并不会在这里处理tags/keys等内容, 而是根据消息体中的某些字段做不同的操作, 第二个参数也可以用来传递一些数据到这里log.info("executeLocalTransaction message:{}, object:{}", message, o);log.info("payload: {}", new String((byte[]) message.getPayload()));MessageHeaders headers = message.getHeaders();log.info("tags: {}", headers.get(RocketMQHeaders.PREFIX + RocketMQHeaders.TAGS));log.info("rocketmq_TOPIC: {}", headers.get("rocketmq_TOPIC"));log.info("rocketmq_QUEUE_ID: {}", headers.get("rocketmq_QUEUE_ID"));log.info("rocketmq_MESSAGE_ID: {}", headers.get("rocketmq_MESSAGE_ID"));log.info("rocketmq_TRANSACTION_ID: {}", headers.get("rocketmq_TRANSACTION_ID"));log.info("TRANSACTION_CHECK_TIMES: {}", headers.get("TRANSACTION_CHECK_TIMES"));log.info("id: {}", headers.get("id"));return RocketMQLocalTransactionState.UNKNOWN;}@Overridepublic RocketMQLocalTransactionState checkLocalTransaction(Message message) {log.info("checkLocalTransaction message:{}", message);// 在调用了checkLocalTransaction后, 另一个常规消息监听器才能收到消息return RocketMQLocalTransactionState.COMMIT;}}

最后

欢迎关注小编后,可以私信小编【666】即可领取一线大厂Java面试题总结+各知识点学习思维导+一份300页pdf文档的Java核心知识点总结!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/551361.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

java面试常考_JAVA面试常考系列十

JAVA面试常考系列十题目一Servlet是什么&#xff1f;Servlet(Server Applet)是Java Servlet的简称&#xff0c;称为小服务程序或服务连接器&#xff0c;是用Java编写的服务器端程序&#xff0c;主要的作用是处理客户端请求并生成动态Web内容。一般情况下&#xff0c;Servlet主要…

的ui在vs中显示没有成员_在电脑桌面使用敬业签团队便签怎么设置新增内容在上面显示?...

实时跟进小组成员的各项任务的完成情况&#xff0c;及时将工作任务安排下去&#xff0c;是提高团队工作效率的较为有效的方法。在监督小组成员任务状况以及及时安排工作方面&#xff0c;选择一款支持多人同步在线协作的软件是非常有必要的。敬业签团队便签是一款不受系统、设备…

多表关联查询_【函数007】 EXCEL多表关联查询实战

继续函数实战系列教程&#xff0c;今天要分享的案例是根据填写的表名自动提取对应表中的数据数据准备&#xff1a;需求说明&#xff1a;我们希望实现&#xff0c;我们选择不同月份&#xff0c;显示对应月份表中的数据&#xff01;处理方案&#xff1a;1、构建下拉列表(菜单)数据…

管理动物园动物c++_《过山车大亨》开发商公开新作 建立自己的动物园

知名模拟经营游戏开发商Frontier Developments近日宣布&#xff0c;旗下游戏《动物园之星》(Planet Zoo)即将在不久后发售&#xff0c;登录Steam平台。本座是一款模拟经营游戏&#xff0c;玩家在游戏中将会从零开始&#xff0c;建立一个属于自己的动物园世界。游戏中有多种不同…

安费诺amphenol连接器_安费诺的Ellison谈信号完整性职业和他的免费开源PCB设计软??件...

Amphenol的信号完整性工程师杰森埃里森(Jason Ellison )Amphenol(安费诺) ICC的高级职员信号完整性工程师杰森埃里森(Jason Ellison )深入了解了网络&#xff0c;给予EE(电子)社区以及他的开源信号完整性项目的重要性。信号完整性工程与其他EE(电子)领域相比如何&#xff1f;他…

mysql的contains_mysql中json_contains、json_extract等json查询方法的使用

新版 Mysql 中加入了对 JSON Document 的支持&#xff0c;可以创建 JSON 类型的字段&#xff0c;并有一套函数支持对JSON的查询、修改等操作。JSON是一种轻量级的数据交换格式&#xff0c;采用了独立于语言的文本格式&#xff0c;类似XML&#xff0c;但是比XML简单&#xff0c;…

260多媒体语言如何调节_260马力配9.7米货厢,实拍柳汽H5小三轴载货车

【卡车之家 原创】高速公路按轴的收费实施让9米6大单桥载货车火了起来&#xff0c;能够满足快递快运、电商物流等以方量为主的轻型货物运输。但这种大单桥18吨的限重对于不少卡友来说或许不太够用&#xff0c;而8x4载货车又有些大材小用了。这时&#xff0c;处于中间位置的三轴…

linux php 如何上传webshell,linux+apache+php的一次拿webshell的心得

首先俺先声明俺是个菜鸟&#xff0c;俺虽然是菜鸟但俺不会一直是菜鸟的(一旁兄弟喊到&#xff1a;别俺&#xff0c;俺&#xff0c;俺的&#xff0c;说普通话&#xff01;)。俺&#xff0c;不对&#xff0c;我一直遵照着实践是检验真理的唯一标准这句话学习技术&#xff0c;这不…

织梦php重新安装教程,织梦CMS系统后台重装的操作教程

原标题&#xff1a;织梦CMS系统后台重装的操作教程网站在线运行&#xff0c;不可能一点问题都不存在&#xff0c;或者当初的网站设计、架构、功能已经不能满足现在用户的需求&#xff0c;所以我们会对网站做出相应的改动。网站除了前端的改版&#xff0c;还会有后台系统重装、服…

mpu9250姿态融合算法_基于投票方式的机器人装配姿态估计

作者&#xff1a;仲夏夜之星来源&#xff1a;公众号 3D视觉工坊链接&#xff1a;基于投票方式的机器人装配姿态估计论文题目&#xff1a;《Voting-Based Pose Estimation for Robotic Assembly Using a 3D Sensor》这篇文章被发表在2012年的IEEE International Conference on R…

skywalking原理_微服务链路追踪原理

作者&#xff1a;平也来源&#xff1a;关爱程序员社区背景介绍在微服务横行的时代&#xff0c;服务化思维逐渐成为了程序员的基本思维模式&#xff0c;但是&#xff0c;由于绝大部分项目只是一味地增加服务&#xff0c;并没有对其妥善管理&#xff0c;当接口出现问题时&#xf…

文本分析软件_读书笔记:伍多库卡茨质性文本分析:方法、实践与软件使用指南...

读书笔记&#xff1a;伍多库卡茨《质性文本分析&#xff1a;方法、实践与软件使用指南》一、这篇文章、这本书或这篇论文的中心思想、核心观点是什么&#xff1f;核心观点&#xff1a;质性数据如何系统化分析&#xff1f;三大主要方法&#xff1a;主题分析、评估分析和类型建构…

matlab两轮自平衡小车,两轮自平衡小车(全部设计资料+设计分析)

自己做的自平衡小车&#xff0c;基本达到预期效果。制作资料在压缩包里面&#xff0c;供参考。该两轮自平衡小车硬件设计概述&#xff1a;控制器&#xff1a;ATmega16&#xff1b;8MHz&#xff1b;加速度传感器&#xff1a;MMA2260&#xff1b;陀螺仪&#xff1a;EWTS82&#x…

异步fifo_【推荐】数字芯片异步FIFO设计经典论文

之前有一篇文章我已经推荐过了数字芯片跨时钟域设计的经典论文 &#xff08;【推荐】数字芯片跨时钟域设计经典论文 &#xff09;&#xff0c;希望看过的读者都有一定的收获。不过有点遗憾的是那片论文中虽然提到了异步FIFO&#xff0c;却没有讲具体的原理和设计细节。本篇文章…

php fuzzy,模糊C均值聚类算法(Fuzzy C-means)

模糊c均值聚类与k均值聚类区别k均值聚类k均值聚类的实现中&#xff0c;把每个样本划分到单一的类别中&#xff0c;亦即是每个样本只能属于一种类别&#xff0c;不能属于多种类别。这样的划分&#xff0c;称为硬划分。模糊c均值均类为了解决硬划分所带来的问题&#xff0c;因此有…

dw新建php文件自动生成html,dw如何新建css规则

1、在菜单中单击“文件”选择“新建”2、在新建文档窗口&#xff0c;选择“空白页”—“HTML”&#xff0c;文档类型选择“XHTML1.0 transitional”,单击“创建”按钮3、将插入点放在文档中&#xff0c;然后在菜单栏单击“格式”&#xff0c;在弹出的下拉菜单中选择“CSS样式”…

oracle获取登录名,oracle如何获取当前登录的用户名

Microsoft Windows [版本 5.2.3790](C) 版权所有 1985-2003 Microsoft Corp.C:/>sqlplusSQL*Plus: Release 9.2.0.1.0 - Production on 星期三 5月 30 00:04:26 2007Copyright (c) 1982, 2002, Oracle Corporation. All rights reserved.请输入用户名: scott请输入口令:连…

python实现英文新闻摘要自动提取_Automotive Innovation摘要集系列2:Intelligent and Connected Vehicles...

为便于广大科技工作者更好的了解中国汽车行业首个英文学术期刊《Automotive Innovation》&#xff0c;并更快的定位到自己感兴趣的论文&#xff0c;编辑部把2018-2019年刊出的70篇论文摘要进行集结&#xff0c;并按照节能与环保(Energy-saving & Eco-systems)、智能网联汽车…

linux 改变文件夹属性,技术|在Linux中用chattr和lsattr命令管理文件和目录属性

为了允许添加数据&#xff0c;防止更改或者删除等&#xff0c;文件和文件夹可以设定了特定的控制属性。例如&#xff0c;你可以在关键的系统文件或者文件夹中启用属性&#xff0c;然后没有任何用户&#xff0c;包括root&#xff0c;可以删除或者修改它&#xff0c;比如不允许使…

linux 系统盘无法ls,系统故障排除

1.系统故障排除1)模拟磁盘/dev/sda的MBR故障&#xff0c;并执行修复01.备份磁盘/dev/sda的MBR扇区选择一个/dev/sda以外的文件系统(比如/dev/sdb1)来存放备份文件&#xff1a;[rootsvr5 ~]# df -hT/home//选择/dev/sdb1存放备份文件系统 类型 容量 已用 可…