09- Redis 中的 Stream 数据类型和应用场景

1. 介绍

Redis Stream 是 Redis 5.0 版本新增加的数据类型,Redis 专门为消息队列设计的数据类型。

在 Redis 5.0 Stream 没出来之前,消息队列的实现方式都有着各自的缺陷,例如:

  • 发布订阅模式,不能持久化也就无法可靠的保证消息,并且对于离线重连的客户端不能读取历史消息的缺陷;

  • List 实现消息队列的方式不能重复消费,一个消息消费完就会被删除,而且生产者需要自行实现全局唯一 Id

基于以上问题,Redis 5.0 便推出了 Stream 类型也是此版本最重要的功能,用于完美地实现消息队列,它支持消息的持久化、支持自动生成全局唯一 Id、支持 ack 确认消息的模式、支持消费组模式等,让消息队列更加的稳定和可靠。

2. 常见命令

Stream 消息队列操作命令:

  • XADD:插入消息,保证有序,可以自动生成全局唯一 Id;

  • XLEN:查询消息长度;

  • XREAD:用于读取消息,可以按 ID 读取数据;

  • XDEL:根据消息 ID 删除消息;

  • DEL:删除整个 Stream;

  • XRANGE:读取区间消息

  • XREADGROUP:按消费组形式读取消息;

  • XPENDING 和 XACK:

    • XPENDING 命令可以用来查询每个消费组内所有消费者【已读取、但尚未确认】的消息;

    • XACK 命令用于向消息队列确认消息处理已完成;

3. 应用场景

3.1 消息队列

生产者通过 XADD 命令插入一条消息:

# * 表示让 Redis 为插入的数据自动生成一个全局唯一的 ID
# 往名称为 mymq 的消息队列中插入一条消息,消息的键是 name,值是 a
> XADD mymq * name a
"1654254953808-0"

插入成功后会返回全局唯一的 Id:"1654254953808-0"。消息的全局唯一 ID 由两部分组成:

  • 第一部分"1654254953808"是出具插入时,以毫秒为单位计算的当前服务器时间;

  • 第二部分表示插入消息再当前毫秒内的消息序号,这是从 0 开始编号的,例如,"1654254953808-0"就表示在"1654254953808"毫秒内的第 1 条消息。

消费者通过 XREAD 命令从消息队列中读取消息时,可以指定一个消息 ID,并从这个消息 ID 的下一条消息开始进行读取(注意是输入消息 ID 的下一条消息开始读取,不是查询输入 ID 的消息)。

# 从 ID 号为"1654254953807-0"的消息开始,读取后续的所有消息(示例中一共 1 条)
> XREAD STREAMS mymq 1654254953808-0
1)1) "mymq"2)1)1) "1654254953808-0"2)1) "name"2) "a"

如果想要实现阻塞读(当没有数据时,阻塞住),可以调用 XREAD 时设定 BLOCK 配置项,实现类似于 BRPOP 的阻塞读取操作。

比如,下面这命令,设置了 BLOCK 10000 的配置项,10000 的单位是毫秒,表明 XREAD 在读取最新消息时,如果没有消息到来,XREAD 将阻塞 10000毫秒(即 10 秒),然后再返回。

# 命令最后的 $ 符号表示读取最新的消息
> XREAD BLOCK 10000 STREAMS mymq $
(nil)
(10.00s)

Stream 的基础方法,使用 xadd 存入消息和 xread 循环阻塞读取消息的方式可以实现简易版的消息队列,交互流程如下图所示:

     xadd 发送数据        xread 拉取数据
生产者----------->Stream<-----------消费者

前面介绍的这些操作 List 也支持,接下来看看 Stream 特有的功能

Stream 可以使用 XGROUP 创建消费组,创建消费组后,Stream 可以使用 XREADGROUP 命令让消费组内的消费者读取消息。

创建两个消费组,这两个消费组消费的消息队列是 mymq,都指定从第一条消息开始读取:

# 创建一个名为 group1 的消费组,0-0 表示从第一条消息开始读取
> XGROUP CREATE mymq group1 0-0
OK
​
# 创建一个名为 group2 的消费组,0-0 表示从第一条消息开始读取
> XGROUP CREATE mymq group2 0-0
OK

消费组 group1 内的消费者 consumer1 从 mymq 消息队列中读取所有消息的命令如下:

