借助Redis完成延时任务

背景

相信我们或多或少的会遇到类似下面这样的需求:

第三方给了一批数据给我们处理,我们处理好之后就通知他们处理结果。

大概就是下面这个图说的。

本来在处理完数据之后,我们就会马上把处理结果返回给对方,但是对方要求我们处理速度不能过快,要有一种人为处理的效果。

换句话就是说,就算是处理好了,也要晚一点再执行通知操作。

这就是一个典型的延时任务。

延时,那还不简单,执行完之后,让它Sleep一下就好了,这样就达到目标了。

Sleep一下确定是最容易实现的一种方案,但是试想一下,数据的数量不断的增加,这样Sleep真的好吗?答案是否定的。

延时队列,是处理这个场景最为妥当的方案。

RabbitMQ,RocketMQ,Cmq等都可以直接或间接的达到相应的效果。

如果不具备队列条件,又要怎么处理呢?还可以借助Redis来完成这项工作。

MQ不一定每个公司都会用,但Redis应该80%以上的都会用吧。

处理方案

Redis这边,可用的方案有两种,下面分别来介绍一下。

#1 键的过期时间

在设置缓存的时候,我们比较多情况下都会设置一个缓存的过期时间,这个时间过期后,会重新去数据源拿数据回来。

可以基于这个过期时间结合Redis的keyspace notifications共同完成。

keyspace notifications里面包含了非常多的事件,这里只关注EXPIRE,这个是和过期有关的。

只要订阅了__keyevent@0__:expired这个主题,当有key过期的时候,就会收到对应的信息。

主题@后面的0,指的是db 0.

要想使用这个特性,必不可少的一步是修改Redis默认的配置,把notify-keyspace-events设置成Ex

############################# Event notification ############################### Redis can notify Pub/Sub clients about events happening in the key space.
# This feature is documented at http://redis.io/topics/notifications
#
# .........
#
#  By default all notifications are disabled because most users don't need
#  this feature and the feature has some overhead. Note that if you don't
#  specify at least one of K or E, no events will be delivered.
notify-keyspace-events "Ex"

其中 E 指的是键事件通知,x 指的是过期事件。

根据这个特性,重新调整一下流程图:

应该也比较好懂,下面通过简单的代码来实现一下这种方案。

首先是处理完数据及往Redis写数据。

public async Task DoTaskAsync()
{// 数据处理// ...// 后续操作要延时,把Id记录下来var taskId = new Random().Next(1, 10000);// 要延迟的时间int sec = new Random().Next(1, 5);// 可以加个重试机制,预防单次执行失败。await RedisHelper.SetAsync($"task:{taskId}", "1", sec);
}

还需要回传结果的后台任务,这个任务就是去订阅上面说的键过期事件,然后回传结果。

这里可以借助BackgroundService来订阅处理。

