Redis Stream Redisson Stream

目录

    • 一、Redis Stream
      • 1.1 场景1:多个客户端可以同时接收到消息
        • 1.1.1 XADD - 向stream添加Entry(发消息 )
        • 1.1.2 XREAD - 从stream中读取Entry(收消息)
        • 1.1.3 XRANGE - 从stream指定区间读取Entry(收消息)
      • 1.2 场景2:多个客户端仅收到一部分消息(分片sharded、消费组group)
        • 1.2.1 XGROUP CREATE - 创建消费组
        • 1.2.2 XREADGROUP - 从消费组中读取消息
        • 1.2.3 XACK - 确认消息
        • 1.2.4 XPENDING - 读取PEL消息
        • 1.2.5 XCLAIM & XAUTOCLAIM - 转移PEL中消息的所有权给其他消费者
        • 1.2.6 统计命令
      • 1.3 其他
    • 二、Redisson Stream

一、Redis Stream

之前介绍过Redis Pub/Sub相关内容,通过Redis Pub/Sub可以实现发布/订阅消息传递范式,但是存在丢消息的可能,而本文介绍的Redis Stream是一种可用来实现 可靠消息队列、支持消息分组(类似Kafka Group) 的数据结构。

关于Redis Stream的使用存在如下2个场景

  • 场景1: 多个客户端可以同时接收到消息
  • 场景2: 多个客户端仅收到一部分消息(分片sharded),例如发送消息A,B,C,客户端1收到A,C,客户端2收到B(参考Kafka group概念)。

关于场景1,则可参考XADD、XREAD、XRANGE等相关命令的使用,
关于场景2,则需要了解XGROUP CREATE、XREADGROUP、XACK等相关命令的使用。

1.1 场景1:多个客户端可以同时接收到消息

场景1中相关命令XADD、XREAD、XRANGE的使用汇总如下图:
在这里插入图片描述

1.1.1 XADD - 向stream添加Entry(发消息 )

向stream添加Entry(多个key/value对),XADD命令格式:

XADD stream名称 id key1 value1 key2 value2 …

其中id为此次entry的唯一ID,而key1 value1 key2 value2 …即为entry的具体内容,
id为*则表示由Redis自动生成ID:<millisecondsTime>-<sequenceNumber>
亦可明确指定id。

示例:

XADD mystream * name 罗 age 18
XADD mystream 1692632086370-0 name 刘 age 18
1.1.2 XREAD - 从stream中读取Entry(收消息)

从stream中读取entry,XREAD命令格式:

XREAD COUNT 最多读取数量 BLOCK 阻塞等待毫秒数 STREAMS stream名称 上次接收的id

通过XADD添加一条消息,多个执行XREAD的客户端都会读取到该消息,
XREAD会从参数中指定的 上次接收的id 之后开始读取后续的消息,
上次接受的id 可设置为$,需配合BLOCK使用,表示仅读取从阻塞开始后新添加的消息(即不关心历史消息),
上次接受的id 可设置为+,需要Redis版本>=7.4 RC1,表示仅读取最后一条消息。
阻塞等待的毫秒数 如果为0,则表示一直阻塞,直到读取到一条消息。

示例:

# 从头开始读取1条消息
XREAD STREAMS mystream 0# 从头开始读取2条消息
XREAD COUNT 2 STREAMS mystream 0-0
# 从指定消息ID之后开始读取2条消息
XREAD COUNT 2 STREAMS mystream 1692632086370-0# 最长阻塞5秒,最多读取100条消息,仅读取从阻塞开始后新添加的消息
XREAD BLOCK 5000 COUNT 100 STREAMS mystream $
# 继续从上次接受的id之后继续读取
XREAD BLOCK 5000 COUNT 100 STREAMS mystream 1526999644174-3# 读取最后一条消息(需要Redis版本>=7.4 RC1)
XREAD STREAM mystream +
1.1.3 XRANGE - 从stream指定区间读取Entry(收消息)

