RabbitMQ3.13.x之十_流过滤的内部结构设计与实现

RabbitMQ3.13.x之十_流过滤的内部结构设计与实现

文章目录

  • RabbitMQ3.13.x之十_流过滤的内部结构设计与实现
  • 1. 概念
    • 1. 消息发布
    • 2. 消息消费
  • 2. 流的结构
    • 1. 在代理端进行过滤
    • 2. 客户端筛选
    • 3. JavaAPI示例
    • 4. 流过滤配置
    • 5. AMQP上的流过滤
    • 6. 总结
  • 3. 相关链接

1. 概念

流过滤的思想是在代理端提供第一级的高效过滤,而无需代理解释消息。 这样,只需要流子集的使用者就不需要自己获取所有数据并处理所有过滤。 这可以大大减少传输给消费者的数据。

通过筛选,可以将筛选器值与每条消息相关联。 它可以是地理信息,例如每条消息来自的世界区域,如下图所示:

在这里插入图片描述

因此,我们的流有 1 条(绿色)消息、1 条(深蓝色)消息、2 条(紫色)消息,然后是 2 条消息。AMER``APAC``EMEA``AMER

1. 消息发布

发布者可以将每封出站邮件与其筛选器值相关联:

在这里插入图片描述

在上图中,发布者发布了 1 条(绿色)消息和 2 条(紫色)消息,这些消息将添加到流中。AMER``EMEA

2. 消息消费

当使用者订阅时,它可以指定一个或多个过滤器值,并且代理应仅发送具有此或这些过滤器值的消息。 我们很快就会看到这在实践中有点不同,但这足以理解这些概念。

在下图中,顶部的使用者指定它只想要(绿色)消息,而代理只发送这些消息。 中间的消费者和底部的消费者也是如此。AMER``EMEA``APAC

在这里插入图片描述

概念就到这里了,现在让我们来了解一下实现细节。

2. 流的结构

我们需要知道流是如何构建的,以便了解流过滤的内部结构。 流是包含段文件的目录。 每个区段文件都有一个关联的索引文件(用于了解在区段文件中给定偏移量处附加使用者的位置等)。 拥有多个“小”段文件比为整个流拥有一个大的整体文件要好:例如,删除“旧”段文件以截断流比删除大文件的开头更有效、更安全。

区段文件由包含消息的块组成。 区块中的消息数取决于入口速率(高入口速率表示一个块中的消息较多,低入口速率表示块中的消息较少)。 块中的消息数量从几条(甚至 1 条)到几千条不等。

块是怎么回事? 块是流中的工作单元:它们用于复制,更重要的是,对于我们的主题,用于消费者交付。 代理使用 sendfile 系统调用(将整个块从文件系统发送到网络套接字,而不将数据复制到用户空间)向使用者发送块,一次一个。

下图说明了流的结构:

在这里插入图片描述

有了这个,让我们看看代理如何知道是否要调度一个区块。

1. 在代理端进行过滤

想象一下,我们有一个只想要(绿色)消息的消费者。 当代理要调度一个区块时,它需要知道该区块是否包含消息。 如果是这样,它可以将块发送给消费者,如果没有,代理可以跳过该块,转到下一个块,然后重新迭代。AMER``AMER

每个区块都有一个标头,该标头可以包含一个 Bloom 过滤器,该标头告诉代理该块是否包含具有给定过滤器值的消息。 Bloom 过滤器是一种节省空间的概率数据结构,用于测试元素是否是集合的成员。 在我们的示例中,集合包含 、 和 ,元素是 。AMER``EMEA``APAC``AMER

下图说明了 3 个块的代理端过滤过程:

在这里插入图片描述

如上图所示,筛选器可能会返回误报,即不包含具有预期筛选器值的消息的块。 这是正常的,因为 Bloom 过滤器是概率性的。 不过,它们不会返回假阴性:如果过滤器显示没有(绿色)消息,我们可以确定它是真的。 我们必须忍受这种不确定性:有时我们可能会无缘无故地调度一些块,但这总比调度所有块要好。AMER

可以肯定的是,消费者可以接收到它不想要的消息:看看我们左边的第一个块,它包含消费者要求的(绿色)消息,但也包含(紫色)和(深蓝色)消息。 这就是为什么客户端也必须进行过滤的原因。AMER``EMEA``APAC

