手拉手整合Springboot3+RocketMQ2.3

RocketMQ 基本概念

消息模型Message Model

RocketMQ 主要由 Producer、Broker、Consumer 三部分组成,其中 Producer 负责生产消息,Consumer 负责消费消息,Broker 负责存储消息。Broker 在实际部署过程中对应一台服务器,每个 Broker 可以存储多个 Topic 的消息,每个Topic 的消息也可以分片存储于不同的 Broker。Message Queue 用于存储消息的物理地址, Topic 中的消息地址存储于多个 Message Queue 中。ConsumerGroup 由多个 Consumer 实例构成。

消息生产者Producer

负责生产消息,一般由业务系统负责生产消息。一个消息生产者会把业务应用系统里产生的消息发送到 broker 服务器。RocketMQ 提供多种发送方式,同步发送、异步发送、顺序发送、单向发送。同步和异步方式均需要 Broker 返回确认信息,单向发送不需要。

消息消费者Consumer

负责消费消息,一般是后台系统负责异步消费。一个消息消费者会从 Broker 服务器拉取消息、并将其提供给应用程序。提供了两种消费形式:拉取式消费、推动式消费。

主题Topic

表示一类消息的集合,每个主题包含若干条消息,每条消息只能属于一个主题,是RocketMQ 进行消息订阅的基本单位。

代理服务器Broker Server

消息中转角色,负责存储消息、转发消息。代理服务器在 RocketMQ 系统中负责接收从生产者发送来的消息并存储、同时为消费者的拉取请求作准备。代理服务器也存储消息相关的元数据,包括消费者组、消费进度偏移和主题和队列消息等。

名字服务Name Server

名称服务充当路由消息的提供者。生产者或消费者能够通过名字服务查找各主题相应的Broker IP 列表。多个 Namesrv 实例组成集群,但相互独立,没有信息交换。

Pom.xml加入依赖

<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.3.0</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>2.0.32</version>
</dependency>

application.yml配置

rocketmq:
name-server: 192.168.68.133:9876
producer:
#生产者组名,一个应用里面必须唯一
group: test-producer
#消息发送的超时时间 默认3000ms
send-message-timeout: 3000
#消息达到4096字节的时候,消息就会被压缩。默认 4096
compress-message-body-threshold: 4096
#最大的消息限制,默认为128K
max-message-size: 4194304
#同步消息发送失败重试次数
retry-times-when-send-failed: 2
#在内部发送失败时是否重试其他代理,这个参数在有多个broker时才生效
retry-next-server: true
#异步消息发送失败重试的次数
retry-times-when-send-async-failed: 2

消费者监听器

报错就是重试,重试三次后还是报错,就代表消费失败,会重新入队,根据application.yml的配置

@Component
@RocketMQMessageListener(topic = "TopicTest",consumerGroup = "ConsumerGroup-springboot")
public class RocketListener implements RocketMQListener<MessageExt> {/**
* onMessage 消费者方法
* @param messages 消息内容
*/
@Override
public void onMessage(MessageExt messages) {
/不报错就是签收信息,
//报错就是重试,重试三次后还是报错,就代表消费失败,会重新入队
System.out.println("接收到消息:"+new String(messages.getBody()));}
}


 

发送同步消息

生产者

* 发送同步消息
* destination 目的地-主题
* payload 消息

@Test
void sendMsg() {
rocketMQTemplate.syncSend("TopicTest", "同步消息");
}

消费者

@Component
@RocketMQMessageListener(topic = "TopicTest",consumerGroup = "ConsumerGroup-springboot")
public class RocketListener implements RocketMQListener<MessageExt> {/**
* onMessage 消费者方法
* @param messages 消息内容
*/
@Override
public void onMessage(MessageExt messages) {
//不报错就是签收信息,
//报错就是重试,重试三次后还是报错,就代表消费失败,会重新入队 (根据yml配置)
System.out.println("接收到消息:"+new String(messages.getBody()));
}
}

发送异步消息

生产者


* 发送异步消息
* destination 目的地-主题
* payload 消息

@Test
void asyncTest() {rocketMQTemplate.asyncSend("TopicTest", "异步消息", new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.println("发送成功");
}@Override
public void onException(Throwable throwable) {
System.out.println("发送失败");
}
});
}

消费者

