redis stream学习总结

文章目录

  • stream
    • Stream基本概念
    • 消息id
    • 消息内容
    • 增删查改
    • 消息生产
      • 添加消息 xadd
      • 查看消息长度 xlen
      • 限制stream最大长度
        • 1.xadd 中添加**maxlen**:
        • 2.xtrim
      • 查询消息 xrange
      • 正向排序:消费id从小到大排
        • 反向查询:消费id从大到小排
      • 删除消息
    • 消息消费
      • 独立消费 xread
      • 消费组
        • stream中出现很多特殊Ids解释
        • 创建消费组
        • 消息消费
    • 查看stream信息
    • 场景问题

stream

Stream基本概念

Redis 5.0 被作者 Antirez 突然发布出来,增加了很多新的特色功能,其最大的新特性就是多出了一个数据结构 Stream,它是一个新的强大的支持多播的可持久化消息队列,作者坦言 Redis Stream 极大地借鉴了 Kafka 的设计。

Redis Stream 的结构如图:

在这里插入图片描述
它有一个消息链表,将所有加入的消息都串起来,每个消息都有一个唯一的 ID 和对应的内容。消息是持久化的,Redis 重启后,内容还在。

每个 Stream 都有唯一的名称,它就是 Redis 的 Key,在首次使用 xadd 执行追加消息时自动创建。

每个 Stream 都可以挂多个消费组(Consumer Group),每个消费组会有个游标 last_delivered_id 在 Stream 数组之上往前移动表示当前消费组已经消费到哪条消息了。每个消费组都有一个 Stream 内唯一的名称,消费组不会自动创建,它需要单独的指令 xgroup create 进行创建,需要指定从 Stream 的某个消息 ID 开始消费,这个 ID 用来初始化 last_delivered_id 变量。

每个消费组的状态都是独立的,相互不受影响。也就是说同一份 Stream 内部的消息会被每个消费组都消费到

同一个消费组可以挂接多个消费者(Consumer),这些消费者之间是竞争关系,任意一个消费者读取了消息都会使游标 last_delivered_id 往前移动。每个消费者有一个组内唯一名称。

消费者内部会有一个状态变量 pending_ids,它记录了当前已经被客户端读取,但是还没有 ack 的消息。如果客户端没有 ack,这个变量里面的消息 ID 就会越来越多,一旦某个消息被 ack,它就开始减少。这个 pending_ids 变量在 Redis 官方被称为 PEL,也就是 Pending Entries List,这是一个核心的数据结构,它用来确保客户端至少消费了消息一次,而不会在网络传输的中途丢失了而没被处理。

消息id

消息 ID 的形式是 TimestampInMillis-sequence,例如 1527846880572-5,它表示当前的消息再毫秒时间戳 1527846880572 时产生,并且是该毫秒内产生的第 5 条消息。消息 ID 可以由服务器自动生成,也可以由客户端自己指定,但是形式必须是 “整数-整数”,而且后面加入的消息的 ID 必须要大于前面的消息 ID。

消息内容

消息内容就是键值对,形如 hash 结构的键值对,这没什么特别之处。

增删查改

增删改查指令说明如下:

1)xadd:向 Stream 追加消息。

2)xdel:向 Stream 中删除消息,这里的删除仅仅是设置标志位,不影响消息总长度。

3)xrange:获取 Stream 中的消息列表,会自动过滤已经删除的消息。

4)xlen:获取 Stream 消息长度。

5)del:删除整个 Stream 消息列表的所有消息。

消息生产

添加消息 xadd

语法:xadd stream_name Id field value(field value)
记住stream中存储的消息必须是kv类型,不允许是单个String,如下操作都是键值对存储的
使用:

xadd news_live * "2020-08-21 10:20" "国航CA1704航班遇颠簸下降千米,业内人士解释:正常操作 "xadd news_live * "2020-08-21 04:12" "5G成电信业务增长“动力担当” "

*:表示由redis自己生成key,key是由当前时间戳(ms)-0格式

查看消息长度 xlen

语法:*xlen stream_name *
使用:xlen news_live
在这里插入图片描述
添加成功后会显示字符串1597979205554-0,改字符串表示时间戳(毫秒)+计数,有点类似于雪花算法

限制stream最大长度

1.xadd 中添加maxlen:

最多存储消息数,依据FIFO原则,自动删除超过最长长度的消息

语法:xadd stream_name maxlen n id field value(field,value)
记住:每次生成消息都需要带这个参数,如果maxlen不带等于没有限制,支持动态改变maxlen
使用:

图片最多存储3条,每次添加xlen一直是3条

疑问:
1.如果我不知道消息具体的大小,我又如何利用maxlen达到自动删除?

解决:在MAXLEN选项个实际技术之间的~参数意味着:我并不真的需要这恰好1000个项目,它可以是1000或1010或1030,只需确保至少保存1000个项目。使用此参数,仅在我们可以删除整个节点时执行修剪。这使它更有效率,通常是你想要的。
在这里插入图片描述

2.每次创建消息的时候都需要带上比较麻烦,有没有什么更好的办法?

maxlen,,当然有了,xtrim!!!

2.xtrim

XTRIM命令,它执行与上面的MAXLEN选项非常相似的操作,但是此命令不需要添加任何内容,可以以独立方式对任何Stream运行。
XTRIM mystream MAXLEN 10
XTRIM mystream MAXLEN ~ 10

查询消息 xrange

查询是生产者查询自己生产的消息,和消费者的消费不是一回事

正向排序:消费id从小到大排

1.查询所有消息:xrange stream_name - +
使用:xrange news_live - +
在这里插入图片描述
2.指定起始id查询:xrange news_live 1597980701728-0 +
查询的消息id >= 起始id
在这里插入图片描述
3.指定最大id查询:xrange news_live 1597980701728-0 +
查询的消息id <= 结束id

反向查询:消费id从大到小排

1.查询所有消息:srevrange stream_name + -
2.指定起始id查询: xrevrange news + 2
消息id >= 2 倒排
3.指定最大id查询: xrevrange news 2 -
消息id <= 2 倒排

练习命令:
在这里插入图片描述

删除消息

语法:xdel stream_name id
在这里插入图片描述

消息消费

独立消费 xread

类似于List,生产者往list中写数据,消费者从list中读数据,只能有一个消费者
在这里插入图片描述

1.头部读取 0-0
语法:xread count n stream stream_name 0-0

记住:0-0表示从头开启读取数据,这里数据消费记录不会保存,每次都是从头开始,如果接着消费必须自己制定其实id

制定起始id读取:xread count n stream stream_name id
在这里插入图片描述

2.尾部读取最新消息 $

从尾部读取最新的一条消息

语法:
1.xread count n streams stream_name $
此时默认不返回任何消息 ???用途

2.xread block time count n streams stream_name $
time为ms,如果time=0表示一直阻塞
在这里插入图片描述
切记:客户端如果想要使用 xread 进行顺序消费,那么一定要记住当前消费到了那里,也就是返回的消息 ID。下次继续调用 xread 时,将上次返回的最后一个消息 ID 作为参数传递过去,就可以继续消费后续的消息。

block 0 表示永远阻塞,直到消息到来;block 1000 表示阻塞 1s,如果 1s 内没有任何消息到来,就返回 nil。

消费组

在这里插入图片描述

stream中出现很多特殊Ids解释

  1. “-”:xrange中使用,等同于 小于某个id作用 标识最小id
  2. “+”:xrange中使用,等同于 大于某个id作用 标识最大id
  3. “$”:在给定Stream中已经包含的最大ID,在xread、xcreategroup中标识消费着只能消费最新消息
  4. “>”:在消费者组的上下文中使用,在xread、xreadgroup总标识消费未消费过的消息
  5. “*”:只能与XADD命令一起使用,意味新创建消息自动由redis生成ID。
  6. “0-0”:表示id从0开始读取,xcreategroup标识消费组从id为0开始读取
  7. “0”:表示id从0开始读取,xreadgroup标识消费组消费所有stream中的数据

创建消费组

语法: xgroup create stream_name group_name last_delivered_id

解释:last_delivered_id为0-0 表示从头开始消费,last_delivered_id为 $ 表示从尾部开始消费,只接收新消息,当前stream消息全部忽略

使用:xgroup create news goup1 0-0
在这里插入图片描述
注意:xread、xreadgroup中都可以加上 block指令,标识阻塞等待,直到接受到新的消息或者等待超时

消息消费

语法:xreadgroup group group_name consumer_name count n streams news ids
注意:ids不理解可以看上面的stream中出现很多特殊Ids解释,没有加count标识消费所有的消息

解释:
1.xreadgroup group group_name consumer_name count n streams stream_name >
消费组开始消费未消费的数据

2.xreadgroup group group_name consumer_name count n streams stream_name 0
表示id从0开始消费,意味着消息stream中保留的所有消息,包括已经消费过的

使用:
1.xreadgroup group goup1 consumer1 streams news >
在这里插入图片描述
2.xreadgroup count 0 group gb c1 streams news 0使用注意

