轻量级消息队列RedisQueue

消息队列(Message Queue)是分布式系统必不可少的中间件,大部分消息队列产品(如RocketMQ/RabbitMQ/Kafka等)要求团队有比较强的技术实力,不适用于中小团队,并且对.NET技术的支持力度不够。而Redis实现的轻量级消息队列很简单,仅有Redis常规操作,几乎不需要开发团队掌握额外的知识!

随着强大的.NET5发布,.NET技术栈里面怎可没有最佳的消息队列搭档?

本文从高性能Redis组件 NewLife.Redis 出发,借用快递业务场景,讲解.NET中如何使用Redis作为消息队列,搭建企业级分布式系统架构!

什么是消息队列

消息队列就是消息在传输过程中保存消息的容器,其核心功用是削峰解耦

早高峰,快递公司的货车前来各驿站卸货,多名站点工作人员使用PDA扫描到站,大量信息进入系统(1000tps),而通知快递公司的接口只有400tps的处理能力。

通过增加MQ来保存消息,让超过系统处理能力的消息滞留下来,等早高峰过后,系统即可完成处理。此为削峰

在快递柜业务流程中,快递员投柜后需要经历扣减系统费、短信通知用户和推送通知快递公司三个业务动作。传统做法需要依次执行这些业务东西,如果其中某一步异常(例如用户手机未开机或者快递公司接口故障),将会延迟甚至中断整个投柜流程,严重影响用户体验。

如果接口层收到投柜数据后,写入消息到MQ,后续三个子系统各自消费处理,将可以完美解决该问题,并且子系统故障不影响上游系统!此为解耦

内存消息队列

最简单的消息队列,可以由阻塞集合BlockingCollection实现

public static void Start()
{var queue = new BlockingCollection<Area>();// 独立线程消费var thread = new Thread(s => Consume(queue));thread.Start();// 发布消息Public(queue);
}
private static void Public(BlockingCollection<Area> queue)
{var area = new Area { Code = 110000, Name = "北京市" };XTrace.WriteLine("Public {0} {1}", area.Code, area.Name);queue.Add(area);Thread.Sleep(1000);area = new Area { Code = 310000, Name = "上海市" };XTrace.WriteLine("Public {0} {1}", area.Code, area.Name);queue.Add(area);Thread.Sleep(1000);area = new Area { Code = 440100, Name = "广州市" };XTrace.WriteLine("Public {0} {1}", area.Code, area.Name);queue.Add(area);Thread.Sleep(1000);
}
private static void Consume(BlockingCollection<Area> queue)
{while (true){var msg = queue.Take();if (msg != null){XTrace.WriteLine("Consume {0} {1}", msg.Code, msg.Name);}}
}

每秒钟生产一个消息,都被独立线程消费到。

Redis做消息队列

Redis的LIST结构,具备左进右出的功能,再使用BRPOP的阻塞弹出,即可完成一个最基本的消息队列 RedisQueue<T>。

GetQueue取得队列后,Add方法发布消息。

TakeOne拉取消费一条消息,指定10秒阻塞,10秒内有消息立马返回,否则等到10秒超时后返回空。

public static void Start(FullRedis redis)
{var topic = "EasyQueue";// 独立线程消费var thread = new Thread(s => Consume(redis, topic));thread.Start();// 发布消息Public(redis, topic);
}
private static void Public(FullRedis redis, String topic)
{var queue = redis.GetQueue<Area>(topic);queue.Add(new Area { Code = 110000, Name = "北京市" });Thread.Sleep(1000);queue.Add(new Area { Code = 310000, Name = "上海市" });Thread.Sleep(1000);queue.Add(new Area { Code = 440100, Name = "广州市" });Thread.Sleep(1000);
}
private static void Consume(FullRedis redis, String topic)
{var queue = redis.GetQueue<Area>(topic);while (true){var msg = queue.TakeOne(10);if (msg != null){XTrace.WriteLine("Consume {0} {1}", msg.Code, msg.Name);}}
}

