八种Flink任务告警方式

目录

一、Flink应用分析

1.1 Flink任务生命周期

1.2 Flink应用告警视角分析

二、监控告警方案说明

2.1 监控消息队中间件消费者偏移量

2.2 通过调度系统监控Flink任务运行状态

2.3 引入开源服务的SDK工具实现

2.4 调用FlinkRestApi实现任务监控告警

2.5 定时去查询目标库最大时间和当前时间做对比

2.6 自定义指标Reporter的SDK

2.7 任务日志告警

2.8 运行任务探活

三、总结


前言:Flink作为一个高性能实时计算引擎,可灵活的嵌入各种场景,许多团队为了实现业务交付,选择了Flink作为解决方案;但是随着Flink应用的增多且出现线上事故,对Flink任务异常的监控告警成为迫切需求;但是如何实现Flink任务异常监控告警,成为了新的问题;本文将从多个角度讲述Flink任务监控告警实现方案。

一、Flink应用分析

       告警可以从多个角度实现;我们先分析Flink任务运行的生命周期,然后拆解每个部分,分析可以从那些角度去监控Flink任务的异常。

1.1 Flink任务生命周期

按读取数据源:有如Kafka、RocketMq、Pulsar等消息队列,还有其他数据源;区别在是否有记录消费者信息的数据标识;

Flink的运行模式:session、per-job、application;三类运行模式可以分为两类场景:单独运行的任务(per和application),还有Flink集群统一提供资源运行的任务(session);

任务场景:离线任务还是实时任务;

Flink任务应用结构图如下:

1.2 Flink应用告警视角分析

从数据源头:

1.对于消息队列这种,本身拥有记录消费者偏移量概念的中间件,可以通过监控消费者偏移量的变化来监控Flink任务运行的异常情况;

从任务运行时:

2.任务层可以通过调度系统的告警插件,监控任务运行结果和任务运行状态而监控任务;

3.也可以在Flink任务内部引入开源SDK配置开源工具实现;

4.或者调用FlinkRestApi实现任务监控告警;

从输出结果上:

5.可以定时去查询输出结果最后的时间

6.或者在Flink任务里引入Flink的指标SDK,自定义Flink任务的指标采集,将结果测流输出到目标端,自定义监控告警和分析;

其他的方式:

7.日志告警,捕捉运行日志,通过关键词监控告警;

8.运行任务定时探活

二、监控告警方案说明

       钉钉、微信、邮件、电话、http等属于告警方式的选择,这里侧重讲对于运行异常事件信息的捕捉。

2.1 监控消息队中间件消费者偏移量

      类似Kafka或者RocketMQ这类拥有记录消费者消费队列信息的中间件,可以通过服务自身的RestAPI,定时计算消费者消费数据lag条数;

以下是Kafka消费者告警配置页面:

这需要后端自定义实现;

实现方式如下:定时通过调用Kafka自己提供的RestApi将Topic和各消费者同步到Mysql,然后配置要监控Topic的消费者告警阈值和告警人员,每隔一分钟定时计算该消费者的lag,如果Flink任务出现异常,本身不提交offset了,数据积压量大于阈值就告警。

2.2 通过调度系统监控Flink任务运行状态

市场上有一些任务调度系统,比如dolphinscheduler、StreamX等,除了提供任务发布的能力,还自带监控告警功能,通过使用这类产品,也能做到监控告警能力。

比如dolphinscheduler:

Flink任务发布功能:

告警功能插件:

比如StreamPark:

Flink任务发布能力:

告警功能插件:

2.3 引入开源服务的SDK工具实现

       博客上对于Flink监控告警推荐最多的一种方式就是,prometheus + pushgateway + grafana这套方案;这套方案需要安装维护prometheus和grafana这两个产品,比较重,但是这套方案除了可以做到任务监控,还可以做到任务指标级的分析,这对于后续的任务性能优化有比较好的支持。

