6 Hive引擎集成Apache Paimon

更多Paimon数据湖内容请关注:https://edu.51cto.com/course/35051.html

在实际工作中,我们通查会使用Flink计算引擎去读写Paimon,但是在批处理场景中,更多的是使用Hive去读写Paimon,这样操作起来更加方便。

前面我们在Flink代码里面,借助于Hive Catalog,实现了在Flink中创建Paimon表,写入数据,并且把paimon的元数据信息保存在Hive Metastore里面,这样创建的表是可以被Hive识别并且操作的。

但是最直接的肯定是在Hive中直接创建Paimon类型的表,并且读写数据。

Paimon目前可以支持Hive 3.1, 2.3, 2.2, 2.1 and 2.1-cdh-6.3这些版本的操作。

但是需要注意,如果Hive的执行引擎使用的是Tez,那么只能读取Paimon,无法向Paimon中写入数据。如果Hive的执行引擎使用的是MR,那么读写都是支持的。

在Hive中配置Paimon依赖

想要在Hive中操作Paimon,首先需要在Hive中配置Paimon的依赖,此时我们需要用到一个jar包:paimon-hive-connector
我们目前使用的Hive是3.1.2版本的,所以需要下载对应版本的paimon-hive-connector jar包。

https://repo.maven.apache.org/maven2/org/apache/paimon/paimon-hive-connector-3.1/0.5.0-incubating/paimon-hive-connector-3.1-0.5.0-incubating.jar

将这个jar包上传到bigdata04机器(hive客户端机器)的hive安装目录中:

[root@bigdata04 ~]# cd /data/soft/apache-hive-3.1.2-bin/
[root@bigdata04 apache-hive-3.1.2-bin]# mkdir auxlib
[root@bigdata04 apache-hive-3.1.2-bin]# cd auxlib/
[root@bigdata04 auxlib]# ll
total 34128
-rw-r--r--. 1 root root 34945743 Sep 13  2023 paimon-hive-connector-3.1-0.5.0-incubating.jar

注意:需要在hive安装目录中创建auxlib目录,然后把jar包上传到这个目录中,这样会被自动加载。

如果我们在操作Hive的时候使用的是beeline客户端,那么在Hive中配置好Paimon的环境之后,需要重启HiveServer2服务。

在Hive中读写Paimon表

咱们之前在Flink引擎代码中使用Hive Catalog的时候创建了一个表:p_h_t1,这个表的元数据在Hive Metastore也有存储,之前我们其实也能查看到,只是在hive中查询这个表中数据的时候报错了,其实就是因为缺少paimon-hive-connector这个jar包,现在我们再查询就可以了。

在这里我们使用Hive的beeline客户端。
注意:需要先启动HiveServer2服务。

[root@bigdata04 ~]# cd /data/soft/apache-hive-3.1.2-bin/
[root@bigdata04 apache-hive-3.1.2-bin]# bin/hiveserver2

查看hive中目前都有哪些表:

[root@bigdata04 apache-hive-3.1.2-bin]# bin/beeline -u  jdbc:hive2://localhost:10000 -n root
Connecting to jdbc:hive2://localhost:10000
Connected to: Apache Hive (version 3.1.2)
Driver: Hive JDBC (version 3.1.2)
Transaction isolation: TRANSACTION_REPEATABLE_READ
Beeline version 3.1.2 by Apache Hive
0: jdbc:hive2://localhost:10000> SHOW TABLES;
+--------------------+
|      tab_name      |
+--------------------+
| flink_stu          |
| orders             |
| p_h_t1             |
| p_h_par             |
| s1                 |
| student_favors     |
| student_favors_2   |
| student_score      |
| student_score_bak  |
| t1                 |
+--------------------+
9 rows selected (1.663 seconds)

此时可以看到之前通过Hive Catalog写入的表:p_h_t1

查询这个表中的数据:

0: jdbc:hive2://localhost:10000> SELECT * FROM p_h_t1;
+--------------+-------------+
| p_h_t1.name  | p_h_t1.age  |
+--------------+-------------+
| jack         | 18          |
| tom          | 20          |
+--------------+-------------+
2 rows selected (5.853 seconds)

此时就可以正常查询了。

接着我们尝试在Hive中向这个Paimon表中插入一条数据:

0: jdbc:hive2://localhost:10000> INSERT INTO p_h_t1(name,age) VALUES('jessic',19);

重新查询这个表中的最新数据:

0: jdbc:hive2://localhost:10000> SELECT * FROM p_h_t1;
+--------------+-------------+
| p_h_t1.name  | p_h_t1.age  |
+--------------+-------------+
| jack         | 18          |
| jessic       | 19          |
| tom          | 20          |
+--------------+-------------+
3 rows selected (0.737 seconds)

在通过Hive进行查询的时候,默认查询的是表中最新快照的数据,我们也可以通过时间旅行这个特性来控制查询之前的数据。

举个例子,查询指定快照版本中的数据:

0: jdbc:hive2://localhost:10000> SET paimon.scan.snapshot-id=1;
No rows affected (0.011 seconds)
0: jdbc:hive2://localhost:10000> SELECT * FROM p_h_t1;
+--------------+-------------+
| p_h_t1.name  | p_h_t1.age  |
+--------------+-------------+
| jack         | 18          |
| tom          | 20          |
+--------------+-------------+
2 rows selected (0.752 seconds)
0: jdbc:hive2://localhost:10000> SET paimon.scan.snapshot-id=2;
No rows affected (0.009 seconds)
0: jdbc:hive2://localhost:10000> SELECT * FROM p_h_t1;
+--------------+-------------+
| p_h_t1.name  | p_h_t1.age  |
+--------------+-------------+
| jack         | 18          |
| jessic       | 19          |
| tom          | 20          |
+--------------+-------------+
3 rows selected (0.692 seconds)

这样就可以实现查询历史数据的查询了。

在Hive中创建Paimon表

前面我们操作的p_h_t1这个表其实是借助于Flink引擎创建的。
下面我们来看一下在Hive中如何创建Piamon表:

0: jdbc:hive2://localhost:10000> SET hive.metastore.warehouse.dir=hdfs://bigdata01:9000/paimon;
0: jdbc:hive2://localhost:10000> CREATE TABLE IF NOT EXISTS p_h_t2(name STRING,age INT,PRIMARY KEY (name) NOT ENFORCED
)STORED BY 'org.apache.paimon.hive.PaimonStorageHandler';

这样表就创建好了,下面我们可以在Hive中测试一下读写数据:

0: jdbc:hive2://localhost:10000> INSERT INTO p_h_t2(name,age) VALUES('tom',20);
0: jdbc:hive2://localhost:10000> SELECT * FROM p_h_t2;
Error: java.io.IOException: java.lang.RuntimeException: Fails to read snapshot from path hdfs://bigdata01:9000/paimon/default.db/p_h_t2/snapshot/snapshot-2 (state=,code=0)

注意:此时查询报错是因为找不到snapshot-2这份快照数据,目前这个表中只添加了一次数据,所以只有snapshot-1

那为什么会查找snapshot-2呢?

因为我们前面在这个会话中设置了SET paimon.scan.snapshot-id=2;,这个配置在当前会话有效。

正常情况下我们在hive中执行SET paimon.scan.snapshot-id=null;其实就可以了:

0: jdbc:hive2://localhost:10000> SET paimon.scan.snapshot-id=null;
No rows affected (0.008 seconds)
0: jdbc:hive2://localhost:10000> SET paimon.scan.snapshot-id;
+-------------------------------+
|              set              |
+-------------------------------+
| paimon.scan.snapshot-id=null  |
+-------------------------------+
1 row selected (0.009 seconds)
0: jdbc:hive2://localhost:10000> SELECT * FROM p_h_t2;
Error: java.io.IOException: java.lang.RuntimeException: Fails to read snapshot from path hdfs://bigdata01:9000/paimon/default.db/p_h_t2/snapshot/snapshot-2 (state=,code=0)

