Fink CDC数据同步(一)环境部署

1 背景介绍

Apache Flink 是一个框架和分布式处理引擎,用于在无边界和有边界数据流上进行有状态的计算。Flink 能在所有常见集群环境中运行,并能以内存速度和任意规模进行计算。

Flink CDC 是 Apache Flink 的一组源连接器,基于数据库日志的 Change Data Caputre 技术,实现了全量和增量的一体化读取能力,并借助 Flink 优秀的管道能力和丰富的上下游生态,支持捕获多种数据库的变更,并将这些变更实时同步到下游存储。目前,Flink CDC 的上游已经支持了MySQL,MariaDB, RDS MySQL,Aurora MySQL,PolarDB MySQL,PostgreSQL,Oracle,MongoDB,SqlServer,OceanBase,PolarDB-X,TiDB 等丰富的数据源。Flink CDC 的下游则更加丰富,支持写入 Kafka、Pulsar 消息队列,也支持写入 Hudi、Iceberg 等数据湖,还支持写入各种数据仓库。同时,通过 Flink SQL 原生支持的 Changelog 机制,可以让 CDC 数据的加工变得非常简单。用户可以通过 SQL 便能实现数据库全量和增量数据的清洗、打宽、聚合等操作,极大地降低了用户门槛。 此外, Flink DataStream API 支持用户编写代码实现自定义逻辑,给用户提供了深度定制业务的自由度。

本文以Flink+FlinkCDC同步MySQL数据、数据入仓,数据入湖等测试为例,为日后云桥数据集成产品做准备。

框架软件版本如下:

软件

版本

Java

1.8.0_361

Mysql

8.0.32

Flink

1.16.2

Flink CDC

2.3.0

Hadoop

3.1.5.0

Hive

3.1.0.3.1.5.0-152

kafka

2.0.0.3.1.5.0-152

Hudi

0.13.0

环境部署

2 环境部署

2.1 Flink部署

本次部署以Flink单机版为例

2.1.1 下载Flink部署包并解压

# 下载Flink安装包(这里测试使用Flink16.2版本)
wget https://archive.apache.org/dist/flink/flink-1.16.2/flink-1.16.2-bin-scala_2.12.tgz# 解压
tar -xzvf flink-1.16.2-bin-scala_2.12.tgz

2.1.2 修改配置文件

修改flink-conf.yaml

在flink目录的conf下

