Redis 7.x 系列【14】数据类型之流(Stream)

有道无术,术尚可求,有术无道,止于术。

本系列Redis 版本 7.2.5

源码地址:https://gitee.com/pearl-organization/study-redis-demo

文章目录

    • 1. 概述
    • 2. 常用命令
      • 2.1 XADD
      • 2.2 XRANGE
      • 2.3 XREVRANGE
      • 2.4 XDEL
      • 2.5 XLEN
      • 2.6 XREAD
      • 2.7 XGROUP CREATE
      • 2.8 XACK
      • 2.9 XPENDING
    • 3. 应用场景

1. 概述

消息队列:是指把要传输的数据(消息)放在队列中,用队列机制来实现消息传递,生产者产生消息并把消息放入队列,然后由消费者去处理。消费者可以到指定队列拉取消息,或者订阅相应的队列,由服务端给其推送消息。

Redis 也支持消息队列功能,在 5.0 版本之前,基于以下两种方式实现:

  • Pub/Sub
  • List

Pub/Sub 发布订阅模式,消息的发送者不会将消息直接发送给特定的接收者,而是通过消息通道广播出去,让订阅该消息主题的订阅者消费到:

在这里插入图片描述
Pub/Sub 中的消息无法持久化,如果出现网络断开、宕机等,消息就会被丢弃。而且也没有 Ack 机制来保证数据的可靠性,假设一个消费者都没有,那消息就直接被丢弃了。

Redis List 也可以实现消息队列,按照插入顺序排序,可以添加一个元素到列表的头部(左边)或者尾部(右边)。 将需要延后处理的任务结构体序列化成字符串塞进 Redis 的列表,另一个线程从这个列表中轮询数据进行处理:
在这里插入图片描述
Redis List 同样存在诸多问题,比如,不支持多消费者模式,不支持延时消息,不支持优先级,不支持消息确认机制等等。

Redis Stream5.0 版本中引入的一种新的数据结构,用于实现简单但功能强大的消息传递模式。以时间序列的方式存储消息,每个消息都有一个唯一的 ID ,并且可以被多个消费者订阅和消费。是 Redis 实现消息队列的另外一种模式,支持消息的持久化、支持自动生成全局唯一 1D、支持 Ack 确认消息模式、支持消费组模式等,旨在让消息队列更加的稳定和可靠。

其结构图如下:
在这里插入图片描述
各部分解释:

  • Message Content:消息内容
  • Consumer group:消费组,通过 XGROUP CREATE 命令创建,同一个消费组可以有多个消费者
  • Last_delivered_id:游标,每个消费组会有个游标 Last_delivered_id,任意一个消费者读取了消息都会使游标往前移动。
  • Consumer:消费者,消费组中的消费者
  • Pending_ ids:消费者会有一个状态变量,用于记录被当前消费已读取但未 ack 的消息 Id ,如果客户端没有 ack ,这个变量里面的消息 ID 会越来越多,一旦某个消息被 ack 它就开始减少。这个 pending_ids 变量在 Redis 官方被称之为 PEL(Pending Entries List),记录了当前已经被客户端读取的消息,但是还没有 ack (Acknowledge character:确认字符),它用来确保客户端至少消费了消息一次,而不会在网络传输的中途丢失了没处理

2. 常用命令

Stream 相关所有命令:

命名描述
XACK确认消费者已经成功处理从 Stream 中获取的消息
XADD添加消息到队列末尾
XAUTOCLAIM转移符合指定条件的待处理流条目的所有权
XCLAIM改变待处理消息的所有权
XDEL删除消息
XGROUP CREATE为存储在 key 的流创建一个新的消费者组
XGROUP CREATECONSUMER要在存储在key的流的消费者组中创建一个消费者
XGROUP DELCONSUMER消费者组中删除一个消费者
XGROUP DESTROY删除一个已存在的消费者组
XGROUP SETID为消费者组设置最后传递的ID
XINFO CONSUMERS返回消费者组中的消费者列表
XINFO GROUPS返回消费者组列表
XINFO STREAM存储在的key流的相关信息
XLEN获取 Stream 中的消息长度
XPENDING通过消费者组从流中获取数据但不确认这些数据,会产生待处理条目
XRANGE获取消息列表(可以指定范围)
XREAD获取消息(阻塞/非阻塞),返回大于指定 ID 的消息
XREADGROUPXREAD命令的一个特殊版本,支持消费者组
XREVRANGEXRANGE 相比区别在于反向获取,ID从大到小
XSETID内部命令。它用于主节点来复制流的最后传递的ID
XTRIM限制 Stream 的长度,如果已经超长会进行截取

