16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及Apache Hive示例(6)

Flink 系列文章

1、Flink 部署、概念介绍、source、transformation、sink使用示例、四大基石介绍和示例等系列综合文章链接

13、Flink 的table api与sql的基本概念、通用api介绍及入门示例
14、Flink 的table api与sql之数据类型: 内置数据类型以及它们的属性
15、Flink 的table api与sql之流式概念-详解的介绍了动态表、时间属性配置(如何处理更新结果)、时态表、流上的join、流上的确定性以及查询配置
16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及FileSystem示例(1)
16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及Elasticsearch示例(2)
16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及Apache Kafka示例(3)
16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及JDBC示例(4)

16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及Apache Hive示例(6)

20、Flink SQL之SQL Client: 不用编写代码就可以尝试 Flink SQL,可以直接提交 SQL 任务到集群上

22、Flink 的table api与sql之创建表的DDL
24、Flink 的table api与sql之Catalogs

30、Flink SQL之SQL 客户端(通过kafka和filesystem的例子介绍了配置文件使用-表、视图等)


文章目录

  • Flink 系列文章
  • 一、Table & SQL Connectors 示例: Apache Hive
    • 1、支持的Hive版本
    • 2、依赖项
      • 1)、使用 Flink 提供的 Hive jar
      • 2)、用户定义的依赖项
      • 3)、移动 planner jar 包
    • 3、Maven 依赖
    • 4、连接到Hive
    • 5、DDL&DML


本文介绍了Apache Hive连接器的使用,以具体的示例演示了通过java和flink sql cli创建catalog。
本文依赖环境是hadoop、zookeeper、hive、flink环境好用,本文内容以flink1.17版本进行介绍的,具体示例是在1.13版本中运行的(因为hadoop集群环境是基于jdk8的,flink1.17版本需要jdk11)。
更多的内容详见后续关于hive的介绍。

一、Table & SQL Connectors 示例: Apache Hive

Apache Hive 已经成为了数据仓库生态系统中的核心。 它不仅仅是一个用于大数据分析和ETL场景的SQL引擎,同样它也是一个数据管理平台,可用于发现,定义,和演化数据。

Flink 与 Hive 的集成包含两个层面。

一是利用了 Hive 的 MetaStore 作为持久化的 Catalog,用户可通过HiveCatalog将不同会话中的 Flink 元数据存储到 Hive Metastore 中。 例如,用户可以使用HiveCatalog将其 Kafka 表或 Elasticsearch 表存储在 Hive Metastore 中,并后续在 SQL 查询中重新使用它们。

二是利用 Flink 来读写 Hive 的表。

HiveCatalog的设计提供了与 Hive 良好的兼容性,用户可以"开箱即用"的访问其已有的 Hive 数仓。 您不需要修改现有的 Hive Metastore,也不需要更改表的数据位置或分区。

1、支持的Hive版本

Flink 支持以下的 Hive 版本。

  • 2.3
    2.3.0
    2.3.1
    2.3.2
    2.3.3
    2.3.4
    2.3.5
    2.3.6
    2.3.7
    2.3.8
    2.3.9
  • 3.1
    3.1.0
    3.1.1
    3.1.2
    3.1.3

某些功能是否可用取决于您使用的 Hive 版本,这些限制不是由 Flink 所引起的:

  • Hive 内置函数在使用 Hive-2.3.0 及更高版本时支持。
  • 列约束,也就是 PRIMARY KEY 和 NOT NULL,在使用 Hive-3.1.0 及更高版本时支持。
  • 更改表的统计信息,在使用 Hive-2.3.0 及更高版本时支持。
  • DATE列统计信息,在使用 Hive-2.3.0 及更高版时支持。

2、依赖项

要与 Hive 集成,您需要在 Flink 下的/lib/目录中添加一些额外的依赖包, 以便通过 Table API 或 SQL Client 与 Hive 进行交互。 或者,您可以将这些依赖项放在专用文件夹中,并分别使用 Table API 程序或 SQL Client 的-C或-l选项将它们添加到 classpath 中。

Apache Hive 是基于 Hadoop 之上构建的, 首先您需要 Hadoop 的依赖,请参考 Providing Hadoop classes:

export HADOOP_CLASSPATH=`hadoop classpath`

有两种添加 Hive 依赖项的方法。第一种是使用 Flink 提供的 Hive Jar包。您可以根据使用的 Metastore 的版本来选择对应的 Hive jar。第二个方式是分别添加每个所需的 jar 包。如果您使用的 Hive 版本尚未在此处列出,则第二种方法会更适合。

注意:建议您优先使用 Flink 提供的 Hive jar 包。仅在 Flink 提供的 Hive jar 不满足您的需求时,再考虑使用分开添加 jar 包的方式。

1)、使用 Flink 提供的 Hive jar

下表列出了所有可用的 Hive jar。您可以选择一个并放在 Flink 发行版的/lib/ 目录中。
在这里插入图片描述

2)、用户定义的依赖项

您可以在下方找到不同Hive主版本所需要的依赖项。

  • Hive 2.3.4
/flink-1.17.1/lib// Flink's Hive connector.Contains flink-hadoop-compatibility and flink-orc jarsflink-connector-hive_2.12-1.17.1.jar// Hive dependencieshive-exec-2.3.4.jar// add antlr-runtime if you need to use hive dialectantlr-runtime-3.5.2.jar
  • Hive 3.1.0
/flink-1.17.1/lib// Flink's Hive connectorflink-connector-hive_2.12-1.17.1.jar// Hive dependencieshive-exec-3.1.0.jarlibfb303-0.9.3.jar // libfb303 is not packed into hive-exec in some versions, need to add it separately// add antlr-runtime if you need to use hive dialectantlr-runtime-3.5.2.jar

3)、移动 planner jar 包

把 FLINK_HOME/opt 下的 jar 包 flink-table-planner_2.12-1.17.1.jar 移动到 FLINK_HOME/lib 下,并且将 FLINK_HOME/lib 下的 jar 包 flink-table-planner-loader-1.17.1.jar 移出去。 具体原因请参见 FLINK-25128。你可以使用如下命令来完成移动 planner jar 包的工作:

mv $FLINK_HOME/opt/flink-table-planner_2.12-1.17.1.jar $FLINK_HOME/lib/flink-table-planner_2.12-1.17.1.jar
mv $FLINK_HOME/lib/flink-table-planner-loader-1.17.1.jar $FLINK_HOME/opt/flink-table-planner-loader-1.17.1.jar

只有当要使用 Hive 语法 或者 HiveServer2 endpoint, 你才需要做上述的 jar 包移动。 但是在集成 Hive 的时候,推荐进行上述的操作。

3、Maven 依赖

如果您在构建自己的应用程序,则需要在 mvn 文件中添加以下依赖项。 您应该在运行时添加以上的这些依赖项,而不要在已生成的 jar 文件中去包含它们。

<!-- Flink Dependency -->
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-hive_2.12</artifactId><version>1.17.1</version><scope>provided</scope>
</dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java-bridge_2.12</artifactId><version>1.17.1</version><scope>provided</scope>
</dependency><!-- Hive Dependency -->
<dependency><groupId>org.apache.hive</groupId><artifactId>hive-exec</artifactId><version>${hive.version}</version><scope>provided</scope>
</dependency>

4、连接到Hive

通过 TableEnvironment 或者 YAML 配置,使用 Catalog 接口 和 HiveCatalog连接到现有的 Hive 集群。

以下是如何连接到 Hive 的示例:

  • java
