Fink CDC数据同步(一)环境部署

1 背景介绍

Apache Flink 是一个框架和分布式处理引擎,用于在无边界和有边界数据流上进行有状态的计算。Flink 能在所有常见集群环境中运行,并能以内存速度和任意规模进行计算。

Flink CDC 是 Apache Flink 的一组源连接器,基于数据库日志的 Change Data Caputre 技术,实现了全量和增量的一体化读取能力,并借助 Flink 优秀的管道能力和丰富的上下游生态,支持捕获多种数据库的变更,并将这些变更实时同步到下游存储。目前,Flink CDC 的上游已经支持了MySQL,MariaDB, RDS MySQL,Aurora MySQL,PolarDB MySQL,PostgreSQL,Oracle,MongoDB,SqlServer,OceanBase,PolarDB-X,TiDB 等丰富的数据源。Flink CDC 的下游则更加丰富,支持写入 Kafka、Pulsar 消息队列,也支持写入 Hudi、Iceberg 等数据湖,还支持写入各种数据仓库。同时,通过 Flink SQL 原生支持的 Changelog 机制,可以让 CDC 数据的加工变得非常简单。用户可以通过 SQL 便能实现数据库全量和增量数据的清洗、打宽、聚合等操作,极大地降低了用户门槛。 此外, Flink DataStream API 支持用户编写代码实现自定义逻辑,给用户提供了深度定制业务的自由度。

本文以Flink+FlinkCDC同步MySQL数据、数据入仓,数据入湖等测试为例,为日后云桥数据集成产品做准备。

框架软件版本如下:

软件

版本

Java

1.8.0_361

Mysql

8.0.32

Flink

1.16.2

Flink CDC

2.3.0

Hadoop

3.1.5.0

Hive

3.1.0.3.1.5.0-152

kafka

2.0.0.3.1.5.0-152

Hudi

0.13.0

环境部署

2 环境部署

2.1 Flink部署

本次部署以Flink单机版为例

2.1.1 下载Flink部署包并解压

# 下载Flink安装包(这里测试使用Flink16.2版本)
wget https://archive.apache.org/dist/flink/flink-1.16.2/flink-1.16.2-bin-scala_2.12.tgz# 解压
tar -xzvf flink-1.16.2-bin-scala_2.12.tgz

2.1.2 修改配置文件

修改flink-conf.yaml

在flink目录的conf下

