datax底层原理_Datax 插件加载原理

Datax 插件加载原理

插件类型

Datax有好几种类型的插件,每个插件都有不同的作用。

reader, 读插件。Reader就是属于这种类型的

writer, 写插件。Writer就是属于这种类型的

transformer, 目前还未知

handler, 主要用于任务执行前的准备工作和完成的收尾工作。

插件类型由PluginType枚举表示

1

2

3public enum PluginType {

READER("reader"), TRANSFORMER("transformer"), WRITER("writer"), HANDLER("handler");

}

根据运行类型,又可以分为Job级别的插件和Task级别的插件。uml如下图所示

插件配置读取

ConfigParser首先会读取配置文件,提取需要使用的reader,writer,prehandler 和 posthandler的名称。

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34public static Configuration parse(final String jobPath){

Configuration configuration = ConfigParser.parseJobConfig(jobPath);

// 合并 conf/core.json文件的配置, false 表示不覆盖原有的配置

configuration.merge(

//CoreConstant.DATAX_CONF_PATH的值为conf/core.json

ConfigParser.parseCoreConfig

(CoreConstant.DATAX_CONF_PATH),

false);

// 获取job.content列表的第一个reader

String readerPluginName = configuration.getString(

//CoreConstant.DATAX_JOB_CONTENT_READER_NAME的值为job.content[0].reader.name

CoreConstant.DATAX_JOB_CONTENT_READER_NAME);

// 获取job.content列表的第一个writer

String writerPluginName = configuration.getString(

//CoreConstant.DATAX_JOB_CONTENT_WRITER_NAME的值为job.content[0].writer.name

CoreConstant.DATAX_JOB_CONTENT_WRITER_NAME);

// 读取job.preHandler.pluginName

String preHandlerName = configuration.getString(

//CoreConstant.DATAX_JOB_PREHANDLER_PLUGINNAME的值为job.preHandler.pluginName

CoreConstant.DATAX_JOB_PREHANDLER_PLUGINNAME);

// 读取job.postHandler.pluginName

String postHandlerName = configuration.getString(

//CoreConstant.DATAX_JOB_POSTHANDLER_PLUGINNAME的值为job.postHandler.pluginName

CoreConstant.DATAX_JOB_POSTHANDLER_PLUGINNAME);

Set pluginList = new HashSet();

pluginList.add(readerPluginName);

pluginList.add(writerPluginName);

......

// 调用parsePluginConfig生成plugin的配置,然后合并

configuration.merge(parsePluginConfig(new ArrayList(pluginList)), false);

......

return configuration;

}

提取完插件名称后,会去reader目录和writer目录,寻找插件的位置。目前Datax只支持reader和writer插件,因为它只从这两个目录中寻找。如果想自己扩展其他类型插件的话,比如handler类型的, 需要修改parsePluginConfig的代码。每个插件目录会有一个重要的配置文件 plugin.json ,它定义了插件的名称和对应的类,在LoadUtils类加载插件的时候会使用到。

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62public static Configuration parsePluginConfig(List wantPluginNames){

Configuration configuration = Configuration.newDefault();

......

// 遍历plugin.reader目录下的文件夹

for (final String each : ConfigParser

.getDirAsList(CoreConstant.DATAX_PLUGIN_READER_HOME)) {

// 调用 parseOnePluginConfig解析单个plugin配置

Configuration eachReaderConfig = ConfigParser.parseOnePluginConfig(each, "reader", replicaCheckPluginSet, wantPluginNames);

if(eachReaderConfig!=null) {

configuration.merge(eachReaderConfig, true);

complete += 1;

}

}

// 遍历plugin.writer目录下的文件夹

for (final String each : ConfigParser

.getDirAsList(CoreConstant.DATAX_PLUGIN_WRITER_HOME)) {

// 调用 parseOnePluginConfig解析单个plugin配置

Configuration eachWriterConfig = ConfigParser.parseOnePluginConfig(each, "writer", replicaCheckPluginSet, wantPluginNames);

if(eachWriterConfig!=null) {

configuration.merge(eachWriterConfig, true);

complete += 1;

}

}

......

return configuration;

}

