rabbitmq 延迟队列_Delayed Message 插件实现 RabbitMQ 延迟队列

319cfc0030c8af608c0e4b76c0c72e3e.png

延迟队列是为了存放那些延迟执行的消息,待消息过期之后消费端从队列里拿出来执行。

DLX + TTL 方式存在的时序问题

对于延迟队列不管是 AMQP 协议或者 RabbitMQ 本身是不支持的,之前有介绍过如何使用 RabbitMQ 死信队列(DLX) + TTL 的方式来模拟实现延迟队列,这也是通常的一种做法,可参见我的另一篇文章 利用 RabbitMQ 死信队列和 TTL 实现定时任务。

今天我想说的是这种方式会存在一个时序问题,看下图:4b535f57705f3e9099f7e3570a6a1508.png

左侧队列 queue1 分别两条消息 msg1、msg2 过期时间都为 1s,输出顺序为 msg1、msg2 是没问题的

右侧队列 queue2 分别两条消息 msg1、msg2 注意问题来了,msg2 的消息过期时间为 1S 而 msg1 的消息过期为 2S,你可能想谁先过期就谁先消费呗,显然不是这样的,因为这是在同一个队列,必须前一个消费,第二个才能消费,所以就出现了时序问题

如果你的消息过期时间是有规律的,例如,有的 1S、有的 2S,那么我们可以以时间为维度设计为两个队列,如下所示:92544ad7a70b18481c9bebf8aac78797.png

上面我们将 1S 过期的消息拆分为队列 queue_1s,2S 过期的消息拆分为队列 queue_2s,事情得到进一步解决。如果此时消息的过期时间不确定或者消息过期时间维度过多,在消费端我们就要去监听多个消息队列且对于消息过期时间不确定的也是很难去设计的。

针对消息无序的不妨看下以下解决方案。

Delayed Message 插件

这里要感谢 @神奇的包子,掘金(juejin.im/user/5bfc1b9d6fb9a049b347a9e2) 提出的 Delayed Message 插件方案。

这里将使用的是一个 RabbitMQ 延迟消息插件 rabbitmq-delayed-message-exchange,目前维护在 RabbitMQ 插件社区,我们可以声明 x-delayed-message 类型的 Exchange,消息发送时指定消息头 x-delay 以毫秒为单位将消息进行延迟投递。1b6ae02eddc9223ca72b21a18ed40bb2.png

实现原理

上面使用 DLX + TTL 的模式,消息首先会路由到一个正常的队列,根据设置的 TTL 进入死信队列,与之不同的是通过 x-delayed-message 声明的交换机,它的消息在发布之后不会立即进入队列,先将消息保存至 Mnesia(一个分布式数据库管理系统,适合于电信和其它需要持续运行和具备软实时特性的 Erlang 应用。目前资料介绍的不是很多)

这个插件将会尝试确认消息是否过期,首先要确保消息的延迟范围是 Delay > 0, Delay =< ?ERL_MAX_T(在 Erlang 中可以被设置的范围为 (2^32)-1 毫秒),如果消息过期通过 x-delayed-type 类型标记的交换机投递至目标队列,整个消息的投递过程也就完成了。

插件安装

根据你的 RabbitMQ 版本来安装相应插件版本,RabbitMQ community-plugins 上面有版本对应信息可参考。

注意:需要 RabbitMQ 3.5.3 和更高版本。

# 注意要下载至你的 RabbitMQ 服务器的 plugins 目录下,例如:/usr/local/rabbitmq/plugins

wget https://dl.bintray.com/rabbitmq/community-plugins/3.6.x/rabbitmq_delayed_message_exchange/rabbitmq_delayed_message_exchange-20171215-3.6.x.zip

# 解压
unzip rabbitmq_delayed_message_exchange-20171215-3.6.x.zip

# 解压之后得到如下文件
rabbitmq_delayed_message_exchange-20171215-3.6.x.ez

启用插件

使用 rabbitmq-plugins enable 命令启用插件,启动成功会看到如下提示:

$ rabbitmq-plugins enable rabbitmq_delayed_message_exchange
The following plugins have been enabled:
rabbitmq_delayed_message_exchange

Applying plugin configuration to rabbit@xxxxxxxx... started 1 plugin.

管理控制台声明 x-delayed-message 交换机

在开始代码之前先打开 RabbitMQ 的管理 UI 界面,声明一个 x-delayed-message 类型的交换机,否则你会遇到下面的错误:

Error: Channel closed by server: 406 (PRECONDITION-FAILED) with message "PRECONDITION_FAILED - Invalid argument, 'x-delayed-type' must be an existing exchange type"

这个问题困扰我了一会儿,详情可见 Github Issues rabbitmq-delayed-message-exchange/issues/19,正确操作如下图所示:b15448b4ae832ba159afb6aacede26c7.png

