nodejs 操作rabbitMQ rascal库(针对amqplib的封装)

Rascal 是一个围绕amqplib 的丰富的 pub/sub 包装器。amqplib 最好的事情之一是它不会对您如何使用它做出假设。另一个是它不尝试抽象AMQP Concepts。因此,该库提供了大量的控制和灵活性,但您有责任采用适当的模式和配置。您需要注意的是:

  • 默认情况下,消息不是持久的,如果您的代理重新启动,消息将会丢失
  • 导致应用程序崩溃的消息将被无限重试
  • 如果没有预取,突然的大量消息可能会破坏您的事件循环
  • 断开的连接和中断的通道不会自动恢复
  • 任何连接或通道错误都会作为“错误”事件发出。除非您处理它们或使用域,否则它们将导致您的应用程序崩溃
  • 如果使用确认通道发布消息,而代理未能确认,则执行流程可能会无限期阻塞

Rascal 试图通过将以下内容添加到amqplib来解决这些问题,使它们更容易处理或引起您的注意

  • 配置驱动的虚拟主机、交换器、队列、绑定、生产者和消费者
  • 集群连接支持
  • 透明内容解析
  • 透明加密/解密
  • 自动重新连接和重新订阅
  • 高级错误处理,包括延迟、有限重试
  • 远程过程调用支持
  • 再次投递保护
  • 通道池
  • 流量控制
  • 发布超时
  • 安全默认值
  • Promise 和回调支持
  • 时分双工支持

注意:

一、当连接或通道遇到问题时,amqplib会抛出错误事件。Rascal 将监听这些事件,并且如果您使用默认配置,则会尝试自动恢复(重新连接等),但是这些事件可能表明代码中存在错误,因此引起您的注意也很重要。Rascal 通过重新发出错误事件来做到这一点,这意味着如果您不处理它们,它们将冒泡到未捕获的错误处理程序并使您的应用程序崩溃。您应该在四个地方执行此操作:

1.获取broker实例后立即 broker.on('error', console.error);

2.订阅消息后 await broker.subscribe('s1').on('error', console.error)

3.发布消息后 await broker.publish('p1', 'some text').on('error', console.error)

4.转发消息后 await broker.forward('p1', message).on('error', console.error)

二、避免潜在的消息丢失

在三种情况下,Rascal 会在不重新排队的情况下确认消息,从而导致潜在的数据丢失。

1.当无法解析消息内容并且订阅者没有“invalid_content”侦听器时

2.当订阅者的(可选)重新传递限制已被超出并且订阅者既没有“redelivery_error”也没有“redelivery_exceeded”侦听器时

3.当尝试通过重新发布、转发进行恢复时,但恢复操作失败。

Rascal 拒绝消息的原因是因为替代方案是无限期地不确认消息,或者在无限紧密的循环中回滚并重试消息。这可能会对您的应用程序进行 DDOS,并导致您的基础设施出现问题。如果您已正确配置死信队列或侦听“invalid_content”和“redelivery_exceeded”订户事件,您的消息应该是安全的。

config.js

