使用Apache Flink实现实时数据同步与清洗:MySQL和Oracle到目标MySQL的ETL流程

使用Apache Flink实现实时数据同步与清洗:MySQL和Oracle到目标MySQL的ETL流程

CSDN开发云
实现数据同步的ETL(抽取、转换、加载)过程通常涉及从源系统(如数据库、消息队列或文件)中抽取数据,进行必要的转换,然后将数据加载到目标系统(如另一个数据库或数据仓库)。在这里,我们将展示如何使用Apache Flink来实现一个从MySQL或Oracle数据库抽取数据并同步到另一个MySQL数据库的ETL过程。

  • 1. 从源数据库(MySQL和Oracle)实时抽取数据
  • 2. 对数据进行清洗和转换
  • 3. 将转换后的数据写入目标数据库(MySQL)
    请添加图片描述

我们将使用Apache Flink来实现这个流程。Flink具有强大的数据流处理能力,适合处理实时数据同步和转换任务。

环境准备

  • 确保MySQL和Oracle数据库运行**,并创建相应的表。
  • 创建Spring Boot项目,并添加Flink、MySQL JDBC、和Oracle JDBC驱动的依赖。

第一步:创建源和目标数据库表

假设我们有以下三个表:

  • source_mysql_table(MySQL中的源表)
  • source_oracle_table(Oracle中的源表)
  • target_table(目标MySQL表)

MySQL源表

CREATE DATABASE source_mysql_db;
USE source_mysql_db;CREATE TABLE source_mysql_table (id INT AUTO_INCREMENT PRIMARY KEY,user_id VARCHAR(255) NOT NULL,action VARCHAR(255) NOT NULL,timestamp VARCHAR(255) NOT NULL
);

Oracle源表

CREATE TABLE source_oracle_table (id NUMBER GENERATED BY DEFAULT ON NULL AS IDENTITY,user_id VARCHAR2(255) NOT NULL,action VARCHAR2(255) NOT NULL,timestamp VARCHAR2(255) NOT NULL,PRIMARY KEY (id)
);

目标MySQL表

CREATE DATABASE target_db;
USE target_db;CREATE TABLE target_table (id INT AUTO_INCREMENT PRIMARY KEY,user_id VARCHAR(255) NOT NULL,action VARCHAR(255) NOT NULL,timestamp VARCHAR(255) NOT NULL
);

第二步:添加项目依赖

在pom.xml中添加Flink、MySQL和Oracle相关的依赖:

<dependencies><!-- Spring Boot dependencies --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><!-- Apache Flink dependencies --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_2.12</artifactId><version>1.14.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_2.12</artifactId><version>1.14.0</version></dependency><!-- MySQL JDBC driver --><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.23</version></dependency><!-- Oracle JDBC driver --><dependency><groupId>com.oracle.database.jdbc</groupId><artifactId>ojdbc8</artifactId><version>19.8.0.0</version></dependency>
</dependencies>

第三步:编写Flink ETL任务

创建一个Flink任务类来实现ETL逻辑。

创建一个POJO类表示数据结构

package com.example.flink;public class UserAction {private int id;private String userId;private String action;private String timestamp;// Getters and setterspublic int getId() {return id;}public void setId(int id) {this.id = id;}public String getUserId() {return userId;}public void setUserId(String userId) {this.userId = userId;}public String getAction() {return action;}public void setAction(String action) {this.action = action;}public String getTimestamp() {return timestamp;}public void setTimestamp(String timestamp) {this.timestamp = timestamp;}
}

编写Flink任务类

