业务服务:redisson

文章目录

  • 前言
  • 一、配置
    • 1. 添加依赖
    • 2. 配置文件/类
    • 3. 注入redission
    • 3. 封装工具类
  • 二、应用
    • 1. RedisUtils工具类的基本使用
  • 三、队列
    • 1. 工具类
    • 2. 普通队列
    • 3. 有界队列(限制数据量)
    • 4. 延迟队列(延迟获取数据)
    • 5. 优先队列(数据可插队)


前言

redission是一个开源的java redis的客户端,在其基础上进行了进一步扩展。这些扩展极大地丰富了Redis的应用场景,尤其是在构建分布式系统时。


一、配置

1. 添加依赖

<!--redisson-->
<dependency><groupId>org.redisson</groupId><artifactId>redisson-spring-boot-starter</artifactId><version>${redisson.version}</version><exclusions><exclusion><groupId>org.redisson</groupId><artifactId>redisson-spring-data-30</artifactId></exclusion></exclusions>
</dependency>
<dependency><groupId>org.redisson</groupId><artifactId>redisson-spring-data-27</artifactId><version>${redisson.version}</version>
</dependency>
<dependency><groupId>com.baomidou</groupId><artifactId>lock4j-redisson-spring-boot-starter</artifactId><version>${lock4j.version}</version>
</dependency>

在这里插入图片描述

2. 配置文件/类

spring:redis:# 地址host: localhost# 端口,默认为6379port: 6379# 数据库索引database: 0# 密码(如没有密码请注释掉)password: asd60787533# 连接超时时间timeout: 10s# 是否开启sslssl: falseredisson:# redis key前缀keyPrefix: demo# 线程池数量threads: 4# Netty线程池数量nettyThreads: 8# 单节点配置singleServerConfig:# 客户端名称clientName: demo# 最小空闲连接数connectionMinimumIdleSize: 8# 连接池大小connectionPoolSize: 32# 连接空闲超时,单位:毫秒idleConnectionTimeout: 10000# 命令等待超时,单位:毫秒timeout: 3000# 发布和订阅连接池大小subscriptionConnectionPoolSize: 50
@Data
@Component
@ConfigurationProperties(prefix = "redisson")
public class RedissonProperties {/*** redis缓存key前缀*/private String keyPrefix;/*** 线程池数量,默认值 = 当前处理核数量 * 2*/private int threads;/*** Netty线程池数量,默认值 = 当前处理核数量 * 2*/private int nettyThreads;/*** 单机服务配置*/private SingleServerConfig singleServerConfig;/*** 集群服务配置*/private ClusterServersConfig clusterServersConfig;@Data@NoArgsConstructorpublic static class SingleServerConfig {/*** 客户端名称*/private String clientName;/*** 最小空闲连接数*/private int connectionMinimumIdleSize;/*** 连接池大小*/private int connectionPoolSize;/*** 连接空闲超时,单位:毫秒*/private int idleConnectionTimeout;/*** 命令等待超时,单位:毫秒*/private int timeout;/*** 发布和订阅连接池大小*/private int subscriptionConnectionPoolSize;}@Data@NoArgsConstructorpublic static class ClusterServersConfig {/*** 客户端名称*/private String clientName;/*** master最小空闲连接数*/private int masterConnectionMinimumIdleSize;/*** master连接池大小*/private int masterConnectionPoolSize;/*** slave最小空闲连接数*/private int slaveConnectionMinimumIdleSize;/*** slave连接池大小*/private int slaveConnectionPoolSize;/*** 连接空闲超时,单位:毫秒*/private int idleConnectionTimeout;/*** 命令等待超时,单位:毫秒*/private int timeout;/*** 发布和订阅连接池大小*/private int subscriptionConnectionPoolSize;/*** 读取模式*/private ReadMode readMode;/*** 订阅模式*/private SubscriptionMode subscriptionMode;}}

3. 注入redission

