iceberg系列之 hadoop catalog 小文件合并实战

  1. 背景
    flink1.15 hadoop3.0
  2. pom文件

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.iceberg</groupId><artifactId>flink-iceberg</artifactId><version>1.0-SNAPSHOT</version><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target><flink.version>1.15.3</flink.version><java.version>1.8</java.version><scala.binary.version>2.12</scala.binary.version><slf4j.version>1.7.30</slf4j.version></properties><dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-core</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-files</artifactId><version>${flink.version}</version></dependency><!--idea运行时也有webui--><dependency><groupId>org.apache.flink</groupId><artifactId>flink-runtime-web</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-statebackend-rocksdb</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-client</artifactId><version>3.1.3</version><scope>compile</scope></dependency><dependency><groupId>org.apache.iceberg</groupId><artifactId>iceberg-flink-runtime-1.15</artifactId><version>1.3.0</version></dependency><dependency><groupId>org.apache.iceberg</groupId><artifactId>iceberg-core</artifactId><version>1.3.0</version></dependency></dependencies><build><plugins><plugin><artifactId>maven-compiler-plugin</artifactId><version>3.8.1</version><configuration><source>1.8</source><target>1.8</target></configuration></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-assembly-plugin</artifactId><version>3.3.0</version><configuration><archive><manifest><!-- 指定主类 --><mainClass>com.iceberg.flink.UnionDelData</mainClass></manifest></archive><descriptorRefs><descriptorRef>jar-with-dependencies</descriptorRef></descriptorRefs></configuration><executions><execution><id>make-assembly</id><phase>package</phase><goals><goal>single</goal></goals></execution></executions></plugin></plugins></build>
</project>
  1. 资源配置文件
    hadoop三个常用配置文件core-site.xml hdfs-site.xml yarn-site.xml 放到资源目录下
  2. java代码
