【Flink connector】文件系统 SQL 连接器:实时写文件系统以及(kafka到hive)实战举例

文章目录

  • 一. 滚动策略:sink后文件切分(暂不关注)
    • 1. 切分分区目录下的文件
    • 2. 小文件合并
  • 二. 分区提交
    • 1. 分区提交触发器 (什么时候创建分区)
      • 1.1. 逻辑说明
      • 1.2. 举例说明
    • 2. 分区时间提取器 (由分区字段来写分区名)
      • 2.1. 逻辑说明
      • 2.2. 举例说明
    • 3. 分区提交策略 (分区创建后怎么告知下游或系统)
      • 3.1. 逻辑说明
      • 3.2. 举例说明
    • 4. Sink Parallelism
  • 三. 完整示例
    • 1. 官网(partition-time)
    • 2. 实际测试(kafka->hive)

本文概述

flink支持动态写数据到文件系统,提供了分块写数据以及动态分区,接下来看flink是如何分块写数据,以及如何配置动态分区的建立。

 

文件系统连接器支持写入,是基于 Flink 的 文件系统 写入文件的。

我们可以直接编写 SQL,将流数据插入到非分区表。 如果是分区表,可以配置分区操作相关的属性。具体参考分区提交。

 

一. 滚动策略:sink后文件切分(暂不关注)

1. 切分分区目录下的文件

分区目录下的数据被分割到 part 文件中。每个分区对应的 sink 的 subtask 都至少会为该分区生成一个 part 文件。
该策略基于大小,和指定的文件可被打开的最大 timeout 时长,来滚动 part 文件。

默认值类型描述
sink.rolling-policy.file-size128MBMemorySize当part达到设定值时,文件开始滚动。
sink.rolling-policy.rollover-interval30 minDuration滚动前,part 文件处于打开状态的最大时长(默认值30分钟,以避免产生大量小文件)。 检查频率是由 sink.rolling-policy.check-interval 属性控制的。
sink.rolling-policy.check-interval1 minDuration周期检查文件打开时长。

根据描述默认情况下Flink采取了如上默认值的滚动策略。

 


todo:checkpoint 也会影响part文件的生成


对于 bulk formats 数据 (parquet、orc、avro):滚动策略与 checkpoint 间隔(pending 状态的文件会在下个 checkpoint 完成)控制了 part 文件的大小和个数。

 

2. 小文件合并


todo: checkpoint的间隔会影响文件产生的效率


file sink 支持文件合并,允许应用程序使用较小的 checkpoint 间隔但不产生大量小文件。

默认值类型描述
auto-compactionfalseBoolean在流式 sink 中自动合并功能。数据首先会被写入临时文件。当 checkpoint 完成后,该检查点产生的临时文件会被合并。这些临时文件在合并前不可见。
compaction.file-size(无)MemorySize合并目标文件大小,默认值为滚动文件大小

如果启用文件合并功能,会根据目标文件大小,将多个小文件合并成大文件。

在生产环境中使用文件合并功能时,需要注意:

  • 只有 checkpoint 内部的文件才会被合并,至少生成的文件个数与 checkpoint 个数相同。
  • 合并前文件是不可见的,那么文件的可见时间是:checkpoint 间隔时长 + 合并时长。
  • 如果合并时间过长,将导致反压,延长 checkpoint 所需时间。

 

二. 分区提交

sink动态写分区包括如下两个操作:

  1. Trigger-提交分区的时机:通过什么来识别分区(watermark或处理时间),什么时候提交分区
  2. Policy-提交分区后通知下游:写_SUCCESS,hive metadata 中新增分区,或自定义:合并小文件等。

注意: 分区提交仅在(什么是?)动态分区插入模式下才有效。

 

1. 分区提交触发器 (什么时候创建分区)

1.1. 逻辑说明

