.NET 轻量级、高效任务调度器:ScheduleTask

前言

至于任务调度这个基础功能,重要性不言而喻,大多数业务系统都会用到,世面上有很多成熟的三方库比如Quartz,Hangfire,Coravel

这里我们不讨论三方的库如何使用 而是从0开始自己制作一个简易的任务调度,如果只是到分钟级别的粒度基本够用。

正文

技术栈用到了:BackgroundServiceNCrontab

第一步我们定义一个简单的任务约定,不干别的就是一个执行方法:

public interface IScheduleTask
{Task ExecuteAsync();
}
public abstract class ScheduleTask : IScheduleTask
{public virtual Task ExecuteAsync(){return Task.CompletedTask;}
}

第二步定义特性标注任务执行周期等信的metadata

[AttributeUsage(AttributeTargets.Class, AllowMultiple = true, Inherited = false)]
public class ScheduleTaskAttribute(string cron) : Attribute
{/// <summary>/// 支持的cron表达式格式 * * * * *:https://en.wikipedia.org/wiki/Cron/// 最小单位为分钟/// </summary>public string Cron { get; set; } = cron;public string? Description { get; set; }/// <summary>/// 是否异步执行.默认false会阻塞接下来的同类任务/// </summary>public bool IsAsync { get; set; } = false;/// <summary>/// 是否初始化即启动,默认false/// </summary>public bool IsStartOnInit { get; set; } = false;
}

第三步我们定义一个调度器约定,不干别的就是判断当前的任务是否可以执行:

public interface IScheduler
{/// <summary>/// 判断当前的任务是否可以执行/// </summary>bool CanRun(ScheduleTaskAttribute scheduleMetadata, DateTime referenceTime);
}

好了,基础步骤就完成了,如果我们需要实现配置级别的任务调度或者动态的任务调度 那我们再抽象一个Store:

public class ScheduleTaskMetadata(Type scheduleTaskType, string cron)
{public Type ScheduleTaskType { get; set; } = scheduleTaskType;public string Cron { get; set; } = cron;public string? Description { get; set; }public bool IsAsync { get; set; } = false;public bool IsStartOnInit { get; set; } = false;
}
public interface IScheduleMetadataStore
{/// <summary>/// 获取所有ScheduleTaskMetadata/// </summary>Task<IEnumerable<ScheduleTaskMetadata>> GetAllAsync();
}

实现一个Configuration级别的Store

internal class ConfigurationScheduleMetadataStore(IConfiguration configuration) : IScheduleMetadataStore
{const string Key = "BiwenQuickApi:Schedules";public Task<IEnumerable<ScheduleTaskMetadata>> GetAllAsync(){var options = configuration.GetSection(Key).GetChildren();if (options?.Any() is true){var metadatas = options.Select(x =>{var type = Type.GetType(x[nameof(ConfigurationScheduleOption.ScheduleType)]!);if (type is null)throw new ArgumentException($"Type {x[nameof(ConfigurationScheduleOption.ScheduleType)]} not found!");return new ScheduleTaskMetadata(type, x[nameof(ConfigurationScheduleOption.Cron)]!){Description = x[nameof(ConfigurationScheduleOption.Description)],IsAsync = string.IsNullOrEmpty(x[nameof(ConfigurationScheduleOption.IsAsync)]) ? false : bool.Parse(x[nameof(ConfigurationScheduleOption.IsAsync)]!),IsStartOnInit = string.IsNullOrEmpty(x[nameof(ConfigurationScheduleOption.IsStartOnInit)]) ? false : bool.Parse(x[nameof(ConfigurationScheduleOption.IsStartOnInit)]!),};});return Task.FromResult(metadatas);}return Task.FromResult(Enumerable.Empty<ScheduleTaskMetadata>());}
}

然后,我们可能需要多任务调度的事件做一些操作或者日志存储。

比如失败了该干嘛,完成了回调其他后续业务等。