public class SubscribeTaskBgTask : BackgroundService
{protected override Task ExecuteAsync(CancellationToken stoppingToken){stoppingToken.ThrowIfCancellationRequested();var keyPrefix = "task:";RedisHelper.Subscribe(("__keyevent@0__:expired", arg =>{var msg = arg.Body;Console.WriteLine($"recive {msg}");if (msg.StartsWith(keyPrefix)){// 取到任务Idvar val = msg.Substring(keyPrefix.Length);Console.WriteLine($"{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")} begin to do task {val}");// 回传处理结果给第三方,这里可以考虑这个并发锁,避免多实例都处理了这个任务。// ....}}));return Task.CompletedTask;}
}

这里有一个要注意的地方,要在key里面包含任务的Id,因为订阅处理的时候,只能拿到一个key,后续能做的操作也只是基于这个key。

上面的例子,是用了task:任务Id的形式,所以在订阅处理的时候,只处理以task:开头的那些key。

效果如下:

这种方案,直观上是非常简单的,不过这种方案会遇到一个小问题。

当一个key过期后,并不一定会马上收到通知,这个也是会有一定的延时的,取决于Redis的内部机制。

Redis Keyspace Notifications文档的最后一段也提到了这个问题。

所以用这种方案的时候,要考虑一下,你的延时是不是要及时~~

#2 有序集合

有序集合是Redis中一种十分有用的数据结构,它的本质其实就是集合加了一个排序的功能,每个集合里面的元素还会有一个分值的属性。

它提供了一个可以获取指定分值范围内的元素,这个也就是我们的出发点。

在这个场景下,什么东西可能作为这个分值呢?现在只有一个处理任务的Id还有一个延迟的时间,Id肯定不行,那么也只能是延迟时间来作这个分值了。

延迟1秒,5秒,1分钟,这个都是比较大粒度的时间,这里要转化一下,用时间戳来代替这些延迟的时间。

假设现在的时间戳是 1584171520, 要延迟5秒执行,那么执行任务的时间就是 1584171525,在当前时间戳的基础上加个5秒,就是最终要执行的了。

到时有序集合中存的元素就会是这样的

任务Id-1 1584171525
任务Id-2 1584171528
任务Id-3 1584171530

接下来就是要怎么取出这些任务的问题了!

把当前时间戳当成是取数的最大分值,0作为最小分值,这个时候取出的元素就是应该要执行回传的任务了。

根据这个方案,重新调整一下流程图:

交代清楚了思路,再来点代码,加深一下理解。

首先还是处理完数据后往Redis写数据。

public async Task DoTaskAsync()
{// 数据处理// ...// 后续操作要延时,把Id记录下来var taskId = new Random().Next(1, 10000);var cacheKey = "task:delay";int sec = new Random().Next(1, 5);// 要执行这个任务的时间戳var time = DateTimeOffset.Now.AddSeconds(sec).ToUnixTimeSeconds();await RedisHelper.ZAddAsync(cacheKey, (time, taskId));Console.WriteLine($"{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")} done {taskId} here - {sec}");
}

后面就是轮训有序集合里面的元素了,这里同样是借助BackgroundService来处理。

public class SubscribeTaskBgTask : BackgroundService
{protected override async Task ExecuteAsync(CancellationToken stoppingToken){stoppingToken.ThrowIfCancellationRequested();var cacheKey = "task:delay";while (true){// 先取,后删,不具备原子性,可考虑用lua脚本来保证原子性。var vals = await RedisHelper.ZRangeByScoreAsync(cacheKey, -1, DateTimeOffset.Now.ToUnixTimeSeconds(), 1, 0);if (vals != null && vals.Length > 0){var val = vals[0];var rmCount = await RedisHelper.ZRemAsync(cacheKey, vals);if (rmCount > 0){// 要把这个元素先删除成功了,再执行任务,不然会重复Console.WriteLine($"{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")} begin to do task {val}");// 回传处理结果给第三方,这里可以考虑这个并发锁,避免多实例都处理了这个任务。// ....}}else{// 没有数据,休眠500ms,避免CPU空转await Task.Delay(500);}}}
}

效果如下:

参考文章

https://redis.io/topics/notifications

https://zhuanlan.zhihu.com/p/87113913

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/310328.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

[Java基础]HashSet集合概述和特点

HashSet集合概述和特点: 练习代码如下: package HashSetPackage;import java.util.HashSet;public class HashSetDemo {public static void main(String[] args){HashSet<String> hs new HashSet<String>();hs.add("hello");hs.add("world")…

多亏我缓存技术过硬!疫情防控项目上线,我只用了5天!

先介绍下背景&#xff0c;我是武汉某O2O电商公司开发组长&#xff0c;疫情震中的我被老板要求7天之内上线《疫情防控热点图》项目&#xff0c;几个组员回老家断网&#xff0c;最终就2个人完成开发上线&#xff0c;满足了10w用户的高频访问。时间和人力都紧张&#xff0c;不能按…

数据结构与算法--有序数组中找出和为s的两个数字

有序数组中找和为s的两个数字 题目&#xff1a;输入一个递增排序的数组array&#xff0c; 和一个数字s&#xff0c; 在数组中找出两个数&#xff0c;使得这两个数的和是s&#xff0c;如果有多对&#xff0c;输出一对即可。 最简单方案 双循环&#xff0c;每次获取一个数据&a…

ABP框架使用拦截器动态配置租户过滤器

前言最近项目要求在ABP框架中根据TenantId是否为空来配置是否禁用租户过滤器。ABP自身给我我们禁用租户过滤器的两种方法官方文档https://aspnetboilerplate.com/Pages/Documents/Data-Filters方法一&#xff1a;使用工作单元using (_unitOfWorkManager.Current.DisableFilter(…

[Java基础]LinkedHashSet集合概述和特点

练习代码如下: package LinkedHashSetPack;import java.util.LinkedHashSet;public class LinkedHashSetDemo {public static void main(String[] args){LinkedHashSet<String> linkedHashSet new LinkedHashSet<String>();linkedHashSet.add("hello")…

数据结构与算法--翻转单词顺序

翻转单词顺序 题目&#xff1a;输入一个英文句子&#xff0c;翻转句子中的单词顺序&#xff0c;但是单词内的字符顺序不变&#xff0c;例如&#xff1a;I am a software engineer -> engineer software a am I 方案一&#xff1a;空间换时间 空间换时间方法&#xff0c;还…

在.NET Core中用最原生的方式读取Nacos的配置

背景 之前老黄写过一篇《ASP.NET Core结合Nacos来完成配置管理和服务发现》简单介绍了如何让.NET Core程序接入Nacos&#xff0c;之前的SDK里面更多的是对Nacos的Open API进行了封装以及对服务注册和发现的封装。配置这一块当时并没有过多的处理&#xff0c;用起来有时感觉不会…

[Java基础]TreeSet集合概述和特点

练习代码如下: package TreeSetPack;import java.util.TreeSet;public class TreeSetDemo {public static void main(String[] args){TreeSet<Integer> ts new TreeSet<Integer>();ts.add(10);ts.add(40);ts.add(30);ts.add(50);ts.add(20);ts.add(30);for (Inte…

.NET Core下的开源分布式任务调度系统ScheduleMaster-v2.0低调发布

从1月份首次公开介绍这个项目到现在也快4个月了&#xff0c;期间做了一些修修补补整体没什么大的改动。2.0算是发布之后第一个大的版本更新&#xff0c;带来了许多新功能新特性&#xff0c;也修复了一些已知的bug&#xff0c;在此感谢在博客、Issue和QQ群中提出各种意见的朋友&…

[Java基础]自然排序Comparable的使用

代码如下: package ComparablePack;public class Student implements Comparable<Student>{private String name;private int age;public Student() {}public Student(String name, int age) {this.name name;this.age age;}public String getName() {return name;}pu…

数据结构与算法--我们来玩丢手绢(约瑟夫环问题)

我们来玩丢手绢 昨天我们打扑克&#xff0c;今天我们丢手绢丢手绢我们都知道这个游戏&#xff0c;他的由来由约瑟夫 &#xff08;Josephus&#xff09;提出来的 据说著名犹太历史学家Josephus有过以下的故事&#xff1a;在罗马人占领乔塔帕特后&#xff0c;39 个犹太人与Jose…

后端开发都应该了解点接口的压力测试(Apache Bench版)

背景 小A&#xff1a;小B&#xff0c;最近调你的接口老是超时呀&#xff0c;8秒都还没返回结果&#xff0c;是不是有性能问题呀&#xff01;小B &#xff1a;我看看~~类似这样的对话&#xff0c;在现实中是时有发生的&#xff0c;不是特别严重的话&#xff0c;往往大家也不会去…

数据结构与算法--这个需求很简单怎么实现我不管(发散思维)

发散思维 程序员是一个高危职业&#xff0c;最近动不动就听到谁谁谁猝死&#xff0c;谁谁谁过劳晕倒&#xff0c;所以面对奇葩问题&#xff0c;我们要淡定&#xff0c; 开发中被产品虐&#xff0c;说的最多的一句话就是这个需求很简单&#xff0c;怎么实现我不管 找工作被面试…

[Java基础]比较器排序Comparator的使用

代码如下: package ComparablePack;public class Student {private String name;private int age;public Student() {}public Student(String name, int age) {this.name name;this.age age;}public String getName() {return name;}public void setName(String name) {this…

手把手教你如何构建 WPF 官方开源框架源代码

本文转自林德熙的博客&#xff08;blog.lindexi.com&#xff09;导语从去年微软就将 WPF 开源了&#xff0c;差不多现在所有 WPF 的源代码都开源了。在学习框架的时候&#xff0c;我会做一些改动&#xff0c;期望能构建一个自己的版本进行测试。但是作为一个特别大的框架&#…

数据结构与算法--再来聊聊数组

再来聊聊数组 这篇我们来总结一下数组相关的一些算法&#xff0c;数组的特点在于我们能通过下标得到对应数据&#xff0c;时间复杂度在O(1)&#xff0c;之前有多篇文章有数组相关的体系&#xff0c;一下来一个归纳&#xff1a; 数据结构与算法–判断扑克牌是否顺子 数据结构…

[Java基础]泛型基础

可变参数的使用&#xff1a; 代码如下: package CanChangePack;import java.util.Arrays; import java.util.List;public class ArgsDemo01 {public static void main(String[] args){List<String> list Arrays.asList("hello","world","jav…

数据结构与算法--二叉树第k个大的节点

二叉树第k个大的节点 二叉树文章列表&#xff1a; 数据结构与算法–面试必问AVL树原理及实现 数据结构与算法–二叉树的深度问题 数据结构与算法–二叉堆&#xff08;最大堆&#xff0c;最小堆&#xff09;实现及原理 数据结构与算法–二叉查找树转顺序排列双向链表 数据…