SpringBoot整合DataX数据同步(自动生成job文件)

SpringBoot整合Datax数据同步

文章目录

  • SpringBoot整合Datax数据同步
    • 1.简介
        • 设计理念
      • DataX3.0框架设计
      • DataX3.0核心架构
        • 核心模块介绍
        • DataX调度流程
    • 2.DataX3.0插件体系
    • 3.数据同步
      • 1.编写job的json文件
      • 2.进入bin目录下,执行文件
    • 4.SpringBoot整合DataX生成Job文件并执行
        • 1.准备工作
        • 2.文件目录如图
        • 3.Mysql数据同步
        • 4.Elasticsearch写入Mysql数据
    • 5.Job文件参数说明
      • 1.MysqlReader
      • 2.MysqlWriter
      • 3.ElasticsearchWriter

1.简介

DataX 是阿里云 DataWorks数据集成 的开源版本,在阿里巴巴集团内被广泛使用的离线数据同步工具/平台。DataX 实现了包括 MySQL、Oracle、OceanBase、SqlServer、Postgre、HDFS、Hive、ADS、HBase、TableStore(OTS)、MaxCompute(ODPS)、Hologres、DRDS, databend 等各种异构数据源之间高效的数据同步功能。

DataX 是一个异构数据源离线同步工具,致力于实现各种异构数据源之间稳定高效的数据同步功能。

Download DataX下载地址

Github主页地址:https://github.com/alibaba/DataX

请点击:Quick Start

在这里插入图片描述

  • 设计理念

    为了解决异构数据源同步问题,DataX将复杂的网状的同步链路变成了星型数据链路,DataX作为中间传输载体负责连接各种数据源。当需要接入一个新的数据源的时候,只需要将此数据源对接到DataX,便能跟已有的数据源做到无缝数据同步。

DataX3.0框架设计

在这里插入图片描述

DataX本身作为离线数据同步框架,采用Framework + plugin架构构建。将数据源读取和写入抽象成为Reader/Writer插件,纳入到整个同步框架中。

  • Reader:Reader为数据采集模块,负责采集数据源的数据,将数据发送给Framework。
  • Writer: Writer为数据写入模块,负责不断向Framework取数据,并将数据写入到目的端。
  • Framework:Framework用于连接reader和writer,作为两者的数据传输通道,并处理缓冲,流控,并发,数据转换等核心技术问题。

DataX3.0核心架构

DataX 3.0 开源版本支持单机多线程模式完成同步作业运行,本小节按一个DataX作业生命周期的时序图,从整体架构设计非常简要说明DataX各个模块相互关系。

在这里插入图片描述

核心模块介绍
  1. DataX完成单个数据同步的作业,我们称之为Job,DataX接受到一个Job之后,将启动一个进程来完成整个作业同步过程。DataX Job模块是单个作业的中枢管理节点,承担了数据清理、子任务切分(将单一作业计算转化为多个子Task)、TaskGroup管理等功能。
  2. DataXJob启动后,会根据不同的源端切分策略,将Job切分成多个小的Task(子任务),以便于并发执行。Task便是DataX作业的最小单元,每一个Task都会负责一部分数据的同步工作。
  3. 切分多个Task之后,DataX Job会调用Scheduler模块,根据配置的并发数据量,将拆分成的Task重新组合,组装成TaskGroup(任务组)。每一个TaskGroup负责以一定的并发运行完毕分配好的所有Task,默认单个任务组的并发数量为5。
  4. 每一个Task都由TaskGroup负责启动,Task启动后,会固定启动Reader—>Channel—>Writer的线程来完成任务同步工作。
  5. DataX作业运行起来之后, Job监控并等待多个TaskGroup模块任务完成,等待所有TaskGroup任务完成后Job成功退出。否则,异常退出,进程退出值非0
DataX调度流程

举例来说,用户提交了一个DataX作业,并且配置了20个并发,目的是将一个100张分表的mysql数据同步到odps里面。 DataX的调度决策思路是:

  1. DataXJob根据分库分表切分成了100个Task。
  2. 根据20个并发,DataX计算共需要分配4个TaskGroup。
  3. 4个TaskGroup平分切分好的100个Task,每一个TaskGroup负责以5个并发共计运行25个Task。

2.DataX3.0插件体系

DataX目前已经有了比较全面的插件体系,主流的RDBMS数据库、NOSQL、大数据计算系统都已经接入,目前支持数据如下图,详情请点击:DataX数据源参考指南

