Redis 数据类型Streams

目录

1 基本特性

2 主要操作命令 

2.1 XADD key ID field value [field value ...]

2.2 XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]

2.3 XRANGE key start end [COUNT count]

2.4 XREVRANGE key end start [COUNT count]

2.5 XGROUP CREATE key groupname id-or-$ [MKSTREAM]

2.6 XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...]

2.7 XACK key group ID [ID ...]

2.8 XPENDING key group [start end count] [IDLE idle]

2.9 XCLAIM key group consumer min-idle-time ID [ID ...] [IDLE idle] [TIME time] [RETries count] [FORCE]

2.10 XINFO 

2.11  XDEL key ID [ID ...]

2.12 XTRIM key MAXLEN [~] len

3 使用场景


Redis Streams 是 Redis 5.0 引入的一种新的数据类型,它提供了一种强大的日志结构化数据存储方式。Streams 类型非常适合用于构建消息队列、事件日志以及其他需要持久化和高效处理时间序列数据的应用场景。

1 基本特性

  • 持久性:与传统的发布/订阅不同,Streams 中的消息是持久化的,即使客户端断开连接后重新连接,仍然可以访问到之前的消息。

  • 多消费者支持:支持多个消费者组(consumer groups),每个组可以独立地消费流中的消息。消费者组允许不同的消费者处理相同的消息,但每个消息在一个组内只能被一个消费者处理一次。

  • 消息 ID 和范围查询:每条消息都有一个唯一的 ID,由时间戳和序列号组成。可以通过指定消息 ID 范围来获取特定时间段内的消息。

  • 阻塞读取:支持阻塞读取(XREAD 和 XREADGROUP 命令的 BLOCK 选项),使得客户端可以在没有新消息时等待一段时间。

  • 自动删除:可以设置最大长度(MAXLEN 选项)来限制流的大小,超过长度的消息会自动被删除。

  • 灵活的消息格式:每条消息可以包含多个字段-值对,类似于哈希表,这使得消息可以携带丰富的信息。

2 主要操作命令 

2.1 XADD key ID field value [field value ...]

向指定的流中添加一条新消息,ID 可以是 *(表示自动生成)或指定的时间戳和序列号。

127.0.0.1:6379> xadd mystream * sensor_id 123 temmperature 22.5
"1729306027171-0"

返回的结构可以分为两部分:

  • 时间戳1729306027171 (表示条目被添加的时间,单位是毫秒。你可以将这个时间戳转换为可读的日期和时间格式。
  • 序列号0 (表示在同一毫秒内这是第一个条目。如果在同一毫秒内添加了多个条目,序列号将会递增,例如 1729306027171-11729306027171-2 等。
2.2 XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]
  • 从一个或多个 Stream 中读取数据。
  • COUNT 指定返回的最大条目数。
  • BLOCK 指定在没有新消息时阻塞的时间(毫秒)。
  • STREAMS 指定要读取的 Stream 和起始 ID。
127.0.0.1:6379> xread count 2 streams mystream 0-0
1) 1) "mystream"2) 1) 1) "1729306027171-0"2) 1) "sensor_id"2) "123"3) "temmperature"4) "22.5"
2.3 XRANGE key start end [COUNT count]
  • 返回指定 ID 范围内的条目。
  • start 和 end 是 ID,可以使用 - 表示最小 ID,+ 表示最大 ID。
127.0.0.1:6379> xrange mystream - +
1) 1) "1729306027171-0"2) 1) "sensor_id"2) "123"3) "temmperature"4) "22.5"
2.4 XREVRANGE key end start [COUNT count]

返回指定 ID 范围内的条目,但按逆序排列。

127.0.0.1:6379> xadd mystream * sensor_id 234 temmperature 23.5
"1729329067777-0"
127.0.0.1:6379> xadd mystream * sensor_id 345
"1729329079135-0"
127.0.0.1:6379> xrevrange mystream + - count 2
1) 1) "1729329079135-0"2) 1) "sensor_id"2) "345"
2) 1) "1729329067777-0"2) 1) "sensor_id"2) "234"3) "temmperature"4) "23.5"
2.5 XGROUP CREATE key groupname id-or-$ [MKSTREAM]
  • 创建一个新的消费者组。
  • id-or-$ 是起始位置,可以是具体的 ID 或 $ 表示只消费新的条目。
  • MKSTREAM 如果 Stream 不存在则创建它。
127.0.0.1:6379> xgroup create mystream mygroup 0
OK
2.6 XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...]
  • 从消费者组中读取数据。
  • GROUP:指定消费者组的名称。
  • consumer:指定消费者的名称。
  • COUNT count:可选参数,指定一次最多读取的消息数量。
  • BLOCK milliseconds:可选参数,如果当前没有可用的消息,命令将阻塞指定的时间(以毫秒为单位),等待新消息的到来。
  • NOACK: 表示不确认消息,通常用于快速消费。
  • STREAMS:指定要读取的流及其对应的 ID。ID 通常是一个特殊值 >,表示只读取新的消息;也可以是具体的 ID,表示从该 ID 开始读取。
