简化数据流:Apache SeaTunnel实现多表同步的高效指南

Apache SeaTunnel除了单表之间的数据同步之外,也支持单表同步到多表,多表同步到单表,以及多表同步到多表,下面简单举例说明如何实现这些功能。

单表 to 单表

一个source,一个sink。

从mysql同步到mysql,中间不做区分

env {# You can set flink configuration hereexecution.parallelism = 2job.mode = "BATCH"
}
source{Jdbc {url = "jdbc:mysql://127.0.0.1:3306/test"driver = "com.mysql.cj.jdbc.Driver"connection_check_timeout_sec = 100user = "user"password = "password"query = "select * from base_region"}
}transform {# If you would like to get more information about how to configure seatunnel and see full list of transform plugins,# please go to https://seatunnel.apache.org/docs/transform/sql
}sink {jdbc {url = "jdbc:mysql://127.0.0.1:3306/dw"driver = "com.mysql.cj.jdbc.Driver"connection_check_timeout_sec = 100user = "user"password = "password"query = "insert into base_region(id,region_name) values(?,?)"}
}

执行任务

./bin/seatunnel.sh --config ./config/mysql2mysql_batch.conf

单表 to 多表

一个source,多个sink。

从MySQL同步到MySQL,将一个用户表数据同步过去,中间通过2个sql组件分布将男性用户和女性用户分开,在sink阶段分别插入到不同的表:

env {execution.parallelism = 2job.mode = "BATCH"
}
source {Jdbc {url = "jdbc:mysql://127.0.0.1:3306/test"driver = "com.mysql.cj.jdbc.Driver"connection_check_timeout_sec = 100user = "user"password = "password"result_table_name="t_user"query = "select * from t_user;"}
}transform {Sql {source_table_name = "t_user"result_table_name = "t_user_nan"query = "select id,name,birth,gender from t_user where gender ='男';"}Sql {source_table_name = "t_user"result_table_name = "t_user_nv"query = "select id,name,birth,gender from t_user where gender ='女';"}
}sink {jdbc {url = "jdbc:mysql://127.0.0.1:3306/dw"driver = "com.mysql.cj.jdbc.Driver"connection_check_timeout_sec = 100user = "user"password = "password"source_table_name = "t_user_nan"query =  "insert into t_user_nan(id,name,birth,gender) values(?,?,?,?)"}jdbc {url = "jdbc:mysql://127.0.0.1:3306/dw"driver = "com.mysql.cj.jdbc.Driver"connection_check_timeout_sec = 100user = "user"password = "password"source_table_name = "t_user_nv"query =  "insert into t_user_nv(id,name,birth,gender) values(?,?,?,?)"}
}
./bin/seatunnel.sh --config ./config/mysql2mysql_1n.conf

多表 to 单表

多个source,一个sink。

假如有一张交换器使用情况表,一张路由器使用情况表,目标表是将这种数据合在一起的olap表。

表结构如下:

-- dw 源表1
CREATE TABLE IF NOT EXISTS ads_device_switch_performance (`event_time` timestamp COMMENT '业务时间',`device_id` VARCHAR(32) COMMENT '设备id',`device_type` VARCHAR(32) COMMENT '设备类型',`device_name` VARCHAR(128) COMMENT '设备名称',`cpu_usage` INT COMMENT 'CPU使用率百分比'
) ;INSERT INTO `ads_device_switch_performance` VALUES ('2024-01-15 14:25:11', '2001', '2', '交换器1', 49);
INSERT INTO `ads_device_switch_performance` VALUES ('2024-01-17 22:25:40', '2002', '1', '交换器2', 65);-- dw 源表2
CREATE TABLE IF NOT EXISTS ads_device_router_performance (`event_time` timestamp COMMENT '业务时间',`device_id` VARCHAR(32) COMMENT '设备id',`device_type` VARCHAR(32) COMMENT '设备类型',`device_name` VARCHAR(128) COMMENT '设备名称',`cpu_usage` INT COMMENT 'CPU使用率百分比'
);INSERT INTO `ads_device_router_performance` VALUES ('2024-01-17 21:23:22', '1001', '1', '路由器1', 35);
INSERT INTO `ads_device_router_performance` VALUES ('2024-01-16 17:23:53', '1002', '2', '路由器2', 46);-------------------------------------------------------------------------------
-- olap 目标表
CREATE TABLE `device_performance` (`id` INT NOT NULL AUTO_INCREMENT COMMENT '表主键',`event_time` VARCHAR(32) NOT NULL COMMENT '业务时间',`device_id` VARCHAR(32) COMMENT '设备id',`device_type` VARCHAR(32) COMMENT '设备类型',`device_name` VARCHAR(128) NOT NULL COMMENT '设备名称',`cpu_usage` FLOAT NOT NULL COMMENT 'CPU利用率单位是%',`create_time` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',`update_time` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '更新时间',PRIMARY KEY (`id`)
) COMMENT='设备状态';