2.1 XADD

XADD 命令用于向 Stream(流)数据结构末尾添加消息。

语法格式:

XADD key [NOMKSTREAM] [MAXLEN|MINID [=|~] threshold [LIMIT count]] *|ID field value [field value ...]

参数说明:

  • key:指定要添加消息的 Stream 的名称。
  • [NOMKSTREAM]:可选参数,用于指定当流不存在时是否报错。默认情况下,如果指定的流不存在,XADD命令会创建。如果使用NOMKSTREAM选项,则流不存在时命令会失败。
  • [MAXLEN|MINID [=|~] threshold [LIMIT count]]:这组选项用于控制流的最大长度或最小消息 ID
    • MAXLEN maxlen:限制 Stream 的最大长度。当长度达到maxlen时,旧的消息会被自动删除。
    • MINID minid:指定最旧的消息ID。当插入新消息时,如果已经存在比minid更旧的消息,则会将这些消息删除。
    • [=|~]:操作符,=表示精确匹配,~表示小于等于(对于MINID而言)或大于等于(对于MAXLEN而言)。
    • [LIMIT count]:当使用MAXLEN~时,指定需要保留的消息数量的最小值。
  • *|ID:消息的ID。使用*表示自动生成一个唯一的ID。如果不使用*,则需要提供一个有效的消息ID,它必须大于流中所有已存在的消息的ID
  • field value [field value ...]:消息的字段和值。可以指定一个或多个字段及其对应的值。

示例,插入消息:

localhost:0>XADD mystream * msg_1 100 msg_2 38
"1719279960591-0"

示例, 插入消息,并限制长度不超过 1000 条:

localhost:0>XADD mystream MAXLEN 1000 * msg_3 100 msg_4 38
"1719279971749-0"

查看控制台:

在这里插入图片描述

2.2 XRANGE

XRANGE 命令用于获取指定范围内的消息。

命令格式:

XRANGE key start end [COUNT count]

参数说明:

  • key:指定 Streamkey
  • start:指定要检索的消息范围的起始 ID 。可以使用特殊值-来表示最小值。
  • end:指定要检索的消息范围的结束 ID 。可以使用特殊值+来表示最大值。
  • [COUNT count]:可选参数,用于限制返回的消息数量。

注意事项:

  • Stream 的消息 ID 由两部分组成:一个时间戳和一个序列号。时间戳表示消息被添加到 Stream 的时间,而序列号则用于在同一时间戳内区分不同的消息。
  • XRANGE 命令返回的消息是按照它们在 Stream 中的顺序排列的,即按照消息 ID 的顺序。
  • 如果在检索消息时使用了 COUNT 参数,但指定的范围内的消息数量少于 COUNT 指定的数量,那么只会返回范围内的所有消息。

示例,检索所有消息:

localhost:0>XRANGE mystream - +1)    1)   "1719279960591-0"2)      1)    "msg_1"2)    "100"3)    "msg_2"4)    "38"2)    1)   "1719279971749-0"2)      1)    "msg_3"2)    "100"3)    "msg_4"4)    "38"

示例,检索特定范围内的消息:

localhost:0>XRANGE mystream  1719279960591-0 1719279960600-01)    1)   "1719279960591-0"2)      1)    "msg_1"2)    "100"3)    "msg_2"4)    "38"

示例,限制返回的消息数量:

localhost:0>XRANGE mystream - + COUNT 11)    1)   "1719279960591-0"2)      1)    "msg_1"2)    "100"3)    "msg_2"4) 

2.3 XREVRANGE

