DataX源码分析-插件机制

系列文章目录

一、DataX详解和架构介绍
二、DataX源码分析 JobContainer
三、DataX源码分析 TaskGroupContainer
四、DataX源码分析 TaskExecutor
五、DataX源码分析 reader
六、DataX源码分析 writer
七、DataX源码分析 Channel
八、DataX源码分析-插件机制


文章目录

  • 系列文章目录
  • 前言
  • 一、插件分类
  • 插件目录结构
  • 插件加载原理


前言

DataX的插件机制是其核心特性之一,它使得DataX能够灵活地适应各种不同的数据源的数据同步。这一机制主要基于插件开发框架,该框架主要包括Reader插件、Transformer插件、Writer插件。

DataX的插件机制还采用了框架+插件的架构。框架负责连接Reader和Writer插件,作为两者的数据传输通道,并处理缓冲、流控、并发、数据转换等核心技术问题。这种架构使得插件只需关心数据的读取或写入本身,而同步的共性问题则由框架来处理。

此外,DataX的插件机制还具有良好的扩展性和可维护性。开发者可以根据需要开发新的Reader或Writer插件来支持新的数据源类型,而无需修改DataX的核心框架代码。这种插件化的设计使得DataX能够适应不断变化的业务需求和技术环境。

在插件的加载和初始化方面,DataX使用了类似Java SPI(Service Provider Interface)的机制。它会在指定的插件目录中查找并加载插件,然后将其注册到插件注册中心。这样,当需要使用某个插件时,就可以从注册中心中获取其实例,并进行相应的操作。

总的来说,DataX的插件机制是一种非常灵活和可扩展的设计,它使得DataX能够适应各种不同的数据源和数据存储需求,同时也为开发者提供了丰富的扩展和定制化的可能性。


一、插件分类

按照功能分:
reader, 读插件,例如mysqlReader,从mysql读取数据
writer, 写插件。例如mysqlWriter,给mysql写入数据;
transformer, 中间结果转换,例如SubstrTransformer用于字符截取;
按照运行类型分:
Job级别的插件
Task级别的插件

插件目录结构

datax\plugin下分2个reader和writer目录,下面以mysql为例
在这里插入图片描述
plugin.json内容:

{"name": "mysqlreader","class": "xxx.plugin.reader.mysqlreader.MysqlReader","description": "useScene: prod. mechanism: Jdbc connection using the database, execute select sql, retrieve data from the ResultSet. warn: The more you know about the database, the less problems you encounter.","developer": "xx"
}

在这里插入图片描述

