使用 Redis Stream 实现消息队列

使用 Redis Stream 实现消息队列

Intro

Redis 5.0 中增加了 Stream 的支持,利用 Stream 我们可以实现可靠的消息队列,并且支持一个消息被多个消费者所消费,可以很好的实现消息队列

Simple Usage

首先我们来看一个简单版本的 Stream 使用,我们在代码里使用一个发布者,一个消费者来模拟一个简单的消息队列的场景

来看下面的测试代码:

private const string StreamKey = "test-simple-stream";public static async Task MainTest()
{await RedisHelper.GetDatabase().KeyDeleteAsync(StreamKey);// register background consumer_ = Task.Factory.StartNew(Consume).ConfigureAwait(false);//await Publish();
}private static async Task Publish()
{Console.WriteLine("Press Enter to publish messages, Press Q to exit");var input = Console.ReadLine();while (input is not "q" and not "Q"){var redis = RedisHelper.GetDatabase();for (var i = 0; i < 10; i++){await redis.StreamAddAsync(StreamKey, "message", $"test_message_{i}");}input = Console.ReadLine();}
}private static async Task Consume()
{var lastMsgId = "0-0";while (true){await InvokeHelper.TryInvokeAsync(async () =>{var redis = RedisHelper.GetDatabase();var entries = await redis.StreamReadAsync(StreamKey, lastMsgId, 2);if (entries.Length == 0){return;}foreach (var entry in entries){Console.WriteLine(entry.Id);entry.Values.Dump();// delete message if you want// redis.StreamDelete(StreamKey, new[] { entry.Id });}lastMsgId = entries[^1].Id;});await Task.Delay(200);}
}

上面的代码会使用一个后台线程来运行一个 Consumer 来从 Stream 中读取消息,有两种消费消息的模式,一种是自己维护一个处理的消息 offset,每次从这个 offset 之后读取新消息,另外一种模式不需要维护本地的 offset,可以在处理完消息之后直接删掉消息,默认消息是不会删消息的,所以如果不删消息的话需要维护

Publisher 每次会发布 10 条消息,Consumer 每次会读取两条消息,处理之后会等待 200 ms,之后再查询消息

来看一下运行效果吧:

Consumer Group

上面的示例会相对来说比较简单,只有一个 Consumer,但是在比较常用的场景下往往会有多个消费者处理,

比如说用户注册成功之后,发布一条消息可能会有多个 Consumer 同时给用户发邮件或短信以及给用户加积分等操作,这种场景下使用上面的模式就不合适了,Redis Stream 中增加了 Consumer Group 的概念(有的人甚至称 Redis 内置了一个 Kafka),在创建了 Consumer Group 之后,向 Stream 发布消息的时候会广播到各个 Consumer Group 中,每个 Consumer Group 的消息消费是独立的,不同的 Consumer Group 的消费速度可以不一致,一个 Consumer Group 也可以有多个 Consumer 同时运行,同一个 Group 内的多个 Consumer 是会共享一个 Consumer Group 的消息消费,而且我们可以手动进行消息的 ACK

来看下面的示例代码吧:

private const string StreamKey = "test-stream-group";
private static int _consumerCount;public static async Task MainTest()
{await RedisHelper.GetDatabase().KeyDeleteAsync(StreamKey);// register background consumer_ = await Task.Factory.StartNew(Consume).ConfigureAwait(false);_ = await Task.Factory.StartNew(Consume).ConfigureAwait(false);//await Publish();
}private static async Task Publish()
{Console.WriteLine("Press Enter to publish messages, Press Q to exit");var input = Console.ReadLine();while (input is not "q" and not "Q"){var redis = RedisHelper.GetDatabase();for (var i = 0; i < 10; i++){await redis.StreamAddAsync(StreamKey, "message", $"test_message_{i}");}input = Console.ReadLine();}
}private static async Task Consume()
{Interlocked.Increment(ref _consumerCount);var groupName = $"group-{_consumerCount}";var consumerName = $"consumer-{_consumerCount}";var redis = RedisHelper.GetDatabase();redis.StreamCreateConsumerGroup(StreamKey, groupName);while (true){await InvokeHelper.TryInvokeAsync(async () =>{var messages = await redis.StreamReadGroupAsync(StreamKey, groupName, consumerName, count: SecurityHelper.Random.Next(1, 4));if (messages.Length == 0){return;}foreach (var message in messages){Console.WriteLine($"{groupName}-{message.Id}-{message.Values.ToJson()}");await redis.StreamAcknowledgeAsync(StreamKey, groupName, message.Id);}});await Task.Delay(200);}
}

上面的示例代码会先注册两个 Consumer Group,两个 Consumer Group 内各有一个 consumer,你也可以使用多个 consumer,为了体现各个 Consumer Group 是独立的,每次获取消息的 Count 是会随机指定的,在读取的消息之后会输出消息内容来代替处理消息的逻辑,处理完成之后进行消息的 ACK,消息的发布逻辑和上面的示例是类似的

上述代码执行输出示例:


可以看到我们发布的消息,每一个 consumer group 都会处理消息,而且处理消息的速度是独立的,互不影响

通过 XINFO 命令我们可以对 Stream 做一些监控

More

利用 Redis 的 Stream 我们可以实现可靠的一个消息机制,stream 的每一条消息都会有一个消息 Id,默认是两个部分,一个部分是时间戳,另一个部分是一个序列号,消息 Id 可以自定义,但是通常情况下推荐用默认的 id

Redis 中的 List、HashSet、Set、ZSet 这些数据类型中没有元素的时候会把对应的 Key 也会删掉,但是 Stream 是不会的,Stream 允许没有消息的时候依然存在

Redis Stream 使用的时候需要注意我们是可以指定 Stream 的消息长度的,如果我们指定了最大消息长度 10000,超出 10000 的时候旧消息就会被挤出队列,可能会出现消息的丢失,需要对 Stream 做必要的监控和报警

References

  • https://redis.io/topics/streams-intro

  • https://redis.io/commands

  • https://github.com/WeihanLi/SamplesInPractice/tree/master/RedisSample

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/303239.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

用linux命令通常做什么,如何知道你在 Linux 里最常使用的几个命令?

不知道大家自接触 Linux 以来&#xff0c;都使用过哪些命令&#xff0c;其中最常用的命令是什么&#xff1f;我最常用的命令之一是 sudo &#xff0c;因为我每天都在使用它在 Linux 上安装、更新、删除软件包以及其它各种需要超级用户权限的操作。那么你知道你自己最经常使用的…

资料分享 | python机器学习教程分享来袭

小天从大学开始&#xff0c;便开启资料收集功能。近几年以AlphaGo为契机&#xff0c;人工智能进入新的发展阶段&#xff0c;再加上日常的深入研究&#xff0c;小天收集整理了丰富的机器学习资料&#xff0c;内容涵盖“机器学习视频”&#xff0c;“机器学习教程”等。截止到今天…

Autofac框架初识与应用

一、前言这上一篇中&#xff0c;主要讲述了什么是IoC容器&#xff0c;以及了解到它是DI构造函注入的框架&#xff0c;它管理着依赖项的生命周期以及映射关系&#xff0c;同时也介绍实践了在ASP.Net Core中,默认提供的内置IoC容器&#xff0c;以及它的实例注册方式和相应的生命周…

聊一聊数据导出那些事

前言 数据导出&#xff0c;这可以说是一个随处可见的需求&#xff0c;大部分管理平台&#xff0c;报表系统都会有这个需求。对于这个需求&#xff0c;不少系统会做限制&#xff0c;只能从系统导出几千或几万的数据&#xff0c;再多的话就要提申请&#xff0c;经过层层审批&…

如何黑掉一台根本不联网的电脑

一直以来&#xff0c;拿到一台电脑上的密钥&#xff0c;方法无非有以下三种&#xff1a;1、直接拿到这台电脑&#xff0c;然后输入木马病毒进行盗取。&#xff08;此种略微LowB的方法风险在于&#xff1a;如果被电脑主人“捉奸在床”&#xff0c;愤而报警&#xff0c;则需要黑客…

通过Dapr实现一个简单的基于.net的微服务电商系统(二)——通讯框架讲解

首先感谢张队geffzhang公众号转发了上一篇文章&#xff0c;希望广大.neter多多推广dapr&#xff0c;让云原生更快更好的在.net这片土地上落地生根。 书接上回通过Dapr实现一个简单的基于.net的微服务电商系统&#xff0c;今天来分享一下这套电商demo的通讯部分到底是如何工作的…

windows下整合tomcat和nginx

tomcat自带的apache服务器对于并发请求的处理能力比较差&#xff0c;并且耗费资源很大&#xff0c;而nginx这方便却很强悍&#xff0c;以下是在windows下整合tomcat和nginx的过程。 1.准备工作 下载tomcat&#xff08;http://tomcat.apache.org/download-70.cgi&#xff09;,下…

从飞机上看下雨是这样子,太震撼了!

不同的角度&#xff0c;不一样的世界&#xff01;来源&#xff1a;环球顶尖摄影版权归原作者所有&#xff0c;转载仅供学习使用&#xff0c;不用于任何商业用途&#xff0c;如有侵权请留言联系删除&#xff0c;感谢合作。数据与算法之美用数据解决不可能长按扫码关注

linux基础 linhaifeng,Linux基础之命令练习Day2(示例代码)

作业一&#xff1a;1) 新建用户natasha&#xff0c;uid为1000&#xff0c;gid为555&#xff0c;备注信息为“master”2) 修改natasha用户的家目录为/Natasha3) 查看用户信息配置文件的最后一行4) 为natasha用户设置密码“123”5) 查看用户密码配置文件的最后一行6) 将natasha用…

NET问答: 为什么 null + true = string 呢?

咨询区 Javed Akram&#xff1a;请问 null true 为什么是一个 string 类型的 True&#xff0c;代码如下&#xff1a;static void Main(string[] args){string b null true;Console.WriteLine(b);}谁知道这背后的原理&#xff1f;回答区 JaredPar&#xff1a;这是因为你一旦引…

华为21级程序员月薪曝光:270k封神!众网友直呼长见识……

如果一个人的薪水是每月几万&#xff0c;估计很多人都会认为很高&#xff0c;而能拿到这么高薪水的人一定是一个非常优秀人。最近&#xff0c;一名HR在互联网上发布了一个内容。该公司表示在招聘简历中找到华为高管的简历&#xff0c;简历的级别为21&#xff0c;月薪为27万&…

做移动互联网App,你的测试用例足够吗?

我在面试测试工程师时&#xff0c;经常问到的一个问题是“给出Word另存为这个功能的测试用例”。除开基本的测试用例外&#xff0c;考虑到各种异常情况&#xff0c;例如内存已满、硬盘空间不足是非常重要的。但是针对移动互联网App来说&#xff0c;情况还要复杂的多。一个重要原…

linux 手机 wlan信号桥,手机WLAN信号桥是什么?WLAN信号的作用和使用方法

什么是寒假必备&#xff1f;温暖的被窝和wifi绝对是不能少的。今天就给大家介绍一下和手机WIFI有关的WLAN信号桥的知识&#xff0c;让大家在寒假享受WIFI又多了一种选择。感兴趣的可以一起来看看。一、什么是WLAN信号桥&#xff1f;WLAN信号桥简单的来说就是手机连接一个WiFi后…

.NET 6 HotReload的试用

.net v6.0.0-preview.3的HotReload我是极喜欢的&#xff0c;因为之前有这样的需求——状态不丢&#xff0c;上下文不丢。为了验证&#xff0c;安装完.net 6 preview 3后&#xff0c;新建一个webapi项目&#xff0c;写了如下简单代码&#xff1a;using Microsoft.AspNetCore.Mvc…

如何撬动机器学习的冰山一角?

目前&#xff0c;人工智能的应用日渐广泛。而作为人工智能核心的机器学习&#xff0c;是一门多领域的交叉学科&#xff0c;专门研究计算机模拟或实现人类学习行为的方法&#xff0c;以获取新的知识或技能&#xff0c;重新组织已有的知识结构使之不断改善自身的性能。简单来说&a…

linux 下运行libnids,libnids出错

2017/03/28因为246上一般用的东西都比较多了&#xff0c;就直接使用了这部分。但当初编译的时候也不是我弄得。今天想试一下libnids的编程。编译错误在网上查了下&#xff0c;说是版本不够的原因&#xff0c;那我也不能重新编译了&#xff0c;毕竟还有别人说不定用的好好的。gc…

服务治理治什么,10张图告诉你答案

凌晨四点被公司的监控告警叫醒了&#xff0c;告警的原因是生产环境跑批任务发生故障。即刻起床处理故障&#xff0c;但还是花了不少时间才解决。这次故障是一次数据校验的跑批任务&#xff0c;校验前面跑批任务的数据是否正确。幸运的是&#xff0c;之前的核心任务已经完成&…

Mapgis6.7 林相图自动注记 .

林业行业制作林相图是一件比较复杂的事。Mapgis注释和子图分别对应arcmap中的标注和符号&#xff0c;区文件填充颜色对应arcmap中的渲染。由于mapgis中上注释文本和子图号都只能手工一个个去上。如果能结合arcmap出图的强大功能&#xff0c;由arcmap负责制图&#xff0c;mapgis…

在linux文件共享接口,入坑Linux-day13(使用vsftpd服务传输文件、使用Samba或NFS实现文件共享)...

一、文件传输协议#ftp是一种在互联网中进行的文件传输协议&#xff0c;基于客户端/服务器模式&#xff0c;默认使用20、21号端口&#xff0c;其中端口20(数据端口)用于进行数据传输&#xff0c;端口21(命令端口)用于接收客户端发出的相关FTP命令与参数。#FTP工作的两种模式主动…

大数据分析了50万条拼多多商品数据, 得出了这样的结论

一、缘起我在杭州有位朋友&#xff0c;提到有家做社交的电商很火&#xff0c;叫拼多多&#xff0c;我没有在意&#xff0c;直到有一天&#xff0c;我居然在电视上看到了它的广告&#xff0c;广告画面活蹦乱跳&#xff0c;余音绕梁&#xff0c;我惊呆了&#xff0c;想知道这是何…