flink 写kafka_flink消费kafka的offset与checkpoint

生产环境有个作业,逻辑很简单,读取kafka的数据,然后使用hive catalog,实时写入hbase,hive,redis。使用的flink版本为1.11.1。

为了防止写入hive的文件数量过多,我设置了checkpoint为30分钟。

env.enableCheckpointing(1000 * 60 * 30); // 1000 * 60 * 30 => 30 minutes

达到的效果就是每30分钟生成一个文件,如下:

hive> dfs -ls /opt/user/hive/warehouse/dw.db/ods_analog_zgyp/dt_day=2020-10-18/dt_hour=00/ ;Found 10 items-rw-r--r--   3 hdfs hive          0 2020-10-18 01:20 /opt/user/hive/warehouse/dw.db/ods_analog_zgyp/dt_day=2020-10-18/dt_hour=00/_SUCCESS-rw-r--r--   3 hdfs hive     248895 2020-10-18 00:20 /opt/user/hive/warehouse/dw.db/ods_analog_zgyp/dt_day=2020-10-18/dt_hour=00/part-5b7b6d44-993a-4af7-b7ee-1a8ab64d3453-0-10911-rw-r--r--   3 hdfs hive     306900 2020-10-18 00:50 /opt/user/hive/warehouse/dw.db/ods_analog_zgyp/dt_day=2020-10-18/dt_hour=00/part-5b7b6d44-993a-4af7-b7ee-1a8ab64d3453-0-10912-rw-r--r--   3 hdfs hive     208227 2020-10-18 01:20 /opt/user/hive/warehouse/dw.db/ods_analog_zgyp/dt_day=2020-10-18/dt_hour=00/part-5b7b6d44-993a-4af7-b7ee-1a8ab64d3453-0-10913-rw-r--r--   3 hdfs hive     263586 2020-10-18 00:20 /opt/user/hive/warehouse/dw.db/ods_analog_zgyp/dt_day=2020-10-18/dt_hour=00/part-5b7b6d44-993a-4af7-b7ee-1a8ab64d3453-1-10911-rw-r--r--   3 hdfs hive     307723 2020-10-18 00:50 /opt/user/hive/warehouse/dw.db/ods_analog_zgyp/dt_day=2020-10-18/dt_hour=00/part-5b7b6d44-993a-4af7-b7ee-1a8ab64d3453-1-10912-rw-r--r--   3 hdfs hive     196777 2020-10-18 01:20 /opt/user/hive/warehouse/dw.db/ods_analog_zgyp/dt_day=2020-10-18/dt_hour=00/part-5b7b6d44-993a-4af7-b7ee-1a8ab64d3453-1-10913-rw-r--r--   3 hdfs hive     266984 2020-10-18 00:20 /opt/user/hive/warehouse/dw.db/ods_analog_zgyp/dt_day=2020-10-18/dt_hour=00/part-5b7b6d44-993a-4af7-b7ee-1a8ab64d3453-2-10911-rw-r--r--   3 hdfs hive     338992 2020-10-18 00:50 /opt/user/hive/warehouse/dw.db/ods_analog_zgyp/dt_day=2020-10-18/dt_hour=00/part-5b7b6d44-993a-4af7-b7ee-1a8ab64d3453-2-10912-rw-r--r--   3 hdfs hive     216655 2020-10-18 01:20 /opt/user/hive/warehouse/dw.db/ods_analog_zgyp/dt_day=2020-10-18/dt_hour=00/part-5b7b6d44-993a-4af7-b7ee-1a8ab64d3453-2-10913hive> 

但是,同时也观察到归属于这个作业的kafka消费组积压数量,每分钟消费数量,明显具有周期性消费峰值。

比如,对于每30分钟时间间隔度的一个观察,前面25分钟的“每分钟消费数量”都是为0,然后,后面5分钟的“每分钟消费数量”为300k。同理,“消费组积压数量”也出现同样情况,积压数量一直递增,但是到了30分钟的间隔,就下降到数值0。如图。