插件加载原理

  • DataX进程启动入口为com.alibaba.datax.core.Engineengine.entry()
    public static void entry(final String[] args) throws Throwable {Options options = new Options();options.addOption("job", true, "Job config.");options.addOption("jobid", true, "Job unique id.");options.addOption("mode", true, "Job runtime mode.");BasicParser parser = new BasicParser();CommandLine cl = parser.parse(options, args);String jobPath = cl.getOptionValue("job");// 如果用户没有明确指定jobid, 则 datax.py 会指定 jobid 默认值为-1String jobIdString = cl.getOptionValue("jobid");RUNTIME_MODE = cl.getOptionValue("mode");Configuration configuration = ConfigParser.parse(jobPath);}
  • 读取并解析插件配置
    ConfigParser.parse(final String jobPath)传入job路径,该方法组装解析,最后返回一个Configuration对象,Configuration里解析出了reader,writer,handler等插件名称;提取完插件名称后,会去reader目录和writer目录,寻找插件的位置。
  • 动态加载插件
    插件的加载都是通过自定义类加载器JarLoader动态加载,提供插件相关Jar隔离的加载机制。插件的加载接口由LoadUtil类负责,当要加载一个插件时,需要实例化一个JarLoader,然后切换thread class loader之后,才加载插件。这个主要由ClassLoaderSwapper实现。
  • JarLoader类
    JarLoader 负责加载指定路径下的插件 JAR 文件。它会检查 JAR 文件的合法性、有效性以及是否包含必要的插件实现类。继承自URLClassLoader提供Jar隔离的加载机制,会把传入的路径、及其子路径、以及路径中的jar文件加入到class path。
/*** 提供Jar隔离的加载机制,会把传入的路径、及其子路径、以及路径中的jar文件加入到class path。*/
public class JarLoader extends URLClassLoader{public JarLoader(String[] paths) {this(paths, JarLoader.class.getClassLoader());}public JarLoader(String[] paths, ClassLoader parent) {super(getURLs(paths), parent);}private static URL[] getURLs(String[] paths) {Validate.isTrue(null != paths && 0 != paths.length,"jar包路径不能为空.");List<String> dirs = new ArrayList<String>();for (String path : paths) {dirs.add(path);JarLoader.collectDirs(path, dirs);}List<URL> urls = new ArrayList<URL>();for (String path : dirs) {urls.addAll(doGetURLs(path));}return urls.toArray(new URL[0]);}private static void collectDirs(String path, List<String> collector) {if (null == path || StringUtils.isBlank(path)) {return;}File current = new File(path);if (!current.exists() || !current.isDirectory()) {return;}for (File child : current.listFiles()) {if (!child.isDirectory()) {continue;}collector.add(child.getAbsolutePath());collectDirs(child.getAbsolutePath(), collector);}}private static List<URL> doGetURLs(final String path) {Validate.isTrue(!StringUtils.isBlank(path), "jar包路径不能为空.");File jarPath = new File(path);Validate.isTrue(jarPath.exists() && jarPath.isDirectory(),"jar包路径必须存在且为目录.");/* set filter */FileFilter jarFilter = new FileFilter() {@Overridepublic boolean accept(File pathname) {return pathname.getName().endsWith(".jar");}};/* iterate all jar */File[] allJars = new File(path).listFiles(jarFilter);List<URL> jarURLs = new ArrayList<URL>(allJars.length);for (int i = 0; i < allJars.length; i++) {try {jarURLs.add(allJars[i].toURI().toURL());} catch (Exception e) {throw DataXException.asDataXException(FrameworkErrorCode.PLUGIN_INIT_ERROR,"系统加载jar包出错", e);}}return jarURLs;}
}
  • LoadUtil
    LoadUtil 是一个工具类,用于辅助插件的加载和初始化过程。LoadUtil 类通常包含静态方法,这些方法简化了插件加载的逻辑,使得 DataX 的核心框架能够与具体的插件进行交互。
    LoadUtil 的主要职责包括:
    插件加载:LoadUtil 提供了加载插件的方法。这些方法会根据配置文件中指定的插件类型和名称,使用 Java 的反射机制来加载插件的类定义。加载过程可能包括查找类路径下的 JAR 文件、读取插件的元数据以及验证插件的合法性。
    插件实例化:一旦插件类被加载,LoadUtil 会负责创建插件的实例。这通常涉及到调用插件类的无参构造函数,并返回该实例的引用。LoadUtil 会处理任何与实例化相关的异常,以确保在出现问题时能够给出适当的错误消息。
    插件注册:加载并实例化插件后,LoadUtil 可能会将插件实例注册到一个全局的插件注册中心。这样,DataX 的其他部分就可以在需要时获取并使用这些插件实例。
    配置传递:LoadUtil 还可能负责将配置文件中针对插件的配置参数传递给插件实例。这确保了插件能够根据用户的配置进行正确的初始化。
    错误处理:如果在加载、实例化或配置插件过程中发生错误,LoadUtil 会负责处理这些错误。这可能包括记录日志、抛出异常或采取其他恢复措施。
public class LoadUtil {private static final String pluginTypeNameFormat = "plugin.%s.%s";private LoadUtil() {}private enum ContainerType {Job("Job"), Task("Task");private String type;private ContainerType(String type) {this.type = type;}public String value() {return type;}}/*** 所有插件配置放置在pluginRegisterCenter中,为区别reader、transformer和writer,还能区别* 具体pluginName,故使用pluginType.pluginName作为key放置在该map中*/private static Configuration pluginRegisterCenter;/*** jarLoader的缓冲*/private static Map<String, JarLoader> jarLoaderCenter = new HashMap();/*** 设置pluginConfigs,方便后面插件来获取** @param pluginConfigs*/public static void bind(Configuration pluginConfigs) {pluginRegisterCenter = pluginConfigs;}private static String generatePluginKey(PluginType pluginType,String pluginName) {return String.format(pluginTypeNameFormat, pluginType.toString(),pluginName);}private static Configuration getPluginConf(PluginType pluginType,String pluginName) {Configuration pluginConf = pluginRegisterCenter.getConfiguration(generatePluginKey(pluginType, pluginName));if (null == pluginConf) {throw DataXException.asDataXException(FrameworkErrorCode.PLUGIN_INSTALL_ERROR,String.format("DataX不能找到插件[%s]的配置.",pluginName));}return pluginConf;}/*** 加载JobPlugin,reader、writer都可能要加载** @param pluginType* @param pluginName* @return*/public static AbstractJobPlugin loadJobPlugin(PluginType pluginType,String pluginName) {Class<? extends AbstractPlugin> clazz = LoadUtil.loadPluginClass(pluginType, pluginName, ContainerType.Job);try {AbstractJobPlugin jobPlugin = (AbstractJobPlugin) clazz.newInstance();jobPlugin.setPluginConf(getPluginConf(pluginType, pluginName));return jobPlugin;} catch (Exception e) {throw DataXException.asDataXException(FrameworkErrorCode.RUNTIME_ERROR,String.format("DataX找到plugin[%s]的Job配置.",pluginName), e);}}/*** 加载taskPlugin,reader、writer都可能加载** @param pluginType* @param pluginName* @return*/public static AbstractTaskPlugin loadTaskPlugin(PluginType pluginType,String pluginName) {Class<? extends AbstractPlugin> clazz = LoadUtil.loadPluginClass(pluginType, pluginName, ContainerType.Task);try {AbstractTaskPlugin taskPlugin = (AbstractTaskPlugin) clazz.newInstance();taskPlugin.setPluginConf(getPluginConf(pluginType, pluginName));return taskPlugin;} catch (Exception e) {throw DataXException.asDataXException(FrameworkErrorCode.RUNTIME_ERROR,String.format("DataX不能找plugin[%s]的Task配置.",pluginName), e);}}/*** 根据插件类型、名字和执行时taskGroupId加载对应运行器** @param pluginType* @param pluginName* @return*/public static AbstractRunner loadPluginRunner(PluginType pluginType, String pluginName) {AbstractTaskPlugin taskPlugin = LoadUtil.loadTaskPlugin(pluginType,pluginName);switch (pluginType) {case READER:return new ReaderRunner(taskPlugin);case WRITER:return new WriterRunner(taskPlugin);default:throw DataXException.asDataXException(FrameworkErrorCode.RUNTIME_ERROR,String.format("插件[%s]的类型必须是[reader]或[writer]!",pluginName));}}/*** 反射出具体plugin实例** @param pluginType* @param pluginName* @param pluginRunType* @return*/@SuppressWarnings("unchecked")private static synchronized Class<? extends AbstractPlugin> loadPluginClass(PluginType pluginType, String pluginName,ContainerType pluginRunType) {Configuration pluginConf = getPluginConf(pluginType, pluginName);JarLoader jarLoader = LoadUtil.getJarLoader(pluginType, pluginName);try {return (Class<? extends AbstractPlugin>) jarLoader.loadClass(pluginConf.getString("class") + "$"+ pluginRunType.value());} catch (Exception e) {throw DataXException.asDataXException(FrameworkErrorCode.RUNTIME_ERROR, e);}}public static synchronized JarLoader getJarLoader(PluginType pluginType,String pluginName) {Configuration pluginConf = getPluginConf(pluginType, pluginName);JarLoader jarLoader = jarLoaderCenter.get(generatePluginKey(pluginType,pluginName));if (null == jarLoader) {String pluginPath = pluginConf.getString("path");if (StringUtils.isBlank(pluginPath)) {throw DataXException.asDataXException(FrameworkErrorCode.RUNTIME_ERROR,String.format("%s插件[%s]路径非法!",pluginType, pluginName));}jarLoader = new JarLoader(new String[]{pluginPath});jarLoaderCenter.put(generatePluginKey(pluginType, pluginName),jarLoader);}return jarLoader;}
}
  • ClassLoaderSwapper

ClassLoaderSwapper有一个属性storeClassLoader, 用于保存着当前线程的classLoader切换之前的ClassLoader。

/*** 为避免jar冲突,比如hbase可能有多个版本的读写依赖jar包,JobContainer和TaskGroupContainer,就需要脱离当前classLoader去加载这些jar包,执行完成后,又退回到原来classLoader上继续执行接下来的代码*/
public final class ClassLoaderSwapper {private ClassLoader storeClassLoader = null;private ClassLoaderSwapper() {}public static ClassLoaderSwapper newCurrentThreadClassLoaderSwapper() {return new ClassLoaderSwapper();}/*** 保存当前classLoader,并将当前线程的classLoader设置为所给classLoader** @param* @return*/public ClassLoader setCurrentThreadClassLoader(ClassLoader classLoader) {this.storeClassLoader = Thread.currentThread().getContextClassLoader();Thread.currentThread().setContextClassLoader(classLoader);return this.storeClassLoader;}/*** 将当前线程的类加载器设置为保存的类加载* @return*/public ClassLoader restoreCurrentThreadClassLoader() {ClassLoader classLoader = Thread.currentThread().getContextClassLoader();Thread.currentThread().setContextClassLoader(this.storeClassLoader);return classLoader;}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/684853.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

基于GPT一键完成数据分析全流程的AI Agent: Streamline Analyst

大型语言模型&#xff08;LLM&#xff09;的兴起不仅为获取知识和解决问题开辟了新的可能性&#xff0c;而且催生了一些新型智能系统&#xff0c;例如旨在辅助用户完成特定任务的AI Copilot以及旨在自动化和自主执行复杂任务的AI Agent&#xff0c;使得编程、创作等任务变得高效…

Prompt Tuning:深度解读一种新的微调范式

阅读该博客&#xff0c;您将系统地掌握如下知识点&#xff1a; 什么是预训练语言模型&#xff1f; 什么是prompt&#xff1f;为什么要引入prompt&#xff1f;相比传统fine-tuning有什么优势&#xff1f; 自20年底开始&#xff0c;prompt的发展历程&#xff0c;哪些经典的代表…

CV | Segment Anything论文详解及代码实现

本文主要是详解解释了SAM的论文部分以及代码实现~ 论文&#xff1a;2023.04.05_Segment Anything 论文地址&#xff1a;2304.02643.pdf (arxiv.org) 代码地址&#xff1a;facebookresearch/segment-anything: The repository provides code for running inference with the Seg…

随机过程及应用学习笔记(四) 马尔可夫过程

马尔可夫过程是理论上和实际应用中都十分重要的一类随机过程。 目录 前言 一、马尔可夫过程的概念 二、离散参数马氏链 1 定义 2 齐次马尔可夫链 3 齐次马尔可夫链的性质 三、齐次马尔可夫链状态的分类 四、有限马尔可夫链 五、状态的周期性 六、极限定理 七、生灭过…

接口测试方法论

第1章 测试那点事 单元测试》接口测试》界面测试 接口就是包含特定输入和特定输出的一套逻辑处理单元&#xff0c;用户无须知晓接口的内部实现逻辑&#xff0c;这也可以称为接口的黑河处理逻辑。因为服务对象不同&#xff0c;接口又可分为两种&#xff1a;一种是系统或服务的…

K8S集群实践之十:虚拟机部署阶段性总结

目录 1. 说明&#xff1a; 2. 安装准备 2.1 每个节点设置双网卡&#xff0c;一卡做网桥&#xff08;外部访问&#xff09;&#xff0c;一卡做NAT&#xff08;集群内网访问&#xff09; 2.2 准备一个可用的代理服务器 3. 由于虚拟机崩溃&#xff08;停电&#xff0c;宿主机…

VBA技术资料MF119:数据验证的添加与删除

我给VBA的定义&#xff1a;VBA是个人小型自动化处理的有效工具。利用好了&#xff0c;可以大大提高自己的工作效率&#xff0c;而且可以提高数据的准确度。“VBA语言専攻”提供的教程一共九套&#xff0c;分为初级、中级、高级三大部分&#xff0c;教程是对VBA的系统讲解&#…

《苍穹外卖》知识梳理p7-用户下单与模拟微信支付

用户下单与微信支付 一.用户下单 1.1 订单表的设计 订单功能是一个核心的功能&#xff1b; 需要使用2张表&#xff1a; 1.订单表&#xff1a;记录一些订单信息&#xff0c;比如收货人&#xff0c;收获地址&#xff0c;支付金额&#xff0c;下单时间等&#xff1b; 2.订单详情…

GitLab配置SSHKey

段落一&#xff1a;什么是SSH密钥 SSH&#xff08;Secure Shell&#xff09;是一种网络协议&#xff0c;用于安全地远程登录和执行命令。SSH密钥是一种用于身份验证的加密文件&#xff0c;它允许您在与远程服务器通信时&#xff0c;无需输入密码即可进行认证。在GitLab中配置S…

Kibana:如何嵌入 Kibana 仪表板

作者&#xff1a;Carly Richmond 像我这样的前端工程师经常提出的要求是将 Kibana 等来源的现有仪表板嵌入到 JavaScript Web 应用程序中。 这是我必须多次执行的任务&#xff0c;因为我们希望快速部署用户生成的视图或允许用户控制给定的视图。 从我们从精彩的开发者社区收到的…

JVM(3)高级篇

1 GraalVM 1.1 什么是GraalVM GraalVM是Oracle官方推出的一款高性能JDK&#xff0c;使用它享受比OpenJDK或者OracleJDK更好的性能。 GraalVM的官方网址&#xff1a;https://www.graalvm.org/ 官方标语&#xff1a;Build faster, smaller, leaner applications。 更低的CPU、内…

Sora技术报告——Video generation models as world simulators

文章目录 1. 视频生成模型&#xff0c;可以视为一个世界模拟器2. 技术内容2.1 将可视数据转换成patches2.2 视频压缩网络2.3 Spacetime Latent Patches2.4 Scaling transformers 用于视频生成2.5 可变的持续时间&#xff0c;分辨率&#xff0c;宽高比2.6 抽样的灵活性2.7 改进框…

Invalid DataSize: cannot convert ‘30Mb‘ to Long

Invalid DataSize: cannot convert 30Mb to Long servlet:multipart:max-file-size: 30MBmax-request-size: 30MB

【研究生复试】计算机软件工程人工智能研究生复试——资料整理(速记版)——计算机网络

1、JAVA 2、计算机网络 3、计算机体系结构 4、数据库 5、计算机租场原理 6、软件工程 7、大数据 8、英文 自我介绍 2. 计算机网络 1. TCP如何解决丢包和乱序&#xff1f; 序列号&#xff1a;TCP所传送的每段数据都有标有序列号&#xff0c;避免乱序问题发送端确认应答、超时…

反向迭代器------封装的力量

目录 一、list封装中模板参数Ref和Ptr的理解 二、反向迭代器的实现 一、list封装中模板参数Ref和Ptr的理解 对于反向迭代器&#xff0c;是我们在前面STL模拟实现中留下的一个问题。在之前的文章中&#xff0c;我们极大程度上的利用了模板&#xff0c;从而减少了许多的代码&…

09、全文检索 -- Solr -- SpringBoot 整合 Spring Data Solr (生成DAO组件 和 实现自定义查询方法)

目录 SpringBoot 整合 Spring Data SolrSpring Data Solr的功能&#xff08;生成DAO组件&#xff09;&#xff1a;Spring Data Solr大致包括如下几方面功能&#xff1a;Query查询&#xff08;属于半自动&#xff09;代码演示&#xff1a;1、演示通过dao组件来保存文档1、实体类…

SpringCloud之Feign发送Http请求

文章目录 http客户端Feign使用步骤自定义Feign的配置Feign的性能优化Feign的性能优化-连接池配置 Feign的最佳实践 http客户端Feign Feign的介绍&#xff1a; Feign是一个声明式的http客户端&#xff0c;官方地址&#xff1a;https:/github.com/OpenFeign/feign 其作用就是帮助…

laravel_进程门面_简单介绍

文章目录 Facade是什么&#xff1f;Facade能干什么Facade有哪些方法&#xff1f;怎么使用Facade呢&#xff1f;详细的代码解释Symfony Process是什么&#xff1f;介绍Symfony总结 Facade是什么&#xff1f; 在 Laravel 框架中&#xff0c;Facade 是一种设计模式。 它提供了一…

476. Number Complement(数字的补数)

问题描述 对整数的二进制表示取反&#xff08;0 变 1 &#xff0c;1 变 0&#xff09;后&#xff0c;再转换为十进制表示&#xff0c;可以得到这个整数的补数。 例如&#xff0c;整数 5 的二进制表示是 “101” &#xff0c;取反后得到 “010” &#xff0c;再转回十进制表示…