Flink 提供了两种类型分区提交触发器:

  • 第一种:根据分区的处理时间(没有根据字段吗)。基于分区创建时间(这里指的是什么)和当前系统时间来触发分区。 这种触发器更具通用性,但不是很精确。例如,数据延迟或故障将导致过早提交分区。
  • 第二种:根据从分区字段提取的时间以及 watermark。 这需要 job 支持 watermark 生成,分区是根据时间来切割的,例如,按小时或按天分区。

 

感知分区的几种情况:

  1. 不管分区数据是否完整而只想让下游尽快感知到分区:(不推荐)

‘sink.partition-commit.trigger’=‘process-time’ (默认值)
‘sink.partition-commit.delay’=‘0s’ (默认值) 一旦数据进入分区,将立即提交分区。注意:这个分区可能会被提交多次(提交多次产生的影响ing:浪费多余的资源)。

  1. 如果想让下游只有在分区数据完整时才感知到分区,并且 job 中有 watermark 生成,也能从分区字段的值中提取到时间

‘sink.partition-commit.trigger’=‘partition-time’
‘sink.partition-commit.delay’=‘1h’ (根据分区类型指定,如果是按小时分区可配置为 ‘1h’) 该方式是最精准地提交分区的方式,尽力确保提交分区的数据完整。

  1. 如果想让下游系统只有在数据完整时才感知到分区,但是没有 watermark,或者无法从分区字段的值中提取时间:

‘sink.partition-commit.trigger’=‘process-time’ (默认值)
‘sink.partition-commit.delay’=‘1h’ (根据分区类型指定,如果是按小时分区可配置为 ‘1h’) 该方式尽量精确地提交分区,但是数据延迟或者故障将导致过早提交分区

延迟数据的处理:延迟的记录会被写入到已经提交的对应分区中,且会再次触发该分区的提交。

 

如下参数:

确定何时提交分区:这里只关注process-time trigger下的两个参数

sink.partition-commit.trigger:
默认值:process-time
描述:

  • 基于机器时间: ‘process-time’:不需要分区时间提取器也不需要 watermark 生成器。
  • 一旦 “当前系统时间” 超过了 "分区创建系统时间(比如flink消费到一条数据,触发了分区创建操作对应的时间)" 和 'sink.partition-commit.delay' 之和立即提交分区。
  • 基于提取的分区时间:‘partition-time’。需要 watermark 生成。一旦 watermark 超过了 “分区创建系统时间” 和 ‘sink.partition-commit.delay’ 之和立即提交分区。

sink.partition-commit.delay
默认值:0s
描述: 该延迟时间之前分区不会被提交。如果是按天分区,可以设置为 ‘1 d’,如果是按小时分区,应设置为 ‘1 h’,当然也可以设置分钟,例如 30min

 

1.2. 举例说明

--默认值可以不配置
'sink.partition-commit.trigger'='process-time' 
--当来第一条数据时(记录为时刻1),先创建hive分区文件夹,当时间超过 时刻1+1h 时,分区提交
--分区未提交时文件为.data开头的临时文件,分区提交时,会从cp中同步数据到临时文件中,并命名为正式文件。 
'sink.partition-commit.delay'='1h' 

 

2. 分区时间提取器 (由分区字段来写分区名)

2.1. 逻辑说明

时间提取器从分区字段值中提取时间。

partition.time-extractor.kind
默认值:default
描述:从分区字段中提取时间的时间提取器。
支持default 和 custom。在默认情况下,可以配置 timestamp-pattern/formatter。对于custom,应指定提取器类。

partition.time-extractor.timestamp-pattern
默认值:无
描述:分区格式的数据拼接。
默认支持第一个字段按 ‘yyyy-MM-dd hh:mm:ss’ 这种模式提取。

  • 如果需要从一个分区字段 ‘dt’ 提取 timestamp,可以配置成:‘$dt’。
  • 如果需要从多个分区字段中提取分区,比如 ‘year’、‘month’、‘day’ 和 ‘hour’ 提取 timestamp,可以配置成: $year-$month-$day $hour:00:00
  • 如果需要从两个分区字段 'dt' 和 'hour' 提取 timestamp,可以配置成:'$dt$hour:00:00'。