xgroup create news gb 0-0
xreadgroup count 0 group gb c1 streams news 0
xreadgroup count 0 group gb c1 streams news >
xreadgroup count 0 group gb c1 streams news 0

第一次xreadgroup 0无法查询到数据,知道 > 消费完以后,xreadgroup 0才能获取到数据,说明该指令获取消费组中消费者为c1已经消息过的数据,如果把消费者c1改成其他值返回无数据。

使用注意:
1.xcreategroup指定ids如果为$,如果xreadgroup ids利用>、0未消费数据、无法读取之前历史,只能读取最新数据
2.xcreategroup指定ids为0-0,则xreadgroup ids利用>、0等都可使用

查看stream信息

1.xinfo stream stream_name
在这里插入图片描述

2.xinfo groups stream_name

在这里插入图片描述

场景问题

1.Stream消息太多时怎么办?

2.ack作用,如果忘记ack会怎样?

3.PEL 如何避免消息丢失?

引用博客:

原理篇:挑战 Kafka!Redis5.0 重量级特性 Stream 尝鲜

使用手册篇:不是特别全的手册

翻译官方文档篇:翻译的比较硬核,不过也不错

命令行:redis5.0的Stream实现消息队列(springboot+jedis简单例子)

pending、消息转移、死信队列

结合应用场景联系篇:直播场景下使用

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/508839.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

jedis StreamEntryID参数解释

//$ 在给定Stream中已经包含的最大ID&#xff0c;在xread、xcreategroup中标识消费着只能消费最新消息 StreamEntryID.LAST_ENTRY; “>” 在消费者组的上下文中使用&#xff0c;在xread、xreadgroup总标识消费未消费过的消息 StreamEntryID.UNRECEIVED_ENTRY; 如果传入的为…

RabbitMQ TTL、死信队列在订单支付场景的应用

基于RabbitMQ的TTL以及死信队列&#xff0c;使用SpringBoot实现延迟付款&#xff0c;手动补偿操作。 1、用户下单后展示等待付款页面 2、在页面上点击付款的按钮&#xff0c;如果不超时&#xff0c;则跳转到付款成功页面 3、如果超时&#xff0c;则跳转到用户历史账单中查看…

阿里巴巴Java开发手册-使用JDK8的Opional类来防止出现NPE问题

