33、Flink之hive介绍与简单示例

Flink 系列文章

1、Flink 部署、概念介绍、source、transformation、sink使用示例、四大基石介绍和示例等系列综合文章链接

13、Flink 的table api与sql的基本概念、通用api介绍及入门示例
14、Flink 的table api与sql之数据类型: 内置数据类型以及它们的属性
15、Flink 的table api与sql之流式概念-详解的介绍了动态表、时间属性配置(如何处理更新结果)、时态表、流上的join、流上的确定性以及查询配置
16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及FileSystem示例(1)
16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及Elasticsearch示例(2)
16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及Apache Kafka示例(3)
16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及JDBC示例(4)

16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及Apache Hive示例(6)

20、Flink SQL之SQL Client: 不用编写代码就可以尝试 Flink SQL,可以直接提交 SQL 任务到集群上

22、Flink 的table api与sql之创建表的DDL
24、Flink 的table api与sql之Catalogs

30、Flink SQL之SQL 客户端(通过kafka和filesystem的例子介绍了配置文件使用-表、视图等)

33、Flink之hive介绍与简单示例

42、Flink 的table api与sql之Hive Catalog


文章目录

  • Flink 系列文章
  • 一、Table & SQL Connectors 示例: Apache Hive
    • 1、支持的Hive版本
    • 2、依赖项
      • 1)、使用 Flink 提供的 Hive jar
      • 2)、用户定义的依赖项
      • 3)、移动 planner jar 包
    • 3、Maven 依赖
    • 4、连接到Hive
    • 5、DDL&DML


本文介绍了Apache Hive连接器的使用,以具体的示例演示了通过java和flink sql cli创建catalog。
本文依赖环境是hadoop、zookeeper、hive、flink环境好用,本文内容以flink1.17版本进行介绍的,具体示例是在1.13版本中运行的(因为hadoop集群环境是基于jdk8的,flink1.17版本需要jdk11)。
更多的内容详见后续关于hive的介绍。

一、Table & SQL Connectors 示例: Apache Hive

Apache Hive 已经成为了数据仓库生态系统中的核心。 它不仅仅是一个用于大数据分析和ETL场景的SQL引擎,同样它也是一个数据管理平台,可用于发现,定义,和演化数据。

Flink 与 Hive 的集成包含两个层面。

一是利用了 Hive 的 MetaStore 作为持久化的 Catalog,用户可通过HiveCatalog将不同会话中的 Flink 元数据存储到 Hive Metastore 中。 例如,用户可以使用HiveCatalog将其 Kafka 表或 Elasticsearch 表存储在 Hive Metastore 中,并后续在 SQL 查询中重新使用它们。

二是利用 Flink 来读写 Hive 的表。

HiveCatalog的设计提供了与 Hive 良好的兼容性,用户可以"开箱即用"的访问其已有的 Hive 数仓。 您不需要修改现有的 Hive Metastore,也不需要更改表的数据位置或分区。

关于flink与hive集成的部分请参考:42、Flink 的table api与sql之Hive Catalog

1、支持的Hive版本

Flink 支持以下的 Hive 版本。

  • 2.3
    2.3.0
    2.3.1
    2.3.2
    2.3.3
    2.3.4
    2.3.5
    2.3.6
    2.3.7
    2.3.8
    2.3.9
  • 3.1
    3.1.0
    3.1.1
    3.1.2
    3.1.3

某些功能是否可用取决于您使用的 Hive 版本,这些限制不是由 Flink 所引起的:

  • Hive 内置函数在使用 Hive-2.3.0 及更高版本时支持。
  • 列约束,也就是 PRIMARY KEY 和 NOT NULL,在使用 Hive-3.1.0 及更高版本时支持。
  • 更改表的统计信息,在使用 Hive-2.3.0 及更高版本时支持。
  • DATE列统计信息,在使用 Hive-2.3.0 及更高版时支持。

2、依赖项

要与 Hive 集成,您需要在 Flink 下的/lib/目录中添加一些额外的依赖包, 以便通过 Table API 或 SQL Client 与 Hive 进行交互。 或者,您可以将这些依赖项放在专用文件夹中,并分别使用 Table API 程序或 SQL Client 的-C或-l选项将它们添加到 classpath 中。

