🔥作者主页:小林同学的学习笔录
🔥小林同学的专栏:JAVA之基础专栏
【Redis实战篇】Redis有可能出现的问题以及如何解决问题_redis实现用户登录可能造成哪些问题-CSDN博客
本文接上面的文章
目录
2.优惠券秒杀
2.1 全局唯一ID
2.2 Redis实现全局唯一id
2.3 添加优惠券
2.4 实现秒杀下单
2.5 库存超卖问题分析
2.6 优惠券秒杀-一人一单
2.7 集群环境下的并发问题
3.分布式锁
3.1 基本原理和实现方式对比
3.2 Redis分布式锁的实现核心思路
3.3 Redis分布式锁误删情况说明
3.4 分布式锁的原子性问题
3.5 Lua脚本解决多条命令原子性问题
3.6 利用Java代码调用Lua脚本改造分布式锁
5、分布式锁-redission
5.1 分布式锁-redission功能介绍
5.2 分布式锁-Redission快速入门
5.3 分布式锁-redission可重入锁原理
5.4 分布式锁-redission锁重试和WatchDog机制
5.5 分布式锁-redission锁的MutiLock原理
6.秒杀优化
6.1 秒杀优化-异步秒杀思路
6.2 秒杀优化-Redis完成秒杀资格判断
6.3 秒杀优化-基于阻塞队列实现秒杀优化
7.Redis消息队列
7.1 Redis消息队列-认识消息队列
7.2 Redis消息队列-基于List实现消息队列
7.3 Redis消息队列-基于PubSub的消息队列
7.4 Redis消息队列-基于Stream的消息队列
7.5 Redis消息队列-基于Stream的消息队列-消费者组
7.6 基于Redis的Stream结构作为消息队列,实现异步秒杀下单
2.优惠券秒杀
2.1 全局唯一ID
每个店铺都可以发布优惠券
当用户抢购时,就会生成订单并保存到tb_voucher_order这张表中,而订单表如果使用数据库自增ID就存在一些问题:
- id的规律性太明显,可以被猜到一天订单量
- 受单表数据量的限制,数据库每张表auto_increment都是从0开始,因此会出现重复
全局ID生成器,是一种在分布式系统下用来生成全局唯一ID的工具,一般要满足下列特性:
为了增加ID的安全性,我们可以不直接使用Redis自增的数值,而是拼接一些其它信息:
ID的组成部分(64位数字):
- 符号位:1bit,永远为0
- 时间戳:31bit,以秒为单位,每次算法生成的时间戳,是当前时间的毫秒数减去起始时间的毫秒数,这个差值即为时间戳
- 序列号:32bit,秒内的计数器,支持每秒产生2^32个不同ID
全局唯一ID生成策略:
- UUID:生成16进制,返回的是字符串结构,不具有单调递增的特性,有重复可能
- Redis自增:数值最大不会超过Long,单调递增,占据空间小
- snowflake算法(雪花算法):高性能高可用,生成不依赖数据库,在内存中生成,具有趋势递增性
- 数据库自增:单独创建个表,专门管理id自增,然后在其他表引入该id实现连续性自增,但是性能不高
Redis自增ID策略:
每天生成一个key,方便统计订单量
ID构造是 时间戳 + 计数器
2.2 Redis实现全局唯一id
@Component
public class RedisIdWorker {/*** 开始时间戳*/private static final long BEGIN_TIMESTAMP = 1640995200L;/*** 序列号的位数*/private static final int COUNT_BITS = 32;private StringRedisTemplate stringRedisTemplate;public RedisIdWorker(StringRedisTemplate stringRedisTemplate) {this.stringRedisTemplate = stringRedisTemplate;}public long nextId(String keyPrefix) {// 1.生成时间戳LocalDateTime now = LocalDateTime.now();long nowSecond = now.toEpochSecond(ZoneOffset.UTC);long timestamp = nowSecond - BEGIN_TIMESTAMP;// 2.生成序列号// 2.1.获取当前日期,精确到天,方便后续统计每天或者每月订单量(通过key统计)String date = now.format(DateTimeFormatter.ofPattern("yyyy:MM:dd"));// 2.2.自增长long count = stringRedisTemplate.opsForValue().increment("icr:" + keyPrefix + ":" + date);// 3.拼接并返回(这里因为刚开始时间戳从低位开始的,要把它移动到高位,需要向左移动32位,|为或运算,因为序列号刚开始都为0,因此值由count决定)return timestamp << COUNT_BITS | count;}
}
2.3 添加优惠券
每个店铺都可以发布优惠券,分为平价券和特价券。平价券可以任意购买,而特价券需要秒杀抢购:
tb_voucher:优惠券的基本信息,优惠金额、使用规则等
tb_seckill_voucher:特价优惠券的库存、开始抢购时间,结束抢购时间。
由于代金券由于优惠力度大,所以像第二种卷,就得限制数量,从表结构上也能看出,特价卷除了具有优惠卷的基本信息以外,还具有库存,抢购时间,结束时间等等字段
2.4 实现秒杀下单
秒杀下单应该思考的内容:
下单时需要判断两点:
- 秒杀是否开始或结束,如果尚未开始或已经结束则无法下单
- 库存是否充足,不足则无法下单
//这里涉及多张表需要加事务
@Transactional
public Result secKillVoucher(Long voucherId) {//1.查询优惠券信息SeckillVoucher seckillVoucher = seckillVoucherService.getById(voucherId);//2.判断优惠券开始时间if (seckillVoucher.getBeginTime().isAfter(LocalDateTime.now())) {return Result.fail("秒杀优惠券还没开始");}//3.判断优惠券过期时间if (seckillVoucher.getEndTime().isBefore(LocalDateTime.now())) {return Result.fail("秒杀优惠券已经结束");}//4.判断库存是否充足if (seckillVoucher.getStock() < 1) {return Result.fail("秒杀优惠券库存不足");}//5.扣减库存 boolean success = seckillVoucherService.update() //更新与秒杀抵用券相关的数据的入口点.setSql("stock = stock - 1") //这部分设置了更新操作的 SQL 语句.eq("voucher_id", voucherId) //更新操作设置条件(where).update(); //执行更新操作 if (!success) {//返回库存不足return Result.fail("秒杀优惠券库存不足");}//6.创建订单VoucherOrder voucherOrder = new VoucherOrder();//6.1 生产订单IDlong orderId = redisIdWorker.nextId("order");voucherOrder.setId(orderId);//6.2 获取用户IDLong userId = UserHolder.getUser().getId();voucherOrder.setUserId(userId);//6.3 购买的代金券的IDvoucherOrder.setVoucherId(voucherId);//7.保存数据到数据库save(voucherOrder);//8.返回订单idreturn Result.ok(voucherOrder);}
用Jmeter模拟200个线程来秒杀
出现超卖问题
2.5 库存超卖问题分析
假设线程1过来查询库存,判断出来库存大于1,正准备去扣减库存,但是还没有来得及去扣减,此时线程2过来,线程2也去查询库存,发现这个数量一定也大于1,
那么这两个线程都会去扣减库存,最终多个线程相当于一起去扣减库存,此时就会出现库存的超卖问题。
超卖问题是典型的多线程安全问题,针对这一问题的常见解决方案就是加锁:而对于加锁,我们通常有两种解决方案:
悲观锁:悲观锁可以实现对于数据的串行化执行,比如syn,和lock都是悲观锁的代表,同时,悲观锁中又可以再细分为公平锁,非公平锁,可重入锁,等等。
乐观锁:会有一个版本号,每次操作数据会对版本号+1,再提交回数据时,会去校验是否比之前的版本大1 ,如果大1 ,则进行操作成功,这套机制的核心逻辑在于,如果在操作过程中,版本号只比原来大1 ,那么就意味着操作过程中没有人对他进行过修改,他的操作就是安全的,如果不大1,则数据被修改过。当然乐观锁还有一些变种的处理方式比如cas
用乐观锁实现超卖问题
修改代码方案一:
VoucherOrderServiceImpl 在扣减库存时,改为:
boolean success = seckillVoucherService.update().setSql("stock= stock -1") //set stock = stock -1.eq("voucher_id", voucherId).eq("stock",voucher.getStock()) //where id = ? and stock = ?.update();
通过测试发现会有很多失败的情况,失败的原因在于:在使用乐观锁过程中假设100个线程同时都拿到了100的库存,然后大家一起去进行扣减,但是100个人中只有1个人能扣减成功,其他的人在处理时,他们在扣减时,库存已经被修改过了,所以此时其他线程都会失败
修改代码方案二:
之前的方式要修改前后都保持一致,但是这样我们分析过,成功的概率太低,所以我们的乐观锁需要变一下,改成stock大于0 即可
boolean success = seckillVoucherService.update().setSql("stock= stock -1").eq("voucher_id", voucherId).gt("stock",0) //where id = ? and stock > 0.update();
这种方案不会出现上面的问题
2.6 优惠券秒杀-一人一单
需求:修改秒杀业务,要求同一个优惠券,一个用户只能下一单
存在问题:现在的问题还是和之前一样,并发过来查询数据库,都不存在订单,所以我们还是需要加锁,但是乐观锁适合更新数据情况下,而现在是插入数据,所以我们需要使用悲观锁操作
初步代码:在扣减库存之前添加一人一单逻辑
public synchronized Result createVoucherOrder(Long voucherId) {// 5.一人一单逻辑// 5.1.用户idLong userId = UserHolder.getUser().getId();int count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();// 5.2.判断是否存在if (count > 0) {// 用户已经购买过了return Result.fail("用户已经购买过一次!");}......
}
注意:在这里提到了非常多的问题,我们需要慢慢的来思考,首先我们的初始方案是封装了一个createVoucherOrder方法,同时为了确保他线程安全,在方法上添加了一把synchronized 锁,但是这样添加锁,锁的粒度太粗了,在使用锁过程中,控制锁粒度是一个非常重要的事情,因为如果锁的粒度太大,会导致每个线程进来都会锁住,所以我们需要去控制锁的粒度,以下这段代码需要修改为:
Long userId = UserHolder.getUser().getId();synchronized (userId.toString().intern()) {//这里不能拿到createVoucherOrder(voucherId)的动态代理return this.createVoucherOrder(voucherId);
}
关于intern()方法的了解?
intern() 这个方法是从常量池中拿到数据,如果我们直接使用userId.toString() 它拿到的对象实际上是不同的对象,是new出来的对象,我们使用锁必须保证锁必须是同一把,所以我们需要使用intern()方法
但是以上代码还是存在问题,问题的原因在于当前方法被spring的事务控制,如果你在方法内部加锁,可能会导致当前方法事务还没有提交,但是锁已经释放也会导致并发问题,所以我们选择将当前方法整体包裹起来,确保事务不会出现问题:如下:
synchronized (userId.toString().intern()) {IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();return proxy.createVoucherOrder(voucherId);
完整代码:
@Overridepublic Result secKillVoucher(Long voucherId) {//1.查询优惠券信息SeckillVoucher seckillVoucher = seckillVoucherService.getById(voucherId);//2.判断优惠券开始时间if (seckillVoucher.getBeginTime().isAfter(LocalDateTime.now())) {return Result.fail("秒杀优惠券还没开始");}//3.判断优惠券过期时间if (seckillVoucher.getEndTime().isBefore(LocalDateTime.now())) {return Result.fail("秒杀优惠券已经结束");}//4.判断库存是否充足if (seckillVoucher.getStock() < 1) {return Result.fail("秒杀优惠券库存不足");}Long userId = UserHolder.getUser().getId();synchronized (userId.toString().intern()) {/*这里本来是return this.createVoucherOrder(voucherId);为了拿到的是动态代理对象需要以下操作*/IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();return proxy.createVoucherOrder(voucherId);}}@Transactionalpublic Result createVoucherOrder(Long voucherId) {Long userId = UserHolder.getUser().getId();//5.一人一单//5.1 查询订单Integer count = query().eq("voucher_id", voucherId).eq("user_id", userId).count();//5.2 判断是否存在if (count > 0) {// 用户已经购买过了return Result.fail("用户已经购买过一次!");}//6.扣减库存boolean success = seckillVoucherService.update().setSql("stock= stock -1").eq("voucher_id", voucherId).gt("stock", 0) //where id = ? and stock > 0.update();if (!success) {//返回库存不足return Result.fail("秒杀优惠券库存不足");}//7.创建订单VoucherOrder voucherOrder = new VoucherOrder();//7.1 生产订单IDlong orderId = redisIdWorker.nextId("order");voucherOrder.setId(orderId);//7.2 获取用户IDvoucherOrder.setUserId(userId);//7.3 购买的代金券的IDvoucherOrder.setVoucherId(voucherId);//保存数据到数据库save(voucherOrder);//8.返回订单idreturn Result.ok(voucherOrder);}
2.7 集群环境下的并发问题
通过加锁可以解决在单机情况下的一人一单安全问题,但是在集群模式下就不行了。
1、我们将服务启动两份,端口分别为8081和8082:
2、然后修改nginx的conf目录下的nginx.conf文件,配置反向代理和负载均衡:
有关锁失效原因分析:
由于现在我们部署了多个tomcat,每个tomcat都有一个属于自己的jvm,那么假设在服务器A的tomcat内部,有两个线程,这两个线程由于使用的是同一份代码,那么他们的锁对象是同一个,是可以实现互斥的,但是如果现在是服务器B的tomcat内部,又有两个线程,但是他们的锁对象写的虽然和服务器A一样,但是锁对象却不是同一个,所以线程3和线程4可以实现互斥,但是却无法和线程1和线程2实现互斥,这就是 集群环境下,syn锁失效的原因,在这种情况下,我们就需要使用分布式锁来解决这个问题。
一个JVM只有一个锁监视器,在同一个JVM可以实现锁的互斥,但是多个JVM出现锁不住,因此需要分布式锁这种量级的解决
3.分布式锁
3.1 基本原理和实现方式对比
分布式锁:满足分布式系统或集群模式下多进程可见并且互斥的锁。
分布式锁的核心思想就是让大家都使用同一把锁,只要大家使用的是同一把锁,那么我们就能锁住线程,不让线程进行,让程序串行执行,这就是分布式锁的核心思路。
那么分布式锁他应该满足一些什么样的条件呢?
可见性:多个线程都能看到相同的结果,注意:这个地方说的可见性并不是并发编程中指的内存可见性,只是说多个进程之间都能感知到变化的意思
互斥:互斥是分布式锁的最基本的条件,使得程序串行执行
高可用:程序不易崩溃,时时刻刻都保证较高的可用性
高性能:由于加锁本身就让性能降低,所有对于分布式锁本身需要他就较高的加锁性能和释放锁性能
安全性:安全也是程序中必不可少的一环
常见的分布式锁有以下三种:
3.2 Redis分布式锁的实现核心思路
了解一下阻塞和非阻塞锁?
阻塞锁:在阻塞锁中,当一个线程尝试获取锁时,如果该锁已经被其他线程持有,那么该线程将被阻塞直到锁被释放。Java中最常见的阻塞锁是
synchronized
关键字和ReentrantLock
类。
synchronized关键字:在Java中,使用
synchronized
关键字可以将方法或代码块声明为同步代码块,确保在同一时间只有一个线程可以执行同步代码块中的代码。ReentrantLock类:
ReentrantLock
是java.util.concurrent.locks
包中的一个类,它提供了与synchronized
关键字类似的同步功能,但是具有更灵活的功能,例如可中断锁、定时锁等。非阻塞锁:相对于阻塞锁,非阻塞锁允许线程尝试获取锁而无需等待,如果锁不可用,线程不会被阻塞,而是立即返回并执行其他操作。Java中的
Lock
接口的tryLock()
方法是非阻塞锁的一种实现。
- tryLock()方法:
tryLock()
方法尝试获取锁,如果锁是可用的,则获取锁并立即返回true
,否则返回false
,而不会等待。
实现分布式锁时需要实现的两个基本方法:
获取锁:
-
互斥:确保只能有一个线程获取锁
-
非阻塞锁:尝试一次,成功返回true,失败返回false。通过help指令可以获取到set后面可添加的参数,values一般设置为线程的标识
释放锁:
-
手动释放
-
超时释放:获取锁时添加一个超时时间
主要逻辑:
代码实现:
public interface ILock {//获取锁boolean tryLock(long timeoutSec);//释放锁void unlock();
}public class SimpleRedisLock implements ILock{private String name;private StringRedisTemplate stringRedisTemplate;public SimpleRedisLock(String name, StringRedisTemplate stringRedisTemplate) {this.name = name;this.stringRedisTemplate = stringRedisTemplate;}private static final String key_prefix = "lock:";@Overridepublic boolean tryLock(long timeoutSec) {//分布式锁的设置,values最好用线程来标识long threadId = Thread.currentThread().getId();//获取锁成功返回true,获取锁失败返回falseBoolean success = stringRedisTemplate.opsForValue().setIfAbsent(key_prefix + name, threadId + "", timeoutSec, TimeUnit.SECONDS);//防止自动拆箱出现问题(null)return Boolean.TRUE.equals(success);}@Overridepublic void unlock() {stringRedisTemplate.delete(key_prefix + name);}
}
把之前的synchronized改成分布式锁
@Service
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {@Resourceprivate ISeckillVoucherService seckillVoucherService;@Resourceprivate RedisIdWorker redisIdWorker;@Resourceprivate StringRedisTemplate stringRedisTemplate;@Overridepublic Result secKillVoucher(Long voucherId) {//1.查询优惠券信息SeckillVoucher seckillVoucher = seckillVoucherService.getById(voucherId);//2.判断优惠券开始时间if (seckillVoucher.getBeginTime().isAfter(LocalDateTime.now())) {return Result.fail("秒杀优惠券还没开始");}//3.判断优惠券过期时间if (seckillVoucher.getEndTime().isBefore(LocalDateTime.now())) {return Result.fail("秒杀优惠券已经结束");}//4.判断库存是否充足if (seckillVoucher.getStock() < 1) {return Result.fail("秒杀优惠券库存不足");}Long userId = UserHolder.getUser().getId();//手动创建锁对象key("order:" + userId)SimpleRedisLock lock = new SimpleRedisLock("order:" + userId, stringRedisTemplate);boolean isLock = lock.tryLock(1200);if(!isLock){//获取锁失败,返回错误或者重试return Result.fail("不允许一个用户重复下单");}try {//获取代理对象(事务)IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();return proxy.createVoucherOrder(voucherId);} finally {//释放锁(报不报错都要释放锁)lock.unlock();}}@Transactionalpublic Result createVoucherOrder(Long voucherId) {Long userId = UserHolder.getUser().getId();//5.一人一单//5.1 查询订单Integer count = query().eq("voucher_id", voucherId).eq("user_id", userId).count();//5.2 判断是否存在if (count > 0) {// 用户已经购买过了return Result.fail("用户已经购买过一次!");}//6.扣减库存boolean success = seckillVoucherService.update().setSql("stock= stock -1").eq("voucher_id", voucherId).gt("stock", 0) //where id = ? and stock > 0.update();if (!success) {//返回库存不足return Result.fail("秒杀优惠券库存不足");}//7.创建订单VoucherOrder voucherOrder = new VoucherOrder();//7.1 生产订单IDlong orderId = redisIdWorker.nextId("order");voucherOrder.setId(orderId);//7.2 获取用户IDvoucherOrder.setUserId(userId);//7.3 购买的代金券的IDvoucherOrder.setVoucherId(voucherId);//保存数据到数据库save(voucherOrder);//8.返回订单idreturn Result.ok(voucherOrder);}
}
3.3 Redis分布式锁误删情况说明
问题:
持有锁的线程在锁的内部出现了阻塞,导致他的锁自动释放,这时其他线程,线程2来尝试获得锁,就拿到了这把锁,然后线程2在持有锁执行过程中,线程1反应过来,继续执行,而线程1执行过程中,走到了删除锁逻辑,此时就会把本应该属于线程2的锁进行删除,这就是误删别人锁的情况说明
改进后
逻辑需要变更为
需求:修改之前的分布式锁实现,满足:在获取锁时存入线程标示(可以用UUID表示) 在释放锁时先获取锁中的线程标示,判断是否与当前线程标示一致
-
如果一致则释放锁
-
如果不一致则不释放锁
核心逻辑:在存入锁时,放入自己线程的标识,在删除锁时,判断当前这把锁的标识是不是自己存入的,如果是,则进行删除,如果不是,则不进行删除。
代码实现:
public class SimpleRedisLock implements ILock{private String name;private StringRedisTemplate stringRedisTemplate;public SimpleRedisLock(String name, StringRedisTemplate stringRedisTemplate) {this.name = name;this.stringRedisTemplate = stringRedisTemplate;}private static final String key_prefix = "lock:";private static final String id_prefix = UUID.randomUUID(true).toString() + "-";@Overridepublic boolean tryLock(long timeoutSec) {//存入线程标识String threadId = id_prefix + Thread.currentThread().getId();//获取锁成功返回true,获取锁失败返回falseBoolean success = stringRedisTemplate.opsForValue().setIfAbsent(key_prefix + name, threadId + "", timeoutSec, TimeUnit.SECONDS);//防止自动拆箱出现问题(null)return Boolean.TRUE.equals(success);}@Overridepublic void unlock() {String threadId = id_prefix + Thread.currentThread().getId();//这里有可能redis的键过期了,其他线程进来尝试获取锁后导致标识不一致String id = stringRedisTemplate.opsForValue().get(key_prefix + name);// 判断标示是否一致if(threadId.equals(id)){//释放锁stringRedisTemplate.delete(key_prefix + name);}}
}
如果判断线程不同,就不给释放锁,相同才给释放
3.4 分布式锁的原子性问题
更为极端的误删逻辑说明:
线程1现在持有锁之后,在执行业务逻辑过程中,他正准备删除锁,而且已经走到了条件判断的过程中,比如他已经拿到了当前这把锁确实是属于他自己的,正准备删除锁,但是此时他的锁到期了,那么此时线程2进来,但是线程1他会接着往后执行,当他卡顿结束后,他直接就会执行删除锁那行代码,相当于条件判断并没有起到作用,这就是删锁时的原子性问题,之所以有这个问题,是因为线程1的拿锁,比锁,删锁,实际上并不是原子性的,我们要防止刚才的情况发生,
解决方案就是把判断锁标识和释放锁同时进行
3.5 Lua脚本解决多条命令原子性问题
Redis提供了Lua脚本功能,在一个脚本中编写多条Redis命令,确保多条命令执行时的原子性。Lua是一种编程语言,它的基本语法大家可以参考网站:https://www.runoob.com/lua/lua-tutorial.html,这里重点介绍Redis提供的调用函数,我们可以使用lua去操作redis,又能保证他的原子性
# 先执行 set name jack
redis.call('set', 'name', 'Rose')
# 再执行 get name
local name = redis.call('get', 'name')
# 返回
return name
如果脚本中的key、value不想写死,可以作为参数传递。key类型参数会放入KEYS数组,其它参数会放入ARGV数组,在脚本中可以从KEYS和ARGV数组获取这些参数:
脚本最终形式:
-- 这里的 KEYS[1] 就是锁的key,这里的ARGV[1] 就是当前线程标示
-- 获取锁中的标示,判断是否与当前线程标示一致
if (redis.call('GET', KEYS[1]) == ARGV[1]) then-- 一致,则删除锁return redis.call('DEL', KEYS[1])
end
-- 不一致,则直接返回
return 0
3.6 利用Java代码调用Lua脚本改造分布式锁
RedisTemplate中,可以利用execute方法去执行lua脚本,参数对应关系就如下图股
代码实现:
public class SimpleRedisLock implements ILock{private String name;private StringRedisTemplate stringRedisTemplate;public SimpleRedisLock(String name, StringRedisTemplate stringRedisTemplate) {this.name = name;this.stringRedisTemplate = stringRedisTemplate;}private static final String KEY_PREFIX = "lock:";private static final String ID_PREFIX = UUID.randomUUID(true).toString() + "-";//加载Lua脚本private static final DefaultRedisScript<Long> UNLOCK_SCRIPT;static {UNLOCK_SCRIPT = new DefaultRedisScript<>();UNLOCK_SCRIPT.setLocation(new ClassPathResource("unlock.lua"));UNLOCK_SCRIPT.setResultType(Long.class);}@Overridepublic boolean tryLock(long timeoutSec) {//存入线程标识String threadId = ID_PREFIX + Thread.currentThread().getId();//获取锁成功返回true,获取锁失败返回falseBoolean success = stringRedisTemplate.opsForValue().setIfAbsent(KEY_PREFIX + name, threadId + "", timeoutSec, TimeUnit.SECONDS);//防止自动拆箱出现问题(null)return Boolean.TRUE.equals(success);}@Overridepublic void unlock() {//通过Lua脚本可以让判断和释放锁同时执行(原子性)stringRedisTemplate.execute(UNLOCK_SCRIPT,Collections.singletonList(KEY_PREFIX + name),ID_PREFIX + Thread.currentThread().getId());}// @Override
// public void unlock() {
// String threadId = id_prefix + Thread.currentThread().getId();
// //这里有可能redis的键过期了,其他线程进来尝试获取锁后导致标识不一致
// String id = stringRedisTemplate.opsForValue().get(key_prefix + name);
// // 判断标示是否一致
// if(threadId.equals(id)){
// //释放锁
// stringRedisTemplate.delete(key_prefix + name);
// }
// }
}
总结:
基于Redis的分布式锁实现思路:
利用set nx ex获取锁,并设置过期时间,保存线程标示
释放锁时先判断线程标示是否与自己一致,一致则删除锁
特性:
利用set nx满足互斥性
利用set ex保证故障时锁依然能释放,避免死锁,提高安全性
利用Redis集群保证高可用和高并发特性
笔者总结:我们一路走来,利用添加过期时间,防止死锁问题的发生,但是有了过期时间之后,可能出现误删别人锁的问题,这个问题我们开始是利用删之前 通过拿锁,比锁,删锁这个逻辑来解决的,也就是删之前判断一下当前这把锁是否是属于自己的,但是现在还有原子性问题,也就是我们没法保证拿锁比锁删锁是一个原子性的动作,最后通过lua表达式来解决这个问题
但是目前还剩下一个问题锁不住,什么是锁不住呢,你想一想,如果当过期时间到了之后,我们可以给他续期一下,比如续个30s,就好像是网吧上网, 网费到了之后,然后说,来,网管,再给我来10块的,是不是后边的问题都不会发生了,那么续期问题怎么解决呢,可以依赖于我们接下来要学习redission啦
5、分布式锁-redission
5.1 分布式锁-redission功能介绍
基于setnx实现的分布式锁存在下面的问题:
重入问题:可重入锁(Reentrant Lock)是一种多线程同步机制,允许线程在持有锁的情况下再次请求相同的锁,而不会被自己所持有的锁所阻塞。这种锁的主要特性是它允许一个线程多次获取同一把锁而不会造成死锁。比如HashTable这样的代码中,他的方法都是使用synchronized修饰的,假如他在一个方法内,调用另一个方法,那么此时如果是不可重入的,不就死锁了吗?所以可重入锁他的主要意义是防止死锁,我们的synchronized和Lock锁都是可重入的。
不可重试:是指目前的分布式锁只能尝试一次,我们认为合理的情况是:当线程在获得锁失败后,他应该能再次尝试获得锁。
超时释放:我们在加锁时增加了过期时间,这样的我们可以防止死锁,但是如果卡顿的时间超长,虽然我们采用了lua表达式防止删锁的时候,误删别人的锁,但是毕竟没有锁住,有安全隐患
主从一致性: 如果Redis提供了主从集群,当我们向集群写数据时,主机需要异步的将数据同步给从机,而万一在同步过去之前,主机宕机了,就会出现死锁问题。
那么什么是Redission呢?
Redisson是一个在Redis的基础上实现的Java驻内存数据网格(In-Memory Data Grid)。它不仅提供了一系列的分布式的Java常用对象,还提供了许多分布式服务,其中就包含了各种分布式锁的实现。
Redission提供了分布式锁的多种多样的功能
5.2 分布式锁-Redission快速入门
引入依赖:
<dependency><groupId>org.redisson</groupId><artifactId>redisson</artifactId><version>3.13.6</version>
</dependency>
配置Redisson客户端:
@Configuration
public class RedissonConfig {@Beanpublic RedissonClient redissonClient(){// 配置Config config = new Config();//redis的主机和端口号以及密码config.useSingleServer().setAddress("redis://192.168.150.101:6379").setPassword("123321");// 创建RedissonClient对象return Redisson.create(config);}
}
如何使用Redission的分布式锁:
@Service
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {@Resourceprivate ISeckillVoucherService seckillVoucherService;@Resourceprivate RedisIdWorker redisIdWorker;@Resourceprivate StringRedisTemplate stringRedisTemplate;@Resourceprivate RedissonClient redissonClient;@Overridepublic Result secKillVoucher(Long voucherId) {//1.查询优惠券信息SeckillVoucher seckillVoucher = seckillVoucherService.getById(voucherId);//2.判断优惠券开始时间if (seckillVoucher.getBeginTime().isAfter(LocalDateTime.now())) {return Result.fail("秒杀优惠券还没开始");}//3.判断优惠券过期时间if (seckillVoucher.getEndTime().isBefore(LocalDateTime.now())) {return Result.fail("秒杀优惠券已经结束");}//4.判断库存是否充足if (seckillVoucher.getStock() < 1) {return Result.fail("秒杀优惠券库存不足");}Long userId = UserHolder.getUser().getId();//手动创建锁对象key("order:" + userId)//SimpleRedisLock lock = new SimpleRedisLock("order:" + userId, stringRedisTemplate);//使用redisson创建锁对象RLock lock = redissonClient.getLock("lock:order:" + userId);//获取锁boolean isLock = lock.tryLock();if(!isLock){//获取锁失败,返回错误或者重试return Result.fail("不允许一个用户重复下单");}try {//获取代理对象(事务)IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();return proxy.createVoucherOrder(voucherId);} finally {//释放锁(报不报错都要释放锁)lock.unlock();}}
5.3 分布式锁-redission可重入锁原理
基于Redis的实现:Redisson可重入锁的核心是利用了Redis的原子性操作来实现锁的获取和放。基于Redis的“SETNX"
命令可以在不存在指定key的情况下设置key的值,从而实现锁的获取。当需要释放锁时,Redisson会使用Lua脚本来确保释放锁的原子性。
可重入性:Redisson实现了可重入锁的机制,即同一线程可以多次获取同一个锁而不会被阻塞。Redisson在内部维护了一个计数器,用于记录当前线程获取锁的次数。每次获取锁时,计数器加1,每次释放锁时,计数器减1。只有当计数器减为0时,锁才会真正被释放。因此需要用到redis的hash来存储,key为锁名,field为线程标识,value为计数器
防止死锁:Redisson还实现了防止死锁的机制。在获取锁时,可以设置一个超时时间,如果在指定时间内未能成功获取锁,就会放弃获取锁并返回失败,避免了因为程序出现问题而导致的死锁情况。
逻辑原理:
redisson获取锁和释放锁的底层原理
代码演示:
可以自己打断点走一下,在redis看数据就能看出计数器的效果
@Slf4j
@SpringBootTest
class RedissonTest {@Resourceprivate RedissonClient redissonClient;private RLock lock;@BeforeEachvoid setUp() {lock = redissonClient.getLock("order");}@Testvoid method1() throws InterruptedException {// 尝试获取锁boolean isLock = lock.tryLock(1L, TimeUnit.SECONDS);if (!isLock) {log.error("获取锁失败 .... 1");return;}try {log.info("获取锁成功 .... 1");method2();log.info("开始执行业务 ... 1");} finally {log.warn("准备释放锁 .... 1");lock.unlock();}}void method2() {// 尝试获取锁boolean isLock = lock.tryLock();if (!isLock) {log.error("获取锁失败 .... 2");return;}try {log.info("获取锁成功 .... 2");log.info("开始执行业务 ... 2");} finally {log.warn("准备释放锁 .... 2");lock.unlock();}}
}
5.4 分布式锁-redission锁重试和WatchDog机制
-
重试机制:在获取分布式锁时,可能会由于网络延迟、Redis服务端压力过大等原因导致获取锁失败。为了增加获取锁的成功率,Redisson提供了重试机制。当获取锁失败时,可以设置一个重试次数和重试间隔,Redisson会自动进行重试,直到成功获取锁或者超过重试次数为止。这样可以有效应对瞬时的网络波动或者Redis服务端的繁忙情况,提高了分布式锁的可靠性。
-
WatchDog机制:WatchDog是Redisson提供的一种监控机制,用于检测并延长锁的过期时间,防止因为锁的持有者出现异常或者网络故障而导致锁过早释放。WatchDog会在获取锁成功后启动一个定时任务,定期续约锁的过期时间,确保锁能够被持有者一直有效地持有。如果持有锁的线程出现了意外情况(比如宕机),WatchDog会自动停止续约任务,锁会在一定时间后由Redis自动释放,避免出现死锁情况
总结:
Redisson分布式锁原理:
可重入:利用hash结构记录线程id和重入次数
可重试:利用信号量和PubSub功能实现等待、唤醒,获取锁失败的重试机制
超时续约:利用watchDog,每隔一段时间(releaseTime / 3),重置超时时间
5.5 分布式锁-redission锁的MutiLock原理
为了提高redis的可用性,我们会搭建集群或者主从,现在以主从为例
此时我们去写命令,写在主机上, 主机会将数据同步给从机,但是假设在主机还没有来得及把数据写入到从机去的时候,此时主机宕机,哨兵会发现主机宕机,并且选举一个slave变成master,而此时新的master中实际上并没有锁信息,此时锁信息就已经丢掉了。
为了解决这个问题,redission提出来了MutiLock锁,使用这把锁咱们就不使用主从了,每个节点的地位都是一样的, 这把锁加锁的逻辑需要写入到每一个主从节点上,只有所有的服务器都写入成功,此时才是加锁成功,假设现在某个节点挂了,那么他去获得锁的时候,只要有一个节点拿不到,都不能算是加锁成功,就保证了加锁的可靠性。
那么MutiLock 加锁原理是什么呢?笔者画了一幅图来说明
当我们去设置了多个锁时,redission会将多个锁添加到一个集合中,然后用while循环去不停去尝试拿锁,但是会有一个总共的加锁时间,这个时间是用需要加锁的个数 * 1500ms ,假设有3个锁,那么时间就是4500ms,假设在这4500ms内,所有的锁都加锁成功, 那么此时才算是加锁成功,如果在4500ms有线程加锁失败,则会再次去进行重试.
总结:
1)不可重入Redis分布式锁:
原理:利用setnx的互斥性;利用ex避免死锁;释放锁时判断线程标示
缺陷:不可重入、无法重试、锁超时失效
2)可重入的Redis分布式锁:
原理:利用hash结构,记录线程标示和重入次数;利用watchDog延续锁时间;利用信号量控制锁重试等待
缺陷:redis宕机引起锁失效问题
3)Redisson的multiLock:
原理:多个独立的Redis节点,必须在所有节点都获取重入锁,才算获取锁成功
缺陷:运维成本高、实现复杂
6.秒杀优化
6.1 秒杀优化-异步秒杀思路
在这六步操作中,又有很多操作是要去操作数据库的,而且还是一个线程串行执行, 这样就会导致我们的程序执行的很慢,所以我们需要异步程序执行,那么如何加速呢?
优化方案:我们将耗时比较短的逻辑判断放入到redis中,比如是否库存足够,比如是否一人一单,这样的操作,只要这种逻辑可以完成,就意味着我们是一定可以下单完成的,我们只需要进行快速的逻辑判断,根本就不用等下单逻辑走完,我们直接给用户返回成功, 再在后台开另外一个线程,后台线程慢慢的去执行queue里边的消息,这样程序不就超级快了吗?
实现逻辑:
6.2 秒杀优化-Redis完成秒杀资格判断
需求:
-
新增秒杀优惠券的同时,将优惠券信息保存到Redis中
-
基于Lua脚本,判断秒杀库存、一人一单,决定用户是否抢购成功
-
如果抢购成功,将优惠券id和用户id封装后存入阻塞队列
-
开启线程任务,不断从阻塞队列中获取信息,实现异步下单功能
代码实现:
VoucherServiceImpl
@Override
@Transactional
public void addSeckillVoucher(Voucher voucher) {// 保存优惠券save(voucher);// 保存秒杀信息SeckillVoucher seckillVoucher = new SeckillVoucher();seckillVoucher.setVoucherId(voucher.getId());seckillVoucher.setStock(voucher.getStock());seckillVoucher.setBeginTime(voucher.getBeginTime());seckillVoucher.setEndTime(voucher.getEndTime());seckillVoucherService.save(seckillVoucher);// 保存秒杀库存到Redis中//SECKILL_STOCK_KEY 这个变量定义在RedisConstans中//private static final String SECKILL_STOCK_KEY ="seckill:stock:"stringRedisTemplate.opsForValue().set(SECKILL_STOCK_KEY + voucher.getId(), voucher.getStock().toString());
}
完整lua表达式
-- 1.参数列表
-- 1.1.优惠券id
local voucherId = ARGV[1]
-- 1.2.用户id
local userId = ARGV[2]-- 2.数据key
-- 2.1.库存key
local stockKey = 'seckill:stock:' .. voucherId
-- 2.2.订单key
local orderKey = 'seckill:order:' .. voucherId-- 3.脚本业务
-- 3.1.判断库存是否充足 get stockKey
if(tonumber(redis.call('get', stockKey)) <= 0) then-- 3.2.库存不足,返回1return 1
end
-- 3.2.判断用户是否下单 SISMEMBER orderKey userId
if(redis.call('sismember', orderKey, userId) == 1) then-- 3.3.存在,说明是重复下单,返回2return 2
end
-- 3.4.扣库存 incrby stockKey -1
redis.call('incrby', stockKey, -1)
-- 3.5.下单(保存用户)sadd orderKey userId
redis.call('sadd', orderKey, userId)
return 0
VoucherOrderServiceImpl
@Override
public Result seckillVoucher(Long voucherId) {//获取用户Long userId = UserHolder.getUser().getId();long orderId = redisIdWorker.nextId("order");// 1.执行lua脚本Long result = stringRedisTemplate.execute(SECKILL_SCRIPT,Collections.emptyList(),voucherId.toString(), userId.toString(), String.valueOf(orderId));int r = result.intValue();// 2.判断结果是否为0if (r != 0) {// 2.1.不为0 ,代表没有购买资格return Result.fail(r == 1 ? "库存不足" : "不能重复下单");}//TODO 保存阻塞队列// 3.返回订单idreturn Result.ok(orderId);
}
6.3 秒杀优化-基于阻塞队列实现秒杀优化
VoucherOrderServiceImpl
修改下单动作,现在我们去下单时,是通过lua表达式去原子执行判断逻辑,如果判断我出来不为0 ,则要么是库存不足,要么是重复下单,返回错误信息,如果是0,则把下单的逻辑保存到队列中去,然后异步执行
//异步处理线程池
private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();//在类初始化之后执行,因为当这个类初始化好了之后,随时都是有可能要执行的
@PostConstruct
private void init() {SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());
}
// 用于线程池处理的任务
// 当初始化完毕后,就会去从对列中去拿信息private class VoucherOrderHandler implements Runnable{@Overridepublic void run() {while (true){try {// 1.获取队列中的订单信息VoucherOrder voucherOrder = orderTasks.take();// 2.创建订单handleVoucherOrder(voucherOrder);} catch (Exception e) {log.error("处理订单异常", e);}}}private void handleVoucherOrder(VoucherOrder voucherOrder) {//1.获取用户Long userId = voucherOrder.getUserId();// 2.创建锁对象RLock redisLock = redissonClient.getLock("lock:order:" + userId);// 3.尝试获取锁boolean isLock = redisLock.lock();// 4.判断是否获得锁成功if (!isLock) {// 获取锁失败,直接返回失败或者重试log.error("不允许重复下单!");return;}try {//注意:由于是spring的事务是放在threadLocal中,此时的是多线程,事务会失效proxy.createVoucherOrder(voucherOrder);} finally {// 释放锁redisLock.unlock();}}//aprivate BlockingQueue<VoucherOrder> orderTasks =new ArrayBlockingQueue<>(1024 * 1024);@Overridepublic Result seckillVoucher(Long voucherId) {Long userId = UserHolder.getUser().getId();long orderId = redisIdWorker.nextId("order");// 1.执行lua脚本Long result = stringRedisTemplate.execute(SECKILL_SCRIPT,Collections.emptyList(),voucherId.toString(), userId.toString(), String.valueOf(orderId));int r = result.intValue();// 2.判断结果是否为0if (r != 0) {// 2.1.不为0 ,代表没有购买资格return Result.fail(r == 1 ? "库存不足" : "不能重复下单");}VoucherOrder voucherOrder = new VoucherOrder();// 2.3.订单idlong orderId = redisIdWorker.nextId("order");voucherOrder.setId(orderId);// 2.4.用户idvoucherOrder.setUserId(userId);// 2.5.代金券idvoucherOrder.setVoucherId(voucherId);// 2.6.放入阻塞队列orderTasks.add(voucherOrder);//3.获取代理对象proxy = (IVoucherOrderService)AopContext.currentProxy();//4.返回订单idreturn Result.ok(orderId);}@Transactionalpublic void createVoucherOrder(VoucherOrder voucherOrder) {Long userId = voucherOrder.getUserId();// 5.1.查询订单int count = query().eq("user_id", userId).eq("voucher_id", voucherOrder.getVoucherId()).count();// 5.2.判断是否存在if (count > 0) {// 用户已经购买过了log.error("用户已经购买过了");return ;}// 6.扣减库存boolean success = seckillVoucherService.update().setSql("stock = stock - 1") // set stock = stock - 1.eq("voucher_id", voucherOrder.getVoucherId()).gt("stock", 0) // where id = ? and stock > 0.update();if (!success) {// 扣减失败log.error("库存不足");return ;}//保存到数据库save(voucherOrder);}
小总结:
秒杀业务的优化思路是什么?
-
先利用Redis完成库存余量、一人一单判断,完成抢单业务
-
再将下单业务放入阻塞队列,利用独立线程异步下单
-
基于阻塞队列的异步秒杀存在哪些问题?
-
内存限制问题
-
数据安全问题
-
7.Redis消息队列
7.1 Redis消息队列-认识消息队列
什么是消息队列:字面意思就是存放消息的队列。最简单的消息队列模型包括3个角色:
-
消息队列:存储和管理消息,也被称为消息代理(Message Broker)
-
生产者:发送消息到消息队列
-
消费者:从消息队列获取消息并处理消息
使用队列的好处在于 解耦:所谓解耦,举一个生活中的例子就是:快递员(生产者)把快递放到快递柜里边(Message Queue)去,我们(消费者)从快递柜里边去拿东西,这就是一个异步,如果耦合,那么这个快递员相当于直接把快递交给你,这事固然好,但是万一你不在家,那么快递员就会一直等你,这就浪费了快递员的时间,所以这种思想在我们日常开发中,是非常有必要的。
这里我们可以使用一些现成的mq,比如kafka,rabbitmq等等,但是呢,如果没有安装mq,我们也可以直接使用redis提供的mq方案,降低我们的部署和学习成本。
7.2 Redis消息队列-基于List实现消息队列
基于List结构模拟消息队列
消息队列(Message Queue),字面意思就是存放消息的队列。而Redis的list数据结构是一个双向链表,很容易模拟出队列效果。
队列是先进先出,因此我们可以利用:LPUSH 结合 RPOP、或者 RPUSH 结合 LPOP来实现。 不过要注意的是,当队列中没有消息时RPOP或LPOP操作会返回null,并不像JVM的阻塞队列那样会阻塞并等待消息。因此这里应该使用BRPOP或者BLPOP来实现阻塞效果。
基于List的消息队列有哪些优缺点? 优点:
-
利用Redis存储,不受限于JVM内存上限
-
基于Redis的持久化机制,数据安全性有保证
-
可以满足消息有序性
缺点:
-
无法避免消息丢失
-
只支持单消费者
7.3 Redis消息队列-基于PubSub的消息队列
PubSub(发布订阅)是Redis2.0版本引入的消息传递模型。顾名思义,消费者可以订阅一个或多个channel,生产者向对应channel发送消息后,所有订阅者都能收到相关消息。
SUBSCRIBE channel [channel] :订阅一个或多个频道
PUBLISH channel msg :向频道发送消息
PSUBSCRIBE pattern[pattern] :订阅与pattern格式匹配的所有频道
基于PubSub的消息队列有哪些优缺点?
优点:
-
采用发布订阅模型,支持多生产、多消费
缺点:
-
不支持数据持久化
-
无法避免消息丢失
-
消息堆积有上限,超出时数据丢失
7.4 Redis消息队列-基于Stream的消息队列
Stream 是 Redis 5.0 引入的一种新数据类型,可以实现一个功能非常完善的消息队列。
发送消息的命令:
例如:
读取消息的方式之一:XREAD
例如,使用XREAD读取第一个消息:
XREAD阻塞方式,读取最新的消息:
在业务开发中,我们可以循环的调用XREAD阻塞方式来查询最新消息,从而实现持续监听队列的效果,伪代码如下:
注意:当我们指定起始ID为$时,代表读取最新的消息,如果我们处理一条消息的过程中,又有超过1条以上的消息到达队列,则下次获取时也只能获取到最新的一条,会出现漏读消息的问题
STREAM类型消息队列的XREAD命令特点:
优点:
-
消息可回溯
-
一个消息可以被多个消费者读取
-
可以阻塞读取
缺点:
-
有消息漏读的风险
7.5 Redis消息队列-基于Stream的消息队列-消费者组
消费者组(Consumer Group):将多个消费者划分到一个组中,监听同一个队列。具备下列特点:
创建消费者组:
- key:队列名称
- groupName:消费者组名称
- ID:起始ID标示,$代表队列中最后一个消息,0则代表队列中第一个消息
- MKSTREAM:队列不存在时自动创建队列
其它常见命令:
删除指定的消费者组
XGROUP DESTORY key groupName
给指定的消费者组添加消费者
XGROUP CREATECONSUMER key groupname consumername
删除消费者组中的指定消费者
XGROUP DELCONSUMER key groupname consumername
XACK 是 Redis 中用于将消息从消费者组的待处理消息列表(pending list)移动到已处理消息列表的命令
XACK key group ID[ID...]
从消费者组(pending-list)读取消息:
XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...]
-
group:消费组名称
-
consumer:消费者名称,如果消费者不存在,会自动创建一个消费者
-
count:本次查询的最大数量
-
BLOCK milliseconds:当没有消息时最长等待时间
-
NOACK:无需手动ACK,获取到消息后自动确认
-
STREAMS key:指定队列名称
-
ID:获取消息的起始ID:
">":从下一个未消费的消息开始
其它:根据指定id从pending-list中获取已消费但未确认的消息,例如0,是从pending-list中的第一个消息开始。
消费者监听消息的基本思路:
消息队列的XREADGROUP命令特点:
-
消息可回溯
-
可以多消费者争抢消息,加快消费速度
-
可以阻塞读取
-
没有消息漏读的风险
-
有消息确认机制,保证消息至少被消费一次
总结:
7.6 基于Redis的Stream结构作为消息队列,实现异步秒杀下单
把阻塞队列改进为消息队列
需求:
-
创建一个Stream类型的消息队列,名为stream.orders
-
修改之前的秒杀下单Lua脚本,在认定有抢购资格后,直接向stream.orders中添加消息,内容包含voucherId、userId、orderId
-
项目启动时,开启一个线程任务,尝试获取stream.orders中的消息,完成下单
lua增加1.3和3.6两处逻辑
-- 1.参数列表
-- 1.1.优惠券id
local voucherId = ARGV[1]
-- 1.2.用户id
local userId = ARGV[2]
-- 1.3.订单id
local orderId = ARGV[3]-- 2.数据key
-- 2.1.库存key
local stockKey = 'seckill:stock:' .. voucherId
-- 2.2.订单key
local orderKey = 'seckill:order:' .. voucherId-- 3.脚本业务
-- 3.1.判断库存是否充足 get stockKey
if(tonumber(redis.call('get', stockKey)) <= 0) then-- 3.2.库存不足,返回1return 1
end
-- 3.2.判断用户是否下单 SISMEMBER orderKey userId
if(redis.call('sismember', orderKey, userId) == 1) then-- 3.3.存在,说明是重复下单,返回2return 2
end
-- 3.4.扣库存 incrby stockKey -1
redis.call('incrby', stockKey, -1)
-- 3.5.下单(保存用户)sadd orderKey userId
redis.call('sadd', orderKey, userId)
-- 3.6.发送消息到队列中, XADD stream.orders * k1 v1 k2 v2 ...
redis.call('xadd', 'stream.orders', '*', 'userId', userId, 'voucherId', voucherId, 'id', orderId)
return 0
VoucherOrderServiceImpl
private class VoucherOrderHandler implements Runnable {@Overridepublic void run() {while (true) {try {// 1.获取消息队列中的订单信息 XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS s1 >List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(Consumer.from("g1", "c1"),StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),StreamOffset.create("stream.orders", ReadOffset.lastConsumed()));// 2.判断订单信息是否为空if (list == null || list.isEmpty()) {// 如果为null,说明没有消息,继续下一次循环continue;}// 解析数据MapRecord<String, Object, Object> record = list.get(0);Map<Object, Object> value = record.getValue();VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);// 3.创建订单createVoucherOrder(voucherOrder);// 4.确认消息 XACKstringRedisTemplate.opsForStream().acknowledge("s1", "g1", record.getId());} catch (Exception e) {log.error("处理订单异常", e);//处理异常消息handlePendingList();}}}private void handlePendingList() {while (true) {try {// 1.获取pending-list中的订单信息 XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS s1 0List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(Consumer.from("g1", "c1"),StreamReadOptions.empty().count(1),StreamOffset.create("stream.orders", ReadOffset.from("0")));// 2.判断订单信息是否为空if (list == null || list.isEmpty()) {// 如果为null,说明没有异常消息,结束循环break;}// 解析数据MapRecord<String, Object, Object> record = list.get(0);Map<Object, Object> value = record.getValue();VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);// 3.创建订单createVoucherOrder(voucherOrder);// 4.确认消息 XACKstringRedisTemplate.opsForStream().acknowledge("s1", "g1", record.getId());} catch (Exception e) {log.error("处理pendding订单异常", e);try{Thread.sleep(20);}catch(Exception e){e.printStackTrace();}}}}
}