具体操作步骤如下:

1.安装好prometheus + pushgateway这两个服务;

2.在Flink代码里加入以下依赖:

  <!-- Prometheus Metrics Reporter --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-metrics-prometheus</artifactId><version>${flink-version}</version></dependency>

3.在部署Flink的配置文件里

将flink-metrics-prometheus-1.14.3.jar 包放入到flink安装目录/lib下

修改flink-conf.yaml配置文件,设置属性如下:

Example configuration: metrics.reporter.promgateway.class: org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter metrics.reporter.promgateway.host: localhost metrics.reporter.promgateway.port: 9091 metrics.reporter.promgateway.jobName: myJob metrics.reporter.promgateway.randomJobNameSuffix: true metrics.reporter.promgateway.deleteOnShutdown: false metrics.reporter.promgateway.groupingKey: k1=v1;k2=v2 metrics.reporter.promgateway.interval: 60 SECONDS

然后启动运行任务,指标数据就自动推送到pushgateway里了,prometheus会从CC里拉取数据到自己的服务里,如下:

在grafana里导入prometheus源,配置指标就可以看到各种指标的运行状态:

总结:这种方案需要四个步骤:

1.启动prometheus+pushgateway+grafana服务;

2.配置Flink安装目录的配置文件、导入prometheus的lib包;

3.然后在Flink任务里引入一个prometheus的SDK,一起打包启动,指标就可以在prometheus看到;

4.通过grafana做分析看板和配置告警规则,驱动事件告警;

       这种方式都是开源服务功能,但是需要维护和理解成本,对于一些轻业务团队有负担,但是对有很多Flink任务的团队,这是一种可用的方案,后续还可以基于历史指标分析,做到内存级的性能优化;

2.4 调用FlinkRestApi实现任务监控告警

       这里要搞清楚Flink集群的生命和Flink任务的生命周期这两个概念;Flink集群按生命周期来分,运行方式可以分为session模式和其他模式两种;这两种的区别分别是,Flink集群和Flink任务的资源是否一起释放;这关系到是否可以稳定的通过FlinkRestApi捕捉到任务运行状态;

       对于Flink Sesion集群,Flink任务可以反复提交,集群的URL是不会变的,可以通过固定的URL监控到Flink任务的运行状态;

       对于per-job和application运行方式,Flink任务web的URL是不固定的,需要每次都捕捉到启动时的Url才能通过url调用RestAPI返回查询指标;

sesion集群样式:

per-job和application运行模式提交的任务,只会有一个任务,且url是随机的。

2.5 定时去查询目标库最大时间和当前时间做对比

       这种方式是公司的DB团队给我的想法,并且他们最初也是这么做的,虽然操作上不美观,无法大面积,且性能上会造成一些影响,但确实可以轻量级的实现对任务异常的监控;

具体是怎么做的呢?

       对于实时任务,数据都是实时捕捉的,写入目标库的时候,数据带有当前时间字段,业务理想状态下,数据会一直产生,查询目标库时间最大的数据与当前时间匹配,超出阈值时间范围就告警;不理想状态,将特殊时间段监控去掉就行;这种方式在生成业务种确实能满足任务的异常监控告警需求。

     要查询最大时间的数据,可以使用如下的 SQL 语句:

 SELECT time_column FROM table_name ORDER BY time_column DESC LIMIT 1;

2.6 自定义指标Reporter的SDK

1.引入Flink自带的指标SDK:

<!-- Prometheus Metrics Reporter --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-metrics-prometheus</artifactId><version>${flink-version}</version></dependency>

2.类似prometheus,将指标类的一些参数,自定义捕捉写到目标库(将推送到pushgateway改成推送到Kafka),然后通过目标库的数据自己做任务异常监控分析;

       这种方式就是避免了开源维护的成本,可以使用产品线自研的一套UI和采集中间件做数据管理,减轻了维护成本。

