flink 写kafka_flink消费kafka的offset与checkpoint

生产环境有个作业,逻辑很简单,读取kafka的数据,然后使用hive catalog,实时写入hbase,hive,redis。使用的flink版本为1.11.1。

为了防止写入hive的文件数量过多,我设置了checkpoint为30分钟。

env.enableCheckpointing(1000 * 60 * 30); // 1000 * 60 * 30 => 30 minutes

达到的效果就是每30分钟生成一个文件,如下:

hive> dfs -ls /opt/user/hive/warehouse/dw.db/ods_analog_zgyp/dt_day=2020-10-18/dt_hour=00/ ;Found 10 items-rw-r--r--   3 hdfs hive          0 2020-10-18 01:20 /opt/user/hive/warehouse/dw.db/ods_analog_zgyp/dt_day=2020-10-18/dt_hour=00/_SUCCESS-rw-r--r--   3 hdfs hive     248895 2020-10-18 00:20 /opt/user/hive/warehouse/dw.db/ods_analog_zgyp/dt_day=2020-10-18/dt_hour=00/part-5b7b6d44-993a-4af7-b7ee-1a8ab64d3453-0-10911-rw-r--r--   3 hdfs hive     306900 2020-10-18 00:50 /opt/user/hive/warehouse/dw.db/ods_analog_zgyp/dt_day=2020-10-18/dt_hour=00/part-5b7b6d44-993a-4af7-b7ee-1a8ab64d3453-0-10912-rw-r--r--   3 hdfs hive     208227 2020-10-18 01:20 /opt/user/hive/warehouse/dw.db/ods_analog_zgyp/dt_day=2020-10-18/dt_hour=00/part-5b7b6d44-993a-4af7-b7ee-1a8ab64d3453-0-10913-rw-r--r--   3 hdfs hive     263586 2020-10-18 00:20 /opt/user/hive/warehouse/dw.db/ods_analog_zgyp/dt_day=2020-10-18/dt_hour=00/part-5b7b6d44-993a-4af7-b7ee-1a8ab64d3453-1-10911-rw-r--r--   3 hdfs hive     307723 2020-10-18 00:50 /opt/user/hive/warehouse/dw.db/ods_analog_zgyp/dt_day=2020-10-18/dt_hour=00/part-5b7b6d44-993a-4af7-b7ee-1a8ab64d3453-1-10912-rw-r--r--   3 hdfs hive     196777 2020-10-18 01:20 /opt/user/hive/warehouse/dw.db/ods_analog_zgyp/dt_day=2020-10-18/dt_hour=00/part-5b7b6d44-993a-4af7-b7ee-1a8ab64d3453-1-10913-rw-r--r--   3 hdfs hive     266984 2020-10-18 00:20 /opt/user/hive/warehouse/dw.db/ods_analog_zgyp/dt_day=2020-10-18/dt_hour=00/part-5b7b6d44-993a-4af7-b7ee-1a8ab64d3453-2-10911-rw-r--r--   3 hdfs hive     338992 2020-10-18 00:50 /opt/user/hive/warehouse/dw.db/ods_analog_zgyp/dt_day=2020-10-18/dt_hour=00/part-5b7b6d44-993a-4af7-b7ee-1a8ab64d3453-2-10912-rw-r--r--   3 hdfs hive     216655 2020-10-18 01:20 /opt/user/hive/warehouse/dw.db/ods_analog_zgyp/dt_day=2020-10-18/dt_hour=00/part-5b7b6d44-993a-4af7-b7ee-1a8ab64d3453-2-10913hive> 

但是,同时也观察到归属于这个作业的kafka消费组积压数量,每分钟消费数量,明显具有周期性消费峰值。

比如,对于每30分钟时间间隔度的一个观察,前面25分钟的“每分钟消费数量”都是为0,然后,后面5分钟的“每分钟消费数量”为300k。同理,“消费组积压数量”也出现同样情况,积压数量一直递增,但是到了30分钟的间隔,就下降到数值0。如图。

c21b958d23606b4f75318b76720e19e7.png

消费组每分钟消费数量

cec0f3bf8cae851c4deb53afd08d6dc6.png

消费组积压数量

但其实,通过对hbase,hive,redis的观察,数据是实时写入的,并不存在前面25分钟没有消费数据的情况。

查阅资料得知,flink会自己维护一份kafka的offset,然后checkpoint时间点到了,再把offset更新回kafka。

