dolphinscheduler-笔记2

springboot集成dolphinscheduler

说明

为了避免对DolphinScheduler产生过度依赖,实践中通常不会全面采用其内置的所有任务节点类型。相反,会选择性地利用DolphinScheduler的HTTP任务节点功能,以此作为工作流执行管理的桥梁,对接并驱动自有项目的业务流程。这种策略不仅确保了流程编排的灵活性与扩展性,还有效减少了对外部调度系统的深度绑定,从而在提升项目自洽能力的同时,保持了良好的系统间解耦。

简而言之,我们倾向于仅采纳DolphinScheduler中的HTTP任务节点,作为调度机制的一部分,来促进我们内部项目工作流的自动化执行。这样做既能享受DolphinScheduler带来的调度便利,又避免了全盘接受其所有组件所带来的潜在风险,实现了更为稳健、可控的项目管理方案。

代码实现

为了优化与DolphinScheduler的集成,以下是三个关键配置类的概述,它们旨在通过初始化接口实现项目及租户信息的同步通知。值得注意的是,为了确保数据一致性和高效通信,你的Spring Boot应用所使用的数据库应与DolphinScheduler共享同一数据源。这一策略不仅简化了数据管理,还促进了实时状态更新,增强了系统的整体协调性。

简而言之,我们精心设计了三组配置规则,允许我们的Spring Boot项目无缝对接DolphinScheduler平台。通过这些配置,项目和租户的动态变化能够及时反映到DolphinScheduler中,前提是两者共用一个数据库实例。这种架构决策不仅优化了资源分配,还促进了跨系统间的紧密协作,为后续的业务拓展奠定了坚实的基础。

