异步秒杀思路
原本的流程是如下所示,必须从开始到创建订单成功才会返回响应。就像饭店里面从下单到上菜都是一个人在服务,就导致服务员利用率很低,后一个顾客要等到前一个顾客上完菜才可以下单。
最简单的优化就是加员工,一次性就可以服务两个顾客。但是更好的优化是,只让一个服务员去记录下单信息,然后让后厨根据下单依次上菜即可。后面的顾客就可以不用等那么久了。
这个业务场景分为两个部分,对秒杀资格的判断和减库存下单,一个是查数据库,一个是改数据库,速度差异很大,所以这里可以将两个部分分给两个线程去执行。主线程判断购买资格,副线程负责减库存下单。
然后针对要查询数据库的操作也可以优化,将数据存在redis,判断有秒杀资格之后直接返回成功信息给用户,然后后续操作根据消息队列里面的消息进行异步执行。
将优惠券信息先存在redis里面,到时候下单先操作redis,再去操作mysql.然后用一个set去存储所有下过单的用户的id,防止重复下单。
基于Redis完成秒杀资格判断
1.保存优惠券信息到Redis
@Service
public class VoucherServiceImpl extends ServiceImpl<VoucherMapper, Voucher> implements IVoucherService {@Resourceprivate ISeckillVoucherService seckillVoucherService;@Resourceprivate StringRedisTemplate stringRedisTemplate;@Overridepublic Result queryVoucherOfShop(Long shopId) {// 查询优惠券信息...// 返回结果...}@Override@Transactionalpublic void addSeckillVoucher(Voucher voucher) {// 保存优惠券...// 保存秒杀信息...//保存秒杀库存到RedisstringRedisTemplate.opsForValue().set(SECKILL_STOCK_KEY+voucher.getId(),voucher.getStock().toString());}
}
2.基于Lua脚本判断是否下单成功
-- 1.参数列表
-- 1.1 优惠券id
local voucherId=ARGV[1]
-- 1.2 用户id
local userId=ARGV[2]-- 2.数据key
-- 2.1库存key
local stockKey='seckill:stock:'..voucherId
-- 2.2.订单key
local orderKey='seckill:order:'..voucherId-- 3.脚本业务
-- 3.1.判断库存是否充足 get stockKey
if(tonumber(redis.call('get',stockKey))<= 0) then--获取到字符串转成数值类型-- 3.2. 库存不足 返回1return 1
end
-- 3.2 判断用户是否下单 SISMEMBER orderKey userId
if(redis.call('sismember',orderKey,userId)==1) then--3.3存在,说明是重复下单,返回2return 2
end-- 3.4.扣减库存 incrby stockKey -1
redis.call('incrby',stockKey,-1)
-- 3.5.下单(保存用户) sadd orderKey userId
redis.call('add',orderKey,userId)
改造秒杀下单资格判断业务
private static final DefaultRedisScript<Long> SECKILL_SCRIPT;static{SECKILL_SCRIPT=new DefaultRedisScript<>();SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua"));SECKILL_SCRIPT.setResultType(Long.class);}@Overridepublic Result seckillVoucher(Long voucherId) {//获取用户Long userId = UserHolder.getUser().getId();//1.执行Lua脚本Long result = stringRedisTemplate.execute(SECKILL_SCRIPT,Collections.emptyList(),voucherId.toString(), userId.toString());int r = result.intValue();//2.判断结果是否为0if(r!=0){//2.1.不为0,代表没有购买资格return Result.fail(r==1?"库存不足":"不能重复下单");}//2.2为0,有购买资格,把下单信息保存到阻塞队列long orderId = redisIdWorker.nextId("order");//TODO 保存阻塞队列//3.返回订单idreturn Result.ok(orderId);}
基于阻塞队列实现秒杀异步下单
3. 封装优惠券id和用户id进阻塞队列4.获取阻塞队列消息,实现异步下单
public interface IVoucherOrderService extends IService<VoucherOrder> {Result seckillVoucher(Long voucherId);void createVoucherOrder(VoucherOrder voucherOrder);}
@Service
@Slf4j
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {@Resourceprivate RedisIdWorker redisIdWorker;@Resourceprivate ISeckillVoucherService seckillVoucherService;@Resourceprivate StringRedisTemplate stringRedisTemplate;@Resourceprivate RedissonClient redissonClient;private static final DefaultRedisScript<Long> SECKILL_SCRIPT;static{SECKILL_SCRIPT=new DefaultRedisScript<>();SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua"));SECKILL_SCRIPT.setResultType(Long.class);}private BlockingQueue<VoucherOrder>orderTasks=new ArrayBlockingQueue<>(1024*1024);private static final ExecutorService SECKILL_ORDER_EXECUTOR= Executors.newSingleThreadExecutor();@PostConstruct //当前类初始化完毕时就执行private void init(){SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());}private class VoucherOrderHandler implements Runnable{@Overridepublic void run() {while(true){try {//1.获取队列中的订单信息VoucherOrder voucherOrder = orderTasks.take();//2.创建订单handleVoucherOrder(voucherOrder);} catch (InterruptedException e) {log.error("处理订单异常",e);}}}}//处理订单private void handleVoucherOrder(VoucherOrder voucherOrder) {//1. 获取用户idLong userId = voucherOrder.getUserId();//2.创建锁对象RLock lock = redissonClient.getLock("lock:order:" + userId);//3.获取锁
// boolean isLock = lock.trylock(1200);boolean isLock = lock.tryLock();//4.判断是否获取锁成功if(!isLock){//获取锁失败,返回报错log.error("不允许重复下单"); //理论上不会有问题,redis已经判断过了}try {//取到代理对象proxy.createVoucherOrder(voucherOrder);}finally {//释放锁lock.unlock();}}private IVoucherOrderService proxy;@Overridepublic Result seckillVoucher(Long voucherId) {//获取用户Long userId = UserHolder.getUser().getId();//1.执行Lua脚本Long result = stringRedisTemplate.execute(SECKILL_SCRIPT,Collections.emptyList(),voucherId.toString(), userId.toString());int r = result.intValue();//2.判断结果是否为0if(r!=0){//2.1.不为0,代表没有购买资格return Result.fail(r==1?"库存不足":"不能重复下单");}//2.2为0,有购买资格,把下单信息保存到阻塞队列//TODO 保存阻塞队列VoucherOrder voucherOrder = new VoucherOrder();//2.3订单Idlong orderId = redisIdWorker.nextId("order");voucherOrder.setId(orderId);//2.4用户IdvoucherOrder.setUserId(userId);//2.5代金券IdvoucherOrder.setVoucherId(voucherId);//2.6放入阻塞队列orderTasks.add(voucherOrder);//3.获取代理对象 为了让后序线程可以拿到代理对象,可以放在成员变量或者是voucherOrder里面proxy =(IVoucherOrderService) AopContext.currentProxy();//4.返回订单idreturn Result.ok(orderId);}@Override@Transactionalpublic void createVoucherOrder(VoucherOrder voucherOrder) {//5.一人一单Long userId = voucherOrder.getUserId();//5.1查询订单int count = query().eq("user_id", userId).eq("voucher_id", voucherOrder.getVoucherId()).count();//5.2判断是否存在if (count > 0) {//用户已经购买过了log.error("用户已经购买过一次"); //redis已经判断过了,这里几乎不会出错return ;}//6.扣减库存boolean success = seckillVoucherService.update().setSql("stock = stock - 1") //set stock =stock - 1.eq("voucher_id", voucherOrder.getVoucherId()).gt("stock", 0)//where id=? and stock > 0.update();if (!success) {log.error("库存不足"); //这里也几乎不会出错return ;}//7.创建订单//此处传了voucherOrder进来,就不用重新创建订单了save(voucherOrder);}
}
太强了,这个代码.
Redis消息队列
基于List实现消息队列
基于PubSub实现消息队列
Stream消息队列
单消费模式
消费者组模式
使用XACK命令移除已经确认的消息
基于Stream消息队列实现异步秒杀
创建消息队列
XGROUP CREATE stream.orders g1 0 MKSTREAM
修改Lua脚本
新增了一个订单id和3.6的操作,使用id作为orderId的key可以直接对应实体类中的属性。
-- 1.参数列表
-- 1.1 优惠券id
local voucherId=ARGV[1]
-- 1.2 用户id
local userId=ARGV[2]
-- 1.3 订单id
local OrderId=ARGV[3]-- 2.数据key
-- 2.1库存key
local stockKey='seckill:stock:'..voucherId
-- 2.2.订单key
local orderKey='seckill:order:'..voucherId-- 3.脚本业务
-- 3.1.判断库存是否充足 get stockKey
if(tonumber(redis.call('get',stockKey))<= 0) then--获取到字符串转成数值类型-- 3.2. 库存不足 返回1return 1
end
-- 3.2 判断用户是否下单 SISMEMBER orderKey userId
if(redis.call('sismember',orderKey,userId)==1) then--3.3存在,说明是重复下单,返回2return 2
end-- 3.4.扣减库存 incrby stockKey -1
redis.call('incrby',stockKey,-1)
-- 3.5.下单(保存用户) sadd orderKey userId
redis.call('sadd',orderKey,userId)
-- 3.6 发送消息到队列中,XADD stream.orders * k1 v1 k2 v2
redis.call('xadd','stream.orders','*','userId',userId,'voucherId',voucherId,'id',OrderId)
return 0
改造秒杀业务逻辑
private IVoucherOrderService proxy;@Overridepublic Result seckillVoucher(Long voucherId) {//获取用户Long userId = UserHolder.getUser().getId();//获取订单IDlong orderId = redisIdWorker.nextId("order");//1.执行Lua脚本Long result = stringRedisTemplate.execute(SECKILL_SCRIPT,Collections.emptyList(),voucherId.toString(), userId.toString(),String.valueOf(orderId));int r = result.intValue();//2.判断结果是否为0if(r!=0){//2.1.不为0,代表没有购买资格return Result.fail(r==1?"库存不足":"不能重复下单");}//3.获取代理对象 为了让后序线程可以拿到代理对象,可以放在成员变量或者是voucherOrder里面proxy =(IVoucherOrderService) AopContext.currentProxy();//4.返回订单idreturn Result.ok(orderId);}
开启线程任务获取消息队列的消息
@PostConstruct //当前类初始化完毕时就执行private void init(){SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());}private class VoucherOrderHandler implements Runnable{String queueName="stream.orders";@Overridepublic void run() {while(true){try {//1.获取消息队列中的订单信息 XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS stream.order >List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(Consumer.from("g1", "c1"),StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),StreamOffset.create(queueName, ReadOffset.lastConsumed()));//2.判断消息获取是否成功if(list==null|| list.isEmpty()) {//2.1如果获取失败,说明没有消息,继续下一次循环continue;}//3.解析消息中的订单信息MapRecord<String, Object, Object> record = list.get(0);Map<Object, Object> values = record.getValue();VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(values, new VoucherOrder(), true);//4.如果获取成功,可以下单handleVoucherOrder(voucherOrder);//5.ACK确认 SACK stream.orders g1 idstringRedisTemplate.opsForStream().acknowledge(queueName,"g1",record.getId());} catch (Exception e) {log.error("处理订单异常",e);handlePendingList();}}}private void handlePendingList() {while(true){try {//1.获取pending-list中的订单信息 XREADGROUP GROUP g1 c1 COUNT 1 STREAMS stream.order 0List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(Consumer.from("g1", "c1"),StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),StreamOffset.create(queueName, ReadOffset.from("0")));//2.判断消息获取是否成功if(list==null|| list.isEmpty()) {//如果获取失败,说明pending-list没有异常消息,结束循环break;}//3.解析消息中的订单信息MapRecord<String, Object, Object> record = list.get(0);Map<Object, Object> values = record.getValue();VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(values, new VoucherOrder(), true);//4.如果获取成功,可以下单handleVoucherOrder(voucherOrder);//5.ACK确认 SACK stream.orders g1 idstringRedisTemplate.opsForStream().acknowledge(queueName,"g1",record.getId());} catch (Exception e) {log.error("处理订单异常",e);try {Thread.sleep(50);} catch (InterruptedException ex) {throw new RuntimeException(ex);}//休眠一会儿后进入下一次循环}}}}
秒杀业务最终代码
@Service
@Slf4j
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {@Resourceprivate RedisIdWorker redisIdWorker;@Resourceprivate ISeckillVoucherService seckillVoucherService;@Resourceprivate StringRedisTemplate stringRedisTemplate;@Resourceprivate RedissonClient redissonClient;private static final DefaultRedisScript<Long> SECKILL_SCRIPT;static{SECKILL_SCRIPT=new DefaultRedisScript<>();SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua"));SECKILL_SCRIPT.setResultType(Long.class);}private BlockingQueue<VoucherOrder>orderTasks=new ArrayBlockingQueue<>(1024*1024);private static final ExecutorService SECKILL_ORDER_EXECUTOR= Executors.newSingleThreadExecutor();@PostConstruct //当前类初始化完毕时就执行private void init(){SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());}private class VoucherOrderHandler implements Runnable{String queueName="stream.orders";@Overridepublic void run() {while(true){try {//1.获取消息队列中的订单信息 XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS stream.order >List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(Consumer.from("g1", "c1"),StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),StreamOffset.create(queueName, ReadOffset.lastConsumed()));//2.判断消息获取是否成功if(list==null|| list.isEmpty()) {//2.1如果获取失败,说明没有消息,继续下一次循环continue;}//3.解析消息中的订单信息MapRecord<String, Object, Object> record = list.get(0);Map<Object, Object> values = record.getValue();VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(values, new VoucherOrder(), true);//4.如果获取成功,可以下单handleVoucherOrder(voucherOrder);//5.ACK确认 SACK stream.orders g1 idstringRedisTemplate.opsForStream().acknowledge(queueName,"g1",record.getId());} catch (Exception e) {log.error("处理订单异常",e);handlePendingList();}}}private void handlePendingList() {while(true){try {//1.获取pending-list中的订单信息 XREADGROUP GROUP g1 c1 COUNT 1 STREAMS stream.order 0List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(Consumer.from("g1", "c1"),StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),StreamOffset.create(queueName, ReadOffset.from("0")));//2.判断消息获取是否成功if(list==null|| list.isEmpty()) {//如果获取失败,说明pending-list没有异常消息,结束循环break;}//3.解析消息中的订单信息MapRecord<String, Object, Object> record = list.get(0);Map<Object, Object> values = record.getValue();VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(values, new VoucherOrder(), true);//4.如果获取成功,可以下单handleVoucherOrder(voucherOrder);//5.ACK确认 SACK stream.orders g1 idstringRedisTemplate.opsForStream().acknowledge(queueName,"g1",record.getId());} catch (Exception e) {log.error("处理订单异常",e);try {Thread.sleep(50);} catch (InterruptedException ex) {throw new RuntimeException(ex);}//休眠一会儿后进入下一次循环}}}}//阻塞队列的写法/*private static final ExecutorService SECKILL_ORDER_EXECUTOR= Executors.newSingleThreadExecutor();@PostConstruct //当前类初始化完毕时就执行private void init(){SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());}private class VoucherOrderHandler implements Runnable{@Overridepublic void run() {while(true){try {//1.获取队列中的订单信息VoucherOrder voucherOrder = orderTasks.take();//2.创建订单handleVoucherOrder(voucherOrder);} catch (InterruptedException e) {log.error("处理订单异常",e);}}}}*///处理订单private void handleVoucherOrder(VoucherOrder voucherOrder) {//1. 获取用户idLong userId = voucherOrder.getUserId();//2.创建锁对象RLock lock = redissonClient.getLock("lock:order:" + userId);//3.获取锁
// boolean isLock = lock.trylock(1200);boolean isLock = lock.tryLock();//4.判断是否获取锁成功if(!isLock){//获取锁失败,返回报错log.error("不允许重复下单"); //理论上不会有问题,redis已经判断过了}try {//取到代理对象proxy.createVoucherOrder(voucherOrder);}finally {//释放锁lock.unlock();}}private IVoucherOrderService proxy;@Overridepublic Result seckillVoucher(Long voucherId) {//获取用户Long userId = UserHolder.getUser().getId();//获取订单IDlong orderId = redisIdWorker.nextId("order");//1.执行Lua脚本Long result = stringRedisTemplate.execute(SECKILL_SCRIPT,Collections.emptyList(),voucherId.toString(), userId.toString(),String.valueOf(orderId));int r = result.intValue();//2.判断结果是否为0if(r!=0){//2.1.不为0,代表没有购买资格return Result.fail(r==1?"库存不足":"不能重复下单");}//3.获取代理对象 为了让后序线程可以拿到代理对象,可以放在成员变量或者是voucherOrder里面proxy =(IVoucherOrderService) AopContext.currentProxy();//4.返回订单idreturn Result.ok(orderId);}//阻塞队列的写法/*@Overridepublic Result seckillVoucher(Long voucherId) {//获取用户Long userId = UserHolder.getUser().getId();//1.执行Lua脚本Long result = stringRedisTemplate.execute(SECKILL_SCRIPT,Collections.emptyList(),voucherId.toString(), userId.toString());int r = result.intValue();//2.判断结果是否为0if(r!=0){//2.1.不为0,代表没有购买资格return Result.fail(r==1?"库存不足":"不能重复下单");}//2.2为0,有购买资格,把下单信息保存到阻塞队列// 保存阻塞队列VoucherOrder voucherOrder = new VoucherOrder();//2.3订单Idlong orderId = redisIdWorker.nextId("order");voucherOrder.setId(orderId);//2.4用户IdvoucherOrder.setUserId(userId);//2.5代金券IdvoucherOrder.setVoucherId(voucherId);//2.6放入阻塞队列orderTasks.add(voucherOrder);//3.获取代理对象 为了让后序线程可以拿到代理对象,可以放在成员变量或者是voucherOrder里面proxy =(IVoucherOrderService) AopContext.currentProxy();//4.返回订单idreturn Result.ok(orderId);}*///不使用异步的写法??/*@Overridepublic Result seckillVoucher(Long voucherId) {//1.查询优惠券SeckillVoucher voucher = seckillVoucherService.getById(voucherId);//2.判断秒杀是否开始if (voucher.getBeginTime().isAfter(LocalDateTime.now())) {//尚未开始return Result.fail("秒杀尚未开始");}//3.判断秒杀是否已经结束if (voucher.getEndTime().isBefore(LocalDateTime.now())) {//尚未开始return Result.fail("秒杀已经结束");}//4.判断库存是否充足if (voucher.getStock()<1) {//库存不足return Result.fail("库存不足");}//5.一人一单Long userId = UserHolder.getUser().getId();//创建锁对象
// SimpleRedisLock lock = new SimpleRedisLock("order:" + userId, stringRedisTemplate);RLock lock = redissonClient.getLock("lock:order:" + userId);//获取锁
// boolean isLock = lock.trylock(1200);boolean isLock = lock.tryLock();//判断是否获取锁成功if(!isLock){//获取锁失败,返回报错return Result.fail("不允许重复下单");}try {//取到了当前代理对象IVoucherOrderService proxy =(IVoucherOrderService) AopContext.currentProxy();return proxy.createVoucherOrder(voucherId);}finally {//释放锁lock.unlock();}}*/@Override@Transactionalpublic void createVoucherOrder(VoucherOrder voucherOrder) {//5.一人一单Long userId = voucherOrder.getUserId();//5.1查询订单int count = query().eq("user_id", userId).eq("voucher_id", voucherOrder.getVoucherId()).count();//5.2判断是否存在if (count > 0) {//用户已经购买过了log.error("用户已经购买过一次"); //redis已经判断过了,这里几乎不会出错return ;}//6.扣减库存boolean success = seckillVoucherService.update().setSql("stock = stock - 1") //set stock =stock - 1.eq("voucher_id", voucherOrder.getVoucherId()).gt("stock", 0)//where id=? and stock > 0.update();if (!success) {log.error("库存不足"); //这里也几乎不会出错return ;}//7.创建订单//此处传了voucherOrder进来,就不用重新创建订单了save(voucherOrder);}
}