但是发现他还是会找snapshot-2

我们尝试重新开启一个新的会话查询也不行,就算重启hiveserver2也还是不行。

后来发现这可能是一个bug,当我们在hive会话中设置了paimon.scan.snapshot-id=2,那么之后创建的表默认就只会查询snapshot-2了,那也就意味着建表的时候会把这个参数带过去。

为了验证这个猜想,我们在flink代码中查询这个Paimon表的详细建表语句,不要在hive命令行中查看(在Hive命令行中看不到详细的参数信息)。

创建package:tech.xuwei.paimon.hivepaimon
创建object:FlinkSQLReadFromPaimo

完整代码如下:

package tech.xuwei.paimon.cdcingestionimport org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment/*** 使用FlinkSQL从Paimon表中读取数据* Created by xuwei*/
object FlinkSQLReadFromPaimon {def main(args: Array[String]): Unit = {//创建执行环境val env = StreamExecutionEnvironment.getExecutionEnvironmentenv.setRuntimeMode(RuntimeExecutionMode.STREAMING)val tEnv = StreamTableEnvironment.create(env)//创建Paimon类型的CatalogtEnv.executeSql("""|CREATE CATALOG paimon_catalog WITH(|    'type'='paimon',|    'warehouse'='hdfs://bigdata01:9000/paimon'|)|""".stripMargin)tEnv.executeSql("USE CATALOG paimon_catalog")//读取Paimon表中的数据,并且打印输出结果tEnv.executeSql("""|SHOW CREATE TABLE p_h_t2;|""".stripMargin).print()}
}

在idea中执行代码,查看结果:

CREATE TABLE `paimon_catalog`.`default`.`p_h_t2` (`name` VARCHAR(2147483647),`age` INT
) WITH ('path' = 'hdfs://bigdata01:9000/paimon/default.db/p_h_t2','totalSize' = '0','numRows' = '0','rawDataSize' = '0','scan.snapshot-id' = '2','COLUMN_STATS_ACCURATE' = '{"BASIC_STATS":"true","COLUMN_STATS":{"age":"true","name":"true"}}','numFiles' = '0','bucketing_version' = '2','storage_handler' = 'org.apache.paimon.hive.PaimonStorageHandler'
)

在这里发现建表语句中有一个参数:'scan.snapshot-id' = '2',所以它默认会读取第2个快照。

想要解决这个问题,有两个办法。