# 命令最后的参数 > 表示从第一条尚未被消费的消息开始读取。
> XREADGROUP GROUP group1 consumer1 STREAMS mymq >
1) 1) "mymq"2) 1) 1) "1654254953808-0"2) 1) "name"2) "a"

消息队列中的消息一旦被消费组里的一个消费者读取了,就不能再被该消费组内的其他消费者读取了,即同一个消费组里的消费者不能消费同一条消息

比如说,我们执行完刚才的 XREADGROUP 命令后,再执行一次同样的命令,此时读到的就是空值了:

> XREADGROUP GROUP group1 consumer1 STREAMS mymq >
(nil)

但是,不同消费组的消费者可以消费同一条消息(但是有前提条件,创建消费组的时候,不同消费组指定了相同位置开始读取消息)

比如说,刚才 group1 消费组里的 consumer1 消费者消费了一条 id 为 1654254953808-0 的消息,现在用 group2 消费组里的 consumer1 消费者消费消息:

> XREADGROUP GROUP group2 consumer1 STREAMS mymq >
1) 1) "mymq"2) 1) 1) "1654254953808-0"2) 1) "name"2) "a"

因为创建两组的消费组都是从第一条消息开始读取,所以可以看到第二组的消费者依然可以消费 id 为 1654254953808-0 的这一条消息。因此,不同的消费组的消费者可以消费同一条消息。

使用消费组的目的是让组内的多个消费者共同分担读取消息,所以,我们通常会让每个消费者读取部分消息,从而实现消息读取负载在多个消费者间是均衡分布的。

例如,我们执行下列命令,让 group2 中的 consumer1、2、3 各自读取一条消息。

# 让 group2 中的 consumer1 从 mymq 消息队列中消费一条消息
> XREADGROUP GROUP group2 consumer1 COUNT 1 STREAMS mymq >
1) 1) "mymq"2) 1) 1) "1654254953808-0"2) 1) "name"2) "a"
# 让 group2 中的 consumer2 从 mymq 消息队列中消费一条消息
> XREADGROUP GROUP group2 consumer2 COUNT 1 STREAMS mymq >
1) 1) "mymq"2) 1) 1) "1654256265584-0"2) 1) "name"2) "b"
# 让 group2 中的 consumer3 从 mymq 消息队列中消费一条消息
> XREADGROUP GROUP group2 consumer3 COUNT 1 STREAMS mymq >
1) 1) "mymq"2) 1) 1) "1654256271337-0"2) 1) "name"2) "c"

基于 Stream 实现的消息队列,如何保证消费者在发生故障或宕机再次重启后,仍然可以读取未处理完的消息?

Streams 会自动使用内部队列(也称为 PENDING List)留存消费组里每个消费者读取的消息,直到消费者使用 XACK 命令通知 Stream “消息已处理完成”。

消费确认增加了消息的可靠性,一般在业务处理完成之后,需要执行 XACK 命令确认消息已经被消费完成,整个流程的执行如下所示:

                   xgroup create            xreadgroup_____________>Group1<---------------> 接受者xadd         |                            ack
发布者------->Stream|_____________>Group2xgroup create

如果消费者没有成功处理消息,它就不会给 Stream 发送 XACK 命令,消息仍然会留存。此时,消费者可以在重启后,用 XPENDING 命令查看已读取、但尚未处理完成的消息

例如,我们来查看一下 group2 中各个消费者已读取、但尚未确认的消息个数,命令如下:

127.0.0.1:6379> XPENDING mymq group2
1) (integer) 3
2) "1654254953808-0"  # 表示 group2 中所有消费者读取的消息最小 ID
3) "1654256271337-0"  # 表示 group2 中所有消费者读取的消息最大 ID
4) 1) 1) "consumer1"2) "1"2) 1) "consumer2"2) "1"3) 1) "consumer3"2) "1"

如果想查看某个消费者具体读取了哪些数据,可以执行下面的命令:

# 查看 group2 里 consumer2 已从 mymq 消息队列中读取了哪些消息
> XPENDING mymq group2 - + 10 consumer2
1) 1) "1654256265584-0"2) "consumer2"3) (integer) 4107004) (integer) 1

可以看到,consumer2 已读取的消息的 ID 是 1654256265584-0。