EnvironmentSettings settings = EnvironmentSettings.inStreamingMode();
TableEnvironment tableEnv = TableEnvironment.create(settings);String name            = "myhive";
String defaultDatabase = "mydatabase";
String hiveConfDir     = "/opt/hive-conf";HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir);
tableEnv.registerCatalog("myhive", hive);// set the HiveCatalog as the current catalog of the session
tableEnv.useCatalog("myhive");----------------------示例----------------------------
import java.util.List;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
import org.apache.flink.table.catalog.hive.HiveCatalog;/*** @author alanchan**/
public class TestHiveCatalogDemo {/*** @param args* @throws DatabaseNotExistException * @throws CatalogException */public static void main(String[] args) throws CatalogException, DatabaseNotExistException {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();StreamTableEnvironment tenv = StreamTableEnvironment.create(env);String name = "alan_hive";// testhive 数据库名称String defaultDatabase = "testhive";String hiveConfDir = "/usr/local/bigdata/apache-hive-3.1.2-bin/conf";HiveCatalog hiveCatalog = new HiveCatalog(name, defaultDatabase, hiveConfDir);tenv.registerCatalog("alan_hive", hiveCatalog);// 使用注册的catalogtenv.useCatalog("alan_hive");List<String> tables = hiveCatalog.listTables(defaultDatabase); for (String table : tables) {System.out.println("Database:testhive  tables:" + table);}}}
  • sql
CREATE CATALOG myhive WITH ('type' = 'hive','default-database' = 'mydatabase','hive-conf-dir' = '/opt/hive-conf'
);
-- set the HiveCatalog as the current catalog of the session
USE CATALOG myhive;------------------具体示例如下----------------------------
Flink SQL> show catalogs;
+-----------------+
|    catalog name |
+-----------------+
| default_catalog |
+-----------------+
1 row in setFlink SQL> CREATE CATALOG alan_hivecatalog WITH (
>     'type' = 'hive',
>     'default-database' = 'testhive',
>     'hive-conf-dir' = '/usr/local/bigdata/apache-hive-3.1.2-bin/conf'
> );
[INFO] Execute statement succeed.Flink SQL> show catalogs;
+------------------+
|     catalog name |
+------------------+
| alan_hivecatalog |
|  default_catalog |
+------------------+
2 rows in setFlink SQL> use alan_hivecatalog;
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.catalog.exceptions.CatalogException: A database with name [alan_hivecatalog] does not exist in the catalog: [default_catalog].Flink SQL> use catalog alan_hivecatalog;
[INFO] Execute statement succeed.Flink SQL> show tables;
+-----------------------------------+
|                        table name |
+-----------------------------------+
| alan_hivecatalog_hivedb_testtable |
|                         apachelog |
|                          col2row1 |
|                          col2row2 |
|                       cookie_info |
|                              dual |
|                         dw_zipper |
|                               emp |
|                          employee |
|                  employee_address |
|               employee_connection |
|                 ods_zipper_update |
|                          row2col1 |
|                          row2col2 |
|                            singer |
|                           singer2 |
|                           student |
|                      student_dept |
|               student_from_insert |
|                      student_hdfs |
|                    student_hdfs_p |
|                      student_info |
|                     student_local |
|                 student_partition |
|              t_all_hero_part_msck |
|                     t_usa_covid19 |
|                   t_usa_covid19_p |
|                              tab1 |
|                         tb_dept01 |
|                    tb_dept_bucket |
|                            tb_emp |
|                          tb_emp01 |
|                     tb_emp_bucket |
|                     tb_json_test1 |
|                     tb_json_test2 |
|                          tb_login |
|                      tb_login_tmp |
|                          tb_money |
|                      tb_money_mtn |
|                            tb_url |
|              the_nba_championship |
|                             tmp_1 |
|                        tmp_zipper |
|                         user_dept |
|                     user_dept_sex |
|                             users |
|                 users_bucket_sort |
|                   website_pv_info |
|                  website_url_info |
+-----------------------------------+
49 rows in set
  • ymal
execution:...current-catalog: alan_hivecatalog  # set the HiveCatalog as the current catalog of the sessioncurrent-database: testhivecatalogs:- name: alan_hivecatalog  type: hivehive-conf-dir: /usr/local/bigdata/apache-hive-3.1.2-bin/conf

下表列出了通过 YAML 文件或 DDL 定义 HiveCatalog 时所支持的参数。

在这里插入图片描述

5、DDL&DML

在 Flink 中执行 DDL 操作 Hive 的表、视图、分区、函数等元数据时,参考:33、Flink之hive
Flink 支持 DML 写入 Hive 表,请参考:33、Flink之hive
以上,介绍了Apache Hive连接器的使用,以具体的示例演示了通过java和flink sql cli创建catalog。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/68600.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

docker使用(一)生成,启动,更新(容器暂停,删除,再生成)

docker使用&#xff08;一&#xff09; 编写一个 Dockerfile构建镜像构建失败构建成功 运行镜像运行成功 修改代码后再次构建请不要直接进行构建&#xff0c;要将原有的旧容器删除或暂停停止成功删除成功再次构建且构建成功&#xff01; 要创建一个镜像&#xff0c;你可以按照以…

二叉树的介绍及二叉树的链式结构的实现(C语言版)

前言 二叉树是一种特殊的树&#xff0c;它最大的度为2&#xff0c;每个节点至多只有两个子树。它是一种基础的数据结构&#xff0c;后面很多重要的数据结构都是依靠它来进行实现的。了解并且掌握它是很重要的。 目录 1.二叉树的介绍 1.1概念 1.2现实中的二叉树 1.3特殊的二叉…

Linux下 Socket服务器和客户端文件互传

目录 1.项目描述 2.函数准备 2.1 gets函数 2.2 popen函数、fread函数 2.3 access 函数 2.4 exit 函数 2.5 strtok 函数 2.6 chdir函数 3.项目代码 3.1服务器代码 3.2客户端代码 4.问题总结 1.项目描述 基于Soket聊天服务器&#xff0c;实现服务器和客户端的文件传输。…

Mendix如何实现导出文件

刚刚接触Mendix低代码两周&#xff0c;花了一周在b站看初级视频然后考完初级&#xff0c;第二周开始做个列表查询感觉照葫芦画瓢没啥难度。但最近要求写个导出列表数据&#xff0c;在mendix社区翻了翻&#xff0c;这个功能算是常见的。找了mendix官方提供的Docs磕磕盼盼才实现了…

双向交错CCM图腾柱无桥单相PFC学习仿真与实现(4)一些优化总结

前言 上一次说到单相的PFC硬件功能已经实现&#xff0c;THD3.15%满足了国标要求的范围&#xff0c;还是有优化的空间&#xff0c;目前系统设计的是6.6Kw&#xff0c;220V交流输出&#xff0c;400-800V直流输出。目前基本功能完成&#xff0c;但是还有很多细节需要优化&#xf…

科技驱动产业升级:浅谈制造型企业对MES系统的应用

在科技不断进步的背景下&#xff0c;制造型行业也在持续发展&#xff0c;但随之而来的挑战也不断增加。传统的管理方式已经无法满足企业的需求&#xff0c;因此许多制造型企业开始寻找新的管理模式。制造执行系统&#xff08;MES&#xff09;作为先进的制造信息技术之一&#x…

学会这几步,教你1分钟辨出B站优质UP主!

品牌想要投放某UP主&#xff0c;该如何判断UP主是否优质并且同品牌相匹配呢&#xff1f;运用这一套多维度的UP主评估方法 &#xff0c;帮助你高效判断&#xff0c;快来看看具体怎么操作吧&#xff01; 一、up主粉丝涨跌 有些广告主在判断UP主是否值得投放时&#xff0c;会陷入…

9.3.tensorRT高级(4)封装系列-自动驾驶案例项目self-driving-车道线检测

目录 前言1. 车道线检测总结 前言 杜老师推出的 tensorRT从零起步高性能部署 课程&#xff0c;之前有看过一遍&#xff0c;但是没有做笔记&#xff0c;很多东西也忘了。这次重新撸一遍&#xff0c;顺便记记笔记。 本次课程学习 tensorRT 高级-自动驾驶案例项目self-driving-车道…

工服穿戴检测联动门禁开关算法

工服穿戴检测联动门禁开关算法通过yolov8深度学习框架模型&#xff0c;工服穿戴检测联动门禁开关算法能够准确识别和检测作业人员是否按照规定进行工服着装&#xff0c;只有当人员合规着装时&#xff0c;算法会发送开关量信号给门禁设备&#xff0c;使门禁自动打开。YOLO的结构…

港陆证券:五日线破位怎么看?

在股票交易中&#xff0c;五日线是个重要的技术指标之一&#xff0c;它能够反映出最近的商场趋势。假如五日线破位&#xff0c;这意味着商场呈现了趋势反转&#xff0c;出资者需求注重趋势改动&#xff0c;并采取相应的出资战略。 首先&#xff0c;咱们来看看五日线破位的原因…

【算法与数据结构】654、LeetCode最大二叉树

文章目录 一、题目二、解法三、完整代码 所有的LeetCode题解索引&#xff0c;可以看这篇文章——【算法和数据结构】LeetCode题解。 一、题目 二、解法 思路分析&#xff1a;【算法与数据结构】106、LeetCode从中序与后序遍历序列构造二叉树这两道题有些类似&#xff0c;相关代…

OLED透明屏原彩优势和特点解析:开创显示技术新时代

OLED透明屏 原彩作为一项领先的显示技术&#xff0c;正以其卓越的性能和创新的设计特点引起广泛关注。 本文将通过深入探讨OLED透明屏 原彩的优势和特点、应用领域、技术发展以及未来前景等方面内容&#xff0c;并结合具体数据、报告和行业动态&#xff0c;为读者提供专业可信…

解决DNS服务器未响应错误的方法

​当你将设备连接到家庭网络或具有互联网接入功能的Wi-Fi热点时,由于各种原因,互联网连接可能无法正常工作。本文中的说明适用于Windows 10、Windows 8和Windows 7。 无法连接到DNS服务器的原因 故障的一类与域名系统有关,域名系统是世界各地互联网提供商使用的分布式名称…

W5500-EVB-PICO进行MQTT连接订阅发布教程(十二)

前言 上一章我们用开发板通过SNTP协议获取网络协议&#xff0c;本章我们介绍一下开发板通过配置MQTT连接到服务器上&#xff0c;并且订阅和发布消息。 什么是MQTT&#xff1f; MQTT是一种轻量级的消息传输协议&#xff0c;旨在物联网&#xff08;IoT&#xff09;应用中实现设备…

仿`gRPC`功能实现像调用本地方法一样调用其他服务器方法

文章目录 仿gRPC功能实现像调用本地方法一样调用其他服务器方法 简介单体架构微服务架构RPCgPRC gRPC交互逻辑服务端逻辑客户端逻辑示例图 原生实现仿gRPC框架编写客户端方法编写服务端方法综合演示 仿 gRPC功能实现像调用本地方法一样调用其他服务器方法 简介 在介绍gRPC简介…

【OpenCV入门】第五部分——图像运算

文章结构 掩模图像的加法运算图像的位运算按位与运算按位或运算按位取反运算按位异或运算图像位运算的运用 合并图像加权和覆盖 掩模 当计算机处理图像时&#xff0c;有些内容需要处理&#xff0c;有些内容不需要处理。能够覆盖原始图像&#xff0c;仅暴露原始图像“感兴趣区域…

Myvatis关联关系映射与表对象之间的关系

目录 一、关联关系映射 1.1 一对一 1.2 一对多 1.3 多对多 二、处理关联关系的方式 2.1 嵌套查询 2.2 嵌套结果 三、一对一关联映射 3.1 建表 ​编辑 3.2 配置文件 3.3 代码生成 3.4 编写测试 四、一对多关联映射 五、多对多关联映射 六、小结 一、关联关系映射 …

一文学会K8s集群搭建

环境准备 节点数量&#xff1a;2台虚拟机 centos7硬件配置&#xff1a;master节点内存至少3G&#xff08;2G后面在master节点初始化集群时会报错&#xff0c;内存不够&#xff09;&#xff0c;node节点可以2G&#xff0c;CPU至少2个&#xff0c;硬盘至少30G网络要求&#xff1…

Ant-Design-Pro-V5: ProTable前端导出excel表格。

Prtable表格中根据搜索条件实现excel表格导出。 代码展示&#xff1a; index.jsx import React, { useRef, useState, Fragment, useEffect } from react; import { getLecturerList, lecturerExportExcel } from /services/train/personnel; import { getOrgList, getSelec…

Navicat Premium 16.2.7 for Mac

Navicat Premium 16是一款功能强大的跨平台数据库管理工具&#xff0c;支持多种数据库类型&#xff0c;如MySQL、MariaDB、Oracle、SQLite、PostgreSQL等等。它提供了丰富的数据库管理功能和工具&#xff0c;可以帮助开发人员和数据库管理员快速地创建、管理和维护数据库。 Nav…