springboot封装azkaban的api,提供可调用azkaban任务流的接口
流程如下:
springboot接口->azkaban api->azkaban project(flow tasks)->shell脚本->spark tasks
Api测试
curl -k -X POST --data "action=login&username=azkaban&password=azkaban" https://192.168.33.162:8443
{"session.id" : "2b0a77ee-ee93-4d31-a188-8bcc89bacdb2","status" : "success"}
curl -k -X POST --data "session.id=2b0a77ee-ee93-4d31-a188-8bcc89bacdb2&ajax=executeFlow&project=iot&flow=iot&flowOverride[buiz]=iot_ads_use_x_hour&flowOverride[projects]=120100lae&flowOverride[meterKind]=1&flowOverride[meterCode]='xxx'&flowOverride[dt]=2021-11-16&flowOverride[archive_suffix]=''" https://192.168.33.162:8443/executor
{"project" : "iot","message" : "Execution queued successfully with exec id 7975","flow" : "iot","execid" : 7975}
curl -k --data "session.id=2b0a77ee-ee93-4d31-a188-8bcc89bacdb2&ajax=fetchexecflow&execid=5559" https://192.168.33.162:8443/executor
{"project" : "iot","updateTime" : 1637132885051,"type" : null,"attempt" : 0,"execid" : 5559,"submitTime" : 1637132859558,"nodes" : [ {"nestedId" : "iot_main","startTime" : 1637132859661,"updateTime" : 1637132884986,"id" : "iot_main","endTime" : 1637132884914,"type" : "command","attempt" : 0,"status" : "FAILED"} ],"nestedId" : "iot","submitUser" : "azkaban","startTime" : 1637132859655,"id" : "iot","endTime" : 1637132885046,"projectId" : 17,"flowId" : "iot","flow" : "iot","status" : "FAILED"}
curl -k --data "session.id=2b0a77ee-ee93-4d31-a188-8bcc89bacdb2&ajax=fetchExecJobLogs&execid=5559&jobId=iot_main&offset=0&length=10000" https://192.168.33.162:8443/executor
{"length" : 0,"offset" : 0,"data" : ""}
FlowJobProcess
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import okhttp3.*;
import rx.Observable;
import rx.Subscriber;
import rx.functions.Action1;
import rx.functions.Func1;import javax.net.ssl.*;
import java.io.IOException;
import java.security.cert.CertificateException;/*** curl -k -X POST --data "action=login&username=azkaban&password=azkaban" https://192.168.0.162:8443* curl -k -X POST --data "session.id=78d1374e-3b0e-445b-9a71-302cffa05f98&ajax=executeFlow&project=iot&flow=iot&flowOverride[buiz]=iot_ads_use_x_hour&flowOverride[projects]=120100lae&flowOverride[meterKind]=1&flowOverride[meterCode]='xxx'&flowOverride[dt]=2021-11-16&flowOverride[archive_suffix]=''" https://192.168.0.162:8443/executor* curl -k --data "session.id=78d1374e-3b0e-445b-9a71-302cffa05f98&ajax=fetchexecflow&execid=5559" https://192.168.0.162:8443/executor* curl -k --data "session.id=78d1374e-3b0e-445b-9a71-302cffa05f98&ajax=fetchExecJobLogs&execid=5559&jobId=iot_main&offset=0&length=10000" https://192.168.0.162:8443/executor*/
public class FlowJobProcess {public static void main(String[] args) {Observable.create(new Observable.OnSubscribe<Response>() {@Overridepublic void call(Subscriber<? super Response> subscriber) {OkHttpClient okHttpClient = getUnsafeOkHttpClient();String url = "https://192.168.0.162:8443";RequestBody requestBody = new FormBody.Builder().add("action", "login").add("username", "azkaban").add("password", "azkaban").build();Request request = new Request.Builder().url(url).post(requestBody).build();Call call = okHttpClient.newCall(request);try {Response response = call.execute();subscriber.onNext(response);} catch (IOException e) {e.printStackTrace();}}}).map(new Func1<Response, String>() {@Overridepublic String call(Response response) {String result = null;try {result = response.body().string();} catch (IOException e) {e.printStackTrace();}System.out.println("授权成功:" + result);JSONObject jsonObject = JSON.parseObject(result);return jsonObject.getString("session.id");}}).flatMap(new Func1<String, Observable<String>>() {@Overridepublic Observable<String> call(String s) {System.out.println("sessionId:" + s);OkHttpClient okHttpClient = getUnsafeOkHttpClient();String url = "https://192.168.0.162:8443/executor";RequestBody requestBody = new FormBody.Builder().add("session.id", s).add("ajax", "executeFlow").add("project", "iot").add("flow", "iot").add("flowOverride[buiz]", "iot_ads_use_x_hour").add("flowOverride[projects]", "120100lae").add("flowOverride[meterKind]", "1").add("flowOverride[meterCode]", "xxx").add("flowOverride[dt]", "2021-12-02").add("flowOverride[archive_suffix]", "").build();Request request = new Request.Builder().url(url).post(requestBody).build();String result = "";Call call = okHttpClient.newCall(request);try {Response response = call.execute();result = response.body().string();System.out.println(result);} catch (IOException e) {e.printStackTrace();}return Observable.just(s + "@" + result);}}).subscribe(new Action1<String>() {@Overridepublic void 
call(String s) {String[] split = s.split("@");String sessionId = split[0];String result = split[1];/*** flow exec result:{* "project" : "iot",* "message" : "Execution queued successfully with exec id 5626",* "flow" : "iot",* "execid" : 5626* }*/System.out.println("sessionId is" + sessionId + " and flow exec result:" + result);}});}public static OkHttpClient getUnsafeOkHttpClient() {try {// Create a trust manager that does not validate certificate chainsfinal TrustManager[] trustAllCerts = new TrustManager[]{new X509TrustManager() {@Overridepublic void checkClientTrusted(java.security.cert.X509Certificate[] chain, String authType) throws CertificateException {}@Overridepublic void checkServerTrusted(java.security.cert.X509Certificate[] chain, String authType) throws CertificateException {}@Overridepublic java.security.cert.X509Certificate[] getAcceptedIssuers() {return new java.security.cert.X509Certificate[]{};}}};// Install the all-trusting trust managerfinal SSLContext sslContext = SSLContext.getInstance("SSL");sslContext.init(null, trustAllCerts, new java.security.SecureRandom());// Create an ssl socket factory with our all-trusting managerfinal SSLSocketFactory sslSocketFactory = sslContext.getSocketFactory();OkHttpClient.Builder builder = new OkHttpClient.Builder();builder.sslSocketFactory(sslSocketFactory);builder.hostnameVerifier(new HostnameVerifier() {@Overridepublic boolean verify(String hostname, SSLSession session) {return true;}});return builder.build();} catch (Exception e) {throw new RuntimeException(e);}}
}
下面的web项目只是spark任务启动前的前置任务:利用接口删除数据
es数据Dao
import com.mysql.jdbc.StringUtils;
import lombok.SneakyThrows;
import org.elasticsearch.action.admin.indices.alias.get.GetAliasesRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.GetAliasesResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.cluster.metadata.AliasMetaData;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Repository;import java.io.IOException;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;/*** 针对某个索引,在dt内,基于设备编码删除数据*/
@Repository
public class IndexHandler {@AutowiredRestHighLevelClient esClient;public void getAllIndexes() throws IOException {GetAliasesRequest getAliasesRequest = new GetAliasesRequest();GetAliasesResponse alias = esClient.indices().getAlias(getAliasesRequest, RequestOptions.DEFAULT);Map<String, Set<AliasMetaData>> aliases = alias.getAliases();Set<String> indices = aliases.keySet();indices.forEach(new Consumer<String>() {@Overridepublic void accept(String s) {System.out.println("index:" + s);}});}public SearchResponse search(String dt, String meterKind, String meterCode) throws IOException {String index = "";switch (meterKind) {case "0":index = "ads_iot_electricity_index2";break;case "1":index = "ads_iot_water_index";break;}if (StringUtils.isNullOrEmpty(index)) {throw new IllegalArgumentException("es index is not matched!!!");}System.out.println("index:" + index);BoolQueryBuilder must = QueryBuilders.boolQuery().must(QueryBuilders.matchQuery("dt", dt));String meterCodeAlas = "equipmentNo";if (!StringUtils.isNullOrEmpty(meterCode)) {must.must(QueryBuilders.matchQuery(meterCodeAlas, meterCode));}System.out.println("must:" + must.toString());//查询apiSearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();searchSourceBuilder.query(must);SearchRequest searchRequest = new SearchRequest();searchRequest.indices(index.split(","));searchRequest.source(searchSourceBuilder);return esClient.search(searchRequest, RequestOptions.DEFAULT);}@SneakyThrowspublic BulkByScrollResponse deleteByQuery(String dt, String meterKind, String meterCode) {String index = "";switch (meterKind) {case "0":index = "ads_iot_electricity_index2";break;case "1":index = "ads_iot_water_index";break;}if (StringUtils.isNullOrEmpty(index)) {throw new IllegalArgumentException("es index is not matched!!!");}BoolQueryBuilder must = QueryBuilders.boolQuery().must(QueryBuilders.matchQuery("dt", dt))//注意旧索引没有meterKind
// .must(QueryBuilders.matchQuery("meterKind", meterKind)).must(QueryBuilders.matchQuery("equipmentNo", meterCode));System.out.println("must:" + must.toString());DeleteByQueryRequest request = new DeleteByQueryRequest();request.setQuery(must);request.indices(index);request.setConflicts("proceed");request.setTimeout(String.valueOf(TimeValue.timeValueMinutes(5)));request.setRefresh(true);request.setQuery(must);BulkByScrollResponse bulkByScrollResponse = esClient.deleteByQuery(request, RequestOptions.DEFAULT);return bulkByScrollResponse;}}
IndexController
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.lang.Nullable;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.*;import java.io.IOException;@Controller
@ResponseBody
public class IndexController {@AutowiredIndexHandler indexHandler;@GetMapping("/search/{dt}/{meterKind}")public R search(@PathVariable("dt") String dt, @PathVariable("meterKind") String meterKind) throws IOException {return search(dt, meterKind, null);}@GetMapping("/search/{dt}/{meterKind}/{meterCode}")public R search(@PathVariable("dt") String dt, @PathVariable("meterKind") String meterKind, @Nullable @PathVariable("meterCode") String meterCode) throws IOException {if ("yyyy-MM-dd".length() != dt.length()) {throw new IllegalArgumentException("dt格式必须为yyyy-MM-dd");}System.out.println("dt:" + dt);SearchResponse response = indexHandler.search(dt, meterKind, meterCode);RestStatus status = response.status();System.out.println("status:" + status);if (status == RestStatus.OK) {SearchHits hits = response.getHits();SearchHit[] searchHits = hits.getHits();for (SearchHit hit : searchHits) {// do something with the SearchHitString sourceAsString = hit.getSourceAsString();System.out.println("hits:" + sourceAsString);}return R.ok().put("data", searchHits);} else {return R.error(-1, "not ok").put("data", response.status());}}/*** Resolved [org.springframework.web.HttpRequestMethodNotSupportedException: Request method 'GET' not supported]*///@DeleteMapping("/delete/{dt}/{meterKind}/{meterCode}")@RequestMapping(value = "/delete/{dt}/{meterKind}/{meterCode}", method = RequestMethod.GET)public R deleteByQuery(@PathVariable("dt") String dt, @PathVariable("meterKind") String meterKind, @PathVariable("meterCode") String meterCode) {System.out.println("dt:" + dt);if ("yyyy-MM-dd".length() != dt.length()) {throw new IllegalArgumentException("dt格式必须为yyyy-MM-dd");}BulkByScrollResponse response = indexHandler.deleteByQuery(dt, meterKind, meterCode);System.out.println("delete response:" + response);return R.ok().put("data", response);}
}
EsConfig
import lombok.Data;
import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** 配置绑定*/
/**
 * Binds the {@code es.*} properties and exposes the Elasticsearch
 * {@link RestHighLevelClient} bean built from them.
 */
@Data
@Configuration
@ConfigurationProperties(prefix = "es")
public class EsConfig {

    /** Elasticsearch host name or IP ({@code es.host}); required. */
    private String host;

    /** Elasticsearch HTTP port ({@code es.port}). */
    private int port;

    /** URI scheme ({@code es.scheme}); defaults to {@code http}, the value previously hard-coded. */
    private String scheme = "http";

    /**
     * Creates the client for the configured node.
     *
     * @return a client pointing at {@code scheme://host:port}
     * @throws IllegalArgumentException if {@code es.host} is missing or blank; without this
     *         check the failure surfaces only as "Host name may not be null"
     */
    @Bean
    public RestHighLevelClient esClient() {
        if (host == null || host.trim().isEmpty()) {
            throw new IllegalArgumentException(
                    "es.host is not configured (Host name may not be null); set es.host and es.port");
        }
        return new RestHighLevelClient(RestClient.builder(new HttpHost(host, port, scheme)));
    }
}
R
import org.apache.http.HttpStatus;import java.util.HashMap;
import java.util.Map;/*** 返回数据*/
/**
 * Response envelope: a map that always carries {@code "code"} and {@code "msg"},
 * plus any further entries added with {@link #put(String, Object)}.
 *
 * <p>The typed payload accessors {@link #getData()} / {@link #setData(Object)} read and
 * write the {@code "data"} map entry, so a payload set through either API is visible
 * through the other and is part of the map contents.
 *
 * @param <T> type of the payload stored under {@code "data"}
 */
public class R<T> extends HashMap<String, Object> {

    private static final long serialVersionUID = 1L;

    private static final String KEY_CODE = "code";
    private static final String KEY_MSG = "msg";
    private static final String KEY_DATA = "data";

    /** Default success code. */
    private static final int CODE_OK = 200;

    /** Default error code (HTTP 500, internal server error). */
    private static final int CODE_INTERNAL_ERROR = 500;

    /** Creates a success envelope: code 200, message "success". */
    public R() {
        // super.put avoids calling an overridable method from the constructor.
        super.put(KEY_CODE, CODE_OK);
        super.put(KEY_MSG, "success");
    }

    /**
     * Returns the value stored under {@code "data"}, or null if none was set.
     * The cast is unchecked: a value put under {@code "data"} with a different type
     * surfaces as a {@code ClassCastException} at the caller.
     */
    @SuppressWarnings("unchecked")
    public T getData() {
        return (T) get(KEY_DATA);
    }

    /** Stores the payload under {@code "data"}. */
    public void setData(T data) {
        super.put(KEY_DATA, data);
    }

    /** Error envelope with code 500 and a generic message. */
    public static R error() {
        return error(CODE_INTERNAL_ERROR, "未知异常,请联系管理员");
    }

    /** Error envelope with code 500 and the given message. */
    public static R error(String msg) {
        return error(CODE_INTERNAL_ERROR, msg);
    }

    /** Error envelope with the given code and message. */
    public static R error(int code, String msg) {
        R r = new R();
        r.put(KEY_CODE, code);
        r.put(KEY_MSG, msg);
        return r;
    }

    /** Success envelope with a custom message. */
    public static R ok(String msg) {
        R r = new R();
        r.put(KEY_MSG, msg);
        return r;
    }

    /** Success envelope extended with (and possibly overridden by) the entries of {@code map}. */
    public static R ok(Map<String, Object> map) {
        R r = new R();
        r.putAll(map);
        return r;
    }

    /** Plain success envelope. */
    public static R ok() {
        return new R();
    }

    /**
     * Adds or replaces an entry and returns this envelope so calls can be chained.
     * Unlike {@link HashMap#put}, the previous value is not returned.
     */
    @Override
    public R put(String key, Object value) {
        super.put(key, value);
        return this;
    }
}
Main
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;@SpringBootApplication(scanBasePackages = "com.xx.yy", exclude = {DataSourceAutoConfiguration.class})
public class Main {

    /**
     * Application entry point: builds a {@link SpringApplication} with this class as
     * its primary source and runs it with the command-line arguments.
     *
     * @param args command-line arguments handed to Spring Boot unchanged
     */
    public static void main(String[] args) {
        SpringApplication application = new SpringApplication(Main.class);
        application.run(args);
    }
}
TestData
import com.mz.iot.dao.IndexHandler;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.jdbc.core.JdbcTemplate;import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;@SpringBootTest
public class TestData {// @Autowired
// JdbcTemplate template;@AutowiredIndexHandler indexHandler;@AutowiredRestHighLevelClient esClient;@Testvoid testGetAllIndexes() throws IOException {indexHandler.getAllIndexes();}@Testvoid testEsClient() {System.out.println("esClient:" + esClient);}@Testvoid testDelete() {BulkByScrollResponse bulkByScrollResponse = indexHandler.deleteByQuery("2021-12-23", "1", "CACVHNM61U62");System.out.println("test delete response:" + bulkByScrollResponse);}@Testvoid testSearch() throws IOException {SearchResponse response = indexHandler.search("2021-12-23", "1", "CACVHNM61U62");System.out.println("test search response:" + response);SearchHits hits = response.getHits();SearchHit[] searchHits = hits.getHits();for (SearchHit hit : searchHits) {// do something with the SearchHitString sourceAsString = hit.getSourceAsString();System.out.println("hits:" + sourceAsString);}}
}
application.properties
# HTTP port the embedded web server listens on.
server.port=8888
# Character encoding setting for the Tomcat access log.
server.tomcat.accesslog.encoding=utf-8
# Spring Boot debug mode switched off.
debug=false
# Sample custom properties; not referenced by the code shown alongside this file.
my.car.brand=byd
my.car.price=9999.98
# NOTE(review): the next three lines look like advice text pasted from a Spring Boot
# datasource start-up failure message; they are kept as a reminder only.
#Consider the following:
#If you want an embedded database (H2, HSQL or Derby), please put it on the classpath.
#If you have database settings to be loaded from a particular profile you may need to activate it (no profiles are currently active).
# Activates the "test" profile, so application-test.yml is loaded as well.
spring.profiles.active=test
# Enables Spring MVC's hidden HTTP method filter.
spring.mvc.hiddenmethod.filter.enabled=true
application-test.yml
# Settings for the "test" profile.
spring:
  datasource:
    url: jdbc:mysql://192.168.33.169:3306/paascloud_wfs
    username: mzadmin
    password: Mz@123456
    driver-class-name: com.mysql.jdbc.Driver
  jdbc:
    template:
      query-timeout: 30
  data:
    elasticsearch:
      client:
        reactive:
          # NOTE(review): 191.168.33.163 differs from es.host (192.168.33.163) below - confirm which is intended.
          endpoints: 191.168.33.163:9200
  application:
    name: cupon
# Bound by the "es" configuration-properties prefix.
es:
  host: 192.168.33.163
  port: 9200
my:
  car:
    brand: bwm
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.3.4.RELEASE</version>
        <relativePath/>
    </parent>

    <!-- groupId and version are not declared here; Maven inherits them from the parent. -->
    <artifactId>iot_web</artifactId>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <!-- Override dependency versions managed by spring-boot-starter-parent. -->
        <elasticsearch.version>6.5.4</elasticsearch.version>
        <lombok.version>1.18.18</lombok.version>
        <mysql.version>5.1.48</mysql.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-jdbc</artifactId>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
        </dependency>
        <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>elasticsearch-rest-high-level-client</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>com.squareup.okhttp3</groupId>
            <artifactId>okhttp</artifactId>
            <version>4.9.1</version>
        </dependency>
        <dependency>
            <groupId>io.reactivex</groupId>
            <artifactId>rxjava</artifactId>
            <version>1.3.0</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.75</version>
        </dependency>
        <!-- Test-only libraries: keep them off the runtime classpath and out of the packaged jar. -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-configuration-processor</artifactId>
            <optional>true</optional>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <!-- The configuration processor is excluded from the repackaged jar. -->
                    <excludes>
                        <exclude>
                            <groupId>org.springframework.boot</groupId>
                            <artifactId>spring-boot-configuration-processor</artifactId>
                        </exclude>
                    </excludes>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>