partition.time-extractor.timestamp-formatter
默认值:yyyy-MM-dd HH:mm:ss
描述:分区格式的规定。具体数值由partition.time-extractor.timestamp-pattern设置。默认yyyy-MM-dd HH:mm:ss

 

2.2. 举例说明

-- 'year'、'month' 和 'day'三个字段组成分区
-- 可不填,'default'为默认值,即从分区字段中获取
'partition.time-extractor.kind' = 'default'
--具体动态分区名怎么由字段拼接
'partition.time-extractor.timestamp-pattern' = '$year$month$day'
--分区名格式
'partition.time-extractor.timestamp-formatter' = 'yyyyMMdd'

 

3. 分区提交策略 (分区创建后怎么告知下游或系统)

3.1. 逻辑说明

分区提交策略定义了提交分区时的具体操作。

  1. metadata 存储(metastore),仅 hive 表支持该策略,该策略下文件系统通过目录层次结构来管理分区。(todo:通过hive更新表元数据?)
  2. success 文件,该策略下会在分区对应的目录下生成一个名为 _SUCCESS 的空文件。

sink.partition-commit.policy.kind
默认值:无
描述:分区提交策略通知下游某个分区已经写完毕可以被读取了。

  • metastore:向 metadata 增加分区。仅 hive 支持 metastore 策略,文件系统通过目录结构管理分区;
  • success-file:在目录中增加 ‘_success’ 文件; 上述两个策略可以同时定:‘metastore,success-file’。
  • custom:通过指定的类来创建提交策略。 支持同时指定多个提交略:‘metastore,success-file’。

sink.partition-commit.success-file.name
默认值: _SUCCESS
描述:使用success-file 分区提交策略时的文件名,默认值是 ‘_SUCCESS’。

sink.partition-commit.policy.class
默认值:无
描述: custom下才用: 实现PartitionCommitPolicy 接口的分区提交策略类。只有在 custom 提交策略下才使用该类。 可以自定义提交策略,如下


public class AnalysisCommitPolicy implements PartitionCommitPolicy {private HiveShell hiveShell;@Overridepublic void commit(Context context) throws Exception {if (hiveShell == null) {hiveShell = createHiveShell(context.catalogName());}hiveShell.execute(String.format("ALTER TABLE %s ADD IF NOT EXISTS PARTITION (%s = '%s') location '%s'",context.tableName(),context.partitionKeys().get(0),context.partitionValues().get(0),context.partitionPath()));hiveShell.execute(String.format("ANALYZE TABLE %s PARTITION (%s = '%s') COMPUTE STATISTICS FOR COLUMNS",context.tableName(),context.partitionKeys().get(0),context.partitionValues().get(0)));}
}

todo:如上通过hive语句来添加分区

 

3.2. 举例说明


'sink.partition-commit.policy.kind'='success-file'
'sink.partition-commit.success-file.name'='_SUCCESS_gao'

 

4. Sink Parallelism

在流模式和批模式下,向外部文件系统(包括 hive)写文件时的 parallelism 可以通过相应的 table 配置项指定。

默认情况下,该 sink parallelism 与上游 chained operator 的 parallelism 一样。
比如kafka作为source源(分区为5,设置并行度为5),(在同一个chained中)写分区时,hive sink的并行度自动设为5。

当配置了跟上游的 chained operator 不一样的 parallelism 时,写文件和合并文件的算子(如果开启的话)会使用指定的 sink parallelism。

默认值类型描述
sink.parallelism(无)Integer将文件写入外部文件系统的 parallelism。这个值应该大于0否则抛异常。

注意: 目前,当且仅当上游的 changelog 模式为 INSERT-ONLY 时,才支持配置 sink parallelism。否则,程序将会抛出异常。

 

三. 完整示例

1. 官网(partition-time)

