Background: Flink 1.15 on Hadoop 3.0. The pom file:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.iceberg</groupId>
    <artifactId>flink-iceberg</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <flink.version>1.15.3</flink.version>
        <java.version>1.8</java.version>
        <scala.binary.version>2.12</scala.binary.version>
        <slf4j.version>1.7.30</slf4j.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-core</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-files</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-runtime-web</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-statebackend-rocksdb</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>3.1.3</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.iceberg</groupId>
            <artifactId>iceberg-flink-runtime-1.15</artifactId>
            <version>1.3.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.iceberg</groupId>
            <artifactId>iceberg-core</artifactId>
            <version>1.3.0</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.3.0</version>
                <configuration>
                    <archive>
                        <manifest>
                            <mainClass>com.iceberg.flink.UnionDelData</mainClass>
                        </manifest>
                    </archive>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
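With the assembly plugin configured above, a single Maven build produces a fat jar that bundles the Flink, Hadoop, and Iceberg dependencies. Following the plugin's default naming (artifactId, version, then the descriptor suffix), the output should land in target/ as shown below:

mvn clean package
# expected output: target/flink-iceberg-1.0-SNAPSHOT-jar-with-dependencies.jar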
Resource files: place the three common Hadoop configuration files (core-site.xml, hdfs-site.xml, yarn-site.xml) in the resources directory. Note that the code below also adds them explicitly from an absolute path at runtime, so that path must exist on the machine running the job. The Java code:
package com.iceberg.flink;

import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.flink.actions.Actions;
import org.apache.iceberg.hadoop.HadoopCatalog;

import java.io.File;
import java.net.MalformedURLException;

public class UnionDelData {
    public static void main(String[] args) throws MalformedURLException {
        // Expected positional arguments:
        //   args[0] comma-separated table names
        //   args[1] target data file size with unit, e.g. "512M"
        //   args[2] rewrite parallelism
        //   args[3] snapshot retention time with unit, e.g. "7d"
        //   args[4] minimum number of snapshots to retain
        String tableNames = args[0];
        long targetsSize = parseSizeToBytes(args[1]);
        int parallelism = Integer.parseInt(args[2]);
        long retainTime = parseTimeToMillis(args[3]);
        int retainLastNum = Integer.parseInt(args[4]);

        // Load the cluster configuration from the Hadoop config files.
        Configuration conf = new Configuration();
        conf.addResource(new File("/home/hadoop/hadoopconf/core-site.xml").toURI().toURL());
        conf.addResource(new File("/home/hadoop/hadoopconf/hdfs-site.xml").toURI().toURL());
        conf.addResource(new File("/home/hadoop/hadoopconf/yarn-site.xml").toURI().toURL());

        HadoopCatalog hadoopCatalog = new HadoopCatalog(conf, "/user/hadoop/path/");
        for (String tableName : tableNames.split(",")) {
            Table table = hadoopCatalog.loadTable(TableIdentifier.of("prod", tableName));
            unionDataFiles(table, parallelism, targetsSize);
            deleteSnap(table, retainTime, retainLastNum);
        }
    }

    // Compact small data files by rewriting them toward the target size.
    public static void unionDataFiles(Table table, int parallelism, long targetsSize) {
        Actions.forTable(table)
                .rewriteDataFiles()
                .maxParallelism(parallelism)
                .caseSensitive(false)
                .targetSizeInBytes(targetsSize)
                .execute();
    }

    // Expire snapshots older than retainTime while keeping at least retainLastNum.
    public static void deleteSnap(Table table, long retainTime, int retainLastNum) {
        Snapshot snapshot = table.currentSnapshot();
        // Check for null before dereferencing: a freshly created table has no snapshot.
        if (snapshot != null) {
            long expireBefore = snapshot.timestampMillis() - retainTime;
            table.expireSnapshots()
                    .expireOlderThan(expireBefore)
                    .cleanExpiredFiles(true)
                    .retainLast(retainLastNum)
                    .commit();
        }
    }

    // Parse sizes like "128M" or "1G" into bytes.
    public static long parseSizeToBytes(String sizeWithUnit) {
        long size = Long.parseLong(sizeWithUnit.substring(0, sizeWithUnit.length() - 1));
        char unit = sizeWithUnit.charAt(sizeWithUnit.length() - 1);
        switch (unit) {
            case 'B':
                return size;
            case 'K': case 'k':
                return size * 1024;
            case 'M': case 'm':
                return size * 1024 * 1024;
            case 'G': case 'g':
                return size * 1024 * 1024 * 1024;
            default:
                throw new IllegalArgumentException("Invalid size unit: " + unit);
        }
    }

    // Parse durations like "30m" or "7d" into milliseconds.
    public static long parseTimeToMillis(String timeWithUnit) {
        long time = Long.parseLong(timeWithUnit.substring(0, timeWithUnit.length() - 1));
        char unit = timeWithUnit.charAt(timeWithUnit.length() - 1);
        switch (unit) {
            case 's': case 'S':
                return time * 1000;
            case 'm': case 'M':
                return time * 60 * 1000;
            case 'h': case 'H':
                return time * 60 * 60 * 1000;
            case 'd': case 'D':
                return time * 24 * 60 * 60 * 1000;
            default:
                throw new IllegalArgumentException("Invalid time unit: " + unit);
        }
    }
}
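A sketch of how the job might be submitted with the Flink CLI; the table names and tuning values below are illustrative assumptions, not part of the original setup. The five positional arguments are the comma-separated table list, the target data file size, the rewrite parallelism, the snapshot retention time, and the minimum number of snapshots to keep:

flink run -c com.iceberg.flink.UnionDelData \
    target/flink-iceberg-1.0-SNAPSHOT-jar-with-dependencies.jar \
    tableA,tableB 512M 4 7d 5

With these values, data files in prod.tableA and prod.tableB are rewritten toward 512 MB apiece, and snapshots older than seven days are expired while at least the last five are retained.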