jobmanager.rpc.address: localhost# The RPC port where the JobManager is reachable.jobmanager.rpc.port: 6123jobmanager.bind-host: localhostjobmanager.memory.process.size: 6800mtaskmanager.bind-host: 192.168.1.1taskmanager.host: 192.168.1.1# The total process memory size for the TaskManager.
#
# Note this accounts for all memory usage within the TaskManager process, including JVM metaspace and other overhead.taskmanager.memory.process.size: 6800m# To exclude JVM metaspace and overhead, please, use total Flink memory size instead of 'taskmanager.memory.process.size'.
# It is not recommended to set both 'taskmanager.memory.process.size' and Flink memory.
#
taskmanager.memory.flink.size: 6280m# The number of task slots that each TaskManager offers. Each slot runs one parallel pipeline.taskmanager.numberOfTaskSlots: 4# The parallelism used for programs that did not specify and other parallelism.parallelism.default: 1jobmanager.execution.failover-strategy: region# The port to which the REST client connects to. If rest.bind-port has
# not been specified, then the server will bind to this port as well.
#
rest.port: 8787# The address to which the REST client will connect to
#
rest.address: 192.168.1.1rest.bind-address: 192.168.1.1#设置checkpoint周期时间
execution.checkpointing.interval: 30000
#设置有且仅有一次模式 目前支持 EXACTLY_ONCE、AT_LEAST_ONCE  
execution.checkpointing.mode: EXACTLY_ONCE
#设置checkpoint的存储方式
state.backend: filesystem
#设置checkpoint的存储位置
state.checkpoints.dir: file:///opt/data/flink/checkpoint
#设置savepoint的存储位置
state.savepoints.dir: file:///opt/data/flink/checkpoint
#设置checkpoint的超时时间 即一次checkpoint必须在该时间内完成 不然就丢弃
execution.checkpointing.timeout: 600000
#设置两次checkpoint之间的最小时间间隔
execution.checkpointing.min-pause: 500
#设置并发checkpoint的数目
execution.checkpointing.max-concurrent-checkpoints: 1
#开启checkpoints的外部持久化这里设置了清除job时保留checkpoint,默认值时保留一个 假如要保留3个
state.checkpoints.num-retained: 3
#默认情况下,checkpoint不是持久化的,只用于从故障中恢复作业。当程序被取消时,它们会被删除。但是你可以配置checkpoint被周期性持久化到外部,类似于savepoints。这些外部的checkpoints将它们的元数据输出到外#部持久化存储并且当作业失败时不会自动清除。这样,如果你的工作失败了,你就会有一个checkpoint来恢复。
#ExternalizedCheckpointCleanup模式配置当你取消作业时外部checkpoint会产生什么行为:
#RETAIN_ON_CANCELLATION: 当作业被取消时,保留外部的checkpoint。注意,在此情况下,您必须手动清理checkpoint状态。
#DELETE_ON_CANCELLATION: 当作业被取消时,删除外部化的checkpoint。只有当作业失败时,检查点状态才可用。
execution.checkpointing.externalized-checkpoint-retention: RETAIN_ON_CANCELLATION
# 该配置用于客户端 client 连接 Flink, 将此设置为 JobManager 运行的主机名(该配置决定WEB的地址)
rest.address: 192.168.1.1
# 客户端提供对外访问的地址和端口是rest.port和rest.address
# 如果没有配置rest.bind-port, 那么其他服务也使用rest.port端口,所以只要使用其中一个启动模式,其他模式在启动时就会报错端口无法启动
# 因此配置该项后, 其他 Job 启动后,就会在 rest.bind-address 和 rest.bind-port 随机选择并占用.
rest.bind-address: 192.168.1.1
classloader.check-leaked-classloader: false

2.1.3 启动服务

进入bin目录

# 启动Flink集群
./start-cluster.sh# 停止Flink集群
#./stop-cluster.sh

会启动

StandaloneSessionClusterEntrypoint

TaskManagerRunner

  • 如果StandaloneSessionClusterEntrypoint 没有启动,则检查flink-conf.yaml有地址和端口有没有填写好,
  • TaskManagerRunner没有启动则检查

        flink/comf/masters

                192.168.1.1:8787

        taskmanager.sh

2.1.4 访问Flink UI

http://x.x.x.x:8787/#/overview

2.2 FlinkCDC

Flink CDC是Flink的一组连接器,需要连接哪个组件,则需要将对应的连接jar包放在flink安装目录下的lib即可,

以下几种情况需要进行源码编译:

  • 用户对 Flink CDC 源码进行了修改
  • Flink CDC 某依赖项的版本与运行环境不一致
  • 官方未提供最新版本 Flink CDC 二进制安装包

FlinkCDC源码地址:

GitHub - ververica/flink-cdc-connectors: CDC Connectors for Apache Flink®

如果不需要编译,选择对应的连接器和版本,可以直接下载打包好的jar

Central Repository: com/ververica

将jar包放到flink安装目录下的lib即可。

FlinkCDC 与Flink 对应关系:

Flink® CDC Version

Flink® Version

1.0.0

1.11.*

1.1.0

1.11.*

1.2.0

1.12.*

1.3.0

1.12.*

1.4.0

1.13.*

2.0.*

1.13.*

2.1.*

1.13.*

2.2.*

1.13.*, 1.14.*

2.3.*

1.13.*, 1.14.*, 1.15.*, 1.16.0

2.4.*

1.13.*, 1.14.*, 1.15.*, 1.16.*, 1.17.0


系列文章

