45、Flink 的指标体系介绍及验证(3)- 完整版

Flink 系列文章

1、Flink 部署、概念介绍、source、transformation、sink使用示例、四大基石介绍和示例等系列综合文章链接

13、Flink 的table api与sql的基本概念、通用api介绍及入门示例
14、Flink 的table api与sql之数据类型: 内置数据类型以及它们的属性
15、Flink 的table api与sql之流式概念-详解的介绍了动态表、时间属性配置(如何处理更新结果)、时态表、流上的join、流上的确定性以及查询配置
16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及FileSystem示例(1)
16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及Elasticsearch示例(2)
16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及Apache Kafka示例(3)
16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及JDBC示例(4)
16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及Apache Hive示例(6)
17、Flink 之Table API: Table API 支持的操作(1)
17、Flink 之Table API: Table API 支持的操作(2)
18、Flink的SQL 支持的操作和语法
19、Flink 的Table API 和 SQL 中的内置函数及示例(1)
19、Flink 的Table API 和 SQL 中的自定义函数及示例(2)
19、Flink 的Table API 和 SQL 中的自定义函数及示例(3)
19、Flink 的Table API 和 SQL 中的自定义函数及示例(4)
20、Flink SQL之SQL Client: 不用编写代码就可以尝试 Flink SQL,可以直接提交 SQL 任务到集群上
21、Flink 的table API与DataStream API 集成(1)- 介绍及入门示例、集成说明
21、Flink 的table API与DataStream API 集成(2)- 批处理模式和inser-only流处理
21、Flink 的table API与DataStream API 集成(3)- changelog流处理、管道示例、类型转换和老版本转换示例
21、Flink 的table API与DataStream API 集成(完整版)
22、Flink 的table api与sql之创建表的DDL
24、Flink 的table api与sql之Catalogs(介绍、类型、java api和sql实现ddl、java api和sql操作catalog)-1
24、Flink 的table api与sql之Catalogs(java api操作数据库、表)-2
24、Flink 的table api与sql之Catalogs(java api操作视图)-3
24、Flink 的table api与sql之Catalogs(java api操作分区与函数)-4
25、Flink 的table api与sql之函数(自定义函数示例)
26、Flink 的SQL之概览与入门示例
27、Flink 的SQL之SELECT (select、where、distinct、order by、limit、集合操作和去重)介绍及详细示例(1)
27、Flink 的SQL之SELECT (SQL Hints 和 Joins)介绍及详细示例(2)
27、Flink 的SQL之SELECT (窗口函数)介绍及详细示例(3)
27、Flink 的SQL之SELECT (窗口聚合)介绍及详细示例(4)
27、Flink 的SQL之SELECT (Group Aggregation分组聚合、Over Aggregation Over聚合 和 Window Join 窗口关联)介绍及详细示例(5)
27、Flink 的SQL之SELECT (Top-N、Window Top-N 窗口 Top-N 和 Window Deduplication 窗口去重)介绍及详细示例(6)
27、Flink 的SQL之SELECT (Pattern Recognition 模式检测)介绍及详细示例(7)
28、Flink 的SQL之DROP 、ALTER 、INSERT 、ANALYZE 语句
29、Flink SQL之DESCRIBE、EXPLAIN、USE、SHOW、LOAD、UNLOAD、SET、RESET、JAR、JOB Statements、UPDATE、DELETE(1)
29、Flink SQL之DESCRIBE、EXPLAIN、USE、SHOW、LOAD、UNLOAD、SET、RESET、JAR、JOB Statements、UPDATE、DELETE(2)
30、Flink SQL之SQL 客户端(通过kafka和filesystem的例子介绍了配置文件使用-表、视图等)
31、Flink的SQL Gateway介绍及示例
32、Flink table api和SQL 之用户自定义 Sources & Sinks实现及详细示例
33、Flink 的Table API 和 SQL 中的时区
35、Flink 的 Formats 之CSV 和 JSON Format
36、Flink 的 Formats 之Parquet 和 Orc Format
41、Flink之Hive 方言介绍及详细示例
40、Flink 的Apache Kafka connector(kafka source的介绍及使用示例)-1
40、Flink 的Apache Kafka connector(kafka sink的介绍及使用示例)-2
40、Flink 的Apache Kafka connector(kafka source 和sink 说明及使用示例) 完整版
42、Flink 的table api与sql之Hive Catalog
43、Flink之Hive 读写及详细验证示例
44、Flink之module模块介绍及使用示例和Flink SQL使用hive内置函数及自定义函数详细示例–网上有些说法好像是错误的
45、Flink 的指标体系介绍及验证(1)-指标类型及指标实现示例
45、Flink 的指标体系介绍及验证(2)-指标的scope、报告、系统指标以及追踪、api集成示例和dashboard集成
45、Flink 的指标体系介绍及验证(3)- 完整版
46、Flink 的table api与sql之配项列表及示例


文章目录

  • Flink 系列文章
  • 一、Flink 指标体系
    • 1、Registering metrics 注册指标
      • 1)、指标类型
      • 2)、计数器
      • 3)、Gauge
      • 4)、Histogram
      • 5)、Meter
    • 2、Scope 范围
      • 1)、用户范围
      • 2)、系统范围System Scope
      • 3)、所有变量列表
      • 4)、用户变量
    • 3、Reporter
    • 4、System metrics
      • 1)、CPU
      • 2)、Memory
      • 3)、Threads
      • 4)、GarbageCollection
      • 5)、ClassLoader
      • 6)、Network
      • 7)、Default shuffle service
      • 8)、Cluster
      • 9)、Availability
      • 10)、Checkpointing
      • 11)、State Access Latency
      • 12)、RocksDB
      • 13)、State Changelog
      • 14)、IO
      • 15)、Connectors
      • 16)、System resources
      • 17)、预测执行
    • 5、End-to-End latency tracking 延迟跟踪
    • 6、State access latency tracking 延迟跟踪
    • 7、REST API integration
      • 1)/jobmanager/metrics示例
      • 2) taskmanagers/<taskmanagerid>/metrics?get=metric1,metric2示例
      • 3)/taskmanagers/metrics?get=metric1,metric2示例
      • 4)/taskmanagers/metrics?get=metric1,metric2&agg=min,max示例
    • 8、Dashboard integration


本文简单的介绍了Flink 的指标体系内容,即指标类型以及四种类型的代码实现示例、scope、系统指标、报告、跟踪、api与dashboard集成。
本专题分为三部分,即:
45、Flink 的指标体系介绍及验证(1)-指标类型及指标实现示例
45、Flink 的指标体系介绍及验证(2)-指标的scope、报告、系统指标以及追踪、api集成示例和dashboard集成
45、Flink 的指标体系介绍及验证(3)- 完整版

本文依赖nc、flink能正常使用。
本文分为8个部分,即指标注册、scope、系统指标、报告、跟踪、api与dashboard集成。
本文的示例是在Flink 1.17版本中运行。

一、Flink 指标体系

Flink暴露了一个度量系统,允许收集度量并将其公开给外部系统。
本文涉及的maven依赖

	<properties><encoding>UTF-8</encoding><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><maven.compiler.source>1.8</maven.compiler.source><maven.compiler.target>1.8</maven.compiler.target><java.version>1.8</java.version><scala.version>2.12</scala.version><flink.version>1.17.0</flink.version></properties><dependencies><!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-csv</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-json</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><!-- flink连接器 --><!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka</artifactId><version>${flink.version}</version></dependency></dependencies>

1、Registering metrics 注册指标

通过调用getRuntimeContext().getMetricGroup(),您可以从任何扩展RichFunction的用户函数访问度量系统。此方法返回一个MetricGroup对象,您可以在该对象上创建和注册新度量。

1)、指标类型

Flink支持计数器、仪表盘、柱状图和计量表。Counters, Gauges, Histograms and Meters.

2)、计数器