c21b958d23606b4f75318b76720e19e7.png

消费组每分钟消费数量

cec0f3bf8cae851c4deb53afd08d6dc6.png

消费组积压数量

但其实,通过对hbase,hive,redis的观察,数据是实时写入的,并不存在前面25分钟没有消费数据的情况。

查阅资料得知,flink会自己维护一份kafka的offset,然后checkpoint时间点到了,再把offset更新回kafka。

为了验证这个观点,“flink在checkpoint的时候,才把消费kafka的offset更新回kafka”,同时,观察,savepoint机制是否会重复消费kafka,我尝试写一个程序,逻辑很简单,就是从topic "test"读取数据,然后写入topic "test2"。特别说明,这个作业的checkpoint是1分钟。

package com.econ.powercloud.jobsTest;import org.apache.flink.api.common.serialization.SimpleStringSchema;import org.apache.flink.api.java.utils.ParameterTool;import org.apache.flink.kafka.shaded.org.apache.kafka.clients.producer.ProducerRecord;import org.apache.flink.streaming.api.CheckpointingMode;import org.apache.flink.streaming.api.datastream.DataStreamSource;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;import javax.annotation.Nullable;import java.util.Properties;public class TestKafkaOffsetCheckpointJob {    public static void main(String[] args) throws Exception {        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();        env.enableCheckpointing(1000 * 60);        ParameterTool parameterTool = ParameterTool.fromArgs(args);        String bootstrapServers = parameterTool.get("bootstrap.servers") == null ? "localhost:9092" : parameterTool.get("bootstrap.servers");        Properties properties = new Properties();        properties.setProperty("bootstrap.servers", bootstrapServers);        properties.setProperty("group.id", "prod-econ-flink-TestKafkaOffsetCheckpointJob-local");        properties.setProperty("transaction.timeout.ms", String.valueOf(1000 * 60 * 5));        String topic = "test";        FlinkKafkaConsumer stringFlinkKafkaConsumer = new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), properties);        DataStreamSource stringDataStreamSource = env.addSource(stringFlinkKafkaConsumer);        String producerTopic = "test2";        FlinkKafkaProducer kafkaProducer = new FlinkKafkaProducer<>(producerTopic, new KafkaSerializationSchema() {            @Override            public ProducerRecord serialize(String element, @Nullable Long timestamp) {                return new ProducerRecord<>(producerTopic, element.getBytes());            }        }, properties, FlinkKafkaProducer.Semantic.EXACTLY_ONCE);        stringDataStreamSource.addSink(kafkaProducer);        env.execute("TestKafkaOffsetCheckpointJob");    }}

提交作业:

[econ@dev-hadoop-node-c ~]$ /opt/flink-1.11.1/bin/flink run -m dev-hadoop-node-c:8081 -c "com.econ.powercloud.jobsTest.TestKafkaOffsetCheckpointJob" -d ~/powercloud-flink-1.0.20201016.jar --bootstrap.servers localhost:9092Job has been submitted with JobID 5fdd14f7fd3c93287635c9d61180d8a6[econ@dev-hadoop-node-c ~]$ 

使用"kafka-console-producer.sh"往topic "test"生成消息"a1":

RdeMacBook-Pro:kafka r$ ./bin/kafka-console-producer.sh --topic test --broker-list localhost:>a1>

使用"kafka-console-consumer.sh"消费topic "test2"的消息:

RdeMacBook-Pro:kafka r$ ./bin/kafka-console-consumer.sh --topic test2 --bootstrap-server localhost:9092a1

证明作业逻辑本身没有问题,实现' 从topic "test"读取数据,然后写入topic "test2" '。

使用"kafka-consumer-groups.sh"观察消费组"prod-econ-flink-TestKafkaOffsetCheckpointJob-local"的积压数量,重点观察指标"LAG",可以看到LAG为1 :