package com.example.flink;import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Collector;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;@Component
public class FlinkETLJob implements CommandLineRunner {@Overridepublic void run(String... args) throws Exception {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 从MySQL读取数据DataStream<UserAction> mysqlDataStream = env.addSource(new MySQLSource());// 从Oracle读取数据DataStream<UserAction> oracleDataStream = env.addSource(new OracleSource());// 合并两个数据流DataStream<UserAction> mergedStream = mysqlDataStream.union(oracleDataStream);// 清洗和转换数据DataStream<UserAction> transformedStream = mergedStream.map(new MapFunction<UserAction, UserAction>() {@Overridepublic UserAction map(UserAction value) throws Exception {// 进行清洗和转换value.setAction(value.getAction().toUpperCase());return value;}});// 将数据写入目标MySQL数据库transformedStream.addSink(new MySQLSink());// 执行任务env.execute("Flink ETL Job");}public static class MySQLSource implements SourceFunction<UserAction> {private static final String JDBC_URL = "jdbc:mysql://localhost:3306/source_mysql_db";private static final String JDBC_USER = "source_user";private static final String JDBC_PASSWORD = "source_password";private volatile boolean isRunning = true;@Overridepublic void run(SourceContext<UserAction> ctx) throws Exception {try (Connection connection = DriverManager.getConnection(JDBC_URL, JDBC_USER, JDBC_PASSWORD)) {while (isRunning) {String sql = "SELECT * FROM source_mysql_table";try (PreparedStatement statement = connection.prepareStatement(sql);ResultSet resultSet = statement.executeQuery()) {while (resultSet.next()) {UserAction userAction = new UserAction();userAction.setId(resultSet.getInt("id"));userAction.setUserId(resultSet.getString("user_id"));userAction.setAction(resultSet.getString("action"));userAction.setTimestamp(resultSet.getString("timestamp"));ctx.collect(userAction);}}Thread.sleep(5000); // 模拟实时数据流,每5秒查询一次}}}@Overridepublic void cancel() {isRunning = false;}}public static class OracleSource implements SourceFunction<UserAction> {private static final String JDBC_URL = "jdbc:oracle:thin:@localhost:1521:orcl";private static final String JDBC_USER = "source_user";private static final String JDBC_PASSWORD = "source_password";private volatile boolean isRunning = true;@Overridepublic void run(SourceContext<UserAction> ctx) throws Exception {try (Connection connection = DriverManager.getConnection(JDBC_URL, JDBC_USER, JDBC_PASSWORD)) {while (isRunning) {String sql = "SELECT * FROM source_oracle_table";try (PreparedStatement statement = connection.prepareStatement(sql);ResultSet resultSet = statement.executeQuery()) {while (resultSet.next()) {UserAction userAction = new UserAction();userAction.setId(resultSet.getInt("id"));userAction.setUserId(resultSet.getString("user_id"));userAction.setAction(resultSet.getString("action"));userAction.setTimestamp(resultSet.getString("timestamp"));ctx.collect(userAction);}}Thread.sleep(5000); // 模拟实时数据流,每5秒查询一次}}}@Overridepublic void cancel() {isRunning = false;}}public static class MySQLSink extends RichFlatMapFunction<UserAction, Void> {private static final String JDBC_URL = "jdbc:mysql://localhost:3306/target_db";private static final String JDBC_USER = "target_user";private static final String JDBC_PASSWORD = "target_password";private transient Connection connection;private transient PreparedStatement statement;@Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);connection = DriverManager.getConnection(JDBC_URL, JDBC_USER, JDBC_PASSWORD);String sql = "INSERT INTO target_table (user_id, action, timestamp) VALUES (?, ?, ?)";statement = connection.prepareStatement(sql);}@Overridepublic void flatMap(UserAction value, Collector<Void> out) throws Exception {statement.setString(1, value.getUserId());statement.setString(2, value.getAction());statement.setString(3, value.getTimestamp());statement.executeUpdate();}@Overridepublic void close() throws Exception {super.close();if (statement != null) {statement.close();}if (connection != null) {connection.close();}}}
}

第四步:配置Spring Boot

在application.properties中添加必要的配置:

# Spring Boot configuration
server.port=8080

第五步:运行和测试

