DataX Source Code Analysis: JobContainer

Series Index

1. DataX Explained: Overview and Architecture
2. DataX Source Code Analysis: JobContainer
3. DataX Source Code Analysis: TaskGroupContainer
4. DataX Source Code Analysis: TaskExecutor
5. DataX Source Code Analysis: Reader
6. DataX Source Code Analysis: Writer
7. DataX Source Code Analysis: Channel


Table of Contents

  • Series Index
  • JobContainer
  • How JobContainer Splits and Schedules Tasks
  • Key Methods of JobContainer
  • JobContainer Source Code


JobContainer

DataX's JobContainer is the job-level executor: it is responsible for job-wide splitting, scheduling, and running the job's pre- and post-statements. JobContainer can be seen as the master of all tasks, handling initialization, splitting, scheduling, running, cleanup, monitoring, and reporting, but it performs no actual data transfer itself.

During a DataX run, the job is split into tasks, which execute inside containers provided by the framework. JobContainer is one of these containers; it executes the job's preHandle(), init(), prepare(), split(), schedule(), post(), and postHandle() methods in sequence.

JobContainer extends AbstractContainer, the base class for execution containers, which provides the basic execution framework and lifecycle management. Its concrete subclasses are JobContainer and TaskGroupContainer.

In addition, JobContainer collects status information from TaskGroupContainer instances, each of which is a work unit that executes one group of tasks. This design lets DataX split and schedule work flexibly, improving data-synchronization throughput.

In short, JobContainer is a key component: it manages and drives job execution, ensuring the synchronization process runs smoothly.
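The lifecycle described above can be reduced to a fixed call order. The following is a minimal, hypothetical skeleton (not the real DataX class; the real methods do actual work, and start() also wraps everything in error handling, shown later in the source):

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical skeleton illustrating the phase ordering inside
// JobContainer.start(); each phase just records its name here.
public class JobLifecycleSketch {
    private final List<String> phases = new ArrayList<>();

    private void preHandle()   { phases.add("preHandle"); }
    private void init()        { phases.add("init"); }
    private void prepare()     { phases.add("prepare"); }
    private void split()       { phases.add("split"); }
    private void schedule()    { phases.add("schedule"); }
    private void post()        { phases.add("post"); }
    private void postHandle()  { phases.add("postHandle"); }
    private void invokeHooks() { phases.add("invokeHooks"); }

    // Mirrors the call order of the non-dry-run branch of start().
    public List<String> start() {
        preHandle();
        init();
        prepare();
        split();
        schedule();
        post();
        postHandle();
        invokeHooks();
        return phases;
    }

    public static void main(String[] args) {
        System.out.println(new JobLifecycleSketch().start());
    }
}
```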


How JobContainer Splits and Schedules Tasks

Task splitting: JobContainer first reads the configuration in job.json, in particular job.setting.speed (the byte, record, and channel parameters). From these it computes the total channel number. It then calls the reader and writer plugins' split methods to produce one task configuration per channel. The resulting configurations are merged, and the JobAssignUtil utility class groups them so that every channel in every group has its configuration assigned. At that point the splitting is complete.
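The channel-number calculation can be sketched as follows. This mirrors the logic of adjustChannelNumber() shown later in the source; the method and parameter names here are illustrative, not the real Configuration keys:

```java
// Illustrative sketch of deriving the total channel number from the
// job.setting.speed limits, in the spirit of JobContainer#adjustChannelNumber.
public class ChannelNumberSketch {
    public static int computeChannelNumber(long globalByteSpeed, long channelByteSpeed,
                                           long globalRecordSpeed, long channelRecordSpeed,
                                           int configuredChannels) {
        int byByte = Integer.MAX_VALUE;
        int byRecord = Integer.MAX_VALUE;

        if (globalByteSpeed > 0) {
            // With a global bps limit, the per-channel bps limit must be positive.
            if (channelByteSpeed <= 0) {
                throw new IllegalArgumentException("per-channel bps must be set and positive");
            }
            byByte = Math.max(1, (int) (globalByteSpeed / channelByteSpeed));
        }
        if (globalRecordSpeed > 0) {
            // With a global tps limit, the per-channel tps limit must be positive.
            if (channelRecordSpeed <= 0) {
                throw new IllegalArgumentException("per-channel tps must be set and positive");
            }
            byRecord = Math.max(1, (int) (globalRecordSpeed / channelRecordSpeed));
        }

        int need = Math.min(byByte, byRecord);
        if (need < Integer.MAX_VALUE) {
            return need;               // a byte/record limit takes precedence
        }
        return configuredChannels;     // fall back to job.setting.speed.channel
    }

    public static void main(String[] args) {
        // 10 MB/s global limit with 1 MB/s per channel -> 10 channels
        System.out.println(computeChannelNumber(10L << 20, 1L << 20, 0, 0, 5));
    }
}
```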
Task scheduling:
After splitting, JobContainer schedules the tasks. This is driven by an AbstractScheduler, which hands the task-group configurations to TaskGroupContainer instances; each TaskGroupContainer runs its tasks through a TaskExecutor inner class.
TaskExecutor runs the reader and writer threads, which in turn invoke the concrete business methods of the Reader and Writer plugins, such as init(), prepare(), startRead()/startWrite(), and post().
In this way DataX executes many tasks in parallel, improving data-synchronization throughput.
In short, JobContainer completes task splitting and scheduling through this series of steps, ensuring the synchronization process runs smoothly.
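The assignment of split tasks into task groups can be approximated by round-robin dealing. This is a deliberately simplified sketch (the real JobAssignUtil.assignFairly also balances by resource marks); the class and method names are hypothetical:

```java
import java.util.ArrayList;
import java.util.List;

// Rough sketch of dealing split tasks round-robin into task groups,
// in the spirit of (but much simpler than) JobAssignUtil.assignFairly.
public class TaskAssignSketch {
    public static List<List<Integer>> assign(int taskCount, int channelNumber,
                                             int channelsPerTaskGroup) {
        // Number of task groups needed to host channelNumber channels.
        int taskGroupNumber = (int) Math.ceil(1.0 * channelNumber / channelsPerTaskGroup);
        List<List<Integer>> groups = new ArrayList<>();
        for (int i = 0; i < taskGroupNumber; i++) {
            groups.add(new ArrayList<>());
        }
        // Deal task ids into the groups like cards.
        for (int taskId = 0; taskId < taskCount; taskId++) {
            groups.get(taskId % taskGroupNumber).add(taskId);
        }
        return groups;
    }

    public static void main(String[] args) {
        // 10 tasks, 6 channels, 5 channels per group -> 2 task groups
        System.out.println(assign(10, 6, 5));
    }
}
```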

Key Methods of JobContainer

start():
All of JobContainer's work happens inside start(), which executes preHandle(), init(), prepare(), split(), schedule(), post(), postHandle(), and invokeHooks() in sequence.

preHandle():
Pre-processing method, called before the job runs; performs preparatory work such as establishing database connections or opening files.

init():
Initialization method; initializes the JobContainer's internal state and resources.

prepare():
Calls the reader and writer plugins' prepare methods, for example for permission checks.

split():
Splitting method; splits the job into multiple tasks (or task groups). The division is determined by the job configuration and each plugin's splitting strategy.

schedule():
Scheduling method; assigns the split tasks to concrete execution containers (TaskGroupContainer) and starts a thread pool to execute the task groups.

postHandle():
Executes the post-handler plugin's post method, for cleanup work such as closing database connections and releasing resources.

destroy():
Destruction method; releases all resources held by the JobContainer and cleans up its internal state.

invokeHooks():
Invokes external hooks: collects job status and result metrics from the execution containers via Communication.collect(), aggregates them, and hands them to the hook invoker.
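One invariant behind split() is the 1:1 channel model: the writer must produce exactly as many task configurations as the reader, or the job fails. The following self-contained sketch illustrates that merge step with plain strings standing in for DataX Configuration objects (names here are illustrative, not the real API):

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the 1:1 merge performed by mergeReaderAndWriterTaskConfigs:
// reader task i is paired with writer task i; unequal counts are an error.
public class MergeSketch {
    public static List<String> merge(List<String> readerTasks, List<String> writerTasks) {
        if (readerTasks.size() != writerTasks.size()) {
            throw new IllegalStateException("reader task count != writer task count");
        }
        List<String> content = new ArrayList<>();
        for (int i = 0; i < readerTasks.size(); i++) {
            // One entry per task, pairing the i-th reader and writer slices.
            content.add("task-" + i + "[" + readerTasks.get(i) + " -> " + writerTasks.get(i) + "]");
        }
        return content;
    }

    public static void main(String[] args) {
        System.out.println(merge(List.of("r0", "r1"), List.of("w0", "w1")));
    }
}
```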

JobContainer Source Code


/**
 * A job instance runs inside the jobContainer. It is the master of all tasks,
 * responsible for initialization, splitting, scheduling, running, cleanup,
 * monitoring and reporting, but it performs no actual data transfer itself.
 */
public class JobContainer extends AbstractContainer {
    private static final Logger LOG = LoggerFactory.getLogger(JobContainer.class);

    private static final SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

    private ClassLoaderSwapper classLoaderSwapper = ClassLoaderSwapper.newCurrentThreadClassLoaderSwapper();

    private long jobId;

    // reader plugin name
    private String readerPluginName;

    // writer plugin name
    private String writerPluginName;

    /**
     * reader and writer Job instances held by this jobContainer
     */
    private Reader.Job jobReader;

    private Writer.Job jobWriter;

    // configuration
    private Configuration userConf;

    private long startTimeStamp;
    private long endTimeStamp;
    private long startTransferTimeStamp;
    private long endTransferTimeStamp;

    private int needChannelNumber;

    private int totalStage = 1;

    private ErrorRecordChecker errorLimit;

    public JobContainer(Configuration configuration) {
        super(configuration);
        errorLimit = new ErrorRecordChecker(configuration);
    }

    /**
     * All of jobContainer's work happens inside start(): init, prepare, split,
     * scheduler, post, as well as destroy and statistics.
     */
    @Override
    public void start() {
        LOG.info("DataX jobContainer starts job.");

        boolean hasException = false;
        boolean isDryRun = false;
        try {
            this.startTimeStamp = System.currentTimeMillis();
            isDryRun = configuration.getBool(CoreConstant.DATAX_JOB_SETTING_DRYRUN, false);
            if (isDryRun) {
                LOG.info("jobContainer starts to do preCheck ...");
                this.preCheck();
            } else {
                userConf = configuration.clone();
                LOG.debug("jobContainer starts to do preHandle ...");
                this.preHandle();
                LOG.debug("jobContainer starts to do init ...");
                this.init();
                LOG.info("jobContainer starts to do prepare ...");
                this.prepare();
                LOG.info("jobContainer starts to do split ...");
                this.totalStage = this.split();
                LOG.info("jobContainer starts to do schedule ...");
                this.schedule();
                LOG.debug("jobContainer starts to do post ...");
                this.post();
                LOG.debug("jobContainer starts to do postHandle ...");
                this.postHandle();
                LOG.info("DataX jobId [{}] completed successfully.", this.jobId);

                this.invokeHooks();
            }
        } catch (Throwable e) {
            LOG.error("Exception when job run", e);

            hasException = true;

            if (e instanceof OutOfMemoryError) {
                this.destroy();
                System.gc();
            }

            if (super.getContainerCommunicator() == null) {
                // containerCollector is initialized in scheduler(), so if the
                // exception occurs before scheduler(), it must be initialized here
                AbstractContainerCommunicator tempContainerCollector;
                // standalone
                tempContainerCollector = new StandAloneJobContainerCommunicator(configuration);
                super.setContainerCommunicator(tempContainerCollector);
            }

            Communication communication = super.getContainerCommunicator().collect();
            // the state before reporting does not need to be set manually
            // communication.setState(State.FAILED);
            communication.setThrowable(e);
            communication.setTimestamp(this.endTimeStamp);

            Communication tempComm = new Communication();
            tempComm.setTimestamp(this.startTransferTimeStamp);

            Communication reportCommunication = CommunicationTool.getReportCommunication(communication, tempComm, this.totalStage);
            super.getContainerCommunicator().report(reportCommunication);

            throw DataXException.asDataXException(FrameworkErrorCode.RUNTIME_ERROR, e);
        } finally {
            if (!isDryRun) {
                this.destroy();
                this.endTimeStamp = System.currentTimeMillis();
                if (!hasException) {
                    // finally, log average CPU usage and GC statistics
                    VMInfo vmInfo = VMInfo.getVmInfo();
                    if (vmInfo != null) {
                        vmInfo.getDelta(false);
                        LOG.info(vmInfo.totalString());
                    }
                    LOG.info(PerfTrace.getInstance().summarizeNoException());
                    this.logStatistics();
                }
            }
        }
    }

    private void preCheck() {
        this.preCheckInit();
        this.adjustChannelNumber();

        if (this.needChannelNumber <= 0) {
            this.needChannelNumber = 1;
        }
        this.preCheckReader();
        this.preCheckWriter();
        LOG.info("PreCheck通过");
    }

    private void preCheckInit() {
        this.jobId = this.configuration.getLong(CoreConstant.DATAX_CORE_CONTAINER_JOB_ID, -1);

        if (this.jobId < 0) {
            LOG.info("Set jobId = 0");
            this.jobId = 0;
            this.configuration.set(CoreConstant.DATAX_CORE_CONTAINER_JOB_ID, this.jobId);
        }

        Thread.currentThread().setName("job-" + this.jobId);

        JobPluginCollector jobPluginCollector = new DefaultJobPluginCollector(this.getContainerCommunicator());
        this.jobReader = this.preCheckReaderInit(jobPluginCollector);
        this.jobWriter = this.preCheckWriterInit(jobPluginCollector);
    }

    private Reader.Job preCheckReaderInit(JobPluginCollector jobPluginCollector) {
        this.readerPluginName = this.configuration.getString(CoreConstant.DATAX_JOB_CONTENT_READER_NAME);
        classLoaderSwapper.setCurrentThreadClassLoader(LoadUtil.getJarLoader(PluginType.READER, this.readerPluginName));

        Reader.Job jobReader = (Reader.Job) LoadUtil.loadJobPlugin(PluginType.READER, this.readerPluginName);

        this.configuration.set(CoreConstant.DATAX_JOB_CONTENT_READER_PARAMETER + ".dryRun", true);

        // set the reader's jobConfig
        jobReader.setPluginJobConf(this.configuration.getConfiguration(CoreConstant.DATAX_JOB_CONTENT_READER_PARAMETER));
        // set the reader's readerConfig
        jobReader.setPeerPluginJobConf(this.configuration.getConfiguration(CoreConstant.DATAX_JOB_CONTENT_READER_PARAMETER));
        jobReader.setJobPluginCollector(jobPluginCollector);

        classLoaderSwapper.restoreCurrentThreadClassLoader();
        return jobReader;
    }

    private Writer.Job preCheckWriterInit(JobPluginCollector jobPluginCollector) {
        this.writerPluginName = this.configuration.getString(CoreConstant.DATAX_JOB_CONTENT_WRITER_NAME);
        classLoaderSwapper.setCurrentThreadClassLoader(LoadUtil.getJarLoader(PluginType.WRITER, this.writerPluginName));

        Writer.Job jobWriter = (Writer.Job) LoadUtil.loadJobPlugin(PluginType.WRITER, this.writerPluginName);

        this.configuration.set(CoreConstant.DATAX_JOB_CONTENT_WRITER_PARAMETER + ".dryRun", true);

        // set the writer's jobConfig
        jobWriter.setPluginJobConf(this.configuration.getConfiguration(CoreConstant.DATAX_JOB_CONTENT_WRITER_PARAMETER));
        // set the writer's peer (reader) config
        jobWriter.setPeerPluginJobConf(this.configuration.getConfiguration(CoreConstant.DATAX_JOB_CONTENT_READER_PARAMETER));

        jobWriter.setPeerPluginName(this.readerPluginName);
        jobWriter.setJobPluginCollector(jobPluginCollector);

        classLoaderSwapper.restoreCurrentThreadClassLoader();
        return jobWriter;
    }

    private void preCheckReader() {
        classLoaderSwapper.setCurrentThreadClassLoader(LoadUtil.getJarLoader(PluginType.READER, this.readerPluginName));
        LOG.info(String.format("DataX Reader.Job [%s] do preCheck work .", this.readerPluginName));
        this.jobReader.preCheck();
        classLoaderSwapper.restoreCurrentThreadClassLoader();
    }

    private void preCheckWriter() {
        classLoaderSwapper.setCurrentThreadClassLoader(LoadUtil.getJarLoader(PluginType.WRITER, this.writerPluginName));
        LOG.info(String.format("DataX Writer.Job [%s] do preCheck work .", this.writerPluginName));
        this.jobWriter.preCheck();
        classLoaderSwapper.restoreCurrentThreadClassLoader();
    }

    /**
     * Initialize the reader and the writer.
     */
    private void init() {
        this.jobId = this.configuration.getLong(CoreConstant.DATAX_CORE_CONTAINER_JOB_ID, -1);

        if (this.jobId < 0) {
            LOG.info("Set jobId = 0");
            this.jobId = 0;
            this.configuration.set(CoreConstant.DATAX_CORE_CONTAINER_JOB_ID, this.jobId);
        }

        Thread.currentThread().setName("job-" + this.jobId);

        JobPluginCollector jobPluginCollector = new DefaultJobPluginCollector(this.getContainerCommunicator());
        // the Reader must be initialized first, then the Writer
        this.jobReader = this.initJobReader(jobPluginCollector);
        this.jobWriter = this.initJobWriter(jobPluginCollector);
    }

    private void prepare() {
        this.prepareJobReader();
        this.prepareJobWriter();
    }

    private void preHandle() {
        String handlerPluginTypeStr = this.configuration.getString(CoreConstant.DATAX_JOB_PREHANDLER_PLUGINTYPE);

        if (!StringUtils.isNotEmpty(handlerPluginTypeStr)) {
            return;
        }
        PluginType handlerPluginType;
        try {
            handlerPluginType = PluginType.valueOf(handlerPluginTypeStr.toUpperCase());
        } catch (IllegalArgumentException e) {
            throw DataXException.asDataXException(
                    FrameworkErrorCode.CONFIG_ERROR,
                    String.format("Job preHandler's pluginType(%s) set error, reason(%s)", handlerPluginTypeStr.toUpperCase(), e.getMessage()));
        }

        String handlerPluginName = this.configuration.getString(CoreConstant.DATAX_JOB_PREHANDLER_PLUGINNAME);

        classLoaderSwapper.setCurrentThreadClassLoader(LoadUtil.getJarLoader(handlerPluginType, handlerPluginName));

        AbstractJobPlugin handler = LoadUtil.loadJobPlugin(handlerPluginType, handlerPluginName);

        JobPluginCollector jobPluginCollector = new DefaultJobPluginCollector(this.getContainerCommunicator());
        handler.setJobPluginCollector(jobPluginCollector);

        // TODO: the safety of this configuration must be guaranteed in the future
        handler.preHandler(configuration);
        classLoaderSwapper.restoreCurrentThreadClassLoader();

        LOG.info("After PreHandler: \n" + Engine.filterJobConfiguration(configuration) + "\n");
    }

    private void postHandle() {
        String handlerPluginTypeStr = this.configuration.getString(CoreConstant.DATAX_JOB_POSTHANDLER_PLUGINTYPE);

        if (!StringUtils.isNotEmpty(handlerPluginTypeStr)) {
            return;
        }
        PluginType handlerPluginType;
        try {
            handlerPluginType = PluginType.valueOf(handlerPluginTypeStr.toUpperCase());
        } catch (IllegalArgumentException e) {
            throw DataXException.asDataXException(
                    FrameworkErrorCode.CONFIG_ERROR,
                    String.format("Job postHandler's pluginType(%s) set error, reason(%s)", handlerPluginTypeStr.toUpperCase(), e.getMessage()));
        }

        String handlerPluginName = this.configuration.getString(CoreConstant.DATAX_JOB_POSTHANDLER_PLUGINNAME);

        classLoaderSwapper.setCurrentThreadClassLoader(LoadUtil.getJarLoader(handlerPluginType, handlerPluginName));

        AbstractJobPlugin handler = LoadUtil.loadJobPlugin(handlerPluginType, handlerPluginName);

        JobPluginCollector jobPluginCollector = new DefaultJobPluginCollector(this.getContainerCommunicator());
        handler.setJobPluginCollector(jobPluginCollector);

        handler.postHandler(configuration);
        classLoaderSwapper.restoreCurrentThreadClassLoader();
    }

    /**
     * Perform the finest-grained split of the reader and the writer. Note that the
     * writer's split result must follow the reader's so that the two counts are equal,
     * satisfying the 1:1 channel model. The reader and writer configurations can
     * therefore be merged together here; to avoid long-tail effects caused by
     * ordering, the merged result is shuffled.
     */
    private int split() {
        this.adjustChannelNumber();

        if (this.needChannelNumber <= 0) {
            this.needChannelNumber = 1;
        }

        List<Configuration> readerTaskConfigs = this.doReaderSplit(this.needChannelNumber);
        int taskNumber = readerTaskConfigs.size();
        List<Configuration> writerTaskConfigs = this.doWriterSplit(taskNumber);

        List<Configuration> transformerList = this.configuration.getListConfiguration(CoreConstant.DATAX_JOB_CONTENT_TRANSFORMER);

        LOG.debug("transformer configuration: " + JSON.toJSONString(transformerList));
        /**
         * Input: the reader and writer parameter lists; output: the list of
         * elements under job.content.
         */
        List<Configuration> contentConfig = mergeReaderAndWriterTaskConfigs(readerTaskConfigs, writerTaskConfigs, transformerList);

        LOG.debug("contentConfig configuration: " + JSON.toJSONString(contentConfig));

        this.configuration.set(CoreConstant.DATAX_JOB_CONTENT, contentConfig);

        return contentConfig.size();
    }

    private void adjustChannelNumber() {
        int needChannelNumberByByte = Integer.MAX_VALUE;
        int needChannelNumberByRecord = Integer.MAX_VALUE;

        boolean isByteLimit = (this.configuration.getInt(CoreConstant.DATAX_JOB_SETTING_SPEED_BYTE, 0) > 0);
        if (isByteLimit) {
            long globalLimitedByteSpeed = this.configuration.getInt(CoreConstant.DATAX_JOB_SETTING_SPEED_BYTE, 10 * 1024 * 1024);

            // under byte-based rate limiting, the per-channel byte limit must be set, otherwise fail!
            Long channelLimitedByteSpeed = this.configuration.getLong(CoreConstant.DATAX_CORE_TRANSPORT_CHANNEL_SPEED_BYTE);
            if (channelLimitedByteSpeed == null || channelLimitedByteSpeed <= 0) {
                throw DataXException.asDataXException(
                        FrameworkErrorCode.CONFIG_ERROR,
                        "在有总bps限速条件下,单个channel的bps值不能为空,也不能为非正数");
            }

            needChannelNumberByByte = (int) (globalLimitedByteSpeed / channelLimitedByteSpeed);
            needChannelNumberByByte = needChannelNumberByByte > 0 ? needChannelNumberByByte : 1;
            LOG.info("Job set Max-Byte-Speed to " + globalLimitedByteSpeed + " bytes.");
        }

        boolean isRecordLimit = (this.configuration.getInt(CoreConstant.DATAX_JOB_SETTING_SPEED_RECORD, 0)) > 0;
        if (isRecordLimit) {
            long globalLimitedRecordSpeed = this.configuration.getInt(CoreConstant.DATAX_JOB_SETTING_SPEED_RECORD, 100000);

            Long channelLimitedRecordSpeed = this.configuration.getLong(CoreConstant.DATAX_CORE_TRANSPORT_CHANNEL_SPEED_RECORD);
            if (channelLimitedRecordSpeed == null || channelLimitedRecordSpeed <= 0) {
                throw DataXException.asDataXException(
                        FrameworkErrorCode.CONFIG_ERROR,
                        "在有总tps限速条件下,单个channel的tps值不能为空,也不能为非正数");
            }

            needChannelNumberByRecord = (int) (globalLimitedRecordSpeed / channelLimitedRecordSpeed);
            needChannelNumberByRecord = needChannelNumberByRecord > 0 ? needChannelNumberByRecord : 1;
            LOG.info("Job set Max-Record-Speed to " + globalLimitedRecordSpeed + " records.");
        }

        // take the smaller value
        this.needChannelNumber = needChannelNumberByByte < needChannelNumberByRecord ? needChannelNumberByByte : needChannelNumberByRecord;

        // if needChannelNumber was derived from the byte or record limit, return here
        if (this.needChannelNumber < Integer.MAX_VALUE) {
            return;
        }

        boolean isChannelLimit = (this.configuration.getInt(CoreConstant.DATAX_JOB_SETTING_SPEED_CHANNEL, 0) > 0);
        if (isChannelLimit) {
            this.needChannelNumber = this.configuration.getInt(CoreConstant.DATAX_JOB_SETTING_SPEED_CHANNEL);
            LOG.info("Job set Channel-Number to " + this.needChannelNumber + " channels.");
            return;
        }

        throw DataXException.asDataXException(
                FrameworkErrorCode.CONFIG_ERROR,
                "Job运行速度必须设置");
    }

    /**
     * schedule first merges the split results of the reader and writer into concrete
     * taskGroupContainers; different execution modes then invoke different scheduling
     * strategies to get all tasks running.
     */
    private void schedule() {
        /**
         * The global speed and per-channel speed here are configured in B/s.
         */
        int channelsPerTaskGroup = this.configuration.getInt(CoreConstant.DATAX_CORE_CONTAINER_TASKGROUP_CHANNEL, 5);
        int taskNumber = this.configuration.getList(CoreConstant.DATAX_JOB_CONTENT).size();

        this.needChannelNumber = Math.min(this.needChannelNumber, taskNumber);
        PerfTrace.getInstance().setChannelNumber(needChannelNumber);

        /**
         * Derive, from the configuration, which tasks each taskGroup needs to run.
         */
        List<Configuration> taskGroupConfigs = JobAssignUtil.assignFairly(this.configuration, this.needChannelNumber, channelsPerTaskGroup);

        LOG.info("Scheduler starts [{}] taskGroups.", taskGroupConfigs.size());

        ExecuteMode executeMode = null;
        AbstractScheduler scheduler;
        try {
            executeMode = ExecuteMode.STANDALONE;
            scheduler = initStandaloneScheduler(this.configuration);

            // set executeMode
            for (Configuration taskGroupConfig : taskGroupConfigs) {
                taskGroupConfig.set(CoreConstant.DATAX_CORE_CONTAINER_JOB_MODE, executeMode.getValue());
            }

            if (executeMode == ExecuteMode.LOCAL || executeMode == ExecuteMode.DISTRIBUTE) {
                if (this.jobId <= 0) {
                    throw DataXException.asDataXException(FrameworkErrorCode.RUNTIME_ERROR,
                            "在[ local | distribute ]模式下必须设置jobId,并且其值 > 0 .");
                }
            }

            LOG.info("Running by {} Mode.", executeMode);

            this.startTransferTimeStamp = System.currentTimeMillis();

            scheduler.schedule(taskGroupConfigs);

            this.endTransferTimeStamp = System.currentTimeMillis();
        } catch (Exception e) {
            LOG.error("运行scheduler 模式[{}]出错.", executeMode);
            this.endTransferTimeStamp = System.currentTimeMillis();
            throw DataXException.asDataXException(FrameworkErrorCode.RUNTIME_ERROR, e);
        }

        /**
         * Check the task execution results.
         */
        this.checkLimit();
    }

    private AbstractScheduler initStandaloneScheduler(Configuration configuration) {
        AbstractContainerCommunicator containerCommunicator = new StandAloneJobContainerCommunicator(configuration);
        super.setContainerCommunicator(containerCommunicator);

        return new StandAloneScheduler(containerCommunicator);
    }

    private void post() {
        this.postJobWriter();
        this.postJobReader();
    }

    private void destroy() {
        if (this.jobWriter != null) {
            this.jobWriter.destroy();
            this.jobWriter = null;
        }
        if (this.jobReader != null) {
            this.jobReader.destroy();
            this.jobReader = null;
        }
    }

    private void logStatistics() {
        long totalCosts = (this.endTimeStamp - this.startTimeStamp) / 1000;
        long transferCosts = (this.endTransferTimeStamp - this.startTransferTimeStamp) / 1000;
        if (0L == transferCosts) {
            transferCosts = 1L;
        }

        if (super.getContainerCommunicator() == null) {
            return;
        }

        Communication communication = super.getContainerCommunicator().collect();
        communication.setTimestamp(this.endTimeStamp);

        Communication tempComm = new Communication();
        tempComm.setTimestamp(this.startTransferTimeStamp);

        Communication reportCommunication = CommunicationTool.getReportCommunication(communication, tempComm, this.totalStage);

        // byte rate
        long byteSpeedPerSecond = communication.getLongCounter(CommunicationTool.READ_SUCCEED_BYTES) / transferCosts;

        long recordSpeedPerSecond = communication.getLongCounter(CommunicationTool.READ_SUCCEED_RECORDS) / transferCosts;

        reportCommunication.setLongCounter(CommunicationTool.BYTE_SPEED, byteSpeedPerSecond);
        reportCommunication.setLongCounter(CommunicationTool.RECORD_SPEED, recordSpeedPerSecond);

        super.getContainerCommunicator().report(reportCommunication);

        LOG.info(String.format(
                "\n" + "%-26s: %-18s\n" + "%-26s: %-18s\n" + "%-26s: %19s\n"
                        + "%-26s: %19s\n" + "%-26s: %19s\n" + "%-26s: %19s\n"
                        + "%-26s: %19s\n",
                "任务启动时刻", dateFormat.format(startTimeStamp),
                "任务结束时刻", dateFormat.format(endTimeStamp),
                "任务总计耗时", String.valueOf(totalCosts) + "s",
                "任务平均流量", StrUtil.stringify(byteSpeedPerSecond) + "/s",
                "记录写入速度", String.valueOf(recordSpeedPerSecond) + "rec/s",
                "读出记录总数", String.valueOf(CommunicationTool.getTotalReadRecords(communication)),
                "读写失败总数", String.valueOf(CommunicationTool.getTotalErrorRecords(communication))));

        if (communication.getLongCounter(CommunicationTool.TRANSFORMER_SUCCEED_RECORDS) > 0
                || communication.getLongCounter(CommunicationTool.TRANSFORMER_FAILED_RECORDS) > 0
                || communication.getLongCounter(CommunicationTool.TRANSFORMER_FILTER_RECORDS) > 0) {
            LOG.info(String.format(
                    "\n" + "%-26s: %19s\n" + "%-26s: %19s\n" + "%-26s: %19s\n",
                    "Transformer成功记录总数", communication.getLongCounter(CommunicationTool.TRANSFORMER_SUCCEED_RECORDS),
                    "Transformer失败记录总数", communication.getLongCounter(CommunicationTool.TRANSFORMER_FAILED_RECORDS),
                    "Transformer过滤记录总数", communication.getLongCounter(CommunicationTool.TRANSFORMER_FILTER_RECORDS)));
        }
    }

    /**
     * Initialize the reader job; returns a Reader.Job.
     *
     * @return
     */
    private Reader.Job initJobReader(JobPluginCollector jobPluginCollector) {
        this.readerPluginName = this.configuration.getString(CoreConstant.DATAX_JOB_CONTENT_READER_NAME);
        classLoaderSwapper.setCurrentThreadClassLoader(LoadUtil.getJarLoader(PluginType.READER, this.readerPluginName));

        Reader.Job jobReader = (Reader.Job) LoadUtil.loadJobPlugin(PluginType.READER, this.readerPluginName);

        // set the reader's jobConfig
        jobReader.setPluginJobConf(this.configuration.getConfiguration(CoreConstant.DATAX_JOB_CONTENT_READER_PARAMETER));

        // set the reader's peer (writer) config
        jobReader.setPeerPluginJobConf(this.configuration.getConfiguration(CoreConstant.DATAX_JOB_CONTENT_WRITER_PARAMETER));

        jobReader.setJobPluginCollector(jobPluginCollector);
        jobReader.init();

        classLoaderSwapper.restoreCurrentThreadClassLoader();
        return jobReader;
    }

    /**
     * Initialize the writer job; returns a Writer.Job.
     *
     * @return
     */
    private Writer.Job initJobWriter(JobPluginCollector jobPluginCollector) {
        this.writerPluginName = this.configuration.getString(CoreConstant.DATAX_JOB_CONTENT_WRITER_NAME);
        classLoaderSwapper.setCurrentThreadClassLoader(LoadUtil.getJarLoader(PluginType.WRITER, this.writerPluginName));

        Writer.Job jobWriter = (Writer.Job) LoadUtil.loadJobPlugin(PluginType.WRITER, this.writerPluginName);

        // set the writer's jobConfig
        jobWriter.setPluginJobConf(this.configuration.getConfiguration(CoreConstant.DATAX_JOB_CONTENT_WRITER_PARAMETER));

        // set the writer's peer (reader) config
        jobWriter.setPeerPluginJobConf(this.configuration.getConfiguration(CoreConstant.DATAX_JOB_CONTENT_READER_PARAMETER));

        jobWriter.setPeerPluginName(this.readerPluginName);
        jobWriter.setJobPluginCollector(jobPluginCollector);
        jobWriter.init();
        classLoaderSwapper.restoreCurrentThreadClassLoader();

        return jobWriter;
    }

    private void prepareJobReader() {
        classLoaderSwapper.setCurrentThreadClassLoader(LoadUtil.getJarLoader(PluginType.READER, this.readerPluginName));
        LOG.info(String.format("DataX Reader.Job [%s] do prepare work .", this.readerPluginName));
        this.jobReader.prepare();
        classLoaderSwapper.restoreCurrentThreadClassLoader();
    }

    private void prepareJobWriter() {
        classLoaderSwapper.setCurrentThreadClassLoader(LoadUtil.getJarLoader(PluginType.WRITER, this.writerPluginName));
        LOG.info(String.format("DataX Writer.Job [%s] do prepare work .", this.writerPluginName));
        this.jobWriter.prepare();
        classLoaderSwapper.restoreCurrentThreadClassLoader();
    }

    // TODO: what if the source itself is empty
    private List<Configuration> doReaderSplit(int adviceNumber) {
        classLoaderSwapper.setCurrentThreadClassLoader(LoadUtil.getJarLoader(PluginType.READER, this.readerPluginName));
        List<Configuration> readerSlicesConfigs = this.jobReader.split(adviceNumber);
        if (readerSlicesConfigs == null || readerSlicesConfigs.size() <= 0) {
            throw DataXException.asDataXException(
                    FrameworkErrorCode.PLUGIN_SPLIT_ERROR,
                    "reader切分的task数目不能小于等于0");
        }
        LOG.info("DataX Reader.Job [{}] splits to [{}] tasks.", this.readerPluginName, readerSlicesConfigs.size());
        classLoaderSwapper.restoreCurrentThreadClassLoader();
        return readerSlicesConfigs;
    }

    private List<Configuration> doWriterSplit(int readerTaskNumber) {
        classLoaderSwapper.setCurrentThreadClassLoader(LoadUtil.getJarLoader(PluginType.WRITER, this.writerPluginName));

        List<Configuration> writerSlicesConfigs = this.jobWriter.split(readerTaskNumber);
        if (writerSlicesConfigs == null || writerSlicesConfigs.size() <= 0) {
            throw DataXException.asDataXException(
                    FrameworkErrorCode.PLUGIN_SPLIT_ERROR,
                    "writer切分的task不能小于等于0");
        }
        LOG.info("DataX Writer.Job [{}] splits to [{}] tasks.", this.writerPluginName, writerSlicesConfigs.size());
        classLoaderSwapper.restoreCurrentThreadClassLoader();

        return writerSlicesConfigs;
    }

    /**
     * Merge the reader and writer configurations in order; the order here must not be
     * disturbed! Input: reader- and writer-level configurations. Output: a complete
     * task configuration.
     *
     * @param readerTasksConfigs
     * @param writerTasksConfigs
     * @return
     */
    private List<Configuration> mergeReaderAndWriterTaskConfigs(
            List<Configuration> readerTasksConfigs,
            List<Configuration> writerTasksConfigs) {
        return mergeReaderAndWriterTaskConfigs(readerTasksConfigs, writerTasksConfigs, null);
    }

    private List<Configuration> mergeReaderAndWriterTaskConfigs(
            List<Configuration> readerTasksConfigs,
            List<Configuration> writerTasksConfigs,
            List<Configuration> transformerConfigs) {
        if (readerTasksConfigs.size() != writerTasksConfigs.size()) {
            throw DataXException.asDataXException(
                    FrameworkErrorCode.PLUGIN_SPLIT_ERROR,
                    String.format("reader切分的task数目[%d]不等于writer切分的task数目[%d].",
                            readerTasksConfigs.size(), writerTasksConfigs.size()));
        }

        List<Configuration> contentConfigs = new ArrayList<Configuration>();
        for (int i = 0; i < readerTasksConfigs.size(); i++) {
            Configuration taskConfig = Configuration.newDefault();
            taskConfig.set(CoreConstant.JOB_READER_NAME, this.readerPluginName);
            taskConfig.set(CoreConstant.JOB_READER_PARAMETER, readerTasksConfigs.get(i));
            taskConfig.set(CoreConstant.JOB_WRITER_NAME, this.writerPluginName);
            taskConfig.set(CoreConstant.JOB_WRITER_PARAMETER, writerTasksConfigs.get(i));

            if (transformerConfigs != null && transformerConfigs.size() > 0) {
                taskConfig.set(CoreConstant.JOB_TRANSFORMER, transformerConfigs);
            }

            taskConfig.set(CoreConstant.TASK_ID, i);
            contentConfigs.add(taskConfig);
        }

        return contentConfigs;
    }

    /**
     * This part is fairly involved; the merge happens in two steps: 1. tasks to
     * channels, 2. channels to taskGroups. Taken together, it means packing tasks
     * into taskGroups so that the computed channel number is satisfied without
     * starting extra channels.
     * <p/>
     * Example:
     * <p/>
     * Premise: the split produced 1024 shards; the user asks for a total rate of
     * 1000M/s with 3M/s per channel, and each taskGroup runs 7 channels.
     * <p/>
     * Calculation: total channels = 1000M/s / 3M/s = 333. For an even distribution,
     * 308 channels get 3 tasks each and 25 channels get 4 tasks each. Required
     * taskGroups = 333 / 7 = 47 remainder 4, i.e. 48 taskGroups: 47 handle 7
     * channels each and one handles 4 channels.
     * <p/>
     * Handling: first deal with the taskGroup holding the 4 leftover channels:
     * find 4 channels averaging 3 tasks each and give that group taskGroupId 0;
     * then, like dealing cards, round-robin the remaining tasks into the taskGroups
     * holding the average channel count.
     * <p/>
     * TODO delete it
     *
     * @param averTaskPerChannel
     * @param channelNumber
     * @param channelsPerTaskGroup
     * @return the complete, independent configuration for each taskGroup
     */
    @SuppressWarnings("serial")
    private List<Configuration> distributeTasksToTaskGroup(
            int averTaskPerChannel, int channelNumber, int channelsPerTaskGroup) {
        Validate.isTrue(averTaskPerChannel > 0 && channelNumber > 0 && channelsPerTaskGroup > 0,
                "每个channel的平均task数[averTaskPerChannel],channel数目[channelNumber],每个taskGroup的平均channel数[channelsPerTaskGroup]都应该为正数");
        List<Configuration> taskConfigs = this.configuration.getListConfiguration(CoreConstant.DATAX_JOB_CONTENT);
        int taskGroupNumber = channelNumber / channelsPerTaskGroup;
        int leftChannelNumber = channelNumber % channelsPerTaskGroup;
        if (leftChannelNumber > 0) {
            taskGroupNumber += 1;
        }

        /**
         * If there is only one taskGroup, tag it and return directly.
         */
        if (taskGroupNumber == 1) {
            final Configuration taskGroupConfig = this.configuration.clone();
            /**
             * Configuration's clone does not clone the content list.
             */
            taskGroupConfig.set(CoreConstant.DATAX_JOB_CONTENT, this.configuration.getListConfiguration(CoreConstant.DATAX_JOB_CONTENT));
            taskGroupConfig.set(CoreConstant.DATAX_CORE_CONTAINER_TASKGROUP_CHANNEL, channelNumber);
            taskGroupConfig.set(CoreConstant.DATAX_CORE_CONTAINER_TASKGROUP_ID, 0);
            return new ArrayList<Configuration>() {
                {
                    add(taskGroupConfig);
                }
            };
        }

        List<Configuration> taskGroupConfigs = new ArrayList<Configuration>();
        /**
         * Clear the content configuration of each taskGroup.
         */
        for (int i = 0; i < taskGroupNumber; i++) {
            Configuration taskGroupConfig = this.configuration.clone();
            List<Configuration> taskGroupJobContent = taskGroupConfig.getListConfiguration(CoreConstant.DATAX_JOB_CONTENT);
            taskGroupJobContent.clear();
            taskGroupConfig.set(CoreConstant.DATAX_JOB_CONTENT, taskGroupJobContent);

            taskGroupConfigs.add(taskGroupConfig);
        }

        int taskConfigIndex = 0;
        int channelIndex = 0;
        int taskGroupConfigIndex = 0;

        /**
         * First handle the taskGroup whose channel count is not the average.
         */
        if (leftChannelNumber > 0) {
            Configuration taskGroupConfig = taskGroupConfigs.get(taskGroupConfigIndex);
            for (; channelIndex < leftChannelNumber; channelIndex++) {
                for (int i = 0; i < averTaskPerChannel; i++) {
                    List<Configuration> taskGroupJobContent = taskGroupConfig.getListConfiguration(CoreConstant.DATAX_JOB_CONTENT);
                    taskGroupJobContent.add(taskConfigs.get(taskConfigIndex++));
                    taskGroupConfig.set(CoreConstant.DATAX_JOB_CONTENT, taskGroupJobContent);
                }
            }

            taskGroupConfig.set(CoreConstant.DATAX_CORE_CONTAINER_TASKGROUP_CHANNEL, leftChannelNumber);
            taskGroupConfig.set(CoreConstant.DATAX_CORE_CONTAINER_TASKGROUP_ID, taskGroupConfigIndex++);
        }

        /**
         * Deal the remaining tasks round-robin, tagging each group with its
         * channel count and taskGroupId.
         */
        int equalDivisionStartIndex = taskGroupConfigIndex;
        for (; taskConfigIndex < taskConfigs.size()
                && equalDivisionStartIndex < taskGroupConfigs.size(); ) {
            for (taskGroupConfigIndex = equalDivisionStartIndex; taskGroupConfigIndex < taskGroupConfigs.size()
                    && taskConfigIndex < taskConfigs.size(); taskGroupConfigIndex++) {
                Configuration taskGroupConfig = taskGroupConfigs.get(taskGroupConfigIndex);
                List<Configuration> taskGroupJobContent = taskGroupConfig.getListConfiguration(CoreConstant.DATAX_JOB_CONTENT);
                taskGroupJobContent.add(taskConfigs.get(taskConfigIndex++));
                taskGroupConfig.set(CoreConstant.DATAX_JOB_CONTENT, taskGroupJobContent);
            }
        }

        for (taskGroupConfigIndex = equalDivisionStartIndex;
             taskGroupConfigIndex < taskGroupConfigs.size(); ) {
            Configuration taskGroupConfig = taskGroupConfigs.get(taskGroupConfigIndex);
            taskGroupConfig.set(CoreConstant.DATAX_CORE_CONTAINER_TASKGROUP_CHANNEL, channelsPerTaskGroup);
            taskGroupConfig.set(CoreConstant.DATAX_CORE_CONTAINER_TASKGROUP_ID, taskGroupConfigIndex++);
        }

        return taskGroupConfigs;
    }

    private void postJobReader() {
        classLoaderSwapper.setCurrentThreadClassLoader(LoadUtil.getJarLoader(PluginType.READER, this.readerPluginName));
        LOG.info("DataX Reader.Job [{}] do post work.", this.readerPluginName);
        this.jobReader.post();
        classLoaderSwapper.restoreCurrentThreadClassLoader();
    }

    private void postJobWriter() {
        classLoaderSwapper.setCurrentThreadClassLoader(LoadUtil.getJarLoader(PluginType.WRITER, this.writerPluginName));
        LOG.info("DataX Writer.Job [{}] do post work.", this.writerPluginName);
        this.jobWriter.post();
        classLoaderSwapper.restoreCurrentThreadClassLoader();
    }

    /**
     * Check whether the final result exceeds the error threshold. A threshold below 1
     * is a percentage threshold; above 1 it is a record-count threshold.
     *
     * @param
     */
    private void checkLimit() {
        Communication communication = super.getContainerCommunicator().collect();
        errorLimit.checkRecordLimit(communication);
        errorLimit.checkPercentageLimit(communication);
    }

    /**
     * Invoke external hooks.
     */
    private void invokeHooks() {
        Communication comm = super.getContainerCommunicator().collect();
        HookInvoker invoker = new HookInvoker(CoreConstant.DATAX_HOME + "/hook", configuration, comm.getCounter());
        invoker.invokeAll();
    }
}