为了验证这个观点,“flink在checkpoint的时候,才把消费kafka的offset更新回kafka”,同时,观察,savepoint机制是否会重复消费kafka,我尝试写一个程序,逻辑很简单,就是从topic "test"读取数据,然后写入topic "test2"。特别说明,这个作业的checkpoint是1分钟。

package com.econ.powercloud.jobsTest;import org.apache.flink.api.common.serialization.SimpleStringSchema;import org.apache.flink.api.java.utils.ParameterTool;import org.apache.flink.kafka.shaded.org.apache.kafka.clients.producer.ProducerRecord;import org.apache.flink.streaming.api.CheckpointingMode;import org.apache.flink.streaming.api.datastream.DataStreamSource;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;import javax.annotation.Nullable;import java.util.Properties;public class TestKafkaOffsetCheckpointJob {    public static void main(String[] args) throws Exception {        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();        env.enableCheckpointing(1000 * 60);        ParameterTool parameterTool = ParameterTool.fromArgs(args);        String bootstrapServers = parameterTool.get("bootstrap.servers") == null ? "localhost:9092" : parameterTool.get("bootstrap.servers");        Properties properties = new Properties();        properties.setProperty("bootstrap.servers", bootstrapServers);        properties.setProperty("group.id", "prod-econ-flink-TestKafkaOffsetCheckpointJob-local");        properties.setProperty("transaction.timeout.ms", String.valueOf(1000 * 60 * 5));        String topic = "test";        FlinkKafkaConsumer stringFlinkKafkaConsumer = new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), properties);        DataStreamSource stringDataStreamSource = env.addSource(stringFlinkKafkaConsumer);        String producerTopic = "test2";        FlinkKafkaProducer kafkaProducer = new FlinkKafkaProducer<>(producerTopic, new KafkaSerializationSchema() {            @Override            public ProducerRecord serialize(String element, @Nullable Long timestamp) {                return new ProducerRecord<>(producerTopic, element.getBytes());            }        }, properties, FlinkKafkaProducer.Semantic.EXACTLY_ONCE);        stringDataStreamSource.addSink(kafkaProducer);        env.execute("TestKafkaOffsetCheckpointJob");    }}

提交作业:

[econ@dev-hadoop-node-c ~]$ /opt/flink-1.11.1/bin/flink run -m dev-hadoop-node-c:8081 -c "com.econ.powercloud.jobsTest.TestKafkaOffsetCheckpointJob" -d ~/powercloud-flink-1.0.20201016.jar --bootstrap.servers localhost:9092Job has been submitted with JobID 5fdd14f7fd3c93287635c9d61180d8a6[econ@dev-hadoop-node-c ~]$ 

使用"kafka-console-producer.sh"往topic "test"生成消息"a1":

RdeMacBook-Pro:kafka r$ ./bin/kafka-console-producer.sh --topic test --broker-list localhost:>a1>

使用"kafka-console-consumer.sh"消费topic "test2"的消息:

RdeMacBook-Pro:kafka r$ ./bin/kafka-console-consumer.sh --topic test2 --bootstrap-server localhost:9092a1

证明作业逻辑本身没有问题,实现' 从topic "test"读取数据,然后写入topic "test2" '。

使用"kafka-consumer-groups.sh"观察消费组"prod-econ-flink-TestKafkaOffsetCheckpointJob-local"的积压数量,重点观察指标"LAG",可以看到LAG为1 :

RdeMacBook-Pro:kafka r$ ./bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group prod-econ-flink-TestKafkaOffsetCheckpointJob-local --describe; date; Consumer group 'prod-econ-flink-TestKafkaOffsetCheckpointJob-local' has no active members.TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-IDtest            1          3               3               0               -               -               -test            0          3               3               0               -               -               -test            2          5               6               1               -               -               -2020年10月18日 星期日 20时09分45秒 CSTRdeMacBook-Pro:kafka r$ 

证明flink消费了kafka数据后,不会更新offset到kafka。

停止作业:

[econ@dev-hadoop-node-c ~]$ /opt/flink-1.11.1/bin/flink stop -m dev-hadoop-node-c:8081 5fdd14f7fd3c93287635c9d61180d8a6Suspending job "5fdd14f7fd3c93287635c9d61180d8a6" with a savepoint.Savepoint completed. Path: hdfs://nameservice1/flink1.11/flink-savepoints/savepoint-5fdd14-53dfd9f8eccd[econ@dev-hadoop-node-c ~]$ 

再次启动作业,但是,不使用上面生成的savepoint:

[econ@dev-hadoop-node-c ~]$ /opt/flink-1.11.1/bin/flink run -m dev-hadoop-node-c:8081 -c "com.econ.powercloud.jobsTest.TestKafkaOffsetCheckpointJob" -d ~/powercloud-flink-1.0.20201016.jar --bootstrap.servers localhost:9092Job has been submitted with JobID 130568a2eeec96296237ed3e1f280f83[econ@dev-hadoop-node-c ~]$ 

观察topic "test2",发现,同样的数据"a1"被生产进入:

RdeMacBook-Pro:kafka r$ ./bin/kafka-console-consumer.sh --topic test2 --bootstrap-server localhost:9092a1a1

证明:flink在没有使用savepoint的时候,消费kafka的offset还是从kafka自身获取。

再仔细观察topic "test"的“消费组积压数量”,注意在"20时10分05秒"还观察到积压数值1,但是在"20时10分08秒"就发现积压数值都是0.

RdeMacBook-Pro:kafka r$ ./bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group prod-econ-flink-TestKafkaOffsetCheckpointJob-local --describe; date; Consumer group 'prod-econ-flink-TestKafkaOffsetCheckpointJob-local' has no active members.TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-IDtest            1          3               3               0               -               -               -test            0          3               3               0               -               -               -test            2          5               6               1               -               -               -2020年10月18日 星期日 20时10分05秒 CSTRdeMacBook-Pro:kafka r$ ./bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group prod-econ-flink-TestKafkaOffsetCheckpointJob-local --describe; date; Consumer group 'prod-econ-flink-TestKafkaOffsetCheckpointJob-local' has no active members.TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-IDtest            1          3               3               0               -               -               -test            0          3               3               0               -               -               -test            2          6               6               0               -               -               -2020年10月18日 星期日 20时10分08秒 CSTRdeMacBook-Pro:kafka r$ 

这是因为,在"20:10:06"完成了一次checkpoint,把offset更新回kafka。

f92fa33a34ef11e724fc272f6445e8d8.png

Flink Checkpoint History

下面接着测试flink使用savepoint的情况下,是否会重复消费kafka数据。

使用"kafka-console-producer.sh"往topic "test"生成消息"a2":

RdeMacBook-Pro:kafka r$ ./bin/kafka-console-producer.sh --topic test --broker-list localhost:9092>a1>a2>

使用"kafka-console-consumer.sh"消费topic "test2"的消息:

RdeMacBook-Pro:kafka r$ ./bin/kafka-console-consumer.sh --topic test2 --bootstrap-server localhost:9092a1a1a2

停止作业:

[econ@dev-hadoop-node-c ~]$ /opt/flink-1.11.1/bin/flink stop -m dev-hadoop-node-c:8081 bb8b4ba7ddaad869c6469fab5e81d179Suspending job "bb8b4ba7ddaad869c6469fab5e81d179" with a savepoint.Savepoint completed. Path: hdfs://nameservice1/flink1.11/flink-savepoints/savepoint-bb8b4b-99016a1c3e60[econ@dev-hadoop-node-c ~]$ 

观察topic "test"的“消费组积压数量”,发现LAG还是1:

RdeMacBook-Pro:kafka r$ ./bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group prod-econ-flink-TestKafkaOffsetCheckpointJob-local --describe; date; Consumer group 'prod-econ-flink-TestKafkaOffsetCheckpointJob-local' has no active members.TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-IDtest            1          3               4               1               -               -               -test            0          3               3               0               -               -               -test            2          6               6               0               -               -               -2020年10月18日 星期日 20时28分39秒 CSTRdeMacBook-Pro:kafka r$ 

flink使用savepoint启动作业,注意参数"-s":

[econ@dev-hadoop-node-c ~]$ /opt/flink-1.11.1/bin/flink run -m dev-hadoop-node-c:8081 -c "com.econ.powercloud.jobsTest.TestKafkaOffsetCheckpointJob" -d -s 'hdfs://nameservice1/flink1.11/flink-savepoints/savepoint-bb8b4b-99016a1c3e60' ~/powercloud-flink-1.0.20201016.jar --bootstrap.servers localhost:9092Job has been submitted with JobID d6cb6e1a6f9c0816ac4b61a1df38ddeb[econ@dev-hadoop-node-c ~]$ 

观察"kafka-console-consumer.sh"消费topic "test2"的情况,没有新的消息被打印:

RdeMacBook-Pro:kafka r$ ./bin/kafka-console-consumer.sh --topic test2 --bootstrap-server localhost:9092a1a1a2

再观察“消费组积压数量”,发现LAG值已经全部是0。