@Slf4j
@Configuration
@EnableCaching
@EnableConfigurationProperties(RedissonProperties.class)
public class RedisConfig {@Autowiredprivate RedissonProperties redissonProperties;@Autowiredprivate ObjectMapper objectMapper;@Beanpublic RedissonAutoConfigurationCustomizer redissonCustomizer() {return config -> {config.setThreads(redissonProperties.getThreads()).setNettyThreads(redissonProperties.getNettyThreads()).setCodec(new JsonJacksonCodec(objectMapper));RedissonProperties.SingleServerConfig singleServerConfig = redissonProperties.getSingleServerConfig();if (ObjectUtil.isNotNull(singleServerConfig)) {// 使用单机模式config.useSingleServer()//设置redis key前缀.setNameMapper(new KeyPrefixHandler(redissonProperties.getKeyPrefix())).setTimeout(singleServerConfig.getTimeout()).setClientName(singleServerConfig.getClientName()).setIdleConnectionTimeout(singleServerConfig.getIdleConnectionTimeout()).setSubscriptionConnectionPoolSize(singleServerConfig.getSubscriptionConnectionPoolSize()).setConnectionMinimumIdleSize(singleServerConfig.getConnectionMinimumIdleSize()).setConnectionPoolSize(singleServerConfig.getConnectionPoolSize());}// 集群配置方式 参考下方注释RedissonProperties.ClusterServersConfig clusterServersConfig = redissonProperties.getClusterServersConfig();if (ObjectUtil.isNotNull(clusterServersConfig)) {config.useClusterServers()//设置redis key前缀.setNameMapper(new KeyPrefixHandler(redissonProperties.getKeyPrefix())).setTimeout(clusterServersConfig.getTimeout()).setClientName(clusterServersConfig.getClientName()).setIdleConnectionTimeout(clusterServersConfig.getIdleConnectionTimeout()).setSubscriptionConnectionPoolSize(clusterServersConfig.getSubscriptionConnectionPoolSize()).setMasterConnectionMinimumIdleSize(clusterServersConfig.getMasterConnectionMinimumIdleSize()).setMasterConnectionPoolSize(clusterServersConfig.getMasterConnectionPoolSize()).setSlaveConnectionMinimumIdleSize(clusterServersConfig.getSlaveConnectionMinimumIdleSize()).setSlaveConnectionPoolSize(clusterServersConfig.getSlaveConnectionPoolSize()).setReadMode(clusterServersConfig.getReadMode()).setSubscriptionMode(clusterServersConfig.getSubscriptionMode());}log.info("初始化 redis 配置");};}/*** redis集群配置 yml** --- # redis 集群配置(单机与集群只能开启一个另一个需要注释掉)* spring:*   redis:*     cluster:*       nodes:*         - 192.168.0.100:6379*         - 192.168.0.101:6379*         - 192.168.0.102:6379*     # 密码*     password:*     # 连接超时时间*     timeout: 10s*     # 是否开启ssl*     ssl: false** redisson:*   # 线程池数量*   threads: 16*   # Netty线程池数量*   nettyThreads: 32*   # 集群配置*   clusterServersConfig:*     # 客户端名称*     clientName: ${ruoyi.name}*     # master最小空闲连接数*     masterConnectionMinimumIdleSize: 32*     # master连接池大小*     masterConnectionPoolSize: 64*     # slave最小空闲连接数*     slaveConnectionMinimumIdleSize: 32*     # slave连接池大小*     slaveConnectionPoolSize: 64*     # 连接空闲超时,单位:毫秒*     idleConnectionTimeout: 10000*     # 命令等待超时,单位:毫秒*     timeout: 3000*     # 发布和订阅连接池大小*     subscriptionConnectionPoolSize: 50*     # 读取模式*     readMode: "SLAVE"*     # 订阅模式*     subscriptionMode: "MASTER"*/}
public class KeyPrefixHandler implements NameMapper {private final String keyPrefix;public KeyPrefixHandler(String keyPrefix) {//前缀为空 则返回空前缀this.keyPrefix = StringUtils.isBlank(keyPrefix) ? "" : keyPrefix + ":";}/*** 增加前缀*/@Overridepublic String map(String name) {if (StringUtils.isBlank(name)) {return null;}if (StringUtils.isNotBlank(keyPrefix) && !name.startsWith(keyPrefix)) {return keyPrefix + name;}return name;}/*** 去除前缀*/@Overridepublic String unmap(String name) {if (StringUtils.isBlank(name)) {return null;}if (StringUtils.isNotBlank(keyPrefix) && name.startsWith(keyPrefix)) {return name.substring(keyPrefix.length());}return name;}
}

3. 封装工具类

@NoArgsConstructor(access = AccessLevel.PRIVATE)
@SuppressWarnings(value = {"unchecked", "rawtypes"})
public class RedisUtils {private static final RedissonClient CLIENT = SpringUtils.getBean(RedissonClient.class);/*** 限流** @param key          限流key* @param rateType     限流类型* @param rate         速率* @param rateInterval 速率间隔* @return -1 表示失败*/public static long rateLimiter(String key, RateType rateType, int rate, int rateInterval) {RRateLimiter rateLimiter = CLIENT.getRateLimiter(key);rateLimiter.trySetRate(rateType, rate, rateInterval, RateIntervalUnit.SECONDS);if (rateLimiter.tryAcquire()) {return rateLimiter.availablePermits();} else {return -1L;}}/*** 获取客户端实例*/public static RedissonClient getClient() {return CLIENT;}/*** 发布通道消息** @param channelKey 通道key* @param msg        发送数据* @param consumer   自定义处理*/public static <T> void publish(String channelKey, T msg, Consumer<T> consumer) {RTopic topic = CLIENT.getTopic(channelKey);topic.publish(msg);consumer.accept(msg);}public static <T> void publish(String channelKey, T msg) {RTopic topic = CLIENT.getTopic(channelKey);topic.publish(msg);}/*** 订阅通道接收消息** @param channelKey 通道key* @param clazz      消息类型* @param consumer   自定义处理*/public static <T> void subscribe(String channelKey, Class<T> clazz, Consumer<T> consumer) {RTopic topic = CLIENT.getTopic(channelKey);topic.addListener(clazz, (channel, msg) -> consumer.accept(msg));}/*** 缓存基本的对象,Integer、String、实体类等** @param key   缓存的键值* @param value 缓存的值*/public static <T> void setCacheObject(final String key, final T value) {setCacheObject(key, value, false);}/*** 缓存基本的对象,保留当前对象 TTL 有效期** @param key       缓存的键值* @param value     缓存的值* @param isSaveTtl 是否保留TTL有效期(例如: set之前ttl剩余90 set之后还是为90)* @since Redis 6.X 以上使用 setAndKeepTTL 兼容 5.X 方案*/public static <T> void setCacheObject(final String key, final T value, final boolean isSaveTtl) {RBucket<T> bucket = CLIENT.getBucket(key);if (isSaveTtl) {try {bucket.setAndKeepTTL(value);} catch (Exception e) {long timeToLive = bucket.remainTimeToLive();setCacheObject(key, value, Duration.ofMillis(timeToLive));}} else {bucket.set(value);}}/*** 缓存基本的对象,Integer、String、实体类等** @param key      缓存的键值* @param value    缓存的值* @param duration 时间*/public static <T> void setCacheObject(final String key, final T value, final Duration duration) {RBatch batch = CLIENT.createBatch();RBucketAsync<T> bucket = batch.getBucket(key);bucket.setAsync(value);bucket.expireAsync(duration);batch.execute();}/*** 注册对象监听器* <p>* key 监听器需开启 `notify-keyspace-events` 等 redis 相关配置** @param key      缓存的键值* @param listener 监听器配置*/public static <T> void addObjectListener(final String key, final ObjectListener listener) {RBucket<T> result = CLIENT.getBucket(key);result.addListener(listener);}/*** 设置有效时间** @param key     Redis键* @param timeout 超时时间* @return true=设置成功;false=设置失败*/public static boolean expire(final String key, final long timeout) {return expire(key, Duration.ofSeconds(timeout));}/*** 设置有效时间** @param key      Redis键* @param duration 超时时间* @return true=设置成功;false=设置失败*/public static boolean expire(final String key, final Duration duration) {RBucket rBucket = CLIENT.getBucket(key);return rBucket.expire(duration);}/*** 获得缓存的基本对象。** @param key 缓存键值* @return 缓存键值对应的数据*/public static <T> T getCacheObject(final String key) {RBucket<T> rBucket = CLIENT.getBucket(key);return rBucket.get();}/*** 获得key剩余存活时间** @param key 缓存键值* @return 剩余存活时间*/public static <T> long getTimeToLive(final String key) {RBucket<T> rBucket = CLIENT.getBucket(key);return rBucket.remainTimeToLive();}/*** 删除单个对象** @param key 缓存的键值*/public static boolean deleteObject(final String key) {return CLIENT.getBucket(key).delete();}/*** 删除集合对象** @param collection 多个对象*/public static void deleteObject(final Collection collection) {RBatch batch = CLIENT.createBatch();collection.forEach(t -> {batch.getBucket(t.toString()).deleteAsync();});batch.execute();}/*** 检查缓存对象是否存在** @param key 缓存的键值*/public static boolean isExistsObject(final String key) {return CLIENT.getBucket(key).isExists();}/*** 缓存List数据** @param key      缓存的键值* @param dataList 待缓存的List数据* @return 缓存的对象*/public static <T> boolean setCacheList(final String key, final List<T> dataList) {RList<T> rList = CLIENT.getList(key);return rList.addAll(dataList);}/*** 注册List监听器* <p>* key 监听器需开启 `notify-keyspace-events` 等 redis 相关配置** @param key      缓存的键值* @param listener 监听器配置*/public static <T> void addListListener(final String key, final ObjectListener listener) {RList<T> rList = CLIENT.getList(key);rList.addListener(listener);}/*** 获得缓存的list对象** @param key 缓存的键值* @return 缓存键值对应的数据*/public static <T> List<T> getCacheList(final String key) {RList<T> rList = CLIENT.getList(key);return rList.readAll();}/*** 缓存Set** @param key     缓存键值* @param dataSet 缓存的数据* @return 缓存数据的对象*/public static <T> boolean setCacheSet(final String key, final Set<T> dataSet) {RSet<T> rSet = CLIENT.getSet(key);return rSet.addAll(dataSet);}/*** 注册Set监听器* <p>* key 监听器需开启 `notify-keyspace-events` 等 redis 相关配置** @param key      缓存的键值* @param listener 监听器配置*/public static <T> void addSetListener(final String key, final ObjectListener listener) {RSet<T> rSet = CLIENT.getSet(key);rSet.addListener(listener);}/*** 获得缓存的set** @param key 缓存的key* @return set对象*/public static <T> Set<T> getCacheSet(final String key) {RSet<T> rSet = CLIENT.getSet(key);return rSet.readAll();}/*** 缓存Map** @param key     缓存的键值* @param dataMap 缓存的数据*/public static <T> void setCacheMap(final String key, final Map<String, T> dataMap) {if (dataMap != null) {RMap<String, T> rMap = CLIENT.getMap(key);rMap.putAll(dataMap);}}/*** 注册Map监听器* <p>* key 监听器需开启 `notify-keyspace-events` 等 redis 相关配置** @param key      缓存的键值* @param listener 监听器配置*/public static <T> void addMapListener(final String key, final ObjectListener listener) {RMap<String, T> rMap = CLIENT.getMap(key);rMap.addListener(listener);}/*** 获得缓存的Map** @param key 缓存的键值* @return map对象*/public static <T> Map<String, T> getCacheMap(final String key) {RMap<String, T> rMap = CLIENT.getMap(key);return rMap.getAll(rMap.keySet());}/*** 获得缓存Map的key列表** @param key 缓存的键值* @return key列表*/public static <T> Set<String> getCacheMapKeySet(final String key) {RMap<String, T> rMap = CLIENT.getMap(key);return rMap.keySet();}/*** 往Hash中存入数据** @param key   Redis键* @param hKey  Hash键* @param value 值*/public static <T> void setCacheMapValue(final String key, final String hKey, final T value) {RMap<String, T> rMap = CLIENT.getMap(key);rMap.put(hKey, value);}/*** 获取Hash中的数据** @param key  Redis键* @param hKey Hash键* @return Hash中的对象*/public static <T> T getCacheMapValue(final String key, final String hKey) {RMap<String, T> rMap = CLIENT.getMap(key);return rMap.get(hKey);}/*** 删除Hash中的数据** @param key  Redis键* @param hKey Hash键* @return Hash中的对象*/public static <T> T delCacheMapValue(final String key, final String hKey) {RMap<String, T> rMap = CLIENT.getMap(key);return rMap.remove(hKey);}/*** 获取多个Hash中的数据** @param key   Redis键* @param hKeys Hash键集合* @return Hash对象集合*/public static <K, V> Map<K, V> getMultiCacheMapValue(final String key, final Set<K> hKeys) {RMap<K, V> rMap = CLIENT.getMap(key);return rMap.getAll(hKeys);}/*** 设置原子值** @param key   Redis键* @param value 值*/public static void setAtomicValue(String key, long value) {RAtomicLong atomic = CLIENT.getAtomicLong(key);atomic.set(value);}/*** 获取原子值** @param key Redis键* @return 当前值*/public static long getAtomicValue(String key) {RAtomicLong atomic = CLIENT.getAtomicLong(key);return atomic.get();}/*** 递增原子值** @param key Redis键* @return 当前值*/public static long incrAtomicValue(String key) {RAtomicLong atomic = CLIENT.getAtomicLong(key);return atomic.incrementAndGet();}/*** 递减原子值** @param key Redis键* @return 当前值*/public static long decrAtomicValue(String key) {RAtomicLong atomic = CLIENT.getAtomicLong(key);return atomic.decrementAndGet();}/*** 获得缓存的基本对象列表** @param pattern 字符串前缀* @return 对象列表*/public static Collection<String> keys(final String pattern) {Stream<String> stream = CLIENT.getKeys().getKeysStreamByPattern(pattern);return stream.collect(Collectors.toList());}/*** 删除缓存的基本对象列表** @param pattern 字符串前缀*/public static void deleteKeys(final String pattern) {CLIENT.getKeys().deleteByPattern(pattern);}/*** 检查redis中是否存在key** @param key 键*/public static Boolean hasKey(String key) {RKeys rKeys = CLIENT.getKeys();return rKeys.countExists(key) > 0;}
}

二、应用

1. RedisUtils工具类的基本使用

创建接口

@GetMapping("key")
public String getKey(String key){return RedisUtils.getCacheObject(key);
}@GetMapping("setKey")
public String setKey(String key,String value){RedisUtils.setCacheObject(key,value);return "success";
}

设置key

在这里插入图片描述
在这里插入图片描述

获取key对应的值

在这里插入图片描述
其他方法的作用,可以自行测试。这里就不再演示使用

三、队列

redission也支持队列,下面封装了一些队列的相关方法。可以处理了一些简单的队列任务,如果业务复杂可以选择mq

1. 工具类

@NoArgsConstructor(access = AccessLevel.PRIVATE)
public class QueueUtils {private static final RedissonClient CLIENT = SpringUtil.getBean(RedissonClient.class);/*** 获取客户端实例*/public static RedissonClient getClient() {return CLIENT;}/*** 添加普通队列数据** @param queueName 队列名* @param data      数据*/public static <T> boolean addQueueObject(String queueName, T data) {RBlockingQueue<T> queue = CLIENT.getBlockingQueue(queueName);return queue.offer(data);}/*** 通用获取一个队列数据 没有数据返回 null(不支持延迟队列)** @param queueName 队列名*/public static <T> T getQueueObject(String queueName) {RBlockingQueue<T> queue = CLIENT.getBlockingQueue(queueName);return queue.poll();}/*** 通用删除队列数据(不支持延迟队列)*/public static <T> boolean removeQueueObject(String queueName, T data) {RBlockingQueue<T> queue = CLIENT.getBlockingQueue(queueName);return queue.remove(data);}/*** 通用销毁队列 所有阻塞监听 报错(不支持延迟队列)*/public static <T> boolean destroyQueue(String queueName) {RBlockingQueue<T> queue = CLIENT.getBlockingQueue(queueName);return queue.delete();}/*** 添加延迟队列数据 默认毫秒** @param queueName 队列名* @param data      数据* @param time      延迟时间*/public static <T> void addDelayedQueueObject(String queueName, T data, long time) {addDelayedQueueObject(queueName, data, time, TimeUnit.MILLISECONDS);}/*** 添加延迟队列数据** @param queueName 队列名* @param data      数据* @param time      延迟时间* @param timeUnit  单位*/public static <T> void addDelayedQueueObject(String queueName, T data, long time, TimeUnit timeUnit) {RBlockingQueue<T> queue = CLIENT.getBlockingQueue(queueName);RDelayedQueue<T> delayedQueue = CLIENT.getDelayedQueue(queue);delayedQueue.offer(data, time, timeUnit);}/*** 获取一个延迟队列数据 没有数据返回 null** @param queueName 队列名*/public static <T> T getDelayedQueueObject(String queueName) {RBlockingQueue<T> queue = CLIENT.getBlockingQueue(queueName);RDelayedQueue<T> delayedQueue = CLIENT.getDelayedQueue(queue);return delayedQueue.poll();}/*** 删除延迟队列数据*/public static <T> boolean removeDelayedQueueObject(String queueName, T data) {RBlockingQueue<T> queue = CLIENT.getBlockingQueue(queueName);RDelayedQueue<T> delayedQueue = CLIENT.getDelayedQueue(queue);return delayedQueue.remove(data);}/*** 销毁延迟队列 所有阻塞监听 报错*/public static <T> void destroyDelayedQueue(String queueName) {RBlockingQueue<T> queue = CLIENT.getBlockingQueue(queueName);RDelayedQueue<T> delayedQueue = CLIENT.getDelayedQueue(queue);delayedQueue.destroy();}/*** 添加优先队列数据** @param queueName 队列名* @param data      数据*/public static <T> boolean addPriorityQueueObject(String queueName, T data) {RPriorityBlockingQueue<T> priorityBlockingQueue = CLIENT.getPriorityBlockingQueue(queueName);return priorityBlockingQueue.offer(data);}/*** 优先队列获取一个队列数据 没有数据返回 null(不支持延迟队列)** @param queueName 队列名*/public static <T> T getPriorityQueueObject(String queueName) {RPriorityBlockingQueue<T> queue = CLIENT.getPriorityBlockingQueue(queueName);return queue.poll();}/*** 优先队列删除队列数据(不支持延迟队列)*/public static <T> boolean removePriorityQueueObject(String queueName, T data) {RPriorityBlockingQueue<T> queue = CLIENT.getPriorityBlockingQueue(queueName);return queue.remove(data);}/*** 优先队列销毁队列 所有阻塞监听 报错(不支持延迟队列)*/public static <T> boolean destroyPriorityQueue(String queueName) {RPriorityBlockingQueue<T> queue = CLIENT.getPriorityBlockingQueue(queueName);return queue.delete();}/*** 尝试设置 有界队列 容量 用于限制数量** @param queueName 队列名* @param capacity  容量*/public static <T> boolean trySetBoundedQueueCapacity(String queueName, int capacity) {RBoundedBlockingQueue<T> boundedBlockingQueue = CLIENT.getBoundedBlockingQueue(queueName);return boundedBlockingQueue.trySetCapacity(capacity);}/*** 尝试设置 有界队列 容量 用于限制数量** @param queueName 队列名* @param capacity  容量* @param destroy   已存在是否销毁*/public static <T> boolean trySetBoundedQueueCapacity(String queueName, int capacity, boolean destroy) {RBoundedBlockingQueue<T> boundedBlockingQueue = CLIENT.getBoundedBlockingQueue(queueName);if (boundedBlockingQueue.isExists() && destroy) {destroyQueue(queueName);}return boundedBlockingQueue.trySetCapacity(capacity);}/*** 添加有界队列数据** @param queueName 队列名* @param data      数据* @return 添加成功 true 已达到界限 false*/public static <T> boolean addBoundedQueueObject(String queueName, T data) {RBoundedBlockingQueue<T> boundedBlockingQueue = CLIENT.getBoundedBlockingQueue(queueName);return boundedBlockingQueue.offer(data);}/*** 有界队列获取一个队列数据 没有数据返回 null(不支持延迟队列)** @param queueName 队列名*/public static <T> T getBoundedQueueObject(String queueName) {RBoundedBlockingQueue<T> queue = CLIENT.getBoundedBlockingQueue(queueName);return queue.poll();}/*** 有界队列删除队列数据(不支持延迟队列)*/public static <T> boolean removeBoundedQueueObject(String queueName, T data) {RBoundedBlockingQueue<T> queue = CLIENT.getBoundedBlockingQueue(queueName);return queue.remove(data);}/*** 有界队列销毁队列 所有阻塞监听 报错(不支持延迟队列)*/public static <T> boolean destroyBoundedQueue(String queueName) {RBoundedBlockingQueue<T> queue = CLIENT.getBoundedBlockingQueue(queueName);return queue.delete();}/*** 订阅阻塞队列(可订阅所有实现类 例如: 延迟 优先 有界 等)*/public static <T> void subscribeBlockingQueue(String queueName, Consumer<T> consumer, boolean isDelayed) {RBlockingQueue<T> queue = CLIENT.getBlockingQueue(queueName);if (isDelayed) {// 订阅延迟队列CLIENT.getDelayedQueue(queue);}queue.subscribeOnElements(consumer);}}

2. 普通队列

添加数据到队列

@GetMapping("add")
public String add(){QueueUtils.addQueueObject("queue:simple",1);QueueUtils.addQueueObject("queue:simple",2);QueueUtils.addQueueObject("queue:simple",3);return "ok";
}

在这里插入图片描述

消费队列数据

遵循先进先出,获取数据后就会删除。如果队列中没有数据,获取到的就为null

@GetMapping("get")
public Integer get(){return QueueUtils.getQueueObject("queue:simple");
}

在这里插入图片描述

移除队列数据

@GetMapping("remove")
public String remove(){QueueUtils.removeQueueObject("queue:simple",3);return "ok";
}

在这里插入图片描述

销毁队列

@GetMapping("destroy")
public String destroy(){QueueUtils.destroyQueue("queue:simple");return "ok";
}

在这里插入图片描述

订阅队列消息

