Redis Stream Redisson Stream

目录

    • 一、Redis Stream
      • 1.1 场景1:多个客户端可以同时接收到消息
        • 1.1.1 XADD - 向stream添加Entry(发消息 )
        • 1.1.2 XREAD - 从stream中读取Entry(收消息)
        • 1.1.3 XRANGE - 从stream指定区间读取Entry(收消息)
      • 1.2 场景2:多个客户端仅收到一部分消息(分片sharded、消费组group)
        • 1.2.1 XGROUP CREATE - 创建消费组
        • 1.2.2 XREADGROUP - 从消费组中读取消息
        • 1.2.3 XACK - 确认消息
        • 1.2.4 XPENDING - 读取PEL消息
        • 1.2.5 XCLAIM & XAUTOCLAIM - 转移PEL中消息的所有权给其他消费者
        • 1.2.6 统计命令
      • 1.3 其他
    • 二、Redisson Stream

一、Redis Stream

之前介绍过Redis Pub/Sub相关内容,通过Redis Pub/Sub可以实现发布/订阅消息传递范式,但是存在丢消息的可能,而本文介绍的Redis Stream是一种可用来实现 可靠消息队列、支持消息分组(类似Kafka Group) 的数据结构。

关于Redis Stream的使用存在如下2个场景

  • 场景1: 多个客户端可以同时接收到消息
  • 场景2: 多个客户端仅收到一部分消息(分片sharded),例如发送消息A,B,C,客户端1收到A,C,客户端2收到B(参考Kafka group概念)。

关于场景1,则可参考XADD、XREAD、XRANGE等相关命令的使用,
关于场景2,则需要了解XGROUP CREATE、XREADGROUP、XACK等相关命令的使用。

1.1 场景1:多个客户端可以同时接收到消息

场景1中相关命令XADD、XREAD、XRANGE的使用汇总如下图:
在这里插入图片描述

1.1.1 XADD - 向stream添加Entry(发消息 )

向stream添加Entry(多个key/value对),XADD命令格式:

XADD stream名称 id key1 value1 key2 value2 …

其中id为此次entry的唯一ID,而key1 value1 key2 value2 …即为entry的具体内容,
id为*则表示由Redis自动生成ID:<millisecondsTime>-<sequenceNumber>
亦可明确指定id。

示例:

XADD mystream * name 罗 age 18
XADD mystream 1692632086370-0 name 刘 age 18
1.1.2 XREAD - 从stream中读取Entry(收消息)

从stream中读取entry,XREAD命令格式:

XREAD COUNT 最多读取数量 BLOCK 阻塞等待毫秒数 STREAMS stream名称 上次接收的id

通过XADD添加一条消息,多个执行XREAD的客户端都会读取到该消息,
XREAD会从参数中指定的 上次接收的id 之后开始读取后续的消息,
上次接受的id 可设置为$,需配合BLOCK使用,表示仅读取从阻塞开始后新添加的消息(即不关心历史消息),
上次接受的id 可设置为+,需要Redis版本>=7.4 RC1,表示仅读取最后一条消息。
阻塞等待的毫秒数 如果为0,则表示一直阻塞,直到读取到一条消息。

示例:

# 从头开始读取1条消息
XREAD STREAMS mystream 0# 从头开始读取2条消息
XREAD COUNT 2 STREAMS mystream 0-0
# 从指定消息ID之后开始读取2条消息
XREAD COUNT 2 STREAMS mystream 1692632086370-0# 最长阻塞5秒,最多读取100条消息,仅读取从阻塞开始后新添加的消息
XREAD BLOCK 5000 COUNT 100 STREAMS mystream $
# 继续从上次接受的id之后继续读取
XREAD BLOCK 5000 COUNT 100 STREAMS mystream 1526999644174-3# 读取最后一条消息(需要Redis版本>=7.4 RC1)
XREAD STREAM mystream +
1.1.3 XRANGE - 从stream指定区间读取Entry(收消息)