jobmanager.rpc.address: localhost# The RPC port where the JobManager is reachable.jobmanager.rpc.port: 6123jobmanager.bind-host: localhostjobmanager.memory.process.size: 6800mtaskmanager.bind-host: 192.168.1.1taskmanager.host: 192.168.1.1# The total process memory size for the TaskManager.
#
# Note this accounts for all memory usage within the TaskManager process, including JVM metaspace and other overhead.taskmanager.memory.process.size: 6800m# To exclude JVM metaspace and overhead, please, use total Flink memory size instead of 'taskmanager.memory.process.size'.
# It is not recommended to set both 'taskmanager.memory.process.size' and Flink memory.
#
taskmanager.memory.flink.size: 6280m# The number of task slots that each TaskManager offers. Each slot runs one parallel pipeline.taskmanager.numberOfTaskSlots: 4# The parallelism used for programs that did not specify and other parallelism.parallelism.default: 1jobmanager.execution.failover-strategy: region# The port to which the REST client connects to. If rest.bind-port has
# not been specified, then the server will bind to this port as well.
#
rest.port: 8787# The address to which the REST client will connect to
#
rest.address: 192.168.1.1rest.bind-address: 192.168.1.1#设置checkpoint周期时间
execution.checkpointing.interval: 30000
#设置有且仅有一次模式 目前支持 EXACTLY_ONCE、AT_LEAST_ONCE  
execution.checkpointing.mode: EXACTLY_ONCE
#设置checkpoint的存储方式
state.backend: filesystem
#设置checkpoint的存储位置
state.checkpoints.dir: file:///opt/data/flink/checkpoint
#设置savepoint的存储位置
state.savepoints.dir: file:///opt/data/flink/checkpoint
#设置checkpoint的超时时间 即一次checkpoint必须在该时间内完成 不然就丢弃
execution.checkpointing.timeout: 600000
#设置两次checkpoint之间的最小时间间隔
execution.checkpointing.min-pause: 500
#设置并发checkpoint的数目
execution.checkpointing.max-concurrent-checkpoints: 1
#开启checkpoints的外部持久化这里设置了清除job时保留checkpoint,默认值时保留一个 假如要保留3个
state.checkpoints.num-retained: 3
#默认情况下,checkpoint不是持久化的,只用于从故障中恢复作业。当程序被取消时,它们会被删除。但是你可以配置checkpoint被周期性持久化到外部,类似于savepoints。这些外部的checkpoints将它们的元数据输出到外#部持久化存储并且当作业失败时不会自动清除。这样,如果你的工作失败了,你就会有一个checkpoint来恢复。
#ExternalizedCheckpointCleanup模式配置当你取消作业时外部checkpoint会产生什么行为:
#RETAIN_ON_CANCELLATION: 当作业被取消时,保留外部的checkpoint。注意,在此情况下,您必须手动清理checkpoint状态。
#DELETE_ON_CANCELLATION: 当作业被取消时,删除外部化的checkpoint。只有当作业失败时,检查点状态才可用。
execution.checkpointing.externalized-checkpoint-retention: RETAIN_ON_CANCELLATION
# 该配置用于客户端 client 连接 Flink, 将此设置为 JobManager 运行的主机名(该配置决定WEB的地址)
rest.address: 192.168.1.1
# 客户端提供对外访问的地址和端口是rest.port和rest.address
# 如果没有配置rest.bind-port, 那么其他服务也使用rest.port端口,所以只要使用其中一个启动模式,其他模式在启动时就会报错端口无法启动
# 因此配置该项后, 其他 Job 启动后,就会在 rest.bind-address 和 rest.bind-port 随机选择并占用.
rest.bind-address: 192.168.1.1
classloader.check-leaked-classloader: false

2.1.3 启动服务

进入bin目录

# 启动Flink集群
./start-cluster.sh# 停止Flink集群
#./stop-cluster.sh

会启动

StandaloneSessionClusterEntrypoint

TaskManagerRunner

  • 如果StandaloneSessionClusterEntrypoint 没有启动,则检查flink-conf.yaml有地址和端口有没有填写好,
  • TaskManagerRunner没有启动则检查

        flink/comf/masters

                192.168.1.1:8787

        taskmanager.sh

2.1.4 访问Flink UI

http://x.x.x.x:8787/#/overview

2.2 FlinkCDC

Flink CDC是Flink的一组连接器,需要连接哪个组件,则需要将对应的连接jar包放在flink安装目录下的lib即可,

以下几种情况需要进行源码编译:

  • 用户对 Flink CDC 源码进行了修改
  • Flink CDC 某依赖项的版本与运行环境不一致
  • 官方未提供最新版本 Flink CDC 二进制安装包

FlinkCDC源码地址:

GitHub - ververica/flink-cdc-connectors: CDC Connectors for Apache Flink®

如果不需要编译,选择对应的连接器和版本,可以直接下载打包好的jar

Central Repository: com/ververica

将jar包放到flink安装目录下的lib即可。

FlinkCDC 与Flink 对应关系:

Flink® CDC Version

Flink® Version

1.0.0

1.11.*

1.1.0

1.11.*

1.2.0

1.12.*

1.3.0

1.12.*

1.4.0

1.13.*

2.0.*

1.13.*

2.1.*

1.13.*

2.2.*

1.13.*, 1.14.*

2.3.*

1.13.*, 1.14.*, 1.15.*, 1.16.0

2.4.*

1.13.*, 1.14.*, 1.15.*, 1.16.*, 1.17.0


系列文章

