使用Flink CDC实时监控MySQL数据库变更

在现代数据架构中,实时数据处理变得越来越重要。Flink CDC(Change Data Capture)是一种强大的工具,可以帮助我们实时捕获数据库的变更,并进行处理。本文将介绍如何使用Flink CDC从MySQL数据库中读取变更数据,并将其打印到控制台。

环境准备

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>1.12.0</version>
</dependency>
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_2.12</artifactId><version>1.12.0</version>
</dependency>
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_2.12</artifactId><version>1.12.0</version>
</dependency>
<dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-client</artifactId><version>3.1.3</version>
</dependency>
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner-blink_2.12</artifactId><version>1.12.0</version>
</dependency>
<dependency><groupId>com.ververica</groupId><artifactId>flink-connector-mysql-cdc</artifactId><version>2.0.0</version>
</dependency>
<dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.75</version>
</dependency>
<dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.23</version>
</dependency>
  1. 获取Flink执行环境

首先,我们需要获取Flink的执行环境。这是所有Flink作业的起点。

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  1. 启用检查点和设置并行度

为了确保作业的容错性和状态恢复,我们需要启用检查点,并设置作业的并行度。

env.enableCheckpointing(500); // 每500毫秒创建一个检查点
env.setParallelism(1); // 设置作业的并行度为1
  1. 使用Debezium Source读取MySQL的binlog

接下来,我们使用Debezium Source读取MySQL的binlog。我们需要配置MySQL的连接信息、监控的数据库和表、反序列化器以及启动选项。

DebeziumSourceFunction<String> sourceFunction = MySqlSource.<String>builder().serverTimeZone("Asia/Shanghai") // 设置时区为亚洲/上海.hostname("localhost") // MySQL的IP地址.port(3306) // MySQL的端口.username("root") // MySQL的用户名.password("123456") // MySQL的密码.databaseList("my_db") // 监控的数据库.tableList("my_db.user") // 监控的数据库下的表.deserializer(new JsonDebeziumDeserializationSchema()) // 反序列化.startupOptions(StartupOptions.initial()) // 启动选项.build();

这里 JsonDebeziumDeserializationSchema类的代码如下:

import com.alibaba.fastjson.JSONObject;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import io.debezium.data.Envelope;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;import java.util.List;/**
*  自定义DeserializationSchema进行反序列化。
*/public class JsonDebeziumDeserializationSchema implements DebeziumDeserializationSchema<String> {@Overridepublic void deserialize(SourceRecord sourceRecord, Collector collector) throws Exception {//创建JSON对象用于存储最终数据JSONObject result = new JSONObject();String topic = sourceRecord.topic();String[] fields = topic.split("\\.");String database = fields[1];String tableName = fields[2];Struct value  = (Struct)sourceRecord.value();//获取before数据Struct before = value.getStruct("before");JSONObject beforeJson = getJson(before);//获取after数据Struct after = value.getStruct("after");JSONObject afterJson = getJson(after);//获取操作类型Envelope.Operation operation = Envelope.operationFor(sourceRecord);//将字段写入JSON对象result.put("database",database);result.put("tableName",tableName);result.put("type",operation);result.put("before",beforeJson);result.put("after",afterJson);//输出数据collector.collect(result.toJSONString());}/***  获取字段值并写入result对象* @param before* @return*/private JSONObject getJson(Struct before) {JSONObject jsonObject = new JSONObject();if(before != null){Schema beforeSchema = before.schema();List<Field> beforeFields = beforeSchema.fields();for (Field field : beforeFields) {Object beforeValue = before.get(field);jsonObject.put(field.name(), beforeValue);}}return jsonObject;}@Overridepublic TypeInformation getProducedType() {return BasicTypeInfo.STRING_TYPE_INFO;}
}
  1. 添加数据源并打印数据

将Debezium源函数添加到Flink环境中,生成一个数据流,并将数据流中的数据打印到控制台。

DataStream<String> dataStreamSource = env.addSource(sourceFunction, TypeInformation.of(String.class));
DataStreamSink<String> print = dataStreamSource.print();
  1. 启动任务

最后,启动Flink作业,开始处理数据流。

env.execute("Flink-CDC");

6.测试

在这里插入图片描述

总结

通过上述步骤,我们可以使用Flink CDC实时监控MySQL数据库的变更,并将变更数据以JSON格式打印出来。这种方法不仅适用于数据监控,还可以用于实时数据处理和分析。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/858342.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

在Qt、QtCreator和QMake中配置GCC编译开关

背景介绍 近期,我尝试使用Qt Creator 1.3.2、Qt 4.6.2和GCC 4.4.0(32位版本)在Windows 7(64位)上编译一个使用一些实验性的C++0x扩展的应用程序,结果遇到了以下致命错误: This file requires compiler and library support for the upcoming ISO C++ standard, C++0x.…

mysql 如何分配root账号创建数据库的权限

1.mysql 如何分配root账号创建数据库的权限 在 MySQL 中&#xff0c;root 用户通常具有所有的权限&#xff0c;包括创建数据库的权限。但是&#xff0c;如果我们想要为另一个用户分配创建数据库的权限&#xff0c;或者想要限制 root 用户对某个特定数据库或服务器的权限&#…

端到端自动驾驶的基础概念

欢迎大家关注我的B站&#xff1a; 偷吃薯片的Zheng同学的个人空间-偷吃薯片的Zheng同学个人主页-哔哩哔哩视频 (bilibili.com) 目录 1.端到端自动驾驶的定义 1.1特斯拉FSD 1.2端到端架构演进 1.3大模型 1.4世界模型 1.5纯视觉传感器 2.落地的挑战 1.端到端自动驾驶的定…

网络安全的重要性

网络安全的重要性 网络安全是指保护网络系统免受未授权的访问、攻击、破坏或未经授权的数据泄露的能力。随着互联网的普及和数字化进程的加速&#xff0c;网络安全问题日益凸显&#xff0c;成为个人、企业和国家必须面对的重要挑战。 网络安全的威胁 网络安全威胁包括黑客攻…

MySQL----利用Mycat配置读写分离

首先确保主从复制是正常的&#xff0c;具体步骤在MySQL----配置主从复制。MySQL----配置主从复制 环境 master(CtenOS7)&#xff1a;192.168.200.131 ----ifconfig查看->ens33->inetslave(win10)&#xff1a;192.168.207.52 ----ipconfig查看->无线局域网适配器 WLA…

whiteboard - 笔记

1 drawio draw.io GitHub - jgraph/drawio: draw.io is a JavaScript, client-side editor for general diagramming. 2 demo 可以将XML数据保存到服务器上的data目录。需要在服务器端创建一个接收和处理POST请求的脚本,该脚本将接收到的SVG数据保存到指定的文件中。下面是…

C语言封装获取本机IP地址的程序

文章目录 0.概要1. 设计2. 完整的代码ip_address.hip_address.cmain.c编译命令执行结果 0.概要 本文介绍用C语言编写一个函数来获取本机的IP地址。 1. 设计 将获取IP地址的逻辑封装到一个独立的函数中&#xff0c;并定义一个结构体来存储IP地址和接口名称。 将获取IP地址的逻…

libssh-cve_2018_10933-vulfocus

1.原理 ibssh是一个用于访问SSH服务的C语言开发包&#xff0c;它能够执行远程命令、文件传输&#xff0c;同时为远程的程序提供安全的传输通道。server-side state machine是其中的一个服务器端状态机。 在libssh的服务器端状态机中发现了一个逻辑漏洞。攻击者可以MSG_USERA…

基于CDMA的多用户水下无线光通信(3)——解相关多用户检测

继续上一篇博文&#xff0c;本文将介绍基于解相关的多用户检测算法。解相关检测器的优点是因不需要估计各个用户的接收信号幅值而具有抗远近效应的能力。常规的解相关检测器有运算量大和实时性差的缺点&#xff0c;本文针对异步CDMA的MAI主要来自干扰用户的相邻三个比特周期的特…

【c2】编译预处理,gdb,makefile,文件,多线程,动静态库

文章目录 1.编译预处理&#xff1a;C源程序 - 编译预处理【#开头指令和特殊符号进行处理&#xff0c;删除程序中注释和多余空白行】- 编译2.gdb调试&#xff1a;多进/线程中无法用3.makefile文件&#xff1a;make是一个解释makefile中指令的命令工具4.文件&#xff1a;fprint/f…

Git分支的状态存储——stash命令的详细用法

文章目录 6.6 Git的分支状态存储6.6.1 git stash命令6.6.2 Git存储的基本使用6.6.3 Git存储的其他用法6.6.4 Git存储与暂存区6.6.5 Git存储的原理 6.6 Git的分支状态存储 有时&#xff0c;当我们在项目的一部分上已经工作一段时间后&#xff0c;所有东西都进入了混乱的状态&am…

AG32 MCU是否支持DFU下载实现USB升级

1、AG32 MCU是否支持DFU下载实现USB升级呢&#xff1f; 先说答案是NO. STM32 可以通过内置DFU实现USB升级&#xff0c;AG32 MCU目前不支持。但用户可以自己写一个DFU&#xff0c; 作为二次boot. 2、AG32 MCU可支持的下载方式有哪些呢&#xff1f; 我们AG32裸机下载只支持uart和…

模拟木马程序自动运行:Linux下的隐蔽攻击技术

模拟木马程序自动运行&#xff1a;Linux下的隐蔽攻击技术 在网络安全领域&#xff0c;木马程序是一种常见的恶意软件&#xff0c;它能够悄无声息地在受害者的系统中建立后门&#xff0c;为攻击者提供远程访问权限。本文将探讨攻击者如何在Linux系统中模拟木马程序的自动运行&a…

四川汇聚荣科技有限公司怎么样?

在探讨一家科技公司的综合实力时&#xff0c;我们往往从多个维度进行考量&#xff0c;包括但不限于公司的发展历程、产品与服务的质量、市场表现、技术创新能力以及企业文化。四川汇聚荣科技有限公司作为一家位于中国西部的科技企业&#xff0c;其表现和影响力自然也受到业界和…

Python实现基于深度学习的电影推荐系统

Python实现基于深度学习的电影推荐系统 项目背景 在数字化娱乐时代&#xff0c;用户面临着海量的电影选择。为了帮助用户找到符合个人口味的佳片&#xff0c;MovieRecommendation项目提供了一个基于深度学习的个性化电影推荐系统。该系统利用深度学习技术&#xff0c;根据用户…

测试服务器端口是否打开,服务器端口开放异常的解决方法

在进行服务器端口开放性的测试时&#xff0c;我们通常使用网络工具来验证目标端口是否响应特定的协议请求。常用的工具包括Telnet、Nmap、nc&#xff08;netcat&#xff09;等。这些工具可以通过发送TCP或UDP数据包到指定的IP地址和端口&#xff0c;然后分析返回的数据包&#…

【FreeRTOS】任务管理与调度

文章目录 调度&#xff1a;总结 调度&#xff1a; 相同优先级的任务轮流运行最高优先级的任务先运行 可以得出结论如下&#xff1a; a 高优先级的任务在运行&#xff0c;未执行完&#xff0c;更低优先级的任务无法运行b 一旦高优先级任务就绪&#xff0c;它会马上运行&#xf…

Postman Postman接口测试工具使用简介

Postman这个接口测试工具的使用做个简单的介绍&#xff0c;仅供参考。 插件安装 1&#xff09;下载并安装chrome浏览器 2&#xff09;如下 软件使用说明

函数模板和类模板的区别

函数模板和类模板在C中都是重要的泛型编程工具&#xff0c;但它们之间存在一些显著的区别。以下是它们之间的主要区别&#xff1a; 实例化方式&#xff1a; 函数模板&#xff1a;隐式实例化。当函数模板被调用时&#xff0c;编译器会根据传递给它的参数类型自动推断出模板参数…

从零入手人工智能(5)—— 决策树

1.前言 在上一篇文章《从零入手人工智能&#xff08;4&#xff09;—— 逻辑回归》中讲述了逻辑回归这个分类算法&#xff0c;今天我们的主角是决策树。决策树和逻辑回归这两种算法都属于分类算法&#xff0c;以下是决策树和逻辑回归的相同点&#xff1a; 分类任务&#xff1…