@Component
@RocketMQMessageListener(topic = "TopicTest",consumerGroup = "ConsumerGroup-springboot")
public class RocketListener implements RocketMQListener<MessageExt> {/**
* onMessage 消费者方法
* @param messages 消息内容
*/
@Override
public void onMessage(MessageExt messages) {
//不报错就是签收信息,
//报错就是重试,重试三次后还是报错,就代表消费失败,会重新入队 (根据yml配置)
System.out.println("接收到消息:"+new String(messages.getBody()));
}
}

发送延时消息

生产者

* 发送延时消息
* destination 目的地-主题
* payload 消息
* timestamp 连接超时
* delayLevel 延时级别

@Test
void delayTest() {Message<String> msg = MessageBuilder.withPayload("延时消息").build();
rocketMQTemplate.syncSend("TopicTest", msg, 3000, 3);
}

消费者

@Component
@RocketMQMessageListener(topic = "TopicTest",consumerGroup = "ConsumerGroup-springboot")
public class RocketListener implements RocketMQListener<MessageExt> {/**
* onMessage 消费者方法
* @param messages 消息内容
*/
@Override
public void onMessage(MessageExt messages) {
//不报错就是签收信息,
//报错就是重试,重试三次后还是报错,就代表消费失败,会重新入队 (根据yml配置)
System.out.println("接收到消息:"+new String(messages.getBody()));
}
}

发送单向消息

生产者

* 发送单向消息
* destination 目的地-主题
* payload 消息

@Test
void OneWayTest() {rocketMQTemplate.sendOneWay("TopicTest", "单向消息");
}

消费者

@Component
@RocketMQMessageListener(topic = "TopicTest",consumerGroup = "ConsumerGroup-springboot")
public class RocketListener implements RocketMQListener<MessageExt> {/**
* onMessage 消费者方法
* @param messages 消息内容
*/
@Override
public void onMessage(MessageExt messages) {
//不报错就是签收信息,
//报错就是重试,重试三次后还是报错,就代表消费失败,会重新入队 (根据yml配置)
System.out.println("接收到消息:"+new String(messages.getBody()));
}
}

发送顺序消息

顺序消息 生产者需要将一组消息都发送到同一个队列 ,消费者需要单线程消费

生产者

生产者需要将一组消息都发送到同一个队列

List<MessageM> messageMs = Arrays.asList(
new MessageM("sn0001", 1, "下单"),
new MessageM("sn0001", 1, "付款"),
new MessageM("sn0001", 1, "配送"),
new MessageM("sn0002", 2, "下单"),
new MessageM("sn0002", 2, "付款"),
new MessageM("sn0002", 2, "配送")
);@Test
void orderlyTest() {
/*** destination 目的地-主题
* payload 消息
*/
for (MessageM messageM : messageMs) {
rocketMQTemplate.syncSendOrderly("orderlyTest", JSON.toJSON(messageM), messageM.getSn());
}
}

消费者

CONCURRENTLY 同时

ORDERLY有序

消费者需要单线程消费

@Component
@RocketMQMessageListener(topic = "orderlyTest",consumerGroup = "orderly",consumeMode = ConsumeMode.ORDERLY)
public class orderlyListener implements RocketMQListener<MessageExt> {@Override
public void onMessage(MessageExt messageExt) {
MessageM messageM = JSON.parseObject(new String(messageExt.getBody()), MessageM.class);
System.out.println(messageM);
}
}

发送带标签tag

生产者

@Test
void ProducerTagTest(){
rocketMQTemplate.syncSend("TagMQ:tagA","带tagA的消息");
rocketMQTemplate.syncSend("TagMQ:tagB","带tagB的消息");
}

消费者

@Component
@RocketMQMessageListener(topic = "TagMQ",
consumerGroup = "TagMQGroup",
selectorType = SelectorType.TAG, //tag过滤模式
selectorExpression = "tagA || tagB")
public class MsgListenerTag implements RocketMQListener<MessageExt> {@Override
public void onMessage(MessageExt messageExt) {
System.out.println(new String( messageExt.getBody()));
}
}

发送带Key消息

Key带在消息头中

生产者

@Test
void keyTest(){
String Key = UUID.randomUUID().toString();
Message<String> msg = MessageBuilder
.withPayload("带key消息").
setHeader(RocketMQHeaders.KEYS, Key)
.build();
/**
* 带Key消息
*/
rocketMQTemplate.syncSend("ketTopic",msg);
}

