Flink读取iceberg表

1. 添加依赖包

这里使用的版本时1.14.6,scala版本是2.12.

		<dependency><groupId>org.apache.iceberg</groupId><artifactId>iceberg-flink-runtime-1.14</artifactId></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-scala_2.12</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-scala_2.12</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_2.12</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner_2.12</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-scala_2.12</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-scala-bridge_2.12</artifactId><version>${flink.version}</version></dependency>

2. 包装工具类

注意检查依赖的包

import org.apache.iceberg.{Table}
import org.apache.iceberg.catalog.{Catalog, TableIdentifier}
import org.apache.iceberg.flink.{CatalogLoader, TableLoader}
import org.apache.iceberg.hive.HiveCatalogobject IcebergUtil {/*** 加载catalogLoader,使用多个表时,只需要加载一次* @return*/def hiveCatalogLoader():(HiveCatalog,CatalogLoader) = {val catalog = new HiveCatalog()val hiveProp = new java.util.HashMap[String, String]()hiveProp.put("warehouse", "hdfs://ns1/user/hive/warehouse")hiveProp.put("uri", "thrift://192.168.0.100:9083,thrift://192.168.0.101:9083")catalog.initialize("hive", hiveProp)val catalogLoader = CatalogLoader.hive("hive",new org.apache.hadoop.conf.Configuration(), hiveProp)(catalog,catalogLoader)}def tableLoad(catalog:Catalog, catalogLoader:CatalogLoader,dbName:String, tableName:String):(Table,TableLoader) = {val tableIdentifier = TableIdentifier.of(dbName, tableName)val table = catalog.loadTable(tableIdentifier)val tableLoader = TableLoader.fromCatalog(catalogLoader, tableIdentifier)(table,tableLoader)}}

3. 读取数据

这里设置streaming(false) 将按照批次读取。

  private def getStream(iceberg: String)(env: StreamExecutionEnvironment): datastream.DataStream[RowData] = {val (catalog,hiveCatalogLoader) = IcebergUtil.hiveCatalogLoader()val (table, tableLoader) = IcebergUtil.tableLoad(catalog, hiveCatalogLoader, "iceberg_dw", iceberg)FlinkSource.forRowData().env(env).table(table).tableLoader(tableLoader).streaming(false).build()}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/736801.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Python之Web开发中级教程----搭建Web框架一

准备环境&#xff1a;ubuntu,Python3.6.9 一、Web应用程序的原理 接收并解析HTTP请求&#xff0c;获取客户的请求信息->处理完成请求的业务逻辑->返回处理结果HTTP响应。 Web框架的架构是这样的&#xff1a; 基于python的web框架&#xff0c;如tornado、flask、webpy都是…

谈软件的模块间依赖关系

一、什么是软件的模块间依赖关系 1.1、定义 软件的模块间依赖关系指的是在软件系统中&#xff0c;各个模块或组件之间的相互依赖和关联。这种依赖关系可以是直接的&#xff0c;也可以是间接的。具体来说&#xff0c;当一个模块需要调用另一个模块的功能、使用其数据或与其进行…

matlab去除图片上的噪声

本问题来自CSDN-问答板块,题主提问。 如何利用matlab去除图片上的噪声? 一、运行效果图 左边是原图,右边是去掉噪音后的图片。 二、中文说明 中值滤波是一种常见的图像处理技术,用于去除图像中的噪声。其原理如下: 1. 滤波器移动:中值滤波器是一个小的窗口,在图像上移…

SpringBoot中事务

SpringBoot中事务 需要的依赖 <dependencies><!--spring jdbc Spring 持久化层支持jar包--><dependency><groupId>org.springframework</groupId><artifactId>spring-jdbc</artifactId><version>6.0.2</version><…

python处理csv文件

1.使用 csv_writer.writerow # 导入CSV安装包 import csv# 1. 创建文件对象 f open(文件名.csv,a,encodingutf-8)# 2. 基于文件对象构建 csv写入对象 csv_writer csv.writer(f)# 3. 构建列表头 csv_writer.writerow(["问题","答案"])list_name[] # 4. 写…

Android中View对象的实例化方式

Android中View对象的实例化&#xff0c;主要有以下四种方式&#xff1a; &#xff08;1&#xff09;使用findViewById根据resId实例化View或者ViewGroup对象。 这种方式在根据xml生成View或ViewGroup对象时被普遍使用。不过&#xff0c;这种方式也存在较大限制&#xff0c;要求…

raid0、raid1、raid5、raid10选哪个?一文给你答案!

下午好&#xff0c;我的网工朋友。 关于磁盘阵列的用法&#xff0c;总有朋友对其用途与功能一知半解&#xff0c;很容易弄混。 而我们在做监控项目存储时&#xff0c;经常会用到磁盘阵列。 什么是磁盘阵列&#xff1f;为什么要做磁盘阵列&#xff1f;用什么样的磁盘阵列合适…

Buildroot 之一 详解源码及架构

