有时候,我们需要在当前时间点往后延迟一定时间,再执行任务,该怎么实现呢?
1. 延迟任务方案
延迟任务的实现方案有很多,常见的有四类:
DelayQueue | Redisson | MQ | 时间轮 | |
---|---|---|---|---|
原理 | JDK自带延迟队列,基于阻塞队列实现。 | 基于Redis数据结构模拟JDK的DelayQueue实现 | 一些MQ本身支持延迟消息,例如RocketMQ 而RabbitMQ则需要通过插件来实现延迟消息 | 时间轮算法,其中Netty中有开源的实现 |
优点 | 不依赖第三方服务 | 分布式系统下可用不占用JVM内存 | 分布式系统下可以不占用JVM内存 | 不依赖第三方服务性能优异 |
缺点 | 占用JVM内存只能单机使用 | 依赖第三方服务 | 依赖第三方服务 | 只能单机使用 |
以上四种方案都可以解决问题,其中采用RabbitMQ实现延迟任务可参考我另一篇博客RabbitMQ安装配置,封装工具类,发送消息及监听,延迟消息
本例中我们会使用DelayQueue方案。这种方案使用成本最低,而且不依赖任何第三方服务,减少了网络交互。
但缺点也很明显,就是不能在分布式环境下使用,需要占用JVM内存,且在数据量非常大的情况下可能会有问题。。
如果项目中数据量非常大,DelayQueue不能满足业务需求,也可以替换为其它延迟队列方式,例如Redisson、MQ等
2. DelayQueue的原理
首先来看一下DelayQueue的源码:
public class DelayQueue<E extends Delayed> extends AbstractQueue<E>implements BlockingQueue<E> {private final transient ReentrantLock lock = new ReentrantLock();private final PriorityQueue<E> q = new PriorityQueue<E>();// ... 略
}
可以看到DelayQueue实现了BlockingQueue接口,是一个阻塞队列。队列就是容器,用来存储东西的。DelayQueue叫做延迟队列,其中存储的就是延迟执行的任务。
我们可以看到DelayQueue的泛型定义:
DelayQueue<E extends Delayed>
这说明存入DelayQueue
内部的元素必须是Delayed
类型,这其实就是一个延迟任务的规范接口。来看一下:
public interface Delayed extends Comparable<Delayed> {/*** Returns the remaining delay associated with this object, in the* given time unit.** @param unit the time unit* @return the remaining delay; zero or negative values indicate* that the delay has already elapsed*/long getDelay(TimeUnit unit);
}
从源码中可以看出,Delayed类型必须具备两个方法:
getDelay()
:获取延迟任务的剩余延迟时间compareTo(T t)
:比较两个延迟任务的延迟时间,判断执行顺序
可见,Delayed类型的延迟任务具备两个功能:获取剩余延迟时间、比较执行顺序。当然,我们可以对Delayed做实现和功能扩展,比如添加延迟任务的数据。
新建延迟任务时,放在这样的一个Delayed
类型的延迟任务并设定固定的延迟时间到DelayQueue
队列。DelayQueue
会调用compareTo
方法,根据剩余延迟时间对任务排序。剩余延迟时间越短的越靠近队首,这样就会被优先执行。
3. DelayQueue的用法
首先定义一个Delayed类型的延迟任务类,要能保持任务数据。
package com.gzdemo.delay.utils;import lombok.Data;
import lombok.extern.slf4j.Slf4j;import java.time.Duration;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
@Data
@Slf4j
public class DelayedTask<T> implements Delayed {private T data;// 数据private long deadLineTime;// 当前消息什么时候到期// Duration jdk 提供的一个可以表示指定时间单位 时间的 对象public DelayedTask(T data, Duration duration) {this.data = data;deadLineTime =System.currentTimeMillis() +duration.toMillis();}/*** DelayQueue 的take() 方法 会不断调用 本方法,获取消息的剩余时间* 1) 消息剩余时间<=0 表示消息到期* 2) >0 表示消息未到期* @param unit* @return*/@Overridepublic long getDelay(TimeUnit unit) {//log.info("getDelay方法被调用了");//return deadLineTime-System.currentTimeMillis();return unit.convert(deadLineTime-System.currentTimeMillis(),TimeUnit.MILLISECONDS);}/*** 如果有多个消息,则 底层存储消息时会调用该方法进行排序* 返回值:* 如果>0 表示当前任务 到期时间 长,应该放入队列的后面* 如果<=0 表示当前任务 到期时间 断,应该放入队列的前面*/@Overridepublic int compareTo(Delayed other) {// 获取当前任务时间和其他任务时间的 差Long i = this.getDelay(TimeUnit.SECONDS) -other.getDelay(TimeUnit.SECONDS);return i.intValue();}
}
接下来就可以创建延迟任务,交给延迟队列保存:
import java.time.Duration;
import java.time.LocalTime;
import java.util.concurrent.DelayQueue;@Slf4j
@RestController
public class TestController {@GetMapping("/addDelayedTask")public String addDelayedTask() throws InterruptedException {DelayQueue<DelayedTask<String>> queue = new DelayQueue(); // 模拟了一个 消息队列(理解为搭建了一台RabbitMQ)// 模拟消息的生产者queue.add(new DelayedTask("任务1的数据", Duration.ofSeconds(8))); // 添加任务到队列queue.add(new DelayedTask("任务2的数据", Duration.ofSeconds(3))); // 添加任务到队列queue.add(new DelayedTask("任务3的数据", Duration.ofSeconds(5))); // 添加任务到队列// queue.add(new DelayedTask("任务3的数据")); // 添加任务到队列log.info("{} 消息放入了...", LocalTime.now());// 模拟消息的消费者while (true) {// take() 方法可以理解为 监听器 Listener , 如果队列中有要返回的任务,则返回任务,放行// 如果没有就等待DelayedTask<String> task = queue.take();log.info("{} 收到了消息:{}",LocalTime.now(),task.getData());String data = task.getData();}}
}
注意:
这里我们是直接同一个线程来执行任务了。当没有任务的时候线程会被阻塞。而在实际开发中,我们会准备线程池,开启多个线程来执行队列中的任务。
测试
4. 项目中使用案例
工具类,这原本是一个Redis相关的业务工具类,在创建使用redis过程中,需要用到延时任务,在这里把除了延时任务外的别的内容去掉了,展示了项目中要使用DelayQueue的大概骨架。
import com.gzdemo.delay.domain.LearningRecord;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;import javax.annotation.PostConstruct;
import java.time.Duration;
import java.time.LocalTime;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.DelayQueue;@Component
@Slf4j
@RequiredArgsConstructor
public class LearningRecordCacheHandler {private DelayQueue<DelayedTask<RecordQueueData>> queue = new DelayQueue<>();// 消息的消费时机: 启动时拉取/*** @PostConstruct: 是spring 支持的一个注解* 写到方法上,表述 当前类的构造方法执行后执行: 初始化对象后执行*/@PostConstructpublic void init(){System.out.println("主线程...对象创建了");// 启动时开启一个子线程,主线程不会阻塞// Thread 对象 创建的线程,不支持后续扩展线程池
// new Thread(() -> {
// this.dealData();;
// }).start();// jdk 8 提供了支持线程池,创建线程的方式 CompletableFutureCompletableFuture.runAsync(()->{this.dealData();;});}public void dealData(){while (true){try {// 获取延时消息DelayedTask<RecordQueueData> task = queue.take();RecordQueueData data = task.getData();log.info("{},延时队列获取到了消息:{}", LocalTime.now(),data);} catch (InterruptedException e) {e.printStackTrace();}}}// 写入消息到队列public void writeRecord2Queue(LearningRecord record){log.info("{},延时队列放入了消息lessonId为{}",LocalTime.now(),record.getLessonId());queue.add(new DelayedTask<>(new RecordQueueData(record), Duration.ofSeconds(15)));}// 队列中的消息对象@Data@NoArgsConstructorpublic class RecordQueueData{private Integer moment;private Long lessonId;private Long sectionId;public RecordQueueData(LearningRecord record) {this.moment = record.getMoment();this.lessonId = record.getLessonId();this.sectionId = record.getSectionId();}}}
通用的DelayedTask
@Data
@Slf4j
public class DelayedTask<T> implements Delayed {private T data;// 数据private long deadLineTime;// 当前消息什么时候到期// Duration jdk 提供的一个可以表示指定时间单位 时间的 对象public DelayedTask(T data, Duration duration) {this.data = data;deadLineTime =System.currentTimeMillis() +duration.toMillis();}/*** DelayQueue 的take() 方法 会不断调用 本方法,获取消息的剩余时间* 1) 消息剩余时间<=0 表示消息到期* 2) >0 表示消息未到期* @param unit* @return*/@Overridepublic long getDelay(TimeUnit unit) {//log.info("getDelay方法被调用了");//return deadLineTime-System.currentTimeMillis();return unit.convert(deadLineTime-System.currentTimeMillis(),TimeUnit.MILLISECONDS);}/*** 如果有多个消息,则 底层存储消息时会调用该方法进行排序* 返回值:* 如果>0 表示当前任务 到期时间 长,应该放入队列的后面* 如果<=0 表示当前任务 到期时间 断,应该放入队列的前面*/@Overridepublic int compareTo(Delayed other) {// 获取当前任务时间和其他任务时间的 差Long i = this.getDelay(TimeUnit.SECONDS) -other.getDelay(TimeUnit.SECONDS);return i.intValue();}
}
添加任务的测试controller
@AutowiredLearningRecordCacheHandler learningRecordCacheHandler;@GetMapping("/addDelayedTask2/{lessonId}")public String addDelayedTask2(@PathVariable Long lessonId) throws InterruptedException {LearningRecord record = LearningRecord.builder().lessonId(lessonId).moment(234).sectionId(21412L).build();learningRecordCacheHandler.writeRecord2Queue(record);return "添加消息成功";}
测试
5. 完整的实际开发案例
解释:
1.主要使用了Redis和DelayQueue
2.用户在网站上播放视频时,视频每隔15s会上传用户当前播放的视频的进度到后台;每次更新mysql效率低,因此使用缓存,查询的时候直接查Redis;但需要将Redis中的数据持久化到mysql中(然后删掉Redis中相关数据,否则Redis中数据越来越多),这里的判断持久化时机的逻辑是,当Redis中的数据保持20秒没有变动,就说明用户在这段时间内退出视频了,就应该持久化。
3.“当Redis中数据保持20秒没有变动”,需要使用到延迟任务。Redis中相关数据改动一项,就新建一个延时任务(将数据放到任务中)放到DelayQueue中,20秒后,判断该延迟任务中的数据和Redis中对应数据是否一致。
4.延迟任务的监听应该开新线程,使用CompletableFuture或别的。
5.为了方便开发,定义了2个内部类,供Redis和DelayedTask使用。
package com.xxx.learning.utils;import com.tianji.common.utils.JsonUtils;
import com.tianji.learning.domain.po.LearningLesson;
import com.tianji.learning.domain.po.LearningRecord;
import com.tianji.learning.mapper.LearningRecordMapper;
import com.tianji.learning.service.ILearningLessonService;
import com.tianji.learning.service.ILearningRecordService;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.DelayQueue;
import java.util.function.Supplier;/*** 添加学习记录处理器工具类* 1) 处理缓存* 2)*/
@Component
@Slf4j
@RequiredArgsConstructor
public class LearningRecordCacheHandler {private final StringRedisTemplate redisTemplate;private final ILearningLessonService lessonService;private final LearningRecordMapper recordMapper;// 缓存前缀private static final String LEARNING_RECORD_CACHE_PREFIX="learning:record:";private DelayQueue<DelayedTask<RecordQueueData>> queue = new DelayQueue<>();// 消息的消费时机: 启动时拉取/*** @PostConstruct: 是spring 支持的一个注解* 写到方法上,表述 当前类的构造方法执行后执行: 初始化对象后执行* @PreDestroy 是spring 支持的一个注解* 写到方法上,表述 当前类(正常关闭容器)销毁前执行该方法*/@PostConstructpublic void init(){System.out.println("主线程...对象创建了");// 启动时开启一个子线程,主线程不会阻塞// Thread 对象 创建的线程,不支持后续扩展线程池
// new Thread(() -> {
// this.dealData();;
// }).start();// jdk 8 提供了支持线程池,创建线程的方式 CompletableFutureCompletableFuture.runAsync(()->{this.dealData();;});}@PreDestroypublic void destroy(){System.out.println("对象销毁了");}public void dealData(){while (true){try {// 获取延时消息DelayedTask<RecordQueueData> task = queue.take();RecordQueueData data = task.getData();log.info("延时队列获取到了消息");// 获取redis 中的消息LearningRecord record = readRecordCache(data.getLessonId(), data.getSectionId());/*** 比较消息* 如果消息不一致,证明: 用户在持续播放, 抛弃消息*/if(!data.getMoment().equals(record.getMoment())){log.info("redis 中的数据和 延时消息不一致... 不处理");continue;}log.info("redis 中的数据和 延时消息一致... 处理..写入数据库");// 一致 则证明用户暂停播放, 写入数据库// 更新 recordLearningRecord newRecord = new LearningRecord();newRecord.setId(record.getId());newRecord.setMoment(record.getMoment());recordMapper.updateById(newRecord);// 更新 lessonLearningLesson lesson = new LearningLesson();lesson.setId(data.getLessonId());lesson.setLatestSectionId(data.sectionId);lesson.setLatestLearnTime(LocalDateTime.now().minusSeconds(20));lessonService.updateById(lesson);} catch (InterruptedException e) {e.printStackTrace();}}}// 写入消息到队列public void writeRecord2Queue(LearningRecord record){log.info("延时队列放入了消息:");queue.add(new DelayedTask<>(new RecordQueueData(record),Duration.ofSeconds(20)));}//1) 写入缓存public void writeRecordCache(LearningRecord record){log.info("缓存放入了消息:");// 1.转成只有三个属性的对象RecordCacheData cacheData = new RecordCacheData(record);// 2. 转成json 字符串String jsonData = JsonUtils.toJsonStr(cacheData);// 3 拼接key learning:record:6688String key =LEARNING_RECORD_CACHE_PREFIX+record.getLessonId();// 4 存储缓存(覆盖)redisTemplate.opsForHash().put(key,record.getSectionId().toString(),jsonData);// 5 设置超时时间redisTemplate.expire(key, Duration.ofSeconds(60));}//2) 读取缓存public LearningRecord readRecordCache(Long lessonId,Long sectionId){//1 拼接key learning:record:6688String key =LEARNING_RECORD_CACHE_PREFIX+lessonId;// 真正的是StringObject o = redisTemplate.opsForHash().get(key, sectionId.toString());if(o==null){return null;}// {id:1,moment:30,finshsend:false}// 转成对象LearningRecord record = JsonUtils.toBean(o.toString(), LearningRecord.class);record.setLessonId(lessonId);record.setSectionId(sectionId);return record;}//3) 删除缓存public void removeRecordCache(Long lessonId,Long sectionId){//1 拼接key learning:record:6688String key =LEARNING_RECORD_CACHE_PREFIX+lessonId;redisTemplate.opsForHash().delete(key,sectionId.toString());}// 存储缓存的对象@Data@NoArgsConstructorpublic class RecordCacheData{private Long id;private Boolean finished;private Integer moment;public RecordCacheData(LearningRecord record) {this.id = record.getId();this.finished = record.getFinished();this.moment = record.getMoment();}}// 队列中的消息对象@Data@NoArgsConstructorpublic class RecordQueueData{private Integer moment;private Long lessonId;private Long sectionId;public RecordQueueData(LearningRecord record) {this.moment = record.getMoment();this.lessonId = record.getLessonId();this.sectionId = record.getSectionId();}}}