SpringBoot项目中如何使用Redisson队列详解

一、SpringBoot配置Redisson

1.1 引入依赖

<!--Redisson延迟队列-->
<dependency><groupId>org.redisson</groupId><artifactId>redisson</artifactId><version>3.13.1</version>
</dependency>

1.2 代码配置

@Data
@Component
@RefreshScope
@ConfigurationProperties("spring.redis")
public class RedisConfigProperties {String host;String password;Cluster cluster;
}@Data
class Cluster {Boolean enable;List<String> nodes;
}
@Component
@Configuration
@RequiredArgsConstructor
public class RedissonConfig {private final RedisConfigProperties redisConfig;@Beanpublic RedissonClient redissonClient() {Config config = new Config();if (redisConfig.getCluster() != null && Boolean.TRUE.equals(redisConfig.getCluster().getEnable())) {ClusterServersConfig clusterServersConfig = config.useClusterServers();for(String node : redisConfig.getCluster().getNodes()) {clusterServersConfig.addNodeAddress("redis://" + node);}if (StrUtil.isNotBlank(redisConfig.getPassword())){clusterServersConfig.setPassword(redisConfig.getPassword());}} else {SingleServerConfig serverConfig = config.useSingleServer();serverConfig.setAddress("redis://"+redisConfig.getHost()+":6379");System.out.println("============================================================");System.out.println("redisson设置的地址为:" + "redis://"+redisConfig.getHost()+":6379");System.out.println("============================================================");if (StrUtil.isNotBlank(redisConfig.getPassword())){serverConfig.setPassword(redisConfig.getPassword());}}return Redisson.create(config);}
}

1.3 application.yml中配置

spring:redis:host: 127.0.0.1

二、延时队列具体使用

2.1 编写一个工具类RedisDelayQueueUtil

