蚂蚁调度AntJob-分布式任务调度系统

分布式任务调度系统,纯NET打造的重量级大数据实时计算平台,万亿级调度经验积累!面向中小企业大数据分析场景。

开源地址:https://github.com/NewLifeX/AntJob

使用教程:https://www.yuque.com/smartstone/blood/antjob

体验地址:http://ant.newlifex.com

功能特点

AntJob的核心是蚂蚁算法把任意大数据拆分成为小块,采用蚂蚁搬家策略计算每一块!

(蚂蚁搬家,一个馒头掉在地上,众多小蚂蚁会把馒头掰成小块小块往家里般!)

该算法设计于2008年,最开始用于处理基金公司的短信/邮件/传真群发(每批两百万)和电话话费分析(上百种国际长途计费规则),数据量不算大,但是有一定复杂度,并且要求支持持续处理(实时计算)以及出错重试。

2016年在中通快递某产品项目中使用该算法进行大数据实时计算,成功挑战每日1200万的订单。并进一步发展衍生成为重量级实时计算平台,集分布式计算、集群调度、配置中心、负载均衡、故障转移、跨机房冗余、作业监控告警、百亿级数据清洗、超大Redis缓存(>2T)于一身,于2019年达到每年万亿级计算量(2019年双十一日订单量破亿)。

AntJob是开源简化版,仅提供分布式计算和集中调度能力,支持百亿级调度(需要改造)。

AntJob主要功能点:

  1. 作业处理器。每一个最小业务模块实现一个处理器类,用于处理这一类作业。例如同步数据表时,每张表写一个处理器类,并在调度中心注册一个作业,调度中心按照作业时间切片得到任务,然后把任务(主要包含时间区间)分派给各个计算节点上的处理器类执行。又如,每天汇总计算是一个作业,而每月汇总计算又是另一个作业;

  2. 任务上下文。作业处理器类实例化以后,将反复向调度中心申请任务来执行,每个任务的上下文核心数据是时间区间(数据调度)、时间点(定时调度)、消息体(消息调度)。调度中心记录任务处理结果;

  3. 数据切片。支持按照时间区间(如5秒)把大数据切分为小片,也即是数据调度,处理过最大单表60亿行;

  4. 定时调度。支持定时执行(秒级)指定业务逻辑,每个执行时间点得到一个任务;

  5. 任务重试。每个任务完整记录处理结果,失败任务在延迟一段时间后将会自动重新分派(可能由原节点或其它节点执行);

  6. 任务重置。支持批量重置已执行完成的任务,让其再次执行处理;

  7. 作业面板。在Web控制台上可查看每个应用所有作业的运行状态,或修改参数;

  8. 作业重置。调整作业参数,让其再次处理某段时间的任务数据,例如重算过去一个月的数据;

其它细节功能将穿插在以下各主要功能点中进行讲解。

定时调度

以下源码位于 https://github.com/NewLifeX/AntJob/tree/master/Samples/HisAgent 

新建项目

新建.net core 3.1项目,从nuget引用 AntJob。实例化一个调度器Scheduler,配置网络提供者。

