《引言》
(中)篇将接着记录 Redis 实战篇 ——《黑马点评》(上)篇之后的学习内容与笔记,希望大家能够点赞、收藏支持一下 ᕦ(・ㅂ・)ᕤ,谢谢大家。
传送门(上):Redis 实战篇 ——《黑马点评》(上)
传送门(中):当-前-页
传送门(下):NULL
三、优惠卷秒杀
1. 全局唯一 ID 与秒杀
在如秒杀等业务场景中,并发问题是其中的主要问题,因为此时会产生许多并发的抢购操作请求,而如果在表结构中使用自增主键 id 来保存购物信息,会导致 id 的规律性太明显且在大用户量的基础下,会产生数以亿计的数据量,而单表是无法保存如此庞大的数据的。且订单 id 应该具有唯一性,但将表进行拆分出现了分布式的存储时,会违背其唯一性。
所以在这种情况下,我们就要用到全局 ID 生成器,其能在分布式系统下用来生成全局唯一 ID 的工具,虽然我们是单体架构,但因为数据量大,所以也可以使用全局 ID 生成器。其要满足下列特性:
- 唯一性:ID 必须唯一,如订单业务中不能出现重复 ID。
- 高可用:确保任何时候都能生成可用的 ID。
- 高性能:能够正确生成 ID,且生成效率足够快。
- 递增性:确保整体是逐渐变大的,尽量符合数据库 ID 自增的特性,便于数据库创建索引,提高查询速度。
- 安全性:ID 的规律需要确保不被猜测出来。、
而 Redis 几乎满足了所有的特性,但其安全性无法通过其自增数值来保证,但可以不直接使用 Redis 自增的数值,而是通过拼接字符串的形式充当 ID。
ID:使用 Java 中的 Long 类型,占 8 字节,64 个 bit 位。
- 符号位(1 bit):第一个 bit 位,表示永远是正数。
- 时间戳(31 bit):定义一个初始时间,单位:秒,记录下单时间与初始时间相差多少秒,用于增加 ID 的复杂度。
- 序列号(32 bit):防止时间戳重复的情况,增加 ID 的复杂度。
1.1. Redis 实现全局唯一 ID
想要使用 Redis 实现全局唯一 ID 的功能,我们需要分成三步实现:
- 生成时间戳
- 生成序列号
- 进行拼接
// 2024-1-1 00:00:00
private static final long BEGIN_TIMESTAMP = 1704067200L;//1.生成时间戳
LocalDateTime now = LocalDateTime.now();
long nowSecond = now.toEpochSecond(ZoneOffset.UTC);
long timeStamp = nowSecond - BEGIN_TIMESTAMP;
● 第一步:使用 LocalDateTime 获取到当前时间后,使用 toEpochSecond 方法转化为自 1970-1-1 00:00:00 到当前时间为止的秒数,最后用当前的秒数减去我们规定好的起始时间就得到了时间戳。
//2.生成序列号
String date = now.format(DateTimeFormatter.ofPattern("yyyy:MM:dd"));
long increment = stringRedisTemplate.opsForValue().increment("icr:" + keyPrefix + ":" + date);
● 第二步:使用 Redis 里 String 类型中的自增长方法(increment)来确保 ID 的递增性,且因为键 icr:业务名 以业务名为 key,但一个 key 的 value 不断自增存在达到上限的风险,所以我们需要在其后跟上时间来完善,其还能起到以时间为分隔统计的作用。
//需要左移的位数
private static final int COUNT_BITS = 32;//3,拼接
return timeStamp << COUNT_BITS | increment;
● 第三步:利用位运算进行拼接,因为序列号共有 32 个bit位,所以要左移 32 位,且左移后位上全为 0,做或运算相当于加上 increment。
完整代码如下
@Resource private StringRedisTemplate stringRedisTemplate; private static final long BEGIN_TIMESTAMP = 1704067200L; private static final int COUNT_BITS = 32; public long nextId(String keyPrefix) {//1.生成时间戳LocalDateTime now = LocalDateTime.now();long nowSecond = now.toEpochSecond(ZoneOffset.UTC);long timeStamp = nowSecond - BEGIN_TIMESTAMP;//2.生成序列号String date = now.format(DateTimeFormatter.ofPattern("yyyy:MM:dd"));long increment = stringRedisTemplate.opsForValue().increment("icr:" + keyPrefix + ":" + date);//3,拼接return timeStamp << COUNT_BITS | increment; }
1.2. 优惠卷的分类
在项目中,优惠卷分为普通优惠券与限时秒杀优惠券,一个是永久有效的、一个是限时限量抢购的。而在数据库中与之对应的存在两个表,一个是普通优惠券的信息表、一个是与此表的 id 一一对应关联的特殊优惠券表,其在普通优惠券的信息基础上又多了数量、开始时间、结束时间等等信息。
我们可以通过接口 http://localhost:8081/voucher/seckill 来新增优惠券,其中金额的单位为分,添加一个限时优惠券。(注意:设定的时间过期后将无法显示;且与之绑定商铺 Id 为 1)
最终效果如下
可以看到成功的添加了限时代金券。
1.3. 实现下单功能
同样的,想要实现下单的功能,也需要我们分步进行。但这只是简单的实现下单功能,其中还存在着诸多的多线程并发问题。
- 查询优惠券信息判断是否过期或未开始
- 判断库存是否充足,充足则扣减库存
- 创建订单后返回订单ID
//1.查询优惠券
SeckillVoucher voucher = seckillVoucherServices.getById(voucherId)
//2.判断是否结束
if (voucher.getBeginTime().isAfter(LocalDateTime.now())) {//尚未开始return Result.fail("秒杀尚未开始");
}
//3.判断是否开始
if (voucher.getEndTime().isBefore(LocalDateTime.now())){return Result.fail("秒杀已经结束");
}
● 第一步:根据传来的优惠券 Id 查询相关信息后,分别判断其是否尚未开始,或者已经过期。如果不符合,则返回相应的错误信息。
//4.判断开始后库存是否充足
if (voucher.getStock() < 1) {//4.1 库存不足return Result.fail("库存不足");
}
//4.2 库存充足,扣减库存
boolean success = seckillVoucherServices.update().setSql("stock = stock - 1").eq("voucher_id", voucherId).update();
if (!success){return Result.fail("库存不足");
}
● 第二步:判断库存是否充足,如果库存数小于 1,则表示库存不足,需返回错误信息;反之,则扣减库存。其中 setSql 用于自定义一个 SQL 语句,在 voucher_id 相等时将库存 stock 减 1。并对返回的结果进行判断,如果失败,则返回错误信息。
//5.创建订单
VoucherOrder voucherOrder = new VoucherOrder();
//5.1 订单Id
long orderId = redisIdWorker.nextId("order");
voucherOrder.setId(orderId);
//5.2 用户Id
Long userId = UserHolder.getUser().getId();
voucherOrder.setUserId(userId);
//5.3 代金券Id
voucherOrder.setVoucherId(voucherId);
save(voucherOrder);
//返回订单Id
return Result.ok(orderId);
● 第三步:扣减库存后创建订单信息并写入数据库中,其中的订单 Id 使用之前写好的使用 Redis 实现的全局唯一 ID 生成器来生成;用户 Id 则从 UserHolder 工具类中获取,其用于在用户登录成功后保存用户信息;代金券 Id 就是将传入的参数直接使用即可,最后保存订单信息到数据库后返回生成的订单 Id 号。
至此,我们成功的实现了简单的下单功能。但!!!,其内部存在着许多的多线程并发问题,尤其是当大量线程抢购时,会出现超卖问题,就是订单数超出了库存数。如下所示,当线程 2 在线程 1 扣减前进行查询时,就会导致数据不同步,造成超卖问题。
想要解决这个问题,首先想到的方案就是加锁。而锁分为悲观锁与乐观锁两种方案。其中,悲观锁认为线程安全问题一定会发生,所以会将所有线程串行执行,这样会导致性能降低,在高并发的情况下不适用;而乐观锁则认为线程安全问题不一定会发生,它只会在更新数据时去判断数据有没有被修改来确定是否存在线程安全问题,所以性能相较于悲观锁会好一点。
所以,在比对二者之间的优异后,选择使用乐观锁来解决,而乐观锁常见的方式有两种:
- 版本号法:使用版本号来进行校验,每次修改之前对版本号进行比对并在修改后将版本号加一。
- CAS 法:运用数据本身有无变化来判断线程是否安全,就是使用数据本身来替代版本号,省去了版本号。
最终选择使用 CAS 法来解决多线程并发的问题。
1.将原先扣减库存的语句新增对 stock 库存的判断,判断其是否在扣减前被修改过。
//4.2 库存充足,扣减库存
boolean success = seckillVoucherServices.update().setSql("stock = stock - 1").eq("voucher_id", voucherId).eq("stock", voucher.getStock()).update();
但在经过 jmter 测试后,发现产生了大量的异常情况。其原因是由于线程之间存在误差,如果线程 1 在线程 2 扣减前提前扣减了库存,则会导致线程 2 无法扣减库存。究其原因还是对数据的校验太过于严谨(等值),对于该业务,我们可以只用判断其是否还大于 0 即可。
//4.2 库存充足,扣减库存
boolean success = seckillVoucherServices.update().setSql("stock = stock - 1").eq("voucher_id", voucherId).gt("stock", 0).update();
最后完美达到预期,解决了高并发下的超卖问题。
但!!!(对,还是有问题),在业务要求中,限量代金券应当只允许一人一·单,一个用户不能购买多次,因为限量代金券的目的是为了吸引新客户购买,所以需要对每个用户的购买数进行校验,如果订单号中存在对应的用户 Id,则返回错误信息,拒绝收售卖。
//优化:一人一单
Long userId = UserHolder.getUser().getId(); //将后面创建订单处的获取 userId 移到此处
Integer count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();
if (count > 0) {//用户已购买过return Result.fail("用户已购买过");
}
在扣减库存操作前对订单是否存在中用户 Id 的订单数量进行校验,如果查询出数量大于 0,则代表该用户已经购买过代金券了,直接返回错误信息。
但!是!!(ψ(*`ー´)ψ),在进行校验后依然存在问题,仍然是在多线程下产生的并发问题。因为在多线程的操作下,仍会存在线程同时去操作。所以这里就需要使用悲观锁(插入数据)来确保线程的安全性。
● 第一步:将对订单的校验和创建操作封装为一个方法。(Ctrl+Atl+M)
● 第二步:将 seckillVoucher 方法上的 @Transactional 更改到生成的方法上。
● 第三步:对用户 Id 加锁,与锁定整个方法而言可以减小锁定资源范围。但在以 Id 为锁的情况下,需要比较的是 Id 的值,而非每次调用时生成的新的 Id 对象;且因为 toString() 方法其底层是返回新创建的 String 字符串,每次调用返回的都是新的字符串对象。所以就需要 intern 方法(字符串方法)来将字符串规范化,就是返回在字符串常量池中与字符串的值一样的字符串的地址。保证 Id 一样时,锁也一样。
● 第四步:对于锁锁定的范围,如果只锁定提取的方法中的操作,则会导致在锁释放会,事务才会提交,这样同样会产生并发问题。所以需要在事务提交之后再释放锁,在调用方法外去加锁,方法结束也就是事务提交之后才会释放锁。
Long userId = UserHolder.getUser().getId();
synchronized (userId.toString().intern()) {//提取的方法return createVoucherOrder(voucherId);
}
但注意此时会导致 Spring 事务失效,原因是此时调用方法的是 this.createVoucherOrder,此处的 this 并不是代理对象,但事务被非代理对象调用时,会导致 Spring 事务失效,因为其依赖于代理对象去管理事务。所以我们需要拿到当前对象的代理对象。
Spring 事务失效的几种情况通常包括:
方法没有被Spring管理。
方法不是public的。
目标对象没有被Spring容器管理,即没有通过@Autowired/@Inject等注解注入。
接口的实现类上没有配置事务注解。
事务方法内调用了本类中未标记事务的方法。
异常类型不被当前的事务设置所识别或配置错误,例如rollbackFor属性设置不当。
数据库本身不支持事务或者不在同一个事务上下文中。
事务方法被非代理对象调用,即没有通过Spring生成的代理对象调用。
异步方法(@Async)内部不支持事务。
异常发生在事务提交阶段,导致事务已经被提交,无法回滚。
我们使用 AopContext 中的 currentProxy 方法来获取到代理对象后,使用该对象调用提取的方法即可保障事务生效。
Long userId = UserHolder.getUser().getId();
synchronized (userId.toString().intern()) {//获取与事务有关的代理对象IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();return proxy.createVoucherOrder(voucherId);
}
之后需要在接口中创建好提取的方法,并且添加新的依赖——aspectjweaver,以及在启动类上添加 @EnableAspectJAutoProxy(exposeProxy = true) 去暴露代理对象,默认不暴露,无法获取代理对象。
<!-- aspectJ -->
<dependency><groupId>org.aspectj<artifactId>aspectjw
</dependency>
最后经过 jmter 的测试(记得需要及时更新请求头 authorization)后虽然我这里未知原因异常百分比是 100%(不知道啥原因 ┗( ▔, ▔ )┛),但在数据库中确实只扣减了一次库存,且只有一个订单。
但~~(┓(;´_`)┏),以上情况仅限于单机模式下才会成功起到效果,而在集群模式或分布式系统的情况下,因为锁的原理是在 JVM 内部维护了一个锁的监视器对象,但由于集群模式下不同的部署都有自己的 JVM,且不同 JVM 下锁监视器不同。导致出现并发问题。而下面,我们就需要使用分布式锁来解决集群模式下的并发问题。(注意:开启了)
2. 分布式锁
而在集群模式或分布式系统下,上面实现的锁就不会起到太大的作用了,这就需要用分布式锁了。分布式锁就是满足分布式系统或集群模式下多进程可见并且互斥的锁。其中重点就是多线程可见,而常见的实现分布式锁的方法就是使用 MySQL、Redis、Zookeeper 等等来实现
MySQL | Redis | Zookeeper | |
互斥 | 利用自身的互斥锁机制 | 利用 setnx 这样的互斥命令 | 利用节点的唯一性和有序性实现 |
高可用 | 好 | 好 | 好 |
高性能 | 一般 | 好 | 一般 |
安全性 | 断开连接后自动释放锁 | 利用锁的超时时间到期后释放 | 临时节点断开连接后自动释放 |
而在分析其优点缺点后,选择使用 Redis 来实现分布式锁(这不本来也是学 Redis 的...)。
我们需要使用 Redis 中的 setnx 命令来实现分布式锁,详细实现思路如下所示:
1. 获取锁:使用 setnx 命令保证只有一个线程能操作成功,且在成功时返回 true,失败是返回 false 保证是非阻塞的。其中使用 NX(不存在时创建)保证互斥;EX(设置过期时间)保证锁能被及时释放。
2. 释放锁:我们可以通过手动释放(手动删除),也可以等过期时间自动释放。
2.1. 分布式锁初级版本
这里只是简单的使用 Redis 来实现分布式锁的功能,我们定义一个接口,其中包括获取锁和释放锁两个方法,并在实现类中实现相应的方法。
定义的接口如下所示:
public interface ILock {/*** 尝试获取锁* @param timeoutSec 锁的超时时间,过期后自动释放* @return true代表获取锁成功,false代表获取锁失败*/boolean tryLock(long timeoutSec);/*** 释放锁*/void unlock();
}
尝试获取锁的方法如下所示:
private String name;private StringRedisTemplate stringRedisTemplate;public SimpleRedisLock(String name, StringRedisTemplate stringRedisTemplate) {this.name = name;this.stringRedisTemplate = stringRedisTemplate;}private static final String KEY_PREFIX = "lock:";@Overridepublic boolean tryLock(long timeoutSec) {Long threadId = Thread.currentThread().getId();Boolean success = stringRedisTemplate.opsForValue().setIfAbsent(KEY_PREFIX + name, threadId + "", timeoutSec, TimeUnit.SECONDS);return BooleanUtil.isTrue(success);}
其中,因为此类不是由 Spring 管理的,所以 StringRedisTemplate 只能使用构造函数为其赋值而不能注入。下面的 KEY_PREFIX 是定义好的键的前缀。在方法内,我们先获取到当前线程的 ID 标示,使用 setIfAbsent 方法获取锁并将线程 ID 保存到锁中,其保证了互斥,同时设置了过期时间。且此方法对返回结果进行了判断,无需判断是 ok 或是 nil。但由于直接返回包装类存在拆箱问题,所以这里使用 isTrue 方法进行判断后返回结果,其方法底层实现如下图所示。
释放锁的方法如下所示:
@Overridepublic void unlock() {stringRedisTemplate.delete(KEY_PREFIX + name);}
相对而言,释放锁的实现逻辑很简单,只需将其删除即可。(但后面肯定会出问题要改造的o(╥﹏╥)o)
紧接着,我们就需要对原先的锁逻辑进行改造了。
Long userId = UserHolder.getUser().getId();
//创建锁对象
SimpleRedisLock lock = new SimpleRedisLock("order:" + userId, stringRedisTemplate);
//获取锁
boolean isLock = lock.tryLock(1200);
//判断是否获取锁成功
if (!isLock) {//获取锁失败,返回失败或者重试return Result.fail("不允许重复下单");
}
try {//获取与事务有关的代理对象IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();return proxy.createVoucherOrder(voucherId);
} finally {//释放锁lock.unlock();
}
首先将我们写好的类创建出来,其中使用业务名 order 拼接上用户 Id 来确保不同的用户能获取到不同的锁。获取锁失败后返回错误信息;获取锁成功后执行后续逻辑,将其使用 try finally 包围保证锁最终会被释放。
这样,就实现了分布式锁的初级版本。(耶(^-^)V)
2.2. 改进分布式锁——误删问题
在一般情况下,我们实现的初级版本并不会产生问题,但~,在一些十分巧合的情况下,存在误删的问题。
在线程一获取到锁后业务如果阻塞了,造成锁超时释放,此时线程二获取锁就可以成功,此时的锁是线程二的锁,但如果此时线程一完成业务后会将线程二的锁释放,从而导致误删,使其他线程还能再次获取锁成功。究其原因还是因为在释放锁的逻辑中只是简单的删除,并没有进行校验。
所以,在解决误删问题时,我们就需要将线程的标示存入锁中,以便于在释放锁时对线程进行校验,如果标示一致则继续释放,如果标示不一致则不释放。且在集群模式下,每一个 JVM 内部都会为维护一个递增的数字来做为线程 Id,所以不同 JVM 下的线程 Id 可能出现相同的情况。所以还需要使用 UUID 来区分不同的线程。
改造获取锁:
private static final String ID_PREFIX = UUID.randomUUID().toString(true) + "-";
使用 hutool 包下的 UUID 工具类来生成相应的 UUID,且由于其生成的字符串中会带有 " - ",所以在其 toString 方法内传入 true 表示去掉其中的 “ - ” 符号。
//获取线程标示
String threadId = ID_PREFIX + Thread.currentThread().getId();
最后,只需在获取线程标示时将其拼接在前面即可。
改造释放锁:
在改造释放锁时,我们只需在释放前加上对线程 Id 的校验即可,
//获取线程标示
String threadId = ID_PREFIX + Thread.currentThread().getId();
//获取锁中标示
String id = stringRedisTemplate.opsForValue().get(KEY_PREFIX + name);
//判断是否一致
if (threadId.equals(id)) {stringRedisTemplate.delete(KEY_PREFIX + name);
}
首先还是先获取到线程标示,随后再获取到锁中标示并对两者进行比对,如果相同则进行删除操作,如果不同则不做任何操作。
至此,成功解决了误删问题,只需在释放时对线程标示进行校验即可,防止线程误删不属于自己的锁。(测试就免了吧,我偷个懒( ̄ェ ̄;))
2.3. 改进分布式锁——原子性问题
虽然上面的改造已经杜绝了绝大多数问题的发生,但是!!!,在极其极端的状况下还是会出现问题。(好好好...)
根本原因是因为我们释放锁中的校验与删除操作不是原子性的,当校验完毕后由于 JVM 的垃圾回收机制可能导致线程阻塞在此,如果阻塞的时间足够长使锁超时自动释放,那么其他线程还是可以获取到锁,从而再次产生误删问题的发生,究其根本是因为校验与删除操作不具有原子性。
想要保证原子性,可以使用 Redis 的事务去解决该问题,但这种做法需要使用事务配合乐观锁进行校验实现,操作较为复杂。我们可以使用 Lua 脚本解决该问题,其原理是将校验与删除操作写入脚本中,使用脚本执行多条命令从而保证其原子性。
其中 Lua 是一种轻量级的语言,我们只需进行了解其基础语法即可。(Lua 教程 | 菜鸟教程)这里我就直接跳过,不过多描述 Lua 的语法,只需要知道其中可以通过 redis.call() 来执行 Redis 的命令,如 redis.call('set', 'name', 'jack') 就等同于 set name jack。
Lua 脚本的内容如下所示,可以在 IDEA 中下载一个插件 EmmyLua 来创建脚本文件。
-- 比较线程标示与锁中的标示
if (redis.call('get', KEYS[1]) == ARGV[1]) then--释放锁return redis.call('del', KEYS[1])
end
return 0
在 Java 代码中,我们可以先创建一个 DefaultRedisScript 并进行相应的配置来初始化脚本,其中 setLocation 用于指定脚本文件,setResultType 用于指定返回值类型。且使用静态代码块来初始化,随着类的加载完成初始化,无需重复加载,提高了性能。
private static final DefaultRedisScript<Long> UNLOCK_SCRIPT;static {UNLOCK_SCRIPT = new DefaultRedisScript<>();UNLOCK_SCRIPT.setLocation(new ClassPathResource("unlock.lua"));UNLOCK_SCRIPT.setResultType(Long.class);}
最后只需在原先释放锁的方法中使用 execute() 方法来调用初始化好的 lua 脚本即可,其中传入参数分别为脚本对象、key 与锁的标示,其中 key 需要转为 List 集合传入。
//调用 lua 脚本
stringRedisTemplate.execute(UNLOCK_SCRIPT,Collections.singletonList(KEY_PREFIX + name),ID_PREFIX + Thread.currentThread().getId());
最后,就成功解决了分布式锁的原子性问题,其校验与删除操作在一行代码中完成,保证了其原子性。
2.3. Redisson 组件
虽然已经解决了一些潜在的问题,但!!!,其实上述的分布式锁还是存在着许多的问题,如不可重入、不可重试、超时释放、主从一致性等问题解决起来较为麻烦。所以就需要我们使用一个全新的组件——Redisson,它包含了许多的分布式服务,其中就包括分布式锁。
想要使用 Redisson,第一步就是引入对应的依赖:
<!-- redisson -->
<dependency><groupId>org.redisson</groupId><artifactId>redisson</artifactId><version>3.16.2</version>
</dependency>
第二步就是将其配置成一个 Bean,并在其中进行对应的配置:
@Configuration
public class RedissonConfig {@Beanpublic RedissonClient redissonClient(){//配置Config config = new Config();config.useSingleServer().setAddress("redis://127.0.0.1:6379").setPassword("123456");return Redisson.create(config);}
}
其中在配置类上加 @Configuration 注解,其中的 Config 类是 redisson 提供的,使用其来设置地址与密码,配置完成后,就可以在代码中去使用它了。
第一步:注入 RedissonClient 后获取锁对象
@Resource
private RedissonClient redissonClient;
//Redisson 改造
//创建锁对象
RLock lock = redissonClient.getLock("lock:order:" + userId);
第二步:尝试获取锁
//获取锁
boolean isLock = lock.tryLock();
其中可以传入三个参数,如果无参则 waitTime 默认为 -1,失败后不等待立即返回,且默认过期时间为 30 秒。
这里就不记录 Redisson 组件各功能的底层原理了(偷懒偷懒ψ(*`ー´)ψ)
3. 秒杀优化
为了在代码中进行优惠券秒杀时进一步提高并发性能,需要经过多个步骤来进行改进,其中又分为简单操作与复杂操作,各自的耗时不同,而在业务中其串行执行,对效率存在一定的影响。
我们可以将其进行拆分,耗时短的校验操作的由当前线程去处理,耗时长的读写数据库操作可以交给另一线程去处理,这样就可以大大提高业务的性能,同时在校验时又将读取数据库改为在 Redis 中进行校验,因 Redis 的性能要高于 MySQL,也提高了校验操作的性能。将同步的数据库操作变为了异步的操作,大大提高了性能。
3.1. 基于 Redis 实现秒杀优化
基于 Redis 进行秒杀优化,需要分为四步进行,同时使用了异步操作提高了并发的性能。
- 在新增秒杀优惠券的同时,将其信息保存到 Redis 中
- 基于 Lua 脚本(保证原子性)对库存进行判断,一人一单
- 抢购成功后将优惠券 Id 和用户 Id 封装后存入阻塞队列中
- 开启新线程不断从阻塞队列中获取信息,实现下单功能
// 保存优惠券
save(voucher);
// 保存秒杀信息
SeckillVoucher seckillVoucher = new SeckillVoucher();
seckillVoucher.setVoucherId(voucher.getId());
seckillVoucher.setStock(voucher.getStock());
seckillVoucher.setBeginTime(voucher.getBeginTime());
seckillVoucher.setEndTime(voucher.getEndTime());
seckillVoucherService.save(seckillVoucher);
// 保存秒杀信息到 Redis 中
stringRedisTemplate.opsForValue().set(SECKILL_STOCK_KEY + voucher.getId(), voucher.getStock().toString());
● 第一步:在原先保存秒杀优惠券的代码下将其信息中的库存数保存到 Redis 中,便于在查询时直接查询 Redis,提高效率。key 是前缀 + 优惠券 Id;value 是优惠券的库存数量。
lua 脚本(seckill.lua)
local voucherId = ARGV[1]
local userId = ARGV[2]
-- 库存key
local stockKey = 'seckill:stock:' .. voucherId
-- 订单key
local orderKey = 'seckill:order:' .. voucherId
-- 判断库存是否充足
if (tonumber(redis.call('get', stockKey)) <= 0) then-- 库存不足return 1
end
-- 判断用户是否存在于已购买记录中
if (redis.call('sismember', orderKey, userId) == 1) then-- 用户已经购买过return 2
end
-- 库存 -1
redis.call("incrby", stockKey, -1)
-- 添加用户 Id 到已购买集合
redis.call("sadd", orderKey, userId)
-- 秒杀成功
return 0
VoucherOrderServiceImpl 类
//封装 Lua 脚本对象
private static final DefaultRedisScript<Long> SECKILL_SCRIPT;
static {SECKILL_SCRIPT = new DefaultRedisScript<>();SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua"));SECKILL_SCRIPT.setResultType(Long.class);
}
seckillVoucher 方法
Long userId = UserHolder.getUser().getId();
// 1.执行 Lua 脚本
Long result = stringRedisTemplate.execute(SECKILL_SCRIPT,Collections.emptyList(),voucherId.toString(),userId.toString());
int r = result.intValue();
if (r != 0) {return Result.fail(r == 1 ? "库存不足" : "不能重复下单");
}
● 第二步:在原先的优惠券秒杀方法中先去执行 Lua 脚本判断用户是否具有购买资格,最后使用三元运算符对返回结果进行判断,并返回相应错误信息。
//声明阻塞队列 传入队列容量
private BlockingQueue<VoucherOrder> orderTasks = new ArrayBlockingQueue<>(1024 * 1024);
//声明 proxy 成员变量
private IVoucherOrderService proxy;
// 返回值为 0,表示有购买资格,生成订单信息
VoucherOrder voucherOrder = new VoucherOrder();
long orderId = redisIdWorker.nextId("order");
voucherOrder.setId(orderId);
voucherOrder.setUserId(userId);
voucherOrder.setVoucherId(voucherId);
// 放入阻塞队里
orderTasks.add(voucherOrder);
//获取代理对象
proxy = (IVoucherOrderService) AopContext.currentProxy();
return Result.ok(orderId);
● 第三步:判断为具有购买资格后,生成订单信息,并将其放入阻塞队列中,同时因为在子线程中无法获取到父线程的代理对象(获取代理对象通过 ThreadLocal 实现,但每个线程都有自己独立的 ThreadLocal),而 Spring 的事务管理需要父线程代理对象实现,所以需要在父线程中提前获取到代理对象,这里将其声明为成员变量使其可以在子线程中使用。最后直接返回订单 Id,代表秒杀成功,提高并发性能。
private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();@PostConstruct
private void init() {SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());
}private class VoucherOrderHandler implements Runnable {@Overridepublic void run() {while (true){try {//1.获取队列中的订单信息VoucherOrder voucherOrder = orderTasks.take();//2.创建订单handleVoucherOrder(voucherOrder);} catch (Exception e) {log.error("获取订单信息出错", e);}}}
}private void handleVoucherOrder(VoucherOrder voucherOrder) {Long userId = voucherOrder.getUserId();Long voucherId = voucherOrder.getVoucherId();//创建锁对象RLock lock = redissonClient.getLock("lock:order:" + userId);//获取锁boolean isLock = lock.tryLock();//判断是否获取锁成功if (!isLock) {//获取锁失败,返回失败或者重试log.error("不允许重复下单");}try {proxy.createVoucherOrder(voucherOrder);} finally {//释放锁lock.unlock();}
}
● 第四步:创建一个线程池 ExecutorService,之后再创建一个线程任务(Runnable),其中不断的读取阻塞队列中的订单信息,take() 就是实现阻塞等待的方法。同时为了在秒杀开始之前执行该操作,通过在 init 方法上加 @PostConstruct 注解使其在当前类初始化完毕后执行 init 方法。
将创建订单封装为一个函数(handleVoucherOrder),流程与之前的步骤相同,区别在于需要从传入的订单信息中获取对应的信息,且出错只需记录错误日志即可。而方法中调用的 createVoucherOrder() 方法也需要改造。
@Transactional
public void createVoucherOrder(VoucherOrder voucherOrder) {Long userId = voucherOrder.getUserId();//一人一单Integer count = query().eq("user_id", userId).eq("voucher_id", voucherOrder.getVoucherId()).count();if (count > 0) {//用户已购买过log.error("用户已购买过");}//库存充足,扣减库存boolean success = seckillVoucherServices.update().setSql("stock = stock - 1").eq("voucher_id", voucherOrder.getVoucherId()).gt("stock", 0).update();if (!success) {log.error("库存不足");}//创建订单save(voucherOrder);
}
其中不用返回结果,只需记录错误日志即可。同时不再从 ThreadLocal 中获取用户 Id 而是从传入的订单信息中获取。
效果展示
这里仅使用 ApiFox 单用户测试演示,jmter 的多人秒杀测试因为需要大量 token 比较麻烦就不进行了。
第一次抢购,成功返回订单 Id,同时 Redis 与 MySQL数据库中的库存数减一,并保存了对应的订单信息。
再次点击发送抢购请求时,返回错误信息“不能重复下单”,因在 Lua 脚本中会对传入的用户 Id 判断其是否存在于已购买用户的行列中从而保证一人一单。
注意:测试时 Redis 中需要有对应优惠券的库存数,如果测试的是代码修改前已存在的优惠券,需要手动写到 Redis 中。
3.2. Redis 消息队列实现秒杀优化
虽然已经利用 Redis 实现了异步秒杀的功能,但主要是利用 JVM 的阻塞队列实现的异步秒杀功能,其会存在一些问题。由于 JVM 具有内存限制,在高并发的情况下可能会导致内存超出上限;如果从阻塞队列取出数据时出现异常,会导致任务未处理和数据丢失的问题。
而解决以上问题的最佳方法就是使用消息队列(加一层ㄟ( ▔, ▔ )ㄏ ),其能发送、存放、接收消息,分别对应其中的三个角色:生产者、消息队列、消费者。最常见的消息队列中间件就是 RabbitMQ ,在我的文章 【MQ】学习笔记——消息队列(MQ)之 黑马RabbitMQ 中就记录了学习内容。
而 Redis 中也提供了实现消息队列的方式,一共有三种;
1. List:基于 List 结构来模拟消息队列(双向链表先进先出 如:左进右出)
可以看做是利用 List 来模拟阻塞队列满足基本需求的消息队列,且通过 BRPOP 或 BLPOP 来实现阻塞效果。
优点:
- 利用 Redis 进行存储,不再受限于 JVM 的内存上限
- 基于 Redis 的持久化机制,数据的安全线有保障
- 可以满足消息的有序性
缺点:
- 无法避免消息丢失(在取出消息的同时出现异常会导致数据丢失)
- 仅支持单消费者
2. PubSub:基本的点对点模型,于 Redis 2.0 版本引入的模型
除了上图中的两种命令(发送、订阅),PSUBSCRIBE pattern [pattern] 还可以订阅与通配符格式一致的所有频道,其通配符格式要求如下所示
优点:
- 采取发布订阅的模型,支持多生产、多消费
缺点
- 不支持数据持久化
- 无法避免消息丢失
- 消息堆积有上限,超出时数据会丢失
3. Stram:较完善的消息队列模型,于 Redis 5.0 版本引入的新数据类型
◆ 单消费者模式
发送消息——xadd
读取消息——xread
其中阻塞时间为 0 时代表一直阻塞,直到读取到消息,且消息可以被重复读取。但当想要使用读取最新消息($)来实现持续监听队列时,如果此时业务正在处理时又发送了许多消息,那么可以获取到最新消息,但前面的消息就会丢失。
优点:
- 消息可以被重复读取到
- 一个消息可以被多个消费者读取
- 可以阻塞读取
缺点
- 有消息漏读的风险
◆ 消费者组模式
消费者组就是将多个消费者划分到一个组中,共同监听同一个队列。
特点:
- 消息分流:消费者组内具有竞争关系,消息会分流给不同的消费者,可以提高处理消息的速度。而如果想要消息被多个消费者消费,可以添加多个消费者组。
- 消息标示:消费者组会维护一个标示记录最后一个被处理的消息,即使消费者宕机重启,仍能从标示之后读取到消息,确保每一个消息都能被消费
- 消息确认:消费者获取消息后,消息会处于 pending 状态,并存入一个 pending-list 中,当处理完成后需要通过 XACK 来确认消息,标记消息为已处理后才会将其从 pending-list 中移除。避免消息丢失的问题。
语法:
创建消费者组
XGROUP CREATE key groupName ID [MKSTREAM]
key:队列名称
groupName:消费者组名称
ID:起始 ID 标示,$ 表示队列中最后一个消息,0 表示队列中第一个消息
MKSTREAM:队列不存在时自动创建队列
删除指定的消费者组
XGROUP DESTORY key groupName
给指定的消费者组添加消费者
XGROUP CREATECONSUMER key groupName consumerName
删除消费者组中指定的消费者
XGROUP DELCONSUMER key groupName consumerName
从消费者组读取消息
XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREANS key [key ...] ID [ID ...]
group:消费者组名
consumer:消费者名称,如果消费者不存在则自动创建一个
count:本次查询的最大数量
BLOCK milliseconds:当没有消息时的最长等待时间
NOACK:无需手动 ACK,获取消息后自动确认(不建议设置)
STREAMS key:指定队列名称
ID:获取消息的起始 ID(> 表示从下一个未消费的开始,其它 则表示根据指定 ID 从pending-list 队列中获取已消费但未确认的消息)
将未处理消息处理
XACK key group ID [ID ...]
key:队列名称
group:消费者组名
ID:消息的唯一 ID
获取 pending-list 中一定范围内未确认的消息
XPENDING key group [ [IDLE min-idle-time] start end count [consumer] ]
key:队列名称
group:消费者组名
IDLE min-idle-time:空闲时间(获取以后,确认之前)
start end:消息范围(最大 ID ~ 最小 ID;- + 表示都获取)
count:获取消息的数量
consumer:消费者名称(每个消费者都有自己的 pending-list)
3.3. 基于 Stream 结构实现秒杀优化
想要基于 Redis 中的 Stream 结构来实现秒杀优化。首先要做的就是在 Redis 中提前创建好一个消息队列。
XGROUP CREATE stream.order g1 0 MKSTREAM
这里我们创建一个名为 stream.order 的消息队列。其中的消费者组名为 g1,起始 ID 标识为 0,MKSTREAM 表示队列不存在时自动创建。
local voucherId = ARGV[1]
local userId = ARGV[2]
local orderId = ARGV[3]
-- 库存key
local stockKey = 'seckill:stock:' .. voucherId
-- 订单key
local orderKey = 'seckill:order:' .. voucherId
-- 判断库存是否充足
if (tonumber(redis.call('get', stockKey)) <= 0) then-- 库存不足return 1
end
-- 判断用户是否存在于已购买记录中
if (redis.call('sismember', orderKey, userId) == 1) then-- 用户已经购买过return 2
end
-- 库存 -1
redis.call("incrby", stockKey, -1)
-- 添加用户 Id 到已购买集合
redis.call("sadd", orderKey, userId)
-- 发送消息到消息队列中
redis.call("xadd", "stream.orders", "*", "userId", userId, "voucherId", voucherId, "id", orderId)
-- 秒杀成功
return 0
● 第一步:在原先秒杀条件校验的 Lua 脚本中添加发送消息到消息队列的操作,在判断秒杀成功的条件下将对应的订单信息(用户 Id、优惠券 Id、订单 Id)发送到消息队列中。(这里需要新增一个参数接收订单 Id)
@Override
public Result seckillVoucher(Long voucherId) {Long userId = UserHolder.getUser().getId();long orderId = redisIdWorker.nextId("order");// 1.执行 Lua 脚本Long result = stringRedisTemplate.execute(SECKILL_SCRIPT,Collections.emptyList(),voucherId.toString(),userId.toString(),String.valueOf(orderId));int r = result.intValue();if (r != 0) {return Result.fail(r == 1 ? "库存不足" : "不能重复下单");}//获取代理对象proxy = (IVoucherOrderService) AopContext.currentProxy();return Result.ok(orderId);
}
● 第二步:在原先的秒杀方法中执行 Lua 脚本时传入生成的订单 Id,且由于是基于 Stream 结构实现的消息队列,所以不再需要将订单信息添加到阻塞队列中。
private class VoucherOrderHandler implements Runnable {@Overridepublic void run() {while (true){try {//1.获取消息队列中的订单信息List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(Consumer.from("g1", "c1"),StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),StreamOffset.create(QUEUE_NAME, ReadOffset.lastConsumed()));if (list == null || list.isEmpty()){//获取失败continue;}MapRecord<String, Object, Object> record = list.get(0);Map<Object, Object> value = record.getValue();VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);//2.创建订单handleVoucherOrder(voucherOrder);// ACK确认stringRedisTemplate.opsForStream().acknowledge(QUEUE_NAME, "g1", record.getId());} catch (Exception e) {log.error("获取订单信息出错", e);handlePandingList();}}}
}
● 第三步:在原先的线程任务中不断尝试获取到消息队列中订单信息,调用对应 Stream 类型的 read 方法。Consumer.from() 中传入消费者组名和消费者名。StreamReadOptions 代表可选参数,其中 count 设置获取消息的个数,block 设置阻塞的时间。StreamOffset 中指定队列名和读取消息的位置。
因为指定只取出一个消息所以只用获取返回集合中的第一个元素即可。取出的结构如上图所示,因存入了三个键值对(用户 Id、优惠券 Id、订单 Id),所以使用 MapRecord 类型存储,通过 getValue 方法取出后再使用 hutool 工具包中的 BeanUtill 将其内容填入订单对象中。接着在用之前的 handleVoucherOrder方法创建好订单后进行 ACK 手动确认已处理消息。
private void handlePandingList() {while (true){try {//1.获取消息队列中的订单信息List<MapRecord<String, Object, Object>> pendingList = stringRedisTemplate.opsForStream().read(Consumer.from("g1", "c1"),StreamReadOptions.empty().count(1),StreamOffset.create(QUEUE_NAME, ReadOffset.from("0")));if (pendingList == null || pendingList.isEmpty()){//pending-list 中不存在未处理消息,结束循环break;}MapRecord<String, Object, Object> record = pendingList.get(0);Map<Object, Object> value = record.getValue();VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);//2.创建订单handleVoucherOrder(voucherOrder);// ACK确认stringRedisTemplate.opsForStream().acknowledge(QUEUE_NAME, "g1", record.getId());} catch (Exception e) {log.error("处理pending-list订单异常", e);try {Thread.sleep(20);} catch (InterruptedException ex) {ex.printStackTrace();}}}
}
如果线程任务中抛出了异常导致已处理消息未被确认,就会进入 handlePandingList 方法去检查 pending-list 队列中是否存在未确认消息,只在获取消息时不再设置阻塞时间且从 0 开始获取。如果未获取到则推出循环,如果出现异常则在休眠后继续尝试获取,直到没有异常,确保异常订单一定会被处理。
效果展示
这里仅使用 ApiFox 单用户测试演示(还是一样的原因|ू・ω・` )),记得将之前测试时的订单数据删除。
第一次发送请求后成功获取到订单 Id。
第二次发送请求后提示不能重复下单。
四、达人探店
1. 发布探店笔记
在该业务中,我们只需完善已经实现好的两个接口即可,分别是上传图片与发布接口。在 UploadController 中找到 uploadImag 方法,注意其中的 IMAGE_UPLOAD_DIR 常量,我们需要将其设置为本地对应 Nginx 目录下存放静态资源的 imgs 地址。
然后,就没有了!!!∑(゚Д゚ノ)ノ,成功实现了发布探店笔记的功能(这是学 Redis 的,还学别的干什么,大傻春┓(;´_`)┏)
效果展示
注意开启 Nginx 的负载均衡后需要将两个服务全部启动,否则会出现问题
可以看到成功将图片上传成功并且成功发布探店笔记。
但此时,查看笔记的接口还没有实现,所以要接着实现查看笔记的接口
@GetMapping("/{id}")
public Result queryBlogById(@PathVariable("id") Long id) {return blogService.queryBlogById(id);
}
其接收笔记 Id,利用笔记 Id 去查询对应笔记信息并将其返回。可以根据与之功能相同的根据热度分页查询笔记来实现。
@Override
public Result queryBlogById(Long id) {Blog blog = getById(id);if (blog == null) {return Result.fail("笔记不存在!");}queryBlogUser(blog);return Result.ok(blog);
}private void queryBlogUser(Blog blog) {Long userId = blog.getUserId();User user = userService.getById(userId);blog.setName(user.getNickName());blog.setIcon(user.getIcon());
}
通过 Id 查询到对应笔记信息,并通过笔记中的用户 Id 查询用户的头像和昵称用于展示。最后返回笔记对象。
效果展示
注意:店铺不显示是因为在前面做活动预热时没有将此店铺信息保存到 Redis 中,从而导致店铺信息查询不到显示异常。
2. 点赞
我们知道点赞功能这一类操作频繁且简单的操作,需要保证每人只能对一个笔记点赞一次。如果这种校验如果通过数据库来实现由于频繁的操作会对数据库造成很大的压力,所以实现校验功能最好的方法就是通过 Redis 来实现类似一人一单的校验。
@PutMapping("/like/{id}")
public Result likeBlog(@PathVariable("id") Long id) {return blogService.likeBlog(id);
}
首先将方法转移到 Impl 层去实现,主要是利用 Set 数据类型保存已点赞的用户 Id,在下次重复操作时从中使用 SISMEMBER 命令判断其是否存在。
//1.获取当前用户 Id
Long userId = UserHolder.getUser().getId();
//2.判断当前用户是否已经点赞
String key = BLOG_LIKED_KEY + id;
Boolean isMember = stringRedisTemplate.opsForSet().isMember(key, userId.toString());
获取当前用户的 Id,从 Redis 中判断该用户是否已经点赞过了。
if (BooleanUtil.isFalse(isMember)){//3.如果未点赞,可以点赞boolean isSuccess = update().setSql("liked = liked + 1").eq("id", id).update();if (isSuccess){stringRedisTemplate.opsForSet().add(key,userId.toString());}
}else {//4.如果已点赞,则不点赞boolean isSuccess = update().setSql("liked = liked - 1").eq("id", id).update();if (isSuccess){stringRedisTemplate.opsForSet().remove(key,userId.toString());}
}
return Result.ok();
结果一:未点赞,需要将点赞数 + 1 并存入数据库中,接着将用户 Id 存入 Redis 中;
结果二:已点赞,需要将点赞数 - 1 并存入数据库中,接着将用户 Id 从 Redis 删除。
以上判断完成后,返回成功。同时在判断完点赞操作后,在查询时还需展示点赞成功后标志变为醒目红色标志。Blog 类中有字段 isLike 记录是否点赞。
/*** 是否点赞过了*/
@TableField(exist = false)
private Boolean isLike;
在查看笔记与分页查询的方法中添加 isBlogLiked 方法判断笔记是否被当前用户点赞。
//查询 blog 是否被点赞
isBlogLiked(blog);
------------------------------
// 查询用户
records.forEach(blog -> {this.queryBlogUser(blog);this.isBlogLiked(blog);
});
private void isBlogLiked(Blog blog) {if (UserHolder.getUser() == null) {//用户未登录时无需查询是否点赞return;}Long userId = UserHolder.getUser().getId();String key = BLOG_LIKED_KEY + blog.getId();Boolean isMember = stringRedisTemplate.opsForSet().isMember(key, userId.toString());blog.setIsLike(BooleanUtil.isTrue(isMember));
}
同理,获取到当前用户的 Id 从 Redis 中查询是否已点赞,如果已点赞,则将 isLike 字段设为 true。(注意:在用户未登录时也会进行分页查询,而此时无需对是否点赞进行判断,否则会出现空指针异常)
效果展示
可以看到最后成功实现了点赞的功能,一个用户只能对一个笔记点赞一次。
3.点赞排行榜
想要实现点赞排行榜的功能,就需要我们将原先保存用户 Id 的 Set 数据类型更改为 Sorted Set 数据类型进行存储,我们可以通过将每个用户点赞时间设置为分数来进行排序展示,而点赞时间越早展示则越靠前。
@Override
public Result likeBlog(Long id) {//1.获取当前用户 IdLong userId = UserHolder.getUser().getId();//2.判断当前用户是否已经点赞String key = BLOG_LIKED_KEY + id;Double score = stringRedisTemplate.opsForZSet().score(key, userId.toString());if (score == null){//3.如果未点赞,可以点赞boolean isSuccess = update().setSql("liked = liked + 1").eq("id", id).update();if (isSuccess){stringRedisTemplate.opsForZSet().add(key,userId.toString(), System.currentTimeMillis());}}else {//4.如果已点赞,则不点赞boolean isSuccess = update().setSql("liked = liked - 1").eq("id", id).update();if (isSuccess){stringRedisTemplate.opsForZSet().remove(key,userId.toString());}}return Result.ok();
}
● 第一步:对原先的点赞方法进行改造,存储类型变为 ZSet(Java 中 Sorted Set 为 ZSet),其中 add 方法再添加时加上当前时间作为 score,删除方法变为 remove,同时使用 score 方法查询对应键值对是否存在分数来判断是否已点赞。
@Override
public Result queryBlogLikes(Long id) {//1.查询 top5 的点赞用户Set<String> userTop5 = stringRedisTemplate.opsForZSet().range(BLOG_LIKED_KEY + id, 0, 4);if (userTop5 == null || userTop5.isEmpty()){return Result.ok(Collections.emptyList());}//2.解析出其中的 idList<Long> userIds = userTop5.stream().map(Long::valueOf).collect(Collectors.toList());String idStr = StrUtil.join(",", userIds);//3.根据 id 查询用户List<UserDTO> userDTOS = userService.query().in("id", userIds).last("order by field(id," + idStr + ")").list().stream().map(user -> BeanUtil.copyProperties(user, UserDTO.class)).collect(Collectors.toList());//4.返回return Result.ok(userDTOS);
}
● 第二步:在 Impl 层中实现查看点赞排行的功能。其中使用 range 方法范围查询获取排序在前五的用户 Id,如果为空则返回空集合避免空指针问题,将其用 stream 流解析后再从数据库中查询出相应用户信息再次利用 stream 流收集为集合返回。(注意:如果利用 mp 中的 list 方法会导致查询出的用户顺序不会通过从 Redis 中已经过时间排序得到的用户 Id 顺序进行返回,所以需要手写 sql 语句,利用 order by field 来返回对应 Id 顺序的用户信息)
效果展示
可以看到,成功展示了点赞的用户,并且利用时间对点赞的前后顺序进行了展示。
【中】完结
传送门(上):Redis 实战篇 ——《黑马点评》(上)
传送门(下):NULL