2. 客户端筛选

代理在传递消息时处理第一级过滤,但由于传递单位是块,因此使用者仍然可以接收它不想要的消息。 因此,客户端还必须执行一些筛选,这显然必须与订阅时设置的筛选值一致。

下图说明了一个消费者,它只需要(绿色)消息,并且必须执行最后一步的筛选:AMER

在这里插入图片描述

让我们看看这如何转化为应用程序代码。

3. JavaAPI示例

筛选不是侵入性的,可以作为跨领域问题进行处理,从而最大限度地减少对应用程序代码的影响。 以下是在使用流 Java 客户端(方法)声明生产者时设置从消息中提取过滤器值的逻辑:filterValue(Function<Message,String>)

Producer producer = environment.producerBuilder().stream("invoices").filterValue(msg -> msg.getApplicationProperties().get("region").toString())  .build();

在消费端,流 Java 客户端提供了设置过滤器值的方法和设置客户端过滤逻辑的方法。 声明使用者时,必须调用这两种方法:filter().values(String... filterValues)``filter().postFilter(Predicate<Message> filter)

Consumer consumer = environment.consumerBuilder().stream("invoices").filter().values("AMER")  .postFilter(msg -> "AMER".equals(msg.getApplicationProperties().get("region")))  .builder().messageHandler((ctx, msg) -> {// message processing code}).build();

如您所见,筛选不会更改发布和使用代码,而只是更改生产者和使用者的声明。

现在让我们看看如何以最合适的方式为用例配置流过滤。

4. 流过滤配置

关于流过滤的第一篇文章提供了一些数字(与不过滤相比,过滤节省了大约 80% 的带宽)。 流过滤的好处很大程度上取决于用例:入口速率、基数和过滤器值的分布,以及过滤器大小。 过滤器越大越好(错误率越小)。 可以为块中使用的筛选器大小设置一个介于 16 到 255 字节之间的值,默认值为 16 字节。

流 Java 客户端提供了在创建流时设置过滤器大小的方法(它在内部设置参数):filterSize(int)``stream-filter-size-bytes

environment.streamCreator().stream("invoices").filterSize(32).create()

如何估算过滤器的尺寸? 网上有许多 Bloom 滤镜计算器。 参数包括哈希函数的数量(RabbitMQ 流过滤为 2 个)、预期元素的数量、错误率和大小。 您通常对元素的数量有所了解,因此您需要在错误率和过滤器大小之间找到权衡。

以下是一些示例:

  • 10 个值,16 个字节 => 2 % 错误率
  • 30 个值,16 个字节 => 14 % 错误率
  • 200 个值,128 个字节 => 10 % 的错误率

那么,过滤器越大越好? 不完全是:尽管 Bloom 过滤器在存储方面非常有效,因为它不存储元素,只是元素是否在集合中,过滤器大小是预先分配的。 如果将筛选器大小设置为 255,并且每个块至少包含一条具有筛选器值的消息,则每个块标头中将分配 255 个字节。 如果块包含许多大消息,这很好,因为与块大小相比,筛选器大小可以忽略不计。 但是,对于退化的情况,例如具有 10 字节消息和 10 字节筛选器值的单消息块,您最终会得到一个比实际数据更大的筛选器。

您必须尝试自己的用例,以估计过滤器大小对流大小的影响。 关于流过滤的第一篇文章提供了一个使用 Stream PerfTest 估计流大小的技巧(在不过滤的情况下读取整个流并查阅指标)。rabbitmq_stream_read_bytes_total

5. AMQP上的流过滤

尽管访问流的首选方式是流协议,但支持其他协议,例如 AMQP。 任何 AMQP 客户端库也支持流筛选:

  • 声明:将参数设置为 并使用 在声明流时设置筛选器大小。x-queue-type``stream``x-stream-filter-size-bytes
  • 发布:使用标头设置出站邮件的筛选器值。x-stream-filter-value
  • 使用:使用 consumer 参数设置预期的筛选器值(字符串或字符串数组),并使用 consumer 参数(可选)接收没有任何筛选值的消息(默认值为 )。客户端过滤仍然是必要的!x-stream-filter``x-stream-match-unfiltered``false

6. 总结

流过滤易于使用并从中受益,但有关内部的一些知识可用于优化其使用,尤其是对于棘手的用例。 请记住,客户端筛选是必需的,并且必须与配置的筛选器值一致。 这通常很容易实现。 还可以为给定的用例以最合适的方式设置过滤器大小。

3. 相关链接

参考:

Stream Filtering Internals | RabbitMQ

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/795113.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

前端与后端协同:实现Excel导入导出功能

&#x1f31f; 前言 欢迎来到我的技术小宇宙&#xff01;&#x1f30c; 这里不仅是我记录技术点滴的后花园&#xff0c;也是我分享学习心得和项目经验的乐园。&#x1f4da; 无论你是技术小白还是资深大牛&#xff0c;这里总有一些内容能触动你的好奇心。&#x1f50d; &#x…

RISC-V GNU Toolchain 工具链安装问题解决(含 stdio.h 问题解决)

我的安装过程主要参照 riscv-collab/riscv-gnu-toolchain 的官方 Readme 和这位佬的博客&#xff1a;RSIC-V工具链介绍及其安装教程 - 风正豪 &#xff08;大佬的博客写的非常详细&#xff0c;唯一不足就是 sudo make linux -jxx 是全部小写。&#xff09; 工具链前前后后我装了…

非关系型数据库--------------------Redis 群集模式

目录 一、集群原理 二、集群的作用 &#xff08;1&#xff09;数据分区 &#xff08;2&#xff09;高可用 Redis集群的作用和优势 三、Redis集群的数据分片 四、Redis集群的工作原理 五、搭建redis群集模式 5.1启用脚本配置集群 5.2修改集群配置 5.3启动redis节点 5…

Django--admin 后台管理站点

Django最大的优点之一&#xff0c;就是体贴的提供了一个基于项目model创建的一个后台管理站点admin。这个界面只给站点管理员使用&#xff0c;并不对大众开放。虽然admin的界面可能不是那么美观&#xff0c;功能不是那么强大&#xff0c;内容不一定符合你的要求&#xff0c;但是…

dm8 备份与恢复

dm8 备份与恢复 基础环境 操作系统&#xff1a;Red Hat Enterprise Linux Server release 7.9 (Maipo) 数据库版本&#xff1a;DM Database Server 64 V8 架构&#xff1a;单实例1 设置bak_path路径 --创建备份文件存放目录 su - dmdba mkdir -p /dm8/backup--修改dm.ini 文件…

非关系型数据库——Redis基本操作

目录 一、Redis数据库常用命令 1.Set——存放数据 2.Get——获取数据 3.Keys——获取符合条件的键值 4.Exists——判断键值是否存在 5.Del——删除指定键值 6.Type——获取键值对应的类型 7.Rename——对已有键值重命名&#xff08;覆盖&#xff09; 8.Renamenx——对…

【蓝桥杯嵌入式】13届程序题刷题记录及反思

一、题目分析 考察内容&#xff1a; led按键&#xff08;短按&#xff09;PWM输出&#xff08;PA1&#xff09;串口接收lcd显示 根据PWM输出占空比调节&#xff0c;高频与低频切换 串口接收&#xff08;指令解析&#xff09;【中断接收】 2个显示界面 led灯闪烁定时器 二…

SV学习笔记(六)

覆盖率类型 写在前面 覆盖率是 衡量设计验证完备性 的一个通用词。随着测试逐步覆盖各种合理的场景&#xff0c;仿真过程会慢慢勾画出你的设计情况。覆盖率工具会 在仿真过程中收集信息 &#xff0c;然后进行后续处理并且得到覆盖率报告。通过这个报告找出覆盖之外的盲区&…

动态属性的响应式问题和行内编辑的问题

动态属性的响应式问题 通过点击给目标添加动态数据&#xff0c;该数据不具备响应式特性 如下图&#xff1a; 点击编辑&#xff0c;前面的数据框会变成输入框&#xff0c;点取消会消失 // 获取数据 async getList () {const res await xxxthis.list res.data.rows// 1. 获…

【QT+QGIS跨平台编译】074:【libdxfrw跨平台编译】(一套代码、一套框架,跨平台编译)

点击查看专栏目录 文章目录 一、libdxfrw介绍二、QGIS下载三、文件分析四、pro文件五、编译实践一、libdxfrw介绍 libdxfrw是一个用于读取和写入DXF(Drawing Exchange Format)文件的开源C++库。DXF是一种由AutoCAD开发的文件格式,用于存储CAD(计算机辅助设计)图形数据,它…

使用 LLMLingua-2 压缩 GPT-4 和 Claude 提示

原文地址&#xff1a;Compress GPT-4 and Claude prompts with LLMLingua-2 2024 年 4 月 1 日 向大型语言模型&#xff08;LLM&#xff09;发送的提示长度越短&#xff0c;推理速度就会越快&#xff0c;成本也会越低。因此&#xff0c;提示压缩已经成为LLM研究的热门领域。 …

ensp华为AC+AP上线配置

AR1配置&#xff1a; <Huawei>system-view # 进入系统视图<Huawei>sysname R1 # 设备重命名[R1]dhcp enable # 开启DHCP功能[R1]interface GigabitEthernet0/0/0 # 进入接口 [R1-GigabitEthernet0/0/0]ip address 192.168.0.1 23 # 配置接口地址 [R1-GigabitE…

SpringBoot配置文件加载的优先级顺序

SpringBoot配置文件加载的优先级顺序 1.按文件类型2.按路径比较3.按命令行参数设置 1.按文件类型 SpringBoot的配置文件可以分为.properties .yml .yaml 在同一路径下&#xff08;比如都在classpath下&#xff09;三者的优先级顺序是.properties> .yml> .yaml 2.按路径…

基于tensorflow和kereas的孪生网络推理图片相似性

一、环境搭建 基础环境&#xff1a;cuda 11.2 python3.8.13 linux ubuntu18.04 pip install tensorflow-gpu2.11.0 验证&#xff1a;# 查看tensorflow版本 import tensorflow as tf tf.__version__ # 是否能够成功启动GPU from tensorflow.python.client import device_lib pr…

jvm基础三——类加载器

类加载器 在Java中&#xff0c;类加载器&#xff08;Class Loader&#xff09;是Java虚拟机&#xff08;JVM&#xff09;的一部分&#xff0c;负责将类文件&#xff08;.class文件&#xff09;加载到JVM中&#xff0c;使得程序能够使用这些类。类加载器在Java中具有重要的作用&…

6 种事件驱动的架构模式

事件驱动架构(Event-Driven Architecture)是一种基于事件和事件处理的软件架构&#xff0c;它的核心思想是将系统的行为和逻辑抽象成一系列事件&#xff0c;这些事件在系统中按照一定的规则和顺序产生和传播&#xff0c;并被相应的处理器处理。事件驱动架构具有高度的灵活性、可…

【数据结构】考研真题攻克与重点知识点剖析 - 第 3 篇:栈、队列和数组

前言 本文基础知识部分来自于b站&#xff1a;分享笔记的好人儿的思维导图与王道考研课程&#xff0c;感谢大佬的开源精神&#xff0c;习题来自老师划的重点以及考研真题。此前我尝试了完全使用Python或是结合大语言模型对考研真题进行数据清洗与可视化分析&#xff0c;本人技术…

LogicFlow 在HTML中的引入与使用

LogicFlow 在HTML中的引入与使用 LogicFlow的引入与使用&#xff0c;相较于BPMNJS相对容易一些&#xff0c;更加灵活一些&#xff0c;但是扩展代码可能写得更多一些。 示例展示 使用方式 这个的使用方式就简单很多了&#xff0c;利用cdn把js下载下来&#xff0c;引入到HTML文…

c语言之向main函数传递参数

在c语言中&#xff0c;main函数也是可以传递传递参数的&#xff0c;业内向main函数传递参数的格式是 main(int argc,char *argv[]) 向main函数传递参数不是通过代码传递的&#xff0c;一般是通过dos命令传递 举个例子 #include<stdio.h> void main(int argc,char *ar…

PyTorch之计算模型推理时间

一、参考资料 如何测试模型的推理速度 Pytorch 测试模型的推理速度 二、计算PyTorch模型推理时间 1. 计算CPU推理时间 import torch import torchvision import time import tqdm from torchsummary import summarydef calcCPUTime():model torchvision.models.resnet18()…