Fink CDC数据同步(一)环境部署icon-default.png?t=N7T8https://blog.csdn.net/weixin_44586883/article/details/136017355?spm=1001.2014.3001.5502​​​​​​​​​​​​​​
Fink CDC数据同步(二)MySQL数据同步icon-default.png?t=N7T8https://blog.csdn.net/weixin_44586883/article/details/136017472?spm=1001.2014.3001.5501
Fink CDC数据同步(三)Flink集成Hiveicon-default.png?t=N7T8https://blog.csdn.net/weixin_44586883/article/details/136017571?spm=1001.2014.3001.5501
Fink CDC数据同步(四)Mysql数据同步到Kafkaicon-default.png?t=N7T8https://blog.csdn.net/weixin_44586883/article/details/136023747?spm=1001.2014.3001.5501
Fink CDC数据同步(五)Kafka数据同步Hiveicon-default.png?t=N7T8https://blog.csdn.net/weixin_44586883/article/details/136023837?spm=1001.2014.3001.5501

Fink CDC数据同步(六)数据入湖Hudiicon-default.png?t=N7T8https://blog.csdn.net/weixin_44586883/article/details/136023939?spm=1001.2014.3001.5502

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/667459.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【SpringBoot】RBAC权限控制

📝个页人主:五敷有你 🔥系列专栏:SpringBoot⛺️稳重求进,晒太阳 权限系统与RBAC模型 权限 为了解决用户和资源的操作关系, 让指定的用户,只能操作指定的资源。 权限功能 菜单权限&a…

windows下安装go

下载golang Go 官网下载地址: https://golang.org/dl/ Go 官方镜像站(推荐): https://golang.google.cn/dl/ 选择安装包 验证有没有安装成功 查看 go 环境 说明 : Go1.11 版本之后无需手动配置环境变量&#xff0c…

29 python快速上手

Python操作MySQL和实战 1. 事务1.1 MySQL客户端1.2 Python代码 2. 锁2.1 排它锁2.2 共享锁 3. 数据库连接池4. SQL工具类4.1 单例和方法4.2 上下文管理 5.其他总结 目标:掌握事务和锁以及Python操作MySQL的各种开发必备知识。 概要: 事务锁数据库连接池…

Weblogic反序列化漏洞分析之CVE-2021-2394

目录 简介 前置知识 Serializable示例 Externalizable示例 联系weblogic ExternalizableLite接口 ExternalizableHelperl类 JdbcRowSetImpl类 MethodAttributeAccessor类 AbstractExtractor类 FilterExtractor类 TopNAggregator$PartialResult类 SortedBag$Wrappe…

【测试运维】web自动化全知识点笔记第1篇:什么是Web自动化测试(已分享,附代码)

本系列文章md笔记(已分享)主要讨论Web自动化测试相关知识。了解什么是自动化,理解什么是自动化测试以及为什么要使用自动化测试。具体包含:WebDriver的基本操作,WebDriver的鼠标、键盘操作,下拉选择框、警告…

SpringBoot RestTemplate 上传文件

