智能化状态管理:自动状态流转处理模块

目录

基本背景介绍

具体实现

基本数据准备

基本数据表

状态转换常量

状态转换注解

任务处理模版

各任务实现逻辑

开启比对任务进行处理

降噪字段处理任务处理

开启业务数据比对处理

业务数据比对处理

开始核对数据生成最终报告处理

核对数据生成最终报告处理

状态逻辑分发器

定时任务定义

总结


自动流转一般都是一个很大的处理系统,其中包含的处理内容是很庞大的,就这样一个大型系统的开发思路,我后面会抽空来分享一篇全局的处理和调度实现方式,本次仅针对一般如果我们需要对一些业务流程需要进行自动化处理思维的给出一个样例的自动状态流转处理模块的代码示例。如果有写的不对的地方,请留言指正!

基本背景介绍

假设我们需要一个自动的数据比对任务处理流程,基本的状态流转如下:

其中,任务创建、任务启动、任务暂停这几项开放接口交由用户手动决策,其他流程则按指定的方式直接进行自动化处理。大致模版如上,实际业务可按实际处理方式进行替换。

具体实现

基本数据准备

基本数据表

启动以上任务以及实际实现处理上,具体表暂时定义如下:

CREATE TABLE `compare_task` (`id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '主键',`compare_task_name` varchar(512) COLLATE utf8mb4_unicode_ci DEFAULT NULL COMMENT '比对任务名称',`replay_task_id` bigint(20) unsigned DEFAULT NULL COMMENT '回放任务ID',`status` int(10) unsigned DEFAULT NULL COMMENT '比对状态:-1-取消执行,0-任务创建;1-任务启动,2-降噪字段处理中,3-降噪字段处理完成,4-业务数据比对处理中-比对成功,5-业务数据比对处理完成,6-核对数据生成最终报告处理中,7-核对数据生成最终报告处理完成,8-比对失败',`failure_position` int(10) unsigned DEFAULT NULL COMMENT '中间失败停留状态记录',`failure_reason` varchar(200) COLLATE utf8mb4_unicode_ci DEFAULT NULL COMMENT '失败原因',`noise_result` varchar(500) COLLATE utf8mb4_unicode_ci DEFAULT NULL COMMENT '噪声数据结果记录',`compare_result` varchar(500) COLLATE utf8mb4_unicode_ci DEFAULT NULL COMMENT '业务结果比对结果记录',`final_result` varchar(500) COLLATE utf8mb4_unicode_ci DEFAULT NULL COMMENT '最终报告记录',`valid` int(11) DEFAULT '0' COMMENT ' 0当前在线 1已删除',`last_ping_time` int(11) NOT NULL DEFAULT '0' COMMENT '执行节点最后一次心跳时间',`version` int(11) NOT NULL DEFAULT '1' COMMENT '版本',`cname` varchar(32) COLLATE utf8mb4_unicode_ci NOT NULL DEFAULT '' COMMENT '创建人',`uname` varchar(32) COLLATE utf8mb4_unicode_ci NOT NULL DEFAULT '' COMMENT '更新人',`ctime` bigint(20) DEFAULT NULL COMMENT '创建时间',`utime` bigint(20) DEFAULT NULL COMMENT '修改时间',PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='比对任务'

状态转换常量

为了实现以上基本的数据,我们先提供一个具体的状态转换常量表,具体代码如下:

/*** @author yanfengzhang* @description 比对相关常量* @date 2022/5/1  23:29*/
public class CompareCons {/*** 比对基本状态信息*/public static class Status {/*** 比对任务取消执行*/public static final int CANCEL = -1;/*** 比对任务创建*/public static final int CREATE = 0;/*** 比对任务启动*/public static final int START = 1;/*** 降噪字段处理中*/public static final int NOISE_REDUCING = 2;/*** 降噪字段处理完成*/public static final int NOISE_REDUCED = 3;/*** 业务数据比对处理中*/public static final int BIZ_COMPARING = 4;/*** 业务数据比对处理完成*/public static final int BIZ_COMPARED = 5;/*** 核对数据生成最终报告处理中*/public static final int GENERATE_REPORTING = 6;/*** 核对数据生成最终报告处理完成*/public static final int GENERATE_REPORTED = 7;/*** 比对失败*/public static final int FAILED = 8;}
}

状态转换注解

自动化根据状态进行统一管理,故各个处理器实际上需要表明自己需要处理的状态行为,具体注解定义如下:

/*** @author yanfengzhang* @description* @date 2022/5/1  23:33*/
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface Status {int status();
}

任务处理模版

基本任务处理的基类,包含通用逻辑:

  • 将提交的处理交由线程池管理。
  • 同时定义一个心跳关联到外部的Processor,当processor运行结束时,结束心跳。每个ping关联一个单独的ScheduledExecutorService,结束ping时直接shutdown线程池。
  • 每个processor在开始前需要有一定逻辑更新task的状态,否则可能导致任务被重复提交。

具体的代码实现逻辑如下:

/*** @author yanfengzhang* @description 任务处理的基类,包含任务处理的通用逻辑;* 核心逻辑:* 被提交的processor交由线程池执行;* 每个processor关联一个ping对象,ping实现心跳逻辑;* 每个processor在开始前需要有一定逻辑更新task的状态,否则可能导致任务被重复提交。* @date 2022/5/1  23:46*/
public abstract class AbstractProcessor implements Runnable {private final Ping ping;private final CompareTaskPo value;private final Semaphore semaphore = new Semaphore(0);private static final Logger LOGGER = LoggerFactory.getLogger(AbstractProcessor.class);private static final ThreadPoolExecutor THREAD_POOL_EXECUTOR = new ThreadPoolExecutor(8,16, 5, TimeUnit.MINUTES, new LinkedBlockingQueue<>(), new ThreadFactory() {private AtomicInteger threadCount = new AtomicInteger(1);@Overridepublic Thread newThread(Runnable r) {return new Thread(r, "thread-processor-" + threadCount.getAndIncrement());}});protected AbstractProcessor(CompareTaskPo value) {ping = new Ping(this);this.value = value;}/*** 心跳。* 关联到外部的Processor,当processor运行结束时,结束心跳。* 每个ping关联一个单独的ScheduledExecutorService,结束ping时直接shutdown线程池。*/class Ping implements Runnable {private WeakReference<AbstractProcessor> weakReference;private ReferenceQueue<AbstractProcessor> referenceQueue = new ReferenceQueue<>();private ScheduledExecutorService scheduleAtFixedRate = Executors.newSingleThreadScheduledExecutor(r -> new Thread(r, "task-ping"));private CompareTaskMapper compareTaskMapper;Ping(AbstractProcessor processor) {weakReference = new WeakReference<>(processor, referenceQueue);compareTaskMapper = BeanFactoryUtil.getBean(CompareTaskMapper.class);}void ping() {if (referenceQueue.poll() != null) {/*兜底:当其关联的processor被垃圾回收后,结束心跳*/LOGGER.warn("【任务处理心跳】compareTaskId:{}的心跳被动结束", value.getId());scheduleAtFixedRate.shutdown();} else {try {int curTime = (int) (System.currentTimeMillis() / 1000);compareTaskMapper.updateLastPingTime(value.getId(), curTime);LOGGER.warn("【任务处理心跳】compareTaskId:{}心跳正常,当前时间:{} processor:{}", value.getId(), curTime, weakReference.get());} catch (Exception e) {LOGGER.error("【任务处理心跳】compareTaskId:{}心跳时间更新异常,exception:", value.getId(), e);}}}@Overridepublic void run() {ping();}void start() {LOGGER.warn("【任务处理心跳】compareTaskId:{}心跳正常开启", value.getId());scheduleAtFixedRate.scheduleWithFixedDelay(this, 2, 2, TimeUnit.SECONDS);}void close() {LOGGER.warn("【任务处理心跳】compareTaskId:{}心跳正常结束", value.getId());scheduleAtFixedRate.shutdown();}}protected abstract boolean actualProcess(CompareTaskPo value);protected abstract void end(CompareTaskPo value);private void done() {ping.close();semaphore.release(1);}public final void process() {THREAD_POOL_EXECUTOR.submit(this);}public final boolean allowRecycle() {return semaphore.tryAcquire();}@Overridepublic final void run() {this.ping.start();try {/*实际状态下任务处理内容成功后进行状态流转*/if (actualProcess(value)) {end(value);}} finally {done();}}
}

各任务实现逻辑

主要的任务处理流程如下几个重要处理器实现,其中状态可以由用户自动暂停更新状态,更新状态后相关流程被中断,该部分实现放置在定时任务中进行处理,具体见后面的代码,同时如果各任务中间有处理失败的内容,我们也会中断流程并记录具体失败的原因是什么好方便后续问题的定位。

开启比对任务进行处理

主要功能:用户创建比对任务没有问题后,点击任务启动,自动化处理流程开始,对相关业务数据进行分析验证,然后更新相关的状态,具体样例实现如下:

/*** @author yanfengzhang* @description 开启比对任务进行处理* @date 2022/5/2  00:05*/
@Status(status = CompareCons.Status.START)
@Slf4j
public class StartCompareProcessor extends AbstractProcessor {private CompareTaskMapper compareTaskMapper;public StartCompareProcessor(CompareTaskPo value) {super(value);compareTaskMapper = BeanFactoryUtil.getBean(CompareTaskMapper.class);}@Overridepublic boolean actualProcess(CompareTaskPo value) {log.info("开启比对任务进行处理:当前处理id为{}", value.getId());CompareTaskPo compareTaskPo = compareTaskMapper.selectById(value.getId());/*1检查数据正确性:对应的回放信息是否满足要求,如果不满足则直接中止比对任务*/return startCompareProcessorCheck(compareTaskPo);}/*** 检查数据正确性:对应的回放信息是否满足要求,如果不满足则直接中止比对任务* 如果没有问题,则认为已经成功** @param compareTaskPo 比对任务信息* @return true-基本检查通过;false-检查不通过*/private boolean startCompareProcessorCheck(CompareTaskPo compareTaskPo) {return true;}@Overridepublic void end(CompareTaskPo value) {log.info("开启比对任务进行处理完成,待更新状态:当前处理id为{}", value.getId());try {/*更新状态为"降噪字段处理中"*/compareTaskMapper.updateStatus(value.getId(), CompareCons.Status.NOISE_REDUCING);} catch (Exception e) {log.info("开启比对任务进行处理完成异常异常异常异常:当前处理id为{},状态已更新为{}", value.getId(), CompareCons.Status.NOISE_REDUCING);}log.info("开启比对任务进行处理完成:当前处理id为{},状态已更新为{}", value.getId(), CompareCons.Status.NOISE_REDUCING);}
}

降噪字段处理任务处理

主要功能:假设我们通过比对两次master处理回放来分析得出一些噪声处理信息,比对处理噪声的主要代码如下:

/*** @author yanfengzhang* @description 降噪字段处理任务处理* @date 2022/5/2  00:29*/
@Status(status = CompareCons.Status.NOISE_REDUCING)
@Slf4j
public class NoiseReduceProcessor extends AbstractProcessor {private CompareTaskMapper compareTaskMapper;public NoiseReduceProcessor(CompareTaskPo value) {super(value);compareTaskMapper = BeanFactoryUtil.getBean(CompareTaskMapper.class);}@Overridepublic boolean actualProcess(CompareTaskPo value) {log.info("降噪字段处理任务处理:当前处理id为{}", value.getId());/*1.根据回放任务id来查看对应回放记录中的数据信息*/CompareTaskPo compareTaskPo = compareTaskMapper.selectById(value.getId());if (Objects.isNull(compareTaskPo)) {log.error("降噪字段处理任务处理异常:比对任务{}并不存在,请进行核对!", value.getId());compareTaskMapper.updateStatusAndFailure(value.getId(), CompareCons.Status.FAILED,CompareCons.Status.NOISE_REDUCING, "比对任务并不存在!");return false;}ReplayTaskApplicationService replayTaskApplicationService = BeanFactoryUtil.getBean(ReplayTaskApplicationService.class);ReplayDataResultValue replayDataResultValue = replayTaskApplicationService.getBdfPathListByReplayTaskId(compareTaskPo.getReplayTaskId());if (Objects.isNull(replayDataResultValue) || StringUtils.isBlank(replayDataResultValue.getMasterFirstBdfPath())|| StringUtils.isBlank(replayDataResultValue.getMasterSecondBdfPath())) {log.error("降噪字段处理任务处理异常:比对任务{}对应回放记录id相关数据文件数据并不存在,请进行核对!", value.getId());compareTaskMapper.updateStatusAndFailure(value.getId(), CompareCons.Status.FAILED,CompareCons.Status.NOISE_REDUCING, "对应回放记录id相关数据文件数据并不存在或不完整!");return false;}//        String masterFirstBdfPath = "/Users/yanfengzhang/Downloads/replay_data_10.dat.mfbdf.rpresult";
//        String masterSecondBdfPath = "/Users/yanfengzhang/Downloads/replay_data_10.dat.msbdf.rpresult";String masterFirstBdfPath = replayDataResultValue.getMasterFirstBdfPath();String masterSecondBdfPath = replayDataResultValue.getMasterSecondBdfPath();/*2.检查回放记录中两次master文件对应的条数是否一致*/Long masterFirstBdfLines = null;Long masterSecondBdfLines = null;try {masterFirstBdfLines = Files.lines(Paths.get(masterFirstBdfPath)).count();masterSecondBdfLines = Files.lines(Paths.get(masterSecondBdfPath)).count();} catch (Exception e) {log.error("降噪字段处理任务处理异常:比对任务{}对应回放记录中两次master回放文件读取异常!", value.getId());compareTaskMapper.updateStatusAndFailure(value.getId(), CompareCons.Status.FAILED, CompareCons.Status.NOISE_REDUCING, "两次master回放文件读取异常");return false;}if (!Objects.equals(masterFirstBdfLines, masterSecondBdfLines)) {log.error("降噪字段处理任务处理异常:比对任务{}对应回放记录中两次master文件数据条数并不一致!", value.getId());compareTaskMapper.updateStatusAndFailure(value.getId(), CompareCons.Status.FAILED, CompareCons.Status.NOISE_REDUCING, "两次master文件数据条数并不一致");return false;}/*3.文件各行进行数据对比并进行记录*/try {String compareMasterFile = "/Users/yanfengzhang/Downloads/" + value.getCompareTaskName() + "_" + value.getId() + "_降噪比对数据.txt";for (int i = 1; i < masterFirstBdfLines + 1; i++) {String masterFirstBdfStr = FileUtils.readAppointedLineNumber(masterFirstBdfPath, i);String masterSecondBdfStr = FileUtils.readAppointedLineNumber(masterSecondBdfPath, i);JsonNode diffInfo = JsonDealUtils.getCompareJsonResult(masterFirstBdfStr, masterSecondBdfStr);FileUtils.writeContent(compareMasterFile, diffInfo.toString());}compareTaskMapper.updateNoiseResult(value.getId(), compareMasterFile);} catch (Exception e) {log.error("降噪字段处理任务处理异常:比对任务{}生成噪声数据异常!", value.getId());compareTaskMapper.updateStatusAndFailure(value.getId(), CompareCons.Status.FAILED, CompareCons.Status.NOISE_REDUCING, "生成噪声数据异常");return false;}/*4.执行完毕无异常,进行状态变更*/return true;}@Overridepublic void end(CompareTaskPo value) {log.info("降噪字段处理任务处理完成,待更新状态:当前处理id为{}", value.getId());/*更新状态为"降噪字段处理完成"*/compareTaskMapper.updateStatus(value.getId(), CompareCons.Status.NOISE_REDUCED);log.info("降噪字段处理任务处理完成:当前处理id为{},状态已更新为{}", value.getId(), CompareCons.Status.NOISE_REDUCED);}
}

开启业务数据比对处理

主要功能:没有其他检查数据内容的话,可以直接进行状态转换,我这边暂时忽略检查!

/*** @author yanfengzhang* @description 开启业务数据比对处理* @date 2022/5/2  00:36*/
@Status(status = CompareCons.Status.NOISE_REDUCED)
@Slf4j
public class StartBizCompareProcessor extends AbstractProcessor {private CompareTaskMapper compareTaskMapper;public StartBizCompareProcessor(CompareTaskPo value) {super(value);compareTaskMapper = BeanFactoryUtil.getBean(CompareTaskMapper.class);}@Overridepublic boolean actualProcess(CompareTaskPo value) {log.info("开启业务数据比对处理:当前处理id为{}", value.getId());/*该状态下当前不做任何处理,基本没有检查的相关启动条件*/return true;}@Overridepublic void end(CompareTaskPo value) {log.info("开启业务数据比对处理完成,待更新状态:当前处理id为{}", value.getId());/*更新状态为"业务数据比对处理中"*/compareTaskMapper.updateStatus(value.getId(), CompareCons.Status.BIZ_COMPARING);log.info("开启业务数据比对处理完成:当前处理id为{},状态已更新为{}", value.getId(), CompareCons.Status.BIZ_COMPARING);}
}

业务数据比对处理

主要功能:对本次业务代码改动和master代码进行对比来分析对应的内容处理变化统计,具体代码如下:

/*** @author yanfengzhang* @description 业务数据比对处理* @date 2022/5/2  00:53*/
@Status(status = CompareCons.Status.BIZ_COMPARING)
@Slf4j
public class BizCompareProcessor extends AbstractProcessor {private CompareTaskMapper compareTaskMapper;public BizCompareProcessor(CompareTaskPo value) {super(value);compareTaskMapper = BeanFactoryUtil.getBean(CompareTaskMapper.class);}@Overridepublic boolean actualProcess(CompareTaskPo value) {log.info("开启业务数据比对处理处理:当前处理id为{}", value.getId());CompareTaskPo compareTaskPo = compareTaskMapper.selectById(value.getId());/*1.根据回放任务id来查看对应回放记录中的数据信息*/if (Objects.isNull(compareTaskPo)) {log.error("开启业务数据比对处理处理异常:比对任务{}并不存在,请进行核对!", value.getId());compareTaskMapper.updateStatusAndFailure(value.getId(), CompareCons.Status.FAILED,CompareCons.Status.NOISE_REDUCING, "比对任务并不存在!");return false;}ReplayTaskApplicationService replayTaskApplicationService = BeanFactoryUtil.getBean(ReplayTaskApplicationService.class);ReplayDataResultValue replayDataResultValue = replayTaskApplicationService.getBdfPathListByReplayTaskId(compareTaskPo.getReplayTaskId());if (Objects.isNull(replayDataResultValue) || StringUtils.isBlank(replayDataResultValue.getMasterFirstBdfPath())|| StringUtils.isBlank(replayDataResultValue.getFeatureBdfPath())) {log.error("开启业务数据比对处理处理异常:比对任务{}对应回放记录id相关数据文件数据并不存在,请进行核对!", value.getId());compareTaskMapper.updateStatusAndFailure(value.getId(), CompareCons.Status.FAILED,CompareCons.Status.NOISE_REDUCING, "对应回放记录id相关数据文件数据并不存在或不完整!");return false;}//        String masterFirstBdfPath = "/Users/yanfengzhang/Downloads/replay_data_10.dat.mfbdf.rpresult";
//        String featureBdfPath = "/Users/yanfengzhang/Downloads/replay_data_10.dat.fbdf.rpresult";String masterFirstBdfPath = replayDataResultValue.getMasterFirstBdfPath();String featureBdfPath = replayDataResultValue.getFeatureBdfPath();/*2.检查回放记录中master文件和dev文件对应的条数是否一致*/Long masterFirstBdfLines = null;Long featureBdfLines = null;try {masterFirstBdfLines = Files.lines(Paths.get(masterFirstBdfPath)).count();featureBdfLines = Files.lines(Paths.get(featureBdfPath)).count();} catch (Exception e) {log.error("比对任务{}对应回放记录中master回放文件或dev回放文件读取异常!", value.getId());compareTaskMapper.updateStatusAndFailure(value.getId(), CompareCons.Status.FAILED, CompareCons.Status.BIZ_COMPARING, "master回放文件或dev回放文件读取异常");return false;}if (!Objects.equals(masterFirstBdfLines, featureBdfLines)) {log.error("比对任务{}对应回放记录中master回放文件和dev回放文件数据条数并不一致!", value.getId());compareTaskMapper.updateStatusAndFailure(value.getId(), CompareCons.Status.FAILED, CompareCons.Status.BIZ_COMPARING, "master回放文件和dev回放文件数据条数并不一致");return false;}/*3.文件各行进行数据对比并进行记录*/try {String compareBizFile = "/Users/yanfengzhang/Downloads/" + value.getCompareTaskName() + "_" + value.getId() + "_业务比对数据.txt";for (int i = 1; i < masterFirstBdfLines + 1; i++) {String masterFirstBdfStr = FileUtils.readAppointedLineNumber(masterFirstBdfPath, i);String featureBdfStr = FileUtils.readAppointedLineNumber(featureBdfPath, i);JsonNode diffInfo = JsonDealUtils.getCompareJsonResult(masterFirstBdfStr, featureBdfStr);FileUtils.writeContent(compareBizFile, diffInfo.toString());}compareTaskMapper.updateCompareResult(value.getId(), compareBizFile);} catch (Exception e) {log.error("比对任务{}生成业务比对数据异常!", value.getId());compareTaskMapper.updateStatusAndFailure(value.getId(), CompareCons.Status.FAILED, CompareCons.Status.BIZ_COMPARING, "生成业务比对数据异常");return false;}/*4.执行完毕无异常,进行状态变更*/return true;}@Overridepublic void end(CompareTaskPo value) {log.info("开启业务数据比对处理处理完成,待更新状态:当前处理id为{}", value.getId());/*更新状态为"业务数据比对处理完成"*/compareTaskMapper.updateStatus(value.getId(), CompareCons.Status.BIZ_COMPARED);log.info("开启业务数据比对处理处理完成:当前处理id为{},状态已更新为{}", value.getId(), CompareCons.Status.BIZ_COMPARED);}
}

开始核对数据生成最终报告处理

主要功能:没有其他检查数据内容的话,可以直接进行状态转换,我这边暂时忽略检查!

/*** @author yanfengzhang* @description 开始核对数据生成最终报告处理* @date 2022/5/2  00:59*/
@Status(status = CompareCons.Status.BIZ_COMPARED)
@Slf4j
public class StartGenerateReportProcessor extends AbstractProcessor {private CompareTaskMapper compareTaskMapper;public StartGenerateReportProcessor(CompareTaskPo value) {super(value);compareTaskMapper = BeanFactoryUtil.getBean(CompareTaskMapper.class);}@Overridepublic boolean actualProcess(CompareTaskPo value) {log.info("开始核对数据生成最终报告处理:当前处理id为{}", value.getId());/*该状态下当前不做任何处理,基本没有检查的相关启动条件(检查相关文件是否存在)*/return true;}@Overridepublic void end(CompareTaskPo value) {log.info("开始核对数据生成最终报告处理完成,待更新状态:当前处理id为{}", value.getId());/*更新状态为"核对数据生成最终报告处理中"*/compareTaskMapper.updateStatus(value.getId(), CompareCons.Status.GENERATE_REPORTING);log.info("开始核对数据生成最终报告处理完成:当前处理id为{},状态已更新为{}", value.getId(), CompareCons.Status.GENERATE_REPORTING);}
}

核对数据生成最终报告处理

主要功能:结合前面处理生成的数据进行最终报告的比对任务生成报告,具体处理流程如下:

/*** @author yanfengzhang* @description 核对数据生成最终报告处理* @date 2022/5/2  01:20*/
@Status(status = CompareCons.Status.GENERATE_REPORTING)
@Slf4j
public class GenerateReportProcessor extends AbstractProcessor {private CompareTaskMapper compareTaskMapper;public GenerateReportProcessor(CompareTaskPo value) {super(value);compareTaskMapper = BeanFactoryUtil.getBean(CompareTaskMapper.class);}@Overridepublic boolean actualProcess(CompareTaskPo value) {log.info("开始核对数据生成最终报告处理:当前处理id为{}", value.getId());CompareTaskPo compareTaskPo = compareTaskMapper.selectById(value.getId());if (Objects.isNull(compareTaskPo)) {log.error("开始核对数据生成最终报告处理异常:比对任务{}并不存在,请进行核对!", value.getId());compareTaskMapper.updateStatusAndFailure(value.getId(), CompareCons.Status.FAILED,CompareCons.Status.NOISE_REDUCING, "比对任务并不存在!");return false;}/*1.根据回放任务id来查看对应回放记录中的数据信息*/String compareBizResultPath = compareTaskPo.getCompareResult();String noiseResultPath = compareTaskPo.getNoiseResult();/*2.检查回放记录中master文件和dev文件对应的条数是否一致*/Long compareBizResultLines = null;Long noiseResultLines = null;try {compareBizResultLines = Files.lines(Paths.get(compareBizResultPath)).count();noiseResultLines = Files.lines(Paths.get(noiseResultPath)).count();} catch (Exception e) {log.error("比对任务{}对应核对数据生成最终报告读取文件异常!", value.getId());compareTaskMapper.updateStatusAndFailure(value.getId(), CompareCons.Status.FAILED, CompareCons.Status.GENERATE_REPORTING, "对应核对数据生成最终报告读取文件异常");return false;}if (!Objects.equals(compareBizResultLines, noiseResultLines)) {log.error("比对任务{}对应核对数据生成最终报告相关文件数据条数并不一致!", value.getId());compareTaskMapper.updateStatusAndFailure(value.getId(), CompareCons.Status.FAILED, CompareCons.Status.GENERATE_REPORTING, "对应核对数据生成最终报告相关文件数据条数并不一致");return false;}/*3.文件各行进行数据对比并进行记录*/try {String compareBizFile = "/Users/yanfengzhang/Downloads/" + value.getCompareTaskName() + "_" + value.getId() + "_最终结果报告.txt";for (int i = 1; i < compareBizResultLines + 1; i++) {String compareBizResultStr = FileUtils.readAppointedLineNumber(compareBizResultPath, i);String noiseResultStr = FileUtils.readAppointedLineNumber(noiseResultPath, i);List<CompareDataMeta> compareDataMetas = CompareDataResult.getCompareDataResult(noiseResultStr, compareBizResultStr);FileUtils.writeContent(compareBizFile, JSON.toJSONString(compareDataMetas));}compareTaskMapper.updateNoiseResult(value.getId(), compareBizFile);} catch (Exception e) {log.error("比对任务{}核对数据生成最终报告数据处理异常!", value.getId());compareTaskMapper.updateStatusAndFailure(value.getId(), CompareCons.Status.FAILED, CompareCons.Status.GENERATE_REPORTING, "核对数据生成最终报告数据处理异常");return false;}/*4.执行完毕无异常,进行状态变更*/return true;}@Overridepublic void end(CompareTaskPo value) {log.info("开始核对数据生成最终报告处理完成,待更新状态:当前处理id为{}", value.getId());/*更新状态为"核对数据生成最终报告处理完成"*/compareTaskMapper.updateStatus(value.getId(), CompareCons.Status.GENERATE_REPORTED);log.info("开始核对数据生成最终报告处理完成:当前处理id为{},状态已更新为{}", value.getId(), CompareCons.Status.GENERATE_REPORTED);}
}

状态逻辑分发器

我们针对以上任务处理器,对实际业务处理进行分析并将其转发到相关的处理器上进行自动化处理,具体实现逻辑如下:

/*** @author yanfengzhang* @description 负责对task任务不同状态运行逻辑的分发。* @date 2022/5/2  01:44*/
public class EventDispatcher {private static Map<Integer, Class> status2Processor = Maps.newHashMap();private static Set<AbstractProcessor> curProcessors = new HashSet<>();private static final Logger LOGGER = LoggerFactory.getLogger(EventDispatcher.class);static {Reflections reflections = new Reflections("com.sankuai.tsp.product.bsap.domain.compare.event.processor.impl");Set<Class<?>> classSet = reflections.getTypesAnnotatedWith(Status.class);for (Class<?> cl : classSet) {Annotation[] annotations = cl.getAnnotations();for (Annotation a : annotations) {if (a instanceof Status) {Status status = (Status) a;status2Processor.put(status.status(), cl);}}}}/*** dispatch方法目前只有cronServer线程调用,* 但是为了防止出现多线程调用导致的curProcessors被并发修改问题,所以用synchronized同步** @param status        当前任务状态* @param compareTaskPo 比对任务消息数据* @return*/public static synchronized boolean dispatch(int status, CompareTaskPo compareTaskPo) {AbstractProcessor processor = getInstance(status, compareTaskPo);if (processor != null) {curProcessors.add(processor);processor.process();return true;}return false;}private static AbstractProcessor getInstance(int status, CompareTaskPo compareTaskPo) {/*zyf:主动清理一次*/cleanDirty();if (containsStatus(status)) {try {Constructor constructor = status2Processor.get(status).getConstructor(CompareTaskPo.class);return (AbstractProcessor) constructor.newInstance(compareTaskPo);} catch (Exception ex) {LOGGER.error("EventDispatcher dispatcher getInstance error, exception:", ex);}}return null;}public static boolean containsStatus(int status) {return status2Processor.containsKey(status);}public static synchronized void cleanDirty() {curProcessors.removeIf(AbstractProcessor::allowRecycle);}public static int getTaskCount() {return curProcessors.size();}
}

定时任务定义

针对以上的内容,我们内部维护一个基本的定时器来完成实际的业务自动化流转处理,主要代码和业务处理如下:

/*** @author yanfengzhang* @description 定时任务:定时读取数据库中比对数据需要处理的task任务,并分发到响应的processor处理。* @date 2022/5/2  02:18*/
@Component
@DependsOn("beanFactoryUtil")
public class CronServer implements InitializingBean {@Autowiredprivate CompareTaskMapper compareTaskMapper;private static final ScheduledExecutorService SCHEDULED_EXECUTOR_SERVICE = Executors.newSingleThreadScheduledExecutor();private static final Logger LOGGER = LoggerFactory.getLogger(CronServer.class);@Overridepublic void afterPropertiesSet() throws Exception {SCHEDULED_EXECUTOR_SERVICE.scheduleWithFixedDelay(new CompareCronTask(), 20, 3, TimeUnit.SECONDS);}class CompareCronTask implements Runnable {@Overridepublic void run() {if (BsapCompareSwitch.cronServerPause()) {LOGGER.warn("--------------cron server pause--------------");return;}int taskCount = EventDispatcher.getTaskCount();/*清理已经完成的任务*/EventDispatcher.cleanDirty();LOGGER.warn("[--------当前正在运行的任务数量为:{}-------]", EventDispatcher.getTaskCount());if (taskCount != 0 && EventDispatcher.getTaskCount() == 0) {LOGGER.warn("[------------------------任务数量存在问题,主动进行gc处理中---------------------------]");System.gc();}int curSecond = (int) (System.currentTimeMillis() / 1000);try {List<CompareTaskPo> compareTaskPos = compareTaskMapper.selectCompareTaskPoByTimeRange(curSecond - 20);if (CollectionUtils.isEmpty(compareTaskPos)) {return;}for (CompareTaskPo compareTaskPo : compareTaskPos) {/*如果处理的内容不在我们规定的范围时直接跳出*/if (!EventDispatcher.containsStatus(compareTaskPo.getStatus())) {continue;}/*** 思考:* 尝试更新一下last_ping_update的时间,更新成功代表抢锁成功,然后执行任务。* 如果更新成功但是执行失败,待后续CronServer运行时再次尝试。* 每台服务器每次定时任务只运行一个任务,防止同一台服务器抢占多个任务导致压力过大、负载不均衡的问题。* (由于目前任务运行周期在多台服务器是一致的,所以极端情况下可能会出现任务被一台机器抢占的情况,* 后续可以考虑使不同机器的运行周期随机或者引入分布式任务分配(负载均衡)策略)*/if (compareTaskMapper.updateLastPingTimeByVersion(compareTaskPo.getId(), curSecond - 15, compareTaskPo.getVersion()) > 0) {compareTaskPo.setVersion(compareTaskPo.getVersion() + 1);compareTaskPo.setLastPingTime(curSecond - 15);if (EventDispatcher.dispatch(compareTaskPo.getStatus(), compareTaskPo)) {LOGGER.warn("CronServer 提交一个任务,任务id为{}, 任务详细信息:{}", compareTaskPo.getId(), JSON.toJSON(compareTaskPo));break;}}}} catch (Exception e) {LOGGER.error("server cron run catch an exception:", e);}}}
}

总结

整体上的大致模版实现已如上进行了简化,其中有不同的理解的可留言讨论,后续复杂系统的内容后续有时间在进行分享,谢谢!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/853372.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

AI项目二十二:行人属性识别

若该文为原创文章&#xff0c;转载请注明原文出处。 分享一个行人属性分析系统&#xff0c;识别行人&#xff0c;并标记每个人的属性。 项目代码来自公众号渡码的项目。 本人用Win10复现完整项目&#xff0c;并记录过程。 源码会上传到github,可以自行下载测试。 Yinyifen…

Flutter 简化CustomPainter的绘制

文章目录 前言一、为何简化&#xff1f;1、通常做法&#xff08;1&#xff09;、绘制形状1&#xff08;2&#xff09;、绘制形状2&#xff08;3&#xff09;、界面显示 2、简化 二、完整代码三、使用示例1、绘制图形2、动态触发绘制 总结 前言 使用Flutter做界面时&#xff0c…

Linux DMA-Buf驱动框架

一、DMABUF 框架 dmabuf 是一个驱动间共享buf 的机制&#xff0c;他的简单使用场景如下&#xff1a; 用户从DRM&#xff08;显示驱动&#xff09;申请一个dmabuf&#xff0c;把dmabuf 设置给GPU驱动&#xff0c;并启动GPU将数据输出到dmabuf&#xff0c;GPU输出完成后&#xf…

大数据实训项目(小麦种子)-02、实训项目整体功能介绍与演示

文章目录 前言界面及功能描述实现功能描述技术选型界面展示首页界面功能1&#xff1a;HDFS&#xff0c;选择文件上传文件详细步骤 功能2&#xff1a;MapReduce预处理数据功能3&#xff1a;Hbase存储小麦种子数据并查询前10条记录功能4&#xff1a;Hive分析原始csv文件数据并ech…

RTA_OS基础功能讲解 2.9-警报器

RTA_OS基础功能讲解 2.9-警报器 文章目录 RTA_OS基础功能讲解 2.9-警报器一、警报器简介二、警报器配置2.1 激活一个任务2.2 设置一个事件2.3 执行回调函数2.4 递增一个(软件)计数器三、警报器设置3.1 绝对警报3.1.1 单次触发3.1.2 周期触发3.1.3 在过去设置警报3.1.4 将绝对…

swift微调牧歌数据电商多模态大语言模型

大规模中文多模态评测基准MUGE_数据集-阿里云天池多模态理解和生成评估挑战榜(MUGE)是由阿里巴巴达摩院智能计算实验室发起,由阿里云天池平台承办,并由浙江大学、清华大学等单位共同协办。 Mhttps://tianchi.aliyun.com/dataset/107332微调的是牧歌数据集,结果都不好,记录…

中望CAD 2025 (ZW3D2025) 简体中文修改版

名称&#xff1a;中望CAD 2025 (ZW3D2025) 简体中文修改版 描述&#xff1a;一款三维CAD设计工具&#xff0c;运行破解补丁ZW3D2025-2024-Patch执行修补。 链接&#xff1a;夸克网盘分享 &#x1f4c1; 大小&#xff1a;3.2GB &#x1f3f7; 标签&#xff1a;#PC软件 #CAD #设…

支付宝 沙盒demo使用

简介&#xff1a;支付宝沙箱环境是一个为开发者提供的模拟测试环境&#xff0c;用于在应用上线前进行接口功能开发和联调。在这个环境中&#xff0c;开发者可以模拟开放接口&#xff0c;进行开发调试工作&#xff0c;以确保应用上线后能顺利运行。 1. 配置沙盒 1. 1 沙箱控制…

【odoo15】前端自定义模态弹窗

概要 在odoo15或者在15之前&#xff0c;odoo前端的owl框架还没完全替换当前前端框架的时候&#xff0c;我们很多时候都是用js或者jq来直接操作dom&#xff0c;那么我们如果需要在前端用到一个模态弹窗&#xff0c;可以怎么解决呢&#xff1f; 方法1 直接用js原生的模态弹窗&am…

Oracle的这些BUG你要遇到,说明你是一个DBA老鸟...

作者&#xff1a;IT邦德 中国DBA联盟(ACDU)成员&#xff0c;10余年DBA工作经验&#xff0c; Oracle、PostgreSQL ACE CSDN博客专家及B站知名UP主&#xff0c;全网粉丝10万 擅长主流Oracle、MySQL、PG、高斯及Greenplum备份恢复&#xff0c; 安装迁移&#xff0c;性能优化、故障…

【LVGL】Guider 界面分析

文章目录 前言架构创建 UI切换界面空间释放分析创建页面空间变化 前言 分析Gui Guider-1.7.2-GA 生成的 LVGL 界面切换&#xff0c;资源管理等处理 架构 所有控件存放于同一个结构体 lv_ui 内&#xff0c;每个页面都至少包含 screen_xxx 和 screen_xxx_del 两个成员 typede…

用HAL库改写江科大的stm32入门-7-1 ADC

实验目的:了解ADC基本概念 电路图&#xff1a; ADC&#xff08;Analog-Digital Converter&#xff09;模拟-数字转换器&#xff0c;它可以将引脚上连续变化的模拟电压转换为内存中存储的数字变量&#xff0c;建立模拟电路到数字电路的桥梁。 实验效果&#xff1a; &#xff0…

【html】学会这一套布局,让你的网页更加

很多小伙伴们在刚刚开始学习网页设计的时候不知道怎么布局今天给大家介绍一种非常实用且更加专业的一种布局。 灵感来源&#xff1a; 小米官网 布局图; 实例效果图&#xff1a; 这是一个简单的HTML模板&#xff0c;包括头部、内容区域和底部。 头部部分包括一个分为左右两部分…

【代码随想录】【算法训练营】【第39天】 [62]不同路径 [63]不同路径II [343]整数拆分 [96]不同的二叉搜索树

前言 思路及算法思维&#xff0c;指路 代码随想录。 题目来自 LeetCode。 day 39&#xff0c;周六&#xff0c;坚持不住了~ 题目详情 [62] 不同路径 题目描述 62 不同路径 解题思路 前提&#xff1a;每次只能向下或者向右移动一步 思路&#xff1a;动态规划&#xff0…

部署LVS-DR群集...

目录 最后一台主机&#xff08;第四台&#xff09; 本地yum源安装httpd&#xff08;非必做&#xff09; 继续开始从最后一台主机开始&#xff08;第四台&#xff09; 转第二台主机 转第三台主机 回第二台 上传 转第三台主机 上传 回第二台 转第三台 转第一台主机…

Java 项目学习(初始化项目)

后端工程基于 maven 进行项目构建&#xff0c;并且进行分模块开发 参考&#xff1a;Spring或Spring Boot项目目录结构划分和代码分层 1、了解项目的整体结构 sky-take-out maven 父工程&#xff0c;统一管理依赖版本&#xff0c;聚合其他子模块 sky-common 子模块&#xff0c…

【背包题】oj题库

目录 1282 - 简单背包问题 1780 - 采灵芝 1888 - 多重背包&#xff08;1&#xff09;​编辑 1891 - 开心的金明 2073 - 码头的集装箱 1905 - 混合背包 1282 - 简单背包问题 #include <bits/stdc.h> using namespace std; //二维数组:dp[i][j]max(dp[i-1][j],v[i]dp[…

Oracle备份失败处理,看这一篇就够了!

作者&#xff1a;IT邦德 中国DBA联盟(ACDU)成员&#xff0c;10余年DBA工作经验&#xff0c; Oracle、PostgreSQL ACE CSDN博客专家及B站知名UP主&#xff0c;全网粉丝10万 擅长主流Oracle、MySQL、PG、高斯及Greenplum备份恢复&#xff0c; 安装迁移&#xff0c;性能优化、故障…

FLAN-T5模型的文本摘要任务

Text Summarization with FLAN-T5 — ROCm Blogs (amd.com) 在这篇博客中&#xff0c;我们展示了如何使用HuggingFace在AMD GPU ROCm系统上对语言模型FLAN-T5进行微调&#xff0c;以执行文本摘要任务。 介绍 FLAN-T5是谷歌发布的一个开源大型语言模型&#xff0c;相较于之前的…

什么是专业的CRM客户管理系统,介绍crm客户管理系统的功能作用

CRM&#xff08;Customer Relationship Management&#xff09;客户管理系统&#xff0c;是现代企业不可或缺的一款管理工具。它集客户信息管理、销售自动化、客户服务与支持、数据分析与决策支持等多项功能于一身&#xff0c;帮助企业实现客户关系的全方位管理&#xff0c;从而…