分布式任务调度系统,纯NET打造的重量级大数据实时计算平台,万亿级调度经验积累!面向中小企业大数据分析场景。
开源地址:https://github.com/NewLifeX/AntJob
使用教程:https://www.yuque.com/smartstone/blood/antjob
体验地址:http://ant.newlifex.com
功能特点
AntJob的核心是蚂蚁算法:把任意大数据拆分成为小块,采用蚂蚁搬家策略计算每一块!
(蚂蚁搬家,一个馒头掉在地上,众多小蚂蚁会把馒头掰成小块小块往家里般!)
该算法设计于2008年,最开始用于处理基金公司的短信/邮件/传真群发(每批两百万)和电话话费分析(上百种国际长途计费规则),数据量不算大,但是有一定复杂度,并且要求支持持续处理(实时计算)以及出错重试。
2016年在中通快递某产品项目中使用该算法进行大数据实时计算,成功挑战每日1200万的订单。并进一步发展衍生成为重量级实时计算平台,集分布式计算、集群调度、配置中心、负载均衡、故障转移、跨机房冗余、作业监控告警、百亿级数据清洗、超大Redis缓存(>2T)于一身,于2019年达到每年万亿级计算量(2019年双十一日订单量破亿)。
AntJob是开源简化版,仅提供分布式计算和集中调度能力,支持百亿级调度(需要改造)。
AntJob主要功能点:
作业处理器。每一个最小业务模块实现一个处理器类,用于处理这一类作业。例如同步数据表时,每张表写一个处理器类,并在调度中心注册一个作业,调度中心按照作业时间切片得到任务,然后把任务(主要包含时间区间)分派给各个计算节点上的处理器类执行。又如,每天汇总计算是一个作业,而每月汇总计算又是另一个作业;
任务上下文。作业处理器类实例化以后,将反复向调度中心申请任务来执行,每个任务的上下文核心数据是时间区间(数据调度)、时间点(定时调度)、消息体(消息调度)。调度中心记录任务处理结果;
数据切片。支持按照时间区间(如5秒)把大数据切分为小片,也即是数据调度,处理过最大单表60亿行;
定时调度。支持定时执行(秒级)指定业务逻辑,每个执行时间点得到一个任务;
任务重试。每个任务完整记录处理结果,失败任务在延迟一段时间后将会自动重新分派(可能由原节点或其它节点执行);
任务重置。支持批量重置已执行完成的任务,让其再次执行处理;
作业面板。在Web控制台上可查看每个应用所有作业的运行状态,或修改参数;
作业重置。调整作业参数,让其再次处理某段时间的任务数据,例如重算过去一个月的数据;
其它细节功能将穿插在以下各主要功能点中进行讲解。
定时调度
以下源码位于 https://github.com/NewLifeX/AntJob/tree/master/Samples/HisAgent
新建项目
新建.net core 3.1项目,从nuget引用 AntJob。实例化一个调度器Scheduler,配置网络提供者。
using System;
using AntJob;
using AntJob.Providers;
using NewLife.Log;
namespace HisAgent
{class Program{static void Main(string[] args){XTrace.UseConsole();var set = AntSetting.Current;// 实例化调度器var sc = new Scheduler();// 使用分布式调度引擎替换默认的本地文件调度sc.Provider = new NetworkJobProvider{Server = set.Server,AppID = set.AppID,Secret = set.Secret,};// 添加作业处理器sc.Handlers.Add(new HelloJob());// 启动调度引擎,调度器内部多线程处理sc.Start();Console.WriteLine("OK!");Console.ReadKey();}}
}
然后添加第一个定时调度的作业处理器
using System;
using AntJob;
namespace HisAgent
{internal class HelloJob : Handler{public HelloJob(){// 今天零点开始,每10秒一次var job = Job;job.Start = DateTime.Today;job.Step = 10;}protected override Int32 Execute(JobContext ctx){// 当前任务时间var time = ctx.Task.Start;WriteLog("新生命蚂蚁调度系统!当前任务时间:{0}", time);// 成功处理数据量return 1;}}
}
作业处理器必须继承自Handler,并且重写Execute实现业务逻辑。
我们这里的业务逻辑就是输出一行日志,其中的ctx.Task就是切分得到的任务上下文,Start是时间点。
构造函数中设定的开始时间和步进Step,仅用于首次注册作业到调度中心,后面就没有用处了。
为了编译观察,修改项目输出目录,在项目文件上点右键选“编辑项目文件”
<PropertyGroup><OutputType>Exe</OutputType><TargetFramework>netcoreapp3.1</TargetFramework><AssemblyVersion>1.0.*</AssemblyVersion><Deterministic>false</Deterministic><OutputPath>..\..\Bin\HisAgent</OutputPath><AppendTargetFrameworkToOutputPath>false</AppendTargetFrameworkToOutputPath>
</PropertyGroup>
编译执行
代码能编译通过,先跑起来看看
可以看到,调度器首先连接 tcp://127.0.0.1:9999,其次 tcp://ant.newlifex.com:9999 ,而上面代码中并没有提及这两个地址。其实这就是调度中心地址,默认本地用于调试,如果链接失败再连接公开版调度中心,位于配置文件中:
/// <summary>蚂蚁配置。主要用于网络型调度系统</summary>
[Config("Ant")]
public class AntSetting : Config<AntSetting>
{#region 属性/// <summary>调试开关。默认false</summary>[Description("调试开关。默认false")]public Boolean Debug { get; set; }/// <summary>调度中心。逗号分隔多地址,主备架构</summary>[Description("调度中心。逗号分隔多地址,主备架构")]public String Server { get; set; } = "tcp://127.0.0.1:9999,tcp://ant.newlifex.com:9999";/// <summary>应用标识。调度中心以此隔离应用,默认当前应用</summary>[Description("应用标识。调度中心以此隔离应用,默认当前应用")]public String AppID { get; set; }/// <summary>应用密钥。</summary>[Description("应用密钥。")]public String Secret { get; set; }#endregion#region 方法/// <summary>重载</summary>protected override void OnLoaded(){if (AppID.IsNullOrEmpty()){var asm = Assembly.GetEntryAssembly();if (asm != null) AppID = asm.GetName().Name;}base.OnLoaded();}#endregion
}
其实上面Main函数中已经看到从配置文件里面读取Server+AppID+Secret,该配置类读取的配置文件在这:
AppID默认取本应用名,Secret由调度中心生成并下发。
调度中心默认打开自动注册AutoRegistry,任意应用登录时自动注册,省去人工配置应用账号的麻烦。
企业内部正式场景使用时,为安全期间,建议关闭自动注册。
再来看看前面跑起来的日志
21:33:08.470 1 N - 启动任务调度引擎[AntJob.Providers.NetworkJobProvider],作业[1]项,定时5秒
21:33:08.471 1 N - HelloJob 开始工作 False 区间(2020-04-09 00:00:00, 0001-01-01 00:00:00) Offset=15 Step=10 MaxTask=8
21:33:08.587 5 Y Job HelloJob 停止工作
21:33:09.467 7 Y T [180.174.185.180:53926]上线!X3
启动了调度引擎,带有一个作业;
作业HelloJob,就是我们通过 sc.Handlers.Add(new HelloJob())
添加进去的作业处理器实例;
HelloJob状态False,处于停止工作状态,那是因为作业注册后,默认都是停止状态,需要去web控制台配置参数后手工开启;
最后一个xxx上线,这是蚂蚁调度的Peers功能,可以探测得到当前应用下所有已连接节点的状态。当HisAgent部署于多个服务器时,每个进程都可以通过Peers得知其它节点的存在;
作业管理
不用关闭HistAgent客户端窗口,我们去线上web控制台看看 http://ant.newlifex.com/
可以看到应用节点在线,点击应用名进去作业面板
这就是我们的HelloJob作业,对应HisAgent中的HelloJob作业处理器。
它处于停用状态,下一次执行时间是 00:00:00 ,也就是今天零点,加上10秒步进,也远小于当前时间,因此,只要启用该作业,调度中心将会马上开始切分任务,并分派给客户去执行。
我们来点击红色叉叉,让它改变为启用状态
几秒后,客户端HisAgent欢快地跑起来!它正在以10秒间隔不断切分并执行任务。
刷新作业面板,可以看到,开始时间已经变为当前附近的时间,右边也有了执行次数。
点击作业名HelloJob,进去查看任务明细
任务切分后,插入作业任务表,此时状态为“就绪”,等待分发给客户端执行。
客户端执行后,向调度中心报告执行结果,可能“完成”,可能“错误”。
错误的任务,会在1分钟后,重新执行,最多连续错误10次。
随系统自动启动
至今我们仍然使用控制台来跑调度程序,怎么样实现稳定可靠的自动化处理呢?
那就必须解决随系统自动启动,以及进程守护(包括Windows和Linux)的问题。
这里推荐 NewLife.Agent,可以把调度程序包装成为一个 Windows服务,或者Linux守护进程,支持看门狗守护。
此处为语雀文档,点击链接查看:https://www.yuque.com/smartstone/nx/agent
多节点部署时,推荐 星尘Stardust 中的星尘代理 StarAgent,调度程序无需修改继续使用控制台,由StarAgent负责拉起进程并守护,同时Stardust支持远程多节点部署以及集中监控。
此处为语雀文档,点击链接查看:https://www.yuque.com/smartstone/blood/stardust
双跑,沸腾吧,分布式计算
再开两个HisAgent进程,查看应用在线表,可以看到有三个节点在线。
HisAgent控制台中,可以看到各自都有机会分配了任务,每个任务有且仅有一个节点执行。
刷新作业HelloJob的任务列表,可以看到不同客户端执行了不同的任务。
调度中心
公开版调度中心 http://ant.newlifex.com 仅用于开发测试,不建议用于生产场景。各企业内部应该自己部署调度中心。
获取AntJob源码 https://github.com/NewLifeX/AntJob ,编译 AntJob.Server,然后跑起来 AntServer.exe
这是一个标准的NewLife.Agent应用,可以选择2安装为Windows服务(需要管理员权限),或者Linux守护进程(需要root权限)。这里仅为了测试,选择5循环调试,直接跑起来核心业务:
可以看到AntServer在tcp/udp/ipv6上监听了9999接口,下方是它所使用的RPC接口。除了前三个内置接口意外,AntJob的接口也就7个,非常简单!
ApiServer的具体内容可参考
此处为语雀文档,点击链接查看:https://www.yuque.com/smartstone/nx/api_server
再启动一个HisAgent
可以看到,它自动连接了本机这个调度中心,因为配置文件Server里面,127写在第一位!
AntJob客户端支持调度中心的故障转移,配置多个服务端,其中一个断开后,自动选择下一个。
配置文件 Config\AntJob.config 很简单,只有端口和自动注册开关。
<?xml version="1.0" encoding="utf-8"?>
<Setting><!--调试开关。默认true--><Debug>true</Debug><!--端口--><Port>9999</Port><!--自动注册。任意应用登录时自动注册,省去人工配置应用账号的麻烦,默认true--><AutoRegistry>true</AutoRegistry>
</Setting>
多年使用经验来看,还没遇到过需要关闭自动注册的情况,毕竟都是在企业内网。
推荐部署两套调度中心,一套Web控制台,共用MySql数据库!
如果服务器足够多,或者为了跨机房,部署4套8套也是可以的。
Web控制台
为了查看作业任务状态,以及调整参数,控制作业启停,需要借助控制台。
获取源码 https://github.com/NewLifeX/AntJob ,编译 AntJob.Web,执行 AntWeb.exe。也可以访问公开版AntJob控制台 http://ant.newlifex.com/ 。
首先可以看到应用管理,点击应用名进去应用面板,管理该应用底下的作业。
双击应用所在行空白处,可查看修改应用信息
应用在线记录每个应用实例(应用可以多跑)的实时状态,应用历史记录操作历史。
任务处理过程中,如果抛出异常,将会上报给调度中心,标记任务为“错误”状态,同时把错误信息记录到作业错误中来。也可以通过应用或作业的快捷方式链接进来。
应用消息用于消息调度,消息生产者把消息推送给调度中心时,就是存储在“应用消息”数据表中,消费的时候取出来,创建消息型任务,并从“应用消息”表中删除。
数据调度
数据调度时AntJob毫无疑问的首席角色,它的使用占比超过70%,可见其重要程度。
为了方便处理大数据,我们需要新建一些辅助项目,数据结构来自某医院。
新建数据集项目
新建 .netstandard2.0 类库项目,nuget引用 NewLife.XCode,准备医院的模型文件:
<?xml version="1.0" encoding="utf-8"?>
<Tables Version="9.16.7398.1902" xmlns:xs="http://www.w3.org/2001/XMLSchema-instance" xs:schemaLocation="http://www.newlifex.com http://www.newlifex.com/Model2020.xsd" NameSpace="HisData" ConnName="His" Output="Entity" BaseClass="Entity" IgnoreNameCase="True" xmlns="http://www.newlifex.com/Model2020.xsd"><Table Name="ZYBH0" Description="病人基本信息" IgnoreNameCase="False"><Columns><Column Name="ID" DataType="Int32" Identity="True" PrimaryKey="True" Description="编号" /><Column Name="Bhid" ColumnName="BHID" DataType="Int32" Master="True" Description="病人ID" /><Column Name="XM" DataType="String" Description="姓名" /><Column Name="Ryrq" ColumnName="RYRQ" DataType="Int32" Description="入院日期" /><Column Name="Cyrq" ColumnName="CYRQ" DataType="Int32" Description="出院日期" /><Column Name="Sfzh" ColumnName="SFZH" DataType="String" Description="身份证号" /><Column Name="FB" DataType="String" Description="费用类别" /><Column Name="State" ColumnName="STATE" DataType="Int32" Description="状态" /><Column Name="Flag" ColumnName="FLAG" DataType="Int32" Description="标记" /><Column Name="Remark" DataType="String" Length="500" Description="内容" /><Column Name="CreateUser" DataType="String" Description="创建者" /><Column Name="CreateUserID" DataType="Int32" Description="创建者" /><Column Name="CreateTime" DataType="DateTime" Description="创建时间" /><Column Name="CreateIP" DataType="String" Description="创建地址" /><Column Name="UpdateUser" DataType="String" Description="更新者" /><Column Name="UpdateUserID" DataType="Int32" Description="更新者" /><Column Name="UpdateTime" DataType="DateTime" Description="更新时间" /><Column Name="UpdateIP" DataType="String" Description="更新地址" /></Columns><Indexes><Index Columns="BHID" Unique="True" /></Indexes></Table><Table Name="ZYBHYZ0" Description="病人医嘱信息" IgnoreNameCase="False"><Columns><Column Name="ID" DataType="Int32" Identity="True" PrimaryKey="True" Description="编号" /><Column Name="Bhid" ColumnName="BHID" DataType="Int32" Description="病人ID" /><Column Name="Mgroupid" ColumnName="MGROUPID" DataType="Int32" Master="True" Description="医嘱组号" /><Column Name="Kyzrq" ColumnName="KYZRQ" DataType="Int32" Description="开医嘱日期" /><Column Name="Tyzrq" ColumnName="TYZRQ" DataType="Int32" Description="停医嘱日期" /><Column Name="Kyzys" ColumnName="KYZYS" DataType="String" Description="开医嘱医生" /><Column Name="State" ColumnName="STATE" DataType="Int32" Description="状态" /><Column Name="CreateUser" DataType="String" Description="创建者" /><Column Name="CreateUserID" DataType="Int32" Description="创建者" /><Column Name="CreateTime" DataType="DateTime" Description="创建时间" /><Column Name="CreateIP" DataType="String" Description="创建地址" /><Column Name="UpdateUser" DataType="String" Description="更新者" /><Column Name="UpdateUserID" DataType="Int32" Description="更新者" /><Column Name="UpdateTime" DataType="DateTime" Description="更新时间" /><Column Name="UpdateIP" DataType="String" Description="更新地址" /></Columns><Indexes><Index Columns="BHID,MGROUPID" Unique="True" /><Index Columns="BHID" /></Indexes></Table><Table Name="ZYBHYZ1" Description="病人医嘱明细信息" IgnoreNameCase="False"><Columns><Column Name="ID" DataType="Int32" Identity="True" PrimaryKey="True" Description="编号" /><Column Name="Dgroupid" ColumnName="DGROUPID" DataType="Int32" Master="True" Description="医嘱组号" /><Column Name="Yzbm" ColumnName="YZBM" DataType="String" Description="医嘱编码" /><Column Name="Yzmc" ColumnName="YZMC" DataType="String" Description="医嘱名称" /><Column Name="DJ" DataType="Decimal" Description="单价" /><Column Name="SL" DataType="Double" Description="数量" /><Column Name="FY" DataType="Decimal" Description="费用" /><Column Name="State" ColumnName="STATE" DataType="Int32" Description="状态" /><Column Name="CreateUser" DataType="String" Description="创建者" /><Column Name="CreateUserID" DataType="Int32" Description="创建者" /><Column Name="CreateTime" DataType="DateTime" Description="创建时间" /><Column Name="CreateIP" DataType="String" Description="创建地址" /><Column Name="UpdateUser" DataType="String" Description="更新者" /><Column Name="UpdateUserID" DataType="Int32" Description="更新者" /><Column Name="UpdateTime" DataType="DateTime" Description="更新时间" /><Column Name="UpdateIP" DataType="String" Description="更新地址" /></Columns><Indexes><Index Columns="DGROUPID,YZBM" Unique="True" /><Index Columns="DGROUPID" /></Indexes></Table><Table Name="ZYYFQLD" Description="病人药房请领单分月表202001" IgnoreNameCase="False"><Columns><Column Name="ID" DataType="Int32" Identity="True" PrimaryKey="True" Description="编号" /><Column Name="Qlrq" ColumnName="QLRQ" DataType="Int32" Description="请领日期" /><Column Name="Qlsj" ColumnName="QLSJ" DataType="Int32" Description="请领时间" /><Column Name="Ksbm" ColumnName="KSBM" DataType="String" Description="请领科室" /><Column Name="Yzgroupid" ColumnName="YZGROUPID" DataType="Int32" Description="医嘱ID" /><Column Name="Bhid" ColumnName="BHID" DataType="Int32" Description="病人ID" /><Column Name="Yzbm" ColumnName="YZBM" DataType="String" Description="药品编码" /><Column Name="DJ" DataType="Decimal" Description="单价" /><Column Name="SL" DataType="Double" Description="请领数量" /><Column Name="Yfbm" ColumnName="YFBM" DataType="String" Description="发药药房" /><Column Name="Fyrq" ColumnName="FYRQ" DataType="Int32" Description="发药日期" /><Column Name="State" ColumnName="STATE" DataType="Int32" Description="状态" /><Column Name="Remark" DataType="String" Length="500" Description="内容" /><Column Name="CreateUser" DataType="String" Description="创建者" /><Column Name="CreateUserID" DataType="Int32" Description="创建者" /><Column Name="CreateTime" DataType="DateTime" Description="创建时间" /><Column Name="CreateIP" DataType="String" Description="创建地址" /><Column Name="UpdateUser" DataType="String" Description="更新者" /><Column Name="UpdateUserID" DataType="Int32" Description="更新者" /><Column Name="UpdateTime" DataType="DateTime" Description="更新时间" /><Column Name="UpdateIP" DataType="String" Description="更新地址" /></Columns><Indexes><Index Columns="BHID" /></Indexes></Table><Table Name="ZDSF" Description="收费字典" IgnoreNameCase="False"><Columns><Column Name="ID" DataType="Int32" Identity="True" PrimaryKey="True" Description="编号" /><Column Name="BM" DataType="String" Master="True" Nullable="False" Description="编码" /><Column Name="DH" DataType="String" Description="拼音码" /><Column Name="MC" DataType="String" Description="名称" /><Column Name="DJ" DataType="Decimal" Description="单价" /><Column Name="DW" DataType="String" Description="单位" /><Column Name="Mzyflb" ColumnName="MZYFLB" DataType="Int32" Description="门诊费用类别" /><Column Name="Zyfylb" ColumnName="ZYFYLB" DataType="Int32" Description="住院费用类别" /><Column Name="Zfbl" ColumnName="ZFBL" DataType="Double" Description="自费比例" /><Column Name="CreateUser" DataType="String" Description="创建者" /><Column Name="CreateUserID" DataType="Int32" Description="创建者" /><Column Name="CreateTime" DataType="DateTime" Description="创建时间" /><Column Name="CreateIP" DataType="String" Description="创建地址" /><Column Name="UpdateUser" DataType="String" Description="更新者" /><Column Name="UpdateUserID" DataType="Int32" Description="更新者" /><Column Name="UpdateTime" DataType="DateTime" Description="更新时间" /><Column Name="UpdateIP" DataType="String" Description="更新地址" /></Columns><Indexes><Index Columns="BM" Unique="True" /></Indexes></Table>
</Tables>
再去找一个 build_netcore.tt 的T4模板,可以这里下载 http://x.newlifex.com/XCode_BuildModel.zip 。也可以从AntJob.Web项目中拷贝一个。
记得把build_netcore.tt在vs文件属性的自定义工具设置为TextTemplatingFileGenerator。
由于netstandard项目输出目录中不包括XCode.dll等引用程序集,因此build.tt需要改一下
<#@ template language="C#" hostSpecific="true" debug="true" #>
<#@ assembly name="netstandard" #>
<#@ assembly name="$(ProjectDir)\..\..\DLL\NewLife.Core.dll" #>
<#@ assembly name="$(ProjectDir)\..\..\DLL\XCode.dll" #>
<#@ import namespace="System.Diagnostics" #>
<#@ import namespace="System.IO" #>
<#@ import namespace="XCode.Code" #>
<#@ output extension=".log" #>
<#// 设置当前工作目录PathHelper.BasePath = Host.ResolvePath(".");// 导入模型文件并生成实体类,模型文件、输出目录、命名空间、连接名、中文文件名、表名字段名大小写//EntityBuilder.Build(String xmlFile = null, String output = null, String nameSpace = null, String connName = null, Boolean? chineseFileName = true,Boolean? nameIgnoreCase = null);EntityBuilder.Build();//var tables = DAL.ImportFrom("Company.Project.xml");//EntityBuilder.Build(tables);
#>
我们从AntJob.Web中拷贝两个文件 NewLife.Core.dll 和 XCode.dll 到外面的DLL目录中,供build.tt调用。如果实际目录不同,可以修改build.tt文件的指向。
在build.tt文件上右键,执行自定义工具,即可生成一批实体类。
编译通过
新建Web项目
新建 .netcore3.1 的web项目,Nuget引用 NewLife.Cube.Core,并引用项目HisData。
该Web项目主要用于查看和管理那些数据表的数据。
修改Main函数,增加 XTrace.UseConsole,用于拦截所有日志
public class Program
{public static void Main(string[] args){XTrace.UseConsole();CreateHostBuilder(args).Build().Run();}public static IHostBuilder CreateHostBuilder(string[] args) =>Host.CreateDefaultBuilder(args).ConfigureWebHostDefaults(webBuilder =>{webBuilder.UseStartup<Startup>();});
}
Startup.cs 中引用魔方,services.AddCube()/app.UseCube()
编译运行,浏览器访问 http://localhost:5000/Admin
可以看到魔方跑起来了,但是还没有我们需要的数据页面。
新建His控制器区域
区域文件内容:
using System;
using System.ComponentModel;
using NewLife.Cube;
namespace HisWeb.Areas.His
{[DisplayName("医院管理")]public class HisArea : AreaBase{public HisArea() : base(nameof(HisArea).TrimEnd("Area")) { }static HisArea() => RegisterArea<HisArea>();}
}
为每个实体类新建一个控制器,如下
using HisData;
using NewLife.Cube;
namespace HisWeb.Areas.His.Controllers
{[HisArea]public class ZYBH0Controller : EntityController<ZYBH0>{static ZYBH0Controller() => MenuOrder = 100;}
}
编译项目,执行 HisWeb.exe,刷新浏览器页面,即可看到每张数据表对应了一个页面。
生成病人数据
在引用项目HisData。
新建一个作业处理器 BuildPatient 用于随机生成病人
using System;
using System.Collections.Generic;
using AntJob;
using HisData;
using NewLife.Security;
using XCode;
namespace HisAgent
{internal class BuildPatient : Handler{public BuildPatient(){var job = Job;job.Start = DateTime.Today;job.Step = 15;}protected override Int32 Execute(JobContext ctx){// 随机造几个病人var count = Rand.Next(1, 9);var list = new List<ZYBH0>();for (var i = 0; i < count; i++){var time = DateTime.Now.AddSeconds(Rand.Next(-30 * 24 * 3600, 0));var time2 = time.AddSeconds(Rand.Next(3600, 10 * 24 * 3600));var pi = new ZYBH0{Bhid = Rand.Next(999999),XM = Rand.NextString(8),Ryrq = time,Cyrq = time2,Sfzh = Rand.NextString(18),FB = Rand.NextString(6),State = Rand.Next(8),Flag = Rand.Next(2),};list.Add(pi);}list.Insert(true);// 成功处理数据量return count;}}
}
主函数中把该处理器添加到调度器
编译运行,HisAgent将在控制台新增一个作业,把它启用
很快,作业处理器就跑起来了
每次定时任务所添加病人数时随机的,我们通过Execute返回,记录着控制台作业任务表的“成功”字段。
去HisWeb中看看数据
很不幸,啥也没有……
原来,我们并没有配置数据库连接字符串,各个应用就会默认使用SQLite数据库,位于自己目录中,HisAgent生成的数据,HisWeb自然就无法访问了。
修改HisWeb输出目录,让它跟HisAgent并排
<PropertyGroup><TargetFramework>netcoreapp3.1</TargetFramework><AssemblyVersion>1.0.*</AssemblyVersion><Deterministic>false</Deterministic><OutputPath>..\..\Bin\HisWeb</OutputPath><AppendTargetFrameworkToOutputPath>false</AppendTargetFrameworkToOutputPath></PropertyGroup>
修改配置文件appsettings.json给的链接字符串
{"Logging": {"LogLevel": {"Default": "Information","Microsoft": "Warning","Microsoft.Hosting.Lifetime": "Information"}},"AllowedHosts": "*","ConnectionStrings": {"His": {"connectionString": "Data Source=..\\Hisagent\\Data\\His.db","providerName": "SQLite"}}
}
重新跑起来后,成功看到病人数据
清洗病人数据
由于HisAgent需要使用数据调度,除了AntJob,我们还需要从nuget引用AntJob.Extensions。
这一次,我们来实时消费病人数据,为其生成医嘱,高仿数据清洗过程。
新增生成医嘱的作业处理器 BuildWill
using System;
using AntJob;
using HisData;
using NewLife.Security;
using XCode;
namespace HisAgent
{class BuildWill : DataHandler{public BuildWill(){var job = Job;job.Start = DateTime.Today;job.Step = 30;}public override Boolean Start(){// 指定要抽取数据的实体类以及时间字段Factory = ZYBH0.Meta.Factory;Field = ZYBH0._.CreateTime;return base.Start();}protected override Boolean ProcessItem(JobContext ctx, IEntity entity){var pi = entity as ZYBH0;// 创建医嘱信息var will = new ZYBHYZ0{Bhid = pi.Bhid,Mgroupid = Rand.Next(9999),Kyzrq = pi.Ryrq.AddHours(1),Tyzrq = pi.Cyrq.AddHours(-3),Kyzys = Rand.NextString(8),State = pi.State,};will.Insert();return true;}}
}
数据调度的处理器基类是DataHandler,并且需要在Start之前指定实体工厂以及时间字段。调度系统将会从该表抽取数据,根据调度中心分派的时间区间(StartTime+EndTime),对时间字段进行查询。
处理函数ProcessItem就是业务核心代码了,也可以重写Execute,实现批量处理。
从今天零点开始消费处理数据,步进30秒,也就是每次抽取30秒的数据来分析处理。
不要忘了在Main中把该处理器加入调度器。
跑起来,去控制台启用作业
可以看到,在(00:30:00, 00:30:30)区间内,得到4个病人,创建了4个医嘱。
运行HisWeb查看数据
消息调度
设计概要
计算型应用(继承Handler)
系统架构
调度中心主从架构