计数器是用来统计数量的。当前值可以是in-或使用 inc()/inc(long n)或dec()/dec(long n)增减。您可以通过调用MetricGroup上的 counter(String name)来创建和注册计数器。
本示例提供了多种实现方式,供参考。

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;/*** @author alanchan**/
public class TestMetricsDemo {//	public class LineMapper extends RichMapFunction<String, String> {
//		private transient Counter counter;
//
//		@Override
//		public void open(Configuration config) {
//			this.counter = getRuntimeContext().getMetricGroup().counter("result2LineCounter");
//		}
//
//		@Override
//		public String map(String value) throws Exception {
//			this.counter.inc();
//			return value;
//		}
//	}public static void test1() throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);env.setParallelism(1);// sourceDataStream<String> lines = env.socketTextStream("192.168.10.42", 9999);// transformationDataStream<Tuple2<String, Integer>> result = lines.flatMap(new FlatMapFunction<String, String>() {@Overridepublic void flatMap(String value, Collector<String> out) throws Exception {String[] arr = value.split(",");for (String word : arr) {out.collect(word);}}}).map(new MapFunction<String, Tuple2<String, Integer>>() {@Overridepublic Tuple2<String, Integer> map(String value) throws Exception {return Tuple2.of(value, 1);}}).keyBy(t -> t.f0).sum(1);//		SingleOutputStreamOperator<Tuple2<Integer, Integer>> result1 = lines.map(new RichMapFunction<String, Tuple2<Integer, Integer>>() {
//
//			@Override
//			public Tuple2<Integer, Integer> map(String value) throws Exception {
//				int subTaskId = getRuntimeContext().getIndexOfThisSubtask();// 子任务id/分区编号
//				return new Tuple2(subTaskId, 1);
//			}
//			// 按照子任务id/分区编号分组,并统计每个子任务/分区中有几个元素
//		}).keyBy(t -> t.f0).sum(1);// RichFlatMapFunction<IN, OUT>// Tuple3<String, Long, Integer> 输入的字符串,行数,统计单词的总数DataStream<Tuple3<String, Long, Integer>> result2 = lines.flatMap(new RichFlatMapFunction<String, Tuple2<String, Long>>() {
//			private transient Counter counter;private long result2LineCounter = 0;@Overridepublic void open(Configuration config) {
//				this.counter = getRuntimeContext().getMetricGroup().counter("result2LineCounter:");result2LineCounter = getRuntimeContext().getMetricGroup().counter("result2LineCounter:").getCount();}@Overridepublic void flatMap(String value, Collector<Tuple2<String, Long>> out) throws Exception {
//				this.counter.inc();result2LineCounter++;System.out.println("计数器行数:" + result2LineCounter);String[] arr = value.split(",");for (String word : arr) {out.collect(Tuple2.of(word, result2LineCounter));}}}).map(new MapFunction<Tuple2<String, Long>, Tuple3<String, Long, Integer>>() {@Overridepublic Tuple3<String, Long, Integer> map(Tuple2<String, Long> value) throws Exception {
//				Tuple3<String, Long, Integer> t = Tuple3.of(value.f0, value.f1, 1);return Tuple3.of(value.f0, value.f1, 1);}}).keyBy(t -> t.f0).sum(2);// sinkresult.print("result:");result2.print("result2:");env.execute();}public static void main(String[] args) throws Exception {test1();
//		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//		env.setParallelism(1);
//		DataStream<String> input = env.fromElements("a", "b", "c", "a", "b", "c");
//
//		input.keyBy(value -> value).map(new RichMapFunction<String, String>() {
//			private long count = 0;
//
//			@Override
//			public void open(Configuration parameters) throws Exception {
                super.open(parameters);
//				count = getRuntimeContext().getMetricGroup().counter("myCounter").getCount();
//			}
//
//			@Override
//			public String map(String value) throws Exception {
//				count++;
//				return value + ": " + count;
//			}
//		}).print();
//
//		env.execute("Flink Count Counter Example");}}
///验证数据///
// 输入数据
[alanchan@server2 bin]$ nc -lk 9999
hello,123
alan,flink,good
alan_chan,hi,flink//控制台输出:
计数器行数:1
result:> (hello,1)
result2:> (hello,1,1)
result:> (123,1)
result2:> (123,1,1)
计数器行数:2
result2:> (alan,2,1)
result:> (alan,1)
result2:> (flink,2,1)
result:> (flink,1)
result2:> (good,2,1)
result:> (good,1)
计数器行数:3
result:> (alan_chan,1)
result2:> (alan_chan,3,1)
result:> (hi,1)
result2:> (hi,3,1)
result:> (flink,2)
result2:> (flink,2,2)

或者,您也可以使用自己的Counter实现:

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;/*** @author alanchan**/
public class TestMetricsDemo {public static void test2() throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);env.setParallelism(1);// sourceDataStream<String> lines = env.socketTextStream("192.168.10.42", 9999);// transformation// Tuple3<String, Long, Integer> 输入的字符串,行数,统计单词的总数DataStream<Tuple3<String, Long, Integer>> result = lines.flatMap(new RichFlatMapFunction<String, Tuple2<String, Long>>() {private transient Counter counter;@Overridepublic void open(Configuration config) {this.counter = getRuntimeContext().getMetricGroup().counter("result2LineCounter", new AlanCustomCounter());}@Overridepublic void flatMap(String value, Collector<Tuple2<String, Long>> out) throws Exception {this.counter.inc();
//				result2LineCounter++;System.out.println("计数器行数:" + this.counter.getCount());String[] arr = value.split(",");for (String word : arr) {out.collect(Tuple2.of(word, this.counter.getCount()));}}}).map(new MapFunction<Tuple2<String, Long>, Tuple3<String, Long, Integer>>() {@Overridepublic Tuple3<String, Long, Integer> map(Tuple2<String, Long> value) throws Exception {return Tuple3.of(value.f0, value.f1, 1);}}).keyBy(t -> t.f0).sum(2);// sinkresult.print("result:");env.execute();}public static class AlanCustomCounter implements Counter {private long count;@Overridepublic void inc() {count += 2;}@Overridepublic void inc(long n) {count += n;}@Overridepublic void dec() {count -= 2;}@Overridepublic void dec(long n) {count -= n;}@Overridepublic long getCount() {return count;}}public static void main(String[] args) throws Exception {test2();}}///验证数据///
// 输入数据
[alanchan@server2 bin]$ nc -lk 9999
hello,123
alan,flink,good
alan_chan,hi,flink//控制台输出:
计数器行数:2
result:> (hello,2,1)
result:> (123,2,1)
计数器行数:4
result:> (alan,4,1)
result:> (flink,4,1)
result:> (good,4,1)
计数器行数:6
result:> (alan_chan,6,1)
result:> (hi,6,1)
result:> (flink,4,2)

3)、Gauge

仪表可根据需要提供任何类型的值。为了使用Gauge,您必须首先创建一个实现org.apache.flink.metrics.Guge接口的类。返回值的类型没有限制。您可以通过调用MetricGroup上的gauge(String name, Gauge gauge) 来注册gauge。

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;/*** @author alanchan**/
public class TestMetricsGaugeDemo {
//	public class MyMapper extends RichMapFunction<String, String> {
//		private transient int valueToExpose = 0;
//
//		@Override
//		public void open(Configuration config) {
//			getRuntimeContext().getMetricGroup().gauge("MyGauge", new Gauge<Integer>() {
//				@Override
//				public Integer getValue() {
//					return valueToExpose;
//				}
//			});
//		}
//
//		@Override
//		public String map(String value) throws Exception {
//			valueToExpose++;
//			return value;
//		}
//	}public static void test1() throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);env.setParallelism(1);// sourceDataStream<String> lines = env.socketTextStream("192.168.10.42", 9999);// transformation// RichFlatMapFunction<IN, OUT>// Tuple3<String, String, Integer> 输入的字符串,alan lines[行数],统计单词的总数DataStream<Tuple3<String, String, Integer>> result = lines.flatMap(new RichFlatMapFunction<String, Tuple2<String, String>>() {private long result2LineCounter = 0;private Gauge<String> gauge = null;@Overridepublic void open(Configuration config) {result2LineCounter = getRuntimeContext().getMetricGroup().counter("resultLineCounter:").getCount();gauge = getRuntimeContext().getMetricGroup().gauge("alanGauge", new Gauge<String>() {@Overridepublic String getValue() {return "alan lines[" + result2LineCounter + "]";}});}@Overridepublic void flatMap(String value, Collector<Tuple2<String, String>> out) throws Exception {result2LineCounter++;System.out.println("计数器行数:" + result2LineCounter);String[] arr = value.split(",");for (String word : arr) {out.collect(Tuple2.of(word, gauge.getValue()));}}}).map(new MapFunction<Tuple2<String, String>, Tuple3<String, String, Integer>>() {@Overridepublic Tuple3<String, String, Integer> map(Tuple2<String, String> value) throws Exception {return Tuple3.of(value.f0, value.f1, 1);}}).keyBy(t -> t.f0).sum(2);// sinkresult.print("result:");env.execute();}public static void main(String[] args) throws Exception {test1();}}///验证数据///
// 输入数据
[alanchan@server2 bin]$ nc -lk 9999
hello,123
alan,flink,good
alan_chan,hi,flink//控制台输出:
计数器行数:1
result:> (hello,alan lines[1],1)
result:> (123,alan lines[1],1)
计数器行数:2
result:> (alan,alan lines[2],1)
result:> (flink,alan lines[2],1)
result:> (good,alan lines[2],1)
计数器行数:3
result:> (alan_chan,alan lines[3],1)
result:> (hi,alan lines[3],1)
result:> (flink,alan lines[2],2)

报告器会将暴露的对象转换为String,这意味着需要一个有意义的toString()实现。

4)、Histogram

直方图测量长值的分布。您可以通过调用MetricGroup上的histogram(String name, Histogram histogram) 来注册一个对象。
下面的示例是自己实现的Histogram接口,仅仅用于演示实现过程。