package com.iceberg.flink;import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.flink.actions.Actions;
import org.apache.iceberg.hadoop.HadoopCatalog;import java.io.File;
import java.net.MalformedURLException;public class UnionDelData {public static void main(String[] args) throws MalformedURLException {      String tableNames = args[1];long targetsSize = parseSizeToBytes(args[2]);int parallelism = Integer.parseInt(args[3]);long retainTime = parseTimeToMillis(args[4]);int retainLastNum = Integer.parseInt(args[5]);Configuration conf = new Configuration();conf.addResource(new File("/home/hadoop/hadoopconf/core-site.xml").toURI().toURL());conf.addResource(new File("/home/hadoop/hadoopconf/hdfs-site.xml").toURI().toURL());conf.addResource(new File("/home/hadoop/hadoopconf/yarn-site.xml").toURI().toURL());HadoopCatalog hadoopCatalog = new HadoopCatalog(conf, "/user/hadoop/path/");for (String tableName : tableNames.split(",")) {Table table = hadoopCatalog.loadTable(TableIdentifier.of("prod", tableName));UnionDataFile(table,parallelism,targetsSize);deleteSnap(table,retainTime,retainLastNum);}}public static void UnionDataFile(Table table,int parallelism,long targetsSize) {Actions.forTable(table).rewriteDataFiles().maxParallelism(parallelism).caseSensitive(false).targetSizeInBytes(targetsSize).execute();}public static void deleteSnap(Table table,long retainTime,int retainLastNum){Snapshot snapshot = table.currentSnapshot();long oldSnapshot = snapshot.timestampMillis() - retainTime;if (snapshot != null) {            table.expireSnapshots().expireOlderThan(oldSnapshot).cleanExpiredFiles(true).retainLast(retainLastNum).commit();}}public static long parseSizeToBytes(String sizeWithUnit) {long size = Long.parseLong(sizeWithUnit.substring(0, sizeWithUnit.length() - 1));char unit = sizeWithUnit.charAt(sizeWithUnit.length() - 1); switch (unit) {case 'B':return size;case 'K':case 'k': return size * 1024;case 'M':case 'm': return size * 1024 * 1024;case 'G':case 'g': return size * 1024 * 1024 * 1024;default:throw new IllegalArgumentException("Invalid size unit: " + unit);}}public static long parseTimeToMillis(String timeWithUnit) {long time = Long.parseLong(timeWithUnit.substring(0, timeWithUnit.length() - 1));char unit = timeWithUnit.charAt(timeWithUnit.length() - 1);switch (unit) {case 's':case 'S':return time * 1000;case 'm':case 'M':return time * 60 * 1000;case 'h':case 'H':return time * 60 * 60 * 1000;case 'd':case 'D':return time * 24 * 60 * 60 * 1000;default:throw new IllegalArgumentException("Invalid time unit: " + unit);}}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/41423.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

UBuntu18.04 Qt之双HDMI屏切换

UBuntu18.04 Qt之双HDMI接2个4K屏并分别设置分辨率、主屏、副屏 一、设置HDMI-2为主屏 在main函数里面添加&#xff1a; #include "mainwindow.h" #include <QApplication>int main(int argc, char *argv[]) {QApplication a(argc, argv);{long nTotal 0;c…

spring cloud gateway中配置uri

gateway中配置uri配置有三种方式: websocket方式&#xff1a;uri: ws://localhost:9000http方式: uri: http://localhost:8130/lb注册中心配置方式&#xff08;注册的服务名称&#xff09;: uri: lb://monitor-ms gateway的lb方式识别的服务名称命名规则&#xff1a; "[…

设计模式——适配器模式

引入实例 说起适配器其实在我们的生活中是非常常见的&#xff0c;比如&#xff1a;学校的宿舍的电压都比较低&#xff0c;而有的学生想使用大功率电器&#xff0c;宿舍的就会跳闸&#xff0c;然而如果你使用一个适配器&#xff08;变压器&#xff09;就可以使用了&#xff08;…

Jtti:windows虚拟内存最小值太低如何解决

当Windows虚拟内存的最小值设置过低时&#xff0c;可能会导致系统性能下降、应用程序崩溃甚至系统不稳定。解决方法包括&#xff1a; 调整虚拟内存设置&#xff1a; 可以通过以下步骤调整虚拟内存的设置&#xff1a; 右键点击“此电脑”或“计算机”&#xff0c;选择“属性”。…

被迫学习一波Linux命令

事情起因 部署一个服务&#xff0c;人家说了最低配置是3G&#xff0c;我没当回事&#xff0c;拿着个2G的服务器直接就上了&#xff0c;结果&#xff0c;哈哈&#xff0c;都能猜到结果&#xff1a;服务器内存爆了&#xff01;&#xff01;&#xff01;而且最可气的是服务器还登…

ansible案列之LNMP分布式剧本

LNMP分布式剧本 一&#xff1a;环境设置二&#xff1a;编写Nginx剧本准备nginx下载源准备配置文件并开放PHP的访问路径准备php测试页面编写nginx剧本 三&#xff1a;编写Mysql剧本编写密码获取脚本准备Mysql的yum源编写mysql剧本 四&#xff1a;准备PHP剧本准备两个配置文件编写…

深入理解linux内核--块设备驱动程序

块设备的处理 块设备驱动程序上的每个操作都涉及很多内核组件&#xff1b;其中最重要的一些如图14-1所示。 例如&#xff0c;我们假设一个进程在某个磁盘文件上发出一个read()系统调用 ——我们将会看到处理write请求本质上采用同样的方式。 下面是内核对进程请求给予回应的一…

煤矿调度IP语音对讲广播模块一键求助对讲矿用调度通信系统SIP语音对讲求助终端

硬件接口描述 SV-2101VP/ SV-2103VP系列网络音频模块&#xff0c;所有外部连接采用端子&#xff0c;电源采用2.0mm的端子&#xff0c;网络采用标准RJ45连接器&#xff0c;其他都是1.25mm的连接器。 端口类型定义 P ———— 电源 AI ———— 模拟输入&#xff08;在这里是音…

微信小程序前后端开发快速入门(完结篇)

这篇是微信小程序前后端快速入门完结篇了&#xff0c;今天利用之前学习过的所有知识做一个新的项目「群登记助手v1.0」小程序。 整体技术架构&#xff1a;小程序原生前端小程序云开发。 经历了前面教程的学习&#xff0c;大家有了一定的基础&#xff0c;所以本次分享重心主要是…

Ubuntu服务器service版本初始化

下载 下载路径 官网&#xff1a;https://cn.ubuntu.com/ 下载路径&#xff1a;https://cn.ubuntu.com/download 服务器&#xff1a;https://cn.ubuntu.com/download/server/step1 点击下载&#xff08;22.04.3&#xff09;&#xff1a;https://cn.ubuntu.com/download/server…

【Python百日进阶-Web开发-Peewee】Day271 - Peewee API文档 - 字段(二)

文章目录 11.3.17 class UUIDField11.3.18 class BinaryUUIDField11.3.19 class DateTimeField11.3.20 class DateField11.3.21 class TimeField11.3.22 class TimestampField11.3.23 class IPField11.3.24 class BooleanField11.3.25 class BareField11.3.26 class ForeignKey…

神经网络基础-神经网络补充概念-06-计算图

概念 “计算图”&#xff08;Computational Graph&#xff09;是一种用于表示数学表达式计算过程的图结构&#xff0c;广泛用于深度学习和自动微分等领域。计算图将复杂的数学表达式分解为一系列简单的计算节点&#xff0c;这些节点之间通过边连接&#xff0c;形成了一个有向无…

【jwt】JWT原理,JWT是用来解决什么问题的,如何自定义生成JWT数据,并且实现jwt数据的解码

JWT&#xff1a; JSON Web Token 1. jwt概述 用户登录成功后&#xff0c;服务端 如何知道客户端的每次请求对应的是哪个用户呢&#xff1f;怎么做&#xff1a;目前有两种方式实现. 1.1. 一是通过sessionId的方式&#xff0c;登录成功后服务端返回sessionId给客户端&#xff0…

【2023年11月第四版教材】《第5章-信息系统工程之数据工程(第三部分)》

《第5章-信息系统工程之数据工程&#xff08;第三部分&#xff09;》 2 数据工程2.1 数据建模2.2 数据标准化2.3 数据运维2.4 数据开发利用2.5 数据库安全 2 数据工程 2.1 数据建模 1、根据模型应用目的不同&#xff0c;可以将数据模型划分为三类:概念模型、逻辑模型和物理模…

【数据结构】栈与队列

1 栈 1.1 栈的概念及结构 栈&#xff1a;一种特殊的线性表&#xff0c;其只允许在固定的一端进行插入和删除元素操作。进行数据插入和删除操作的一端称为栈顶&#xff0c;另一端称为栈底。栈中的数据元素遵守后进先出 LIFO (Last In First Out) 的原则。 压栈&#xff1a;栈…

力扣75——图广度优先搜索

总结leetcode75中的图广度优先搜索算法题解题思路。 上一篇&#xff1a;力扣75——图深度优先搜索 力扣75——图广度优先搜索 1 迷宫中离入口最近的出口2 腐烂的橘子1-2 解题总结 1 迷宫中离入口最近的出口 题目&#xff1a; 给你一个 m x n 的迷宫矩阵 maze &#xff08;下标…

Kafka中的 ISR 机制

ISR 是什么 ISR 的全称叫做&#xff1a; In-Sync Replicas &#xff08;同步副本集&#xff09;, 可以理解为和 leader 保持同步的所有副本的集合。ISR 动态维护了一个和 leader 副本保持同步副本集合&#xff0c;ISR 中的副本全部都和 leader 的数据保持同步。 设一个场景&a…

JupyterHub实战应用

一、JupyerHub jupyter notebook 是一个非常有用的工具&#xff0c;我们可以在浏览器中任意编辑调试我们的python代码&#xff0c;并且支持markdown 语法&#xff0c;可以说是科研利器。但是这种情况适合个人使用&#xff0c;也就是jupyter notebook以我们自己的主机作为服务器…

PostgreSQL逻辑备份pg_dump使用及其原理解析

一、原理分析 1、循环调用getopt_long解析命令行参数&#xff0c;将参数保存到static DumpOptions dopt;中 2、判断参数是否相容&#xff0c;不相容则退出&#xff1a; options -s/--schema-only and -a/--data-only cannot be used togetheroptions -c/--clean and -a/--data…

uni-app中监听网络状态,并在嵌入webView页面的组件中添加网络监测

uni-app中监听网络状态&#xff0c;并在嵌入webView页面的组件中添加网络监测 uni-app中监听网络状态 下载插件 打开网络异常组件页面&#xff0c;点击"下载插件并导入HBuilderX"按钮&#xff0c;打开HBuilderX软件后&#xff0c;选择需要导入插件的项目&#xff…