Rascal 是一个围绕amqplib 的丰富的 pub/sub 包装器。amqplib 最好的事情之一是它不会对您如何使用它做出假设。另一个是它不尝试抽象AMQP Concepts。因此,该库提供了大量的控制和灵活性,但您有责任采用适当的模式和配置。您需要注意的是:
- 默认情况下,消息不是持久的,如果您的代理重新启动,消息将会丢失
- 导致应用程序崩溃的消息将被无限重试
- 如果没有预取,突然的大量消息可能会破坏您的事件循环
- 断开的连接和中断的通道不会自动恢复
- 任何连接或通道错误都会作为“错误”事件发出。除非您处理它们或使用域,否则它们将导致您的应用程序崩溃
- 如果使用确认通道发布消息,而代理未能确认,则执行流程可能会无限期阻塞
Rascal 试图通过将以下内容添加到amqplib来解决这些问题,使它们更容易处理或引起您的注意
- 配置驱动的虚拟主机、交换器、队列、绑定、生产者和消费者
- 集群连接支持
- 透明内容解析
- 透明加密/解密
- 自动重新连接和重新订阅
- 高级错误处理,包括延迟、有限重试
- 远程过程调用支持
- 再次投递保护
- 通道池
- 流量控制
- 发布超时
- 安全默认值
- Promise 和回调支持
- 时分双工支持
注意:
一、当连接或通道遇到问题时,amqplib会抛出错误事件。Rascal 将监听这些事件,并且如果您使用默认配置,则会尝试自动恢复(重新连接等),但是这些事件可能表明代码中存在错误,因此引起您的注意也很重要。Rascal 通过重新发出错误事件来做到这一点,这意味着如果您不处理它们,它们将冒泡到未捕获的错误处理程序并使您的应用程序崩溃。您应该在四个地方执行此操作:
1.获取broker实例后立即 broker.on('error', console.error);
2.订阅消息后 await broker.subscribe('s1').on('error', console.error)
3.发布消息后 await broker.publish('p1', 'some text').on('error', console.error)
4.转发消息后 await broker.forward('p1', message).on('error', console.error)
二、避免潜在的消息丢失
在三种情况下,Rascal 会在不重新排队的情况下确认消息,从而导致潜在的数据丢失。
1.当无法解析消息内容并且订阅者没有“invalid_content”侦听器时
2.当订阅者的(可选)重新传递限制已被超出并且订阅者既没有“redelivery_error”也没有“redelivery_exceeded”侦听器时
3.当尝试通过重新发布、转发进行恢复时,但恢复操作失败。
Rascal 拒绝消息的原因是因为替代方案是无限期地不确认消息,或者在无限紧密的循环中回滚并重试消息。这可能会对您的应用程序进行 DDOS,并导致您的基础设施出现问题。如果您已正确配置死信队列或侦听“invalid_content”和“redelivery_exceeded”订户事件,您的消息应该是安全的。
config.js
const { MQ_HOST, HOST, MQ_PORT } = process.env;
const mqHost = MQ_HOST || HOST || "127.0.0.1";
const mqPort = MQ_PORT || 5672;
const mqUsername = "root";
const mqPassword = "paasword";const exchangeName = 'exchange_direct_saas'; //交换机
const queueName = 'queue_direct_saas';
const routingKey = 'saasIsolution';//路由keyconst config = {"vhosts": {"/": {"publicationChannelPools": { //使用池通道来发布消息.为每个虚拟主机创建两个池 一个用于确认通道,另一个用于常规通道。但在第一次使用之前不会创建两个池(默认autostart: false)空闲通道会自动从池中驱逐"regularPool": {"max": 10,"min": 5,"evictionRunIntervalMillis": 10000,"idleTimeoutMillis": 60000,"autostart": true},"confirmPool": {"max": 10,"min": 5,"evictionRunIntervalMillis": 10000,"idleTimeoutMillis": 60000,"autostart": true}},"connectionStrategy": "random","connection": {"slashes": true,"protocol": "amqp","hostname": mqHost,"user": mqUsername,"password": mqPassword,"port": mqPort,"vhost": "/","options": {"heartbeat": 10,//心跳时间。 如果你的任务执行时间比较长,调大此配置。 rabbit-server的heartbeat 默认为60"connection_timeout": 10000,"channelMax": 100},"socketOptions": {"timeout": 10000},"management": {"options": {"timeout": 1000}},"retry": {"min": 1000,"max": 60000,"factor": 2,"strategy": "exponential" //exponential:指数配置将导致 rascal 以指数增加的间隔(最多一分钟)重试连接。间隔会随机调整,这样如果您有多个服务,它们就不会同时重新连接。 linear: 线性配置将导致 rascal 以线性增加的间隔(一到五秒之间)重试连接}},"exchanges": {//定义exchange[exchangeName]: {"type": "direct","options": {"durable": true}}},"queues": { //定义queue[queueName]: {"options": {"autoDelete": false,"durable": true}}},"bindings": {//定义binding"b1": {"source": exchangeName,"destination": queueName,"destinationType": "queue","bindingKey": routingKey}}}},"subscriptions": {//订阅消息 "s1": {"queue": queueName,"vhost": "/","prefetch": 1,"retry": {"delay": 1000}}},"publications": {//发布消息"p1": {"vhost": "/","exchange": exchangeName,"routingKey": routingKey,"confirm": true,"options": {"persistent": true}}}
}
module.exports = config;
生产者端
const { BrokerAsPromised: Broker, withDefaultConfig } = require('rascal');
const definitions = require('./config.js');
const { getInitParams } = require('../lib')
const params = getInitParams();async function product(msg) {let broker;try {broker = await Broker.create(withDefaultConfig(definitions));//withDefaultConfig附带了适用于生产和测试环境的合理默认值(针对可靠性而不是速度进行了优化)broker.on('error', console.error);// Publish a messageconst publication = await broker.publish('p1', msg);console.log("生产者消息发送完毕");publication.on('error', console.error);} catch (err) {console.error(err);}finally{await broker?.shutdown();}
}product(JSON.stringify(params));
消费者端
const { BrokerAsPromised: Broker, withDefaultConfig } = require('rascal');
const definitions = require('./config.js');
const { getDoIsolation, getDoClear } = require("../lib");async function consumer(i) {try {const broker = await Broker.create(withDefaultConfig(definitions));//withDefaultConfig附带了适用于生产和测试环境的合理默认值(针对可靠性而不是速度进行了优化)broker.on('error', error => { console.error(error, "broker Error"); });// Consume a messageconst subscription = await broker.subscribe('s1'); //subscription 不存在会抛出异常subscription.on('message', async(message, content, ackOrNack) => {const params = JSON.parse(content);const doIsolation = getDoIsolation(params);console.log(`消费者${i}`, params, doIsolation);await doIsolation();ackOrNack();}).on('error', error => { console.error("subscribe Error",error); }).on('invalid_content', (err, message, ackOrNack) => { //若无法解析内容(例如,消息的内容类型为“application/json”,但内容不是json),它将发出“invalid_content”事件console.error('Invalid content', err);ackOrNack(err);//默认nack 策略}).on('redeliveries_exceeded', (err, message, ackOrNack) => { //如果重新传递的数量超过订阅者限制,订阅者将发出“redelivery_exceeded”事件,并且可以由您的应用程序处理console.error('Redeliveries exceeded', err);ackOrNack(err, [{ strategy: 'republish', defer: 1000, attempts: 10 }, { strategy: 'nack' }]); //将消息重新发布回其来自的队列。 当指定尝试次数时,始终应该链接一个后备策略,否则如果超出尝试次数,您的消息将不会被确认或拒绝});} catch (err) {console.error("其他Error",err);}console.log(`消费端${i}启动成功`)
}for(i=0; i<=2; i++){consumer(i)
}