将交换器数据和路由器数据一起同步到olap目标表,总结通过sql组件处理:

env {job.mode="BATCH"job.name="device_performance"
}source {Jdbc {url="jdbc:mysql://127.0.0.1:3306/dw?allowMultiQueries=true&characterEncoding=utf-8"driver="com.mysql.cj.jdbc.Driver"user = "user"password = "password"result_table_name="switch_src"query="SELECT `event_time`, `device_id`, `device_type`, `device_name`, `cpu_usage` FROM ads_device_switch_performance;"}Jdbc {url="jdbc:mysql://127.0.0.1:3306/dw?allowMultiQueries=true&characterEncoding=utf-8"driver="com.mysql.cj.jdbc.Driver"user = "user"password = "password"result_table_name="router_src"query="SELECT `event_time`, `device_id`, `device_type`, `device_name`, `cpu_usage` FROM ads_device_router_performance;"}
}transform {Sql {source_table_name = "switch_src"result_table_name = "switch_dst"query = "SELECT  event_time , device_id, device_type, device_name, cpu_usage, NOW() AS create_time, NOW() AS update_time  FROM switch_src;"}Sql {source_table_name = "router_src"result_table_name = "router_dst"query = "SELECT event_time, device_id, device_type, device_name, cpu_usage, NOW() AS create_time, NOW() AS update_time FROM router_src;"}
}sink {Jdbc {url="jdbc:mysql://127.0.0.1:3306/olap?allowMultiQueries=true&characterEncoding=utf-8"driver="com.mysql.cj.jdbc.Driver"user = "user"password = "password"source_table_name = "switch_dst"query="INSERT INTO device_performance  VALUES(null,?, ?, ?, ?, ?, ?, ?) ;"}Jdbc {url="jdbc:mysql://127.0.0.1:3306/olap?allowMultiQueries=true&characterEncoding=utf-8"driver="com.mysql.cj.jdbc.Driver"user = "user"password = "password"source_table_name = "router_dst"query="INSERT INTO device_performance  VALUES(null,?, ?, ?, ?, ?, ?, ?) ;"}
}

执行任务:

./bin/seatunnel.sh --config ./syn_job/mysql2mysql_n1_batch.conf

作业成功!

多表 to 多表

多个source,多个sink。

将交换器使用情况数据和路由器使用情况数据分别同步到对应的目标表,中间sql组件处理

env {job.mode="BATCH"job.name="device_performance"
}source {Jdbc {url="jdbc:mysql://127.0.0.1:3306/dw?allowMultiQueries=true&characterEncoding=utf-8"driver="com.mysql.cj.jdbc.Driver"user = "user"password = "password"result_table_name="switch_src"query="SELECT `event_time`, `device_id`, `device_type`, `device_name`, `cpu_usage` FROM ads_device_switch_performance;"}Jdbc {url="jdbc:mysql://127.0.0.1:3306/dw?allowMultiQueries=true&characterEncoding=utf-8"driver="com.mysql.cj.jdbc.Driver"user = "user"password = "password"result_table_name="router_src"query="SELECT `event_time`, `device_id`, `device_type`, `device_name`, `cpu_usage` FROM ads_device_router_performance;"}
}transform {Sql {source_table_name = "switch_src"result_table_name = "switch_dst"query = "SELECT  event_time , device_id, device_type, device_name, cpu_usage, NOW() AS create_time, NOW() AS update_time  FROM switch_src;"}Sql {source_table_name = "router_src"result_table_name = "router_dst"query = "SELECT event_time, device_id, device_type, device_name, cpu_usage, NOW() AS create_time, NOW() AS update_time FROM router_src;"}
}sink {Jdbc {url="jdbc:mysql://127.0.0.1:3306/olap?allowMultiQueries=true&characterEncoding=utf-8"driver="com.mysql.cj.jdbc.Driver"user = "user"password = "password"source_table_name = "switch_dst"query="INSERT INTO device_performance_switch  VALUES(null,?, ?, ?, ?, ?, ?, ?) ;"}Jdbc {url="jdbc:mysql://127.0.0.1:3306/olap?allowMultiQueries=true&characterEncoding=utf-8"driver="com.mysql.cj.jdbc.Driver"user = "user"password = "password"source_table_name = "router_dst"query="INSERT INTO device_performance_router  VALUES(null,?, ?, ?, ?, ?, ?, ?) ;"}
}

