一、SpringBoot配置Redisson
1.1 引入依赖
<!--Redisson延迟队列-->
<dependency><groupId>org.redisson</groupId><artifactId>redisson</artifactId><version>3.13.1</version>
</dependency>
1.2 代码配置
@Data
@Component
@RefreshScope
@ConfigurationProperties("spring.redis")
public class RedisConfigProperties {String host;String password;Cluster cluster;
}@Data
class Cluster {Boolean enable;List<String> nodes;
}
@Component
@Configuration
@RequiredArgsConstructor
public class RedissonConfig {private final RedisConfigProperties redisConfig;@Beanpublic RedissonClient redissonClient() {Config config = new Config();if (redisConfig.getCluster() != null && Boolean.TRUE.equals(redisConfig.getCluster().getEnable())) {ClusterServersConfig clusterServersConfig = config.useClusterServers();for(String node : redisConfig.getCluster().getNodes()) {clusterServersConfig.addNodeAddress("redis://" + node);}if (StrUtil.isNotBlank(redisConfig.getPassword())){clusterServersConfig.setPassword(redisConfig.getPassword());}} else {SingleServerConfig serverConfig = config.useSingleServer();serverConfig.setAddress("redis://"+redisConfig.getHost()+":6379");System.out.println("============================================================");System.out.println("redisson设置的地址为:" + "redis://"+redisConfig.getHost()+":6379");System.out.println("============================================================");if (StrUtil.isNotBlank(redisConfig.getPassword())){serverConfig.setPassword(redisConfig.getPassword());}}return Redisson.create(config);}
}
1.3 application.yml中配置
spring:redis:host: 127.0.0.1
二、延时队列具体使用
2.1 编写一个工具类RedisDelayQueueUtil
/*** @Description: redission延迟队列工具类*/
@Slf4j
@Component
@RefreshScope
public class RedisDelayQueueUtil {// day代表单位是天,minutes代表单位是分钟(也可以是秒seconds, 但这个不在下面代码示例处理)@Value("${spring.mode}")private String mode;@Autowiredprivate RedissonClient redissonClient;/*** 添加延迟队列* @param queueCode 队列键* @param value 队列值* @param delay 延迟时间* @param <T>*/public <T> void addDelayQueue(String queueCode, String value, long delay) {try {RBlockingDeque<Object> blockingDeque = redissonClient.getBlockingDeque(queueCode);RDelayedQueue<Object> delayedQueue = redissonClient.getDelayedQueue(blockingDeque);delayedQueue.offer(value, delay, "day".equals(testMode) ? TimeUnit.MINUTES : TimeUnit.DAYS);log.info("(添加延时队列成功) 队列键:{},队列值:{},延迟时间:{}", queueCode, value, delay + "天");} catch (Exception e) {throw new RuntimeException("(添加延时队列失败)");}}/*** 删除延迟队列* @param queueCode 队列键* @param value 队列值* @param <T>*/public <T> void removeDelayQueue(String queueCode, String value){try {RBlockingDeque<Object> blockingDeque = redissonClient.getBlockingDeque(queueCode);RDelayedQueue<Object> delayedQueue = redissonClient.getDelayedQueue(blockingDeque);delayedQueue.remove(value);log.info("(删除延时队列成功) 队列键:{},队列值:{}", queueCode, value);} catch (Exception e) {throw new RuntimeException("(删除延时队列失败)");}}/*** 获取延迟队列* @param queueCode 队列键* @param <T>* @return* @throws InterruptedException*/public <T> T getDelayQueue(String queueCode) throws InterruptedException {RBlockingDeque<Map> blockingDeque = redissonClient.getBlockingDeque(queueCode);redissonClient.getDelayedQueue(blockingDeque);T value = (T) blockingDeque.take();return value;}/*** @param 移除延时队列全部任务* @param code* @param task*/public void removeTask(String code, String value) {RBlockingDeque<Object> blockingDeque = redissonClient.getBlockingDeque(code);RDelayedQueue<Object> delayedQueue = redissonClient.getDelayedQueue(blockingDeque);List<String> c = new ArrayList<>();c.add(value);delayedQueue.removeAll(c);}}
2.2 在application.yml中配置时间单位
spring:mode: day
2.3 延迟队列枚举类
@Getter
@NoArgsConstructor
@AllArgsConstructor
public enum RedisDelayQueueEnum {// 这里可以配置多个枚举项,每个枚举项对应一个实现类OVER_TIME("OVER_TIME", "超时触发", "overTimeImpl"),;// 延迟队列 Redis Keyprivate String code;// 中文描述private String name;/*** 延迟队列具体业务实现的 Bean* 可通过 Spring 的上下文获取*/private String beanId;}
2.4 延迟队列枚举类中配置的实现类
@Slf4j
@Component
public class ActOverTimeImpl implements RedisQueueHandle<String> {@Autowired@LazyTestService testService;/*** 任务超时,监听* 可以在里面调用service的代码去处理具体的业务逻辑* @param value*/@Overridepublic void execute(String value) {log.info("接收到延迟任务【超时提醒】:{}", value);testService.dealDelayQueueExpire(value);}
}
2.5 项目启动时使用其它线程控制全部延时队列
@Slf4j
@Component
@AllArgsConstructor
public class AppStartRunner implements ApplicationRunner {private final RedisDelayQueueUtil redisDelayQueueUtil;private final RedissonClient redissonClient;@Override@Order(value = 1)public void run(ApplicationArguments args) {log.info("服务启动了!");RedisDelayQueueEnum[] queueEnums = RedisDelayQueueEnum.values();for (RedisDelayQueueEnum queueEnum : queueEnums) {new Thread(() -> {while (true) {try {Object value = redisDelayQueueUtil.getDelayQueue(queueEnum.getCode());if (value != null) {RedisQueueHandle redisQueueHandle = SpringContextHolder.getBean(queueEnum.getBeanId());redisQueueHandle.execute(value);}} catch (Exception e) {log.error("(Redis延迟队列异常中断) {}", e.getMessage());}}}).start();}log.info("(Redis延迟队列启动成功)");}}
除了希望在一定时间之后触发某些任务外,平时还会有一些资源消耗比较大的任务,如果接口直接对外暴露,多人同时调用时有可能造成系统变慢甚至直接宕机。
在不改变系统配置,不升级系统硬件的情况下,我们可以将这种任务放到一个对列当中排队执行。
三、普通阻塞队列的使用
3.1 管理普阻塞通队列枚举类
@Getter
@AllArgsConstructor
public enum RedisBlockingQueueEnum {CONSUME_RESOURCES_TASK("CONSUME_RESOURCES_TASK", "消耗资源的任务", "consume resourcesImpl"),;private final String code;private final String name;/*** 延迟队列具体业务实现的 Bean* 可通过 Spring 的上下文获取*/private final String beanId;
}
3.2 使用一个RedisBlockingQueueOperator去统一管理添加阻塞队列
@Slf4j
@Component
@RequiredArgsConstructor
public class RedisBlockingQueueOperator {private final RedissonClient redissonClient;public void addConsumeResourcesTaskQueue(Long userId, Long tenantId) {JSONObject jsonObject = new JSONObject();jsonObject.put("userId", userId);jsonObject.put("tenantId", tenantId);RBlockingQueue<String> queue = redissonClient.getBlockingQueue(RedisBlockingQueueEnum.CONSUME_RESOURCES_TASK.getCode());queue.offer(jsonObject.toJSONString());}
}
3.3 Controller中将请求加入阻塞队列
@RestController
@RequiredArgsConstructor
@RequestMapping("/test")
@Tag(description = "test", name = "测试Controller")
@SecurityRequirement(name = HttpHeaders.AUTHORIZATION)
public class TestController {private final RedisBlockingQueueOperator queueOperator;@PostMapping("/consume_resource_task")public R consumeResourcesTask() throws Exception{queueOperator.addCombinedReleaseQueue(1 TenantContextHolder.getTenantId());return R.ok("success!");}}
3.4 编写实现类
@Slf4j
@Component
public class ConsumeResourcesTaskImpl implements RedisQueueHandle<String> {@Autowired@LazyTestService testService;@Overridepublic void execute(String value) throws Exception {JSONObject jsonObject = JSON.parseObject(value);log.info("延迟队列触发【处理耗时任务】:{}", value);testService.dealConsumeResourcesTask(value);}
}
3.5 项目启动时使用其它线程控制全部普通阻塞队列
@Slf4j
@Component
@AllArgsConstructor
public class AppStartRunner implements ApplicationRunner {private final RedissonClient redissonClient;@Override@Order(value = 1)public void run(ApplicationArguments args) {log.info("服务启动了!");for (RedisBlockingQueueEnum queueEnum : RedisBlockingQueueEnum.values()) {new Thread(() -> {RBlockingQueue<String> queue = redissonClient.getBlockingQueue(queueEnum.getCode());while (true) {try {String value = queue.take();if (value == null) continue;RedisQueueHandle redisQueueHandle = SpringContextHolder.getBean(queueEnum.getBeanId());redisQueueHandle.execute(value);} catch (Exception e) {log.error("(Redis阻塞队列异常中断) {}", e.getMessage());}}}).start();}log.info("(Redis Blocking Queue 启动成功)");}}