一旦消息 1654256265584-0 被 consumer2 处理了,consumer2 就可以使用 XACK 命令通知 Streams,然后这条消息就会被删除

> XACK mymq group2 1654256265584-0
(integer) 1

当我们再使用 XPENDING 命令查看时,就可以看到,consumer2 已经没有已读取、但尚未确认处理的消息了。

> XPENDING mymq group2 - + 10 consumer2
(empty array)

好了,基于 Stream 实现的消息队列就说到这里,小结一下:

  • 消息保存:XADD/XREAD

  • 阻塞读取:XREAD block

  • 重复消息处理:Stream 在使用 XADD 命令,会自动生成全局唯一 ID;

  • 消息可靠性:内部使用 PENDING List 自动保存消息,使用 XPENDING 命令查看消息组已经读取但是未被确认的消息,消费者使用 XACK 确认消息;

  • 支持消费组形式消费数据

Redis 基于 Stream 消息队列与专业的消息队列有哪些差距?

一个专业的消息队列,必须要做到两大块:

  • 消息不丢

  • 消息可堆积

1、Redis Stream 消息会丢失吗?

使用一个消息队列,其实就分为三大块:生产者、队列中间件、消费者,所以要保证消息就是保证三个环节都不能丢失数据。

Redis Stream 消息队列能不能保证三个环节都不丢失数据?

  • Redis 生产者会不会丢消息?生产者会不会丢消息,取决于生产者对于异常情况的处理是否合理。从消息被生产出来,然后提交给 MQ 的过程中,只要能正常收到(MQ 中间件)的 ack 确认响应,就表示发送成功,所以只要处理好返回值和异常,如果返回异常则进行消息重发,那么这个阶段是不会出现消息丢失的。

  • Redis 消费者会不会丢消息?不会,因为 Stream(MQ 中间件)会自动使用内部队列(PENDING List)留存消费组里每个消费者读取的消息,但是未被确认的消息。消费者可以在重启后,用 XPENDING 命令查看已读取、但尚未确认处理完成的消息。等到消费者执行完业务逻辑后,再发送消费确认 XACK 命令,也能保证消息的不丢失。

  • Redis 消息中间件会不会丢消息?,Redis 在以下 2 个场景下,都会导致数据丢失:

    • AOF 持久化配置为每秒写盘,但这个写盘过程是异步的,Redis 宕机时会存在数据丢失的可能

    • 主从复制也是异步的,主从切换时,也存在丢失数据的可能

可以看到,Redis 在队列中间件环节无法保证消息不丢。像 RabbitMQ 或 Kafka 这类专业的队列中间件,在使用时是部署一个集群,生产者在发布消息时,队列中间件通常会写【多个节点】,也就是有多个副本,这样一来,即使其中一个节点挂了,也能保证集群的数据不丢失。

2、Redis Stream 消息可堆积吗?

Redis 的数据都存储在内存中,这就意味着一旦发生消息积压,则会导致 Redis 的内存持续增长,如果超过机器内存上限,就会面临被 OOM 的风险。

所以 Redis 的 Stream 提供了可以指定队列最大长度的功能,就是为了避免这种情况的发生。

当指定队列最大长度时,队列长度超过上限后,旧消息会被删除,只保留固定长度的新消息。这么来看,Stream 在消息积压时,如果指定了最大长度,还是有可能丢失消息的。

但 Kafka、RabbitMQ 专业的消息队列它们的数据都是存储在磁盘上,当消息积压时,无非就是多占用一些磁盘空间。

因此,把 Redis 当做队列来使用,会面临的两个问题:

  • Redis 本身可能会丢数据。

  • 面对消息积压,内存资源会紧张。

所以,能不能将 Redis 作为消息队列来使用,关键看你的业务场景:

  • 如果你的业务场景足够简单,对于数据丢失不敏感,而且消息积压概率比较小的情况下,把 Redis 当做队列是完全可以的。

  • 如果你的业务有海量消息,消息积压的概率比较大,并且不能接受数据丢失,那么还是用专业的消息队列中间件吧。

补充:Redis 发布/订阅机制为什么不可以作为消息队列?

