检测逻辑
package com.alibaba.gts.flm.push.data.client.service;import com.alibaba.fastjson.JSONObject;
import com.alibaba.gts.flm.push.data.client.common.util.DateUtil;
import com.alibaba.gts.flm.push.data.client.service.model.FcoWarningKeyDTO;
import com.google.common.util.concurrent.RateLimiter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;@Service
@Slf4j
public class FlowCutOffWarningService {@Autowiredprivate RestTemplateService restTemplateService;@Value("${sendDingDingUrl:-}")private String sendDingDingApiUrl;@Value("${fcowTaskLimit:5}")private Integer fcowTaskLimit;private ScheduledExecutorService executorService;private Map<String, Long> keyTime = new ConcurrentHashMap<>();private Map<String, JSONObject> taskStatus = new ConcurrentHashMap<>();// uuid// 每秒许可数 = 12次 / 60秒private Map<String, RateLimiter> taskRateLimiters = new ConcurrentHashMap<>();private AtomicInteger threadCount;// 启动定时任务// limitVal : 每秒许可数 = 12次 / 60秒public void startTask(List<FcoWarningKeyDTO> fcoWarningKeyDTOS, Long second, Double limitVal) {if (fcoWarningKeyDTOS == null || fcoWarningKeyDTOS.size() == 0 || second == null || limitVal == 0) {throw new RuntimeException("参数为空");}if (executorService == null) {executorService = new ScheduledThreadPoolExecutor(fcowTaskLimit);threadCount = new AtomicInteger(0);}if (threadCount.get() >= fcowTaskLimit) {throw new RuntimeException("断流检测任务上限5个,需要启动更多任务请修改配置[fcowTaskLimit]的值");}String uuid = UUID.randomUUID().toString();taskRateLimiters.put(uuid, RateLimiter.create(limitVal));log.info("启动[{}秒]断流监测任务...uuid:{}", second, uuid);executorService.scheduleWithFixedDelay(() -> {try {//log.info("执行[{}秒]断流监测任务...", second);JSONObject status = new JSONObject();status.put("fcoWarningKeyDTOS", fcoWarningKeyDTOS);status.put("second", second);status.put("lastRunTime", DateUtil.now());taskStatus.put(uuid, status);StringBuilder sb = compare(fcoWarningKeyDTOS, second, keyTime);if (sb == null || sb.length() == 0) {return;}if (!taskRateLimiters.get(uuid).tryAcquire()) {log.warn("发送钉钉过于频繁,本次忽略,uuid:{},limitVal:{}", uuid, limitVal);return;}sendDingDing(sb);} catch (Exception e) {log.error("断流监测任务执行出错,fcoWarningKeyDTOS:{},second:{}秒,error:{}", JSONObject.toJSONString(fcoWarningKeyDTOS), second, ExceptionUtils.getStackTrace(e));}}, second, second, TimeUnit.SECONDS);log.info("[{}秒]断流监测任务启动成功,uuid:{}", second, 
uuid);threadCount.set(threadCount.get() + 1);}public Map<String, JSONObject> getTaskStatus() {return taskStatus;}public Map<String, Long> getKeyTime() {return keyTime;}public Map<String, String> getKeyTimeFormat() {Map<String, String> map = new HashMap<>();keyTime.forEach((k, v) -> {map.put(k, DateUtil.format(v));});return map;}public void flow(String k) {keyTime.put(k, System.currentTimeMillis());}public StringBuilder compare(List<FcoWarningKeyDTO> fcoWarningKeyDTOS, Long second, Map<String, Long> keyTime) {StringBuilder sb = null;for (FcoWarningKeyDTO fcoWarningKeyDTO : fcoWarningKeyDTOS) {if (keyTime.containsKey(fcoWarningKeyDTO.getCode()) && (System.currentTimeMillis() - keyTime.get(fcoWarningKeyDTO.getCode())) / 1000 > second) {String lastTime = keyTime.containsKey(fcoWarningKeyDTO.getCode()) ? DateUtil.format(keyTime.get(fcoWarningKeyDTO.getCode())) : "-";sb = append(sb, fcoWarningKeyDTO, lastTime, second);}}return sb;}private StringBuilder append(StringBuilder sb, FcoWarningKeyDTO fcoWarningKeyDTO, String lastTime, Long second) {if (sb == null) {sb = new StringBuilder();}sb.append("#### " + "[" + fcoWarningKeyDTO.getName() + "]数据断流").append("\n\n");sb.append(" > 时间: ").append(DateUtil.now()).append("\n\n");sb.append(" > 描述: ").append(second).append("秒内无数据").append("\n\n");sb.append(" > 上次数据时间: " + lastTime).append("\n\n");return sb;}private void sendDingDing(StringBuilder sb) {JSONObject req = new JSONObject();JSONObject at = new JSONObject();at.put("isAtAll", "false");req.put("title", "数据断流警告");req.put("text", sb);req.put("at", at);restTemplateService.post(sendDingDingApiUrl, req);}
}
使用
@Autowired
private FlowCutOffWarningService flowCutOffWarningService;

// Watch the key "test": check every 3 seconds and warn once it has been silent for more than
// 3 seconds. limitVal is in permits per second, so 0.003 throttles the notifications to
// roughly one every 333 seconds.
List<FcoWarningKeyDTO> fcoWarningKeyDTOS3Second = new LinkedList<>();
fcoWarningKeyDTOS3Second.add(new FcoWarningKeyDTO("test", "测试"));
flowCutOffWarningService.startTask(fcoWarningKeyDTOS3Second, 3L, 0.003);
RestTemplateService
RestTemplateService 的实现参见:springboot(39) : RestTemplate完全体(Lxinccode的博客,CSDN博客)