Redis Stream消息队列

什么是Stream?

Stream 实际上是一个具有消息发布/订阅功能的组件,也就常说的消息队列。其实这种类似于 broker/consumer(生产者/消费者)的数据结构很常见,比如 RabbitMQ 消息中间件、Celery 消息中间件,以及 Kafka 分布式消息系统等,而 Redis Stream 正是借鉴了 Kafaka 系统。

1) 优点

Strean 除了拥有很高的性能和内存利用率外, 它最大的特点就是提供了消息的持久化存储,以及主从复制功能,从而解决了网络断开、Redis 宕机情况下,消息丢失的问题,即便是重启 Redis,存储的内容也会存在。

2) 流程

Stream 消息队列主要由四部分组成,分别是:消息本身、生产者、消费者和消费组,对于前述三者很好理解,下面了解什么是消费组。

一个 Stream 队列可以拥有多个消费组,每个消费组中又包含了多个消费者,组内消费者之间存在竞争关系。当某个消费者消费了一条消息时,同组消费者,都不会再次消费这条消息。被消费的消息 ID 会被放入等待处理的 Pending_ids 中。每消费完一条信息,消费组的游标就会向前移动一位,组内消费者就继续去争抢下消息。

 Redis Stream 消息队列结构程如下图所示:

Redis Stream结构图

 

下面对上图涉及的专有名词做简单解释:
  • Stream direction:表示数据流,它是一个消息链,将所有的消息都串起来,每个消息都有一个唯一标识 ID 和对应的消息内容(Message content)。
  • Consumer Group :表示消费组,拥有唯一的组名,使用 XGROUP CREATE 命令创建。一个 Stream 消息链上可以有多个消费组,一个消费组内拥有多个消费者,每一个消费者也有一个唯一的 ID 标识。
  • last_delivered_id :表示消费组游标,每个消费组都会有一个游标 last_delivered_id,任意一个消费者读取了消息都会使游标 last_delivered_id 往前移动。
  • pending_ids :Redis 官方称为 PEL,表示消费者的状态变量,它记录了当前已经被客户端读取的消息 ID,但是这些消息没有被 ACK(确认字符)。如果客户端没有 ACK,那么这个变量中的消息 ID 会越来越多,一旦被某个消息被 ACK,它就开始减少。
3) ACK 

ACK(Acknowledge character)即确认字符,在数据通信中,接收方传递给发送方的一种传输类控制字符。表示发来的数据已确认接收无误。在 TCP/IP 协议中,如果接收方成功的接收到数据,那么会回复一个 ACK 数据。通常 ACK 信号有自己固定的格式,长度大小,由接收方回复给发送方。

常用命令汇总

Redis Stream命令
命令说明
XADD 添加消息到末尾。
XTRIM对 Stream 流进行修剪,限制长度。
XDEL删除指定的消息。
XLEN获取流包含的元素数量,即消息长度。
XRANGE获取消息列表,会自动过滤已经删除的消息。
XREVRANGE 反向获取消息列表,ID 从大到小。
XREAD以阻塞或非阻塞方式获取消息列表。
XGROUP CREATE创建消费者组。
XREADGROUP GROUP读取消费者组中的消息。
XACK将消息标记为"已处理"。
XGROUP SETID为消费者组设置新的最后递送消息ID。
XGROUP DELCONSUMER删除消费者。
XGROUP DESTROY删除消费者组。
XPENDING显示待处理消息的相关信息。
XCLAIM 转移消息的归属权。
XINFO查看 Stream 流、消费者和消费者组的相关信息。
XINFO GROUPS查看消费者组的信息。
XINFO STREAM 查看 Stream 流信息。
XINFO CONSUMERS key group查看组内消费者流信息。

创建消息ID

当创建一个 Srteam 时, 需要创建消息 ID,该 ID 是唯一、不可重复的,并且只增不减。消息 ID 有两种创建方式,一是系统自动生成,二是自定义创建。

1) 系统自动创建

语法格式如下:

XADD key ID field value [field value ...]

参数说明如下:

  • key :指定队列名称,如果不存就创建;
  • ID :消息 id,我们使用*表示由 redis 生成,可以自定义,但是要自己保证递增性;
  • field value :消息记录。


返回值是毫秒时间戳格式的字符串。比如 1610619132674-2,它表示在该毫秒内产生的第 2 条消息。使用示例:

XADD mystream * username cc 10

2) 自定义ID

自定义 ID 比较简单,但是需要注意的是 ID 的形式必须是 “整数”,并且后面加入消息的 ID 必须大于前面消息的 ID,也就是自定义 ID 也必须遵守递增的规则。示例如下:

XADD mystream1 001 name zhangsan addr hebei

创建消费组

