Flink Hive Catalog操作案例

在此对Flink读写Hive表操作进行逐步记录,需要指出的是,其中操作Hive分区表和非分区表的DDL有所不同,以下分别记录。

基础环境

Hive-3.1.3
Flink-1.17.1

基本操作与准备

1、上传依赖jar包到flink/lib目录下

cp flink-sql-connector-hive-3.1.3_2.12-1.17.1.jar
cp mysql-connector-j-8.1.0.jar

2、更换planner依赖(Hive集成的推荐设置)

mv /usr/sft/flink-1.17.1/opt/flink-table-planner_2.12-1.17.1.jar /usr/sft/flink-1.17.1/lib/
mv /usr/sft/flink-1.17.1/lib/flink-table-planner-loader-1.17.1.jar /usr/sft/flink-1.17.1/opt/

3、启动Hive MetaStore

nohup hive --service metastore 2>&1 &

4、启动flink集群和sql-client

yarn-session.sh -d -nm flink-cluster
sql-client.sh embedded -s yarn-session

5、在flink sql-client中创建hive catalog

CREATE CATALOG hive WITH ('type' = 'hive','default-database' = 'sty','hive-conf-dir' = '/usr/sft/hive-3.1.3/conf'
);

非分区表读写

1、Hive中建表并插入数据

create table behavior(
username string,
behavior string
);
insert into behavior values('lisi','buy'),('zhangsan','read');

2、使用hive catalog

use catalog hive;

2、flink sql-client中执行数据插入与数据查询(和常规sql一致)

insert into behavior values('sisi','buy'),('tracy','read');
select *from behavior;

在这里插入图片描述

分区表读写

这里和非分区表有所不同,主要体现在建表层面,参考博客:https://www.jianshu.com/p/295066a24092

写入到hive分区表
streamEnv需要开启checkpoint,保证flink写入hive分区表的写入一致性
hive表ddl中需要指定以下TBLPROPERTIES:
sink.partition-commit.trigger:分区提交触发器,单选,可选值为partition-time、process-time(默认), 其中partition-time需要根据当前数据的watermark来判断分区是否需要提交,当watermark + delay大于等于分区上的时间时就会提交该分区元数据;process-time的话根据当前系统处理时间来判断分区是否需要提交,当系统处理时间大于等于分区上的时间就会提交该分区元数据
partition.time-extractor.timestamp-pattern:使用partition-time触发器时使用该配置项。表示从表字段中提取出表达某个分区的时间的格式,需要提取到的时间必须为yyyy-MM-dd HH:mm:ss的格式。比如字段dt的格式为yyyy-MM-dd,则配置为$dt 00:00:00则表示分区时间取值为dt的value的0点0分0秒,可以选择多个表字段组合。当表字段无法抽取出符合的格式时,则使用自定义提取器partition.time-extractor.class。
sink.partition-commit.delay: 表示watermark允许event time的最大乱序时间,使用partition-time触发器时可以使用,默认为0s
sink.partition-commit.policy.kind:分区提交方式,多选,可选值为metastore、success-file、custom,metastore表示写入元数据库,success-file表示往hdfs分区目录写入一个标志文件,custom表示使用自定义提交方式,通常使用metastore,success-file组合
partition.time-extractor.kind:当要使用自定义分区时间提取器时需要配置此项,值配置为custom
partition.time-extractor.class:当要使用自定义分区时间提取器时需要配置此项,值配置为自定义提取器的类路径。在集群中运行时,需要把该类打成jar包放到flink lib目录下。
某个分区触发提交后,后续再有此分区的数据进来,仍然会写入hive该分区。
作者:spongebobZ
链接:https://www.jianshu.com/p/295066a24092
来源:简书

1、hive创建分区表并插入数据

create table userinfo(
name string,
age int
)
partitioned by (dt string)
stored as orc
tblproperties('sink.partition-commit.trigger' = 'partition-time','sink.partition-commit.policy.kind'='metastore,success-file','partition.time-extractor.timestamp-pattern' ='yyyy-MM-dd HH:mm:ss','sink.partition-commit.delay' = '10'
);insert into table userInfo partition(dt='2023-10-26') values('zhangsan',23);
insert into table userInfo partition(dt='2023-10-26') values('lisi',26),('wangwu',27);