XREVRANGE 命令与 XRANGE 命令类似,但它是按照消息 ID 的递减顺序(用于反向)获取指定范围内的消息。

命令格式:

XREVRANGE key end start [COUNT count]

示例,检索最后两个时间序列的消息:


localhost:0>XREVRANGE mystream + - COUNT 21)    1)   "1719279971749-0"2)      1)    "msg_3"2)    "100"3)    "msg_4"4)    "38"2)    1)   "1719279960591-0"2)      1)    "msg_1"2)    "100"3)    "msg_2"4)    "38"

2.4 XDEL

XDEL 命令用于从 Stream 中删除指定的消息。返回一个整数,表示被成功删除的消息数量。

命令格式:

XDEL key ID [ID ...]

参数说明:

  • key:指定 Streamkey
  • ID:一个或多个要删除的消息的 ID

注意事项:

  • 在使用 XDEL 命令时,你需要确保提供的消息 ID 是存在的,否则命令将不会删除任何消息,并返回0。
  • 可以通过一次 XDEL 命令删除多个消息,只需在命令中提供多个消息 ID 即可。
  • XDEL 命令不会改变 Stream 的其他消息的顺序或 ID

示例,删除消息:

localhost:0>XDEL mystream 1719280747405-0
"1"

2.5 XLEN

XLEN 命令用于获取指定 Stream 中包含的消息数量,即流的长度。如果 Stream 不存在或为空,则返回 0

命令格式:

XLEN key

示例:

localhost:0>XLEN mystream
"1"

2.6 XREAD

XREAD 命令是用于从 Stream 独立消费消息,支持阻塞等待新消息的到来。返回一个数组,其中每个元素都是一个包含 Stream key 和消息列表的数组。消息列表是一个包含消息 ID 和消息数据的数组。

命令格式:

XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]

参数说明:

  • COUNT count:指定一次读取的最大消息数量。如果未指定,则默认为 1
  • BLOCK milliseconds:用于指定阻塞的时间(以毫秒为单位)。如果指定了此参数,并且 Stream 中没有可消费的消息,客户端将在指定的时间内阻塞等待。如果设置为 0 ,则表示非阻塞模式,即如果没有消息可消费,则立即返回。
  • STREAMS key [key ...]:指定要从中读取消息的 Streamkey 。可以指定一个或多个。
  • ID [ID ...]:对于每个指定的 key ,可以提供一个或多个消息 ID 。这些 ID 用于指定从哪个位置开始读取消息。如果某个 key 后面没有指定 ID ,则默认为从该 Stream 的最新消息开始读取。

示例,非阻塞模式读取最新消息:

XREAD COUNT 1 STREAMS mystream $

示例,阻塞模式,读取最新消息并等待新消息:

XREAD COUNT 1 BLOCK 10000 STREAMS mystream $

2.7 XGROUP CREATE

XGROUP CREATE 命令用于在已存在的流(stream)上创建一个新的消费者组(consumer group)。消费者组允许多个消费者(consumer)协作消费同一个流中的数据,并且每个消费者都可以从自己的位置开始读取流。

命令格式:

XGROUP CREATE <key> <groupname> <id> [MKSTREAM] [MKTABLE] [CREATECONSUMER <consumername>]

参数说明:

  • <key>:流的名称。
  • <groupname>:消费者组的名称。
  • <id>:消费者组初始的最后一个条目 ID ,即消费者组开始读取的起始点。可以使用$表示流的最新条目,或者使用0表示流的起始点,或者使用任何其他有效的 ID
  • [MKSTREAM]:可选参数,如果流不存在,则创建它。
  • [MKTABLE]:在 Redis 6.2 及更高版本中引入的可选参数,用于创建与流关联的二级索引表(secondary index table)。这通常用于支持基于特定字段的查询。
  • [CREATECONSUMER <consumername>]:在 Redis 6.2 及更高版本中引入的可选参数,用于在创建消费者组时同时创建一个消费者。

示例,创建一个新的消费者组,从流的最新条目开始读取:

localhost:0>XGROUP CREATE mystream mygroup $ MKSTREAM
"OK"

2.8 XACK