LPUSH 生产消息(插入列表),BRPOP 消费消息(弹出列表),因此,消息被消费后就消失了!

从日志时间可以看到,生产与消费的时间差在1~3ms之间,延迟极低!

注释消费代码后重跑,可以在Redis中看到发布的消息

需要确认的队列

如果通知快递公司的物流推送子系统处理消息时出错,消息丢失怎么办?显然不可能让上游再发一次!

这里我们需要支持消费确认的可信队列 RedisReliableQueue<T>。消费之后,除非程序主动确认消费,否则Redis不许删除消息。

GetReliableQueue获取队列实例后,Add发布消息,TakeOneAsync异步消费一条消息,并指定10秒阻塞超时,处理完成后再通过Acknowledge确认。

public static void Start(FullRedis redis)
{var topic = "AckQueue";// 独立线程消费var source = new CancellationTokenSource();Task.Run(() => ConsumeAsync(redis, topic, source.Token));// 发布消息Public(redis, topic);source.Cancel();
}
private static void Public(FullRedis redis, String topic)
{var queue = redis.GetReliableQueue<Area>(topic);queue.Add(new Area { Code = 110000, Name = "北京市" });Thread.Sleep(1000);queue.Add(new Area { Code = 310000, Name = "上海市" });Thread.Sleep(1000);queue.Add(new Area { Code = 440100, Name = "广州市" });Thread.Sleep(1000);
}
private static async Task ConsumeAsync(FullRedis redis, String topic, CancellationToken token)
{var queue = redis.GetReliableQueue<String>(topic);while (!token.IsCancellationRequested){var mqMsg = await queue.TakeOneAsync(10);if (mqMsg != null){var msg = mqMsg.ToJsonEntity<Area>();XTrace.WriteLine("Consume {0} {1}", msg.Code, msg.Name);queue.Acknowledge(mqMsg);}}
}

LPUSH 生产消息(插入列表),BRPOPLPUSH 消费消息(弹出列表并插入另一个Ack列表),这是确保不丢消息的关键。LREM 从Ack列表删除,用于消费完成后确认。

如果消费异常,就不会执行该确认操作,滞留在Ack列表的消息,60秒后重新回来主列表。

脑筋急转弯:如果应用进程异常退出,未确认的消息该怎么处理? 

注释消费代码后重跑,可以在Redis中看到发布的消息,跟普通队列一样,使用了LIST结构

处理“北京市”消息时,如果没有Acknowledge确认,Redis里面将会看到一个名为AckQueue:Ack:*的LIST结构,里面保存这这一条消息。所以,可信队列本质上就是在消费时,同步把消息备份到另一个LIST里面,确认操作就是从待确认LIST里面删除。

自从有了这个可信队列,基本上足够满足90%以上业务需求。

延迟队列

某一天,小马哥说,快递员投柜一定时间时候,如果用户没有来取件,那么系统需要收取超期取件费,需要一个延迟队列。

于是想到了Redis的ZSET,我们再来一个 RedisDelayQueue<T>,Add生产消息时多了一个参数,指定若干秒后可以消费到该消息,消费用法跟可信队列一样。

public static void Start(FullRedis redis)
{var topic = "DelayQueue";// 独立线程消费var source = new CancellationTokenSource();Task.Run(() => ConsumeAsync(redis, topic, source.Token));// 发布消息Public(redis, topic);source.Cancel();
}
private static void Public(FullRedis redis, String topic)
{var queue = redis.GetDelayQueue<Area>(topic);queue.Add(new Area { Code = 110000, Name = "北京市" }, 2);Thread.Sleep(1000);queue.Add(new Area { Code = 310000, Name = "上海市" }, 2);Thread.Sleep(1000);queue.Add(new Area { Code = 440100, Name = "广州市" }, 2);Thread.Sleep(1000);
}
private static async Task ConsumeAsync(FullRedis redis, String topic, CancellationToken token)
{var queue = redis.GetDelayQueue<String>(topic);while (!token.IsCancellationRequested){var mqMsg = await queue.TakeOneAsync(10);if (mqMsg != null){var msg = mqMsg.ToJsonEntity<Area>();XTrace.WriteLine("Consume {0} {1}", msg.Code, msg.Name);queue.Acknowledge(mqMsg);}}
}