我们再来定义一下具体的事件IEvent,具体可以参考文章: https://www.cnblogs.com/vipwan/p/18184088

事件IEvent代码

1、首先定义一个事件约定的空接口

public interface IEvent{}

2、然后定义事件订阅者接口

public interface IEventSubscriber<T> where T : IEvent
{Task HandleAsync(T @event, CancellationToken ct);/// <summary>/// 执行排序/// </summary>int Order { get; }/// <summary>/// 如果发生错误是否抛出异常,将阻塞后续Handler/// </summary>bool ThrowIfError { get; }
}
public abstract class EventSubscriber<T> : IEventSubscriber<T> where T : IEvent
{public abstract Task HandleAsync(T @event, CancellationToken ct);public virtual int Order => 0;/// <summary>/// 默认不抛出异常/// </summary>public virtual bool ThrowIfError => false;
}

3、接着就是发布者

internal class Publisher(IServiceProvider serviceProvider)
{public async Task PublishAsync<T>(T @event, CancellationToken ct) where T : IEvent{var handlers = serviceProvider.GetServices<IEventSubscriber<T>>();if (handlers is null) return;foreach (var handler in handlers.OrderBy(x => x.Order)){try{await handler.HandleAsync(@event, ct);}catch{if (handler.ThrowIfError){throw;}//todo:}}}
}

4、到此发布订阅的基本代码也就写完了.接下来就是注册发布者和所有的订阅者了

public abstract class ScheduleTaskEvent(IScheduleTask scheduleTask, DateTime eventTime) : IEvent
{/// <summary>/// 任务/// </summary>public IScheduleTask ScheduleTask { get; set; } = scheduleTask;/// <summary>/// 触发时间/// </summary>public DateTime EventTime { get; set; } = eventTime;
}
/// <summary>
/// 执行完成
/// </summary>
public sealed class TaskSuccessedEvent(IScheduleTask scheduleTask, DateTime eventTime, DateTime endTime) : ScheduleTaskEvent(scheduleTask, eventTime)
{/// <summary>/// 执行结束的时间/// </summary>public DateTime EndTime { get; set; } = endTime;
}
/// <summary>
/// 执行开始
/// </summary>
public sealed class TaskStartedEvent(IScheduleTask scheduleTask, DateTime eventTime) : ScheduleTaskEvent(scheduleTask, eventTime);
/// <summary>
/// 执行失败
/// </summary>
public sealed class TaskFailedEvent(IScheduleTask scheduleTask, DateTime eventTime, Exception exception) : ScheduleTaskEvent(scheduleTask, eventTime)
{/// <summary>/// 异常信息/// </summary>public Exception Exception { get; private set; } = exception;
}

接下来我们再实现基于NCrontab的简易调度器,这个调度器主要是解析Cron表达式判断传入时间是否可以执行ScheduleTask,具体的代码:

internal class SampleNCrontabScheduler : IScheduler
{/// <summary>/// 暂存上次执行时间/// </summary>private static ConcurrentDictionary<ScheduleTaskAttribute, DateTime> LastRunTimes = new();public bool CanRun(ScheduleTaskAttribute scheduleMetadata, DateTime referenceTime){var now = DateTime.Now;var haveExcuteTime = LastRunTimes.TryGetValue(scheduleMetadata, out var time);if (!haveExcuteTime){var nextStartTime = CrontabSchedule.Parse(scheduleMetadata.Cron).GetNextOccurrence(referenceTime);LastRunTimes.TryAdd(scheduleMetadata, nextStartTime);//如果不是初始化启动,则不执行if (!scheduleMetadata.IsStartOnInit)return false;}if (now >= time){var nextStartTime = CrontabSchedule.Parse(scheduleMetadata.Cron).GetNextOccurrence(referenceTime);//更新下次执行时间LastRunTimes.TryUpdate(scheduleMetadata, nextStartTime, time);return true;}return false;}
}

然后就是核心的BackgroundService了,这里我用的IdleTime心跳来实现,粒度分钟,当然内部也可以封装Timer等实现更复杂精度更高的调度,这里就不展开讲了。

代码如下:

internal class ScheduleBackgroundService : BackgroundService
{private static readonly TimeSpan _pollingTime
DEBUG//轮询20s 测试环境下,方便测试。= TimeSpan.FromSeconds(20);
if
!DEBUG//轮询60s 正式环境下,考虑性能轮询时间延长到60s= TimeSpan.FromSeconds(60);
if//心跳10s.private static readonly TimeSpan _minIdleTime = TimeSpan.FromSeconds(10);private readonly ILogger<ScheduleBackgroundService> _logger;private readonly IServiceProvider _serviceProvider;public ScheduleBackgroundService(ILogger<ScheduleBackgroundService> logger, IServiceProvider serviceProvider){_logger = logger;_serviceProvider = serviceProvider;}protected override async Task ExecuteAsync(CancellationToken stoppingToken){while (!stoppingToken.IsCancellationRequested){var pollingDelay = Task.Delay(_pollingTime, stoppingToken);try{await RunAsync(stoppingToken);}catch (Exception ex){//todo:_logger.LogError(ex.Message);}await WaitAsync(pollingDelay, stoppingToken);}}private async Task RunAsync(CancellationToken stoppingToken){using var scope = _serviceProvider.CreateScope();var tasks = scope.ServiceProvider.GetServices<IScheduleTask>();if (tasks is null || !tasks.Any()){return;}//调度器var scheduler = scope.ServiceProvider.GetRequiredService<IScheduler>();async Task DoTaskAsync(IScheduleTask task, ScheduleTaskAttribute metadata){if (scheduler.CanRun(metadata, DateTime.Now)){var eventTime = DateTime.Now;//通知启动_ = new TaskStartedEvent(task, eventTime).PublishAsync(default);try{if (metadata.IsAsync){//异步执行_ = task.ExecuteAsync();}else{//同步执行await task.ExecuteAsync();}//执行完成_ = new TaskSuccessedEvent(task, eventTime, DateTime.Now).PublishAsync(default);}catch (Exception ex){_ = new TaskFailedEvent(task, DateTime.Now, ex).PublishAsync(default);}}};//注解中的taskforeach (var task in tasks){if (stoppingToken.IsCancellationRequested){break;}//标注的metadatasvar metadatas = task.GetType().GetCustomAttributes<ScheduleTaskAttribute>();if (!metadatas.Any()){continue;}foreach (var metadata in metadatas){await DoTaskAsync(task, metadata);}}//store中的schedulervar stores = _serviceProvider.GetServices<IScheduleMetadataStore>().ToArray();//并行执行,提高性能Parallel.ForEach(stores, async store =>{if (stoppingToken.IsCancellationRequested){return;}var metadatas = await store.GetAllAsync();if (metadatas is null || !metadatas.Any()){return;}foreach (var metadata in metadatas){var attr = new ScheduleTaskAttribute(metadata.Cron){Description = metadata.Description,IsAsync = metadata.IsAsync,IsStartOnInit = metadata.IsStartOnInit,};var task = scope.ServiceProvider.GetRequiredService(metadata.ScheduleTaskType) as IScheduleTask;if (task is null){return;}await DoTaskAsync(task, attr);}});}private static async Task WaitAsync(Task pollingDelay, CancellationToken stoppingToken){try{await Task.Delay(_minIdleTime, stoppingToken);await pollingDelay;}catch (OperationCanceledException){}}
}

最后收尾阶段我们老规矩扩展一下IServiceCollection:

internal static IServiceCollection AddScheduleTask(this IServiceCollection services)
{foreach (var task in ScheduleTasks){services.AddTransient(task);services.AddTransient(typeof(IScheduleTask), task);}//调度器services.AddScheduler<SampleNCrontabScheduler>();//配置文件Store:
ices.AddScheduleMetadataStore<ConfigurationScheduleMetadataStore>();//BackgroundServiceservices.AddHostedService<ScheduleBackgroundService>();return services;
}
/// <summary>
/// 注册调度器AddScheduler
/// </summary>
public static IServiceCollection AddScheduler<T>(this IServiceCollection services) where T : class, IScheduler
{services.AddSingleton<IScheduler, T>();return services;
}/// <summary>
/// 注册ScheduleMetadataStore
/// </summary>
public static IServiceCollection AddScheduleMetadataStore<T>(this IServiceCollection services) where T : class, IScheduleMetadataStore
{services.AddSingleton<IScheduleMetadataStore, T>();return services;
}

老规矩我们来测试一下:

//通过特性标注的方式执行:
[ScheduleTask(Constants.CronEveryMinute)] //每分钟一次
[ScheduleTask("0/3 * * * *")]//每3分钟执行一次
public class KeepAlive(ILogger<KeepAlive> logger) : IScheduleTask
{public async Task ExecuteAsync(){//执行5sawait Task.Delay(TimeSpan.FromSeconds(5));logger.LogInformation("keep alive!");}
}
public class DemoConfigTask(ILogger<DemoConfigTask> logger) : IScheduleTask
{public Task ExecuteAsync(){logger.LogInformation("Demo Config Schedule Done!");return Task.CompletedTask;}
}

通过配置文件的方式配置Store:

{"BiwenQuickApi": {"Schedules": [{"ScheduleType": "Biwen.QuickApi.DemoWeb.Schedules.DemoConfigTask,Biwen.QuickApi.DemoWeb","Cron": "0/5 * * * *","Description": "Every 5 mins","IsAsync": true,"IsStartOnInit": false},{"ScheduleType": "Biwen.QuickApi.DemoWeb.Schedules.DemoConfigTask,Biwen.QuickApi.DemoWeb","Cron": "0/10 * * * *","Description": "Every 10 mins","IsAsync": false,"IsStartOnInit": true}]}
}

我们还可以实现自己的Store,这里以放到内存为例,如果有兴趣 你可以可以自行开发一个面板管理:

public class DemoStore : IScheduleMetadataStore
{public Task<IEnumerable<ScheduleTaskMetadata>> GetAllAsync(){//模拟从数据库或配置文件中获取ScheduleTaskMetadataIEnumerable<ScheduleTaskMetadata> metadatas =[new ScheduleTaskMetadata(typeof(DemoTask),Constants.CronEveryNMinutes(2)){Description="测试的Schedule"},];return Task.FromResult(metadatas);}
}
//然后注册这个Store:
builder.Services.AddScheduleMetadataStore<DemoStore>();

所有的一切都大功告成,最后我们来跑一下Demo,成功了

图片

当然这里是自己的固定思维设计的一个简约版,还存在一些不足,欢迎板砖轻拍指正!

提供同一时间单一运行中的任务实现

/// <summary>
/// 模拟一个只能同时存在一个的任务.一分钟执行一次,但是耗时两分钟.
/// </summary>
/// <param name="logger"></param>
[ScheduleTask(Constants.CronEveryMinute, IsStartOnInit = true)]
public class OnlyOneTask(ILogger<OnlyOneTask> logger) : OnlyOneRunningScheduleTask
{public override Task OnAbort(){logger.LogWarning($"[{DateTime.Now}]任务被打断.因为有一个相同的任务正在执行!");return Task.CompletedTask;}public override async Task ExecuteAsync(){var now = DateTime.Now;//模拟一个耗时2分钟的任务await Task.Delay(TimeSpan.FromMinutes(2));logger.LogInformation($"[{now}] ~ {DateTime.Now} 执行一个耗时两分钟的任务!");}
}

源码地址

https://github.com/vipwan/Biwen.QuickApi

https://github.com/vipwan/Biwen.QuickApi/tree/master/Biwen.QuickApi/Scheduling

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/17424.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

全能集成开发平台Team·IDE

三甲医院的床位太难等了。反正也是小手术&#xff0c;老苏周五在附近找了家二甲医院&#xff0c;幸运的是&#xff0c;门诊迅速为我开具了入院证。周六早晨就接受了手术&#xff0c;周日挂了一天水&#xff0c;周一下午就出院了。准备在家先休息两天。 2~4 周之后把支架取出来…

阿里云海外视频安全的DRM加密

随着科技的进步&#xff0c;视频以直播或录播的形式陆续开展海外市场&#xff0c;从而也衍生出内容安全的问题&#xff0c;阿里云在这方面提供了完善的内容安全保护机制&#xff0c;适用于不同的场景&#xff0c;如在视频安全提供DRM加 由图可以了解到阿里云保护直播安全的方法…

【软件设计师】程序语言

1.程序设计语言基本概念 1.1 低级语言与高级语言 低级语言&#xff1a;机器语言和汇编语言称为低级语言 机器语言指0.&#xff0c;1组成的机器指令序列 汇编语言指用符号表示指令的语言&#xff0c;如MOV AX&#xff0c;2 高级语言&#xff1a;从人类的逻辑角度出发&#xff0…

numpy-mkl的下载地址

不要使用pip3直接在终端安装&#xff0c;因为pip3默安装的是numpy&#xff0c;而不是numpymkl。 采用在第三方库中手动下载后&#xff0c;再安装的方式。 第三方库网址&#xff1a;https://www.lfd.uci.edu/~gohlke/pythonlibs/#numpy 如果不能进入就可以选择去git里面&#x…

(三)MobaXterm、VSCode、Pycharm ssh连接服务器并使用

背景&#xff1a;根据前两篇文章操作完成后&#xff0c; 手把手教学&#xff0c;一站式安装ubuntu及配置服务器-CSDN博客 手把手教学&#xff0c;一站式教你实现服务器&#xff08;Ubuntu&#xff09;Anaconda多用户共享-CSDN博客 课题组成员每人都有自己的帐号了&#xff0…

互联网政务应用安全管理规定:使用安全连接方式访问

前几日&#xff0c;由中央网络安全和信息化委员会办公室、中央机构编制委员会办公室、工业和信息化部、公安部等4部门联合制定的《互联网政务应用安全管理规定》&#xff08;以下简称规定&#xff09;发布了&#xff0c;规定定义了互联网政务应用&#xff0c;也对互联网政务应用…

Android数据缓存框架 - 内存数据载体从LiveData到StateFlow

引言&#xff1a;所有成功者的背后&#xff0c;都有一份艰苦的历程&#xff0c;不要只看到了人前的风光&#xff0c;而低估了他们背后所付出的努力。 随着flow到流行度越来越高&#xff0c;有开发者呼吁我使用flow&#xff0c;于是我就如你们所愿&#xff0c;新增了StateFlow作…

智能时代下,人机交互和虚拟现实的机遇和挑战

智能时代下,人机交互和虚拟现实的机遇和挑战

多态(C++)

多态(C) 本文如果有错误或者不足的地方&#xff0c;希望各位大佬多多指点。 【本文目录】 1.多态的概念2.多态的定义及实现3.抽象类4.多态的原理5.单继承和多继承的虚函数表 1.多态的概念 多态的概念就是&#xff1a;多种形态 多态就是可以有多种的形态。不同的身份去实现同一…

【Leetcode 160】环形链表——双指针,细节讲解

题目 给你一个链表的头节点 head &#xff0c;判断链表中是否有环。 如果链表中有某个节点&#xff0c;可以通过连续跟踪 next 指针再次到达&#xff0c;则链表中存在环。 为了表示给定链表中的环&#xff0c;评测系统内部使用整数 pos 来表示链表尾连接到链表中的位置&#…

RTSP/Onvif安防视频监控云平台EasyNVR重启后通道在线视频无法播放,接口报错502是什么原因?

EasyNVR安防视频云平台是旭帆科技TSINGSEE青犀旗下支持RTSP/Onvif协议接入的安防监控流媒体视频云平台。平台具备视频实时监控直播、云端录像、云存储、录像检索与回看、告警等视频能力&#xff0c;能对接入的视频流进行处理与多端分发&#xff0c;包括RTSP、RTMP、HTTP-FLV、W…

hypack如何采集多波束数据?(下)

多波束测量模块 1&#xff09;记录多波束和辅助传感器的数据&#xff1b; 2&#xff09;显示实时改正后的数据和数据质量信息。 ​编辑​ 测量准备 1&#xff09;设置大地测量参数和硬件设置&#xff1b; 2&#xff09;计划测线 计划测线是一定间距的平行线&#xff0c;…

微软联手清华,AI注释让文本到图像生成更符合人类偏好

获取本文论文原文PDF&#xff0c;请在公众号【AI论文解读】留言&#xff1a;论文解读 摘要 本研究展示了利用人类偏好数据集来精细调整文本到图像生成模型的潜力&#xff0c;增强了生成图像与文本提示之间的一致性。尽管取得了进展&#xff0c;现有的人类偏好数据集要么构建成…

掌控安全CTF-2024年5月擂台赛-WP(部分)

MISC ez_Misc 题目给了一个加密的压缩包和一个文本文档&#xff0c;首先我们先来看文本的内容&#xff0c;如下&#xff1a; 很容易看出&#xff0c;0宽隐写&#xff0c;用PuzzleSolver梭哈一下&#xff0c;发现了&#xff1a;Thi3 is n0t 2 hint 又在文本中发现一个特征&…

【2024】高校网络安全管理运维赛

比赛时间&#xff1a;2024-05-06 Re-easyre 基本的base64换表&#xff0c;用CyberChef解密 Re-babyre 进入主函数&#xff0c;发现输入四次 看一下就知道是大数求解 (当初写的时候差不多 不知道为什么第四个总是算错…) from z3 import *s Solver() # 设置一个解方程的类…

中心渗透Ⅱ

cs与msf权限传递以及mimikatz抓取win2012明文密码 使用Cobalt Strike抓取win2012明文密码&#xff0c;将会话传递到Metasploit Framework上 1.cs生成木马并使目标服务器中马 建立监听生成木马 2.抓取目标主机的明文密码 通过修改注册表来让Wdigest Auth保存明文口令 shell …

深入pandas:数据分析

目录 前言 第一点&#xff1a;导入模块 第二点&#xff1a;准备数据 第三点&#xff1a;简单的分析数据 第四点&#xff1a;【重点】数据透支 总结 前言 在数据分析与挖掘的领域&#xff0c;了解如何使用工具和方法来探索数据是至关重要的。本文将探讨如何利用Python中的…

C语言常用字符串处理函数

C语言中包含了很多对字符串处理的函数,要使用这些函数&#xff0c; 首先需要导入头文件#include <string.h> 1. strlen() -- 计算字符串长度 原型: size_t strlen(char const *string); 例: char *str "abcde"; size_t len strlen(str); // 结果为…

【DevOps】Elasticsearch在Ubuntu 20.04上的安装与配置:详细指南

目录 一、ES 简介 1、核心概念 2、工作原理 3、 优势 二、ES 在 Ubuntu 20.04 上的安装 1、安装 Java 2、下载 ES 安装包 3、创建 ES 用户 4 、解压安装包 5、 配置 ES 6、 启动 ES 7、验证安装 三、ES 常用命令 1、创建索引 2、 插入文档 3、查询文档 四、ES…

利用audacity和ffmpeg制作测试音频文件

最近要用SIPP测试一个场景&#xff0c;需要发送双声道/16K采样率/16bit量化的PCM流&#xff0c;但是下载的素材往往不能满足参数要求。那么就自己制作。 首先下载mp3文件&#xff0c;并用audacity打开。 接下来&#xff0c;点击菜单栏中轨道-重采样&#xff0c;将采样频率设为1…