Fink CDC数据同步(一)环境部署icon-default.png?t=N7T8https://blog.csdn.net/weixin_44586883/article/details/136017355?spm=1001.2014.3001.5502​​​​​​​​​​​​​​
Fink CDC数据同步(二)MySQL数据同步icon-default.png?t=N7T8https://blog.csdn.net/weixin_44586883/article/details/136017472?spm=1001.2014.3001.5501
Fink CDC数据同步(三)Flink集成Hiveicon-default.png?t=N7T8https://blog.csdn.net/weixin_44586883/article/details/136017571?spm=1001.2014.3001.5501
Fink CDC数据同步(四)Mysql数据同步到Kafkaicon-default.png?t=N7T8https://blog.csdn.net/weixin_44586883/article/details/136023747?spm=1001.2014.3001.5501
Fink CDC数据同步(五)Kafka数据同步Hiveicon-default.png?t=N7T8https://blog.csdn.net/weixin_44586883/article/details/136023837?spm=1001.2014.3001.5501

Fink CDC数据同步(六)数据入湖Hudiicon-default.png?t=N7T8https://blog.csdn.net/weixin_44586883/article/details/136023939?spm=1001.2014.3001.5502

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/667459.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【SpringBoot】RBAC权限控制

📝个页人主:五敷有你 🔥系列专栏:SpringBoot⛺️稳重求进,晒太阳 权限系统与RBAC模型 权限 为了解决用户和资源的操作关系, 让指定的用户,只能操作指定的资源。 权限功能 菜单权限&a…

windows下安装go

下载golang Go 官网下载地址: https://golang.org/dl/ Go 官方镜像站(推荐): https://golang.google.cn/dl/ 选择安装包 验证有没有安装成功 查看 go 环境 说明 : Go1.11 版本之后无需手动配置环境变量&#xff0c…

29 python快速上手

Python操作MySQL和实战 1. 事务1.1 MySQL客户端1.2 Python代码 2. 锁2.1 排它锁2.2 共享锁 3. 数据库连接池4. SQL工具类4.1 单例和方法4.2 上下文管理 5.其他总结 目标:掌握事务和锁以及Python操作MySQL的各种开发必备知识。 概要: 事务锁数据库连接池…

Weblogic反序列化漏洞分析之CVE-2021-2394

目录 简介 前置知识 Serializable示例 Externalizable示例 联系weblogic ExternalizableLite接口 ExternalizableHelperl类 JdbcRowSetImpl类 MethodAttributeAccessor类 AbstractExtractor类 FilterExtractor类 TopNAggregator$PartialResult类 SortedBag$Wrappe…

【测试运维】web自动化全知识点笔记第1篇:什么是Web自动化测试(已分享,附代码)

本系列文章md笔记(已分享)主要讨论Web自动化测试相关知识。了解什么是自动化,理解什么是自动化测试以及为什么要使用自动化测试。具体包含:WebDriver的基本操作,WebDriver的鼠标、键盘操作,下拉选择框、警告…

【教学类-46-01】吉祥字门贴1.0(华光通心圆_CNKI 文本框 空心字涂色,最好繁体字)

作品展示 背景需求: 马上就要过年了,家家户户大门上贴上对联和福字 我想用正方形红色手工纸(15CM)也做一个幼儿描线版的福字 问题一:福字顺时针旋转45度 打印纸上制作福字,需要让这个字顺时针旋转45度&am…

【C++栈和队列:数据结构中的经典组合,高效处理先进先出与后进先出问题的最佳方案】

[本节目标] 1. stack的介绍和使用 2. queue的介绍和使用 3. priority_queue的介绍和使用 4. 容器适配器 1. stack的介绍和使用 1.1 stack的介绍 1. stack是一种容器适配器,专门用在具有后进先出操作的上下文环境中,其删除只能从容器的一端进行元素的…

【DC渗透系列】DC-2靶场

arp先扫 ┌──(root㉿kali)-[~] └─# arp-scan -l Interface: eth0, type: EN10MB, MAC: 00:0c:29:6b:ed:27, IPv4: 192.168.100.251 Starting arp-scan 1.10.0 with 256 hosts (https://github.com/royhills/arp-scan) 192.168.100.1 00:50:56:c0:00:08 VMware, In…

DolphinScheduler实现隔几天调度