上图可以看到,每秒生产一个消息,2秒后消费到北京市,再过1秒消费到上海市(距离上海市的发布刚好2秒)。这里少了广州市,因为测试程序在生产广州市后,只等了1秒就退出。

我们从Redis中可以看到广州市这一条消息,存放在ZSET结构中。

多消费组可重复消费的队列

又一天,数据中台的小伙伴想要消费订单队列,但是不能够啊,LIST结构做的队列,每个消息只能被消费一次,如果数据中台的系统消费掉了,其它业务系统就会失去消息。

我们想到了Redis5.0开始新增的STREAM结构,再次封装RedisStream。

public static void Start(FullRedis redis)
{var topic = "FullQueue";var queue = redis.GetStream<String>(topic);// 独立线程消费var source = new CancellationTokenSource();Task.Run(() => ConsumeAsync(redis, topic, source.Token));// 发布消息Public(redis, topic);//source.Cancel();
}
private static void Public(FullRedis redis, String topic)
{var queue = redis.GetStream<Area>(topic);queue.Add(new Area { Code = 110000, Name = "北京市" });Thread.Sleep(1000);queue.Add(new Area { Code = 310000, Name = "上海市" });Thread.Sleep(1000);queue.Add(new Area { Code = 440100, Name = "广州市" });Thread.Sleep(1000);
}
private static async Task ConsumeAsync(FullRedis redis, String topic, CancellationToken token)
{var queue = redis.GetStream<String>(topic);queue.Group = "test";queue.GroupCreate(queue.Group);while (!token.IsCancellationRequested){try{var mqMsg = await queue.TakeMessageAsync(10);if (mqMsg != null){var msg = mqMsg.GetBody<Area>();XTrace.WriteLine("Consume {0} {1}", msg.Code, msg.Name);queue.Acknowledge(mqMsg.Id);}}catch (Exception ex){XTrace.WriteException(ex);}}
}

生产过程不变,消费大循环有点特别,主要是STREAM消费回来的消息,有它自己的Id,只需要对这个Id确认就可以了。

上图中,红色框是生产,紫色框是消费。

再来看看Redis中,可以看到STREAM消息还在里面。数据中台组只需要使用不同的消费组Group,即可独立消费,不用担心抢其它系统消息啦。

最佳实践

RedisQueue在中通大数据分析中,用于缓冲等待写入Oracle/MySql的数据,多线程计算后写入队列,然后由专门线程定时拉取一批(500行),执行批量Insert/Update操作。该系统队列,每天10亿条消息,Redis内存分配8G,实际使用小于100M,除非消费端故障导致产生积压。

递易智能科技全部使用可信队列 RedisReliableQueue,约200多个队列,按系统分布在各自的Redis实例,公有云2G内存主从版。积压消息小于10万时,队列专用的Redis实例内存占用小于100M,几乎不占内存空间。

公司业务每天带来100万多订单,由此衍生的消息数约1000万条,从未丢失消息!

例程代码

代码:https://github.com/NewLifeX/NewLife.Redis/tree/master/QueueDemo

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/306143.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

生态和能力是国内自研操作系统发展的关键

“缺芯少魂”一直是我国信息产业短板&#xff0c;如果无法实现国产化替代&#xff0c;信息安全和产业安全就犹如沙滩上盖房子&#xff0c;上层再坚固&#xff0c;地基不稳&#xff0c;一遇到风吹草动就有可能全部垮掉。近年来&#xff0c;国内自研操作系统厂商动作频频&#xf…

