RabbitMQ之顺序消费

什么是顺序消费
例如:业务上产生者发送三条消息, 分别是对同一条数据的增加、修改、删除操作, 如果没有保证顺序消费,执行顺序可能变成删除、修改、增加,这就乱了。
如何保证顺序性
一般我们讨论如何保证消息的顺序性,会从下面三个方面考虑
1:发送消息的顺序
2:队列中消息的顺序
3:消费消息的顺序
发送消息的顺序
消息发送端的顺序,大部分业务不做要求,谁先发消息无所谓,如果遇到业务一定要发送消息也确保顺序,那意味着,只能全局加锁一个个的操作,一个个的发消息,不能并发发送消息。

队列中消息的顺序
RabbitMQ 中,消息最终会保存在队列中,在同一个队列中,消息是顺序的,先进先出原则,这个由 RabbitMQ 保证,通常也不需要开发关心。

不同队列 中的消息顺序,是没有保证的,例如:进地铁站的时候,排了三个队伍,不同队伍之间的,不能确保谁先进站。

消费消息的顺序
我们说如何保证消息顺序性,通常说的就是消费者消费消息的顺序,在多个消费者消费同一个消息队列的场景,通常是无法保证消息顺序的,

虽然消息队列的消息是顺序的,但是多个消费者并发消费消息,获取的消息的速度、执行业务逻辑的速度快慢、执行异常等等原因都会导致消息顺序不一致。
例如:消息A、B、C按顺序进入队列,消费者A1拿到消息A、消费者B1拿到消息B, 结果消费者B执行速度快,就跑完了,又或者消费者A1挂了,都会导致消息顺序不一致。
解决消费顺序的问题, 通常就是一个队列只有一个消费者 , 这样就可以一个个消息按顺序处理, 缺点就是并发能力下降了,无法并发消费消息,这是个取舍问题。

如果业务又要顺序消费,又要增加并发,通常思路就是开启多个队列,业务根据规则将消息分发到不同的队列,通过增加队列的数量来提高并发度,例如:电商订单场景,只需要保证同一个用户的订单消息的顺序性就行,不同用户之间没有关系,所以只要让同一个用户的订单消息进入同一个队列就行,其他用户的订单消息,可以进入不同的队列。

以下为代码设计过程实现
首先我们必须保证只有一个消费者 那么问题就来了,我们的项目一般是多副本的,如何保证只有一个副本在消费呢
这时就会用到消费者 单活模式 x-single-active-consumer
使用下述配置实现


