RabbitMQ之消费者并发消费

为什么要引入消费者的并发消费?
当生产者的推送速度是远远超过消费者的能力的,可以提高消费者的消费速度。比如在java中我们可以启动多个 JVM 进程,实现多进程的并发消费,从而加速消费的速度,在mq中也可以通过设置配置。
@RabbitListener 注解中,有 concurrency 属性,它可以指定并发消费的线程数。
下面演示消费者并发消费实现
配置文件yml

spring:rabbitmq:#host为一般模式 若集群模式 将key换成addresses的形式host: 192.168.9.104port: 5672#账号密码自行替换username: adminpassword: adminlistener:# 选择的 ListenerContainer 的类型。默认为 simple 类型type: simplesimple:# 每个 @ListenerContainer 的并发消费的线程数concurrency: 2# 每个 @ListenerContainer 允许的并发消费的线程数max-concurrency: 10 

rabbitmq.listener.type 的枚举值可以参考 ContainerType

SIMPLE 对应 SimpleMessageListenerContainer 消息监听器容器。它一共有两类线程:

Consumer 线程,负责从 RabbitMQ Broker 获取 Queue 中的消息,存储到内存中的 BlockingQueue 阻塞队列中。
Listener 线程,负责从内存中的 BlockingQueue 获取消息,进行消费逻辑

