【Flink metric(2)】chunjun的metric系统是怎么设计的:如何注册metric、如何同步metric

文章目录

  • 一. 管理(注册、同步)metric
    • 1. BaseRichInputFormat对metric的管理
    • 2. 通过BaseMetric管理metric
  • 二. AccumulatorCollector:metric同步
    • 1. 启动线程池,周期性更新metric信息
    • 2. 获取全局指标、本地指标
    • 3. 资源回收
  • 三. 小结

chunjun涉及到的操作有注册metric、同步metric、metric判断。

一. 管理(注册、同步)metric

1. BaseRichInputFormat对metric的管理

注册metric的操作主要是在open方法中,主要逻辑如下:


this.context = (StreamingRuntimeContext) getRuntimeContext();  //获取flink全局参数,用于通过脏数据管理器的参数配置
ExecutionConfig.GlobalJobParameters params =  context.getExecutionConfig().getGlobalJobParameters();  
DirtyConfig dc = DirtyConfUtil.parseFromMap(params.toMap());  
//注册脏数据管理器
this.dirtyManager = new DirtyManager(dc, this.context);  。。。if (!initialized) {  //初始化累加器,initAccumulatorCollector();  //初始化行大小initRowSizeCalculator();  //初始统计metricinitStatisticsAccumulator();  //初始化消费速率initByteRateLimiter();  //初始化cp相关文件initRestoreInfo();  initialized = true;  
}

nbsp;
开启metric同步线程,每taskmanager心跳+1s同步一次数据。

private void initAccumulatorCollector() {  String lastWriteLocation =  String.format("%s_%s", Metrics.LAST_WRITE_LOCATION_PREFIX, indexOfSubTask);  String lastWriteNum =  String.format("%s_%s", Metrics.LAST_WRITE_NUM__PREFIX, indexOfSubTask);  accumulatorCollector =  new AccumulatorCollector(  context,  Arrays.asList(  Metrics.NUM_READS,  Metrics.READ_BYTES,  Metrics.READ_DURATION,  Metrics.WRITE_BYTES,  Metrics.NUM_WRITES,  lastWriteLocation,  lastWriteNum)); accumulatorCollector.start();  
}

注册如下指标

  • 先从RuntimeContext获取指标,如果没有则注册meitric到jobmanager中。
  • 或者通过MetricGroup()进行注册,在BaseMetric类中有详细逻辑,见下节分析。
private void initStatisticsAccumulator() {  numReadCounter = getRuntimeContext().getLongCounter(Metrics.NUM_READS);  bytesReadCounter = getRuntimeContext().getLongCounter(Metrics.READ_BYTES);  durationCounter = getRuntimeContext().getLongCounter(Metrics.READ_DURATION);  inputMetric = new BaseMetric(getRuntimeContext());  inputMetric.addMetric(Metrics.NUM_READS, numReadCounter, true);  inputMetric.addMetric(Metrics.READ_BYTES, bytesReadCounter, true);  inputMetric.addMetric(Metrics.READ_DURATION, durationCounter);  inputMetric.addDirtyMetric(Metrics.DIRTY_DATA_COUNT, this.dirtyManager.getConsumedMetric());  inputMetric.addDirtyMetric(  Metrics.DIRTY_DATA_COLLECT_FAILED_COUNT,  this.dirtyManager.getFailedConsumedMetric());  
}

 

openInputFormat中可以定义:用户的merticReporter


...
if (useCustomReporter()) {  customReporter =  DataSyncFactoryUtil.discoverMetric(  config, getRuntimeContext(), makeTaskFailedWhenReportFailed());  customReporter.open();  
}

 

2. 通过BaseMetric管理metric

BaseMetric提供注册metric group、注册metric 以及等待metric同步(到jobmanager)等能力。

BaseMetric主要有addMetric、addDirtyMetric、waitForReportMetrics、getChunjunMetricGroup等方法,其中构造方法中,通过runtimeContext注册了chunjunMetricGroup、chunjunDirtyMetricGroup。具体逻辑如下:

注册metric group:


public BaseMetric(RuntimeContext runtimeContext) {  //获取全局变量:DELAY_PERIOD_MILLExecutionConfig.GlobalJobParameters params =  runtimeContext.getExecutionConfig().getGlobalJobParameters();  Map<String, String> confMap = params.toMap();  this.DELAY_PERIOD_MILL =  Long.parseLong(  String.valueOf(confMap.getOrDefault(DELAY_PERIOD_MILL_KEY, "20000")));//注册metric group:chunjun、outputchunjunMetricGroup =  runtimeContext  .getMetricGroup()  .addGroup(  Metrics.METRIC_GROUP_KEY_CHUNJUN,  Metrics.METRIC_GROUP_VALUE_OUTPUT);  //注册metric group:DirtyData、outputchunjunDirtyMetricGroup =  chunjunMetricGroup.addGroup(  Metrics.METRIC_GROUP_KEY_DIRTY, Metrics.METRIC_GROUP_VALUE_OUTPUT);  
}

 
注册metric:有gauge、meter等metric类型。

