使用Redis Stream来做消息队列和在Asp.Net Core中的实现

041ec18ed06bf8072b88e6a17369f1f9.png

Redis - Wikipedia

写在前面

我一直以来使用redis的时候,很多低烈度需求(并发要求不是很高)需要用到消息队列的时候,在项目本身已经使用了Redis的情况下都想直接用Redis来做消息队列,而不想引入新的服务,kafka和RabbitMQ等;

奈何这兄弟一直不给力;

虽然 Redis 的Pub/Sub 是实现了发布/订阅的,但这家伙最坑的是:丢数据

由于Pub/Sub 只是简单的实现了发布订阅模式,简单的沟通起生产者和消费者,当接收生产者的数据后并立即推送或者说转发给订阅消费者,并不会做任何的持久化、存储操作。由此:

  1. 消费者(客户端)掉线;

  2. 消费者未订阅(所以使用的时候一定记得先订阅再生产);

  3. 服务端宕机;

  4. 消费者消费不过来,消息堆积(生产数据受数据缓冲区限制);

以上情况都会导致生产数据的丢失,基于上坑,据我所知大家很少使用Pub/Sub ;

不过官方的哨兵集群通信的时候就是用的Pub/Sub;

然后,各路大佬结合队列、阻塞等等实现了各种各样的方案,主要是使用:BLPOP+LPUSH 的实现

这里就不一一展开了,有兴趣请看叶老板文章;

可能是各种实现都会带来各种的问题,redis的官方也看到了社区的挣扎。终于,到了Redis5.0,官方带来了消息队列的实现:Stream

Redis Stream介绍

简单来说Redis Stream 就是想用Redis 做消息队列的最佳推荐;

XADD--发布消息

XADD stream1 * name hei age 18
XADD stream1 * name zhangshan age 19 #再发一条
127.0.0.1:6379> XADD stream1 * name hei age 18
"1631628884174-0"
127.0.0.1:6379> XADD stream1 * name zhangshan age 19 
"1631628890025-0"

其中的'*'表示让 Redis 自动生成唯一的消息 ID,格式是 「时间戳-自增序号」

XREAD--订阅消息

订阅消息

XREAD COUNT 5 STREAMS stream1 0-0
127.0.0.1:6379> XREAD COUNT 5 STREAMS stream1 0-0 
1) 1) "stream1"2) 1) 1) "1631628884174-0"2) 1) "name"2) "hei"3) "age"4) "18"2) 1) "1631628890025-0"2) 1) "name"2) "zhangshan"3) "age"4) "19"

'0-0' 表示从开头读取

如果需继续拉取下一条,需传入上一条消息的id

阻塞等待消息

XREAD COUNT 5 BLOCK 50000 STREAMS stream1 1631628890025-0

阻塞等待消息id ‘1631628890025-0’ 后的消息

50000 阻塞时间(毫秒) ‘0’ 表示无限期阻塞

从到这里就可以看出 Pub/Sub多端订阅的最大优点,Stream也是支持的。有的同学很快就发现问题了,这里多端订阅后,没有消息确认ACK机制。

没错,因为现在所有的消费者都是订阅共同的消息,多端订阅,如果某个客户端ACK某条消息后,其他端消费不了,就实现不了多端消费了。

由此,引出 分组:GROUP

GROUP--订阅分组消息(多端订阅)

同样先发布消息

XADD stream1 * name hei age 18
XADD stream1 * name zhangshan age 19
127.0.0.1:6379> XADD stream1 * name hei age 18
"1631629080208-0"
127.0.0.1:6379> XADD stream1 * name zhangshan age 19 
"1631629084083-0"

XGROUP CREATE 创建分组

创建分组1

XGROUP CREATE stream1 group1 0-0
127.0.0.1:6379> XGROUP CREATE stream1 group1 0-0  
OK

‘0-0’  表示从开头读取

'>' 表示读取最新,未被消费过的消息

XREADGROUP--分组读取