import java.io.Serializable;import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Gauge;
//import com.codahale.metrics.Histogram;
import org.apache.flink.metrics.Histogram;
import org.apache.flink.metrics.HistogramStatistics;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;/*** @author alanchan**/
public class TestMetricsHistogramDemo {//	public class MyMapper extends RichMapFunction<Long, Long> {
//		private transient Histogram histogram;
//
//		@Override
//		public void open(Configuration config) {
//			this.histogram = getRuntimeContext().getMetricGroup().histogram("alanHistogram", new AlanHistogram());
//		}
//
//		@Override
//		public Long map(Long value) throws Exception {
//			this.histogram.update(value);
//			return value;
//		}
//	}public static class AlanHistogram implements Histogram {private CircularDoubleArray descriptiveStatistics = new CircularDoubleArray(10);;public AlanHistogram() {}public AlanHistogram(int windowSize) {this.descriptiveStatistics = new CircularDoubleArray(windowSize);}@Overridepublic void update(long value) {this.descriptiveStatistics.addValue(value);}@Overridepublic long getCount() {return this.descriptiveStatistics.getElementsSeen();}@Overridepublic HistogramStatistics getStatistics() {
//			return new DescriptiveStatisticsHistogramStatistics(this.descriptiveStatistics);return null;}class CircularDoubleArray implements Serializable {private static final long serialVersionUID = 1L;private final double[] backingArray;private int nextPos = 0;private boolean fullSize = false;private long elementsSeen = 0;CircularDoubleArray(int windowSize) {this.backingArray = new double[windowSize];}synchronized void addValue(double value) {backingArray[nextPos] = value;++elementsSeen;++nextPos;if (nextPos == backingArray.length) {nextPos = 0;fullSize = true;}}synchronized double[] toUnsortedArray() {final int size = getSize();double[] result = new double[size];System.arraycopy(backingArray, 0, result, 0, result.length);return result;}private synchronized int getSize() {return fullSize ? backingArray.length : nextPos;}private synchronized long getElementsSeen() {return elementsSeen;}}}public static void test1() throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);env.setParallelism(1);// sourceDataStream<String> lines = env.socketTextStream("192.168.10.42", 9999);// transformation// RichFlatMapFunction<IN, OUT>// Tuple3<String, String, Integer> 输入的字符串,alan lines[行数],统计单词的总数DataStream<Tuple3<String, String, Integer>> result = lines.flatMap(new RichFlatMapFunction<String, Tuple2<String, String>>() {private long result2LineCounter = 0;private Gauge<String> gauge = null;private Histogram histogram = null;;@Overridepublic void open(Configuration config) {result2LineCounter = getRuntimeContext().getMetricGroup().counter("resultLineCounter:").getCount();gauge = getRuntimeContext().getMetricGroup().gauge("alanGauge", new Gauge<String>() {@Overridepublic String getValue() {return "alan lines[" + result2LineCounter + "]";}});this.histogram = getRuntimeContext().getMetricGroup().histogram("alanHistogram", new AlanHistogram());}@Overridepublic void flatMap(String value, Collector<Tuple2<String, String>> out) throws Exception {result2LineCounter++;this.histogram.update(result2LineCounter * 3);// 此处仅仅示例this.histogram.getCount()的值,没有实际的意义System.out.println("计数器行数:" + result2LineCounter + "  histogram:" + this.histogram.getCount());String[] arr = value.split(",");for (String word : arr) {out.collect(Tuple2.of(word, gauge.getValue()));}}}).map(new MapFunction<Tuple2<String, String>, Tuple3<String, String, Integer>>() {@Overridepublic Tuple3<String, String, Integer> map(Tuple2<String, String> value) throws Exception {return Tuple3.of(value.f0, value.f1, 1);}}).keyBy(t -> t.f0).sum(2);// sinkresult.print("result:");env.execute();}public static void main(String[] args) throws Exception {test1();}}///验证数据///
// 输入数据
[alanchan@server2 bin]$ nc -lk 9999
hello,123
alan,flink,good
alan_chan,hi,flink//控制台输出:
计数器行数:1  histogram:1
result:> (hello,alan lines[1],1)
result:> (123,alan lines[1],1)
计数器行数:2  histogram:2
result:> (alan,alan lines[2],1)
result:> (flink,alan lines[2],1)
result:> (good,alan lines[2],1)
计数器行数:3  histogram:3
result:> (alan_chan,alan lines[3],1)
result:> (hi,alan lines[3],1)
result:> (flink,alan lines[2],2)

Flink没有提供直方图的默认实现,但提供了一个允许使用Codahale/DropWizard直方图的包装器。要使用此包装器,
在pom.xml中添加以下依赖项:

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-metrics-dropwizard</artifactId><version>1.17.1</version>
</dependency>

下面的示例是使用 Codahale/DropWizard直方图,如下所示:

import java.io.Serializable;import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.dropwizard.metrics.DropwizardHistogramWrapper;
import org.apache.flink.metrics.Gauge;
//import com.codahale.metrics.Histogram;
import org.apache.flink.metrics.Histogram;
import org.apache.flink.metrics.HistogramStatistics;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;import com.codahale.metrics.SlidingWindowReservoir;/*** @author alanchan**/
public class TestMetricsHistogramDemo {public static void test2() throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);env.setParallelism(1);// sourceDataStream<String> lines = env.socketTextStream("192.168.10.42", 9999);// transformation// RichFlatMapFunction<IN, OUT>// Tuple3<String, String, Integer> 输入的字符串,alan lines[行数],统计单词的总数DataStream<Tuple3<String, String, Integer>> result = lines.flatMap(new RichFlatMapFunction<String, Tuple2<String, String>>() {private long result2LineCounter = 0;private Gauge<String> gauge = null;private Histogram histogram = null;;@Overridepublic void open(Configuration config) {result2LineCounter = getRuntimeContext().getMetricGroup().counter("resultLineCounter:").getCount();gauge = getRuntimeContext().getMetricGroup().gauge("alanGauge", new Gauge<String>() {@Overridepublic String getValue() {return "alan lines[" + result2LineCounter + "]";}});com.codahale.metrics.Histogram dropwizardHistogram = new com.codahale.metrics.Histogram(new SlidingWindowReservoir(500));
//				this.histogram = getRuntimeContext().getMetricGroup().histogram("alanHistogram", new AlanHistogram());this.histogram = getRuntimeContext().getMetricGroup().histogram("alanHistogram", new DropwizardHistogramWrapper(dropwizardHistogram));}@Overridepublic void flatMap(String value, Collector<Tuple2<String, String>> out) throws Exception {result2LineCounter++;this.histogram.update(result2LineCounter * 3);// 此处仅仅示例this.histogram.getCount()的值,没有实际的意义System.out.println("计数器行数:" + result2LineCounter + "  histogram:" + this.histogram.getCount());String[] arr = value.split(",");for (String word : arr) {out.collect(Tuple2.of(word, gauge.getValue()));}}}).map(new MapFunction<Tuple2<String, String>, Tuple3<String, String, Integer>>() {@Overridepublic Tuple3<String, String, Integer> map(Tuple2<String, String> value) throws Exception {return Tuple3.of(value.f0, value.f1, 1);}}).keyBy(t -> t.f0).sum(2);// sinkresult.print("result:");env.execute();}public static void main(String[] args) throws Exception {test2();}}///验证数据///
// 输入数据
[alanchan@server2 bin]$ nc -lk 9999
hello,123
alan,flink,good
alan_chan,hi,flink//控制台输出://控制台输出:
计数器行数:1  histogram:1
result:> (hello,alan lines[1],1)
result:> (123,alan lines[1],1)
计数器行数:2  histogram:2
result:> (alan,alan lines[2],1)
result:> (flink,alan lines[2],1)
result:> (good,alan lines[2],1)
计数器行数:3  histogram:3
result:> (alan_chan,alan lines[3],1)
result:> (hi,alan lines[3],1)
result:> (flink,alan lines[2],2)

5)、Meter

仪表测量平均吞吐量。可以使用markEvent()方法注册事件的发生。可以使用markEvent(long n)方法注册同时发生多个事件。您可以通过在MetricGroup上调用meter(String name, Meter meter)来注册meter。

下面的示例展示了自定义的Meter实现,可能很不严谨,实际上应用更多的是本部分的第二个示例。

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.dropwizard.metrics.DropwizardHistogramWrapper;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.Histogram;
import org.apache.flink.metrics.Meter;
import org.apache.flink.metrics.SimpleCounter;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;//import com.codahale.metrics.Meter;
import com.codahale.metrics.SlidingWindowReservoir;/*** @author alanchan**/
public class TestMetricsMeterDemo {public class MyMapper extends RichMapFunction<Long, Long> {private transient Meter meter;@Overridepublic void open(Configuration config) {this.meter = getRuntimeContext().getMetricGroup().meter("myMeter", new AlanMeter());}@Overridepublic Long map(Long value) throws Exception {this.meter.markEvent();return value;}}public static class AlanMeter implements Meter {/** The underlying counter maintaining the count. */private final Counter counter = new SimpleCounter();;/** The time-span over which the average is calculated. */private final int timeSpanInSeconds = 0;/** Circular array containing the history of values. */private final long[] values = null;;/** The index in the array for the current time. */private int time = 0;/** The last rate we computed. */private double currentRate = 0;@Overridepublic void markEvent() {this.counter.inc();}@Overridepublic void markEvent(long n) {this.counter.inc(n);}@Overridepublic long getCount() {return counter.getCount();}@Overridepublic double getRate() {return currentRate;}public void update() {time = (time + 1) % values.length;values[time] = counter.getCount();currentRate = ((double) (values[time] - values[(time + 1) % values.length]) / timeSpanInSeconds);}}public static void test1() throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);env.setParallelism(1);// sourceDataStream<String> lines = env.socketTextStream("192.168.10.42", 9999);// transformation// RichFlatMapFunction<IN, OUT>// Tuple3<String, String, Integer> 输入的字符串,alan lines[行数],统计单词的总数DataStream<Tuple3<String, String, Integer>> result = lines.flatMap(new RichFlatMapFunction<String, Tuple2<String, String>>() {private long result2LineCounter = 0;private Gauge<String> gauge = null;private Histogram histogram = null;private Meter meter;@Overridepublic void open(Configuration config) {result2LineCounter = getRuntimeContext().getMetricGroup().counter("resultLineCounter:").getCount();gauge = getRuntimeContext().getMetricGroup().gauge("alanGauge", new Gauge<String>() {@Overridepublic String getValue() {return "alan lines[" + result2LineCounter + "]";}});com.codahale.metrics.Histogram dropwizardHistogram = new com.codahale.metrics.Histogram(new SlidingWindowReservoir(500));this.histogram = getRuntimeContext().getMetricGroup().histogram("alanHistogram", new DropwizardHistogramWrapper(dropwizardHistogram));this.meter = getRuntimeContext().getMetricGroup().meter("alanMeter", new AlanMeter());}@Overridepublic void flatMap(String value, Collector<Tuple2<String, String>> out) throws Exception {result2LineCounter++;this.histogram.update(result2LineCounter * 3);this.meter.markEvent();// 此处仅仅示例this.histogram.getCount()、this.meter.getRate()的值,没有实际的意义,具体使用以实际使用场景为准System.out.println("计数器行数:" + result2LineCounter + ",  histogram:" + this.histogram.getCount() + ",   meter.getRate:" + this.meter.getRate());String[] arr = value.split(",");for (String word : arr) {out.collect(Tuple2.of(word, gauge.getValue()));}}}).map(new MapFunction<Tuple2<String, String>, Tuple3<String, String, Integer>>() {@Overridepublic Tuple3<String, String, Integer> map(Tuple2<String, String> value) throws Exception {return Tuple3.of(value.f0, value.f1, 1);}}).keyBy(t -> t.f0).sum(2);// sinkresult.print("result:");env.execute();}public static void main(String[] args) throws Exception {test1();}}///验证数据///
// 输入数据
[alanchan@server2 bin]$ nc -lk 9999
hello,123
alan,flink,good
alan_chan,hi,flink//控制台输出:
计数器行数:1,  histogram:1,   meter.getRate:0.0
result:> (hello,alan lines[1],1)
result:> (123,alan lines[1],1)
计数器行数:2,  histogram:2,   meter.getRate:0.0
result:> (alan,alan lines[2],1)
result:> (flink,alan lines[2],1)
result:> (good,alan lines[2],1)
计数器行数:3,  histogram:3,   meter.getRate:0.0
result:> (alan_chan,alan lines[3],1)
result:> (hi,alan lines[3],1)
result:> (flink,alan lines[2],2)

Flink提供了一个允许使用Codahale/DropWizard仪表的包装器。要使用此包装器,
在pom.xml中添加以下依赖项:

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-metrics-dropwizard</artifactId><version>1.17.1</version>
</dependency>