类型数据源Reader(读)Writer(写)文档
RDBMS 关系型数据库MySQL读 、写
Oracle读 、写
OceanBase读 、写
SQLServer读 、写
PostgreSQL读 、写
DRDS读 、写
Kingbase读 、写
通用RDBMS(支持所有关系型数据库)读 、写
阿里云数仓数据存储ODPS读 、写
ADB
ADS
OSS读 、写
OCS
Hologres
AnalyticDB For PostgreSQL
阿里云中间件datahub读 、写
SLS读 、写
图数据库阿里云 GDB读 、写
Neo4j
NoSQL数据存储OTS读 、写
Hbase0.94读 、写
Hbase1.1读 、写
Phoenix4.x读 、写
Phoenix5.x读 、写
MongoDB读 、写
Cassandra读 、写
数仓数据存储StarRocks读 、写
ApacheDoris
ClickHouse读 、写
Databend
Hive读 、写
kudu
selectdb
无结构化数据存储TxtFile读 、写
FTP读 、写
HDFS读 、写
Elasticsearch
时间序列数据库OpenTSDB
TSDB读 、写
TDengine读 、写

3.数据同步

1.编写job的json文件

mysql数据抽取到本地内存

{"job": {"content": [{"reader": {"name": "mysqlreader","parameter": {"column": ["id","name","amount",],"connection": [{"jdbcUrl": ["jdbc:mysql://127.0.0.1:3306/datax?characterEncoding=UTF-8&serverTimezone=Asia/Shanghai"],"table": ["user"]}],"password": "root","username": "root"}},"writer": {"name": "streamwriter","parameter": {"print": false,"encoding": "UTF-8"}}}],"setting": {"speed": {"channel": "1"}}}
}

mysqlWriter数据写入

{"job": {"setting": {"speed": {"channel": 1}},"content": [{"reader": {"name": "streamreader","parameter": {"column" : [{"value": "DataX","type": "string"},{"value": 19880808,"type": "long"},{"value": "1988-08-08 08:08:08","type": "date"},{"value": true,"type": "bool"},{"value": "test","type": "bytes"}],"sliceRecordCount": 1000}},"writer": {"name": "mysqlwriter","parameter": {"writeMode": "insert","username": "root","password": "root","column": ["id","name"],"session": ["set session sql_mode='ANSI'"],"preSql": ["delete from test"],"connection": [{"jdbcUrl": "jdbc:mysql://127.0.0.1:3306/datax?useUnicode=true&characterEncoding=gbk","table": ["test"]}]}}}]}
}

mysql数据同步

{"job": {"content": [{"reader": {"name": "mysqlreader","parameter": {"column": ["id","project_code","category"],"connection": [{"jdbcUrl": ["jdbc:mysql://127.0.0.1:3306/test_local?characterEncoding=UTF-8&serverTimezone=Asia/Shanghai"],"table": ["project_index"]}],"password": "root","username": "root"}},"writer": {"name": "mysqlwriter","parameter": {"column": ["id","project_code","category"],"connection": [{"jdbcUrl": "jdbc:mysql://127.0.0.1:3306/datax?characterEncoding=UTF-8&serverTimezone=Asia/Shanghai","table": ["project_index"]}],"password": "root","username": "root","writeMode": "update"}}}],"setting": {"speed": {"channel": "1"}}}
}

2.进入bin目录下,执行文件

需python环境

python datax.py {YOUR_JOB.json}

4.SpringBoot整合DataX生成Job文件并执行

1.准备工作

下载datax,安装lib下的datax-commondatax-core的jar到本地maven仓库

依赖

  <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-jdbc</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.30</version></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.54</version></dependency><dependency><groupId>com.alibaba.datax</groupId><artifactId>datax-core</artifactId><version>0.0.1-SNAPSHOT</version></dependency><dependency><groupId>com.alibaba.datax</groupId><artifactId>datax-common</artifactId><version>0.0.1-SNAPSHOT</version></dependency><dependency><groupId>cn.hutool</groupId><artifactId>hutool-all</artifactId><version>5.8.25</version></dependency><dependency><groupId>com.alibaba</groupId><artifactId>druid-spring-boot-starter</artifactId><version>1.1.21</version></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.26</version></dependency><dependency><groupId>org.apache.commons</groupId><artifactId>commons-lang3</artifactId><version>3.12.0</version></dependency>
spring:datasource:url: jdbc:mysql://127.0.0.1:3306/test_local?characterEncoding=UTF-8&serverTimezone=Asia/Shanghaiusername: rootpassword: rootdriver-class-name: com.mysql.cj.jdbc.Drivertype: com.alibaba.druid.pool.DruidDataSourceserver:port: 8080# datax 相关配置,在生成文件时使用
datax:home: D:/software/datax/# job文件存储位置save-path: D:/software/datax/job/

属性配置

/*** datax配置* @author moshangshang*/
@Data
@Component
@ConfigurationProperties("datax")
public class DataXProperties {private String home;private String savePath;}

公共实体