RdeMacBook-Pro:kafka r$ ./bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group prod-econ-flink-TestKafkaOffsetCheckpointJob-local --describe; date; Consumer group 'prod-econ-flink-TestKafkaOffsetCheckpointJob-local' has no active members.TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-IDtest            1          3               3               0               -               -               -test            0          3               3               0               -               -               -test            2          5               6               1               -               -               -2020年10月18日 星期日 20时09分45秒 CSTRdeMacBook-Pro:kafka r$ 

证明flink消费了kafka数据后,不会更新offset到kafka。

停止作业:

[econ@dev-hadoop-node-c ~]$ /opt/flink-1.11.1/bin/flink stop -m dev-hadoop-node-c:8081 5fdd14f7fd3c93287635c9d61180d8a6Suspending job "5fdd14f7fd3c93287635c9d61180d8a6" with a savepoint.Savepoint completed. Path: hdfs://nameservice1/flink1.11/flink-savepoints/savepoint-5fdd14-53dfd9f8eccd[econ@dev-hadoop-node-c ~]$ 

再次启动作业,但是,不使用上面生成的savepoint:

[econ@dev-hadoop-node-c ~]$ /opt/flink-1.11.1/bin/flink run -m dev-hadoop-node-c:8081 -c "com.econ.powercloud.jobsTest.TestKafkaOffsetCheckpointJob" -d ~/powercloud-flink-1.0.20201016.jar --bootstrap.servers localhost:9092Job has been submitted with JobID 130568a2eeec96296237ed3e1f280f83[econ@dev-hadoop-node-c ~]$ 

观察topic "test2",发现,同样的数据"a1"被生产进入:

RdeMacBook-Pro:kafka r$ ./bin/kafka-console-consumer.sh --topic test2 --bootstrap-server localhost:9092a1a1

证明:flink在没有使用savepoint的时候,消费kafka的offset还是从kafka自身获取。

再仔细观察topic "test"的“消费组积压数量”,注意在"20时10分05秒"还观察到积压数值1,但是在"20时10分08秒"就发现积压数值都是0.

RdeMacBook-Pro:kafka r$ ./bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group prod-econ-flink-TestKafkaOffsetCheckpointJob-local --describe; date; Consumer group 'prod-econ-flink-TestKafkaOffsetCheckpointJob-local' has no active members.TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-IDtest            1          3               3               0               -               -               -test            0          3               3               0               -               -               -test            2          5               6               1               -               -               -2020年10月18日 星期日 20时10分05秒 CSTRdeMacBook-Pro:kafka r$ ./bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group prod-econ-flink-TestKafkaOffsetCheckpointJob-local --describe; date; Consumer group 'prod-econ-flink-TestKafkaOffsetCheckpointJob-local' has no active members.TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-IDtest            1          3               3               0               -               -               -test            0          3               3               0               -               -               -test            2          6               6               0               -               -               -2020年10月18日 星期日 20时10分08秒 CSTRdeMacBook-Pro:kafka r$ 

这是因为,在"20:10:06"完成了一次checkpoint,把offset更新回kafka。

f92fa33a34ef11e724fc272f6445e8d8.png

Flink Checkpoint History

下面接着测试flink使用savepoint的情况下,是否会重复消费kafka数据。

使用"kafka-console-producer.sh"往topic "test"生成消息"a2":

RdeMacBook-Pro:kafka r$ ./bin/kafka-console-producer.sh --topic test --broker-list localhost:9092>a1>a2>

使用"kafka-console-consumer.sh"消费topic "test2"的消息:

RdeMacBook-Pro:kafka r$ ./bin/kafka-console-consumer.sh --topic test2 --bootstrap-server localhost:9092a1a1a2

停止作业:

[econ@dev-hadoop-node-c ~]$ /opt/flink-1.11.1/bin/flink stop -m dev-hadoop-node-c:8081 bb8b4ba7ddaad869c6469fab5e81d179Suspending job "bb8b4ba7ddaad869c6469fab5e81d179" with a savepoint.Savepoint completed. Path: hdfs://nameservice1/flink1.11/flink-savepoints/savepoint-bb8b4b-99016a1c3e60[econ@dev-hadoop-node-c ~]$ 

观察topic "test"的“消费组积压数量”,发现LAG还是1:

RdeMacBook-Pro:kafka r$ ./bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group prod-econ-flink-TestKafkaOffsetCheckpointJob-local --describe; date; Consumer group 'prod-econ-flink-TestKafkaOffsetCheckpointJob-local' has no active members.TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-IDtest            1          3               4               1               -               -               -test            0          3               3               0               -               -               -test            2          6               6               0               -               -               -2020年10月18日 星期日 20时28分39秒 CSTRdeMacBook-Pro:kafka r$ 

flink使用savepoint启动作业,注意参数"-s":

[econ@dev-hadoop-node-c ~]$ /opt/flink-1.11.1/bin/flink run -m dev-hadoop-node-c:8081 -c "com.econ.powercloud.jobsTest.TestKafkaOffsetCheckpointJob" -d -s 'hdfs://nameservice1/flink1.11/flink-savepoints/savepoint-bb8b4b-99016a1c3e60' ~/powercloud-flink-1.0.20201016.jar --bootstrap.servers localhost:9092Job has been submitted with JobID d6cb6e1a6f9c0816ac4b61a1df38ddeb[econ@dev-hadoop-node-c ~]$ 

观察"kafka-console-consumer.sh"消费topic "test2"的情况,没有新的消息被打印:

RdeMacBook-Pro:kafka r$ ./bin/kafka-console-consumer.sh --topic test2 --bootstrap-server localhost:9092a1a1a2

再观察“消费组积压数量”,发现LAG值已经全部是0。

RdeMacBook-Pro:kafka r$ ./bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group prod-econ-flink-TestKafkaOffsetCheckpointJob-local --describe; date; Consumer group 'prod-econ-flink-TestKafkaOffsetCheckpointJob-local' has no active members.TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-IDtest            1          4               4               0               -               -               -test            0          3               3               0               -               -               -test            2          6               6               0               -               -               -2020年10月18日 星期日 20时31分43秒 CSTRdeMacBook-Pro:kafka r$ 

证明:flink使用savepoint启动作业,不会重复消费kafka数据,也会正确更新kafka的offset。

重申,以上试验证明:

  1. flink消费了kafka数据后,不会更新offset到kafka,直到checkpoint完成。
  2. flink在没有使用savepoint重启作业的时候,消费kafka的offset还是从kafka自身获取,存在重复消费数据的情况。
  3. flink使用savepoint重启作业,不会重复消费kafka数据,也会正确更新kafka的offset。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/507663.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

论文阅读:超高分辨率图像中快速、准确的条码检测

摘要 由于目标对象的尺度不同&#xff0c;超高分辨率 (UHR) 图像中的对象检测长期以来一直是计算机视觉中的一个具有挑战性的问题。在条码检测方面&#xff0c;将 UHR 输入图像调整为更小的尺寸通常会导致相关信息的丢失&#xff0c;而直接处理它们的效率很高且计算成本很高。…

android 多线程 场景,精选Android初中级面试题 (三): 深探Handler,多线程,Bitmap

码个蛋(codeegg) 第 930 次推文作者&#xff1a;Focusing链接&#xff1a;https://juejin.im/post/5c85cead5188257c6703af47Handler1、谈谈消息机制Handler作用 &#xff1f;有哪些要素 &#xff1f;流程是怎样的 &#xff1f;参考回答&#xff1a;负责跨线程通信&#xff0c;…

通过超分辨率重构来提高二维码的对比度