private Queue creatQueue(String name){// 创建一个 单活模式 队列HashMap<String, Object> args=new HashMap<>();args.put("x-single-active-consumer",true);return new Queue(name,true,false,false,args);}

创建之后,我们可以在控制台看到 消费者的激活状态
在这里插入图片描述

=======================>配置类
@Configuration
@SuppressWarnings("all")
public class DirectExchangeConfiguration {@Beanpublic Queue queue15_0() {return creatQueue(Message15.QUEUE_0);}@Beanpublic Queue queue15_1() {return creatQueue(Message15.QUEUE_1);}@Beanpublic Queue queue15_2() {return creatQueue(Message15.QUEUE_2);}@Beanpublic Queue queue15_3() {return creatQueue(Message15.QUEUE_3);}@Beanpublic DirectExchange exchange15() {// name: 交换机名字 | durable: 是否持久化 | exclusive: 是否排它return new DirectExchange(Message15.EXCHANGE, true, false);}@Beanpublic Binding binding15_0() {return BindingBuilder.bind(queue15_0()).to(exchange15()).with("0");}@Beanpublic Binding binding15_1() {return BindingBuilder.bind(queue15_1()).to(exchange15()).with("1");}@Beanpublic Binding binding15_2() {return BindingBuilder.bind(queue15_2()).to(exchange15()).with("2");}@Beanpublic Binding binding15_3() {return BindingBuilder.bind(queue15_3()).to(exchange15()).with("3");}/*** 创建一个 单活 模式的队列* 注意 :* <p>* 如果一个队列已经创建为非x-single-active-consumer,而你想更改其为x-single-active-consumer,要把之前创建的队列删除** @param name* @return queue*/private Queue creatQueue(String name) {// 创建一个 单活模式 队列HashMap<String, Object> args = new HashMap<>();args.put("x-single-active-consumer", true);return new Queue(name, true, false, false, args);}
=================================》生产者
@Component
@Slf4j
public class Producer15 {@Resourceprivate RabbitTemplate rabbitTemplate;/*** 这里的发送是 拟投递到多个队列中** @param id  业务id* @param msg 业务信息*/public void syncSend(int id, String msg) {Message15 message = new Message15(id, msg);rabbitTemplate.convertAndSend(Message15.EXCHANGE, this.getRoutingKey(id), message);}/*** 根据 id 取余来决定丢到那个队列中去** @param id id* @return routingKey*/private String getRoutingKey(int id) {return String.valueOf(id % Message15.QUEUE_COUNT);}
}
============================》消费者
/*** 要想保证消息的顺序,每个队列只能有一个消费者** @author 深漂码农@明哥* @date 2024-03-18*/
@Component
@RabbitListener(queues = Message15.QUEUE_0)
@RabbitListener(queues = Message15.QUEUE_1)
@RabbitListener(queues = Message15.QUEUE_2)
@RabbitListener(queues = Message15.QUEUE_3)
@Slf4j
public class Consumer15 {@RabbitHandlerpublic void onMessage(Message15 message) throws InterruptedException {log.info("[{}][Consumer15 onMessage][线程编号:{} 消息内容:{}]", LocalDateTime.now(), Thread.currentThread().getId(), message);// 这里随机睡一会,模拟业务处理时候的耗时long l = new Random(1000).nextLong();TimeUnit.MILLISECONDS.sleep(l);}
}
==============================》测试类
@Testvoid mock() throws InterruptedException {// 先启动这个测试类,模拟多个副本情况下,看如何消费new CountDownLatch(1).await();}@Testvoid syncSend() throws InterruptedException {// 模拟每个队列中扔 10 个数据,看看效果for (int i = 0; i < 10; i++) {for (int j = 0; j < 4; j++) {producer15.syncSend(j, " 编号:" + j + " 第:" + i + " 条消息");}}TimeUnit.SECONDS.sleep(20);}
}

ps:测试的时候时候 先启动 mock 方式。 在启动 syncSend 方法,模拟多个副本同时消费,观察是否可以
以上的是RabbitMQ之顺序消费实现的代码 若不了解rabbitmq的基本使用 建议先看看我前面对应的文章 文章链接:点我—>let’s go
若需完整代码 可识别二维码后 给您发代码。
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/6546.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

3GPP官网下载协议步骤

1.打开官网 https://www.3gpp.org/ 2.点击 3.在界面选择要找的series&#xff0c;跳转到查找界面 以V2X通信协议为例&#xff0c;论文中通常会看到许多应用&#xff1a; [7] “Study on evaluation methodology of new Vehicle-to-Everything (V2X) use cases for LTE and NR…

3.2Java全栈开发前端+后端(全栈工程师进阶之路)-前端框架VUE3框架-企业级应用- Vuex

Vuex简介 Vuex概述 Vuex是一个专门为Vue.js应用程序开发的状态管理模式, 它采用集中式存储管理所有组件的公共状态, 并以相应的规 则保证状态以一种可预测的方式发生变化. 试想这样的场景, 比如一个Vue的根实例下面有一个根组件名为App.vue, 它下面有两个子组件A.vue和B.vu…

022、Python+fastapi,第一个Python项目走向第22步:ubuntu 24.04 docker 安装mysql8集群、redis集群(三)

这次来安装mysql8了&#xff0c;以前安装不是docker安装&#xff0c;这个我也是第一次&#xff0c;人人都有第一次嚒 前言 前面的redis安装还是花了点时间的&#xff0c;主要是网上教程&#xff0c;各有各的好&#xff0c;大家千万别取其长处&#xff0c;个人觉得这个环境影响…

ASP.NET网上车辆档案管理系统

摘 要 本文采用基于Web的Asp.net技术&#xff0c;并与sql server 2000数据库相结合&#xff0c;研发了一套车辆档案管理系统。该系统扩展性好&#xff0c;易于维护。简化了车辆档案设计流程&#xff0c;去除了冗余信息。汽车销售企业可以通过本系统完成整个销售及售后所有档案…

python爬虫实战

import requests import json yesinput(输入页数&#xff1a;) yesint(yes)headers {"accept": "application/json, text/plain, */*","accept-language": "zh-CN,zh;q0.9","content-type": "application/json",…

一对一WebRTC视频通话系列(三)——leave和peer-leave信令实现

本篇博客主要分为两部分&#xff0c;第一部分为leave信令的实现&#xff0c;即当有客户端离开房间后&#xff0c;服务端和其他在房间内的客户需知晓。第二部分为媒体协商和网络协商相关API。 本系列博客主要记录一对一WebRTC视频通话实现过程中的一些重点&#xff0c;代码全部进…

渗透之sql盲注(时间/boolean盲注)

sql盲注&#xff1a;sql盲注意思是我们并不能在web页面中看到具体的信息&#xff0c;我们只能通过输入的语句的真假来判断。从而拿到我们想要的信息。 我们通常使用ascii值来进行盲注。 目录 手动注入&#xff1a; 时间盲注&#xff1a; 布尔盲注&#xff1a; python脚本注…

【Java】基本程序设计结构(一)

前言&#xff1a;现在&#xff0c;假定已经成功安装了JDK&#xff0c;并且能够运行上篇示例程序。本篇将开始介绍Java程序中的基本设计结构&#xff0c;其中包括&#xff1a;一个简单的Java应用&#xff0c;注释&#xff0c;数据类型&#xff0c;变量与常量&#xff0c;运算符&…

【深度学习基础(3)】初识神经网络之深度学习hello world

文章目录 一. 训练Keras中的MNIST数据集二. 工作流程1. 构建神经网络2. 准备图像数据3. 训练模型4. 利用模型进行预测5. (新数据上)评估模型精度 本节将首先给出一个神经网络示例&#xff0c;引出如下概念。了解完本节后&#xff0c;可以对神经网络在代码上的实现有一个整体的了…

【架构系列】RabbitMQ应用场景及在实际项目中如何搭建可靠的RabbitMQ架构体系

作者:后端小肥肠 创作不易&#xff0c;未经允许禁止转载。 1. 前言 RabbitMQ&#xff0c;作为一款高性能、可靠的消息队列软件&#xff0c;已经成为许多企业和开发团队的首选之一。它的灵活性和可扩展性使得它适用于各种应用场景&#xff0c;从简单的任务队列到复杂的分布式系统…

算法设计与分析——期末1h

目录 第一章 算法的定义 算法的三要素 算法的基本性质 算法的时间复杂度数量级&#xff1a; 第二章 兔子繁殖问题&#xff08;递推法&#xff09; 猴子吃桃问题&#xff08;递推法&#xff09; 穿越沙漠问题&#xff08;递推法&#xff08;倒推&#xff09;&#xff09; 百钱百…

解决Maven本地仓库存在依赖包还需要远程下载的问题

背景 公司有自己maven私服&#xff0c;正在在私服可以使用的情况&#xff0c;打包是没问题的。但是这次是由于公司大楼整体因电路检修而停电&#xff0c;所有服务器关机&#xff0c;包括maven私服服务器。然后当天确有一个包需要打&#xff0c;这个时候发现死活打不了&#xf…

线性数据结构-手写链表-LinkList

为什么需要手写实现数据结构&#xff1f; 其实技术的本身就是基础的积累和搭建的过程&#xff0c;基础扎实 地基平稳 万丈高楼才会久战不衰&#xff0c;做技术能一通百&#xff0c;百通千就不怕有再难得技术了。 一&#xff1a;链表的分类 主要有单向&#xff0c;双向和循环链表…

飞书API(7):MySQL 入库通用版本

一、引入 在上一篇介绍了如何使用 pandas 处理飞书接口返回的数据&#xff0c;并将处理好的数据入库。最终的代码拓展性太差&#xff0c;本篇来探讨下如何使得上一篇的最终代码拓展性更好&#xff01;为什么上一篇的代码拓展性太差呢&#xff1f;我总结了几点&#xff1a; 列…

福布斯AI 50榜单发布!新兴势力颠覆传统,叫板谷歌、微软

整理 | 王轶群 责编 | 唐小引 出品丨AI 科技大本营&#xff08;ID&#xff1a;rgznai100&#xff09; ChatGPT带来的生成式人工智能热浪&#xff0c;促使众多企业争先恐后地试图实现生成式人工智能的最新进展。一个新的帮助企业开发和部署人工智能驱动的应用程序的科技经济体系…

NFS共享存储服务

一、NFS概述 1、简介 NFS是一种基于TCP/IP传输的网络文件系统协议。 NFS 服务的实现依赖于 RPC&#xff08;Remote Process Call&#xff0c;远端过程调用&#xff09;机制&#xff0c;通过使用 NFS 协议&#xff0c;客户机可以像访问本地目录一样访问远程服务器中的共享资源…

BUUCTF——web题目练习

[极客大挑战 2019]LoveSQL 输入1 123 输入1 123 输入2 123 这里可以看出注入位置为password的后面&#xff0c;开始手动注入 闭合方式为1 [极客大挑战 2019]Secret File 查看页面源代码&#xff0c;发现里面有一个跳转页面的连接&#xff0c;点击进去&#xff0c;查看这个…

Llama改进之——SwiGLU激活函数

引言 今天介绍LLAMA模型引入的关于激活函数的改进——SwiGLU1&#xff0c;该激活函数取得了不错的效果&#xff0c;得到了广泛地应用。 SwiGLU是GLU的一种变体&#xff0c;其中包含了GLU和Swish激活函数。 GLU GLU(Gated Linear Units,门控线性单元)2引入了两个不同的线性层…

83、动态规划-打家劫舍

思路&#xff1a; 首先使用递归方式求出最优解。从每个房屋开始&#xff0c;分别考虑偷与不偷两种情况&#xff0c;然后递归地对后续的房屋做同样的决策。这种方法确保了可以找到在不触发警报的情况下可能的最高金额。 代码如下&#xff1a; public static int rob(int[] nu…

【C++】深入剖析C++中的lambda表达式包装器bind

目录 一、lambda表达式 1、引入 2、lambda表达式 3、lambda表达式语法 ​4、lambda 的底层逻辑 二、包装器 1、包装器的表达式 ​ 2、实例化多份 3、可调用对象类型 4、实操例题 三、bind 1、bind 的表达式 2、调整参数的位置 3、绑定参数 一、lambda表达式 1、引…