@Data
@AllArgsConstructor
@NoArgsConstructor
public class Content {private Reader reader;private Writer writer;}
@Data
public class DataXJobRoot {private Job job;
}
@Data
public class Job {private List<Content> content;private Setting setting = new Setting();}
@Data
public class Setting {private Speed speed = new Speed();@Datapublic static class Speed {private String channel = "1";}
}
public abstract class Parameter {
}
/***  读取抽象类*  @author moshangshang*/
@Data
public abstract class Reader {private String name;private Parameter parameter;}
/*** 写入抽象类*  @author moshangshang*/
@Data
public abstract class Writer {private String name;private Parameter parameter;
}

公共处理接口

public interface DataXInterface {/*** 获取读对象*/Reader getReader(String table);/*** 获取写对象*/Writer getWriter(String table);/*** 同类型读取写入,如mysql到mysql*/String getJobTaskName(String readerTable,String writeTable);/*** 自定义读取写入* @param reader 读处理* @param write 写处理* @param suffix 文件名*/String getJobTaskName(Reader reader,Writer write, String suffix);}

/*** 接口抽象类* @author moshangshang*/
@Component
public abstract class AbstractDataXHandler implements DataXInterface {@Autowiredprivate DataXProperties dataXProperties;/*** 自定义读取写入* @param reader 读处理* @param write 写处理* @param suffix 文件名*/@Overridepublic String getJobTaskName(Reader reader, Writer write, String suffix) {DataXJobRoot root = new DataXJobRoot();Job job = new Job();root.setJob(job);Content content = new Content(reader,write);job.setContent(Collections.singletonList(content));String jsonStr = JSONUtil.parse(root).toJSONString(2);String fileName = "datax_job_"+ UUID.randomUUID().toString().replaceAll("-","") +"_"+suffix+".json";File file = FileUtil.file(dataXProperties.getSavePath(),fileName);FileUtil.appendString(jsonStr, file, "utf-8");return fileName;}}

工具方法