发布订阅机制存在以下缺点,都是跟丢失数据有关:

  1. 发布/订阅机制没有基于任何数据类型实现,所以不具备【数据持久化】的能力,也就是发布/订阅机制的相关操作,不会写入到 RDB 和 AOF 中,当 Redis 宕机重启,发布/订阅机制的数据也会全部丢失

  2. 发布订阅模式是“发后既忘“的工作模式,如果有订阅者离线重连之后不能消费之前的历史消息。

  3. 当消费端有一定的消息积压时,也就是生产者发送的消息,消费者消费不过来时,如果超过 32 M 或者是 60s 内持续保持在 8 M 以上,消费端会被强行断开,这个参数实在配置文件中设置的,默认值是 client-output-buffer-limit pubsub 32mb 8mb 60

所以,发布/订阅机制只适合即时通讯的场景,比如构建哨兵集群的场景采用了发布/订阅机制。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/844915.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Python魔法之旅-魔法方法(05)

目录 一、概述 1、定义 2、作用 二、应用场景 1、构造和析构 2、操作符重载 3、字符串和表示 4、容器管理 5、可调用对象 6、上下文管理 7、属性访问和描述符 8、迭代器和生成器 9、数值类型 10、复制和序列化 11、自定义元类行为 12、自定义类行为 13、类型检…

Linux系统使用Docker安装Drupal结合内网穿透实现远程访问管理后台

目录 前言 1. Docker安装Drupal 2. 本地局域网访问 3 . Linux 安装cpolar 4. 配置Drupal公网访问地址 5. 公网远程访问Drupal 6. 固定Drupal 公网地址 前言 作者简介&#xff1a; 懒大王敲代码&#xff0c;计算机专业应届生 今天给大家聊聊Linux系统使用Docker安装Drupal…

golang开发 gorilla websocket的使用

很多APP都需要主动向用户推送消息&#xff0c;这就需要用到长连接的服务&#xff0c;即我们通常提到的websocket&#xff0c;同样也是使用socket服务&#xff0c;通信协议是基本类似的&#xff0c;在go中用的最多的、也是最简单的socket服务就是gorilla/websocket&#xff0c;它…

Mybatis 查询TypeHandler使用,转译查询数据(逗号分隔转List)

创建自定义的Hanndler /*** Package: com.datalyg.common.core.handler* ClassName: CommaSeparatedStringTypeHandler* Author: dujiayu* Description: 用于mybatis 解析逗号拼接字符串* Date: 2024/5/29 10:03* Version: 1.0*/ public class CommaSeparatedStringTypeHandle…

SAP Build引言

前言 SAP Build 似乎是一个整合了很多低代码或无代码产品的平台&#xff0c;最早的时候应该都是各自分开的几个产品&#xff0c;近年合并到一块上了SAP Build平台 现在看官网的介绍应该是有三四个产品被集成进来了&#xff0c;分别是SAP IRPA&#xff0c;SAP Workflow&#xf…

c# 输出二进制字符串

参考链接 C#二进制输出数据_c# 输出二进制 123.5的方法-CSDN博客https://blog.csdn.net/a497785609/article/details/4572112标准数字格式字符串 - .NET | Microsoft Learnhttps://learn.microsoft.com/zh-cn/dotnet/standard/base-types/standard-numeric-format-strings#BFo…

linux 配置端口转发

当我们内网服务器没有公网IP&#xff0c;但需要将服务暴露出去时&#xff0c;可以采用如下方式&#xff0c;即通过跳板机&#xff08;带公网IP&#xff09;做转发&#xff0c;下面得例子演示将mysql 3306 服务暴露出去 192.168.10.25 运行mysql服务3306192.168.10.30跳板机&…

13、电科院FTU检测标准学习笔记-录波功能1

作者简介&#xff1a; 本人从事电力系统多年&#xff0c;岗位包含研发&#xff0c;测试&#xff0c;工程等&#xff0c;具有丰富的经验 在配电自动化验收测试以及电科院测试中&#xff0c;本人全程参与&#xff0c;积累了不少现场的经验 ———————————————————…

js高级—基础深入总结

文章目录 1. 数据类型1.1. 常见数据类型1.2.数据类型判断&#xff1a;1.3.underfined与null的区别1.4.什么时候给变量赋值为null1.5. 严格区别变量类型与数据类型&#xff1a; 2. 数据变量和内存2.1. 什么是数据2.2. 什么是内存2.3. 什么是变量2.4.内存、数据、变量三者关系问题…