RdeMacBook-Pro:kafka r$ ./bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group prod-econ-flink-TestKafkaOffsetCheckpointJob-local --describe; date; Consumer group 'prod-econ-flink-TestKafkaOffsetCheckpointJob-local' has no active members.TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-IDtest            1          4               4               0               -               -               -test            0          3               3               0               -               -               -test            2          6               6               0               -               -               -2020年10月18日 星期日 20时31分43秒 CSTRdeMacBook-Pro:kafka r$ 

证明:flink使用savepoint启动作业,不会重复消费kafka数据,也会正确更新kafka的offset。

重申,以上试验证明:

  1. flink消费了kafka数据后,不会更新offset到kafka,直到checkpoint完成。
  2. flink在没有使用savepoint重启作业的时候,消费kafka的offset还是从kafka自身获取,存在重复消费数据的情况。
  3. flink使用savepoint重启作业,不会重复消费kafka数据,也会正确更新kafka的offset。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/507663.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

论文阅读:超高分辨率图像中快速、准确的条码检测

摘要 由于目标对象的尺度不同&#xff0c;超高分辨率 (UHR) 图像中的对象检测长期以来一直是计算机视觉中的一个具有挑战性的问题。在条码检测方面&#xff0c;将 UHR 输入图像调整为更小的尺寸通常会导致相关信息的丢失&#xff0c;而直接处理它们的效率很高且计算成本很高。…

android 多线程 场景,精选Android初中级面试题 (三): 深探Handler,多线程,Bitmap

码个蛋(codeegg) 第 930 次推文作者&#xff1a;Focusing链接&#xff1a;https://juejin.im/post/5c85cead5188257c6703af47Handler1、谈谈消息机制Handler作用 &#xff1f;有哪些要素 &#xff1f;流程是怎样的 &#xff1f;参考回答&#xff1a;负责跨线程通信&#xff0c;…

python计算bmi的编程_Python学习-计算BMI的小程序

示例&#xff1a;小明身高1.75&#xff0c;体重80.5kg。请根据BMI公式(体重除以身高的平方)帮小明计算他的BMI指数&#xff0c;并根据BMI指数&#xff1a;低于18.5&#xff1a;过轻18.5-25&#xff1a;正常25-28&#xff1a;过重28-32&#xff1a;肥胖高于32&#xff1a;严重肥…

通过超分辨率重构来提高二维码的对比度

1 问题描述 &#xff08;1&#xff09;图像分辨率小。例如一些嵌入在海报&#xff08;如图1&#xff09;或远距离拍摄的码&#xff0c;其分辨率远小于通常情况下的码图像。 图1.海报中的二维码占比很小 &#xff08;2&#xff09;图像质量较低。有很多是经过了多次的压缩和转…

android web 访问数据库,Web下的JDBC访问数据库的基本步骤

Web下的JDBC访问数据库的基本步骤(2012-06-02 12:09:33)在Java程序中连接数据库的一般步骤分为一下几部分&#xff0c;我摘录出来&#xff0c;跟大家分享。(1)将数据库的JABC驱动加载到classpath中&#xff0c;在基于JavaEE的Web应用开发过程中&#xff0c;通常把JDBC驱动放在W…

linux 磁盘扩容_记录一次ESXi Linux在线扩容,不重启系统

因为工作需要&#xff0c;需要将运行在ESXi主机上面的一台Centos 里面的一个LV卷进行扩容&#xff0c;下面记录了此次扩展的详细过程&#xff0c;整个过程&#xff0c;不需要重启服务器。1. 首先通过df-h 查看当前磁盘结构如下&#xff1a;我们此次的最终目标&#xff0c;就是将…

我目前的主要研究方向

推荐系统 https://blog.csdn.net/search_129_hr/article/details/118680185 游戏难度动态调整 https://blog.csdn.net/search_129_hr/article/details/119204173 睡眠声音识别与增强 https://blog.csdn.net/search_129_hr/article/details/118568452 二维码图像识别与增强…

linux的任务计划6,Linux计划任务

Linux计划任务&#xff1a;未来的某个时间执行一次任务&#xff0c;或者周期性执行某个任务&#xff0c;执行结果会通过邮件通知定时任务&#xff1a;at batch周期性任务&#xff1a;crontab系统任务调度&#xff1a;/ect/crontab用户任务调度&#xff1a;/var/spool/cronmail​…

aws python lambda_python – AWS Lambda发送HTTP请求

这可能是一个简单回答的问题,但我似乎无法弄明白.背景&#xff1a;我有一个python Lambda函数来获取数据库中的更改,然后使用HTTP将json中的更改发布到URL.我正在使用urllib2这样&#xff1a;# this runs inside a loop, in reality my error handling is much betterrequest …

