KnowStreaming系列教程第三篇——调度任务模块

前一篇文章KnowStreaming系列教程第二篇——项目整体架构分析_诸葛子房_的博客-CSDN博客

讲述了KS的整体项目目录,这边文章来讲述下KS在调度模块里面对于指标采集和元数据同步

一、调度模块代码主要在km-task里面

public class TaskClusterAddedListener implements ApplicationListener<ClusterPhyAddedEvent> {private static final ILog LOGGER = LogFactory.getLog(TaskClusterAddedListener.class);@Overridepublic void onApplicationEvent(ClusterPhyAddedEvent event) {LOGGER.info("method=onApplicationEvent||clusterPhyId={}||msg=listened new cluster", event.getClusterPhyId());Long now = System.currentTimeMillis();// 交由KS自定义的线程池,异步执行任务FutureUtil.quickStartupFutureUtil.submitTask(() -> triggerAllTask(event.getClusterPhyId(), now));}private void triggerAllTask(Long clusterPhyId, Long startTimeUnitMs) {ClusterPhy tempClusterPhy = null;// 120秒内无加载进来,则直接返回退出while (System.currentTimeMillis() - startTimeUnitMs <= 120L * 1000L) {tempClusterPhy = LoadedClusterPhyCache.getByPhyId(clusterPhyId);if (tempClusterPhy != null) {break;}BackoffUtils.backoff(1000);}if (tempClusterPhy == null) {return;}// 获取到之后,再延迟5秒,保证相关的集群都被正常加载进来,这里的5秒不固定BackoffUtils.backoff(5000);final ClusterPhy clusterPhy = tempClusterPhy;// 集群执行集群元信息同步List<AbstractAsyncMetadataDispatchTask> metadataServiceList = new ArrayList<>(SpringTool.getBeansOfType(AbstractAsyncMetadataDispatchTask.class).values());for (AbstractAsyncMetadataDispatchTask dispatchTask: metadataServiceList) {try {dispatchTask.asyncProcessSubTask(clusterPhy, startTimeUnitMs);} catch (Exception e) {// ignore}}// 再延迟5秒,保证集群元信息都已被正常同步至DB,这里的5秒不固定BackoffUtils.backoff(5000);// 集群集群指标采集List<AbstractAsyncMetricsDispatchTask> metricsServiceList = new ArrayList<>(SpringTool.getBeansOfType(AbstractAsyncMetricsDispatchTask.class).values());for (AbstractAsyncMetricsDispatchTask dispatchTask: metricsServiceList) {try {dispatchTask.asyncProcessSubTask(clusterPhy, startTimeUnitMs);} catch (Exception e) {// ignore}}}
}

通过监听集群添加事件,触发元数据同步和指标采集调度任务

具体实现可参考:

spring 根据接口或者抽象类获取子类执行: https://blog.csdn.net/u012501054/article/details/103927674

二、调度任务分布式系统如何做到单节点运行,避免多台机器调度

AbstractDispatchTask 里面的execute 方法通过实现任务分配
public TaskResult execute(JobContext jobContext) {try {long triggerTimeUnitMs = System.currentTimeMillis();// 获取所有的任务List<E> allTaskList = this.listAllTasks();if (ValidateUtils.isEmptyList(allTaskList)) {LOGGER.debug("all-task is empty, finish process, taskName:{} jobContext:{}", taskName, jobContext);return TaskResult.SUCCESS;}// 计算当前机器需要执行的任务List<E> subTaskList = this.selectTask(allTaskList, jobContext.getAllWorkerCodes(), jobContext.getCurrentWorkerCode());if (ValidateUtils.isEmptyList(allTaskList)) {LOGGER.debug("sub-task is empty, finish process, taskName:{} jobContext:{}", taskName, jobContext);return TaskResult.SUCCESS;}// 进行任务处理TaskResult ret = this.processTask(subTaskList, triggerTimeUnitMs);//组装信息TaskResult taskResult = new TaskResult();taskResult.setCode(ret.getCode());taskResult.setMessage(ConvertUtil.list2String(subTaskList, ","));return taskResult;} catch (Exception e) {LOGGER.error("process task failed, taskName:{}", taskName, e);return new TaskResult(TaskResult.FAIL_CODE, e.toString());}}

对应代码解释如下:

参考:

https://github.com/didi/KnowStreaming/blob/master/docs/dev_guide/Task%E6%A8%A1%E5%9D%97%E7%AE%80%E4%BB%8B.md

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/9598.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

opengauss安装

opengauss安装 系统环境 Redhat版本&#xff1a;redhat7.6 虚拟机ip&#xff1a;192.168.5.144 Gauss版本&#xff1a;openGauss-5.0.0-CentOS-64bit-all.tar.gz 企业版 一&#xff0e;准备软硬件环境 1.1 安装依赖包 yum -y install bzip2 python3 libaio-devel flex bis…

hive 相关总结

1、表复制(五分区表复制) create table t1 as select name,age from t2 where ds>2022-06-04 2、表覆盖(先清空表中的原有数据&#xff0c;再向表中插入数据) insert overwrite TABLE t1 select name,age from t2 where ds>2022-06-04 3、表分区覆盖&#xff08;先清空表…

LeetCode中删除数组元素

26&#xff0c;80 题是删除 有序数组 中的 重复项 对于此类问题&#xff0c;我们应该进行如下考虑&#xff1a; 由于是保留 k 个相同数字&#xff0c;对于前 k 个数字&#xff0c;我们可以直接保留。对于后面的任意数字&#xff0c;能够保留的前提是&#xff1a;与当前写入的位…

题目:2119.反转两次的数字

​​题目来源&#xff1a; leetcode题目&#xff0c;网址&#xff1a;2119. 反转两次的数字 - 力扣&#xff08;LeetCode&#xff09; 解题思路&#xff1a; 当该数字不为 0 且最低位为 0 时&#xff0c;反转两次不是本身&#xff0c;返回 false&#xff0c;否则&#xff0c;反…

shiro的优点

shiro是一个强大的java安全框架&#xff0c;它的优点有以下&#xff1a; shiro就是权限管理&#xff1a;包括两部分&#xff1a;身份验证、授权 一、它提供了身份验证、授权、密码和会话管理等功能&#xff0c;可以满足各种应用程序的安全需求。 身份认证就是&#xff1a;验证是…

Python面试:技巧与实践

Python面试&#xff1a;技巧与实践 在当今的IT行业中&#xff0c;Python已经成为了一种非常流行的编程语言。它以其简洁明了的语法、强大的库支持和广泛的应用领域&#xff0c;吸引了无数的开发人员。然而&#xff0c;掌握Python并不是一件容易的事情&#xff0c;尤其是在面试…

mybatisPlus入门篇

文章目录 初窥门径1.1 初识MybatisPlus1.2 MybatisPlus的特性1.3 MybatisPlus的架构模型 入门案例2.1 准备相关开发环境2.2 搭建springboot工程2.3 创建数据库2.4 引入相关依赖2.5 创建实体类2.6 集成MybatisPlus2.7 单元测试2.8 springboot日志优化 初窥门径 1.1 初识Mybatis…

ChatGPT把python 的import和from讲明白了

文章目录 1、import&#xff1a;import关键字用于导入整个模块&#xff0c;您可以使用该模块中的所有对象。语法如下&#xff1a;2、from ... import ...&#xff1a;from ... import ... 语法用于从模块中导入特定的对象&#xff0c;而不是导入整个模块。您可以通过这种方式选…

基于Web的智慧景区GIS三维可视化运营系统

随着人民生活水平的提高和旅游产品的丰富多样&#xff0c;我国人民对于旅游的需求逐渐从“走过场”转变为“品质体验”。 建设背景 随着互联网、大数据、人工智能等新技术在旅游领域的应用&#xff0c;以数字化、网络化、智能化为特征的智慧旅游成为旅游业高质量发展新动能。…

【雕爷学编程】Arduino动手做(93)--- 0.96寸OLED液晶屏模块11

37款传感器与执行器的提法&#xff0c;在网络上广泛流传&#xff0c;其实Arduino能够兼容的传感器模块肯定是不止这37种的。鉴于本人手头积累了一些传感器和执行器模块&#xff0c;依照实践出真知&#xff08;一定要动手做&#xff09;的理念&#xff0c;以学习和交流为目的&am…

Django模板语法和请求

1、在django关于模板文件加载顺序 创建的django项目下会有一个seeetings.py的文件 如果在seeetings.py 中加了 os.path.join(BASE_DIR,‘templates’)&#xff0c;如果是pycharm创建的django项目会加上&#xff0c;就会默认先去根目录找templates目录下的html文件&#xff0c…

安全学习DAY06_抓包技术-HTTPHTTPS

抓包技术-HTTP&HTTPS HTTP&HTTPS抓包针对Web&APP&小程序&PC应用等 本节目的&#xff1a; 掌握几种抓包工具证书安装操作掌握几种HTTP&HTTPS抓包工具的使用学会Web&#xff0c;APP&#xff0c;小程序&#xff0c;PC应用等抓包了解本节课抓包是针对哪些…

EAP设备自动化控制系统在设备数采和控制方面的优势

随着科技的不断进步和工业自动化的发展&#xff0c;EAP&#xff08;Equipment Automation Program&#xff09;设备自动化控制系统在各个行业中扮演着越来越重要的角色。作为连接MES&#xff08;Manufacturing Execution System&#xff09;和设备层的沟通桥梁&#xff0c;EAP系…

Chatgpt Web API 创建对话,免费,不计token数量,模仿网页提交对话

Chatgpt API 是收费的&#xff0c;按token使用量计费 Chatgpt Web API 免费的&#xff0c;只要有账号就可以使用。 curl https://chat.openai.com/backend-api/conversation \-H authority: chat.openai.com \-H accept: text/event-stream \-H accept-language: zh-CN,zh;q…

了解Unity编辑器之组件篇Mesh(三)

Mesh&#xff1a;是一种三维模型的表示形式&#xff0c;它由一系列顶点、三角形&#xff08;或其他多边形&#xff09;和相关属性组成。Mesh用于表示物体的外观和形状&#xff0c;它是可见物体的基本组成部分。通过操作Mesh&#xff0c;开发者可以实现各种视觉效果、物理模拟和…

Swagger如何将接口分组?

如果不分组管理端和用户端是混在一起的&#xff0c;不好查看。 在 docket创建的时候&#xff0c;加一行分组代码。 .groupName("用户端接口") 如果你加完之后&#xff0c;重启&#xff0c;报错。 这应该是你访问的网址仍是旧网址。 http://localhost:8080/doc.html#…

MySQL高可用之MHA集群

目录 一、MHA概述 1.1 什么是 MHA 1.2 MHA 的组成 1.3 MHA 的特点 二、MySQL MHA搭建准备 2.1 实验思路 2.2 实验准备 MHA一主两从高可用集群示意图&#xff1a; 三、搭建 MySQL MHA 3.1 配置主从复制 1、四台服务器都关闭防火墙 2、修改 Master、Slave1、Slave2 节…

解析 WebRTC

1 主要的类 RTCPeerConnection&#xff1a;用于建立点对点连接&#xff0c;实现音视频传输和数据通信。它负责协商媒体传输的参数、处理ICE候选项以及创建和管理媒体通道。 RTCDataChannel&#xff1a;用于在两个端点之间传输任意类型的数据。通过该通道&#xff0c;可以实现…

azure-cognitiveservices-speech api error while using with AWS Lambda

Azure 语音评估服务 Cancellation Reason 初始化平台失败 1.在mac上安装 pip install azure-cognitiveservices-speech1.30.0正常运行没有问题&#xff0c;服务部署到docker 容器中后调用Azure语音评估服务报错 Cancellation Reason 初始化平台失败 2.解决方案&#xff0c;变…

汇编语言(第4版)实验7 寻址方式在结构化数据访问中的应用

参考答案&#xff1a; ①经分析&#xff0c;完整程序代码如下。 assume cs:codesg data segmentdb 1975,1976,1977,1978,1979,1980,1981,1982,1983db 1984,1985,1986,1987,1988,1989,1990,1991,1992db 1993,1994,1995dd 16,22,382,1356,2390,8000,16000,24486,50065,97479,140…