/*** @Description: redission延迟队列工具类*/
@Slf4j
@Component
@RefreshScope
public class RedisDelayQueueUtil {// day代表单位是天,minutes代表单位是分钟(也可以是秒seconds, 但这个不在下面代码示例处理)@Value("${spring.mode}")private String mode;@Autowiredprivate RedissonClient redissonClient;/*** 添加延迟队列* @param queueCode 队列键* @param value 队列值* @param delay 延迟时间* @param <T>*/public <T> void addDelayQueue(String queueCode, String value, long delay) {try {RBlockingDeque<Object> blockingDeque = redissonClient.getBlockingDeque(queueCode);RDelayedQueue<Object> delayedQueue = redissonClient.getDelayedQueue(blockingDeque);delayedQueue.offer(value, delay, "day".equals(testMode) ? TimeUnit.MINUTES : TimeUnit.DAYS);log.info("(添加延时队列成功) 队列键:{},队列值:{},延迟时间:{}", queueCode, value, delay + "天");} catch (Exception e) {throw new RuntimeException("(添加延时队列失败)");}}/*** 删除延迟队列* @param queueCode 队列键* @param value 队列值* @param <T>*/public <T> void removeDelayQueue(String queueCode, String value){try {RBlockingDeque<Object> blockingDeque = redissonClient.getBlockingDeque(queueCode);RDelayedQueue<Object> delayedQueue = redissonClient.getDelayedQueue(blockingDeque);delayedQueue.remove(value);log.info("(删除延时队列成功) 队列键:{},队列值:{}", queueCode, value);} catch (Exception e) {throw new RuntimeException("(删除延时队列失败)");}}/*** 获取延迟队列* @param queueCode 队列键* @param <T>* @return* @throws InterruptedException*/public <T> T getDelayQueue(String queueCode) throws InterruptedException {RBlockingDeque<Map> blockingDeque = redissonClient.getBlockingDeque(queueCode);redissonClient.getDelayedQueue(blockingDeque);T value  = (T) blockingDeque.take();return value;}/*** @param 移除延时队列全部任务* @param code* @param task*/public void removeTask(String code, String value) {RBlockingDeque<Object> blockingDeque = redissonClient.getBlockingDeque(code);RDelayedQueue<Object> delayedQueue = redissonClient.getDelayedQueue(blockingDeque);List<String> c = new ArrayList<>();c.add(value);delayedQueue.removeAll(c);}}

2.2 在application.yml中配置时间单位

spring:mode: day

2.3 延迟队列枚举类

@Getter
@NoArgsConstructor
@AllArgsConstructor
public enum RedisDelayQueueEnum {// 这里可以配置多个枚举项,每个枚举项对应一个实现类OVER_TIME("OVER_TIME", "超时触发", "overTimeImpl"),;// 延迟队列 Redis Keyprivate String code;// 中文描述private String name;/*** 延迟队列具体业务实现的 Bean* 可通过 Spring 的上下文获取*/private String beanId;}

2.4 延迟队列枚举类中配置的实现类

@Slf4j
@Component
public class ActOverTimeImpl implements RedisQueueHandle<String> {@Autowired@LazyTestService testService;/*** 任务超时,监听* 可以在里面调用service的代码去处理具体的业务逻辑* @param value*/@Overridepublic void execute(String value) {log.info("接收到延迟任务【超时提醒】:{}", value);testService.dealDelayQueueExpire(value);}
}

2.5 项目启动时使用其它线程控制全部延时队列

@Slf4j
@Component
@AllArgsConstructor
public class AppStartRunner implements ApplicationRunner {private final RedisDelayQueueUtil redisDelayQueueUtil;private final RedissonClient redissonClient;@Override@Order(value = 1)public void run(ApplicationArguments args) {log.info("服务启动了!");RedisDelayQueueEnum[] queueEnums = RedisDelayQueueEnum.values();for (RedisDelayQueueEnum queueEnum : queueEnums) {new Thread(() -> {while (true) {try {Object value = redisDelayQueueUtil.getDelayQueue(queueEnum.getCode());if (value != null) {RedisQueueHandle redisQueueHandle = SpringContextHolder.getBean(queueEnum.getBeanId());redisQueueHandle.execute(value);}} catch (Exception e) {log.error("(Redis延迟队列异常中断) {}", e.getMessage());}}}).start();}log.info("(Redis延迟队列启动成功)");}}

除了希望在一定时间之后触发某些任务外,平时还会有一些资源消耗比较大的任务,如果接口直接对外暴露,多人同时调用时有可能造成系统变慢甚至直接宕机。

在不改变系统配置,不升级系统硬件的情况下,我们可以将这种任务放到一个对列当中排队执行。

三、普通阻塞队列的使用

3.1 管理普阻塞通队列枚举类

@Getter
@AllArgsConstructor
public enum RedisBlockingQueueEnum {CONSUME_RESOURCES_TASK("CONSUME_RESOURCES_TASK", "消耗资源的任务", "consume resourcesImpl"),;private final String code;private final String name;/*** 延迟队列具体业务实现的 Bean* 可通过 Spring 的上下文获取*/private final String beanId;
}

3.2 使用一个RedisBlockingQueueOperator去统一管理添加阻塞队列

@Slf4j
@Component
@RequiredArgsConstructor
public class RedisBlockingQueueOperator {private final RedissonClient redissonClient;public void addConsumeResourcesTaskQueue(Long userId, Long tenantId) {JSONObject jsonObject = new JSONObject();jsonObject.put("userId", userId);jsonObject.put("tenantId", tenantId);RBlockingQueue<String> queue = redissonClient.getBlockingQueue(RedisBlockingQueueEnum.CONSUME_RESOURCES_TASK.getCode());queue.offer(jsonObject.toJSONString());}
}

3.3 Controller中将请求加入阻塞队列

@RestController
@RequiredArgsConstructor
@RequestMapping("/test")
@Tag(description = "test", name = "测试Controller")
@SecurityRequirement(name = HttpHeaders.AUTHORIZATION)
public class TestController {private final RedisBlockingQueueOperator queueOperator;@PostMapping("/consume_resource_task")public R consumeResourcesTask() throws Exception{queueOperator.addCombinedReleaseQueue(1 TenantContextHolder.getTenantId());return R.ok("success!");}}

3.4 编写实现类

@Slf4j
@Component
public class ConsumeResourcesTaskImpl implements RedisQueueHandle<String> {@Autowired@LazyTestService testService;@Overridepublic void execute(String value) throws Exception {JSONObject jsonObject = JSON.parseObject(value);log.info("延迟队列触发【处理耗时任务】:{}", value);testService.dealConsumeResourcesTask(value);}
}

3.5 项目启动时使用其它线程控制全部普通阻塞队列

@Slf4j
@Component
@AllArgsConstructor
public class AppStartRunner implements ApplicationRunner {private final RedissonClient redissonClient;@Override@Order(value = 1)public void run(ApplicationArguments args) {log.info("服务启动了!");for (RedisBlockingQueueEnum queueEnum : RedisBlockingQueueEnum.values()) {new Thread(() -> {RBlockingQueue<String> queue = redissonClient.getBlockingQueue(queueEnum.getCode());while (true) {try {String value = queue.take();if (value == null) continue;RedisQueueHandle redisQueueHandle = SpringContextHolder.getBean(queueEnum.getBeanId());redisQueueHandle.execute(value);} catch (Exception e) {log.error("(Redis阻塞队列异常中断) {}", e.getMessage());}}}).start();}log.info("(Redis Blocking Queue 启动成功)");}}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/19651.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

AI预测福彩3D采取888=3策略+和值012路一缩定乾坤测试5月29日预测第5弹

今天继续基于8883的大底&#xff0c;使用尽可能少的条件进行缩号&#xff0c;同时&#xff0c;同样准备两套方案&#xff0c;一套是我自己的条件进行缩号&#xff0c;另外一套是8883的大底结合2码不定位奖号预测二次缩水来杀号。好了&#xff0c;直接上结果吧~ 首先&…

云计算面试题

5.20日 java的集合体系结构 它提供了一组接口、类和算法&#xff0c;用于存储和管理对象集合。Java的集合框架包括多个基本接口&#xff0c;如Collection、List、Set、Map等。Collection是集合层次结构的根接口&#xff0c;代表一组对象&#xff1b;List是有序集合&#xff0…

大数据组件doc

1.flink Apache Flink Documentation | Apache Flink 2.kafka Apache Kafka 3.hbase Apache HBase ™ Reference Guide 4.zookeeper ZooKeeper: Because Coordinating Distributed Systems is a Zoo 5.spark Overview - Spark 3.5.1 Documentation 6.idea组件&#xff08;…

Python安装vnpy教程

安装vn.py(一个用于开发量化交易应用的Python库)请确保你已经安装了Python环境 (推荐使用Python3.7或以上版本)。 1. 安装Python和pip 确保你已经安装了Python和pip。如果没有,请先安装。 Windows 下载Python安装包:https://www.python.org/downloads/安装Python时,确…

BUUCTF Crypto RSA详解《1~32》刷题记录

文章目录 一、Crypto1、 一眼就解密2、MD53、Url编码4、看我回旋踢5、摩丝6、password7、变异凯撒8、Quoted-printable9、篱笆墙的影子10、Rabbit11、RSA12、丢失的MD513、Alice与Bob14、大帝的密码武器15、rsarsa16、Windows系统密码17、信息化时代的步伐18、凯撒&#xff1f;…

如何使用视频号下载助手机器人,下载视频号视频

目录 微信视频号版权问题 视频号下载助手机器人如何获取 手机市场基本一年每个品牌商发布的手机就高达10多种&#xff0c;而这些设备中并不支持手机缓存操作&#xff0c;却把市场搞的越来越浑&#xff0c;还不断宣传手机缓存可保存视频&#xff0c;今天教教大家如何使用视频号…

程序员转行8大方向,都考虑一下

众所周之&#xff0c;程序员的工作通常面临较大的压力&#xff0c;包括项目进度紧张、技术难题需要解决、与其他团队成员沟通协作等。 长时间处于高压状态下可能会导致一些程序员感到身心疲惫&#xff0c;这也让大家产生了转行的想法&#xff0c;程序员想要转行&#xff0c;有…

私域加持业务 快消门店运营新玩法

两个月前&#xff0c;某快消品企业的李总急切地联系了纷享销客&#xff0c;希望能找到解决终端门店运营难题的有效方法。 Step1、连接终端门店&#xff0c;导入私域进行深度维系与运营 一、与终端门店建立联系 为了与众多门店老板建立紧密的联系&#xff0c;并将他们转化为企…

error: #error C++17 or later compatible compiler is required to use PyTorch.

背景 在安装wonder3d的时候要安装tinycudnn&#xff0c; 命令如下&#xff1a; pip install githttps://github.com/NVlabs/tiny-cuda-nn/#subdirectorybindings/torch报错如下&#xff1a; error: #error C17 or later compatible compiler is required to use PyTorch. 解…

sqliteSQL基础

SQL基础 SQLite 数据库简介 SQLite 是一个开源的、 内嵌式的关系型数据库&#xff0c; 第一个版本诞生于 2000 年 5 月&#xff0c; 目前最高版本为 SQLite3。 下载地址&#xff1a; https://www.sqlite.org/download.html 菜鸟教程 : https://www.runoob.com/sqlite/sqlit…

WHAT - package.json 解释

目录 一、前言二、介绍2.1 package.json 文件示例2.2 关键字段2.3 常用命令2.4 自定义脚本 三、element-plus 完整示例3.1 main 和 module1. main 字段2. module 字段3. 综合示例 3.2 types1. 示例2. TypeScript 类型定义文件的作用3. 类型定义文件示例4. 发布带有类型定义的包…

Redis相关详解

什么是 Redis&#xff1f;它主要用来什么✁&#xff1f; Redis&#xff0c;英文全称是 Remote Dictionary Server&#xff08;远程字典服务&#xff09;&#xff0c;是一个开源✁使用 ANSI C 语言编写、支持网络、可基于内存亦可持久化✁日志型、Key-Value 数据库&#xff…

Elasticsearch 认证模拟题 -2

一、题目 有一个索引 task3&#xff0c;其中有 fielda&#xff0c;fieldb&#xff0c;fieldc&#xff0c;fielde 现要求对 task3 重建索引&#xff0c;重建后的索引新增一个字段 fieldg 其值是fielda&#xff0c;fieldb&#xff0c;fieldc&#xff0c;fielde 的值拼接而成。 …

css :hover的使用

参考未整理 即鼠标移入类名为btn的元素时&#xff0c;她的子元素i样式发生改变 自身的样式也发生改变 &#xff0c;如果他有更多的子元素也可以这样写

机器学习笔记(1):sklearn是个啥?

sklearn 简介 Sklearn是一个基于Python语言的开源机器学习库。全称Scikit-Learn&#xff0c;是建立在诸如NumPy、SciPy和matplotlib等其他Python库之上&#xff0c;为用户提供了一系列高质量的机器学习算法&#xff0c;其典型特点有&#xff1a; 简单有效的工具进行预测数据分…

自动化安装Nginx

1. 指定版本号和用户&#xff1b; 2. 确定安装目录&#xff1b; 3. 确定安装编译模块&#xff1b; 4. 安装相关依赖&#xff1b; 5. 下载源码包并解压&#xff1b; 6. 编译安装&#xff1b; 7. 文件授权及临时文件清理。 #!/bin/bash# 用户输入的Nginx版本号NGIN…

VMware虚拟机安装Ubuntu-Server版教程(超详细)

目录 1. 下载2. 安装 VMware3. 安装 Ubuntu3.1 新建虚拟机3.2 安装操作系统 4. SSH方式连接操作系统4.1 好用的SSH工具下载&#xff1a;4.2 测试SSH连接 5. 开启root用户登录5.1 设置root用户密码5.2 传统方式切换root用户5.3 直接用root用户登录5.4 SSH启用root用户登录 6. 安…

CV每日论文--2024.5.31

1、X-VILA: Cross-Modality Alignment for Large Language Model 中文标题&#xff1a;X-VILA: 跨模态对齐的大型语言模型 简介&#xff1a;我们提出了X-VILA,这是一种全模态模型,旨在通过整合图像、视频和音频模态来扩展大型语言模型(LLM)的能力。X-VILA通过将模态特定的编码…

Spring Cache自定义序列化解决乱码问题

Spring Cache数据缓存到 Redis中的 value是乱码问题&#xff08;包含日期格式转换&#xff09;&#xff0c;主要是因为序列化方式导致的。 Spring Cache使用 Redis缓存管理器时&#xff0c;默认 value使用的是 JDK序列化值的方式&#xff0c;所以导致缓存的 value乱码的问题。 …

【OceanBase诊断调优】—— 执行内存占用高问题汇总

执行内存占用高问题经常在不同环境中遇到, 在内存较大的租户场景下, 问题可能暴露不明显, 但小规格场景下, 如果执行内存占用非常高, 往往有上 G 甚至几十G 的情况, 可能导致整个租户无内存使用。本文汇总执行内存占用高问题。 获取内存高的 mod 的代码 backtrace 方式 对于执…