Stream队列
Redis5.0 最大的新特性就是多出了一个数据结构 Stream,它是一个新的强大的 支持多播的可持久化的消息队列,作者声明 Redis Stream 地借鉴了 Kafka 的设计。
生产者
xadd 追加消息
xdel 删除消息,这里的删除仅仅是设置了标志位,不会实际删除消息。
xrange 获取消息列表,会自动过滤已经删除的消息
xlen 消息长度
del 删除 Stream
1. 追加消息
xadd streamtest1 * name kk age 30
xadd:新增消息固定写法
streamtest1:队列名, 可任意
*:让redis自动生成id
2. 获取消息列表
xrange streamtest1 - +
xrange streamtest1 1701655814064-0 +
-表示最小值, +表示最大值, 也可以换成具体的消息id
3. 删除消息
只给消息打上标志位, 不会真删消息
xdel streamtest1 1701655821193-0
4. 删除整个stream
del streamtest1
消费者
单消费者 xread
1. 从头开始读
xread count 1 block 0 streams streamtest 0-0
count 1: 读取数量1
block 0: 阻塞
0-0:从头开始读
2. 从指定id开始读
xread count 1 block 0 streams streamtest 1701659324617-0
3. 从结尾开始读-阻塞到有消息
xread count 1 block 0 streams streamtest $
消费群组 xgroup
1. 创建消费者群组
xgroup create streamtest cg1 0-0
- streamtest: 队列名
- cg1: 群组名称, 可随意指定
- 0-0: 从头开始读
xgroup create streamtest cg2 $
- $: 从结尾开始读, 只接受新消息, 老消息全部忽略
2. 查看队列详情
xinfo stream streamtest
3. 查看群组详情
xinfo groups streamtest
4. 群组消费
xreadgroup GROUP cg1 c1 count 1 block 0 streams streamtest >
“GROUP”属于关键字,“cg1”是消费组名称,“c1”是消费者名称,“count 1”指明了消费数量,> 号表示从当前消费组的 last_delivered_id 后面开始读, 每当消费者读取一条消息,last_delivered_id 变量就会前进
5. 查看群组消费情况
xinfo consumers streamtest cg1
可以看到目前 c1 这个消费者有7条待 ACK 的消息,空闲了 329273 ms 没有读取消息。
6. ack确认消息
xack streamtest cg1 1701659319318-0 1701659324617-0
消息确认完, pending消息7条变成5条
Redis几种队列实现
1. 基于List的LPUSH+BRPOP的实现
优点: 实现简单, 消息延迟几乎为0
缺点: 空闲连接问题, ack问题.
如果线程一直阻塞在那里,Redis 客户端的连接就成了闲置连接,闲置过久, 服务器一般会主动断开连接,减少闲置资源占用,这个时候 blpop 和 brpop 或抛 出异常,所以在编写客户端消费者的时候要小心,如果捕获到异常需要重试
2. 基于Sorted-Set 的实现
多用来实现延迟队列,也可以实现有序的普通的消息队列,但是消费者无法阻塞的获取消息,只能轮询,不允许重复消息。
3. PUB/SUB,订阅/发布模式
优点: 典型的广播模式,一个消息可以发布到多个消费者;多信道订阅,消费者可 以同时订阅多个信道,从而接收多类消息;消息即时发送,消息不用等待消费者 读取,消费者会自动接收到信道发布的消息。
缺点:
- 消息一旦发布, 若客户端不在线, 消息就会丢失;
- 不能保证每个消费者接收的时间是一致的;
- 若消费者客户端出现消息 积压,到一定程度,会被强制断开,导致消息意外丢失。通常发生在消息的生产 远大于消费速度时.
由此可见,Pub/Sub 模式不适合做消息存储,消息积压类的业务, 而是擅长处理广播,即时通讯,即时反馈的业务
4. 基于Stream类型的实现
已经有了一个消息中间件的雏形,可以考虑在生产过程中使用,当然真正要在生产中应用,要做的事情还很多,比如消息队列的管理和监控需要花大力气去实现,而专业消息队列都已经自带或者存在着很好的第三方方案和插件