前言
至于任务调度这个基础功能,重要性不言而喻,大多数业务系统都会用到,世面上有很多成熟的三方库比如Quartz,Hangfire,Coravel
这里我们不讨论三方的库如何使用 而是从0开始自己制作一个简易的任务调度,如果只是到分钟级别的粒度基本够用。
正文
技术栈用到了:BackgroundService
和NCrontab
库
第一步我们定义一个简单的任务约定,不干别的就是一个执行方法:
public interface IScheduleTask
{Task ExecuteAsync();
}
public abstract class ScheduleTask : IScheduleTask
{public virtual Task ExecuteAsync(){return Task.CompletedTask;}
}
第二步定义特性标注任务执行周期等信的metadata
[AttributeUsage(AttributeTargets.Class, AllowMultiple = true, Inherited = false)]
public class ScheduleTaskAttribute(string cron) : Attribute
{/// <summary>/// 支持的cron表达式格式 * * * * *:https://en.wikipedia.org/wiki/Cron/// 最小单位为分钟/// </summary>public string Cron { get; set; } = cron;public string? Description { get; set; }/// <summary>/// 是否异步执行.默认false会阻塞接下来的同类任务/// </summary>public bool IsAsync { get; set; } = false;/// <summary>/// 是否初始化即启动,默认false/// </summary>public bool IsStartOnInit { get; set; } = false;
}
第三步我们定义一个调度器约定,不干别的就是判断当前的任务是否可以执行:
public interface IScheduler
{/// <summary>/// 判断当前的任务是否可以执行/// </summary>bool CanRun(ScheduleTaskAttribute scheduleMetadata, DateTime referenceTime);
}
好了,基础步骤就完成了,如果我们需要实现配置级别的任务调度或者动态的任务调度 那我们再抽象一个Store:
public class ScheduleTaskMetadata(Type scheduleTaskType, string cron)
{public Type ScheduleTaskType { get; set; } = scheduleTaskType;public string Cron { get; set; } = cron;public string? Description { get; set; }public bool IsAsync { get; set; } = false;public bool IsStartOnInit { get; set; } = false;
}
public interface IScheduleMetadataStore
{/// <summary>/// 获取所有ScheduleTaskMetadata/// </summary>Task<IEnumerable<ScheduleTaskMetadata>> GetAllAsync();
}
实现一个Configuration级别的Store
internal class ConfigurationScheduleMetadataStore(IConfiguration configuration) : IScheduleMetadataStore
{const string Key = "BiwenQuickApi:Schedules";public Task<IEnumerable<ScheduleTaskMetadata>> GetAllAsync(){var options = configuration.GetSection(Key).GetChildren();if (options?.Any() is true){var metadatas = options.Select(x =>{var type = Type.GetType(x[nameof(ConfigurationScheduleOption.ScheduleType)]!);if (type is null)throw new ArgumentException($"Type {x[nameof(ConfigurationScheduleOption.ScheduleType)]} not found!");return new ScheduleTaskMetadata(type, x[nameof(ConfigurationScheduleOption.Cron)]!){Description = x[nameof(ConfigurationScheduleOption.Description)],IsAsync = string.IsNullOrEmpty(x[nameof(ConfigurationScheduleOption.IsAsync)]) ? false : bool.Parse(x[nameof(ConfigurationScheduleOption.IsAsync)]!),IsStartOnInit = string.IsNullOrEmpty(x[nameof(ConfigurationScheduleOption.IsStartOnInit)]) ? false : bool.Parse(x[nameof(ConfigurationScheduleOption.IsStartOnInit)]!),};});return Task.FromResult(metadatas);}return Task.FromResult(Enumerable.Empty<ScheduleTaskMetadata>());}
}
然后,我们可能需要多任务调度的事件做一些操作或者日志存储。
比如失败了该干嘛,完成了回调其他后续业务等。
我们再来定义一下具体的事件IEvent
,具体可以参考文章: https://www.cnblogs.com/vipwan/p/18184088
事件IEvent
代码
1、首先定义一个事件约定的空接口
public interface IEvent{}
2、然后定义事件订阅者接口
public interface IEventSubscriber<T> where T : IEvent
{Task HandleAsync(T @event, CancellationToken ct);/// <summary>/// 执行排序/// </summary>int Order { get; }/// <summary>/// 如果发生错误是否抛出异常,将阻塞后续Handler/// </summary>bool ThrowIfError { get; }
}
public abstract class EventSubscriber<T> : IEventSubscriber<T> where T : IEvent
{public abstract Task HandleAsync(T @event, CancellationToken ct);public virtual int Order => 0;/// <summary>/// 默认不抛出异常/// </summary>public virtual bool ThrowIfError => false;
}
3、接着就是发布者
internal class Publisher(IServiceProvider serviceProvider)
{public async Task PublishAsync<T>(T @event, CancellationToken ct) where T : IEvent{var handlers = serviceProvider.GetServices<IEventSubscriber<T>>();if (handlers is null) return;foreach (var handler in handlers.OrderBy(x => x.Order)){try{await handler.HandleAsync(@event, ct);}catch{if (handler.ThrowIfError){throw;}//todo:}}}
}
4、到此发布订阅的基本代码也就写完了.接下来就是注册发布者和所有的订阅者了
public abstract class ScheduleTaskEvent(IScheduleTask scheduleTask, DateTime eventTime) : IEvent
{/// <summary>/// 任务/// </summary>public IScheduleTask ScheduleTask { get; set; } = scheduleTask;/// <summary>/// 触发时间/// </summary>public DateTime EventTime { get; set; } = eventTime;
}
/// <summary>
/// 执行完成
/// </summary>
public sealed class TaskSuccessedEvent(IScheduleTask scheduleTask, DateTime eventTime, DateTime endTime) : ScheduleTaskEvent(scheduleTask, eventTime)
{/// <summary>/// 执行结束的时间/// </summary>public DateTime EndTime { get; set; } = endTime;
}
/// <summary>
/// 执行开始
/// </summary>
public sealed class TaskStartedEvent(IScheduleTask scheduleTask, DateTime eventTime) : ScheduleTaskEvent(scheduleTask, eventTime);
/// <summary>
/// 执行失败
/// </summary>
public sealed class TaskFailedEvent(IScheduleTask scheduleTask, DateTime eventTime, Exception exception) : ScheduleTaskEvent(scheduleTask, eventTime)
{/// <summary>/// 异常信息/// </summary>public Exception Exception { get; private set; } = exception;
}
接下来我们再实现基于NCrontab
的简易调度器,这个调度器主要是解析Cron
表达式判断传入时间是否可以执行ScheduleTask,具体的代码:
internal class SampleNCrontabScheduler : IScheduler
{/// <summary>/// 暂存上次执行时间/// </summary>private static ConcurrentDictionary<ScheduleTaskAttribute, DateTime> LastRunTimes = new();public bool CanRun(ScheduleTaskAttribute scheduleMetadata, DateTime referenceTime){var now = DateTime.Now;var haveExcuteTime = LastRunTimes.TryGetValue(scheduleMetadata, out var time);if (!haveExcuteTime){var nextStartTime = CrontabSchedule.Parse(scheduleMetadata.Cron).GetNextOccurrence(referenceTime);LastRunTimes.TryAdd(scheduleMetadata, nextStartTime);//如果不是初始化启动,则不执行if (!scheduleMetadata.IsStartOnInit)return false;}if (now >= time){var nextStartTime = CrontabSchedule.Parse(scheduleMetadata.Cron).GetNextOccurrence(referenceTime);//更新下次执行时间LastRunTimes.TryUpdate(scheduleMetadata, nextStartTime, time);return true;}return false;}
}
然后就是核心的BackgroundService
了,这里我用的IdleTime心跳来实现,粒度分钟,当然内部也可以封装Timer
等实现更复杂精度更高的调度,这里就不展开讲了。
代码如下:
internal class ScheduleBackgroundService : BackgroundService
{private static readonly TimeSpan _pollingTime
DEBUG//轮询20s 测试环境下,方便测试。= TimeSpan.FromSeconds(20);
if
!DEBUG//轮询60s 正式环境下,考虑性能轮询时间延长到60s= TimeSpan.FromSeconds(60);
if//心跳10s.private static readonly TimeSpan _minIdleTime = TimeSpan.FromSeconds(10);private readonly ILogger<ScheduleBackgroundService> _logger;private readonly IServiceProvider _serviceProvider;public ScheduleBackgroundService(ILogger<ScheduleBackgroundService> logger, IServiceProvider serviceProvider){_logger = logger;_serviceProvider = serviceProvider;}protected override async Task ExecuteAsync(CancellationToken stoppingToken){while (!stoppingToken.IsCancellationRequested){var pollingDelay = Task.Delay(_pollingTime, stoppingToken);try{await RunAsync(stoppingToken);}catch (Exception ex){//todo:_logger.LogError(ex.Message);}await WaitAsync(pollingDelay, stoppingToken);}}private async Task RunAsync(CancellationToken stoppingToken){using var scope = _serviceProvider.CreateScope();var tasks = scope.ServiceProvider.GetServices<IScheduleTask>();if (tasks is null || !tasks.Any()){return;}//调度器var scheduler = scope.ServiceProvider.GetRequiredService<IScheduler>();async Task DoTaskAsync(IScheduleTask task, ScheduleTaskAttribute metadata){if (scheduler.CanRun(metadata, DateTime.Now)){var eventTime = DateTime.Now;//通知启动_ = new TaskStartedEvent(task, eventTime).PublishAsync(default);try{if (metadata.IsAsync){//异步执行_ = task.ExecuteAsync();}else{//同步执行await task.ExecuteAsync();}//执行完成_ = new TaskSuccessedEvent(task, eventTime, DateTime.Now).PublishAsync(default);}catch (Exception ex){_ = new TaskFailedEvent(task, DateTime.Now, ex).PublishAsync(default);}}};//注解中的taskforeach (var task in tasks){if (stoppingToken.IsCancellationRequested){break;}//标注的metadatasvar metadatas = task.GetType().GetCustomAttributes<ScheduleTaskAttribute>();if (!metadatas.Any()){continue;}foreach (var metadata in metadatas){await DoTaskAsync(task, metadata);}}//store中的schedulervar stores = _serviceProvider.GetServices<IScheduleMetadataStore>().ToArray();//并行执行,提高性能Parallel.ForEach(stores, async store =>{if (stoppingToken.IsCancellationRequested){return;}var metadatas = await store.GetAllAsync();if (metadatas is null || !metadatas.Any()){return;}foreach (var metadata in metadatas){var attr = new ScheduleTaskAttribute(metadata.Cron){Description = metadata.Description,IsAsync = metadata.IsAsync,IsStartOnInit = metadata.IsStartOnInit,};var task = scope.ServiceProvider.GetRequiredService(metadata.ScheduleTaskType) as IScheduleTask;if (task is null){return;}await DoTaskAsync(task, attr);}});}private static async Task WaitAsync(Task pollingDelay, CancellationToken stoppingToken){try{await Task.Delay(_minIdleTime, stoppingToken);await pollingDelay;}catch (OperationCanceledException){}}
}
最后收尾阶段我们老规矩扩展一下IServiceCollection
:
internal static IServiceCollection AddScheduleTask(this IServiceCollection services)
{foreach (var task in ScheduleTasks){services.AddTransient(task);services.AddTransient(typeof(IScheduleTask), task);}//调度器services.AddScheduler<SampleNCrontabScheduler>();//配置文件Store:
ices.AddScheduleMetadataStore<ConfigurationScheduleMetadataStore>();//BackgroundServiceservices.AddHostedService<ScheduleBackgroundService>();return services;
}
/// <summary>
/// 注册调度器AddScheduler
/// </summary>
public static IServiceCollection AddScheduler<T>(this IServiceCollection services) where T : class, IScheduler
{services.AddSingleton<IScheduler, T>();return services;
}/// <summary>
/// 注册ScheduleMetadataStore
/// </summary>
public static IServiceCollection AddScheduleMetadataStore<T>(this IServiceCollection services) where T : class, IScheduleMetadataStore
{services.AddSingleton<IScheduleMetadataStore, T>();return services;
}
老规矩我们来测试一下:
//通过特性标注的方式执行:
[ScheduleTask(Constants.CronEveryMinute)] //每分钟一次
[ScheduleTask("0/3 * * * *")]//每3分钟执行一次
public class KeepAlive(ILogger<KeepAlive> logger) : IScheduleTask
{public async Task ExecuteAsync(){//执行5sawait Task.Delay(TimeSpan.FromSeconds(5));logger.LogInformation("keep alive!");}
}
public class DemoConfigTask(ILogger<DemoConfigTask> logger) : IScheduleTask
{public Task ExecuteAsync(){logger.LogInformation("Demo Config Schedule Done!");return Task.CompletedTask;}
}
通过配置文件的方式配置Store:
{"BiwenQuickApi": {"Schedules": [{"ScheduleType": "Biwen.QuickApi.DemoWeb.Schedules.DemoConfigTask,Biwen.QuickApi.DemoWeb","Cron": "0/5 * * * *","Description": "Every 5 mins","IsAsync": true,"IsStartOnInit": false},{"ScheduleType": "Biwen.QuickApi.DemoWeb.Schedules.DemoConfigTask,Biwen.QuickApi.DemoWeb","Cron": "0/10 * * * *","Description": "Every 10 mins","IsAsync": false,"IsStartOnInit": true}]}
}
我们还可以实现自己的Store,这里以放到内存为例,如果有兴趣 你可以可以自行开发一个面板管理:
public class DemoStore : IScheduleMetadataStore
{public Task<IEnumerable<ScheduleTaskMetadata>> GetAllAsync(){//模拟从数据库或配置文件中获取ScheduleTaskMetadataIEnumerable<ScheduleTaskMetadata> metadatas =[new ScheduleTaskMetadata(typeof(DemoTask),Constants.CronEveryNMinutes(2)){Description="测试的Schedule"},];return Task.FromResult(metadatas);}
}
//然后注册这个Store:
builder.Services.AddScheduleMetadataStore<DemoStore>();
所有的一切都大功告成,最后我们来跑一下Demo,成功了
当然这里是自己的固定思维设计的一个简约版,还存在一些不足,欢迎板砖轻拍指正!
提供同一时间单一运行中的任务实现
/// <summary>
/// 模拟一个只能同时存在一个的任务.一分钟执行一次,但是耗时两分钟.
/// </summary>
/// <param name="logger"></param>
[ScheduleTask(Constants.CronEveryMinute, IsStartOnInit = true)]
public class OnlyOneTask(ILogger<OnlyOneTask> logger) : OnlyOneRunningScheduleTask
{public override Task OnAbort(){logger.LogWarning($"[{DateTime.Now}]任务被打断.因为有一个相同的任务正在执行!");return Task.CompletedTask;}public override async Task ExecuteAsync(){var now = DateTime.Now;//模拟一个耗时2分钟的任务await Task.Delay(TimeSpan.FromMinutes(2));logger.LogInformation($"[{now}] ~ {DateTime.Now} 执行一个耗时两分钟的任务!");}
}
源码地址
https://github.com/vipwan/Biwen.QuickApi
https://github.com/vipwan/Biwen.QuickApi/tree/master/Biwen.QuickApi/Scheduling