Fink CDC数据同步(六)数据入湖Hudi

数据入湖Hudi

Apache Hudi(简称:Hudi)使得您能在hadoop兼容的存储之上存储大量数据,同时它还提供两种原语,使得除了经典的批处理之外,还可以在数据湖上进行流处理。这两种原语分别是:

  • Update/Delete记录:Hudi使用细粒度的文件/记录级别索引来支持Update/Delete记录,同时还提供写操作的事务保证。查询会处理最后一个提交的快照,并基于此输出结果。
  • 变更流:Hudi对获取数据变更提供了一流的支持:可以从给定的时间点获取给定表中已updated/inserted/deleted的所有记录的增量流,并解锁新的查询姿势(类别)。

配置

将hudi相关jar包放在flink安装目录的lib下

hudi-flink1.16-bundle-0.13.0.jar

hudi-hadoop-mr-0.13.0.jar

hudi-hive-sync-0.13.0.jar

确保/etc/profile配置了hadoop和hive的环境变量

#HADOOP_HOME
export HADOOP_HOME=/usr/hdp/3.1.5.0-152/hadoop
export HADOOP_CONF_DIR=/usr/hdp/3.1.5.0-152/hadoop/etc/hadoop
export HADOOP_COMMON_HOME=/usr/hdp/3.1.5.0-152/hadoop
export HADOOP_HDFS_HOME=/usr/hdp/3.1.5.0-152/hadoop
export HADOOP_YARN_HOME=/usr/hdp/3.1.5.0-152/hadoop
export HADOOP_MAPRED_HOME=/usr/hdp/3.1.5.0-152/hadoop
export PATH=$PATH:$HADOOP_HOME/bin:$HADOOP_HOME/sbin
export HADOOP_CLASSPATH=`hadoop classpath`#HIVE HOME
export HIVE_HOME=/usr/hdp/3.1.5.0-152/hive
export PATH=$PATH:$HIVE_HOME/bin:$HIVE_HOME/sbin

测试插入hudi表

set sql-client.execution.result-mode = tableau;
set execution.checkpointing.interval=30sec;
SET table.sql-dialect=default;CREATE TABLE hudi_test(uuid VARCHAR(20) PRIMARY KEY NOT ENFORCED,name VARCHAR(10),age INT,ts TIMESTAMP(3),`partition` VARCHAR(20)
)
PARTITIONED BY (`partition`)
WITH ('connector' = 'hudi',  -- 连接器指定hudi'path' = 'hdfs://bigdata101:8020/hudi/hudi_test',  -- 数据存储地址'table.type' = 'MERGE_ON_READ' -- 表类型,默认COPY_ON_WRITE,可选MERGE_ON_READ
);INSERT INTO hudi_test VALUES('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1'),('id2','Stephen',33,TIMESTAMP '1970-01-01 00:00:02','par1'),('id3','Julian',53,TIMESTAMP '1970-01-01 00:00:03','par2'),('id4','Fabian',31,TIMESTAMP '1970-01-01 00:00:04','par2'),('id5','Sophia',18,TIMESTAMP '1970-01-01 00:00:05','par3'),('id6','Emma',20,TIMESTAMP '1970-01-01 00:00:06','par3'),('id7','Bob',44,TIMESTAMP '1970-01-01 00:00:07','par4'),('id8','Han',56,TIMESTAMP '1970-01-01 00:00:08','par4');

MySql数据写入Hudi表

建hudi表

create table hudi_user(id string not null,name string,birth string,gender string,primary key (id) not enforced
)
with ('connector' = 'hudi','path' = 'hdfs://bigdata101:8020/hudi/hudi_user','table.type' = 'MERGE_ON_READ','write.option' = 'bulk_insert','write.precombine.field' = 'id'
);

将MySql映射表的数据插入hudi表,此时会生成一个flink任务

insert into ods.hudi_user select * from mysql_user;

流式查询

上面的查询方式是非流式查询,流式查询会生成一个flink作业,并且实时显示数据源变更的数据。

流式查询(Streaming Query)需要设置read.streaming.enabled = true。再设置read.start-commit,如果想消费所有数据,设置值为earliest。

使用参数如下:

参数名称

是否必填

默认值

备注

read.streaming.enabled

FALSE

FALSE

设置为true,开启stream query

read.start-commit

FALSE

the latest commit

Instant time的格式为:’yyyyMMddHHmmss’

read.streaming_skip_compaction

FALSE

FALSE

是否不消费compaction commit,消费compaction commit会出现重复数据

clean.retain_commits

FALSE

10

当开启change log mode,保留的最大commit数量。如果checkpoint interval为5分钟,则保留50分钟的change log

建表:

create table hudi_user_read_streaming(id int not null ,name string,birth string,gender string,primary key (id) not enforced
)
with ('connector' = 'hudi','path' = 'hdfs://bigdata101:8020/hudi/hudi_user','table.type' = 'MERGE_ON_READ','write.option' = 'bulk_insert','write.precombine.field' = 'id','read.streaming.enabled' = 'true',  -- 默认值false,设置为true,开启stream query'read.start-commit' = '20231008134557', -- start-commit之前提交的数据不显示,'read.streaming.check-interval' = '4'  -- 检查间隔,默认60s);insert into hudi_user_read_streaming select * from mysql_user;select * from hudi_user_read_streaming;

此时,执行select 语句就会生成一个flink 作业

源端变更数据会实时展示出来


 

 系列文章

Fink CDC数据同步(一)环境部署icon-default.png?t=N7T8https://blog.csdn.net/weixin_44586883/article/details/136017355?spm=1001.2014.3001.5502​​​​​​​
Fink CDC数据同步(二)MySQL数据同步icon-default.png?t=N7T8https://blog.csdn.net/weixin_44586883/article/details/136017472?spm=1001.2014.3001.5501
Fink CDC数据同步(三)Flink集成Hiveicon-default.png?t=N7T8https://blog.csdn.net/weixin_44586883/article/details/136017571?spm=1001.2014.3001.5501
Fink CDC数据同步(四)Mysql数据同步到Kafkaicon-default.png?t=N7T8https://blog.csdn.net/weixin_44586883/article/details/136023747?spm=1001.2014.3001.5501
Fink CDC数据同步(五)Kafka数据同步Hiveicon-default.png?t=N7T8https://blog.csdn.net/weixin_44586883/article/details/136023837?spm=1001.2014.3001.5501

Fink CDC数据同步(六)数据入湖Hudiicon-default.png?t=N7T8https://blog.csdn.net/weixin_44586883/article/details/136023939?spm=1001.2014.3001.5502

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/668005.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

LLaVA:GPT-4V(ision) 的新开源替代品