using System;
using AntJob;
using AntJob.Providers;
using NewLife.Log;
namespace HisAgent
{class Program{static void Main(string[] args){XTrace.UseConsole();var set = AntSetting.Current;// 实例化调度器var sc = new Scheduler();// 使用分布式调度引擎替换默认的本地文件调度sc.Provider = new NetworkJobProvider{Server = set.Server,AppID = set.AppID,Secret = set.Secret,};// 添加作业处理器sc.Handlers.Add(new HelloJob());// 启动调度引擎,调度器内部多线程处理sc.Start();Console.WriteLine("OK!");Console.ReadKey();}}
}

然后添加第一个定时调度的作业处理器

using System;
using AntJob;
namespace HisAgent
{internal class HelloJob : Handler{public HelloJob(){// 今天零点开始,每10秒一次var job = Job;job.Start = DateTime.Today;job.Step = 10;}protected override Int32 Execute(JobContext ctx){// 当前任务时间var time = ctx.Task.Start;WriteLog("新生命蚂蚁调度系统!当前任务时间:{0}", time);// 成功处理数据量return 1;}}
}

作业处理器必须继承自Handler,并且重写Execute实现业务逻辑。

我们这里的业务逻辑就是输出一行日志,其中的ctx.Task就是切分得到的任务上下文,Start是时间点。

构造函数中设定的开始时间和步进Step,仅用于首次注册作业到调度中心,后面就没有用处了。

为了编译观察,修改项目输出目录,在项目文件上点右键选“编辑项目文件”

<PropertyGroup><OutputType>Exe</OutputType><TargetFramework>netcoreapp3.1</TargetFramework><AssemblyVersion>1.0.*</AssemblyVersion><Deterministic>false</Deterministic><OutputPath>..\..\Bin\HisAgent</OutputPath><AppendTargetFrameworkToOutputPath>false</AppendTargetFrameworkToOutputPath>
</PropertyGroup>

编译执行

代码能编译通过,先跑起来看看

可以看到,调度器首先连接 tcp://127.0.0.1:9999,其次 tcp://ant.newlifex.com:9999 ,而上面代码中并没有提及这两个地址。其实这就是调度中心地址,默认本地用于调试,如果链接失败再连接公开版调度中心,位于配置文件中:

/// <summary>蚂蚁配置。主要用于网络型调度系统</summary>
[Config("Ant")]
public class AntSetting : Config<AntSetting>
{#region 属性/// <summary>调试开关。默认false</summary>[Description("调试开关。默认false")]public Boolean Debug { get; set; }/// <summary>调度中心。逗号分隔多地址,主备架构</summary>[Description("调度中心。逗号分隔多地址,主备架构")]public String Server { get; set; } = "tcp://127.0.0.1:9999,tcp://ant.newlifex.com:9999";/// <summary>应用标识。调度中心以此隔离应用,默认当前应用</summary>[Description("应用标识。调度中心以此隔离应用,默认当前应用")]public String AppID { get; set; }/// <summary>应用密钥。</summary>[Description("应用密钥。")]public String Secret { get; set; }#endregion#region 方法/// <summary>重载</summary>protected override void OnLoaded(){if (AppID.IsNullOrEmpty()){var asm = Assembly.GetEntryAssembly();if (asm != null) AppID = asm.GetName().Name;}base.OnLoaded();}#endregion
}

其实上面Main函数中已经看到从配置文件里面读取Server+AppID+Secret,该配置类读取的配置文件在这:

AppID默认取本应用名,Secret由调度中心生成并下发。

调度中心默认打开自动注册AutoRegistry,任意应用登录时自动注册,省去人工配置应用账号的麻烦。

企业内部正式场景使用时,为安全期间,建议关闭自动注册。

再来看看前面跑起来的日志

21:33:08.470  1 N - 启动任务调度引擎[AntJob.Providers.NetworkJobProvider],作业[1]项,定时5秒
21:33:08.471  1 N - HelloJob 开始工作 False 区间(2020-04-09 00:00:00, 0001-01-01 00:00:00) Offset=15 Step=10 MaxTask=8
21:33:08.587  5 Y Job HelloJob 停止工作
21:33:09.467  7 Y T [180.174.185.180:53926]上线!X3

启动了调度引擎,带有一个作业;

作业HelloJob,就是我们通过 sc.Handlers.Add(new HelloJob()) 添加进去的作业处理器实例;

HelloJob状态False,处于停止工作状态,那是因为作业注册后,默认都是停止状态,需要去web控制台配置参数后手工开启;

最后一个xxx上线,这是蚂蚁调度的Peers功能,可以探测得到当前应用下所有已连接节点的状态。当HisAgent部署于多个服务器时,每个进程都可以通过Peers得知其它节点的存在;

作业管理

不用关闭HistAgent客户端窗口,我们去线上web控制台看看 http://ant.newlifex.com/

可以看到应用节点在线,点击应用名进去作业面板

这就是我们的HelloJob作业,对应HisAgent中的HelloJob作业处理器。

它处于停用状态,下一次执行时间是 00:00:00 ,也就是今天零点,加上10秒步进,也远小于当前时间,因此,只要启用该作业,调度中心将会马上开始切分任务,并分派给客户去执行。

我们来点击红色叉叉,让它改变为启用状态

几秒后,客户端HisAgent欢快地跑起来!它正在以10秒间隔不断切分并执行任务。

刷新作业面板,可以看到,开始时间已经变为当前附近的时间,右边也有了执行次数。

点击作业名HelloJob,进去查看任务明细

任务切分后,插入作业任务表,此时状态为“就绪”,等待分发给客户端执行。

客户端执行后,向调度中心报告执行结果,可能“完成”,可能“错误”。

错误的任务,会在1分钟后,重新执行,最多连续错误10次。

随系统自动启动

至今我们仍然使用控制台来跑调度程序,怎么样实现稳定可靠的自动化处理呢?

那就必须解决随系统自动启动,以及进程守护(包括Windows和Linux)的问题。

这里推荐 NewLife.Agent,可以把调度程序包装成为一个 Windows服务,或者Linux守护进程,支持看门狗守护。

此处为语雀文档,点击链接查看:https://www.yuque.com/smartstone/nx/agent

多节点部署时,推荐 星尘Stardust 中的星尘代理 StarAgent,调度程序无需修改继续使用控制台,由StarAgent负责拉起进程并守护,同时Stardust支持远程多节点部署以及集中监控。

此处为语雀文档,点击链接查看:https://www.yuque.com/smartstone/blood/stardust

双跑,沸腾吧,分布式计算

再开两个HisAgent进程,查看应用在线表,可以看到有三个节点在线。

HisAgent控制台中,可以看到各自都有机会分配了任务,每个任务有且仅有一个节点执行。

刷新作业HelloJob的任务列表,可以看到不同客户端执行了不同的任务。

调度中心

公开版调度中心 http://ant.newlifex.com 仅用于开发测试,不建议用于生产场景。各企业内部应该自己部署调度中心。

获取AntJob源码 https://github.com/NewLifeX/AntJob ,编译 AntJob.Server,然后跑起来 AntServer.exe

这是一个标准的NewLife.Agent应用,可以选择2安装为Windows服务(需要管理员权限),或者Linux守护进程(需要root权限)。这里仅为了测试,选择5循环调试,直接跑起来核心业务:

可以看到AntServer在tcp/udp/ipv6上监听了9999接口,下方是它所使用的RPC接口。除了前三个内置接口意外,AntJob的接口也就7个,非常简单!

ApiServer的具体内容可参考

此处为语雀文档,点击链接查看:https://www.yuque.com/smartstone/nx/api_server

再启动一个HisAgent

可以看到,它自动连接了本机这个调度中心,因为配置文件Server里面,127写在第一位!

AntJob客户端支持调度中心的故障转移,配置多个服务端,其中一个断开后,自动选择下一个。

配置文件 Config\AntJob.config 很简单,只有端口和自动注册开关。

<?xml version="1.0" encoding="utf-8"?>
<Setting><!--调试开关。默认true--><Debug>true</Debug><!--端口--><Port>9999</Port><!--自动注册。任意应用登录时自动注册,省去人工配置应用账号的麻烦,默认true--><AutoRegistry>true</AutoRegistry>
</Setting>

多年使用经验来看,还没遇到过需要关闭自动注册的情况,毕竟都是在企业内网。

推荐部署两套调度中心,一套Web控制台,共用MySql数据库!

如果服务器足够多,或者为了跨机房,部署4套8套也是可以的。

Web控制台

为了查看作业任务状态,以及调整参数,控制作业启停,需要借助控制台。

获取源码 https://github.com/NewLifeX/AntJob ,编译 AntJob.Web,执行 AntWeb.exe。也可以访问公开版AntJob控制台 http://ant.newlifex.com/ 。

首先可以看到应用管理,点击应用名进去应用面板,管理该应用底下的作业。

双击应用所在行空白处,可查看修改应用信息

应用在线记录每个应用实例(应用可以多跑)的实时状态,应用历史记录操作历史。

任务处理过程中,如果抛出异常,将会上报给调度中心,标记任务为“错误”状态,同时把错误信息记录到作业错误中来。也可以通过应用或作业的快捷方式链接进来。

应用消息用于消息调度,消息生产者把消息推送给调度中心时,就是存储在“应用消息”数据表中,消费的时候取出来,创建消息型任务,并从“应用消息”表中删除。

数据调度

数据调度时AntJob毫无疑问的首席角色,它的使用占比超过70%,可见其重要程度。

为了方便处理大数据,我们需要新建一些辅助项目,数据结构来自某医院。

新建数据集项目

新建 .netstandard2.0 类库项目,nuget引用 NewLife.XCode,准备医院的模型文件:

<?xml version="1.0" encoding="utf-8"?>
<Tables Version="9.16.7398.1902" xmlns:xs="http://www.w3.org/2001/XMLSchema-instance" xs:schemaLocation="http://www.newlifex.com http://www.newlifex.com/Model2020.xsd" NameSpace="HisData" ConnName="His" Output="Entity" BaseClass="Entity" IgnoreNameCase="True" xmlns="http://www.newlifex.com/Model2020.xsd"><Table Name="ZYBH0" Description="病人基本信息" IgnoreNameCase="False"><Columns><Column Name="ID" DataType="Int32" Identity="True" PrimaryKey="True" Description="编号" /><Column Name="Bhid" ColumnName="BHID" DataType="Int32" Master="True" Description="病人ID" /><Column Name="XM" DataType="String" Description="姓名" /><Column Name="Ryrq" ColumnName="RYRQ" DataType="Int32" Description="入院日期" /><Column Name="Cyrq" ColumnName="CYRQ" DataType="Int32" Description="出院日期" /><Column Name="Sfzh" ColumnName="SFZH" DataType="String" Description="身份证号" /><Column Name="FB" DataType="String" Description="费用类别" /><Column Name="State" ColumnName="STATE" DataType="Int32" Description="状态" /><Column Name="Flag" ColumnName="FLAG" DataType="Int32" Description="标记" /><Column Name="Remark" DataType="String" Length="500" Description="内容" /><Column Name="CreateUser" DataType="String" Description="创建者" /><Column Name="CreateUserID" DataType="Int32" Description="创建者" /><Column Name="CreateTime" DataType="DateTime" Description="创建时间" /><Column Name="CreateIP" DataType="String" Description="创建地址" /><Column Name="UpdateUser" DataType="String" Description="更新者" /><Column Name="UpdateUserID" DataType="Int32" Description="更新者" /><Column Name="UpdateTime" DataType="DateTime" Description="更新时间" /><Column Name="UpdateIP" DataType="String" Description="更新地址" /></Columns><Indexes><Index Columns="BHID" Unique="True" /></Indexes></Table><Table Name="ZYBHYZ0" Description="病人医嘱信息" IgnoreNameCase="False"><Columns><Column Name="ID" DataType="Int32" Identity="True" PrimaryKey="True" Description="编号" /><Column Name="Bhid" ColumnName="BHID" DataType="Int32" Description="病人ID" /><Column Name="Mgroupid" ColumnName="MGROUPID" DataType="Int32" Master="True" Description="医嘱组号" /><Column Name="Kyzrq" ColumnName="KYZRQ" DataType="Int32" Description="开医嘱日期" /><Column Name="Tyzrq" ColumnName="TYZRQ" DataType="Int32" Description="停医嘱日期" /><Column Name="Kyzys" ColumnName="KYZYS" DataType="String" Description="开医嘱医生" /><Column Name="State" ColumnName="STATE" DataType="Int32" Description="状态" /><Column Name="CreateUser" DataType="String" Description="创建者" /><Column Name="CreateUserID" DataType="Int32" Description="创建者" /><Column Name="CreateTime" DataType="DateTime" Description="创建时间" /><Column Name="CreateIP" DataType="String" Description="创建地址" /><Column Name="UpdateUser" DataType="String" Description="更新者" /><Column Name="UpdateUserID" DataType="Int32" Description="更新者" /><Column Name="UpdateTime" DataType="DateTime" Description="更新时间" /><Column Name="UpdateIP" DataType="String" Description="更新地址" /></Columns><Indexes><Index Columns="BHID,MGROUPID" Unique="True" /><Index Columns="BHID" /></Indexes></Table><Table Name="ZYBHYZ1" Description="病人医嘱明细信息" IgnoreNameCase="False"><Columns><Column Name="ID" DataType="Int32" Identity="True" PrimaryKey="True" Description="编号" /><Column Name="Dgroupid" ColumnName="DGROUPID" DataType="Int32" Master="True" Description="医嘱组号" /><Column Name="Yzbm" ColumnName="YZBM" DataType="String" Description="医嘱编码" /><Column Name="Yzmc" ColumnName="YZMC" DataType="String" Description="医嘱名称" /><Column Name="DJ" DataType="Decimal" Description="单价" /><Column Name="SL" DataType="Double" Description="数量" /><Column Name="FY" DataType="Decimal" Description="费用" /><Column Name="State" ColumnName="STATE" DataType="Int32" Description="状态" /><Column Name="CreateUser" DataType="String" Description="创建者" /><Column Name="CreateUserID" DataType="Int32" Description="创建者" /><Column Name="CreateTime" DataType="DateTime" Description="创建时间" /><Column Name="CreateIP" DataType="String" Description="创建地址" /><Column Name="UpdateUser" DataType="String" Description="更新者" /><Column Name="UpdateUserID" DataType="Int32" Description="更新者" /><Column Name="UpdateTime" DataType="DateTime" Description="更新时间" /><Column Name="UpdateIP" DataType="String" Description="更新地址" /></Columns><Indexes><Index Columns="DGROUPID,YZBM" Unique="True" /><Index Columns="DGROUPID" /></Indexes></Table><Table Name="ZYYFQLD" Description="病人药房请领单分月表202001" IgnoreNameCase="False"><Columns><Column Name="ID" DataType="Int32" Identity="True" PrimaryKey="True" Description="编号" /><Column Name="Qlrq" ColumnName="QLRQ" DataType="Int32" Description="请领日期" /><Column Name="Qlsj" ColumnName="QLSJ" DataType="Int32" Description="请领时间" /><Column Name="Ksbm" ColumnName="KSBM" DataType="String" Description="请领科室" /><Column Name="Yzgroupid" ColumnName="YZGROUPID" DataType="Int32" Description="医嘱ID" /><Column Name="Bhid" ColumnName="BHID" DataType="Int32" Description="病人ID" /><Column Name="Yzbm" ColumnName="YZBM" DataType="String" Description="药品编码" /><Column Name="DJ" DataType="Decimal" Description="单价" /><Column Name="SL" DataType="Double" Description="请领数量" /><Column Name="Yfbm" ColumnName="YFBM" DataType="String" Description="发药药房" /><Column Name="Fyrq" ColumnName="FYRQ" DataType="Int32" Description="发药日期" /><Column Name="State" ColumnName="STATE" DataType="Int32" Description="状态" /><Column Name="Remark" DataType="String" Length="500" Description="内容" /><Column Name="CreateUser" DataType="String" Description="创建者" /><Column Name="CreateUserID" DataType="Int32" Description="创建者" /><Column Name="CreateTime" DataType="DateTime" Description="创建时间" /><Column Name="CreateIP" DataType="String" Description="创建地址" /><Column Name="UpdateUser" DataType="String" Description="更新者" /><Column Name="UpdateUserID" DataType="Int32" Description="更新者" /><Column Name="UpdateTime" DataType="DateTime" Description="更新时间" /><Column Name="UpdateIP" DataType="String" Description="更新地址" /></Columns><Indexes><Index Columns="BHID" /></Indexes></Table><Table Name="ZDSF" Description="收费字典" IgnoreNameCase="False"><Columns><Column Name="ID" DataType="Int32" Identity="True" PrimaryKey="True" Description="编号" /><Column Name="BM" DataType="String" Master="True" Nullable="False" Description="编码" /><Column Name="DH" DataType="String" Description="拼音码" /><Column Name="MC" DataType="String" Description="名称" /><Column Name="DJ" DataType="Decimal" Description="单价" /><Column Name="DW" DataType="String" Description="单位" /><Column Name="Mzyflb" ColumnName="MZYFLB" DataType="Int32" Description="门诊费用类别" /><Column Name="Zyfylb" ColumnName="ZYFYLB" DataType="Int32" Description="住院费用类别" /><Column Name="Zfbl" ColumnName="ZFBL" DataType="Double" Description="自费比例" /><Column Name="CreateUser" DataType="String" Description="创建者" /><Column Name="CreateUserID" DataType="Int32" Description="创建者" /><Column Name="CreateTime" DataType="DateTime" Description="创建时间" /><Column Name="CreateIP" DataType="String" Description="创建地址" /><Column Name="UpdateUser" DataType="String" Description="更新者" /><Column Name="UpdateUserID" DataType="Int32" Description="更新者" /><Column Name="UpdateTime" DataType="DateTime" Description="更新时间" /><Column Name="UpdateIP" DataType="String" Description="更新地址" /></Columns><Indexes><Index Columns="BM" Unique="True" /></Indexes></Table>
</Tables>

再去找一个 build_netcore.tt 的T4模板,可以这里下载 http://x.newlifex.com/XCode_BuildModel.zip 。也可以从AntJob.Web项目中拷贝一个。

记得把build_netcore.tt在vs文件属性的自定义工具设置为TextTemplatingFileGenerator。

由于netstandard项目输出目录中不包括XCode.dll等引用程序集,因此build.tt需要改一下

<#@ template language="C#" hostSpecific="true" debug="true" #>
<#@ assembly name="netstandard" #>
<#@ assembly name="$(ProjectDir)\..\..\DLL\NewLife.Core.dll" #>
<#@ assembly name="$(ProjectDir)\..\..\DLL\XCode.dll" #>
<#@ import namespace="System.Diagnostics" #>
<#@ import namespace="System.IO" #>
<#@ import namespace="XCode.Code" #>
<#@ output extension=".log" #>
<#// 设置当前工作目录PathHelper.BasePath = Host.ResolvePath(".");// 导入模型文件并生成实体类,模型文件、输出目录、命名空间、连接名、中文文件名、表名字段名大小写//EntityBuilder.Build(String xmlFile = null, String output = null, String nameSpace = null, String connName = null, Boolean? chineseFileName = true,Boolean? nameIgnoreCase = null);EntityBuilder.Build();//var tables = DAL.ImportFrom("Company.Project.xml");//EntityBuilder.Build(tables);
#>

我们从AntJob.Web中拷贝两个文件 NewLife.Core.dll 和 XCode.dll 到外面的DLL目录中,供build.tt调用。如果实际目录不同,可以修改build.tt文件的指向。

在build.tt文件上右键,执行自定义工具,即可生成一批实体类。

编译通过

新建Web项目

新建 .netcore3.1 的web项目,Nuget引用 NewLife.Cube.Core,并引用项目HisData。

该Web项目主要用于查看和管理那些数据表的数据。

修改Main函数,增加 XTrace.UseConsole,用于拦截所有日志

public class Program
{public static void Main(string[] args){XTrace.UseConsole();CreateHostBuilder(args).Build().Run();}public static IHostBuilder CreateHostBuilder(string[] args) =>Host.CreateDefaultBuilder(args).ConfigureWebHostDefaults(webBuilder =>{webBuilder.UseStartup<Startup>();});
}

Startup.cs 中引用魔方,services.AddCube()/app.UseCube()

编译运行,浏览器访问 http://localhost:5000/Admin

可以看到魔方跑起来了,但是还没有我们需要的数据页面。

新建His控制器区域

区域文件内容:

using System;
using System.ComponentModel;
using NewLife.Cube;
namespace HisWeb.Areas.His
{[DisplayName("医院管理")]public class HisArea : AreaBase{public HisArea() : base(nameof(HisArea).TrimEnd("Area")) { }static HisArea() => RegisterArea<HisArea>();}
}

为每个实体类新建一个控制器,如下

using HisData;
using NewLife.Cube;
namespace HisWeb.Areas.His.Controllers
{[HisArea]public class ZYBH0Controller : EntityController<ZYBH0>{static ZYBH0Controller() => MenuOrder = 100;}
}

编译项目,执行 HisWeb.exe,刷新浏览器页面,即可看到每张数据表对应了一个页面。

生成病人数据

在引用项目HisData。

新建一个作业处理器 BuildPatient 用于随机生成病人

using System;
using System.Collections.Generic;
using AntJob;
using HisData;
using NewLife.Security;
using XCode;
namespace HisAgent
{internal class BuildPatient : Handler{public BuildPatient(){var job = Job;job.Start = DateTime.Today;job.Step = 15;}protected override Int32 Execute(JobContext ctx){// 随机造几个病人var count = Rand.Next(1, 9);var list = new List<ZYBH0>();for (var i = 0; i < count; i++){var time = DateTime.Now.AddSeconds(Rand.Next(-30 * 24 * 3600, 0));var time2 = time.AddSeconds(Rand.Next(3600, 10 * 24 * 3600));var pi = new ZYBH0{Bhid = Rand.Next(999999),XM = Rand.NextString(8),Ryrq = time,Cyrq = time2,Sfzh = Rand.NextString(18),FB = Rand.NextString(6),State = Rand.Next(8),Flag = Rand.Next(2),};list.Add(pi);}list.Insert(true);// 成功处理数据量return count;}}
}

主函数中把该处理器添加到调度器

编译运行,HisAgent将在控制台新增一个作业,把它启用

很快,作业处理器就跑起来了

每次定时任务所添加病人数时随机的,我们通过Execute返回,记录着控制台作业任务表的“成功”字段。

去HisWeb中看看数据

很不幸,啥也没有……

原来,我们并没有配置数据库连接字符串,各个应用就会默认使用SQLite数据库,位于自己目录中,HisAgent生成的数据,HisWeb自然就无法访问了。

修改HisWeb输出目录,让它跟HisAgent并排

  <PropertyGroup><TargetFramework>netcoreapp3.1</TargetFramework><AssemblyVersion>1.0.*</AssemblyVersion><Deterministic>false</Deterministic><OutputPath>..\..\Bin\HisWeb</OutputPath><AppendTargetFrameworkToOutputPath>false</AppendTargetFrameworkToOutputPath></PropertyGroup>

修改配置文件appsettings.json给的链接字符串

{"Logging": {"LogLevel": {"Default": "Information","Microsoft": "Warning","Microsoft.Hosting.Lifetime": "Information"}},"AllowedHosts": "*","ConnectionStrings": {"His": {"connectionString": "Data Source=..\\Hisagent\\Data\\His.db","providerName": "SQLite"}}
}

重新跑起来后,成功看到病人数据

清洗病人数据

由于HisAgent需要使用数据调度,除了AntJob,我们还需要从nuget引用AntJob.Extensions。

这一次,我们来实时消费病人数据,为其生成医嘱,高仿数据清洗过程。

新增生成医嘱的作业处理器 BuildWill

using System;
using AntJob;
using HisData;
using NewLife.Security;
using XCode;
namespace HisAgent
{class BuildWill : DataHandler{public BuildWill(){var job = Job;job.Start = DateTime.Today;job.Step = 30;}public override Boolean Start(){// 指定要抽取数据的实体类以及时间字段Factory = ZYBH0.Meta.Factory;Field = ZYBH0._.CreateTime;return base.Start();}protected override Boolean ProcessItem(JobContext ctx, IEntity entity){var pi = entity as ZYBH0;// 创建医嘱信息var will = new ZYBHYZ0{Bhid = pi.Bhid,Mgroupid = Rand.Next(9999),Kyzrq = pi.Ryrq.AddHours(1),Tyzrq = pi.Cyrq.AddHours(-3),Kyzys = Rand.NextString(8),State = pi.State,};will.Insert();return true;}}
}

数据调度的处理器基类是DataHandler,并且需要在Start之前指定实体工厂以及时间字段。调度系统将会从该表抽取数据,根据调度中心分派的时间区间(StartTime+EndTime),对时间字段进行查询。

处理函数ProcessItem就是业务核心代码了,也可以重写Execute,实现批量处理。

从今天零点开始消费处理数据,步进30秒,也就是每次抽取30秒的数据来分析处理。

不要忘了在Main中把该处理器加入调度器。

跑起来,去控制台启用作业

可以看到,在(00:30:00, 00:30:30)区间内,得到4个病人,创建了4个医嘱。

运行HisWeb查看数据

消息调度

设计概要

计算型应用(继承Handler)

系统架构

调度中心主从架构

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/306087.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

c语言怎么让图形界面单独显示,「分享」C语言如何编写图形界面

该楼层疑似违规已被系统折叠 隐藏此楼查看此楼贴吧内经常有人问C语言是不是只能用于字符终端界面开发&#xff0c;不能用于图形界面。大家也都有回答&#xff0c;需要其他的库。MFC&#xff0c;GTK&#xff0c;QT。本人近期刚用GTK库加上纯C写成了第一个LINUX实用程序。现在与大…

如何在 ASP.NET Core 中 自定义中间件

ASP.NET Core 是一个跨平台&#xff0c;开源的&#xff0c;轻量级&#xff0c;高性能 并且高度模块化的web框架&#xff0c;同时扩展性也是非常强&#xff0c;你可以在 request -> response 请求管道中安插各种中间件来根据自己的场景定制化&#xff0c;比如说&#xff1a;监…

ASP.NET Core Authentication and Authorization

最近把一个Asp .net core 2.0的项目迁移到Asp .net core 3.1&#xff0c;项目启动的时候直接报错:InvalidOperationException: Endpoint CoreAuthorization.Controllers.HomeController.Index (CoreAuthorization) contains authorization metadata, but a middleware was not …

android dialog 自定义布局,如何设置AlertDialog的自定义布局?

调用我的对话框:alertDialog showInfoDialog(message "$wrongPasscodeMessage\n$retryMessage")方法如下:fun FragmentActivity.showInfoDialog(message: String?): AlertDialog? {return try {val customLayout layoutInflater.inflate(R.layout.custom_layout…

android 打开谷歌导航,国内开启google位置记录功能/android版google maps 7+上,恢复位置记录功能在国内使用(需root)...

android版google 地图在 7以后的版本上&#xff0c;位置记录功能在国内不能用了&#xff0c;提示本功能不能在中国使用。至少对本人&#xff0c;“位置记录”功能是非常有用的功能&#xff0c;尤其是骑车出行时记录自己的路线。目前还没找到替代产品。之前一段时间内恢复回旧版…

程序员过关斩将--少年派登录安全的奇幻遐想

“据说&#xff0c;这篇也是快餐&#xff0c;完全符合年轻人口味说到登录&#xff0c;无人不知无人不晓。每一个有用户体系的相关系统都会有登录的入口&#xff0c;登录是为了确认操作人的正确性。说到登录安全&#xff0c;其实是一个很伟大的命题&#xff0c;不过常用的手段也…

C# 9 新特性 —— 增强的 foreach

C# 9 新特性 —— 增强的 foreachIntro在 C# 9 中增强了 foreach 的使用&#xff0c;使得一切对象都有 foreach 的可能我们来看一段代码&#xff0c;这里我们试图遍历一个 int 类型的值思考一下&#xff0c;我们可以怎么做使得上面的代码编译通过呢&#xff1f;迭代器模式迭代器…

android系统休眠发广播,Android - BroadcastReceiver

BroadcastReceiverBroadcastReceiver&#xff0c;广播接收者&#xff0c;用来接收系统和应用的广播&#xff0c;并做出相应的处理&#xff0c;如电量过低时提示用户充电等&#xff1b;BroadcastReceiver 是 Android 的四大组件之一&#xff0c;分为 普通广播、有序广播、粘性广…

开源·共享·创新|2020年中国.NET开发者大会圆满收官!

“疫情无限续费”的2020年&#xff0c;对于14亿中国人而言&#xff0c;是必须习惯口罩长在来脸上的一年&#xff1b;是各种线下聚会&#xff0c;被迫数次延期、滞后、云上举办的一年&#xff1b;……而对于潜心修行&#xff0c;静蓄能量的中国.NET开发者而言&#xff0c;2020绝…

android+百度lbs云,百度——LBS.云 v2.0——云存储扩展字段——Android

今天要解决两个问题&#xff1a;1云存储扩展字段2上传的数据是乱码3android版本上传数据到云端使用了一段时间LBS云功能之后&#xff0c;随着对系统的熟悉&#xff0c;默认提供的字段&#xff0c;肯定无法满足需要。比如增加注释&#xff0c;价格&#xff0c;档次等字段的时候。…

年终将至,回顾我们一起走过的 2020

又到了年终末尾匆匆忙忙的 2020 似乎按下了倍速键一晃眼我们就从夏天走到了冬天在这不平凡的一年中我们同途共进也笑着成长让我们跟随着六大年度词条重温这一年我们共同经历的值得骄傲的瞬间吧&#xff01;点击文内高亮部分&#xff0c;阅读文章了解更多人才“倍”出星桥计划出…

灵魂拷问:你和大佬,技术差距有多大?

今天咱们聊点技术以外的内容。前几天&#xff0c;有程序员在某个坛子上发帖吐槽&#xff0c;新来的应届生张嘴就是分布式&#xff0c;一堆框架&#xff0c;可代码根本不会写。马上有人跟贴说自己也遇到过这种情况&#xff0c;说之前自己遇到过一个应届生&#xff0c;开口闭口动…

达梦数据查询编码_查询数据库的编码方式

在Mysql中(1)查看Mysql数据库编码show variables like character_set_database 或者 show create database 数据库名称(2)查看Mysql中某张表的编码show create table 表名show create database 数据库名称、show create table 表名 &#xff0c;还能够显示建库和建表语句。(3)…

玩转git-flow工作流-分支解析

概述搞开发的相信大部分人git天天都在用&#xff0c;那么一般我们在实际工程当中&#xff0c;遵循一个合理、清晰的Git使用流程&#xff0c;是非常重要的。否则&#xff0c;每个人都提交一堆杂乱无章的commit&#xff0c;项目很快就会变得难以协调和维护。那么是如何来规范整个…

android中的帧动画,[Android开发] Android中的帧动画

MainActivity文件&#xff1a;public class MainActivity extends Activity implements OnClickListener{AnimationDrawable anim_draw;SuppressLint("NewApi")Overrideprotected void onCreate(Bundle savedInstanceState) {super.onCreate(savedInstanceState);set…

ksu7对讲机调频软件_数字对讲机的群呼功能原理是什么?你了解多少?

大家都明白数字对讲机能够达到组呼、群呼、选呼的用途&#xff0c;但对数字对讲机的群呼功能原理可能操作方并不是太熟悉&#xff0c;下面我们就和大家来谈谈有关数字对讲机的群呼功能原理&#xff1a;无线对讲机群呼是为了更好地达到1个数字对讲机能够同一时间跟多个数字对讲机…

引入Jaeger——使用

上一篇定义了两种使用Jaeger的方式&#xff1a;中间件和action过滤器&#xff0c;下面这个例子定义了两个服务 WebAPI01&#xff0c;请求WebAPI02&#xff0c;采用的是中间件的请求方式。引入JaegerSharp包&#xff08;或发布到自己的Nuget库里引用&#xff09;WebAPI01的Start…

抖音ai智能机器人挂机_电销秘诀 电销企业难以拒绝的AI智能电销机器人

眼下是快节奏的时代&#xff0c;超智能化的电销机器人已然成为了电销企业实现高速发展的首选方式。为何现代电销企业都会摒弃纯人工电销方式&#xff0c;采取机器人与人工协作方式呢&#xff1f;这就不得不说是因为AI外呼机器人的卓越性能和优势。一&#xff0c;提升工作效率AI…

dataset的去重计数 g2_ExcelExcel去重、计数一步到位,这个方法简单到哭

私信回复关键词【插件】&#xff0c;获取Excel高手都在用的“插件合集插件使用小技巧”&#xff01;最近在哼哧哼哧搬家&#xff0c;搬家第一天&#xff0c;面对空荡荡的房子&#xff0c;我发现了一个严峻的问题——日用品还没买。我打开了一个月前写下的日用品清单&#xff1a…

【源码解读】Vue与ASP.NET Core WebAPI的集成

在前面博文【Vue】Vue 与 ASP.NET Core WebAPI 的集成中&#xff0c;介绍了集成原理&#xff1a;在中间件管道中注册SPA终端中间件&#xff0c;整个注册过程中&#xff0c;终端中间件会调用node&#xff0c;执行npm start命令启动vue开发服务器&#xff0c;向中间件管道添加路由…