KnowStreaming系列教程第三篇——调度任务模块

前一篇文章KnowStreaming系列教程第二篇——项目整体架构分析_诸葛子房_的博客-CSDN博客

讲述了KS的整体项目目录,这边文章来讲述下KS在调度模块里面对于指标采集和元数据同步

一、调度模块代码主要在km-task里面

public class TaskClusterAddedListener implements ApplicationListener<ClusterPhyAddedEvent> {private static final ILog LOGGER = LogFactory.getLog(TaskClusterAddedListener.class);@Overridepublic void onApplicationEvent(ClusterPhyAddedEvent event) {LOGGER.info("method=onApplicationEvent||clusterPhyId={}||msg=listened new cluster", event.getClusterPhyId());Long now = System.currentTimeMillis();// 交由KS自定义的线程池,异步执行任务FutureUtil.quickStartupFutureUtil.submitTask(() -> triggerAllTask(event.getClusterPhyId(), now));}private void triggerAllTask(Long clusterPhyId, Long startTimeUnitMs) {ClusterPhy tempClusterPhy = null;// 120秒内无加载进来,则直接返回退出while (System.currentTimeMillis() - startTimeUnitMs <= 120L * 1000L) {tempClusterPhy = LoadedClusterPhyCache.getByPhyId(clusterPhyId);if (tempClusterPhy != null) {break;}BackoffUtils.backoff(1000);}if (tempClusterPhy == null) {return;}// 获取到之后,再延迟5秒,保证相关的集群都被正常加载进来,这里的5秒不固定BackoffUtils.backoff(5000);final ClusterPhy clusterPhy = tempClusterPhy;// 集群执行集群元信息同步List<AbstractAsyncMetadataDispatchTask> metadataServiceList = new ArrayList<>(SpringTool.getBeansOfType(AbstractAsyncMetadataDispatchTask.class).values());for (AbstractAsyncMetadataDispatchTask dispatchTask: metadataServiceList) {try {dispatchTask.asyncProcessSubTask(clusterPhy, startTimeUnitMs);} catch (Exception e) {// ignore}}// 再延迟5秒,保证集群元信息都已被正常同步至DB,这里的5秒不固定BackoffUtils.backoff(5000);// 集群集群指标采集List<AbstractAsyncMetricsDispatchTask> metricsServiceList = new ArrayList<>(SpringTool.getBeansOfType(AbstractAsyncMetricsDispatchTask.class).values());for (AbstractAsyncMetricsDispatchTask dispatchTask: metricsServiceList) {try {dispatchTask.asyncProcessSubTask(clusterPhy, startTimeUnitMs);} catch (Exception e) {// ignore}}}
}

通过监听集群添加事件,触发元数据同步和指标采集调度任务

具体实现可参考:

spring 根据接口或者抽象类获取子类执行: https://blog.csdn.net/u012501054/article/details/103927674

二、调度任务分布式系统如何做到单节点运行,避免多台机器调度

AbstractDispatchTask 里面的execute 方法通过实现任务分配
public TaskResult execute(JobContext jobContext) {try {long triggerTimeUnitMs = System.currentTimeMillis();// 获取所有的任务List<E> allTaskList = this.listAllTasks();if (ValidateUtils.isEmptyList(allTaskList)) {LOGGER.debug("all-task is empty, finish process, taskName:{} jobContext:{}", taskName, jobContext);return TaskResult.SUCCESS;}// 计算当前机器需要执行的任务List<E> subTaskList = this.selectTask(allTaskList, jobContext.getAllWorkerCodes(), jobContext.getCurrentWorkerCode());if (ValidateUtils.isEmptyList(allTaskList)) {LOGGER.debug("sub-task is empty, finish process, taskName:{} jobContext:{}", taskName, jobContext);return TaskResult.SUCCESS;}// 进行任务处理TaskResult ret = this.processTask(subTaskList, triggerTimeUnitMs);//组装信息TaskResult taskResult = new TaskResult();taskResult.setCode(ret.getCode());taskResult.setMessage(ConvertUtil.list2String(subTaskList, ","));return taskResult;} catch (Exception e) {LOGGER.error("process task failed, taskName:{}", taskName, e);return new TaskResult(TaskResult.FAIL_CODE, e.toString());}}

对应代码解释如下:

参考:

https://github.com/didi/KnowStreaming/blob/master/docs/dev_guide/Task%E6%A8%A1%E5%9D%97%E7%AE%80%E4%BB%8B.md

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/9598.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

mybatisPlus入门篇

文章目录 初窥门径1.1 初识MybatisPlus1.2 MybatisPlus的特性1.3 MybatisPlus的架构模型 入门案例2.1 准备相关开发环境2.2 搭建springboot工程2.3 创建数据库2.4 引入相关依赖2.5 创建实体类2.6 集成MybatisPlus2.7 单元测试2.8 springboot日志优化 初窥门径 1.1 初识Mybatis…

ChatGPT把python 的import和from讲明白了

文章目录 1、import&#xff1a;import关键字用于导入整个模块&#xff0c;您可以使用该模块中的所有对象。语法如下&#xff1a;2、from ... import ...&#xff1a;from ... import ... 语法用于从模块中导入特定的对象&#xff0c;而不是导入整个模块。您可以通过这种方式选…

基于Web的智慧景区GIS三维可视化运营系统

随着人民生活水平的提高和旅游产品的丰富多样&#xff0c;我国人民对于旅游的需求逐渐从“走过场”转变为“品质体验”。 建设背景 随着互联网、大数据、人工智能等新技术在旅游领域的应用&#xff0c;以数字化、网络化、智能化为特征的智慧旅游成为旅游业高质量发展新动能。…

【雕爷学编程】Arduino动手做(93)--- 0.96寸OLED液晶屏模块11

37款传感器与执行器的提法&#xff0c;在网络上广泛流传&#xff0c;其实Arduino能够兼容的传感器模块肯定是不止这37种的。鉴于本人手头积累了一些传感器和执行器模块&#xff0c;依照实践出真知&#xff08;一定要动手做&#xff09;的理念&#xff0c;以学习和交流为目的&am…

Django模板语法和请求

1、在django关于模板文件加载顺序 创建的django项目下会有一个seeetings.py的文件 如果在seeetings.py 中加了 os.path.join(BASE_DIR,‘templates’)&#xff0c;如果是pycharm创建的django项目会加上&#xff0c;就会默认先去根目录找templates目录下的html文件&#xff0c…

安全学习DAY06_抓包技术-HTTPHTTPS

抓包技术-HTTP&HTTPS HTTP&HTTPS抓包针对Web&APP&小程序&PC应用等 本节目的&#xff1a; 掌握几种抓包工具证书安装操作掌握几种HTTP&HTTPS抓包工具的使用学会Web&#xff0c;APP&#xff0c;小程序&#xff0c;PC应用等抓包了解本节课抓包是针对哪些…

EAP设备自动化控制系统在设备数采和控制方面的优势

随着科技的不断进步和工业自动化的发展&#xff0c;EAP&#xff08;Equipment Automation Program&#xff09;设备自动化控制系统在各个行业中扮演着越来越重要的角色。作为连接MES&#xff08;Manufacturing Execution System&#xff09;和设备层的沟通桥梁&#xff0c;EAP系…

Chatgpt Web API 创建对话,免费,不计token数量,模仿网页提交对话

Chatgpt API 是收费的&#xff0c;按token使用量计费 Chatgpt Web API 免费的&#xff0c;只要有账号就可以使用。 curl https://chat.openai.com/backend-api/conversation \-H authority: chat.openai.com \-H accept: text/event-stream \-H accept-language: zh-CN,zh;q…

了解Unity编辑器之组件篇Mesh(三)

Mesh&#xff1a;是一种三维模型的表示形式&#xff0c;它由一系列顶点、三角形&#xff08;或其他多边形&#xff09;和相关属性组成。Mesh用于表示物体的外观和形状&#xff0c;它是可见物体的基本组成部分。通过操作Mesh&#xff0c;开发者可以实现各种视觉效果、物理模拟和…

Swagger如何将接口分组?

如果不分组管理端和用户端是混在一起的&#xff0c;不好查看。 在 docket创建的时候&#xff0c;加一行分组代码。 .groupName("用户端接口") 如果你加完之后&#xff0c;重启&#xff0c;报错。 这应该是你访问的网址仍是旧网址。 http://localhost:8080/doc.html#…

MySQL高可用之MHA集群

目录 一、MHA概述 1.1 什么是 MHA 1.2 MHA 的组成 1.3 MHA 的特点 二、MySQL MHA搭建准备 2.1 实验思路 2.2 实验准备 MHA一主两从高可用集群示意图&#xff1a; 三、搭建 MySQL MHA 3.1 配置主从复制 1、四台服务器都关闭防火墙 2、修改 Master、Slave1、Slave2 节…

汇编语言(第4版)实验7 寻址方式在结构化数据访问中的应用

参考答案&#xff1a; ①经分析&#xff0c;完整程序代码如下。 assume cs:codesg data segmentdb 1975,1976,1977,1978,1979,1980,1981,1982,1983db 1984,1985,1986,1987,1988,1989,1990,1991,1992db 1993,1994,1995dd 16,22,382,1356,2390,8000,16000,24486,50065,97479,140…

微服务笔记---Nacos集群搭建

微服务笔记---Nacos集群搭建 Nacos集群搭建1.集群结构图2.搭建集群2.1.初始化数据库2.2.下载nacos2.3.配置Nacos2.4.启动2.5.nginx反向代理2.6.优化 Nacos集群搭建 1.集群结构图 官方给出的Nacos集群图&#xff1a; 其中包含3个nacos节点&#xff0c;然后一个负载均衡器代理…

Clion开发STM32之W5500系列(DNS服务封装)

概述 在w5500基础库中进行封装&#xff0c;通过域名的方式获取实际的ip地址用于动态获取ntp的ip地址 DNS封装 头文件 /*******************************************************************************Copyright (c) [scl]。保留所有权利。****************************…

7.26作业

用fread和fwrite实现文件拷贝 #include<stdio.h> #include<string.h> #include<stdlib.h> #include<head.h> int main(int argc, const char *argv[]) {FILE* fpfopen("./11.txt","w");FILE* fp1fopen("./12.txt",&quo…

idea的Plugins中搜索不到插件

1、ctrlalts 打开设置 ; 2、搜索框输入plugins; 3、点击plugins; 4、点齿轮按钮&#xff0c;选择HTTP Proxy settings; 如下操作&#xff1a; 5、刷新DNS&#xff0c;ipconfig /flushdns 6、重新打开idea 的plugins 插件列表出来了

无涯教程-jQuery - jQuery.ajax( options )方法函数

jQuery.ajax(options)方法使用HTTP请求加载远程页面。 $.ajax()返回它创建的XMLHttpRequest。在大多数情况下&#xff0c;您不需要该对象直接进行操作&#xff0c;但是如果您需要手动中止请求&#xff0c;则可以使用该对象。 jQuery.ajax( options ) - 语法 $.ajax( options…

51单片机--LCD1602

LCD1602的介绍 LCD1602是一种字符型液晶显示模块&#xff0c;通常用于嵌入式系统、单片机等领域。它由LCD&#xff08;液晶显示屏&#xff09;、HD44780控制驱动主电路及其扩展驱动电路、少量电阻、电容元件等组成。 LCD1602具有以下特点和功能&#xff1a; 显示能力&#xf…

面试总结-Redis篇章(八)——Redis分布式锁

JAVA 面试总结-Redis分布式锁 模拟抢券场景 通过下面方法添加Synchronized锁来防止上述情况&#xff0c;如果上面是单体服务没有问题&#xff0c;但是如果项目是集群部署&#xff0c;会出现下面的问题&#xff0c;因为Synchronized是属于本地的锁端口8080和8081同时访问&#x…

Qt 第二讲 登录框完善,登录成功后,进入新的界面;新建工程文件,默认提供的代码注释信息;前两讲思维导图

一&#xff0c;代码完善 头文件 #ifndef ZUOYE_H #define ZUOYE_H#include <QWidget> #include <QDebug> #include <QIcon> #include <QPushButton> #include <QLineEdit> #include <QLabel> //#include <QTextToSpeech>QT_BEGIN_…