Flink任务自动恢复脚本

线上环境经常遇到flink任务挂掉得问题,这里写一个自动恢复脚本

# 我这里使用得datastream api编写的任务,类class路径
MAIN_CLASS="com.flink.job.CommonFlinkStreamJob"
# 我的代码包
JAR_PATH="/home/dev/flink/lib/flink-cdc-1.0.jar"
FLINK_HOME="/home/dev/flink"
FLINK_CHECKPOINT_PATH="/home/dev/flink/flink-checkpoints"
# 配置新得任务时需要修改下方两个配置
# 这个是任务参数,你没有可以不写,取决于你的class任务里面是否设置
JOB_ARGS="--jobKey cdcmysql"
# 这个是你给这个任务的名字
JOB_NAME="cdcmysql"# 获取正在运行的任务id,5秒*10次的容错。
get_job_id() {local retry_count=0                         # 初始化重试计数local max_retries=10                        # 最大重试次数local j_id=""                               # 用于存储Job IDwhile [ $retry_count -lt $max_retries ]; do # 当重试次数小于最大重试次数时# 使用flink命令获取正在运行的任务,并截取到job_idj_id=$($FLINK_HOME/bin/flink list | grep -o '[0-9a-f]\{32\} : '"$JOB_NAME"' (RUNNING)')if [ -n "$j_id" ]; then # 如果成功获取到Job IDbreak                 # 跳出循环fisleep 5 # 等待5秒后重试echo "尝试获取jobid $((retry_count + 1))/$max_retries "retry_count=$((retry_count + 1)) # 重试计数加1doneJ_ID=$j_id # 将获取到的Job ID赋值给全局变量J_ID
}
start_job() {echo "Starting Flink job..."# 检查是否存在之前保存的作业ID文件if [ -f "${FLINK_HOME}/job_id_$JOB_NAME.tmp" ]; then# 从文件中读取最新记录的job_id,空格是必须的job_id=$(grep "${JOB_NAME} " ${FLINK_HOME}/job_id_$JOB_NAME.tmp | cut -d ':' -f 1 | sed 's/ //g')if [ -n "$job_id" ]; then# 查找该job_id的检查点目录checkpoint_dirs=$(find "$FLINK_CHECKPOINT_PATH/$job_id" -type d -name 'chk-*')echo "job[$JOB_NAME]检查点查询,job_id: $job_id, 路径:$checkpoint_dirs"if [ -z "$checkpoint_dirs" ]; then# 路径为空,查找上一个可恢复的检查点路径pre_job_id=$(grep "${JOB_NAME}bak" ${FLINK_HOME}/job_id_$JOB_NAME.tmp | cut -d ':' -f 1 | sed 's/ //g')if [ -n "$pre_job_id" ]; thenpre_checkpoint_dirs=$(find "$FLINK_CHECKPOINT_PATH/$pre_job_id" -type d -name 'chk-*')if [ -z "$pre_checkpoint_dirs" ]; thenecho "没有找到检查点路径,尝试从上一个检查点恢复:${pre_job_id},路径:$pre_checkpoint_dirs"echo "无法恢复任务,停止脚本,请手动设置检查点:从上一次成功执行并且$FLINK_CHECKPOINT_PATH/job_id目录下有chk开头的路径恢复"exit 1elseecho "没有找到检查点路径,尝试从上一个检查点恢复:${pre_job_id},路径:$pre_checkpoint_dirs"job_id=$pre_job_idecho "尝试成功,正在恢复任务进度,从Checkpoint path:$pre_checkpoint_dirs"$FLINK_HOME/bin/flink run -s $pre_checkpoint_dirs -c $MAIN_CLASS $JAR_PATH $JOB_ARGS >>$FLINK_HOME/log/startjob$JOB_NAME.out &fielseecho "没有找到检查点路径,尝试从上一个检查点恢复:${pre_job_id},路径:$pre_checkpoint_dirs"# 没有备份jobidecho "无法恢复任务,停止脚本,请手动设置检查点:从上一次成功执行并且$FLINK_CHECKPOINT_PATH/job_id目录下有chk开头的路径恢复"exit 1fielseecho "正在恢复任务进度,从Checkpoint path:$checkpoint_dirs"$FLINK_HOME/bin/flink run -s $checkpoint_dirs -c $MAIN_CLASS $JAR_PATH $JOB_ARGS >>$FLINK_HOME/log/startjob$JOB_NAME.out &fisleep 10get_job_idif [ -n "$J_ID" ]; thenecho "Job启动脚本执行成功. JobID: $J_ID"echo "$J_ID" >"${FLINK_HOME}/job_id_$JOB_NAME.tmp"# 将上一个可恢复的检查点job作为备份记录echo "${job_id} : ${JOB_NAME}bak" >>"${FLINK_HOME}/job_id_$JOB_NAME.tmp"elseecho "Failed to get JobID after $max_retries retries. Check if the job is started."fifielseecho "job_id_$JOB_NAME.tmp 文件不存在,starting a new job!!!"$FLINK_HOME/bin/flink run -c $MAIN_CLASS $JAR_PATH $JOB_ARGS >>$FLINK_HOME/log/startjob$JOB_NAME.out &sleep 10get_job_idif [ -n "$J_ID" ]; thenecho "Job启动脚本执行成功. JobID: $J_ID"echo "$J_ID" >"${FLINK_HOME}/job_id_$JOB_NAME.tmp"elseecho "Failed to get JobID after $max_retries retries. Check if the job is started."fifi
}
watch_job() {while true; dostatus=$($FLINK_HOME/bin/flink list | grep -o '[0-9a-f]\{32\} : '"$JOB_NAME"' (RUNNING)')if [ -z "$status" ]; thenecho "Flink job is not running. Restarting..."start_jobelseecho "$(date +"%Y-%m-%d %H:%M:%S") Flink job is running."fisleep 180 # 每3分钟检查一次任务状态done
}
watch_job

