Flink CDC的使用

MySQL数据准备

create database if not exists test;
use test;
drop table if exists stu;
create table stu (id int primary key auto_increment, name varchar(100), age int);
insert into stu(name, age) values("张三",18);
insert into stu(name, age) values("李四",20);
insert into stu(name, age) values("王五",21);

注意:表必须有主键

开启MySQL binlog

修改MySQL配置,开启binlog

$ sudo vim /etc/my.cnf,添加如下设置

server-id = 1
log-bin=mysql-bin
binlog_format=row
binlog-do-db=test

注意:启用binlog的数据库,需根据实际情况作出修改

重启mysql

$ sudo systemctl restart mysqld

代码开发

依赖

Flink CDC依赖

<!--cdc 依赖--><dependency><groupId>com.ververica</groupId><artifactId>flink-connector-mysql-cdc</artifactId><version>2.4.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java-bridge</artifactId><version>${flink.version}</version></dependency>

完整依赖

    <properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><flink.version>1.17.1</flink.version><flink-cdc.vesion>2.4.0</flink-cdc.vesion></properties><dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java-bridge</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner-loader</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-runtime</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-files</artifactId><version>${flink.version}</version></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.33</version></dependency><!--目前中央仓库还没有 jdbc的连接器,暂时用一个快照版本--><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-jdbc</artifactId><version>1.17-SNAPSHOT</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-files</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-datagen</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-statebackend-rocksdb</artifactId><version>${flink.version}</version>
<!--            <scope>provided</scope>--></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-client</artifactId><version>3.3.4</version>
<!--            <scope>provided</scope>--></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-statebackend-changelog</artifactId><version>${flink.version}</version><scope>runtime</scope></dependency><dependency><groupId>com.google.code.findbugs</groupId><artifactId>jsr305</artifactId><version>1.3.9</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java-bridge</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner-loader</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-runtime</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-files</artifactId><version>${flink.version}</version></dependency><!--cdc 依赖--><dependency><groupId>com.ververica</groupId><artifactId>flink-connector-mysql-cdc</artifactId><version>${flink-cdc.vesion}</version></dependency></dependencies>

 Flink代码

Flink CDC捕获MySQL变更数据(增加、修改、删除),输出到控制台。