Apache Hive 是基于 Hadoop 之上构建的, 首先您需要 Hadoop 的依赖,请参考 Providing Hadoop classes:

export HADOOP_CLASSPATH=`hadoop classpath`

有两种添加 Hive 依赖项的方法。第一种是使用 Flink 提供的 Hive Jar包。您可以根据使用的 Metastore 的版本来选择对应的 Hive jar。第二个方式是分别添加每个所需的 jar 包。如果您使用的 Hive 版本尚未在此处列出,则第二种方法会更适合。

注意:建议您优先使用 Flink 提供的 Hive jar 包。仅在 Flink 提供的 Hive jar 不满足您的需求时,再考虑使用分开添加 jar 包的方式。

1)、使用 Flink 提供的 Hive jar

下表列出了所有可用的 Hive jar。您可以选择一个并放在 Flink 发行版的/lib/ 目录中。
在这里插入图片描述

2)、用户定义的依赖项

您可以在下方找到不同Hive主版本所需要的依赖项。

  • Hive 2.3.4
/flink-1.17.1/lib// Flink's Hive connector.Contains flink-hadoop-compatibility and flink-orc jarsflink-connector-hive_2.12-1.17.1.jar// Hive dependencieshive-exec-2.3.4.jar// add antlr-runtime if you need to use hive dialectantlr-runtime-3.5.2.jar
  • Hive 3.1.0
/flink-1.17.1/lib// Flink's Hive connectorflink-connector-hive_2.12-1.17.1.jar// Hive dependencieshive-exec-3.1.0.jarlibfb303-0.9.3.jar // libfb303 is not packed into hive-exec in some versions, need to add it separately// add antlr-runtime if you need to use hive dialectantlr-runtime-3.5.2.jar

3)、移动 planner jar 包

把 FLINK_HOME/opt 下的 jar 包 flink-table-planner_2.12-1.17.1.jar 移动到 FLINK_HOME/lib 下,并且将 FLINK_HOME/lib 下的 jar 包 flink-table-planner-loader-1.17.1.jar 移出去。 具体原因请参见 FLINK-25128。你可以使用如下命令来完成移动 planner jar 包的工作:

mv $FLINK_HOME/opt/flink-table-planner_2.12-1.17.1.jar $FLINK_HOME/lib/flink-table-planner_2.12-1.17.1.jar
mv $FLINK_HOME/lib/flink-table-planner-loader-1.17.1.jar $FLINK_HOME/opt/flink-table-planner-loader-1.17.1.jar

只有当要使用 Hive 语法 或者 HiveServer2 endpoint, 你才需要做上述的 jar 包移动。 但是在集成 Hive 的时候,推荐进行上述的操作。

3、Maven 依赖

如果您在构建自己的应用程序,则需要在 mvn 文件中添加以下依赖项。 您应该在运行时添加以上的这些依赖项,而不要在已生成的 jar 文件中去包含它们。

<!-- Flink Dependency -->
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-hive_2.12</artifactId><version>1.17.1</version><scope>provided</scope>
</dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java-bridge_2.12</artifactId><version>1.17.1</version><scope>provided</scope>
</dependency><!-- Hive Dependency -->
<dependency><groupId>org.apache.hive</groupId><artifactId>hive-exec</artifactId><version>${hive.version}</version><scope>provided</scope>
</dependency>

4、连接到Hive

通过 TableEnvironment 或者 YAML 配置,使用 Catalog 接口 和 HiveCatalog连接到现有的 Hive 集群。

以下是如何连接到 Hive 的示例:

  • java