从stream指定区间(起始ID范围)正向读取Entry,XRANGE命令格式:

XRANGE stream名称 起始id 结束id COUNT 最多读取数量

按起始到结束正向返回消息,
-表示最小ID,+表示最大ID

示例:

# 返回全部消息(从前到后依次返回)
XRANGE mystream - + 
# 返回5条消息(从前到后依次返回)
XRANGE mystream - + COUNT 5# 返回指定id(包括指定id)之后5条消息(从前到后依次返回)
XRANGE mystream 1718951980910-0 + COUNT 5# 返回指定id(不包括指定id)之后5条消息(从前到后依次返回)
XRANGE mystream (1718951980910-0 + COUNT 5

从stream指定区间(起始ID范围)逆向读取Entry,XREVRANGE命令格式:

XREVRANGE stream名称 结束id 起始id COUNT 最多读取数量

按结束到起始逆向返回消息。

示例:

返回全部消息(从后到前逆向依次返回)
XREVRANGE mystream + -
# 返回2条消息(从后到前逆向依次返回)
XREVRANGE mystream + - COUNT 2

1.2 场景2:多个客户端仅收到一部分消息(分片sharded、消费组group)

场景2中相关命令XGROUP CREATE、XREADGROUP、XACK、XPENDING、XCLAIM等使用汇总如下图:

在这里插入图片描述

1.2.1 XGROUP CREATE - 创建消费组

给stream创建消费分组,分组间彼此隔离,分组内多个consumer会轮流消费消息(分片),XGROUP CREATE命令格式:

XGROUP CREATE stream名称 group名称 起始读取id [MKSTREAM]

起始读取id0,表示从头开始读取,
起始读取id$,表示从最后一条消息之后开始读取,
MKSTREAM子命令是可选的,表示自动创建stream。

示例:

# 为mystream创建分组mygroup1,且从最新消息开始消费XGROUP CREATE mystream mygroup1 $
1.2.2 XREADGROUP - 从消费组中读取消息

以分组group读取stream中的消息,group中每个客户端需要指定consumer名称,多个consumer分摊group中的消息,而多个group间彼此隔离,XREADGROUP命令格式:

XREADGROUP GROUP group名称 consumer名称 COUNT 最多读取数量 BLOCK 阻塞等待毫秒数 [NOACK] STREAMS stream名称 上次接收的id

PEL(Pending Entries List): 当使用XREADGROUP读取分组下消息时,服务器会记住哪条消息发给了分组下的哪个消费者,该记录存储在消费者组中,称为PEL,即已发送但尚未确认的消息ID列表。后续在消费者处理完消息后,消费者必须手动调用XACK命令对消息ID进行确认,以便从PEL中删除挂起的消息,关于PEL的结构可参见下图(截取自RedisInsight工具):
在这里插入图片描述

上次接收的id>,表示消费者只希望接收从未传递给任何其他消费者的消息,即给我新的信息>号表示从当前消费组的last_delivered_id后面开始读。
上次接收的id 设为0或其他有效的id,则表示仅读取 PEL(当前consumer没有确认的消息) 中指定id之后的消息。

NOACK子命令式可选的,表示无需确认消息,NOACK子命令适用于对可靠性要求不高、偶尔的消息丢失是可以接受的情况,使用NOACK子命令可以避免将消息添加到PEL( Pending Entries List),相当于在读取消息后自动确认消息,后续无需再调用XACK命令进行确认,

示例:

# 消费者c1阻塞读取mystream下分组mygroup1的最新消息(直到读取到1条消息后解除阻塞)
XREADGROUP GROUP mygroup1 c1 BLOCK 0 STREAMS mystream ># 消费者c1读取mystream下分组mygroup1的PEL消息(即已投递给c1但c1未进行确认的消息列表)
XREADGROUP GROUP mygroup1 c1 STREAMS mystream 0
1.2.3 XACK - 确认消息

确认stream下指定分组group的某条消息已被成功消费,XACK命令格式:

XACK stream名称 group名称 消息id

示例:

# 确认1条消息 
XACK mystream mygroup1 1719206857966-0 # 同时确认3条消息
XACK mystream mygroup1 1719206857966-0 1719206909894-0 1719207195666-0
1.2.4 XPENDING - 读取PEL消息

读取stream中指定分组group的PEL挂起消息列表,XPENDING命令格式:

XPENDING stream名称 group名称 IDEL 空闲毫秒数 起始消息id 结束消息id 查询数量 consumer名称

示例:

# 查询mystream下mygroup1分组的PEL列表
XPENDING mystream mygroup1# 查询mystream下mygroup1分组下的消费者c1的空闲9秒的最多10条PEL消息
XPENDING mystream mygroup1 IDLE 9000 - + 10 c1
1.2.5 XCLAIM & XAUTOCLAIM - 转移PEL中消息的所有权给其他消费者

通过XPENDING查询出PEL消息(已投递未确认)后,若原先消息对应的consumer已经挂掉,没有能力继续处理消息,则可通过XCLIAM将对应的消息转移给同分组下的其他consumer进行处理,XCLAIM命令格式如下:

XCLAIM stream名称 group名称 consumer名称 空闲时长毫秒 消息id1 消息id2

转移后消息上次投递时间会重置为当前时间(即消息空闲idle时间为0),
默认会返回已经转移成功的消息内容,且消息投递计数会加1,
也可添加JUSTID子命令,则只返回消息ID不返回消息内容,且消息投递计数不变,
若多个客户端同时通过XCLAIM转移同一条消息的所有权,则只会有一个客户端转移成功。
Redis官方原文如下:

Note that the message is claimed only if its idle time is greater than the minimum idle time we specify when calling XCLAIM. Because as a side effect XCLAIM will also

  • reset the idle time (since this is a new attempt at processing the message),
  • two consumers trying to claim a message at the same time will never both succeed: only one will successfully claim the message. This avoids that we process a given message multiple times in a trivial way (yet multiple processing is possible and unavoidable in the general case).

示例:

# mystream下mygroup1分组下的PEL消息1526569498055-0且空闲时长超过1小时,则将其转移给消费者c2
XCLAIM mystream mygroup1 c2 3600000 1526569498055-0

亦可通过XAUTOCLAIM将PEL中指定起始消息ID后的消息批量进行转移,XAUTOCLIAM命令格式如下:

XAUTOCLAIM stream名称 group名称 consumer名称 空闲时长毫秒 起始消息id COUNT 消息数量

示例:

# 扫描mystream下mygroup1分组下的所有PEL消息,空闲时长超过1小时,则最多转移25条消息给消费者c2
XAUTOCLAIM mystream mygroup1 c2 3600000 0-0 COUNT 25
1.2.6 统计命令
# 查询stream下的分组信息
XINFO GROUPS stream名称# 查询stream信息
XINFO STREAM stream名称# 查询stream下指定分组的消费者信息
XINFO CONSUMERS stream名称 group名称

1.3 其他

删除stream中的消息:

XDEL stream名称 id1 id2 …

查询stream中的消息(entry)数量:

XLEN stream名称

压缩stream中的消息数据量:

XTRIM stream名称 MAXLEN 保留的最近消息数量
XTRIM stream名称 MINID 消息ID(小于此ID的消息均会被删除)

二、Redisson Stream

在Redisson中可通过Stream实现Redis Stream,

场景1 相关示例代码如下:

@Test
void testStream() throws InterruptedException {String streamName = "mystream";MyMessage2 myMessage = this.buildMyMessageWithTimestampId();//获取StreamRStream<String, Object> stream = this.redisson.getStream(streamName);//发消息 - XADD mystream * name 我的消息 age 18StreamMessageId entryId = stream.add(StreamAddArgs.entries(myMessage.toMap()));log.info("stream[{}] add success, id: {}", streamName, entryId);//读消息 - XREAD COUNT 5 BLOCK 5000 STREAMS mystream 0Map<StreamMessageId, Map<String, Object>> entries = stream.read(StreamReadArgs.greaterThan(StreamMessageId.ALL).count(5).timeout(Duration.ofSeconds(5)));entries.forEach((id, entryMap) -> {log.info("stream[{}] read message: id={}, entry: {}", streamName, id, entryMap);});//读取区间内消息 - XRANGE mystream 0 entryId COUNT 10entries = stream.range(10, StreamMessageId.ALL, entryId);entries.forEach((id, entryMap) -> {log.info("stream[{}] range message: id={}, entry: {}", streamName, id, entryMap);});
}

场景2 相关示例代码如下:

@Resource
private RedissonClient redisson;@Test
void testStreamGroup() throws InterruptedException {String streamName = "mystream";String groupName = "mygroup1";String consumerName = "c1";MyMessage2 myMessage = this.buildMyMessageWithTimestampId();//获取StreamRStream<String, Object> stream = this.redisson.getStream(streamName);//发消息 - XADD mystream * name 我的消息 age 18StreamMessageId entryId = stream.add(StreamAddArgs.entries(myMessage.toMap()));log.info("stream[{}] add success, id: {}", streamName, entryId);//查询已存在的分组 - XINFO GROUPS mystreamList<StreamGroup> streamGroups = stream.listGroups();streamGroups.forEach(streamGroup -> {log.info("stream[{}] listGroups groupName: {}", streamName, streamGroup.getName());});Boolean existGroup = streamGroups.stream().anyMatch(group -> groupName.equals(group.getName()));if (!existGroup) {//创建分组 - XGROUP CREATE mygroup1 $stream.createGroup(StreamCreateGroupArgs.name(groupName)//此处id支持:NEWEST即$,ALL即0.id(StreamMessageId.ALL));log.info("stream[{}] createGroup success, groupName: {}", streamName, groupName);}//读分组消息 - XREADGROUP GROUP mygroup1 c1 COUNT 5 BLOCK 5000 STREAMS mystream >Map<StreamMessageId, Map<String, Object>> entries = stream.readGroup(groupName, consumerName,//greaterThan即设置从哪个消息ID之后开始读取,支持:NEVER_DELIVERED即>、ALL即0StreamReadGroupArgs.greaterThan(StreamMessageId.NEVER_DELIVERED).count(5).timeout(Duration.ofSeconds(5)));entries.forEach((id, entryMap) -> {log.info("stream[{}] readGroup groupName: {}, consumerName: {}, message: id={}, entry: {}",streamName, groupName, consumerName, id, entryMap);});//读取PEL中未确认的消息 - XPENDING mystream mygroup1 - + 100 c1Map<StreamMessageId, Map<String, Object>> streamMessageIdMapMap = stream.pendingRange(groupName, consumerName, StreamMessageId.MIN, StreamMessageId.MAX, 100);streamMessageIdMapMap.forEach((id, entryMap) -> {log.info("stream[{}] pendingRange groupName: {}, consumerName: {}, message: id={}, entry: {}",streamName, groupName, consumerName, id, entryMap);//确认消息(从PEL中移除) - XACK mystream mygroup1 1600000000000-0stream.ack(groupName, id);log.info("stream[{}] ack groupName: {}, consumerName: {}, message: id={}",streamName, groupName, consumerName, id);});}

参考:

Redis Stream
https://redis.io/docs/latest/develop/data-types/streams/
https://redis.io/docs/latest/commands/xreadgroup/

Redisson Stream
https://github.com/redisson/redisson/wiki/7.-Distributed-collections#720-stream

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/36745.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【DevExpress】WPF DevExpressMVVM 24.1版本开发指南

DevExpressMVVM WPF 环境安装 前言重要Bug&#xff08;必看&#xff09;环境安装控件目录Theme 主题LoginWindow 登陆窗口INavigationService 导航服务DockLayout Dock类型的画面布局TreeView 树状列表注意引用类型的时候ImageSource是PresentationCore程序集的博主找了好久&am…

[笔记] keytool 导入服务器证书和证书私钥

背景 我当前手头已有一个服务器证书和对应的私钥&#xff0c;现在需要转换为 Java KeyStore 格式使用&#xff0c;找了一大圈才发现 keytool 无法直接导入服务器证书和私钥&#xff0c;当然证书可以直接导入&#xff0c;但是私钥是无法直接导入。找了一大圈发现可以先将服务器…

LeetCode题解:1669. 合并两个链表,JavaScript,详细注释

原题链接&#xff1a; https://leetcode.cn/problems/merge-in-between-linked-lists/ 解题思路&#xff1a; 注意该题传入的a和b是链表的索引&#xff0c;而不是节点的值先遍历list1&#xff0c;找到a-1和b1节点将a-1的next指向list2的头节点在将list2的尾节点的next指向b1节…

Navicat 外网连接 mysql (1、通过SSH方式内网访问 2、对外开放3306端口)

1、通过SSH方式内网访问 直接常规方式使用IP、账号密码连接&#xff0c;失败 SSH方式&#xff1a; 常规 选项卡中&#xff1a;localhost录入数据库账号密码 SSH 选项卡中&#xff1a;勾选使用SSH&#xff0c;输入服务器IP、账号、密码 如果出现该错误&#xff0c;可能是服务器…

计算机网络重点名词解释整理

名词解释 GPTVersion 一、网络协议 网络协议 数据交换的规则 组成&#xff1a;语义、语法、定时 二、DHCP DHCP 动态规划主机配置协议 作用&#xff1a;让计算机自动获取IP地址 特点&#xff1a;即插即用&#xff0c;不需要手动设置 三、信号的基本调制方法以及定义 …

Windows下activemq开启jmx

1.activemq版本信息 activemq&#xff1a;apache-activemq-5.18.4 2.Windows下activemq开启jmx 1.进入activemq conf目录&#xff0c;备份activemq.xml文件 2.编辑activemq.xml文件&#xff0c;在broker节点增加useJmx"true" <broker xmlns"http://active…

C++循环队列 自定义queue

原理解析 看main部分的注释&#xff0c;对照着函数&#xff0c;应该能看懂。 #include <iostream> class Queue {public:static constexpr int MAX_SIZE 5;int items[MAX_SIZE];int front, rear;Queue() : front(-1), rear(-1) {}void enqueue(int value) {if ((rear …

理解 Vue.js 中的 immediate: true

理解 Vue.js 中的 immediate: true 在使用 Vue.js 时&#xff0c;监听器 (watchers) 是一种非常重要的工具&#xff0c;它允许我们观察和响应数据的变化。在定义监听器时&#xff0c;我们通常会在组件的 watch 选项中添加相关配置。immediate: true 是其中的一个配置选项。本文…

无线通讯几种常规天线类别简介

天线对于无线模块来说至关重要&#xff0c;合适的天线可以优化通信网络&#xff0c;增加其通信的范围和可靠性。天线的选型对最后的模块通信影响很大&#xff0c;不合适的天线会导致通信质量下降。针对不同的市场应用&#xff0c;天线的材质、安置方式、性能也大不一样。下面简…

近期计算机领域的热点技术

随着科技的飞速发展&#xff0c;计算机领域的新技术、新趋势层出不穷。本文将探讨近期计算机领域的几个热点技术趋势&#xff0c;并对它们进行简要的分析和展望。 一、人工智能与机器学习 人工智能&#xff08;AI&#xff09;和机器学习&#xff08;ML&#xff09;是近年来计算…

基于Vue 3.x与TypeScript的PPTIST本地部署与无公网IP远程演示文稿

文章目录 前言1. 本地安装PPTist2. PPTist 使用介绍3. 安装Cpolar内网穿透4. 配置公网地址5. 配置固定公网地址 前言 本文主要介绍如何在Windows系统环境本地部署开源在线演示文稿应用PPTist&#xff0c;并结合cpolar内网穿透工具实现随时随地远程访问与使用该项目。 PPTist …

[gpt胡说八道篇] 使用Docker快速启动Doris

Docker 是一种轻量级的虚拟化技术&#xff0c;我们可以利用 Docker 快速的在本地启动一个 Doris 的实例&#xff0c;方便进行开发和测试。下面我们来看一下如何操作。 1. 拉取 Docker 镜像 首先&#xff0c;我们需要从 Docker Hub 上拉取 Doris 的镜像。打开终端&#xff0c;输…

Qt Qvariant

QVariant 是 Qt 框架中的一个非常强大的类&#xff0c;它用于存储各种不同类型的数据&#xff0c;并提供了一种统一的方式来处理这些数据。QVariant 可以存储大多数基本数据类型&#xff0c;如整数、浮点数、字符串、日期时间等&#xff0c;以及更复杂的数据类型&#xff0c;如…

ChatGPT的原理可以通俗易懂地介绍

ChatGPT的原理可以通俗易懂地介绍如下&#xff1a; 基础架构&#xff1a; ChatGPT基于OpenAI的GPT&#xff08;Generative Pre-trained Transformer&#xff09;模型&#xff0c;尤其是GPT-3的架构进行构建。GPT模型是一种基于Transformer架构的预训练语言模型&#xff0c;特别…

基于STM32的智能水质监测系统

目录 引言环境准备智能水质监测系统基础代码实现&#xff1a;实现智能水质监测系统 4.1 数据采集模块4.2 数据处理与分析4.3 控制系统实现4.4 用户界面与数据可视化应用场景&#xff1a;水质管理与优化问题解决方案与优化收尾与总结 1. 引言 智能水质监测系统通过使用STM32嵌…

RISC-V知识总结 —— 向量(扩展)指令集

资源1:晏明 - RISC-V向量扩展指令架构及LLVM自动向量化支持 - 202112118 - 第13届开源开发工具大会&#xff08;OSDTConf2021&#xff09;_哔哩哔哩_bilibili资源2:张先轶 - 基于RISC-V向量指令集优化基础计算软件生态【第12届开源开发工具大会&#xff08;OSDT2020&#xff09…

设计模式(实际项目)-状态机模式

需求背景&#xff1a;存在状态流转的预约单 一.数据库设计 CREATE TABLE appointment (id bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT 主键id,appoint_type int(11) NOT NULL COMMENT 预约类型(0:线下查房...),appoint_user_id bigint(20) NOT NULL COMMENT 预约人…

研导智能科技——AI辅助科研产品开发

人工智能&#xff08;AI&#xff09;技术的飞速发展为科研领域带来了革命性的变化。本公司致力于开发基于人工智能的科研辅助产品&#xff0c;旨在通过智能化手段提高科研人员的工作效率和研究质量。目前&#xff0c;我们成功开发了研导学术平台&#xff08;www.zhiyanxueshu.c…

Linux运维:MySQL数据库(1)

1.信息与数据&#xff1a; 数据是信息的载体&#xff0c;信息是数据的内涵。数据库就是存储数据的仓库&#xff0c;并长期存储在计算机磁盘中&#xff0c;可由多个用户和应用程序共享的数据集合&#xff0c;就是数据库。 2.数据库中的数据的特点&#xff1a; 2.1.数据是按照某…

RuleApp1.4.6文章社区客户端 广告联盟支持Docx导入

支持编译为安卓&#xff0c;苹果&#xff0c;小程序&#xff0c;H5网页的社区客户端代码&#xff0c;包括文章模块&#xff0c;用户模块&#xff0c;动态模块&#xff0c;支付模块&#xff0c;聊天模块&#xff0c;广告模块&#xff0c;商城模块等基础功能&#xff0c;包含VIP会…