Flink Hive Catalog操作案例

在此对Flink读写Hive表操作进行逐步记录,需要指出的是,其中操作Hive分区表和非分区表的DDL有所不同,以下分别记录。

基础环境

Hive-3.1.3
Flink-1.17.1

基本操作与准备

1、上传依赖jar包到flink/lib目录下

cp flink-sql-connector-hive-3.1.3_2.12-1.17.1.jar
cp mysql-connector-j-8.1.0.jar

2、更换planner依赖(Hive集成的推荐设置)

mv /usr/sft/flink-1.17.1/opt/flink-table-planner_2.12-1.17.1.jar /usr/sft/flink-1.17.1/lib/
mv /usr/sft/flink-1.17.1/lib/flink-table-planner-loader-1.17.1.jar /usr/sft/flink-1.17.1/opt/

3、启动Hive MetaStore

nohup hive --service metastore 2>&1 &

4、启动flink集群和sql-client

yarn-session.sh -d -nm flink-cluster
sql-client.sh embedded -s yarn-session

5、在flink sql-client中创建hive catalog

CREATE CATALOG hive WITH ('type' = 'hive','default-database' = 'sty','hive-conf-dir' = '/usr/sft/hive-3.1.3/conf'
);

非分区表读写

1、Hive中建表并插入数据

create table behavior(
username string,
behavior string
);
insert into behavior values('lisi','buy'),('zhangsan','read');

2、使用hive catalog

use catalog hive;

2、flink sql-client中执行数据插入与数据查询(和常规sql一致)

insert into behavior values('sisi','buy'),('tracy','read');
select *from behavior;

在这里插入图片描述

分区表读写

这里和非分区表有所不同,主要体现在建表层面,参考博客:https://www.jianshu.com/p/295066a24092

写入到hive分区表
streamEnv需要开启checkpoint,保证flink写入hive分区表的写入一致性
hive表ddl中需要指定以下TBLPROPERTIES:
sink.partition-commit.trigger:分区提交触发器,单选,可选值为partition-time、process-time(默认), 其中partition-time需要根据当前数据的watermark来判断分区是否需要提交,当watermark + delay大于等于分区上的时间时就会提交该分区元数据;process-time的话根据当前系统处理时间来判断分区是否需要提交,当系统处理时间大于等于分区上的时间就会提交该分区元数据
partition.time-extractor.timestamp-pattern:使用partition-time触发器时使用该配置项。表示从表字段中提取出表达某个分区的时间的格式,需要提取到的时间必须为yyyy-MM-dd HH:mm:ss的格式。比如字段dt的格式为yyyy-MM-dd,则配置为$dt 00:00:00则表示分区时间取值为dt的value的0点0分0秒,可以选择多个表字段组合。当表字段无法抽取出符合的格式时,则使用自定义提取器partition.time-extractor.class。
sink.partition-commit.delay: 表示watermark允许event time的最大乱序时间,使用partition-time触发器时可以使用,默认为0s
sink.partition-commit.policy.kind:分区提交方式,多选,可选值为metastore、success-file、custom,metastore表示写入元数据库,success-file表示往hdfs分区目录写入一个标志文件,custom表示使用自定义提交方式,通常使用metastore,success-file组合
partition.time-extractor.kind:当要使用自定义分区时间提取器时需要配置此项,值配置为custom
partition.time-extractor.class:当要使用自定义分区时间提取器时需要配置此项,值配置为自定义提取器的类路径。在集群中运行时,需要把该类打成jar包放到flink lib目录下。
某个分区触发提交后,后续再有此分区的数据进来,仍然会写入hive该分区。
作者:spongebobZ
链接:https://www.jianshu.com/p/295066a24092
来源:简书

1、hive创建分区表并插入数据

create table userinfo(
name string,
age int
)
partitioned by (dt string)
stored as orc
tblproperties('sink.partition-commit.trigger' = 'partition-time','sink.partition-commit.policy.kind'='metastore,success-file','partition.time-extractor.timestamp-pattern' ='yyyy-MM-dd HH:mm:ss','sink.partition-commit.delay' = '10'
);insert into table userInfo partition(dt='2023-10-26') values('zhangsan',23);
insert into table userInfo partition(dt='2023-10-26') values('lisi',26),('wangwu',27);