下面使用Codahale/DropWizard注册的示例,如下所示:

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.dropwizard.metrics.DropwizardHistogramWrapper;
import org.apache.flink.dropwizard.metrics.DropwizardMeterWrapper;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.Histogram;
import org.apache.flink.metrics.Meter;
import org.apache.flink.metrics.SimpleCounter;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;//import com.codahale.metrics.Meter;
import com.codahale.metrics.SlidingWindowReservoir;/*** @author alanchan**/
public class TestMetricsMeterDemo {public static void test2() throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);env.setParallelism(1);// sourceDataStream<String> lines = env.socketTextStream("192.168.10.42", 9999);// transformation// RichFlatMapFunction<IN, OUT>// Tuple3<String, String, Integer> 输入的字符串,alan lines[行数],统计单词的总数DataStream<Tuple3<String, String, Integer>> result = lines.flatMap(new RichFlatMapFunction<String, Tuple2<String, String>>() {private long result2LineCounter = 0;private Gauge<String> gauge = null;private Histogram histogram = null;private Meter meter;@Overridepublic void open(Configuration config) {result2LineCounter = getRuntimeContext().getMetricGroup().counter("resultLineCounter:").getCount();gauge = getRuntimeContext().getMetricGroup().gauge("alanGauge", new Gauge<String>() {@Overridepublic String getValue() {return "alan lines[" + result2LineCounter + "]";}});com.codahale.metrics.Histogram dropwizardHistogram = new com.codahale.metrics.Histogram(new SlidingWindowReservoir(500));this.histogram = getRuntimeContext().getMetricGroup().histogram("alanHistogram", new DropwizardHistogramWrapper(dropwizardHistogram));//				this.meter = getRuntimeContext().getMetricGroup().meter("alanMeter", new AlanMeter());com.codahale.metrics.Meter dropwizardMeter = new com.codahale.metrics.Meter();this.meter = getRuntimeContext().getMetricGroup().meter("alanMeter", new DropwizardMeterWrapper(dropwizardMeter));}@Overridepublic void flatMap(String value, Collector<Tuple2<String, String>> out) throws Exception {result2LineCounter++;this.histogram.update(result2LineCounter * 3);this.meter.markEvent();// 此处仅仅示例this.histogram.getCount()、this.meter.getRate()的值,没有实际的意义,具体使用以实际使用场景为准System.out.println("计数器行数:" + result2LineCounter + ",  histogram:" + this.histogram.getCount() + ",   meter.getRate:" + this.meter.getRate());String[] arr = value.split(",");for (String word : arr) {out.collect(Tuple2.of(word, gauge.getValue()));}}}).map(new MapFunction<Tuple2<String, String>, Tuple3<String, String, Integer>>() {@Overridepublic Tuple3<String, String, Integer> map(Tuple2<String, String> value) throws Exception {return Tuple3.of(value.f0, value.f1, 1);}}).keyBy(t -> t.f0).sum(2);// sinkresult.print("result:");env.execute();}public static void main(String[] args) throws Exception {test2();}}//控制台输出:
计数器行数:1,  histogram:1,   meter.getRate:0.0
result:> (hello,alan lines[1],1)
result:> (123,alan lines[1],1)
计数器行数:2,  histogram:2,   meter.getRate:0.0
result:> (alan,alan lines[2],1)
result:> (flink,alan lines[2],1)
result:> (good,alan lines[2],1)
计数器行数:3,  histogram:3,   meter.getRate:0.0
result:> (alan_chan,alan lines[3],1)
result:> (hi,alan lines[3],1)
result:> (flink,alan lines[2],2)

2、Scope 范围

本部分的示例比较简单,不再提供具体的验证内容。

每个metric 度量都被分配了一个标识符和一组key-value对,在这些key-value对下将报告度量。

标识符基于3个组件:注册度量时的用户定义名称、可选的用户定义范围和系统提供的范围。例如,如果A.B是系统作用域,C.D是用户作用域,E是名称,那么度量的标识符将是A.B.C.D.E。

您可以通过在conf/flink-conf.yaml中设置metrics.scope.delimiter键来配置用于标识符的分隔符(默认值:.)。

1)、用户范围

您可以通过调用MetricGroup#addGroup(String name)、MetricGroup#addGroup(int name) 或MetricGroup#addGroup(String key, String value)来定义用户作用域。这些方法影响MetricGroup#getMetricIdentifier和MetricGroup#getScopeComponents返回的内容。

counter = getRuntimeContext().getMetricGroup().addGroup("MyMetrics").counter("myCounter");counter = getRuntimeContext().getMetricGroup().addGroup("MyMetricsKey", "MyMetricsValue").counter("myCounter");

2)、系统范围System Scope

系统范围包含有关度量的上下文信息,例如它在哪个任务中注册,或者该任务属于哪个作业。

应该包括哪些上下文信息可以通过在conf/flink-conf.yaml中设置以下键来配置。这些键中的每一个都需要一个格式字符串,该字符串可能包含常量(例如“taskmanager”)和变量(例如“<task_id>”),这些常量和变量将在运行时被替换。

  • metrics.scope.jm
    Default: .jobmanager
    应用于job manager范围内的所有指标
  • metrics.scope.jm-job
    Default: .jobmanager.<job_name>
    应用于 job manager and job范围内的所有度量
  • metrics.scope.tm
    Default: .taskmanager.<tm_id>
    应用于task manager范围内的所有度量
  • metrics.scope.tm-job
    Default: .taskmanager.<tm_id>.<job_name>
    应用于范围为task manager and job的所有度量
  • metrics.scope.task
    Default: .taskmanager.<tm_id>.<job_name>.<task_name>.<subtask_index>
    应用于task范围内的所有度量
  • metrics.scope.operator
    Default: .taskmanager.<tm_id>.<job_name>.<operator_name>.<subtask_index>
    应用于作用域为operator的所有度量

变量的数量或顺序没有限制。变量区分大小写。
操作员度量的默认作用域将产生类似于 localhost.taskmanager.1234.MyJob.MyOperator.0.MyMetric 的标识符

如果还希望包含任务名称但省略task manager信息,则可以指定以下格式:
metrics.scope.operator: .<job_name>.<task_name>.<operator_name>.<subtask_index>

这可以创建标识符localhost localhost.MyJob.MySource_->_MyOperator.MyOperator.0.MyMetric.

对于此格式字符串,如果同一作业同时运行多次,可能会发生标识符冲突,从而导致度量数据不一致。因此,建议使用通过包括id(例如<job_id>)或通过为作业和运算符分配唯一名称来提供一定程度的唯一性的格式字符串。

3)、所有变量列表

  • JobManager:
  • TaskManager: , <tm_id>
  • Job: <job_id>, <job_name>
  • Task: <task_id>, <task_name>, <task_attempt_id>, <task_attempt_num>, <subtask_index>
  • Operator: <operator_id>,<operator_name>, <subtask_index>

对于Batch API, <operator_id> = <task_id>.

4)、用户变量

您可以通过调用MetricGroup#addGroup(String key, String value)来定义用户变量。此方法会影响MetricGroup#getMetricIdentifier、MetricGroup#getScopeComponents和MetricGroup#getAllVariables()返回的内容。

用户变量不能用于范围格式。

3、Reporter

Flink 支持用户将 Flink 的各项运行时指标发送给外部系统。
有关如何设置Flink的度量报告程序的信息,请查看47、Flink 的指标报告介绍及示例。

4、System metrics

默认情况下,Flink会收集几个指标,这些指标可以深入了解当前状态。本节是所有这些指标的参考。
下表通常有5列:

  • “Scope”列描述了用于生成系统范围的scope format。例如,如果单元格包含“Operator”,则使用“metrics.scope.Operator”的scope format。如果单元格包含多个值,用斜线分隔,则会多次报告不同实体的metrics ,例如作业管理器和任务管理器。
  • (optional)“Infix”列描述了将哪个Infix附加到system scope。
  • “Metrics”列列出了为给定 scope and infix注册的所有度量的名称。
  • “Description”列提供了有关给定度量的测量信息。
  • “Type”列描述了用于测量的度量类型。

请注意, infix/metric名称列中的所有点仍受“metrics.demitter”设置的约束。

因此,为了推断metric identifier:
1、采用基于“Scope”列的scope-format
2、将值附加到“Infix”列中(如果存在),并说明“metrics.demitter”设置
3、附加metric 名称。

1)、CPU

在这里插入图片描述

2)、Memory

与内存相关的指标要求Oracle的内存管理(也包含在OpenJDK的Hotspot实现中)到位。在使用其他JVM实现(例如IBM’s J9)时,某些度量可能不会公开。
在这里插入图片描述

3)、Threads

在这里插入图片描述

4)、GarbageCollection

在这里插入图片描述

5)、ClassLoader

在这里插入图片描述

6)、Network

不推荐:使用默认的Default shuffle service 中的指标

在这里插入图片描述

7)、Default shuffle service

与使用netty网络通信的任务执行器之间的数据交换相关的度量。

在这里插入图片描述
在这里插入图片描述

8)、Cluster

在这里插入图片描述

9)、Availability

此表中的指标可用于以下每个作业状态:INITIALIZING、CREATED、RUNNING、RESTARTING、CANCELLING、FAILING。是否报告这些指标取决于metrics.job.status.enable设置。

这些度量的语义可能会在以后的版本中发生变化。

在这里插入图片描述
实验阶段功能

当作业处于RUNNING状态时,此表中的指标提供了有关作业当前正在执行的操作的其他详细信息。是否报告这些指标取决于metrics.job.status.enable设置。

在这里插入图片描述
在以下情况下,作业被视为正在部署任务:

  • 对于流作业,任何任务都处于“正在部署”状态
  • 对于批处理作业,如果至少有一个任务处于展开状态,并且没有INITIALIZING/RUNNING任务

在这里插入图片描述

10)、Checkpointing

对于失败的检查点,度量是在尽最大努力的基础上更新的,可能不准确。

在这里插入图片描述

11)、State Access Latency

在这里插入图片描述
在这里插入图片描述

12)、RocksDB

某些RocksDB本机指标可用,但默认情况下已禁用,您可以在此处找到完整的文档

13)、State Changelog