消费者

@Component
@RocketMQMessageListener(topic = "ketTopic",consumerGroup = "ketConsumerGroup-springboot")
public class keyMQListener implements RocketMQListener<MessageExt> {/**
* onMessage 消费者方法
* @param messages 消息内容
*/
@Override
public void onMessage(MessageExt messages) {
//不报错就是签收信息,
//报错就是重试,重试三次后还是报错,就代表消费失败,会重新入队 (根据yml配置)
System.out.println("接收到消息:"+new String(messages.getBody()));
System.out.println("key:"+messages.getKeys());
}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/758949.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

PHP全新美化广告横幅在线制作源码

源码简介 可以做网站的引流不需要安装上传就可以使用&#xff0c;在第一版基础上做了二次开发更加好用 注意&#xff1a;主机和服务器均可架设搭建,如果使用宝塔架设点击访问的时候提示找不到文件路径的时候,记得点击网站目录把防跨站攻击先关闭,这样就可以正常访问了,这款是…

两台电脑简单的通信过程详解(局域网,同网段)

来源&#xff1a; https://www.bilibili.com/video/BV1BA411373b/ 一、原理 描述过程&#xff1a;分别以PC1、PC2、PC2、PC1的角度 二、eNSP测试 1.连接设备 2.查看PC1情况 3.打开抓包后&#xff0c;再ping一下PC2 4.PC1发送ARP报文 broadcast 意思为广播(IP都是f,意为255…

【数字图像处理matlab系列】保存图像

【数字图像处理系列】保存图像imwrite函数 使用函数imwrite可以将图像保存到本地上&#xff0c;该函数的语法为 imwrite(image_data, filename)其中&#xff0c;image_data是要写入的图像数据&#xff0c;可以是一个矩阵或一个三维数组&#xff08;对于彩色图像&#xff09;&…

Java代码基础算法练习-求给定3个数, 进行从小到大排序-2024.03.20

任务描述&#xff1a; 输入三个整数 x,y,z(0<x<1000&#xff0c;0<y<1000&#xff0c;0<z<1000)&#xff0c;请把这三个数由小到大输出。 任务要求&#xff1a; 代码示例&#xff1a; package march0317_0331;import java.util.Scanner;public class m24…

系统设计实例(一)百万级别用户系统

二、百万级别用户系统 原则&#xff1a; 尽可能地缓存数据采用无状态Web层支持多个数据中心在 CDN 中托管静态资源通过分片扩展数据层将层级拆分为独立的服务 负载均衡器 负载均衡器会将传入的流量均匀分配给在负载均衡集合中定义的Web服务器&#xff0c;用户直接连接负载均…

【软件测试】如何设计自动化测试脚本

企业中如何设计自动化测试脚本呢&#xff1f;今天我们就来为大家分享一些干货。 一、线性设计 线性脚本设计方式是以脚本的方式体现测试用例&#xff0c;是一种非结构化的编码方式&#xff0c;多数采用录制回放的方式&#xff0c;测试工程师通过录制回访的访问对被测系统进行…

[LCP 51. 烹饪料理] 子集型回溯

Problem: LCP 51. 烹饪料理 文章目录 思路案例分析Code 思路 子集型回溯有两种解法 输入视角 通过判断集合中的每一个元素 k&#xff0c;认为元素 k 被选入子集或不被选入子集&#xff0c;从而得到答案。其 dfs 形状是一个高度为 n 的二叉树。作为当前节点的元素 k 代表的是这…

Unity Toggle与Toggle Group的妙用

Toggle与Toggle Group结合使用&#xff0c;妙处多多。 因为在同一Toggle Group内只有一个Toggle可以被选中&#xff0c;那么对于我们要创建单选按钮组、游戏的一些开关、暗夜模式、筛选不同显示内容等功能都非常好用。 比如我要实现通过点击不同按钮,从而筛选显示不同内容&am…

ES8生产实践——ES跨集群数据迁移方案测评

引言 场景需求 经常有小伙伴咨询如何将整个es集群数据如何迁移到另一个集群&#xff0c;其中往往会涉及到以下的问题&#xff1a; 跨es版本&#xff1a;老版本es集群数据迁移到新版本es集群。 跨集群&#xff1a;源数据和目的数据分布在两个不同的集群。 跨网络&#xff1a;两…

ch6文件操作和异常处理

os.listdir(path) 函数详解 功能: os.listdir(path) 函数用于返回指定目录下的所有文件和文件夹的名字列表&#xff0c;但不包括 . 和 ..。 参数: path: 要列出的目录的路径。 返回值: 一个包含目录下所有文件和文件夹名字的列表。 示例: import ospath "/home/u…

备战秋招(coding篇)

其中coding题目来源于师兄面试经验 1、链表的结构体反转链表 本质上就是一个构造函数 struct ListNode{int val_;ListNode* next_;ListNode() : val_(0), next_(NULL) {}ListNode(int x) : val_(x), next_(NULL) {}ListNode(int x, ListNode* next) : val_(x), next_(next) …

Crypto Gladiator League (CGL)

《加密角斗士》是一款完全链上游戏。所有角斗士、装备、代币等的生成过程都可以透明追溯。不可能被篡改或欺骗&#xff0c;使所有游戏物品都是真实资产。 CGL 现已升级为全链游戏平台和 Web3 游戏流量门户&#xff0c;通过多维度收集用户数据&#xff0c;并将数据应用于游戏中&…

【Java11下载、安装、部署指南】

oracle jdk11下载 oracle jdk所有版本归档【archive】下载地址&#xff1a; https://www.oracle.com/java/technologies/downloads/archive/ oracle jdk11下载地址&#xff1a; https://www.oracle.com/java/technologies/javase/jdk11-archive-downloads.html 配置或修改wi…

软件工程-第9章 软件工程项目管理概述

9.1 软件工程管理活动 9.2 软件规模、成本和进度估算 9.3 能力成熟度模型CMM 9.4 ISO 9000系列标准简介 9.5 CMM与ISO 9000系列标准的比较 9.6 本章小结

Matlab|基于多目标粒子群算法的配电网储能选址定容

目录 一、主要内容 二、主要流程 三、部分程序 四、程序结果 五、程序链接 一、主要内容 程序是对文章《基于多目标粒子群算法的配电网储能选址定容》的方法复现&#xff0c;具体内容如下&#xff1a; 以系统节点电压水平&#xff08;电网脆弱性&#xff09;、网络损耗以及…

数据库系统概论-第5章 数据库完整性

5.1 实体完整性 5.2 参照完整性 5.3 用户定义完整性 5.4 完整性约束命名子句 5.5 域中的完整性限制 5.6 断言 5.7 触发器 5.8 小结

Pytest自动化测试框架快速上手(超详细)

&#x1f345; 视频学习&#xff1a;文末有免费的配套视频可观看 &#x1f345; 关注公众号&#xff1a;互联网杂货铺&#xff0c;回复1 &#xff0c;免费获取软件测试全套资料&#xff0c;资料在手&#xff0c;薪资嘎嘎涨 pytest是一个非常成熟的全功能的Python测试框架&#…

蓝桥杯单片机快速开发笔记——NE555测频

一、原理分析 NE555作为一种多功能集成电路&#xff0c;在信号发生和频率测量方面具有广泛的应用。通过合理配置和连接外部元件&#xff0c;可以实现不同类型的信号发生和频率测量功能。 原理&#xff1a; 信号发生器&#xff1a; NE555可以配置为多种不同的振荡器电路&#x…

【鸿蒙HarmonyOS开发笔记】通知模块之发布基础类型通知,内含如何将图片变成PixelMap对象

通知简介 应用可以通过通知接口发送通知消息&#xff0c;终端用户可以通过通知栏查看通知内容&#xff0c;也可以点击通知来打开应用。 通知常见的使用场景&#xff1a; 显示接收到的短消息、即时消息等。 显示应用的推送消息&#xff0c;如广告、版本更新等。 显示当前正…

基于cnn深度学习的yolov5+pyqt+分类+resnet+骨龄检测系统

往期热门博客项目回顾&#xff1a; 计算机视觉项目大集合 改进的yolo目标检测-测距测速 路径规划算法 图像去雨去雾目标检测测距项目 交通标志识别项目 yolo系列-重磅yolov9界面-最新的yolo 姿态识别-3d姿态识别 深度学习小白学习路线 YOLOv5与骨龄识别 YOLOv5&a…