Nodejs 代码实践

上面准备工作完成了,开始我们的代码实践吧,官方没有提供 Nodejs 示例,只提供了 Java 示例,对于一个写过 Spring Boot 项目的 Nodeer 这不是问题(此处,兄得你有点飘了啊 smiley_38.png)其实如果有时间能多了解点些,你会发现还是有益的。

构建生产者

几个注意点:

  • 交换机类型一定要设置为 x-delayed-message
  • 设置 x-delayed-type 为 direct,当然也可以是 topic 等
  • 发送消息时设置消息头 headers 的 x-delay 属性,即延迟时间,如果不设置消息将会立即投递
const amqp = require('amqplib');

async function producer(msg, expiration) {
try {
const connection = await amqp.connect('amqp://localhost:5672');
const exchange = 'my-delayed-exchange';
const exchangeType = 'x-delayed-message'; // x-delayed-message 交换机的类型
const routingKey = 'my-delayed-routingKey';

const ch = await connection.createChannel();
await ch.assertExchange(exchange, exchangeType, {
durable: true,
'x-delayed-type': 'direct'
});

console.log('producer msg:', msg);
await ch.publish(exchange, routingKey, Buffer.from(msg), {
headers: {
'x-delay': expiration, // 一定要设置,否则无效
}
});

ch.close();
} catch(err) {
console.log(err)
}
}

producer('msg0 1S Expire', 1000) // 1S
producer('msg1 30S Expire', 1000 * 30) // 30S
producer('msg2 10S Expire', 1000 * 10) // 10S
producer('msg3 5S Expire', 1000 * 5) // 5S

构建消费端

消费端改变不大,交换机声明处同生产者保持一样,设置交换机类型(x-delayed-message)和 x-delayed-type

const amqp = require('amqplib');

async function consumer() {
const exchange = 'my-delayed-exchange';
const exchangeType = 'x-delayed-message';
const routingKey = 'my-delayed-routingKey';
const queueName = 'my-delayed-queue';

try {
const connection = await amqp.connect('amqp://localhost:5672');
const ch = await connection.createChannel();

await ch.assertExchange(exchange, exchangeType, {
durable: true,
'x-delayed-type': 'direct'
});
await ch.assertQueue(queueName);
await ch.bindQueue(queueName, exchange, routingKey);
await ch.consume(queueName, msg => {
console.log('consumer msg:', msg.content.toString());
}, { noAck: true });
} catch(err) {
console.log('Consumer Error: ', err);
}
}

consumer()

以上示例源码地址:

https://github.com/Q-Angelo/project-training/tree/master/rabbitmq/rabbitmq-delayed-message-node

最后,让我们对以上程序做个测试,左侧窗口展示了生产端信息,右侧窗口展示了消费端信息,这次实现了同一个队列里不同过期时间的消息,可以按照我们预先设置的 TTL 时间顺序性消费,我们的目的达到了。7ea8260b58a798dacc441f70ba39a012.png

局限性

Delayed Message 插件实现 RabbitMQ 延迟队列这种方式也不完全是一个银弹,它将延迟消息存在于 Mnesia 表中,并且在当前节点上具有单个磁盘副本,它们将在节点重启之后幸存。

目前该插件的当前设计并不真正适合包含大量延迟消息(例如数十万或数百万)的场景,详情参见 #/issues/72 另外该插件的一个可变性来源是依赖于 Erlang 计时器,在系统中使用了一定数量的长时间计时器之后,它们开始争用调度程序资源,并且时间漂移不断累积。

插件的禁用要慎重,以下方式可以实现将插件禁用,但是注意如果此时还有延迟消息未消费,那么禁掉此插件后所有的未消费的延迟消息将丢失。

rabbitmq-plugins disable rabbitmq_delayed_message_exchange

如果你采用了 Delayed Message 插件这种方式来实现,对于消息可用性要求较高的,在发现消息之前可以先落入 DB 打标记,消费之后将消息标记为已消费,中间可以加入定时任务做检测,这可以进一步保证你的消息的可靠性。

总结

经过一番实践测试、学习之后发现,DLX + TTLDelayed Message 插件这两种 RabbitMQ 延迟消息解决方案都有一定的局限性。

如果你的消息 TTL 是相同的,使用 DLX + TTL 的这种方式是没问题的,对于我来说目前还是优选。

如果你的消息 TTL 过期值是可变的,可以尝试下使用 Delayed Message 插件,对于某些应用而言它可能很好用,对于那些可能会达到高容量延迟消息应用而言,则不是很好。

关于 RabbitMQ 延迟队列,如果你有更多其它实现,欢迎关注公众号 “Nodejs技术栈” 在后台取得我的联系方式进行讨论,我很期待。