/*** https://www.baeldung.com/java-optional*/Testpublic void optionalTest(){Peo peo new Peo("weijie", 18);/*** of、ofNullable*///of 判断peo是否为空&#xff0c;如果不为空程序继续执行Optional<Peo> _of Optional.of(peo);//程序直接抛出NullExce…

阿里巴巴Java开发手册-finally块必须对资源对象、流对象进行关闭操作,如果有异常也要做try-cach操作

对于JDK7及以上版本&#xff0c;可以使用try-with-resources方式 使用方式&#xff1a; /*** https://www.cnblogs.com/itZhy/p/7636615.html* 其实这种方式只是语法糖&#xff0c;反编译以后还是tryCacheThrowTest()中的代码* https://www.cnblogs.com/langtianya/p/5139465.h…

阿里巴巴Java开发手册-日志规约

1.【强制】 应用中不可直接使用日志系统(Log4j、Logback)中的API&#xff0c;而应依赖使用的SLF4j中的API。使用门面模式的日志框架吗&#xff0c;有利于维护和各个类的日志处理方式统一。 import org.slf4j.Logger;import org.slf4j.LoggerFactory;Logger logger LoggerFacto…

Java 回调 (Callback) 接口学习使用

文章目录Java 回调 (Callback) 接口学习使用1.什么是回调(Callback)&#xff1f;2.Java代码示例2.直接调用3.接口调用4.Lambda表达式推荐看我的InfoQ地址&#xff0c;界面排版更简洁Java 回调 (Callback) 接口学习使用 1.什么是回调(Callback)&#xff1f; 回调函数&#xff0…

常用的限流算法学习

常用的限流算法有漏桶算法和令牌桶算法&#xff0c;guava的RateLimiter使用的是令牌桶算法&#xff0c;也就是以固定的频率向桶中放入令牌&#xff0c;例如一秒钟10枚令牌&#xff0c;实际业务在每次响应请求之前都从桶中获取令牌&#xff0c;只有取到令牌的请求才会被成功响应…

基于rocketMq秒杀系统demo

基于RocketMQ设计秒杀。 要求&#xff1a; 1. 秒杀商品LagouPhone&#xff0c;数量100个。 2. 秒杀商品不能超卖。 3. 抢购链接隐藏 4. NginxRedisRocketMQTomcatMySQL 实现 接口说明&#xff1a;https://www.liuchengtu.com/swdt/#R9f978d0d00ef9be99f0…

常见压缩算法学习

文章目录无损压缩算法理论基础信息熵熵编码字典编码综合通用无损压缩算法相关常见名词说明java对几种常见算法实现Snappydeflate算法Gzip算法huffman算法Lz4算法Lzo算法使用方式无损压缩算法理论基础 信息熵 信息熵是一个数学上颇为抽象的概念&#xff0c;在这里不妨把信息熵理…

java中钩子方法 addShutdownHook 学习使用

钩子作用&#xff1a; 在线上Java程序中经常遇到进程程挂掉&#xff0c;一些状态没有正确的保存下来&#xff0c;这时候就需要在JVM关掉的时候执行一些清理现场的代码。Java中得ShutdownHook提供了比较好的方案。 JDK在1.3之后提供了Java Runtime.addShutdownHook(Thread hook)…

基于Curator实现dubbo服务自动注册发现

文章目录概念基于ServiceDiscovery实现服务自动注册和发现Service:服务基本信息InstanceDetails:封装实例用过来保存到zk中ServiceProvider&#xff1a;服务提供者ServiceConsumer&#xff1a;服务消费者运行基于ServiceDiscovery、ServiceCache实现服务自动注册和发现Registry…

jdk、cglib动态代理代码示例

文章目录jdk动态代理实现步骤代码示例新建一个接口新建一个接口的实现类新建一个代理类调用测试cglib动态代理实现实现步骤创建一个实现类新建一个代理类调用测试jdk动态代理 实现步骤 新建一个接口新建一个接口的实现类新建一个代理类&#xff0c;实现InvocationHandler接口…

Netty 客户端服务器端通信 demo

服务端 package com.demo.rpc.netty;import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.channel.socket.nio.NioSocketC…

Solr实战篇

1.在MySQL数据中建立lagou_db数据库, 将position.sql中的数据导入到mysql 数据中。 2.使用Solr的DIH 将mysql中的数据导入到Solr中。 3.使用SpringBoot 访问Solr 使用positionName 字段检索职位信息 如果检索到的职位信息不够5条 则需要 启用positionAdvantage 查找 美女多、…

Elasticsearch Java Low Level REST Client(通用配置)

Elasticsearch Java Low Level REST Client&#xff08;通用配置&#xff09; 通用配置 正如初始化中所解释的&#xff0c;RestClientBuilder支持提供RequestConfigCallback和HttpClientConfigCallback&#xff0c;它们允许Apache Async Http Client公开的任何自定义。这些回…

elasticsearch实战篇

文章目录1.新建SpringBoot项目依赖2.实现配置模块 config控制层 controller模型层 model服务层 service工具 util主类单元测试1.新建SpringBoot项目 依赖 <?xml version"1.0" encoding"UTF-8"?> <project xmlns"http://maven.apache.org…

Docker 部署java服务

作业描述&#xff1a; &#xff08;1&#xff09;Hot是应用程序(springboot)&#xff0c;打成jar包&#xff1a;docker-demo-1.0-SNAPSHOT.jar &#xff08;2&#xff09;利用dockerfile将docker-demo-1.0-SNAPSHOT.jar构建成镜像docker-demo Dockerfile-docker-demo&#xf…

单向链表 双向链表 java代码实现

文章目录单向链表代码实现单元测试控制台打印头插法尾插法双向链表代码实现单元测试控制台打印头插法尾插法单向链表 代码实现 package csdn.dreamzuora.list;/*** author: weijie* Date: 2020/10/15 15:28* Description:*/ public class SingleNode {int id;String name…

栈、队列 java代码实现

文章目录普通队列数组实现java代码实现单元测试控制台打印链表实现java代码实现单元测试控制台打印LinkedList队列使用优先队列&#xff1a;PriorityQueue使用栈数组实现java代码实现单元测试控制台打印链表实现java代码实现单元测试控制台打印普通队列 概念&#xff1a;先入先…

递归学习 斐波那契 java代码实现

文章目录java代码单元测试java代码 package csdn.dreamzuora.recursion;/*** Title: 斐波那契额* Description:*斐波那契数列&#xff1a;0、1、1、2、3、5、8、13、21、34、55.....* f[n] f[n-2] f[n-1]* 优点&#xff1a;代码简单* 缺点&#xff1a;占用空间较大、如果递归…