结语

综上所述,Apache SeaTunnel多表同步技术具有高效、实时、可靠和灵活的特点,在企业的数据同步领域发挥着重要作用。借助Apache SeaTunnel多表同步功能,企业能够更好地实现不同系统和数据库之间数据的无缝流转,提升数据管理和利用的效率,为业务发展提供有力支持。希望本文能够帮助读者更好地了解和应用Apache SeaTunnel多表同步,从而为企业数据同步带来更多可能性。

原文链接:https://blog.csdn.net/weixin_44586883/article/details/136049897

本文由 白鲸开源科技 提供发布支持!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/874846.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Pytorch TensorBoard的使用

from torch.utils.tensorboard import SummaryWriter writer SummaryWriter("logs")for i in range(100):writer.add_scalar("yx",i,i) writer.close() 第一个参数 y2x: 这是图表的标题或标签。它会显示在TensorBoard界面中,帮助你识别这条曲线。 第二个参…

(35)远程识别(又称无人机识别)(二)

文章目录 前言 4 ArduRemoteID 5 终端用户数据的设置和使用 6 测试 7 为OEMs添加远程ID到ArduPilot系统的视频教程 前言 在一些国家,远程 ID 正在成为一项法律要求。以下是与 ArduPilot 兼容的设备列表。这里(here)有一个关于远程 ID 的很好解释和常见问题列表…

【数据结构】排序算法——Lesson2

Hi~!这里是奋斗的小羊,很荣幸您能阅读我的文章,诚请评论指点,欢迎欢迎 ~~ 💥💥个人主页:奋斗的小羊 💥💥所属专栏:C语言 🚀本系列文章为个人学习…

uni-app pinia搭建

1.新建store文件 新建index.js,代码: // import { // createPinia // } from pinia //const store createPinia() import * as Pinia from pinia const pinia Pinia.createPinia() export * from "./modules/user" export * from ".…

计算机网络 6.1Internet概念

第六章 Internet基础 第一节 Internet概念 一、认识Internet 1.定义:集现代计算机技术、通信技术于一体的全球性计算机互联网。 2.地位:当今世界上规模最大的计算机互联网。 3.使用协议:TCP/IP。 4.基本结构: ​ ①主干网…

vscode 寻找全部分支的提交

vscode 寻找全部分支的提交 Git Graph

Python 机器学习求解 PDE 学习项目——PINN 求解二维 Poisson 方程

本文使用 TensorFlow 1.15 环境搭建深度神经网络(PINN)求解二维 Poisson 方程: 模型问题 − Δ u f in Ω , u g on Γ : ∂ Ω . \begin{align} -\Delta u & f \quad & \text{in } \Omega,\\ u & g \quad & \text{on } \Gamma:\p…

Proxmox8基于PC物理机/服务器安装,初始化,挂载磁盘,安装虚拟机

目录 安装文件 开始安装Proxmox 选择启动菜单,F11 后进入启动菜单选择 按需选择是否关闭RAID 选择对应的U盘 进入安装界面 进入安装启动过程 选择系统盘 设置相关信息 设置IP和开启root远程登录 设置dns 设置网卡ip 设置 ssh 远程登录 开机合并local-l…

LeetCode:爬楼梯(C语言)

