10 Flink CDC

10 Flink CDC

  • 1. CDC是什么
  • 2. CDC 的种类
  • 3. 传统CDC与Flink CDC对比
  • 4. Flink-CDC 案例
  • 5. Flink SQL 方式的案例

1. CDC是什么

CDC 是 Change Data Capture(变更数据获取)的简称。核心思想是,监测并捕获数据库的变动(包括数据或数据表的插入、更新以及删除等),将这些变更按发生的顺序完整记录下来,写入到消息中间件中以供其他服务进行订阅及消费。
在广义的概念上,只要能捕获数据变更的技术,我们都可以称为 CDC 。通常我们说的 CDC 技术主要面向数据库的变更,是一种用于捕获数据库中数据变更的技术。
CDC 技术应用场景非常广泛:
数据同步,用于备份,容灾;
数据分发,一个数据源分发给多个下游;
数据采集(E),面向数据仓库/数据湖的 ETL 数据集成。

2. CDC 的种类

CDC 主要分为基于查询和基于 Binlog 两种方式,我们主要了解一下这两种之间的区别:
在这里插入图片描述

3. 传统CDC与Flink CDC对比

  1. 传统 CDC ETL 分析
    在这里插入图片描述

  2. 基于 Flink CDC 的 ETL 分析
    在这里插入图片描述

  3. 基于 Flink CDC 的聚合分析
    在这里插入图片描述

  4. 基于 Flink CDC 的数据打宽
    在这里插入图片描述

4. Flink-CDC 案例

Flink 社区开发了 flink-cdc-connectors 组件,这是一个可以直接从 MySQL、PostgreSQL 等数据库直接读取全量数据和增量变更数据的 source 组件。
开源地址:https://github.com/ververica/flink-cdc-connectors。
示例代码:

import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction;
import com.alibaba.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.Properties;public class FlinkCDC {public static void main(String[] args) throws Exception {//1.创建执行环境StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);//2.Flink-CDC 将读取 binlog 的位置信息以状态的方式保存在 CK,如果想要做到断点
续传,需要从 Checkpoint 或者 Savepoint 启动程序//2.1 开启 Checkpoint,每隔 5 秒钟做一次 CKenv.enableCheckpointing(5000L);//2.2 指定 CK 的一致性语义
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);//2.3 设置任务关闭的时候保留最后一次 CK 数据
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckp
ointCleanup.RETAIN_ON_CANCELLATION);//2.4 指定从 CK 自动重启策略env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 2000L));//2.5 设置状态后端env.setStateBackend(new FsStateBackend("hdfs://hadoop102:8020/flinkCDC"));//2.6 设置访问 HDFS 的用户名System.setProperty("HADOOP_USER_NAME", "atguigu");//3.创建 Flink-MySQL-CDC 的 Source//initial (default): Performs an initial snapshot on the monitored database tables upon 
first startup, and continue to read the latest binlog.//latest-offset: Never to perform snapshot on the monitored database tables upon first 
startup, just read from the end of the binlog which means only have the changes since the 
connector was started.//timestamp: Never to perform snapshot on the monitored database tables upon first 
startup, and directly read binlog from the specified timestamp. The consumer will traverse the 
binlog from the beginning and ignore change events whose timestamp is smaller than the 
specified timestamp.//specific-offset: Never to perform snapshot on the monitored database tables upon 
first startup, and directly read binlog from the specified offset.DebeziumSourceFunction<String> mysqlSource = MySQLSource.<String>builder().hostname("hadoop01").port(3306).username("root").password("000000").databaseList("gmall-flink").tableList("gmall-flink.z_user_info") //可选配置项,如果不指定该参数,则会
读取上一个配置下的所有表的数据,注意:指定的时候需要使用"db.table"的方式.startupOptions(StartupOptions.initial()).deserializer(new StringDebeziumDeserializationSchema()).build();//4.使用 CDC Source 从 MySQL 读取数据DataStreamSource<String> mysqlDS = env.addSource(mysqlSource);//5.打印数据mysqlDS.print();//6.执行任务env.execute();} 
}