注意:若建表时未在tblproperties中配置恰当的sink.partition-commit.policy.kind,flink sql-client插入数据时将遇到如下报错:

Could not execute SQL statement. Reason:
org.apache.flink.connectors.hive.FlinkHiveException: Streaming write to partitioned hive table `hive`.`sty`.`userInfo` without providing a commit policy. Make sure to set a proper value for sink.partition-commit.policy.kind

2、flink sql-client插入与查询数据

insert into  userinfo partition(dt='2023-10-24') values('tracy',26),('lily',27);
select *from userinfo;

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/120330.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【成功实现】CentOS磁盘扩容

对服务器磁盘扩容操作步骤 查看磁盘信息 fdisk -l 创建新分区 fdisk /dev/sda P n p … t 回车 8e w 重启虚拟机 reboot mkfs.ext4 /dev/sda4 查看磁盘信息 fdisk -l 创建物理卷 pvcreate /dev/sda4 y 创建卷组 并绑定物理卷 vgcreate centos /dev/sda4 创建逻辑…

golang小技巧

1/有时需要把json内容返回给前段进行文本编辑json字段,那么最好是能返回格式化后的json,这样对于用户编辑页方便。这时候可以利用json.MarshalIndent(data, "", "\t")来进行格式化,带有缩进的marshal。 2/对holders的填…

如何使用SpringBoot处理全局异常