  • 启动MySQL和Oracle数据库:确保你的源和目标数据库已经运行,并且创建了相应的数据库和表。
  • 启动Spring Boot应用:启动Spring Boot应用程序,会自动运行Flink ETL任务。
  • 测试Flink ETL任务:插入一些数据到源数据库的表中,验证数据是否同步到目标数据库的表中。

总结

通过上述步骤,你可以在Spring Boot项目中集成Flink并实现实时数据同步和ETL流程。这个示例展示了如何从MySQL和Oracle源数据库实时抽取数据,进行数据清洗和转换,并将结果加载到目标MySQL数据库中。根据你的具体需求,你可以扩展和修改这个示例,处理更复杂的数据转换和加载逻辑。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/31341.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Windows C++ 应用软件开发从入门到精通详解

目录 1、引言 2、IDE 开发环境介绍 2.1、Visual Studio 2.2、QT Creator 3、Windows 平台实用小工具介绍 3.1、代码编辑器 VSCode 3.2、代码查看编辑器 Source Insight 3.3、文本编辑器 Notepad 3.4、文件搜索工具 Everything 4、 C语言特性 4.1、熟悉泛型编程 4.…

恭喜行云绽放,24年再度荣获国家鼓励的企业软件证书

在刚刚过去的五月份&#xff0c;行云绽放再次传来一个好消息&#xff0c;那就是2024年行云绽放再度荣获国家鼓励的企业软件证书。 什么是国家鼓励的企业软件证书&#xff1f; 国家鼓励的企业软件证书被称为“国家鼓励的软件企业证书”&#xff0c;这一证书由中国软件行业协会…

LangChain轻松入门和开发实践

LangChain是一个开发语言模型应用的框架。 LangChain能够简化开发与语言模型工作流中的复杂部分&#xff0c;帮助开发人员能够更轻松地进行开发&#xff0c;并定制满足需求的应用。 LangChain有两大优点&#xff0c;一是它能将外部数据&#xff0c;如文件、其他应用、API数据等…

kylinos 国产操作系统离线安装firefox 麒麟操作系统安装新版本firefox

1. 火狐地址&#xff1a; 下载 Firefox 浏览器&#xff0c;这里有简体中文及其他 90 多种语言版本供您选择 2. 选择&#xff1a; 3. 下载完之后&#xff0c;上传到离线机器 4. 解压缩&#xff1a; tar -xvjf firefox-127.0.1.tar.bz2 5. 去点击解压后的文件夹&#xff0c;找…

SpringCloud Alibaba Sentinel 流量控制之流控效果实践总结

当 QPS 超过某个阈值的时候&#xff0c;则采取措施进行流量控制。流量控制的效果包括以下几种&#xff1a;直接拒绝、Warm Up、匀速排队/排队等待。对应 FlowRule 中的 controlBehavior 字段。 注意&#xff1a;若使用除了直接拒绝之外的流量控制效果&#xff0c;则调用关系限流…

如何有效管理信息技术课堂

有效管理信息技术课堂是确保学生学习效果、维护课堂秩序和提升学生兴趣的关键。以下是一些详细的方法和策略&#xff0c;旨在帮助教师更好地管理信息技术课堂&#xff1a; 一、制定明确的课堂规则 强调课堂纪律&#xff1a;确保学生明确了解并遵守课堂纪律&#xff0c;如准时…

long long ago

一、long 众所周知&#xff0c;英文单词 long&#xff0c;表示长,长的。 但是&#xff0c;还有很多你不知道到的东西&#xff0c;根据英文单词首字母象形原则&#xff0c;我们可以做一下单词long结构分析&#xff1a; long l长 ong长 什么意思&#xff1f;就是说首字线 l…

H3C路由器密码恢复方法

H3C的路由器需要先关闭电源&#xff0c;重新启动路由器&#xff0c;注意终端上显示 press CTRLB to enter extended boot menu 的时候&#xff0c;请迅速按下ctrlB&#xff0c;这样将进入扩展启动选项。接着&#xff0c;你就可以进入路由器视图&#xff1a; 1.低端H3C设备 重启…

STM32 I2C总线锁死原因及解决方法

本文介绍STM32 I2C总线锁死原因及解决方法。 在使用STM32 I2C总线操作外设时&#xff0c;有时会遇到I2C总线锁死&#xff08;I2C总线为Busy状态&#xff09;的问题&#xff0c;即便复位MCU也无法解决&#xff0c;本文介绍其锁死的原因和解决方法&#xff0c;并给出相应的参考代…

pdf转图片转换器,pdf转图片的工具

在日常的工作和学习中&#xff0c;我们经常会遇到需要将PDF文件转换为图片格式的情况。那么&#xff0c;如何才能将PDF格式转换为图片格式呢&#xff1f;今天&#xff0c;我将为大家介绍几种简单易用的方法&#xff0c;帮助大家轻松实现PDF转图片。 打开“轻云pdf处理官网网站”…

deepin 加入甲辰计划,共建 RISC-V 繁荣生态

内容来源&#xff1a;deepin&#xff08;深度&#xff09;社区 今日&#xff0c;deepin(深度)社区宣布正式加入甲辰计划&#xff0c;致力于在下一个丙辰年&#xff08;2036龙年&#xff09;之前&#xff0c;基于RISC-V实现从数据中心到桌面办公、从移动穿戴到智能物联网全信息产…

【廉颇老矣,尚能饭否】传统的数据仓库是否还能发挥作用?

引言&#xff1a;随着数字化转型的深入和大数据技术的发展&#xff0c;大数据平台、数据中台和和数据湖技术不断涌现&#xff0c;给人感觉传统的数据仓库技术已经过时&#xff0c;廉颇老矣&#xff0c;不能应对新的挑战&#xff0c;在数字化转型中&#xff0c;不能发挥重要作用…

网络与协议安全复习 - 电子邮件安全

文章目录 PGP(Pretty Good Privacy)功能 S/MIME(Secure/Multipurpose Internet Mail Extensions)DKIM(Domain Keys Identified Mail) PGP(Pretty Good Privacy) 使用符号&#xff1a; Ks&#xff1a;会话密钥、KRa&#xff1a;A 的私钥、KUa&#xff1a;A 的公钥、EP&#xff…

JAVAWeb---- 数据库的简单了解

目录 1.什么是数据库 2.什么是数据库管理系统 3.什么是SQL 4.什么是关系型数据库 1.什么是数据库 用来存储和管理数据的“仓库”&#xff0c;简称DB(Database)&#xff1b; 2.什么是数据库管理系统 对数据库的一切操作都是在数据库管理系统进行的&#xff0c;比如MySQL&a…

软件测试之解构单元测试

软件单元测试是对软件中的最小可测试单元进行检查和验证的过程。这些单元可以是函数、方法、类实例&#xff0c;或者是任何具有明确功能、规格定义和接口定义的程序代码模块。单元测试是软件开发过程中的最低级别的测试活动&#xff0c;它确保软件的独立单元在与程序的其他部分…

eclipse中没有SERVER的解决办法(超详细)

将 Tomcat 和 Eclipse 相关联时&#xff0c;Eclipse有的版本发现 发现eclipse->【Window】->【Preferences】里没有【server】从而配置不了Runtime Environment。所以需要通过eclipse进行安装。 通过我个人的经验下面给出解决办法&#xff1a; 一、获取 Eclipse版本 点击…

Kafka中的时间轮算法

1. Kafka与时间轮&#xff1a; Kafka的定时器底层使用时间轮算法。Kafka时间轮是层次时间轮&#xff0c;并且支持时间轮复用。 优点&#xff1a; 高效的插入操作&#xff1a; 时间轮底层数据结构&#xff08;桶&#xff09;&#xff0c;使用双向链表的设计使得插入操作的时间…

若电路板上的二极管损坏后怎么确定型号呢?

若电路板上的二极管损坏后&#xff0c;还可以看清原来管子的型号&#xff0c;换用一个同型号的二极管即可。若看不清型号或管子未标注型号&#xff0c;一般可以根据该二极管在电路中的作用来代换。电路板上的二极管坏了&#xff0c;如何确定它的型号&#xff1f;。 一般来说看…

气象数据NC、grb2解析成矢量json、CMIS、MICPS及图片应用到webgis

一、基础概念 气象数据通常以多种格式存储和交换&#xff0c;以适应不同的应用需求和处理工具。以下是一些常见的气象数据格式及其转换方法的概述&#xff1a; 常见气象数据格式 1. NetCDF&#xff08;Network Common Data Form&#xff09;&#xff1a;一种自描述、自包含的…

如何在MATLAB中创建各种常用的图形

本文将介绍如何在MATLAB中创建各种常用的图形。将涵盖以下内容&#xff1a; 基本的二维图形 折线图&#xff08;Line Plot&#xff09;散点图&#xff08;Scatter Plot&#xff09;条形图&#xff08;Bar Plot&#xff09;面积图&#xff08;Area Plot&#xff09; 三维图形 三…