大致步骤是:

1.自定义 ReporterFactory 实现 MetricReporterFactory 接口中的 createMetricReporter 方法。

2.自定义 Reporter 继承 AbstractReporter 实现 Scheduled 接口中的相关方法

3.在 META-INF/services 下的配置文件中添加对应的实现类,然后在Flink配置里自定义参数。

以写入Kafka为例:

实现KafkaReporterFactory:

package org.apache.flink.metrics.kafka;import org.apache.flink.metrics.reporter.MetricReporter;
import org.apache.flink.metrics.reporter.MetricReporterFactory;import java.util.Properties;/*** @Description:* @author:i7Yang* @create 2024-01-26 20:19**/
public class KafkaReporterFactory implements MetricReporterFactory {@Overridepublic MetricReporter createMetricReporter(Properties properties) {return new KafkaReporter();}
}

实现自定义KafkaReporter:

package org.apache.flink.metrics.kafka;import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.metrics.Metric;
import org.apache.flink.metrics.MetricConfig;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.reporter.AbstractReporter;
import org.apache.flink.metrics.reporter.MetricReporter;
import org.apache.flink.metrics.reporter.Scheduled;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.util.*;
import java.util.stream.Collectors;/*** {@link MetricReporter} that exports {@link Metric Metrics} via Kafka.*/public class KafkaReporter extends AbstractReporter implements Scheduled {private static final Logger LOGGER = LoggerFactory.getLogger(KafkaReporter.class);static final String JOB_ID_VARIABLE = "<job_id>";static final String JOB_NAME_VARIABLE = "<job_name>";private KafkaProducer<String, String> kafkaProducer;private List<String> metricsFilter = new ArrayList<>();private String topic;private String jobName;private String jobId;@Overridepublic void open(MetricConfig metricConfig) {String bootstrapServer = metricConfig.getString("bootstrapServers", "master:9092,storm1:9092,storm2:9092");String filter = metricConfig.getString("filter", "");String chunkSize = metricConfig.getString("chunkSize", "5");String topic = metricConfig.getString("topic", "flink_metric");Properties properties = new Properties();properties.setProperty("bootstrap.servers", bootstrapServer);properties.setProperty("acks", "all");properties.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");properties.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");ClassLoader classLoader = Thread.currentThread().getContextClassLoader();Thread.currentThread().setContextClassLoader(null);kafkaProducer = new KafkaProducer<>(properties);Thread.currentThread().setContextClassLoader(classLoader);if (StringUtils.isNotEmpty(filter)) {this.metricsFilter.addAll(Arrays.asList(filter.split(",")));}this.chunkSize = Integer.parseInt(chunkSize);this.topic = topic;// 获取任务的 jobNamethis.jobName = metricConfig.getString("FLINK_JOB_NAME", null);LOGGER.info("job name: {}", jobName);}@Overridepublic void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group) {Map<String, String> allVariables = group.getAllVariables();String jobID = allVariables.get(JOB_ID_VARIABLE);if (jobID != null && this.jobId == null) {this.jobId = jobID;}String jobName = allVariables.get(JOB_NAME_VARIABLE);if (jobName != null && this.jobName == null) {this.jobName = jobName;}LOGGER.info("job id: {}, job name: {}", this.jobId, this.jobName);LOGGER.info("metric group name: {}, metric name: {}", group.getAllVariables(), metricName);// 只有在 filter 里面的 metric 才会被添加super.notifyOfAddedMetric(metric, metricName, group);}@Overridepublic void notifyOfRemovedMetric(Metric metric, String metricName, MetricGroup group) {super.notifyOfRemovedMetric(metric, metricName, group);}@Overridepublic void close() {if (kafkaProducer != null) {kafkaProducer.close();}}@Overridepublic void report() {synchronized (this) {tryReport();}}private void tryReport() {Map<String, Object> metricMap = new HashMap<>();metricMap.put("jobId", this.jobId);metricMap.put("jobName", this.jobName);JSONArray jsonArray = new JSONArray();gauges.forEach((gauge, metricName) -> {JSONObject jsonObject = new JSONObject();jsonObject.put("metricName", metricName);jsonObject.put("value", gauge.getValue());jsonObject.put("type", "Gauge");jsonArray.add(jsonObject);});counters.forEach((counter, metricName) -> {JSONObject jsonObject = new JSONObject();jsonObject.put("metricName", metricName);jsonObject.put("value", counter.getCount());jsonObject.put("type", "Counter");jsonArray.add(jsonObject);});histograms.forEach((histogram, metricName) -> {JSONObject jsonObject = new JSONObject();jsonObject.put("metricName", metricName);jsonObject.put("value", histogram.getCount());jsonObject.put("type", "Histogram");jsonArray.add(jsonObject);});meters.forEach((meter, metricName) -> {JSONObject jsonObject = new JSONObject();jsonObject.put("metricName", metricName);jsonObject.put("value", meter.getCount());jsonObject.put("type", "Meter");jsonArray.add(jsonObject);});metricMap.put("metrics", jsonArray);ProducerRecord<String, String> record = new ProducerRecord<>(this.topic, this.jobId, JSONObject.toJSONString(metricMap));kafkaProducer.send(record);}@Overridepublic String filterCharacters(String input) {return input;}
}

 flink 的配置文件中设置一下 kafka reporter:

metrics.reporter.kafka.factory.class: org.apache.flink.metrics.kafka.KafkaReporterFactory
metrics.reporter.kafka.bootstrapServers: master:9092,storm1:9092,storm2:9092
metrics.reporter.kafka.topic: flink_metric
metrics.reporter.kafka.filter: inPoolUsage,outPoolUsage,numberOfCompletedCheckpoints,lastCheckpointFullSize,numBytesOutPerSecond,numBuffersOutPerSecond,numRecordsInPerSecond
metrics.reporter.kafka.interval: 20 SECONDS

2.7 任务日志告警

       将Flink的运行任务集中采集,文件日志用LogStagsh,指标日志可在应用里埋点,然后通过日志做告警管理。

2.8 运行任务探活

      上面2.4节讲了Flink的sesion运行模式,可以通过FlinkRestApi获取运行状态和指标;但是对于per-job和applicaiton运行方式,任务异常失败后,restApi是不存在,但是对于其使用的资源管理器,可以捕捉到任务运行状态;比如yarn,可以通过shell查询到任务的存活情况,可以定时去探活或获取url获取运行时指标。

使用yarn做Flink任务资源管理的命令:

定时监控flink任务状态:

yarn application -list | grep -w flink任务名 字 | awk '{print $1}'

返回flink任务url链接:

yarn application -list | grep -w flink 任务名字 | awk '{print $10}'

三、总结

       Flink任务告警方式的选择,要从任务的使用情况和期盼来考量;简单的使用,且任务少,可以用监控目标数据库的数据写入情况、per-job和application运行任务探活、Sesion运行方式通过RestApi来告警;特定场景的业务可以靠监控存储中间偏移量来告警;通用大规模应用场景可以通过采集运行时日志、使用调度平台,使用调度平台、引入开源SDK方式、自定义SDK写入通用系统通用系统里方式选择。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/652613.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

无人机在三维空间中的转动问题

前提 这篇博客是对最近一个有关无人机拍摄图像项目中所学到的新知识的一个总结&#xff0c;比较杂乱&#xff0c;没有固定的写作顺序。 无人机坐标系旋转问题 上图是无人机坐标系&#xff0c;绕x轴是翻滚(Roll)&#xff0c;绕y轴是俯仰(Pitch)&#xff0c;绕z轴是偏航(Yaw)。…

力扣日记1.27-【回溯算法篇】131. 分割回文串

