@GetMapping("/startETL")
// @Idempotent(expireTime = 90, info = "请勿90秒内连续点击")public R getGaugeTestData6() {log.info("start ETL");//redis设置t_data_load_record 值为2bladeRedis.set("t_data_load_record_type", 2);String s = atomicityService.startETL();return R.data(s);}
@Slf4j
@Service
public class AtomicityService {@Autowiredprivate DataSourceService dataSourceService;// 使用 AtomicBoolean 作为互斥标志private final AtomicBoolean isExecuting = new AtomicBoolean(false);private final AtomicBoolean isExecutingForXRRJ = new AtomicBoolean(false);public String startETL() {if (isExecuting.compareAndSet(false, true)) {try {long startTime = System.currentTimeMillis(); // 记录开始时间ETLExecutionThreadLocal.setStartTime(startTime);// 执行 ETL 任务的具体逻辑//查询已经存在的数据的 批次号和站点号 做排除用List batchSiteList = dataSourceService.ReadBatchSiteList();List<Long> batchList = dataSourceService.ReadBatchList();List<Long> testerList = null;//测试人还是全删除吧,不要做增量查询,否则有些人被更新过信息进不来//读取数据源dataSourceService.ReadDataSource(batchSiteList, batchList, testerList);long endTime = System.currentTimeMillis();long executionTimeSeconds = (endTime - startTime) / 1000; // 计算用时(秒)return "ETL执行成功,用时:" + executionTimeSeconds + "秒";} finally {isExecuting.set(false);}} else {// 如果定时任务执行期间有手动执行请求,则驳回log.info("任务已经在执行中,本次请求被驳回");return "任务已经在执行中,本次请求被驳回";}}
定时任务在此与其共用一个程序,两者不可同时执行
package org.springblade.etl.source.task;import lombok.extern.slf4j.Slf4j;
import org.springblade.core.redis.cache.BladeRedis;
import org.springblade.core.tool.api.R;
import org.springblade.etl.source.service.AtomicityService;
import org.springblade.etl.source.service.DataSourceService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;import javax.annotation.Resource;
import java.util.List;
import java.util.Map;@Component
@Slf4j
public class ETLJob {// 注入 dataSourceService@Resourceprivate AtomicityService atomicityService;@Resourceprivate BladeRedis bladeRedis;@Scheduled(cron = "0 0 1 * * ?") // 每天凌晨1点执行一次public void executeETL() {// 执行 startETL 方法log.info("Scheduled ETL job started");bladeRedis.set("t_data_load_record_type", 1);R result = getGaugeTestData6();log.info("ETL job finished with result: " + result);}// 在此处定义 getGaugeTestData6 方法的具体实现public R getGaugeTestData6() {log.info("start for Scheduled ETL job");atomicityService.startETL();return R.data("success for Scheduled ETL job");}// @Scheduled(cron = "0/1 * * * * ?") // 每秒执行一次
// public void executeTestJob() {
// log.info("定时任务执行测试");
// }
}