基于RabbitMQ的消息监听器

1. 背景

机构的新增、更新、删除在微服务A中已经完成了(微服务A已经部署,不能修改代码),如果在微服务A中对机构进行新增、更新、删除操作后,需要同步到自己的微服务B中,这里采用MQ消息通知的方式实现。

微服务A中配置如下:

消息发往的交换机为:itcast-auth,交换机的类型为:topic

发送消息的规则如下:

● 消息为json字符串○ 如:{"type":"ORG","content":[{"managerId":"1","parentId":"0","name":"测试组织","id":"973902113476182273","status":true}],"operation":"UPDATE"}
● type表示变更的对象,比如组织:ORG 
● content为更改对象列表
● operation类型列表○ 新增-ADD○ 修改-UPDATE○ 删除-DEL

2. 消息监听器

/*** 对于微服务A消息的处理*/
@Slf4j
@Component
public class AuthMQListener {@RabbitListener(bindings = @QueueBinding(value = @Queue(name = Constants.MQ.Queues.AUTH_TRANSPORT),exchange = @Exchange(name = "${rabbitmq.exchange}", type = ExchangeTypes.TOPIC),key = "#"))public void listenAgencyMsg(String msg) {//{"type":"ORG","operation":"ADD","content":[{"id":"977263044792942657","name":"55","parentId":"0","managerId":null,"status":true}]}log.info("接收到消息 -> {}", msg);JSONObject jsonObject = JSONUtil.parseObj(msg);String type = jsonObject.getStr("type");if (!StrUtil.equalsIgnoreCase(type, "ORG")) {//非机构消息return;}String operation = jsonObject.getStr("operation");JSONObject content = (JSONObject) jsonObject.getJSONArray("content").getObj(0);String name = content.getStr("name");Long parentId = content.getLong("parentId");// 。。。消息处理。。。}}

2.1 标记监听器

@RabbitListener(bindings = @QueueBinding(value = @Queue(name = Constants.MQ.Queues.AUTH_TRANSPORT),exchange = @Exchange(name = "${rabbitmq.exchange}", type = ExchangeTypes.TOPIC),key = "#"
))
public void listenAgencyMsg(String msg) {
  • @RabbitListener:标记该方法为RabbitMQ的消息监听器。它会监听指定的队列并处理收到的消息。
  • @QueueBinding:将队列与交换机绑定。
    • @Queue(name = Constants.MQ.Queues.AUTH_TRANSPORT):指定要监听的队列的名称,这里在常量类里定义了。
    • @Exchange(name = "${rabbitmq.exchange}", type = ExchangeTypes.TOPIC):指定交换机的名称和类型(Topic),这里在常量类里定义了,与微服务A中配置相同。
    • key = "#":路由键,这里#表示匹配所有路由键。
  • listenAgencyMsg(String msg):当消息队列接收到消息时,会调用这个方法,并将消息内容传递进来。

2.2 消息解析

log.info("接收到消息 -> {}", msg);
JSONObject jsonObject = JSONUtil.parseObj(msg);
String type = jsonObject.getStr("type");
if (!StrUtil.equalsIgnoreCase(type, "ORG")) {//非机构消息return;
}
String operation = jsonObject.getStr("operation");
JSONObject content = (JSONObject) jsonObject.getJSONArray("content").getObj(0);
String name = content.getStr("name");
Long parentId = content.getLong("parentId");
  • log.info("接收到消息 -> {}", msg):记录接收到的消息日志。
  • JSONObject jsonObject = JSONUtil.parseObj(msg):将消息字符串解析为JSON对象。
  • String type = jsonObject.getStr("type"):从消息中提取type字段。
  • if (!StrUtil.equalsIgnoreCase(type, "ORG")) { return; }:判断消息类型是否为“ORG”,如果不是,直接返回不做处理。
  • 提取operation字段:操作类型(如ADD、UPDATE、DEL)。
  • 提取content内容:content字段是一个数组,这里取第一个对象。
  • 提取name字段:表示机构的名称。
  • 提取parentId字段:表示父机构的ID。

3. RabbitMQ介绍

RabbitMQ是一种广泛使用的消息队列(Message Queue)系统,它基于AMQP(Advanced Message Queuing Protocol)协议,用于在不同的系统或组件之间传递消息。通过消息队列,系统可以实现解耦、异步处理、负载均衡等特性,从而提高系统的可扩展性和可靠性。

3.1 RabbitMQ的核心概念

  1. 生产者(Producer)

    • 生产者是消息的发送方。它负责将消息发送到RabbitMQ的交换机中。
  2. 消费者(Consumer)

    • 消费者是消息的接收方。它从RabbitMQ的队列中获取并处理消息。
  3. 队列(Queue)

    • 队列是RabbitMQ内部存储消息的地方。消息从生产者发送到队列中,消费者从队列中获取消息。队列类似于一个消息的存储池。
  4. 交换机(Exchange)

    • 交换机负责接收生产者发送的消息,并根据一定的路由规则将消息路由到一个或多个队列。交换机有不同的类型,常见的有:
      • Direct Exchange:直接交换机,根据消息的路由键精确匹配队列。
      • Fanout Exchange:扇出交换机,不考虑路由键,直接将消息广播到所有绑定的队列中。
      • Topic Exchange:主题交换机,根据路由键的模式匹配(使用通配符)将消息路由到一个或多个队列。
      • Headers Exchange:头交换机,通过消息的头部属性来路由消息。
  5. 路由键(Routing Key)

    • 路由键是生产者在将消息发送到交换机时指定的一个字符串。交换机会根据这个字符串决定将消息路由到哪个队列。
  6. 绑定(Binding)

    • 绑定是交换机和队列之间的连接关系。通过绑定,可以将交换机和队列关联起来,并通过路由键决定消息的流向。

3.2 消息的生命周期

  1. 生产者发送消息

    • 生产者将消息发送到交换机,并指定一个路由键。
  2. 交换机路由消息

    • 交换机根据路由键和绑定规则,将消息路由到一个或多个队列中。
  3. 消费者接收消息

    • 消费者从队列中取出消息并进行处理。处理完成后,消费者可以向RabbitMQ发送一个确认消息(ACK),告知RabbitMQ该消息已成功处理。
  4. 消息确认与重试

    • 如果消费者处理消息失败,可以选择不发送确认消息,RabbitMQ会将消息重新放回队列,等待其他消费者处理,或进行重试。

3.3 RabbitMQ的常见使用场景

  1. 解耦

    • 在复杂系统中,各个组件之间可能有很强的依赖性。通过消息队列,生产者和消费者可以实现解耦,生产者只需将消息发送到队列,不需要关心谁会处理这些消息。
  2. 异步处理

    • 有些任务可能是耗时操作,例如生成报告、图片处理等。通过消息队列,系统可以将这些耗时操作异步处理,不会阻塞主流程。
  3. 负载均衡

    • RabbitMQ可以将消息分发给多个消费者,从而实现负载均衡。即使流量高峰期,消息处理也不会成为系统瓶颈。
  4. 消息广播

    • 通过Fanout Exchange,可以实现消息广播,将同一消息同时发送给多个队列,让多个系统或服务同时收到消息并处理。

4. 总结

这段代码的主要作用是通过监听RabbitMQ消息队列,处理微服务A中与机构相关的消息。在微服务B中通过解析消息内容,动态确定消息的类型和需要执行的操作,并调用相应的服务处理该消息。这种设计可以有效地处理异步消息,并将业务逻辑与消息队列解耦。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/50839.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

如何建立一个既能快速记录又易于回顾的笔记系统?

在快节奏的学习和工作中,能够快速记录和回顾信息变得尤为重要。尤其对于编程学习者来说,构建一个高效、有序的笔记系统不仅可以提高学习效率,还能帮助我们在未来轻松回溯知识要点。本文将详细探讨如何打造一个既快速记录又易于回顾的笔记系统…

打卡第48天------单调栈

今天正式开始单调栈,开启新的篇章了,那个动态规划真的太难了。不过卡尔总结的很全面,真的是收获不少呀。小坚持带来大改变。加油✊ 1. 每日温度 leetcode题目链接:739. 每日温度 题目描述: 给定一个整数数组 temperat…

使用EasyExcel填充Excel并上传至OSS

在企业级应用中,经常需要处理数据的导入导出功能。Excel作为最常用的数据交换格式之一,其自动化处理尤为重要。本文将介绍如何使用EasyExcel库来填充Excel模板,并将生成的文件上传到对象存储服务(OSS)。 EasyExcel简介…

Python 异步爬虫:高效数据抓取的现代武器

标题:“Python 异步爬虫:高效数据抓取的现代武器” 在当今信息爆炸的时代,网络爬虫已成为数据采集的重要工具。然而,传统的同步爬虫在处理大规模数据时往往效率低下。本文将深入探讨如何使用 Python 实现异步爬虫,以提…

HTTP?HTTPS?HTTP2.0

Http HTTP(HyperText Transfer Protocol,超文本传输协议)是一种用于分布式、协作式、超媒体信息系统的应用层协议。它基于TCP/IP通信协议来传递数据,如HTML文件、图片文件等。以下是HTTP的详细解析: 一、HTTP的基本…

YAML在Spring Boot中的应用

1. 基本语法 YAML使用缩进来表示层级关系,通常使用空格进行缩进(推荐使用2个空格)。 基本语法示例: key: value nested:key: value list:- item1- item22. 配置文件命名 在Spring Boot中,YAML配置文件通常命名为 a…

手撕C++入门基础

1.C介绍 C课程包括:C语法、STL、高阶数据结构 C参考文档:Reference - C Reference C 参考手册 - cppreference.com cppreference.com C兼容之前学习的C语言 2.C的第一个程序 打印hello world #define _CRT_SECURE_NO_WARNINGS 1 // test.cpp // …

RDKit在数据科学中的应用|药物筛选中的数据清理与标准化

在化学信息学和药物研发的过程中,分子数据的质量至关重要。数据清理与标准化是确保分子库数据一致性、可靠性和可比较性的关键步骤。RDKit 提供了丰富的工具,帮助用户清理和标准化分子数据,从而提高下游分析和建模的准确性。 1 数据清理的重要性 分子数据通常来自多种来源…

获取操作系统的信息(Go语言)

在 Go 语言中,你可以使用 runtime 和 os 包来查看操作系统的信息。以下是一些常见的操作系统信息获取方法: 1. 获取操作系统类型和架构信息 Go 的 runtime 包提供了基本的操作系统和架构信息: package mainimport ("fmt""r…

c_cpp_properties.json、launch.json、 tasks.json

在 Visual Studio Code 中,c_cpp_properties.json、launch.json 和 tasks.json 是三个重要的配置文件,它们的作用如下: c_cpp_properties.json: 这个文件用于配置 C/C 扩展的 IntelliSense、编译器路径和包括路径等。它帮助 VS Co…

Unity Dots学习 (一)

先学习怎么使用,再研究底层代码。Dots大家都有所耳闻。一直没时间研究,最近研究一下 看上图可知,哪怕是CPU的第三级缓存也比内存要快2-5倍。 资料: 《DOTS之路》第零节——前导课(1)——DOTS的5W1H问题_哔哩哔哩_bilibili 《DOT…

快速搭建全向轮小车

总体介绍 使用两块ordive控制ros-mobile app进行控制,odrive通过python可以轻松控制,ros-mobile可以进行与电脑的ros连接充当一个遥控器。 记录代码 读取rosmobile的遥控数据 #!/usr/bin/env python3import threading import time from queue import…

景联文科技:图像标注的类型有哪些?

图像标注是计算机视觉领域中一个非常重要的步骤,它是创建训练数据集的关键组成部分,主要用于帮助机器学习算法理解图像内容。 以下是图像标注的一些主要类型: 1. 边界框标注: • 这是最常见的标注方式之一,通常用于…

多字段聚合查询在Elasticsearch中的实践

Elasticsearch是一个功能强大的搜索引擎,它不仅支持全文搜索,还提供了丰富的聚合功能。聚合可以帮助我们对数据进行分组和统计,从而得到有意义的分析结果。本文将通过Java代码示例,介绍如何在Elasticsearch中实现多字段的聚合查询…

第四范式发布AI+5G视频营销产品 助力精准获客与高效转化

产品上新 Product Release 今天,第四范式AI5G视频电话互动营销产品全新发布。 相较于以往销效率低、互动差、转化差的传统电话外呼和短信营销方式,视频电话互动营销基于AI、5G等技术,可让用户接听电话时观看个性化视频广告并实时互动&#xf…

Unity的UI设计

目录 创建和布局 布局与交互 性能优化 最佳实践 学习资源 Unity UI Toolkit与uGUI和IMGUI之间的具体区别和适用场景是什么? Unity UI Toolkit uGUI IMGUI 如何在Unity中实现响应式UI设计以适应不同设备尺寸? Unity UI性能优化的最新技术和方法…

机器学习:逻辑回归算法实现鸢尾花预测和银行数据处理

1、鸢尾花预测 1、特征选择 2、对特征处理 trainpd.read_excel("鸢尾花训练数据.xlsx") testpd.read_excel("鸢尾花测试数据.xlsx") x_traintrain[["萼片长(cm)","萼片宽(cm)","花瓣长(cm)","花瓣宽(cm)"]] y_tr…

Vue 生命周期详解含demo、面试常问问题案例

Vue 生命周期详解、面试常问问题案例 含 demo 文章目录 Vue 生命周期详解、面试常问问题案例 含 demo一、Vue 生命周期是什么二、Vue 中如何使用生命周期钩子1. **beforeCreate**2. **created**3. **beforeMount**4. **mounted**5. **beforeUpdate**6. **updated**7. **beforeD…

Grafana学习笔记

介绍 Grafana 1.1 什么是 Grafana? Grafana 是一个开源的数据可视化和监控平台,专门设计用于从各种数据源中收集和展示数据。它最初作为一个图表生成工具,用于显示时间序列数据,但已经发展成一个功能强大且灵活的仪表板工具&…

Leetcode 3259. Maximum Energy Boost From Two Drinks

Leetcode 3259. Maximum Energy Boost From Two Drinks 1. 解题思路2. 代码实现 题目链接:3259. Maximum Energy Boost From Two Drinks 1. 解题思路 这一题就是一个动态规划,分别考察下两个数列的选择即可。 2. 代码实现 给出python代码实现如下&a…