rabbitmq 使用SAC队列实现顺序消息

rabbitmq 使用SAC队列实现顺序消息

前提

SAC: single active consumer, 是指如果有多个实例,只允许其中一个实例消费,其他实例为空闲

目的

实现消息顺序消费,操作:

  • 创建4个SAC队列,
  • 消息的路由key 取队列个数模,这里是4
  • 发送消息到每个队列,保证每个队列只有一个消费者!!

实现

定义消息 SeqMessage
@Data
@AllArgsConstructor
public class SeqMessage implements Serializable {//消息idprivate String requestNo;//消息中顺序,1,2,3,4private int order;
}
创建 队列 绑定
@Configuration
public class OrderQueueConfiguration {public static final String EXCHANGE = "order-ex";public static final String RK_PREFIX = "rk-";public static final String ONE_QUEUE = "one-queue";public static final String TWO_QUEUE = "two-queue";public static final String THREE_QUEUE = "three-queue";public static final String FOUR_QUEUE = "four-queue";@Beanpublic DirectExchange exchange() { // 使用直连的模式return new DirectExchange(EXCHANGE, true, false);}@Beanpublic Binding oneBinding() {return BindingBuilder.bind(oneQueue()).to(exchange()).with(RK_PREFIX + 1);}@Beanpublic Binding twoBinding() {return BindingBuilder.bind(twoQueue()).to(exchange()).with(RK_PREFIX + 2);}@Beanpublic Binding threeBinding() {return BindingBuilder.bind(threeQueue()).to(exchange()).with(RK_PREFIX + 3);}@Beanpublic Binding fourBinding() {return BindingBuilder.bind(fourQueue()).to(exchange()).with(RK_PREFIX + 3);}@Beanpublic Queue oneQueue() {return createSacQueue(ONE_QUEUE);}@Beanpublic Queue twoQueue() {return createSacQueue(TWO_QUEUE);}@Beanpublic Queue threeQueue() {return createSacQueue(THREE_QUEUE);}@Beanpublic Queue fourQueue() {return createSacQueue(FOUR_QUEUE);}private static Queue createSacQueue(String queueName) {Map<String, Object> arguments = new HashMap<>(2);arguments.put("x-single-active-consumer", true);return new Queue(queueName, true, false, false, arguments);}}

重要的是 x-single-active-consumer 这个属性, 只有一个实例生效

在这里插入图片描述

创建 消费者

为每个队列创建一个监听消费者

@Slf4j
@Component
public class OrderListener {@RabbitListener(bindings = @QueueBinding(exchange = @Exchange(value = EXCHANGE,declare = "false"),value = @Queue(value = ONE_QUEUE, durable = "true", declare = "false"), key = RK_PREFIX + 1))public void onMessage1(Message message, @Headers Channel channel) {String messageStr = "";try {messageStr = new String(message.getBody(), StandardCharsets.UTF_8);log.info("{} recv: {}", ONE_QUEUE, messageStr);} catch (Exception e) {log.error("######### OrderListener.onMessage: {}-{}", messageStr, e);}}@RabbitListener(bindings = @QueueBinding(exchange = @Exchange(value = EXCHANGE,declare = "false"),value = @Queue(value = TWO_QUEUE, durable = "true", declare = "false"), key = RK_PREFIX + 2))public void onMessage2(Message message, @Headers Channel channel) {String messageStr = "";try {messageStr = new String(message.getBody(), StandardCharsets.UTF_8);log.info("{} recv: {}", TWO_QUEUE, messageStr);} catch (Exception e) {log.error("######### OrderListener.onMessage: {}-{}", messageStr, e);}}@RabbitListener(bindings = @QueueBinding(exchange = @Exchange(value = EXCHANGE,declare = "false"),value = @Queue(value = THREE_QUEUE, durable = "true", declare = "false"), key = RK_PREFIX + 3))public void onMessage3(Message message, @Headers Channel channel) {String messageStr = "";try {messageStr = new String(message.getBody(), StandardCharsets.UTF_8);log.info("{} recv: {}", THREE_QUEUE, messageStr);} catch (Exception e) {log.error("######### OrderListener.onMessage: {}-{}", messageStr, e);}}@RabbitListener(bindings = @QueueBinding(exchange = @Exchange(value = EXCHANGE,declare = "false"),value = @Queue(value = FOUR_QUEUE, durable = "true", declare = "false"), key = RK_PREFIX + 4))public void onMessage4(Message message, @Headers Channel channel) {String messageStr = "";try {messageStr = new String(message.getBody(), StandardCharsets.UTF_8);log.info("{} recv: {}", FOUR_QUEUE, messageStr);} catch (Exception e) {log.error("######### OrderListener.onMessage: {}-{}", messageStr, e);}}}
生产者发送消息
@GetMapping("/send/seq/messqge")public String sendSeqMessage() throws JsonProcessingException {int cnt = 100;int mod = 4;int seqSize = 6;for (int i = 0; i < cnt; i++) {for (int j = 0; j < seqSize; j++) {int rk = i % mod + 1;SeqMessage seqMessage = new SeqMessage("seq-" + i, j);String s = objectMapper.writeValueAsString(seqMessage);log.info("routeKey: {}, send msg: {}", rk, s);rabbitTemplate.convertAndSend(EXCHANGE, RK_PREFIX + rk, s);}}return "success";}

运行结果:

two-queue recv: {"requestNo":"seq-1","order":0}
two-queue recv: {"requestNo":"seq-1","order":1}
two-queue recv: {"requestNo":"seq-1","order":2}
two-queue recv: {"requestNo":"seq-1","order":3}
two-queue recv: {"requestNo":"seq-1","order":4}
two-queue recv: {"requestNo":"seq-1","order":5}
two-queue recv: {"requestNo":"seq-5","order":0}
two-queue recv: {"requestNo":"seq-5","order":1}
two-queue recv: {"requestNo":"seq-5","order":2}
two-queue recv: {"requestNo":"seq-5","order":3}
two-queue recv: {"requestNo":"seq-5","order":4}
two-queue recv: {"requestNo":"seq-5","order":5}three-queue recv: {"requestNo":"seq-2","order":0}
three-queue recv: {"requestNo":"seq-2","order":1}
three-queue recv: {"requestNo":"seq-2","order":2}
three-queue recv: {"requestNo":"seq-2","order":3}
three-queue recv: {"requestNo":"seq-2","order":4}
three-queue recv: {"requestNo":"seq-2","order":5}
three-queue recv: {"requestNo":"seq-6","order":0}
three-queue recv: {"requestNo":"seq-6","order":1}
three-queue recv: {"requestNo":"seq-6","order":2}
three-queue recv: {"requestNo":"seq-6","order":3}
three-queue recv: {"requestNo":"seq-6","order":4}
three-queue recv: {"requestNo":"seq-6","order":5}

可以发现,消息消费是顺序的

good luck!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/826238.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Python可视化数据分析-柱状图/折线图

一、前言 使用python编写一个图表生成器&#xff0c;输入各公司的不良品数量&#xff0c;可以在一张图中同时展示数据的柱状图和折线图。 效果如下&#xff1a; 二、基础知识 绘制折线图和柱状图主要使用到了 pyecharts.charts 模块中的 Line 和 Bar 类。它们允许用户通过简…

完整、免费的把pdf转word文档

在线工具网 https://www.orcc.online/pdf 支持pdf转word&#xff0c;免费、完整、快捷 登录网站 https://orcc.online/pdf 选择需要转换的pdf文件&#xff1a; 等待转换完成 点击蓝色文件即可下载 无限制&#xff0c;完整转换。

动态IP与静态IP的区别,你选对了吗?

在互联网世界中&#xff0c;IP地址是每台设备在网络上的唯一标识。这些地址可以是动态的&#xff0c;也可以是静态的。对于非专业人士来说&#xff0c;理解这两者之间的区别可能会有些困难。本文旨在深入探讨动态IP和静态IP的主要差异&#xff0c;帮助读者根据自己的需求做出明…

Golang | Leetcode Golang题解之第37题解数独

题目&#xff1a; 题解&#xff1a; func solveSudoku(board [][]byte) {var line, column [9][9]boolvar block [3][3][9]boolvar spaces [][2]intfor i, row : range board {for j, b : range row {if b . {spaces append(spaces, [2]int{i, j})} else {digit : b - 1line…

docker网路和主机通讯问题

#注 1&#xff0c;安装docker和启动容器服务的时候如果防火墙处于开启状态&#xff0c;那么重启docker里面的容器的时候必须开启防火墙&#xff0c;否则会出现iptable错误&#xff1b; 2&#xff0c;linux开启防火墙会导致主机和docker网络之间单向通讯&#xff0c;主机可以访…

一周IT资讯

又降了&#xff1f;运维4月平均月薪1W6&#xff1f; 薪资作为大部分人的主要收入来源&#xff0c;是每个人最关注的话题之一。 最近&#xff0c;小编搜索了近半年的运维薪资趋势&#xff0c;看看你的钱包缩水了没&#xff1f; *数据来自看准网 据了解&#xff0c;运维2024年…

单链表详解(无哨兵位),实现增删改查

1.顺序表对比单链表的缺点 中间或头部插入时&#xff0c;需要移动数据再插入&#xff0c;如果数据庞大会导致效率降低每次增容就需要申请空间&#xff0c;而且需要拷贝数据&#xff0c;释放旧空间增容造成浪费&#xff0c;因为一般都是以2倍增容 2.链表的基础知识 链表也是线…

LeetCode---128双周赛

题目列表 3110. 字符串的分数 3111. 覆盖所有点的最少矩形数目 3112. 访问消失节点的最少时间 3113. 边界元素是最大值的子数组数目 一、字符串的分数 按照题目要求&#xff0c;直接模拟遍历即可&#xff0c;代码如下 class Solution { public:int scoreOfString(string …

如何使用ArcGIS Pro进行路径分析

路径分析是一种空间分析技术&#xff0c;用于确定两个或多个地点之间最佳路径或最短路径&#xff0c;这里为大家介绍一下在ArcGIS Pro中如何进行路径分析&#xff0c;希望能对你有所帮助。 数据来源 教程所使用的数据是从水经微图中下载的道路数据&#xff0c;除了道路数据&a…

阿里TTl使用管理日志

在管理日志的时候我们需要查看生成日志都是那些人干了那些事&#xff0c;那么怎么在日志上查看这些事情呢&#xff0c;首先呢可以直接使用Slf4j,然后再配置文件里配置一下 #日志文件最大上限 logging.file.max-size100MB #日志文件存储位置 logging.file.path./logs #日志文件…

中颖51芯片学习7. ADC模数转换

中颖51芯片学习7. ADC模数转换 一、ADC工作原理简介1. 概念2. ADC实现方式3. 基准电压 二、中颖芯片ADC功能介绍1. 中颖芯片ADC特性2. ADC触发源&#xff08;1&#xff09;**软件触发**&#xff08;2&#xff09;**TIMER4定时器触发**&#xff08;3&#xff09;**外部中断2触发…

面试: 悲观锁和乐观锁

一、悲观锁的代表是synchronized和Lock 锁 其核心思想是【线程只有占有了锁&#xff0c;才能去操作共享变量&#xff0c;每次只有一个线程占锁成功&#xff0c;获取锁失败的线程&#xff0c;都得停下来等待】线程从运行到阻塞、再从阻塞到唤醒&#xff0c;涉及线程上下文切换&a…

CTFHub(web sql注入)(三)

MYSQL 手工注入 1.判断字段数 输入1 输入2 输入3 得知字段有两个 2.判断注入类型 1 and 1 1 1 and 12 回显错误&#xff0c;说明存在sql注入 3.查看数据库内容 知道字段数量为2后&#xff0c;可以查看数据库位置 1 union select 1,2 使用union select 1,2查看未发现数…

【Java基础】21.重写(Override)与重载(Overload)

文章目录 一、重写(Override)1.方法重写2.方法的重写规则3.Super 关键字的使用 二、重载(Overload)1.方法重载2.重载规则3.实例 三、重写与重载之间的区别 一、重写(Override) 1.方法重写 重写&#xff08;Override&#xff09;是指子类定义了一个与其父类中具有相同名称、参…

阿里云OSS 存储对象的注册与使用

目录 一、什么是阿里云OSS 二、 点击免费试用 2.1 选择第一个&#xff0c;点击免费试用 ​编辑 2.2 登录管理控制台 2.3 进入Bucket 2.4、在阿里云网站上的个人中心配置Accesskey,查询accessKeyId和accessKeySecret。 2.5、进入AccssKey管理页面应该会出现下图提示&…

【VI/VIM】基本操作备忘录

简介 新建/打开文件 工作模式 常用命令 移动命令 文本选中 撤销、删除 复制粘贴 替换 缩排 查找 替换 插入 分屏 练习

【动态规划】C++简单多状态dp问题(打家劫舍、粉刷房子、买卖股票的最佳时机...)

文章目录 前言1. 前言 - 理解动态规划算法2. 关于 简单多状态的dp问题2.5 例题按摩师/打家劫舍 3. 算法题3.1_打家劫舍II3.2_删除并获得点数3.3_粉刷房子3.4_买卖股票的最佳时机含冷冻期3.5_买卖股票的最佳时机含手续费3.6_买卖股票的最佳时机III3.7_买卖股票的最佳时机IV 前言…

交换机的种类有哪些?主要都具有哪些作用?

在当今数字化时代&#xff0c;网络已经成为我们生活和工作中不可或缺的一部分。无论是家庭网络还是企业网络&#xff0c;都需要有效的网络设备来实现数据通信和资源共享。而网络交换机作为一种重要的网络设备&#xff0c;扮演着连接和管理网络设备的关键角色。本文将探讨交换机…

开源贡献代码之​探索一下CPython

探索一下Cython 本篇文章将会围绕最近给Apache提的一个feature为背景&#xff0c;展开讲讲CPython遇到的问题&#xff0c;以及尝试自己从0写一个库出来&#xff0c;代码也已经放星球了&#xff0c;感兴趣的同学可以去下载学习。 0.背景 最近在给apache arrow提的一个feature因为…

《TinyLlama: An Open-Source Small Language Model》全文翻译

【Title】 TinyLlama&#xff1a;开源小语言模型 【Abstract】 我们推出了 TinyLlama&#xff0c;这是一个紧凑的 1.1B 语言模型&#xff0c;在大约 1 万亿个令牌上进行了大约 3 个时期的预训练。 TinyLlama 基于 Llama 2&#xff08;Touvron 等人&#xff0c;2023b&#xff…