使用Redis Stream来做消息队列和在Asp.Net Core中的实现

041ec18ed06bf8072b88e6a17369f1f9.png

Redis - Wikipedia

写在前面

我一直以来使用redis的时候,很多低烈度需求(并发要求不是很高)需要用到消息队列的时候,在项目本身已经使用了Redis的情况下都想直接用Redis来做消息队列,而不想引入新的服务,kafka和RabbitMQ等;

奈何这兄弟一直不给力;

虽然 Redis 的Pub/Sub 是实现了发布/订阅的,但这家伙最坑的是:丢数据

由于Pub/Sub 只是简单的实现了发布订阅模式,简单的沟通起生产者和消费者,当接收生产者的数据后并立即推送或者说转发给订阅消费者,并不会做任何的持久化、存储操作。由此:

  1. 消费者(客户端)掉线;

  2. 消费者未订阅(所以使用的时候一定记得先订阅再生产);

  3. 服务端宕机;

  4. 消费者消费不过来,消息堆积(生产数据受数据缓冲区限制);

以上情况都会导致生产数据的丢失,基于上坑,据我所知大家很少使用Pub/Sub ;

不过官方的哨兵集群通信的时候就是用的Pub/Sub;

然后,各路大佬结合队列、阻塞等等实现了各种各样的方案,主要是使用:BLPOP+LPUSH 的实现

这里就不一一展开了,有兴趣请看叶老板文章;

可能是各种实现都会带来各种的问题,redis的官方也看到了社区的挣扎。终于,到了Redis5.0,官方带来了消息队列的实现:Stream

Redis Stream介绍

简单来说Redis Stream 就是想用Redis 做消息队列的最佳推荐;

XADD--发布消息

XADD stream1 * name hei age 18
XADD stream1 * name zhangshan age 19 #再发一条
127.0.0.1:6379> XADD stream1 * name hei age 18
"1631628884174-0"
127.0.0.1:6379> XADD stream1 * name zhangshan age 19 
"1631628890025-0"

其中的'*'表示让 Redis 自动生成唯一的消息 ID,格式是 「时间戳-自增序号」

XREAD--订阅消息

订阅消息

XREAD COUNT 5 STREAMS stream1 0-0
127.0.0.1:6379> XREAD COUNT 5 STREAMS stream1 0-0 
1) 1) "stream1"2) 1) 1) "1631628884174-0"2) 1) "name"2) "hei"3) "age"4) "18"2) 1) "1631628890025-0"2) 1) "name"2) "zhangshan"3) "age"4) "19"

'0-0' 表示从开头读取

如果需继续拉取下一条,需传入上一条消息的id

阻塞等待消息

XREAD COUNT 5 BLOCK 50000 STREAMS stream1 1631628890025-0

阻塞等待消息id ‘1631628890025-0’ 后的消息

50000 阻塞时间(毫秒) ‘0’ 表示无限期阻塞

从到这里就可以看出 Pub/Sub多端订阅的最大优点,Stream也是支持的。有的同学很快就发现问题了,这里多端订阅后,没有消息确认ACK机制。

没错,因为现在所有的消费者都是订阅共同的消息,多端订阅,如果某个客户端ACK某条消息后,其他端消费不了,就实现不了多端消费了。

由此,引出 分组:GROUP

GROUP--订阅分组消息(多端订阅)

同样先发布消息

XADD stream1 * name hei age 18
XADD stream1 * name zhangshan age 19
127.0.0.1:6379> XADD stream1 * name hei age 18
"1631629080208-0"
127.0.0.1:6379> XADD stream1 * name zhangshan age 19 
"1631629084083-0"

XGROUP CREATE 创建分组

创建分组1

XGROUP CREATE stream1 group1 0-0
127.0.0.1:6379> XGROUP CREATE stream1 group1 0-0  
OK

‘0-0’  表示从开头读取

'>' 表示读取最新,未被消费过的消息

XREADGROUP--分组读取

分组 group1