力扣日记&#xff1a;【回溯算法篇】131. 分割回文串 日期&#xff1a;2023.1.27 参考&#xff1a;代码随想录、力扣 131. 分割回文串 题目描述 难度&#xff1a;中等 给你一个字符串 s&#xff0c;请你将 s 分割成一些子串&#xff0c;使每个子串都是 回文串 。返回 s 所有可…

D. Epic Transformation(堆+贪心)

思路&#xff1a;我们删的策略是从次数多的数开始删&#xff0c;每次取两种不同的数&#xff0c;每种删去一个&#xff0c;然后放回堆中。 代码&#xff1a; void solve(){int n;cin >> n;map<int,int>mp;for(int i 1;i < n;i ){int x;cin >> x;mp[x] …

Java笔记 --- 四、异常

四、异常 Java.lang.Throwable Error Exception&#xff08;异常&#xff09; 异常的作用 异常的处理方式 JVM默认的处理方式 捕获异常&#xff08;自己处理&#xff09; try里面没有出现异常&#xff0c;就不会运行catch里面的代码 如果出现多个异常&#xff0c;需要多个c…

【归并排序】【图论】【动态规划】【 深度游戏搜索】1569将子数组重新排序得到同一个二叉搜索树的方案数

本文涉及知识点 动态规划汇总 图论 深度游戏搜索 归并排序 组合 LeetCoce1569将子数组重新排序得到同一个二叉搜索树的方案数 给你一个数组 nums 表示 1 到 n 的一个排列。我们按照元素在 nums 中的顺序依次插入一个初始为空的二叉搜索树&#xff08;BST&#xff09;。请你统…

精选6款前端动画特效分享(附在线演示)

分享6款好玩的前端动画特效 其中有CSS动画、canvas动画、js小游戏等等 下方效果图可能不是特别的生动 那么你可以点击在线预览进行查看相应的动画特效 同时也是可以下载该资源的 CSS日食与太阳碰撞动画 一款基于CSS实现的日食动画特效 碰撞物体会从右侧旋转向太阳靠近重合而后…

程序员成被裁最多的职业,互联网成围城,“转码”神话破灭?

随着互联网蓬勃发展&#xff0c;“转码”一直被视为找不到工作时的灵丹妙药。所谓转码&#xff0c;就是转行成为程序员。专业太偏&#xff1f;没关系&#xff0c;可以转码。失业了&#xff1f;没关系&#xff0c;可以转码。不知道该做什么工作&#xff1f;那就转码吧。程序员薪…

idea提交代码到git或svn上时,怎么忽略.class、.iml文件和文件夹等不必要的文件

第一种方法 在Setings–> Editor --> File Types -->Ignore files and folders中添加需要忽略的文件和文件夹&#xff1a; .idea 忽略 .idea 的文件或者文件夹 *.iml 忽略后缀为iml的文件 target 忽略target 文件或目录以及目录下的所有文件注…

Linux学习之文件系统与动静态库

目录 一&#xff0c;文件的管理 什么是磁盘&#xff1f; 磁盘的逻辑抽象结构 格式化 inode 挂载 软硬链接 二&#xff0c;动静态库 什么是动静态库&#xff1f; 1.站在库的制作者角度 静态库&#xff1a; 制作一个静态库 2.站在静态库使用者的角度 动态库 作为制…

80.网游逆向分析与插件开发-背包的获取-自动化助手显示物品数据1

内容参考于&#xff1a;易道云信息技术研究院VIP课 上一个内容&#xff1a;升级Notice类获得背包基址-CSDN博客 码云地址&#xff08;ui显示角色数据 分支&#xff09;&#xff1a;https://gitee.com/dye_your_fingers/sro_-ex.git 码云版本号&#xff1a;3be017de38c50653b…

操作日志应记录编辑的前后内容变化