matlab群延时函数,群延迟函数(group delay function)群延迟滤波器 | 学步园

最近看了许多介绍Group delay function的论文&#xff0c;文章中大篇幅提到Group delay&#xff0c;group delay of digital filters,对这个方面的知识好像还挺有用的&#xff0c;所以想把它记录下来。然后总结下计算Group delay function的步骤。假设有N个样本的脉冲响应为h(n…

怎样使用C# 获取WIFI的连接状态?

怎样使用C# 获取WIFI的连接状态&#xff1f;行文导航思路问题得到解决代码展示断开与连接WIFI状态效果在OrangePI Linux Arm32上的测试效果C# 获取WIFI的连接状态本文是在知道WIFI网络设备名称的情况下&#xff0c;获取该设备的连接状态&#xff0c;同样也是可以判断是否已连接…

如何在 ASP.NET Core 中使用 URL Rewriting 中间件

URL rewriting 是根据预先配置好的一组规则去修改 request url&#xff0c;值得注意的是&#xff1a;URL Rewriting 的重写功能和 url 重定向 是两个概念&#xff0c;本篇我们就来讨论下如何在 ASP.NET Core 中对 url 进行 rewriting。安装 URL Rewriting 中间件 要想使用 URL …

睡眠分期matlab代码,非接触式睡眠分期方法与流程

本发明属于雷达监测技术领域&#xff0c;特别是一种非接触式睡眠分期方法。背景技术&#xff1a;传统的呼吸睡眠监护系统主要依靠贴附于人体的接触式传感器、电极进行测量&#xff0c;从而实时获得人体的生命参数信号&#xff0c;这些方法都需要直接或间接地接触人体&#xff0…

叮咚!你有一份来自明源云的圣诞邀约

请查收&#xff0c;来自明源云的圣诞邀约&#xff5e;

java先进先出 循环队列,JavaScript队列、优先队列与循环队列

队列是一种遵从先进先出(FIFO)原则的有序集合队列在尾部添加新元素&#xff0c;从顶部移除元素队列的理解队列在我们生活中最常见的场景就是排队了队列这个名字也已经很通俗易懂了和栈很像&#xff0c;这不过队列是先入先出的数据结构队列的前面是队头队列的后面是队尾出队从队…

Abp小试牛刀之 图片上传

图片上传是很常见的功能&#xff0c;里面有些固定的操作也可以沉淀下来。本文记录使用Abp vNext做图片上传的姿势。目标上传图片----->预览图片----->确定保存支持集群部署实现思路&#xff1a;1. 上传图片要使用WebAPI特定媒体类型&#xff1a;multipart/form-data;2. 因…

.Net Conf 2020 之回顾

Intro上周 .NET Conf 在苏州成功举办了第二届活动&#xff0c;一年一度的 .NET 盛会又来了&#xff0c;今年大会依然有许多从外地过来参加的开发者们&#xff0c;也有很多讲师也是从外地赶过来为我们分享。虽然今年是疫情的一年&#xff0c;并没有影响 .NET Conf 参会者们的热情…

如何使用 C# 在异步代码中处理异常

异常处理是一种处理运行时错误的技术&#xff0c;而 异步编程 允许我们在处理资源密集型的业务逻辑时不需要在 Main 方法或者在 执行线程 中被阻塞&#xff0c;值得注意的是&#xff0c;异步方法和同步方法的异常处理机制是不一样的&#xff0c;本篇我们就来讨论下如何在异步方…

对 Redis 中的有序集合SortedSet的理解

本篇说一下Redis中的 有序集合类型&#xff0c;曾几何时&#xff0c;我们想把所有数据存到内存中的 数据结构 中&#xff0c;但为了多机器共享内存&#xff0c;不得不将这块内存包装成wcf单独部署&#xff0c;同时还要考虑怎么序列化&#xff0c;烦心事太多太多。。。后来才知道…