@Repository
public class DatabaseInfoRepository {private final JdbcTemplate jdbcTemplate;@Autowiredpublic DatabaseInfoRepository(JdbcTemplate jdbcTemplate) {this.jdbcTemplate = jdbcTemplate;}/*** 获取所有表名*/public List<String> getAllTableNames() {String sql = "SHOW TABLES";return jdbcTemplate.queryForList(sql, String.class);}/*** 根据表名获取字段信息*/public List<Map<String, Object>> getTableColumns(String tableName) {String sql = "SHOW FULL COLUMNS FROM " + tableName;return jdbcTemplate.queryForList(sql);}
}
@Slf4j
@Service
public class DatabaseInfoService {private final DatabaseInfoRepository databaseInfoRepository;@Autowiredpublic DatabaseInfoService(DatabaseInfoRepository databaseInfoRepository) {this.databaseInfoRepository = databaseInfoRepository;}public void printAllTablesAndColumns() {// 获取所有表名List<String> tableNames = databaseInfoRepository.getAllTableNames();// 遍历表名,获取并打印每个表的字段信息for (String tableName : tableNames) {System.out.println("Table: " + tableName);// 获取当前表的字段信息List<Map<String, Object>> columns = databaseInfoRepository.getTableColumns(tableName);// 遍历字段信息并打印for (Map<String, Object> column : columns) {System.out.println("  Column: " + column.get("Field") + " (Type: " + column.get("Type") + ")" + " (Comment: " + column.get("Comment") + ")");}// 打印空行作为分隔System.out.println();}}/** 查询指定表的所有字段列表 */public List<String> getColumns(String tableName) {List<String> list = new ArrayList<>();// 获取当前表的字段信息List<Map<String, Object>> columns = databaseInfoRepository.getTableColumns(tableName);// 遍历字段信息并打印for (Map<String, Object> column : columns) {list.add(column.get("Field").toString());}return list;}/** 查询指定表的所有字段列表,封装成HdfsWriter格式 */public List<HdfsWriter.Column> getHdfsColumns(String tableName) {List<HdfsWriter.Column> list = new ArrayList<>();// 获取当前表的字段信息List<Map<String, Object>> columns = databaseInfoRepository.getTableColumns(tableName);// 遍历字段信息并打印for (Map<String, Object> column : columns) {String name = column.get("Field").toString();String typeDb = column.get("Type").toString();String type = "string";if (typeDb.equals("bigint")) {type = "bigint";} else if (typeDb.startsWith("varchar")) {type = "string";} else if (typeDb.startsWith("date") || typeDb.endsWith("timestamp")) {type = "date";}HdfsWriter.Column columnHdfs = new HdfsWriter.Column();columnHdfs.setName(name);columnHdfs.setType(type);list.add(columnHdfs);}return list;}
}

datax的job任务json执行方法


/*** 执行器* @author moshangshang*/
@Slf4j
@Component
public class DataXExecuter {@Autowiredprivate DataXProperties dataXProperties;public void run(String fileName) throws IOException {System.setProperty("datax.home", dataXProperties.getHome());String filePath = dataXProperties.getSavePath()+fileName;String dataxJson = JSONUtil.parse(FileUtils.readFileToString(new File(filePath),"UTF-8")).toJSONString(2);log.info("datax log:{}",dataxJson);String[] dataxArgs = {"-job", filePath, "-mode", "standalone", "-jobid", "-1"};try {Engine.entry(dataxArgs);}catch (DataXException e){log.error("执行失败",e);} catch (Throwable throwable) {log.error("DataX执行异常,error cause::\n" + ExceptionTracker.trace(throwable));}}}
2.文件目录如图

在这里插入图片描述

3.Mysql数据同步

1.编写mysql读写对象,继承读写接口

@Data
public class MysqlReader extends Reader {public String getName() {return "mysqlreader";}@Datapublic static class MysqlParameter extends Parameter {private List<String> column;private List<Connection> connection;private String password;private String username;private String splitPk;private String where;}@Datapublic static class Connection {private List<String> jdbcUrl;private List<String> table;private List<String> querySql;}
}
@EqualsAndHashCode(callSuper = true)
@Data
public class MysqlWriter extends Writer {public String getName() {return "mysqlwriter";}@EqualsAndHashCode(callSuper = true)@Datapublic static class MysqlParameter extends Parameter {private List<String> column;private List<Connection> connection;private String password;private String username;private String writeMode = "update";}@Datapublic static class Connection {private String jdbcUrl;private List<String> table;}
}

2.配置mysql读和写的数据库信息

/*** mysql读写配置* @author moshangshang*/
@Data
@ConfigurationProperties("datax.mysql.reader")
public class DataXMysqlReaderProperties {private String url;private String password;private String username;}
/*** mysql读写配置* @author moshangshang*/
@Data
@ConfigurationProperties("datax.mysql.writer")
public class DataXMysqlWriterProperties {private String url;private String password;private String username;}
# datax 相关配置,在生成文件时使用
datax:mysql:reader:url: jdbc:mysql://127.0.0.1:3306/test_local?characterEncoding=UTF-8&serverTimezone=Asia/Shanghaiusername: rootpassword: rootwriter:url: jdbc:mysql://127.0.0.1:3306/ruoyi_local?characterEncoding=UTF-8&serverTimezone=Asia/Shanghaiusername: rootpassword: root

2.编写mysql处理类,继承抽象处理接口。生成job文件

/*** mysql读写处理* @author moshangshang*/
@Component
@EnableConfigurationProperties({DataXMysqlReaderProperties.class, DataXMysqlWriterProperties.class})
public class MysqlHandler extends AbstractDataXHandler{@Autowiredprivate DatabaseInfoService databaseInfoService;@Autowiredprivate DataXProperties dataXProperties;@Autowiredprivate DataXMysqlReaderProperties dataXMysqlReaderProperties;@Autowiredprivate DataXMysqlWriterProperties dataXMysqlWriterProperties;@Overridepublic Reader getReader(String table) {MysqlReader reader = new MysqlReader();MysqlReader.MysqlParameter readerParameter = new MysqlReader.MysqlParameter();readerParameter.setPassword(dataXMysqlReaderProperties.getPassword());readerParameter.setUsername(dataXMysqlReaderProperties.getUsername());List<String> readerColumns = databaseInfoService.getColumns(table);readerParameter.setColumn(readerColumns);MysqlReader.Connection readerConnection = new MysqlReader.Connection();readerConnection.setJdbcUrl(Collections.singletonList(dataXMysqlReaderProperties.getUrl()));readerConnection.setTable(Collections.singletonList(table));readerParameter.setConnection(Collections.singletonList(readerConnection));reader.setParameter(readerParameter);return reader;}@Overridepublic Writer getWriter(String table) {MysqlWriter writer = new MysqlWriter();MysqlWriter.MysqlParameter writerParameter = new MysqlWriter.MysqlParameter();writerParameter.setPassword(dataXMysqlWriterProperties.getPassword());writerParameter.setUsername(dataXMysqlWriterProperties.getUsername());List<String> columns = databaseInfoService.getColumns(table);writerParameter.setColumn(columns);MysqlWriter.Connection connection = new MysqlWriter.Connection();connection.setJdbcUrl(dataXMysqlWriterProperties.getUrl());connection.setTable(Collections.singletonList(table));writerParameter.setConnection(Collections.singletonList(connection));writer.setParameter(writerParameter);return writer;}@Overridepublic String getJobTaskName(String readerTable,String writeTable) {DataXJobRoot root = new DataXJobRoot();Job job = new Job();root.setJob(job);Content content = new Content(getReader(readerTable),getWriter(writeTable));job.setContent(Collections.singletonList(content));String jsonStr = JSONUtil.parse(root).toJSONString(2);String fileName = "datax_job_"+ UUID.randomUUID().toString().replaceAll("-","") +"_h2h.json";File file = FileUtil.file(dataXProperties.getSavePath(),fileName);FileUtil.appendString(jsonStr, file, "utf-8");return fileName;}
}

3.调用执行器,执行任务job

@SpringBootTest
public class DataxTest {@Autowiredprivate MysqlHandler mysqlHandler;@Autowiredprivate DataXExecuter dataXExecuter;/*** 读t_user表同步到user*/@Testpublic void test() throws IOException {String jobTask = mysqlHandler.getJobTaskName("t_user", "user");dataXExecuter.run(jobTask);}/*** 直接执行json文件*/@Testpublic void test2() throws IOException {dataXExecuter.run("datax_job_83798b5f1766406289a44fe681dc8878_m2m.json");}
}

4.执行结果

在这里插入图片描述

4.Elasticsearch写入Mysql数据

注意事项:

  • es目前只支持写入不支持读取
  • mysql数据写入es时,需保证es与mysql的列数column相同,不支持类似mysql的部分字段写入
  • 需保证列的顺序相同,写入时不会根据name名称字段去自动对应,如果顺序不一致,则可能会转换错误。如id,name,写入name,id

原理:使用elasticsearch的rest api接口, 批量把从reader读入的数据写入elasticsearch

创建es索引映射

PUT datax_data
{"mappings": {"properties": {"name":{"type": "keyword"},"amount":{"type": "long"}}}
}

1.添加es配置和文件

spring:elasticsearch:#username:#password:#path-prefix:uris: http://127.0.0.1:9200#连接elasticsearch超时时间connection-timeout: 60000socket-timeout: 30000
# datax 相关配置,在生成文件时使用
datax:elasticsearch:writer:url: http://127.0.0.1:9200username:password:

/*** es写配置* @author moshangshang*/
@Data
@ConfigurationProperties("datax.elasticsearch.writer")
public class DataXElasticSearchProperties {private String url;private String username;private String password;}

2.编写生成job文件实体类

@EqualsAndHashCode(callSuper = true)
@Data
public class ElasticSearchWriter extends Writer {public String getName() {return "elasticsearchwriter";}@EqualsAndHashCode(callSuper = true)@Datapublic static class ElasticSearchParameter extends Parameter {private List<Column> column;private String endpoint;private String accessId;private String accessKey;private String index;private Settings settings;private String type = "default";private boolean cleanup = true;private boolean discovery = false;private Integer batchSize = 1000;private String splitter = ",";}@Data@AllArgsConstructor@NoArgsConstructorpublic static class Column {private String name;private String type;private String analyzer;}@Datapublic static class Settings {private Map<String,Object> index;}}

3.es接口扩展

/*** es接口扩展* @author moshangshang*/
public interface DataXElasticsearchInterface extends DataXInterface {Writer getWriter(String table, Map<String,Object> indexSettings);}

4.es核心处理类

@Component
@EnableConfigurationProperties({DataXElasticSearchProperties.class})
public class ElasticSearchHandler extends AbstractDataXHandler implements DataXElasticsearchInterface{@Autowiredprivate ElasticsearchRestTemplate elasticsearchRestTemplate;@Autowiredprivate DataXElasticSearchProperties dataXElasticSearchProperties;@Overridepublic Reader getReader(String table) {return null;}/*** 普通写入* @param index 索引* @return Writer*/@Overridepublic Writer getWriter(String index) {ElasticSearchWriter writer = new ElasticSearchWriter();ElasticSearchWriter.ElasticSearchParameter writerParameter = getElasticSearchWriter(index);writer.setParameter(writerParameter);return writer;}@Overridepublic String getJobTaskName(String readerTable, String writeTable) {return null;}/*** es写入,带setting设置*/@Overridepublic Writer getWriter(String index,Map<String,Object> map) {ElasticSearchWriter writer = new ElasticSearchWriter();ElasticSearchWriter.ElasticSearchParameter writerParameter = getElasticSearchWriter(index);ElasticSearchWriter.Settings settings = new ElasticSearchWriter.Settings();settings.setIndex(map);writerParameter.setSettings(settings);writer.setParameter(writerParameter);return writer;}/*** 获取公共写入参数*/public ElasticSearchWriter.ElasticSearchParameter getElasticSearchWriter(String index){ElasticSearchWriter.ElasticSearchParameter writerParameter = new ElasticSearchWriter.ElasticSearchParameter();List<Column> columns = getEsColumns(index);writerParameter.setColumn(columns);writerParameter.setEndpoint(dataXElasticSearchProperties.getUrl());writerParameter.setAccessId(dataXElasticSearchProperties.getUsername());writerParameter.setAccessKey(dataXElasticSearchProperties.getPassword());writerParameter.setIndex(index);return writerParameter;}/*** 获取指定索引的映射字段* 读取时和创建顺序相反*/public List<ElasticSearchWriter.Column> getEsColumns(String index){List<ElasticSearchWriter.Column> columns = new ArrayList<>();// 获取操作的索引文档对象IndexOperations indexOperations = elasticsearchRestTemplate.indexOps(IndexCoordinates.of(index));Map<String, Object> mapping = indexOperations.getMapping();mapping.forEach((k,value) ->{JSONObject json = JSON.parseObject(JSONObject.toJSONString(value));for (Map.Entry<String, Object> entry : json.entrySet()) {String key = entry.getKey();JSONObject properties = JSON.parseObject(JSONObject.toJSONString(entry.getValue()));String type = properties.getString("type");String analyzer = properties.getString("analyzer");columns.add(new ElasticSearchWriter.Column(key,type,analyzer));}});return columns;}}

5.测试