总体思路是增加一个注解类&#xff0c;将注解加到要进行记录变化的Java类属性上却可。 上代码&#xff1a; 1. 实现注解类&#xff1a; Target(ElementType.FIELD) Retention(RetentionPolicy.RUNTIME) public interface FieldName {String value();boolean isIgnoreNull()…

day34_js

今日内容 0 复习昨日 1 事件 1.1 事件介绍 1.2 事件绑定方式 1.3 不同事件的演示 2 DOM操作 2.1 概述 2.2 查找元素 2.3 元素内容的查找和设置 2.4 元素属性的查找和设置 2.5 元素CSS样式的查找和设置 2.6 创建元素 2.7 创建文本节点 2.8 追加元素 2.9 删除元素 3 案例练习 0 复…

TCP 异常断开连接【重点】

参考链接 https://xiaolincoding.com/network/3_tcp/tcp_down_and_crash.html https://xiaolincoding.com/network/3_tcp/tcp_unplug_the_network_cable.html#%E6%8B%94%E6%8E%89%E7%BD%91%E7%BA%BF%E5%90%8E-%E6%9C%89%E6%95%B0%E6%8D%AE%E4%BC%A0%E8%BE%93 关键词&#xff1a…

无际线复选框

效果演示 实现了一个网格布局&#xff0c;其中每个网格是一个复选框&#xff0c;可以选择是否显示。每个复选框都有一个漂浮的天花板&#xff0c;表示它是一个房间的天花板。每个房间的天花板都有一个不同的形状和颜色&#xff0c;分别对应不同的房间。整个页面的背景是一个由两…

echarts多个折线图共用X轴,实现tooltip合并和分离

echarts共享X轴案例&#xff1a; <!DOCTYPE html> <html lang"en"><head><meta charset"UTF-8" /><meta name"viewport" content"widthdevice-width, initial-scale1.0" /><title>Document</…

DevSecOps核心流程基本组成分析

目录 一、DevSecOps核心流程基本组成 1.1 核心流程概述 1.2 DevSecOps 核心流程说明 1.2.1 核心流程图 1.2.2 流程说明 1.2.2.1 持续开发 1.2.2.2 持续构建 1.2.2.3 持续运维 1.2.2.4 持续监控 二、DevSecOps核心流程经典场景 2.1 Azure DevSecOps核心流程 2.1.1 核…

HCIA-HarmonyOS设备开发认证-3.内核基础

目录 前言目标一、进程与线程待续。。。 前言 对于任何一个操作系统而言&#xff0c;内核的运行机制与原理是最为关键的部分。本章内容从多角度了解HarmonyOS的内核运行机制&#xff0c;涵盖进程与线程的概念&#xff0c;内存管理机制&#xff0c;网络特性&#xff0c;文件系统…

HTTP连接池在Java中的应用:轻松应对网络拥堵

网络拥堵是现代生活中无法避免的问题&#xff0c;尤其是在我们这个“点点点”时代&#xff0c;网页加载速度直接影响到我们的心情。此时&#xff0c;我们需要一位“救世主”——HTTP连接池。今天&#xff0c;就让我们一起探讨一下&#xff0c;这位“救世主”如何在Java中大显神…

Linux下安装openresty

Linux下安装openresty 十一、Linux下安装openresty11.1.概述11.2.下载OpenResty并安装相关依赖&#xff1a;11.3.使用wget下载:11.4.解压缩:11.5.进入OpenResty目录:11.6.编译和安装11.7.进入OpenResty的目录&#xff0c;找到nginx&#xff1a;11.8.在conf目录下的nginx.conf添…

C# 获取计算机信息

目录 一、本机信息 1、本机名 2、获得本机MAC地址 3、获得计算机名 4、显示器分辨率 5、主显示器分辨率 6、系统路径 二、操作系统信息 1、操作系统类型 2、获得操作系统位数 3、获得操作系统版本 三、处理器信息 1 、处理器个数 四、CPU信息 1、CPU的个数 2、…