Redis Stream通过XGROUP CREATE指令创建消费组(Consumer Group),在创建时,需要传递起始消息的 ID 用来初始化 last_delivered_id 变量。语法格式如下:

<span style="color:#444444">XGROUP [CREATE key groupname id-or-$] [SETID key groupname id-or-$] [DESTROY key groupname] [DELCONSUMER key groupname consumername]</span>

参数说明如下:

  • key :指定 Stream 队列名称,若不存在则自动创建。
  • groupname :自定义消费组的名称,不可重复。
  • $ :表示从尾部开始消费,只接受新消息,而当前 Stream 的消息则被忽略。

消费消息

Redis Stream 通过XREADGROUP命令使消费组消费信息,它和XREAD命令一样,都可以阻塞等待新消息。读到新消息后,对应的消息 ID 就会进入消费者的 PLE(正在处理的消息)结构里,客户端处理完毕后使用 XACK 命令通知 Redis 服务器,本条消息已经处理完毕,该消息的 ID 就会从 PEL 中移除。示意图如下

redis stream

XREADGROUP命令的语法格式如下所示:

XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]  

参数说明如下:

  • group :消费组名称。
  • consumer :消费者名称。
  • count : 要读取的数量。
  • milliseconds : 阻塞时间,以毫秒为单位。
  • key :  键指定的队列名称。
  • ID : 表示消息 ID。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/164102.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

字符串匹配算法——KMP

有文本串aabaabaaf&#xff0c;模式串aabaaf问文本串中是否出现过模式串 暴力解法 最不用动脑子的&#xff0c;直接两层for循环&#xff0c;逐个匹配&#xff0c;匹配到不相等的值时把文本串后移一位&#xff0c;再重新比较。这种方法的复杂度是O(mn)&#xff0c;该方法低效的…

关键字const的修饰(指针)

A.const修饰变量 变量是可以修改的&#xff0c;如果把变量的地址交给⼀个指针变量&#xff0c;通过指针变量的也可以修改这个变量。 但是如果我们希望⼀个变量加上⼀些限制&#xff0c;不能被修改&#xff0c;怎么做呢&#xff1f;这就是const的作⽤。 #include <stdio.h&…

postpresql 查询某张表的字段名和字段类型

postpresql 查询某张表的字段名和字段类型 工作中第一次接触postpresql&#xff0c;接触到这么个需求&#xff0c;只是对sql有点了解&#xff0c;于是就网上查阅资料。得知通过系统表可以查询&#xff0c;设计到几张系统表&#xff1a;pg_class、pg_attrubute、information_sc…

axios二次封装配置请求拦截器和响应拦截器

我们为什么要对axios进行二次封装&#xff1f; 因为我们可以使用请求拦截器在发送请求之前处理一些业务&#xff0c;使用响应拦截器在服务器数据返回后处理一些业务。 我们通常创建一个api文件夹&#xff0c;再创建一个request.js文件&#xff0c;用于存放重写后的axios。 /…

SiP系统级封装、SOC芯片和合封芯片主要区别!合封和sip一样吗?

SiP系统级封装、SOC芯片和合封芯片技术是三种备受关注的技术。它们在提高系统性能、稳定性和功耗效率方面都发挥着重要作用 但在集成方式、应用领域和技术特点等方面存在一些区别。本文将从多个角度对这三种技术进行深入解读。 一、集成方式 合封芯片则是一种将多个芯片或不…

Vue弹窗的使用与传值

使用element-UI中的Dialog 对话框 vue组件结合实现~~~~ 定义html <div click"MyAnalyze()">我的区划</div><el-dialog title"" :visible.sync"dialogBiomeVisible"><NationalBiome :closeValue"TypeBiome" cl…

轻松入门Axios:前端开发中的HTTP利器

轻松入门Axios&#xff1a;前端开发中的HTTP利器 前言为什么选择Axios1. **简单易用:**2. **功能丰富:**3. **广泛支持的浏览器和环境:**4. **跨域支持:**5. **社区活跃:**6. **对于处理错误的友好性:**7. **对于并发请求的支持:** 安装与引用1. 使用 npm 安装 Axios&#xff1…

基于51单片机车载空调系统设计proteus仿真+源程序)

一、系统方案 1、本设计采用这51单片机作为主控器。 2、DS18B20采集温度值送到液晶1602显示。 3、按键设置报警值。 4、温度控制风扇档位。 二、硬件设计 原理图如下&#xff1a; 三、单片机软件设计 1、首先是系统初始化 /T0初始化*/ void init_t0() { //TMOD0x01;//定时器…

数据库实验三 Sql多表查询和视图

数据库实验三 Sql多表查询和视图 一、Sql表二、在线练习 一、Sql表 www.db-book.com 二、在线练习 对所有表执行查询语句&#xff0c;查看有哪些数据。 select * from tableName; 一、执行以下查询语句&#xff0c;写出查询意图。 (1) select * from student,takes whe…

