1. 添加依赖包
这里使用的 Flink 版本是 1.14.6,Scala 版本是 2.12。
<dependency><groupId>org.apache.iceberg</groupId><artifactId>iceberg-flink-runtime-1.14</artifactId><version>${iceberg.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-scala_2.12</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-scala_2.12</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_2.12</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner_2.12</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-scala_2.12</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-scala-bridge_2.12</artifactId><version>${flink.version}</version></dependency>
2. 包装工具类
注意检查依赖的包
import org.apache.iceberg.{Table}
import org.apache.iceberg.catalog.{Catalog, TableIdentifier}
import org.apache.iceberg.flink.{CatalogLoader, TableLoader}
import org.apache.iceberg.hive.HiveCatalog

/**
 * Helper utilities for wiring up an Iceberg Hive catalog and table loaders
 * for use from Flink jobs.
 */
object IcebergUtil {

  /**
   * Builds and initializes a Hive-backed Iceberg catalog together with its
   * [[CatalogLoader]]. Load it once and reuse it when working with multiple
   * tables.
   *
   * @param warehouse HDFS warehouse root directory
   *                  (defaults to the original hard-coded cluster path)
   * @param uri       comma-separated Hive metastore thrift URI(s)
   *                  (defaults to the original hard-coded endpoints)
   * @return the initialized [[HiveCatalog]] and a matching [[CatalogLoader]]
   */
  def hiveCatalogLoader(
      warehouse: String = "hdfs://ns1/user/hive/warehouse",
      uri: String = "thrift://192.168.0.100:9083,thrift://192.168.0.101:9083"
  ): (HiveCatalog, CatalogLoader) = {
    val catalog = new HiveCatalog()
    // HiveCatalog.initialize expects a java.util.Map, hence the Java HashMap.
    val hiveProp = new java.util.HashMap[String, String]()
    hiveProp.put("warehouse", warehouse)
    hiveProp.put("uri", uri)
    catalog.initialize("hive", hiveProp)
    // CatalogLoader is the serializable handle Flink ships to its tasks.
    val catalogLoader =
      CatalogLoader.hive("hive", new org.apache.hadoop.conf.Configuration(), hiveProp)
    (catalog, catalogLoader)
  }

  /**
   * Loads an Iceberg table and the serializable [[TableLoader]] that Flink
   * sources/sinks need to re-open it on the task managers.
   *
   * @param catalog       an initialized Iceberg catalog
   * @param catalogLoader loader matching `catalog` (see [[hiveCatalogLoader]])
   * @param dbName        database (namespace) name
   * @param tableName     table name
   * @return the loaded [[Table]] and its [[TableLoader]]
   */
  def tableLoad(
      catalog: Catalog,
      catalogLoader: CatalogLoader,
      dbName: String,
      tableName: String
  ): (Table, TableLoader) = {
    val tableIdentifier = TableIdentifier.of(dbName, tableName)
    val table = catalog.loadTable(tableIdentifier)
    val tableLoader = TableLoader.fromCatalog(catalogLoader, tableIdentifier)
    (table, tableLoader)
  }
}
3. 读取数据
这里设置streaming(false) 将按照批次读取。
/**
 * Builds a bounded (batch-style) Flink source over an Iceberg table in the
 * `iceberg_dw` database.
 *
 * @param iceberg table name inside `iceberg_dw`
 * @param env     the streaming environment to attach the source to
 * @return a DataStream of the table's rows as RowData
 */
private def getStream(iceberg: String)(env: StreamExecutionEnvironment): datastream.DataStream[RowData] = {
  // Resolve the Hive catalog once, then look up the requested table.
  val (hiveCatalog, hiveLoader) = IcebergUtil.hiveCatalogLoader()
  val (icebergTable, icebergTableLoader) =
    IcebergUtil.tableLoad(hiveCatalog, hiveLoader, "iceberg_dw", iceberg)

  // streaming(false) makes the source bounded: it reads the table as a batch.
  val sourceBuilder = FlinkSource
    .forRowData()
    .env(env)
    .table(icebergTable)
    .tableLoader(icebergTableLoader)
    .streaming(false)
  sourceBuilder.build()
}