FlinkCDC的分析和应用代码

前言:原本想讲如何基于Flink实现定制化计算引擎的开发,并以FlinkCDC为例介绍;发现这两个在表达上不知以谁为主,所以先分析FlinkCDC的应用场景和技术实现原理,下一篇再去分析Flink能在哪些方面,做定制化计算引擎的开发操作。本文将从FlinkCDC应用场景开始,然后讲述其基于Flink的实现原理和代码应用,为下一篇介绍基于Flink开发定制化引擎做铺垫。

一、FlinkCDC应用场景

经常有同事或朋友问,Flink和FlinkCDC有什么区别?

Flink是一个流数据处理计算框架,FlinkCDC是数据采集工具:

Flink应用场景对比的是Storm、Spark;

FlinkCDC应用场景对比的是Sqoop、Canal、Maxwell和KafkaConnectSource、Debezium等;

FlinkCDC是Flink社区伙伴对数据采集需求,开发的一个SDK工具,让Flink在数据捕捉场景,使用起来更方便一些。

1.1 CDC的应用场景分析

       CDC的英文名是Change Data Capture (变化数据获取);解决的应用场景,是对存储中间件中数据的采集,比如Mysql、Orcle、PGSql、MongoDB等中间件;

采集的方式分为基于查询和基于BinLog两种;

以mysql的数据采集为例:可以通过jdbc批次查询,也可以通过Binlog解析增量数据采集;

两者的一些特性对比如下:

基于查询的CDC直接获取数据,基于binlog的采集需要开启binlog服务。

1.2 FlinkCDC的应用分析

       FlinkCDC和Canal实现的应用场景需求是差不多的,都是通过binlog采集增量数据;

但是可用性上的不同是:  

对于cannal类似的采集服务需要三步:

  • 1.开启mysql的binlog
  • 2.将数据写到kafka
  • 3.用flink订阅kafka中的数据进行业务需求处理

对于FlinkCDC:只需要在binlog开启后,直接在一个Flink任务内做业务处理(可以写到kafka处理也行);

       所有如cannal的采集功能服务,都需要单独维护一套服务,增加了运维负担,FlinkCDC可以当作任务部署到集群,大幅减轻了数据采集的应用难度;

用一个任务就完成这个应用功能:

二、FlinkCDC技术分析与本地操作

2.1 FlinkCDC的技术架构分析

       与Canal这些提供服务能力的服务不同,FlinkCdc只是一个任务,可以简单的开发和部署。

       Flink是借用了Debezium的功能,Debezium是一个可轻量级嵌入代码逻辑的服务,将Debezium的采集功能,用Flink的sourceFunction包装,然后打包成SDK提供给Flink开发使用;

       借用Flink自己的算子和sink能力,可以将采集到的数据以Flink的特性加工数据,并将数据写入Flink内置的connect组件,sink到服务里,如Kafka、Pulser、ES、RabbitMQ、MongoDB等。

2.2 本地操作
2.2.1准备mysql数据库表和数据
use flink_test;
#检测binlog是否开启
show variables like '%log_bin%'#构建测试表
CREATE TABLE `event_info` (`id` int NOT NULL AUTO_INCREMENT,`name` varchar(255) NOT NULL,`category` varchar(512) DEFAULT NULL,`pv` int DEFAULT 0,`uv` int DEFAULT 0,PRIMARY KEY (`id`)
) ENGINE=InnoDB  DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;#写入数据
insert  into `event_info`(`id`,`name`,`category`,`pv`,`uv`) values 
(1,'aaa','nfh',20,8),
(2,'bbb','dfgf',30,2),
(3,'ccc','fsd',40,4),
(4,'ddd','afs',50,7),
(5,'eee','asfa',60,3)
(6,'aaa','nfh',20,8),
(7,'bbb','dfgf',30,2),
(8,'ccc','fsd',40,4),
(9,'ddd','afs',50,7),
(10,'eee','asfa',60,3);
2.2.2 pom文件

       注意Flink和FlinkCDC的版本映射,很多显示的可以关联的版本之间是冲突的,这是一个很繁琐的工作,我调试各个版本之间的映射,花了一天左右的时间[求赞求收藏];