const { MQ_HOST, HOST, MQ_PORT } = process.env;
const mqHost = MQ_HOST || HOST || "127.0.0.1";
const mqPort = MQ_PORT || 5672;
const mqUsername = "root";
const mqPassword = "paasword";const exchangeName = 'exchange_direct_saas'; //交换机
const queueName = 'queue_direct_saas';
const routingKey = 'saasIsolution';//路由keyconst config = {"vhosts": {"/": {"publicationChannelPools": { //使用池通道来发布消息.为每个虚拟主机创建两个池 一个用于确认通道,另一个用于常规通道。但在第一次使用之前不会创建两个池(默认autostart: false)空闲通道会自动从池中驱逐"regularPool": {"max": 10,"min": 5,"evictionRunIntervalMillis": 10000,"idleTimeoutMillis": 60000,"autostart": true},"confirmPool": {"max": 10,"min": 5,"evictionRunIntervalMillis": 10000,"idleTimeoutMillis": 60000,"autostart": true}},"connectionStrategy": "random","connection": {"slashes": true,"protocol": "amqp","hostname": mqHost,"user": mqUsername,"password": mqPassword,"port": mqPort,"vhost": "/","options": {"heartbeat": 10,//心跳时间。 如果你的任务执行时间比较长,调大此配置。 rabbit-server的heartbeat 默认为60"connection_timeout": 10000,"channelMax": 100},"socketOptions": {"timeout": 10000},"management": {"options": {"timeout": 1000}},"retry": {"min": 1000,"max": 60000,"factor": 2,"strategy": "exponential" //exponential:指数配置将导致 rascal 以指数增加的间隔(最多一分钟)重试连接。间隔会随机调整,这样如果您有多个服务,它们就不会同时重新连接。 linear: 线性配置将导致 rascal 以线性增加的间隔(一到五秒之间)重试连接}},"exchanges": {//定义exchange[exchangeName]: {"type": "direct","options": {"durable": true}}},"queues": { //定义queue[queueName]: {"options": {"autoDelete": false,"durable": true}}},"bindings": {//定义binding"b1": {"source": exchangeName,"destination": queueName,"destinationType": "queue","bindingKey": routingKey}}}},"subscriptions": {//订阅消息 "s1": {"queue": queueName,"vhost": "/","prefetch": 1,"retry": {"delay": 1000}}},"publications": {//发布消息"p1": {"vhost": "/","exchange": exchangeName,"routingKey": routingKey,"confirm": true,"options": {"persistent": true}}}
}
module.exports = config;

生产者端