代码质量在「内卷时代」的重要性

这里是Z哥的个人公众号每周五11&#xff1a;45 按时送达当然了&#xff0c;也会时不时加个餐&#xff5e;我的第「173」篇原创敬上大家好&#xff0c;我是Z哥。提到代码质量&#xff0c;不知道你的脑海中浮现出的第一个词是什么&#xff1f;规范&#xff1f;可读性&#xff1f;…

.NET Core AWS S3云存储

【导读】最近有需要用到AWS S3云存储上传附件&#xff0c;这里对利用.NET或.NET Core在调用SDK APi需要注意的一点小问题做个记录&#xff0c;或许能对后续有用到的童鞋提供一点帮助Amazon Simple Storage Service (Amazon S3) 是一种对象存储服务&#xff0c;提供行业领先的可…

MiniProfiler,一个.NET简单但有效的微型分析器

背景MVC MiniProfiler是Stack Overflow团队设计的一款对ASP.NET MVC的性能分析的小程序。可以对一个页面本身&#xff0c;及该页面通过直接引用、Ajax、Iframe形式访问的其它页面进行监控,监控内容包括数据库内容&#xff0c;并可以显示数据库访问的SQL&#xff08;支持EF、EF …

龙芯.NET正式发布 稳步推进生态建设

2020年12月19日&#xff0c;2020中国. NET开发者大会于苏州开幕。此次大会上&#xff0c;龙芯发布了龙芯.NET 3。龙芯.NET 3基于.NET Core 3.1&#xff0c;支持该版本具备的所有主要功能&#xff0c;包括GC、AOT等。CoreCLR、CoreFX、ASP.NET Core等库的测试通过情况与x64/arm6…

有温度的技术,改善上亿人的生活

06有温度的技术&#xff0c;改善上亿人的生活鱼小皮哥&#xff0c;现在的 APP 真是越来越难用了&#xff0c;功能多、操作复杂、广告更多。唉&#xff0c;可不是么&#xff0c;而且人们的生活已经离不开 APP 了&#xff01;老百鱼小皮我爷爷最近的视力下降的很快&#xff0c;用…

Linux链接文件包括,Linux操作系统——系统各目录有什么作用、以及文件链接过程...

三、 Linux 系统目录结构/bin — 用来贮存用户命令。目录 /usr/bin 也被用来贮存用户命令。/sbin — 许多系统命令(例如 shutdown)的贮存位置。目录 /usr/sbin 中也包括了许多系统命令。/root — 根用户(超级用户)的主目录。/mnt — 该目录中通常包括系统引导后被挂载的文件系统…

刚转Java?那准备转回.NET5吧!

再过几天就2021年了&#xff0c;回首今年最大的变化就是.NET5的发布&#xff0c;终结了.NET Framework和.NET Core两个分支。虽然因为新冠疫情原因&#xff0c;原定于.NET5的部分功能被推迟到.NET6了&#xff0c;但.NET5是一个非常非常重要的版本&#xff0c;会载入史册的一个版…

在 xunit 测试项目中使用依赖注入

在 xunit 测试项目中使用依赖注入Intro之前写过几篇 xunit 依赖注入的文章&#xff0c;今天这篇文章将结合我在 .NET Conf 上的分享&#xff0c;更加系统的分享一下在测试中的应用案例。之所以想分享这个话题是因为我觉得在我们开发过程中测试是非常重要的一部分&#xff0c;高…

利用 C# 中的 FileSystemWatcher 制作一个文件夹监控小工具

利用 C# 中的 FileSystemWatcher 制作一个文件夹监控小工具独立观察员 2020 年 12 月 26 日前一段看到微信公众号 “码农读书” 上发了一篇文章《如何使用 C# 中的 FileSystemWatcher》&#xff08;翻译自&#xff1a;https://www.infoworld.com/article/3185447/how-to-work-w…