Flink系列之:动态发现新增分区

Flink系列之:动态发现新增分区

  • 一、动态发现新增分区
  • 二、Flink SQL动态发现新增分区
  • 三、Flink API动态发现新增分区

为了在不重新启动 Flink 作业的情况下处理主题扩展或主题创建等场景,可以将 Kafka 源配置为在提供的主题分区订阅模式下定期发现新分区。要启用分区发现,请为属性partition.discovery.interval.ms设置一个非负值。

一、动态发现新增分区

flink程序增加自动发现分区参数:

  • flink.partition-discovery.interval-millis是一个配置属性,用于设置Flink作业中的分区发现间隔时间(以毫秒为单位)。
  • 在Flink作业中,数据源(例如Kafka或文件系统)的分区可能会发生变化。为了及时感知分区的变化情况,并根据变化进行相应的处理,Flink提供了分区发现机制。
  • flink.partition-discovery.interval-millis配置属性用于设置Flink作业在进行分区发现时的间隔时间。Flink作业会定期检查数据源的分区情况,如果发现分区发生了变化(例如增加或减少了分区),Flink会相应地调整作业的并行度或重新分配任务来适应新的分区情况。
  • 通过调整flink.partition-discovery.interval-millis的值,可以控制Flink作业进行分区发现的频率。较小的间隔时间可以实时感知到分区变化,但可能会增加作业的开销;较大的间隔时间可以减少开销,但可能导致较长时间的延迟。
  • 需要注意的是,flink.partition-discovery.interval-millis的默认值是5分钟(300000毫秒),可以根据具体需求进行调整。

二、Flink SQL动态发现新增分区

参数:scan.topic-partition-discovery.interval

CREATE TABLE KafkaTable (`event_time` TIMESTAMP(3) METADATA FROM 'timestamp',`partition` BIGINT METADATA VIRTUAL,`offset` BIGINT METADATA VIRTUAL,`user_id` BIGINT,`item_id` BIGINT,`behavior` STRING
) WITH ('connector' = 'kafka','topic' = 'user_behavior','properties.bootstrap.servers' = 'localhost:9092','properties.group.id' = 'testGroup','scan.startup.mode' = 'earliest-offset','format' = 'csv'
);

Connector Options:

OptionRequiredDefaultTypeDescription
scan.topic-partition-discovery.intervaloptional(none)Duration消费者定期发现动态创建的Kafka主题和分区的时间间隔。

三、Flink API动态发现新增分区

参数:partition.discovery.interval.ms

Java

KafkaSource.builder()    
.setProperty("partition.discovery.interval.ms", "10000"); 
// discover new partitions per 10 seconds

Python

KafkaSource.builder() \.set_property("partition.discovery.interval.ms", "10000")  # discover new partitions per 10 seconds

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/19091.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

SpringBoot2学习笔记

信息来源:https://www.bilibili.com/video/BV19K4y1L7MT?p5&vd_source3969f30b089463e19db0cc5e8fe4583a 作者提供的文档:https://www.yuque.com/atguigu/springboot 作者提供的代码:https://gitee.com/leifengyang/springboot2 ----…

[转]关于cmake --build .的理解

https://blog.csdn.net/qq_38563206/article/details/126486183 https://blog.csdn.net/HandsomeHong/article/details/120170219 cmake --build . 该命令的含义是:执行当前目录下的构建系统,生成构建目标。 cmake项目构建过程简述: 1. 首先&#xf…

Verilog语法学习——边沿检测

边沿检测 代码 module edge_detection_p(input sys_clk,input sys_rst_n,input signal_in,output edge_detected );//存储上一个时钟周期的输入信号reg signal_in_prev;always (posedge sys_clk or negedge sys_rst_n) beginif(!sys_rst_n)signal_in_prev < 0;else…

12. Mybatis 多表查询 动态 SQL

目录 1. 数据库字段和 Java 对象不一致 2. 多表查询 3. 动态 SQL 使用 4. if 标签 5. trim 标签 6. where 标签 7. set 标签 8. foreach 标签 9. 通过注解实现 9.1 查找所有数据 9.2 通过 id 查找 1. 数据库字段和 Java 对象不一致 我们先来看一下数据库中的数…

解决Hadoop审计日志hdfs-audit.log过大的问题

【背景】 新搭建的Hadoop环境没怎么用&#xff0c;就一个环境天天空跑&#xff0c;结果今天运维告诉我说有一台服务器磁盘超过80%了&#xff0c;真是太奇怪了&#xff0c;平台上就跑了几个spark测试程序&#xff0c;哪来的数据呢&#xff1f; 【问题调查】 既然是磁盘写满了&…

mysql主从配置及搭建(gtid方式)

一、搭建主从-gtid方式 搭建步骤查看第一篇。bin-log方式。可以进行搭建1.1 gtid和二进制的优缺点 使用 GTID 的主从复制优点&#xff1a; 1、简化配置&#xff1a;使用 GTID 可以简化主从配置&#xff0c;不需要手动配置每个服务器的二进制日志文件和位置。 2、自动故障转移…

MySQL 其他数据库日志