分组 group1

XREADGROUP GROUP group1 consumer1 COUNT 5 STREAMS stream1 >

consumer1 消费者名称, redis服务器会记住第一次使用的消费者名称;

127.0.0.1:6379> XREADGROUP GROUP group1 consumer1 COUNT 5 STREAMS stream1 >  
1) 1) "stream1"2) 1) 1) "1631628884174-0"2) 1) "name"2) "hei"3) "age"4) "18"2) 1) "1631628890025-0"2) 1) "name"2) "zhangshan"3) "age"4) "19"3) 1) "1631629080208-0"2) 1) "name"2) "hei"3) "age"4) "18"4) 1) "1631629084083-0"2) 1) "name"2) "zhangshan"3) "age"4) "19"
127.0.0.1:6379> XREADGROUP GROUP group1 consumer1 COUNT 5 STREAMS stream1 >  
(nil)

同样

‘0-0’  表示从开头读取

'>' 表示读取最新,未被消费过的消息 (可以看到命令执行第二遍已经读不到新消息了)

分组 group2

127.0.0.1:6379> XGROUP CREATE stream1 group2 0-0  
OK
127.0.0.1:6379> XREADGROUP GROUP group2 consumer1 COUNT 5 STREAMS stream1 >  
1) 1) "stream1"2) 1) 1) "1631628884174-0"2) 1) "name"2) "hei"3) "age"4) "18"2) 1) "1631628890025-0"2) 1) "name"2) "zhangshan"3) "age"4) "19"3) 1) "1631629080208-0"2) 1) "name"2) "hei"3) "age"4) "18"4) 1) "1631629084083-0"2) 1) "name"2) "zhangshan"3) "age"4) "19

可以看到可以读到同样的消息,多端订阅没有问题;

当然分组也支持阻塞读取:

#和XREAD一样
XREAD COUNT 5 BLOCK 0 STREAMS queue 1618469127777-0 #分组阻塞
XREADGROUP GROUP group2 consumer1 COUNT 5 BLOCK 0 STREAMS stream1 >

‘0’ 表示无限期阻塞,单位(毫秒)

XPENDING--待处理消息

消息使用XREADGROUP 读取后会进入待处理条目列表(PEL);

我们看看:

XPENDING stream1 group2
127.0.0.1:6379>  XPENDING stream1 group2
1) (integer) 4
2) "1631628884174-0"
3) "1631629084083-0"
4) 1) 1) "consumer1"2) "4"

表示:

  1. (integer) 4      //表示当前消费者组的待处理消息的数量

  2. "1631628884174-0"       //消息最大id

  3. "1631629084083-0"      //最小id

    1. "consumer1"      // 消费者名称

    2. "4" //消费者待处理消息数量

XACK--删除已处理消息(消息确认机制)

我们已经知道group2待处理消息有4条,我们从头读取看看:

XREADGROUP GROUP group2 consumer1 COUNT 5 STREAMS stream1 0-0
127.0.0.1:6379> XREADGROUP GROUP group2 consumer1 COUNT 5 STREAMS stream1 0-0
1) 1) "stream1"2) 1) 1) "1631628884174-0"2) 1) "name"2) "hei"3) "age"4) "18"2) 1) "1631628890025-0"2) 1) "name"2) "zhangshan"3) "age"4) "19"3) 1) "1631629080208-0"2) 1) "name"2) "hei"3) "age"4) "18"4) 1) "1631629084083-0"2) 1) "name"2) "zhangshan"3) "age"4) "19"

假设最后一条消息 ‘1631629084083-0’ 我已处理完成

127.0.0.1:6379> XACK stream1 group2 1631629084083-0
(integer) 1

再看:

127.0.0.1:6379> XREADGROUP GROUP group2 consumer1 COUNT 5 STREAMS stream1 0-0
1) 1) "stream1"2) 1) 1) "1631628884174-0"2) 1) "name"2) "hei"3) "age"4) "18"2) 1) "1631628890025-0"2) 1) "name"2) "zhangshan"3) "age"4) "19"3) 1) "1631629080208-0"2) 1) "name"2) "hei"3) "age"4) "18"
127.0.0.1:6379>  XPENDING stream1 group2
1) (integer) 3
2) "1631628884174-0"
3) "1631629080208-0"
4) 1) 1) "consumer1"2) "3"

可以清楚看到goroup2 待处理消息剩下3条;

这时 Redis 已经把这条消息标记为「处理完成」不再追踪;

Stream在Asp.net Core中的使用

private static string _connstr = "172.16.3.119:6379";
private static string _keyStream = "stream1";
private static string _nameGrourp = "group1";
private static string _nameConsumer = "consumer1";

发布:

csRedis.XAdd(_keyStream, "*", ("name", "message1"));

订阅:

static async Task CsRedisStreamConsumer()
{Console.WriteLine("CsRedis StreamConsumer start!");var csRedis = new CSRedis.CSRedisClient(_connstr);csRedis.XAdd(_keyStream, "*", ("name", "message1"));try{csRedis.XGroupCreate(_keyStream, _nameGrourp);}catch { }(string key, (string id, string[] items)[] data)[] product;(string Pid, string Platform, string Time) data = (null, null, null);while (true){try{product = csRedis.XReadGroup(_nameGrourp, _nameConsumer, 1, 10000, (_keyStream, ">"));if (product?.Length > 0 == true && product[0].data?.Length > 0 == true){Console.WriteLine($"message-id:{product.FirstOrDefault().data.FirstOrDefault().id}");product.FirstOrDefault().data.FirstOrDefault().items.ToList().ForEach(value =>{Console.WriteLine($"    {value}");});//csRedis.XAck(_keyStream, _nameGrourp, product[0].data[0].id);}}catch (Exception){//throw;}}
}

CSRedisCore

ad2bff81160ea4d52794abac77f31a4d.gif
动画2

这里的超时报错可通过修改连接参数:syncTimeout 解决

CSRedisCore支持阻塞读取;

StackExchange.Redis

发布:

db.StreamAdd(_keyStream, "name", "message1", "*");

订阅:

static async Task StackExchangeRedisStreamConsumer()
{Console.WriteLine("StackExchangeRedis StreamConsumer start!");var redis = ConnectionMultiplexer.Connect(_connstr);var db = redis.GetDatabase();try{///初始化方式1//db.StreamAdd(_keyStream, "name", "message1", "*");//db.StreamCreateConsumerGroup(_keyStream, _nameGrourp);//方式2db.StreamCreateConsumerGroup(_keyStream, _nameGrourp, StreamPosition.NewMessages);}catch { }StreamEntry[] data = null;while (true){data = db.StreamReadGroup(_keyStream, _nameGrourp, _nameConsumer, ">", count: 1, noAck: true);if (data?.Length > 0 == true){Console.WriteLine($"message-id:{data.FirstOrDefault().Id}");data.FirstOrDefault().Values.ToList().ForEach(c =>{Console.WriteLine($"    {c.Name}:{c.Value}");});db.StreamAcknowledge(_keyStream, _nameGrourp, data.FirstOrDefault().Id);}}
}
992784ade01129dfc6946cf3584feac8.gif
动画

StackExchange.Redis 有点比较坑的是不存在阻塞读取;理由:https://stackexchange.github.io/StackExchange.Redis/PipelinesMultiplexers.html#multiplexing

QA

Q:Stream是否支持AOF、RDB持久化?

**A:**支持,其它数据类型一样,每个写操作,也都会写入到 RDB 和 AOF 中。

Q:Stream是否还是会丢数据?若是,何种情况下?;

**A:**会;1、AOF是定时写盘的,如果数据还在内存中时redis服务宕机就会;2、主从切换时(从库还未同步完成主库发来的数据,就被提成主库);3、消息队列超MAXLEN限制;

总结

技术中有的时候没有“银弹”,只有更适合的技术,汝之蜜糖彼之砒霜;

很多时候的技术选型都是个比较麻烦的东西,对选型人的要求很高;你可能不是只需要熟悉其中的一种路线,而是要踩过各种各样的坑,再根据当前受限的环境,选择比较适合目前需求/团队的;

回到Stream上,我认为目前Stream能满足挺大部分队列需求;

特别是“在项目本身已经使用了Redis的情况下都想直接用Redis来做消息队列,而不想引入新的更专业的mq,比如kafka和RabbitMQ的时候”

当然,最终决定需要用更专业的mq与否的,还是需求;

引用

http://www.redis.cn/

https://database.51cto.com/art/202104/659208.htm

https://github.com/2881099/csredis/

https://stackexchange.github.io/StackExchange.Redis/Streams.html

3cac5c415f80de4e8927ebbc2d8478b1.png

文章博客园地址请点击“阅读原文”

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/298110.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

java基础代码下载_Java基础(一)(示例代码)

1. 概述1.1 什么是Java语言Java语言:面向对象的程序设计语言与机器无关的二进制格式的类文件Java虚拟机(用来执行类文件)完整的软件程序包(跨平台的API和库)1.1.1 Java语言特点语法简单,功能强大分布式与安全性与平台无关解释、编译两种运行方式多线程动…

windows挂载ext4_使用 UEFI 双启动 Windows 和 Linux | Linux 中国

这是一份在同一台机器上设置 Linux 和 Windows 双重启动的速成解释,使用统一可扩展固件接口(UEFI)。来源:https://linux.cn/article-12891-1.html作者:Alan Formy-duval译者:郑(本文字数&#x…

Master DNS服务的搭建

很多企业都通过Linux来搭建自己的DNS服务器来提高网路效率,在此我们来学习如何搭建Master DNS服务器。首先,确定是否搭建yum仓库,并有软件包可用,如DVD光盘是否挂载。一、安装DNS服务所需要的软件包DNS所需要的软件包有&#xff1…

oh,我的老伙计,你看看这近五十个dapr视频

oh,我的老伙计,你看看这近五十个 dapr 视频。这不就是你想要的视频资料吗?快来捡走吧!开始了,但是没完全开始 Dapr 是一个可移植的、事件驱动的运行时,它使任何开发人员能够轻松构建出弹性的、无状态和有状…

极速理解设计模式系列:2.观察者模式(Observer Pattern)

4个角色:被观察者(Subject/目标对象接口),具体被观察者(ConcreteSubject/具体目标对象),观察者(Observer),具体观察者(ConcreteObserver) 被观察者(Subject/目标对象接口):目标对象的抽象接口 …

java try finally connectoin close_Java SocketChannel類代碼示例

本文整理匯總了Java中io.netty.channel.socket.SocketChannel類的典型用法代碼示例。如果您正苦於以下問題:Java SocketChannel類的具體用法?Java SocketChannel怎麽用?Java SocketChannel使用的例子?那麽恭喜您, 這裏精選的類代碼…

被娱乐在线报道的“唐骏造假门事件”

最近全球最热的是南非的世界杯,而在中国最近比较热的是另外一个事情。。。话说那天晚上回家已经很晚,照例的,家里的毛孩子歪歪斜斜的睡在床的角落里,一边是正在看节目的老婆吃着零食,我随便瞄了一眼,是新闻…

16年微软/腾讯云/华为云MVP是怎样炼成的

自由、创新、研究、探索,很难想象到一个IT大神的博客,会将“自由”放在第一位,也许这二字代表的,既是精神,又是情怀。搞微软技术的,大家或多或少都有听说过微软的“最有价值专家”(MVP&#xff…

Dave Python 练习三 -- 对象

#encodingutf-8 #*************Part 1 : 对象 ****************** #Python 对象 #Python 使用对象模型来存储数据。构造任何类型的值都是一个对象。所有的Python 对像都拥有三个特性:身份,类型和值。 #身份: #每一个对象都有一个唯一的身份标…

java 注册驱动失败_java – JDBC驱动程序注册死锁?

在一个线程中,正在创建一个JackRabbit:"docs-on-startup" #32 prio5 os_prio0 tid0x00007f730d73e800 nid0x601d in Object.wait() [0x00007f725bffc000]java.lang.Thread.State: RUNNABLEat sun.reflect.NativeConstructorAccessorImpl.newInstance0(Nat…

python3多线程queue_Python多线程(3)——Queue模块

Queue模块支持先进先出(FIFO)队列,支持多线程的访问,包括一个主要的类型(Queue)和两个异常类(exception classes)。Python 2 中的Queue模块在Python 3中更名为 queue。Queue对象的创建可以通过实例化Queue类型获得队列对象:创建新的队列&…

.NET中的设计模式---由吃龙虾想到的

作者: 倪大虾 发表于 2010-07-18 18:10 原文链接 阅读: 725 评论: 20今天吃小龙虾的时候忽然想到了以前一个湖北朋友讲的虾的故事.这位朋友是湖北人,据他说在他小时候他们那里很多虾,特别是夏天雨后,满地爬的都是.因为传说那是美国对付中国的秘密武器,居然没有人敢吃.后来偶然有…

【需要重视的BUG】:偷权限的情况

!!如果您生产环境用到了Blog.Core系统(本文是我自己逻辑问题,和官方没关系哈),且没有做其他修改,且没有使用Ids4认证中心来授权认证,请看完本文,并即时做系统维护。-----…

Java实现文件过滤

Java实现文件过滤的方法,比如我只想获得某个路径下.java文件 只需要实现FilenameFilter这个接口即可。 比如: private class FileFilter implements FilenameFilter { public boolean accept(File dir, String name) { return name.en…

Angular运行在java_在本地运行现有Angular项目

我是Angular的新手,我正在尝试在我的机器上运行Angular的现有项目 . 我做了很多测试并且跟着很多文章 . 但无法运行我的项目 .我有这样的项目文件:我在我的系统上安装了nodejs . 并根据文章按照以下说明操作:将目录更改为我们的仓库cd myproj…

python作业题目用户输入行数、输出倒的等腰三角形_智慧职教云课堂APPPython程序设计(常州工业职业技术学院)作业期末考试答案...

在FANUC15系统中所采用的高分辨率绝对脉冲编码器,若每转输出脉冲数为100万个,最高允许转速10000r/min。如果当前和今后相当长一段时期,个人住房贷款市场中()将是一种主要的模式。A.多种机构的参与菱形ABCD中,AB2&#…

自找麻烦

2019独角兽企业重金招聘Python工程师标准>>> 真是想狗想的要发疯了,所以想买条狗,但是阿拉斯加,哈士奇,金毛,拉布拉多,苏牧,喜乐蒂现在我都买不起,他们都是很听话的狗&am…

Prism+WPF使用DependencyInjection实现AutoMapper的依赖注入功能

前言在使用PRISMWPF开发项目的过程中,需要使用AutoMapper实现对象-对象的映射功能。无奈PRISM没有相关对AutoMapper相关的类库,于是转换一下思想,在nuget 中存在有关使用Microsoft.Extensions.DependencyInjection来实现AutoMapper的依赖注入…

【机房真是】。。。各种蛋疼。。。

渣渣。。。呵呵。。。预流推进什么的。。。呵呵。。。。渣渣。。。渣渣。。。、、走了。。。 转载于:https://www.cnblogs.com/Aoi3x/archive/2011/09/07/2645360.html

webSocket原理及其案例

常见的消息推送方式 1:轮询方式 浏览器以指定的时间间隔向服务器发出HTTP请求,服务器实现试试返回数据给浏览器 缺点:数据有延时、服务器压力较大。 2:长轮询 浏览器发出ajax(异步)请求,服…