import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class FlinkCDCDemo {public static void main(String[] args) throws Exception {// 环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(4);// 数据源MySqlSource<String> mySqlSource = MySqlSource.<String>builder().hostname("node4").port(3306).username("root").password("000000").databaseList("test").tableList("test.stu").deserializer(new JsonDebeziumDeserializationSchema()).startupOptions(StartupOptions.initial()).build();DataStreamSource<String> dataStreamSource = env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(),"mysql_source").setParallelism(1);// 处理数据// 输出数据dataStreamSource.print();// 执行env.execute();}
}

运行程序,确保程序无报错,看到如下输出:

18:58:51,826 INFO  io.debezium.connector.mysql.MySqlStreamingChangeEventSource   - Keepalive thread is running

测试

添加数据

mysql添加数据

mysql> insert into stu(name, age) values("赵六",23);

IDEA控制台输出

{"before":null,"after":{"id":4,"name":"赵六","age":23},"source":{"version":"1.9.7.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1719831654000,"snapshot":"false","db":"test","sequence":null,"table":"stu","server_id":1,"gtid":null,"file":"mysql-bin.000001","pos":2300,"row":0,"thread":13,"query":null},"op":"c","ts_ms":1719831654692,"transaction":null}

格式化输出

{"before": null,"after": {"id": 4,"name": "赵六","age": 23},"source": {"version": "1.9.7.Final","connector": "mysql","name": "mysql_binlog_source","ts_ms": 1719831654000,"snapshot": "false","db": "test","sequence": null,"table": "stu","server_id": 1,"gtid": null,"file": "mysql-bin.000001","pos": 2300,"row": 0,"thread": 13,"query": null},"op": "c","ts_ms": 1719831654692,"transaction": null
}

关注before、after符合增加数据的逻辑,op为c表示添加数据

修改数据

mysql修改数据

mysql> update stu set name="zl", age=19 where name="赵六";

IDEA控制台输出

{"before":{"id":4,"name":"赵六","age":23},"after":{"id":4,"name":"zl","age":19},"source":{"version":"1.9.7.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1719831987000,"snapshot":"false","db":"test","sequence":null,"table":"stu","server_id":1,"gtid":null,"file":"mysql-bin.000001","pos":2604,"row":0,"thread":13,"query":null},"op":"u","ts_ms":1719831987238,"transaction":null}

格式化输出

{"before": {"id": 4,"name": "赵六","age": 23},"after": {"id": 4,"name": "zl","age": 19},"source": {"version": "1.9.7.Final","connector": "mysql","name": "mysql_binlog_source","ts_ms": 1719831987000,"snapshot": "false","db": "test","sequence": null,"table": "stu","server_id": 1,"gtid": null,"file": "mysql-bin.000001","pos": 2604,"row": 0,"thread": 13,"query": null},"op": "u","ts_ms": 1719831987238,"transaction": null
}

关注before、after符合更新的逻辑,op为u表示更新数据

删除数据

mysql删除数据

mysql> delete from stu where id=4;

IDEA控制台输出

{"before":{"id":4,"name":"zl","age":19},"after":null,"source":{"version":"1.9.7.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1719832151000,"snapshot":"false","db":"test","sequence":null,"table":"stu","server_id":1,"gtid":null,"file":"mysql-bin.000001","pos":2913,"row":0,"thread":13,"query":null},"op":"d","ts_ms":1719832151198,"transaction":null}
​

格式化输出

{"before": {"id": 4,"name": "zl","age": 19},"after": null,"source": {"version": "1.9.7.Final","connector": "mysql","name": "mysql_binlog_source","ts_ms": 1719832151000,"snapshot": "false","db": "test","sequence": null,"table": "stu","server_id": 1,"gtid": null,"file": "mysql-bin.000001","pos": 2913,"row": 0,"thread": 13,"query": null},"op": "d","ts_ms": 1719832151198,"transaction": null
}

关注before、after符合删除的逻辑,op为d表示删除数据

完成!enjoy it!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/38794.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

ESOP 系统助力电子设备公司的管理模式升级

在科技飞速发展的时代&#xff0c;电子设备行业竞争愈发激烈&#xff0c;企业要想在市场中立足并持续发展&#xff0c;不断升级管理模式成为关键。ESOP系统的引入&#xff0c;为电子设备公司带来了全新的机遇&#xff0c;有力地推动了管理模式的升级。 ESOP 系统首先为电子设备…

element el-table表格切换分页保留分页数据+限制多选数量

el-table表格并没有相关的方法来禁用表头里面的多选按钮 那么我们可以另辟蹊径&#xff0c;来实现相同的多选切换分页&#xff08;保留分页数据&#xff09; 限制多选数量的效果 <el-table:data"tableData"style"width: 100%">// 不使用el-talbe自带…

农村程序员陈随易2024年中总结

今天是 2024年7月1日&#xff0c;时间如白驹过隙&#xff0c;今年已去其一半。 总结一下今年上半年的情况&#xff0c;给大家提供一些参考和建议。 希望大家关注一下公众号 陈随易&#xff0c;有些内容只在公众号发表。 先看看我的年初计划&#xff0c;这个在今年年初的时候&…

泛微E9开发 限制明细表列的值重复

限制明细表列的值重复 1、需求说明2、实现方法3、扩展知识点3.1 修改单个字段值&#xff08;不支持附件类型&#xff09;3.1.1 格式3.1.2 参数3.1.3 案例 3.2 获取明细行所有行标示3.2.1 格式3.2.2 参数说明 1、需求说明 限制明细表的“类型”字段&#xff0c;在同一个流程表单…

【全网首发】双字重叠语序验证码识别

【省流&#xff1a;打算直接测试效果的可以访问这个网址】 http://decaptcha.ai?project_namenetease_zh_overlap 【实现方案】 如图所示&#xff0c;我们能看到&#xff0c;比起以往的“单个字”语序点选&#xff0c;这个验证码的难点在于“重叠汉字“&#xff0c;我们知道…

【Python机器学习】模型评估与改进——简单的网格搜索

为了提升模型的泛化性能&#xff0c;我们可以通过调参来实现。 在尝试调参之前&#xff0c;重要的是理解参数的含义&#xff0c;找到一个模型的重要参数&#xff08;提供最佳泛化性能的参数&#xff09;的取值是一项棘手的任务&#xff0c;但对于几乎所有模型和数据集来说都是…

API-Window对象

学习目标&#xff1a; 掌握Window对象 学习内容&#xff1a; BOM&#xff08;浏览器对象模型&#xff09;定时器-延时函数JS执行机制location对象navigation对象history对象 BOM&#xff08;浏览器对象模型&#xff09;&#xff1a; BOM是浏览器对象模型。 window对象是一个全…

Windows 11的市场份额越来越大了,推荐你升级!

7月1日&#xff0c;系统之家发布最新数据&#xff0c;显示Windows 11操作系统的市场份额正在稳步上升。自2021年10月Windows 11发布以来&#xff0c;Windows 10一直占据着市场主导地位&#xff0c;当时其市场份额高达81.44%。然而&#xff0c;随着时间的推移&#xff0c;Window…

鸿蒙学习1:ArkTS基础入门

1 变量和常量 1.1 变量 常见的基础数据类型&#xff1a; string 字符串、number 数字、boolean布尔 判断。 变量&#xff1a;专门用来存储数据的容器。 语法&#xff1a;let 变量名: 数据类型 值。例如&#xff1a;let name: 张三;let price:number 12.4; let isSuccess …

【triton-inference-server】 官方python_backend 文档及例子

https://github.com/triton-inference-server/python_backend#building-from-source 一。 从源码构建python_backend root@ubuntu-server:/home/ubuntu/hzh# sudo apt-get install rapidjson-dev libarchive-dev zlib1g-dev Reading package lists... Done Building dependency…

vue3中的自定义指令

全局自定义指令 假设我们要创建一个全局指令v-highlight&#xff0c;用于高亮显示元素。这个指令将接受一个颜色参数&#xff0c;并有一个可选的修饰符bold来决定是否加粗文本。 首先&#xff0c;在创建Vue应用时定义这个指令&#xff1a;&#xff08;这里可以将指令抽离成单…

昂科烧录器支持BPS晶丰明源半导体的多相Buck控制器BPD93004E

芯片烧录行业领导者-昂科技术近日发布最新的烧录软件更新及新增支持的芯片型号列表&#xff0c;其中BPS晶丰明源半导体的多相Buck控制器BPD93004E已经被昂科的通用烧录平台AP8000所支持。 BPD93004E是一款多相Buck控制器&#xff0c;支持原生1~4相&#xff0c;数字方式控制&am…

科普文:一文搞懂jvm原理(二)类加载器

概叙 科普文&#xff1a;一文搞懂jvm(一)jvm概叙-CSDN博客 前面我们介绍了jvm&#xff0c;jvm主要包括两个子系统和两个组件&#xff1a; Class loader(类装载器) 子系统&#xff0c;Execution engine(执行引擎) 子系统&#xff1b;Runtime data area (运行时数据区域)组件&am…

Cambrian-1: A Fully Open, Vision-Centric Exploration of Multimodal LLMs

摘要 https://arxiv.org/pdf/2406.16860v1 我们介绍了Cambrian-1&#xff0c;这是一系列以视觉为中心的多模态大型语言模型&#xff08;MLLMs&#xff09;。尽管更强大的语言模型可以增强多模态能力&#xff0c;但视觉组件的设计选择往往没有得到充分的探索&#xff0c;并且与…

学习笔记(linux高级编程)9

void pthread_cleanup_push(void (*routine)(void *)&#xff0c; void *arg); 功能&#xff1a;注册一个线程清理函数 参数&#xff0c;routine&#xff0c;线程清理函数的入口 arg&#xff0c;清理函数的参数。 返回值&#xff0c;无 void pthread_cleanup_pop(int execute)…

Perl语言入门指南

一、绪论 1.1 Perl语言概述 1.2 Perl的特色 1.3 Perl面临的问题 1.4 Perl语言的应用领域 二、Perl语言基础 2.1 Perl语言的历史发展 2.2 Perl语言的基本语法 2.3 Perl语言的数据类型 三、Perl语言控制结构 3.1 条件语句 3.2 循环结构 3.3 函数和子程序 四、Perl语…

OpenStack开源虚拟化平台(一)

目录 一、OpenStack背景介绍&#xff08;一&#xff09;OpenStack是什么&#xff08;二&#xff09;OpenStack的主要服务 二、计算服务Nova&#xff08;一&#xff09;Nova组件介绍&#xff08;二&#xff09;Libvirt简介&#xff08;三&#xff09;Nova中的RabbitMQ解析 OpenS…

MySQL-数据操作类型的角度理解 S锁 X锁

文章目录 1、S锁和S锁互相兼容2、S锁和X锁互斥3、X锁和X锁也互斥4、X锁和S锁也互斥5、select * from account for update;6、select * from account for update nowait;7、select * from account for update skip locked; 1、S锁和S锁互相兼容 2、S锁和X锁互斥 3、X锁和X锁也互…

20240702 每日AI必读资讯

&#x1f50d;GPTPdf&#xff1a;使用类似GPT-4o的多模态LLM分析PDF文件 - 使用类似 GPT-4o 多模态模型解析 PDF 文件&#xff0c;转换为 Markdown 格式。 - 代码简洁高效&#xff0c;仅293行。 - 解析结果几乎完美包括排版、数学公式、表格、图片、图表等内容。 &#x1…

【记录】IDEA2023的激活与安装

前言&#xff1a; 记录IDEA2023的激活与安装 第一步&#xff1a;官网下载安装包&#xff1a; 下载地址&#xff1a;https://www.jetbrains.com/idea/download/other.html 这个最好选择2023版本&#xff0c;用着很nice。 安装步骤就不详解了&#xff0c;无脑下一步就可以了…