XACK 命令用于确消费者已经成功处理了一个或多个消息。这些消息通常是从流(Stream)中读取的,并存储在消费者组的待处理条目列表(Pending Entry ListPEL)中。通过发送 XACK 命令,消费者通知 Redis 服务器它已经完成了一个或多个消息的处理,从而将这些消息从 PEL 中移除。

命令格式:

XACK <key> <groupname> <consumername> <ID> [<ID> ...]

参数说明:

  • <key>:流的名称。
  • <groupname>:消费者组的名称。
  • <consumername>:消费者的名称。
  • <ID>:要确认的消息的ID,可以指定一个或多个。

示例,假设消费者已经读取了一些消息,并决定它们已经被成功处理。现在,消费者想要确认这些消息:

XACK mystream mygroup myconsumer 1526569900000-0 1526569900002-0

在这个例子中,消费者确认了两个消息,它们的 ID 分别是 1526569900000-01526569900002-0

一旦消息被确认,它们将从该消费者组的 PEL 中移除,表示这些消息已经被成功处理。注意,即使消息被确认并从 PEL 中移除,它们仍然保留在流中,并且可以被其他消费者组或消费者读取。

如果消费者在处理消息时失败,或者需要稍后重试,它可以选择不发送 XACK 命令,这样消息将保持在 PEL 中,直到消费者准备好确认它们或它们因超时而被自动从 PEL 中移除(取决于消费者组的配置)。

2.9 XPENDING

XPENDING 命令用于查询消费者组中未确认消息的详细信息。允许你查看哪些消息正在等待被处理,以及哪些消费者拥有这些消息。

命令格式:

XPENDING <key> <groupname> [start end count] [consumername]

参数说明:

  • <key>:流的名称。
  • <groupname>:消费者组的名称。
  • [start end count]:这三个参数是可选的,用于限制查询结果的范围。
  • start:查询的开始消息ID
  • end:查询的结束消息ID
  • count:要返回的消息数量。
  • [consumername]:可选参数,指定要查询的消费者的名称。如果不提供此参数,将返回消费者组中的所有未确认消息。

XPENDING 命令返回一个数组,其中包含以下信息:

  • 总未确认消息数:整数,表示在指定范围内未确认的消息总数。
  • 最小消息ID:字符串,表示在指定范围内未确认消息的最小ID
  • 最大消息ID:字符串,表示在指定范围内未确认消息的最大ID
  • 每个消费者的未确认消息:一个数组,其中每个元素都是一个包含消费者名称和该消费者拥有的未确认消息数的数组。

注意事项:

  • XPENDING 是一个只读命令,它不会修改任何数据。
  • 如果提供了 consumername 参数,则只返回该消费者的未确认消息信息。
  • 如果提供了 [start end count] 参数,则只返回指定范围内的未确认消息信息。
  • 通过 XPENDING 命令,你可以轻松地监控消费者组中的未确认消息,从而确保消息得到及时处理,并在必要时进行故障排除。

示例:

XPENDING mystream mygroup
2) "1526569900000-0"  # 最小消息ID  
3) "1526569900002-0"  # 最大消息ID  
4) 1) 1) "myconsumer" # 消费者名称  2) (integer) 2   # 该消费者拥有的未确认消息数

3. 应用场景

Redis Stream 主要用于消息队列,所以可以用来解决应用解耦,异步消息,流量削峰等问题,实现高性能,高可用,可伸缩和最终一致性架构。但是更推荐使用专业的消息队列,比如RabbitMQRocketMQ等,对于简单的应用场景,如果能满足需求,也可以使用Redis Stream

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/39266.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【自用】CentOS7.6 安装 node-RED 4.0.2 教程(各种坑都摆脱的版本)

步骤总览 1.下载安装 nodejs 2.安装并配置 node-RED 3.重启服务器&#xff0c;验证 node-RED 是否安装 and 配置成功 一、下载安装 nodejs 1.下载 nodejs 18 为什么要下载 nodejs 18 呢&#xff1f; 因为 node-RED 4.0.1 支持的最低 nodejs 版本就是 nodejs 18。 当然了&a…