//注册指标
public void addMetric(String metricName, LongCounter counter) {  addMetric(metricName, counter, false);  
}public void addMetric(String metricName, LongCounter counter, boolean meterView) {  metricCounters.put(metricName, counter);  chunjunMetricGroup.gauge(metricName, new SimpleAccumulatorGauge<>(counter)); if (meterView) {  chunjunMetricGroup.meter(  metricName + Metrics.SUFFIX_RATE, new SimpleLongCounterMeterView(counter, 20));  }  
}  //metricName: metric名字
//counter: 具体计数的counter
public void addDirtyMetric(String metricName, LongCounter counter) {  metricCounters.put(metricName, counter);  chunjunDirtyMetricGroup.gauge(metricName, new SimpleAccumulatorGauge<>(counter));  
}

 

等待metric指标:当taskslot结束之后,需要等待一段时间把未同步的指标同步给jobmanager。

public void waitForReportMetrics() {  try {  Thread.sleep(DELAY_PERIOD_MILL);  } catch (InterruptedException e) {  ThreadUtil.sleepMilliseconds(DELAY_PERIOD_MILL);  log.warn("Task thread is interrupted");  }  
}

 

二. AccumulatorCollector:metric同步

AccumulatorCollector实现了周期性合并并获取全局metric,主流程是:每taskmanager心跳后+1s,同步一次全局的metric给taskslot。

AccumulatorCollector有start、close、collectAccumulator、getAccumulatorValue、getLocalAccumulatorValue等方法。

1. 启动线程池,周期性更新metric信息

/** 启动线程池,周期性更新累加器信息 */  
public void start() {  scheduledExecutorService.scheduleAtFixedRate(  this::collectAccumulator, 0, period, TimeUnit.MILLISECONDS);  
}

收集累加器信息,具体逻辑是

调用requestJob(向jobmanager进行rpc请求,合并并获取全局metric),同步全局metric。具体步骤是:

  • 每个slot将自己的metric同步到全局(jobmanager)中
  • 脏数据同步给每个slot:拿到全局的脏数据metric传给taskslot,可在每个taskslot中判断处理的脏数据是否超过全局设置的。
  • 如果没有注册metric,则获取maxValue指标,这里主要是jdbc用到了此指标。
