-
SeaTunnel MySQL 数据同步
-
Docker镜像
- Seatunnel Docker image镜像制作-CSDN博客
-
数据库表
# Source database (test): tables read by the batch and MySQL-CDC sync jobs.
# utf8mb4 instead of utf8 (a deprecated alias of the 3-byte utf8mb3) so that
# 4-byte characters can be stored.
CREATE TABLE IF NOT EXISTS `student` (
    `id`     INT UNSIGNED NOT NULL AUTO_INCREMENT,
    `name`   VARCHAR(100) NOT NULL,
    `age`    INT UNSIGNED,
    `gender` CHAR(8)      NOT NULL,
    PRIMARY KEY (`id`)
) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4;

# Subject scores; the sync jobs match score.user_id against student.id.
CREATE TABLE IF NOT EXISTS `score` (
    `id`      INT UNSIGNED NOT NULL AUTO_INCREMENT,
    `subject` VARCHAR(100) NOT NULL,
    `score`   INT UNSIGNED NOT NULL,
    `user_id` INT UNSIGNED NOT NULL,
    PRIMARY KEY (`id`),
    # user_id is the join key of the batch query and the id of the CDC update,
    # so index it instead of scanning the table.
    KEY `score_user_id_idx` (`user_id`)
) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4;

# Sink database (test2)
# Target table written by the sync jobs.
# name/subject/score are deliberately nullable: the streaming job first writes
# (id, name) from student and later fills subject/score via an UPDATE by id.
# utf8mb4 matches the source tables and avoids the deprecated utf8 (utf8mb3) alias.
CREATE TABLE IF NOT EXISTS `accomplishment` (
    `id`      INT UNSIGNED NOT NULL AUTO_INCREMENT,
    `name`    VARCHAR(100),
    `subject` VARCHAR(100),
    `score`   INT UNSIGNED,
    PRIMARY KEY (`id`)
) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4;
# Defining the runtime environment
# Batch job: joins student and score in database `test` and inserts the
# result into test2.accomplishment. Each key must sit on its own line: a '#'
# comments out everything after it on the same line.
env {
  # You can set SeaTunnel environment configuration here
  execution.parallelism = 1
  job.mode = "BATCH"
}

source {
  Jdbc {
    url = "jdbc:mysql://mysql:3306/test"
    driver = "com.mysql.cj.jdbc.Driver"
    connection_check_timeout_sec = 100
    user = "root"
    password = "123456"
    # Emits one (name, subject, score) row per score; explicit INNER JOIN
    # instead of a comma join, same result.
    query = "SELECT a.name, b.subject, b.score FROM student a INNER JOIN score b ON a.id = b.user_id"
  }
}

transform {
  # If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
  # please go to https://seatunnel.apache.org/docs/transform-v2/sql
}

sink {
  # Console {}
  Jdbc {
    url = "jdbc:mysql://mysql:3306/test2"
    driver = "com.mysql.cj.jdbc.Driver"
    user = "root"
    password = "123456"
    # The ? placeholders bind the source columns in order: name, subject, score.
    query = "insert into accomplishment(name, subject, score) values(?,?,?)"
  }
}
-
临时启动一个容器
- docker run --rm \
    --name seatunnel \
    --hostname seatunnel-node1 \
    --network my-net \
    -e config="/data/seatunnel.batch.conf" \
    -v /mnt/sda1/seatunnel/apache-seatunnel-2.3.3/config/batch_conf:/data/seatunnel.batch.conf \
    -v /mnt/sda1/seatunnel/apache-seatunnel-2.3.3/config/plugin_config:/opt/seatunnel/plugin_config \
    -v /mnt/sda1/seatunnel/apache-seatunnel-2.3.3/lib:/opt/seatunnel/lib \
    -v /mnt/sda1/seatunnel/apache-seatunnel-2.3.3/plugins:/opt/seatunnel/plugins \
    -v /mnt/sda1/seatunnel/apache-seatunnel-2.3.3/connectors/seatunnel:/opt/seatunnel/connectors/seatunnel \
    -v /etc/localtime:/etc/localtime:ro \
    seatunnel:2.3.3
    # --rm deletes this temporary container when the batch job exits, so the
    # fixed name "seatunnel" is free for the next `docker run --name seatunnel`.
    # /etc/localtime is mounted read-only: the container only reads the host time zone.
-
MySQL-CDC 多表实时同步（insert、update）
# Streaming job: MySQL-CDC on test.student and test.score, merged into
# test2.accomplishment keyed by the student id. Each key must sit on its own
# line: a '#' comments out everything after it on the same line.
env {
  # You can set SeaTunnel environment configuration here
  execution.parallelism = 1
  job.mode = "STREAMING"
  # Checkpoint every 10 seconds (value in ms); this can be increased as appropriate.
  checkpoint.interval = 10000
  #execution.checkpoint.interval = 10000
  #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
}