XREADGROUP GROUP group1 consumer1 COUNT 5 STREAMS stream1 >

consumer1 消费者名称, redis服务器会记住第一次使用的消费者名称;

127.0.0.1:6379> XREADGROUP GROUP group1 consumer1 COUNT 5 STREAMS stream1 >  
1) 1) "stream1"2) 1) 1) "1631628884174-0"2) 1) "name"2) "hei"3) "age"4) "18"2) 1) "1631628890025-0"2) 1) "name"2) "zhangshan"3) "age"4) "19"3) 1) "1631629080208-0"2) 1) "name"2) "hei"3) "age"4) "18"4) 1) "1631629084083-0"2) 1) "name"2) "zhangshan"3) "age"4) "19"
127.0.0.1:6379> XREADGROUP GROUP group1 consumer1 COUNT 5 STREAMS stream1 >  
(nil)

同样

‘0-0’  表示从开头读取

'>' 表示读取最新,未被消费过的消息 (可以看到命令执行第二遍已经读不到新消息了)

分组 group2

127.0.0.1:6379> XGROUP CREATE stream1 group2 0-0  
OK
127.0.0.1:6379> XREADGROUP GROUP group2 consumer1 COUNT 5 STREAMS stream1 >  
1) 1) "stream1"2) 1) 1) "1631628884174-0"2) 1) "name"2) "hei"3) "age"4) "18"2) 1) "1631628890025-0"2) 1) "name"2) "zhangshan"3) "age"4) "19"3) 1) "1631629080208-0"2) 1) "name"2) "hei"3) "age"4) "18"4) 1) "1631629084083-0"2) 1) "name"2) "zhangshan"3) "age"4) "19

可以看到可以读到同样的消息,多端订阅没有问题;

当然分组也支持阻塞读取:

#和XREAD一样
XREAD COUNT 5 BLOCK 0 STREAMS queue 1618469127777-0 #分组阻塞
XREADGROUP GROUP group2 consumer1 COUNT 5 BLOCK 0 STREAMS stream1 >

‘0’ 表示无限期阻塞,单位(毫秒)

XPENDING--待处理消息

消息使用XREADGROUP 读取后会进入待处理条目列表(PEL);

我们看看:

XPENDING stream1 group2
127.0.0.1:6379>  XPENDING stream1 group2
1) (integer) 4
2) "1631628884174-0"
3) "1631629084083-0"
4) 1) 1) "consumer1"2) "4"

表示:

  1. (integer) 4      //表示当前消费者组的待处理消息的数量

  2. "1631628884174-0"       //消息最大id

  3. "1631629084083-0"      //最小id

    1. "consumer1"      // 消费者名称

    2. "4" //消费者待处理消息数量

XACK--删除已处理消息(消息确认机制)

我们已经知道group2待处理消息有4条,我们从头读取看看:

XREADGROUP GROUP group2 consumer1 COUNT 5 STREAMS stream1 0-0
127.0.0.1:6379> XREADGROUP GROUP group2 consumer1 COUNT 5 STREAMS stream1 0-0
1) 1) "stream1"2) 1) 1) "1631628884174-0"2) 1) "name"2) "hei"3) "age"4) "18"2) 1) "1631628890025-0"2) 1) "name"2) "zhangshan"3) "age"4) "19"3) 1) "1631629080208-0"2) 1) "name"2) "hei"3) "age"4) "18"4) 1) "1631629084083-0"2) 1) "name"2) "zhangshan"3) "age"4) "19"

假设最后一条消息 ‘1631629084083-0’ 我已处理完成

127.0.0.1:6379> XACK stream1 group2 1631629084083-0
(integer) 1

再看:

127.0.0.1:6379> XREADGROUP GROUP group2 consumer1 COUNT 5 STREAMS stream1 0-0
1) 1) "stream1"2) 1) 1) "1631628884174-0"2) 1) "name"2) "hei"3) "age"4) "18"2) 1) "1631628890025-0"2) 1) "name"2) "zhangshan"3) "age"4) "19"3) 1) "1631629080208-0"2) 1) "name"2) "hei"3) "age"4) "18"
127.0.0.1:6379>  XPENDING stream1 group2
1) (integer) 3
2) "1631628884174-0"
3) "1631629080208-0"
4) 1) 1) "consumer1"2) "3"

可以清楚看到goroup2 待处理消息剩下3条;

这时 Redis 已经把这条消息标记为「处理完成」不再追踪;

Stream在Asp.net Core中的使用

private static string _connstr = "172.16.3.119:6379";
private static string _keyStream = "stream1";
private static string _nameGrourp = "group1";
private static string _nameConsumer = "consumer1";

发布:

csRedis.XAdd(_keyStream, "*", ("name", "message1"));

订阅:

static async Task CsRedisStreamConsumer()
{Console.WriteLine("CsRedis StreamConsumer start!");var csRedis = new CSRedis.CSRedisClient(_connstr);csRedis.XAdd(_keyStream, "*", ("name", "message1"));try{csRedis.XGroupCreate(_keyStream, _nameGrourp);}catch { }(string key, (string id, string[] items)[] data)[] product;(string Pid, string Platform, string Time) data = (null, null, null);while (true){try{product = csRedis.XReadGroup(_nameGrourp, _nameConsumer, 1, 10000, (_keyStream, ">"));if (product?.Length > 0 == true && product[0].data?.Length > 0 == true){Console.WriteLine($"message-id:{product.FirstOrDefault().data.FirstOrDefault().id}");product.FirstOrDefault().data.FirstOrDefault().items.ToList().ForEach(value =>{Console.WriteLine($"    {value}");});//csRedis.XAck(_keyStream, _nameGrourp, product[0].data[0].id);}}catch (Exception){//throw;}}
}

CSRedisCore

ad2bff81160ea4d52794abac77f31a4d.gif
动画2

这里的超时报错可通过修改连接参数:syncTimeout 解决

CSRedisCore支持阻塞读取;

StackExchange.Redis

发布:

db.StreamAdd(_keyStream, "name", "message1", "*");

订阅:

static async Task StackExchangeRedisStreamConsumer()
{Console.WriteLine("StackExchangeRedis StreamConsumer start!");var redis = ConnectionMultiplexer.Connect(_connstr);var db = redis.GetDatabase();try{///初始化方式1//db.StreamAdd(_keyStream, "name", "message1", "*");//db.StreamCreateConsumerGroup(_keyStream, _nameGrourp);//方式2db.StreamCreateConsumerGroup(_keyStream, _nameGrourp, StreamPosition.NewMessages);}catch { }StreamEntry[] data = null;while (true){data = db.StreamReadGroup(_keyStream, _nameGrourp, _nameConsumer, ">", count: 1, noAck: true);if (data?.Length > 0 == true){Console.WriteLine($"message-id:{data.FirstOrDefault().Id}");data.FirstOrDefault().Values.ToList().ForEach(c =>{Console.WriteLine($"    {c.Name}:{c.Value}");});db.StreamAcknowledge(_keyStream, _nameGrourp, data.FirstOrDefault().Id);}}
}
992784ade01129dfc6946cf3584feac8.gif
动画

StackExchange.Redis 有点比较坑的是不存在阻塞读取;理由:https://stackexchange.github.io/StackExchange.Redis/PipelinesMultiplexers.html#multiplexing

QA

Q:Stream是否支持AOF、RDB持久化?

**A:**支持,其它数据类型一样,每个写操作,也都会写入到 RDB 和 AOF 中。

Q:Stream是否还是会丢数据?若是,何种情况下?;

**A:**会;1、AOF是定时写盘的,如果数据还在内存中时redis服务宕机就会;2、主从切换时(从库还未同步完成主库发来的数据,就被提成主库);3、消息队列超MAXLEN限制;

总结

技术中有的时候没有“银弹”,只有更适合的技术,汝之蜜糖彼之砒霜;

很多时候的技术选型都是个比较麻烦的东西,对选型人的要求很高;你可能不是只需要熟悉其中的一种路线,而是要踩过各种各样的坑,再根据当前受限的环境,选择比较适合目前需求/团队的;

回到Stream上,我认为目前Stream能满足挺大部分队列需求;

特别是“在项目本身已经使用了Redis的情况下都想直接用Redis来做消息队列,而不想引入新的更专业的mq,比如kafka和RabbitMQ的时候”

当然,最终决定需要用更专业的mq与否的,还是需求;

引用

http://www.redis.cn/

https://database.51cto.com/art/202104/659208.htm

https://github.com/2881099/csredis/

https://stackexchange.github.io/StackExchange.Redis/Streams.html

3cac5c415f80de4e8927ebbc2d8478b1.png

文章博客园地址请点击“阅读原文”

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/298110.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

windows挂载ext4_使用 UEFI 双启动 Windows 和 Linux | Linux 中国

这是一份在同一台机器上设置 Linux 和 Windows 双重启动的速成解释,使用统一可扩展固件接口(UEFI)。来源:https://linux.cn/article-12891-1.html作者:Alan Formy-duval译者:郑(本文字数&#x…

oh,我的老伙计,你看看这近五十个dapr视频

oh,我的老伙计,你看看这近五十个 dapr 视频。这不就是你想要的视频资料吗?快来捡走吧!开始了,但是没完全开始 Dapr 是一个可移植的、事件驱动的运行时,它使任何开发人员能够轻松构建出弹性的、无状态和有状…

极速理解设计模式系列:2.观察者模式(Observer Pattern)

4个角色:被观察者(Subject/目标对象接口),具体被观察者(ConcreteSubject/具体目标对象),观察者(Observer),具体观察者(ConcreteObserver) 被观察者(Subject/目标对象接口):目标对象的抽象接口 …

16年微软/腾讯云/华为云MVP是怎样炼成的

自由、创新、研究、探索,很难想象到一个IT大神的博客,会将“自由”放在第一位,也许这二字代表的,既是精神,又是情怀。搞微软技术的,大家或多或少都有听说过微软的“最有价值专家”(MVP&#xff…

.NET中的设计模式---由吃龙虾想到的

作者: 倪大虾 发表于 2010-07-18 18:10 原文链接 阅读: 725 评论: 20今天吃小龙虾的时候忽然想到了以前一个湖北朋友讲的虾的故事.这位朋友是湖北人,据他说在他小时候他们那里很多虾,特别是夏天雨后,满地爬的都是.因为传说那是美国对付中国的秘密武器,居然没有人敢吃.后来偶然有…

【需要重视的BUG】:偷权限的情况

!!如果您生产环境用到了Blog.Core系统(本文是我自己逻辑问题,和官方没关系哈),且没有做其他修改,且没有使用Ids4认证中心来授权认证,请看完本文,并即时做系统维护。-----…

Angular运行在java_在本地运行现有Angular项目

我是Angular的新手,我正在尝试在我的机器上运行Angular的现有项目 . 我做了很多测试并且跟着很多文章 . 但无法运行我的项目 .我有这样的项目文件:我在我的系统上安装了nodejs . 并根据文章按照以下说明操作:将目录更改为我们的仓库cd myproj…

自找麻烦

2019独角兽企业重金招聘Python工程师标准>>> 真是想狗想的要发疯了,所以想买条狗,但是阿拉斯加,哈士奇,金毛,拉布拉多,苏牧,喜乐蒂现在我都买不起,他们都是很听话的狗&am…

Prism+WPF使用DependencyInjection实现AutoMapper的依赖注入功能

前言在使用PRISMWPF开发项目的过程中,需要使用AutoMapper实现对象-对象的映射功能。无奈PRISM没有相关对AutoMapper相关的类库,于是转换一下思想,在nuget 中存在有关使用Microsoft.Extensions.DependencyInjection来实现AutoMapper的依赖注入…

webSocket原理及其案例

常见的消息推送方式 1:轮询方式 浏览器以指定的时间间隔向服务器发出HTTP请求,服务器实现试试返回数据给浏览器 缺点:数据有延时、服务器压力较大。 2:长轮询 浏览器发出ajax(异步)请求,服…

这是啥?也太秀了吧?

1 请坐下2 这是什么愿望,感觉老天都看不下去了!3 像极了手机落在上铺的你!4 真正智慧家居,免通电。5 你以为它是土豆,其实。。6 葫芦不一定叫葫芦葫芦有各种奇奇怪怪的形状和名字7 我发光去了!你点的每个赞…

爱数应用容灾部署方案三

级联复制的异地容灾方案部署 爱数应用容灾部署方案可在异地部署远程容灾站点实现远程应用容灾方案,采用级联复制模型,在本地和远程分别部署容灾站点,克服实时复制对带宽延迟较高的缺点,获得最佳的容灾效果。并且可根据用户的网络和…

30张不明觉厉的照片,看几遍终于看懂了

全世界只有3.14 % 的人关注了爆炸吧知识网络上很多照片,虽然没经过PS,第一眼还是会觉得诡异。仔细看看才发现....哇噢!总觉得她的头上有一撮尖尖的毛?狗狗怎么做得出这个表情?震惊!发现没脖子的长颈鹿&…

排序集合的一个小坑

原来一直用SortList,SortedDictionary来作为键值对存储的排序集合来用,心中就默认是以key按ascall排序来存放的,在之前的案例中也没有出现问题,在最近一个demo中,打破了原来的自以为是的认识,因为在key中不…

爱是天时地利的迷信

1 别人撒娇你撒娇2 据说套着这个白袋子晒鞋鞋就不会发黄了。。3 蚂蚁:为什么要欺负我!4 这是天空的心电图吗5 有回应的喜欢真好 6 随主人这事儿原来是真的…7 被别人喜欢时的心理活动图自回忆专用小马甲你点的每个赞,我都认真当成了喜欢

Juniper Firewall多进单出配制实例

Technorati 标签: juniper,多进单出,配置实例,firewallJuniper firewall多進單出配置。想法是這樣的用一台firewall將這幾條ISP線路都接入,再通過一個trunk口出來,通過一台L2 switch劃分出幾個VLAN,分別對應不同的ISP線路。這樣做的好處就不多…

刷题≠学好数学,近百位名校名师告诉你,数学是怎么学好的?

▲ 点击查看 数学是个神奇的科目,它存在着一种“梯次掉队”的现象。不是说你低年级的知识学好了,高年级的知识就一定能学好。相信大家都有这种体会:一二年级孩子成绩不相上下,但到三四年级的时候,有一批学生的数学成绩…

android 系统之ContentProvider

基于上一篇的数据库操作,又写了一个ContentProvider的示例。把SQLiter 的数据提供出去供别的项目进行访问。 这一篇的代码要求熟悉SQLiter 的API. 首先,我们编写一个类extents ContentProvider ,重写他的方法。 URI 在http 中我们称为统一资源…

你是中层管理者?嗯,一个表面看似风光,实际却很 “鸡肋” 的重要岗位

这是头哥侃码的第246篇原创每年的六七月份,上海都会进入梅雨季节。这段时期的上海天气就好比大小姐的脾气,阴晴不定,完全看心情做事,心情好的时候,给你个阳光,让你的生活和休闲时光多一些灿烂,心…

重温SQL——行转列,列转行(转:http://www.cnblogs.com/kerrycode/archive/2010/07/28/1786547.html)...

行转列,列转行是我们在开发过程中经常碰到的问题。行转列一般通过CASE WHEN 语句来实现,也可以通过 SQL SERVER 2005 新增的运算符PIVOT来实现。 用传统的方法,比较好理解。层次清晰,而且比较习惯。 但是PIVOT 、UNPIVOT提供的语法…