注意:若建表时未在tblproperties中配置恰当的sink.partition-commit.policy.kind,flink sql-client插入数据时将遇到如下报错:

Could not execute SQL statement. Reason:
org.apache.flink.connectors.hive.FlinkHiveException: Streaming write to partitioned hive table `hive`.`sty`.`userInfo` without providing a commit policy. Make sure to set a proper value for sink.partition-commit.policy.kind

2、flink sql-client插入与查询数据

insert into  userinfo partition(dt='2023-10-24') values('tracy',26),('lily',27);
select *from userinfo;

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/120330.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

基于springboot实现校友社交平台管理系统项目【项目源码+论文说明】计算机毕业设计

基于springboot实现校友社交平台管理系统演示 摘要 校友社交系统提供给用户一个校友社交信息管理的网站,最新的校友社交信息让用户及时了解校友社交动向,完成校友社交的同时,还能通过论坛中心进行互动更方便。本系统采用了B/S体系的结构,使用了java技…

Linux下的文件操作和文件管理

文章目录 应用编程文件操作文件描述符open函数write函数read函数close函数lseek函数文件操作例子 文件管理文件基本知识文件类型文件共享空洞文件错误处理退出程序原子操作fcntl和ioctl截断文件stat函数软链接和硬链接 应用编程 系统调用(system call)是Linux内核提供给应用层…

便利连锁:如何增加收益?教你一招轻松搞定!

自动售货机,作为零售行业的一项颠覆性技术,正逐渐改变着我们的购物方式和商业格局。这一创新技术不仅重新定义了零售业务模式,还为企业提供了更多的机会来满足不断演变的消费者需求。 客户案例 便利连锁店 成都某便利连锁店面临一系列挑战&am…

ASP.NET Core3.1 API 创建(Swagger配置、数据库连接Sql Server)、开发、部署

文章目录 创建项目点击Nuget安装包删除原有controllers编辑新建controll、添加注释Startup 注册Swagger服务使用swagger中间件配置XML注释更改启动端口 launchsettings.json在startup.cs跨域处理运行 数据库设计与连接安装库新建类继承框架根据数据库表设计对应设计类在DataCon…

【QT】其他常用控件2

新建项目 lineEdit 什么都不显示(linux password) password textEdit和plainTextEdit spinBox和doubleSpinBox timeEdit、dateEdit、dateTimeEdit label 显示图案,导入资源:【QT】资源文件导入_复制其他项目中的文件到qt项目中_St…

jmeter界面压测过程卡死解决思路

1、排查压测机的资源是否充足; 2、检查jmeter压测脚本,除聚合报告的所有组件关闭; 我在压测过程中出现频繁卡死,就是查看结果数和断言结果信息量过多导致: 3、直接用非gui界面形式,也就是脚本形式压测。

【路径规划】A*算法 Java实现

A*(A-Star)算法是一种广泛使用的寻路算法,尤其在计算机科学和人工智能领域。 算法思想 通过评估函数来引导搜索过程,从而找到从起始点到目标点的最短路径。评估函数通常包括两部分:一部分是已经走过的实际距离&#x…

Linux docker 安装 部署

docker 安装 linux系统离线安装docker 如何使用docker部署c/c程序 常用命令 给予 docker 访问 gui 的权限 在 /etc/profile 末尾添加 if [ "$DISPLAY" ! "" ] thenxhost fi在执行 更新 source /etc/profiledocker下载镜像 docker search gcc #搜索d…

C#的DataGridView数据控件(直接访问SQL vs 通过EF实体模型访问SQL)

目录 一、在DataGridView控件中显示数据 1.直接编程访问SQL (1)源码 (2)生成效果 2.通过EF实体模型访问SQL (1)源码 (2)生成效果 二、获取DataGridView控件中的当前单元格 …

文献阅读(207)FPGA HBM