  • 订阅的消息一般在项目启动的时候使用,只能订阅一次
  • 当监听到队列新增数据的时候会立即取出来进行消费
@PostConstruct
public void sub(){QueueUtils.subscribeBlockingQueue("queue:simple",(o)->{System.out.println("接收到消息:"+o);},false);
}

我们再次调用新增

在这里插入图片描述

3. 有界队列(限制数据量)

设置队列最大容量

有界队列在使用前必须设置容量

@GetMapping("set")
public String set(){boolean b = QueueUtils.trySetBoundedQueueCapacity("queue:bound", 10);return  "ok";
}

在这里插入图片描述

新增有界队列数据

@GetMapping("add")
public String add(){QueueUtils.addBoundedQueueObject("queue:bound",1);return "ok";
}

新增完毕后我们可以发现,我们直接设置的最大容量变成来了9。每次添加数据都会查询当前最大容量是否>0,如果大于0添加成功并且减一,否则添加失败

在这里插入图片描述
在这里插入图片描述

获取有界队列数据

@GetMapping("get")
public Integer get(){return QueueUtils.getBoundedQueueObject("queue:bound");
}

我们可以看到当获取数据的时候,容量+1,数据从redis中删除

在这里插入图片描述
在这里插入图片描述

其他用法与普通队列类似,就不再演示了

4. 延迟队列(延迟获取数据)

添加延迟数据

延迟队列的实现原理是将数据添加到另一个缓存队列中,当到达指定时间才会转移到普通队列中

@GetMapping("add")
public String add(){QueueUtils.addDelayedQueueObject("queue:belay",1,10, TimeUnit.SECONDS);return "ok";
}

获取延迟数据

必须达到指定时间后才能获取

@GetMapping("get")
public Integer get(){return QueueUtils.getDelayedQueueObject("queue:belay");
}

删除延迟数据

@GetMapping("remove")
public String remove(){QueueUtils.removeQueueObject("queue:belay",3);return "ok";
}

清空延迟数据

@GetMapping("destroy")
public String destroy(){QueueUtils.destroyDelayedQueue("queue:belay");return "ok";
}

订阅消息使用方法同普通队列类似,第三个参数需要改为true

5. 优先队列(数据可插队)

插入优先队列的数据我们需要先实现比较接口

@Data
@Accessors(chain = true)
class Order implements Comparable<Order>{private Long id;@Overridepublic int compareTo(Order o) {return Long.compare(getId(), o.id);}
}

新增优先数据

@GetMapping("add")
public String add(){QueueUtils.addPriorityQueueObject("queue:priority",new Order().setId(1L));QueueUtils.addPriorityQueueObject("queue:priority",new Order().setId(6L));QueueUtils.addPriorityQueueObject("queue:priority",new Order().setId(2L));QueueUtils.addPriorityQueueObject("queue:priority",new Order().setId(5L));QueueUtils.addPriorityQueueObject("queue:priority",new Order().setId(22L));QueueUtils.addPriorityQueueObject("queue:priority",new Order().setId(3L));return "ok";
}

我们可以看到插入的数据是有序的

在这里插入图片描述

获取优先队列数据

@GetMapping("get")
public Integer get(){return QueueUtils.getPriorityQueueObject("queue:priority");
}

删除优先队列数据

@GetMapping("remove")
public String remove(){QueueUtils.removeQueueObject("queue:priority",3);return "ok";
}

清空优先队列数据

@GetMapping("destroy")
public String destroy(){QueueUtils.destroyDelayedQueue("queue:priority");return "ok";
}

订阅消息使用方法同普通队列一样

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/757578.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

使用分治法解决矩阵乘法的Strassen算法及C代码示例

使用分治法解决矩阵乘法的Strassen算法及C代码示例 一、背景与意义二、分治法与矩阵乘法二、Strassen算法的基本思想三、Strassen算法的具体步骤四、Strassen算法的C代码实现五、Strassen算法的时间复杂度分析六、Strassen算法的优缺点及改进七、结论 一、背景与意义 在计算机…

LoRa模块在紧急救援与灾害管理中的作用:连接生命线

在灾害发生时&#xff0c;迅速、精准的信息传递和协调是救援工作中至关重要的一环。LoRa&#xff08;低功耗广域网&#xff09;模块以其长距离通信、低功耗和强大的穿透能力&#xff0c;为紧急救援与灾害管理提供了一种卓越的解决方案&#xff0c;成为连接生命线的关键技术。 1…

多个线程交替打印ABC

多个线程交替打印ABC package 多个线程交替打印ABC;import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CyclicBarrier;/*** Created with IntelliJ IDEA.** Author: AlenXu* Date: 2024/03/20/10:10* Description:*/ public class ThreadLoopP…

FPGA学习_Xilinx7系列FPGA基本结构

文章目录 前言一、7系列FPGA介绍1.1、芯片编号 二、基本组成单元2.1、可编程逻辑块CLB&#xff08;Configable Logic Block&#xff09;2.2、可编程输入输出单元&#xff08;IOB&#xff09;2.3、嵌入式块RAM&#xff08;Block RAM&#xff09;2.4、底层内嵌功能单元2.5、内嵌专…

罐头鱼AI视频混剪系统|视频矩阵运营获客

罐头鱼AI视频混剪系统 在当今数字化时代&#xff0c;视频内容已经成为吸引观众注意力的重要媒介之一。为了帮助用户更轻松地创建、编辑和发布视频内容&#xff0c;q1977470120罐头鱼AI推出了全新的视频混剪系统&#xff0c;让您的视频制作过程更加智能高效。让我们一起来看看罐…

网络工程师之路由交换技术篇

网络工程师之路由交换技术篇 路由交换之技术篇ARPICMPBPDUIPv6IP编址MAC其他技术点参考 以下均为个人笔记&#xff0c;摘录到csdn做备份 路由交换之技术篇 ARP Operation Code指定了ARP报文的类型&#xff0c; 包括ARP request 和ARP reply&#xff1b;取值为1或者2 &#x…

linux系统------------Mysql数据库

目录 一、数据库基本概念 1.1数据(Data) 1.2表 1.3数据库 1.4数据库管理系统(DBMS) 数据库管理系统DBMS原理 1.5数据库系统&#xff08;DBS) 二、数据库发展史 1、第一代数据库 2、第二代数据库 3、第三代数据库 三、关系型数据库 3.1关系型数据库应用 3.2主流的…

docker pull 镜像 报server misbehaving 异常信息

错误原因&#xff1a; DNS服务器配置问题。 解决方案&#xff1a; 修改DNS配置文件&#xff0c;增加nameserver如下 #打开配置文件并进行编辑 vi /etc/resolv.conf 添加如下nameserver配置&#xff1a; #resolv.conf增加nameserver nameserver 114.114.114.114

Flutter开发进阶之使用Socket实现主机服务(二)

Flutter开发进阶之使用Socket实现主机服务(二) Flutter开发进阶之使用Socket实现主机服务(一) 在完成局域网内设备定位后就可以进入微服务的实操了。 I、构建Socket连接池 一、定义Socket 使用socket_io_client socket_io_client: ^2.0.3+1导入头文件 import packag…

Flink:使用 Faker 和 DataGen 生成测试数据

博主历时三年精心创作的《大数据平台架构与原型实现&#xff1a;数据中台建设实战》一书现已由知名IT图书品牌电子工业出版社博文视点出版发行&#xff0c;点击《重磅推荐&#xff1a;建大数据平台太难了&#xff01;给我发个工程原型吧&#xff01;》了解图书详情&#xff0c;…

深入理解Sora技术原理

OpenAI 发布的视频生成模型 Sora(https://openai.com/sora)&#xff0c;能根据文本生成长达一分钟的高质量视频&#xff0c;理论上支持任意分辨率&#xff0c;如 1920x1080 、1080x1920 &#xff0c;生成能力远超此前只能生成 25 帧 576x1024 图像的顶尖视频生成模型 Stable Vi…

2078: [蓝桥杯2023初赛] 01 串的熵

对于一个长度为 n 的 01 串 S x1x2x3...xn. 香农信息熵的定义为&#xff1a; 。 其中 p(0), p(1) 表示在这个 01 串中 0 和 1 出现的占比。 比如&#xff0c;对于S 100 来说&#xff0c;信息熵 H(S ) - 1/3 log2(1/3) - 2/3 log2(2/3) - 2/3 log2(2/3) 1.3083。 对于一个…

Harbor镜像仓库的安装和使用

1 Harbor安装 参考文章&#xff1a; 银河麒麟v10离线安装harbor 由于配置了本地私有yum源&#xff0c;因此&#xff0c;直接使用yum命令安装docker和docker-compose 1.1 安装docker yum install docker-ce1.2 安装docker-compose yum install docker-compose1.3 安装harbo…

一起学数据分析_3(模型建立与评估_1)

使用前面清洗好的数据来建立模型。使用自变量数据来预测是否存活&#xff08;因变量&#xff09;&#xff1f; &#xff08;根据问题特征&#xff0c;选择合适的算法&#xff09;算法选择路径&#xff1a; 1.切割训练集与测试集 import pandas as pd import numpy as np impo…

机器学习——编程实现从零构造训练集的决策树

自己搭建一棵决策树【长文预警】 忙了一个周末就写到了“构建决策树”这一步&#xff0c;还没有考虑划分测试集、验证集、“缺失值、连续值”&#xff0c;预剪枝、后剪枝的部分&#xff0c;后面再补吧&#xff08;挖坑&#xff09; 第二节内容&#xff1a;验证集划分\k折交叉…

python爬虫之xpath入门

文章目录 一、前言参考文档&#xff1a; 二、xpath语法-基础语法常用路径表达式举例说明 三、xpath语法-谓语表达式举例注意 四、xpath语法-通配符语法实例 五、选取多个路径实例 六、Xpath Helper安装使用说明例子&#xff1a; 七、python中 xpath 的使用安装xpath 的依赖包xm…

基于yolov2深度学习网络的人脸检测matlab仿真,图像来自UMass数据集

目录 1.算法运行效果图预览 2.算法运行软件版本 3.部分核心程序 4.算法理论概述 4.1 网络架构与特征提取 4.2 输出表示 4.3损失函数设计 4.4预测阶段 5.算法完整程序工程 1.算法运行效果图预览 2.算法运行软件版本 matlab2022a 3.部分核心程序 load yolov2.mat% 加载…

爱注讲台三尺案不辞长作“育花”人

—记邵阳市优秀班主任、新宁县优秀教师周芳平 教育是人与人心灵上最美妙的接触&#xff0c;只要用心体察&#xff0c;用情关注&#xff0c;每一位学生都会走向金光大道。 ---题记 “亲爱的妈妈&#xff0c;祝您节日快乐&#xff01;”2024年3月8日&#xff0c;一条从深圳华为…

28-3 文件上传漏洞 -白盒审计绕过

环境准备:构建完善的安全渗透测试环境:推荐工具、资源和下载链接_渗透测试靶机下载-CSDN博客 一、upload-labs 靶场的第7关 先进行代码审计 $is_upload = false; $msg = null; if (isset($_POST[submit])) {if (file_exists($UPLOAD_ADDR)) {$deny_ext = array(".php&…

Spring Boot:筑基

Spring Boot 前言概述使用 Intellij idea 快速创建 Spring Boot 项目注意事项 前言 在学习 Spring 、SpringMVC 、MyBatis 和 JPA 框架的过程中&#xff0c;了解到 SSM 框架为 Java Web 开发提供了强大的后端支持&#xff0c;JPA 框架则简化了数据库的操作。然而&#xff0c;S…