package cn.com.lyb.data.dev.init;import cn.com.lyb.common.security.annotation.InnerRequest;
import cn.com.lyb.core.exception.BizException;
import cn.com.lyb.core.web.request.Response;
import cn.com.lyb.core.web.request.ResultWrap;
import cn.com.lyb.data.dev.web.config.DolphinschedulerConfig;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Arrays;
import java.util.List;@Api(tags = "初始化dolphinscheduler数据库信息")
@RestController
@RequestMapping("/data-dev")
public class InitializePlugin {@Autowiredprivate DataSource dataSource;@Autowiredprivate DolphinschedulerConfig dolphinschedulerConfig;@GetMapping("/init")@ApiOperation("部署全新的环境可以用此接口,否则会报错")@InnerRequestpublic Response init(){Connection connection = null;try{connection = dataSource.getConnection();// 项目表(xgov如果集成项目的话,这个sql不能再执行,配置也需要改)String projectSql = "INSERT INTO `dolphinscheduler`.`t_ds_project` (`id`, `name`, `code`, `description`, `user_id`, `flag`, `create_time`, `update_time`) VALUES (1, 'lyb', '" + dolphinschedulerConfig.getProjectCode() + "', '', 1, 1, '2024-06-13 02:49:43', '2024-06-13 02:49:43');";String tokenSql = "INSERT INTO `dolphinscheduler`.`t_ds_access_token` (`id`, `user_id`, `token`, `expire_time`, `create_time`, `update_time`) VALUES (1, 1, '"+dolphinschedulerConfig.getDsdToken()+"', '2039-12-30 10:51:26', '2024-06-13 02:50:37', '2024-06-18 10:00:13');";String tenantSql = "INSERT INTO `dolphinscheduler`.`t_ds_tenant` (`id`, `tenant_code`, `description`, `queue_id`, `create_time`, `update_time`) VALUES (1, 'default', '', 1, '2024-06-13 02:50:20', '2024-06-13 02:50:20');";String userSql = "UPDATE `dolphinscheduler`.`t_ds_user` SET `user_name` = 'admin', `user_password` = '470b9934942620215ad1cb3ac2d48497', `user_type` = 0, `email` = 'xxx@qq.com', `phone` = '', `tenant_id` = 1, `create_time` = '2024-06-12 10:23:37', `update_time` = '2024-06-18 09:59:52', `queue` = '', `state` = 1, `time_zone` = 'Asia/Shanghai' WHERE `id` = 1;";List<String> sqlList = Arrays.asList(projectSql, tenantSql, tokenSql, userSql);connection.setAutoCommit(false);Statement statement = connection.createStatement();for (String sql : sqlList) {statement.addBatch(sql);}statement.executeBatch();connection.commit();statement.close();}catch (Exception e){throw new BizException("初始化报错");}finally {if(connection != null) {try {connection.close();} catch (SQLException e) {e.printStackTrace();}}}return ResultWrap.ok();}
}
package cn.com.lyb.data.dev.web.config;import org.apache.http.client.config.RequestConfig;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.client.HttpComponentsClientHttpRequestFactory;
import org.springframework.web.client.RestTemplate;@Configuration
public class RestTemplateConfig {@Value("${xgov.template.connectTimeout}")private int connectTimeout;@Value("${xgov.template.socketTimeout}")private int socketTimeout;@Beanpublic RestTemplate restTemplate() {return new RestTemplate(httpRequestFactory());}@Beanpublic HttpComponentsClientHttpRequestFactory httpRequestFactory() {PoolingHttpClientConnectionManager connectionManager = new PoolingHttpClientConnectionManager();connectionManager.setMaxTotal(200); // 最大连接数connectionManager.setDefaultMaxPerRoute(20); // 每个路由默认的最大连接数RequestConfig requestConfig = RequestConfig.custom().setConnectTimeout(connectTimeout) // 连接超时时间.setSocketTimeout(socketTimeout) // 读取超时时间.setConnectionRequestTimeout(5000) // 从连接池获取连接的超时时间.build();CloseableHttpClient httpClient = HttpClients.custom().setConnectionManager(connectionManager).setDefaultRequestConfig(requestConfig).build();return new HttpComponentsClientHttpRequestFactory(httpClient);}
}
package cn.com.dev.data.dev.web.config;import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;@Configuration
public class DolphinschedulerConfig {@Value("${lyb.dolphinscheduler.server.username}")private String dsdUsername;@Value("${lyb.dolphinscheduler.server.token}")private String dsdToken;@Value("${lyb.dolphinscheduler.server.url}")private String dsdUrl;@Value("${lyb.dolphinscheduler.server.porjectCode}")private String projectCode;@Value("${lyb.dolphinscheduler.server.tenantCode}")private String tenantCode;public String getDsdUsername() {return dsdUsername;}public void setDsdUsername(String dsdUsername) {this.dsdUsername = dsdUsername;}public String getDsdToken() {return dsdToken;}public void setDsdToken(String dsdToken) {this.dsdToken = dsdToken;}public String getDsdUrl() {return dsdUrl;}public void setDsdUrl(String dsdUrl) {this.dsdUrl = dsdUrl;}public String getProjectCode() {return projectCode;}public void setProjectCode(String projectCode) {this.projectCode = projectCode;}public String getTenantCode() {return tenantCode;}public void setTenantCode(String tenantCode) {this.tenantCode = tenantCode;}}

构建一个调用类,该类全面集成了与DolphinScheduler接口的交互逻辑,为我们的应用提供了一层抽象。对于涉及具体业务逻辑的数据封装细节,此处将不再赘述,旨在保持代码的清晰度与通用性。

简言之,我们设计了一个专门的类来处理所有与DolphinScheduler的API调用,确保了业务核心逻辑的独立性和可维护性。这一封装策略使得代码库更加整洁,同时也提升了开发效率和系统的整体健壮性。

通过这种方式,我们不仅隔离了与外部服务的直接交互,还简化了业务逻辑的实现,使其更加专注于核心功能,而非调度系统的细节。这样的架构设计,有助于团队成员快速理解系统架构,同时也便于未来的功能扩展和系统维护。

package cn.com.lyb.data.dev.web.dolphinscheduler.service;import cn.com.lyb.common.redis.service.RedisService;
import cn.com.lyb.core.exception.BizException;
import cn.com.lyb.data.dev.enums.ProcessExecutionTypeEnum;
import cn.com.lyb.data.dev.enums.TaskExecutionStatus;
import cn.com.lyb.data.dev.enums.WorkflowExecutionStatus;
import cn.com.lyb.data.dev.web.config.DolphinschedulerConfig;
import cn.com.lyb.data.dev.workflow.entity.delphinscheduler.TaskDefinition;
import cn.com.lyb.data.dev.workflow.entity.vo.GanttTaskVO;
import cn.com.lyb.data.dev.workflow.entity.vo.ProcessInstanceVO;
import cn.com.lyb.data.dev.workflow.entity.vo.ResponseTaskLog;
import cn.com.lyb.data.dev.workflow.entity.vo.TaskInstanceVO;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.github.pagehelper.PageInfo;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Service;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;
import org.springframework.web.client.RestTemplate;import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;@Service
public class DolphinschedulerService {private static final Logger logger = LoggerFactory.getLogger(DolphinschedulerService.class);private static final String CONTENT_TYPE = "application/x-www-form-urlencoded; charset=utf-8";private static final String X_REQUESTED_WITH = "XMLHttpRequest";@Autowiredprivate RestTemplate restTemplate;@Autowiredprivate DolphinschedulerConfig dolphinschedulerConfig;private static final Boolean DSD_SUCCESS = true;@Autowiredprivate RedisService redisService;private static final String DSD_SESSION_KEY = "DSD_SESSION_KEY";private static final String SUCCESS = "success";private static final String MSG = "msg";// 此种方法适用登录后获取SESSION设置到header里面private static final String SESSION_ID = "sessionId";private static final String TOKEN = "token";private static final String DATA = "data";/*** 登录,返回 sessionId*/public String login() {String sessionValue = redisService.getCacheObject(DSD_SESSION_KEY);if (StringUtils.isNotBlank(sessionValue)) {return sessionValue;}//SSLUtil.turnOffSslChecking();HttpHeaders hs = new HttpHeaders();hs.add("Content-Type", CONTENT_TYPE);hs.add("X-Requested-With", X_REQUESTED_WITH);LinkedMultiValueMap<String, String> linkedMultiValueMap = new LinkedMultiValueMap<String, String>();linkedMultiValueMap.add("userName", dolphinschedulerConfig.getDsdUsername());//linkedMultiValueMap.add("userPassword", dolphinschedulerConfig.getDsdPassword());HttpEntity<MultiValueMap<String, String>> httpEntity = new HttpEntity<>(linkedMultiValueMap, hs);String url = dolphinschedulerConfig.getDsdUrl() + "/login";JSONObject resultJSON = doPostForObject(url, httpEntity);JSONObject data = (JSONObject) resultJSON.get(DATA);String sessionId = data.get(SESSION_ID).toString();redisService.setCacheObject(DSD_SESSION_KEY, sessionId, 23L, TimeUnit.HOURS);return sessionId;}/*** 创建项目** @return*/public void createProject(String projectName, String description) {// 如果是https登录可以使用该方法//SSLUtil.turnOffSslChecking();HttpHeaders hs = new HttpHeaders();hs.add("Content-Type", CONTENT_TYPE);hs.add("X-Requested-With", X_REQUESTED_WITH);hs.add(TOKEN, dolphinschedulerConfig.getDsdToken());LinkedMultiValueMap<String, String> linkedMultiValueMap = new LinkedMultiValueMap<String, String>();linkedMultiValueMap.add("projectName", projectName);linkedMultiValueMap.add("description", description);linkedMultiValueMap.add("userName", dolphinschedulerConfig.getDsdUsername());HttpEntity<MultiValueMap<String, String>> httpEntity = new HttpEntity<>(linkedMultiValueMap, hs);logger.info("Azkaban请求信息:" + httpEntity.toString());String url = dolphinschedulerConfig.getDsdUrl() + "/projects";doPostForObject(url, httpEntity);}/*** 项目列表** @param pageNo* @param pageSize* @param searchVal*/public Object projectsPage(Integer pageNo, Integer pageSize, String searchVal) {//SSLUtil.turnOffSslChecking();HttpHeaders hs = new HttpHeaders();hs.add("Content-Type", CONTENT_TYPE);hs.add("X-Requested-With", X_REQUESTED_WITH);hs.add(TOKEN, dolphinschedulerConfig.getDsdToken());HttpEntity<String> entity = new HttpEntity<String>(hs);String url;if (StringUtils.isNotBlank(searchVal)) {url = dolphinschedulerConfig.getDsdUrl() + "/projects?pageNo=" + pageNo + "&pageSize=" + pageSize + "&searchVal=" + searchVal;} else {url = dolphinschedulerConfig.getDsdUrl() + "/projects?pageNo=" + pageNo + "&pageSize=" + pageSize;}JSONObject resultJSON = doGetForObject(url, entity);return resultJSON.get(DATA);}/*** 修改项目** @param code* @param projectName* @param description* @return*/public Object updateProjects(String code, String projectName, String description) {//SSLUtil.turnOffSslChecking();HttpHeaders hs = new HttpHeaders();hs.add("Content-Type", CONTENT_TYPE);hs.add("X-Requested-With", X_REQUESTED_WITH);hs.add(TOKEN, dolphinschedulerConfig.getDsdToken());LinkedMultiValueMap<String, String> linkedMultiValueMap = new LinkedMultiValueMap<String, String>();linkedMultiValueMap.add("projectName", projectName);linkedMultiValueMap.add("description", description);linkedMultiValueMap.add("userName", dolphinschedulerConfig.getDsdUsername());HttpEntity<MultiValueMap<String, String>> httpEntity = new HttpEntity<>(linkedMultiValueMap, hs);String url = dolphinschedulerConfig.getDsdUrl() + "/projects/" + code;JSONObject resultJSON = doPutForObject(url, httpEntity);return resultJSON.get(DATA);}/*** 删除项目** @param code* @return*/public Object delProjects(String code) {//SSLUtil.turnOffSslChecking();HttpHeaders hs = new HttpHeaders();hs.add("Content-Type", CONTENT_TYPE);hs.add("X-Requested-With", X_REQUESTED_WITH);hs.add(TOKEN, dolphinschedulerConfig.getDsdToken());HttpEntity<MultiValueMap<String, String>> httpEntity = new HttpEntity<>(hs);String url = dolphinschedulerConfig.getDsdUrl() + "/projects/" + code;JSONObject resultJSON = doDeleteForObject(url, httpEntity);return resultJSON.get(DATA);}public JSONObject doPostForObject(String url, HttpEntity httpEntity) {logger.info("调用url:{}", url);try {ResponseEntity<String> exchange = restTemplate.exchange(url, HttpMethod.POST, httpEntity, String.class);String result = exchange.getBody();logger.info("post类型接口调用返回信息:{}", result);return getJsonObject(result);} catch (BizException be) {throw be;} catch (Exception e) {logger.error("post类型接口调用失败:{}", e);throw new BizException(e.getMessage());}}public JSONObject doGetForObject(String url, HttpEntity httpEntity) {logger.info("调用url:{}", url);try {ResponseEntity<String> exchange = restTemplate.exchange(url, HttpMethod.GET, httpEntity, String.class);String result = exchange.getBody();logger.info("get类型接口调用返回信息:{}", result);return getJsonObject(result);} catch (BizException be) {throw be;} catch (Exception e) {logger.error("get类型接口调用失败:{}", e);throw new BizException(e.getMessage());}}public JSONObject doPutForObject(String url, HttpEntity httpEntity) {logger.info("调用url:{}", url);try {ResponseEntity<String> exchange = restTemplate.exchange(url, HttpMethod.PUT, httpEntity, String.class);String result = exchange.getBody();logger.info("put类型接口调用返回信息:{}", result);return getJsonObject(result);} catch (BizException be) {throw be;} catch (Exception e) {logger.error("put类型接口调用失败:{}", e);throw new BizException(e.getMessage());}}public JSONObject doDeleteForObject(String url, HttpEntity httpEntity) {logger.info("调用url:{}", url);try {ResponseEntity<String> exchange = restTemplate.exchange(url, HttpMethod.DELETE, httpEntity, String.class);String result = exchange.getBody();logger.info("delete类型接口调用返回信息:{}", result);return getJsonObject(result);} catch (BizException be) {throw be;} catch (Exception e) {logger.error("delete类型接口调用失败:{}", e);throw new BizException(e.getMessage());}}private static JSONObject getJsonObject(String result) {JSONObject resultJSON = JSON.parseObject(result);if (!DSD_SUCCESS.equals(resultJSON.get(SUCCESS))) {logger.error("调用结果返回异常:{}" + result);Integer code = (Integer) resultJSON.get("code");if(code.intValue() == 50019){throw new BizException("流程节点间存在循环依赖");}else if(code.intValue() == 50036){throw new BizException("工作流任务关系参数错误");} else {throw new BizException(resultJSON.get(MSG).toString());}}return resultJSON;}/*** 创建工作流** @param name* @param description* @param globalParams* @param locations* @param timeout* @param taskRelationJson* @param taskDefinitionJson* @param otherParamsJson* @param executionType* @return 3.2.0 版本*/public Long createWorkFlow320(String name, String description, String globalParams, String locations, int timeout,String taskRelationJson, String taskDefinitionJson, String otherParamsJson,ProcessExecutionTypeEnum executionType) {//SSLUtil.turnOffSslChecking();HttpHeaders hs = new HttpHeaders();hs.add("Content-Type", CONTENT_TYPE);hs.add("X-Requested-With", X_REQUESTED_WITH);hs.add(TOKEN, dolphinschedulerConfig.getDsdToken());LinkedMultiValueMap<String, Object> linkedMultiValueMap = new LinkedMultiValueMap<String, Object>();linkedMultiValueMap.add("description", description);linkedMultiValueMap.add("name", name);linkedMultiValueMap.add("taskDefinitionJson", taskDefinitionJson);linkedMultiValueMap.add("taskRelationJson", taskRelationJson);linkedMultiValueMap.add("timeout", timeout);linkedMultiValueMap.add("executionType", executionType);linkedMultiValueMap.add("otherParamsJson", otherParamsJson);linkedMultiValueMap.add("globalParams", globalParams);HttpEntity<MultiValueMap<String, Object>> httpEntity = new HttpEntity<>(linkedMultiValueMap, hs);String url = dolphinschedulerConfig.getDsdUrl() + "/projects/" + dolphinschedulerConfig.getProjectCode() + "/process-definition";JSONObject resultJSON = doPostForObject(url, httpEntity);JSONObject data = (JSONObject) resultJSON.get(DATA);Long code = (Long) data.get("code");return code;}/*** 查询工作流列表** @param pageNo* @param pageSize* @param searchVal* @return*/public Object selectFlowPage(Integer pageNo, Integer pageSize, String searchVal) {//SSLUtil.turnOffSslChecking();HttpHeaders hs = new HttpHeaders();hs.add("Content-Type", CONTENT_TYPE);hs.add("X-Requested-With", X_REQUESTED_WITH);hs.add(TOKEN, dolphinschedulerConfig.getDsdToken());HttpEntity<String> entity = new HttpEntity<String>(hs);String url;if (StringUtils.isNotBlank(searchVal)) {url = dolphinschedulerConfig.getDsdUrl() + "/projects/" + dolphinschedulerConfig.getProjectCode() + "/process-definition?pageNo=" + pageNo + "&pageSize=" + pageSize + "&searchVal=" + searchVal;} else {url = dolphinschedulerConfig.getDsdUrl() + "/projects/" + dolphinschedulerConfig.getProjectCode() + "/process-definition?pageNo=" + pageNo + "&pageSize=" + pageSize;}JSONObject resultJSON = doGetForObject(url, entity);return resultJSON.get(DATA);}public Object selectOneFlow(String code) {//SSLUtil.turnOffSslChecking();HttpHeaders hs = new HttpHeaders();hs.add("Content-Type", CONTENT_TYPE);hs.add("X-Requested-With", X_REQUESTED_WITH);hs.add(TOKEN, dolphinschedulerConfig.getDsdToken());HttpEntity<String> entity = new HttpEntity<String>(hs);String url = dolphinschedulerConfig.getDsdUrl() + "/projects/" + dolphinschedulerConfig.getProjectCode() + "/process-definition/" + code;JSONObject resultJSON = doGetForObject(url, entity);return resultJSON.get(DATA);}public Long createWorkFlow(String name, String description, String locations,String taskDefinitionJson, String taskRelationJson,String executionType) {//SSLUtil.turnOffSslChecking();HttpHeaders hs = new HttpHeaders();hs.add("Content-Type", CONTENT_TYPE);hs.add("X-Requested-With", X_REQUESTED_WITH);hs.add(TOKEN, dolphinschedulerConfig.getDsdToken());LinkedMultiValueMap<String, Object> linkedMultiValueMap = new LinkedMultiValueMap<String, Object>();linkedMultiValueMap.add("description", description);linkedMultiValueMap.add("name", name);linkedMultiValueMap.add("taskDefinitionJson", taskDefinitionJson);linkedMultiValueMap.add("taskRelationJson", taskRelationJson);linkedMultiValueMap.add("timeout", 0);linkedMultiValueMap.add("executionType", executionType);linkedMultiValueMap.add("tenantCode", dolphinschedulerConfig.getTenantCode());linkedMultiValueMap.add("locations", locations);HttpEntity<MultiValueMap<String, Object>> httpEntity = new HttpEntity<>(linkedMultiValueMap, hs);String url = dolphinschedulerConfig.getDsdUrl() + "/projects/" + dolphinschedulerConfig.getProjectCode() + "/process-definition";JSONObject resultJSON = doPostForObject(url, httpEntity);JSONObject data = (JSONObject) resultJSON.get(DATA);Long code = (Long) data.get("code");return code;}public void updateReleaseState(String name, String releaseState, Long code) {//SSLUtil.turnOffSslChecking();HttpHeaders hs = new HttpHeaders();hs.add("Content-Type", CONTENT_TYPE);hs.add("X-Requested-With", X_REQUESTED_WITH);hs.add(TOKEN, dolphinschedulerConfig.getDsdToken());LinkedMultiValueMap<String, Object> linkedMultiValueMap = new LinkedMultiValueMap<String, Object>();linkedMultiValueMap.add("name", name);linkedMultiValueMap.add("releaseState", releaseState);HttpEntity<MultiValueMap<String, Object>> httpEntity = new HttpEntity<>(linkedMultiValueMap, hs);String url = dolphinschedulerConfig.getDsdUrl() + "/projects/" + dolphinschedulerConfig.getProjectCode() + "/process-definition/" + code + "/release";doPostForObject(url, httpEntity);}public List<TaskDefinition> getTaskByWorkflowCode(Long dsdCode) {//SSLUtil.turnOffSslChecking();HttpHeaders hs = new HttpHeaders();hs.add("Content-Type", CONTENT_TYPE);hs.add("X-Requested-With", X_REQUESTED_WITH);hs.add(TOKEN, dolphinschedulerConfig.getDsdToken());HttpEntity<MultiValueMap<String, Object>> httpEntity = new HttpEntity<>(hs);String url = dolphinschedulerConfig.getDsdUrl() + "/projects/" + dolphinschedulerConfig.getProjectCode() + "/process-definition/" + dsdCode;JSONObject resultJSON = doGetForObject(url, httpEntity);JSONObject data = (JSONObject) resultJSON.get(DATA);JSONArray jsonArray = (JSONArray) data.get("taskDefinitionList");return jsonArray.toJavaList(TaskDefinition.class);}/*** 删除工作流** @param codes*/public void delWorkflow(String codes) {HttpHeaders hs = new HttpHeaders();hs.add("Content-Type", CONTENT_TYPE);hs.add("X-Requested-With", X_REQUESTED_WITH);hs.add(TOKEN, dolphinschedulerConfig.getDsdToken());LinkedMultiValueMap<String, Object> linkedMultiValueMap = new LinkedMultiValueMap<String, Object>();linkedMultiValueMap.add("codes", codes);HttpEntity<MultiValueMap<String, Object>> httpEntity = new HttpEntity<>(linkedMultiValueMap, hs);String url = dolphinschedulerConfig.getDsdUrl() + "/projects/" + dolphinschedulerConfig.getProjectCode() + "/process-definition/batch-delete";doPostForObject(url, httpEntity);}public void updateWorkFlow(Long dsdCode, String name, String description, String locations, String taskDefinitionJson, String taskRelationJson, String executionType) {HttpHeaders hs = new HttpHeaders();hs.add("Content-Type", CONTENT_TYPE);hs.add("X-Requested-With", X_REQUESTED_WITH);hs.add(TOKEN, dolphinschedulerConfig.getDsdToken());LinkedMultiValueMap<String, Object> linkedMultiValueMap = new LinkedMultiValueMap<String, Object>();linkedMultiValueMap.add("description", description);linkedMultiValueMap.add("name", name);linkedMultiValueMap.add("taskDefinitionJson", taskDefinitionJson);linkedMultiValueMap.add("taskRelationJson", taskRelationJson);linkedMultiValueMap.add("timeout", 0);linkedMultiValueMap.add("executionType", executionType);linkedMultiValueMap.add("tenantCode", dolphinschedulerConfig.getTenantCode());linkedMultiValueMap.add("locations", locations);HttpEntity<MultiValueMap<String, Object>> httpEntity = new HttpEntity<>(linkedMultiValueMap, hs);String url = dolphinschedulerConfig.getDsdUrl() + "/projects/" + dolphinschedulerConfig.getProjectCode() + "/process-definition/" + dsdCode;doPutForObject(url, httpEntity);}/*** 运行工作流** @param code*/public void runWorkflow(Long code) {HttpHeaders hs = new HttpHeaders();hs.add("Content-Type", CONTENT_TYPE);hs.add("X-Requested-With", X_REQUESTED_WITH);hs.add(TOKEN, dolphinschedulerConfig.getDsdToken());LinkedMultiValueMap<String, Object> linkedMultiValueMap = new LinkedMultiValueMap<String, Object>();linkedMultiValueMap.add("processDefinitionCode", code);linkedMultiValueMap.add("failureStrategy", "CONTINUE");linkedMultiValueMap.add("warningType", "NONE");linkedMultiValueMap.add("scheduleTime", getStringDate());HttpEntity<MultiValueMap<String, Object>> httpEntity = new HttpEntity<>(linkedMultiValueMap, hs);String url = dolphinschedulerConfig.getDsdUrl() + "/projects/" + dolphinschedulerConfig.getProjectCode() + "/executors/start-process-instance";doPostForObject(url, httpEntity);}public String getStringDate() {LocalDateTime currentDateTime = LocalDateTime.now();LocalDateTime startDate = currentDateTime.withHour(0).withMinute(0).withSecond(0).withNano(0);LocalDateTime endDate = startDate;DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");String formattedStartDate = startDate.format(formatter);JSONObject jsonObject = new JSONObject();jsonObject.put("complementStartDate", formattedStartDate);jsonObject.put("complementEndDate", formattedStartDate);return jsonObject.toString();}/*** 获取任务日志** @param id* @return*/public ResponseTaskLog getLog(Integer id, Integer limit, Integer skipLineNum) {HttpHeaders hs = new HttpHeaders();hs.add("Content-Type", CONTENT_TYPE);hs.add("X-Requested-With", X_REQUESTED_WITH);hs.add(TOKEN, dolphinschedulerConfig.getDsdToken());HttpEntity<String> entity = new HttpEntity<String>(hs);String url = dolphinschedulerConfig.getDsdUrl() + "/log/detail?taskInstanceId=" + id + "&limit=" + limit + "&skipLineNum=" + skipLineNum;JSONObject resultJSON = doGetForObject(url, entity);return JSON.parseObject(resultJSON.get(DATA).toString(), ResponseTaskLog.class);}/*** 重跑任务** @param processInstanceId*/public void operation(Integer processInstanceId, String executeType) {HttpHeaders hs = new HttpHeaders();hs.add("Content-Type", CONTENT_TYPE);hs.add("X-Requested-With", X_REQUESTED_WITH);hs.add(TOKEN, dolphinschedulerConfig.getDsdToken());LinkedMultiValueMap<String, Object> linkedMultiValueMap = new LinkedMultiValueMap<String, Object>();// 1 REPEAT_RUNNING 重跑 2 STOP 停止 3 RECOVER_SUSPENDED_PROCESS 恢复运行 4 PAUSE 暂停linkedMultiValueMap.add("processInstanceId", processInstanceId);switch (executeType) {case "1":addExecutionDetails(linkedMultiValueMap, 1, "REPEAT_RUNNING", "run");break;case "2":linkedMultiValueMap.add("executeType", "STOP");break;case "3":addExecutionDetails(linkedMultiValueMap, 0, "RECOVER_SUSPENDED_PROCESS", "suspend");break;case "4":linkedMultiValueMap.add("executeType", "PAUSE");break;default:throw new BizException("暂不支持该操作");}HttpEntity<MultiValueMap<String, Object>> httpEntity = new HttpEntity<>(linkedMultiValueMap, hs);String url = dolphinschedulerConfig.getDsdUrl() + "/projects/" + dolphinschedulerConfig.getProjectCode() + "/executors/execute";doPostForObject(url, httpEntity);}public void addExecutionDetails(MultiValueMap<String, Object> map, int index, String executeType, String buttonType) {map.add("index", String.valueOf(index));map.add("executeType", executeType);if (buttonType != null) {map.add("buttonType", buttonType);}}public PageInfo<ProcessInstanceVO> processInstances(Long dsdWorkflowCode, String searchVal, Integer pageNum, Integer pageSize,String startDate, String endDate, String stateType) {HttpHeaders hs = new HttpHeaders();hs.add("Content-Type", CONTENT_TYPE);hs.add("X-Requested-With", X_REQUESTED_WITH);hs.add(TOKEN, dolphinschedulerConfig.getDsdToken());HttpEntity<String> entity = new HttpEntity<String>(hs);StringBuilder urlBuilder = new StringBuilder(dolphinschedulerConfig.getDsdUrl()).append("/projects/").append(dolphinschedulerConfig.getProjectCode()).append("/process-instances?pageNo=").append(pageNum).append("&pageSize=").append(pageSize).append("&call=").append("1");// 这个必须加,不然删除工作流后,实例会不见if(null != dsdWorkflowCode){urlBuilder.append("&processDefineCode=").append(dsdWorkflowCode);}if(StringUtils.isNotBlank(searchVal)){urlBuilder.append("&searchVal=").append(searchVal);}supplementaryParameters(startDate, endDate, stateType, urlBuilder);JSONObject resultJSON = doGetForObject(urlBuilder.toString(), entity);JSONObject data = (JSONObject) resultJSON.get(DATA);JSONArray jsonArray = (JSONArray) data.get("totalList");List<ProcessInstanceVO> javaList = jsonArray.toJavaList(ProcessInstanceVO.class);Integer total = (Integer) data.get("total");PageInfo<ProcessInstanceVO> res = new PageInfo<>();res.setList(javaList);res.setTotal(total);return res;}private void supplementaryParameters(String startDate, String endDate, String stateType, StringBuilder urlBuilder) {if (stateType != null && !stateType.isEmpty()) {urlBuilder.append("&stateType=").append(stateType);}if (startDate != null && !startDate.isEmpty()) {urlBuilder.append("&startDate=").append(startDate);}if (endDate != null && !endDate.isEmpty()) {urlBuilder.append("&endDate=").append(endDate);}}public PageInfo<TaskInstanceVO> taskInstances(Integer processInstanceId, String startDate, String endDate,String stateType, int pageNum, int pageSize) {HttpHeaders hs = new HttpHeaders();hs.add("Content-Type", CONTENT_TYPE);hs.add("X-Requested-With", X_REQUESTED_WITH);hs.add(TOKEN, dolphinschedulerConfig.getDsdToken());HttpEntity<String> entity = new HttpEntity<String>(hs);StringBuilder urlBuilder = new StringBuilder(dolphinschedulerConfig.getDsdUrl()).append("/projects/").append(dolphinschedulerConfig.getProjectCode()).append("/task-instances?pageNo=").append(pageNum).append("&pageSize=").append(pageSize).append("&processInstanceId=").append(processInstanceId).append("&taskExecuteType=").append("BATCH");supplementaryParameters(startDate, endDate, stateType, urlBuilder);String url = urlBuilder.toString();JSONObject resultJSON = doGetForObject(url, entity);JSONObject data = (JSONObject) resultJSON.get(DATA);JSONArray jsonArray = (JSONArray) data.get("totalList");List<TaskInstanceVO> javaList = jsonArray.toJavaList(TaskInstanceVO.class);Integer total = (Integer) data.get("total");PageInfo<TaskInstanceVO> res = new PageInfo<>();res.setList(javaList);res.setTotal(total);return res;}/*** 获取工作流执行顺序* @param processInstanceId* @return*/public List<GanttTaskVO> viewGantt(Long processInstanceId) {HttpHeaders hs = new HttpHeaders();hs.add("Content-Type", CONTENT_TYPE);hs.add("X-Requested-With", X_REQUESTED_WITH);hs.add(TOKEN, dolphinschedulerConfig.getDsdToken());HttpEntity<String> entity = new HttpEntity<String>(hs);StringBuilder urlBuilder = new StringBuilder(dolphinschedulerConfig.getDsdUrl()).append("/projects/").append(dolphinschedulerConfig.getProjectCode()).append("/process-instances/").append(processInstanceId).append("/view-gantt");String url = urlBuilder.toString();JSONObject resultJSON = doGetForObject(url, entity);JSONObject data = (JSONObject) resultJSON.get(DATA);JSONArray jsonArray = (JSONArray) data.get("tasks");List<GanttTaskVO> javaList = jsonArray.toJavaList(GanttTaskVO.class);return javaList;}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/39861.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

项目管理实用表格与应用【项目文件资料分享】

项目管理基础知识 项目管理可分为五大过程组&#xff08;启动、规划、执行、监控、收尾&#xff09;十大知识领域&#xff0c;其中包含49个子过程 项目十大知识领域分为&#xff1a;项目整合管理、项目范围管理、项目进度管理、项目成本管理、项目质量管理、项目资源管理、项目…

标量场与向量场

标量场与向量场 flyfish 场 是一个函数&#xff0c;它把空间中的每一点关联到一个数值或一个数学对象&#xff08;如向量、张量等&#xff09;。在物理学中&#xff0c;场可以描述许多物理现象&#xff0c;例如温度分布、电场、磁场、压力场等。 标量场 标量场 是一个函数&…

【BUUCTF-PWN】9-ciscn_2019_n_8

不属于栈溢出&#xff0c;应该是比较简单的pwn&#xff0c;看懂代码逻辑使用pwntools 32位&#xff0c;开启了Stack、NX、PIE保护 执行效果&#xff1a; main函数 使用通义千问询问的代码解读&#xff1a; 即当var数组的第十四个元素是17就可以 这里可以用两种payload…

Python使用总结之应用程序有哪些配置方式?配置方式对比

Python使用总结之应用程序有哪些配置方式&#xff1f;配置方式对比 在Python程序中&#xff0c;管理配置信息的方法有很多&#xff0c;常见的方式包括使用INI文件、JSON文件、YAML文件、环境变量、以及直接在代码中定义配置。每种方式都有其独特的优势和适用场景。 1. INI文件 …

天环公益原创开发进度网站源码带后台免费分享

天环公益计划首发原创开发进度网站源码带后台免费分享 后台地址是&#xff1a;admin.php 后台没有账号密码 这个没有数据库 有能力的可以自己改 天环公益原创开发进度网站 带后台

ARM架构服务器/虚拟机编译部署Tendis(国产化替换Redis)

文章目录 一、概述 二、安装相关组件 三、下载最新的Tendis源码 四、编译源码 五、启动Tendis 六、使用Docker镜像部署Tendis 七、常见报错 八、参考链接 一、概述 国产化项目要求尽可能使用国产组件,尤其是已存在的项目,需要替换已有组件,比如使用Tendis替换Redis。…

微软中国全面撤店!我们到现场看了看

ChatGPT狂飙160天&#xff0c;世界已经不是之前的样子。 更多资源欢迎关注 7月1日&#xff0c;微软官方发言人向媒体表示&#xff1a; “微软不断评估其零售策略以满足我们的客户不断变化的需求&#xff0c;微软已决定对中国大陆市场的渠道进行整合。客户仍可通过零售合作伙伴…

校园失物招领系统带万字文档java项目失物招领管理系统java课程设计java毕业设计springboot vue

文章目录 校园失物招领系统一、项目演示二、项目介绍三、万字字项目文档四、部分功能截图五、部分代码展示六、底部获取项目源码带万字文档&#xff08;9.9&#xffe5;带走&#xff09; 校园失物招领系统 一、项目演示 校园失物招领系统 二、项目介绍 语言: Java 数据库&…

JAVA导出数据库字典到Excel

文章目录 1、查询某张表字段信息2、TableVo接收sql查询得到的数据3、excel导出4、导出案例 1、查询某张表字段信息 select column_name as columnName, -- 字段名 COLUMN_DEFAULT as colDefault, -- 默认值 column_key as columnKey, -- PRI-主键&#xff0c;UNI-唯一键&…

【Tools】 Postman 接口测试工具详解

那年夏天我和你躲在 这一大片宁静的海 直到后来我们都还在 对这个世界充满期待 今年冬天你已经不在 我的心空出了一块 很高兴遇见你 让我终究明白 回忆比真实精彩 &#x1f3b5; 王心凌《那年夏天宁静的海》 在现代软件开发中&#xff0c;API&#xff08;…

【Python实战因果推断】21_倾向分1

目录 The Impact of Management Training Adjusting with Regression 之前学习了如何使用线性回归调整混杂因素。此外&#xff0c;还向您介绍了通过正交化去偏差的概念&#xff0c;这是目前最有用的偏差调整技术之一。不过&#xff0c;您还需要学习另一种技术--倾向加权。这种…

Ionic 卡片:设计和使用指南

Ionic 卡片&#xff1a;设计和使用指南 Ionic 是一个强大的开源框架&#xff0c;用于构建跨平台的移动应用程序。它结合了 Angular、React 和 Vue 的强大功能&#xff0c;允许开发者使用 Web 技术创建高性能的移动应用。Ionic 卡片是框架中的一个核心组件&#xff0c;用于展示…

js使用插件完成xml转json

插件&#xff1a;xml2json.min.js 插件文件下载&#xff08;不能上传附件&#xff09;&#xff1a;https://download.csdn.net/download/zhu_zhu_xia/89513965 html代码&#xff1a; <!DOCTYPE html> <html lang"en"><head><meta charset&qu…

我认为一般信息管理应用中使用存储过程高效

总看有些人反对使用存储过程&#xff0c;原因无非是以下几点 1.不利于更换数据库&#xff0c;就是没有移植性 2.不利用调试和扩展 就依据我们大大小小项目&#xff0c;风风雨雨走过近20年&#xff0c;每个系统的业务逻辑处理几乎都是用存储过程实现的&#xff0c;没发现多不…

p标签文本段落中因编辑器换行引起的空格问题完美解决方案

目录 1.修改前的代码&#xff1a;2.修改后的代码3.总结 在HTML文档中&#xff0c;如何要在&#xff08;p标签&#xff09;内写一段很长的文本段落&#xff0c;并且没有 换行。由于IDE或者编辑器界面大小有限或需要在vue中逻辑处理动态显示文本&#xff0c;一行写完太长&#x…

Eslint prettier airbnb规范 配置

1.安装vscode的Eslint和prettier 插件 eslint&#xff1a;代码质量检查工具 https://eslint.nodejs.cn/docs/latest/use/getting-started prettier&#xff1a;代码风格格式化工具 https://www.prettier.cn/docs/index.html /* eslint-config-airbnb-base airbnb 规范 esl…

高德地图轨迹回放并提示具体信息

先上效果图 到达某地点后显示提示语&#xff1a;比如&#xff1a;12&#xff1a;56分驶入康庄大道、左转驶入xx大道等 <!doctype html> <html> <head><meta charset"utf-8"><meta http-equiv"X-UA-Compatible" content"…

【前端CSS3】CSS显示模式(黑马程序员)

文章目录 一、前言&#x1f680;&#x1f680;&#x1f680;二、CSS元素显示模式&#xff1a;☀️☀️☀️2.1 什么是元素显示模式2.2 块元素2.3 行内元素2.4 行块元素2.5 元素显示模式的转换 三、总结&#x1f680;&#x1f680;&#x1f680; 一、前言&#x1f680;&#x1f…

巴图自动化Modbus协议转Profinet协议网关模块连智能仪表与PLC通讯

一、现场要求:PLC作为控制器&#xff0c;仪表设备作为执行设备。执行设备可以实时响应PLC传送的指令&#xff0c;并将数据反馈给PLC&#xff0c;从而实现PLC对仪表设备的控制和监控&#xff0c;实现对生产过程的精确控制。 二、解决方案:通过巴图自动化Modbus协议转Profinet协议…

前端面试题4(浏览器对http请求处理过程)

浏览器对http请求处理过程 当我们在浏览器中输入URL并按下回车键时&#xff0c;浏览器会执行一系列步骤来处理HTTP请求并与服务器通信。下面是浏览器处理过程 1. 解析URL 浏览器首先解析输入的URL&#xff0c;提取出协议&#xff08;通常是http://或https://&#xff09;、主…