RabbitMQ实践——Stream队列的使用方法

大纲

  • 什么是Stream队列
  • 创建Stream
  • 发布消息
  • 消费
    • 从第一条消息开始读取
    • 从最后一条消息开始读取
    • 从某个时间戳开始读取
    • 从某个偏移量开始读取
  • 样例
  • 长度控制
    • 长度控制
    • 时间控制
    • 服务端筛选消息
      • 发布方设定过滤值
      • 消费方设置服务端过滤,且要二次过滤
  • 工程代码
  • 参考资料

在 《RabbitMQ实践——搭建多人聊天服务》一文中,我们使用Stream队列存储了聊天室记录。但是每个进入聊天室的人不能读取历史消息,只能读取当前时间之后的消息。这是因为我们对读取逻辑做了特殊设置。本文我们将全面介绍Stream队列的使用。

什么是Stream队列

Stream队列保存了发布到其上所有未过期(时间或Size判断)的消息。消费者只可以读取该队列,但是不能让队列将已读消息删除。这样就可以保证相同配置的消费者可以读取到相同的消息。
鉴于它保留了未过期消息,所以非常适合需要读取历史消息的场景。
鉴于消费者不能让其删除已读消息,所以对于需要“扇出”大量相同消息的场景,可以使用一个Stream来替代Fanout交换器绑定多个相同消息队列的方案。这样即可以降低系统设计的复杂度,也会提升Rabbitmq服务效率。
在这里插入图片描述

创建Stream

下面代码会创建一个Stream。

action.queueDeclare(roomName, true, false, false,Collections.singletonMap("x-queue-type", "stream"));

需要注意的是:

  • durable(第二个参数)只能设置为true。
  • exclusive(第三个参数)只能设置为false
  • autoDelete(第四个参数)只能设置为false。
    下面完整代码,除了创建了Stream,还创建了交换器以及它们之间的绑定。
    private void createChatRoom(String roomName) {rabbitTemplate.execute(action -> {action.exchangeDeclare(roomName, "fanout", false, true, null);action.queueDeclare(roomName, true, false, false,Collections.singletonMap("x-queue-type", "stream"));action.queueBind(roomName, roomName, "");return null;});}

发布消息

发布消息没什么特别,直接给交换器发送消息即可。

rabbitTemplate.send(roomName, "", msg);

消费

由于Stream中保存了所有未超时的消息,所以存在一个起始读取位置的问题。
还有两个比较特殊的情况需要注意:

  • 不可以“自动应答”,即AutoACK只能是false。所以我们要对每条消息手工ack。
  • 必须指定Qos。因为消费者需要手工应答,所以需要设置一个配额,这样可以保证过慢的服务减少获取消息,从而让服务分发消息更加合理。

一般常见的模式如下:

从第一条消息开始读取

“x-stream-offset"设置为"first”,就是从第一条消息开始读取。