// 读取plugin目录下的plugin.json 文件

public static Configuration parseOnePluginConfig(final String path, final String type, Set pluginSet, List wantPluginNames){

String filePath = path + File.separator + "plugin.json";

Configuration configuration = Configuration.from(new File(filePath));

String pluginPath = configuration.getString("path");

String pluginName = configuration.getString("name");

if(!pluginSet.contains(pluginName)) {

pluginSet.add(pluginName);

} else {

......

}

//不是想要的插件,就不生成配置,直接返回

if (wantPluginNames != null && wantPluginNames.size() > 0 && !wantPluginNames.contains(pluginName)) {

return null;

}

// plugin.json的path路径,是指插件的jar包。如果没有指定,则默认为和plugin.json文件在同一个目录下

boolean isDefaultPath = StringUtils.isBlank(pluginPath);

if (isDefaultPath) {

configuration.set("path", path);

}

Configuration result = Configuration.newDefault();

// 最后保存在puligin.{type}.{pluginName}路径下

result.set(

String.format("plugin.%s.%s", type, pluginName),

configuration.getInternal());

return result;

}

动态加载插件

插件的加载都是使用ClassLoader动态加载。 为了避免类的冲突,对于每个插件的加载,对应着独立的加载器。加载器由JarLoader实现,插件的加载接口由LoadUtil类负责。当要加载一个插件时,需要实例化一个JarLoader,然后切换thread class loader之后,才加载插件。

JarLoader 类

JarLoader继承URLClassLoader,扩充了可以加载目录的功能。可以从指定的目录下,把传入的路径、及其子路径、以及路径中的jar文件加入到class path。

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80public class JarLoader extends URLClassLoader{

public JarLoader(String[] paths){

this(paths, JarLoader.class.getClassLoader());

}

public JarLoader(String[] paths, ClassLoader parent){

// 调用getURLS,获取所有的jar包路径

super(getURLs(paths), parent);

}

// 获取所有的jar包

private static URL[] getURLs(String[] paths) {

// 获取包括子目录的所有目录路径

List dirs = new ArrayList();

for (String path : paths) {

dirs.add(path);

// 获取path目录和其子目录的所有目录路径

JarLoader.collectDirs(path, dirs);

}

// 遍历目录,获取jar包的路径

List urls = new ArrayList();

for (String path : dirs) {

urls.addAll(doGetURLs(path));

}

return urls.toArray(new URL[0]);

}

// 递归的方式,获取所有目录

private static void collectDirs(String path, List collector){

// path为空,终止

if (null == path || StringUtils.isBlank(path)) {

return;

}

// path不为目录,终止

File current = new File(path);

if (!current.exists() || !current.isDirectory()) {

return;

}

// 遍历完子文件,终止

for (File child : current.listFiles()) {

if (!child.isDirectory()) {

continue;

}

collector.add(child.getAbsolutePath());

collectDirs(child.getAbsolutePath(), collector);

}

}

private static List doGetURLs(final String path){

File jarPath = new File(path);

// 只寻找文件以.jar结尾的文件

FileFilter jarFilter = new FileFilter() {

@Override

public boolean accept(File pathname){

return pathname.getName().endsWith(".jar");

}

};

File[] allJars = new File(path).listFiles(jarFilter);

List jarURLs = new ArrayList(allJars.length);

for (int i = 0; i < allJars.length; i++) {

try {

jarURLs.add(allJars[i].toURI().toURL());

} catch (Exception e) {

throw DataXException.asDataXException(

FrameworkErrorCode.PLUGIN_INIT_ERROR,

"系统加载jar包出错", e);

}

}

return jarURLs;

}

}

LoadUtil 类

