使用Flink CDC实时监控MySQL数据库变更

在现代数据架构中,实时数据处理变得越来越重要。Flink CDC(Change Data Capture)是一种强大的工具,可以帮助我们实时捕获数据库的变更,并进行处理。本文将介绍如何使用Flink CDC从MySQL数据库中读取变更数据,并将其打印到控制台。

环境准备

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>1.12.0</version>
</dependency>
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_2.12</artifactId><version>1.12.0</version>
</dependency>
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_2.12</artifactId><version>1.12.0</version>
</dependency>
<dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-client</artifactId><version>3.1.3</version>
</dependency>
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner-blink_2.12</artifactId><version>1.12.0</version>
</dependency>
<dependency><groupId>com.ververica</groupId><artifactId>flink-connector-mysql-cdc</artifactId><version>2.0.0</version>
</dependency>
<dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.75</version>
</dependency>
<dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.23</version>
</dependency>
  1. 获取Flink执行环境

首先,我们需要获取Flink的执行环境。这是所有Flink作业的起点。

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  1. 启用检查点和设置并行度

为了确保作业的容错性和状态恢复,我们需要启用检查点,并设置作业的并行度。

env.enableCheckpointing(500); // 每500毫秒创建一个检查点
env.setParallelism(1); // 设置作业的并行度为1
  1. 使用Debezium Source读取MySQL的binlog

接下来,我们使用Debezium Source读取MySQL的binlog。我们需要配置MySQL的连接信息、监控的数据库和表、反序列化器以及启动选项。

DebeziumSourceFunction<String> sourceFunction = MySqlSource.<String>builder().serverTimeZone("Asia/Shanghai") // 设置时区为亚洲/上海.hostname("localhost") // MySQL的IP地址.port(3306) // MySQL的端口.username("root") // MySQL的用户名.password("123456") // MySQL的密码.databaseList("my_db") // 监控的数据库.tableList("my_db.user") // 监控的数据库下的表.deserializer(new JsonDebeziumDeserializationSchema()) // 反序列化.startupOptions(StartupOptions.initial()) // 启动选项.build();

这里 JsonDebeziumDeserializationSchema类的代码如下:

import com.alibaba.fastjson.JSONObject;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import io.debezium.data.Envelope;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;import java.util.List;/**
*  自定义DeserializationSchema进行反序列化。
*/public class JsonDebeziumDeserializationSchema implements DebeziumDeserializationSchema<String> {@Overridepublic void deserialize(SourceRecord sourceRecord, Collector collector) throws Exception {//创建JSON对象用于存储最终数据JSONObject result = new JSONObject();String topic = sourceRecord.topic();String[] fields = topic.split("\\.");String database = fields[1];String tableName = fields[2];Struct value  = (Struct)sourceRecord.value();//获取before数据Struct before = value.getStruct("before");JSONObject beforeJson = getJson(before);//获取after数据Struct after = value.getStruct("after");JSONObject afterJson = getJson(after);//获取操作类型Envelope.Operation operation = Envelope.operationFor(sourceRecord);//将字段写入JSON对象result.put("database",database);result.put("tableName",tableName);result.put("type",operation);result.put("before",beforeJson);result.put("after",afterJson);//输出数据collector.collect(result.toJSONString());}/***  获取字段值并写入result对象* @param before* @return*/private JSONObject getJson(Struct before) {JSONObject jsonObject = new JSONObject();if(before != null){Schema beforeSchema = before.schema();List<Field> beforeFields = beforeSchema.fields();for (Field field : beforeFields) {Object beforeValue = before.get(field);jsonObject.put(field.name(), beforeValue);}}return jsonObject;}@Overridepublic TypeInformation getProducedType() {return BasicTypeInfo.STRING_TYPE_INFO;}
}
  1. 添加数据源并打印数据

将Debezium源函数添加到Flink环境中,生成一个数据流,并将数据流中的数据打印到控制台。

DataStream<String> dataStreamSource = env.addSource(sourceFunction, TypeInformation.of(String.class));
DataStreamSink<String> print = dataStreamSource.print();
  1. 启动任务

最后,启动Flink作业,开始处理数据流。

env.execute("Flink-CDC");

6.测试

在这里插入图片描述

总结

通过上述步骤,我们可以使用Flink CDC实时监控MySQL数据库的变更,并将变更数据以JSON格式打印出来。这种方法不仅适用于数据监控,还可以用于实时数据处理和分析。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/858342.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

端到端自动驾驶的基础概念

欢迎大家关注我的B站&#xff1a; 偷吃薯片的Zheng同学的个人空间-偷吃薯片的Zheng同学个人主页-哔哩哔哩视频 (bilibili.com) 目录 1.端到端自动驾驶的定义 1.1特斯拉FSD 1.2端到端架构演进 1.3大模型 1.4世界模型 1.5纯视觉传感器 2.落地的挑战 1.端到端自动驾驶的定…

MySQL----利用Mycat配置读写分离

首先确保主从复制是正常的&#xff0c;具体步骤在MySQL----配置主从复制。MySQL----配置主从复制 环境 master(CtenOS7)&#xff1a;192.168.200.131 ----ifconfig查看->ens33->inetslave(win10)&#xff1a;192.168.207.52 ----ipconfig查看->无线局域网适配器 WLA…

whiteboard - 笔记

1 drawio draw.io GitHub - jgraph/drawio: draw.io is a JavaScript, client-side editor for general diagramming. 2 demo 可以将XML数据保存到服务器上的data目录。需要在服务器端创建一个接收和处理POST请求的脚本,该脚本将接收到的SVG数据保存到指定的文件中。下面是…

libssh-cve_2018_10933-vulfocus

1.原理 ibssh是一个用于访问SSH服务的C语言开发包&#xff0c;它能够执行远程命令、文件传输&#xff0c;同时为远程的程序提供安全的传输通道。server-side state machine是其中的一个服务器端状态机。 在libssh的服务器端状态机中发现了一个逻辑漏洞。攻击者可以MSG_USERA…

基于CDMA的多用户水下无线光通信(3)——解相关多用户检测

继续上一篇博文&#xff0c;本文将介绍基于解相关的多用户检测算法。解相关检测器的优点是因不需要估计各个用户的接收信号幅值而具有抗远近效应的能力。常规的解相关检测器有运算量大和实时性差的缺点&#xff0c;本文针对异步CDMA的MAI主要来自干扰用户的相邻三个比特周期的特…

【c2】编译预处理,gdb,makefile,文件,多线程,动静态库

文章目录 1.编译预处理&#xff1a;C源程序 - 编译预处理【#开头指令和特殊符号进行处理&#xff0c;删除程序中注释和多余空白行】- 编译2.gdb调试&#xff1a;多进/线程中无法用3.makefile文件&#xff1a;make是一个解释makefile中指令的命令工具4.文件&#xff1a;fprint/f…

Git分支的状态存储——stash命令的详细用法

文章目录 6.6 Git的分支状态存储6.6.1 git stash命令6.6.2 Git存储的基本使用6.6.3 Git存储的其他用法6.6.4 Git存储与暂存区6.6.5 Git存储的原理 6.6 Git的分支状态存储 有时&#xff0c;当我们在项目的一部分上已经工作一段时间后&#xff0c;所有东西都进入了混乱的状态&am…

AG32 MCU是否支持DFU下载实现USB升级

1、AG32 MCU是否支持DFU下载实现USB升级呢&#xff1f; 先说答案是NO. STM32 可以通过内置DFU实现USB升级&#xff0c;AG32 MCU目前不支持。但用户可以自己写一个DFU&#xff0c; 作为二次boot. 2、AG32 MCU可支持的下载方式有哪些呢&#xff1f; 我们AG32裸机下载只支持uart和…

四川汇聚荣科技有限公司怎么样?

在探讨一家科技公司的综合实力时&#xff0c;我们往往从多个维度进行考量&#xff0c;包括但不限于公司的发展历程、产品与服务的质量、市场表现、技术创新能力以及企业文化。四川汇聚荣科技有限公司作为一家位于中国西部的科技企业&#xff0c;其表现和影响力自然也受到业界和…

测试服务器端口是否打开,服务器端口开放异常的解决方法

在进行服务器端口开放性的测试时&#xff0c;我们通常使用网络工具来验证目标端口是否响应特定的协议请求。常用的工具包括Telnet、Nmap、nc&#xff08;netcat&#xff09;等。这些工具可以通过发送TCP或UDP数据包到指定的IP地址和端口&#xff0c;然后分析返回的数据包&#…

【FreeRTOS】任务管理与调度

文章目录 调度&#xff1a;总结 调度&#xff1a; 相同优先级的任务轮流运行最高优先级的任务先运行 可以得出结论如下&#xff1a; a 高优先级的任务在运行&#xff0c;未执行完&#xff0c;更低优先级的任务无法运行b 一旦高优先级任务就绪&#xff0c;它会马上运行&#xf…

Postman Postman接口测试工具使用简介

Postman这个接口测试工具的使用做个简单的介绍&#xff0c;仅供参考。 插件安装 1&#xff09;下载并安装chrome浏览器 2&#xff09;如下 软件使用说明

从零入手人工智能(5)—— 决策树

1.前言 在上一篇文章《从零入手人工智能&#xff08;4&#xff09;—— 逻辑回归》中讲述了逻辑回归这个分类算法&#xff0c;今天我们的主角是决策树。决策树和逻辑回归这两种算法都属于分类算法&#xff0c;以下是决策树和逻辑回归的相同点&#xff1a; 分类任务&#xff1…

椭圆的矩阵表示法

椭圆的矩阵表示法 flyfish 1. 标准几何表示法 标准几何表示法是通过椭圆的几何定义来表示的&#xff1a; x 2 a 2 y 2 b 2 1 \frac{x^2}{a^2} \frac{y^2}{b^2} 1 a2x2​b2y2​1其中&#xff0c; a a a 是椭圆的长半轴长度&#xff0c; b b b 是椭圆的短半轴长度。 2.…

三十八篇:架构大师之路:探索软件设计的无限可能

架构大师之路&#xff1a;探索软件设计的无限可能 1. 引言&#xff1a;架构的艺术与科学 在软件工程的广阔天地中&#xff0c;系统架构不仅是设计的骨架&#xff0c;更是灵魂所在。它如同建筑师手中的蓝图&#xff0c;决定了系统的结构、性能、可维护性以及未来的扩展性。本节…

AWS-PatchAsgInstance自动化定时ASG组打补丁

问题 需要给AWS的EC2水平自动扩展组AutoScaling Group&#xff08;ASG&#xff09;中的EC2自动定期打补丁。 创建自动化运行IAM角色 找到创建角色入口页面&#xff0c;如下图&#xff1a; 开始创建Systems Manager自动化运行的IAM角色&#xff0c;如下图&#xff1a; 设置…

2023-2024 学年第二学期小学数学六年级期末质量检测模拟(制作:王胤皓)(90分钟)

word效果预览&#xff1a; 一、我会填 1. 1.\hspace{0.5em} 1. 一个多位数&#xff0c;亿位上是次小的素数&#xff0c;千位上是最小的质数的立方&#xff0c;十万位是 10 10 10 和 15 15 15 的最大公约数&#xff0c;万位是最小的合数&#xff0c;十位上的数既不是质数也…

体验了一下AI生产3D模型有感

我的实验路子是想试试能不能帮我建一下实物模型 SO 我选择了一个成都环球中心的网图 但是生成的结果掺不忍睹&#xff0c;但是看demo来看&#xff0c;似乎如果你能给出一张干净的提示图片&#xff0c;他还是能做出一些东西的 这里我延申的思考是这个物体他如果没看过背面&…

大型企业网络DHCP服务器配置安装实践@FreeBSD

企业需求 需要为企业里的机器配置一台DHCP服务器。因为光猫提供DHCP服务的能力很差&#xff0c;多机器dhcp多机器NAT拓扑方式机器一多就卡顿。使用一台路由器来进行子网络的dhcp和NAT服务&#xff0c;分担光猫负载&#xff0c;但是还有一部分机器需要放到光猫网络&#xff0c;…

torchinfo这个包中的summary真的很好用

1.安装直接使用 pip 进行安装即可&#xff1a; pip install torchinfo 2.导入该模块 from torchinfo import summary 3.使用模块 summary(model)#这里的model是你自己的model&#xff0c;可以添加参数进去 4.效果图&#xff1a; 第一个图片是直接打印model吗&#xff0c;…