127.0.0.1:6379> xreadgroup group mygroup consumer1 count 2 streams mystream >
1) 1) "mystream"2) 1) 1) "1729306027171-0"2) 1) "sensor_id"2) "123"3) "temmperature"4) "22.5"2) 1) "1729329067777-0"2) 1) "sensor_id"2) "234"3) "temmperature"4) "23.5"
2.7 XACK key group ID [ID ...]

确认已处理的消息。XACK 命令用于确认消费者组中的消息已经被成功处理。当你使用 XACK 命令时,Redis 会将指定的消息从“待处理”状态转换为“已确认”状态,并从消费者的待处理列表中移除。

127.0.0.1:6379> xack mystream mygroup 1729329067777-0
(integer) 1
127.0.0.1:6379> xack mystream mygroup 1729329079135-0
(integer) 0

当 XACK 命令成功确认一条消息时,返回值为 1,表示该消息已经被确认并且从待处理列表中移除。例如,如果消息 1729329067777-0 是由 consumer1 处理的,并且现在调用 XACK 确认它,那么这条消息将不再出现在 consumer1 的待处理列表中。

2.8 XPENDING key group [start end count] [IDLE idle]

查看待处理的消息。

127.0.0.1:6379> xpending mystream mygroup
1) (integer) 1
2) "1729306027171-0"
3) "1729306027171-0"
4) 1) 1) "consumer1"2) "1"
127.0.0.1:6379> xack mystream mygroup 1729306027171-0
(integer) 1
127.0.0.1:6379> xpending mystream mygroup
1) (integer) 0
2) (nil)
3) (nil)
4) (nil)
2.9 XCLAIM key group consumer min-idle-time ID [ID ...] [IDLE idle] [TIME time] [RETries count] [FORCE]

用于将一个或多个消息从一个消费者转移到另一个消费者。这个命令通常用于处理消息超时或重新分配消息的情况。XCLAIM 允许你手动将消息从一个消费者的待处理列表移动到另一个消费者的待处理列表。

127.0.0.1:6379> xreadgroup group mygroup consumer1 count 2 streams mystream >
1) 1) "mystream"2) 1) 1) "1729329079135-0"2) 1) "sensor_id"2) "345"
127.0.0.1:6379> xclaim mystream mygroup consumer2 10000 1729329079135-0
1) 1) "1729329079135-0"2) 1) "sensor_id"2) "345"
  • mystream:流的名称。
  • mygroup:消费者组的名称。
  • consumer2:目标消费者的名称,即消息将被转移给这个消费者。
  • 10000:消息的空闲时间(以毫秒为单位)。只有那些空闲时间超过这个值的消息才会被转移。
  • 1729329079135-0:要转移的消息 ID。
2.10 XINFO 

获取 Stream 或消费者组的信息。

127.0.0.1:6379> xinfo stream mystream1) "length"2) (integer) 33) "radix-tree-keys"4) (integer) 15) "radix-tree-nodes"6) (integer) 27) "groups"8) (integer) 19) "last-generated-id"
10) "1729329079135-0"
11) "first-entry"
12) 1) "1729306027171-0"2) 1) "sensor_id"2) "123"3) "temmperature"4) "22.5"
13) "last-entry"
14) 1) "1729329079135-0"2) 1) "sensor_id"2) "345"
127.0.0.1:6379> xinfo groups mystream
1) 1) "name"2) "mygroup"3) "consumers"4) (integer) 25) "pending"6) (integer) 17) "last-delivered-id"8) "1729329079135-0"
127.0.0.1:6379> xinfo consumers mystream mygroup
1) 1) "name"2) "consumer1"3) "pending"4) (integer) 05) "idle"6) (integer) 255317
2) 1) "name"2) "consumer2"3) "pending"4) (integer) 15) "idle"6) (integer) 191940

XINFO STREAM mystream

  • length:

    • 流中的消息总数:3 条。
  • radix-tree-keys:

    • 用于存储流数据的 radix tree 中的键的数量:1 个。
  • radix-tree-nodes:

    • 用于存储流数据的 radix tree 中的节点数量:2 个。
  • groups:

    • 与该流关联的消费者组数量:1 个。
  • last-generated-id:

    • 流中最后生成的消息 ID:1729329079135-0
  • first-entry:

    • 流中的第一条消息:
      • 消息 ID: 1729306027171-0
      • 消息内容:
        • sensor_id123
        • temmperature22.5
  • last-entry:

    • 流中的最后一条消息:
      • 消息 ID: 1729329079135-0
      • 消息内容:
        • sensor_id345