1、问题概述:每次可以爬 1 或 2 个台阶。有多少种不同的方法可以爬到楼顶 2、示例 示例 1: 输入:n 2 输出:2 解释:有两种方法可以爬到楼顶。 1. 1 阶 1 阶 2. 2 阶 示例 2: 输入:n 3 输出&a…

Telegram曝零日漏洞,可伪装成视频攻击安卓用户

ESET Research在一个地下论坛上发现了一个针对Android Telegram的零日漏洞广告。 ESET将该漏洞命名为“EvilVideo”,并将其报告给Telegram,Telegram于7月11日更新了该应用程序。 EvilVideo允许攻击者发送恶意的有效载荷,这些载荷以视频文件…

计算机网络-配置双机三层互联(静态路由方式)

目录 交换机工作原理路由器工作原理路由信息表组成部分路由器发决策 ARP工作原理配置双机三层互联(静态路由方式) 交换机工作原理 MAC自学习过程 初始状态: 刚启动的交换机的MAC地址表是空的。 学习过程: 当交换机收到一个数据帧…

【.NET】asp.net core 程序重启容器后redis无法连接,连接超时

环境是容器化部署asp.net core 程序当有大量请求打到容器如果此时重启容器会出现,redis无法连接情况。 使用 csredis 库报错: Status unavailable, waiting for recovery. Connect to server timeout 使用StackExchange.Redis 报错: Time…

如何将Python应用容器化到Docker中

将Python应用容器化到Docker中是一个常见且有用的做法,它可以帮助你轻松地在不同的环境中部署和运行你的应用,无需担心环境差异带来的问题。以下是编写Dockerfile以容器化Python应用的基本过程: 1. 准备你的Python应用 首先,确保…

基于深度学习的多智能体系统

基于深度学习的多智能体系统(Multi-Agent Systems, MAS)是指通过多个智能体(agents)之间的协作或竞争来完成复杂任务的系统。这些智能体通过深度学习和强化学习技术进行学习和决策,广泛应用于机器人协作、自动驾驶、分…

一个注解实现分布式锁加锁

目录 一、概述 二、代码的实现 1、引入依赖 2、配置Redisson 3、定义注解 4、添加aop的切面方法 5、 支持 SpEL 表达式 三、代码验证 四、总结 一、概述 在微服务项目的开发进程中,分布式锁的应用场景屡见不鲜。此时,我们需要借助分布式组件来实…

240723基于opencv下图像阈值

文章目录 1.实验环境2.实验目的3.实验代码4.实验结果1.实验环境 python=3.6 opencv=3.4.1 编译器pycharm 2.实验目的 学习数字图像处理中关于阈值处理的几种方式,分析其中的临界值以及他们的区别 3.实验代码 # @File: 15.1简单阈值.py # @Author: chen_song # @Time: 202…

论文阅读——Integrated Diffusive Antenna Array of Low Backscattering

文章目录 摘要一、背景介绍二、天线结构A. 缝隙天线B. 低频扩散单元C. 高频扩散单元D. 集成设计 三、验证总结 论文来源:https://ieeexplore.ieee.org/document/10309141 摘要 文章提出了一种低雷达散射截面(RCS)的扩散天线阵列。 作为示例…

优化PyCharm:让IDE响应速度飞起来

优化PyCharm:让IDE响应速度飞起来 PyCharm,作为一款功能强大的集成开发环境(IDE),在提供丰富功能的同时,有时也会出现响应慢的问题。这不仅影响开发效率,还可能打击开发者的积极性。本文将详细…

Linux内存管理--系列文章八——内存管理架构

一、引子 上篇文章讲述了目前内存的硬件架构,本篇阐述内核中是怎么表示不同架构的物理内存页。 二、平坦内存模型(Flat Memory Model) 在该模型下,物理内存是连续的,所以物理地址也是连续的。这时内核使用struct pa…

STM32嵌入式人工智能边缘计算应用教程

目录 引言环境准备边缘计算系统基础代码实现:实现嵌入式人工智能边缘计算系统 4.1 数据采集模块 4.2 数据处理与推理模块 4.3 通信与网络系统实现 4.4 用户界面与数据可视化应用场景:边缘计算与优化问题解决方案与优化收尾与总结 1. 引言 嵌入式人工智…