Kettle Local
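- 📚 Preface
- ⁉️ Problem log
- ❓ Problem 1: Database type not found!…database type with plugin id [Oracle] couldn't be found!
- ❕ Cause: the Kettle environment was not initialized
- ❗ Solution: add a listener that initializes the Kettle environment
- ❓ Problem 2: NullPointerException when the encryption plugin fails to load: Encr.decryptPasswordOptionallyEncrypted( XMLHandler.getTagValue( con, "password" ) )
- ❕ Cause: the encryption plugin was not loaded
- ❗ Solution: add kettle-password-encoder-plugins.xml
- ❓ Problem 3: NullPointerException on steps: `Request processing failed; nested exception is java.lang.NullPointerException`
- ❕ Cause: the pre-processing code is missing
- ❗ Solution: add the pre-processing code
- 📚 Successful run
- 📚 Miscellaneous
- 📗 Kettle client types
- 📗 prepareExecution: preparing a transformation before execution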
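🔼 Previous part: Kettle Local Engine Usage Notes (Part 1)
📚 Preface
Picking up where the previous post left off, this post continues working through the runtime errors hit when running the Kettle Local engine.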
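⁉️ Problem log
❓ Problem 1: Database type not found!…database type with plugin id [Oracle] couldn't be found!
❕ Cause: the Kettle environment was not initialized
The `main` method of Spoon's startup class (for the matching source, see "PDI/Kettle-9 source download and build") mainly initializes the Kettle environment and loads the various plugins. Its key steps are: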
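- Asynchronous initialization: a `Future<KettleException>` represents the initialization task, which runs asynchronously. `executor.submit(new Callable<KettleException>() {...})` submits a `Callable` to the thread-pool executor; once the task completes, its result (here, a `KettleException` that may have been raised) can be read from the `Future`.
- UI plugin object type registration: `registerUIPluginObjectTypes()` registers the plugin object types related to the user interface, so that the Spoon UI can display and handle objects of those types. (This only concerns the Spoon GUI; secondary development does not really need it.)
- Setting the client environment: `KettleClientEnvironment.getInstance().setClient(KettleClientEnvironment.ClientType.SPOON);` marks the current runtime as the Spoon client type, that is, PDI's graphical tool.
- Initializing the Kettle environment: `KettleEnvironment.init();` is the key step of the whole Kettle initialization. It loads all core libraries and plugins and sets up the required runtime environment. If this step fails (for example, because a database driver plugin cannot be found or fails to load), it throws a `KettleException`.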
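I am using MySQL, yet the error names Oracle: because the plugins failed to load, the database type fell back to the default, which is Oracle.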
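The asynchronous pattern described above (submit the initialization as a `Callable`, then read a possible exception back from the `Future`) can be sketched with the JDK alone. In this minimal sketch, `AsyncInitSketch` and `runInit` are hypothetical names, and the `Callable<Void>` argument merely stands in for the real `KettleEnvironment.init()` call, which is not invoked here.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/** Stand-in sketch: run a one-off initialization on a worker thread and surface its failure. */
final class AsyncInitSketch {

    /**
     * Runs init on a worker thread and returns the exception it threw,
     * or null if it completed normally. (Hypothetical helper.)
     */
    static Exception runInit(Callable<Void> init) throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Future<Exception> future = executor.submit(() -> {
                try {
                    init.call();
                    return null;
                } catch (Exception e) {
                    return e; // hand the failure back instead of swallowing it
                }
            });
            return future.get(); // blocks until the initialization has finished
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        Exception ok = runInit(() -> null);
        Exception failed = runInit(() -> {
            throw new IllegalStateException("plugin not found");
        });
        System.out.println("first run failed: " + (ok != null));
        System.out.println("second run failed: " + (failed != null));
    }
}
```

Returning the exception as the task's result keeps the failure visible to the thread that waits on the `Future`; if the task only logged the exception and returned null, the caller could not tell that initialization had failed.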
❗ Solution: add a listener that initializes the Kettle environment
Copy `KettleStarterListener` straight from the `data-integration` project (it is modeled on `Spoon.java`), as follows:
package com.renxiaozhao.api;

import org.pentaho.di.core.KettleClientEnvironment;
import org.pentaho.di.core.KettleEnvironment;
import org.pentaho.di.core.exception.KettleException;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

import javax.servlet.ServletContextEvent;
import javax.servlet.ServletContextListener;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/**
 * Kettle startup listener.
 */
@Component
public class KettleStarterListener implements ServletContextListener {

    private static final org.slf4j.Logger logger = LoggerFactory.getLogger(KettleStarterListener.class);

    @Override
    public void contextInitialized(ServletContextEvent sce) {
        System.setProperty("KETTLE_CONTEXT_PATH", sce.getServletContext().getContextPath());
        /*
         * The following lines are from Spoon.main
         * because they are application-wide context.
         */
        ExecutorService executor = Executors.newCachedThreadPool();
        Future<KettleException> pluginRegistryFuture = executor.submit(new Callable<KettleException>() {
            @Override
            public KettleException call() throws Exception {
                KettleClientEnvironment.getInstance().setClient(KettleClientEnvironment.ClientType.SPOON);
                try {
                    logger.info("Loading the Kettle environment: KettleEnvironment.init(false)");
                    KettleEnvironment.init(false);
                    logger.info("Kettle environment loaded.");
                } catch (KettleException e) {
                    // Return the failure so the waiting thread sees it through the Future
                    return e;
                }
                return null;
            }
        });
        try {
            KettleException registryException = pluginRegistryFuture.get();
            if (registryException != null) {
                throw registryException;
            }
        } catch (Throwable t) {
            logger.error("Kettle environment initialization failed", t);
        } finally {
            // Shut down the thread pool
            executor.shutdown();
        }
    }

    @Override
    public void contextDestroyed(ServletContextEvent sce) {
        // Shut down the Kettle environment
        KettleEnvironment.shutdown();
    }
}
❓ Problem 2: NullPointerException when the encryption plugin fails to load: Encr.decryptPasswordOptionallyEncrypted( XMLHandler.getTagValue( con, "password" ) )
2024-01-08 14:26:23.437 [qtp571696027-84] WARN org.eclipse.jetty.server.HttpChannel -/renxiaozhao/kettle/run
org.springframework.web.util.NestedServletException: Request processing failed; nested exception is org.pentaho.di.core.exception.KettleXMLException:
Error reading transformation from XML file
Error reading transformation from XML file
Unable to load database connection info from XML node
    at java.lang.Thread.run (Thread.java:748)
    at org.eclipse.jetty.util.thread.QueuedThreadPool$2.run (QueuedThreadPool.java:683)
    at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob (QueuedThreadPool.java:765)
    at org.eclipse.jetty.io.ChannelEndPoint$2.run (ChannelEndPoint.java:118)
    at org.eclipse.jetty.io.FillInterest.fillable (FillInterest.java:103)
    at org.eclipse.jetty.io.AbstractConnection$ReadCallback.succeeded (AbstractConnection.java:305)
    at org.eclipse.jetty.server.HttpConnection.onFillable (HttpConnection.java:260)
    at org.eclipse.jetty.server.HttpChannel.handle (HttpChannel.java:364)
    at org.eclipse.jetty.server.Server.handle (Server.java:502)
    at org.eclipse.jetty.server.handler.HandlerWrapper.handle (HandlerWrapper.java:132)
    at org.eclipse.jetty.server.handler.ScopedHandler.handle (ScopedHandler.java:144)
    at org.eclipse.jetty.server.handler.ContextHandler.doScope (ContextHandler.java:1247)
    at org.eclipse.jetty.server.handler.ScopedHandler.nextScope (ScopedHandler.java:201)
    at org.eclipse.jetty.server.session.SessionHandler.doScope (SessionHandler.java:1557)
    at org.eclipse.jetty.servlet.ServletHandler.doScope (ServletHandler.java:480)
    at org.eclipse.jetty.server.handler.ScopedHandler.nextScope (ScopedHandler.java:203)
    at org.eclipse.jetty.server.handler.ContextHandler.doHandle (ContextHandler.java:1345)
    at org.eclipse.jetty.server.handler.ScopedHandler.nextHandle (ScopedHandler.java:255)
    at org.eclipse.jetty.server.session.SessionHandler.doHandle (SessionHandler.java:1588)
    at org.eclipse.jetty.server.handler.ScopedHandler.nextHandle (ScopedHandler.java:257)
    at org.eclipse.jetty.server.handler.HandlerWrapper.handle (HandlerWrapper.java:132)
    at org.eclipse.jetty.security.SecurityHandler.handle (SecurityHandler.java:548)
    at org.eclipse.jetty.server.handler.ScopedHandler.handle (ScopedHandler.java:146)
    at org.eclipse.jetty.servlet.ServletHandler.doHandle (ServletHandler.java:540)
    at org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter (ServletHandler.java:1610)
    at org.springframework.web.filter.OncePerRequestFilter.doFilter (OncePerRequestFilter.java:107)
    at org.springframework.web.filter.CharacterEncodingFilter.doFilterInternal (CharacterEncodingFilter.java:200)
    at org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter (ServletHandler.java:1610)
    at org.springframework.web.filter.OncePerRequestFilter.doFilter (OncePerRequestFilter.java:107)
    at org.springframework.web.filter.HiddenHttpMethodFilter.doFilterInternal (HiddenHttpMethodFilter.java:93)
    at org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter (ServletHandler.java:1610)
    at org.springframework.web.filter.OncePerRequestFilter.doFilter (OncePerRequestFilter.java:107)
    at org.springframework.web.filter.FormContentFilter.doFilterInternal (FormContentFilter.java:92)
    at org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter (ServletHandler.java:1610)
    at org.springframework.web.filter.OncePerRequestFilter.doFilter (OncePerRequestFilter.java:107)
    at org.springframework.web.filter.RequestContextFilter.doFilterInternal (RequestContextFilter.java:99)
    at org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter (ServletHandler.java:1623)
    at org.eclipse.jetty.servlet.ServletHolder.handle (ServletHolder.java:867)
    at javax.servlet.http.HttpServlet.service (HttpServlet.java:750)
    at org.springframework.web.servlet.FrameworkServlet.service (FrameworkServlet.java:882)
    at javax.servlet.http.HttpServlet.service (HttpServlet.java:665)
    at org.springframework.web.servlet.FrameworkServlet.doPost (FrameworkServlet.java:908)
    at org.springframework.web.servlet.FrameworkServlet.processRequest (FrameworkServlet.java:1005)
    at org.springframework.web.servlet.DispatcherServlet.doService (DispatcherServlet.java:942)
    at org.springframework.web.servlet.DispatcherServlet.doDispatch (DispatcherServlet.java:1038)
    at org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle (AbstractHandlerMethodAdapter.java:87)
    at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal (RequestMappingHandlerAdapter.java:800)
    at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod (RequestMappingHandlerAdapter.java:895)
    at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle (ServletInvocableHandlerMethod.java:102)
    at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest (InvocableHandlerMethod.java:138)
    at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke (InvocableHandlerMethod.java:189)
    at java.lang.reflect.Method.invoke (Method.java:498)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke (DelegatingMethodAccessorImpl.java:43)
    at sun.reflect.NativeMethodAccessorImpl.invoke (NativeMethodAccessorImpl.java:62)
    at sun.reflect.NativeMethodAccessorImpl.invoke0 (NativeMethodAccessorImpl.java:-2)
    at com.renxiaozhao.api.controller.PdiUseDemoController.executeJob (PdiUseDemoController.java:33)
    at com.renxiaozhao.service.impl.PdiUseDemoServiceImpl$$EnhancerBySpringCGLIB$$f64c592d.executeJob (<generated>:-1)
    at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept (CglibAopProxy.java:684)
    at org.springframework.cglib.proxy.MethodProxy.invoke (MethodProxy.java:218)
    at com.renxiaozhao.service.impl.PdiUseDemoServiceImpl$$FastClassBySpringCGLIB$$56121f70.invoke (<generated>:-1)
    at com.renxiaozhao.service.impl.PdiUseDemoServiceImpl.executeJob (PdiUseDemoServiceImpl.java:37)
    at com.renxiaozhao.service.impl.PdiUseDemoServiceImpl.execute (PdiUseDemoServiceImpl.java:47)
    at com.renxiaozhao.service.impl.PdiUseDemoServiceImpl.buildTransMeta (PdiUseDemoServiceImpl.java:102)
    at org.pentaho.di.trans.TransMeta.loadXML (TransMeta.java:3030)
    at org.pentaho.di.core.database.DatabaseMeta.<init> (DatabaseMeta.java:1009)
    at org.pentaho.di.core.encryption.Encr.decryptPasswordOptionallyEncrypted (Encr.java:148)
    ...
    at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest (InvocableHandlerMethod.java:138)
    at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke (InvocableHandlerMethod.java:189)
    at java.lang.reflect.Method.invoke (Method.java:498)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke (DelegatingMethodAccessorImpl.java:43)
    at sun.reflect.NativeMethodAccessorImpl.invoke (NativeMethodAccessorImpl.java:62)
    at sun.reflect.NativeMethodAccessorImpl.invoke0 (NativeMethodAccessorImpl.java:-2)
    at com.renxiaozhao.api.controller.PdiUseDemoController.executeJob (PdiUseDemoController.java:33)
    at com.renxiaozhao.service.impl.PdiUseDemoServiceImpl$$EnhancerBySpringCGLIB$$f64c592d.executeJob (<generated>:-1)
    at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept (CglibAopProxy.java:684)
    at org.springframework.cglib.proxy.MethodProxy.invoke (MethodProxy.java:218)
    at com.renxiaozhao.service.impl.PdiUseDemoServiceImpl$$FastClassBySpringCGLIB$$56121f70.invoke (<generated>:-1)
    at com.renxiaozhao.service.impl.PdiUseDemoServiceImpl.executeJob (PdiUseDemoServiceImpl.java:37)
    at com.renxiaozhao.service.impl.PdiUseDemoServiceImpl.execute (PdiUseDemoServiceImpl.java:47)
    at com.renxiaozhao.service.impl.PdiUseDemoServiceImpl.buildTransMeta (PdiUseDemoServiceImpl.java:102)
    at org.pentaho.di.trans.TransMeta.loadXML (TransMeta.java:3030)
    at org.pentaho.di.core.database.DatabaseMeta.<init> (DatabaseMeta.java:1009)
    at org.pentaho.di.core.encryption.Encr.decryptPasswordOptionallyEncrypted (Encr.java:148)
    at org.pentaho.di.core.database.DatabaseMeta.<init>(DatabaseMeta.java:1030)
    at org.pentaho.di.trans.TransMeta.loadXML(TransMeta.java:3030)
    ... 63 common frames omitted
Caused by: java.lang.NullPointerException: null
    at org.pentaho.di.core.encryption.Encr.decryptPasswordOptionallyEncrypted(Encr.java:148)
    at org.pentaho.di.core.database.DatabaseMeta.<init>(DatabaseMeta.java:1009)
    ... 64 common frames omitted
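❕ Cause: the encryption plugin was not loaded
I am using Kettle 9.2, whereas the open-source tool uses Kettle 8. Comparing the two shows that 9.2 adds a `pentaho-encryption-support` plugin, and that plugin has to be loaded.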
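❗ Solution: add kettle-password-encoder-plugins.xml
The `kettle-password-encoder-plugins.xml` configuration file defines and configures the password encoder plugins used by PDI/Kettle. Make sure the file is on the classpath so that the system can find and load these plugins at startup.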
<password-encoder-plugins>
  <password-encoder-plugin id="Kettle">
    <description>Kettle Password Encoder</description>
    <classname>org.pentaho.support.encryption.KettleTwoWayPasswordEncoder</classname>
  </password-encoder-plugin>
</password-encoder-plugins>
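Because the fix depends on the file being visible on the classpath, a quick check can help if the NullPointerException persists. This is a minimal sketch that assumes the resource is looked up through the thread context class loader; how Kettle resolves the file internally is not shown in this post, so treat that as an assumption. `ClasspathCheckSketch` and `isOnClasspath` are hypothetical names.

```java
/** Sketch: report whether a named resource is visible to the current class loader. */
final class ClasspathCheckSketch {

    /** Returns true when the resource can be found on the classpath. (Hypothetical helper.) */
    static boolean isOnClasspath(String resourceName) {
        ClassLoader loader = Thread.currentThread().getContextClassLoader();
        if (loader == null) {
            // Fall back to the system class loader when no context loader is set
            loader = ClassLoader.getSystemClassLoader();
        }
        return loader.getResource(resourceName) != null;
    }

    public static void main(String[] args) {
        String name = "kettle-password-encoder-plugins.xml";
        System.out.println(name + " on classpath: " + isOnClasspath(name));
    }
}
```

In a Maven-style project the file would typically go under `src/main/resources`, which ends up at the classpath root; that layout is also an assumption about the project, not something stated in the post.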
❓ Problem 3: NullPointerException on steps: `Request processing failed; nested exception is java.lang.NullPointerException`
2024/01/08 15:32:19 - 123 - Transformation was pre-loaded from the repository.
2024-01-08 15:32:19.392 [qtp1772102816-79] INFO org.pentaho.di.trans.Trans -[D:\tmp\test\test.xml] Transformation was pre-loaded from the repository.
2024/01/08 15:32:19 - 123 - Number of steps to run : 2 , number of hops : 1
2024-01-08 15:32:19.393 [qtp1772102816-79] WARN org.eclipse.jetty.server.HttpChannel -/renxiaozhao/kettle/run
org.springframework.web.util.NestedServletException: Request processing failed; nested exception is java.lang.NullPointerException
    at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:1013)
    at org.springframework.web.servlet.FrameworkServlet.doPost(FrameworkServlet.java:908)
    at javax.servlet.http.HttpServlet.service(HttpServlet.java:665)
    at org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:882)
    at javax.servlet.http.HttpServlet.service(HttpServlet.java:750)
    at org.eclipse.jetty.servlet.ServletHolder.handle(ServletHolder.java:867)
    at org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1623)
    at org.springframework.web.filter.RequestContextFilter.doFilterInternal(RequestContextFilter.java:99)
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
    at org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1610)
    at org.springframework.web.filter.FormContentFilter.doFilterInternal(FormContentFilter.java:92)
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
    at org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1610)
    at org.springframework.web.filter.HiddenHttpMethodFilter.doFilterInternal(HiddenHttpMethodFilter.java:93)
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
    at org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1610)
    at org.springframework.web.filter.CharacterEncodingFilter.doFilterInternal(CharacterEncodingFilter.java:200)
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
    at org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1610)
    at org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:540)
    at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:146)
    at org.eclipse.jetty.security.SecurityHandler.handle(SecurityHandler.java:548)
    at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:132)
    at org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:257)
    at org.eclipse.jetty.server.session.SessionHandler.doHandle(SessionHandler.java:1588)
    at org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:255)
    at org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1345)
    at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:203)
    at org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:480)
    at org.eclipse.jetty.server.session.SessionHandler.doScope(SessionHandler.java:1557)
    at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:201)
    at org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1247)
    at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:144)
    at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:132)
    at org.eclipse.jetty.server.Server.handle(Server.java:502)
    at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:364)
    at org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:260)
    at org.eclipse.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:305)
    at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:103)
    at org.eclipse.jetty.io.ChannelEndPoint$2.run(ChannelEndPoint.java:118)
    at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:765)
    at org.eclipse.jetty.util.thread.QueuedThreadPool$2.run(QueuedThreadPool.java:683)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException: null
    at org.pentaho.di.trans.Trans.startThreads(Trans.java:1329)
    at com.renxiaozhao.service.impl.PdiUseDemoServiceImpl.execute(PdiUseDemoServiceImpl.java:80)
    at com.renxiaozhao.service.impl.PdiUseDemoServiceImpl.executeJob(PdiUseDemoServiceImpl.java:37)
    at com.renxiaozhao.service.impl.PdiUseDemoServiceImpl$$FastClassBySpringCGLIB$$56121f70.invoke(<generated>)
    at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218)
    at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:684)
    at com.renxiaozhao.service.impl.PdiUseDemoServiceImpl$$EnhancerBySpringCGLIB$$3379b89f.executeJob(<generated>)
    at com.renxiaozhao.api.controller.PdiUseDemoController.executeJob(PdiUseDemoController.java:33)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:189)
    at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:138)
    at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:102)
    at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:895)
    at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:800)
    at org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:87)
    at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:1038)
    at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:942)
    at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:1005)
    ... 42 common frames omitted
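❕ Cause: the pre-processing code is missing
`Trans.startThreads()` was called without `prepareExecution()` having run first. As the `prepareExecution` source quoted at the end of this post shows, that method is where `steps = new ArrayList<>()` is assigned, so skipping it leaves `steps` null.
![image](https://img-blog.csdnimg.cn/direct/3158bbfd7e834c31898fcf4f1cf6a11d.png)
❗ Solution: add the pre-processing code
The open-source project did include this call; I assumed it was unnecessary and left it out.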
package com.renxiaozhao.service.impl;

import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.renxiaozhao.bean.entity.SportEntity;
import com.renxiaozhao.dao.mapper.SportMapper;
import com.renxiaozhao.service.inf.PdiUseDemoService;
import com.renxiaozhao.service.util.JSONLinkedObject;
import com.renxiaozhao.service.util.XML;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang.StringEscapeUtils;
import org.apache.commons.lang.StringUtils;
import org.pentaho.di.core.exception.KettleMissingPluginsException;
import org.pentaho.di.core.exception.KettleXMLException;
import org.pentaho.di.core.logging.LogLevel;
import org.pentaho.di.core.logging.LoggingObjectType;
import org.pentaho.di.core.logging.SimpleLoggingObject;
import org.pentaho.di.core.variables.Variables;
import org.pentaho.di.core.xml.XMLHandler;
import org.pentaho.di.trans.Trans;
import org.pentaho.di.trans.TransAdapter;
import org.pentaho.di.trans.TransExecutionConfiguration;
import org.pentaho.di.trans.TransMeta;
import org.springframework.stereotype.Service;
import org.w3c.dom.Document;

import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

@Slf4j
@Service
public class PdiUseDemoServiceImpl extends ServiceImpl<SportMapper, SportEntity> implements PdiUseDemoService {

    @Override
    public void executeJob(String jobJson) throws Exception {
        execute(jobJson);
    }

    @Override
    public void executeJobByXml(String jobXml) throws Exception {
        execute(jobXml);
    }

    private void execute(String jobJson) throws Exception {
        // Build the TransMeta object
        TransMeta transMeta = buildTransMeta(jobJson);
        TransExecutionConfiguration executionConfiguration = new TransExecutionConfiguration();
        // Set defaults so that the run configuration is set up correctly
        executionConfiguration.setExecutingLocally(true);
        executionConfiguration.setExecutingRemotely(false);
        executionConfiguration.setExecutingClustered(false);
        // Safe mode flag: true enables safe mode; pass false to turn it off
        executionConfiguration.setSafeModeEnabled(true);
        executionConfiguration.getUsedVariables(transMeta);
        executionConfiguration.setLogLevel(LogLevel.DEBUG);
        // Run on the local engine by default
        executionConfiguration.setRunConfiguration("Pentaho local");
        // Set the command variables
        executionConfiguration.setVariables(new HashMap<>());

        // Create the Trans
        Trans trans = new Trans(transMeta);
        String spoonLogObjectId = UUID.randomUUID().toString();
        SimpleLoggingObject spoonLoggingObject = new SimpleLoggingObject(
                Thread.currentThread().getName() + "-" + Thread.currentThread().getId(),
                LoggingObjectType.SPOON, null);
        spoonLoggingObject.setContainerObjectId(spoonLogObjectId);
        spoonLoggingObject.setLogLevel(executionConfiguration.getLogLevel());
        trans.setParent(spoonLoggingObject);
        trans.setLogLevel(executionConfiguration.getLogLevel());
        trans.setReplayDate(executionConfiguration.getReplayDate());
        trans.setRepository(executionConfiguration.getRepository());
        trans.setMonitored(false);

        Map<String, String> arguments = executionConfiguration.getArguments();
        final String[] args;
        if (arguments != null) {
            args = convertArguments(arguments);
        } else {
            args = null;
        }
        trans.getLogChannel().logBasic("Starting the project");
        trans.setSafeModeEnabled(executionConfiguration.isSafeModeEnabled());
        trans.setGatheringMetrics(executionConfiguration.isGatheringMetrics());
        // Pre-processing: this call was the missing piece behind Problem 3
        trans.prepareExecution(args);

        // Start the transformation
        trans.addTransListener(new TransAdapter() {
            @Override
            public void transFinished(Trans trans) {
                log.info("Project execution finished");
            }
        });
        trans.startThreads();
    }

    // Returns the argument values ordered by argument name
    private String[] convertArguments(Map<String, String> arguments) {
        String[] argumentNames = arguments.keySet().toArray(new String[0]);
        Arrays.sort(argumentNames);
        String[] args = new String[argumentNames.length];
        for (int i = 0; i < args.length; i++) {
            String argumentName = argumentNames[i];
            args[i] = arguments.get(argumentName);
        }
        return args;
    }

    public TransMeta buildTransMeta(String jobJson) throws IOException, KettleXMLException, KettleMissingPluginsException {
        Document document;
        // Convert JSON to XML when the payload is not already XML
        if (!jobJson.startsWith("<?xml")) {
            jobJson = StringEscapeUtils.unescapeXml(jobJson);
            jobJson = "<?xml version=\"1.0\" encoding=\"UTF-8\"?>" + XML.toString(new JSONLinkedObject(jobJson));
            log.info("JSON converted to XML; resulting XML: {}", jobJson);
        }
        // Write to a temporary directory
        File outFile = new File("D:\\tmp\\test", "test.xml");
        FileUtils.writeStringToFile(outFile, jobJson);
        // Load the XML
        document = XMLHandler.loadXMLString(jobJson);
        TransMeta transMeta = new TransMeta();
        transMeta.loadXML(document.getDocumentElement(), outFile.getPath(), null, null, true, new Variables(),
                (message, rememberText, rememberPropertyName) -> {
                    // Yes means: overwrite
                    return true;
                });
        if (transMeta.hasMissingPlugins()) {
            log.info("[{}] is missing execution plugins.", jobJson);
        }
        return transMeta;
    }
}
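To make the failure mode of Problem 3 concrete, here is a stand-in sketch of a class whose work list is only allocated by a prepare step. It is not Kettle code: `PrepareBeforeStartSketch`, `prepare`, and `start` are hypothetical names that only mimic the relationship between `prepareExecution` (which assigns `steps`) and `startThreads` (which then uses it).

```java
import java.util.ArrayList;
import java.util.List;

/** Stand-in sketch: a field that is only allocated by prepare() and consumed by start(). */
final class PrepareBeforeStartSketch {

    private List<String> steps; // stays null until prepare() runs

    /** Allocates the step list, mirroring what a prepare phase does. */
    void prepare(List<String> stepNames) {
        steps = new ArrayList<>(stepNames);
    }

    /** Iterates the step list; throws NullPointerException if prepare() was skipped. */
    int start() {
        int started = 0;
        for (String ignored : steps) {
            started++;
        }
        return started;
    }

    public static void main(String[] args) {
        PrepareBeforeStartSketch skipped = new PrepareBeforeStartSketch();
        try {
            skipped.start();
        } catch (NullPointerException e) {
            System.out.println("start() without prepare() failed with NullPointerException");
        }

        PrepareBeforeStartSketch prepared = new PrepareBeforeStartSketch();
        prepared.prepare(List.of("input", "output"));
        System.out.println("steps started: " + prepared.start());
    }
}
```

The design point is the ordering contract: the start phase assumes the prepare phase has already run, and nothing in the type system enforces it, which is why the omission only shows up at runtime.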
📚 Successful run
The data in `src1` was synchronized into `tgt` successfully.
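📚 Miscellaneous
📗 Kettle client types
- SPOON: the Spoon GUI tool, PDI's graphical design tool, where users create, edit, and run transformations and jobs.
- PAN: the Pan runtime. Pan is a command-line tool for executing transformations that have already been designed.
- KITCHEN: the Kitchen runtime. Kitchen is another command-line tool, used to schedule and execute jobs.
- CARTE: the Carte service, a lightweight web application server that lets users manage and execute transformations and jobs through a web interface and also exposes a remote invocation interface.
- DI_SERVER: Pentaho Data Integration Server, or its modern form Pentaho Server (see "Deploying Kettle on Linux (pentaho-server-ce-9.4.0.0-343): notes / configuring MySQL storage"). This is a complete data integration platform that provides centralized management, scheduling, and monitoring of jobs and transformations.
- SCALE: in some versions or contexts this may refer to Pentaho's distributed processing component, Scale, which allows big-data workloads to be spread across multiple machines for parallel processing.
- OTHER: any other unknown or unspecified client type, possibly used for extension or for new scenarios that may appear in the future.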
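📗 prepareExecution: preparing a transformation before execution
The method body is long, which at a glance suggests it is an important method 😂:
- Setting arguments and variables: it takes a string array `arguments` as input. These are usually external arguments passed to the transformation from the command line or by the calling program, and they are bound to the parameters defined inside the transformation.
- Initializing state flags:
  - Sets the `preparing` flag to `true`, indicating that the transformation is in its pre-execution preparation phase.
  - Clears `startDate` (sets it to `null`), indicating that this execution has not started yet.
  - Sets the `running` flag to `false`, because the transformation is not actually running at this point.
- Preparing steps and hops:
  - For each step in the transformation, it obtains the step's `StepMetaInterface`, creates the matching `StepDataInterface` and `StepInterface` instances from the step's configuration, and adds them to the internal `steps` list for later management.
  - It walks the hops between the steps and allocates the rowsets that carry rows across them, so that the data flow is ready for execution.
- Logging: calls such as `log.snap(Metrics.METRIC_TRANSFORMATION_EXECUTION_START)` record when the transformation execution started, along with other performance metrics, for monitoring and analysis.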
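One part of the rowset allocation that is easy to miss in the long source listing that follows is how the dispatch type is chosen from the number of copies of the source and target steps. The following is a standalone restatement of that branch, not the Kettle implementation itself: `DispatchTypeSketch`, its enum, and both method names are hypothetical, while the conditions and rowset counts mirror the `TYPE_DISP_*` branches in the listing.

```java
/** Standalone restatement of the dispatch-type choice made while allocating rowsets. */
final class DispatchTypeSketch {

    enum DispatchType { ONE_TO_ONE, ONE_TO_N, N_TO_ONE, N_TO_N, N_TO_M }

    /** Chooses the dispatch type from the copy counts and the repartitioning flag. */
    static DispatchType choose(int thisCopies, int nextCopies, boolean repartitioning) {
        if (thisCopies == 1 && nextCopies == 1) {
            return DispatchType.ONE_TO_ONE;
        } else if (thisCopies == 1 && nextCopies > 1) {
            return DispatchType.ONE_TO_N;
        } else if (thisCopies > 1 && nextCopies == 1) {
            return DispatchType.N_TO_ONE;
        } else if (thisCopies == nextCopies && !repartitioning) {
            return DispatchType.N_TO_N;
        }
        return DispatchType.N_TO_M;
    }

    /** Number of rowsets allocated for one hop between the two steps. */
    static int rowSetCount(int thisCopies, int nextCopies, boolean repartitioning) {
        switch (choose(thisCopies, nextCopies, repartitioning)) {
            case ONE_TO_ONE:
                return 1;
            case N_TO_ONE:
                return thisCopies;
            case ONE_TO_N:
            case N_TO_N:
                return nextCopies;
            default:
                // N_TO_M: one rowset per (source copy, target copy) pair
                return thisCopies * nextCopies;
        }
    }

    public static void main(String[] args) {
        System.out.println(choose(1, 3, false) + " -> " + rowSetCount(1, 3, false) + " rowsets");
        System.out.println(choose(2, 2, true) + " -> " + rowSetCount(2, 2, true) + " rowsets");
    }
}
```

Note that equal copy counts only yield the N-to-N case when no repartitioning is needed; with repartitioning, every source copy is connected to every target copy.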
/*** Prepares the transformation for execution. This includes setting the arguments and parameters as well as preparing* and tracking the steps and hops in the transformation.** @param arguments the arguments to use for this transformation* @throws KettleException in case the transformation could not be prepared (initialized)*/
public void prepareExecution( String[] arguments ) throws KettleException {setPreparing( true );startDate = null;setRunning( false );log.snap( Metrics.METRIC_TRANSFORMATION_EXECUTION_START );log.snap( Metrics.METRIC_TRANSFORMATION_INIT_START );ExtensionPointHandler.callExtensionPoint( log, KettleExtensionPoint.TransformationPrepareExecution.id, this );checkCompatibility();// Set the arguments on the transformation...//if ( arguments != null ) {setArguments( arguments );}activateParameters();transMeta.activateParameters();ConnectionUtil.init( transMeta );if ( transMeta.getName() == null ) {if ( transMeta.getFilename() != null ) {log.logBasic( BaseMessages.getString( PKG, "Trans.Log.DispacthingStartedForFilename", transMeta.getFilename() ) );}} else {log.logBasic( BaseMessages.getString( PKG, "Trans.Log.DispacthingStartedForTransformation", transMeta.getName() ) );}if ( getArguments() != null ) {if ( log.isDetailed() ) {log.logDetailed( BaseMessages.getString( PKG, "Trans.Log.NumberOfArgumentsDetected", String.valueOf(getArguments().length ) ) );}}if ( isSafeModeEnabled() ) {if ( log.isDetailed() ) {log.logDetailed( BaseMessages.getString( PKG, "Trans.Log.SafeModeIsEnabled", transMeta.getName() ) );}}if ( getReplayDate() != null ) {SimpleDateFormat df = new SimpleDateFormat( REPLAY_DATE_FORMAT );log.logBasic( BaseMessages.getString( PKG, "Trans.Log.ThisIsAReplayTransformation" ) + df.format(getReplayDate() ) );} else {if ( log.isDetailed() ) {log.logDetailed( BaseMessages.getString( PKG, "Trans.Log.ThisIsNotAReplayTransformation" ) );}}// setInternalKettleVariables(this); --> Let's not do this, when running// without file, for example remote, it spoils the fun// extra check to see if the servlet print writer has some value in case// folks want to test it locally...//if ( servletPrintWriter == null ) {String encoding = System.getProperty( "KETTLE_DEFAULT_SERVLET_ENCODING", null );if ( encoding == null ) {servletPrintWriter = new PrintWriter( new OutputStreamWriter( 
System.out ) );} else {try {servletPrintWriter = new PrintWriter( new OutputStreamWriter( System.out, encoding ) );} catch ( UnsupportedEncodingException ex ) {servletPrintWriter = new PrintWriter( new OutputStreamWriter( System.out ) );}}}// Keep track of all the row sets and allocated steps//steps = new ArrayList<>();rowsets = new ArrayList<>();List<StepMeta> hopsteps = transMeta.getTransHopSteps( false );if ( log.isDetailed() ) {log.logDetailed( BaseMessages.getString( PKG, "Trans.Log.FoundDefferentSteps", String.valueOf( hopsteps.size() ) ) );log.logDetailed( BaseMessages.getString( PKG, "Trans.Log.AllocatingRowsets" ) );}// First allocate all the rowsets required!// Note that a mapping doesn't receive ANY input or output rowsets...//for ( int i = 0; i < hopsteps.size(); i++ ) {StepMeta thisStep = hopsteps.get( i );if ( thisStep.isMapping() ) {continue; // handled and allocated by the mapping step itself.}if ( log.isDetailed() ) {log.logDetailed( BaseMessages.getString( PKG, "Trans.Log.AllocateingRowsetsForStep", String.valueOf( i ),thisStep.getName() ) );}List<StepMeta> nextSteps = transMeta.findNextSteps( thisStep );int nrTargets = nextSteps.size();for ( int n = 0; n < nrTargets; n++ ) {// What's the next step?StepMeta nextStep = nextSteps.get( n );if ( nextStep.isMapping() ) {continue; // handled and allocated by the mapping step itself.}// How many times do we start the source step?int thisCopies = thisStep.getCopies();if ( thisCopies < 0 ) {// This can only happen if a variable is used that didn't resolve to a positive integer value//throw new KettleException( BaseMessages.getString( PKG, "Trans.Log.StepCopiesNotCorrectlyDefined", thisStep.getName() ) );}// How many times do we start the target step?int nextCopies = nextStep.getCopies();// Are we re-partitioning?boolean repartitioning;if ( thisStep.isPartitioned() ) {repartitioning = !thisStep.getStepPartitioningMeta().equals( nextStep.getStepPartitioningMeta() );} else {repartitioning = 
          nextStep.isPartitioned();
        }

        int nrCopies;

        if ( log.isDetailed() ) {
          log.logDetailed( BaseMessages.getString( PKG, "Trans.Log.copiesInfo", String.valueOf( thisCopies ),
            String.valueOf( nextCopies ) ) );
        }

        int dispatchType;
        if ( thisCopies == 1 && nextCopies == 1 ) {
          dispatchType = TYPE_DISP_1_1;
          nrCopies = 1;
        } else if ( thisCopies == 1 && nextCopies > 1 ) {
          dispatchType = TYPE_DISP_1_N;
          nrCopies = nextCopies;
        } else if ( thisCopies > 1 && nextCopies == 1 ) {
          dispatchType = TYPE_DISP_N_1;
          nrCopies = thisCopies;
        } else if ( thisCopies == nextCopies && !repartitioning ) {
          dispatchType = TYPE_DISP_N_N;
          nrCopies = nextCopies;
        } else {
          // > 1!
          dispatchType = TYPE_DISP_N_M;
          nrCopies = nextCopies;
        } // Allocate a rowset for each destination step

        // Allocate the rowsets
        //
        if ( dispatchType != TYPE_DISP_N_M ) {
          for ( int c = 0; c < nrCopies; c++ ) {
            RowSet rowSet;
            switch ( transMeta.getTransformationType() ) {
              case Normal:
                // This is a temporary patch until the batching rowset has proven
                // to be working in all situations.
                // Currently there are stalling problems when dealing with small
                // amounts of rows.
                //
                Boolean batchingRowSet =
                  ValueMetaString.convertStringToBoolean( System.getProperty( Const.KETTLE_BATCHING_ROWSET ) );
                if ( batchingRowSet != null && batchingRowSet.booleanValue() ) {
                  rowSet = new BlockingBatchingRowSet( transMeta.getSizeRowset() );
                } else {
                  rowSet = new BlockingRowSet( transMeta.getSizeRowset() );
                }
                break;
              case SerialSingleThreaded:
                rowSet = new SingleRowRowSet();
                break;
              case SingleThreaded:
                rowSet = new QueueRowSet();
                break;
              default:
                throw new KettleException( "Unhandled transformation type: " + transMeta.getTransformationType() );
            }

            switch ( dispatchType ) {
              case TYPE_DISP_1_1:
                rowSet.setThreadNameFromToCopy( thisStep.getName(), 0, nextStep.getName(), 0 );
                break;
              case TYPE_DISP_1_N:
                rowSet.setThreadNameFromToCopy( thisStep.getName(), 0, nextStep.getName(), c );
                break;
              case TYPE_DISP_N_1:
                rowSet.setThreadNameFromToCopy( thisStep.getName(), c, nextStep.getName(), 0 );
                break;
              case TYPE_DISP_N_N:
                rowSet.setThreadNameFromToCopy( thisStep.getName(), c, nextStep.getName(), c );
                break;
              default:
                break;
            }
            rowsets.add( rowSet );
            if ( log.isDetailed() ) {
              log.logDetailed( BaseMessages.getString( PKG, "Trans.TransformationAllocatedNewRowset",
                rowSet.toString() ) );
            }
          }
        } else {
          // For each N source steps we have M target steps
          //
          // From each input step we go to all output steps.
          // This allows maximum flexibility for re-partitioning,
          // distribution...
          for ( int s = 0; s < thisCopies; s++ ) {
            for ( int t = 0; t < nextCopies; t++ ) {
              BlockingRowSet rowSet = new BlockingRowSet( transMeta.getSizeRowset() );
              rowSet.setThreadNameFromToCopy( thisStep.getName(), s, nextStep.getName(), t );
              rowsets.add( rowSet );
              if ( log.isDetailed() ) {
                log.logDetailed( BaseMessages.getString( PKG, "Trans.TransformationAllocatedNewRowset",
                  rowSet.toString() ) );
              }
            }
          }
        }
      }
      log.logDetailed( BaseMessages.getString( PKG, "Trans.Log.AllocatedRowsets", String.valueOf( rowsets.size() ),
        String.valueOf( i ), thisStep.getName() ) + " " );
    }

    if ( log.isDetailed() ) {
      log.logDetailed( BaseMessages.getString( PKG, "Trans.Log.AllocatingStepsAndStepData" ) );
    }

    // Allocate the steps & the data...
    //
    for ( int i = 0; i < hopsteps.size(); i++ ) {
      StepMeta stepMeta = hopsteps.get( i );
      String stepid = stepMeta.getStepID();

      if ( log.isDetailed() ) {
        log.logDetailed( BaseMessages.getString( PKG, "Trans.Log.TransformationIsToAllocateStep", stepMeta.getName(),
          stepid ) );
      }

      // How many copies are launched of this step?
      int nrCopies = stepMeta.getCopies();

      if ( log.isDebug() ) {
        log.logDebug( BaseMessages.getString( PKG, "Trans.Log.StepHasNumberRowCopies", String.valueOf( nrCopies ) ) );
      }

      // At least run once...
      for ( int c = 0; c < nrCopies; c++ ) {
        // Make sure we haven't started it yet!
        if ( !hasStepStarted( stepMeta.getName(), c ) ) {
          StepMetaDataCombi combi = new StepMetaDataCombi();

          combi.stepname = stepMeta.getName();
          combi.copy = c;

          // The meta-data
          combi.stepMeta = stepMeta;
          combi.meta = stepMeta.getStepMetaInterface();

          // Allocate the step data
          StepDataInterface data = combi.meta.getStepData();
          combi.data = data;

          // Allocate the step
          StepInterface step = combi.meta.getStep( stepMeta, data, c, transMeta, this );

          // Copy the variables of the transformation to the step...
          // don't share. Each copy of the step has its own variables.
          //
          step.initializeVariablesFrom( this );
          step.setUsingThreadPriorityManagment( transMeta.isUsingThreadPriorityManagment() );

          // Pass the connected repository & metaStore to the steps runtime
          //
          step.setRepository( repository );
          step.setMetaStore( metaStore );

          // If the step is partitioned, set the partitioning ID and some other
          // things as well...
          if ( stepMeta.isPartitioned() ) {
            List<String> partitionIDs = stepMeta.getStepPartitioningMeta().getPartitionSchema().getPartitionIDs();
            if ( partitionIDs != null && !partitionIDs.isEmpty() ) {
              step.setPartitionID( partitionIDs.get( c ) ); // Pass the partition ID
              // to the step
            }
          }

          // Save the step too
          combi.step = step;

          // Pass logging level and metrics gathering down to the step level.
          // /
          if ( combi.step instanceof LoggingObjectInterface ) {
            LogChannelInterface logChannel = combi.step.getLogChannel();
            logChannel.setLogLevel( logLevel );
            logChannel.setGatheringMetrics( log.isGatheringMetrics() );
          }

          // Add to the bunch...
          steps.add( combi );

          if ( log.isDetailed() ) {
            log.logDetailed( BaseMessages.getString( PKG, "Trans.Log.TransformationHasAllocatedANewStep",
              stepMeta.getName(), String.valueOf( c ) ) );
          }
        }
      }
    }

    // Now we need to verify if certain rowsets are not meant to be for error
    // handling...
    // Loop over the steps and for every step verify the output rowsets
    // If a rowset is going to a target step in the steps error handling
    // metadata, set it to the errorRowSet.
    // The input rowsets are already in place, so the next step just accepts the
    // rows.
    // Metadata wise we need to do the same trick in TransMeta
    //
    for ( int s = 0; s < steps.size(); s++ ) {
      StepMetaDataCombi combi = steps.get( s );
      if ( combi.stepMeta.isDoingErrorHandling() ) {
        combi.step.identifyErrorOutput();
      }
    }

    // Now (optionally) write start log record!
    // Make sure we synchronize appropriately to avoid duplicate batch IDs.
    //
    Object syncObject = this;
    if ( parentJob != null ) {
      syncObject = parentJob; // parallel execution in a job
    }
    if ( parentTrans != null ) {
      syncObject = parentTrans; // multiple sub-transformations
    }
    synchronized ( syncObject ) {
      calculateBatchIdAndDateRange();
      beginProcessing();
    }

    // Set the partition-to-rowset mapping
    //
    for ( int i = 0; i < steps.size(); i++ ) {
      StepMetaDataCombi sid = steps.get( i );

      StepMeta stepMeta = sid.stepMeta;
      StepInterface baseStep = sid.step;

      baseStep.setPartitioned( stepMeta.isPartitioned() );

      // Now let's take a look at the source and target relation
      //
      // If this source step is not partitioned, and the target step is: it
      // means we need to re-partition the incoming data.
      // If both steps are partitioned on the same method and schema, we don't
      // need to re-partition
      // If both steps are partitioned on a different method or schema, we need
      // to re-partition as well.
      // If both steps are not partitioned, we don't need to re-partition
      //
      boolean isThisPartitioned = stepMeta.isPartitioned();
      PartitionSchema thisPartitionSchema = null;
      if ( isThisPartitioned ) {
        thisPartitionSchema = stepMeta.getStepPartitioningMeta().getPartitionSchema();
      }

      boolean isNextPartitioned = false;
      StepPartitioningMeta nextStepPartitioningMeta = null;
      PartitionSchema nextPartitionSchema = null;

      List<StepMeta> nextSteps = transMeta.findNextSteps( stepMeta );
      int nrNext = nextSteps.size();
      for ( int p = 0; p < nrNext; p++ ) {
        StepMeta nextStep = nextSteps.get( p );
        if ( nextStep.isPartitioned() ) {
          isNextPartitioned = true;
          nextStepPartitioningMeta = nextStep.getStepPartitioningMeta();
          nextPartitionSchema = nextStepPartitioningMeta.getPartitionSchema();
        }
      }

      baseStep.setRepartitioning( StepPartitioningMeta.PARTITIONING_METHOD_NONE );

      // If the next step is partitioned differently, set re-partitioning, when
      // running locally.
      //
      if ( ( !isThisPartitioned && isNextPartitioned ) || ( isThisPartitioned && isNextPartitioned
        && !thisPartitionSchema.equals( nextPartitionSchema ) ) ) {
        baseStep.setRepartitioning( nextStepPartitioningMeta.getMethodType() );
      }

      // For partitioning to a set of remove steps (repartitioning from a master
      // to a set or remote output steps)
      //
      StepPartitioningMeta targetStepPartitioningMeta = baseStep.getStepMeta().getTargetStepPartitioningMeta();
      if ( targetStepPartitioningMeta != null ) {
        baseStep.setRepartitioning( targetStepPartitioningMeta.getMethodType() );
      }
    }

    setPreparing( false );
    setInitializing( true );

    // Do a topology sort... Over 150 step (copies) things might be slowing down too much.
    //
    if ( isMonitored() && steps.size() < 150 ) {
      doTopologySortOfSteps();
    }

    if ( log.isDetailed() ) {
      log.logDetailed( BaseMessages.getString( PKG, "Trans.Log.InitialisingSteps", String.valueOf( steps.size() ) ) );
    }

    StepInitThread[] initThreads = new StepInitThread[ steps.size() ];
    Thread[] threads = new Thread[ steps.size() ];

    // Initialize all the threads...
    //
    for ( int i = 0; i < steps.size(); i++ ) {
      final StepMetaDataCombi sid = steps.get( i );

      // Do the init code in the background!
      // Init all steps at once, but ALL steps need to finish before we can
      // continue properly!
      //
      initThreads[ i ] = new StepInitThread( sid, log );

      // Put it in a separate thread!
      //
      threads[ i ] = new Thread( initThreads[ i ] );
      threads[ i ].setName( "init of " + sid.stepname + "." + sid.copy + " (" + threads[ i ].getName() + ")" );

      ExtensionPointHandler.callExtensionPoint( log, KettleExtensionPoint.StepBeforeInitialize.id, initThreads[ i ] );

      threads[ i ].start();
    }

    for ( int i = 0; i < threads.length; i++ ) {
      try {
        threads[ i ].join();
        ExtensionPointHandler.callExtensionPoint( log, KettleExtensionPoint.StepAfterInitialize.id, initThreads[ i ] );
      } catch ( Exception ex ) {
        log.logError( "Error with init thread: " + ex.getMessage(), ex.getMessage() );
        log.logError( Const.getStackTracker( ex ) );
      }
    }

    setInitializing( false );
    boolean ok = true;

    // All step are initialized now: see if there was one that didn't do it
    // correctly!
    //
    for ( int i = 0; i < initThreads.length; i++ ) {
      StepMetaDataCombi combi = initThreads[ i ].getCombi();
      if ( !initThreads[ i ].isOk() ) {
        log.logError( BaseMessages.getString( PKG, "Trans.Log.StepFailedToInit", combi.stepname + "." + combi.copy ) );
        combi.data.setStatus( StepExecutionStatus.STATUS_STOPPED );
        ok = false;
      } else {
        combi.data.setStatus( StepExecutionStatus.STATUS_IDLE );
        if ( log.isDetailed() ) {
          log.logDetailed( BaseMessages.getString( PKG, "Trans.Log.StepInitialized", combi.stepname + "."
            + combi.copy ) );
        }
      }
    }

    if ( !ok ) {
      // One or more steps failed on initialization.
      // Transformation is now stopped.
      setStopped( true );

      // Halt the other threads as well, signal end-of-the line to the outside world...
      // Also explicitly call dispose() to clean up resources opened during init();
      //
      for ( int i = 0; i < initThreads.length; i++ ) {
        StepMetaDataCombi combi = initThreads[ i ].getCombi();

        // Dispose will overwrite the status, but we set it back right after
        // this.
        combi.step.dispose( combi.meta, combi.data );

        if ( initThreads[ i ].isOk() ) {
          combi.data.setStatus( StepExecutionStatus.STATUS_HALTED );
        } else {
          combi.data.setStatus( StepExecutionStatus.STATUS_STOPPED );
        }
      }

      // Just for safety, fire the trans finished listeners...
      try {
        fireTransFinishedListeners();
      } catch ( KettleException e ) {
        // listeners produces errors
        log.logError( BaseMessages.getString( PKG, "Trans.FinishListeners.Exception" ) );
        // we will not pass this exception up to prepareExecution() entry point.
      } finally {
        // Flag the transformation as finished even if exception was thrown
        setFinished( true );
      }

      // Pass along the log during preview. Otherwise it becomes hard to see
      // what went wrong.
      //
      if ( preview ) {
        String logText = KettleLogStore.getAppender().getBuffer( getLogChannelId(), true ).toString();
        throw new KettleException( BaseMessages.getString( PKG, "Trans.Log.FailToInitializeAtLeastOneStep" ) + Const.CR
          + logText );
      } else {
        throw new KettleException( BaseMessages.getString( PKG, "Trans.Log.FailToInitializeAtLeastOneStep" )
          + Const.CR );
      }
    }

    log.snap( Metrics.METRIC_TRANSFORMATION_INIT_STOP );

    KettleEnvironment.setExecutionInformation( this, repository );

    setReadyToStart( true );
  }
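The rowset allocation rule in the listing above is easier to follow in isolation. Below is a minimal standalone sketch of that rule only: it picks one of the five dispatch cases from the copy counts of two neighbouring steps and lists which copy-to-copy connections would get a rowset. The class `RowSetDispatchSketch`, the enum `DispatchType`, and the methods `dispatchType` and `allocate` are hypothetical names invented for illustration; they are not part of the Kettle API, and plain strings stand in for real `RowSet` objects.

```java
import java.util.ArrayList;
import java.util.List;

/** Standalone illustration only; none of these names exist in Kettle. */
final class RowSetDispatchSketch {

    /** Stands in for the TYPE_DISP_* constants used in the listing. */
    enum DispatchType { ONE_TO_ONE, ONE_TO_N, N_TO_ONE, N_TO_N, N_TO_M }

    /** Same decision order as the if/else chain in the listing. */
    static DispatchType dispatchType(int thisCopies, int nextCopies, boolean repartitioning) {
        if (thisCopies == 1 && nextCopies == 1) {
            return DispatchType.ONE_TO_ONE;
        } else if (thisCopies == 1 && nextCopies > 1) {
            return DispatchType.ONE_TO_N;
        } else if (thisCopies > 1 && nextCopies == 1) {
            return DispatchType.N_TO_ONE;
        } else if (thisCopies == nextCopies && !repartitioning) {
            return DispatchType.N_TO_N;
        }
        return DispatchType.N_TO_M;
    }

    /** Returns one label per rowset that would be allocated between the two steps. */
    static List<String> allocate(String from, int thisCopies, String to, int nextCopies,
                                 boolean repartitioning) {
        DispatchType type = dispatchType(thisCopies, nextCopies, repartitioning);
        List<String> rowSets = new ArrayList<>();
        if (type == DispatchType.N_TO_M) {
            // Every source copy is connected to every target copy.
            for (int s = 0; s < thisCopies; s++) {
                for (int t = 0; t < nextCopies; t++) {
                    rowSets.add(label(from, s, to, t));
                }
            }
            return rowSets;
        }
        // N:1 iterates over the source copies, all other cases over the target copies.
        int nrCopies = (type == DispatchType.N_TO_ONE) ? thisCopies : nextCopies;
        for (int c = 0; c < nrCopies; c++) {
            boolean singleSource = type == DispatchType.ONE_TO_ONE || type == DispatchType.ONE_TO_N;
            boolean singleTarget = type == DispatchType.ONE_TO_ONE || type == DispatchType.N_TO_ONE;
            rowSets.add(label(from, singleSource ? 0 : c, to, singleTarget ? 0 : c));
        }
        return rowSets;
    }

    private static String label(String from, int fromCopy, String to, int toCopy) {
        return from + "." + fromCopy + " -> " + to + "." + toCopy;
    }

    public static void main(String[] args) {
        // Two source copies feeding three target copies falls into the N:M case.
        allocate("read", 2, "write", 3, false).forEach(System.out::println);
    }
}
```

The N:M case is the only one that allocates `thisCopies * nextCopies` rowsets, which is also why the listing handles it in a separate branch with nested loops instead of the shared `nrCopies` loop.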