这个脚本的流程:从tmp文件里面读取jobID如果读取到就查找checkpoints路径从而恢复,如果第一个jobid没有找到tmp文件里面会记录上一次可用检查点的jobid,然后恢复;启动任务后会调用函数有容错的读取jobid从而写入到tmp文件中,并且会使用一个线程一直去监测任务的运行情况,执行上述的逻辑。

不足

1、这个脚本在一些情况下会停止,使用linux定时任务检查这个脚本并在停止后重新拉起是个可行
的方案。
2、flink cdc是依赖于binlog运行的,如果你也使用了rds,请注意阿里云的日志过期策略,频繁的主动停止任务并恢复是我正在尝试的方案(公司不愿意花钱增加日志存储周期)

讨论

在flink cdc (mysql -> starrocks)的示例中运行,你在cdc过程中又遇到哪些问题,是如何解决的呢?可以留言讨论一下,给出常见问题的解决思路:
https://www.alibabacloud.com/help/zh/flink/support/faq-about-cdc

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/16607.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

微信小程序进阶(1)--自定义组件

自定义组件 1.1 什么是自定义组件 开发文档:https://developers.weixin.qq.com/miniprogram/dev/framework/custom-component/ 小程序中常常会有些通用的交互模块,比如:下拉选择列表、搜索框、日期选择器等;这些界面交互模块可…

C-数据结构-动态库

