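The previous article, "KnowStreaming Tutorial Series, Part 2: Overall Project Architecture Analysis" (诸葛子房's blog on CSDN), walked through the overall project directory layout of KS. This article looks at how KS handles metric collection and metadata synchronization in its scheduling module.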
1. The scheduling module code lives mainly in km-task
    public class TaskClusterAddedListener implements ApplicationListener<ClusterPhyAddedEvent> {
        private static final ILog LOGGER = LogFactory.getLog(TaskClusterAddedListener.class);

        @Override
        public void onApplicationEvent(ClusterPhyAddedEvent event) {
            LOGGER.info("method=onApplicationEvent||clusterPhyId={}||msg=listened new cluster", event.getClusterPhyId());
            Long now = System.currentTimeMillis();

            // Hand off to KS's custom thread pool and run the task asynchronously
            FutureUtil.quickStartupFutureUtil.submitTask(() -> triggerAllTask(event.getClusterPhyId(), now));
        }

        private void triggerAllTask(Long clusterPhyId, Long startTimeUnitMs) {
            ClusterPhy tempClusterPhy = null;

            // If the cluster has not been loaded within 120 seconds, give up and return
            while (System.currentTimeMillis() - startTimeUnitMs <= 120L * 1000L) {
                tempClusterPhy = LoadedClusterPhyCache.getByPhyId(clusterPhyId);
                if (tempClusterPhy != null) {
                    break;
                }
                BackoffUtils.backoff(1000);
            }
            if (tempClusterPhy == null) {
                return;
            }

            // Once found, wait another 5 seconds so that the related clusters are loaded properly;
            // the 5 seconds is not a fixed value
            BackoffUtils.backoff(5000);
            final ClusterPhy clusterPhy = tempClusterPhy;

            // Run cluster metadata synchronization
            List<AbstractAsyncMetadataDispatchTask> metadataServiceList =
                    new ArrayList<>(SpringTool.getBeansOfType(AbstractAsyncMetadataDispatchTask.class).values());
            for (AbstractAsyncMetadataDispatchTask dispatchTask : metadataServiceList) {
                try {
                    dispatchTask.asyncProcessSubTask(clusterPhy, startTimeUnitMs);
                } catch (Exception e) {
                    // ignore
                }
            }

            // Wait another 5 seconds so that the cluster metadata has been synced to the DB;
            // the 5 seconds is not a fixed value
            BackoffUtils.backoff(5000);

            // Run cluster metric collection
            List<AbstractAsyncMetricsDispatchTask> metricsServiceList =
                    new ArrayList<>(SpringTool.getBeansOfType(AbstractAsyncMetricsDispatchTask.class).values());
            for (AbstractAsyncMetricsDispatchTask dispatchTask : metricsServiceList) {
                try {
                    dispatchTask.asyncProcessSubTask(clusterPhy, startTimeUnitMs);
                } catch (Exception e) {
                    // ignore
                }
            }
        }
    }
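The listener subscribes to the cluster-added event. When a new cluster is added, it first triggers the metadata synchronization tasks and then the metric collection tasks.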
For how the beans are looked up, see:
Spring: getting the subclasses of an interface or abstract class and executing them: https://blog.csdn.net/u012501054/article/details/103927674
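The flow of the listener above can be sketched in plain Java, without Spring. This is only an illustration under stated assumptions: `ClusterBootstrapSketch`, `CACHE`, `awaitLoaded`, and `dispatchAll` are hypothetical names, not KS classes. The map stands in for `LoadedClusterPhyCache`, and the list of `Consumer`s stands in for the beans returned by `SpringTool.getBeansOfType`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

public class ClusterBootstrapSketch {
    // Hypothetical stand-in for LoadedClusterPhyCache: cluster id -> cluster name.
    static final Map<Long, String> CACHE = new ConcurrentHashMap<>();

    // Polls the cache until the cluster appears or timeoutMs elapses; returns null on timeout.
    static String awaitLoaded(long clusterId, long timeoutMs, long pollMs) throws InterruptedException {
        long start = System.currentTimeMillis();
        while (System.currentTimeMillis() - start <= timeoutMs) {
            String cluster = CACHE.get(clusterId);
            if (cluster != null) {
                return cluster;
            }
            Thread.sleep(pollMs);
        }
        return null;
    }

    // Runs every task against the cluster; one failing task does not stop the rest.
    // Returns the number of tasks that completed without throwing.
    static int dispatchAll(List<Consumer<String>> tasks, String cluster) {
        int succeeded = 0;
        for (Consumer<String> task : tasks) {
            try {
                task.accept(cluster);
                succeeded++;
            } catch (RuntimeException e) {
                // Unlike the quoted listener, this sketch reports the failure instead of ignoring it.
                System.err.println("task failed: " + e);
            }
        }
        return succeeded;
    }

    public static void main(String[] args) throws InterruptedException {
        CACHE.put(1L, "cluster-1");

        List<Consumer<String>> metadataTasks = new ArrayList<>();
        metadataTasks.add(c -> System.out.println("sync topics for " + c));
        metadataTasks.add(c -> { throw new IllegalStateException("broker unreachable"); });
        metadataTasks.add(c -> System.out.println("sync brokers for " + c));

        String cluster = awaitLoaded(1L, 2_000L, 100L);
        if (cluster != null) {
            System.out.println("succeeded: " + dispatchAll(metadataTasks, cluster));
        }
    }
}
```

The point of the per-task try/catch is isolation: a broken metadata task should not prevent the remaining tasks, or the later metric collection phase, from running.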
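2. How a scheduled task runs on only one node in a distributed deployment, so that multiple machines do not schedule the same work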
Task assignment is implemented in the execute method of AbstractDispatchTask:
    public TaskResult execute(JobContext jobContext) {
        try {
            long triggerTimeUnitMs = System.currentTimeMillis();

            // List all tasks
            List<E> allTaskList = this.listAllTasks();
            if (ValidateUtils.isEmptyList(allTaskList)) {
                LOGGER.debug("all-task is empty, finish process, taskName:{} jobContext:{}", taskName, jobContext);
                return TaskResult.SUCCESS;
            }

            // Work out which tasks the current machine should run
            List<E> subTaskList = this.selectTask(allTaskList, jobContext.getAllWorkerCodes(), jobContext.getCurrentWorkerCode());
            // Checks subTaskList; the code as quoted re-checked allTaskList here,
            // which does not match the "sub-task is empty" log message
            if (ValidateUtils.isEmptyList(subTaskList)) {
                LOGGER.debug("sub-task is empty, finish process, taskName:{} jobContext:{}", taskName, jobContext);
                return TaskResult.SUCCESS;
            }

            // Process the tasks
            TaskResult ret = this.processTask(subTaskList, triggerTimeUnitMs);

            // Assemble the result
            TaskResult taskResult = new TaskResult();
            taskResult.setCode(ret.getCode());
            taskResult.setMessage(ConvertUtil.list2String(subTaskList, ","));
            return taskResult;
        } catch (Exception e) {
            LOGGER.error("process task failed, taskName:{}", taskName, e);
            return new TaskResult(TaskResult.FAIL_CODE, e.toString());
        }
    }
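The code works as follows: every node lists the full set of tasks with listAllTasks, then calls selectTask with the codes of all workers and its own worker code to pick out the subset it is responsible for. It processes only that subset with processTask and returns a TaskResult whose message lists the sub-tasks it handled. Because each node computes its own share from the same inputs, a given task is handled by one node rather than by every machine.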
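The article does not show the body of selectTask, so the following is not KS's implementation. It is a minimal sketch of one way such a selection could work, assuming a simple "sorted workers plus hash modulo" scheme; `SelectTaskSketch` and its method are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class SelectTaskSketch {
    // Returns the tasks owned by currentWorker. Every worker sorts the same worker list,
    // so all workers agree on who owns each task without communicating with each other.
    // Assumes E has a hashCode that is stable across JVMs (true for Long and String).
    static <E> List<E> selectTask(List<E> allTasks, List<String> allWorkers, String currentWorker) {
        List<String> sorted = new ArrayList<>(allWorkers);
        Collections.sort(sorted);

        List<E> mine = new ArrayList<>();
        int myIndex = sorted.indexOf(currentWorker);
        if (myIndex < 0) {
            return mine; // a worker that is not registered owns nothing
        }
        for (E task : allTasks) {
            // floorMod keeps the owner index non-negative even for negative hash codes
            int owner = Math.floorMod(task.hashCode(), sorted.size());
            if (owner == myIndex) {
                mine.add(task);
            }
        }
        return mine;
    }

    public static void main(String[] args) {
        List<Long> clusterIds = Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L);
        List<String> workers = Arrays.asList("worker-b", "worker-a");

        System.out.println(selectTask(clusterIds, workers, "worker-a")); // prints [2, 4, 6]
        System.out.println(selectTask(clusterIds, workers, "worker-b")); // prints [1, 3, 5]
    }
}
```

A plain modulo like this reassigns most tasks whenever a worker joins or leaves. Consistent hashing is a common alternative when that churn matters.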
Reference:
https://github.com/didi/KnowStreaming/blob/master/docs/dev_guide/Task%E6%A8%A1%E5%9D%97%E7%AE%80%E4%BB%8B.md