插件rabbitmq_delayed_message_exchange是RabbitMQ官方提供的一种用于实现延迟消息的解决方案。该插件将交换机类型扩展至x-delayed-message,这种类型的交换机能够将消息暂时挂起,直到设定的延迟时间到达,才将消息投递到绑定的队列中。这一特性使得RabbitMQ能够轻松处理延迟消息的场景,无需额外的业务逻辑来定时检查和触发消息的投递。
插件需要在服务端安装并开启后使用。
消息发送:生产者向一个x-delayed-message类型的交换机发送消息,同时在消息属性中设置x-delay头,表示消息应延迟的时间(单位:毫秒)。
延迟处理:交换机接收到消息后,不会立即投递给队列,而是将其挂起,等待设定的延迟时间。在此期间,消息处于未投递状态。
消息投递:一旦达到延迟时间,交换机会将消息投递给与之绑定的队列。此时,消息的行为就像普通消息一样,可以被消费者消费。
消息消费:消费者从队列中拉取消息,执行相应的业务逻辑。
1、生产者:在service文件夹下建立rabbitmq.service.ts文件,通过调用sendDelayOrderToExchange方法发送消息,x-delay 设置延时时间 单位ms
import { Provide, Scope, ScopeEnum, Init, Autoload, Destroy, Config, Inject } from '@midwayjs/decorator';
import * as amqp from 'amqp-connection-manager';
import { ChannelWrapper, AmqpConnectionManager } from 'amqp-connection-manager';import * as dayjs from 'dayjs';const OPTIONS = { durable: true, autoDelete: true }; // 队列opts
const EXCHANGE_CHARGE_DELAY = 'exchange.charge.delay'; // 延时订单
const QUEUE_CHARGE_DELAY = 'queue.charege.delay';@Autoload()
@Provide()
@Scope(ScopeEnum.Singleton) // Singleton 单例,全局唯一(进程级别)
export class RabbitmqService {private connection: AmqpConnectionManager;private channelWrapper: ChannelWrapper;@Config('rabbitmq')mqConfig;@Inject()logger;@Init()async connect() {// 创建连接,你可以把配置放在 Config 中,然后注入进来this.connection = await amqp.connect(this.mqConfig);// 创建 channelthis.channelWrapper = await this.connection.createChannel({json: true,setup: function (channel) {return Promise.all([// 延时Exchangechannel.assertExchange(EXCHANGE_CHARGE_DELAY, 'x-delayed-message', {durable: true,autoDelete: true,arguments: {'x-delayed-type': 'direct',},}),channel.assertQueue(QUEUE_CHARGE_DELAY, OPTIONS),// 队列channel.bindQueue(QUEUE_CHARGE_DELAY, EXCHANGE_CHARGE_DELAY, 'DELAY_ORDER'),// 绑定交换机]);},});}// 发送预约订单public async sendDelayOrderToExchange(message: string) {this.logger.info(`发送延时订单:${message} 当前时间:${dayjs().format('YYYY-MM-DD HH:mm:ss')}`);await this.channelWrapper.publish(EXCHANGE_CHARGE_DELAY, 'DELAY_ORDER', message, {headers: { 'x-delay': 10 * 1000 },// 延时时间 单位毫秒});}@Destroy()async close() {await this.channelWrapper.close();await this.connection.close();}
}
2、消费者:在consumer文件夹下新建mq.consumer.ts,通过监听延时队列接受消息
import { Consumer, MSListenerType, RabbitMQListener, Inject } from '@midwayjs/decorator';
import { ConsumeMessage } from 'amqplib';
import { Context } from '@midwayjs/rabbitmq';import * as dayjs from 'dayjs';const QUEUE_CHARGE_DELAY = 'queue.charege.delay';@Consumer(MSListenerType.RABBITMQ)
export class UserConsumer {@Inject()ctx: Context;@Inject()logger;@RabbitMQListener(QUEUE_CHARGE_DELAY, {durable: true,autoDelete: true,})async delayOrder(msg: ConsumeMessage) {if (msg && msg.content) {const id = msg.content.toString('utf-8');this.logger.info(`预约订单号:${id} 当前时间:${dayjs().format('YYYY-MM-DD HH:mm:ss')}`);}}
}
在configuration.ts文件中调用测试
import { Configuration, App } from '@midwayjs/core';
import * as koa from '@midwayjs/koa';
.....
.....
.....
export class ContainerLifeCycle {@App()app: koa.Application;@Inject()rabbitmqService: RabbitmqService;async onReady() {await this.rabbitmqService.sendDelayOrderToExchange('123456789');}
}