如何使用SpringBoot处理全局异常 使用ControllerAdvice 和 ExceptionHandler处理全局异常 参考: ControllerAdvice ResponseBody Slf4j public class ExceptionHandler {ResponseStatus(HttpStatus.OK)org.springframework.web.bind.annotation.ExceptionHandler…

基于springboot实现校友社交平台管理系统项目【项目源码+论文说明】计算机毕业设计

基于springboot实现校友社交平台管理系统演示 摘要 校友社交系统提供给用户一个校友社交信息管理的网站,最新的校友社交信息让用户及时了解校友社交动向,完成校友社交的同时,还能通过论坛中心进行互动更方便。本系统采用了B/S体系的结构,使用了java技…

Linux下的文件操作和文件管理

文章目录 应用编程文件操作文件描述符open函数write函数read函数close函数lseek函数文件操作例子 文件管理文件基本知识文件类型文件共享空洞文件错误处理退出程序原子操作fcntl和ioctl截断文件stat函数软链接和硬链接 应用编程 系统调用(system call)是Linux内核提供给应用层…

【ARM Coresight 系列文章 15.2 – components power domain 详细介绍】

【ARM Coresight SoC-400/SoC-600 专栏导读】 文章目录 1.1. Coresight 电源域模型1.1.1 CDBGPWRUPREQ 和 CDBGPWRUPACK1.1.2 CSYSPWRUPREQ 和 CSYSPWRUPACK1.1.3 Power Domain ID In RomTable1.1.4 Power domain entries1.1.5 Algorithm to discover power domain IDs1.1.6 De…

便利连锁:如何增加收益?教你一招轻松搞定!

自动售货机,作为零售行业的一项颠覆性技术,正逐渐改变着我们的购物方式和商业格局。这一创新技术不仅重新定义了零售业务模式,还为企业提供了更多的机会来满足不断演变的消费者需求。 客户案例 便利连锁店 成都某便利连锁店面临一系列挑战&am…

[Pytorch] 保存模型与加载模型

1、保存模型 # 定义模型 model BPNetModel(n_featuren_feature,n_hiddenn_hidden,n_outputn_output) #调用网络# 保存模型 torch.save(model, BPNetModel0.pth) 2、加载模型 import torch## 读取模型 model torch.load(BPNetModel0.pth) 3、保存模型参数 #调用网络 mode…

ASP.NET Core3.1 API 创建(Swagger配置、数据库连接Sql Server)、开发、部署

文章目录 创建项目点击Nuget安装包删除原有controllers编辑新建controll、添加注释Startup 注册Swagger服务使用swagger中间件配置XML注释更改启动端口 launchsettings.json在startup.cs跨域处理运行 数据库设计与连接安装库新建类继承框架根据数据库表设计对应设计类在DataCon…

【QT】其他常用控件2

新建项目 lineEdit 什么都不显示(linux password) password textEdit和plainTextEdit spinBox和doubleSpinBox timeEdit、dateEdit、dateTimeEdit label 显示图案,导入资源:【QT】资源文件导入_复制其他项目中的文件到qt项目中_St…

jmeter界面压测过程卡死解决思路

1、排查压测机的资源是否充足; 2、检查jmeter压测脚本,除聚合报告的所有组件关闭; 我在压测过程中出现频繁卡死,就是查看结果数和断言结果信息量过多导致: 3、直接用非gui界面形式,也就是脚本形式压测。

【路径规划】A*算法 Java实现

A*(A-Star)算法是一种广泛使用的寻路算法,尤其在计算机科学和人工智能领域。 算法思想 通过评估函数来引导搜索过程,从而找到从起始点到目标点的最短路径。评估函数通常包括两部分:一部分是已经走过的实际距离&#x…

springcloud gateway转发后getServerName被更改的问题

该问题起源于一次将代码移植到微服务产生。当使用springcloud gateway更换掉nginx网关后,出现无法登录的情况,跟进发现转发的信息里丢失了Host MimeHeaders accept */* knife4j-gateway-code ROOT content-type application/x-www-form-urlencoded …

Linux docker 安装 部署

docker 安装 linux系统离线安装docker 如何使用docker部署c/c程序 常用命令 给予 docker 访问 gui 的权限 在 /etc/profile 末尾添加 if [ "$DISPLAY" ! "" ] thenxhost fi在执行 更新 source /etc/profiledocker下载镜像 docker search gcc #搜索d…

uniapp实现微信授权(前后端)

在使用UniApp开发微信小程序时,实现微信授权登录是一个常见的需求。本文将介绍如何在UniApp中实现微信授权登录,并涵盖前后端的实现步骤。 1. 后端配置 首先,我们需要在后端进行相关配置和处理。 在微信公众平台申请开发者账号,…

C#的DataGridView数据控件(直接访问SQL vs 通过EF实体模型访问SQL)

目录 一、在DataGridView控件中显示数据 1.直接编程访问SQL (1)源码 (2)生成效果 2.通过EF实体模型访问SQL (1)源码 (2)生成效果 二、获取DataGridView控件中的当前单元格 …

文献阅读(207)FPGA HBM

题目:HBM Connect: High-Performance HLS Interconnect for FPGA HBM时间:2021会议:FPGA研究机构:UCLA Jason Cong 题目:Demystifying the Memory System of Modern Datacenter FPGAs for Software Programmers throug…

1.AUTOSAR的架构及方法论

在15、16年之前,AUTOSAR这个东西其实是被国内很多大的OEM或者供应商所排斥的。为什么?最主要的原因还是以前采用手写底层代码+应用层模型生成代码的方式进行开发。每个供应商或者OEM都有自己的软件规范或者技术壁垒,现在提个AUTOSAR想搞统一,用一个规范来收割汽车软件供应链…

K-Means和KNN

主要区别 从无序 —> 有序 从K-Means —> KNN KNN:监督学习,类别是已知的,对已知分类的数据进行训练和学习,找到不同类的特征,再对未分类的数据进行分类。K-Means:无监督学习,事先不知道…

【Ubuntu18.04】激光雷达与相机联合标定(Livox+HIKROBOT)(一)相机内参标定

LivoxHIKROBOT联合标定——相机内参标定 引言1 海康机器人HIKROBOT SDK二次开发并封装ROS1.1 介绍1.2 安装MVS SDK1.3 封装ROS packge 2 览沃Livox SDK二次开发并封装ROS3 相机雷达联合标定——相机内参标定3.1 环境配置3.1.1 安装依赖——PCL 安装3.1.2 安装依赖——Eigen 安装…