channel.basicQos(100);
channel.basicConsume(roomName, false, username,false, true,Collections.singletonMap("x-stream-offset", "first"),(consumerTag, message) -> {// Your codechannel.basicAck(message.getEnvelope().getDeliveryTag(), false);},consumerTag -> { });

从最后一条消息开始读取

“x-stream-offset"设置为"last”,就是从第一条消息开始读取。

channel.basicQos(100);
channel.basicConsume(roomName, false, username,false, true,Collections.singletonMap("x-stream-offset", "last"),(consumerTag, message) -> {// Your codechannel.basicAck(message.getEnvelope().getDeliveryTag(), false);},consumerTag -> { });

从某个时间戳开始读取

Date timestamp = new Date(System.currentTimeMillis() - 60 * 60 * 1_000)
channel.basicQos(100);
channel.basicConsume(roomName, false, username,false, true,Collections.singletonMap("x-stream-offset", "last"),(consumerTag, message) -> {// Your codechannel.basicAck(message.getEnvelope().getDeliveryTag(), false);},consumerTag -> { });

从某个偏移量开始读取

channel.basicQos(100);
channel.basicConsume(roomName, false, username,false, true,Collections.singletonMap("x-stream-offset", offset),(consumerTag, message) -> {emitter.next(new String(message.getBody()));channel.basicAck(message.getEnvelope().getDeliveryTag(), false);},consumerTag -> { });

样例

延续《RabbitMQ实践——搭建多人聊天服务》的案例,上面几个场景的读取代码如下:

    public Flux<String> getMessageFromFirst(String username, String roomName) {return Flux.create(emitter -> {rabbitTemplate.execute((ChannelCallback<Void>) channel -> {channel.basicQos(100);channel.basicConsume(roomName, false, username,false, true,Collections.singletonMap("x-stream-offset", "first"),(consumerTag, message) -> {emitter.next(new String(message.getBody()));channel.basicAck(message.getEnvelope().getDeliveryTag(), false);},consumerTag -> { });return null;});});}public Flux<String> getMessageFromLast(String username, String roomName) {return Flux.create(emitter -> {rabbitTemplate.execute((ChannelCallback<Void>) channel -> {channel.basicQos(100);channel.basicConsume(roomName, false, username,false, true,Collections.singletonMap("x-stream-offset", "last"),(consumerTag, message) -> {emitter.next(new String(message.getBody()));channel.basicAck(message.getEnvelope().getDeliveryTag(), false);},consumerTag -> { });return null;});});}public Flux<String> getMessageFromTimestamp(String username, String roomName, Date timestamp) {return Flux.create(emitter -> {rabbitTemplate.execute((ChannelCallback<Void>) channel -> {channel.basicQos(100);channel.basicConsume(roomName, false, username,false, true,Collections.singletonMap("x-stream-offset", timestamp),(consumerTag, message) -> {emitter.next(new String(message.getBody()));channel.basicAck(message.getEnvelope().getDeliveryTag(), false);},consumerTag -> { });return null;});});}public Flux<String> getMessageFromOffset(String username, String roomName, long offset) {return Flux.create(emitter -> {rabbitTemplate.execute((ChannelCallback<Void>) channel -> {channel.basicQos(100);channel.basicConsume(roomName, false, username,false, true,Collections.singletonMap("x-stream-offset", offset),(consumerTag, message) -> {emitter.next(new String(message.getBody()));channel.basicAck(message.getEnvelope().getDeliveryTag(), false);},consumerTag -> { });return null;});});}

长度控制

由于Stream并不会因为消费者而删除消息,导致其保存的消息数量一直在增加。所以需要通过一定的策略控制其大小。

长度控制

在创建Stream时,我们可以通过x-max-length-bytes设置其最大Size。这样如果Stream内容达到这个Size,最早的消息就会被Stream淘汰掉。

Map<String, Object> args = new HashMap<>();
args.put("x-max-length-bytes", maxSize);
args.put("x-queue-type", "stream");
action.queueDeclare(roomName, true, false, false, args);

时间控制

在创建Stream时,我们可以通过x-max-age设置消息的最长生命周期。超过这个时长的消息会被淘汰。它的取值可以是如下单位:Y, M, D, h, m, s。比如“1m”表示一分钟。

Map<String, Object> args = new HashMap<>();
args.put("x-max-age", ttl);
args.put("x-queue-type", "stream");
action.queueDeclare(roomName, true, false, false, args);

服务端筛选消息

如果消费者并不关系Stream中所有消息,它可以通过"x-stream-filter"来做过滤。这个过滤会发生在服务端,这样可以大大减轻消费者和服务端的压力。但是需要注意的是,服务端的过滤使用的是布隆过滤器,所以发送到消费者端的消息会包含不符合条件的消息,所以消费端需要做二次校验才可以使用。

发布方设定过滤值

channel.basicPublish("", // default exchange"my-stream",new AMQP.BasicProperties.Builder().headers(Collections.singletonMap("x-stream-filter-value", "california" // set filter value)).build(),body
);

消费方设置服务端过滤,且要二次过滤

channel.basicQos(100); // QoS must be specified
channel.basicConsume("my-stream",false,Collections.singletonMap("x-stream-filter", "california"), // set filter(consumerTag, message) -> {Map<String, Object> headers = message.getProperties().getHeaders();// there must be some client-side filter logicif ("california".equals(headers.get("x-stream-filter-value"))) {// message processing// ...}channel.basicAck(message.getEnvelope().getDeliveryTag(), false); // ack is required},consumerTag -> { });

工程代码

https://github.com/f304646673/RabbitMQDemo/tree/main/chat

参考资料

  • https://www.rabbitmq.com/docs/streams

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/34249.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Lua网站开发之文件表单上传

这个代码示例演示如何上传文件或图片&#xff0c;获取上传信息及保存文件到本地。 local fw require("fastweb") local request require("fastweb.request") local response require("fastweb.response") local cjson require("cjson&q…

wps的domain转为shp矢量

wps的namelist制作、python出图和转矢量 简介 wps&#xff08;WRF Preprocessing System&#xff09;是中尺度数值天气预报系统WRF(Weather Research and Forecasting)的预处理系统。 wps的安装地址在GitHub上&#xff1a;https://github.com/wrf-model/WPS 下载完成后&…

巴中市红色旅游地管理系统

摘 要 随着红色旅游的兴起&#xff0c;越来越多的人开始对巴中市的红色旅游地产生兴趣。巴中市作为中国革命的重要发源地之一&#xff0c;具有丰富的红色旅游资源。然而&#xff0c;目前巴中市红色旅游地的管理仍然存在许多问题&#xff0c;如信息不对称、资源利用效率低等。为…

Robust semi-supervised segmentationwith timestep ensembling diffusion models

时间步合成扩散模型的鲁棒半监督分割 摘要 医学图像分割是一项具有挑战性的任务&#xff0c;由于许多数据集的大小和注释的限制&#xff0c;使得分割更加困难。消噪扩散概率模型(DDPM)最近在模拟自然图像的分布方面显示出前景&#xff0c;并成功地应用于各种医学成像任务。这…

如何给小语种视频生成字幕

目前我们常看的有视频有中、英、日、韩这四种语言&#xff0c;如果我们想给其他的不常用的语言生成字幕怎么办&#xff1f;今天教大家如何给其他语言生成视频字幕文件 打开智游剪辑&#xff08;zyjj.cc&#xff09;搜索字幕生成&#xff0c;选择多语种那个就可以了 然后上传我们…

10.XSS绕过之htmlspecialchars()函数

XSS绕过之htmlspecialchars()函数 首先可以测试一下是否将字符被转移成html实体&#xff0c;输入字符测试 1111"<>$点击提交 查看页面元素代码&#xff0c;发现单引号不变&#xff0c;可以利用 重新输入攻击代码&#xff0c;用单引号闭合前面的&#xff0c;进…

python爬虫-爬虫的基础知识储备

爬虫就是一个不断的去抓去网页的程序&#xff0c;根据我们的需要得到我们想要的结果&#xff01;但我们又要让服务器感觉是我们人在通过浏览器浏览不是程序所为&#xff01;归根到底就是我们通过程序访问网站得到html代码&#xff0c;然后分析html代码获取有效内容的过程。下面…

【Python实战因果推断】1_因果效应异质性1

目录 From ATE to CATE Why Prediction Is Not the Answer CATE and ITE 本文将介绍应用于行业的因果推理中最有趣的发展&#xff1a;效应异质性。在此之前&#xff0c;你们了解的是一种治疗方法的一般影响。现在&#xff0c;你将专注于发现它如何对不同的人产生不同的影响。…

Java | Leetcode Java题解之第198题打家劫舍

题目&#xff1a; 题解&#xff1a; class Solution {public int rob(int[] nums) {if (nums null || nums.length 0) {return 0;}int length nums.length;if (length 1) {return nums[0];}int first nums[0], second Math.max(nums[0], nums[1]);for (int i 2; i <…

【Oracle篇】逻辑备份工具expdp(exp)/impdp(imp)和物理备份工具rman的区别和各自的使用场景总汇(第八篇,总共八篇)

&#x1f4ab;《博主介绍》&#xff1a;✨又是一天没白过&#xff0c;我是奈斯&#xff0c;DBA一名✨ &#x1f4ab;《擅长领域》&#xff1a;✌️擅长Oracle、MySQL、SQLserver、阿里云AnalyticDB for MySQL(分布式数据仓库)、Linux&#xff0c;也在扩展大数据方向的知识面✌️…

链表数组遍历输出的辨析(二者都含指针的情况下)----PTA期末复习题

输入输出三位学生的学号和信息 一开始我认为是指针&#xff0c;直接背了指针输出的方式&#xff1b;p;p!NULL;pp->next 这个是错误的 下面这个输出是正确的方式 分析怎么区分这两个 举个例子来 数组遍历&#xff1a; 链表遍历&#xff1a; 输出的结果&#xff1a; 如果将…

区块链技术与数字货币

1.起源 ➢中本聪(Satoshi Nakamoto), 2008 ➢比特币:一种点对点的电子现金系统 2.分布式账本技术原理 1.两个核心技术&#xff1a; ➢以链式区块组织账本数据实现账本数据的不可篡改 ➢分布式的可信记账机制 2.共识机制&#xff1a;由谁记账 ➢目的&#xff1a; ⚫ 解…

【数据结构(邓俊辉)学习笔记】二叉搜索树03——平衡

文章目录 1. 极端退化2. 平均高度3. 理想 适度4. 歧义 等价5. 等价变换 1. 极端退化 二叉搜索树为我们同时实现对数据集高效的静态操作以及动态操作打开了一扇新的大门。 正如我们所看到的&#xff0c;从策略上&#xff0c;BST可以视作是试图将此前的向量结构以及列表结构优…

SpringBoot整合MongoDB JPA使用

一、整合MongoDB SpringDataMongoDB是 SpringData家族成员之一&#xff0c;MongoDB的持久层框架&#xff0c;底层封装了 mongodb-driver。mongodb-driver 是 MongoDB官方推出的 Java连接 MongoDB的驱动包&#xff0c;相当于JDBC驱动。 SpringBoot整合 MongoDB&#xff0c;引入…

【Mac】XnViewMP for Mac(图片浏览查看器)及同类型软件介绍

软件介绍 XnViewMP 是一款多功能、跨平台的图像查看和管理软件&#xff0c;适用于 macOS、Windows 和 Linux 系统。它是经典 XnView 软件的增强版本&#xff0c;更加现代化且功能更强大。XnViewMP 支持数百种图像格式&#xff0c;并提供多种图像处理工具&#xff0c;使其成为摄…

【摄像头标定】使用kalibr进行双目摄像头标定(ros1、ros2)

使用kalibr进行双目摄像头标定 前言标定板标定①板端准备和录制②上位机准备和标定 前言 本文不是纯用ros1进行标定&#xff0c;需要ros1和ros2通信。给使用ros2进行开发&#xff0c;但又想用kalibr标定双目摄像头的小伙伴一个教程。本文双目摄像头的数据发布使用ros2&#xf…

收银系统源码-千呼新零售2.0【线上营销】

千呼新零售2.0系统是零售行业连锁店一体化收银系统&#xff0c;包括线下收银线上商城连锁店管理ERP管理商品管理供应商管理会员营销等功能为一体&#xff0c;线上线下数据全部打通。 适用于商超、便利店、水果、生鲜、母婴、服装、零食、百货等连锁店使用。 详细介绍请查看&a…

Js逆向爬虫基础篇

这里写自定义目录标题 逆向技巧断点一 、请求入口定位1. 关键字搜索2. 请求堆栈3. hook4. JSON.stringify 二、响应入口定位&#xff1a;1. 关键字搜索2. hook3. JSON.parse 逆向技巧 断点 普通断点 条件断点 日志断点 XHR断点 一 、请求入口定位 1. 关键字搜索 key关…

办公软件的答案?ONLYOFFICE 桌面应用编辑器会是最好用的 Office 软件?ONLYOFFICE 桌面编辑器使用初体验

文章目录 &#x1f4cb;前言&#x1f3af;什么是 ONLYOFFICE&#x1f3af; 主要功能介绍及 8.1 新功能体验&#x1f3af; 在线体验&#x1f4dd;最后 &#x1f4cb;前言 提到办公软件&#xff0c;大家最常用的可能就是微软的 Microsoft Office 和国产的 WPS Office。这两款软件…

jenkins环境搭建--关于jenkins在Ubuntu下的安装篇(一)

在ubuntu下使用命令进行下载安装包&#xff1a; 关于jenkins的安装有多种&#xff0c;可以借助docker容器进行安装&#xff0c;也可以通过传统方法手动一步步的进行安装&#xff0c;以下介绍手动一步步的安装方法&#xff0c;后续我们将解释关于jenkins的相关配置以及实战使用…