实在智能对话钉钉:宜搭+实在Agent,AI时代的工作方式

比起一个需求需要等产品、技术排期&#xff0c;越来越多的人开始追求把自己武装成「全能战士」&#xff0c;通过低代码工具一搭&#xff0c;一个高效的工作平台便产生了。 宜搭是钉钉自研的低代码应用构建平台&#xff0c;无论是专业开发者还是没有代码基础的业务人员&#xf…

不知几DAY的Symfony---RCE复现

感谢红队大佬老流氓的供稿&#xff0c;此篇文章是针对Symfony框架的一个RCE漏洞复现 ​框架简介 Symfony是一个开源的PHP Web框架&#xff0c;它现在是许多知名 CMS 的核心组件&#xff0c;例如Drupal、Joomla!、eZPlatform&#xff08;以前称为 eZPublish&#xff09;或Bolt。…

和鲸“101”计划领航!和鲸科技携手北中医,共话医学+AI 实验室建设及创新人才培养

为进一步加强医学院校大数据管理与应用、信息管理与信息系统&#xff0c;医学信息工程等专业建设&#xff0c;交流实验室建设、专业发展与人才培养经验&#xff0c;6 月 22 日&#xff0c;由北京中医药大学&#xff08;简称“北中医”&#xff09;主办&#xff0c;上海和今信息…

短剧系统开发:如何让你的创意变成现实

短剧系统开发是一个将创意转化为现实的过程&#xff0c;它涉及多个方面&#xff0c;包括需求分析、系统设计、开发环境搭建、前后端开发、测试与发布等。 1. 需求分析 &#xff08;1&#xff09;明确目标&#xff1a;首先&#xff0c;明确短剧系统的目标和定位&#xff0c;包括…

APP逆向 day9 安卓开发基础1

一.前言 app逆向当然要学安卓基础啦&#xff01;今天我们来教安卓基础当然&#xff0c;安卓基础不会教的很多&#xff0c;比java还要少&#xff0c;还是那句话&#xff0c;了解就好。 二.安卓环境搭建 2.1 安卓介绍 如果做安卓开发 需要会java代码安卓SDK(安卓提供的内置…

Hack The Box-Blazorized