经典滑动窗口试题(一)

&#x1f4d8;北尘_&#xff1a;个人主页 &#x1f30e;个人专栏:《Linux操作系统》《经典算法试题 》《C》 《数据结构与算法》 ☀️走在路上&#xff0c;不忘来时的初心 文章目录 一、将x减到0的最小操作数1、题目讲解2、讲解算法原理3、代码实现 二、无重复的最长子串1、题…

OpenCV数据类型及CV_16UC1深度图ros订阅

最近用到深度图,对其数据类型及显示有些迷惑,记笔记于此: 目录 一、cv::Mat 的数据类型及转换方式1. cv::Mat 数据类型2. cv::Mat 数据类型互转2.1 OpenCV数据类型转换的函数2.2 可视化深度图像(CV_16UC1)二、cv::Mat 与 sensor_msgs::msg::Image 互转(基于cv_bridge)1.…

黑臭水体的“黑”和“臭”形成的机理

水体“黑”和“臭”即呈现令人不悦的颜色和(或)散发令人不适气味的水体。由于水环境遭受超过其自净能力的有机污染&#xff0c;有机物的好氧分解使水体中耗氧速率大于复氧速率&#xff0c;造成水体缺氧&#xff0c;致使有机物降解不完全、速度减缓&#xff0c;厌氧生物降解过程…

mybatis 语法使用各种踩坑(持续更新中。。。)

1、大小写命名&#xff1a;这个别说了&#xff0c;都是泪。 2、联表查询查询&#xff0c;多条合成一条&#xff0c;不生效的原因 博主各种检查关联关系和字段大小写&#xff0c;本来是4条数据最后合成一条数据&#xff0c;死活给你直接返回了4条数据&#xff0c;而且每个类似p…

leetcode刷题之用栈实现队列(C语言版)

leetcode刷题之用栈实现队列&#xff08;C语言版&#xff09; 一、题目描述二、题目要求三、题目解析Ⅰ、typedef structⅡ、MyQueue* myQueueCreateⅢ、void myQueuePush(MyQueue* obj, int x)Ⅳ、int myQueuePeek(MyQueue* obj)Ⅴ、int myQueuePop(MyQueue* obj)Ⅶ、bool myQ…

邦芒忠告:求职者面试时绝不能说的8件事

求职者在面试时应该注意言行举止&#xff0c;避免提及一些敏感或不合适的话题&#xff0c;以下是一些绝不能说的事情&#xff1a; 1、攻击性言辞&#xff1a;不要使用攻击性言辞&#xff0c;如贬低、批评或攻击公司、同事或竞争对手等&#xff0c;这会给人留下不成熟、不尊重他…

新手必看!!附源码!!STM32通用定时器-比较输出PWM

一、什么是PWM? PWM&#xff08;脉冲宽度调制&#xff09;是一种用于控制电子设备的技术。它通过调整信号的脉冲宽度来控制电压的平均值。PWM常用于调节电机速度、控制LED亮度、产生模拟信号等应用。 二、PWM的原理 PWM的基本原理是通过以一定频率产生的脉冲信号&#xff0…

SPSS多元对应分析

前言&#xff1a; 本专栏参考教材为《SPSS22.0从入门到精通》&#xff0c;由于软件版本原因&#xff0c;部分内容有所改变&#xff0c;为适应软件版本的变化&#xff0c;特此创作此专栏便于大家学习。本专栏使用软件为&#xff1a;SPSS25.0 本专栏所有的数据文件请点击此链接下…

红队攻防实战之钉钉RCE

我这一生如履薄冰&#xff0c;你说我能走到对岸吗&#xff1f; 本文首发于SecIN社区&#xff0c;原创作者即是本人 前言 网络安全技术学习&#xff0c;承认⾃⼰的弱点不是丑事。只有对原理了然于⼼&#xff0c;才能突破更多的限制。拥有快速学习能力的白帽子&#xff0c;是不…

vue3 教程(中)

侦听器 用于侦听指定变量&#xff0c;当其响应式状态变化时触发回调函数。 watch() watch() 需明确指定侦听的数据源&#xff0c;并且仅当数据源变化时&#xff0c;才会执行回调&#xff0c;在创建侦听器时&#xff0c;不会执行回调&#xff0c;可以获取到数据源变化前后的值…

Flutter 父子组件通信

在Flutter 中父组件调用子组件的方法可以通过GlobalKey实现&#xff0c;而子组件调用父组件方法可以通过回调函数实现。 父组件 class _MyHomePageState extends State<MyHomePage> {final GlobalKey<LoadPencilState> loadPencilKey GlobalKey<LoadPencilSt…