标签分布学习相关研究

1 标记增强及标签分布学习 https://mp.weixin.qq.com/s/cXiR-UeJkcdkljJvE2eERw http://palm.seu.edu.cn/xgeng/files/sc-info18.pdf https://baijiahao.baidu.com/s?id1687693358774525583&wfrspider&forpc https://blog.csdn.net/weixin_42001089/article/details/…

android item三种,Android RecyclerView中的ItemDecoration的几种绘制方法

如题&#xff0c;我们使用recyclerview的时候&#xff0c;如果没有设置显示条目的margin&#xff0c;或者padding的话&#xff0c;是没有分割线效果的。那么除去使用margin或padding,其余的方法是用itemdecoration绘制分割线我们绘制分割线的时候通常会使用drawable去绘制&…

上传文件和提交textfield_0基础掌握Django框架(37)文件上传

为了更好的学习效果&#xff0c;请搭配视频教程一起学习&#xff1a;Django零基础到项目实战 - 网易云课堂​study.163.com文件上传&#xff1a;文件上传是网站开发中非常常见的功能。这里详细讲述如何在Django中实现文件的上传功能。前端HTML代码实现&#xff1a;在前端中&…

2021年第3周LDL方向的周报

LDL小组&#xff1a; 如何快速进入研究状态 &#xff08;1&#xff09;系列性的工作&#xff0c;papermaker&#xff1a;读文献&#xff0c;顶刊顶会&#xff0c;综述性文章–》进行扩展&#xff0c;研究主线 &#xff08;2&#xff09;接手师兄师姐的工作–》并且对已有的工作…

2021年第3周人工智能方向的周报

快速进入研究&#xff1a; &#xff08;1&#xff09;读文献–》综述性的文献–》你自己去综述性文献 &#xff08;2&#xff09;已有的工作 &#xff08;3&#xff09;有没有相关的数据&#xff1f; 下一步事情&#xff1a; &#xff08;1&#xff09;想一想自己的横向做什么…

华为p10刷原生android,华为p10怎么刷机 华为p10刷机方法【详细介绍】

喜欢折腾手机的用户一定对于手机root权限获取不陌生&#xff0c;root后虽然不能享受官方联保服务但同时带来的好处不用小编多说。前面给大家介绍了 华为p10 刷入第三方recovery教程&#xff0c;现在华为p10刷机包已经放出来&#xff0c;小编给大家带来华为p10刷机权限获取教程。…

pythoncookie自动模拟登录_用Python模拟技巧带你实现自动抽屉登录自动点赞

原标题&#xff1a;用Python模拟技巧带你实现自动抽屉登录&自动点赞/1 前言/嘿&#xff0c;各位小伙伴们晚上好呀&#xff0c;今天小编又给大家带来干货内容啦,今天带来的是,如何自动登录抽屉&#xff0c;并且点赞&#xff01;原计划是不打算使用selenium的&#xff0c;但是…

同学之间互相出的一些有趣题目

题目1&#xff1a;过隧道时间最短问题 四个人&#xff08;A、B、C、D&#xff09;晚上过隧道,并且只有一个手电筒,每次只能过两个人,并且还需要有一个人回来传递手电筒,四个人过隧道的速度不一样,分别是1、2、5、10分钟,问怎么过隧道最快?总共用多长时间? 问题扩展&#xf…

android studio sqlitedatabase,在SQLite数据库Android Studio上使用预填充数据库

本问题已经有最佳答案&#xff0c;请猛点这里访问。在这里&#xff0c;我想使用我预先填充的SQLite数据库到我的Android应用程序。 因此&#xff0c;首次运行时&#xff0c;它会自动从assets文件夹中复制数据库&#xff0c;并在我的Android应用程序中将其用作数据库。到目前为止…

使用container的嵌套_ElementUI 技术揭秘(4)— Container 布局容器组件的设计与实现。...

前言上一篇文章我们分析了 Layout 布局组件的设计和实现&#xff0c;它的应用场景通常是局部布局。对于整个页面的布局&#xff0c;element-ui 提供了 Container 布局容器组件&#xff0c;专门用于 PC 管理后台页面的整体布局。需求分析我们先通过几幅图看一下页面的常见布局。…

推荐系统最新研究进展

算法相关的综述 从200多篇顶会论文看推荐系统前沿方向与最新进展 基于强化学习的推荐系统相关研究进展、经典论文整理分享 推荐系统去偏&#xff08;Debiased Recommendation&#xff09;研究进展概述 Deep Learning Based Recommender System: A Survey and New Perspectives…