RabbitMQ 实现分组消费满足服务器集群部署

实现思路

  1. 使用扇出交换机(Fanout Exchange):扇出交换机会将消息广播到所有绑定的队列,确保每个消费者组都能接收到相同的消息。
  2. 为每个消费者组创建独立的队列:每个消费者组拥有自己的队列,所有属于该组的消费者都订阅这个队列。
  3. 确保同一组内的消费者竞争消费:RabbitMQ 会将消息推送给组内第一个可用的消费者,确保同一消息不会被同一组中的多个消费者处理。

示例场景

假设我们有两个消费者组:GroupAGroupB。我们希望发送的消息能够同时被 GroupAGroupB 消费,但每个组内的多个消费者只会有一个成员消费该消息。

Spring Boot + RabbitMQ 广播式分组消费示例

1. 引入依赖

首先,在 pom.xml 中添加 RabbitMQ 和 Spring AMQP 的依赖:

<dependencies><!-- Spring Boot Starter for RabbitMQ --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><!-- 其他依赖... -->
</dependencies>
2. 配置 RabbitMQ 交换机和队列

application.yml 中配置 RabbitMQ 的交换机、队列和绑定关系。使用 fanout 交换机,并为每个消费者组创建一个独立的队列。

spring:rabbitmq:host: localhostport: 5672username: guestpassword: guest# RabbitMQ 配置rabbitmq:listener:simple:acknowledge-mode: manual  # 手动确认消息concurrency: 5           # 每个队列的并发消费者数量max-concurrency: 10      # 最大并发消费者数量# 自定义队列和交换机配置rabbitmq:queues:group-a-queue:name: group_a_queuedurable: truegroup-b-queue:name: group_b_queuedurable: trueexchanges:fanout-exchange:name: fanout_exchangetype: fanoutdurable: truebindings:group-a-binding:exchange: fanout_exchangequeue: group_a_queuegroup-b-binding:exchange: fanout_exchangequeue: group_b_queue
3. 创建 RabbitMQ 配置类

创建一个配置类来声明交换机、队列和绑定关系。Spring AMQP 会自动根据配置创建这些资源。

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitMQConfig {// 定义扇出交换机@Beanpublic FanoutExchange fanoutExchange() {return new FanoutExchange("fanout_exchange", true, false);}// 定义 Group A 队列@Beanpublic Queue groupAQueue() {return new Queue("group_a_queue", true);}// 定义 Group B 队列@Beanpublic Queue groupBQueue() {return new Queue("group_b_queue", true);}// 绑定 Group A 队列到扇出交换机@Beanpublic Binding groupABinding(Queue groupAQueue, FanoutExchange fanoutExchange) {return BindingBuilder.bind(groupAQueue).to(fanoutExchange);}// 绑定 Group B 队列到扇出交换机@Beanpublic Binding groupBBinding(Queue groupBQueue, FanoutExchange fanoutExchange) {return BindingBuilder.bind(groupBQueue).to(fanoutExchange);}
}
4. 发送消息

创建一个服务类来发送消息到扇出交换机。由于 fanout 交换机不使用路由键,它会将消息广播到所有绑定的队列。

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;@Service
public class MessageProducer {private final RabbitTemplate rabbitTemplate;@Autowiredpublic MessageProducer(RabbitTemplate rabbitTemplate) {this.rabbitTemplate = rabbitTemplate;}// 发送消息到扇出交换机public void sendMessage(String message) {System.out.println("Sending message: " + message);rabbitTemplate.convertAndSend("fanout_exchange", "", message);}
}
5. 创建消费者

为每个消费者组创建单独的消费者类。每个消费者组内的多个消费者会竞争性地从队列中消费消息。使用 @RabbitListener 注解来监听队列中的消息。

