1.在application.yaml中配置延时队列信息
#延时队列
redis-delay-queue:
  enabled: true
  name: delay_queue_demo
2.定义延时队列
/**
* 延时队列
*/
public class RedisDelayedQueue {private static Logger logger = LoggerFactory.getLogger(RedisDelayedQueue.class);@ResourceRedissonClient redissonClient;@Value("${redis-delay-queue.name}")public String queueName;/*** 添加队列** @param message Message泛型类* @param delay 时间数量* @param timeUnit 时间单位* @param <T> 泛型*/public <T> void addQueue(DelayMessage<T> message, long delay, TimeUnit timeUnit) {logger.info("添加队列delay:{},timeUnit:{}", delay, timeUnit);RBlockingQueue<DelayMessage<T>> blockingFairQueue = redissonClient.getBlockingQueue(queueName);RDelayedQueue<DelayMessage<T>> delayedQueue = redissonClient.getDelayedQueue(blockingFairQueue);delayedQueue.offer(message, delay, timeUnit);}/*** 移除队列** @param message Message泛型类* @param <T> 泛型*/public <T> void remove(DelayMessage<T> message) {RBlockingQueue<DelayMessage<T>> blockingFairQueue = this.redissonClient.getBlockingQueue(queueName);RDelayedQueue<DelayMessage<T>> delayedQueue = this.redissonClient.getDelayedQueue(blockingFairQueue);delayedQueue.remove(message);}
3.导入自动装配类
/**
 * Auto-configuration class for the Redis delayed queue.
 *
 * <p>Imports {@link RedisDelayedQueue} and is guarded by a {@code @ConditionalOnProperty}
 * check on the {@code redis-delay-queue.enabled} property.
 */
@Slf4j
@Import({RedisDelayedQueue.class})
@ConditionalOnProperty(name = "redis-delay-queue.enabled")
public class RedisDelayQueueAutoConfiguration {

    /** Logs a confirmation message from the {@code @PostConstruct} callback. */
    @PostConstruct
    public void init() {
        log.info("init redis delay queue success!");
    }
}
4.添加自动装配配置 (在resources目录下的META-INF目录下新增一个spring.factories )
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
com.haha.framework.redis.delay.autoconfiguration.RedisDelayQueueAutoConfiguration
5.业务类应用延时队列
// Delayed-queue helper injected into the business class.
@Resource
private RedisDelayedQueue redisDelayedQueue;
// Remove the order's message from the delayed queue.
// The key is the ORDER_TIMING_CLOSE_KEY prefix followed by the order id.
String orderKey = RedisKeys.OrderKeys.ORDER_TIMING_CLOSE_KEY + orderId;
redisDelayedQueue.remove(DelayMessage.of(OrderListener.class.getName(), orderKey));
// Add the order's message to the delayed queue; delayTime is parsed as a long and
// passed as a number of seconds.
// NOTE(review): this looks like a separate call site from the removal above, since
// orderKey is declared again here — confirm before placing both in one scope.
String orderKey = RedisKeys.OrderKeys.ORDER_TIMING_CLOSE_KEY + orderId;
redisDelayedQueue.addQueue(DelayMessage.of(OrderListener.class.getName(), orderKey),Long.parseLong(delayTime), TimeUnit.SECONDS);
6.添加延时队列监听器
/** 订单延迟队列监听器 */
@Slf4j
@Component
public class OrderListener implements RedisDelayedQueueListener<String> {@Autowiredprivate OrderHandler handler;@Overridepublic void invoke(String delayMessageKey) {log.info("执行订单延迟任务....key:{}", delayMessageKey);if (delayMessageKey.contains(RedisKeys.OrderKeys.ORDER_TIMING_CLOSE_KEY)) {handler.timeOutToCloseOrder(delayMessageKey);return;}}