spring boot 集成rocketMq + 基本使用

1. RocketMq基本概念

1. NameServer
每个NameServer结点之间是相互独立,彼此没有任何信息交互
启动NameServer。NameServer启动后监听端口,等待Broker、Producer、Consumer连接,
相当于一个路由控制中心。主要是用来保存topic路由信息,管理Broker
2. Broker
消息存储和中转角色,负责存储和转发消息
在启动时会向NameServer进行注册并且定时发送心跳包。心跳包中包含当前 Broker 信息
以及存储所有 Topic 信息。注册成功后,NameServer 集群中就有 Topic跟Broker 的映射关系。
3. topic : 一个消息的集合的名字
创建 Topic 时需要指定该 Topic 要存储在哪些 Broker 上,也可以在发送消息时自动创建Topic。
4. 生产者
生产者发送消息。启动时先从 NameServer 集群中的其中一台拉取到路由表,缓存到本地,
并从 NameServer 中获取当前发送的 Topic存在于哪些 Broker 上,
轮询从队列列表中选择一个队列(默认轮询)
5. 消费者
消费者跟其中一台NameServer建立连接,获取当前订阅Topic存在哪些Broker上,
然后直接跟Broker建立连接通道,然后开始消费消息

2. maven 引入starter

<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.2</version>
</dependency>

3.yml配置

3.1 生产者yml 配置

rocketmq:name-server: 127.0.0.1:9876producer:group: my-group# 发送消息超时时间send-message-timeout: 5000# 发送消息失败重试次数retry-times-when-send-failed: 2retry-times-when-send-async-failed: 3 # 异步发送消息失败重试次数,默认2

3.2 消费者yml 配置

rocketmq:name-server: 127.0.0.1:9876consumer:topic: topic_testgroup: consumer_my-group

4.生产者发送消息

4.1 一般消息

@Resourceprivate RocketMQTemplate rocketMQTemplate;/***  一般消息* Topic 与 Tag 都是业务上用来归类的标识,区别在于 Topic 是一级分类,而 Tag 可以理解为是二级分类。* 使用 Tag 可以实现对 Topic 中的消息进行过滤。* **/@GetMapping("/send")public String send(){rocketMQTemplate.convertAndSend("topic_test", "Hello, World!");rocketMQTemplate.convertAndSend("topic_test:tagB","Hello, World222--tagB");return "rocketMq普通消息发送完成";}

4.2 顺序消息

/** 支持消费者按照发送消息的先后顺序获取消息 */@GetMapping("/send/orderly")public String sendOrder(){//发送顺序消息,参数:topic,消息,hashkey,相同hashkey发送至同一个队列rocketMQTemplate.syncSendOrderly("topic_test:tagA", MessageBuilder.withPayload("消息编号" + 1).build(),"queue");rocketMQTemplate.syncSendOrderly("topic_test:tagA", MessageBuilder.withPayload("消息编号" + 2).build(),"queue");return "rocketMq顺序-消息发送成功";}

4.3 同步消息

@GetMapping("/send/sync")public String sendMsg() {String message = "我是同步消息:" + LocalDateTime.now();SendResult result = rocketMQTemplate.syncSend("topic_test:tagA", MessageBuilder.withPayload(message).build());log.info("同步-消息发送成功:" + LocalDateTime.now());return "rocketMq 同步-消息发送成功:" + result.getSendStatus();}

4.4 异步消息

/** 发送异步消息 */@GetMapping("/send/async")public String asyncSendMsg(){String message = "我是异步消息:" + LocalDateTime.now();rocketMQTemplate.asyncSend("topic_test:tagA",message,new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {log.info("发送成功 (后执行),SendStatus = {}",sendResult.getSendStatus());}@Overridepublic void onException(Throwable throwable) {log.info("发送失败 (后执行)");}});return "rocketMq 异步-消息发送成功:" + LocalDateTime.now();}

 4.5 单向消息:一般用来发送日志等不重要的消息

@GetMapping("/send/oneWay")public String sendOneWayMessage() {String message =  "我是单向消息:"+LocalDateTime.now();this.rocketMQTemplate.sendOneWay("topic_test:tagA", message);log.info("单向发送消息完成:message = {}", message);return "rocketMq 单向-消息发送成功:" + LocalDateTime.now();}

 

4.6 延时消息

