直接分享代码
package com.xxx.init.delayJob;import lombok.Data;import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;/*** User:Json* Date: 2024/5/15* java 内置的延迟队列* 服务器重启后,数据全部消失,怕宕机* 他是无界队列 可以一直放 有可能导致内存满的问题* 所以不宜处理 任务比较多的业务* 用来处理 任务少并不是很重要的数据* 如果数据放的太快 有可能 顺序也会错 **/
@Data
public class DelayTask<T> implements Delayed {final private T data;final private long expire;/*** 构造延时任务* @param data 业务数据* @param expire 任务延时时间(ms) 分钟*/public DelayTask(T data, long expire) {super();this.data = data;this.expire = expire * 60 * 1000 + System.currentTimeMillis();}//返回任务的剩余延迟时间,用于决定任务何时可以从队列中取出@Overridepublic long getDelay(TimeUnit unit) {return unit.convert(this.expire - System.currentTimeMillis(), unit);}// 比较两个延迟任务的优先级,决定它们在队列中的顺序//如果delta等于0,说明两个任务的延迟时间相同,返回0。//如果delta小于0,说明当前任务的延迟时间小于另一个任务,返回-1,表示当前任务优先级更高。//如果delta大于0,说明当前任务的延迟时间大于另一个任务,返回1,表示当前任务优先级更低。@Overridepublic int compareTo(Delayed o) {long delay1 = getDelay(TimeUnit.NANOSECONDS);long delay2 = o.getDelay(TimeUnit.NANOSECONDS);return Long.compare(delay1, delay2);}
}
package com.xxx.iotjava.utils;import com.xxx.init.delayJob.DelayTask;
import com.xxx.iotjava.entities.SkeAirCdRealtimeData;
import com.xxx.iotjava.job.AicCdJob;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.core.annotation.Order;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;import javax.annotation.PreDestroy;
import java.time.LocalDateTime;
import java.util.concurrent.DelayQueue;/*** User:Json* Date: 2024/5/15* java 内置的延迟队列* 服务器重启后,数据全部消失,怕宕机* 他是无界队列 可以一直放 有可能导致内存满的问题* 所以不宜处理 任务比较多的业务* 用来处理 任务少并不是很重要的数据**/
@Component
@Slf4j
@Order(3)
public class DelayQueueUtil<T> implements CommandLineRunner {private final DelayQueue<DelayTask<T>> delayQueue = new DelayQueue<>();private volatile boolean running = true;@Autowiredprivate ThreadPoolTaskExecutor taskExecutor;@AutowiredAicCdJob aicCdJob;/*** 加入到延时队列中* @param task*/public void put(DelayTask<T> task) {// log.info("加入延时任务时间:"+LocalDateTime.now()+",加入延时任务:"+task);delayQueue.put(task);}/*** 取消延时任务* @param task* @return*/public boolean remove(DelayTask<T> task) {//log.info("取消延时任务时间:"+LocalDateTime.now()+",取消延时任务:{}", task);return delayQueue.remove(task);}/*** 取消延时任务* @param taskid* @return*/public boolean remove(T taskid) {return remove(new DelayTask<>(taskid, 0));}//初始化延迟线程@Overridepublic void run(String... args) {//log.info("初始化延时队列");taskExecutor.execute(this::executeThread);}/*** 延时任务执行线程*/public void executeThread() {while (running) {try {DelayTask<T> task = delayQueue.take();processTask(task);} catch (InterruptedException e) {Thread.currentThread().interrupt();break;}}}/*** 内部执行延时任务* @param task*/public void processTask(DelayTask<T> task) {// log.info("执行延时任务时间:"+LocalDateTime.now()+"【参数】:"+task);if(task.getData() instanceof SkeAirCdRealtimeData){aicCdJob.aicCdRealTimeDatAll();}else if(task.getData() instanceof String){System.out.println(task.getData());}// 根据task中的data自定义数据来处理相关逻辑,例如 if (task.getData() instanceof XXX) {}}/*** 停止延时队列*/public void stop() {running = false;Thread.currentThread().interrupt();taskExecutor.shutdown();}@PreDestroypublic void onDestroy() {stop();}}
调用
@AutowiredDelayQueueUtil delayQueueUtil;@GetMapping("index18")@ApiOperation(value = "测试延迟任务")public R index18(){for (int i=0;i<=20;i++){delayQueueUtil.put(new DelayTask<>("第"+i+"次", 2));}//delayQueueUtil.put(new DelayTask<>(new SkeAirCdRealtimeData(), 1));return R.success();}