在之前的博文中,我们学习了直接通过 Makefile 手动来进行构建 U-Boot 和 Linux Kernel 等,其实,目前存在多种嵌入式 Linux 环境的构建工具,其中,Buildroot 就是被广泛应用的一种。今天就来详细学习一个 Buildroot 这个自动化构建工具。 Buildroot Buildroot 是一个运行于…

Jenkins Pipeline实现Golang项目的CI/CD

Jenkins Pipeline实现Golang项目的CI/CD 背景 最近新增了一个Golang实现的项目&#xff0c;需要接入到现有的流水线架构中。 流程图 这边流程和之前我写过的一篇《基于Jenkins实现的CI/CD方案》差不多&#xff0c;不一样的是构建现在是手动触发的&#xff0c;没有配置webho…

IOT的发展历程及其优势——青创智通

工业互联网-物联网-设备改造-IOT-青创智通 ​随着科技的不断发展&#xff0c;物联网&#xff08;IoT&#xff09;已经逐渐成为了我们生活中不可或缺的一部分。IoT是指通过互联网将各种物理设备连接起来&#xff0c;实现设备之间的数据交换和智能化控制。IoT的发展不仅改变了我们…

Window10数据库崩溃启动失败,MySQL8.0.30通过data文件夹恢复数据库到Docker

背景&#xff1a; 昨天关机前还在使用mysql&#xff0c;一切正常&#xff0c;但今天打开电脑&#xff0c;发现mysql启动不起来了&#xff0c;老是提示端口占用&#xff0c;但是系统也没有新安装什么软件&#xff0c;而且通过查询nat命令也没发现3306端口占用。而且修改成3307等…

组态软件的概念

一、前言 组态软件是一种用于设计、配置和管理自动化系统的软件。它可以帮助用户快速地创建和修改自动化系统的界面、逻辑和通信功能&#xff0c;从而提高生产效率和质量。 二、组态软件的定义 组态软件是一种集成开发环境&#xff0c;用于设计、配置和管理自动化系统。它通…

ReentrantReadWriteLock学习

简介 ReentrantReadWriteLock 是 Java 并发包&#xff08;java.util.concurrent.locks&#xff09;中的一个类&#xff0c;它实现了一个可重入的读写锁。读写锁允许多个线程同时读取共享资源&#xff0c;但在写入共享资源时只允许一个线程进行。这种锁机制特别适用于读多写少的…

mysql笔记:12. 数据备份与还原

文章目录 一、数据备份1. 备份单个数据库2. 备份多个数据库3. 备份所有数据库 二、数据还原1. mysql命令2. source命令 在操作数据库时&#xff0c;难免会发生一些意外情况造成数据丢失。为了确保数据的安全&#xff0c;需要定期对数据库中的数据进行备份&#xff0c;这样当遇到…

两会声音|中国石化人大代表:要突出战略性新兴产业、未来产业的位置

十四届全国人大二次会议即将闭幕&#xff0c;“新质生产力”首次写入政府工作报告&#xff0c;并出现在了重要位置。政府工作报告主要从推动产业链供应链优化升级、积极培育新兴产业和未来产业、深入推进数字经济创新发展等三个方面进行了阐述和规划。 全国两会期间&#xff0c…

2024 年系统架构设计师(全套资料)

2024年5月系统架构设计师最新第2版教材对应的全套视频教程、历年真题及解析、章节分类真题及解析、论文写作及范文、教材、讲义、模拟题、答题卡等资料 1、2023年11月最新第2版本教材对应全套教程视频&#xff0c;2022年、2021年、2020年、2018年、2016年五套基础知识精讲视频、…

搭建nacos集群,并通过nginx实现负载均衡

nacos、eureka、consul、zookeeper等都是常用的微服务注册中心&#xff0c;这篇文章详细介绍一下在Ubuntu操作系统上搭建一个nacos的集群&#xff0c;以及通过nginx的反向代理功能实现nacos的负载均衡。 目录 一、安装nacos 1、安装nacos 2、修改nacos配置文件 3、创建naco…

MIT 6.858 计算机系统安全讲义 2014 秋季(二)

译者&#xff1a;飞龙 协议&#xff1a;CC BY-NC-SA 4.0 本地客户端 注意&#xff1a; 这些讲座笔记略有修改&#xff0c;来自 2014 年 6.858 课程网站。 本文的目标是什么&#xff1f; 当时&#xff0c;浏览器只允许任何网页运行 JS&#xff08;Flash&#xff09;代码。 希…

【C++】6-13 学生成绩的快速录入(构造函数)分数 10

6-13 学生成绩的快速录入&#xff08;构造函数&#xff09; 分数 10 全屏浏览 切换布局 作者 何振峰 单位 福州大学 现在需要录入一批学生的成绩&#xff08;学号&#xff0c;成绩&#xff09;。其中学号是正整数&#xff0c;并且录入时&#xff0c;后录入学生的学号会比前…

学习JAVA的第十九天(基础)

目录 File 成员方法&#xff08;判断和获取&#xff09; 成员方法&#xff08;创建和删除&#xff09; 成员方法&#xff08;获取并遍历&#xff09; IO流 FileOutputStream FileInputStream 文件拷贝 前言&#xff1a;学习JAVA的第十八天&#xff08;基础&#xff09;…