Flink Catalog

1.Flink侧创建

  按照SQL的解析处理流程在Parse解析SQL以后,进入执行流程——executeInternal。
  其中有个分支专门处理创建Catalog的SQL命令

} else if (operation instanceof CreateCatalogOperation) {return createCatalog((CreateCatalogOperation) operation);
createCatalog方法里完成两件事:1、创建Catalog对象;2、向catalogManager注册
Catalog catalog =FactoryUtil.createCatalog(catalogName, properties, tableConfig, userClassLoader);
catalogManager.registerCatalog(catalogName, catalog);

  创建Catalog会去全包查找对应的CatalogFactory的子类,然后使用配置的子类构建

final CatalogFactory legacyFactory =TableFactoryService.find(CatalogFactory.class, options, classLoader);
return legacyFactory.createCatalog(catalogName, options);

  这里注意,上面的步骤只查询classpath下的类,像HiveCatalog这种外置增加的,在这个步骤里找不到,会抛出NoMatchingTableFactoryException异常之后继续其他步骤处理来获取

} catch (NoMatchingTableFactoryException e) {// No matching legacy factory found, try using the new stackfinal DefaultCatalogContext discoveryContext =new DefaultCatalogContext(catalogName, options, configuration, classLoader);try {final CatalogFactory factory = getCatalogFactory(discoveryContext);

  最终在FactoryUtil.discoverFactory的方法中进行过滤查找,这里用到了type配置做过滤,基于Factory的

factoryIdentifier获取工厂的字段与配置做对比
final List<Factory> matchingFactories =foundFactories.stream().filter(f -> f.factoryIdentifier().equals(factoryIdentifier)).collect(Collectors.toList());

2.HiveCatalog

  获取到对应的Factory以后,会调用其createCatalog方法创建对应的Catalog

return new HiveCatalog(context.getName(),helper.getOptions().get(DEFAULT_DATABASE),helper.getOptions().get(HIVE_CONF_DIR),helper.getOptions().get(HADOOP_CONF_DIR),helper.getOptions().get(HIVE_VERSION));

  HiveCatalog的整个创建过程主要是发现Hive配置的过程,其他接口就是对库表的操作接口
  获取配置主要是基于上面hive-conf-dir、hadoop-conf-dir来的,首先是根据这两个配置去获取hive配置,如果都获取不到,会从classpath下面去获取hive的配置文件

URL hiveSite =Thread.currentThread().getContextClassLoader().getResource(HIVE_SITE_FILE);

3.IcebergCatalog

  Iceberg走的应该是前面TableFactoryService.find能找到的接口,因为它实现的是properties参数的接口,clusterHadoopConf()就是调用的Flink里的方法获取Hadoop的配置

@Override
public Catalog createCatalog(String name, Map<String, String> properties) {return createCatalog(name, properties, clusterHadoopConf());
}

3.1.CatalogLoader

  第一步是创建CatalogLoader,这是Iceberg Catalog的类加载器
  这里可以配置自定义类加载器,相关配置:catalog-impl,如果没有配置则走默认
  默认流程根据catalog-type配置选择实例化Hive的还是Hadoop的,默认是Hive的

String catalogType = properties.getOrDefault(ICEBERG_CATALOG_TYPE, ICEBERG_CATALOG_TYPE_HIVE);
switch (catalogType.toLowerCase(Locale.ENGLISH)) {case ICEBERG_CATALOG_TYPE_HIVE:// The values of properties 'uri', 'warehouse', 'hive-conf-dir' are allowed to be null, in// that case it will// fallback to parse those values from hadoop configuration which is loaded from classpath.String hiveConfDir = properties.get(HIVE_CONF_DIR);String hadoopConfDir = properties.get(HADOOP_CONF_DIR);Configuration newHadoopConf = mergeHiveConf(hadoopConf, hiveConfDir, hadoopConfDir);return CatalogLoader.hive(name, newHadoopConf, properties);case ICEBERG_CATALOG_TYPE_HADOOP:return CatalogLoader.hadoop(name, hadoopConf, properties);
}

  创建CatalogLoader主要就是进行一些基本参数的设置

private HiveCatalogLoader(String catalogName, Configuration conf, Map<String, String> properties) {this.catalogName = catalogName;this.hadoopConf = new SerializableConfiguration(conf);this.uri = properties.get(CatalogProperties.URI);this.warehouse = properties.get(CatalogProperties.WAREHOUSE_LOCATION);this.clientPoolSize =properties.containsKey(CatalogProperties.CLIENT_POOL_SIZE)? Integer.parseInt(properties.get(CatalogProperties.CLIENT_POOL_SIZE)): CatalogProperties.CLIENT_POOL_SIZE_DEFAULT;this.properties = Maps.newHashMap(properties);
}

3.2.FlinkCatalog

  接下来就是进行一些配置然后创建FlinkCatalog
  配置里注意Hadoop有一个特殊的配置:base-namespace,这是配置namespa的,会自动带上前缀,应该就是在warehouse加上前缀
  这里还有缓存配置:cache-enabled、cache.expiration-interval-ms,控制Catalog是否缓存表入口

3.3.loadCatalog

  FlinkCatalog会使用CatalogLoader加载Catalog,最终会到CatalogUtil.loadCatalog()
  这里最终会用Class.forName来加载类,基于Constructor来构建实例

  ctor = DynConstructors.builder(Catalog.class).impl(impl).buildChecked();catalog = ctor.newInstance();

3.4.HiveCatalog

  Hive类型最终创建的是org.apache.iceberg.hive.HiveCatalog
  initialize初始化也基本上是进行配置,有两个注意的对象:FileIO、CachedClientPool
  io-impl可以配置文件读取,默认用Iceberg的HadoopFileIO

this.fileIO =fileIOImpl == null? new HadoopFileIO(conf): CatalogUtil.loadFileIO(fileIOImpl, properties, conf);

  CachedClientPool是一个Hive连接缓存,缓存的是HiveMetaStoreClient

return GET_CLIENT.invoke(hiveConf, (HiveMetaHookLoader) tbl -> null, HiveMetaStoreClient.class.getName());

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/707670.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

[多进程] 进程间通信-笔记

文章目录 创建进程的方法Linuxforkexecsystem Windowscreateprocessshellexecutesystem 进程间通信方法管道&#xff08;Pipe&#xff09;管道的种类特点 通过文件通信内存映射&#xff08;文件映射&#xff09;匿名映射 共享内存 信号量(或者锁)消息队列Windows消息队列Linux…

finedance 测试笔记

目录 依赖库&#xff1a; 预测流程&#xff1a; 音乐wav切割120帧 general_all.py改进 smplx 学习笔记&#xff1a; 依赖库&#xff1a; import pickle5 as picklepypi尚pickle5最高python版本3.7&#xff1a; pickle5 PyPI 解决方法&#xff0c;改为 import pickle …

cpp基础学习笔记01

C和C的区别 1.语言类型&#xff1a;C 是一种过程性编程语言&#xff0c;着重于以函数为基础的结构化编程&#xff1b;而 C 是一种多范式编程语言&#xff0c;支持面向对象编程&#xff08;OOP&#xff09;和泛型编程等多种编程范式。 2.对象模型&#xff1a;C 支持类和对象的概…

switch其他知识点

1.default的位置在整体输出语句中&#xff0c;放哪都可以&#xff1b; 省略的话结果会不显示&#xff1b; 2.case穿透&#xff0c;还是比较好理解的&#xff0c;因为缺少break导致会把下面的也打印&#xff0c;结果是输出多个 3.switch新特性&#xff1a;是用->减号和大于…

Mysql <=> 安全等于

<> 安全等于&#xff0c;为NULL安全的等值比较运算符&#xff08;NULL-safe equal&#xff09;&#xff0c;该操作符作用类似“”。 区别为当符号两边出现NULL值时&#xff0c;操作符会返回NULL&#xff0c;而<>会返回1&#xff08;两边操作数都为NULL时&#xff…

柯桥会计培训学校,会计职称考试,考中级会计怎么证明工作年限?

中级会计考试是会计从业人员的重要考试之一&#xff0c;对于中级考生来说&#xff0c;工作年限证明是必不可少的一步。因此&#xff0c;在考中级会计之前&#xff0c;需要对如何证明工作年限进行了解和掌握。 为大家整理了工作年限证明相关信息&#xff0c;一起来看看吧~ 一、…

Rocky Linux 运维工具 ls

一、ls 的简介 ​​ls​ 用于列出当前目录下的文件和目录&#xff0c;以及它们的属性信息。通过 ​ls​命令可以查看文件名、文件大小、创建时间等信息&#xff0c;并方便用户浏览和管理文件。 二、ls 的参数说明 序号参数描述1-a显示所有文件&#xff0c;包括以 ​.​开头的…

java单元测试技巧

Test装饰器指定断言类型 在JUnit 4中&#xff0c;你可以使用expected属性在Test注解中声明期望的异常类型。如&#xff1a; Test(expected Exception.class) public void testSqlSessionFactoryBeanWithNullDataSource() throws Exception {

5G双域快网

目录 一、业务场景 二、三类技术方案 2.1、专用DNN方案 2.2、ULCL方案&#xff1a;通用/专用DNNULCL分流 2.3、 多DNN方案-定制终端无感分流方案 漫游场景 一、业务场景 初期双域专网业务可划分为三类业务场景&#xff0c;学校、政务、文旅等行业均已提出公/专网融合访问需…

Spring MVC HandlerAdapter原理解析

在Spring MVC框架中&#xff0c;HandlerAdapter&#xff08;处理器适配器&#xff09;是一个非常重要的组件&#xff0c;它负责调用处理器&#xff08;Handler&#xff09;来处理客户端的请求。HandlerAdapter在请求处理流程中起到了桥梁的作用&#xff0c;连接了DispatcherSer…

【DDD】学习笔记-领域驱动设计对持久化的影响

资源库的实现 如何重用资源库的实现&#xff0c;以及如何隔离领域层与基础设施层的持久化实现机制&#xff0c;具体的实现还要取决于开发者对 ORM 框架的选择。Hibernate、MyBatis、jOOQ 或者 Spring Data JPA&#xff08;当然也包括基于 .NET 的 Entity Framework、NHibernat…

Acwing周赛记录

很难得参加一次周赛hhhhh这次参加的是第144场周赛&#xff0c;一共有三道题 AcWing 5473. 简单数对推理 给定两个整数数对&#xff0c;每个数对都包含两个 1∼9 之间的不同整数。 这两个数对恰好包含一个公共数&#xff0c;即恰好有一个整数同时包含于这两个数对。 给定这两…

选择排序,冒泡排序,插入排序,快速排序及其优化

目录 1 选择排序 1.1 原理 1.2 具体步骤 1.3 代码实现 1.4 优化 2 冒泡排序 2.1 原理 2.2 具体步骤 2.3 代码实现 2.4 优化 3 插入排序 3.1 原理 3.2 具体步骤 3.3 代码实现 3.4 优化 4. 快速排序 4.1 原理 4.2 具体步骤 4.3 代码实现 4.4 优化 为了讲…

linux动态库加载相关

linux下动态库搜索规则 (1)编译目标代码时指定的动态库搜索路径,也就是RPATH&#xff1b; (2)环境变量LD_LIBRARY_PATH指定的动态库搜索路径&#xff1b; (3)配置文件/etc/ld.so.conf中指定的动态库搜索路径&#xff1b;配置完毕后需运行ldconfig命令生效&#xff1b; (4)默…

Linux课程四课---Linux开发环境的使用(vim编辑器的相关)

作者前言 &#x1f382; ✨✨✨✨✨✨&#x1f367;&#x1f367;&#x1f367;&#x1f367;&#x1f367;&#x1f367;&#x1f367;&#x1f382; ​&#x1f382; 作者介绍&#xff1a; &#x1f382;&#x1f382; &#x1f382; &#x1f389;&#x1f389;&#x1f389…

【MySQL】内置函数 -- 详解

一、日期函数 日期&#xff1a;年月日时间&#xff1a;时分秒 1、获得年月日 2、获得时分秒 3、获得时间戳 4、在日期的基础上加日期 5、在日期的基础上减去时间 6、计算两个日期之间相差多少天 7、获得当前时间 ⚪练习 &#xff08;1&#xff09;记录生日 &#xff08;2&…

视频监控简史

安防系统中,视频监控始终是重头戏,是安防系统的核心。 目录 一、起步 二、模拟时代 三、数字时代 四、嵌入式存储 五、视频编码器 六、全数字化监控

Flask入门一(介绍、Flask安装、Flask运行方式及使用、虚拟环境、调试模式、配置文件、路由系统)

文章目录 一、Flask介绍二、Flask创建和运行1.安装2.快速使用3.Flask小知识4.flask的运行方式 三、Werkzeug介绍四、Jinja2介绍五、Click CLI 介绍六、Flask安装介绍watchdog使用python--dotenv使用&#xff08;操作环境变量&#xff09; 七、虚拟环境介绍Mac/linux创建虚拟环境…

家政按摩上门服务小程序搭建

家政按摩上门服务小程序支持技师入驻申请&#xff0c;用户可以通过在线下单预约家政服务&#xff0c;并根据距离、价格、销量好评度等条件进行筛选和选择。用户可以选择技师进行预约&#xff0c;并填写自己的服务地点和时间&#xff0c;享受上门服务。同时&#xff0c;技师也可…

【MySQL】_自连接与子查询、

目录 1. 自连接 2. 子查询&#xff08;嵌套查询&#xff09; 2.1 子查询分类 2.2 单行子查询示例1&#xff1a;查询不想毕业同学的同班同学 2.3 多行子查询示例2&#xff1a;查询语文或英语课程的信息成绩 3. 合并查询 3.1 示例1&#xff1a;查询id3或者名字为英文的课程…