Reference

  • github.com/rabbitmq/rabbitmq-delayed-message-exchange
  • www.rabbitmq.com/community-plugins.html

作者简介:五月君,Nodejs Developer,慕课网认证作者,热爱技术、喜欢分享的 90 后青年,欢迎关注 Nodejs技术栈(id:NodejsRoadmap) 和 Github 开源项目 https://www.nodejs.red

敬请关注「Nodejs技术栈」微信公众号,获取优质文章

▼往期精彩回顾▼Node.js 如何利用 Libuv 实现事件循环和异步Nodejs 进阶:解答 Cluster 模块的几个疑问多维度分析 Express、Koa 之间的区别你需要了解的有关 Node.js 的所有信息Node.js 服务 Docker 容器化应用实践一文零基础教你学会 Docker 入门到实践JavaScript 浮点数之迷:大数危机Node.js 是什么?我为什么选择它?分享 10 道 Nodejs 进程相关面试题不容错过的 Node.js 项目架构

c1b634b9826e5077d93c5679bc69dda2.png

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/431085.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

JS - 按钮倒计时

效果&#xff1a; html代码&#xff1a; <input type"button" id"btn" value"点击获取效验码" />js代码&#xff1a; //倒计时var wait60;function time(o) {if (wait 0) {o.removeAttribute("disabled"); o.value&qu…

python安装报错类型_Python处理验证码第一篇(pytesser初探及报错处理)

前言&#xff1a;春节期间&#xff0c;无法全身心投入地去写爬虫&#xff0c;那就玩玩验证码吧&#xff0c;应该比较有趣&#xff01;首次接触验证码识别&#xff0c;用pytesser接触一下最简单的验证码先&#xff0c;代码参照&#xff1a;使用python以及工具包进行简单的验证码…

bin文件怎么转换成文本文档_怎么把pdf文件转换成word文档?这样转很简单

在日常的学习、工作生活中&#xff0c;小伙伴们总少不了要对文件格式进行转换操作&#xff0c;例如把pdf文件转换成word文档。对于职场老手来说&#xff0c;这当然不算一个难度操作&#xff0c;甚至还很简单。但对于职场新人而言&#xff0c;找不对方法&#xff0c;可能操作起来…

delphi if多个条件_Python从入门到精通——一文读懂if语句用法

1、if语句概述if语句能够进行条件测试&#xff0c;并依据一定的条件进行具体的操作2、if语句条件测试if语句的核心是一个值为True或False的表达式&#xff0c;这种表达式称为条件测试。Python根据条件测试的值为True或False来决定是否执行if语句中的代码。2.1 条件测试是否相等…

jquery交换数组元素位置_跟我一起学jQuery——第一集

《锋利的JQuery》第二版阅读笔记-第一章jQuery对象和DOM对象想学习jQuery&#xff0c;首先要学会区分jQuery对象和DOM对象。1)jQuery对象是用jQuery类库的选择器获得的对象。2)DOM对象是用传统(javascript)获得的对象。举个栗子//DOM对象var domO document.getElementById(&qu…

Gitlab的develop角色的人没有权限无法提交的问题解决方案

问题 事情是这样的&#xff0c;最近跟几位同事搞一些东西&#xff0c;打算在Gitlab上建一个仓库&#xff0c;然后协同开发。 我建好仓库&#xff0c;将其他几位同事添加进来&#xff0c;角色分配为Develop。 之后提交初始代码到master分支后&#xff0c;他们用sourceTree拉取代…

macbook不能进系统 备份数据_不基于备份和表,生产系统数据误删就能完全恢复?!...

作者介绍刘宝珍&#xff0c;架构师&#xff0c;目前就职于大型资产管理公司的科技子公司&#xff0c;拥有多年的大型私有云的规划和设计工作经验&#xff0c;熟悉软件的开发流程&#xff0c;目前醉心于研究基于DDD和敏捷的软件的开发模式&#xff0c;对分布式架构有深入的理解&…

arduino代码_arduino智能小车项目——01、配件介绍及代码部分教程

各位小伙伴大家好&#xff1a;本期我们开始制作智能小车项目&#xff0c;这因该是资源包里面难度相对较大的项目。所以我们从易到难把项目进行分解&#xff0c;先从实现较为简单的功能开始。本期我们的目标是实现小车的自主运动&#xff0c;代码部分也相对比较简单&#xff0c;…

python+mysql库+json_用python写爬虫-5.1json用pandas入mysql库

pandas是一个数据处理模块&#xff0c;前面也已经提到了好些&#xff0c;用python写爬虫--4.5pandas存入excel.这次来统一说一说&#xff0c;使用感想。pandas主要是Seriers和Dataframe&#xff0c;Seriers相当于list&#xff0c;dataframe相当于excel表格&#xff0c;有行也有…