【面试题】Node.js高频面试题

简述 Node. js 基础概念 &#xff1f; Node.js是一个基于Chrome V8引擎的JavaScript运行环境。它使得JavaScript可以在服务器端运行&#xff0c;从而进行网络编程&#xff0c;如构建Web服务器、处理网络请求等。Node.js采用事件驱动、非阻塞I/O模型&#xff0c;使其轻量且高效…

使用DMS肌肉刺激仪的时候肌肉肿了是怎么回事?

当使用DMS&#xff08;深层肌肉刺激仪&#xff09;时&#xff0c;肌肉出现肿胀可能有多种原因。以下是对可能原因的分点表示和归纳&#xff1a; DMS治疗过程中的刺激反应&#xff1a; DMS治疗过程中&#xff0c;肌肉受到强烈的刺激&#xff0c;可能会导致局部肌肉炎症、水肿和…

【科普向】【文末附gpt升级秘笈】人工智能领域的风云变幻:从OpenAI到Anthropic的人才流动与技术走向

人工智能领域的风云变幻&#xff1a;从OpenAI到Anthropic的人才流动与技术走向 摘要&#xff1a;人工智能领域的竞争日趋激烈&#xff0c;技术巨头间的人才流动和团队重组成为常态。本文通过分析OpenAI前首席安全研究员Jan Leike加入Anthropic公司这一事件&#xff0c;探讨人工…

vue el-table使用、el-popover关闭、el-image大图预览

1、html <el-table :data"list" :header-cell-style"{ background: #F7F8F9 }"><el-table-column type"index" width"100px" label"序号"></el-table-column><el-table-column prop"pic" l…

socks5 如何让dns不被污染

问题 发现firefox浏览器代理设置成socks5后&#xff0c;查看ip是成功了&#xff0c;但是谷歌等海外的还是无法正常访问。 原因 主要原因是socks5连接虽然是成功了&#xff0c;但是dns还是走国内的&#xff0c;国内的dns解析都被污染了导致没法正常访问 解决 把设置里的 使…

图数据集的加载

原文参考官方文档&#xff1a; https://pytorch-geometric.readthedocs.io/en/latest/modules/loader.html torch_geometric.loader 库中&#xff0c; 该库中包含了多种 图数据集的 加载方式&#xff0c; 这里主要介绍 DenseDataLoader and DataLoader 这两者之间的区别&#…

LeetCode 每日一题 数学篇 2894.分类求和并作差

给你两个正整数 n 和 m 。 现定义两个整数 num1 和 num2 &#xff0c;如下所示&#xff1a; num1&#xff1a;范围 [1, n] 内所有 无法被 m 整除 的整数之和。num2&#xff1a;范围 [1, n] 内所有 能够被 m 整除 的整数之和。 返回整数 num1 - num2 。 int differenceOfSum…

每日一题——力扣141. 环形链表(举一反三+思想解读+逐步优化)

一个认为一切根源都是“自己不够强”的INTJ 个人主页&#xff1a;用哲学编程-CSDN博客专栏&#xff1a;每日一题——举一反三Python编程学习Python内置函数 Python-3.12.0文档解读 目录 我的写法 专业点评 时间复杂度分析 空间复杂度分析 总结 我要更强 方法2&#x…

【开发日记】ElementUI表单使用原生@submit提交表单数据

使用submit.native.prevent为el-form设置提交方法。 使用native-type为el-button设置原生按钮类型。 示例如下&#xff1a; <el-form class"search" submit.native.prevent"submitSearch"><div classsearch-item>人员ID :<el-input sizemi…

概率论与数理统计,重要知识点——全部公式总结

二、一维随机变量及其分布 五个分布参考另外一篇文章 四、随机变量的数字特征 大数定理以及中心极限定理 六、数理统计

QT事件----QPaintEvent绘图事件,QwheelEvent鼠标滚轮事件等; 用事件自定义按键;QT事件过滤器。

一、QT事件 1.1 QPaintEvent绘图事件 QPaintEvent 是 Qt 框架中一个重要的事件类,专门用于处理绘图事件。当 Qt 视图组件需要重绘自己的一部分时,就会产生 QPaintEvent 事件。这通常发生在以下几种情况:1. 窗口第一次显示时:当窗口或控件第一次出现在屏幕上时,系统会生成…