下面这是Flink1.14.5版本和FlinkCDC2.2.1版本已经调好的依赖:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>org.example</groupId><artifactId>changedateDoris</artifactId><version>1.0-SNAPSHOT</version><properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><scala.version>2.12</scala.version><java.version>1.8</java.version><flink.version>1.14.5</flink.version><fastjson.version>1.2.62</fastjson.version><hadoop.version>2.8.3</hadoop.version><scope.mode>compile</scope.mode><slf4j.version>1.7.30</slf4j.version></properties><dependencies><!--        springboot 依赖--><!-- flink  --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner_${scala.version}</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-scala_${scala.version}</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_${scala.version}</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-jdbc_${scala.version}</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka_${scala.version}</artifactId><version>${flink.version}</version></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>${fastjson.version}</version></dependency><!-- Add log dependencies when debugging locally --><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId><version>${slf4j.version}</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId><version>${slf4j.version}</version></dependency><!-- mysql-connector --><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.12</version></dependency><dependency><groupId>com.ververica</groupId><artifactId>flink-sql-connector-mysql-cdc</artifactId><version>2.2.1 </version><exclusions><exclusion><artifactId>flink-shaded-guava</artifactId><groupId>org.apache.flink</groupId></exclusion></exclusions></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-runtime-web_${scala.version}</artifactId><version>${flink.version}</version></dependency></dependencies><build>
<!--        <filters>-->
<!--            <filter>${project.basedir}/src/main/resources/env/application-${profileActive}.properties</filter>-->
<!--        </filters>--><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><version>2.1</version><executions><execution><phase>package</phase><goals><goal>shade</goal></goals><configuration><filters><filter><artifact>*:*</artifact><excludes><exclude>META-INF/*.SF</exclude><exclude>META-INF/*.DSA</exclude><exclude>META-INF/*.RSA</exclude></excludes></filter></filters><artifactSet><includes><include>*:*</include></includes><excludes><exclude>org.slf4j:slf4j-api:jar:</exclude></excludes></artifactSet><transformers><transformerimplementation="org.apache.maven.plugins.shade.resource.AppendingTransformer"><resource>META-INF/spring.handlers</resource></transformer><transformerimplementation="org.apache.maven.plugins.shade.resource.AppendingTransformer"><resource>META-INF/spring.schemas</resource></transformer><transformerimplementation="org.apache.maven.plugins.shade.resource.AppendingTransformer"><resource>META-INF/spring.factories</resource></transformer></transformers></configuration></execution></executions></plugin><plugin><artifactId>maven-resources-plugin</artifactId><configuration><encoding>utf-8</encoding><useDefaultDelimiters>true</useDefaultDelimiters><delimiters><delimiter>$[*]</delimiter></delimiters></configuration></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-assembly-plugin</artifactId><executions><execution><phase>package</phase><goals><goal>single</goal></goals><configuration><descriptorRefs><descriptorRef>jar-with-dependencies</descriptorRef></descriptorRefs></configuration></execution></executions></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><configuration><source>8</source><target>8</target></configuration></plugin></plugins></build>
</project>
2.2.3 java代码
package yto.com.net.demo;import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;public class mysqlBinlogRead {private static final Logger log = LoggerFactory.getLogger(mysqlBinlogRead.class);public static void main(String[] args) throws Exception {MySqlSource<String> mySqlSource = MySqlSource.<String>builder().hostname("ip").port(3306).databaseList("flink_test").tableList("flink_test.event_info").username("mysqlUser").password("password").deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
//                .startupOptions(StartupOptions.earliest()).build();Configuration configuration = new Configuration();configuration.setInteger(RestOptions.PORT, 8083);StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);
//        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// enable checkpointenv.enableCheckpointing(10000);DataStreamSource<String> cdcSource = env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL CDC Source");SingleOutputStreamOperator<String> process = cdcSource.process(new ProcessFunction<String, String>() {@Overridepublic void processElement(String row, ProcessFunction<String, String>.Context context, Collector<String> collector) throws Exception {JSONObject rowJson = JSON.parseObject(row);String op = rowJson.getString("op");JSONObject source = rowJson.getJSONObject("source");String table = source.getString("table");}});cdcSource.print("message=:");env.execute("flinkCdc Read message");}}
2.2.4 运行结果

如图所示,已经写入库的结果通过connect获取,增量数据通过binlog获取;

注意:表中的历史数据过多,全量读取的时候将会内存溢出。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/617977.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

U盘用完到底能不能直接拔?一篇搞懂

有没有人懂这种情况&#xff01;&#xff01; 传输完文件之后&#xff0c;觉得大功告成 以十分帅气的姿势 and 迅雷不及掩耳之势 “咻”地一下把U盘直接给……拔掉了…… 然后瞬间想起没有安全退出&#xff0c;陷入深深的懊悔…… &#xff08;甚至还要再花时间&#xff0…

WebServer 跑通/运行/测试(详解版)

&#x1f442; 椿 - 沈以诚 - 单曲 - 网易云音乐 目录 &#x1f382;前言 &#x1f33c;跑通 &#xff08;1&#xff09;系统环境 &#xff08;2&#xff09;克隆源码 &#xff08;3&#xff09;安装和配置 Mysql &#xff08;4&#xff09;写 sql 语句 &#xff08;5&…

win11下载Hbuliderx 安装闪退解决教程+安装包分享

在官网下载 目录 在官网下载 出现闪退 下载失败 2.2. 最终在百度网盘里下载了历史版本 2.3. 然后解压文件 2.4. 双击打开 2.5. 安装成功 出现闪退 下载失败 结果下载失败&#xff0c;一下子弹出的下载框就会闪退 2.2. 最终在百度网盘里下载了历史版本 下载的网盘链接: …

黑马苍穹外卖学习Day5

文章目录 Redis学习Redis简介准备工作Redis常用数据类型介绍各数据类型的特点Redis常用命令字符串操作命令哈希操作命令列表操作命令集合操作命令有序集合操作命令通用操作命令 在Java中操作Redis导入Spring Data Redis坐标配置Redis数据源编写配置类&#xff0c;创建RedisTemp…

linux多进程基础(2):僵尸进程以及解决方法wait()函数(大白话解释)

在我的linux多线程多进程基础专栏中,已和大家一起分享了僵尸线程.在这一篇文章中我将分享僵尸进程以及解决方法wait()函数. 1.僵尸进程 什么是僵尸进程呢?用最通俗易懂的话来说就是子进程执行结束的时候其父进程并没有及时回收该子进程导致成为僵尸进程.如果僵尸进程数量较多…

10分钟快速搭建个人博客、文档网站!

本文来分享 8 个现代化前端工具&#xff0c;帮你快速生成个人博客、文档网站&#xff01; VitePress VitePress 是一款静态站点生成器&#xff0c;专为构建快速、以内容为中心的网站而设计。简而言之&#xff0c;VitePress 获取用 Markdown 编写的源内容&#xff0c;为其应用…

python24.1.13for循环

对列表、字典、字符串等进行迭代 range

Legion R7000 2021(82JW)原装出厂Win10/WIN11系统预装OEM系统镜像

LENOVO联想拯救者R7000 2021款(82JW)笔记本电脑原厂Windows10/11系统 链接&#xff1a;https://pan.baidu.com/s/1m_Ql5qu6tnw62PbpvXB0hQ?pwd6ek4 提取码&#xff1a;6ek4 原装出厂系统自带所有驱动、出厂主题壁纸、系统属性专属联机支持标志、系统属性专属联想的LOGO标…

88.乐理基础-记号篇-反复记号(二)D.C.、D.S.、Fine、Coda

内容参考于&#xff1a;三分钟音乐社 上一个内容&#xff1a;87.乐理基础-记号篇-反复记号&#xff08;一&#xff09;反复、跳房子-CSDN博客 下图红色左括号框起来的东西&#xff0c;它们都相对比较抽象一点&#xff0c;这几个词都是意大利语 首先D.C.这个标记&#xff0c;然…

7 - MySQL主从同步|主从同步模式

MySQL主从同步&#xff5c;主从同步模式 MySQL主从同步主从同步介绍主从同步工作过程主从同步结构模式配置主从同步一主一从同步结构一主多从同步结构主从从同步结构主主同步结构 主从同步模式主从同步结构模式复制模式 MySQL主从同步 主从同步介绍 存储数据的服务结构 主服务…

高效便捷的远程管理利器——Royal TSX for Mac软件介绍

Royal TSX for Mac是一款功能强大、操作便捷的远程管理软件。无论是远程桌面、SSH、VNC、Telnet还是FTP&#xff0c;用户都可以通过Royal TSX轻松地远程连接和管理各种服务器、计算机和网络设备。 Royal TSX for Mac提供了直观的界面和丰富的功能&#xff0c;让用户能够快速便…

事关年终奖,速看!年终奖应该如何设定结构,提高激励性?

随着2024年的临近&#xff0c;员工们对于年终奖的发放满怀期待&#xff0c;而企业管理者则又到了大费周章的时候。年终奖的发放方式、内容以及金额&#xff0c;成为困扰每个管理者的难题。为什么年终奖发放后&#xff0c;大家的积极性没有得到提高&#xff1f;该激励的没激励到…

Spring Boot - Application Events 的发布顺序_ApplicationReadyEvent

文章目录 Pre概述Code源码分析 Pre Spring Boot - Application Events 的发布顺序_ApplicationEnvironmentPreparedEvent 概述 Spring Boot 的广播机制是基于观察者模式实现的&#xff0c;它允许在 Spring 应用程序中发布和监听事件。这种机制的主要目的是为了实现解耦&#…

Hades-C2:一款功能强大的纯Python命令控制服务器

关于Hades-C2 Hades-C2是一款功能强大的命令控制服务器&#xff0c;该工具基于纯Python开发&#xff0c;可以帮助广大研究人员快速实现命令控制基础设施的搭建。 当前版本的Hades-C2可以用作安全分析研究或CTF比赛&#xff0c;但功能并不完善&#xff0c;目前该项目仍在积极开…

Trans论文复现:基于数据驱动的新能源充电站两阶段规划方法程序代码!

适用平台&#xff1a;MatlabYalmipCplex/Gurobi&#xff1b; 文章提出了一种电动汽车充电站的两阶段规划方法&#xff0c;第一阶段通过蒙特卡洛法模拟充电车辆需求和电池充放电数据来确定充电站位置&#xff1b;第二阶段通过数据驱动的分布鲁棒优化方法优化充电站的新能源和电池…

【惠友骨科小课堂】拇外翻常见的几个误区,来看看你中了几个?

拇外翻作为常见的足部畸形&#xff0c;在日常生活中困扰着许多人。歪脚趾不仅外观不好看&#xff0c;还会出现疼痛、影响行走运动。但大多数人对于拇外翻的认识都不足常常落入认知误区&#xff0c;快来看看你中了几个&#xff1f; 误区一Q 我都没穿过高跟鞋&#xff0c;怎么也…

爬虫实战丨基于requests爬取比特币信息并绘制价格走势图

文章目录 写在前面实验环境实验描述实验内容 写在后面 写在前面 本期内容&#xff1a;基于requests爬取比特币信息并绘制价格走势图 下载地址&#xff1a;https://download.csdn.net/download/m0_68111267/88734451 实验环境 anaconda丨pycharmpython3.11.4requests 安装r…

MySQL夯实之路-查询性能优化深入浅出

MySQL调优分析 explain&#xff1b;show status查看服务器状态信息 优化 减少子任务&#xff0c;减少子任务执行次数&#xff0c;减少子任务执行时间&#xff08;优&#xff0c;少&#xff0c;快&#xff09; 查询优化分析方法 1&#xff0e;访问了太多的行和列&#xff1…

pytorch学习笔记(十)

一、损失函数 举个例子 比如说根据Loss提供的信息知道&#xff0c;解答题太弱了&#xff0c;需要多训练训练这个模块。 Loss作用&#xff1a;1.算实际输出和目标之间的差距 2.为我们更新输出提供一定的依据&#xff08;反向传播&#xff09; 看官方文档 每个输入输出相减取…

C++(9)——内存管理

1. 内存分类&#xff1a; 在前面的文章中&#xff0c;通常会涉及到几个名词&#xff0c;例如&#xff1a;栈、堆。这两个词所代表的便是计算机内存的一部分 。在计算机中&#xff0c;对系统的内存按照不同的使用需求进行了区分&#xff0c;大致可以分为&#xff1a;栈 、堆、数…