一、接着上文
RDelayedQueue作为redisson封装的一个分布式延迟队列,直接拿来使用还是比较简单的。
本文主要包括以下几部分:
- 保存至延迟队列(生产者)
- 读取延迟队列(消费者)
- 从延迟队列移除任务
二、redission配置
import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.redisson.config.SingleServerConfig;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.data.redis.RedisProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** Redisson配置类** @author xxx*/
@Configuration
public class RedissonConfig {@Value("${spring.application.name}")private String serverName;@Beanpublic RedissonClient redissonClient(RedisProperties redisProperties) {Config config = new Config();SingleServerConfig singleServerConfig = config.useSingleServer();singleServerConfig.setAddress("redis://" + redisProperties.getHost() + ":" + redisProperties.getPort());singleServerConfig.setPassword(redisProperties.getPassword());singleServerConfig.setKeepAlive(true);singleServerConfig.setDatabase(redisProperties.getDatabase());singleServerConfig.setConnectionMinimumIdleSize(2);singleServerConfig.setConnectionPoolSize(4);singleServerConfig.setClientName(serverName);return Redisson.create(config);}
}
spring:application:name: delay-task-serviceredis:host: 192.168.8.18port: 6379database: 0timeout: 3000
三、保存至延迟队列(生产者)
作为延迟任务的生产者,你需要根据预期的回调时间,计算出delay延迟时间。
伪代码见下:
public static final String REDISSON_QUEUE_NAME = "DelayTaskQueue";private final RedissonClient redissonClient;RBlockingQueue<String> blockingQueue = redissonClient.getBlockingQueue(REDISSON_QUEUE_NAME);
RDelayedQueue<String> delayedQueue = redissonClient.getDelayedQueue(blockingQueue);long delay = DateUtil.between(event.getNotifyDate(), new DateTime(), DateUnit.SECOND);delayedQueue.offer(event.getTransNo(), delay < 0 ? 1 : delay, TimeUnit.SECONDS);
四、读取延迟队列(消费者)
public static final String REDISSON_QUEUE_NAME = "DelayTaskQueue";private final RedissonClient redissonClient;@PostConstructpublic void init() {new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>()).execute(() -> {while (true) {try {RBlockingDeque<String> blockingDeque = redissonClient.getBlockingDeque(REDISSON_QUEUE_NAME);RDelayedQueue<String> delayedQueue = redissonClient.getDelayedQueue(blockingDeque);String transNo = blockingDeque.take();if (null == transNo) {return;}if (log.isInfoEnabled()) {log.info("开始执行延迟队列中的任务,transNo={}", transNo);}// 异步执行你的操作notifyTaskService.handleTask(transNo, null);} catch (Exception e) {log.error("延时队列的任务执行出现异常", e);}}});}
五、从延迟队列移除任务
public static final String REDISSON_QUEUE_NAME = "DelayTaskQueue";private final RedissonClient redissonClient;RBlockingQueue<String> blockingQueue = redissonClient.getBlockingQueue(REDISSON_QUEUE_NAME);
RDelayedQueue<String> delayedQueue = redissonClient.getDelayedQueue(blockingQueue);delayedQueue.remove(transNo);
六、总结
本文主要是摘要一些源码,仅供参考。
附:相关系列文章链接
延时任务通知服务的设计及实现(一)-- 设计方案
延时任务通知服务的设计及实现(二)-- redisson的延迟队列RDelayedQueue
延时任务通知服务的设计及实现(三)-- JDK的延迟队列DelayQueue
延时任务通知服务的设计及实现(四)-- webhook执行任务