==================================》配置类代码
@Configuration
public class DirectExchangeConfiguration {/*** 创建一个 Queue** @return Queue*/@Beanpublic Queue queue08() {// Queue:名字 | durable: 是否持久化 | exclusive: 是否排它 | autoDelete: 是否自动删除return new Queue(Message08.QUEUE,true,false,false);}/*** 创建 Direct Exchange** @return DirectExchange*/@Beanpublic DirectExchange exchange08() {// name: 交换机名字 | durable: 是否持久化 | exclusive: 是否排它return new DirectExchange(Message08.EXCHANGE,true,false);}/*** 创建 Binding* Exchange:Message08.EXCHANGE* Routing key:Message08.ROUTING_KEY* Queue:Message08.QUEUE** @return Binding*/@Beanpublic Binding binding08() {return BindingBuilder.bind(queue08()).to(exchange08()).with(Message08.ROUTING_KEY);}
========================》生产者代码
@Component
public class Producer08 {@Resourceprivate RabbitTemplate rabbitTemplate;public void syncSend(String id, String routingKey) {// 创建 Message08 消息Message08 message = new Message08();message.setId(id);// 同步发送消息rabbitTemplate.convertAndSend(Message08.EXCHANGE, routingKey, message);}
}
===========================》消费者代码
@Component
// 开启并发消费
@RabbitListener(queues = Message08.QUEUE, concurrency = "2")
@Slf4j
public class Consumer08 {@RabbitHandlerpublic void onMessage(Message08 message) throws InterruptedException {log.info("[{}][Consumer08 onMessage][线程编号:{} 消息内容:{}]", LocalDateTime.now(), Thread.currentThread().getId(), message);// 模拟消费耗时,为了让并发消费效果更好的展示TimeUnit.SECONDS.sleep(1);}
}
@ResourceProducer08 producer08;@Testvoid mock() throws InterruptedException {TimeUnit.SECONDS.sleep(20);}@SneakyThrows@Testvoid syncSend() {// 循环发送十个,观察消费者情况for (int i = 0; i < 10; i++) {String id = UUID.randomUUID().toString();producer08.syncSend(id, Message08.ROUTING_KEY);}log.info("[{}][test producer08 syncSend] 发送成功", LocalDateTime.now());// 这里多睡一会,确保消息全部消费完成TimeUnit.SECONDS.sleep(10);}

以上的是消费者并发消费实现的代码 若不了解rabbitmq的基本使用 建议先看看我前面对应的文章 文章链接:点我—>let’s go
若需完整代码 可识别二维码后 给您发代码。
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/7963.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

如何使用Python创建服务器向Android设备发送GCM推送通知

简介 推送通知可以让您的 Android 应用在用户不使用应用时通知用户发生的事件。本教程的目标是向您的应用发送一个简单的推送通知。我们将在服务器上使用 Ubuntu 14.04 和 Python 2.7&#xff0c;以及 Google Cloud Messaging 作为推送通知服务。 我们将使用术语 服务器 来指…

leetcode83-Remove Duplicates from Sorted List

题目 给定一个已排序的链表的头 head &#xff0c; 删除所有重复的元素&#xff0c;使每个元素只出现一次 。返回 已排序的链表 。 示例 1&#xff1a; 输入&#xff1a;head [1,1,2] 输出&#xff1a;[1,2] 示例 2&#xff1a; 输入&#xff1a;head [1,1,2,3,3] 输出&…

鸿蒙内核源码分析(进程通讯篇) | 九种进程间通讯方式速揽

进程间为何要通讯 ? 鸿蒙内核默认支持 64个进程和128个任务&#xff0c;由进程池和任务池统一管理.内核设计尽量不去打扰它们&#xff0c;让各自过好各自的日子&#xff0c; 但大家毕竟在一口锅里吃饭&#xff0c; 不可能不与外界联系&#xff0c; 联系就得有渠道&#xff0c…

MySQL VARCHAR 最佳长度评估实践

你的 VARCHAR 长度合适么&#xff1f; 作者&#xff1a;官永强&#xff0c;爱可生 DBA 团队成员&#xff0c;擅长 MySQL 运维方面的技能。热爱学习新知识&#xff0c;亦是个爱打游戏的宅男。 作者&#xff1a;李富强&#xff0c;爱可生 DBA 团队成员&#xff0c;熟悉 MySQL&…

Go微服务精讲:Go-Zero全流程实战即时通讯(超清)

go-zero 是一个集成了各种工程实践的 web 和 rpc 框架。通过弹性设计保障了大并发服务端的稳定性&#xff0c;经受了充分的实战检验。 Go微服务精讲&#xff1a;Go-Zero全流程实战即时通讯(超清) go-zero 中的 api&#xff0c;rpc&#xff0c;数据库等涉及的代码&#xff0c;…

C#标签设计打印软件开发

1、新建自定义C#控件项目Custom using System; using System.Collections.Generic; using System.Text;namespace CustomControls {public class CommonSettings{/// <summary>/// 把像素换算成毫米/// </summary>/// <param name"Pixel">多少像素…

Springboot 集成 Consul 实现服务注册中心-05

因为后续很多模块都要用到注册中心&#xff0c;所以此处先实现此模块。 Consul简介 Consul是一个开源的服务发现和配置管理工具&#xff0c;具有跨平台、运行高效等特点。它由HashiCorp公司开发&#xff0c;并使用Go语言编写。Consul主要用于实现分布式系统中的服务发现、健康…

解决Node.js mysql客户端不支持认证协议引发的“ER_NOT_SUPPORTED_AUTH_MODE”问题

这是一个版本问题 我用koa2和mysql2链接就没有问题 不知道这个老项目运行为啥有这个问题 解决方案 打开mysql运行这个两个命令&#xff1a; ALTER USER rootlocalhost IDENTIFIED WITH mysql_native_password BY 123321; FLUSH PRIVILEGES; 须知(给小白看的&#xff01;) …

搜维尔科技:Senseglove Nova 数据手套触觉反馈测试

Senseglove Nova 数据手套触觉反馈测试 搜维尔科技&#xff1a;Senseglove Nova 数据手套触觉反馈测试

PCB板上的Mark点

PCB生产中Mark点设计 1.pcb必须在板长边对角线上有一对应整板定位的Mark点,板上集成电路引脚中心距小于0.65mm的芯片需在集成电路长边对角线上有一对对应芯片定位的Mark点;pcb双面都有贴片件时,则pcb的两面都按此条加Mark点。 2.pcb边需留5mm工艺边(机器夹持PCB最小间距要求…

优立科技:从数字孪生到元宇宙

2021年10月&#xff0c;Facebook致力于发展元宇宙&#xff0c;并更名为Meta。加上此前Roblox上市的消息&#xff0c;让「元宇宙」一词迅速在2021年爆火&#xff0c;刺激了VR、游戏等相关产业的发展&#xff0c;甚至有人宣称2021年为「元宇宙『元年』」。然而&#xff0c;「元年…

YOLOv8的训练、验证、预测及导出[目标检测实践篇]

这一部分内容主要介绍如何使用YOLOv8训练自己的数据集&#xff0c;并进行验证、预测及导出&#xff0c;采用代码和指令的两种方式&#xff0c;参考自官方文档&#xff1a;Detect - Ultralytics YOLOv8 Docs。实践篇不需要关注原理&#xff0c;只需要把流程跑通就行&#xff0c;…

信号产生的五种方式

文章目录 正文前的知识准备kill 命令查看信号man手册查看信号信号的处理方法 认识信号产生的5种方式1. 工具2. 键盘3. 系统调用kill 向任意进程发送任意信号raise 给调用方发送任意信号abort 给调用方发送SIGABRT信号 4. 软件条件5. 异常 正文前的知识准备 kill 命令查看信号 …

Boost的日志库Log使用详解

项目中使用到了Boost日志库&#xff0c;今日来探索一下&#xff1a; #include"boost/log/trivial.hpp" #include"boost/log/sinks/text_file_backend.hpp" #include"boost/log/utility/setup/file.hpp" #include"boost/log/utility/setup/…

构建 WebRTC 一对一信令服务器

构建 WebRTC 一对一信令服务器 构建 WebRTC 一对一信令服务器前言为什么选择 Nodejs&#xff1f;Nodejs 的基本原理浏览器使用 Nodejs安装 Nodejs 和 NPMsocket.io信令服务器搭建信令服务器客户端服务端启动服务器并测试 总结参考 构建 WebRTC 一对一信令服务器 前言 我们在学…

【Node.js从基础到高级运用】二十八、Node.js 内存管理浅析

Node.js 作为一个基于 Chrome V8 引擎的 JavaScript 运行环境&#xff0c;其性能和效率在很大程度上取决于内存管理的优劣。 1. Node.js 内存结构 在深入了解内存管理之前&#xff0c;我们需要先了解 Node.js 的内存结构。Node.js 的内存可以大致分为以下几个部分&#xff1a;…

Jmeter页面汉化和字体显示过小调整

在频繁解压使用Jmeter的时候&#xff0c;经常会遇到需要将页面的英文调整为中文&#xff0c;页面文字和编辑区域内容文字显示较小的问题&#xff0c;记录一下方便以后查阅。 1.页面汉化 Jmeter在解压启动之后页面显示是英文&#xff0c;如果需要修改为中文&#xff0c;可以修改…

uniapp的app端软件更新弹框

1&#xff1a;使用html PLUS实现&#xff1a;地址HTML5 API Reference (html5plus.org)&#xff0c;效果图 2&#xff1a;在app.vue的onLaunch生命周期中&#xff0c;代码如下&#xff1a; onLaunch: function() {let a 0let view new plus.nativeObj.View(maskView, {backg…

【WEEK11】学习目标及总结【Spring Boot】【中文版】

学习目标&#xff1a; 学习SpringBoot 学习内容&#xff1a; 参考视频教程【狂神说Java】SpringBoot最新教程IDEA版通俗易懂员工管理系统 页面国际化登录功能展示员工列表增加员工修改员工信息删除及404处理 学习时间及产出&#xff1a; 第十一周MON~SAT 2024.5.6【WEEK11】…

YOLOv5改进 | 主干篇 | 2024.5全新的移动端网络MobileNetV4改进YOLOv5(含MobileNetV4全部版本改进)

一、本文介绍 本文给大家带来的改进机制是MobileNetV4&#xff0c;其发布时间是2024.5月。MobileNetV4是一种高度优化的神经网络架构&#xff0c;专为移动设备设计。它最新的改动总结主要有两点&#xff0c;采用了通用反向瓶颈&#xff08;UIB&#xff09;和针对移动加速器优化…