【架构系列】RabbitMQ应用场景及在实际项目中如何搭建可靠的RabbitMQ架构体系

作者:后端小肥肠

创作不易,未经允许禁止转载。

1. 前言

RabbitMQ,作为一款高性能、可靠的消息队列软件,已经成为许多企业和开发团队的首选之一。它的灵活性和可扩展性使得它适用于各种应用场景,从简单的任务队列到复杂的分布式系统。本文将深入探讨RabbitMQ的应用场景以及如何在实际项目中构建可靠的RabbitMQ架构体系。

2. RabbitMQ应用场景

2.1 异步处理

在现代应用中,异步消息处理是提升用户体验和系统效率的关键。RabbitMQ可以有效地用于多种异步处理任务,例如:

  • 用户注册后的邮件发送:用户注册后,通过RabbitMQ发送一个消息到队列中,由后台服务监听并处理发送邮件的任务,从而不会延迟用户的注册过程。
  • 订单处理:在电商平台中,订单处理包括库存管理、支付确认等多个步骤,RabbitMQ可以用来在这些服务间异步传递订单信息,确保处理流程的连续性和效率。

2.2 应用解耦

RabbitMQ支持多种通信模式,如点对点、发布/订阅等,这些模式帮助系统各部分保持低耦合度,便于独立扩展和维护。例如:

  • 微服务架构中的服务通信:在微服务架构中,RabbitMQ允许各个微服务之间通过消息进行交互,而不是直接调用对方的API,这种方式减少了服务间的直接依赖。

2.3 流量削峰

在流量高峰期,如促销或大型活动期间,系统可能会遭遇巨大的访问压力。RabbitMQ可以用来缓冲入站消息,如订单或请求,从而保护后端服务不被过载:

  • 秒杀活动中的订单处理:在秒杀活动中,大量的购买请求可以先进入RabbitMQ队列,系统根据处理能力逐步从队列中取出并处理这些请求,有效避免了系统崩溃。

2.4 通信与集成

RabbitMQ提供了一个灵活的消息传递系统,可以集成复杂的企业系统。它支持多种协议和广泛的开发语言库,适用于:

  • 跨平台通信:在不同操作系统和不同编程语言编写的应用之间,RabbitMQ可以作为消息传递中间件,实现这些系统的有效通信。

2.5 日志处理和应用监控

RabbitMQ也常用于系统日志处理和监控。它可以聚合各服务产生的日志信息,并传输到日志分析系统:

  • 集中式日志管理:通过RabbitMQ,各个系统和应用的日志可以被统一收集至一个中央处理位置,便于进行日志分析、监控和报警。

2.6 数据同步

RabbitMQ 在数据同步中扮演着重要的角色,特别是在分布式系统中,它能够确保数据在多个系统或组件之间保持一致性和最新状态。这对于维护数据的完整性和及时性至关重要。例如:

  • 数据库同步:在多地数据中心运营的情况下,RabbitMQ 可以用来同步不同地点的数据库。通过消息队列,当一个数据中心的数据库更新时,相应的变更可以通过 RabbitMQ 发送到其他数据中心,从而保证所有地点的数据一致。

  • 实时数据复制:在金融服务或电子商务平台,实时数据复制是保证高可用性和灾难恢复的关键。使用 RabbitMQ,可以实现高效的数据复制策略,如将交易数据从主系统复制到备份系统或分析数据库。

  • 缓存刷新:在使用缓存提高应用性能的情况下,RabbitMQ 可以用来在数据更新时自动通知系统刷新缓存。这样,用户总是能够获取到最新的数据,而不是过时的缓存数据。

通过这些应用场景,可以看出RabbitMQ在现代软件架构中扮演的多样化角色,不仅增强了系统的可靠性和伸缩性,还提高了开发和运维的效率。

3. 在项目中如何搭建稳定RabbitMQ架构体系

3.1. RabbitMQ安装

网上RabbitMQ安装教程很多,本文只简述基于docker安装的核心步骤:

1. 环境准备,准备Cenos虚拟机,我的是7.x版本:

2. 拉取或解压RabbitMQ镜像:

3. 运行docker容器:

docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 -v /home/docker/rabbitmq/rabbitmq:/var/lib/rabbitmq -v /home/docker/rabbitmq/rabbitmq_conf:/etc/rabbitmq   -e RABBITMQ_DEFAULT_VHOST=km_vhost  -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin rabbitmq:latest