5. Flink SQL 方式的案例

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
public class FlinkSQL_CDC {public static void main(String[] args) throws Exception {//1.创建执行环境StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);//2.创建 Flink-MySQL-CDC 的 SourcetableEnv.executeSql("CREATE TABLE user_info (" +" id INT," +" name STRING," +" phone_num STRING" +") WITH (" +" 'connector' = 'mysql-cdc'," +" 'hostname' = 'hadoop01'," +" 'port' = '3306'," +" 'username' = 'root'," +" 'password' = '000000'," +" 'database-name' = 'gmall-flink'," +" 'table-name' = 'z_user_info'" +")");tableEnv.executeSql("select * from user_info").print();env.execute();}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/68765.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【PyTorch】6.张量运算函数:一键开启!PyTorch 张量函数的宝藏工厂

目录 1. 常见运算函数 个人主页&#xff1a;Icomi 专栏地址&#xff1a;PyTorch入门 在深度学习蓬勃发展的当下&#xff0c;PyTorch 是不可或缺的工具。它作为强大的深度学习框架&#xff0c;为构建和训练神经网络提供了高效且灵活的平台。神经网络作为人工智能的核心技术&…

Python-基于PyQt5,wordcloud,pillow,numpy,os,sys等的智能词云生成器

前言&#xff1a;日常生活中&#xff0c;我们有时后就会遇见这样的情形&#xff1a;我们需要将给定的数据进行可视化处理&#xff0c;同时保证呈现比较良好的量化效果。这时候我们可能就会用到词云图。词云图&#xff08;Word cloud&#xff09;又称文字云&#xff0c;是一种文…

DeepSeek-R1论文研读:通过强化学习激励LLM中的推理能力

DeepSeek在朋友圈&#xff0c;媒体&#xff0c;霸屏了好长时间&#xff0c;春节期间&#xff0c;研读一下论文算是时下的回应。论文原址&#xff1a;[2501.12948] DeepSeek-R1: Incentivizing Reasoning Capability in LLMs via Reinforcement Learning 摘要&#xff1a; 我们…

【深度分析】DeepSeek大模型技术解析:从架构到应用的全面探索

深度与创新&#xff1a;AI领域的革新者 DeepSeek&#xff0c;这个由幻方量化创立的人工智能公司推出的一系列AI模型&#xff0c;不仅在技术架构上展现出了前所未有的突破&#xff0c;更在应用领域中开启了无限可能的大门。从其混合专家架构&#xff08;MoE&#xff09;到多头潜…

万物皆有联系:驼鸟和布什

布什&#xff1f;一块布十块钱吗&#xff1f;不是&#xff0c;大家都知道&#xff0c;美国有两个总统&#xff0c;叫老布什和小布什&#xff0c;因为两个布什总统&#xff08;父子俩&#xff09;&#xff0c;大家就这么叫来着&#xff0c;目的是为了好区分。 布什总统的布什&a…

Leetcode:350

1&#xff0c;题目 2&#xff0c;思路 首先判断那个短为什么呢因为我们用短的数组去挨个点名长的数组主要用map装长的数组max判断map里面有几个min数组的元素&#xff0c;list保存交集最后用数组返回list的内容 3&#xff0c;代码 import java.util.*;public class Leetcode…

Spring Boot 热部署实现指南

在开发 Spring Bot 项目时&#xff0c;热部署功能能够显著提升开发效率&#xff0c;让开发者无需频繁重启服务器就能看到代码修改后的效果。下面为大家详细介绍一种实现 Spring Boot 热部署的方法&#xff0c;同时也欢迎大家补充其他实现形式。 步骤一、开启 IDEA 自动编译功能…

LogicFlow 一款流程图编辑框架

LogicFlow是什么 LogicFlow是一款流程图编辑框架&#xff0c;提供了一系列流程图交互、编辑所必需的功能和灵活的节点自定义、插件等拓展机制。LogicFlow支持前端自定义开发各种逻辑编排场景&#xff0c;如流程图、ER图、BPMN流程等。在工作审批流配置、机器人逻辑编排、无代码…

Git进阶之旅:tag 标签 IDEA 整合 Git

第一章&#xff1a;tag 标签远程管理 git 标签 tag 管理&#xff1a; 标签有两种&#xff1a; 轻量级标签(lightweight)带有附注标签(annotated) git tag 标签名&#xff1a;创建一个标签git tag 标签名 -m 附注内容 &#xff1a;创建一个附注标签git tag -d 标签名…

riscv xv6学习笔记

文章目录 前言util实验sleeputil实验pingpongutil实验primesxv6初始化代码分析syscall实验tracesyscall实验sysinfoxv6内存学习笔记pgtbl实验Print a page tablepgtbl实验A kernel page table per processxv6 trap学习trap实验Backtracetrap实验Alarmlazy实验Lazy allocationxv…

Contrastive Imitation Learning

机器人模仿学习中对比解码的一致性采样 摘要 本文中&#xff0c;我们在机器人应用的对比模仿学习中&#xff0c;利用一致性采样来挖掘演示质量中的样本间关系。通过在排序后的演示对比解码过程中&#xff0c;引入相邻样本间的一致性机制&#xff0c;我们旨在改进用于机器人学习…

Baklib揭示内容中台与人工智能技术的创新协同效应

内容概要 在当今信息爆炸的时代&#xff0c;内容的高效生产与分发已成为各行业竞争的关键。内容中台与人工智能技术的结合&#xff0c;为企业提供了一种新颖的解决方案&#xff0c;使得内容创造的流程更加智能化和高效化。 内容中台作为信息流动的核心&#xff0c;能够集中管…

[论文阅读] (37)CCS21 DeepAID:基于深度学习的异常检测(解释)

祝大家新春快乐&#xff0c;蛇年吉祥&#xff01; 《娜璋带你读论文》系列主要是督促自己阅读优秀论文及听取学术讲座&#xff0c;并分享给大家&#xff0c;希望您喜欢。由于作者的英文水平和学术能力不高&#xff0c;需要不断提升&#xff0c;所以还请大家批评指正&#xff0…

JVM方法区

一、栈、堆、方法区的交互关系 二、方法区的理解: 尽管所有的方法区在逻辑上属于堆的一部分&#xff0c;但是一些简单的实现可能不会去进行垃圾收集或者进行压缩&#xff0c;方法区可以看作是一块独立于Java堆的内存空间。 方法区(Method Area)与Java堆一样&#xff0c;是各个…

火语言RPA--文本内容提取

&#x1f6a9;【组件功能】&#xff1a;通过前后截取、通配符参数组合或纯正则方式提取源字符串中指定的文本内容 配置预览 配置说明 源内容 支持T或# 默认FLOW输入项 进行处理、匹配的对象&#xff0c;若为空&#xff0c;以上一个组件的输出为源内容。 提取方式 前后截取…

JVM的GC详解

获取GC日志方式大抵有两种 第一种就是设定JVM参数在程序启动时查看&#xff0c;具体的命令参数为: -XX:PrintGCDetails # 打印GC日志 -XX:PrintGCTimeStamps # 打印每一次触发GC时发生的时间第二种则是在服务器上监控:使用jstat查看,如下所示&#xff0c;命令格式为jstat -gc…

芯片AI深度实战:给vim装上AI

系列文章&#xff1a; 芯片AI深度实战&#xff1a;私有模型deep seek r1&#xff0c;必会ollama-CSDN博客 芯片AI深度实战&#xff1a;自己的AI&#xff0c;必会LangChain-CSDN博客 芯片AI深度实战&#xff1a;给vim装上AI-CSDN博客 芯片AI深度实战&#xff1a;火的编程AI&…

供应链系统设计-供应链中台系统设计(十四)- 清结算中心设计篇(三)

关于清结算中心的设计&#xff0c;我们之前的两篇文章中&#xff0c;对于业务诉求的好的标准进行了初步的描述&#xff0c;如果没有看的同学可以参考一下两篇文章进行了解&#xff0c;这样更有利于理解本篇的内容。链接具体如下&#xff1a; 供应链系统设计-供应链中台系统设计…

搭建自己的专属AI——使用Ollama+AnythingLLM+Python实现DeepSeek本地部署

前言 最近DeepSeek模型非常火&#xff0c;其通过对大模型的蒸馏得到的小模型可以较轻松地在个人电脑上运行&#xff0c;这也使得我们有机会在本地构建一个专属于自己的AI&#xff0c;进而把AI“调教”为我们希望的样子。本篇文章中我将介绍如何使用OllamaAnythingLLMPython实现…

Golang 并发机制-1:Golang并发特性概述

并发是现代软件开发中的一个基本概念&#xff0c;它使程序能够同时执行多个任务&#xff0c;从而提高效率和响应能力。在本文中&#xff0c;我们将探讨并发性在现代软件开发中的重要性&#xff0c;并深入研究Go处理并发任务的独特方法。 并发的重要性 增强性能 并发在提高软…