1 问题描述 &#xff08;1&#xff09;图像分辨率小。例如一些嵌入在海报&#xff08;如图1&#xff09;或远距离拍摄的码&#xff0c;其分辨率远小于通常情况下的码图像。 图1.海报中的二维码占比很小 &#xff08;2&#xff09;图像质量较低。有很多是经过了多次的压缩和转…

android web 访问数据库,Web下的JDBC访问数据库的基本步骤

Web下的JDBC访问数据库的基本步骤(2012-06-02 12:09:33)在Java程序中连接数据库的一般步骤分为一下几部分&#xff0c;我摘录出来&#xff0c;跟大家分享。(1)将数据库的JABC驱动加载到classpath中&#xff0c;在基于JavaEE的Web应用开发过程中&#xff0c;通常把JDBC驱动放在W…

linux 磁盘扩容_记录一次ESXi Linux在线扩容,不重启系统

因为工作需要&#xff0c;需要将运行在ESXi主机上面的一台Centos 里面的一个LV卷进行扩容&#xff0c;下面记录了此次扩展的详细过程&#xff0c;整个过程&#xff0c;不需要重启服务器。1. 首先通过df-h 查看当前磁盘结构如下&#xff1a;我们此次的最终目标&#xff0c;就是将…

android item三种,Android RecyclerView中的ItemDecoration的几种绘制方法

如题&#xff0c;我们使用recyclerview的时候&#xff0c;如果没有设置显示条目的margin&#xff0c;或者padding的话&#xff0c;是没有分割线效果的。那么除去使用margin或padding,其余的方法是用itemdecoration绘制分割线我们绘制分割线的时候通常会使用drawable去绘制&…

上传文件和提交textfield_0基础掌握Django框架(37)文件上传

为了更好的学习效果&#xff0c;请搭配视频教程一起学习&#xff1a;Django零基础到项目实战 - 网易云课堂​study.163.com文件上传&#xff1a;文件上传是网站开发中非常常见的功能。这里详细讲述如何在Django中实现文件的上传功能。前端HTML代码实现&#xff1a;在前端中&…

华为p10刷原生android,华为p10怎么刷机 华为p10刷机方法【详细介绍】

喜欢折腾手机的用户一定对于手机root权限获取不陌生&#xff0c;root后虽然不能享受官方联保服务但同时带来的好处不用小编多说。前面给大家介绍了 华为p10 刷入第三方recovery教程&#xff0c;现在华为p10刷机包已经放出来&#xff0c;小编给大家带来华为p10刷机权限获取教程。…

使用container的嵌套_ElementUI 技术揭秘(4)— Container 布局容器组件的设计与实现。...

前言上一篇文章我们分析了 Layout 布局组件的设计和实现&#xff0c;它的应用场景通常是局部布局。对于整个页面的布局&#xff0c;element-ui 提供了 Container 布局容器组件&#xff0c;专门用于 PC 管理后台页面的整体布局。需求分析我们先通过几幅图看一下页面的常见布局。…

宝马屏幕共享android,宝马屏幕共享功能怎么用

【太平洋汽车网】使用宝马屏幕共享要先打开车载电脑多媒体功能&#xff0c;再选择屏幕共享手机投屏&#xff0c;勾掉宝马互联选项&#xff0c;然后再打开手机wifi&#xff0c;选择BMW输入密码即可使用屏幕共享。车手机互联映射即车机互联&#xff0c;就是将手机投屏到车载显示器…

“用于无监督图像生成解耦的正交雅可比正则化”论文解读

Tikhonov regularization terms https://blog.csdn.net/jiejinquanil/article/details/50411617 本文是对博客https://baijiahao.baidu.com/s?id1710942953471566583&wfrspider&forpc的重写。 1 简介 本文是对发表于计算机视觉和模式识别领域的顶级会议 ICCV 2021 的…

html带正方形项目列表,5种简单实用的css列表样式实例,可以直接用到项目中。...