const { BrokerAsPromised: Broker, withDefaultConfig } = require('rascal');
const definitions = require('./config.js');
const { getInitParams } = require('../lib')
const params = getInitParams();async function product(msg) {let broker;try {broker = await Broker.create(withDefaultConfig(definitions));//withDefaultConfig附带了适用于生产和测试环境的合理默认值(针对可靠性而不是速度进行了优化)broker.on('error', console.error);// Publish a messageconst publication = await broker.publish('p1', msg);console.log("生产者消息发送完毕");publication.on('error', console.error);} catch (err) {console.error(err);}finally{await broker?.shutdown();}
}product(JSON.stringify(params));

消费者端

const { BrokerAsPromised: Broker, withDefaultConfig } = require('rascal');
const definitions = require('./config.js');
const { getDoIsolation, getDoClear } = require("../lib");async function consumer(i) {try {const broker = await Broker.create(withDefaultConfig(definitions));//withDefaultConfig附带了适用于生产和测试环境的合理默认值(针对可靠性而不是速度进行了优化)broker.on('error', error => { console.error(error, "broker Error"); });// Consume a messageconst subscription = await broker.subscribe('s1'); //subscription 不存在会抛出异常subscription.on('message', async(message, content, ackOrNack) => {const params = JSON.parse(content);const doIsolation = getDoIsolation(params);console.log(`消费者${i}`, params, doIsolation);await doIsolation();ackOrNack();}).on('error', error => { console.error("subscribe Error",error); }).on('invalid_content', (err, message, ackOrNack) => { //若无法解析内容(例如,消息的内容类型为“application/json”,但内容不是json),它将发出“invalid_content”事件console.error('Invalid content', err);ackOrNack(err);//默认nack 策略}).on('redeliveries_exceeded', (err, message, ackOrNack) => { //如果重新传递的数量超过订阅者限制,订阅者将发出“redelivery_exceeded”事件,并且可以由您的应用程序处理console.error('Redeliveries exceeded', err);ackOrNack(err, [{ strategy: 'republish', defer: 1000, attempts: 10 }, { strategy: 'nack' }]); //将消息重新发布回其来自的队列。 当指定尝试次数时,始终应该链接一个后备策略,否则如果超出尝试次数,您的消息将不会被确认或拒绝});} catch (err) {console.error("其他Error",err);}console.log(`消费端${i}启动成功`)
}for(i=0; i<=2; i++){consumer(i)
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/133733.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

基于Python OpenCV的金铲铲自动进游戏、D牌...

基于Python OpenCV的金铲铲自动进游戏、D牌... 1. 自动点击进入游戏1.1 环境准备1.2 功能实现2. 自动D牌3. 游戏结束自动退1. 自动点击进入游戏 PS: 本测试只用于交流学习OpenCV的相关知识,不能用于商业用途,后果自负。 1.1 环境准备 需要金铲铲在win10的模拟器,我们这里选…

Vue 3 中,watch 和 watchEffect 的区别

结论先行&#xff1a; watch&#xff1a;需要指明要监听的数据&#xff0c;而且在回调函数中可以获取到属性变化的前后值&#xff1b; 适用于需要精确控制监视范围的情况&#xff1b;也就是需要针对特定数据变化执行操作。 watchEffect&#xff1a;不用指明监听哪个属性&…

python单元测试框架(继承、unittest参数化、断言、测试报告)

一、继承 继承能解决什么问题&#xff1f; unittest每个模块都要用到前提条件以及清理&#xff0c;如果有上百个模块&#xff0c;我们要改域名和浏览器&#xff0c;就会工作量很大特别麻烦&#xff0c;这时我们可以用继承的思想只用改一次 我们可以将前提和清理提出来单独放…

新登录接口独立版变现宝升级版知识付费小程序-多领域素材资源知识变现营销系统

源码简介&#xff1a; 资源入口 点击进入 源码亲测无bug&#xff0c;含前后端源码&#xff0c;非线传&#xff0c;修复最新登录接口 梦想贩卖机升级版&#xff0c;变现宝吸取了资源变现类产品的很多优点&#xff0c;摒弃了那些无关紧要的东西&#xff0c;使本产品在运营和变现…

MVC、MVP、MVVM区别

MVC、MVP、MVVM区别 MVC&#xff08;Model-View-Controller&#xff09; 。是一种设计模式&#xff0c;通常用于组织与应用程序的数据流。它通常包括三个组件&#xff1a;模型&#xff08;Model&#xff09;、视图&#xff08;View&#xff09;和控制器&#xff08;Controller&…

TDengine 上榜 BenchCouncil 全球首个开源贡献榜

近日&#xff0c;Bench Council&#xff08;国际测试委员会&#xff09;公布了“世界首个开源贡献榜”&#xff0c;该榜单号称“只以贡献分高下”。值得一提的是&#xff0c;涛思数据、TDengine 上榜 BenchCouncil 发布的开源计算机系统机构榜、成果榜&#xff0c;TDengine 创始…

SQL语句性能优化

1、查询 SQL 尽量不要使用 select *,而是 select 具体字段 反例子: select * from sys_user; 正例子: select id,name from sys_user; 理由如下: 只取需要的字段,节省资源、减少网络开销。select * 进行查询时,很可能就不会使用到覆盖索引了,就会造成回表查询。…

CDN策略好坏的重要性

CDN加速技术在今天的互联网世界中扮演着至关重要的角色&#xff0c;它可以显著提高网站和应用程序的性能&#xff0c;同时也有助于提供更好的安全性。然而&#xff0c;设定安全策略的好坏对CDN的影响是一个关键的议题&#xff0c;本文将深入探讨这个问题。 CDN&#xff08;内容…

pdfH5实现pdf预览功能

1.引入 npm install pdfh5 2.使用 <view id"pdfBox" class""></view> showPdf(url) {this.pdfh5 new Pdfh5("", {URIenable: false,zoomEnanle: true,maxZoom: 2,pdfurl: url})this.pdfh5.on("complete", function(st…

2、Sentinel基本应用限流规则(2)

2.2.1 是什么 Sentinel 是阿里中间件团队开源的&#xff0c;面向分布式服务架构的轻量级高可用流量控制组件&#xff0c;主要以流量为切入点&#xff0c;从流量控制、熔断降级、系统负载保护等多个维度来帮助用户保护服务的稳定性。 2.2.2 基本概念 • 资源 (需要被保护的东西…

ASP.net C# 用Aspose.pdf实现pdf合并

直接上代码&#xff0c;供参考&#xff0c;备忘&#xff01; using System; using System.Collections.Generic; using System.Web; using System.Web.UI; using System.Web.UI.WebControls; using System.Data; using System.Data.SqlClient; using System.Xml; using System…

盘点 MySQL 创建内部临时表的所有场景

作者总结了 MySQL 中所有触发使用内部临时表的场景。 作者&#xff1a;刘嘉浩&#xff0c;爱可生团队 DBA 成员&#xff0c;重度竞技游戏爱好者。 爱可生开源社区出品&#xff0c;原创内容未经授权不得随意使用&#xff0c;转载请联系小编并注明来源。 本文约 2000 字&#xff…

宝马——使用人工智能制造和驾驶汽车

德国汽车制造商宝马(BMW)每年在全球制造和销售250万台汽车&#xff0c;其品牌包括宝马、MINI和劳斯莱斯。 宝马汽车以其卓越的性能和对新技术的应用而著名&#xff0c;它是道路上最精致的汽车之一&#xff0c;并且和其竞争对手戴姆勒(Daimler)一样&#xff0c;在将自动驾驶汽车…

Redis中的Zset类型

目录 Zset的相关命令 zadd zrange zcard zcount zrevrange zrangebyscore zpopmax bzpopmax zpopmin和bzpopmin zrank zrevrank zscore zrem zremrangebyrank zremrangebyscore 操作集合间的命令 zinterstore和zunionstore 内部编码 Zset的应用场景 Zset表…

独立键盘接口设计(Keil+Proteus)

前言 软件的操作参考这篇博客。 LED数码管的静态显示与动态显示&#xff08;KeilProteus&#xff09;-CSDN博客https://blog.csdn.net/weixin_64066303/article/details/134101256?spm1001.2014.3001.5501实验&#xff1a;用4个独立按键控制8个LED指示灯。 按下k1键&#x…

数字化转型需要RPA,那么RPA如何落地?

首先&#xff0c;我们来探讨一下RPA为什么这么重要&#xff0c;并非一个简单的自动化脚本就可以代替的。 主要从高效性、准确性、稳定性三方面体现RPA流程自动化的价值。 一、高效性 相比人类员工&#xff0c;机器人可以以飞快的速度完成工作&#xff0c;而且不需要休息或中…

Mysql进阶-视图篇

介绍 视图&#xff08;View&#xff09;是一种虚拟存在的表。视图中的数据并不在数据库中实际存在&#xff0c;行和列数据来自定义视图的查询中使用的表&#xff0c;并且是在使用视图时动态生成的。 通俗的讲&#xff0c;视图只保存了查询的SQL逻辑&#xff0c;不保存查询结果。…

Si4010 一款带有MCU SoC RF发射机芯片 无线遥控器

Si4010是一款完全集成的SoC RF发射机&#xff0c;带有嵌入式CIP-51 8051 MCU&#xff0c;专为1GHz以下ISM频带设计。该芯片针对电池供电的应用进行了优化&#xff0c;工作电压为1.8至3.6 V&#xff0c;待机电流小于10 nA的超低电流消耗。高功率放大器可提供高达10 dBm的输出功率…

手术训练系统项目

★ 手术训练系统项目 项目描述&#xff1a;手术训练系统&#xff0c;它提供了多项功能&#xff0c;包括账户登录与创建、数据库与账户管理、课程管理、小组管理、成绩统计、证书发布、训练和系统设置。 职责描述: 1、训练功能开发&#xff08;任务概述、任务指导、评分规则、评…

【数据结构】手撕单链表

目录 前言 1 链表 1.1 链表的概念及结构 1.2 链表的分类 1.2.1 单向或者双向 1.2.2 带头或者不带头 1.2.3 循环或者非循环 1.2.4 无头单向非循环链表 1.2.5 带头双向循环链表 2 链表的实现 2.1 结构 2.2 结点的创建 2.3 尾插 2.4 头插 2.5 尾删 2.6 头删 2.7 …