我们了解数据库事务时&#xff0c;知道两种日志&#xff1a;重做日志&#xff0c;回滚日志。 对于线上数据库应用系统&#xff0c;突然遭遇 数据库宕机 怎么办&#xff1f;在这种情况下&#xff0c;定位宕机的原因 就非常关键。我们可以查看数据库的 错误日志。因为日志中记录…

给你一个小技巧,解放办公室管理!

电力的稳定供应对于现代社会中的办公室和企业来说至关重要。为了应对这些潜在的问题&#xff0c;许多办公室和企业都采用了不间断电源&#xff08;UPS&#xff09;系统来提供电力备份。UPS可以保持关键设备的运行&#xff0c;确保生产和业务不受干扰。 然而&#xff0c;仅仅安装…

[SQL挖掘机] - 窗口函数 - lead

介绍: lead() 是一种常用的窗口函数&#xff0c;它用于获取某一行之后的行的值。它可以用来在结果集中的当前行后面访问指定列的值。 用法: lead() 函数的语法如下&#xff1a; lead(列名, 偏移量, 默认值) over (partition by 列名1, 列名2, ... order by 列名 [asc|desc]…

力扣468 验证IP地址

ipv4地址&#xff1a;1.必须是四个非空子串 2.每个非空子串不含前导零 3.子串里字符只能是0~255 ipv6地址&#xff1a;1.必须是八个非空子串 2。每段非空串得长度是否在1~4之间&#xff0c;且不含0-9&#xff0c;a-f&#xff0c;A-F之外得字符。 3.同时0-9也不允许含前导零 cl…

【JAVASE】什么是方法

⭐ 作者&#xff1a;小胡_不糊涂 &#x1f331; 作者主页&#xff1a;小胡_不糊涂的个人主页 &#x1f4c0; 收录专栏&#xff1a;浅谈Java &#x1f496; 持续更文&#xff0c;关注博主少走弯路&#xff0c;谢谢大家支持 &#x1f496; 方法 1. 方法概念及使用1.1 什么是方法1…

linux 服务器忘记mysql密码(修改密码)

1、结束当前正在运行的mysql进程。 service mysql stop 2、用mysql安全模式运行并跳过权限验证。 /www/server/mysql/bin/mysqld_safe --skip-grant-tables & 3、重开一个终端窗口以root身份登录mysql&#xff0c; 输入&#xff1a;mysql -u root -p 命令然后回车&a…

[学习笔记]全面掌握Django ORM

参考资料&#xff1a;全面掌握Django ORM 1.1 课程内容与导学 学习目标&#xff1a;独立使用django完成orm的开发 学习内容&#xff1a;Django ORM所有知识点 2.1 ORM介绍 ORM&#xff1a;Object-Relational Mapping Django的ORM详解 在django中&#xff0c;应用的文件夹…

Elisp之buffer-substring-no-properties用法(二十七)

简介&#xff1a; CSDN博客专家&#xff0c;专注Android/Linux系统&#xff0c;分享多mic语音方案、音视频、编解码等技术&#xff0c;与大家一起成长&#xff01; 优质专栏&#xff1a;Audio工程师进阶系列【原创干货持续更新中……】&#x1f680; 人生格言&#xff1a; 人生…

c++ std::sort使用自定义的比较函数排序

文章目录 使用sort对容器内元素进行排序在类中如何调用自定义的成员函数进行排序错误原因&#xff1a;解决办法&#xff1a;使用Lambda表达式&#xff1a; 使用sort对容器内元素进行排序 std::sort()函数专门用来对容器或普通数组中指定范围内的元素进行排序&#xff0c;排序规…

2023.07.29 驱动开发DAY6

通过epoll实现一个并发服务器 服务器 #include <stdio.h> #include <string.h> #include <unistd.h> #include <sys/types.h> #include <sys/socket.h> #include <netinet/in.h> #include <arpa/inet.h> #include <sys/epoll.h…

编译 TPC-DS ( dsdgen / dsqgen ) 生成测试数据和查询语句

文章目录 1. 下载2. 编译3. 生成测试数据4. 检查5. 建表6. 生成查询语句 1. 下载 TPC所有Benchmark工具包的下载地址是&#xff1a;https://www.tpc.org/tpc_documents_current_versions/current_specifications5.asp , TPC-DS当前最新版本是3.2.0&#xff0c;下载前需要填写真…

上位机一般的开发工具?

上位机开发工具是用于开发和构建上位机应用程序的软件工具。它们提供了一系列功能和资源&#xff0c;帮助开发人员设计、编写和调试上位机应用程序。以下是一些常见的上位机开发工具&#xff1a;Visual Studio&#xff1a;作为一种集成开发环境&#xff08;IDE&#xff09;&…

spark-sql : “java.lang.NoSuchFieldError: out“ 异常解决

异常现象 at java.lang.reflect.Method.invoke(Method.java:498)at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:847)at org.apac…

【有趣的设计模式】23 种设计模式详解和场景分析

前言 七大设计原则 1、单一原则&#xff1a;一个类只负责一个职责 2、开闭原则&#xff1a;对修改关闭&#xff0c;对扩展开放 3、里氏替换原则&#xff1a;不要破坏继承关系 4、接口隔离原则&#xff1a;暴露最小接口&#xff0c;避免接口过于臃肿 5、依赖倒置原则&#xff1…