4. 进入容器 :

 docker exec -it 容器id /bin/bash

5. 运行rabbitmq-plugins enable rabbitmq_management(解决无法访问网页端15672端口问题),即可完成RabbitMQ安装。

3.2. 总体技术流程

本文以异步处理应用场景为例,展示如何构建稳定可靠的RabbitMQ架构体系:

上述流程为异步消息通信的技术流程,在异步消息通信中当消息投递后就立刻返回了结果,我们无法获取消息消费的具体过程,这就导致了虽然我们可以即刻获取程序返回状态,但是程序执行细节或是否失败无法通过程序响应返回的方式获取。

基于以上RabbitMQ异步通信的优缺点,我们要搭建一个可靠的RabbitMQ架构需要从以下几个方面入手:

生产者稳定架构:

1. 消息投递回调监听。创建消息投递回调监听函数,监听生产者投递的消息是否投递成功。

2. 消息确认表创建。创建消息确认表message_confirmation,记录消息投递状态,其中字段status反应了是否投递成功(0为为投递成功,1为投递成功)。

CREATE TABLE "public"."message_confirmation" ("id" varchar(100) COLLATE "pg_catalog"."default" NOT NULL,"status" int4,"create_time" timestamp(6),"update_time" timestamp(6),"message" varchar(255) COLLATE "pg_catalog"."default",CONSTRAINT "message_confirmation_pkey" PRIMARY KEY ("id")
)
;ALTER TABLE "public"."message_confirmation" OWNER TO "postgres";

3. 创建定时任务监听消息投递确认表。每隔一段时间遍历消息确认表,筛选出status为0的消息数据,进行重复投递动作。

消费者稳定架构

1. 死信队列运用。由于网络或外部因素导致消息消费失败,可将消息投递至死信队列进行二次消费。

2. 日志表记录。如死信队列也消费失败,可将消息写入日志表(message_error)后进行手动消费,由技术人员获取日志表中消费失败记录,排查消费失败原因。

CREATE TABLE "public"."message_error" ("id" varchar(100) COLLATE "pg_catalog"."default" NOT NULL,"message_id" varchar(100) COLLATE "pg_catalog"."default" NOT NULL,"error_log" text COLLATE "pg_catalog"."default","create_time" timestamp(6),"update_time" timestamp(6),CONSTRAINT "message_error_pkey" PRIMARY KEY ("id")
)
;ALTER TABLE "public"."message_error" OWNER TO "postgres";

3.3. 实战讲解

3.3.1. 环境配置
3.3.1.1. 所需版本工具
3.3.1.2. pom依赖
<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>com.baomidou</groupId><artifactId>mybatis-plus-boot-starter</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-validation</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.postgresql</groupId><artifactId>postgresql</artifactId></dependency>
</dependencies>
3.3.2. 生产者核心代码讲解