pdf温度记录仪开发_蔬菜、鲜果、奶制品冷链温度监控系统监控食品让客户放心...

冷链温度监控系统对于蔬菜鲜果奶制品的冷藏&#xff0c;不仅有助于减慢它们的腐坏速度&#xff0c;保持新鲜&#xff0c;而且对全国物品的运输和合理配置有极大的影响。选择GPS冷链温度监控食品传输过程让客户不再担心质量问题&#xff0c;人们也吃的放心。在多年自主开发GPS物…

python中什么是数据驱动_Python数据驱动DDT的应用

原标题&#xff1a;Python数据驱动DDT的应用 在开始之前&#xff0c;我们先来明确一下什么是数据驱动&#xff0c;在百度百科中数据驱动的解释是&#xff1a;数据驱动测试&#xff0c;即黑盒测试&#xff08;Black-box Testing&#xff09;&#xff0c;又称为功能测试&#xff…

2008日志清理 server sql_SQL Server 2008 收缩日志 清空删除大日志文件

由于SQL2008对文件和日志管理进行了优化&#xff0c;所以以下语句在SQL2005中可以运行但在SQL2008中已经被取消&#xff1a;(SQL2005)BackupLog DNName with no_loggodumptransaction DNName with no_loggoUSE DNNameDBCC SHRINKFILE (2)Go-----------------------------------…

go java性能_服务端I/O性能大比拼:Node、PHP、Java和Go

理解应用程序的输入/输出(I/O)模型&#xff0c;意味着其在计划处理负载与残酷的实际使用场景之间的差异。若应用程序比较小&#xff0c;也没有服务于很高的负载&#xff0c;也许它影响甚微。但随着应用程序的负载逐渐上涨&#xff0c;采用错误的I/O模型有可能会让你到处踩坑&am…

python数据库安装_python数据库-MySQL安装问题总结(48)

一、ERROR 1698(28000):Access denied for user rootlocalhost错误 我的操作系统是ubuntu&#xff1a;我的MySQL版本是&#xff1a;安装完成后&#xff0c;登录mysql的时候就出现了如下错误&#xff1a;因为安装的过程中没让设置密码&#xff0c;可能密码为空&#xff0c;但无论…

[转]什么是RSS

http://www.blogbus.com/bangzhuzhongxin-logs/5452786.html 如果您够仔细的话&#xff0c;一定会发现BlogBus的每一个Blog站上都会有一个 图标。这个图标表示Blogbus支持RSS功能&#xff08;目前是RSS 2.0&#xff09;&#xff0c;即所谓的新闻聚合功能。 点击 图标&#xf…

京东的商品搜索功能是如何实现的_如何精准查询京东商品销量?分享一个京东运营小妙招...

京东平台和其他平台商品信息展示不一样&#xff0c;在京东平台商品的销售额无论是累计的还是当月的数据都没有展示&#xff0c;即便是通过商家后台的京东商智也是只能看到销售额的指数&#xff0c;这就给京东商家做竞品分析时设置了障碍&#xff0c;无法获取到竞品的真实销售额…

python 可视化监控平台_python可视化篇之流式数据监控的实现

preface 流式数据的监控&#xff0c;以下主要是从算法的呈现出发&#xff0c;提供一种python的实现思路 其中&#xff1a; 1.python是2.X版本 2.提供两种实现思路&#xff0c;一是基于matplotlib的animation&#xff0c;一是基于matplotlib的ion 话不多说&#xff0c;先了解大概…

codeforce 606A - Magic Spheres

题意&#xff1a;a,b,c三种球&#xff0c;能把俩个一样的球变成另一颜色不一样的球。给你目标x,y,z&#xff0c;问能否经过变化至少达打目标。 1 #include<iostream>2 #include<stdio.h>3 #include<stdlib.h>4 #include<memory.h>5 #include<string…

cmd cd 无法切换目录_一分钟掌握cmd基础操作,告别鼠标

cmd基础操作cmdcmd是command的缩写&#xff0c;一直伴随着windows操作系统。有时称为&#xff1a;控制台窗口&#xff0c;cmd窗口&#xff0c;黑窗口&#xff0c;命令行窗口等。其实&#xff0c;在unix系统&#xff0c;Linux&#xff0c;MacOS等几乎所有的操作系统中&#xff0…

css 背景图怎么设置自动填充满_CSS属性设置 -- 背景样式

Ⅰ background-color: -- 设置标签的背景颜色rgba(0,0,0,0.65); -- (红,緑,蓝三原色,透明度)只能给背景设置透明度opacity: 0.65; -- 改变整个标签的透明度<style>Ⅱ background-image: --设置标签的背景图片url("图片网址"); -- 如果图片的大小没有标签大&…