SpringBoot RestTemplate 上传文件 Testpublic void testUpload() throws Exception {String url "http://127.0.0.1/file/upload";String filePath "C:\\temp\\1.png";RestTemplate rest new RestTemplate();FileSystemResource resource new FileSys…

【教学类-46-01】吉祥字门贴1.0(华光通心圆_CNKI 文本框 空心字涂色,最好繁体字)

作品展示 背景需求: 马上就要过年了,家家户户大门上贴上对联和福字 我想用正方形红色手工纸(15CM)也做一个幼儿描线版的福字 问题一:福字顺时针旋转45度 打印纸上制作福字,需要让这个字顺时针旋转45度&am…

C语言:简单排序

题目描述 输入11个整数,如果第1个数为1,则将其中的第2至11个数升序排列;如果第1个数为0,则降序排列。 输入格式 输入一行,包含11个整数,用空格符分隔。 输出格式 输出1行,包含10个顺序排列的整…

Qt程序设计-使用QSplashScreen制作开机界面

目录 QSplashScreen简介 实例演示 QSplashScreen简介 在Qt中,QSplashScreen类就是用来创建启动画面的。它是一个窗口类,可以显示一个图片,并在图片上显示一些文本信息。QSplashScreen类提供了一些方法,可以方便地设置启动画面的图片和文本,以及控制启动画面的显示和隐藏…

【C++栈和队列:数据结构中的经典组合,高效处理先进先出与后进先出问题的最佳方案】

[本节目标] 1. stack的介绍和使用 2. queue的介绍和使用 3. priority_queue的介绍和使用 4. 容器适配器 1. stack的介绍和使用 1.1 stack的介绍 1. stack是一种容器适配器,专门用在具有后进先出操作的上下文环境中,其删除只能从容器的一端进行元素的…

【DC渗透系列】DC-2靶场

arp先扫 ┌──(root㉿kali)-[~] └─# arp-scan -l Interface: eth0, type: EN10MB, MAC: 00:0c:29:6b:ed:27, IPv4: 192.168.100.251 Starting arp-scan 1.10.0 with 256 hosts (https://github.com/royhills/arp-scan) 192.168.100.1 00:50:56:c0:00:08 VMware, In…

DolphinScheduler实现隔几天调度

1.场景分析 dolphinscheduler(海豚)定时器模块-定时调度时每3秒|每3分钟|每3天这种定时,不能够跨分钟,跨小时,跨月,每次跨月等都会从每个月的第1天(第几天开始可以设定)开始重新计时…

Unity3d Cinemachine篇(四)— StateDrivenCamera

文章目录 前言使用StateDrivenCamera根据不同动画切换相机1. 创建一个游戏物体2. 创建StateDrivenCamera相机3. 创建动画4. 设置相机5. 完成 前言 上一期我们简单的使用了FreeLook相机,这次我们来使用一下StateDrivenCamera 使用StateDrivenCamera根据不同动画切换…

PFMEA的具体实施步骤都有哪些——FMEA软件免费

免费试用FMEA软件-免费版-SunFMEA 一、引言 PFMEA(Process Failure Mode and Effects Analysis)是一种用于识别、评估和优先处理生产过程中潜在失效模式的工具。它通过对生产过程中的各个环节进行深入分析,发现可能导致产品不合格、过程不稳…

docker maven插件使用介绍

1、配置docker连接 开放docker tcp连接参考本专栏下令一篇文章 2、docker service窗口 3、根据dockerfile 构建镜像 注意 idea 用通过管理员身份启动,否则连不上docker 构建前添加maven goal 打包 4、运行镜像 创建容器 5、运行docker compose 报错 需要先配置d…

面了昆仑天工大模型算法岗(实习),感觉彻底凉凉。。。

我是过年某985研二,过完年打算找大厂实习 offer,本文章主要记录了本小菜研找实习的坎坷历程。 应聘岗位:昆仑天工大模型算法工程师 面试轮数:第一面 1. 自我介绍 在自我介绍环节,我清晰地阐述了个人基本信息、教育背景…

django微博热搜数据分析与可视化系统python毕业设计

简而言之,数据可视化是以图形方式呈现结构化或非结构化数据,从而将隐藏在数据中的信息直接呈现给人们。但是有一个陷阱:它不仅仅是使用数据可视化工具将数据转化为图形。相反,它是从数据的角度看待世界。换句话说,数据可视化的对象…

(7)【Python/机器学习/深度学习】Deep-Learning模型与算法应用—深度学习基础搭建最小神经网络

目录 一、深度学习使用python建立最简单的神经元neuron 1、人工智能&机器学习&深度学习三者关系 2、机器学习& 深度学习区别 3、神经元 4、最小神经网络模型(神经元/感知器) 5、(案例)Predicting if a person would buy life insurn…

使用vue脚手架构建项目

一、前言 * 创建好vue-cli的环境,下载好vue包依赖* 本文使用环境:vue/cli 5.0.8二、步骤 创建vueTest文件夹,管理员身份运行cmd , 进入到vueTest文件夹 执行命令vue create 你的项目名 ,这里我定义的项目名为: my-project 基于…

基于微信小程序的校园水电费管理小程序的研究与实现

博主介绍:✌程序员徐师兄、7年大厂程序员经历。全网粉丝12w、csdn博客专家、掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java技术领域和毕业项目实战✌ 🍅文末获取源码联系🍅 👇🏻 精彩专栏推荐订阅👇…