public void collectAccumulator() {  CompletableFuture<ExecutionGraphInfo> executionGraphInfoCompletableFuture =  gateway.requestJob(Time.seconds(10));  ExecutionGraphInfo executionGraphInfo;  try {  executionGraphInfo = executionGraphInfoCompletableFuture.get();  } catch (Exception e) {  // 限制最大出错次数,超过最大次数则使任务失败,如果不失败,统计数据没有及时更新,会影响速率限制,错误控制等功能  collectErrorTimes++;  if (collectErrorTimes > MAX_COLLECT_ERROR_TIMES) {  // 主动关闭线程和资源,防止异常情况下没有关闭  close();  throw new RuntimeException(  "The number of errors in updating statistics data exceeds the maximum limit of 100 times. To ensure the correctness of the data, the task automatically fails");  }  return;  }  StringifiedAccumulatorResult[] accumulatorResult =  executionGraphInfo.getArchivedExecutionGraph().getAccumulatorResultsStringified();  for (StringifiedAccumulatorResult result : accumulatorResult) {  ValueAccumulator valueAccumulator = valueAccumulatorMap.get(result.getName());  if (valueAccumulator != null) {  valueAccumulator.setGlobal(Long.parseLong(result.getValue()));  } else if (result.getName().equals(Metrics.MAX_VALUE)) {  rdbMaxFuncValue = result.getValue();  }  }  
}

 

2. 获取全局指标、本地指标

获取指定累加器信息

public long getAccumulatorValue(String name, boolean needWaited) {  if (needWaited) {  waited();  }  ValueAccumulator valueAccumulator = valueAccumulatorMap.get(name);  if (valueAccumulator == null) {  return 0;  }  return valueAccumulator.getGlobal();  
}

获取每个subtask的本地指标

/**  * 根据名称获取指定累加器的本地value  * * @param name 累加器指标名称  * @return  */
public long getLocalAccumulatorValue(String name) {  ValueAccumulator valueAccumulator = valueAccumulatorMap.get(name);  if (valueAccumulator == null) {  return 0;  }  return valueAccumulator.getLocal().getLocalValue();  
}

 

3. 资源回收

/** 关闭线程池 */  
public void close() {  if (scheduledExecutorService != null  && !scheduledExecutorService.isShutdown()  && !scheduledExecutorService.isTerminated()) {  scheduledExecutorService.shutdown();  }  
}

 

三. 小结

我们大致了解了chunjun

  • 在什么时机注册metric指标:在BaseRichInputFormat中的open方法中,在连接器消费数据前,进行相关metric的注册;
  • chunjun提供了管理(注册、指标更新、等待metric同步等)metric的基类:BaseMetric;
  • 周期获取全局metric:以便每个subtask进行metric的指标判断; 等metric管理能力。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/34568.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【产品经理】订单处理8-智能分仓

在电商ERP系统中&#xff0c;通常智能分仓策略是系统中最重要的功能之一&#xff0c;大公司若仓库较多时&#xff0c;智能分仓策略中也会加入大数据团队&#xff0c;通过算法来计算最优仓库。 本次讲解的智能分仓适用于中小公司&#xff0c;适合拥有2个以上10个以下仓库的公司…

AGI 之 【Hugging Face】 的[ 简单介绍 ] [ 基础环境搭建 ] 的简单整理

AGI 之 【Hugging Face】 的[ 简单介绍 ] [ 基础环境搭建 ] 的简单整理 目录 AGI 之 【Hugging Face】 的[ 简单介绍 ] [ 基础环境搭建 ] 的简单整理 一、简单介绍 二、Hugging Face 三、环境搭建 python 环境的搭建 Pycharm 环境搭建 1、下载 Pycharm 安装包 2、安装 …

装机必备一WinRAR安装使用以及常见问题

WinRAR是一款功能强大的压缩包管理器&#xff0c;支持多种压缩格式&#xff0c;如RAR、ZIP等。作为一款经典且广泛使用的压缩软件&#xff0c;WinRAR不仅在文件压缩率和速度方面表现出色&#xff0c;还提供了备份数据、缩减电子邮件附件大小以及解压缩网络下载文件等功能。 为…

数据结构与算法:回溯算法约束条件:剪枝详解、示例(C#、C++)与回溯典型例题详解

文章目录 一、约束条件二、剪枝三、典型例题四、常用术语五、示例N 皇后问题 C# 示例N 皇后问题 C 示例 六、常见用用回溯算法解决的问题汇总组合问题&#xff1a;图论问题&#xff1a;棋盘游戏问题&#xff1a;优化问题&#xff1a;调度问题&#xff1a;其他问题&#xff1a; …

How to persist LangChain conversation memory (save and load)

题意&#xff1a;如何持久化 LangChain 对话记忆&#xff08;保存和加载&#xff09; 问题背景&#xff1a; Im creating a conversation like so: 我正在创建一个对话&#xff0c;如下所示&#xff1a; llm ChatOpenAI(temperature0, openai_api_keyOPENAI_API_KEY,…

【CTF】BUU BURP COURSE 11

打开靶机之后&#xff0c;显示只能在本地打开&#xff08;一度以为靶机出问题&#xff09;。 解题步骤&#xff1a; 1.分析请求包信息 2.构建本地请求IP X-Real-IP&#xff1a;记录真实客户端IP地址信息&#xff1b; X-Forward-for&#xff1a;记录了请求IP到目标ip所经历的…

新型基坑气膜:施工开挖的得力干将—轻空间

随着城市建设的加速推进&#xff0c;施工过程中的环境问题日益受到关注。新型基坑气膜以其卓越的防尘、降噪、节能和防火功能&#xff0c;成为施工开挖领域中的得力干将&#xff0c;极大地提升了绿色施工的水平。 基坑气膜的作用 基坑气膜在施工现场形成了一个完全封闭的作业空…

Java EE之Servlet

Servlet 是 Java EE&#xff08;Java Platform, Enterprise Edition&#xff09;规范中的一个技术&#xff0c;是服务器端 Java 程序&#xff0c;用于处理客户端请求并生成动态响应。Servlet 通常用于构建 Web 应用程序&#xff0c;并与 HTTP 协议紧密集成。以下是对 Servlet 的…

JavaWeb系列七: 动态WEB开发核心(Servlet) 下

韩老师学生 ServletConfigServletContext网站计数器 HttpServletRequest细节1细节2细节3 Dispathcer请求转发应用实例请求转发细节和注意事项习题 HttpServletResponse请求重定向请求重定向注意事项动态获取到application context练习题 ServletConfig ●ServletConfig基本介绍…

docker --restart 容器重启策略

官网连接&#xff1a;https://docs.docker.com/config/containers/start-containers-automatically/ 当容器退出后&#xff0c;或者docker程序重启了&#xff0c;容器是否要重启&#xff0c;可以用重启策略控制。 用docker run命令的时候&#xff0c;用--restart 设置容器重启…

1.文件上传漏洞渗透及防御(OWASP实战训练)

1.文件上传漏洞渗透及防御&#xff08;OWASP实战训练&#xff09; OWASPupload上传漏洞实验一&#xff1a;低安全模式下&#xff0c;上传任意类型的文件&#xff0c;文件大小不受限制实验二&#xff0c;安全级别调整将其变为中等安全级别实验三&#xff1a;将其设为高安全级别 …

【教程】如何一步一步训练一个SOM神经网络-自组织竞争神经网络(Self-organizing Feature Map)

本文来自《老饼讲解-BP神经网络》https://www.bbbdata.com/ 目录 一、什么是SOM神经网络1.1.SOM神经网络有什么用1.2.SOM神经网络是如何聚类的 二、如何训练一个SOM神经网络2.1. 训练一个SOM神经网络的代码示例2.2. 如何查看SOM神经网络的聚类中心 SOM神经网络全称为自组织竞争…

《系统架构设计师教程(第2版)》第11章-未来信息综合技术-05-数字孪生体(Digital Twin)技术概述

文章目录 1. 数字孪生体发展历程1.2 准备期1.2 概念产生期1.3 领先应用期1.4 深度开发和大规模扩展应用期 2. 数字孪生体的定义3. 数字孪生体的关键技术3.1 建模3.2 仿真技术3.3 其他技术 4. 数字孪生体的应用4.1 制造领域4.2 全产业链上的应用4.3 城市4.4 战场 1. 数字孪生体发…

解决Java中的NoSuchAlgorithmException异常的技术实践

解决Java中的NoSuchAlgorithmException异常的技术实践 大家好&#xff0c;我是免费搭建查券返利机器人省钱赚佣金就用微赚淘客系统3.0的小编&#xff0c;也是冬天不穿秋裤&#xff0c;天冷也要风度的程序猿&#xff01; 在Java编程中&#xff0c;NoSuchAlgorithmException异常…

Redis-主从复制-测试主从模式下的读写操作

文章目录 1、在主机6379写入数据2、在从机6380上写数据报错3、从机只能读数据&#xff0c;不能写数据 1、在主机6379写入数据 127.0.0.1:6379> keys * (empty array) 127.0.0.1:6379> set uname jim OK 127.0.0.1:6379> get uname "jim" 127.0.0.1:6379>…

【机器学习】python之人工智能应用篇——3D生成技术

在Python中&#xff0c;人工智能&#xff08;AI&#xff09;与3D生成技术的结合可以体现在多个方面&#xff0c;比如使用AI算法来优化3D模型的生成、通过机器学习来预测3D模型的属性&#xff0c;或者利用深度学习来生成全新的3D内容。然而&#xff0c;直接通过AI生成完整的3D模…

单片机IO口模拟串口实现原理

参考链接 1、使用GPIO来模拟UART 2、STM32之IO模拟串口篇 1、工作原理 单片机IO口模拟串口的实现原理通常是通过软件来模拟串行通信的传输和接收。下面说明了单片机IO口模拟串口的实现原理&#xff1a; 配置IO口&#xff1a;选择两个IO口作为模拟串口的发送和接收引脚。通常…

go语言:数据库sql查询保存任意数量字段的数据

1、查询任意列数的表&#xff0c;并输出 func search() {rows, _ : db.Query("select * from users") // 查询数据columns, _ : rows.Columns() // 查询到的字段名列表values : make([]any, len(columns)) // 根据字段数量&#xff0c;创建接收…

优雅的参数校验——Guava库中的Preconditions

Guava库中的Preconditions类提供了一些静态方法&#xff0c;用于在程序中执行参数的检查和验证。这些方法在编写健壮和可维护的代码时非常有用 checkArgument(boolean expression)&#xff1a; 作用&#xff1a;用于验证方法的参数是否满足某个条件。如果条件不满足&#xff0c…

rancher快照备份至S3

巧用rancher的S3快照备份功能&#xff0c;快速实现集群复制、集群转移、完全崩溃后的极限修复 1.进入集群管理&#xff0c;在对应的集群菜单后&#xff0c;点击编辑配置 2.选择ETCD&#xff0c;启用&#xff0c;Backup Snapshots to S3选项 并填入你的minio 3 配置成功后 手…