/* 动态库基本实现 libxx.so xx是库名 .so 后缀 gcc -shared -fpic -o libxx.so yyy.c发布到 /usr/local/include/ /usr/local/lib在 /etc/ld.so.conf 中添加路径 /sbin/ldconfig 重读 /etc/ld.so.conf 为了上面的步骤生效 gcc -I/usr/local/include -L/usr/local/…

数据挖掘案例-航空公司客户价值分析

文章目录 1. 案例背景2. 分析方法与过程2.1 分析流程步骤2.2 分析过程1. 数据探索分析2. 描述性统计分析3. 分布分析1.客户基本信息分布分析2. 客户乘机信息分布分析3. 客户积分信息分布分析 4. 相关性分析 3. 数据预处理3.1 数据清洗3.2 属性约束3. 3 数据转换 4. 模型构建4. …

spring 指定bean id 来加载相同类名 不同包路径的bean 并使用set方法注入

业务场景,数据源可能是mysql也可能是impala。在mapper层级方法都是一样的。所以抽象出来一个父接口,再分别用mysql包下面的一个mapper和一个impala包mapper接口分别继承它。注意这俩mapper的beanid要区分开。 使用:首先有两个bean在不同的包…

超融合架构下,虚拟机高可用机制如何构建?

作者:SmartX 产品部 钟锦锌 虚拟机高可用(High Availability,简称 HA)是虚拟化/超融合平台最常用、关键的功能之一,可在服务器发生故障时通过重建业务虚拟机以降低故障对业务带来的影响。因此,为了充分保障…

ubuntu22.04下 easyconnect+输入法安装

先使用对应ubuntu版本的easyconnect安装 sudo dpkg -i EasyConnect_x64_7_6_7_3.deb 下载压缩包servicePack,并解压缩 cd 下载路径/servicePack sudo cp * /usr/share/sangfor/EasyConnect/ 打开easyConnect /usr/share/sangfor/EasyConnect/EasyConnect 此处…

pid中的d到底是什么意思?微分到底是用来做什么的,什么情况下用,避免入坑实际案例中的使用-----------PDI中的D阻尼调节

1,PID中表示的含义是什么? 比例(proportional):放大比例-------表示现在 0.2 积分(integral):误差积分------过去 0.04 微分 (derivative):阻尼 ------未来 0.002 在调节…

IDEA设置运行内存

1.开启内存指示条​​​​​​​ 查看idea右下角​​​​​​​ 2.环境变量查看ideaVM地址,没有的话那就是默认的配置文件: idea 安装 bin 目录下 idea64.exe.vmoptions 3.去对应路径修改内存参数大小 4.重启IDEA,end

体育赛事直播系统源码开发:社区论坛模块如何实现引流与增收双赢

在当今数字化时代,体育直播平台不仅是赛事观看的窗口,更是一个互动和交流的社区,以及是一场关于用户体验、用户粘性以及商业模式创新的综合较量。为了在这片红海市场中脱颖而出,平台必须采取更加精细化和多元化的运营策略。其中&a…

前端命令行部署

最近接了一个项目,发版本需要把dist包给后端部署服务,再加上产品那边需求不稳定,改了又改,一天要发好几个,不仅跟我配合的后端不胜其烦,本人也是很烦。最近在网上看到一个npm自主部署的包–deploy cli工具&…

搜维尔科技:2024中国力触觉技术及应用会议,搜维尔科技携Haption力反馈设备参会

2024中国力触觉技术及应用会议,搜维尔科技携Haption力反馈设备参会 搜维尔科技:2024中国力触觉技术及应用会议,搜维尔科技携Haption力反馈设备参会

香橙派 AIpro开发板:开启AI视觉的无限可能

前言 在当今这个由数据和智能驱动的时代, 人工智能(AI) 已经成为推动技术创新和实现自动化的关键。 特别是在计算机视觉领域,AI的潜能被无限放大,它使得机器能够“看见”并理解视觉世界,从而执行复杂的任务…

LangChain 0.2 - 对话式RAG

文章目录 一、项目说明二、设置1、引入依赖2、LangSmith 三、Chains1、添加聊天记录Contextualizing the question聊天记录状态管理 2、合并 四、Agents1、检索工具2、代理建造者3、合并 五、下一步 本文翻译整理自:Conversational RAG https://python.langchain.co…

加宽全连接

一、Functional API 搭建神经网络模型 1.对宽深神经网络模型进行手写数字识别: 运行代码: inputs keras.layers.Input(shapeX_train.shape[1:]) hidden1 keras.layers.Dense(300,activation"relu")(inputs) hidden2 keras.layers.Dense(…

MySQL中视图是什么,有什么作用

目录 一、视图的简介 1.1 什么是视图? 1.2 为什么使用视图? 1.3 视图有哪些规则与限制? 1.4 视图能否更新? 二、视图的创建 三、视图的作用 3.1 用视图简化复杂的联结 3.2 用视图格式化检索出的数据 3.3 用视图过滤数据…

梭住绿色,植梦WILL来,容声冰箱“节能森林计划”再启航

近日,容声冰箱再度开启了“节能森林计划”绿色公益之旅。 据「TMT星球」了解,此次活动深入到阿拉善荒漠化地带,通过实地考察和亲身体验,见证容声了“节能森林计划”项目的持续落地和实施效果。 2022年,容声冰箱启动了…

【电控实物-PMSM】

遗留问题 电流环闭环 电流环频率会受到编码器回传频率影响? Ld&Lq辨识 L观测器设计验证 滑膜观测器/高频注入 前馈(加大负载) 各种电流控制模式: psms规格书 参数辨识 Ld&Lq

代码随想录训练营Day 40|力扣509. 斐波那契数、70. 爬楼梯、746. 使用最小花费爬楼梯

1.斐波那契数 代码随想录 视频&#xff1a;手把手带你入门动态规划 | LeetCode&#xff1a;509.斐波那契数_哔哩哔哩_bilibili 代码&#xff1a; class Solution { public:int fib(int n) {if(n < 1) return n;vector<int> dp(n 1);dp[0] 0;dp[1] 1;for(int i 2;…

qt 布局学习笔记

目录 qt下载地址&#xff1a; widget 宽高 管理信息列表源码 c版&#xff1a; pro文件&#xff1a; qt 设置水平布局&#xff0c;里面有两个按钮&#xff0c;每个按钮就变的很宽&#xff0c;怎么设置按钮的精确位置 设置固定大小&#xff1a; 使用弹性空间&#xff08;…

高效掌控速卖通自养号测评:成本、步骤、技巧全方位掌握

在跨境电商的汹涌浪潮中&#xff0c;速卖通犹如一颗璀璨的领航星&#xff0c;引领着无数寻求海外拓展的企业和商家驶向国际市场的广阔海域。从最初的C2C模式起步&#xff0c;速卖通历经蜕变&#xff0c;如今已华丽转身成为B2C跨境电商领域的翘楚&#xff0c;承载着无数中国卖家…