/** 延时消息 */@GetMapping("/sendDelay")public String sendDelay(){String message = "我是延时消息:" + LocalDateTime.now();// 第四个参数为延时级别,分为1-18:1、5、10、30、1m、2m、3m、...10m、20m、30m、1h、2hrocketMQTemplate.syncSend("topic_test:tagC", MessageBuilder.withPayload(message).build(), 3000, 2);return "rocketMq延时-消息发送成功";}

4.7 事务消息

4.7.1 事务消息发送代码

/** 事务消息 */@GetMapping("/send/transaction/{id}")public void sendTransactionMessage(@PathVariable("id") Integer id){//发送事务消息:采用的是sendMessageInTransaction方法,返回结果为TransactionSendResult对象,该对象中包含了事务发送的状态、本地事务执行的状态等//参数一:topic;参数二:消息// 事务idString[] tags = {"tagA", "tagB", "tagC"};int i = id%3;String transactionId = UUID.randomUUID().toString();String message = "我是事务消息:" + LocalDateTime.now();TransactionSendResult result = rocketMQTemplate.sendMessageInTransaction("topic_test:" + tags[i], MessageBuilder.withPayload(message).setHeader(RocketMQHeaders.TRANSACTION_ID,transactionId).build(),// 给本地事务的参数2);//发送状态String sendStatus = result.getSendStatus().name();//本地事务执行状态String localState = result.getLocalTransactionState().name();log.info("发送状态:"+sendStatus+";本地事务执行状态"+localState);}

4.7.2 继承 RocketMQLocalTransactionListener

@Slf4j
@RocketMQTransactionListener
public class MyTransactionListener implements RocketMQLocalTransactionListener {@Overridepublic RocketMQLocalTransactionState executeLocalTransaction(org.springframework.messaging.Message message, Object o) {MessageHeaders headers = message.getHeaders();//获取事务IDString transactionId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID);log.info("执行本地事务 ,transactionId is {}, orderId is {}",transactionId, message.getHeaders().get("rocketmq_TAGS"));try{//模拟网络波动Thread.sleep(3000);/**** 首先发送一个半消息(half message),这个消息不会立即投递给消费者;然后执行本地事务(比如数据库操作)。* 根据本地事务的执行结果,决定是提交(commit)还是回滚(rollback)这个消息。* 如果本地事务成功,消息会被提交并发送给消费者;* 如果失败,消息会被回滚,消费者不会接收到这个消息*/}catch (Exception e){return RocketMQLocalTransactionState.ROLLBACK;}// 执行本地事务String tag = String.valueOf(message.getHeaders().get("rocketmq_TAGS"));if (StringUtils.equals("tagA", tag)){//这里只讲TAGA消息提交,状态为可执行return RocketMQLocalTransactionState.COMMIT;}else if (StringUtils.equals("tagB", tag)) {return RocketMQLocalTransactionState.ROLLBACK;} else if (StringUtils.equals("tagC",tag)) {return RocketMQLocalTransactionState.UNKNOWN;}log.info("事务提交,消息正常处理: " + LocalDateTime.now());//执行成功,可以提交事务return RocketMQLocalTransactionState.COMMIT;}@Overridepublic RocketMQLocalTransactionState checkLocalTransaction(org.springframework.messaging.Message message) {MessageHeaders headers = message.getHeaders();//获取事务IDString transactionId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID);log.info(transactionId + ",消息回查"+ LocalDateTime.now());return RocketMQLocalTransactionState.ROLLBACK;}
}

tagA、tagB、tagC 三种事务消息,只有Commit的才能发送到broker 

 

 5. 消费端

/*** topic指定消费的主题,consumerGroup指定消费组,* 一个主题可以有多个消费者组,一个消息可以被多个不同的组的消费者都消费*  2.实现RocketMQListener接口*  如果想拿到消息的其他参数可以写成MessageExt*  selectorExpression = "tagA || tagB" 指定tag 的消费*/
@Service
@Slf4j
@RocketMQMessageListener(topic = "${rocketmq.consumer.topic}", consumerGroup = "${rocketmq.consumer.group}")
public class RocketMqConsumer implements RocketMQListener<String>{@Overridepublic void onMessage(String s) {log.info("topic_test: 所有的收到消息:"+s);}}

6.广播消费模式

生产端是一样的,但是消费端需要增加一个参数

messageModel:设置消费模式,取值范围CLUSTERING(集群消费)、BROADCASTING(广播消费)
@Service
@Slf4j
@RocketMQMessageListener(topic = "${rocketmq.consumer.topic}", consumerGroup = "${rocketmq.consumer.group}", messageModel = MessageModel.BROADCASTING)
public class RocketMqConsumer implements RocketMQListener<String>{@Overridepublic void onMessage(String s) {log.info("consumer2---topic_test: 所有的收到消息:"+s);}}// 第2个消费者类,他们都是一样的代码,
//为了表示广播,就是一个消息,会被这两个消费者消费@Service
@Slf4j
@RocketMQMessageListener(topic = "${rocketmq.consumer.topic}", consumerGroup = "${rocketmq.consumer.group}", messageModel = MessageModel.BROADCASTING)
public class RocketMqConsumer implements RocketMQListener<String>{@Overridepublic void onMessage(String s) {log.info("consumer1--topic_test: 所有的收到消息:"+s);}}

7.其他

RocketMQ 通过消费者组(Consumer Group)来维护不同消费者的消费进度。每个消费者组都有一个消费进度(offset),用于标记该组下的消费者在某个主题(Topic)和队列(Queue)上已经消费到的位置。所以:不同的消费者组会被视为不同的消费者

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/809300.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

ETL中如何运用好MQ消息集成

一、ETL的主要作用 ETL&#xff08;Extract, Transform, Load&#xff09;是数据仓库中的关键环节&#xff0c;其主要作用是将数据从源系统中抽取出来&#xff0c;经过转换和清洗后加载到数据仓库中。具体而言&#xff1a; Extract&#xff08;抽取&#xff09;&#xff1a;从…

Ubuntu 安装Java、Git、maven、Jenkins等持续集成环境

Ubuntu 持续集成 安装OpenJdk 查看所有可安装的 JDK 版本 apt list OpenJDK\*使用 apt 安装 JDK&#xff08;以 11为例&#xff09;,最好是用11&#xff0c;java8对应的jenkins会有兼容问题。 sudo apt install openjdk-11-jdk openjdk-11-jre安装成功后&#xff0c;可以使用以…

WS2812B彩灯

目录 1、介绍 2、参数 3、引脚功能 4、应用电路 5、Code 1、介绍 WS2812是一种智能控制LED灯源&#xff0c;集成了控制电路和RGB芯片在一个5050封装组件中。它的主要特点和技术规格如下&#xff1a; 集成设计&#xff1a;WS2812将控制电路和RGB芯片集成在同一个封装中&…

软考高级架构师:数据库模式概念和例题

一、AI 讲解 数据库模式分为三个层次&#xff1a;外模式、概念模式和内模式。这三个层次分别对应不同的抽象级别&#xff0c;帮助数据库管理员和用户以不同的视角理解数据库结构。 外模式&#xff08;用户级&#xff09;&#xff1a;是数据库用户的视图。每个用户可以通过外模…

HarmonyOS NEXT应用开发—在Native侧实现进度通知功能

介绍 本示例通过模拟下载场景介绍如何将Native的进度信息实时同步到ArkTS侧。 效果图预览 使用说明 点击“Start Download“按钮后&#xff0c;Native侧启动子线程模拟下载任务Native侧启动子线程模拟下载&#xff0c;并通过Arkts的回调函数将进度信息实时传递到Arkts侧 实…

神经网络背后的数学原理

原文地址&#xff1a;The Math Behind Neural Networks 2024 年 3 月 29 日 深入研究现代人工智能的支柱——神经网络&#xff0c;了解其数学原理&#xff0c;从头开始实现它&#xff0c;并探索其应用。 神经网络是人工智能 &#xff08;AI&#xff09; 的核心&#xff0c;为…

智能网络新纪元:机器学习赋能未来计算机网络高速发展

&#x1f9d1; 作者简介&#xff1a;阿里巴巴嵌入式技术专家&#xff0c;深耕嵌入式人工智能领域&#xff0c;具备多年的嵌入式硬件产品研发管理经验。 &#x1f4d2; 博客介绍&#xff1a;分享嵌入式开发领域的相关知识、经验、思考和感悟,欢迎关注。提供嵌入式方向的学习指导…

软件无线电系列——多率信号处理之抽取

本节目录 一、等效基带谱 二、抽取概念 三、低通信号的整数倍抽取 四、数字带通信号的抽取 1、整带抽取 2、带通信号的正交复抽取 3、带通信号的正交实抽取本节内容 一、等效基带谱 对于任何采样频率为fs的实采样信号&#xff0c;无论是Nyquist采样还是带通采样&#xff0c;采…

企业培训系统私有化解决方案:PlayEdu

PlayEdu&#xff1a;打造私有化的企业智慧教育平台&#xff0c;赋能全员高效成长&#xff01;- 精选真开源&#xff0c;释放新价值。 概览 随着企业不断发展及市场竞争加剧&#xff0c;内部培训的重要性日益凸显。然而&#xff0c;在实施过程中&#xff0c;如何确保培训内容与…

Android Studio开发学习(六)———TableLayout(表格布局)、FrameLayout(帧布局)

目录 前言 一、Tablelayout &#xff08;一&#xff09;Tablelayout的相关简介 &#xff08;二&#xff09;TableLayout使用方法 1. 当TableLayout下面写控件、则控件占据一行的大小。(自适应一行&#xff0c;不留空白) 2.多个组件占据一行&#xff0c;则配合TableRow实现…

【论文阅读笔记】Head-Free Lightweight Semantic Segmentation with Linear Transformer

莫名地这篇论文我特别难理解&#xff0c;配合代码也食用不了 1.论文介绍 Head-Free Lightweight Semantic Segmentation with Linear Transformer 基于线性Transformer的无头轻量级语义分割 2023年 AAAI Paper Code 2.摘要 现有的语义分割工作主要集中在设计有效的解码器&am…

java数据结构与算法刷题-----LeetCode461. 汉明距离

java数据结构与算法刷题目录&#xff08;剑指Offer、LeetCode、ACM&#xff09;-----主目录-----持续更新(进不去说明我没写完)&#xff1a;https://blog.csdn.net/grd_java/article/details/123063846 文章目录 1. 异或统计1的个数2. 位移操作处理3. Brian Kernighan算法 位运…

机器学习和深度学习--李宏毅(笔记与个人理解)Day11-12

Day11 when gradient is small…… 怎么知道是局部小 还是鞍点&#xff1f; using Math 这里巧妙的说明了hessan矩阵可以决定一个二次函数的凹凸性 也就是 θ \theta θ 是min 还是max&#xff0c;最后那个有些有些 哈 是一个saddle&#xff1b; 然后这里只要看hessan矩阵是不…

图形学基础:二维三维刚体的移动、缩放和旋转矩阵

一、二维 1.1 缩放矩阵 x&#xff0c;y分别表示在x轴&#xff0c;y轴缩放的倍数 示例&#xff1a; 1.2 平移矩阵 x&#xff0c;y分表表示在x轴&#xff0c;y轴上移动的距离 示例&#xff1a; 1.3 旋转矩阵 θ 表示点绕原点逆时针旋转θ 示例&#xff1a; 点 (2,1) 绕原点旋转…

MapReduce过程解析

一、Map过程解析 Read阶段&#xff1a;MapTask通过用户编写的RecordReader&#xff0c;从输入的InputSplit中解析出一个个key/value。Map阶段&#xff1a;将解析出的key/value交给用户编写的Map()函数处理&#xff0c;并产生一系列的key/value。Collect阶段&#xff1a;在用户编…

从 SQLite 3.5.9 迁移到 3.6.0(二十一)

返回&#xff1a;SQLite—系列文章目录 上一篇&#xff1a;从 SQLite 3.4.2 迁移到 3.5.0&#xff08;二十&#xff09; 下一篇&#xff1a;SQLite—系列文章目录 ​SQLite 版本 3.6.0 &#xff08;2008-07-16&#xff09; 包含许多更改。按照惯例 SQLite项目&#xff…

怎么在外地控制自家的电视

怎么在外地控制自家的电视 随着科技的进步和智能家居的普及&#xff0c;远程控制家中的电器设备已经成为现实。电视作为家庭娱乐的中心&#xff0c;远程控制功能更是备受关注。那么&#xff0c;如何在外地控制自家的电视呢&#xff1f;本文将为你提供详细的步骤和有价值的信息…

为什么要“挺”鸿蒙?

鸿蒙到底是什么&#xff1f; 随着5G、物联网等技术的快速发展&#xff0c;智能终端设备的应用场景也越来越广泛。为了满足不同设备间的互联互通需求&#xff0c;华为在2019年推出了自主研发的操作系统——鸿蒙OS。值得关注的是&#xff0c;这也是首款国产操作系统。 要了解鸿…

UE5学习日记——制作多语言版本游戏,同时初步学习UI制作、多语言化、控制器配置、独立进程测试、打包配置和快速批量翻译等

所有的文本类&#xff0c;无论变量还是控件等都能实现本地化&#xff0c;以此实现不同语言版本。 在这里先将重点注意标注一下&#xff1a; 所有文本类的变量、控件等都可以多语言&#xff1b;本地化控制板中收集、编译时&#xff0c;别忘了编译这一步&#xff1b;支持批量复制…

ClickHouse--16--普通函数

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录 一、日期函数1、时间或日期截取函数&#xff08;返回非日期&#xff09;2、时间或日期截取函数&#xff08;返回日期&#xff09;3、日期或时间日期生成函数 二、类…