LLaVA:GPT-4V(ision) 的新开源替代品。 LLaVA (https://llava-vl.github.io/,是 Large Language 和Visual A ssistant的缩写)。它是一种很有前景的开源生成式 AI 模型,它复制了 OpenAI GPT-4 在与图像对话方面的一些功…

arping交叉编译

arping命令依赖libpcap和libnet,需要先交叉编译这两个库。 1.交叉编译libpcap 下载libpcap源文件,从github上克隆: git clone https://github.com/the-tcpdump-group/libpcap.git source交叉编译环境 # environment-setup是本机的交叉编译环境, 里面…

LabVIEW风力发电机在线监测

LabVIEW风力发电机在线监测 随着可再生能源的发展,风力发电成为越来越重要的能源形式。设计了一个基于控制器局域网(CAN)总线和LabVIEW的风力发电机在线监测系统,实现风力发电机的实时监控和故障诊断,以提高风力发电的…

windows安装Visual Studio Code,配置C/C++运行环境(亲测可行)

一.下载 Visual Studio Code https://code.visualstudio.com/ 二.安装 选择想要安装的位置: 后面的点击下一步即可。 三.下载编译器MinGW vscode只是写代码的工具,使用编译器才能编译写的C/C程序,将它转为可执行文件。 MinGW下载链接:…

Stable Diffusion 模型下载:国风3 GuoFeng3

文章目录 模型介绍生成案例案例一案例二案例三案例四案例五案例六案例七案例八案例九案例十推荐提示词下载地址模型介绍 欢迎使用GuoFeng3模型 - 这是一个中国华丽古风风格模型,也可以说是一个古风游戏角色模型,具有2.5D的质感。 条目内

CDH6.3.2 多 Spark 版本共存

一 部署Spark客户端 1.1 部署spark3客户端 tar -zxvf spark-3.3.1-bin-3.0.0-cdh6.3.2.tgz -C /opt/cloudera/parcels/CDH/lib cd /opt/cloudera/parcels/CDH/lib mv spark-3.3.1-bin-3.0.0-cdh6.3.2/ spark3将 CDH 集群的 spark-env.sh 复制到 /opt/cloudera/parcels/CDH/li…

C语言函数递归例子2青蛙跳台阶问题

接下来我们来看一下第二个例子青蛙跳台阶 青蛙跳台阶问题 这个问题经常在各类面试中看到。一只青蛙一次可以跳上1级台阶,也可以跳上2级。求该青蛙跳上一个n级的台阶总共有多少种跳法。是实践函数递归的典型问题 分析问题 我们先假设有n个台阶,如果n1&am…

如何构建多种系统架构支持的 Docker 镜像

如何构建多种系统架构支持的 Docker 镜像 1.概述2.解决方案3.使用manifest案例 1.概述 我们知道使用镜像创建一个容器,该镜像必须与 Docker 宿主机系统架构一致,例如 Linux x86_64 架构的系统中只能使用 Linux x86_64 的镜像创建容器 例如我们在 Linux…

PyTorch 2.2 中文官方教程(七)

使用 torchtext 库进行文本分类 原文:pytorch.org/tutorials/beginner/text_sentiment_ngrams_tutorial.html 译者:飞龙 协议:CC BY-NC-SA 4.0 注意 点击这里下载完整示例代码 在本教程中,我们将展示如何使用 torchtext 库构建文…

Vue中nextTick方法的作用与原理

在Vue的开发中,你可能会遇到一些异步更新的问题,如在改变数据后需要等待DOM更新完毕后再进行下一步操作。这时就可以使用Vue提供的nextTick方法来解决这个问题。 nextTick方法的作用是在DOM更新之后执行回调函数,确保在下次DOM更新循环结束之…

ReactNative实现一个圆环进度条

我们直接看效果,如下图 我们在直接上代码 /*** 圆形进度条*/ import React, {useState, useEffect} from react; import Svg, {Circle,G,LinearGradient,Stop,Defs,Text, } from react-native-svg; import {View, StyleSheet} from react-native;// 渐变色 const C…

PyTorch 2.2 中文官方教程(四)

torch.nn 到底是什么? 原文:pytorch.org/tutorials/beginner/nn_tutorial.html 译者:飞龙 协议:CC BY-NC-SA 4.0 注意 点击这里下载完整示例代码 作者: Jeremy Howard,fast.ai。感谢 Rachel Thomas 和 Fr…

队列---数据结构

定义 队列(Queue)简称队,也是一种操作受限的线性表,只允许在表的一端进行插入,而在表的另一端进行删除。向队列中插入元素称为入队或进队;删除元素称为出队或离队。 队头(Front)&a…

2024年Java面试题大全 面试题附答案详解,BTA内部面试题

基础篇 1、 Java语言有哪些特点 1、简单易学、有丰富的类库 2、面向对象(Java最重要的特性,让程序耦合度更低,内聚性更高) 阿里内部资料 基本类型 大小(字节) 默认值 封装类 6、Java自动装箱与拆箱 装箱就是…

Maven提示Failure to find com.oracle:ojdbc14:jar:10.2.0.4.0

目录 问题 解决方案 1、下载oracle的驱动jar包 2、安装到本地仓库 3、检查本地仓库是否成功安装 4、Maven先clean ,再install。 问题 项目引入Oracle依赖后报错,显示为红色。 解决方案 1、下载oracle的驱动jar包 首先我们要去下载一个oracle的…

undefined symbol: avio_protocol_get_class, version LIBAVFORMAT_58

rv1126上进行编译和在虚拟机里面进行交叉编译ffmpeg都不行 解决办法查看 查看安装的ffmpeg链接的文件 ldd ./ffmpeg rootEASY-EAI-NANO:/home/nano/ffmpeg-4.3.6# ldd ffmpeg linux-vdso.so.1 (0xaeebd000)libavdevice.so.58 > /lib/arm-linux-gnueabihf/libavde…

哪些因素会限制带宽的可用性?

当我们讨论带宽的可用性时,我们主要关注的是数据传输的速度和容量。带宽就像一条公路,数据就像行驶在公路上的车辆,带宽越大,可以同时传输的数据就越多,数据传输的速度也就越快。但是,就像公路会有各种限制…

【云原生运维问题记录】kubesphere登录不跳转问题

文章目录 现象问题排查 结论先行:kubesphere-system名称空间下reids宕机重启,会判断是否通过registry-proxy重新拉取镜像,该镜像原本是通过阿里云上拉取,代理上没有出现超时情况,导致失败。解决方案:删除re…

k8s-常用工作负载控制器(更高级管理Pod)

一、工作负载控制器是什么? 二、Deploymennt控制器:介绍与部署应用 部署 三、Deployment控制器:滚动升级、零停机 方式一: 通个加入健康检查可以,看到,nginx容器逐个被替代,最终每个都升级完成&…

AI助力农作物自动采摘,基于YOLOv5全系列【n/s/m/l/x】参数模型开发构建作物生产场景下番茄采摘检测计数分析系统

去年十一那会无意间刷到一个视频展示的就是德国机械收割机非常高效自动化地24小时不间断地在超广阔的土地上采摘各种作物,专家设计出来了很多用于采摘不同农作物的大型机械,看着非常震撼,但是我们国内农业的发展还是相对比较滞后的&#xff0…