总体思路 Blazor JWT->SPN劫持->登录脚本劫持->DCSync 信息收集&端口利用 nmap -sSVC blazorized.htbStarting Nmap 7.94SVN ( https://nmap.org ) at 2024-07-01 02:37 EDT Nmap scan report for blazorized.htb (10.10.11.22) Host is up (0.30s latency). N…

编译调试swift5.7源码

环境&#xff1a; 电脑&#xff1a;apple m1 pro系统&#xff1a;macOS13Xcode: 14.2Cmake: 3.25.1Ninja: 1.11.1sccache: 0.3.3python: 3.10 (如果你的mac不是这个版本&#xff0c;可以通过 brew install python3.10下载&#xff0c;然后看这篇文章切换到该python版本)swift代…

RK3568驱动指南|第十五篇 I2C-第176章 通过逻辑分析仪认识I2C波形

瑞芯微RK3568芯片是一款定位中高端的通用型SOC&#xff0c;采用22nm制程工艺&#xff0c;搭载一颗四核Cortex-A55处理器和Mali G52 2EE 图形处理器。RK3568 支持4K 解码和 1080P 编码&#xff0c;支持SATA/PCIE/USB3.0 外围接口。RK3568内置独立NPU&#xff0c;可用于轻量级人工…

甄选版“论软件系统架构评估”,软考高级论文,系统架构设计师论文

论文真题 对于软件系统,尤其是大规模的复杂软件系统来说,软件的系统架构对于确保最终系统的质量具有十分重要的意义,不恰当的系统架构将给项目开发带来高昂的代价和难以避免的灾难。对一个系统架构进行评估,是为了:分析现有架构存在的潜在风险,检验设计中提出的质量需求,…

mac软件卸载后的残留文件删除 mac如何卸载应用程序

很多人都不知道&#xff0c;mac使用系统方式卸载后会有残留文件未被删除&#xff0c;久而久之就会占用大量的磁盘空间。今天小编就来教大家如何删除mac软件卸载后的残留文件&#xff0c;如果你想不留痕迹的删除&#xff0c;mac又该如何正确卸载应用程序&#xff0c;本文将一一为…

Python 获取字典中的值(八种方法)

Python 字典(dictionary)是一种可变容器模型&#xff0c;可以存储任意数量的任意类型的数据。字典通常用于存储键值对&#xff0c;每个元素由一个键&#xff08;key&#xff09;和一个值(value&#xff09;组成&#xff0c;键和值之间用冒号分隔。 以下是 Python 字典取值的几…

嵌入式软件工程应该学些什么?

在开始前刚好我有一些资料&#xff0c;是我根据网友给的问题精心整理了一份「嵌入式的资料从专业入门到高级教程」&#xff0c; 点个关注在评论区回复“666”之后私信回复“666”&#xff0c;全部无偿共享给大家&#xff01;&#xff01;&#xff01;毕业后相当嵌入式软件工程…

从理论到实践的指南:企业如何建立有效的EHS管理体系?

企业如何建立有效的EHS管理体系&#xff1f;对于任何企业&#xff0c;没有安全就谈不上稳定生产和经济效益&#xff0c;因此建立EHS管理体系是解决企业长期追求的建立安全管理长效机制的最有效手段。良好的体系运转&#xff0c;可以最大限度地减少事故发生。 这篇借着开头这个…

C++ (第二天下午---面向对象之类与对象)

一、面向过程与面向对象 1、面向过程 面向过程是一种以事件为中心的编程思想&#xff0c;编程的时候把解决问题的步骤分析出来&#xff0c;然后用函数把这些步骤实现&#xff0c;在一步一步的具体步骤中再按顺序调用函数。 举个例子&#xff0c;下五子棋&#xff0c;面向过程…

LLM大模型工程师面试经验宝典--进阶版2(2024.7月最新)

目录 1 大模型怎么评测&#xff1f; 2 大模型的honest原则是如何实现的&#xff1f;模型如何判断回答 的知识是训练过的已知的知识&#xff0c;怎么训练这种能力&#xff1f; 3 如何衡量大模型水平&#xff1f; 4 大模型评估方法 有哪些&#xff1f; 5 大模型评估工具 有哪…

解锁数据资产的无限潜能:深入探索创新的数据分析技术,挖掘其在实际应用场景中的广阔价值,助力企业发掘数据背后的深层信息,实现业务的持续增长与创新

目录 一、引言 二、创新数据分析技术的发展 1、大数据分析技术 2、人工智能与机器学习 3、可视化分析技术 三、创新数据分析技术在实际应用场景中的价值 1、市场洞察与竞争分析 2、客户细分与个性化营销 3、业务流程优化与风险管理 4、产品创新与研发 四、案例分析 …

Python处理Excel文件的实用技巧使用详解

概要 在数据分析和处理的过程中,Excel 是一种广泛使用的数据存储和交换格式。Python 提供了多个强大的库来处理 Excel 文件,如 pandas、openpyxl 和 xlrd 等。本文将详细介绍如何使用这些库进行 Excel 文件的常用操作,包括读取、写入、修改和格式化等。 使用 pandas 处理 E…

仪器校准的概念与定义,计量校准是什么?

仪器校准的定义&#xff0c;在之前所颁布的《国际计量学词汇 基础和通用概念及相关术语》文件中&#xff0c;已经有了明确说明&#xff0c;而该文件做了修改以后&#xff0c;在后续新的定义中&#xff0c;仪器校准具体被分为两部分&#xff0c;第一步是将被计量仪器和计量校准的…

数据库测试数据准备厂商 Snaplet 宣布停止运营

上周刚获知「数据库调优厂商 OtterTune 宣布停止运营」。而今天下班前&#xff0c;同事又突然刷到另一家海外数据库工具商 Snaplet 也停止运营了。Snaplet 主要帮助开发团队在数据库中生成仿真度高且合规的测试数据。我们在年初还撰文介绍过它「告别手搓&#xff01;Postgres 一…