以下示例展示了如何使用文件系统连接器编写流式查询语句,将数据从 Kafka 写入文件系统,然后运行批式查询语句读取数据。


CREATE TABLE kafka_table (user_id STRING,order_amount DOUBLE,log_ts TIMESTAMP(3),WATERMARK FOR log_ts AS log_ts - INTERVAL '5' SECOND
) WITH (...);CREATE TABLE fs_table (user_id STRING,order_amount DOUBLE,dt STRING,`hour` STRING
) PARTITIONED BY (dt, `hour`) WITH ('connector'='filesystem','path'='...','format'='parquet','sink.partition-commit.delay'='1 h','sink.partition-commit.policy.kind'='success-file'
);-- 流式 sql,插入文件系统表
INSERT INTO fs_table 
SELECT user_id, order_amount, DATE_FORMAT(log_ts, 'yyyy-MM-dd'),DATE_FORMAT(log_ts, 'HH') 
FROM kafka_table;-- 批式 sql,使用分区修剪进行选择
SELECT * FROM fs_table WHERE dt='2020-05-20' and `hour`='12';

如果 watermark 被定义在 TIMESTAMP_LTZ 类型的列上并且使用 partition-time 模式进行提交,sink.partition-commit.watermark-time-zone 这个属性需要设置成会话时区,否则分区提交可能会延迟若干个小时。


CREATE TABLE kafka_table (user_id STRING,order_amount DOUBLE,ts BIGINT, -- 以毫秒为单位的时间ts_ltz AS TO_TIMESTAMP_LTZ(ts, 3),WATERMARK FOR ts_ltz AS ts_ltz - INTERVAL '5' SECOND -- 在 TIMESTAMP_LTZ 列上定义 watermark
) WITH (...);CREATE TABLE fs_table (user_id STRING,order_amount DOUBLE,dt STRING,`hour` STRING
) PARTITIONED BY (dt, `hour`) WITH ('connector'='filesystem','path'='...','format'='parquet','partition.time-extractor.timestamp-pattern'='$dt $hour:00:00','sink.partition-commit.delay'='1 h','sink.partition-commit.trigger'='partition-time','sink.partition-commit.watermark-time-zone'='Asia/Shanghai', -- 假设用户配置的时区为 'Asia/Shanghai''sink.partition-commit.policy.kind'='success-file'
);-- 流式 sql,插入文件系统表
INSERT INTO fs_table 
SELECT user_id, order_amount, DATE_FORMAT(ts_ltz, 'yyyy-MM-dd'),DATE_FORMAT(ts_ltz, 'HH') 
FROM kafka_table;-- 批式 sql,使用分区修剪进行选择
SELECT * FROM fs_table WHERE dt='2020-05-20' and `hour`='12';

 

2. 实际测试(kafka->hive)

-- SET 'table.sql-dialect'='hive';
CREATE CATALOG myhive WITH ('type' = 'hive','default-database' = 'data_base','hive-conf-dir' = '/usr/bin/hadoop/software/hive/conf'
);CREATE TABLE source_kafka (`pv` string,`uv` string,`p_day_id` string
) WITH ('connector' = 'kafka-x','topic' = 'hive_kafka','properties.bootstrap.servers' = 'xxx:9092','properties.group.id' = 'luna_g','scan.startup.mode' = 'earliest-offset','json.timestamp-format.standard' = 'SQL','json.ignore-parse-errors' = 'true','format' = 'json','scan.parallelism' = '1');-- 通过sql hint来指定表的行为
--  1. 分区名称策略
-- partition.time-extractor.timestamp-pattern'='$p_day_id' :分区数据组成
-- partition.time-extractor.timestamp-formatter' = 'yyyyMMdd' :分区格式-- 2. 分区提交策略 
-- 'sink.partition-commit.delay'='5min':分区提交延迟:分区时间 + 延迟 与 process_time做对比--3. 通知下游策略
-- 'sink.partition-commit.policy.kind'='metastore,success-file':通知下游策略
-- 'sink.partition-commit.success-file.name'='_SUCCESS_gao' :成功文件名称insert into myhive.logsget.dws_thjl_pv_uv_d_xky_bak /*+ OPTIONS('partition.time-extractor.timestamp-pattern'='$p_day_id:00:00','sink.partition-commit.policy.kind'='metastore,success-file','sink.partition-commit.success-file.name'='_SUCCESS_gao111') */select *  from source_kafka; 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/773048.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