XINFO GROUPS mystream

  • name:

    • 消费者组的名称:mygroup
  • consumers:

    • 该组中的消费者数量:2 个。
  • pending:

    • 该组中待处理的消息数量:1 条。
  • last-delivered-id:

    • 该组中最后一个被交付的消息 ID:1729329079135-0

XINFO CONSUMERS mystream mygroup

  • 第一个消费者:

    • nameconsumer1
    • pending: 待处理的消息数量:0 条
    • idle: 空闲时间(以毫秒为单位):255,317 毫秒(约 4 分钟 15 秒)
  • 第二个消费者:

    • nameconsumer2
    • pending: 待处理的消息数量:1 条
    • idle: 空闲时间(以毫秒为单位):191,940 毫秒(约 3 分钟 12 秒)
2.11  XDEL key ID [ID ...]

从 Stream 中删除一个或多个条目。

127.0.0.1:6379> xdel mystream 1729306027171-0
(integer) 1
127.0.0.1:6379> xrange mystrea - +
(empty list or set)
127.0.0.1:6379> xrange mystream - +
1) 1) "1729329067777-0"2) 1) "sensor_id"2) "234"3) "temmperature"4) "23.5"
2) 1) "1729329079135-0"2) 1) "sensor_id"2) "345"
2.12 XTRIM key MAXLEN [~] len

修剪 Stream,保留最多 len 个条目,~ 表示近似长度。

127.0.0.1:6379> xrange mystream - +
1) 1) "1729329067777-0"2) 1) "sensor_id"2) "234"3) "temmperature"4) "23.5"
2) 1) "1729329079135-0"2) 1) "sensor_id"2) "345"
127.0.0.1:6379> xtrim mystream maxlen 1
(integer) 1
127.0.0.1:6379> xrange mystream - +
1) 1) "1729329079135-0"2) 1) "sensor_id"2) "345"

3 使用场景

  • 日志记录:可以用来存储系统的日志信息,方便后续分析和处理。
  • 事件流:处理实时事件,如传感器数据、用户行为等。
  • 消息队列:实现可靠的消息传递系统,支持多个消费者组。
  • 任务队列:管理后台任务,确保任务被正确处理。

更多命令请参考:Commands | Docs 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/882793.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

文件处理新纪元:微信小程序的‘快递员’与‘整理师’

嗨,我是中二青年阿佑,今天阿佑将带领大家如何通过巧妙的文件处理功能,让用户体验从‘杂乱无章’到‘井井有条’的转变! 文章目录 微信小程序的文件处理文件上传:小程序的“快递服务”文件下载:小程序的“超…

Sigrity Power SI Model Extraction模式如何提取电源网络的S参数和阻抗操作指导(一)

Sigrity Power SI Model Extraction模式如何提取电源网络的S参数和阻抗操作指导(一) Sigrity PowerSI是频域电磁场仿真工具,以下图为例介绍如果用它观测电源的网络的S参数以及阻抗的频域曲线. 观测IC端电源网络的自阻抗 1. 用powerSi.exe打开该SPD文件

AWD入门

一、简介 AWD(Attack With Defense,攻防兼备)模式。你需要在一场比赛里要扮演攻击方和防守方,攻者得分,失守者会被扣分。也就是说攻击别人的靶机可以获取 Flag 分数时,别人会被扣分,同时你也要保护自己的主机不被别人…

强心剂!EEMD-MPE-KPCA-LSTM、EEMD-MPE-LSTM、EEMD-PE-LSTM故障识别、诊断