从stream指定区间(起始ID范围)正向读取Entry,XRANGE命令格式:

XRANGE stream名称 起始id 结束id COUNT 最多读取数量

按起始到结束正向返回消息,
-表示最小ID,+表示最大ID

示例:

# 返回全部消息(从前到后依次返回)
XRANGE mystream - + 
# 返回5条消息(从前到后依次返回)
XRANGE mystream - + COUNT 5# 返回指定id(包括指定id)之后5条消息(从前到后依次返回)
XRANGE mystream 1718951980910-0 + COUNT 5# 返回指定id(不包括指定id)之后5条消息(从前到后依次返回)
XRANGE mystream (1718951980910-0 + COUNT 5

从stream指定区间(起始ID范围)逆向读取Entry,XREVRANGE命令格式:

XREVRANGE stream名称 结束id 起始id COUNT 最多读取数量

按结束到起始逆向返回消息。

示例:

返回全部消息(从后到前逆向依次返回)
XREVRANGE mystream + -
# 返回2条消息(从后到前逆向依次返回)
XREVRANGE mystream + - COUNT 2

1.2 场景2:多个客户端仅收到一部分消息(分片sharded、消费组group)

场景2中相关命令XGROUP CREATE、XREADGROUP、XACK、XPENDING、XCLAIM等使用汇总如下图:

在这里插入图片描述

1.2.1 XGROUP CREATE - 创建消费组

给stream创建消费分组,分组间彼此隔离,分组内多个consumer会轮流消费消息(分片),XGROUP CREATE命令格式:

XGROUP CREATE stream名称 group名称 起始读取id [MKSTREAM]

起始读取id0,表示从头开始读取,
起始读取id$,表示从最后一条消息之后开始读取,
MKSTREAM子命令是可选的,表示自动创建stream。

示例:

# 为mystream创建分组mygroup1,且从最新消息开始消费XGROUP CREATE mystream mygroup1 $
1.2.2 XREADGROUP - 从消费组中读取消息

以分组group读取stream中的消息,group中每个客户端需要指定consumer名称,多个consumer分摊group中的消息,而多个group间彼此隔离,XREADGROUP命令格式:

XREADGROUP GROUP group名称 consumer名称 COUNT 最多读取数量 BLOCK 阻塞等待毫秒数 [NOACK] STREAMS stream名称 上次接收的id

PEL(Pending Entries List): 当使用XREADGROUP读取分组下消息时,服务器会记住哪条消息发给了分组下的哪个消费者,该记录存储在消费者组中,称为PEL,即已发送但尚未确认的消息ID列表。后续在消费者处理完消息后,消费者必须手动调用XACK命令对消息ID进行确认,以便从PEL中删除挂起的消息,关于PEL的结构可参见下图(截取自RedisInsight工具):
在这里插入图片描述

上次接收的id>,表示消费者只希望接收从未传递给任何其他消费者的消息,即给我新的信息>号表示从当前消费组的last_delivered_id后面开始读。
上次接收的id 设为0或其他有效的id,则表示仅读取 PEL(当前consumer没有确认的消息) 中指定id之后的消息。

NOACK子命令式可选的,表示无需确认消息,NOACK子命令适用于对可靠性要求不高、偶尔的消息丢失是可以接受的情况,使用NOACK子命令可以避免将消息添加到PEL( Pending Entries List),相当于在读取消息后自动确认消息,后续无需再调用XACK命令进行确认,

示例:

# 消费者c1阻塞读取mystream下分组mygroup1的最新消息(直到读取到1条消息后解除阻塞)
XREADGROUP GROUP mygroup1 c1 BLOCK 0 STREAMS mystream ># 消费者c1读取mystream下分组mygroup1的PEL消息(即已投递给c1但c1未进行确认的消息列表)
XREADGROUP GROUP mygroup1 c1 STREAMS mystream 0
1.2.3 XACK - 确认消息

确认stream下指定分组group的某条消息已被成功消费,XACK命令格式:

XACK stream名称 group名称 消息id

示例:

# 确认1条消息 
XACK mystream mygroup1 1719206857966-0 # 同时确认3条消息
XACK mystream mygroup1 1719206857966-0 1719206909894-0 1719207195666-0
1.2.4 XPENDING - 读取PEL消息

读取stream中指定分组group的PEL挂起消息列表,XPENDING命令格式:

XPENDING stream名称 group名称 IDEL 空闲毫秒数 起始消息id 结束消息id 查询数量 consumer名称

示例:

# 查询mystream下mygroup1分组的PEL列表
XPENDING mystream mygroup1# 查询mystream下mygroup1分组下的消费者c1的空闲9秒的最多10条PEL消息
XPENDING mystream mygroup1 IDLE 9000 - + 10 c1
1.2.5 XCLAIM & XAUTOCLAIM - 转移PEL中消息的所有权给其他消费者

通过XPENDING查询出PEL消息(已投递未确认)后,若原先消息对应的consumer已经挂掉,没有能力继续处理消息,则可通过XCLIAM将对应的消息转移给同分组下的其他consumer进行处理,XCLAIM命令格式如下:

XCLAIM stream名称 group名称 consumer名称 空闲时长毫秒 消息id1 消息id2

转移后消息上次投递时间会重置为当前时间(即消息空闲idle时间为0),
默认会返回已经转移成功的消息内容,且消息投递计数会加1,
也可添加JUSTID子命令,则只返回消息ID不返回消息内容,且消息投递计数不变,
若多个客户端同时通过XCLAIM转移同一条消息的所有权,则只会有一个客户端转移成功。
Redis官方原文如下:

Note that the message is claimed only if its idle time is greater than the minimum idle time we specify when calling XCLAIM. Because as a side effect XCLAIM will also

  • reset the idle time (since this is a new attempt at processing the message),
  • two consumers trying to claim a message at the same time will never both succeed: only one will successfully claim the message. This avoids that we process a given message multiple times in a trivial way (yet multiple processing is possible and unavoidable in the general case).

示例:

# mystream下mygroup1分组下的PEL消息1526569498055-0且空闲时长超过1小时,则将其转移给消费者c2
XCLAIM mystream mygroup1 c2 3600000 1526569498055-0

亦可通过XAUTOCLAIM将PEL中指定起始消息ID后的消息批量进行转移,XAUTOCLIAM命令格式如下:

XAUTOCLAIM stream名称 group名称 consumer名称 空闲时长毫秒 起始消息id COUNT 消息数量

示例:

# 扫描mystream下mygroup1分组下的所有PEL消息,空闲时长超过1小时,则最多转移25条消息给消费者c2
XAUTOCLAIM mystream mygroup1 c2 3600000 0-0 COUNT 25
1.2.6 统计命令
# 查询stream下的分组信息
XINFO GROUPS stream名称# 查询stream信息
XINFO STREAM stream名称# 查询stream下指定分组的消费者信息
XINFO CONSUMERS stream名称 group名称

1.3 其他

删除stream中的消息:

XDEL stream名称 id1 id2 …

查询stream中的消息(entry)数量:

XLEN stream名称

压缩stream中的消息数据量:

XTRIM stream名称 MAXLEN 保留的最近消息数量
XTRIM stream名称 MINID 消息ID(小于此ID的消息均会被删除)

二、Redisson Stream

在Redisson中可通过Stream实现Redis Stream,

场景1 相关示例代码如下:

@Test
void testStream() throws InterruptedException {String streamName = "mystream";MyMessage2 myMessage = this.buildMyMessageWithTimestampId();//获取StreamRStream<String, Object> stream = this.redisson.getStream(streamName);//发消息 - XADD mystream * name 我的消息 age 18StreamMessageId entryId = stream.add(StreamAddArgs.entries(myMessage.toMap()));log.info("stream[{}] add success, id: {}", streamName, entryId);//读消息 - XREAD COUNT 5 BLOCK 5000 STREAMS mystream 0Map<StreamMessageId, Map<String, Object>> entries = stream.read(StreamReadArgs.greaterThan(StreamMessageId.ALL).count(5).timeout(Duration.ofSeconds(5)));entries.forEach((id, entryMap) -> {log.info("stream[{}] read message: id={}, entry: {}", streamName, id, entryMap);});//读取区间内消息 - XRANGE mystream 0 entryId COUNT 10entries = stream.range(10, StreamMessageId.ALL, entryId);entries.forEach((id, entryMap) -> {log.info("stream[{}] range message: id={}, entry: {}", streamName, id, entryMap);});
}

场景2 相关示例代码如下:

@Resource
private RedissonClient redisson;@Test
void testStreamGroup() throws InterruptedException {String streamName = "mystream";String groupName = "mygroup1";String consumerName = "c1";MyMessage2 myMessage = this.buildMyMessageWithTimestampId();//获取StreamRStream<String, Object> stream = this.redisson.getStream(streamName);//发消息 - XADD mystream * name 我的消息 age 18StreamMessageId entryId = stream.add(StreamAddArgs.entries(myMessage.toMap()));log.info("stream[{}] add success, id: {}", streamName, entryId);//查询已存在的分组 - XINFO GROUPS mystreamList<StreamGroup> streamGroups = stream.listGroups();streamGroups.forEach(streamGroup -> {log.info("stream[{}] listGroups groupName: {}", streamName, streamGroup.getName());});Boolean existGroup = streamGroups.stream().anyMatch(group -> groupName.equals(group.getName()));if (!existGroup) {//创建分组 - XGROUP CREATE mygroup1 $stream.createGroup(StreamCreateGroupArgs.name(groupName)//此处id支持:NEWEST即$,ALL即0.id(StreamMessageId.ALL));log.info("stream[{}] createGroup success, groupName: {}", streamName, groupName);}//读分组消息 - XREADGROUP GROUP mygroup1 c1 COUNT 5 BLOCK 5000 STREAMS mystream >Map<StreamMessageId, Map<String, Object>> entries = stream.readGroup(groupName, consumerName,//greaterThan即设置从哪个消息ID之后开始读取,支持:NEVER_DELIVERED即>、ALL即0StreamReadGroupArgs.greaterThan(StreamMessageId.NEVER_DELIVERED).count(5).timeout(Duration.ofSeconds(5)));entries.forEach((id, entryMap) -> {log.info("stream[{}] readGroup groupName: {}, consumerName: {}, message: id={}, entry: {}",streamName, groupName, consumerName, id, entryMap);});//读取PEL中未确认的消息 - XPENDING mystream mygroup1 - + 100 c1Map<StreamMessageId, Map<String, Object>> streamMessageIdMapMap = stream.pendingRange(groupName, consumerName, StreamMessageId.MIN, StreamMessageId.MAX, 100);streamMessageIdMapMap.forEach((id, entryMap) -> {log.info("stream[{}] pendingRange groupName: {}, consumerName: {}, message: id={}, entry: {}",streamName, groupName, consumerName, id, entryMap);//确认消息(从PEL中移除) - XACK mystream mygroup1 1600000000000-0stream.ack(groupName, id);log.info("stream[{}] ack groupName: {}, consumerName: {}, message: id={}",streamName, groupName, consumerName, id);});}

参考:

Redis Stream
https://redis.io/docs/latest/develop/data-types/streams/
https://redis.io/docs/latest/commands/xreadgroup/

Redisson Stream
https://github.com/redisson/redisson/wiki/7.-Distributed-collections#720-stream

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/36745.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【DevExpress】WPF DevExpressMVVM 24.1版本开发指南

DevExpressMVVM WPF 环境安装 前言重要Bug&#xff08;必看&#xff09;环境安装控件目录Theme 主题LoginWindow 登陆窗口INavigationService 导航服务DockLayout Dock类型的画面布局TreeView 树状列表注意引用类型的时候ImageSource是PresentationCore程序集的博主找了好久&am…

Navicat 外网连接 mysql (1、通过SSH方式内网访问 2、对外开放3306端口)

1、通过SSH方式内网访问 直接常规方式使用IP、账号密码连接&#xff0c;失败 SSH方式&#xff1a; 常规 选项卡中&#xff1a;localhost录入数据库账号密码 SSH 选项卡中&#xff1a;勾选使用SSH&#xff0c;输入服务器IP、账号、密码 如果出现该错误&#xff0c;可能是服务器…

Windows下activemq开启jmx

1.activemq版本信息 activemq&#xff1a;apache-activemq-5.18.4 2.Windows下activemq开启jmx 1.进入activemq conf目录&#xff0c;备份activemq.xml文件 2.编辑activemq.xml文件&#xff0c;在broker节点增加useJmx"true" <broker xmlns"http://active…

无线通讯几种常规天线类别简介

天线对于无线模块来说至关重要&#xff0c;合适的天线可以优化通信网络&#xff0c;增加其通信的范围和可靠性。天线的选型对最后的模块通信影响很大&#xff0c;不合适的天线会导致通信质量下降。针对不同的市场应用&#xff0c;天线的材质、安置方式、性能也大不一样。下面简…

基于Vue 3.x与TypeScript的PPTIST本地部署与无公网IP远程演示文稿

文章目录 前言1. 本地安装PPTist2. PPTist 使用介绍3. 安装Cpolar内网穿透4. 配置公网地址5. 配置固定公网地址 前言 本文主要介绍如何在Windows系统环境本地部署开源在线演示文稿应用PPTist&#xff0c;并结合cpolar内网穿透工具实现随时随地远程访问与使用该项目。 PPTist …

基于STM32的智能水质监测系统

目录 引言环境准备智能水质监测系统基础代码实现&#xff1a;实现智能水质监测系统 4.1 数据采集模块4.2 数据处理与分析4.3 控制系统实现4.4 用户界面与数据可视化应用场景&#xff1a;水质管理与优化问题解决方案与优化收尾与总结 1. 引言 智能水质监测系统通过使用STM32嵌…

RISC-V知识总结 —— 向量(扩展)指令集

资源1:晏明 - RISC-V向量扩展指令架构及LLVM自动向量化支持 - 202112118 - 第13届开源开发工具大会&#xff08;OSDTConf2021&#xff09;_哔哩哔哩_bilibili资源2:张先轶 - 基于RISC-V向量指令集优化基础计算软件生态【第12届开源开发工具大会&#xff08;OSDT2020&#xff09…

研导智能科技——AI辅助科研产品开发

人工智能&#xff08;AI&#xff09;技术的飞速发展为科研领域带来了革命性的变化。本公司致力于开发基于人工智能的科研辅助产品&#xff0c;旨在通过智能化手段提高科研人员的工作效率和研究质量。目前&#xff0c;我们成功开发了研导学术平台&#xff08;www.zhiyanxueshu.c…

Linux运维:MySQL数据库(1)

1.信息与数据&#xff1a; 数据是信息的载体&#xff0c;信息是数据的内涵。数据库就是存储数据的仓库&#xff0c;并长期存储在计算机磁盘中&#xff0c;可由多个用户和应用程序共享的数据集合&#xff0c;就是数据库。 2.数据库中的数据的特点&#xff1a; 2.1.数据是按照某…

RuleApp1.4.6文章社区客户端 广告联盟支持Docx导入

支持编译为安卓&#xff0c;苹果&#xff0c;小程序&#xff0c;H5网页的社区客户端代码&#xff0c;包括文章模块&#xff0c;用户模块&#xff0c;动态模块&#xff0c;支付模块&#xff0c;聊天模块&#xff0c;广告模块&#xff0c;商城模块等基础功能&#xff0c;包含VIP会…

10位时间戳、13位时间戳、17位时间戳,以及在JavaScript中的格式转换

一、介绍 1、10位时间戳 2、13位时间戳 3、17位时间戳 4、时间戳转换工具 二、13位时间戳的转换 1、转标准日期 2、转格式化日期 三、10位时间戳的转换 1、转标准日期 2、转格式化日期 四、17位时间戳的转换 1、解析思路 2、解析过程 &#xff08;1&#xff09;统…

C++系统编程篇——Linux第一个小程序--进度条

&#xff08;1&#xff09;先引入一个概念&#xff1a;行缓冲区 \r和\n \r表示回车 \n表示回车并换行 ①代码一 #include<stdio.h> #include<unistd.h> int main()…

django学习入门系列之第三点《伪类简单了解》

文章目录 hover&#xff08;伪类&#xff09;after&#xff08;伪类&#xff09;往期回顾 hover&#xff08;伪类&#xff09; 伪类指的是用冒号加的 hover样式指的是&#xff0c;当用户光标移动到设定区域后&#xff0c;所执行的用法 如&#xff1a; <!DOCTYPE html>…

通过代理从ARDUINO IDE直接下载开发板包

使用免费代理 实现ARDUINO IDE2.3.2 下载ESP8266/ESP32包 免费代理 列表 测试代理是否可用的 网站 有时&#xff0c;代理是可用的&#xff0c;但依然有可能找不到开发板管理器的资料包。 可以多换几个代理试试。 代理的配置 文件 -> 首选项 -> 网络 进入后做如下配置…

2024百度之星第二场-小度的01串

补题链接&#xff1a; 码蹄集 一道经典线段树板子题。 区间修改01置换&#xff0c;区间查询子串权值。 唯一区别&#xff0c;权值要求的是相邻字符都不同所需修改的最小字符个数。 我们在线段树节点上分别维护当前连续区间&#xff1a; 奇数位是0的个数&#xff08;j0&…

Python和tkinter实现的字母记忆配对游戏

Python和tkinter实现的字母记忆配对游戏 因为这个小游戏用到了tkinter&#xff0c;先简要介绍一下它。tkinter是Python的标准GUI(图形用户界面)库&#xff0c;它提供了一种简单而强大的方式来创建图形界面应用程序。它提供了创建基本图形界面所需的所有工具&#xff0c;同时保…

OSI七层模型TCP/IP四层面试高频考点

OSI七层模型&TCP/IP四层&面试高频考点 1 OSI七层模型 1. 物理层&#xff1a;透明地传输比特流 在物理媒介上传输原始比特流&#xff0c;定义了连接主机的硬件设备和传输媒介的规范。它确保比特流能够在网络中准确地传输&#xff0c;例如通过以太网、光纤和无线电波等媒…

什么是有效的电子签名?PDF电子签名怎样具备法律效力?

电子签名逐渐成为商务文书和法律文件中不可或缺的一部分。《电子签名法》自2005年4月1日起施行&#xff0c;这一立法是中国信息化法律的重要里程碑&#xff0c;为电子签名应用奠定了法律基础。电子签名不仅仅是一种技术手段&#xff0c;更是一种法律认可的签名形式。那么究竟什…

Python私教张大鹏 PyWebIO通过事件回调实现表格的编辑和删除功能

从上面可以看出&#xff0c;PyWebIO把交互分成了输入和输出两部分&#xff1a;输入函数为阻塞式调用&#xff0c;会在用户浏览器上显示一个表单&#xff0c;在用户提交表单之前输入函数将不会返回&#xff1b;输出函数将内容实时输出至浏览器。这种交互方式和控制台程序是一致的…

学习TTS遇到的问题2 什么是TCN模型

学习TTS遇到的问题2 什么是TCN模型 什么是TCN模型怎么理解 TCN中的 dilation&#xff1f;什么是 Dilation具体例子数学表达作用例子代码示例 什么是TCN模型 https://juejin.cn/post/7262269863343079479 https://blog.csdn.net/weixin_57726558/article/details/132163074 由下…