seatunnel数据集成(二)数据同步

seatunnel数据集成(一)简介与安装
seatunnel数据集成(二)数据同步
seatunnel数据集成(三)多表同步
seatunnel数据集成(四)连接器使用


1、Connector类型

seatunnel连接器类型丰富,支持以下类型:

Source

Sink

Clickhouse

Clickhouse

Elasticsearch

Elasticsearch

FakeSource

FakeSource

Ftp

Ftp

Github/Gitlab

Github/Gitlab

Greenplum

Greenplum

Hdfs file

Hdfs file

Hive

Hive

Http

Http

Hudi/Iceberg

Hudi/Iceberg

JDBC

JDBC

Kudu

Kudu

MongoDB

MongoDB

Mysql / MySQL CDC

Mysql / MySQL CDC

Redis

Redis

Kafka

Kafka

StarRocks

StarRocks

Phoenix

Phoenix

...

...

2、mysql to mysql

参数(必备):

mysql source

  • url
  • driver
  • query

mysql sink

  • url
  • driver

样例:

env {# You can set flink configuration hereexecution.parallelism = 2job.mode = "BATCH"
}
source{Jdbc {url = "jdbc:mysql://127.0.0.1:3306/test"driver = "com.mysql.cj.jdbc.Driver"connection_check_timeout_sec = 100user = "user"password = "password"query = "select * from base_region limit 4"}
}transform {# If you would like to get more information about how to configure seatunnel and see full list of transform plugins,# please go to https://seatunnel.apache.org/docs/transform/sql
}sink {jdbc {url = "jdbc:mysql://127.0.0.1:3306/dw"driver = "com.mysql.cj.jdbc.Driver"user = "user"password = "password"query = "insert into base_region(id,region_name) values(?,?)"}
}

脚本执行:

./bin/seatunnel.sh --config ./config/mysql2mysql_batch.conf

3、mysql to hive

如果用的是Spark/Flink引擎,需要Spark/Flink已经集成好了Hive

如果用SeaTunnel Zeta引擎,需要将

  • seatunnel-hadoop3-3.1.4-uber.jar
  • hive-exec-2.3.9.jar

放到 $SEATUNNEL_HOME/lib/ 目录下。

样例:

env {job.mode = "BATCH"
}source {Jdbc {url = "jdbc:mysql:///127.0.0.1:3306/dw?allowMultiQueries=true&characterEncoding=utf-8"driver = "com.mysql.cj.jdbc.Driver"user = "user"password = "password"query = "select * from source_user"}
}transform {
}sink {Hive {table_name = "ods.sink_user"metastore_uri = "thrift://bigdata101:9083"}}

 执行脚本: 

./bin/seatunnel.sh --config ./config/mysql2hive.conf

4、增量同步(参数)

需求:根据创建时间,每天增量抽取

表结构:

-- db 
CREATE TABLE `t_order_detail` (`id` bigint NOT NULL AUTO_INCREMENT COMMENT '编号',`order_id` bigint DEFAULT NULL COMMENT '订单编号',`sku_id` bigint DEFAULT NULL COMMENT 'sku_id',`sku_name` varchar(200) DEFAULT NULL COMMENT 'sku名称',`img_url` varchar(200) DEFAULT NULL COMMENT '图片名称',`order_price` decimal(10,2) DEFAULT NULL COMMENT '购买价格(下单时sku价格)',`sku_num` varchar(200) DEFAULT NULL COMMENT '购买个数',`create_time` datetime DEFAULT NULL COMMENT '创建时间',PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=863 DEFAULT CHARSET=utf8mb3 COMMENT='订单明细表';-- dw
CREATE TABLE `ods_t_order_detail_di` (`id` bigint NOT NULL COMMENT '编号',`order_id` bigint DEFAULT NULL COMMENT '订单编号',`sku_id` bigint DEFAULT NULL COMMENT 'sku_id',`sku_name` varchar(200) DEFAULT NULL COMMENT 'sku名称',`img_url` varchar(200) DEFAULT NULL COMMENT '图片名称',`order_price` decimal(10,2) DEFAULT NULL COMMENT '购买价格(下单时sku价格)',`sku_num` varchar(200) DEFAULT NULL COMMENT '购买个数',`create_time` datetime DEFAULT NULL COMMENT '创建时间'
) ENGINE=InnoDB AUTO_INCREMENT=863 DEFAULT CHARSET=utf8mb3 COMMENT='ODS订单明细表';SELECT id,order_id,sku_id,sku_name,img_url,order_price,sku_num,create_time 
FROM t_order_detail 
WHERE create_time >= '2024-02-05' 
and create_time < DATE_ADD('2024-02-05',interval 1 day) 

样例:

env {execution.parallelism = 2job.mode = "BATCH"
}
source {Jdbc {url = "jdbc:mysql://127.0.0.1:3306/test"driver = "com.mysql.cj.jdbc.Driver"connection_check_timeout_sec = 100user = "user"password = "password"query = "select * from t_order_detail where create_time >= REPLACE('"${etl_dt}"', 'T', ' ') and create_time < date_add(REPLACE('"${etl_dt}"', 'T', ' '),interval 1 day);"}
}transform {# If you would like to get more information about how to configure seatunnel and see full list of transform plugins,# please go to https://seatunnel.apache.org/docs/transform/sql
}sink {jdbc {url = "jdbc:mysql://127.0.0.1:3306/test"driver = "com.mysql.cj.jdbc.Driver"connection_check_timeout_sec = 100user = "user"password = "password"query = "insert into ods_t_order_detail_di (id,order_id,sku_id,sku_name,img_url,order_price,sku_num,create_time) values(?,?,?,?,?,?,?,?)"}
}

脚本执行:

./bin/seatunnel.sh --config ./config/mysql2mysql_ods_t_order_detail_di.conf -i etl_dt='2024-02-05'

5、实时

指定作业模式为:STREAMING

job.mode = "STREAMING"

基于mysql cdc:

env {execution.parallelism = 2job.mode = "STREAMING"checkpoint.interval = 10000#execution.checkpoint.interval = 10000#execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
}source {MySQL-CDC {username = "user"password = "password"table-names = ["test.source_user"]base-url = "jdbc:mysql://127.0.0.1:3306/test"}
}sink {jdbc {url = "jdbc:mysql://127.0.0.1:3306/dw"driver = "com.mysql.cj.jdbc.Driver"username = "user"password = "password"generate_sink_sql = truedatabase = "dw"table = "source_user_01"primary_keys = ["userid"]}
}

执行脚本: 

./bin/seatunnel.sh --config ./config/mysql2mysql_rt.conf

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/678811.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【Java】乐观锁、悲观锁、可重入锁、不可重入锁的关系是什么?

乐观锁、悲观锁、可重入锁和不可重入锁是Java中不同的锁策略和实现方式&#xff0c;它们之间的关系可以从以下几个方面来理解&#xff1a; 乐观锁与悲观锁&#xff1a;这两种锁代表的是不同的加锁策略&#xff0c;而不是具体的锁实现。乐观锁假设冲突不会频繁发生&#xff0c;因…

深入理解 Nginx 插件及功能优化指南

深入理解 Nginx 插件及功能优化指南 深入理解 Nginx 插件及功能优化指南1. Nginx 插件介绍1.1 HTTP 模块插件ngx_http_rewrite_modulengx_http_access_module 1.2 过滤器插件ngx_http_gzip_modulengx_http_ssl_module 1.3 负载均衡插件ngx_http_upstream_modulengx_http_upstre…

CSS Selector—选择方法,和html自动——异步社区的爬取(动态网页)——爬虫(get和post的区别)

这里先说一下GET请求和POST请求&#xff1a; post我们平时是要加data的也就是信息&#xff0c;你会发现我们平时百度之类的 搜索都是post请求 get我们带的是params&#xff0c;是发送我们指定的内容。 要注意是get和post请求&#xff01;&#xff01;&#xff01; 先说一下异…

element-ui解决上传文件时需要携带请求数据的问题

一、问题描述 在前端使用element-ui进行文件上传时&#xff0c;需要携带请求头信息&#xff0c;比如Token。 二、问题解决 1. 表单实现 action置空添加:http-request属性覆盖默认的上传行为&#xff0c;实现自定义上传文件。注意:src后的图片路径如果是个网络请求(外链)&…

python+django人力资源管理系统7w5x3

技术栈 后端&#xff1a;python 前端&#xff1a;vue.jselementui 框架&#xff1a;django Python版本&#xff1a;python3.7 数据库&#xff1a;mysql5.7 数据库工具&#xff1a;Navicat 开发软件&#xff1a;PyCharm .设计框架&#xff1a;Vue 1. 表现层&#xff1a;写多…

JCIM | MD揭示PTP1B磷酸酶激活RtcB连接酶的机制

Background 内质网应激反应&#xff08;UPR&#xff09; 中的一个重要过程。UPR是由内质网中的三种跨膜传感器&#xff08;IRE1、PERK和ATF6&#xff09;控制的细胞应激反应&#xff0c;当内质网中的蛋白质折叠能力受到压力时&#xff0c;UPR通过减少蛋白质合成和增加未折叠或错…

《计算思维导论》笔记:10.4 关系模型-关系运算

《大学计算机—计算思维导论》&#xff08;战德臣 哈尔滨工业大学&#xff09; 《10.4 关系模型-关系运算》 一、引言 本章介绍数据库的基本数据模型&#xff1a;关系模型-关系运算。 二、什么是关系运算 在数据库理论中&#xff0c;关系运算&#xff08;Relational Operatio…

《Docker极简教程》--Docker基础--Docker的核心组件

一、Docker引擎 unsetunset1.1 Docker引擎的组成和功能unsetunset Docker 引擎是 Docker 平台的核心组件&#xff0c;它负责容器的创建、管理和运行。Docker 引擎主要包括两个主要组件&#xff1a;Docker 守护程序&#xff08;Docker Daemon&#xff09;和 Docker 客户端&#…

蓝桥杯Web应用开发-CSS3 新特性【练习一:属性有效性验证】

练习一&#xff1a;属性有效性验证 页面上有一个邮箱输入框&#xff0c;当你的输入满足邮箱格式时&#xff0c;输入框的背景颜色为绿色&#xff1b;当你的输入不满足要求&#xff0c;背景颜色为红色。 新建一个 index2.html 文件&#xff0c;在其中写入以下内容。 <!DOCTYP…

1.7 Binance_interface API U本位合约交易账户

Binance_interface API U本位合约交易账户 Github地址PyTed量化交易研究院 1. API U本位合约交易账户接口总览 方法解释Pathset_positionSide_dual更改持仓模式/fapi/v1/positionSide/dualget_positionSide_dual查询持仓模式/fapi/v1/positionSide/dualset_multiAssetsMargi…

读书笔记之《重塑大脑重塑人生》:大脑强大的可塑性

《重塑大脑重塑人生》作者是诺曼道伊奇&#xff0c;原作名: The Brain That Changes Itself: Stories of Personal Triumph from the Frontiers of Brain Science &#xff0c;于 2015-1-20出版。 诺曼•道伊奇&#xff08;Norman Doidge&#xff09;是医学博士&#xff0c;精…

【JavaEE】传输层网络协议

传输层网络协议 1. UDP协议 1.1 特点 面向数据报&#xff08;DatagramSocket&#xff09;数据报大小限制为64k全双工不可靠传输有接收缓冲区&#xff0c;无发送缓冲区 UDP的特点&#xff0c;我理解起来就是工人组成的**“人工传送带”**&#xff1a; 面向数据报&#xff08;…

ubuntu22.04 安装部署04:经常死机,鼠标,键盘无响应

相关文章&#xff1a; ubuntu22.04 安装部署01&#xff1a;禁用内核更新 ubuntu22.04安装部署02&#xff1a;禁用显卡更新 ubuntu22.04安装部署03&#xff1a; 设置root密码 一、现象说明 1. 开机一小时后&#xff0c;突然之间网络掉线&#xff0c;鼠标、键盘无反应。 2.…

中国判决生效,诺基亚全面与中国手机签署授权协议,降低专利费

日前媒体报道指诺基亚与中国两家手机企业都签署了专利授权协议&#xff0c;全面结束诉讼&#xff0c;而这一切正是在OPPO于去年底在重庆法院就OPPO与诺基亚的专利费诉讼问题&#xff0c;做出裁决之后&#xff0c;要求诺基亚按公平、公正等合理收费原则收取专利费。 这几年诺基亚…

C语言的字符函数的使用与模拟实现

各位少年&#xff0c;大家好&#xff0c;我是博主那一脸阳光&#xff0c;今天给大家分享字符函数的使用与模拟实现。 前言&#xff1a;如果你想使用一个锤子非常方便&#xff0c;直接使用做好的就行&#xff0c;但是锤子是怎么构成的&#xff0c;你就不知所云了&#xff0c;模拟…

8 scala的共生对象

1 单例对象 在编写 Java 程序时&#xff0c;我们经常会通过编写静态方法代码&#xff0c;去封装常用的 Utility 类。 在 Scala 中没有静态成员这一概念&#xff0c;所以&#xff0c;如果我们要定义静态属性或方法&#xff0c;就需要使用 Scala 的单例对象 object。Scala 的对…

CSS动画简述

CSS中的动画用法主要是通过使用关键帧动画&#xff08;keyframes&#xff09;来定义动画的不同阶段及其样式。下面是一个例子&#xff1a; <!DOCTYPE html> <html> <head><style>/* 定义关键帧动画 */keyframes example {0% {background-color: red;}…

系统架构22 - 软件架构设计(1)

软件架构设计 概述关键步骤 生命周期需求分析阶段设计阶段实现阶段构件组装阶段部署阶段后开发阶段 概述 从需求分析到软件设计之间的过渡过程称为软件架构&#xff08;Software Architecture, SA&#xff09;。只要软件架构设计好了&#xff0c;整个软件就不会出现坍塌性的错…

一个简单的2024龙年倒计时页面html源码

预览如下 复制粘贴下面的代码&#xff0c;另存为html文件即可打开&#xff0c;文字链接都可以更改&#xff1a; <!doctype html> <html> <head> <meta charset"utf-8"> <title>2024新年倒计时</title> <style>::-webki…

C语言--------数据在内存中的存储

1.整数在内存中的存储 整数在内存是以补码的形式存在的&#xff1b; 整型家族包括char,int ,long long,short类型&#xff1b; 因为char类型是以ASCII值形式存在&#xff0c;所以也是整形家族&#xff1b; 这四种都包括signed,unsigned两种&#xff0c;即有符号和无符号&am…