1.场景分析 dolphinscheduler(海豚)定时器模块-定时调度时每3秒|每3分钟|每3天这种定时,不能够跨分钟,跨小时,跨月,每次跨月等都会从每个月的第1天(第几天开始可以设定)开始重新计时…

Unity3d Cinemachine篇(四)— StateDrivenCamera

文章目录 前言使用StateDrivenCamera根据不同动画切换相机1. 创建一个游戏物体2. 创建StateDrivenCamera相机3. 创建动画4. 设置相机5. 完成 前言 上一期我们简单的使用了FreeLook相机,这次我们来使用一下StateDrivenCamera 使用StateDrivenCamera根据不同动画切换…

docker maven插件使用介绍

1、配置docker连接 开放docker tcp连接参考本专栏下令一篇文章 2、docker service窗口 3、根据dockerfile 构建镜像 注意 idea 用通过管理员身份启动,否则连不上docker 构建前添加maven goal 打包 4、运行镜像 创建容器 5、运行docker compose 报错 需要先配置d…

django微博热搜数据分析与可视化系统python毕业设计

简而言之,数据可视化是以图形方式呈现结构化或非结构化数据,从而将隐藏在数据中的信息直接呈现给人们。但是有一个陷阱:它不仅仅是使用数据可视化工具将数据转化为图形。相反,它是从数据的角度看待世界。换句话说,数据可视化的对象…

(7)【Python/机器学习/深度学习】Deep-Learning模型与算法应用—深度学习基础搭建最小神经网络

目录 一、深度学习使用python建立最简单的神经元neuron 1、人工智能&机器学习&深度学习三者关系 2、机器学习& 深度学习区别 3、神经元 4、最小神经网络模型(神经元/感知器) 5、(案例)Predicting if a person would buy life insurn…

使用vue脚手架构建项目

一、前言 * 创建好vue-cli的环境,下载好vue包依赖* 本文使用环境:vue/cli 5.0.8二、步骤 创建vueTest文件夹,管理员身份运行cmd , 进入到vueTest文件夹 执行命令vue create 你的项目名 ,这里我定义的项目名为: my-project 基于…

基于微信小程序的校园水电费管理小程序的研究与实现

博主介绍:✌程序员徐师兄、7年大厂程序员经历。全网粉丝12w、csdn博客专家、掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java技术领域和毕业项目实战✌ 🍅文末获取源码联系🍅 👇🏻 精彩专栏推荐订阅👇…

DAY39: 动态规划不同路径问题62

Leetcode: 62 不同路径 机器人从(0 , 0) 位置出发,到(m - 1, n - 1)终点。 基本思路 1、确定dp数组(dp table)以及下标的含义 dp[i][j] :表示从(0 ,0)出发,到(i, j) 有dp[i][j]条…

SpringBoot整合Flowable最新教程(二)启动流程

介绍 文章主要从SpringBoot整合Flowable讲起,关于Flowable是什么?数据库表解读以及操作的Service请查看SpringBoot整合Flowable最新教程(一);   其他说明:Springboot版本是2.6.13,java版本是1…

Sentinel应用笔记

概念 当A、B、G、H掉线,其他服务就没法通信了 随着微服务的流行,服务和服务之间的稳定性变得越来越重要。Sentinel 以流量为切入点,从流量控制、流量路由、熔断降级、系统自适应过载保护、热点流量防护等多个维度保护服务的稳定性。 特性…

Vue3.0(二):Vue组件化基础 - 脚手架

Vue组件化基础 - 脚手架 Vue的组件化 我们在处理一些任务量比较庞大的工作时候,会将工作内容进行拆分,分步骤完成 而组件化的思想正式如此,对于一个庞大的项目,我们可以将其拆分成一个个的小功能,分步骤进行实现 组…

MySQL数据库基础第二篇(函数)

文章目录 一、函数介绍二、字符串函数1.练习代码2.读出结果 三、数值函数1.练习代码2.读出结果 四、日期函数1.练习代码2.读出结果 五、流程控制函数1.练习代码2.读出结果 在当代技术世界中,掌握数据库设计和操作的知识和技能,尤其是对SQL的理解&#xf…