# Source configuration
source {
  MySQL-CDC {
    result_table_name = "student"
    # Database account
    username = "root"
    password = "123456"
    # Source table, format: database_name.table_name
    table-names = ["test.student"]
    base-url = "jdbc:mysql://mysql:3306/test"
  }
  MySQL-CDC {
    result_table_name = "score"
    # Database account
    username = "root"
    password = "123456"
    # Source table, format: database_name.table_name
    table-names = ["test.score"]
    base-url = "jdbc:mysql://mysql:3306/test"
  }
}

transform {
  Sql {
    source_table_name = "student"
    result_table_name = "student1"
    query = "select id, name from student where id>0"
  }
  Sql {
    source_table_name = "score"
    result_table_name = "score1"
    # user_id is exposed as id so it lines up with accomplishment.id (the student id).
    # NOTE(review): `where id>0` presumably filters on the input column score.id,
    # not the aliased user_id — confirm intent.
    query = "select subject, score, user_id as id from score where id>0"
  }
}

# Sink (target database) configuration
sink {
  # Writes (id, name) from student1; SQL is generated from database/table/primary_keys.
  Jdbc {
    url = "jdbc:mysql://mysql:3306/test2"
    driver = "com.mysql.cj.jdbc.Driver"
    user = "root"
    password = "123456"
    generate_sink_sql = true
    # Target database name
    database = "test2"
    # Target table name
    table = "accomplishment"
    # Primary key column(s)
    primary_keys = ["id"]
    source_table_name = "student1"
  }
  # Fills subject/score from score1; placeholders bind subject, score, id in order.
  # NOTE(review): the UPDATE only touches rows the student sink has already written,
  # and each student id holds a single subject/score, so several scores for one
  # student overwrite each other — confirm this is the intended model.
  Jdbc {
    url = "jdbc:mysql://mysql:3306/test2"
    driver = "com.mysql.cj.jdbc.Driver"
    user = "root"
    password = "123456"
    query = "update accomplishment set subject = ?,score = ? where id = ?"
    source_table_name = "score1"
  }
}
-
- docker run \
    --name seatunnel \
    --hostname seatunnel-node1 \
    --network my-net \
    -e config="/data/seatunnel.streaming.conf" \
    -v /mnt/sda1/seatunnel/apache-seatunnel-2.3.3/config/streaming_conf:/data/seatunnel.streaming.conf \
    -v /mnt/sda1/seatunnel/apache-seatunnel-2.3.3/config/plugin_config:/opt/seatunnel/plugin_config \
    -v /mnt/sda1/seatunnel/apache-seatunnel-2.3.3/lib:/opt/seatunnel/lib \
    -v /mnt/sda1/seatunnel/apache-seatunnel-2.3.3/plugins:/opt/seatunnel/plugins \
    -v /mnt/sda1/seatunnel/apache-seatunnel-2.3.3/connectors/seatunnel:/opt/seatunnel/connectors/seatunnel \
    -v /etc/localtime:/etc/localtime:ro \
    seatunnel:2.3.3
    # /etc/localtime is mounted read-only: the container only reads the host time zone.
    # NOTE: without --rm the stopped container keeps the name "seatunnel";
    # run `docker rm seatunnel` before starting another container with that name.
-
关于 `No checkpoint found for job` 日志：它是 INFO 级别的提示（见下方日志），并非异常。首次启动时 checkpoint 目录尚不存在，因此会打印该信息。作业启动后会创建该目录，目录下的文件会持续更新，只保留最近的几个 `.ser` 文件。
- 266369) needed jar urls [file:/opt/seatunnel/lib/seatunnel-transforms-v2.jar, file:/opt/seatunnel/connectors/seatunnel/connector-cdc-mysql-2.3.3.jar, file:/opt/seatunnel/plugins/jdbc/lib/mysql-connector-j-8.0.33.jar]
- 2023-10-26 06:10:04,454 WARN org.apache.hadoop.util.NativeCodeLoader - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
- 2023-10-26 06:10:04,624 INFO org.apache.seatunnel.engine.checkpoint.storage.hdfs.HdfsStorage - Path /tmp/seatunnel/checkpoint_snapshot/769804951374266369 is not a directory
- 2023-10-26 06:10:04,624 INFO org.apache.seatunnel.engine.checkpoint.storage.hdfs.HdfsStorage - No checkpoint found for job, job id is: 769804951374266369
- 2023-10-26 06:10:04,629 INFO org.apache.seatunnel.engine.checkpoint.storage.hdfs.HdfsStorage - Path /tmp/seatunnel/checkpoint_snapshot/769804951374266369 is not a directory
- 2023-10-26 06:10:04,629 INFO org.apache.seatunnel.engine.checkpoint.storage.hdfs.HdfsStorage - No checkpoint found for job, job id is: 769804951374266369