文章目录
- DelayQueue
- 基本概念
- Delayed接口
- 示例
- 内部实现原理
- 应用场景
- 小结
- 模拟超时订单处理
- Order 实体类
- OrderController 类
- OrderService 类
- OrderConsumer 类
DelayQueue
DelayQueue
是一个有序的阻塞队列,用于在指定的延迟之后从队列中提取元素。它在调度任务、缓存清除、延迟任务
处理等场景中非常有用。
基本概念
DelayQueue
位于java.util.concurrent
包中,它是一个无界的阻塞队列
,元素必须实现Delayed
接口。队列中的元素按照它们的到期时间排序,只有到期的元素才能从队列中提取。
Delayed接口
Delayed
接口扩展了Comparable
接口,要求实现以下方法:
long getDelay(TimeUnit unit)
: 返回元素的剩余延迟时间。int compareTo(Delayed other)
: 用于比较元素的到期时间。
/*** Test类实现了Delayed接口* Delayed接口要求实现getDelay和compareTo方法。*/
public class Test implements Delayed {/*** 返回元素的延迟时间。* @param unit 时间单位* @return 延迟时间,单位为指定的时间单位。*/@Overridepublic long getDelay(TimeUnit unit) {return 0;}/*** 比较两个延迟元素的顺序。* @param o 另一个延迟元素* @return 比较结果,负数表示当前元素在前,正数表示当前元素在后,0表示相等。*/@Overridepublic int compareTo(Delayed o) {return 0;}
}
示例
下面是一个简单的示例,展示了如何使用DelayQueue
。
第一步:定义延时任务
/*** DelayTask类实现了Delayed接口,用于表示延迟任务。* @param <T> 任务数据的类型*/
@Data
public class DelayTask<T> implements Delayed {// 执行的任务数据private T data;// 任务的执行时间,使用纳秒表示private long activeTime;public DelayTask(T data, Duration delayTime) {this.data = data;this.activeTime = System.nanoTime() + delayTime.toNanos();}/*** 获取任务的剩余延迟时间。** @param unit 时间单位* @return 剩余的延迟时间,单位为指定的时间单位*/@Overridepublic long getDelay(TimeUnit unit) {// unit时间单位// 而 convert 方法用于将时间从一个单位转换到另一个单位return unit.convert(Math.max(0, activeTime - System.nanoTime()), TimeUnit.NANOSECONDS);}/*** 比较两个延迟任务的顺序。** @param o 另一个延迟任务* @return 比较结果,负数表示当前任务在前,正数表示当前任务在后,0表示相等*/@Overridepublic int compareTo(Delayed o) {long l = getDelay(TimeUnit.NANOSECONDS) - o.getDelay(TimeUnit.NANOSECONDS);if (l > 0) {return 1;} else if (l < 0) {return -1;} else {return 0;}}
}
第二步:测试
/*** DelayTaskTest类用于测试DelayQueue的使用。* 通过SpringBootTest注解加载Spring应用上下文。*/
@SpringBootTest(classes = LearningApplication.class)
@Slf4j
public class DelayTaskTest {@Testpublic void test() throws InterruptedException {// 创建一个DelayQueue,用于存放DelayTask任务DelayQueue<DelayTask> tasks = new DelayQueue<>();tasks.add(new DelayTask("task1", Duration.ofSeconds(3)));tasks.add(new DelayTask("task2", Duration.ofSeconds(6)));tasks.add(new DelayTask("task3", Duration.ofSeconds(7)));// 无限循环,从队列中取出并执行任务while (true) {// 从队列中取出到期的任务,如果没有到期任务则阻塞等待DelayTask take = tasks.take();// 处理任务 这里仅打印log.info("{} is executed", take.getData());}}
}
DelayTaskTest类
通过测试方法展示了如何使用DelayQueue来处理延迟任务。它向队列中添加了多个延迟任务,并通过无限循环从队列中取出并执行到期的任务。这种机制在定时任务调度、延迟消息处理等场景中非常有用。
取出任务用take()
: 检索并移除队列头部的元素,如果没有到期元素则阻塞
。
内部实现原理
DelayQueue
的内部实现依赖于PriorityQueue
,PriorityQueue
保证了队列元素的自然顺序或通过提供的比较器进行排序。
- 添加元素:当元素被添加到
DelayQueue
时,会根据其到期时间进行排序。【有序】 - 提取元素:只有在
getDelay()
方法返回的延迟时间小于等于零时,元素才能从队列中提取出来。
应用场景
DelayQueue
可以应用在以下场景:
- 定时任务调度:用于执行延迟任务。比如
订单超时取消
- 缓存过期处理:在缓存中存储元素,并在元素到期时将其移除。
- 消息延迟处理:在消息队列中处理延迟消息。
小结
DelayQueue
在处理延迟任务和定时调度任务中非常有用。通过实现Delayed
接口,可以轻松地创建自定义的延迟元素
,并将其添加到DelayQueue
中进行管理。
模拟超时订单处理
Order 实体类
@Data
public class Order implements Delayed {private String orderId;private boolean completed;private long endTime;public Order(String orderId, Duration delayTime) {this.orderId = orderId;this.completed = false;this.endTime = System.currentTimeMillis() + delayTime.toMillis();}@Overridepublic long getDelay(TimeUnit unit) {return unit.convert(endTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);}@Overridepublic int compareTo(Delayed o) {return Long.compare(this.getDelay(TimeUnit.MILLISECONDS), o.getDelay(TimeUnit.MILLISECONDS));}
}
OrderController 类
@RestController
@RequestMapping("/orders")
public class OrderController {@Autowiredprivate OrderService orderService;@PostMapping("/create")public String createOrder(@RequestParam String orderId, @RequestParam long delayInSeconds) {orderService.createOrder(orderId, delayInSeconds);return "Order created: " + orderId;}@PostMapping("/complete")public String completeOrder(@RequestParam String orderId) {boolean result = orderService.completeOrder(orderId);return result ? "Order completed: " + orderId : "Order not found or already completed: " + orderId;}// ...
}
OrderService 类
@Service
public class OrderService {@Autowiredprivate OrderMapper orderMapper;private final DelayQueue<Order> delayQueue = new DelayQueue<>();public OrderService() {OrderConsumer consumer = new OrderConsumer(delayQueue, this);Thread consumerThread = new Thread(consumer);consumerThread.start();}public void createOrder(String orderId, long delayInSeconds) {Order order = new Order(orderId, Duration.ofSeconds(delayInSeconds));delayQueue.add(order);orderMapper.insertOrder(order); // 插入订单到数据库}public boolean completeOrder(String orderId) {for (Order order : delayQueue) {if (order.getOrderId().equals(orderId) && !order.isCompleted()) {order.complete();orderMapper.updateOrderStatus(orderId, "COMPLETED");return true;}}return false;}public void cancelOrder(String orderId) {orderMapper.updateOrderStatus(orderId, "CANCELLED");}
}
OrderConsumer 类
@Slf4j
public class OrderConsumer implements Runnable {private final DelayQueue<Order> delayQueue;private final OrderService orderService;public OrderConsumer(DelayQueue<Order> delayQueue, OrderService orderService) {this.delayQueue = delayQueue;this.orderService = orderService;}@Overridepublic void run() {while (true) {try {Order order = delayQueue.take();if (!order.isCompleted()) {log.info("Order {} is cancelled due to timeout.", order.getOrderId());orderService.cancelOrder(order.getOrderId());} else {log.info("Order {} is completed.", order.getOrderId());}} catch (InterruptedException e) {log.error(e);}}}
}
❤觉得有用的可以留个关注~❤