在这里插入图片描述

这些指标只能通过报告器获得。

14)、IO

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

15)、Connectors

  • Kafka Connectors
    在这里插入图片描述
  • Kinesis 源
    在这里插入图片描述
  • Kinesis 接收器
    在这里插入图片描述
  • HBase Connectors
    在这里插入图片描述

16)、System resources

默认情况下,系统资源报告处于禁用状态。启用metrics.system-resource后,以下列出的其他度量将在Job和TaskManager上可用。系统资源度量被定期更新,并且它们呈现配置的间隔(metrics.System resource probing interval)的平均值。
系统资源报告要求类路径上存在一个可选的依赖项(例如,位于Flink的lib目录中):

  • com.github.oshi:oshi核心:6.1.5(根据MIT license授权)

包括它的可传递依赖项:

  • net.java.dev.jna:jna平台:jar:5.10.0
  • net.java.dev.jna:jar:5.10.0

这方面的故障将报告为警告消息,如SystemResourcesMetricInitializer在启动期间记录的NoClassDefFoundError。

  • System CPU
    在这里插入图片描述
  • System memory
    在这里插入图片描述
  • System network
    在这里插入图片描述

17)、预测执行

以下指标可以用来衡量预测执行的有效性。
在这里插入图片描述

5、End-to-End latency tracking 延迟跟踪

Flink允许跟踪在系统中传输的记录的延迟。默认情况下,此功能处于禁用状态。要启用延迟跟踪,必须在Flink配置或ExecutionConfig中将latencyTracingInterval设置为正数。

在latencyTracingInterval,源将周期性地发出一个特殊记录,称为LatencyMarker。标记包含从记录在源处发出的时间开始的时间戳。延迟标记无法超过常规用户记录,因此,如果记录在operator面前排队,则会增加标记跟踪的延迟。

延迟标记并没有考虑用户记录在运算符中花费的时间,因为它们正在绕过它们。特别是,标记没有考虑记录在窗口缓冲区中花费的时间。只有当operator无法接受新记录,因此他们正在排队时,使用标记测量的延迟才会反映这一点。
LatencyMarketers用于导出拓扑的源和每个下游操作符之间的延迟分布。这些分布被报告为直方图度量。这些分布的粒度可以在Flink配置中进行控制。对于最高粒度的子任务,Flink将导出每个源子任务和每个下游子任务之间的延迟分布,这将导致直方图的二次(就并行性而言)数量。

目前,Flink假设集群中所有机器的时钟都是同步的。我们建议设置自动时钟同步服务(如NTP),以避免错误的延迟结果。

警告启用延迟度量可能会显著影响集群的性能(尤其是子任务粒度)。强烈建议仅将它们用于调试目的。

6、State access latency tracking 延迟跟踪

Flink还允许跟踪标准Flinkstate-backends或从AbstractStateBackend扩展的自定义state-backends的keyed state访问延迟。默认情况下,此功能处于禁用状态。要启用此功能,必须在Flink配置中将state.backend.latency-track.keyed-state-enabled设置为true。

启用跟踪keyed state访问延迟后,Flink将对每N次访问的状态访问延迟进行采样,其中N由state.backend.latency-track.sample-interval定义。此配置的默认值为100。较小的值将获得更准确的结果,但由于采样频率更高,因此对性能的影响更大。

由于此延迟度量的类型为直方图,state.backend.latency-track.history-size将控制历史记录值的最大数量,默认值为128。此配置的较大值将需要更多的内存,但将提供更准确的结果。

警告启用状态访问延迟度量可能会影响性能。建议仅将它们用于调试目的。

7、REST API integration

可以通过监控REST API查询度量。

下面是可用endpoints的列表,其中包含一个示例JSON响应。
所有endpoints均为示例形式http://hostname:8081/jobmanager/metrics,

下面我们只列出URL的路径部分。
例如,<>中的值是变量http://hostname:8081/jobs//metrics必须被请求,
例如 http://192.168.10.49:8081/jobs/cb4443fd87ed97873b55be1bdefede30/metrics.

特定实体的请求度量:

  • /jobmanager/metrics
  • /taskmanagers/<taskmanagerid>/metrics
  • /jobs/<jobid>/metrics
  • /jobs/<jobid>/vertices/<vertexid>/subtasks/<subtaskindex>
  • 示例
    通过日志或web ui界面可以很容易找到jobid。
    下图为示例性图示
    在这里插入图片描述
    本示例下面的参数如:
    jobid:bb741e7e46d97541a83a492c948e000d
    taskmanagerid:192.168.10.42:42933-a2a682
    subtaskindex:0
    vertexid:cbc357ccb763df2852fee8c4fc7d55f2
## 1、/jobmanager/metrics
http://192.168.10.41:9081/jobmanager/metrics
[{"id":"Status.JVM.GarbageCollector.PS_MarkSweep.Time"},{"id":"Status.JVM.Memory.Mapped.TotalCapacity"},{"id":"taskSlotsAvailable"},{"id":"taskSlotsTotal"},{"id":"Status.JVM.Memory.Mapped.MemoryUsed"},{"id":"Status.JVM.CPU.Time"},{"id":"Status.JVM.Threads.Count"},{"id":"Status.JVM.Memory.Heap.Committed"},{"id":"Status.JVM.Memory.Metaspace.Committed"},{"id":"Status.JVM.GarbageCollector.PS_MarkSweep.Count"},{"id":"Status.JVM.GarbageCollector.PS_Scavenge.Time"},{"id":"Status.JVM.Memory.Direct.Count"},{"id":"Status.JVM.GarbageCollector.PS_Scavenge.Count"},{"id":"Status.JVM.Memory.NonHeap.Max"},{"id":"numRegisteredTaskManagers"},{"id":"Status.JVM.Memory.NonHeap.Committed"},{"id":"Status.JVM.Memory.NonHeap.Used"},{"id":"Status.JVM.Memory.Metaspace.Max"},{"id":"Status.JVM.Memory.Direct.MemoryUsed"},{"id":"Status.JVM.Memory.Direct.TotalCapacity"},{"id":"numRunningJobs"},{"id":"Status.JVM.ClassLoader.ClassesLoaded"},{"id":"Status.JVM.Memory.Mapped.Count"},{"id":"Status.JVM.Memory.Metaspace.Used"},{"id":"Status.JVM.CPU.Load"},{"id":"Status.JVM.Memory.Heap.Max"},{"id":"Status.JVM.Memory.Heap.Used"},{"id":"Status.JVM.ClassLoader.ClassesUnloaded"}]## 2、/taskmanagers/<taskmanagerid>/metrics
http://192.168.10.41:9081/taskmanagers/192.168.10.42:42933-a2a682/metrics
[{"id":"Status.JVM.Memory.Mapped.TotalCapacity"},{"id":"Status.Network.AvailableMemorySegments"},{"id":"Status.Network.TotalMemorySegments"},{"id":"Status.JVM.Memory.Mapped.MemoryUsed"},{"id":"Status.Flink.Memory.Managed.Total"},{"id":"Status.JVM.CPU.Time"},{"id":"Status.JVM.GarbageCollector.G1_Young_Generation.Count"},{"id":"Status.JVM.Threads.Count"},{"id":"Status.Shuffle.Netty.UsedMemory"},{"id":"Status.JVM.Memory.Heap.Committed"},{"id":"Status.Shuffle.Netty.TotalMemory"},{"id":"Status.JVM.Memory.Metaspace.Committed"},{"id":"Status.JVM.Memory.Direct.Count"},{"id":"Status.Shuffle.Netty.AvailableMemorySegments"},{"id":"Status.JVM.Memory.NonHeap.Max"},{"id":"Status.Shuffle.Netty.TotalMemorySegments"},{"id":"Status.JVM.Memory.NonHeap.Committed"},{"id":"Status.JVM.Memory.NonHeap.Used"},{"id":"Status.JVM.Memory.Metaspace.Max"},{"id":"Status.JVM.GarbageCollector.G1_Old_Generation.Count"},{"id":"Status.JVM.Memory.Direct.MemoryUsed"},{"id":"Status.JVM.Memory.Direct.TotalCapacity"},{"id":"Status.JVM.GarbageCollector.G1_Old_Generation.Time"},{"id":"Status.Shuffle.Netty.UsedMemorySegments"},{"id":"Status.JVM.ClassLoader.ClassesLoaded"},{"id":"Status.JVM.Memory.Mapped.Count"},{"id":"Status.JVM.Memory.Metaspace.Used"},{"id":"Status.Flink.Memory.Managed.Used"},{"id":"Status.JVM.CPU.Load"},{"id":"Status.JVM.Memory.Heap.Used"},{"id":"Status.JVM.Memory.Heap.Max"},{"id":"Status.JVM.ClassLoader.ClassesUnloaded"},{"id":"Status.JVM.GarbageCollector.G1_Young_Generation.Time"},{"id":"Status.Shuffle.Netty.AvailableMemory"}]## 3、/jobs/<jobid>/metrics
http://192.168.10.41:9081/jobs/bb741e7e46d97541a83a492c948e000d/metrics
[{"id":"numberOfFailedCheckpoints"},{"id":"lastCheckpointSize"},{"id":"totalNumberOfCheckpoints"},{"id":"lastCheckpointExternalPath"},{"id":"lastCheckpointRestoreTimestamp"},{"id":"uptime"},{"id":"restartingTime"},{"id":"numberOfInProgressCheckpoints"},{"id":"downtime"},{"id":"lastCheckpointProcessedData"},{"id":"numberOfCompletedCheckpoints"},{"id":"numRestarts"},{"id":"fullRestarts"},{"id":"lastCheckpointDuration"},{"id":"lastCheckpointPersistedData"}]## 4、/jobs/<jobid>/vertices/<vertexid>/subtasks/<subtaskindex>
## 在作业详情页面中,找到并点击 "Task Managers" 选项卡。
## 在 "Task Managers" 页面中,您可以查看每个 Task Manager 的详细信息,包括其分配给该 Task Manager 的任务(即 vertex)及其 ID。
## 或者在chrome中右击检查中查看vertexid,由于下面的链接内容太多,仅仅截图展示
http://192.168.10.41:9081/jobs/1b0700f7510a9dd7fd65aee66ad0382a/vertices/cbc357ccb763df2852fee8c4fc7d55f2/subtasks/metrics

