文章目录
- 过期监听
- 准备工作
- 稍微复习下Jedis与JedisPool
- 模拟延时队列
- 优缺点
- **优点**:
- **缺点**:
- `ZSet ` 实现延时队列
- 引入依赖
- 模拟延时队列
- 优缺点
- **优点**(跟过期监听一样):
- **缺点**:
- Reference
Redis实现延时队列主要有两种方式:
- 通过
SETEX
与发布订阅机制过期监听 - 通过
ZSET
实现
过期监听
Redis通过set key and expire
+ RedisExpirationListener
过期监听实现延时队列,主要还是基于Redis的发布/订阅模式实现。
准备工作
找到redis安装目录的redis.windows.conf与redis.windows-server.conf中的“notify-keyspace-events”,取消注释
notify-keyspace-events Ex
然后重启下Redis,windows中会直接使用redis-server.exe,不会加载redis.windows.conf这个配置文件,需要用进入安装目录,命令行启动:
redis-server.exe redis.windows.conf
如果这时命令行报错:
nvalid argument during startup: unknown conf file parameter :
那么要注意了,Redis配置里要顶格!要顶格!要顶格!
稍微复习下Jedis与JedisPool
首先明确一点,Jedis实例不是线程安全的,所以不可以多个线程共用一个Jedis实例
那是不是要开很多个Jedis实例呢?当然也不行,因为更多的Jedis实例意味着要建立更多的socket连接
所以就需要JedisPool
,就是一个线程安全的网络连接池,类似ThreadPool
可以复用Thread
线程实例,JedisPool
可以创建一些可靠的Jedis实例,并且在后续复用,能够提高性能,并且不需要更多的Socket连接
// 开一个连接池
JedisPool jedisPool = new JedisPool("127.0.0.1", 6379); // HOST, PORT
// 获取一个Jedis连接
Jedis jedis = jedisPool.getResource();
模拟延时队列
这里开两个线程:
- 一个用于订阅监听事件:因为
subscrib()
会阻塞等待,需要异步初始化: Jedis.subscribe(JedisPubSub实现类, channel)
订阅频道需要两个参数:channel
参数:"keyevent@0:expired"是发布删除过期key信息的channelJedisPubSub实现类
:简单来说就是实现``JedisPubSub`接口的一些方法,当有key过期删除前/后/时执行一些逻辑
- 一个用于生产数据:
set(key, seconds, value)
key
:这个订单集合的名字seconds
:过期时间(s)value
:当前订单的名字(编号)
public class RedisKeyExpireTest {private static JedisPool jedisPool = new JedisPool("127.0.0.1", 6379);public static void main(String[] args) throws InterruptedException {// subscribe方法会阻塞等待,用异步去初始化订阅监听事件new Thread(() -> {jedisPool.getResource().subscribe(new RedisSub(), "__keyevent@0__:expired");}).start();// 添加几个带过期时间的keynew Thread(() -> {try {for (int i = 0; i < 5; i++) {String time = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));jedisPool.getResource().setex("orderNo100" + i, i + 1, "orderNo100" + i);System.out.println(time + ":生成订单,订单号:orderNo100" + i + ",有效期:" + (i + 1) + "秒");Thread.sleep(1000);}} catch (Exception e) {e.printStackTrace();}}).start();}
}class RedisSub extends JedisPubSub {@Overridepublic void onMessage(String channel, String message) {String time = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));System.out.println(time + ":订单号:" + message + "已到期");}
}
输出:
2024-03-30 22:04:21:生成订单,订单号:orderNo1000,有效期:1秒
2024-03-30 22:04:22:生成订单,订单号:orderNo1001,有效期:2秒
2024-03-30 22:04:22:订单号:orderNo1000已到期
2024-03-30 22:04:23:生成订单,订单号:orderNo1002,有效期:3秒
2024-03-30 22:04:24:生成订单,订单号:orderNo1003,有效期:4秒
2024-03-30 22:04:24:订单号:orderNo1001已到期
2024-03-30 22:04:25:生成订单,订单号:orderNo1004,有效期:5秒
2024-03-30 22:04:26:订单号:orderNo1002已到期
2024-03-30 22:04:28:订单号:orderNo1003已到期
2024-03-30 22:04:30:订单号:orderNo1004已到期
优缺点
优点:
- 实现简单,redis内存操作,速度快,性能高,集群扩展方便
- 可以通过AOF和RDB实现消息队列的持久化,适合对延迟精度要求不高的业务场景
缺点:
-
redis的key过期有惰性清除和定时清除两种策略,可能会存在延迟时间不精确的问题
惰性清除:不主动删除过期键,每次从数据库访问 key 时,都检测 key 是否过期,如果过期则删除该 key
- 优点:对 CPU 时间最友好。因为每次访问时,才会检查 key 是否过期,所以此策略只会使用很少的系统资源。
- 缺点:对内存不友好。如果一个 key 已经过期,而这个 key 又仍然保留在数据库中,那么只要这个过期 key 一直没有被访问,它所占用的内存就不会释放,造成了一定的内存空间浪费。
定期删除:每隔一段时间「随机」从数据库中取出一定数量的 key 进行检查,并删除其中的过期key,如果当前抽取的key中过期的超过25%,就再抽一次,循环至过期比例在25%以内。
- 优点:可以减少内存压力
- 缺点:难以确定删除操作执行的时长和频率。太频繁对CPU不友好;频率太低,过期key得不到及时释放,对内存不友好。
而且,很明显,定期查询是一个循环,为了保证不会循环过度导致线程卡死,存在一个定期删除循环流程的时间上限,默认不超过25ms
-
极端情况下不可靠:如果客户端故障或重启期间有key过期则过期通知事件的数据就丢失了(订单无法过期)
- 可以用定时任务去做轮询补偿
ZSet
实现延时队列
Redis 可以使用有序集合(ZSet)的方式来实现延迟消息队列
Set 有一个 Score 属性可以用来存储延迟执行的时间
- 使用
zadd score1 value1
生产消息 - 利用
zrangebyscore
查询符合条件的任务,通过循环执行队列任务
引入依赖
<dependency><groupId>redis.clients</groupId><artifactId>jedis</artifactId><version>3.2.0</version>
</dependency>
模拟延时队列
下面模拟一下这个过程,需要开两个线程:
- 一个生产者,往redis数据库里生产数据,主要方法:
zadd(key, value, score)
key
:这个订单集合的名字value
:当前订单的名字(编号)score
:当前订单的过期时刻
- 一个消费者,不停轮询这个
key
的Sorted Set,主要方法:zrangeWithScores(key, start, end)
:返回key
中在start
和end
间的value和score的Tuple集合,比如zrangeWithScores(key, 0, 0)
:获取score最小的一个tuplezrangeWithScores(key, 0, 1)
:获取score最小的两个tuple
- 也可以用
zrangeWithScores(key, start, end)
,这是start
和end
需要是准确时间。
public class CancelOrderRedisTest {private static JedisPool jedisPool = new JedisPool("127.0.0.1", 6379);public static void main(String[] args) {// 开一个线程,放几个订单元素到zset中new Thread(() -> {try {for (int i = 0; i < 5; i++) {String time = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));redisClient().zadd("cancel:order:list", System.currentTimeMillis() + (i + 1) * 1000, "orderNo100" + i);System.out.println(time + ":生成订单,订单号:orderNo100" + i + ",有效期:" + (i + 1) + "秒");Thread.sleep(1000);}} catch (InterruptedException e) {e.printStackTrace();}}).start();// 再开一个线程轮询这个有序集合new Thread(() -> {Jedis jedis = redisClient();while (true){// 取这个有序集合的第一个,也就是score最小的元素Set<Tuple> items = jedis.zrangeWithScores("cancel:order:list", 0, 1);if (items == null || items.isEmpty()){// 避免空指针异常,因为是两个线程,一个线程生产数据(往redis里放订单),一个(现在这个)线程消费数据(清理过期数据)// 可能目前redis里的数据都消费完了,所以要sleep一会,等待另一个线程生产try {Thread.sleep(100);}catch (InterruptedException e){e.printStackTrace();}} else {// 轮询成功拿到redis的数据,下面就判断一下是否到期Tuple tuple = (Tuple) items.toArray()[0];long score = (long) tuple.getScore();// 如果现在的时间大于当前数据的score(过期时间),进行清理if (System.currentTimeMillis() >= score){Long num = jedis.zrem("cancel:order:list", tuple.getElement());if (num != null && num > 0){String time = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));System.out.println(time + ":订单号:" + tuple.getElement() + "已到期");}}}}}).start();}/*** 获取Redis连接* @return*/private static Jedis redisClient() {return jedisPool.getResource();}
}
输出:
2024-03-30 20:32:58:生成订单,订单号:orderNo1000,有效期:1秒
2024-03-30 20:32:59:订单号:orderNo1000已到期
2024-03-30 20:32:59:生成订单,订单号:orderNo1001,有效期:2秒
2024-03-30 20:33:00:生成订单,订单号:orderNo1002,有效期:3秒
2024-03-30 20:33:01:订单号:orderNo1001已到期
2024-03-30 20:33:01:生成订单,订单号:orderNo1003,有效期:4秒
2024-03-30 20:33:02:生成订单,订单号:orderNo1004,有效期:5秒
2024-03-30 20:33:03:订单号:orderNo1002已到期
2024-03-30 20:33:05:订单号:orderNo1003已到期
2024-03-30 20:33:07:订单号:orderNo1004已到期
优缺点
优点(跟过期监听一样):
- 实现简单,redis内存操作,速度快,性能高,集群扩展方便
- 可以通过AOF和RDB实现消息队列的持久化,适合对延迟精度要求不高的业务场景
缺点:
- 第一个就是轮询带来的问题:
- 轮询线程如果不休眠或休眠时间过短,可能导致过多的空轮询,CPU飙高
- 如果带休眠时间过长,因为现在过期的数据得等到下一轮轮询才能处理,延时队列的延时也就不准确了
- 另外,【没有队列的纯粹轮询】还有个问题,就是数据量太大时,可能一个轮询周期检查不完,这里只需要轮询队头的一个或几个数据,所以不太会有这个问题
- 然后是大部分中间件做延时队列都会有的问题:
- 极端条件下,会丢失数据,不可靠,比如:
- Redis过期通知时,应用正好重启,可能丢失事件(导致订单一直无法关闭)。
- 先删数据在处理订单还是先处理订单再删除数据,处理异常时可能会导致数据丢失。
- 可以用定时任务去做轮询补偿
- 存储维护成本高:需要监听的数据较大时会占用中间件大量的存储空间,增加维护成本
- 极端条件下,会丢失数据,不可靠,比如:
Reference
订单超时自动取消的技术方案解析及代码实现