LoadUtil管理着插件的加载器,调用getJarLoader返回插件对应的加载器。

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31public class LoadUtil{

// 加载器的HashMap, Key由插件类型和名称决定, 格式为plugin.{pulginType}.{pluginName}

private static Map jarLoaderCenter = new HashMap();

public static synchronized JarLoader getJarLoader(PluginType pluginType, String pluginName){

Configuration pluginConf = getPluginConf(pluginType, pluginName);

JarLoader jarLoader = jarLoaderCenter.get(generatePluginKey(pluginType,

pluginName));

if (null == jarLoader) {

// 构建加载器JarLoader

// 获取jar所在的目录

String pluginPath = pluginConf.getString("path");

jarLoader = new JarLoader(new String[]{pluginPath});

//添加到HashMap中

jarLoaderCenter.put(generatePluginKey(pluginType, pluginName),

jarLoader);

}

return jarLoader;

}

private static final String pluginTypeNameFormat = "plugin.%s.%s";

// 生成HashMpa的key值

private static String generatePluginKey(PluginType pluginType,

String pluginName){

return String.format(pluginTypeNameFormat, pluginType.toString(),

pluginName);

}

当获取类加载器,就可以调用LoadUtil来加载插件。LoadUtil提供了 loadJobPlugin 和 loadTaskPlugin 两个接口,加载Job 和 Task 的两种插件。

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43// 加载Job类型的Plugin

public static AbstractJobPlugin loadJobPlugin(PluginType pluginType, String pluginName){

// 调用loadPluginClass方法,加载插件对应的class

Class extends AbstractPlugin> clazz = LoadUtil.loadPluginClass(pluginType, pluginName, ContainerType.Job);

// 实例化Plugin,转换为AbstractJobPlugin

AbstractJobPlugin jobPlugin = (AbstractJobPlugin) clazz.newInstance();

// 设置Job的配置,路径为plugin.{pluginType}.{pluginName}

jobPlugin.setPluginConf(getPluginConf(pluginType, pluginName));

return jobPlugin;

}

// 加载Task类型的Plugin

public static AbstractTaskPlugin loadTaskPlugin(PluginType pluginType, String pluginName){

// 调用loadPluginClass方法,加载插件对应的class

Class extends AbstractPlugin> clazz = LoadUtil.loadPluginClass(pluginType, pluginName, ContainerType.Task);

// 实例化Plugin,转换为AbstractTaskPlugin

AbstractTaskPlugin taskPlugin = (AbstracTasktTaskPlugin) clazz.newInstance();

// 设置Task的配置,路径为plugin.{pluginType}.{pluginName}

taskPlugin.setPluginConf(getPluginConf(pluginType, pluginName));

}

// 加载插件类

// pluginType 代表插件类型

// pluginName 代表插件名称

// pluginRunType 代表着运行类型,Job或者Task

private static synchronized Class extends AbstractPlugin> loadPluginClass(

PluginType pluginType, String pluginName,

ContainerType pluginRunType) {

// 获取插件配置

Configuration pluginConf = getPluginConf(pluginType, pluginName);

// 获取插件对应的ClassLoader

JarLoader jarLoader = LoadUtil.getJarLoader(pluginType, pluginName);

try {

// 加载插件的class

return (Class extends AbstractPlugin>) jarLoader

.loadClass(pluginConf.getString("class") + "$"

+ pluginRunType.value());

} catch (Exception e) {

throw DataXException.asDataXException(FrameworkErrorCode.RUNTIME_ERROR, e);

}

}

切换类加载器

ClassLoaderSwapper类,提供了比较方便的切换接口。

1

2

3

4

5

6

7

8

9// 实例化

ClassLoaderSwapper classLoaderSwapper = ClassLoaderSwapper.newCurrentThreadClassLoaderSwapper();

ClassLoader classLoader1 = new URLClassLoader();

// 切换加载器classLoader1

classLoaderSwapper.setCurrentThreadClassLoader(classLoader1);

Class extends MyClass> myClass = classLoader1.loadClass("MyClass");

// 切回加载器

classLoaderSwapper.restoreCurrentThreadClassLoader();

ClassLoaderSwapper的源码比较简单, 它有一个属性storeClassLoader, 用于保存着切换之前的ClassLoader。

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24public final class ClassLoaderSwapper{

// 保存切换之前的加载器

private ClassLoader storeClassLoader = null;

public ClassLoader setCurrentThreadClassLoader(ClassLoader classLoader){

// 保存切换前的加载器

this.storeClassLoader = Thread.currentThread().getContextClassLoader();

// 切换加载器到classLoader

Thread.currentThread().setContextClassLoader(classLoader);

return this.storeClassLoader;

}

public ClassLoader restoreCurrentThreadClassLoader(){

ClassLoader classLoader = Thread.currentThread()

.getContextClassLoader();

// 切换到原来的加载器

Thread.currentThread().setContextClassLoader(this.storeClassLoader);

// 返回切换之前的类加载器

return classLoader;

}

}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/394056.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

mysql windows身份验证_SQL Server 2005 怎么就不能用Windows身份验证方式登录呢?

SQL Server 2005 自从装到我的电脑上始终无法使用Windows身份验证的方式登录,由于使用用户名和密码登录还算顺畅,所以一直忽略了这SQL Server 2005 自从装到我的电脑上始终无法使用Windows身份验证的方式登录,由于使用用户名和密码登录还算顺畅,所以一直忽略了这个问题,直到又有…

JavaScript正则表达式快速简单的指南

Interested in learning JavaScript? Get my ebook at jshandbook.com有兴趣学习JavaScript吗&#xff1f; 在jshandbook.com上获取我的电子书 正则表达式简介 (Introduction to Regular Expressions) A regular expression (also called regex for short) is a fast way to w…

leetcode104. 二叉树的最大深度(dfs)

给定一个二叉树&#xff0c;找出其最大深度。二叉树的深度为根节点到最远叶子节点的最长路径上的节点数。说明: 叶子节点是指没有子节点的节点。示例&#xff1a; 给定二叉树 [3,9,20,null,null,15,7]&#xff0c;3/ \9 20/ \15 7 返回它的最大深度 3 。代码 class Soluti…

[解读REST] 3.基于网络应用的架构

链接上文[解读REST] 2.REST用来干什么的&#xff1f;&#xff0c;上文中解释到什么是架构风格和应该以怎样的视角来理解REST&#xff08;Web的架构风格&#xff09;。本篇来介绍一组自洽的术语&#xff0c;用它来描述和解释软件架构&#xff1b;以及列举下对于基于网络的应用来…

js判断对象还是数组

1.对于Javascript 1.8.5&#xff08;ECMAScript 5&#xff09;&#xff0c;变量名字.isArray( )可以实现这个目的 var a[]; var b{}; Array.isArray(a);//true Array.isArray(b)//false 2.如果你只是用typeof来检查该变量&#xff0c;不论是array还是object&#xff0c;都将返回…

mysql 除去列名打印_sql – 使用beeline时避免在列名中打印表名

在beeline中使用hive时使用简单的select查询我想在列名中返回没有表名的表作为默认值.例数据CREATE TABLE IF NOT EXISTS employee ( eid int, name String,salary String, destination String)COMMENT Employee detailsROW FORMAT DELIMITEDFIELDS TERMINATED BY \tLINES TERM…

移动应用程序和网页应用程序_如何开发感觉像本机移动应用程序的渐进式Web应用程序...

移动应用程序和网页应用程序by Samuele Dassatti通过萨穆尔达萨蒂 如何开发感觉像本机移动应用程序的渐进式Web应用程序 (How you can develop Progressive Web Apps that feel like native mobile apps) I’m currently developing a Progressive Web App that will also ser…

leetcode1162. 地图分析(bfs)

你现在手里有一份大小为 N x N 的「地图」&#xff08;网格&#xff09; grid&#xff0c;上面的每个「区域」&#xff08;单元格&#xff09;都用 0 和 1 标记好了。其中 0 代表海洋&#xff0c;1 代表陆地&#xff0c;请你找出一个海洋区域&#xff0c;这个海洋区域到离它最近…

mysql修改root密码的方法

在 Navicat for MySQL 下面直接执行 SET PASSWORD FOR rootlocalhost PASSWORD(newpass); 就可以 方法1&#xff1a; 用SET PASSWORD命令 mysql -u root mysql> SET PASSWORD FOR rootlocalhost PASSWORD(newpass); 方法2&#xff1a;用mysqladmin mysqladmin -u root …

android 上下偏差怎么写_详解 Android 热更新升级如何突破底层结构差异?

知道了 native 替换方式兼容性问题的原因&#xff0c;我们是否有办法寻求一种新的方式&#xff0c;不依赖于 ROM 底层方法结构的实现而达到替换效果呢&#xff1f;我们发现&#xff0c;这样 native 层面替换思路&#xff0c;其实就是替换 ArtMethod 的所有成员。那么&#xff0…

Python3 Flask+nginx+Gunicorn部署(上)

前言&#xff1a;一般在本地运行flask项目通常是直接python3 文件名.py&#xff0c;然后打开&#xff1a;http://127.0.0.1:5000 查看代码结果 这次主要是记录flask在python3 环境结合nginx gunicorn在服务器上进行项目的部署 &#xff08;一&#xff09;运行环境&#xff1a;虚…

NOIP2011 铺地毯

题目描述 为了准备一个独特的颁奖典礼&#xff0c;组织者在会场的一片矩形区域&#xff08;可看做是平面直角坐标系的第一象限&#xff09;铺上一些矩形地毯&#xff0c;一共有n张地毯&#xff0c;编号从 1 到n。现在将这些地毯按照编号从小到大的顺序平行于坐标轴先后铺设&…

java lock可重入_Java源码解析之可重入锁ReentrantLock

本文基于jdk1.8进行分析。ReentrantLock是一个可重入锁&#xff0c;在ConcurrentHashMap中使用了ReentrantLock。首先看一下源码中对ReentrantLock的介绍。如下图。ReentrantLock是一个可重入的排他锁&#xff0c;它和synchronized的方法和代码有着相同的行为和语义&#xff0c…

matlab的qammod函数_基于-MATLAB下的16QAM仿真.doc

1.课程设计目的随着现代通信技术的发展&#xff0c;特别是移动通信技术高速发展&#xff0c;频带利用率问题越来越被人们关注。在频谱资源非常有限的今天&#xff0c;传统通信系统的容量已经不能满足当前用户的要求。正交幅度调制QAM(Quadrature Amplitude Modulation)以其高频…

POJ3264 【RMQ基础题—ST-线段树】

ST算法Code&#xff1a; //#include<bits/stdc.h> #include<cstdio> #include<math.h> #include<iostream> #include<queue> #include<algorithm> #include<string.h> using namespace std; typedef long long LL;const int N5e410;…

leetcode199. 二叉树的右视图(bfs)

给定一棵二叉树&#xff0c;想象自己站在它的右侧&#xff0c;按照从顶部到底部的顺序&#xff0c;返回从右侧所能看到的节点值。示例:输入: [1,2,3,null,5,null,4] 输出: [1, 3, 4] 解释:1 <---/ \ 2 3 <---\ \5 4 <---解题思…

开发人员工作周报_如何增加找到开发人员工作的机会

开发人员工作周报In a recent job as a senior developer, I helped interview and hire many of my employer’s development team members. This is a brain dump of my advice based on those interviews.在最近担任高级开发人员的工作中&#xff0c;我帮助面试和雇用了许多…

安全专家教你如何利用Uber系统漏洞无限制的免费乘坐?

本文讲的是安全专家教你如何利用Uber系统漏洞无限制的免费乘坐&#xff1f;&#xff0c;近日&#xff0c;根据外媒报道&#xff0c;美国一名安全研究人员发现Uber上存在一处安全漏洞&#xff0c;允许发现这一漏洞的任何用户在全球范围内免费享受Uber乘车服务。据悉&#xff0c;…

flume介绍

flume 1.flume是什么 Flume:** Flume是Cloudera提供的一个高可用的&#xff0c;高可靠的&#xff0c;分布式的海量日志采集、传输、聚合的系统。** Flume仅仅运行在linux环境下** flume.apache.org(Documentation--Flume User Guide) Flume体系结构(Architecture)&#xff1a; …

threadx 信号量 应用_操作系统及ThreadX简介.ppt

操作系统及ThreadX简介操作系统及ThreadX简介 软件二部 2006.09 主要内容 多任务操作系统概述 ThreadX简介 关于驱动的交流 操作系统概述 什么是操作系统 管理计算机的所有资源&#xff0c;并为应用程序提供服务的最重要的系统软件 操作系统的目的 为用户编程提供简单的接口&am…