在这里插入图片描述

在相应类型的所有实体中聚合的请求度量:

  • /taskmanagers/metrics
  • /jobs/metrics
  • /jobs/<jobid>/vertices/<vertexid>/subtasks/metrics
  • 示例
    通过日志或web ui界面可以很容易找到jobid。
    本示例下面的参数如:
    jobid:bb741e7e46d97541a83a492c948e000d
    vertexid:cbc357ccb763df2852fee8c4fc7d55f2
## 1、/taskmanagers/metrics
http://192.168.10.41:9081/taskmanagers/metrics
[{"id":"Status.JVM.Memory.Mapped.TotalCapacity"},{"id":"Status.Network.AvailableMemorySegments"},{"id":"Status.Network.TotalMemorySegments"},{"id":"Status.JVM.Memory.Mapped.MemoryUsed"},{"id":"Status.Flink.Memory.Managed.Total"},{"id":"Status.JVM.CPU.Time"},{"id":"Status.JVM.GarbageCollector.G1_Young_Generation.Count"},{"id":"Status.JVM.Threads.Count"},{"id":"Status.Shuffle.Netty.UsedMemory"},{"id":"Status.JVM.Memory.Heap.Committed"},{"id":"Status.Shuffle.Netty.TotalMemory"},{"id":"Status.JVM.Memory.Metaspace.Committed"},{"id":"Status.JVM.Memory.Direct.Count"},{"id":"Status.Shuffle.Netty.AvailableMemorySegments"},{"id":"Status.JVM.Memory.NonHeap.Max"},{"id":"Status.Shuffle.Netty.TotalMemorySegments"},{"id":"Status.JVM.Memory.NonHeap.Committed"},{"id":"Status.JVM.Memory.NonHeap.Used"},{"id":"Status.JVM.Memory.Metaspace.Max"},{"id":"Status.JVM.GarbageCollector.G1_Old_Generation.Count"},{"id":"Status.JVM.Memory.Direct.MemoryUsed"},{"id":"Status.JVM.GarbageCollector.G1_Old_Generation.Time"},{"id":"Status.JVM.Memory.Direct.TotalCapacity"},{"id":"Status.Shuffle.Netty.UsedMemorySegments"},{"id":"Status.JVM.ClassLoader.ClassesLoaded"},{"id":"Status.JVM.Memory.Mapped.Count"},{"id":"Status.Flink.Memory.Managed.Used"},{"id":"Status.JVM.Memory.Metaspace.Used"},{"id":"Status.JVM.CPU.Load"},{"id":"Status.JVM.Memory.Heap.Max"},{"id":"Status.JVM.Memory.Heap.Used"},{"id":"Status.JVM.ClassLoader.ClassesUnloaded"},{"id":"Status.JVM.GarbageCollector.G1_Young_Generation.Time"},{"id":"Status.Shuffle.Netty.AvailableMemory"}]## 2、/jobs/metrics
http://192.168.10.41:9081/jobs/metrics
[{"id":"numberOfFailedCheckpoints"},{"id":"lastCheckpointSize"},{"id":"totalNumberOfCheckpoints"},{"id":"lastCheckpointExternalPath"},{"id":"lastCheckpointRestoreTimestamp"},{"id":"uptime"},{"id":"restartingTime"},{"id":"numberOfInProgressCheckpoints"},{"id":"downtime"},{"id":"lastCheckpointProcessedData"},{"id":"numberOfCompletedCheckpoints"},{"id":"numRestarts"},{"id":"fullRestarts"},{"id":"lastCheckpointDuration"},{"id":"lastCheckpointPersistedData"}]## 3、/jobs/<jobid>/vertices/<vertexid>/subtasks/metrics
## 在作业详情页面中,找到并点击 "Task Managers" 选项卡。
## 在 "Task Managers" 页面中,您可以查看每个 Task Manager 的详细信息,包括其分配给该 Task Manager 的任务(即 vertex)及其 ID。
## 或者在chrome中右击检查中查看vertexid,由于下面的链接内容太多,仅仅截图展示
http://192.168.10.41:9081/jobs/1b0700f7510a9dd7fd65aee66ad0382a/vertices/cbc357ccb763df2852fee8c4fc7d55f2/subtasks/metrics

在这里插入图片描述

在相应类型的所有实体的子集上聚合的请求度量:

  • /taskmanagers/metrics?taskmanagers=A,B,C
  • /jobs/metrics?jobs=D,E,F
  • /jobs/<jobid>/vertices/<vertexid>/subtasks/metrics?subtask=1,2,3
  • 示例
    本示例下面的参数如:
    jobid:bb741e7e46d97541a83a492c948e000d
    taskmanagerid1:192.168.10.42:42933-a2a682
    taskmanagerid2:192.168.10.43:38542-8d626d
    taskmanagerid3:192.168.10.44:43904-9a6f04
    vertexid:cbc357ccb763df2852fee8c4fc7d55f2
## 1、/taskmanagers/metrics?taskmanagers=A,B,C
http://192.168.10.41:9081/taskmanagers/metrics?taskmanagers=192.168.10.42:42933-a2a682,192.168.10.43:38542-8d626d,192.168.10.44:43904-9a6f04
[{"id":"Status.JVM.Memory.Mapped.TotalCapacity"},{"id":"Status.Network.AvailableMemorySegments"},{"id":"Status.Network.TotalMemorySegments"},{"id":"Status.JVM.Memory.Mapped.MemoryUsed"},{"id":"Status.Flink.Memory.Managed.Total"},{"id":"Status.JVM.CPU.Time"},{"id":"Status.JVM.GarbageCollector.G1_Young_Generation.Count"},{"id":"Status.JVM.Threads.Count"},{"id":"Status.Shuffle.Netty.UsedMemory"},{"id":"Status.JVM.Memory.Heap.Committed"},{"id":"Status.Shuffle.Netty.TotalMemory"},{"id":"Status.JVM.Memory.Metaspace.Committed"},{"id":"Status.JVM.Memory.Direct.Count"},{"id":"Status.Shuffle.Netty.AvailableMemorySegments"},{"id":"Status.JVM.Memory.NonHeap.Max"},{"id":"Status.Shuffle.Netty.TotalMemorySegments"},{"id":"Status.JVM.Memory.NonHeap.Committed"},{"id":"Status.JVM.Memory.NonHeap.Used"},{"id":"Status.JVM.Memory.Metaspace.Max"},{"id":"Status.JVM.GarbageCollector.G1_Old_Generation.Count"},{"id":"Status.JVM.Memory.Direct.MemoryUsed"},{"id":"Status.JVM.Memory.Direct.TotalCapacity"},{"id":"Status.JVM.GarbageCollector.G1_Old_Generation.Time"},{"id":"Status.Shuffle.Netty.UsedMemorySegments"},{"id":"Status.JVM.ClassLoader.ClassesLoaded"},{"id":"Status.JVM.Memory.Mapped.Count"},{"id":"Status.JVM.Memory.Metaspace.Used"},{"id":"Status.Flink.Memory.Managed.Used"},{"id":"Status.JVM.CPU.Load"},{"id":"Status.JVM.Memory.Heap.Used"},{"id":"Status.JVM.Memory.Heap.Max"},{"id":"Status.JVM.ClassLoader.ClassesUnloaded"},{"id":"Status.JVM.GarbageCollector.G1_Young_Generation.Time"},{"id":"Status.Shuffle.Netty.AvailableMemory"}]## 2、/jobs/metrics?jobs=D,E,F
http://192.168.10.41:9081/jobs/metrics?jobs=bb741e7e46d97541a83a492c948e000d
[{"id":"numberOfFailedCheckpoints"},{"id":"lastCheckpointSize"},{"id":"totalNumberOfCheckpoints"},{"id":"lastCheckpointExternalPath"},{"id":"lastCheckpointRestoreTimestamp"},{"id":"uptime"},{"id":"restartingTime"},{"id":"numberOfInProgressCheckpoints"},{"id":"downtime"},{"id":"lastCheckpointProcessedData"},{"id":"numberOfCompletedCheckpoints"},{"id":"numRestarts"},{"id":"fullRestarts"},{"id":"lastCheckpointDuration"},{"id":"lastCheckpointPersistedData"}]## 3、/jobs/<jobid>/vertices/<vertexid>/subtasks/metrics?subtask=1,2,3
## 在作业详情页面中,找到并点击 "Task Managers" 选项卡。
## 在 "Task Managers" 页面中,您可以查看每个 Task Manager 的详细信息,包括其分配给该 Task Manager 的任务(即 vertex)及其 ID。
## 或者在chrome中右击检查中查看vertexid,由于下面的链接内容太多,仅仅截图展示
## 以下按照步骤显示subtask的metrics
### 1、查询所有的subtask,其内容如上图,不再赘述
http://192.168.10.41:9081/jobs/1b0700f7510a9dd7fd65aee66ad0382a/vertices/cbc357ccb763df2852fee8c4fc7d55f2/subtasks/metrics
### 2、
### 上图中有id为 Source__TableSourceScan(table=[[default_catalog__default_database__alanchan_kafk.KafkaConsumer.bytes-consumed-total、Source__TableSourceScan(table=[[default_catalog__default_database__alanchan_kafk.failed-reauthentication-rate等,本示例就以查其2个子任务
http://192.168.10.41:9081/jobs/1b0700f7510a9dd7fd65aee66ad0382a/vertices/cbc357ccb763df2852fee8c4fc7d55f2/subtasks/metrics?subtask= Source__TableSourceScan(table=[[default_catalog__default_database__alanchan_kafk.KafkaConsumer.bytes-consumed-total,Source__TableSourceScan(table=[[default_catalog__default_database__alanchan_kafk.failed-reauthentication-rate
### 其内容太多,见下面截图

在这里插入图片描述

警告度量名称可以包含查询度量时需要转义的特殊字符。例如,“a_+b”将转义为“a%2B_b”。

应转义的字符列表:
在这里插入图片描述

1)/jobmanager/metrics示例

GET /jobmanager/metrics