3.3.2.1. yml配置
server:port: 8873
spring:datasource:url: jdbc:postgresql://127.0.0.1:5432/xfc_mq_producerusername: postgrespassword: postgresdriver-class-name: org.postgresql.Driverrabbitmq:port: 5672host: 192.168.10.11username: adminpassword: adminvirtual-host: my_vhostpublisher-confirm-type: correlatedlistener:simple:acknowledge-mode: manual
3.3.2.2. 编写回调函数
 @PostConstructpublic void regCallback() {// 消息发送成功以后,给予生产者的消息回执,来确保生产者的可靠性rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {log.info("cause:"+cause);// 如果ack为true代表消息已经收到String messageId = correlationData.getId();if (!ack) {// 这里可能要进行其他的方式进行存储log.error("MQ队列应答失败,messageId是:" + messageId);return;}try {MessageConfirmation messageConfirmation = messageConfirmationMapper.selectById(messageId);messageConfirmation.setStatus(1);int count=messageConfirmationMapper.updateById(messageConfirmation);if (count == 1) {log.info("本地消息状态修改成功,消息成功投递到消息队列中...");}} catch (Exception ex) {log.error("本地消息状态修改失败,出现异常:" + ex.getMessage());}}});}

上述回调函数主要用于监听生产者发送的消息是否发送成功,并将消息发送状态更新至消息确认表中。

3.3.2.3. 编写定时任务监听消息确认表
@Configuration
@EnableScheduling
@Slf4j
public class confirmMessageTaskService {@Autowiredprivate RabbitTemplate rabbitTemplate;@AutowiredMessageConfirmationMapper messageConfirmationMapper;@Scheduled(cron = "0 */1 * * * ?")public void sendMessage(){// 把消息为0的状态消息重新查询出来,投递到MQ中。LambdaQueryWrapper<MessageConfirmation> queryWrapper = new LambdaQueryWrapper<>();queryWrapper.eq(MessageConfirmation::getStatus, 0);List<MessageConfirmation> noConfirmMessages = messageConfirmationMapper.selectList(queryWrapper).stream().collect(Collectors.toList());noConfirmMessages.forEach((noConfirmMessage)->{rabbitTemplate.convertAndSend("xz_push_exchange","", JsonUtil.obj2String(noConfirmMessage),new CorrelationData(noConfirmMessage.getId()));});}
}

 上述定时任务为每分钟遍历消息确认表,将status=0的消息筛选出来进行消息投递。

3.3.2.4. 消息投递
    public void sendMessage(MessageConfirmation messageConfirmation) {messageConfirmationMapper.insert(messageConfirmation);rabbitTemplate.convertAndSend("xfc_fanout_exchange","", JsonUtil.obj2String(messageConfirmation),new CorrelationData(messageConfirmation.getId()));}

3.4. 消费者核心代码讲解

3.4.1. yml配置
server:port: 8872
spring:datasource:url: jdbc:postgresql://127.0.0.1:5432/xfc_mq_consumerusername: postgrespassword: postgresdriver-class-name: org.postgresql.Driverrabbitmq:port: 5672host: 192.168.10.11username: adminpassword: adminvirtual-host: my_vhostlistener:simple:acknowledge-mode: manual
mybatis-plus:typeAliasesPackage: com.xfc.consumer.entitiesmapper-locations: classpath:mapper/*.xml
3.4.2. RabbitMQ配置类
@Configuration
public class RabbitMQConfig {/*** 死信队列* @return*/@Beanpublic FanoutExchange deadExchange() {return new FanoutExchange("dead_xfc_fanout_exchange", true, false);}@Beanpublic Queue deadXfcQueue() {return new Queue("dead.xfc.queue", true);}@Beanpublic Binding bindDeadXfc() {return BindingBuilder.bind(deadXfcQueue()).to(deadExchange());}/*** 队列* @return*/@Beanpublic FanoutExchange fanoutExchange() {return new FanoutExchange("xfc_fanout_exchange", true, false);}@Beanpublic Queue xfcQueue() {Map<String, Object> args = new HashMap<>();args.put("x-dead-letter-exchange", "dead_xfc_fanout_exchange");return new Queue("xfc.queue", true, false, false, args);}@Beanpublic Binding bindXfc() {return BindingBuilder.bind(xfcQueue()).to(fanoutExchange());}
}

上述代码为RabbitMQ配置类,用于在项目初始化时生成相应的交换机和队列。 

3.4.3. 队列消费
@Service
@Slf4j
public class XfcMqConsumer {@RabbitListener(queues = {"xfc.queue"})public void messageconsumer(String message, Channel channel,CorrelationData correlationData,@Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception {MessageConfirmation messageConfirmation=null;try {log.info("收到MQ的消息是: " + message );messageConfirmation= JsonUtil.string2Obj(message, MessageConfirmation.class);/*** 编写业务逻辑*/} catch (Exception e) {e.printStackTrace();log.error("消息投放到死信队列"+e.getMessage(),e);channel.basicNack(tag,false,false);// 死信队列}}
}
3.4.4. 死信队列消费
@Service
@Slf4j
public class DeadMqConsumer {@AutowiredMessageErrorMapper messageErrorMapper;@RabbitListener(queues = {"dead.xfc.queue"})public void messageconsumer(String message, Channel channel,CorrelationData correlationData,@Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception {MessageConfirmation messageConfirmation=null;try {log.info("收到MQ的消息是: " + message );messageConfirmation= JsonUtil.string2Obj(message, MessageConfirmation.class);/*** 编写业务逻辑*/} catch (Exception e) {e.printStackTrace();/*** 写入message_error*/messageErrorMapper.insert(new MessageError(messageConfirmation.getId(),e.getMessage(),new Date()));channel.basicNack(tag,false,false);// 死信队列}}
}

3.5 效果测试

以上代码编写完成后需要进行架构效果测试,其步骤如下:

1. 消息投递测试

上图调用了消息投递接口。

在消息确认表中,新增了一条消息且status=1,代表该条消息已投递成功。

2. 消费者正常消费测试

3. 消费异常测试

上图可看出消息消费异常投入到了死信队列。

在死信队列中依然消费失败。

消费失败后成功写入了日志表。

4. 结语

本文讲解了RabbitMQ应用场景以及在异步处理场景中如何搭建稳定的RabbitMQ架构体系,逐步详细的给出了生产者及消费者端代码并在文章最后对架构效果进行了测试,感兴趣的同学可根据代码进行实操,有疑问和其他见解也可在评论区留言,我看到都会回复。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/6532.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

算法设计与分析——期末1h

目录 第一章 算法的定义 算法的三要素 算法的基本性质 算法的时间复杂度数量级&#xff1a; 第二章 兔子繁殖问题&#xff08;递推法&#xff09; 猴子吃桃问题&#xff08;递推法&#xff09; 穿越沙漠问题&#xff08;递推法&#xff08;倒推&#xff09;&#xff09; 百钱百…

解决Maven本地仓库存在依赖包还需要远程下载的问题

背景 公司有自己maven私服&#xff0c;正在在私服可以使用的情况&#xff0c;打包是没问题的。但是这次是由于公司大楼整体因电路检修而停电&#xff0c;所有服务器关机&#xff0c;包括maven私服服务器。然后当天确有一个包需要打&#xff0c;这个时候发现死活打不了&#xf…

线性数据结构-手写链表-LinkList

为什么需要手写实现数据结构&#xff1f; 其实技术的本身就是基础的积累和搭建的过程&#xff0c;基础扎实 地基平稳 万丈高楼才会久战不衰&#xff0c;做技术能一通百&#xff0c;百通千就不怕有再难得技术了。 一&#xff1a;链表的分类 主要有单向&#xff0c;双向和循环链表…

飞书API(7):MySQL 入库通用版本

一、引入 在上一篇介绍了如何使用 pandas 处理飞书接口返回的数据&#xff0c;并将处理好的数据入库。最终的代码拓展性太差&#xff0c;本篇来探讨下如何使得上一篇的最终代码拓展性更好&#xff01;为什么上一篇的代码拓展性太差呢&#xff1f;我总结了几点&#xff1a; 列…

福布斯AI 50榜单发布!新兴势力颠覆传统,叫板谷歌、微软

整理 | 王轶群 责编 | 唐小引 出品丨AI 科技大本营&#xff08;ID&#xff1a;rgznai100&#xff09; ChatGPT带来的生成式人工智能热浪&#xff0c;促使众多企业争先恐后地试图实现生成式人工智能的最新进展。一个新的帮助企业开发和部署人工智能驱动的应用程序的科技经济体系…

NFS共享存储服务

一、NFS概述 1、简介 NFS是一种基于TCP/IP传输的网络文件系统协议。 NFS 服务的实现依赖于 RPC&#xff08;Remote Process Call&#xff0c;远端过程调用&#xff09;机制&#xff0c;通过使用 NFS 协议&#xff0c;客户机可以像访问本地目录一样访问远程服务器中的共享资源…

BUUCTF——web题目练习

[极客大挑战 2019]LoveSQL 输入1 123 输入1 123 输入2 123 这里可以看出注入位置为password的后面&#xff0c;开始手动注入 闭合方式为1 [极客大挑战 2019]Secret File 查看页面源代码&#xff0c;发现里面有一个跳转页面的连接&#xff0c;点击进去&#xff0c;查看这个…

Llama改进之——SwiGLU激活函数

引言 今天介绍LLAMA模型引入的关于激活函数的改进——SwiGLU1&#xff0c;该激活函数取得了不错的效果&#xff0c;得到了广泛地应用。 SwiGLU是GLU的一种变体&#xff0c;其中包含了GLU和Swish激活函数。 GLU GLU(Gated Linear Units,门控线性单元)2引入了两个不同的线性层…

83、动态规划-打家劫舍

思路&#xff1a; 首先使用递归方式求出最优解。从每个房屋开始&#xff0c;分别考虑偷与不偷两种情况&#xff0c;然后递归地对后续的房屋做同样的决策。这种方法确保了可以找到在不触发警报的情况下可能的最高金额。 代码如下&#xff1a; public static int rob(int[] nu…

【C++】深入剖析C++中的lambda表达式包装器bind

目录 一、lambda表达式 1、引入 2、lambda表达式 3、lambda表达式语法 ​4、lambda 的底层逻辑 二、包装器 1、包装器的表达式 ​ 2、实例化多份 3、可调用对象类型 4、实操例题 三、bind 1、bind 的表达式 2、调整参数的位置 3、绑定参数 一、lambda表达式 1、引…

wpf线程中更新UI的4种方式

在wpf中&#xff0c;更新UI上面的数据&#xff0c;那是必经之路&#xff0c;搞不好&#xff0c;就是死锁&#xff0c;或者没反应&#xff0c;很多时候&#xff0c;都是嵌套的非常深导致的。但是更新UI的方式&#xff0c;有很多的种&#xff0c;不同的方式&#xff0c;表示的意思…

hadoop学习---基于Hive的教育平台数据仓库分析案例(一)

案例背景&#xff1a; 大数据技术的应用可以从海量的用户行为数据中进行挖掘分析&#xff0c;根据分析结果优化平台的服务质量&#xff0c;最终满足用户的需求。教育大数据分析平台项目就是将大数据技术应用于教育培训领域&#xff0c;为企业经营提供数据支撑。 案例数据产生流…

现代循环神经网络(GRU、LSTM)(Pytorch 14)

一 简介 前一章中我们介绍了循环神经网络的基础知识&#xff0c;这种网络 可以更好地处理序列数据。我们在文本数据上实现 了基于循环神经网络的语言模型&#xff0c;但是对于当今各种各样的序列学习问题&#xff0c;这些技术可能并不够用。 例如&#xff0c;循环神经网络在…

使用OpenCV实现图像平移

使用OpenCV实现图像平移 程序流程效果代码 程序流程 读取图像并获取其高度、宽度和通道数。定义平移量tx和ty&#xff0c;并创建平移矩阵M。使用cv2.warpAffine函数对图像进行仿射变换&#xff08;平移&#xff09;&#xff0c;得到平移后的图像。显示平移后的图像。等待用户按…

【副本向】Lua副本逻辑

副本生命周期 OnCopySceneTick() 子线程每次心跳调用 --副本心跳 function x3323_OnCopySceneTick(elapse)if x3323_g_IsPlayerEnter 0 thenreturn; -- 如果没人进入&#xff0c;则函数直接返回endif x3323_g_GameOver 1 thenif x3323_g_EndTick > 0 thenx3323_CountDown…

【SRC-Python】在数字与字母 / 中文与英文之间插入空格的自动化解决方案

文章目录 Part.I IntroductionPart.II 使用方法Chap.I 直接处理字符串Chap.II 处理文件 Part.III Source CodeReference Part.I Introduction 在编辑文本的过程中&#xff0c;尤其是在 COPY 的过程中&#xff0c;经常会遇到如下问题&#xff1a; 源文本数字与英文字母之间没有…

循环神经网络完整实现(Pytorch 13)

一 循环神经网络的从零开始实现 从头开始基于循环神经网络实现字符级语言模型。 %matplotlib inline import math import torch from torch import nn from torch.nn import functional as F from d2l import torch as d2lbatch_size, num_steps 32, 35 train_iter, vocab …

【AI】ONNX

长期更新&#xff0c;建议收藏关注&#xff01; 友情链接 Netron 开放神经网络交换&#xff08;Open Neural Network Exchange&#xff09;简称ONNX,是微软和Facebook提出用来表示深度学习模型的开放格式。所谓开放就是ONNX定义了一组和环境&#xff0c;平台均无关的标准格式…

ASP.NET IIS Express一定vs停止调试,就退出了,如何不退出

》》》 在项目右击属性&#xff0c;找到Web&#xff0c;把启用”编辑并继续“ 复选框 去掉

asp.net结课作业中遇到的问题解决2

目录 1、如何实现评论交流的界面 2、如果想要将文字添加到数据库中&#xff0c;而不是乱码&#xff0c;该怎么修改 3、如果想要添加的数据已经存在于数据库&#xff0c;就不允许添加了&#xff0c;该如何实现 4、想要实现某个模块下有好几个小的功能该如何实现 5、想要实现…