在nodejs中使用RabbitMQ(六)sharding消息分片

RabbitMQ 的分片插件(rabbitmq_sharding)允许将消息分布到多个队列中,这在消息量很大或处理速度要求高的情况下非常有用。分片功能通过将消息拆分到多个队列中来平衡负载,从而提升消息处理的吞吐量和可靠性。它能够在多个队列之间分配负载,避免单个队列过载。(注:不能单独消费分片消息。消息分片不利于消息顺序区分)

启用消息分片插件。 

rabbitmq-plugins enable rabbitmq_sharding 

示例

通过rabbitmq management添加策略,用于分片消息匹配转发。

或者通过命令添加策略 

CTL set_policy images-shard "queue10" '{"shards-per-node": 3, "routing-key": "sharding"}'

producer.ts

import RabbitMQ from 'amqplib';async function start() {try {const conn = await RabbitMQ.connect("amqp://admin:admin1234@localhost:5672//mirror?heartbeat=60");conn.on("error", function (err1) {if (err1.message !== "Connection closing") {console.error("[AMQP] conn error", err1.message);}});conn.on("close", function () {console.error("[AMQP] reconnecting");return setTimeout(start, 1000);});console.log("[AMQP] connected");let channel = null;try {channel = await conn.createChannel();} catch (err) {console.error("[AMQP]", err);return setTimeout(start, 1000);}const exchangeName = 'exchange_queue10';await channel.assertExchange(exchangeName,'x-modulus-hash',{durable: true,arguments: {'x-modulus': 3 // 分片数量(需与队列分片数匹配)}},);let routeKey = '';for (let i = 0; i < 1000; ++i) {// console.log('message send!', channel.sendToQueue(//   queueName,//   Buffer.from(`发送消息,${i}${Math.ceil(Math.random() * 100000)}`),//   { persistent: true, correlationId: 'ooooooooooooooo' },// 消息持久化,重启后存在//   // (err: any, ok: Replies.Empty)=>{}// ));let num = Math.ceil(Math.random() * 100000);console.log('消息发送是否成功', num, routeKey, channel.publish(exchangeName,`${routeKey}${i}`,Buffer.from(`"发送消息, index:${i}, number:${num}, routeKey:${JSON.stringify(routeKey)}"`),{persistent: true,},));}setTimeout(() => {conn.close();process.exit(0);}, 1000);} catch (err) {console.error("[AMQP]", err);return setTimeout(start, 1000);}
}start();

consumer.ts

import RabbitMQ, { type Replies } from 'amqplib/callback_api';RabbitMQ.connect('amqp://admin:admin1234@localhost:5672//mirror', (err0, conn) => {if (err0) {console.error(err0);return;}conn.createChannel(function (err1, channel) {console.log('[*] waiting...');const exchangeName = 'exchange_queue10';channel.prefetch(32);// for(let i=0;i<3;++i){//   channel.assertQueue(queueName, { durable: true }, () => {//     channel.bindQueue(queueName, exchangeName, `shard_${shardId}`);//   });// }channel.consume(exchangeName, function (msg) {if(msg){console.log(`队列'${exchangeName}'接收到的消息`, msg?.content.toString());// 第二个参数,false拒绝当前消息// 第二个参数,true拒绝小于等于当前消息// 第三个参数,3false从队列中清除// 第三个参数,4true从新在队列中排队channel.nack(msg, false, false);}}, {// noAck: true, // 是否自动确认消息,为true不需要调用channel.ack(msg);noAck: false,arguments: {}}, (err: any, ok: Replies.Empty) => {console.log(err, ok);});});conn.on("error", function (err1) {if (err1.message !== "Connection closing") {console.error("[AMQP] conn error", err1.message);}});conn.on("close", function () {console.error("[AMQP] reconnecting");});
});

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/895592.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

1.7 AI智能体实战指南:从单任务自动化到企业级智能体集群架构

AI智能体实战指南:从单任务自动化到企业级智能体集群架构 一、智能体技术演进:从脚本工具到认知革命的跨越 1.1 三代智能体能力对比 能力维度第一代(规则驱动)第二代(机器学习)第三代(LLM驱动)任务理解固定模式匹配统计模式识别语义推理与逻辑链分解环境适应需人工重写…

Github 2025-02-14 Java开源项目日报 Top10

根据Github Trendings的统计,今日(2025-02-14统计)共有10个项目上榜。根据开发语言中项目的数量,汇总情况如下: 开发语言项目数量Java项目10C#项目1Guava: 谷歌Java核心库 创建周期:3725 天开发语言:Java协议类型:Apache License 2.0Star数量:49867 个Fork数量:10822 次…

C++17中的clamp函数

一、std::clamp() 其实在前面简单介绍过这个函数&#xff0c;但当时只是一个集中的说明&#xff0c;为了更好的理解std::clamp的应用&#xff0c;本篇再详细进行阐述一次。std::clamp在C17中其定义的方式为&#xff1a; template< class T > constexpr const T& cl…

WEB安全--SQL注入--常见的注入手段

一、联表查询&#xff1a; 1.1原理&#xff1a; 当payload参数被后端查询语句接收到时&#xff0c;其中的非法语句通过union关联显示出其他的数据 1.2示例&#xff1a; #payload: -1 and union select 1,2,database()--#query: $sqlselect * from users where id-1 and union …

QT笔记——QPlainTextEdit

文章目录 1、概要2、文本设计2.1、设置文本2.1、字体样式&#xff08;大小、下划线、加粗、斜体&#xff09; 1、概要 QPlainTextEdit 是 Qt 框架中用于处理纯文本编辑的控件&#xff0c;具有轻量级和高效的特点&#xff0c;以下是它常见的应用场景&#xff1a; 文本编辑器&am…

【D2】神经网络初步学习

总结&#xff1a;学习了 PyTorch 中的基本概念和常用功能&#xff0c;张量&#xff08;Tensor&#xff09;的操作、自动微分&#xff08;Autograd&#xff09;、正向传播、反向传播。通过了解认识LeNet 模型&#xff0c;定义神经网络类&#xff0c;熟悉卷积神经网络的基本结构和…

DeepSeek处理自有业务的案例:让AI给你写一份小众编辑器(EverEdit)的语法着色文件

1 DeepSeek处理自有业务的案例&#xff1a;让AI给你写一份小众编辑器(EverEdit)的语法着色文件 1.1 背景 AI能力再强&#xff0c;如果不能在企业的自有业务上产生助益&#xff0c;那基本也是一无是处。将企业的自有业务上传到线上训练&#xff0c;那是脑子进水的做法&#xff…

DeepSeek教unity------MessagePack-05

动态反序列化 当调用 MessagePackSerializer.Deserialize<object> 或 MessagePackSerializer.Deserialize<dynamic> 时&#xff0c;二进制数据中存在的任何值都将被转换为基本值&#xff0c;即 bool、char、sbyte、byte、short、int、long、ushort、uint、ulong、…

C++入门之《拷贝构造函数》详解

拷贝构造函数是构造函数的一个重载 拷贝构造函数是特殊的构造函数&#xff0c;用于基于已存在对象创建新对象。比如定义一个 Person 类&#xff1a; class Person { private:std::string name;int age; public:Person(const std::string& n, int a) : name(n), age(a…

Ollama命令使用指南

Ollama 命令使用指南 Ollama 命令使用指南1. Ollama 命令概览2. Ollama 命令详解2.1 启动 Ollama2.2 创建模型2.3 查看模型信息2.4 运行模型2.5 停止运行的模型2.6 从注册表拉取模型2.7 推送模型到注册表2.8 列出本地模型2.9 查看正在运行的模型2.10 复制模型2.11 删除模型 3. …

为什么配置Redis时候要序列化配置呢

序列化和反序列化&#xff1f;&#xff1a; 序列化&#xff1a;将对象转换为二进制数据&#xff0c;以便存储到Redis中。 反序列化&#xff1a;将Redis中的二进制数据转换回对象&#xff0c;以便在应用程序中使用。 1. 默认序列化器的问题 如果不配置序列化器&#xff0c;Re…

【问】强学如何支持 迁移学习呢?

案例&#xff1a;从CartPole-v1迁移到MountainCar-v0 在源环境&#xff08;CartPole-v1&#xff09;中训练模型 首先&#xff0c;我们使用DQN算法在CartPole-v1环境中训练一个强化学习模型。以下是代码示例&#xff1a; import gym import torch import torch.nn as nn impor…

深入浅出Java反射:掌握动态编程的艺术

小程一言反射何为反射反射核心类反射的基本使用获取Class对象创建对象调用方法访问字段 示例程序应用场景优缺点分析优点缺点 注意 再深入一些反射与泛型反射与注解反射与动态代理反射与类加载器 结语 小程一言 本专栏是对Java知识点的总结。在学习Java的过程中&#xff0c;学习…

【算法与数据结构】并查集详解+题目

目录 一&#xff0c;什么是并查集 二&#xff0c;并查集的结构 三&#xff0c;并查集的代码实现 1&#xff0c;并查集的大致结构和初始化 2&#xff0c;find操作 3&#xff0c;Union操作 4&#xff0c;优化 小结&#xff1a; 四&#xff0c;并查集的应用场景 省份…

C语言简单练习题

文章目录 练习题一、计算n的阶乘bool类型 二、计算1!2!3!...10!三、计算数组arr中的元素个数二分法查找 四、动态打印字符Sleep()ms延时函数system("cls")清屏函数 五、模拟用户登录strcmp()函数 六、猜数字小游戏产生一个随机数randsrandRAND_MAX时间戳time() 示例 …

ShenNiusModularity项目源码学习(8:数据库操作)

ShenNiusModularity项目使用SqlSugar操作数据库。在ShenNius.Repository项目中定义了ServiceCollectionExtensions.AddSqlsugarSetup函数注册SqlSugar服务&#xff0c;并在ShenNius.Admin.API项目的ShenniusAdminApiModule.OnConfigureServices函数中调用&#xff0c;SqlSugar所…

MATLAB图像处理:图像特征概念及提取方法HOG、SIFT

图像特征是计算机视觉中用于描述图像内容的关键信息&#xff0c;其提取质量直接影响后续的目标检测、分类和匹配等任务性能。本文将系统解析 全局与局部特征的核心概念&#xff0c;深入讲解 HOG&#xff08;方向梯度直方图&#xff09;与SIFT&#xff08;尺度不变特征变换&…

java枚举类型的查找

AllArgsConstructor Getter public enum FileFilterRangeEnum {FILE_NAME("文件名称","fileName"),FILE_CONTENT("文件内容","fileContent");private final String text;private final String value;// 根据传入的字符串值查找对应的枚…

小白win10安装并配置yt-dlp

需要yt-dlp和ffmpeg 注意存放路径最好都是全英文 win10安装并配置yt-dlp 一、下载1.下载yt-dlp2. fffmpeg下载 二、配置环境三、cmd操作四、yt-dlp下视频操作 一、下载 1.下载yt-dlp yt-dlp地址 找到win的压缩包点下载&#xff0c;并解压 2. fffmpeg下载 ffmpeg官方下载 …

【技术解析】MultiPatchFormer:多尺度时间序列预测的全新突破

今天给我大家带来一篇最新的时间序列预测论文——MultiPatchFormer。这篇论文提出了一种基于Transformer的创新模型&#xff0c;旨在解决时间序列预测中的关键挑战&#xff0c;特别是在处理多尺度时间依赖性和复杂通道间相关性时的难题。MultiPatchFormer通过引入一维卷积技术&…