Group A 消费者
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
public class GroupAConsumer {@RabbitListener(queues = "group_a_queue")public void consumeMessage(String message) {System.out.println("Group A consumer received: " + message);// 处理 Group A 的逻辑}
}
Group B 消费者
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
public class GroupBConsumer {@RabbitListener(queues = "group_b_queue")public void consumeMessage(String message) {System.out.println("Group B consumer received: " + message);// 处理 Group B 的逻辑}
}
6. 启动应用程序并测试

启动 Spring Boot 应用程序后,GroupAConsumerGroupBConsumer 会分别监听 group_a_queuegroup_b_queue。通过以下方式测试消息的发送和消费:

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;@Component
public class TestRunner implements CommandLineRunner {private final MessageProducer messageProducer;@Autowiredpublic TestRunner(MessageProducer messageProducer) {this.messageProducer = messageProducer;}@Overridepublic void run(String... args) throws Exception {// 发送一条消息messageProducer.sendMessage("This is a broadcast message");}
}

7. 运行结果
当你启动应用程序时,TestRunner 会发送一条消息到 fanout_exchange。由于 fanout 交换机会将消息广播到所有绑定的队列,因此 group_a_queue 和 group_b_queue 都会接收到这条消息。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/63525.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

开发 UEFI 驱动

服务型驱动的特点&#xff1a; 1&#xff09;在 Image 的入口函数中执行安装&#xff1b; 2&#xff09;服务型驱动不需要驱动特定硬件&#xff0c;可以安装到任意控制器上&#xff1b; 3&#xff09;没有提供卸载函数。 一个设备 / 总线驱动程序在安装时首先要找到对应的硬件…

《Python密码算法例程》

《Python密码算法例程》 一、哈希算法&#xff08;以MD5为例&#xff0c;虽然MD5安全性存在缺陷&#xff0c;但常用于简单示例&#xff09;1. 计算字符串的MD5哈希值2. 更安全的哈希算法 - SHA - 256 二、对称加密算法&#xff08;以AES为例&#xff09;1. AES加密和解密简单示…

java时间处理SimpleDateFormat详解

文章目录 常用构造函数日期格式模式常见用法1. 格式化日期2. 解析日期字符串 注意事项示例扩展&#xff1a;指定区域和时区 SimpleDateFormat 是 Java 中用于日期和时间格式化的类&#xff0c;属于 java.text 包。它允许开发者将日期对象格式化为字符串&#xff0c;或者将字符…

夜莺运维指南之故障自愈

注意: 夜莺v7版本已内置故障自愈, 只需要更给n9e下的config.yaml 文件有关ibex配置即可 所谓的告警自愈&#xff0c;典型手段是在告警触发时自动回调某个 webhook 地址&#xff0c;在这个 webhook 里写告警自愈的逻辑&#xff0c;夜莺默认支持这种方式。另外&#xff0c;夜莺还…

【oracle】大数据删除插入

文章目录 引言本文目标 Oracle大数据插入操作插入操作的场景和需求使用并行查询进行数据插入示例代码&#xff1a;创建新表并插入数据解释代码中的关键点 性能优化建议 Oracle大数据删除操作删除操作的场景和需求使用游标和批量处理进行数据删除示例代码&#xff1a;批量删除数…

深入浅出:序列化与反序列化的全面解析

文章目录 1. 引言2. 什么是序列化&#xff1f;2.1 为什么需要序列化&#xff1f; 3. 什么是反序列化&#xff1f;3.1 反序列化的重要性 4. 序列化与反序列化的实现4.1 JSON (JavaScript Object Notation)4.2 XML (eXtensible Markup Language)4.3 Protocol Buffers (Protobuf)4…

Windows命令行使用技巧(持续更新)

删除指定目录下指定后缀的文件 重要的事情说在前面&#xff1a;不能恢复&#xff0c;谨慎操作 今天大意了&#xff0c;导出sql文件的时候没指定目录&#xff0c;默认放到桌面上了&#xff0c;看着桌面上密密麻麻的sql文件&#xff0c;我人傻了&#xff0c;一个一个删不是办法…

LDR6500:音频双C支持,数字与模拟的完美结合

在当今数字化快速发展的时代&#xff0c;音频设备的兼容性和性能成为了用户关注的重点。LDR6500&#xff0c;作为乐得瑞科技精心研发的USB Power Delivery&#xff08;PD&#xff09;协议芯片&#xff0c;凭借其卓越的性能和广泛的应用兼容性&#xff0c;为音频设备领域带来了新…

python rstrip 的迷惑行为

在项目中&#xff0c;我需要把字符串末尾的一部分去掉&#xff0c;类似截断&#xff0c;我用ide的随笔提示&#xff0c;发现了rstrip这个方法&#xff0c;然后我试了下&#xff0c;满足我的需求&#xff0c;但在测试过程中&#xff0c;我发现了rstrip的一些行为很让我迷惑。 开…

计算机网络编程(Linux):I/O多路转接之 select,poll

I/O多路复用&#xff08;I/O Multiplexing&#xff09;是一种高效的网络编程技术&#xff0c;允许一个线程同时监控多个文件描述符的状态&#xff0c;当某个文件描述符就绪时进行相应处理。这种技术在高并发服务器中广泛使用。本文将介绍I/O多路复用的核心概念及在Linux中的实现…

【原生js案例】webApp实现鼠标移入移出相册放大缩小动画

图片相册这种动画效果也很常见&#xff0c;在我们的网站上。鼠标滑入放大图片&#xff0c;滑出就恢复原来的大小。现在我们使用运动定时器来实现这种滑动效果。 感兴趣的可以关注下我的系列课程【webApp之h5端实战】&#xff0c;里面有大量的css3动画效果制作原生知识分析&…

Spring Boot助力,一键解锁招聘全流程信息精细化管理

2系统相关技术 2.1 Java语言介绍 Java是由SUN公司推出&#xff0c;该公司于2010年被oracle公司收购。Java本是印度尼西亚的一个叫做爪洼岛的英文名称&#xff0c;也因此得来java是一杯正冒着热气咖啡的标识。Java语言在移动互联网的大背景下具备了显著的优势和广阔的前景&#…

深入理解 NumPy 广播机制:从基础到应用

目录 什么是广播机制&#xff1f;广播机制的规则广播机制示例1. 一维数组与标量运算2. 二维数组与一维数组运算3. 维度不同的数组运算4. 广播失败的情况 广播机制的实际应用场景1. 数据归一化2. 批量计算欧氏距离 总结广播机制的核心要点&#xff1a; 在使用 NumPy 进行数组操作…

Day28两个数组的交集

给定两个数组 nums1 和 nums2 &#xff0c;返回 它们的 交集 。输出结果中的每个元素一定是 唯一 的。我们可以 不考虑输出结果的顺序 。 class Solution{public int[] intersection(int[] nums1, int[] nums2) {Set<Integer> set new HashSet<>();for (int i :…

VRRP的知识点总结及实验

1、VRRP VRRP(Virtual Router Redundancy Protocol&#xff0c;虚拟路由器冗余协议)既能够实现网关的备份&#xff0c;又能解决多个网关之间互相冲突的问题&#xff0c;从而提高网络可靠性。 2、VRRP技术概述&#xff1a; 通过把几台路由设备联合组成一台虚拟的“路由设备”…

DP协议:概括

来了来了&#xff01;&#xff01;&#xff01; 开始之前扯点概念&#xff0c;知道DP好在哪里&#xff0c;以及看到它的发展趋势&#xff0c;才知道我们为什么有学习的必要。 DP的优势 DisplayPort&#xff08;DP&#xff09;协议作为一种专为数字音频和视频传输设计的高速串行…

Ant Design Vue 中 Tree 组件复选框修改样式

一、问题 最近需要实现一个业务需求&#xff0c;要修改勾选框中的颜色&#xff0c;默认勾选框的颜色是蓝色&#xff0c;现在需要变成绿色。 1、官网示例&#xff1a; 2、业务需求&#xff1a; 3、具体实现&#xff1a; HTML 部分代码 <template><div class"s…

【JavaWeb后端学习笔记】登录校验(JWT令牌技术、Interceptor拦截器、Filter过滤器)

登录校验 1、JWT令牌技术1.1 JWT令牌介绍1.2 Java代码生成与校验JWT令牌 2、Filter过滤器2.1 Filter过滤器的简单实现2.2 配置拦截路径2.3 Filter接口中的三个方法&#xff1a;2.4 Filter过滤器登录校验2.5 过滤器链 3、Interceptor拦截器3.1 拦截器(Interceptor)的简单实现3.2…

SpringBoot的Bean类三种注入方式(附带LomBok注入)

SpringBoot的Bean类三种注入方式&#xff08;附带LomBok注入&#xff09; 在 Spring Boot 中&#xff0c;Bean 的注入方式主要包括构造函数注入&#xff08;Constructor Injection&#xff09;、字段注入&#xff08;Field Injection&#xff09;以及 Setter 方法注入&#xf…

Linux系统下常用资源查看

一、查看CPU使用率 top 命令 top命令可以看到总体的系统运行状态和cpu的使用率 。 %us&#xff1a;表示用户空间程序的cpu使用率&#xff08;没有通过nice调度&#xff09; %sy&#xff1a;表示系统空间的cpu使用率&#xff0c;主要是内核程序。 %ni&#xff1a;表示用户空间且…