RabbitMQ Streams 详解

RabbitMQ Streams是一种持久复制数据结构,可以完成与队列相同的任务:它们缓冲来自生产者的消息,这些消息由消费者读取。然而,流与队列的区别在于两个重要方面:消息的存储和消费方式。
Streams为仅追加的消息日志建模,这些消息可以重复读取,直到过期。流始终是持久的和复制的。这种流行为的更技术性的描述是“非破坏性消费者语义”。
要从RabbitMQ中的流中读取消息,一个或多个使用者订阅它,并根据需要多次读取相同的消息。
流中的数据可以通过RabbitMQ客户端库或通过专用的二进制协议插件和关联的客户端使用。强烈建议使用后一个选项,因为它提供对所有流特定功能的访问,并提供尽可能最好的吞吐量(性能)。
现在,您可能会问以下问题:

  • 那么流会取代队列吗?
  • 我应该放弃使用队列吗?

为了回答这些问题,引入流不是为了取代队列,而是为了补充队列。Streams为新的RabbitMQ用例开辟了许多机会,这些用例在使用Streams的用例中进行了描述。
以下信息详细说明流的使用以及流的管理和维护操作。
您还应该查看流插件信息,以了解有关使用二进制RabbitMQ stream协议的流的更多信息,以及功能矩阵的流核心和流插件比较页面。 

一、使用流的用例

开发流最初是为了涵盖现有队列类型无法提供或具有缺点的4个消息传递用例:

1、大型扇形分叉

当想要将相同的消息传递给多个订阅者时,用户当前必须为每个消费者绑定一个专用队列。如果使用者数量很大,这可能会变得效率低下,特别是在需要持久性和/或复制时。流将允许任意数量的使用者以非破坏性的方式消费来自同一队列的相同消息,从而不需要绑定多个队列。流消费者还可以从副本中读取数据,从而允许读取负载在集群中分布。

2、重放(时间旅行)

由于所有当前的RabbitMQ队列类型都具有破坏性消费行为,即当消费者用完消息时,将从队列中删除消息,因此不可能重新读取已消费的消息。流将允许消费者在日志中的任何点连接并从那里读取。

3、吞吐量性能

没有任何持久队列类型能够提供可以与任何现有的基于日志的消息传递系统竞争的吞吐量。Streams的设计以性能为主要目标。

4、大量积压工作

大多数RabbitMQ队列被设计为收敛于空状态,并因此进行了优化,当给定队列上有数百万条消息时,性能可能会更差。流旨在以有效的方式存储大量数据,并将内存开销降至最低。

二、如何使用RabbitMQ Streams 

可以指定可选队列和使用者参数的AMQP 0.9.1客户端库将能够将流用作常规AMQP 09.1队列。
就像队列一样,必须首先声明流。

1、声明RabbitMQ Stream

要声明流,请将x-queue-type队列参数设置为stream(默认值为classic)。此参数必须由客户端在声明时提供;不能使用策略设置或更改它。这是因为策略定义或适用的策略可以动态更改,但队列类型不能更改。必须在声明时指定。
下面的片段显示了如何使用AMQP 0.9.1 Java客户端创建流:

ConnectionFactory factory = new ConnectionFactory();
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("my-stream",true,         // durablefalse, false, // not exclusive, not auto-deleteCollections.singletonMap("x-queue-type", "stream")
);

使用设置为stream的x-queue-type参数声明队列将在每个配置的RabbitMQ节点上创建一个具有副本的流。流是仲裁系统,因此强烈建议使用不均匀的集群大小。
流仍然是AMQP 0.9.1队列,因此它可以在创建后绑定到任何交换,就像任何其他RabbitMQ队列一样。
如果使用管理UI声明,则必须使用队列类型下拉菜单指定流类型。
流支持3个额外的队列参数,这些参数最好使用策略配置:

  • x-max-length-bytes

 设置流的最大大小(以字节为单位)。默认值:未设置。 

  • x-max-age

设置流的最长期限。默认值:未设置。

  • x-stream-max-segment-size-bytes

单位:字节。流在磁盘上被划分为固定大小的段文件。 此设置控制它们的大小。 默认值:(500000000 字节)。