http://192.168.10.41:9081/jobmanager/metrics
[{"id":"Status.JVM.GarbageCollector.PS_MarkSweep.Time"},{"id":"Status.JVM.Memory.Mapped.TotalCapacity"},{"id":"taskSlotsAvailable"},{"id":"taskSlotsTotal"},{"id":"Status.JVM.Memory.Mapped.MemoryUsed"},{"id":"Status.JVM.CPU.Time"},{"id":"Status.JVM.Threads.Count"},{"id":"Status.JVM.Memory.Heap.Committed"},{"id":"Status.JVM.Memory.Metaspace.Committed"},{"id":"Status.JVM.GarbageCollector.PS_MarkSweep.Count"},{"id":"Status.JVM.GarbageCollector.PS_Scavenge.Time"},{"id":"Status.JVM.Memory.Direct.Count"},{"id":"Status.JVM.GarbageCollector.PS_Scavenge.Count"},{"id":"Status.JVM.Memory.NonHeap.Max"},{"id":"numRegisteredTaskManagers"},{"id":"Status.JVM.Memory.NonHeap.Committed"},{"id":"Status.JVM.Memory.NonHeap.Used"},{"id":"Status.JVM.Memory.Metaspace.Max"},{"id":"Status.JVM.Memory.Direct.MemoryUsed"},{"id":"Status.JVM.Memory.Direct.TotalCapacity"},{"id":"numRunningJobs"},{"id":"Status.JVM.ClassLoader.ClassesLoaded"},{"id":"Status.JVM.Memory.Mapped.Count"},{"id":"Status.JVM.Memory.Metaspace.Used"},{"id":"Status.JVM.CPU.Load"},{"id":"Status.JVM.Memory.Heap.Max"},{"id":"Status.JVM.Memory.Heap.Used"},{"id":"Status.JVM.ClassLoader.ClassesUnloaded"}]

2) taskmanagers//metrics?get=metric1,metric2示例

请求特定taskmanagers的 Metric 的值(未聚合)
GET taskmanagers//metrics?get=metric1,metric2
本示例下面的参数如:
taskmanagerid1:192.168.10.42:42933-a2a682
taskmanagerid2:192.168.10.43:38542-8d626d
taskmanagerid3:192.168.10.44:43904-9a6f04

## 1、获取taskmananger的指标
http://192.168.10.41:9081/taskmanagers/metrics
[{"id":"Status.JVM.Memory.Mapped.TotalCapacity"},{"id":"Status.Network.AvailableMemorySegments"},{"id":"Status.Network.TotalMemorySegments"},{"id":"Status.JVM.Memory.Mapped.MemoryUsed"},{"id":"Status.Flink.Memory.Managed.Total"},{"id":"Status.JVM.CPU.Time"},{"id":"Status.JVM.GarbageCollector.G1_Young_Generation.Count"},{"id":"Status.JVM.Threads.Count"},{"id":"Status.Shuffle.Netty.UsedMemory"},{"id":"Status.JVM.Memory.Heap.Committed"},{"id":"Status.Shuffle.Netty.TotalMemory"},{"id":"Status.JVM.Memory.Metaspace.Committed"},{"id":"Status.JVM.Memory.Direct.Count"},{"id":"Status.Shuffle.Netty.AvailableMemorySegments"},{"id":"Status.JVM.Memory.NonHeap.Max"},{"id":"Status.Shuffle.Netty.TotalMemorySegments"},{"id":"Status.JVM.Memory.NonHeap.Committed"},{"id":"Status.JVM.Memory.NonHeap.Used"},{"id":"Status.JVM.Memory.Metaspace.Max"},{"id":"Status.JVM.GarbageCollector.G1_Old_Generation.Count"},{"id":"Status.JVM.Memory.Direct.MemoryUsed"},{"id":"Status.JVM.GarbageCollector.G1_Old_Generation.Time"},{"id":"Status.JVM.Memory.Direct.TotalCapacity"},{"id":"Status.Shuffle.Netty.UsedMemorySegments"},{"id":"Status.JVM.ClassLoader.ClassesLoaded"},{"id":"Status.JVM.Memory.Mapped.Count"},{"id":"Status.Flink.Memory.Managed.Used"},{"id":"Status.JVM.Memory.Metaspace.Used"},{"id":"Status.JVM.CPU.Load"},{"id":"Status.JVM.Memory.Heap.Max"},{"id":"Status.JVM.Memory.Heap.Used"},{"id":"Status.JVM.ClassLoader.ClassesUnloaded"},{"id":"Status.JVM.GarbageCollector.G1_Young_Generation.Time"},{"id":"Status.Shuffle.Netty.AvailableMemory"}]## 2、获取指定taskmanager的指定指标(Status.JVM.Memory.Mapped.TotalCapacity和Status.JVM.CPU.Load)的值
http://192.168.10.41:9081/taskmanagers/192.168.10.42:42933-a2a682/metrics?get=Status.JVM.Memory.Mapped.TotalCapacity,Status.JVM.CPU.Load[{"id": "Status.JVM.Memory.Mapped.TotalCapacity","value": "0"
}, {"id": "Status.JVM.CPU.Load","value": "0.001329512482145905"
}]

3)/taskmanagers/metrics?get=metric1,metric2示例

请求特定 Metric 的聚合值
GET /taskmanagers/metrics?get=metric1,metric2
GET /taskmanagers/metrics?get=metric1,metric2


## 1、获取taskmananger的指标
http://192.168.10.41:9081/taskmanagers/metrics
[{"id":"Status.JVM.Memory.Mapped.TotalCapacity"},{"id":"Status.Network.AvailableMemorySegments"},{"id":"Status.Network.TotalMemorySegments"},{"id":"Status.JVM.Memory.Mapped.MemoryUsed"},{"id":"Status.Flink.Memory.Managed.Total"},{"id":"Status.JVM.CPU.Time"},{"id":"Status.JVM.GarbageCollector.G1_Young_Generation.Count"},{"id":"Status.JVM.Threads.Count"},{"id":"Status.Shuffle.Netty.UsedMemory"},{"id":"Status.JVM.Memory.Heap.Committed"},{"id":"Status.Shuffle.Netty.TotalMemory"},{"id":"Status.JVM.Memory.Metaspace.Committed"},{"id":"Status.JVM.Memory.Direct.Count"},{"id":"Status.Shuffle.Netty.AvailableMemorySegments"},{"id":"Status.JVM.Memory.NonHeap.Max"},{"id":"Status.Shuffle.Netty.TotalMemorySegments"},{"id":"Status.JVM.Memory.NonHeap.Committed"},{"id":"Status.JVM.Memory.NonHeap.Used"},{"id":"Status.JVM.Memory.Metaspace.Max"},{"id":"Status.JVM.GarbageCollector.G1_Old_Generation.Count"},{"id":"Status.JVM.Memory.Direct.MemoryUsed"},{"id":"Status.JVM.GarbageCollector.G1_Old_Generation.Time"},{"id":"Status.JVM.Memory.Direct.TotalCapacity"},{"id":"Status.Shuffle.Netty.UsedMemorySegments"},{"id":"Status.JVM.ClassLoader.ClassesLoaded"},{"id":"Status.JVM.Memory.Mapped.Count"},{"id":"Status.Flink.Memory.Managed.Used"},{"id":"Status.JVM.Memory.Metaspace.Used"},{"id":"Status.JVM.CPU.Load"},{"id":"Status.JVM.Memory.Heap.Max"},{"id":"Status.JVM.Memory.Heap.Used"},{"id":"Status.JVM.ClassLoader.ClassesUnloaded"},{"id":"Status.JVM.GarbageCollector.G1_Young_Generation.Time"},{"id":"Status.Shuffle.Netty.AvailableMemory"}]## 2、获取taskmanagers的指定指标(Status.JVM.Memory.Mapped.TotalCapacity和Status.JVM.CPU.Load)的值
http://192.168.10.41:9081/taskmanagers/metrics?get=Status.JVM.Memory.Mapped.TotalCapacity,Status.JVM.CPU.Load
[{"id": "Status.JVM.Memory.Mapped.TotalCapacity","min": 0.0,"max": 0.0,"avg": 0.0,"sum": 0.0},{"id": "Status.JVM.CPU.Load","min": 5.440145745299967E-4,"max": 0.0015120478111207314,"avg": 9.553803717257513E-4,"sum": 0.002866141115177254}
]

4)/taskmanagers/metrics?get=metric1,metric2&agg=min,max示例

请求特定 Metric 的特定值的聚合值
GET /taskmanagers/metrics?get=metric1,metric2&agg=min,max

## 1、获取taskmananger的指标
http://192.168.10.41:9081/taskmanagers/metrics
[{"id":"Status.JVM.Memory.Mapped.TotalCapacity"},{"id":"Status.Network.AvailableMemorySegments"},{"id":"Status.Network.TotalMemorySegments"},{"id":"Status.JVM.Memory.Mapped.MemoryUsed"},{"id":"Status.Flink.Memory.Managed.Total"},{"id":"Status.JVM.CPU.Time"},{"id":"Status.JVM.GarbageCollector.G1_Young_Generation.Count"},{"id":"Status.JVM.Threads.Count"},{"id":"Status.Shuffle.Netty.UsedMemory"},{"id":"Status.JVM.Memory.Heap.Committed"},{"id":"Status.Shuffle.Netty.TotalMemory"},{"id":"Status.JVM.Memory.Metaspace.Committed"},{"id":"Status.JVM.Memory.Direct.Count"},{"id":"Status.Shuffle.Netty.AvailableMemorySegments"},{"id":"Status.JVM.Memory.NonHeap.Max"},{"id":"Status.Shuffle.Netty.TotalMemorySegments"},{"id":"Status.JVM.Memory.NonHeap.Committed"},{"id":"Status.JVM.Memory.NonHeap.Used"},{"id":"Status.JVM.Memory.Metaspace.Max"},{"id":"Status.JVM.GarbageCollector.G1_Old_Generation.Count"},{"id":"Status.JVM.Memory.Direct.MemoryUsed"},{"id":"Status.JVM.GarbageCollector.G1_Old_Generation.Time"},{"id":"Status.JVM.Memory.Direct.TotalCapacity"},{"id":"Status.Shuffle.Netty.UsedMemorySegments"},{"id":"Status.JVM.ClassLoader.ClassesLoaded"},{"id":"Status.JVM.Memory.Mapped.Count"},{"id":"Status.Flink.Memory.Managed.Used"},{"id":"Status.JVM.Memory.Metaspace.Used"},{"id":"Status.JVM.CPU.Load"},{"id":"Status.JVM.Memory.Heap.Max"},{"id":"Status.JVM.Memory.Heap.Used"},{"id":"Status.JVM.ClassLoader.ClassesUnloaded"},{"id":"Status.JVM.GarbageCollector.G1_Young_Generation.Time"},{"id":"Status.Shuffle.Netty.AvailableMemory"}]## 2、获取taskmanagers的指定指标(Status.JVM.Memory.Mapped.TotalCapacity和Status.JVM.CPU.Load)的值
http://192.168.10.41:9081/taskmanagers/metrics?get=Status.JVM.Memory.Mapped.TotalCapacity,Status.JVM.CPU.Load&agg=min,max[{"id": "Status.JVM.Memory.Mapped.TotalCapacity","min": 0.0,"max": 0.0},{"id": "Status.JVM.CPU.Load","min": 3.784653231147696E-4,"max": 0.001422205366454916}
]

