Flink-cdc更好的流式数据集成工具

What’s Flink-cdc?

在这里插入图片描述

Flink CDC 是基于Apache Flink的一种数据变更捕获技术,用于从数据源(如数据库)中捕获和处理数据的变更事件。CDC技术允许实时地捕获数据库中的增、删、改操作,将这些变更事件转化为流式数据,并能够对这些事件进行实时处理和分析。

Flink CDC提供了与各种数据源集成的功能,包括常见的关系型数据库(如MySQL、PostgreSQL、Oracle等)以及NoSQL数据库(如MongoDB、HBase等)。它通过监控数据库的日志或轮询方式来捕获数据变更,并将变更事件作为数据流发送到Flink的任务中进行处理。

Flink CDC 深度集成并由 Apache Flink 驱动,提供以下核心功能:

✅ 端到端的数据集成框架
✅ 为数据集成的用户提供了易于构建作业的 API
✅ 支持在 Source 和 Sink 中处理多个表
✅ 整库同步
✅具备表结构变更自动同步的能力(Schema Evolution)

在使用者的角度,就是Flink-cdc可以简化流处理的流程:

  • 引入Flink-cdc之前流处理流程
    ![在这里插入图片描述](https://img-blog.csdnimg.cn/direct/449d813da3f945cc9974baba563f6424.png

  • 引入Flink-cdc之后后流处理流程
    在这里插入图片描述

如上所示,在flink-cdc被引入后大大简化了流处理流程

Flink-cdc支持的链接及对应的版本

Pipeline Connectors
在这里插入图片描述
Source Connectors
在这里插入图片描述截止目前(2024-05-23)

Flink-cdc与Flink对应对影版本的关系

在这里插入图片描述截止目前(2024-05-23)

flink-connector-mysql-cdc 实例分析

示例代码

demo代码:

import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class MySqlSourceDemo {public static void main(String[] args) throws Exception {MySqlSource<String> mySqlSource = MySqlSource.<String>builder().hostname("mysql-server-host").port(3306).databaseList("mydb") // 设置捕获的数据库.tableList("mydb.products") // 设置捕获的表,如果需要同步整个数据库,请将 tableList 设置为 ".*".
//                .tableList(".*") // 捕获整个数据库的表
//                .tableList("^(?!mysql|information_schema|performance_schema).*") // 设置捕获的表,排除系统库
//                .tableList("mydb.(?!products|orders).*") // 同步排除products和orders表之外的整个my_db库.username("flink-cdc").password("xxx").serverId("5400-5405").deserializer(new JsonDebeziumDeserializationSchema()) // 将 SourceRecord 转换为 JSON 字符串.serverTimeZone("Asia/Shanghai") // 设置时区.startupOptions(StartupOptions.initial()).scanNewlyAddedTableEnabled(true) // 启用扫描新添加的表功能
//                .includeSchemaChanges(true) // 包括 schema 变更.build();org.apache.flink.configuration.Configuration config = new org.apache.flink.configuration.Configuration();config.setString("rest.port", "8081");
//        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(config); //本地环境,调试用StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 设置 3s 的 checkpoint 间隔env.enableCheckpointing(3000);env.setStateBackend(new HashMapStateBackend());env.getCheckpointConfig().setCheckpointStorage("file:///tmp/ck");//本地文件系统
//        env.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); 1.14.0 版本开始支持env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")// 设置 source 节点的并行度为 4.setParallelism(5).print().setParallelism(1); // 设置 sink 节点并行度为 1env.execute("Print MySQL Snapshot + Binlog");}
}

maven依赖:

<properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><flink.version>1.14.5</flink.version><scala.binary.version>2.12</scala.binary.version></properties><dependencies><dependency><groupId>junit</groupId><artifactId>junit</artifactId><scope>test</scope></dependency><!-- 将 Apache Flink 的 Web 运行时模块添加到项目中 --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-runtime-web_${scala.binary.version}</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_${scala.binary.version}</artifactId><version>${flink.version}</version><scope>provided</scope> <!--provided生命周期在test模式才可以运行,在main模式会找不到包--></dependency><dependency><groupId>com.ververica</groupId><artifactId>flink-connector-mysql-cdc</artifactId><version>2.3.0</version><scope>compile</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-common</artifactId><version>${flink.version}</version><scope>compile</scope></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId><version>1.7.25</version><scope>provided</scope></dependency></dependencies>

日志配置文件:
log4j.properties

log4j.rootCategory=error,stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.target=System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %p %c{1}:%L - %m%n

启动standalone Flink级群

# jobmanager
docker run -d \
--name flink-jm \
--hostname flink-jm \
-p 8082:8081 \
--env FLINK_PROPERTIES="jobmanager.rpc.address: flink-jm" \
--network flink-network-standalone \
ponylee/flink:1.15.0-java8  \
jobmanager# taskmanager
docker run -d \
--name flink-tm \
--hostname flink-tm \
--env FLINK_PROPERTIES="jobmanager.rpc.address: flink-jm" \
--network flink-network-standalone \
ponylee/flink:1.15.0-java8 \
taskmanager \
-Dtaskmanager.memory.process.size=1024m \
-Dtaskmanager.numberOfTaskSlots=5 \
-Drest.flamegraph.enabled=true

分析说明

为每个 Reader 设置不同的 Server id

每个用于读取 binlog 的 MySQL 数据库客户端都应该有一个唯一的 id,称为 Server id。 MySQL 服务器将使用此 id 来维护网络连接和 binlog 位置。 因此,如果不同的作业共享相同的 Server id, 则可能导致从错误的 binlog 位置读取数据。 因此,建议通过为每个 Reader 设置不同的 Server id , 假设 Source 并行度为 4,server id 配置必须:serverId(“5400-5405”),5405-5400=5 >= 4。来为 4 个 Source readers 中的每一个分配唯一的 Server id。

查看mysql链接发现
select * from information_schema.processlist where user = ‘flink-cdc’;
在这里插入图片描述Flink-cdc对mysql的影响
正常情况下,Flink-cdc是No-lock Read,主库可以继续处理事务和查询,而不会导致主库进程阻塞,不会对主库产生直接影响。但是,在某些情况下数据同步的过程中可能会对主库产生一些间接影响,比如:网络、IO、CPU负载以及mysql的并发连接数等资源消耗。但这些对主库的开销影响相对较小(全量同步阶段可能比较耗能,但时间相对比较短)。

断点续传

通过从checkpoint/savepoint 恢复,flink-cdc可以保证断点续传。

  • 从checkpoint/savepoint恢复,缩小同步范围,例如:从tableList(“mydb.products,mydb.orders”)或tableList(“.*”) 缩小到 tableList(“mydb.products”),应用更新生效。

  • 应用从checkpoint/savepoint恢复,扩大同步范围的部分不会生效,例如:从tableList(“mydb.products”) 到 tableList(“mydb.products,mydb.orders”)或tableList(“.*”),应用更新不生效生效。若想使动态加表生效,可以显示制定scanNewlyAddedTableEnabled(true) ,来启用扫描新添加的表功能。如没有特殊情况,建议在开发环境开启此配置。

flink-cdc包名变更

Flink CDC 项目 从 2.0.0 版本将 group id 从com.alibaba.ververica 改成 com.ververica, 自 3.1 版本从将 group id 从 com.ververica 改成 org.apache.flink。 这是为了让项目更加社区中立,让各个公司的开发者共建时更方便。所以在maven仓库找 2.x 的包时,路径是 /com/ververica;找3.1及以上版本的包时,路径是/org/apache/flink

参考:
flink-cdc
flink-cdc docs

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/16141.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Windows平台C#版RTSP转RTMP直播推送定制版

技术背景 前几年我们发布了C版的多路RTMP/RTSP转RTMP转发官方定制版。在秉承低延迟、灵活稳定、低资源占用的前提下&#xff0c;客户无需关注开发细节&#xff0c;只需图形化配置转发等各类参数&#xff0c;实现产品快速上线目的。 如监控类摄像机、NVR等&#xff0c;通过厂商…

【启程Golang之旅】深入解析函数的奥秘与技巧

欢迎来到Golang的世界&#xff01;在当今快节奏的软件开发领域&#xff0c;选择一种高效、简洁的编程语言至关重要。而在这方面&#xff0c;Golang&#xff08;又称Go&#xff09;无疑是一个备受瞩目的选择。在本文中&#xff0c;带领您探索Golang的世界&#xff0c;一步步地了…

【全开源】海报在线制作系统源码(ThinkPHP+FastAdmin+UniApp)

打造个性化创意海报的利器 引言 在数字化时代&#xff0c;海报作为一种重要的宣传媒介&#xff0c;其设计质量和效率直接影响着宣传效果。为了满足广大用户对于个性化、高效制作海报的需求&#xff0c;海报在线制作系统源码应运而生。本文将详细介绍海报在线制作系统源码的特…

AbMole - 肿瘤发展与免疫器官的“舞蹈”:一场细胞层面的时间赛跑

在生物医学领域&#xff0c;肿瘤与免疫系统之间的相互作用一直是研究的热点话题。肿瘤细胞不是孤立存在的&#xff0c;它们与宿主的免疫系统进行着一场复杂的“舞蹈”。 最近&#xff0c;一项发表在《Molecular & Cellular Proteomics》杂志上的研究&#xff0c;为我们揭开…

【C++】二分查找算法

1.题目 2.算法思路 暴力解法&#xff1a;可以将数组遍历一遍&#xff0c;就可以找到。时间复杂度为O(n)。不算太差&#xff0c;可以接受。 但是有更优秀的解法&#xff1a; 就是二分查找算法。 算法的特点&#xff1a;我们所查找的“数组”具有二段性。这里的二段性不一定有…

Oracle 并行和 session 数量的

这也就是为什么我们指定parallel为4&#xff0c;而实际并行度为8的原因。 insert create index&#xff0c;发现并行数都是加倍的 Indexes seem always created with parallel degree 1 during import as seen from a sqlfile. The sql file shows content like: CREATE INDE…

滑不动窗口的秘密—— “滑动窗口“算法 (Java版)

本篇会加入个人的所谓鱼式疯言 ❤️❤️❤️鱼式疯言:❤️❤️❤️此疯言非彼疯言 而是理解过并总结出来通俗易懂的大白话, 小编会尽可能的在每个概念后插入鱼式疯言,帮助大家理解的. &#x1f92d;&#x1f92d;&#x1f92d;可能说的不是那么严谨.但小编初心是能让更多人能接…

《python编程从入门到实践》day39

# 昨日知识点回顾 创建主页、继承模版、显示特定主题页面 # view.py from django.shortcuts import render# 导入所需数据相关联的模型 from .models import Topic# Create your views here. def index(request):"""学习笔记的主页"""#…

Java进阶学习笔记13——抽象类

认识抽象类&#xff1a; 当我们在做子类共性功能抽取的时候&#xff0c;有些方法在父类中并没有具体的体现&#xff0c;这个时候就需要抽象类了。在Java中&#xff0c;一个没有方法体的方法应该定义为抽象方法&#xff0c;而类中如果有抽象方法&#xff0c;该类就定义为抽象类…

ISCC2024个人挑战赛WP-迷失之门

&#xff08;非官方解&#xff0c;以下内容均互联网收集的信息和个人思路&#xff0c;仅供学习参考&#xff09; 迷失之门 方法一&#xff1a; IDA看一下 check函数逻辑 进入到check2函数 R键将ascii码转字符&#xff0c;写出逆向脚本 #include <stdio.h> #include &l…

Linux C++ Socket 套接字、select、poll、epoll 实例

文章目录 1. 概述2. TCP 网络编程实例2.1 服务器端2.2 客户端2.3 运行截图 3. I/O 模型3.1 阻塞式I/O模型3.2 非阻塞I/O模型3.3 I/O 复用模型3.4 信号驱动式I/O3.5 异步I/O模型 4. I/O复用之 select4.1 select 函数描述4.2 服务端代码4.3 客户端代码4.4 运行截图 5. I/O复用之 …

RocketMq局部顺序消息

package com.ldj.rocketmq.producer;import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.common.message.Message;import java.nio.charset.StandardCharsets;/*** User: ldj* Date: 2024/5/26* Time: 15:09* Description: 局部顺序消…

css卡片翻转 父元素翻转子元素不翻转效果

css卡片翻转 父元素翻转子元素不翻转效果 vue <div class"moduleBox"><div class"headTitle"><span class"headName">大额案例</span></div><div class"moduleItem"><span class"module…

three.js判断物体在人的前面,还是后面

three.js判断物体在人的前面&#xff0c;还是后面 const player new THREE.Vectors(10, 0, 5); const mesh new THREE.Vectors(15, 0, 6);上面&#xff0c;两个变量分别表示&#xff0c;玩家的位置&#xff0c;物体的位置。 从这发现&#xff0c;当玩家和物体的角度关系 小…

spring boot 整合j2cache 项目启动警告 Redis mode [null] not defined. Using ‘single‘

好 之前的文章 spring boot 整合j2cache 基础操作 在spring boot环境中整合了 j2cache 我们 项目启动时 日志会有一个关键信息 Redis的模式 没有定义 默认使用 single Redis 的这个模式有四种 大家可以自己去网上找一下 做个了解 不用很纠结 我们直接在 j2cache.properties …

一文读懂Apollo客户端配置加载流程

本文基于 apollo-client 2.1.0 版本源码进行分析 Apollo 是携程开源的配置中心&#xff0c;能够集中化管理应用不同环境、不同集群的配置&#xff0c;配置修改后能够实时推送到应用端&#xff0c;并且具备规范的权限、流程治理等特性。 Apollo支持4个维度管理Key-Value格式的配…

比勤奋更重要的是系统思考的能力

不要在接近你问题症状的地方寻找解决办法&#xff0c;要追溯过去&#xff0c;查找问题的根源。通常&#xff0c;最有效的活动是最微妙的。有时最好按兵不动&#xff0c;使系统自我修正&#xff0c;或让系统引导行动。有时会发现&#xff0c;最好的解决办法出现在完全出乎预料的…

HTML蓝色爱心

目录 写在前面 HTML入门 完整代码 代码分析 运行结果 系列推荐 写在后面 写在前面 最近好冷吖&#xff0c;小编给大家准备了一个超级炫酷的爱心&#xff0c;一起来看看吧&#xff01; HTML入门 HTML全称为HyperText Markup Language&#xff0c;是一种标记语言&#…

C++-指针

在C中&#xff0c;指针是至关重要的组成部分。它是C语言最强大的功能之一&#xff0c;也是最棘手的功能之一。 指针具有强大的能力&#xff0c;其本质是协助程序员完成内存的直接操纵。 指针&#xff1a;特定类型数据在内存中的存储地址&#xff0c;即内存地址。 指针变量的定…

2024.5组队学习——MetaGPT(0.8.1)智能体理论与实战(下):多智能体开发

传送门&#xff1a; 《2024.5组队学习——MetaGPT&#xff08;0.8.1&#xff09;智能体理论与实战&#xff08;上&#xff09;&#xff1a;MetaGPT安装、单智能体开发》《2024.5组队学习——MetaGPT&#xff08;0.8.1&#xff09;智能体理论与实战&#xff08;中&#xff09;&…