    @Testpublic void test3() throws IOException {Map<String,Object> settings = new HashMap<>();settings.put("number_of_shards",1);settings.put("number_of_replicas",1);String jobTask = elasticSearchHandler.getJobTaskName(mysqlHandler.getReader("t_user"), elasticSearchHandler.getWriter("datax_data",settings),"m2e");dataXExecuter.run(jobTask);}

5.Job文件参数说明

1.MysqlReader

参数名描述必选
jdbcUrl对端数据库的JDBC连接信息并支持一个库填写多个连接地址。之所以使用JSON数组描述连接信息,是因为阿里集团内部支持多个IP探测,如果配置了多个,MysqlReader可以依次探测ip的可连接性,直到选择一个合法的IP,如果全部连接失败,MysqlReader报错。 注意,jdbcUrl必须包含在connection配置单元中。对于阿里外部使用情况,JSON数组填写一个JDBC连接即可。
username数据源的用户名
password数据源指定用户名的密码
table所选取的需要同步的表。支持多张表同时抽取,用户自己需保证多张表是同一schema结构,注意,table必须包含在connection配置单元中。
column所配置的表中需要同步的列名集合,使用JSON的数组描述字段信息。用户使用代表默认使用所有列配置,例如['']。
splitPk分区主键,DataX因此会启动并发任务进行数据同步。推荐splitPk用户使用表主键
where筛选条件,MysqlReader根据指定的column、table、where条件拼接SQL,并根据这个SQL进行数据抽取。注意:limit不是SQL的合法where子句。where条件可以有效地进行业务增量同步。如果不填写where语句,包括不提供where的key或者value,均视作同步全量数据。
querySql查询SQL同步。当用户配置了这一项之后,当用户配置querySql时,MysqlReader直接忽略table、column、where条件的配置,querySql优先级大于table、column、where选项。

2.MysqlWriter

参数名描述必选
jdbcUrl目的数据库的 JDBC 连接信息。作业运行时,DataX 会在你提供的 jdbcUrl 后面追加如下属性:yearIsDateType=false&zeroDateTimeBehavior=convertToNull&rewriteBatchedStatements=true;在一个数据库上只能配置一个 jdbcUrl 值。这与 MysqlReader 支持多个备库探测不同,因为此处不支持同一个数据库存在多个主库的情况(双主导入数据情况)
username数据源的用户名
password数据源指定用户名的密码
table目的表的表名称。支持写入一个或者多个表。当配置为多张表时,必须确保所有表结构保持一致。table 和 jdbcUrl 必须包含在 connection 配置单元中
column目的表需要写入数据的字段,例如: “column”: [“id”,“name”,“age”]。如果要依次写入全部列,使用*表示, 例如: "column": ["*"]
sessionDataX在获取Mysql连接时,执行session指定的SQL语句,修改当前connection session属性
preSql写入数据到目的表前,会先执行这里的标准语句。如果 Sql 中有你需要操作到的表名称,请使用 @table 表示,这样在实际执行 Sql 语句时,会对变量按照实际表名称进行替换。比如你的任务是要写入到目的端的100个同构分表(表名称为:datax_00,datax01, … datax_98,datax_99),并且你希望导入数据前,先对表中数据进行删除操作,那么你可以这样配置:"preSql":["delete from 表名"],效果是:在执行到每个表写入数据前,会先执行对应的 delete from 对应表名称
postSql写入数据到目的表后,会执行这里的标准语句
writeMode控制写入数据到目标表采用 insert into 或者 replace into 或者 ON DUPLICATE KEY UPDATE 语句;可选:insert/replace/update,默认insert
batchSize一次性批量提交的记录数大小,该值可以极大减少DataX与Mysql的网络交互次数,并提升整体吞吐量。但是该值设置过大可能会造成DataX运行进程OOM情况。默认:1024

3.ElasticsearchWriter

参数名描述必选
endpointElasticSearch的连接地址
accessIdhttp auth中的user
accessKeyhttp auth中的password
indexelasticsearch中的index名
typeelasticsearch中index的type名,默认index名
cleanup是否删除原表, 默认值:false
batchSize每次批量数据的条数,默认值:1000
trySize失败后重试的次数, 默认值:30
timeout客户端超时时间,默认值:600000
discovery启用节点发现将(轮询)并定期更新客户机中的服务器列表。默认false
compressionhttp请求,开启压缩,默认true
multiThreadhttp请求,是否有多线程,默认true
ignoreWriteError忽略写入错误,不重试,继续写入,默认false
alias数据导入完成后写入别名
aliasMode数据导入完成后增加别名的模式,append(增加模式), exclusive(只留这一个),默认append
settings创建index时候的settings, 与elasticsearch官方相同
splitter如果插入数据是array,就使用指定分隔符,默认值:-,-
columnelasticsearch所支持的字段类型,样例中包含了全部
dynamic不使用datax的mappings,使用es自己的自动mappings,默认值: false

参考资料:https://blog.csdn.net/wlddhj/article/details/137585979

Github主页地址:https://github.com/alibaba/DataX

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/42327.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

生产力工具|VS Code安装及使用指南

一、VS Code介绍 &#xff08;一&#xff09;软件介绍 Visual Studio Code&#xff08;简称VS Code&#xff09;是由Microsoft开发的免费开源代码编辑器&#xff0c;适用于Windows、macOS和Linux操作系统。它支持多种编程语言&#xff0c;如JavaScript、Python、C等&#xff0…

知识社区在线提问小程序模板源码

蓝色的知识问答&#xff0c;问答交流&#xff0c;知识社区&#xff0c;在线提问手机app小程序网页模板。包含&#xff1a;社区主页、提问、我的、绑定手机&#xff0c;实名认证等。 知识社区在线提问小程序模板源码

品质至上!中国星坤连接器的发展之道!

在电子连接技术领域&#xff0c;中国星坤以其卓越的创新能力和对品质的不懈追求&#xff0c;赢得了业界的广泛认可。凭借在高精度连接器设计和制造上的领先地位&#xff0c;星坤不仅获得了多项实用新型专利&#xff0c;更通过一系列国际质量管理体系认证&#xff0c;彰显了其产…

【Qt5.12.9】程序无法显示照片问题(已解决)

问题记录&#xff1a;Qt5.12.9下无法显示照片 我的工程名为03_qpainter&#xff0c;照片cd.png存放在工程目录下的image文件夹中。 /03_qpainter/image/cd.png 因为这是正点原子Linux下Qt书籍中的例程&#xff0c;在通过学习其配套的例程中的项目&#xff0c;发现我的项目少…

【Python】搭建属于自己 AI 机器人

目录 前言 1 准备工作 1.1 环境搭建 1.2 获取 API KEY 2 写代码 2.1 引用库 2.2 创建用户 2.3 创建对话 2.4 输出内容 2.5 调试 2.6 全部代码 2.7 简短的总结 3 优化代码 3.1 规范代码 3.1.1 引用库 3.1.2 创建提示词 3.1.3 创建模型 3.1.4 规范输出&#xf…

西门子1200高速计数器编码器的应用 接线 组态 编程 调试 测距测速

编码器的应用、接线、组态、博途1200编程与调试&#xff1a;高速计数器&#xff0c;用于给PLC发高速脉冲&#xff0c;接I点 用来例如&#xff1a;检测电机转速&#xff0c;皮带输送机运行的距离 &#xff08;粗略定位&#xff09; 360&#xff1a;代表转一圈会对外发360个脉冲&…

系统化学习 H264视频编码(02) I帧 P帧 B帧 引入及相关概念解读

说明&#xff1a;我们参考黄金圈学习法&#xff08;什么是黄金圈法则?->模型 黄金圈法则&#xff0c;本文使用&#xff1a;why-what&#xff09;来学习音H264视频编码。本系列文章侧重于理解视频编码的知识体系和实践方法&#xff0c;理论方面会更多地讲清楚 音视频中概念的…

Vue3+.NET6前后端分离式管理后台实战(二十八)

1&#xff0c;Vue3.NET6前后端分离式管理后台实战(二十八)

【Linux进阶】文件系统6——理解文件操作

目录 1.文件的读取 1.1.目录 1.2.文件 1.3.目录树读取 1.4.文件系统大小与磁盘读取性能 2.增添文件 2.1.数据的不一致&#xff08;Inconsistent&#xff09;状态 2.2.日志式文件系统&#xff08;Journaling filesystem&#xff09; 3.Linux文件系统的运行 4、文件的删…

干货 | 2024大模型场景下智算平台的设计与优化实践(免费下载)

诚挚邀请您微信扫描以下二维码加入方案驿站知识星球&#xff0c;获取上万份PPT/WORD解决方案&#xff01;&#xff01;&#xff01;感谢支持&#xff01;&#xff01;&#xff01;

【C++】string的底层原理及实现

文章目录 string类的存储结构默认成员函数构造函数析构函数拷贝构造函数赋值重载 容量操作size()capacity()reserve()resize()clear() 遍历与访问operator[ ]迭代器范围与for 增删查改push_back()pop_back()append()operatorinsert()erase()c_str()find()substr() 非成员函数op…

力扣考研经典题 反转链表

核心思想 头插法&#xff1a; 不断的将cur指针所指向的节点放到头节点之前&#xff0c;然后头节点指向cur节点&#xff0c;因为最后返回的是head.next 。 解题思路 1.如果头节点是空的&#xff0c;或者是只有一个节点&#xff0c;只需要返回head节点即可。 if (head null …

Echarts中的热力图和漏斗图(在Vue中使用热力图和漏斗图)

热力图 (Heatmap) Echarts的热力图用于展示两个维度数据矩阵中的值分布情况。它通过在平面上划分成多个矩形区域&#xff0c;并用不同的颜色填充这些区域来表示数据的大小或强度。颜色渐变从浅到深通常映射着数值从小到大&#xff0c;从而直观展示数据的集中程度和分布模式。热…

半同步主从复制

半同步主从复制的概念 半同步主从复制&#xff08;Semisynchronous Replication, SBR&#xff09;是MySQL数据库中的一种数据复制方式&#xff0c;它在异步复制的基础上增加了一定程度的同步性&#xff0c;旨在提高数据安全性&#xff0c;减少数据丢失的风险。 半同步主从复制…

阶段三:项目开发---大数据开发运行环境搭建:任务8:安装配置Redis

任务描述 知识点&#xff1a;安装配置Redis 重 点&#xff1a; 安装配置Redis 难 点&#xff1a;无 内 容&#xff1a; Redis&#xff08;Remote Dictionary Server )&#xff0c;即远程字典服务&#xff0c;是一个开源的使用ANSI C语言编写、支持网络、可基于内存亦可…

电路基础知识汇总

1.0 串连&#xff0c;并联&#xff0c;混连 串联的定义 电路串联是一种电路元件的连接方式&#xff0c;其中各个元件沿着单一路径互相连接&#xff0c;形成一个连续的链。在串联电路中&#xff0c;每个节点最多只连接两个元件&#xff0c;这意味着电流只有一条路径可以通过整个…

Apache Seata Mac下的Seata Demo环境搭建

本文来自 Apache Seata官方文档&#xff0c;欢迎访问官网&#xff0c;查看更多深度文章。 本文来自 Apache Seata官方文档&#xff0c;欢迎访问官网&#xff0c;查看更多深度文章。 Mac下的Seata Demo环境搭建&#xff08;AT模式&#xff09; 前言 最近因为工作需要&#xf…

Git教程

文章目录 Git分布式版本控制工具版本控制器的方式常用命令远程仓库Tip Git分布式版本控制工具 ​ Git是一个开源的分布式版本控制系统&#xff0c;可以有效、高速地处理从很小到非常大的项目版本管理。 ​ Git是分布式的&#xff0c;Git不需要有中心服务器&#xff0c;我们每…

【感谢告知】本账号内容调整,聚焦于Google账号和产品的使用经验和问题案例分析

亲爱的各位朋友&#xff1a; 感谢您对本账号的关注和支持&#xff01; 基于对朋友们需求的分析和个人兴趣的转变&#xff0c;该账号从今天将对内容做一些调整&#xff0c;有原来的内容改为Google&#xff08;谷歌&#xff09;账号和产品的使用经验&#xff0c;以及相关问题的…

24西安电子科技大学经济与管理学院—考研录取情况

24西安电子科技大学—经理与管理学院—考研录取统计 01、经理与管理学院各个方向 02、24经济与管理近三年复试分数线对比 1、经管院24年院线相对于23年院线普遍下降2-15分&#xff0c;个别专业上涨4-10分。 2、经管院应用经济学2024年院线350分&#xff1b;管理科学与工程院线…