政安晨:【Keras机器学习实践要点】(四)—— 顺序模型

政安晨的个人主页:政安晨 欢迎 👍点赞✍评论⭐收藏 收录专栏: TensorFlow与Keras实战演绎机器学习 希望政安晨的博客能够对您有所裨益,如有不足之处,欢迎在评论区提出指正! 介绍 Keras是一个用于构建和训练深度学习模…

MySQL数据库高级语句(一)

文章目录 MySQL高级语句older by 排序区间判断查询或与且(or 与and)嵌套查询(多条件)查询不重复记录distinctcount 计数限制结果条目limit别名as常用通配符 结语 MySQL高级语句 1构建测试用表 create table test1 (id int prima…

【搜索引擎1】Ubuntu通过deb方式安装ElasticSearch和Kibana、ik中文分词插件

1、官网下载文件 版本为官网最新版本,ElasticSearch与Kibana版本必须保持一致 ElasticSearch下载地址:Download Elasticsearch | Elastic Kibana下载地址:Past Releases of Elastic Stack Software | Elastic 下载选择DEB文件 ik插件下载…

湖北汽车工业学院 实验一 关系数据库标准语言SQL

头歌 实验一 关系数据库标准语言SQL 制作不易!点个关注呗!为大家创造更多的价值! 目录 头歌 实验一 关系数据库标准语言SQL**制作不易!点个关注呗!为大家创造更多的价值!** 第一关:创建数据库第…

Apple Vision Pro应用合集

这里给大家分享一个网站,手机了最新的apple vision pro 上面运行的应用。 1、查找应用:用户可以浏览特色推荐的应用,或者通过随机挑选功能发现新的应用。 2、社区交流:提供社区功能,用户可以在这里交流使用体验、分享…

小程序接入第三方信息流流程 下载SDK

由第三方信息流提供相应的SDK下载链接以及接入说明和开发文档或其他方式接入,如果第三方能支持小程序SDK,则不需要后面步骤,只需要提供相关开发文档和接入方式接口 接入SDK 后台开发人员接入第三方提供的SDK,并进行相关接口开发…

在django中使用kindeditor出现转圈问题

在django中使用kindeditor出现转圈问题 【一】基础检查 【1】前端检查 确保修改了uploadJson的默认地址 该地址需要在路由层有映射关系 确认有加载官方文件 kindeditor-all-min.js确保有传递csrfmiddlewaretoken 或者后端关闭了csrf验证 <textarea name"content&qu…

如何使用 ChatGPT 进行编码和编程

文章目录 一、初学者1.1 生成代码片段1.2 解释功能 二、自信的初学者2.1 修复错误2.2 完成部分代码 三、中级水平3.1 研究库3.2 改进旧代码 四、进阶水平4.1 比较示例代码4.2 编程语言之间的翻译 五、专业人士5.1 模拟 Linux 终端 总结 大多数程序员都知道&#xff0c;ChatGPT …

GitLab更新失败(Ubuntu)

在Ubuntu下使用apt更新gitlab报错如下&#xff1a; An error occurred during the signature verification.The repository is not updated and the previous index files will be used.GPG error: ... Failed to fetch https://packages.gitlab.com/gitlab/gitlab-ee/ubuntu/d…

thinkadmin 新版安装步骤

1.通过 Composer 安装: ( 推荐方式,默认只安装 admin 模块 ) ### 创建项目( 需要在英文目录下面执行 ) composer create-project zoujingli/thinkadmin### 进入项目根目录 cd thinkadmin### 数据库初始化并安装 ### 默认使用 Sqlite 数据库,若使用其他数据库请按第二步修…

FineDance pkl渲染

FineDance pkl渲染代码 如果是75,也可以渲染 给定wav路径,可以渲染mp4 import pickle import numpy as np import torch import cv2 import os # os.environ["PYOPENGL_PLATFORM"] = "osmesa" from tqdm import tqdm from smplx import SMPL, SMPLX, …

AIGC工具系列之——基于OpenAI的GPT大模型搭建自己的AIGC工具

今天我们来讲讲目前非常火的人工智能话题“AIGC”&#xff0c;以及怎么使用目前的AI技术来开发&#xff0c;构建自己的AIGC工具 什么是AIGC&#xff1f; AIGC它的英文全称为(Artificial Intelligence Generated Content)&#xff0c;中文翻译过来就是“人工智能生成内容”&…

【笔记】Nginx配置类似Tomcat请求接口链路access_log日志

项目部署在tomcat容器中&#xff0c;请求的接口会被记录在文件名&#xff1a;localhost_access_log.2024-03-22.log的文件中&#xff0c;如果使用Nginx也需要记录请求接口&#xff0c;该如何做呢&#xff1f;步骤如下 步骤1&#xff1a; 打开nginx.conf&#xff0c;在 http 块中…

HarmonyOS网格布局:List组件和Grid组件的使用

简介 在我们常用的手机应用中&#xff0c;经常会见到一些数据列表&#xff0c;如设置页面、通讯录、商品列表等。下图中两个页面都包含列表&#xff0c;“首页”页面中包含两个网格布局&#xff0c;“商城”页面中包含一个商品列表。 上图中的列表中都包含一系列相同宽度的列表…

easyexcel与vue配合下载excel

后端 设置响应 // 设置响应头 response.setContentType("application/octet-stream;charsetUTF-8"); String returnName null; try {returnName URLEncoder.encode(fileName, "UTF-8"); } catch (UnsupportedEncodingException e) {throw new RuntimeExc…

java类的定义方式和实例化、this引用、对象的构造及其初始化、封装特性、static修饰成员变量、static修饰成员方法

java类的定义方式和实例化 类的定义和使用 类是用来对一个实体(对象)来进行描述的&#xff0c;主要描述该实体(对象)具有哪些属性(外观尺寸等)&#xff0c;哪些功能(用来干啥)&#xff0c;描述完成后计算机就可以识别了。 在Java中定义类需要用到class关键字具体如下 // 创…

API网关-Apisix路由配置教程(数据编辑器方式)

文章目录 前言一、端口修改1. apisix 端口修改2. dashboard 端口修改3. 登录密码修改 二、常用插件介绍1. 常用转换插件1.1 proxy-rewrite插件1.1.1 属性字段1.1.2 配置示例 2. 常用认证插件2.1 key-auth插件2.1.1 消费者端字段2.1.2 路由端字段2.1.3 配置示例 2.2 basic-auth插…

Oracle 19cADG集群补丁升级

Oracle 19cADG集群补丁升级 文章目录 Oracle 19cADG集群补丁升级1.备库备份2.备库升级Opatch3.备库应用补丁4.主库备份 oracle_home目录5.主库升级Opatch6.注册补丁7.编译无效对象8.检查主库的补丁注册情况9.备库切换主库完成补丁注册 1.备库备份 su - oracle cd $ORACLE_HOME…

机器学习 - 神经网络中的训练模型

接着上一篇机器学习-创建一个PyTorch classification model做进一步陈述。 训练模型的步骤&#xff1a; Forward pass: The model goes through all of the training data once, performing its forward() function calculations (model(x_train))Calculate the loss: 使用 l…

format(C++20)

1. std::format format_01.cpp // g format_01.cpp -stdc20 #include <iostream> #include <string> #include <format>void test_01() {// 使用字符串填充std::cout << std::format("Hello {}!\n", "World"); // Hello World!…