EnvironmentSettings settings = EnvironmentSettings.inStreamingMode();
TableEnvironment tableEnv = TableEnvironment.create(settings);String name            = "myhive";
String defaultDatabase = "mydatabase";
String hiveConfDir     = "/opt/hive-conf";HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir);
tableEnv.registerCatalog("myhive", hive);// set the HiveCatalog as the current catalog of the session
tableEnv.useCatalog("myhive");----------------------示例----------------------------
import java.util.List;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
import org.apache.flink.table.catalog.hive.HiveCatalog;/*** @author alanchan**/
public class TestHiveCatalogDemo {/*** @param args* @throws DatabaseNotExistException * @throws CatalogException */public static void main(String[] args) throws CatalogException, DatabaseNotExistException {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();StreamTableEnvironment tenv = StreamTableEnvironment.create(env);String name = "alan_hive";// testhive 数据库名称String defaultDatabase = "testhive";String hiveConfDir = "/usr/local/bigdata/apache-hive-3.1.2-bin/conf";HiveCatalog hiveCatalog = new HiveCatalog(name, defaultDatabase, hiveConfDir);tenv.registerCatalog("alan_hive", hiveCatalog);// 使用注册的catalogtenv.useCatalog("alan_hive");List<String> tables = hiveCatalog.listTables(defaultDatabase); for (String table : tables) {System.out.println("Database:testhive  tables:" + table);}}}
  • sql
CREATE CATALOG myhive WITH ('type' = 'hive','default-database' = 'mydatabase','hive-conf-dir' = '/opt/hive-conf'
);
-- set the HiveCatalog as the current catalog of the session
USE CATALOG myhive;------------------具体示例如下----------------------------
Flink SQL> show catalogs;
+-----------------+
|    catalog name |
+-----------------+
| default_catalog |
+-----------------+
1 row in setFlink SQL> CREATE CATALOG alan_hivecatalog WITH (
>     'type' = 'hive',
>     'default-database' = 'testhive',
>     'hive-conf-dir' = '/usr/local/bigdata/apache-hive-3.1.2-bin/conf'
> );
[INFO] Execute statement succeed.Flink SQL> show catalogs;
+------------------+
|     catalog name |
+------------------+
| alan_hivecatalog |
|  default_catalog |
+------------------+
2 rows in setFlink SQL> use alan_hivecatalog;
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.catalog.exceptions.CatalogException: A database with name [alan_hivecatalog] does not exist in the catalog: [default_catalog].Flink SQL> use catalog alan_hivecatalog;
[INFO] Execute statement succeed.Flink SQL> show tables;
+-----------------------------------+
|                        table name |
+-----------------------------------+
| alan_hivecatalog_hivedb_testtable |
|                         apachelog |
|                          col2row1 |
|                          col2row2 |
|                       cookie_info |
|                              dual |
|                         dw_zipper |
|                               emp |
|                          employee |
|                  employee_address |
|               employee_connection |
|                 ods_zipper_update |
|                          row2col1 |
|                          row2col2 |
|                            singer |
|                           singer2 |
|                           student |
|                      student_dept |
|               student_from_insert |
|                      student_hdfs |
|                    student_hdfs_p |
|                      student_info |
|                     student_local |
|                 student_partition |
|              t_all_hero_part_msck |
|                     t_usa_covid19 |
|                   t_usa_covid19_p |
|                              tab1 |
|                         tb_dept01 |
|                    tb_dept_bucket |
|                            tb_emp |
|                          tb_emp01 |
|                     tb_emp_bucket |
|                     tb_json_test1 |
|                     tb_json_test2 |
|                          tb_login |
|                      tb_login_tmp |
|                          tb_money |
|                      tb_money_mtn |
|                            tb_url |
|              the_nba_championship |
|                             tmp_1 |
|                        tmp_zipper |
|                         user_dept |
|                     user_dept_sex |
|                             users |
|                 users_bucket_sort |
|                   website_pv_info |
|                  website_url_info |
+-----------------------------------+
49 rows in set
  • ymal
execution:...current-catalog: alan_hivecatalog  # set the HiveCatalog as the current catalog of the sessioncurrent-database: testhivecatalogs:- name: alan_hivecatalog  type: hivehive-conf-dir: /usr/local/bigdata/apache-hive-3.1.2-bin/conf

下表列出了通过 YAML 文件或 DDL 定义 HiveCatalog 时所支持的参数。

在这里插入图片描述

5、DDL&DML

在 Flink 中执行 DDL 操作 Hive 的表、视图、分区、函数等元数据时,参考:33、Flink之hive
Flink 支持 DML 写入 Hive 表,请参考:33、Flink之hive
以上,介绍了Apache Hive连接器的使用,以具体的示例演示了通过java和flink sql cli创建catalog。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/61692.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

「MySQL-05」MySQL Workbench的下载和使用

目录 一、MySQL workbench的下载和安装 1. MySQL workbench介绍 2. 到MySQL官网下载mysql workbench 3. 安装workbench 二、创建能远程登录的用户并授权 1. 创建用户oj_client 2. 创建oj数据库 3. 给用户授权 4. 在Linux上登录用户oj_client检查其是否能操作oj数据库 三、使用…

C++ 友元

朋友可访问自己的东西&#xff0c;大概就这么个意思。即某类的友元类可访问该类的所有变量以及函数&#xff0c;或友元函数可以访问该类的变量以及函数&#xff0c;在朋友眼中没有任何隐藏&#xff0c;可谓时赤裸相对&#xff0c;肝胆相照&#xff0c;生生挚友。 注意&#xf…

聊聊检索增强,LangChain一把梭能行吗?

背景 ChatGPT诞生之初&#xff0c;大家仿佛从中看到了未来&#xff1a;可以拿着大语言模型&#xff08;LLM&#xff09;这把锤子&#xff0c;锤遍业务上的钉子。其中最被看好的场景&#xff0c;莫过于搜索&#xff0c;不仅是微软、谷歌、百度这样的大公司将LLM用到自己的搜索业…

ROS2学习(一):Ubuntu 22.04 安装 ROS2(Iron Irwini)

文章目录 一、ROS2(Iron Irwini)介绍二、ROS2(Iron Irwini)安装1.设置编码2.使能代码库3.安装ROS2 Iron 三、ROS2测试四、ROS2卸载 一、ROS2(Iron Irwini)介绍 官方文档 Iron Irwini版本支持的平台如下&#xff1a; 二、ROS2(Iron Irwini)安装 1.设置编码 sudo apt update…

Java实现根据关键词搜索京东商品列表数据方法,当当API接口(jd.item_search)申请指南

要通过京东网的API获取商品列表数据&#xff0c;您可以使用京东开放平台提供的接口来实现。以下是一种使用Java编程语言实现的示例&#xff0c;展示如何通过京东开放平台API获取商品列表&#xff1a; 首先&#xff0c;确保您已注册成为当当开放平台的开发者&#xff0c;并创建…

Windows安装单节点Zookeeper

刚学习Dubbo&#xff0c;在Centos7中docker安装的zookeeper3.7.1。然后在启动provider时一直报错&#xff0c;用尽办法也没有解决。然后zookeeper相关的知识虽然以前学习过&#xff0c;但是已经忘记的差不多了。现在学习dubbo只能先降低版本使用了&#xff0c;之后再复习zookee…

Golang数据结构和算法

Golang数据结构和算法 数据的逻辑结构和物理结构常见数据结构及其特点算法的时间复杂度和空间复杂度Golang冒泡排序Golang选择排序Golang插入排序Golang快速排序Golang归并排序Golang二分查找Golang sort包Golang链表Golang container/list标准库Golang栈stackGolang二叉搜索树…

解决css样式中last-child不生效的问题

需求 项目中需要使用v-for指令来渲染一个图片列表&#xff0c; 现状 发现&#xff0c;最后一个格子并没有跟下面绿色线对齐。 最后发现 是因为 每个格子都给了 margin-right&#xff1a;36px&#xff0c;影响到了最后一个格子 所以使用last-child 将最后一个格子的margin 属性…

ARM DIY(六)音频调试

前言 今天&#xff0c;调试一下音频 硬件焊接 硬件部分核心是 LM4871 音频功放芯片 对于 SOC 来讲很简单&#xff0c;就一个引脚 HPOUTL&#xff08;单声道&#xff09;&#xff1b;对于扬声器来讲也很简单&#xff0c;就两个引脚&#xff0c;插上就可以了。 另外一个关键点…

FastDFS+Nginx - 本地搭建文件服务器同时实现在外远程访问「端口映射」

文章目录 前言1. 本地搭建FastDFS文件系统1.1 环境安装1.2 安装libfastcommon1.3 安装FastDFS1.4 配置Tracker1.5 配置Storage1.6 测试上传下载1.7 与Nginx整合1.8 安装Nginx1.9 配置Nginx 2. 局域网测试访问FastDFS3. 安装cpolar内网穿透4. 配置公网访问地址5. 固定公网地址5.…

vue3中如何使用el-tooltip中的插槽达到换行效果

el-tooltip的content属性中的内容可以使用插槽来替换 话不多说&#xff0c;直接上代码 <el-tooltip effect"light" placement"top-start"><div slot"content" class"tips"> // 在这里运用插槽<p class"tip-tex…

OS 死锁处理

如果P先申请mutex 则mutex从1置零&#xff0c;假设申请到的empty 0则empty变成-1阻塞态 同理C中mutex从0变为-1&#xff0c;那么如果想离开阻塞态&#xff0c;那么就需要执行V&#xff08;empty&#xff09;但是如果执行V&#xff08;empty&#xff09;就需要P&#xff08;mu…

7.react useReducer使用与常见问题

useReducer函数 1. useState的替代方案.接收一个(state, action)>newState的reducer, 并返回当前的state以及与其配套的dispatch方法2. 在某些场景下,useReducer会比useState更加适用,例如state逻辑较为复杂, 且**包含多个子值**,或者下一个state依赖于之前的state等清楚us…

stm32之25.FLASH闪存

打开标准库 源码--- int main(void) {uint32_t d;Led_init();key_init();/* 初始化串口1波特率为115200bps&#xff0c;若发送/接收数据有乱码&#xff0c;请检查PLL */usart1_init(115200);printf("this is flash test\r\n");/* 解锁FLASH&#xff08;闪存&#xf…

亚马逊云科技 re:Inforce 大会云安全合规与技术实践及 Security Jam 大赛,快来报名吧!...

‍‍ 2023年8月31日在北京 亚马逊云科技 re:Inforce 大会 首次登陆中国&#xff01; 我们期待您的莅临&#xff0c; 并与您一起迎接 AI 时代&#xff0c; 开启全面智能的安全旅程&#xff01; 在13:00-17:00的 培训与动手实验环节中 云安全合规与技术实践 及 Security Jam 大赛…

Ansible学习笔记14

实现多台的分离实现&#xff1a; [rootlocalhost playbook]# cat example3.yaml --- - hosts: 192.168.17.105remote_user: roottasks:- name: create test1 directoryfile: path/test1/ statedirectory- hosts: 192.168.17.106remote_user: roottasks:- name: create test2 d…

Jenkins测试报告样式优化

方式一&#xff1a;修改Content Security Policy&#xff08;临时解决&#xff0c;Jenkins重启后失效) 1、jenkins首页—>ManageJenkins—>Tools and Actions标题下—>Script Console 2、粘贴脚本输入框中&#xff1a;System.setProperty("hudson.model.Directo…

Ubuntu学习---跟着绍发学linux课程记录(第一部分)

文章目录 1、启动、关闭、挂起、恢复&#xff08;电源&#xff09;2、更多虚拟机操作2.1 电源设置2.2 硬件参数设置2.3 状态栏2.4 全屏显示 3、快照与系统恢复4、桌面环境5、文件系统6、用户目录7、创建目录和文件8、命令行&#xff1a;文件列表ls 9、命令行&#xff1a;切换目…

Docker容器:docker consul的注册与发现及consul-template

Docker容器&#xff1a;docker consul的注册与发现及consul-template守护进程 一.docker consul的注册与发现介绍 1.什么是服务注册与发现 &#xff08;1&#xff09;服务注册与发现是微服务架构中不可或缺的重要组件。 &#xff08;2&#xff09;为解决服务都是单节点的&a…

XSS结合CSRF

假设我们获得了目标CMS的源码&#xff0c;搭建了一个相同的网站&#xff0c;我们在自己的网站执行添加用户的操作&#xff0c;并且用bp抓包 如图&#xff0c;这是我们抓到的添加用户的数据包 接下来&#xff0c;我们可以根据数据包构造js代码 <script> xmlhttp new XML…