题目:HBM Connect: High-Performance HLS Interconnect for FPGA HBM时间:2021会议:FPGA研究机构:UCLA Jason Cong 题目:Demystifying the Memory System of Modern Datacenter FPGAs for Software Programmers throug…

K-Means和KNN

主要区别 从无序 —> 有序 从K-Means —> KNN KNN:监督学习,类别是已知的,对已知分类的数据进行训练和学习,找到不同类的特征,再对未分类的数据进行分类。K-Means:无监督学习,事先不知道…

【Ubuntu18.04】激光雷达与相机联合标定(Livox+HIKROBOT)(一)相机内参标定

LivoxHIKROBOT联合标定——相机内参标定 引言1 海康机器人HIKROBOT SDK二次开发并封装ROS1.1 介绍1.2 安装MVS SDK1.3 封装ROS packge 2 览沃Livox SDK二次开发并封装ROS3 相机雷达联合标定——相机内参标定3.1 环境配置3.1.1 安装依赖——PCL 安装3.1.2 安装依赖——Eigen 安装…

快捷键记录

文章目录 ctrlaltashftwinsWinRCtrlc和CtrlvCtrl -Xshell的复制粘贴ctrlalt(鼠标跳出)ctrl alt T ctrlalta 这是QQ/TIM的屏幕截图快捷键。截图成功后,会有一栏导航,可以对图片进行勾画、模糊、绘画、标号、撤回、翻译、提取文…

百度文心一言4.0抢先体验教程!

🍁 展望:关注我, AI学习之旅上,我与您一同成长! 一、 引言 想快速体验文心一言4.0,但又觉得技术难度太高?别担心,我来手把手教你! 🚀 10月17日,文心一言4.0…

【Overload游戏引擎细节分析】PBR材质Shader

PBR基于物理的渲染可以实现更加真实的效果,其Shader值得分析一下。但PBR需要较多的基础知识,不适合不会OpenGL的朋友。 一、PBR理论 PBR指基于物理的渲染,其理论较多,需要的基础知识也较多,我在这就不再写一遍了&…

Centos使用war文件部署jenkins

部署jenkins所需要的jdk环境如下: 这里下载官网最新的版本: 选择jenkins2.414.3版本,所以jdk环境最低得是java11 安装java11环境 这里直接安装open-jdk yum -y install java-11-openjdk.x86_64 java-11-openjdk-devel.x86_64下载jenkins最新…

leetcode第80题:删除有序数组中的重复项 II

题目描述 给你一个有序数组 nums ,请你 原地 删除重复出现的元素,使得出现次数超过两次的元素只出现两次 ,返回删除后数组的新长度。 不要使用额外的数组空间,你必须在 原地 修改输入数组 并在使用 O(1) 额外空间的条件下完成。 …

人性与理性共赢,真心罐头跃过增长的山海关

在北方不少地方,黄桃罐头是一种抚慰人心的力量。从大连起家,用真材实料打动人心的真心罐头,在朝着国民品牌前进的路上,需要更透彻地洞悉“人性”。 ”人的因素影响太大。我们希望可以告别个人英雄主义,用流程来保证可…

139.【JUC并发编程-04】

JUC-并发编程04 (八)、共享模型之工具1.线程池(1).自定义线程池_任务数小于队列容量(2).自定义线程池_任务数大于队列容量(3).自定义线程池_拒绝策略 2.ThreadPoolExecutor(1).线程池状态(2).构造方法(3).newFixedThreadPool (固定大小线程池)(4).newCachedThreadPool (缓存线程…

设树B是一棵采用链式结构存储的二叉树,编写一个把树 B中所有结点的左、右子树进行交换的函数 中国科学院大学2015年数据结构(c语言代码实现)

本题代码如下 void swap(tree* t) {if (*t)// 如果当前节点非空 {treenode* temp (*t)->lchild;// 临时存储左子节点 (*t)->lchild (*t)->rchild;// 将右子节点赋值给左子节点(*t)->rchild temp;// 将临时存储的左子节点赋值给右子节点 swap(&(*t)->l…