延迟处理是一个非常常用的一个功能;
例如, 下单成功后,在30分钟内没有支付,自动取消订单;
延迟队列便是延迟处理中最常见的实现方式;
先一起看下JDK中延迟队列是如何实现的.
JUC的DelayQueue
在JDK中, 提供了一套延迟队列的实现, 是JUC包中DelayQueue类.
在使用时只需要让处理的元素对象实现Delayed接口, 就可以根据延迟时间实现延迟处理了.
DelayQueue队列内部是用优先队列实现的, 优先队列的实现原理可以参考小顶堆.
public class DelayQueue<E extends Delayed> extends AbstractQueue<E>implements BlockingQueue<E> {private final transient ReentrantLock lock = new ReentrantLock();private final PriorityQueue<E> q = new PriorityQueue<E>();
}
元素对象需要实现Delayed接口的两个方法
(1) compareTo()
元素自定义方法实现, 根据延时时间, 确定元素在队列中的位置; 元素剩余延时时间越小排列越靠前, 反之越靠后;
(2) getDelay()
元素自定义方法实现, 判断元素剩余延时时间;
public interface Delayed extends Comparable<Delayed> {long getDelay(TimeUnit unit);
}
弹出元素时, 会根据元素对象的剩余延时方法getDelay(), 判断元素是否应该被弹出; 后续逻辑可以根据业务需要继续轮询或休眠等待一段时间.
public E poll() {
E first = q.peek();if (first == null || first.getDelay(NANOSECONDS) > 0)return null;elsereturn q.poll();
}
虽然DelayQueue不能满足分布式要求, 但它却提供了一个很好的延迟处理框架, 可以根据不同的底层存储介质替换PriorityQueue实现.
例如, Redis中的Zset.
Redis Zset
在上述DelayQueue框架的基础上, 使用zset代替PriorityQueue存储, 并用延迟时间作为zset的score项, 很容易就能实现一个分布式的高性能延迟队列.
Redis过期事件监听
利用Redis的事件监听机制, 还有另外一种方式实现延迟处理.
Redis可以根据需要, 修改redis.conf配置, 实现对一些事件的监听, 其中就包括key过期事件.
redis.conf 配置
notify-keyspace-events Ex
这个事件监听是通过pubsub机制实现的, 所以业务代码中实现对事件的订阅, 就可以知道哪个key过期了.
PUBSUB 主题:
<db>是指redis的database
__keyevent@<db>__:expired
有了上述事件监听基础, 将延期事件对应key存入Redis, 并根据延迟时间设置key过期时间, 当key过期时, 便能触发监听事件, 完成延迟处理逻辑.
当然, 除了上述方式之外, 还可以使用定时任务轮询, 死信队列等等方式实现延迟处理