8、Dashboard integration

为每个task or operator收集的度量也可以在仪表板中可视化。在作业的主页面上,选择度量选项卡。在顶部图形中选择一个任务后,可以使用添加度量下拉菜单选择要显示的度量。

  • task指标列为<subtask_index><metric_name>。
  • operator指标列为<subtask_index><operator_name><metric_name>。

每个度量将被可视化为一个单独的图形,x轴表示时间,y轴表示测量值。所有图形每10秒自动更新一次,并在导航到另一个页面时继续更新。
可视化度量的数量没有限制;然而,只有数字度量可以被可视化。

以上,本文简单的介绍了Flink 的指标体系内容,即指标类型以及四种类型的代码实现示例、scope、系统指标、报告、跟踪、api与dashboard集成。
本专题分为三部分,即:
45、Flink 的指标体系介绍及验证(1)-指标类型及指标实现示例
45、Flink 的指标体系介绍及验证(2)-指标的scope、报告、系统指标以及追踪、api集成示例和dashboard集成
45、Flink 的指标体系介绍及验证(3)- 完整版

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/182960.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

从源代码出发,Jenkins 任务排队时间过长问题的解决过程

最近开发了一个部署相关的工具&#xff0c;使用 Jenkins 来构建应用。Jenkins 的任务从模板中创建而来。每次部署时&#xff0c;通过 Jenkins API 来触发构建任务。在线上运行时发现&#xff0c;通过 API 触发的 Jenkins 任务总是会时不时在队列中等待较长的时间。某些情况下的…

Node.js案例 - 记账本

目录 项目效果 项目的搭建 ​编辑 响应静态网页 ​编辑 ​编辑 结合MongoDB数据库 结合API接口 进行会话控制 项目效果 该案例实现账单的添加删除查看&#xff0c;用户的登录注册。功能比较简单&#xff0c;但是案例主要是使用前段时间学习的知识进行实现的&#xff0c…

C++ AVL 树

AVL树的概念 当数据有序或接近有序二叉搜索树将退化为单支树&#xff0c;此时二叉搜索树的搜索效率低下 解决方法&#xff1a;AVL树&#xff08;降低树的高度&#xff0c;从而减少平均搜索长度) 一棵AVL树或者是空树&#xff0c;或者是具有以下性质的二叉搜索树&#xff1…

JavaScript基础—函数、参数、返回值、作用域、变量、匿名函数、综合案例—转换时间,逻辑中断,转换为Boolean型

版本说明 当前版本号[20231129]。 版本修改说明20231126初版20231129完善部分内容 目录 文章目录 版本说明目录JavaScript 基础 - 第4天笔记函数声明和调用声明&#xff08;定义&#xff09;调用细节补充 参数形参和实参函数默认值 返回值作用域全局作用域局部作用域 变量全…

laraval6.0 GatewayWorker 交互通信

laravel 6.0 GatewayWorker 通讯 开发前准备下载 GatewayWorker 及操作方式前端demo测试效果项目中安装GatewayClient 开发前准备 GatewayClient 官网&#xff1a;https://www.workerman.net/ 当前使用的是宝塔操作 下载 GatewayWorker 及操作方式 前端demo 测试效果 项目中安…

纹理烘焙:原理及实现

纹理烘焙是计算机图形学中常见的技术&#xff0c;用于将着色器的细节传输到纹理中。 如果你的着色器计算量很大&#xff0c;但会产生静态结果&#xff0c;例如&#xff0c;这非常有用。 复杂的噪音。 NSDT在线工具推荐&#xff1a; Three.js AI纹理开发包 - YOLO合成数据生成器…

Ajax的使用方法

1,什么是Ajax&#xff1f; Ajax&#xff08;异步Javascript和XML&#xff09;&#xff0c;是指一种创建交互式网页应用的网页开发技术。 2&#xff0c;Ajax的作用 Ajax可以使网页实现异步更新----即在不更新整个页面的情况下实现对某一部分进行更新。 简单来说Ajax就是用于连接…

【Python】yaml.safe_load()函数详解和示例

在Python中&#xff0c;PyYAML库提供了对YAML&#xff08;YAML Ain’t Markup Language&#xff09;文件的强大支持。YAML是一种直观的数据序列化标准&#xff0c;可以方便地存储和加载配置文件、数据日志等。 yaml.safe_load和yaml.load是Python的PyYAML库提供的两个函数&…

从零搭建AlibabaCloud微服务项目

1&#xff0c;创建maven项目工程如下 equipment-admin 后台equipment-applet 前台或小程序端或app、h5equipment-common 公共模块equipment-gateway 网关equipment-mapper mapper层操作数据库equipment-model 实体类对应数据库表 2&#xff0c;在父pom文件引入依赖 <proper…

基于Java SSM框架实现美食推荐管理系统项目【项目源码+论文说明】计算机毕业设计

基于java的SSM框架实现美食推荐管理系统演示 摘要 21世纪的今天&#xff0c;随着社会的不断发展与进步&#xff0c;人们对于信息科学化的认识&#xff0c;已由低层次向高层次发展&#xff0c;由原来的感性认识向理性认识提高&#xff0c;管理工作的重要性已逐渐被人们所认识&a…

国内首个农业开源鸿蒙操作系统联合华为正式发布

2023年11月29日&#xff0c;在中国国际供应链促进博览会上&#xff0c;中信农业科技股份有限公司&#xff08;简称“中信农业”&#xff09;与深圳开鸿数字产业发展有限公司&#xff08;简称“深开鸿”&#xff09;以及华为技术有限公司&#xff08;简称“华为”&#xff09;联…

UniWebView 版本3 版本4 版本5介绍

一、介绍 UniWebView是iOS/Android上的web视图组件的包装器&#xff0c;所以运行时拥有与原生web相似性能。是针对Unity所写的插件&#xff0c;节省了项目的开发时间。 官网地址&#xff1a;UniWebView 二、下载&使用 1、下载 &#xff08;1&#xff09;、Unity Asset …

GAN:PacGAN-生成对抗网络中两个样本的威力

论文&#xff1a;https://arxiv.org/pdf/1712.04086.pdf 代码&#xff1a;GitHub - fjxmlzn/PacGAN: [NeurIPS 2018] [JSAIT] PacGAN: The power of two samples in generative adversarial networks 发表&#xff1a;2016 一、摘要 1&#xff1a;GAN最重大的缺陷是&#xf…

自己动手写 chatgpt: Attention 机制的原理与实现

chatgpt等大模型之所以成功都有赖于一种算法突破&#xff0c;那就是 attention 机制。这种机制能让神经网络更有效的从语言中抽取识别其内含的规律&#xff0c;同时它支持多路并行运算&#xff0c;因此相比于原来的自然语言处理算法&#xff0c;它能够通过并发的方式将训练的速…

leetcode 11. 盛最多水的容器(优质解法)

代码&#xff1a; class Solution {public int maxArea(int[] height) {int nheight.length;int left0;int rightn-1;int max0;while (left<right){//计算当前 left 和 right 所在位置的面积int areaMath.min(height[left],height[right])*(right-left);//重置最大值if(are…

进程间通信基础知识【Linux】——上篇

目录 一&#xff0c;理解进程之间的通信 1. 进程间通信目的 2. 进程间通信的技术背景 3&#xff0c;常见的进程间通信 二&#xff0c;管道 1. 尝试建立一个管道 管道的特点&#xff1a; 管道提供的访问控制&#xff1a; 2. 扩展&#xff1a;进程池 阶段一&#xff1a…

sqli-labs靶场详解(less32-less37)

宽字节注入 原理在下方 目录 less-32 less-33 less-34 less-35 less-36 less-37 less-32 正常页面 ?id1 下面有提示 获取到了Hint: The Query String you input is escaped as : 1\ ?id1 看来是把参数中的非法字符就加上了转义 从而在数据库中只能把单引号当成普通的字…

asla四大开源组件应用示例(alsa-lib、alsa-utils、alsa-tools、alsa-plugins)

文章目录 alsa设备文件/dev/snd//sys/class/sound/proc/asoundalsa-lib示例1alsa-utilsalsa-toolsalsa-plugins参考alsa设备文件 /dev/snd/ alsa设备文件目录位于,/dev/snd,如下所示 root@xboard:~#ls /dev/snd -l total 0 drwxr-xr-x 2 root root 60 Nov 6 2023 …

springboot基础配置及maven运行

目录 1、spring快速开始&#xff1a; 2、通过idea工具打开导入包 3、maven打包 1、springboot快速开始&#xff1a; 环境依赖&#xff1a;jdk17 Spring | Quickstart spring初始化包下载&#xff1a; 点击generate&#xff0c;下载包 2、通过idea工具打开导入包 我之前写了…

【Vulnhub靶机】lampiao--DirtyCow

文章目录 漏洞介绍简介原因类型版本危害 信息收集主机扫描端口扫描 漏洞探测漏洞利用权限提升nc文件传输编译 参考 靶机地址&#xff1a;lampiao 下载地址&#xff1a;Lampio: 1 漏洞介绍 简介 脏牛&#xff08;Dirty Cow&#xff09;是Linux内核的一个提权漏洞&#xff0c;…