以下代码片段演示如何将流的最大大小设置为 20 GB,并使用 100 MB 的段文件:

Map<String, Object> arguments = new HashMap<>();
arguments.put("x-queue-type", "stream");
arguments.put("x-max-length-bytes", 20_000_000_000); // maximum stream size: 20 GB
arguments.put("x-stream-max-segment-size-bytes", 100_000_000); // size of segment files: 100 MB
channel.queueDeclare("my-stream",true,         // durablefalse, false, // not exclusive, not auto-deletearguments
);

 

三、客户端操作

1、Consuming

由于流永远不会删除任何消息,因此任何消费者都可以开始读取/消费 从日志中的任何一点。这由 x-stream-offset consumer 参数控制。 如果未指定,消费者将从写入的下一个偏移量开始读取 添加到使用者启动后的日志中。支持以下值:

  • first - 从日志中的第一条可用消息开始
  • last - 从最后写入的消息“块”(块 是流中使用的储运单位,简单来说就是批次 由几到几千条消息组成的消息,具体取决于入口)
  • next - 与不指定任何偏移量相同
  • 偏移量 - 一个数值,指定要附加到日志的确切偏移量。 如果此偏移量不存在,它将分别钳制到日志的开始或结束。
  • 时间戳 - 一个时间戳值,指定要附加到日志的时间点。 它将限制到最接近的偏移量,如果时间戳超出流的范围,它将分别限制日志的开始或结束。 在 AMQP 0.9.1 中,使用的时间戳是精度为 00 秒的 POSIX 时间,即自 00-00-1970 01:01:<> UTC 以来的秒数。 请注意,使用者可以接收在指定时间戳之前发布的消息。
  • 间隔 - 一个字符串值,指定相对于当前时间附加日志的时间间隔。使用与 x-max-age 相同的规范

以下代码片段演示如何使用第一个偏移量规范:

<span style="background-color:#232323"><span style="color:#e6e1dc">channel.basicQos(<span style="color:#a5c261">100</span>); <span style="color:#bc9458"><em>// QoS must be specified</em></span>
channel.basicConsume(<span style="color:#a5c261">"my-stream"</span>,false,Collections.singletonMap(<span style="color:#a5c261">"x-stream-offset"</span>, <span style="color:#a5c261">"first"</span>), <span style="color:#bc9458"><em>// "first" offset specification</em></span>(consumerTag, message) -> {<span style="color:#bc9458"><em>// message processing</em></span><span style="color:#bc9458"><em>// ...</em></span>channel.basicAck(message.getEnvelope().getDeliveryTag(), false); <span style="color:#bc9458"><em>// ack is required</em></span>},consumerTag -> { });
</span></span>

以下代码片段演示如何指定要从中消费的特定偏移量:

<span style="background-color:#232323"><span style="color:#e6e1dc">channel.basicQos(<span style="color:#a5c261">100</span>); <span style="color:#bc9458"><em>// QoS must be specified</em></span>
channel.basicConsume(<span style="color:#a5c261">"my-stream"</span>,false,Collections.singletonMap(<span style="color:#a5c261">"x-stream-offset"</span>, <span style="color:#a5c261">5000</span>), <span style="color:#bc9458"><em>// offset value</em></span>(consumerTag, message) -> {<span style="color:#bc9458"><em>// message processing</em></span><span style="color:#bc9458"><em>// ...</em></span>channel.basicAck(message.getEnvelope().getDeliveryTag(), false); <span style="color:#bc9458"><em>// ack is required</em></span>},consumerTag -> { });
</span></span>

以下代码片段演示如何指定要从中使用的特定时间戳:

<span style="background-color:#232323"><span style="color:#e6e1dc"><span style="color:#bc9458"><em>// an hour ago</em></span>
<span style="color:#da4939">Date</span> <span style="color:#a5c261">timestamp</span> = <span style="color:#c26230">new</span> <span style="color:#ffc66d">Date</span>(System.currentTimeMillis() - <span style="color:#a5c261">60</span> * <span style="color:#a5c261">60</span> * <span style="color:#a5c261">1_000</span>)
channel.basicQos(<span style="color:#a5c261">100</span>); <span style="color:#bc9458"><em>// QoS must be specified</em></span>
channel.basicConsume(<span style="color:#a5c261">"my-stream"</span>,false,Collections.singletonMap(<span style="color:#a5c261">"x-stream-offset"</span>, timestamp), <span style="color:#bc9458"><em>// timestamp offset</em></span>(consumerTag, message) -> {<span style="color:#bc9458"><em>// message processing</em></span><span style="color:#bc9458"><em>// ...</em></span>channel.basicAck(message.getEnvelope().getDeliveryTag(), false); <span style="color:#bc9458"><em>// ack is required</em></span>},consumerTag -> { });
</span></span>

2、其他流操作

以下操作的使用方式与经典队列和仲裁队列类似 但有些具有一些特定于队列的行为。

  • 声明
  • 队列删除
  • 发布者确认
  • 消费(订阅):消费需要 QoS 要设置的预取。acks 作为一种信用机制来推进当前 消费者的偏移量。
  • 为使用者设置 QoS 预取
  • 消费者确认(牢记 QoS 预取限制)
  • 取消消费者

四、流的单个活动消费者功能

流的单个活动使用者是 RabbitMQ 3.11 及更高版本中提供的一项功能。 它在流上提供独占消费和消费连续性。 当共享同一流和名称的多个使用者实例启用单个活动使用者时,这些实例中只有一个实例将同时处于活动状态,因此将接收消息。 其他实例将处于空闲状态。

单一活跃使用者功能提供 2 个好处:

  • 消息按顺序处理:一次只有一个使用者。
  • 保持消费连续性:如果活动消费者停止或崩溃,则该组中的消费者将接管。

五、超级流

超级流是一种通过将大型流划分为较小的流来横向扩展的方法。 它们与单个活动使用者集成,以保留分区内的消息顺序。 超级流从 RabbitMQ 3.11 开始可用。

超级流是由单个常规流组成的逻辑流。 这是一种使用 RabbitMQ 流横向扩展发布和使用的方法:将大型逻辑流划分为分区流,将存储和流量拆分到多个集群节点上。

超级流仍然是一个逻辑实体:由于客户端库的智能性,应用程序将其视为一个“大”流。 超级流的拓扑基于 AMQP 0.9.1 模型,即它们之间的交换、队列和绑定。

可以使用任何 AMQP 0.9.1 库或管理插件创建超级流的拓扑,它需要创建直接交换,即“分区”流,并将它们绑定在一起。 不过,使用 rabbitmq-streams add_super_stream 命令可能更容易。 以下是如何使用它来创建具有 3 个分区的发票超级流:

<span style="background-color:#232323"><span style="color:#e6e1dc">rabbitmq-streams add_super_stream invoices --partitions 3
</span></span>

使用 rabbitmq-streams add_super_stream --help 了解有关该命令的更多信息。

与单个流相比,超级流增加了复杂性,因此不应将其视为涉及流的所有用例的默认解决方案。 仅当您确定已达到单个流的限制时,才考虑使用超级流。

六、功能比较:常规队列与流

流不是传统意义上的真正队列,因此不是 与 AMQP 0.9.1 队列语义非常一致。其他队列类型的许多功能 不支持支持,并且永远不会由于队列类型的性质而支持。

可以使用常规队列的 AMQP 0.9.1 客户端库将能够使用流 只要它使用消费者确认。

由于流中许多功能是非破坏性的,因此它们永远不会被流支持 读取语义。

功能矩阵

特征

经典

非持久性队列

是的

排他性

是的

每条消息的持久性

每条消息

总是

成员资格变更

自动

手动

TTL的

是的

否(但请参阅保留期)

队列长度限制

是的

否(但请参阅保留期)

懒惰行为

是的

固有

消息优先级

是的

消费者至上

是的

死信交换

是的

遵守政策

是的

(请参阅保留期)

对内存警报做出反应

是的

否(使用最少的 RAM)

病毒邮件处理

全局 QoS 预取

是的

非持久性队列

根据其假定的用例,流始终是持久的, 它们不能像常规队列那样是非持久的。

排他性

根据其假定的用例,流始终是持久的,它们不能像常规队列那样是排他性的。 它们不应用作临时队列。

懒惰模式

写入消息后,流将所有数据直接存储在磁盘上 在读取之前,它不会使用任何内存。可以这么说,流本质上是懒惰的。

全球服务质量

流不支持全局 QoS 预取,其中通道设置单个 使用该通道的所有使用者的预取限制。如果尝试 从启用了全局 QoS 的通道中的流中使用 将返回通道错误。

使用每使用者 QoS 预取,这是几个常用客户端中的默认设置。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/217909.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

希亦|鲸立|小吉内衣洗衣机好用吗?强势PK“洗护一体”王者!

随着人们的生活水平的提升&#xff0c;越来越多小伙伴来开始追求更高的生活水平&#xff0c;一些智能化的小家电就被发明出来&#xff0c;而且内衣洗衣机是其中一个。我们对内衣裤的清洗频次会高于普通衣服&#xff0c;大多数人会选择手洗内衣裤&#xff0c;都在手洗过程不仅会…

会计学上机实验

使用说明&#xff1a; 蓝色标记提示需要明细核算最后算出来的净利润为 732150本文不是标准答案&#xff0c;老师也没给标准答案 1. 10 月 1 日&#xff0c;盛达有限责任公司成立&#xff0c;注册资本&#xff08;实收资本&#xff09;800 万人民币&#xff0c;其中&#xff1a…

Live800:企业做好客服质检的5大方法

在现代商业社会中&#xff0c;客服质量已经成为了企业竞争力的重要组成部分。一家企业的客服质量直接关系到其品牌形象和客户满意度&#xff0c;因此企业必须要重视客服质量&#xff0c;并且采取一些有效的方法来做好客服质检。下面将介绍企业做好客服质检的5大方法。 一、建立…

HI3559AV100和FPGA 7K690T的PCIE接口调试记录

1、基本情况 HI3559AV100和690t之间使用pcie2.0 x2接口连接&#xff0c;3559作为RC端&#xff0c;690T作为EP端&#xff0c;驱动使用XDMA。系统主要功能是FPGA采集srio接口过来的图像数据&#xff0c;再通过pcie把数据传递给3559&#xff0c;3559再实现图像数据的存储、AI处理、…

HarmonyOS鸿蒙应用开发——数据持久化Preferences

文章目录 数据持久化简述基本使用与封装测试用例参考 数据持久化简述 数据持久化就是将内存数据通过文件或者数据库的方式保存到设备中。HarmonyOS提供两两种持久化方案&#xff1a; Preferences&#xff1a;主要用于保存一些配置信息&#xff0c;是通过文本的形式存储的&…

X86汇编语言:从实模式到保护模式--命令篇

X86汇编语言&#xff1a;从实模式到保护模式–命令篇 补充汇编命令 注&#xff1a;不能直接将内存赋值给内存&#xff0c;也不能将立即数直接赋值给段寄存器&#xff08;CS DS ES SS&#xff09;&#xff0c;但是可以将内存直接赋值给段寄存器 div&#xff1a;使用操作数作为…

面试必备的Linux常用命令

「作者主页」&#xff1a;士别三日wyx 「作者简介」&#xff1a;CSDN top100、阿里云博客专家、华为云享专家、网络安全领域优质创作者 「推荐专栏」&#xff1a;对网络安全感兴趣的小伙伴可以关注专栏《网络安全入门到精通》 Linux常用命令 1、文件及内容2、网络3、进程服务4、…

PostgreSql 设置自增字段

一、概述 序列类型是 PostgreSQL 特有的创建一个自增列的方法。包含 smallserial、serial和 bigserial 类型&#xff0c;它们不是真正的类型&#xff0c;只是为了创建唯一标识符列而存在的方便符号。其本质也是调用的序列&#xff0c;序列详情可参考&#xff1a;《PostgreSql 序…

【FPGA】综合设计练习题目

前言 这是作者这学期上的数电实验期末大作业的题目&#xff0c;综合性还是十分强的&#xff0c;根据组号作者是需要做“4、篮球比赛计分器”&#xff0c;相关代码会在之后一篇发出来&#xff0c;这篇文章用于记录练习题目&#xff0c;说不定以后有兴趣或者有时间了回来做做。 …

慢SQL的治理经验

其他系列文章导航 Java基础合集数据结构与算法合集 设计模式合集 多线程合集 分布式合集 ES合集 文章目录 其他系列文章导航 文章目录 前言 一、慢SQL导致的后果 二、可能导致慢SQL的原因 三、如何发现慢SQL 3.1 JVM Sandbox 四、识别高危SQL 4.1 阿里的重点强制SQL规…

python 拆分视频为图像序列

import cv2 import os#此删除文件夹内容的函数来源于网上 def del_file(filepath):"""删除某一目录下的所有文件或文件夹:param filepath: 路径:return:"""del_list os.listdir(filepath)for f in del_list:file_path os.path.join(filepath, …

微信小程序ios中非cover组件点击重复触发地图tap事件

现象&#xff1a; map中使用view组件的click事件会重复触发地图的tap组件&#xff0c;只在ios上出现 <map id"maps" style"width: 100vw;height: 100vh;" :latitude"latitude" :longitude"longitude":markers"markers"…

element-ui以服务方式调用loading,自定义修改icon

一、以服务的方式调用Loading 除了常用的v-loading、this.$loading我们还可以以服务的方式调用。主要有以下步骤 引入Loading服务 import { Loading } from element-ui;在需要时调用 Loading.service(options);其中 options 参数为 Loading 的配置项&#xff0c;具体见下表…

(第8天)保姆级 PL/SQL Developer 安装与配置

PL/SQL Developer 安装与配置(第8天) 咱们前面分享了很多 Oracle 数据库的安装,但是还没有正式使用过 Oracle 数据库,怎么连接 Oracle 数据库?今天就来讲讲我学习中比较常用的 Oracle 数据库连接工具:PL/SQL DEVELOPER。 PL/SQL Developer 的安装和配置对于新手来说还是…

Unity 射线检测(Raycast)检测图层(LayerMask)的设置

目录 主要内容 拓展&#xff1a; 主要内容 Raycast函数有很多重载(函数的重载根据函数的参数来决定) 这里只涉及这个重载,其余重载可以很方便得在Visual Studio中看源码获取&#xff1b; public static bool Raycast(Vector3 origin, Vector3 direction, out RaycastHit hit…

链游成为蓝海,潮游世界开创未来新时代

区块链、元宇宙浪潮来袭&#xff0c;为数字世界开启崭新的大门&#xff0c;一场链游模式的范式革命正在发生&#xff01; 未来&#xff0c;元宇宙中&#xff0c;链游将成为中坚力量。 潮游世界抢占时代先机&#xff0c;利用区块链技术的去中心化和数字资产的不可替代性&#x…

设计原则 | 接口隔离原则

一、接口隔离原则 1、原理 客户端不应该依赖它不需要的接口&#xff0c;即一个类对另一个类的依赖应该建立在最小的接口上。如果强迫客户端依赖于那些它们不使用的接口&#xff0c;那么客户端就面临着这个未使用的接口的改变所带来的变更&#xff0c;这无意间导致了客户程序之…

【MySQL】MySQL库的操作

MySQL库的操作 一、创建数据库创建数据库案例字符集和校验规则校验规则对数据库的影响 二、操纵数据库1、查看数据库2、查看当前正在使用的数据库3、使用数据库4、显示创建语句5、数据库删除6、数据库的修改7、备份和恢复8、查看连接情况 一、创建数据库 创建数据库的语法如下…

计网 - TCP扫盲

文章目录 知识点TCP头格式TCP有限状态机&#xff08;FSM&#xff09;为何需要TCP协议TCP的定义TCP连接的概念如何唯一确定一个TCP连接TCP vs UDPTCP拥塞控制TCP流量控制 导图 知识点 TCP头格式 TCP头部包含多个字段&#xff0c;其中一些是必需的&#xff0c;而另一些是可选的…

MySQL笔记-第18章_MySQL8其它新特性

视频链接&#xff1a;【MySQL数据库入门到大牛&#xff0c;mysql安装到优化&#xff0c;百科全书级&#xff0c;全网天花板】 文章目录 第18章_MySQL8其它新特性1. MySQL8新特性概述1.1 MySQL8.0 新增特性1.2 MySQL8.0移除的旧特性 2. 新特性1&#xff1a;窗口函数2.1 使用窗口…