1.插件化开发概述
插件化开发模式正在很多编程语言或技术框架中得以广泛的应用实践,比如大家熟悉的jenkins,docker可视化管理平台rancher,以及日常编码使用的编辑器idea,vscode等。
实现服务模块之间解耦的方式有很多,但是插件来说,其解耦的程度似乎更高,而且更灵活,可定制化、个性化更好。
以spring来说,之所以具备如此广泛的生态,与其自身内置的各种可扩展的插件机制是分不开的。spring框架提供了很多基于插件化的扩展点。插件化机制让系统的扩展性得以提升,从而可以丰富系统的周边应用生态
2.插件化开发常见思路
以java为例,这里结合实际经验,整理一些常用的插件化实现思路:
- spi机制;
- 约定配置和目录,利用反射配合实现;
- springboot中的Factories机制;
- java agent(探针)技术;
- spring内置扩展点;
- 第三方插件包,例如:spring-plugin-core;
- spring aop技术;
3.基于AutoService进行组件化开发
使用AutoService+ServiceLoader
,本篇博客主要就是介绍此方案
缺点:使用的反射去实例化对象
优点:易配置,易调试,上手快
3.1. AutoService介绍
Github地址:https://github.com/google/auto/tree/master/service
AutoService是Google开源的用来方便生成符合ServiceLoader规范的开源库,使用非常的简单。官方的介绍是java.util.ServiceLoader 风格的服务提供者的配置/元数据生成器。
翻译成中文就是自动服务
,这个程序能自动做什么?Java 注释处理器和其他系统使用 java.util.ServiceLoader 来注册使用 META-INF 元数据的已知类型的实现。但是,开发人员很容易忘记更新或正确指定服务描述符。
人工维护配置/元数据的过程
什么意思, 就是我们手动进行SPI插件开发的时候, 都需要手动在类加载路径classpath目录创建两级目录META-INF/services
, 然后创建一个以需要扩展的接口的全限定路径名的名称的文件(javax.annotation.processing.Processor
), 然后在文件中写入该接口的实现类的全限定路径名(com.yanyelai.MyProcessor
)。
人工维护配置/元数据的弊端
如果类路径更新了或者接口的名称定义改变了,包名修改了等等,开发任务忘记了更新对应的配置文件,是不是就会发生问题,可能你会说这怎么可能,怎么会犯这种低级错误,不一定的, 团队越大,开发人员的水平也是良莠不齐, 不能保证所有人都能不出错。
AutoService就是用来解决以上问题的,使用了这个AutoService就不同手动创建这个配置文件了, 在插件编译打包会自动生成这个配置数据,不用再手动创建和维护这个配置文件了。
3.2. AutoService使用示例
在插件中引入AutoService服务
<dependency><groupId>com.google.auto.service</groupId><artifactId>auto-service</artifactId><version>1.1.0</version><scope>provided</scope></dependency>
首先定义一个接口
package com.yanyelai;import javax.annotation.processing.Processor;@AutoService(Processor.class)
final class CustomProcessor implements Processor {// …
}
AutoService 将在输出类文件夹中生成文件
META-INF/services/javax.annotation.processing.Processor 。该文件将包含:
com.yanyelai.CustomProcessor
对于 javax.annotation.processing.Processor
,如果此元数据文件包含在 jar 中,并且该 jar 位于 javac 的类路径上,则 javac 将自动加载它,并将其包含在其正常的注解处理环境中。java.util.ServiceLoader
的其他用户可能会出于不同的目的使用基础结构,但此元数据将适当地提供自动加载。
4.Dolphinscheduler中AutoService的实际应用案例
4.1.Task插件中AutoService的实际应用案例解读
海豚调度Dolphinscheduler中使用了插件化开发数据源、注册中心、告警插件及任务插件, 解读一个其他的都是类似的, 我们这里用Task插件
为例来进行说明。
首先,dolphinscheduler的源码中肯定引用了AutoService依赖, 验证一下:
我们一起来看看dolphinscheduler中是如何实现插件加载的
在dolphinscheduler-api
的启动类ApiApplicationServer.java
中进行了任务插件初始化加载,当Spring容器准备时,会触发任务插件安装的方法执行, 源码如下图:
@EventListenerpublic void run(ApplicationReadyEvent readyEvent) {logger.info("Received spring application context ready event will load taskPlugin and write to DB");// install task plugin 安装任务插件,这个方法执行会动态加载所有的任务插件注册到Spring容器taskPluginManager.loadPlugin();for (Map.Entry<String, TaskChannelFactory> entry : taskPluginManager.getTaskChannelFactoryMap().entrySet()) {String taskPluginName = entry.getKey();TaskChannelFactory taskChannelFactory = entry.getValue();List<PluginParams> params = taskChannelFactory.getParams();String paramsJson = PluginParamsTransfer.transferParamsToJson(params);PluginDefine pluginDefine = new PluginDefine(taskPluginName, PluginType.TASK.getDesc(), paramsJson);pluginDao.addOrUpdatePluginDefine(pluginDefine);}}
在dolphinscheduler-service
中的TaskPluginManager
类中看看loadPlugin
方法是如何实现任务插件加载的
/*** 从classpath加载任务插件*/public void loadPlugin() {if (!loadedFlag.compareAndSet(false, true)) {logger.warn("The task plugin has already been loaded");return;}// 创建了PrioritySPIFactory工厂类,加载TaskChannelFactory的实现类就在这里进行PrioritySPIFactory<TaskChannelFactory> prioritySPIFactory = new PrioritySPIFactory<>(TaskChannelFactory.class);for (Map.Entry<String, TaskChannelFactory> entry : prioritySPIFactory.getSPIMap().entrySet()) {String factoryName = entry.getKey();TaskChannelFactory factory = entry.getValue();logger.info("Registering task plugin: {} - {}", factoryName, factory.getClass());taskChannelFactoryMap.put(factoryName, factory);taskChannelMap.put(factoryName, factory.create());logger.info("Registered task plugin: {} - {}", factoryName, factory.getClass());}}
在dolphinscheduler-spi
中看看PrioritySPIFactory
的有参构造方法
public PrioritySPIFactory(Class<T> spiClass) {// 这里调用了ServiceLoader.load动态从classpath中加载TaskChannelFactory的实现类,使用反射注入到Spring容器,// 那么肯定所有的TaskChannelFactory的实现类上都加了@AutoService(TaskChannelFactory.class)注解。for (T t : ServiceLoader.load(spiClass)) {if (map.containsKey(t.getIdentify().getName())) {resolveConflict(t);} else {map.put(t.getIdentify().getName(), t);}}}
看到了吧,这里就是用ServiceLoader
类实现TaskChannelFactory
接口实现类的加载。
验证一下所有的TaskChannelFactory
的实现类上都是用了@AutoService(TaskChannelFactory.class)注解
dolphinsscheduler中当前已经提供的TASK的插件列表如下:
TaskChannelFactory
├─ TaskChannelFactory // TaskChannelFactory接口
│ └─ SubProcessTaskChannelFactory // SubProcessTask接口实现工厂类
│ └─ PythonTaskChannelFactory // PythonTask接口实现工厂类
│ └─ SqlTaskChannelFactory // SqlTask接口实现工厂类
│ └─ JupyterTaskChannelFactory // JupyterTask接口实现工厂类
│ └─ DependentTaskChannelFactory // DependentTask接口实现工厂类
│ └─ DataxTaskChannelFactory // DataxTask接口实现工厂类
│ └─ HttpTaskChannelFactory // HttpTask接口实现工厂类
│ └─ PigeonTaskChannelFactory // PigeonTask接口实现工厂类
│ └─ ShellTaskChannelFactory // ShellTask接口实现工厂类
│ └─ ZeppelinTaskChannelFactory // ZeppelinTask接口实现工厂类
│ └─ MlflowTaskChannelFactory // MlflowTask接口实现工厂类
│ └─ DinkyTaskChannelFactory // DinkyTask接口实现工厂类
│ └─ FlinkTaskChannelFactory // FlinkTask接口实现工厂类
│ └─ SparkTaskChannelFactory // SparkTask接口实现工厂类
│ └─ SagemakerTaskChannelFactory // SagemakerTask接口实现工厂类
│ └─ EmrTaskChannelFactory // EmrTask接口实现工厂类
│ └─ K8sTaskChannelFactory // K8sTask接口实现工厂类
│ └─ SeatunnelTaskChannelFactory // SeatunnelTask接口实现工厂类
│ └─ FlinkStreamTaskChannelFactory // FlinkStreamTask接口实现工厂类
│ └─ ConditionsTaskChannelFactory // ConditionsTask接口实现工厂类
│ └─ DvcTaskChannelFactory // DvcTask接口实现工厂类
│ └─ OpenmldbTaskChannelFactory // OpenmldbTask接口实现工厂类
│ └─ ChunJunTaskChannelFactory // ChunJunTask接口实现工厂类
│ └─ SqoopTaskChannelFactory // SqoopTask口实现工厂类
│ └─ DataQualityTaskChannelFactory // DataQualityTask接口实现工厂类
│ └─ BlockingTaskChannelFactory // BlockingTask接口实现工厂类
│ └─ PytorchTaskChannelFactory // PytorchTask接口实现工厂类
│ └─ SwitchTaskChannelFactory // SwitchTask接口实现工厂类
│ └─ HiveCliTaskChannelFactory // HiveCliTask接口实现工厂类
│ └─ MapReduceTaskChannelFactory // MapReduceTask接口实现工厂类
│ └─ ProcedureTaskChannelFactory // ProcedureTask接口实现工厂类
我们找一个TaskChannelFactory
实现类,这里就拿SqlTaskChannelFactory实现类来解释(其他的实现类都是大同小异)
@AutoService(TaskChannelFactory.class)
public class SqlTaskChannelFactory implements TaskChannelFactory {@Overridepublic String getName() {return "SQL";}@Overridepublic List<PluginParams> getParams() {return null;}@Overridepublic TaskChannel create() {return new SqlTaskChannel();}
}
发现了没,实现类都是用@AutoService进行注解,那么打包之后肯定会自动在classpath的META-INF\services
目录下生成一个以接口为名称org.apache.dolphinscheduler.plugin.task.api.TaskChannelFactory
的文件,里面的内容就是SqlTaskChannelFactory实现类的全限定路径名称,
SqlTask的插件打成的jar包中也包含了这个元数据文件,如果此元数据文件包含在 jar 中,并且该 jar 位于 javac 的类路径上,则 javac 将自动加载它,并将其包含在其正常的注解处理环境中。java.util.ServiceLoader 的其他用户可能会出于不同的目的使用基础结构,但此元数据将适当地提供自动加载。
通过以上方式, 就实实现了任务插件的自动加载,通过源码解读之后, 是不是发现其实插件化开发也挺简单的, 没想象的那么复杂。
4.2.dollphinscheduler的Task插件开发
以上我们讲述了插件加载的所有过程,下面我们再讲讲怎么进行Task的插件开发, 如果不会, 我们可以找一个现有的插件看看它的实现思路,因为所有的插件的主体框架肯定是一样, 去源码里面验证一下:
dolphinscheduler的task插件都在dolphinscheduler-task-plugin
模块下
我们就用SQL的任务插件来看看,插件应该怎么写
看到没有, 插件的开发是不是很简单, 基本见名知意:
-
SqlTaskChannelFactory
这个就是实现TaskChannelFactory
接口的实现类,最主要的方法肯定包含获取任务类型名称及创建SqlTaskChannel的方法 -
SqlTaskChannel
这个就是实现TaskChannel
接口的实现类,最主要的方法肯定包含创建任务、解析任务参数、获取资源信息(数据源等) -
SqlTask
Sql任务,主要就是围绕Sql类型的任务的处理, 包含SQL任务中参数的获取、SQL任务的处理逻辑、任务的取消、查询和修改的SQL语句的处理逻辑等等, 这块应该是整个自定义插件开发的核心, 其中包含了不同于其他任务插件的逻辑处理,这里基本都是可以定制处理业务的。 -
SqlSplitter
这个应该是进行SQL语句的分割处理的, SQL任务插件应该支持自定义SQL语句执行,所以如果用户自定义输入了多条SQL语句, 肯定需要按照某种特定的规则进行解析处理,然后再执行。 -
SqlBinds
这个应该也很明显, SQL中语句跟输入参数的关系绑定, 然后在任务处理时再处理这种绑定关系,完成参数变量和参数值的替换, 获取到真正需要执行的SQL语句,在进行下一步处理。
通过SQL任务插件
的源码解读, 我们知道如果我需要二开一个自己的任务插件,
首先创建一个插件模块dolphinscheduler-task-demo
,
pom文件如下:
<?xml version="1.0" encoding="UTF-8"?>
<!--~ Licensed to the Apache Software Foundation (ASF) under one or more~ contributor license agreements. See the NOTICE file distributed with~ this work for additional information regarding copyright ownership.~ The ASF licenses this file to You under the Apache License, Version 2.0~ (the "License"); you may not use this file except in compliance with~ the License. You may obtain a copy of the License at~~ http://www.apache.org/licenses/LICENSE-2.0~~ Unless required by applicable law or agreed to in writing, software~ distributed under the License is distributed on an "AS IS" BASIS,~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.~ See the License for the specific language governing permissions and~ limitations under the License.-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.apache.dolphinscheduler</groupId><artifactId>dolphinscheduler-task-plugin</artifactId><version>3.1.5</version></parent><artifactId>dolphinscheduler-task-demo</artifactId><packaging>jar</packaging><dependencies><dependency><groupId>org.apache.dolphinscheduler</groupId><artifactId>dolphinscheduler-spi</artifactId><scope>provided</scope></dependency><dependency><groupId>org.apache.dolphinscheduler</groupId><artifactId>dolphinscheduler-task-api</artifactId><version>${project.version}</version></dependency></dependencies>
</project>
先定义一个实现了TaskChannelFactory
的MyTaskChannelFactory
类, 如下:
@AutoService(TaskChannelFactory.class)
public class MyTaskChannelFactory implements TaskChannelFactory {@Overridepublic String getName() {return "DEMO";}@Overridepublic List<PluginParams> getParams() {return null;}@Overridepublic TaskChannel create() {return new MyTaskChannel();}
}
再定义一个实现了TaskChannel
的MyTaskChannel
类, 如下:
public class MyTaskChannel implements TaskChannel {@Overridepublic void cancelApplication(boolean status) {}@Overridepublic AbstractTask createTask(TaskExecutionContext taskRequest) {return new MyTask(taskRequest);}@Overridepublic AbstractParameters parseParameters(ParametersNode parametersNode) {// 这里就是从任务定义中获取参数并解析方法, 如果我们的任务插件中有一些定制的参数输入要求// 需要自定义一个参数类继承AbstractParameters,然后增加自定义的任务输入参数字段即可return JSONUtils.parseObject(parametersNode.getTaskParams(), MyParameters.class);}@Overridepublic ResourceParametersHelper getResources(String parameters) {// 如果你的参数是字符串,可以使用下面的这种方式进行解析,然后在解析出来的对象中获取属性信息return JSONUtils.parseObject(parameters, MyParameters.class).getResources();}}
再定义一个实现了org.apache.dolphinscheduler.plugin.task.api.AbstractTask
的MyTask
类,如果你的自定义任务需要提交给Yarn去进行资源调度,那个则需要实现 org.apache.dolphinscheduler.plugin.task.api.AbstractYarnTask
。以下三个方法必须重写,其他的集成方法可以按需重写。
public class MyTask implements AbstractTask{public abstract void handle(TaskCallBack taskCallBack) throws TaskException {// 任务的逻辑处理}public abstract void cancel() throws TaskException {// 任务的取消}public abstract AbstractParameters getParameters() {// 任务的参数获取};}
然后在任务处理的方法中如果需要一些额外的工具方法, 可以创建一些辅助工具类,这样基本一个组件就开发好了。然后前端根据我们自定义组件的输入参数要求进行前端页面的开发然后对接调试就可以了。