  • 1:在hive中删除这个表,然后执行SET paimon.scan.snapshot-id=null;,再创建这个表就行了。
  • 2:如果不想删除这个表,可以在Flink代码中修改这个表,移除scan.snapshot-id属性即可,这个功能我们之前讲过。

第一种办法简单粗暴,不再演示,我们来看一下第二种办法:

创建object:FlinkSQLAlterPaimonTable

完整代码如下:

package tech.xuwei.paimon.hivepaimonimport org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment/*** 修改Paimon表属性* Created by xuwei*/
object FlinkSQLAlterPaimonTable {def main(args: Array[String]): Unit = {//创建执行环境val env = StreamExecutionEnvironment.getExecutionEnvironmentenv.setRuntimeMode(RuntimeExecutionMode.STREAMING)val tEnv = StreamTableEnvironment.create(env)//创建Paimon类型的CatalogtEnv.executeSql("""|CREATE CATALOG paimon_catalog WITH(|    'type'='paimon',|    'warehouse'='hdfs://bigdata01:9000/paimon'|)|""".stripMargin)tEnv.executeSql("USE CATALOG paimon_catalog")//移除表中的scan.snapshot-id属性tEnv.executeSql("""|ALTER TABLE p_h_t2 RESET ('scan.snapshot-id')|""".stripMargin)//查看最新的表属性信息tEnv.executeSql("""|SHOW CREATE TABLE p_h_t2|""".stripMargin).print()}
}

执行此代码,可以看到如下结果:

CREATE TABLE `paimon_catalog`.`default`.`p_h_t2` (`name` VARCHAR(2147483647),`age` INT
) WITH ('path' = 'hdfs://bigdata01:9000/paimon/default.db/p_h_t2','totalSize' = '0','numRows' = '0','rawDataSize' = '0','COLUMN_STATS_ACCURATE' = '{"BASIC_STATS":"true","COLUMN_STATS":{"age":"true","name":"true"}}','numFiles' = '0','bucketing_version' = '2','storage_handler' = 'org.apache.paimon.hive.PaimonStorageHandler'
)

此时表中就没有scan.snapshot-id属性了。

这个时候我们再回到hive命令行中查询这个表:

0: jdbc:hive2://localhost:10000> SELECT * FROM p_h_t2;
+--------------+-------------+
| p_h_t2.name  | p_h_t2.age  |
+--------------+-------------+
| tom          | 20          |
+--------------+-------------+
1 row selected (0.46 seconds)

这样就可以正常查询了。

注意:如果此时我们在hive中删除这个表,那么对应的paimon中这个表也会被删除。

0: jdbc:hive2://localhost:10000> drop table p_h_t2;
No rows affected (0.33 seconds)

到hdfs中确认一下这个paimon表是否存在:

[root@bigdata04 ~]# hdfs dfs -ls /paimon/default.db/p_h_t2
ls: `/paimon/default.db/p_h_t2': No such file or directory

这样就说明paimon中这个表不存在了。

不过在hdfs中会多一个这个目录,这属于一个临时目录,没什么影响,可以手工删除,不处理也没影响。

[root@bigdata04 ~]# hdfs dfs -ls /paimon/default.db/_tmp.p_h_t2

针对Paimon中已经存在的表,我们想要在hive中进行访问,应该如何实现呢?

此时可以借助于Hive的外部表特性来实现。
相当于是在hive中创建一个外部表,通过location指向paimon表的hdfs路径即可。

我们使用前面cdc数据采集中创建的Paimon表:cdc_chinese_code,在hive中创建一个外部表映射到这个表:

0: jdbc:hive2://localhost:10000> CREATE EXTERNAL TABLE p_h_external
STORED BY 'org.apache.paimon.hive.PaimonStorageHandler'
LOCATION 'hdfs://bigdata01:9000/paimon/default.db/cdc_chinese_code';

然后就可以在hive中查询这个表了:

0: jdbc:hive2://localhost:10000> select * from p_h_external;
+------------------+--------------------+
| p_h_external.id  | p_h_external.name  |
+------------------+--------------------+
| 1                | 张三                 |
| 2                | 李四                 |
+------------------+--------------------+

此时如果我们在hive中删除这个外部表,不会影响paimon中的cdc_chinese_code表。

0: jdbc:hive2://localhost:10000> drop table p_h_external;

到hdfs中确认一下,cdc_chinese_code这个paimon表还是存在的:

[root@bigdata04 ~]# hdfs dfs -ls /paimon/default.db/cdc_chinese_code
Found 4 items
drwxr-xr-x   - root supergroup          0 2029-02-27 11:32 /paimon/default.db/cdc_chinese_code/bucket-0
drwxr-xr-x   - root supergroup          0 2029-02-27 11:32 /paimon/default.db/cdc_chinese_code/manifest
drwxr-xr-x   - root supergroup          0 2029-02-27 11:26 /paimon/default.db/cdc_chinese_code/schema
drwxr-xr-x   - root supergroup          0 2029-02-27 11:32 /paimon/default.db/cdc_chinese_code/snapshot

更多Paimon数据湖内容请关注:https://edu.51cto.com/course/35051.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/136889.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

物联网中的毫米波雷达:连接未来的智能设备

随着物联网(IoT)技术的飞速发展,连接设备的方式和效能变得越来越重要。毫米波雷达技术作为一种先进的感知技术,正在为物联网设备的连接和智能化提供全新的可能性。本文将深入探讨毫米波雷达在物联网中的应用,以及它是如…

高防CDN与高防服务器:为什么高防服务器不能完全代替高防CDN

在当今的数字化时代,网络安全已经成为企业不容忽视的关键问题。面对不断增长的网络威胁和攻击,许多企业采取了高防措施以保护其网络和在线资产。然而,高防服务器和高防CDN是两种不同的安全解决方案,各自有其优势和局限性。在本文中…

Apache APISIX Dashboard 未经认证访问导致 RCE(CVE-2021-45232)漏洞复现

漏洞描述 Apache APISIX 是一个动态、实时、高性能的 API 网关,而 Apache APISIX Dashboard 是一个简单易用的前端界面,用于管理 Apache APISIX。 在 2.10.1 之前的 Apache APISIX Dashboard 中,Manager API 使用了两个框架,并在…

Mysql数据库 12.SQL语言 触发器

一、触发器&#xff08;操作日志表&#xff09; 1.介绍 不需要主动调用的一种储存过程&#xff0c;是一个能够完成特定过程&#xff0c;存储在数据库服务器上的SQL片段。 对当前表中数据增删改查的一种记录<日志表>&#xff0c;根据触发器自动执行&#xff0c;记录当前…

图解三傻排序 选择排序、冒泡排序、插入排序

&#xff08;1&#xff09;选择排序 // 交换 void swap(int arr[], int i, int j) {int tmp arr[i];arr[i] arr[j];arr[j] tmp; }// 选择排序 void selectionSort(int arr[],int len) {if (len < 2) return;for (int minIndex, i 0; i < len - 1; i) {minIndex i;f…

【11】使用透视投影建立一个3D空间的测试

核心操作&#xff1a; 1.proj view model 这三个矩阵 glm::mat4 mvp m_Proj * m_View * model; m_Shader->Bind(); m_Shader->SetUniformMat4f("u_MVP", mvp);着色器里面就&#xff1a; proj:投影矩阵&#xff0c;可以选择正交投影&#xff0c;或者透视投影…

使用 OpenTracing 和 LightStep 监控无服务器功能

无服务器功能的采用在企业组织内达到了创纪录的水平。有趣的是&#xff0c;鉴于越来越多的采用和兴趣&#xff0c;许多监控解决方案孤立了在这些环境中执行的代码的性能&#xff0c;或者仅提供有关执行的基本指标。为了了解应用程序的性能&#xff0c;我想知道存在哪些瓶颈、时…

实操创建属于自己的亚马逊云科技VPS服务:Amazon Lightsail

本文主要讲述如何独立创建自己的亚马逊云科技VPS服务&#xff0c;希望此文能帮助你对亚马逊云科技VPS服务也就是Amazon Lightsail&#xff0c;有个新的认识&#xff0c;对所使用的VPS有所帮助。 Amazon Lightsail是一款无论云计算的新手还是专家&#xff0c;都可通过其快速启动…

Kafka Rebanlace次数过高问题

Kafka Rebanlace次数过高问题 环境&#xff1a; Kafka Server 2.6.x Kafka Client Java 2.8.2 缘起&#xff1a; 最近发现Kafka Rebalance次数着实有点多&#xff0c;一天达到了六十多次&#xff0c;感觉不太正常&#xff0c;于是查了下日志发现&#xff1a; Offset commit c…

将 Ordinals 与比特币智能合约集成:第 4 部分

控制 BSV-20 代币的分配 在上一篇文章中&#xff0c;我们展示了智能合约可以在铸造后控制 BSV-20 代币的转移。 今天&#xff0c;我们演示如何控制此类代币的分发/发行。 无Tick模式 BSV-20 在 V2 中引入了无Tick模式&#xff0c;并采用了与 V1 不同的方法。 部署 (Deploy) …

程序员们保住自己饭碗

在现代社会中&#xff0c;程序员扮演着至关重要的角色。他们不仅仅是编写代码的人&#xff0c;更是保障数字世界安全稳定的守护者。随着科技的迅猛发展&#xff0c;程序员保住自己饭碗的护城河变得愈发重要。本文将探讨程序员如何通过不断学习、技术创新和软实力的发展&#xf…

Queue 中 poll()和 remove()的区别(详解)

系列文章目录 1.SpringBoot整合RabbitMQ并实现消息发送与接收 2. 解析JSON格式参数 & 修改对象的key 3. VUE整合Echarts实现简单的数据可视化 4. List&#xff1c;HashMap&#xff1c;String,String&#xff1e;&#xff1e;实现自定义字符串排序&#xff08;key排序、Val…

使用EvoMap/Three.js模拟无人机灯光秀

一、创建地图对象 首先我们需要创建一个EM.Map对象&#xff0c;该对象代表了一个地图实例&#xff0c;并设置id为"map"的文档元素作为地图的容器。 let map new EM.Map("map",{zoom:22.14,center:[8.02528, -29.27638, 0],pitch:71.507,roll:2.01,maxPit…

代码随想录算法训练营Day 47 || 198.打家劫舍、213.打家劫舍II、337.打家劫舍 III

198.打家劫舍 力扣题目链接(opens new window) 你是一个专业的小偷&#xff0c;计划偷窃沿街的房屋。每间房内都藏有一定的现金&#xff0c;影响你偷窃的唯一制约因素就是相邻的房屋装有相互连通的防盗系统&#xff0c;如果两间相邻的房屋在同一晚上被小偷闯入&#xff0c;系…

【Cheat Engine7.5】基础教程第三关(步骤4)

文章目录 一、简介二、操作步骤2.1、加载进程2.2、查找健康数据2.2.1、首次扫描(单浮点数100)2.2.2、点击打我&#xff0c;再次扫描数值97.112.2.3、修改数据值为50002.2.4、测试正常 2.3、查找弹药数据2.3.1、双浮点数1002.3.2、点击开火2.3.3、修改数据2.3.4、测试 2.4、通关…

11.3 校招 实习 内推 面经

绿*泡*泡&#xff1a; neituijunsir 交流裙 &#xff0c;内推/实习/校招汇总表格 1、校招 | 奇瑞汽车2024届“奇稷生”全球招募 研发 专场进行中 校招 | 奇瑞汽车2024届“奇稷生”全球招募 研发 专场进行中 2、校招 | 航天十院2024届秋季校园招聘进行中 校招 | 航天十院20…

程序员的“护城河”

程序员的“护城河”可以包括以下几方面&#xff1a; 技术能力和经验&#xff1a;作为程序员&#xff0c;深入了解编程语言、算法、数据结构和软件开发原理&#xff0c;可以积累丰富的经验&#xff0c;并将其转化为实际的技术能力。这些能力是程序员的核心竞争力&#xff0c;是…

微头条项目实战:通过postman测试登录验证请求

1、CrosFilter package com.csdn.headline.filters; import jakarta.servlet.*; import jakarta.servlet.http.HttpServletResponse; import java.io.IOException; public class CrosFilter implements Filter {/*** 过滤器方法&#xff0c;用于处理HTTP请求* param servletReq…

674. 最长连续递增序列 718. 最长重复子数组 1143.最长公共子序列 1035.不相交的线

674. 最长连续递增序列 题目&#xff1a; 给定一个未经排序的整数数组nums&#xff0c;找到最长且 连续递增的子序列&#xff0c;并返回该序列的长度。 dp数组含义&#xff1a; dp[i]&#xff1a;以下标i为结尾的连续递增的子序列长度为dp[i]。 递推公式&#xff1a; 怎么…

ai 问答时刻

妙啊 这很快 相当棒