强心剂!EEMD-MPE-KPCA-LSTM、EEMD-MPE-LSTM、EEMD-PE-LSTM故障识别、诊断 目录 强心剂!EEMD-MPE-KPCA-LSTM、EEMD-MPE-LSTM、EEMD-PE-LSTM故障识别、诊断效果一览基本介绍程序设计参考资料 效果一览 基本介绍 EEMD-MPE-KPCA-LSTM(集合经验模态分解-多尺…

【JAVA】第三张_Eclipse下载、安装、汉化

简介 Eclipse是一种流行的集成开发环境(IDE),可用于开发各种编程语言,包括Java、C、Python等。它最初由IBM公司开发,后来被Eclipse Foundation接手并成为一个开源项目。 Eclipse提供了一个功能强大的开发平台&#x…

IDEA如何给debug断点加上筛选条件判断

前言 我们在使用IDEA开发Java应用时,经常是需要进行代码调试的,这就需要打断点进行操作。但有些时候,我们只希望在符合某种条件的情况下,才去到这个断点,不符合的情况下,直接跳过断点,这其实也…

【linux】线程 (三)

13. 常见锁概念 (一)了解死锁 死锁是指在一组进程中的各个进程均占有不会释放的资源,但因互相申请被其他进程占有的,且不释放的资源,而处于的一种永久等待状态 (二)死锁四个必要条件 互斥条件…

RK3568平台开发系列讲解(调试篇)debugfs 文件系统

🚀返回专栏总目录 文章目录 一、debugfs使用案例二、enable debugfs三、debugfs API3.1、创建目录3.2、创建文件3.3、帮助函数四、使用示例📢Linux 上有一些典型的问题分析手段,从这些基本的分析方法入手,你可以一步步判断出问题根因。这些分析手段,可以简单地归纳为下图…

Linux·文件与IO

1. 回忆文件操作相关知识 我们首先回忆一下关于文件的一些知识。 如果一个文件没有内容,那它到底有没有再磁盘中存在?答案是存在,因为 文件 内容 属性,即使文件内容为空,但属性信息也是要记录的。就像进程的…

基于STM32的Android控制智能家政机器人

基于STM32的Android控制智能家政机器人 基于STM32的Android控制智能家政机器人一、项目背景与意义二、系统设计方案三、硬件电路设计四、软件设计与实现4.1 Android端软件设计4.2 机器人端软件设计 五、系统调试与测试六、结论与展望七、附录 基于STM32的Android控制智能家政机…

从外行人的角度解释1Bit的模型,是怎样改变世界的

一个框架,和一篇论文,改变了模型训练的规则 框架是BitNET 论文https://arxiv.org/abs/2410.16144 有人问我什么是1.58Bit 是这样的。 fp16是一般情况下模型训练后产物的精度。 比如qwen2 8B fp16,文件大小15GB 如果量化成Q_4O&#xff…

24下河南秋季教资认定保姆级教程

教师资格认定前需要做的准备材料 准备身份证户口本 居住证 学生证 教师考试合格证明 普通话证书 学历证书 体检合格证书 近期一寸白底证件照 网上报名 河南24下教资认定 网上报名时间:10月21日-11月1日 现场确认 网上审核未通过的宝子,需要…

html+css+js实现Notification 通知

实现效果&#xff1a; 代码实现&#xff1a; <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><meta name"viewport" content"widthdevice-width, initial-scale1.0"><title>Notif…

k8s use-context是什么

kubectl 的 use-context 命令用于在 Kubernetes 集群中切换上下文&#xff08;context&#xff09;&#xff0c;从而方便地在多个集群或命名空间之间进行操作。一个上下文定义了 kubectl 使用的 集群、用户 和 命名空间 的组合。 use-context 的作用&#xff1a; 每个上下文&…

AOP学习

corol调用serverce不在是直接调用的是调用底层代理对象&#xff0c;由代理对象统一帮我们处理 AOP常见概念 通知类型 切面顺序

【C++】— 一篇文章让你认识STL

文章目录 &#x1f335;1.什么是STL&#xff1f;&#x1f335;2.STL的版本&#x1f335;3.STL的六大组件&#x1f335;4.STL的重要性&#x1f335;5. 如何学习STL&#x1f335;6. 学习STL的三种境界 &#x1f335;1.什么是STL&#xff1f; STL是Standard Template Library的简称…

Matlab软件进行金融时间序列数据的描述性统计代码

1、数据S&P500的收盘价格&#xff0c;return100*log(pt/pt-1) 方法1&#xff1a;用python代码 import numpy as np import pandas as pddef calculate_log_returns(prices):"""计算价格序列的对数收益率。参数:prices (numpy.array): 价格序列。返回:log_…

【实战指南】Vue.js 介绍组件数据绑定路由构建高效前端应用

学习总结 1、掌握 JAVA入门到进阶知识(持续写作中……&#xff09; 2、学会Oracle数据库入门到入土用法(创作中……&#xff09; 3、手把手教你开发炫酷的vbs脚本制作(完善中……&#xff09; 4、牛逼哄哄的 IDEA编程利器技巧(编写中……&#xff09; 5、面经吐血整理的 面试技…

ChatGPT 现已登陆 Windows 平台

今天&#xff0c;OpenAI 宣布其人工智能聊天机器人平台 ChatGPT 已开始预览专用 Windows 应用程序。OpenAI 表示&#xff0c;该应用目前仅适用于 ChatGPT Plus、Team、Enterprise 和 Edu 用户&#xff0c;是一个早期版本&#xff0c;将在今年晚些时候推出"完整体验"。…

LeetCode 热题100之哈希

1.两数之和 思路分析1&#xff08;暴力法&#xff09; 双重循环枚举满足num[i] nums[j] target的索引&#xff0c;刚开始不知道如何返回一对索引。后来知道可以直接通过return {i,j}返回索引&#xff1b;注意&#xff1a;j应该从i1处开始&#xff0c;避免使用两次相同的元素…