谁不希望有一个好看而又干净的列表&#xff1f;这篇文章中我们给出几个实用的例子&#xff0c;你可以把他们直接用到自己的工作中。我们从一个带有动画效果的垂直列表开始&#xff0c;接着是一个图文混排的例子&#xff0c;然后是一个只有图片的list例子跟一个水平菜单的例子&a…

pb 打印html页面,用PB开发WEB应用

用PB开发WEB应用用PB开发WEB应用烟台教育学院网络中心 孙连三一、PB Window plug &#xff0d;in 的用途PowerBuilder Window plug &#xff0d;in 的用途是在HTML 页面中插入PowerBuilder 中定义的窗口对象&#xff0c;此窗口对象上定义的功能在浏览器中一样被执行&#xff0…

html加上百度统计,vue单页面应用加入百度统计

版权声明&#xff1a;本文为CSDN博主「钟文辉」的原创文章&#xff0c;遵循CC 4.0 by-sa版权协议&#xff0c;转载请附上原文出处链接及本声明。原文链接&#xff1a;https://blog.csdn.net/qq_39753974/article/details/80322643在单页面中&#xff0c;要是只加在head中的话那…

推荐系统:猜你喜欢

0 简介 网络的迅速发展带来了信息超载&#xff08;information overload&#xff09;问题。解决信息超载问题一个非常有潜力的办法是推荐系统&#xff0c;它根据用户的信息需求、兴趣等&#xff0c;将用户感兴趣的信息、产品等推荐给用户。推荐系统最典型应用领域是电子商务领…

量子计算机九章能否预测未来,张礼立 : 中国 “九章”量子计算机到底厉害在哪?...

原标题&#xff1a;张礼立 &#xff1a; 中国 “九章”量子计算机到底厉害在哪&#xff1f;【背景信息】12月4日&#xff0c;《科学》杂志公布了 中国 “九章” 的重大突破。 这台由中国科学技术大学潘建伟、陆朝阳等学者研制的76个光子的量子计算原型机&#xff0c;推动全球量…

利用GAN实现QR Code超分辨率的研究

文章目录1 传统方案2 基于CNN的实现方案2.1 SRCNN2.2 WeChat AI3 基于GAN的实现方案3.1 SRGAN3.2 ESRGAN3.3 Real-ESRGAN4 基于GAN的QR Code的实现方案1 传统方案 https://blog.csdn.net/caomin1hao/article/details/81092134?utm_mediumdistribute.pc_relevant.none-task-bl…

如何用计算机截部分屏,电脑如何长屏幕的截图?电脑截取长屏的方法

新手用户对windows系统截取长屏的方法比较陌生&#xff0c;正常情况下&#xff0c;我们截取屏幕内容是登录QQ&#xff0c;按ctrlalta来截取屏幕的&#xff0c;只能截取部分内容&#xff0c;无法截图整个页面。日常的工作生活中&#xff0c;截图是经常会用到的&#xff0c;如果你…

Audio-based snore detection using deep neural networks解读

0 摘要 Background and Objective: 打鼾是一种普遍现象。 它可能是良性的&#xff0c;但也可能是阻塞性睡眠呼吸暂停 (OSA) 一种普遍存在的睡眠障碍的症状。 准确检测打鼾可能有助于筛查和诊断 OSA。 Methods: 我们介绍了一种基于卷积神经网络 (CNN) 和循环神经网络 (RNN) 组合…

2018计算机专业考研报名人数,2018年全国考研报考人数、各省市考研报考人数、历年考研录取率、全日制及非全日制硕士研究生比例及考研动机分析【图】...

一、考研报考人数、录取率及报录比分析从上世纪末开始&#xff0c;研究生和本科生招生数量一样在逐步增多。本科扩招的同时&#xff0c;研究生也在扩招。1999年&#xff0c;全国报考研究生人数31.9万&#xff0c;录取人数7.3万&#xff0c;录取率22.8%。2008年&#xff0c;报考…