优惠卷秒杀
一、全局唯一ID
基于Redis实现全局唯一ID的策略:
@Component
@RequiredArgsConstructor
public class RedisIdWorker {private static final Long BEGIN_TIMESTAMP=1713916800L;private static final int COUNT_BITS = 32;@Resourceprivate final StringRedisTemplate stringRedisTemplate;public long nextId(String keyPrefix){//1.生成时间戳LocalDateTime now =LocalDateTime.now();long nowSecond = now.toEpochSecond(ZoneOffset.UTC);long timestamp = nowSecond - BEGIN_TIMESTAMP;//2.生成序列号//2.1. 获取当天的日期String date = now.format(DateTimeFormatter.ofPattern("yyyy:MM:dd"));Long count = stringRedisTemplate.opsForValue().increment("icr:" + keyPrefix + ":" + date);//3.拼接并返回 时间戳 左移32位 随后与 count 或运算 实现拼接return timestamp<<COUNT_BITS | count;}}
二、实现优惠卷秒杀下单
@Service
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {@Resourceprivate ISeckillVoucherService seckillVoucherService;@Resourceprivate RedisIdWorker redisIdWorker;@Override@Transactionalpublic Result seckillVoucher(Long voucherId) {//1.查询优惠卷SeckillVoucher voucher = seckillVoucherService.getById(voucherId);//2.判断秒杀是否开始if (voucher.getBeginTime().isAfter(LocalDateTime.now())){//开始时间在当前时间之后--未开始return Result.fail("秒杀尚未开始!");}//3.判断秒杀是否结束if (voucher.getEndTime().isBefore(LocalDateTime.now())){//结束时间在当前时间之前 -- 已经结束return Result.fail("秒杀已经结束!");}//4.库存判断if (voucher.getStock() < 1){//库存不足return Result.fail("库存不足!");}//5.扣减库存boolean success = seckillVoucherService.update().setSql("stock = stock - 1").eq("voucher_id", voucherId).update();if(!success){//扣减失败return Result.fail("库存不足!");}//6.创建订单VoucherOrder voucherOrder = new VoucherOrder();//6.1.订单idlong orderId = redisIdWorker.nextId("order");voucherOrder.setId(orderId);//6.2.用户idLong userId = UserHolder.getUser().getId();voucherOrder.setUserId(userId);//6.3.代金卷idvoucherOrder.setVoucherId(voucherId);save(voucherOrder);//7.返回订单idreturn Result.ok(voucherOrder);}
}
超卖问题:
解决问题--加锁:
乐观锁:
实现--版本号法:
实现--CAS法:
使用对应数据代替版本号进行查询
业务修改:
乐观锁的判断只针对库存是否>0,如果库存发现已经=0,则终止
boolean success = seckillVoucherService.update().setSql("stock = stock - 1").eq("voucher_id", voucherId).gt("stock",0)//乐观锁,在进行扣减前查询数据库中数据是否发生改变.update();
三、实现一人一单
//5.一人一单Long userId = UserHolder.getUser().getId();//5.1.查询订单Integer count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();if(count>0){//5.2.判断是否存在//说明已经下过单了return Result.fail("该用户已经购买过一次!");}
还是有多单成功
解决办法--加锁:
版本1(优缺点):
@Overridepublic Result seckillVoucher(Long voucherId) {//1.查询优惠卷SeckillVoucher voucher = seckillVoucherService.getById(voucherId);//2.判断秒杀是否开始if (voucher.getBeginTime().isAfter(LocalDateTime.now())){//开始时间在当前时间之后--未开始return Result.fail("秒杀尚未开始!");}//3.判断秒杀是否结束if (voucher.getEndTime().isBefore(LocalDateTime.now())){//结束时间在当前时间之前 -- 已经结束return Result.fail("秒杀已经结束!");}//4.库存判断if (voucher.getStock() < 1){//库存不足return Result.fail("库存不足!");}return creatVoucherOrder(voucherId);}@Transactionalpublic Result creatVoucherOrder(Long voucherId){//5.一人一单Long userId = UserHolder.getUser().getId();synchronized(userId.toString().intern()){//5.1.查询订单Integer count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();if(count>0){//5.2.判断是否存在//说明已经下过单了return Result.fail("该用户已经购买过一次!");}//6.扣减库存boolean success = seckillVoucherService.update().setSql("stock = stock - 1").eq("voucher_id", voucherId).gt("stock",0)//乐观锁,在进行扣减前查询数据库中数据是否发生改变.update();if(!success){//扣减失败return Result.fail("库存不足!");}//7.创建订单VoucherOrder voucherOrder = new VoucherOrder();//7.1.订单idlong orderId = redisIdWorker.nextId("order");voucherOrder.setId(orderId);//7.2.用户idvoucherOrder.setUserId(userId);//7.3.代金卷idvoucherOrder.setVoucherId(voucherId);save(voucherOrder);//8.返回订单idreturn Result.ok(voucherOrder);}}
改进版:
@Overridepublic Result seckillVoucher(Long voucherId) {//1.查询优惠卷SeckillVoucher voucher = seckillVoucherService.getById(voucherId);//2.判断秒杀是否开始if (voucher.getBeginTime().isAfter(LocalDateTime.now())){//开始时间在当前时间之后--未开始return Result.fail("秒杀尚未开始!");}//3.判断秒杀是否结束if (voucher.getEndTime().isBefore(LocalDateTime.now())){//结束时间在当前时间之前 -- 已经结束return Result.fail("秒杀已经结束!");}//4.库存判断if (voucher.getStock() < 1){//库存不足return Result.fail("库存不足!");}Long userId = UserHolder.getUser().getId();synchronized(userId.toString().intern()) {return creatVoucherOrder(voucherId);}}@Transactionalpublic Result creatVoucherOrder(Long voucherId){//5.一人一单Long userId = UserHolder.getUser().getId();//5.1.查询订单Integer count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();if(count>0){//5.2.判断是否存在//说明已经下过单了return Result.fail("该用户已经购买过一次!");}//6.扣减库存boolean success = seckillVoucherService.update().setSql("stock = stock - 1").eq("voucher_id", voucherId).gt("stock",0)//乐观锁,在进行扣减前查询数据库中数据是否发生改变.update();if(!success){//扣减失败return Result.fail("库存不足!");}//7.创建订单VoucherOrder voucherOrder = new VoucherOrder();//7.1.订单idlong orderId = redisIdWorker.nextId("order");voucherOrder.setId(orderId);//7.2.用户idvoucherOrder.setUserId(userId);//7.3.代金卷idvoucherOrder.setVoucherId(voucherId);save(voucherOrder);//8.返回订单idreturn Result.ok(voucherOrder);}
不能将锁加在方法上:会造成串行操作,多个用户不能并行调用该方法
因此将锁加载用户id上面,根据用户id不同的特点来实现多个用户并行
注意1:
用户id调用ToString方法时,底层代码仍然是通过new 实现,因此即便同一个用户id仍然会有不同的toString值,因此调用 intern( ) 方法,通过往字符串池中寻找是否存在对应的字符串,避免new导致的不同。
注意2:
两个版本的锁位置不一样,前者的锁会出现以下并发安全问题:当锁中内容执行完毕释放锁之后,事务可能还没有提交,此时具有相同id的线程可能会重新调用方法,导致问题进而使得事务失败回滚
因此锁在进阶版中加入到了调用这个方法的部分(既锁住了整个函数,又没有影响函数被其他线程调用)
新问题:
可以看到进阶版代码中虽然通过悲观锁预防了并发安全问题,但是也引出了另一个问题 ,在进阶代码中 createVoucherOrder 方法的@Transactional 注释并不会生效:
原因: 代码中的 return creatVoucherOrder(voucherId);
等价于 return this.creatVoucherOrder(voucherId); 即 调用的是整个Service实现类的方法(方法属性),而不是代理对象(方法本身),spring实现事务是通过对这个方法进行动态代理,用代理对象去实现事务处理,因此如果通过service实现类调用方法无法实现事务功能。
最终 版本:
引入依赖:
<!--aspectj--><dependency><groupId>org.aspectj</groupId><artifactId>aspectjweaver</artifactId></dependency>
配置启动项注释--开启暴露代理对象:
@EnableAspectJAutoProxy(exposeProxy = true) //设置暴露代理对象 -- true
@MapperScan("com.hmdp.mapper")
@SpringBootApplication
public class HmDianPingApplication {public static void main(String[] args) {SpringApplication.run(HmDianPingApplication.class, args);}}
@Service
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {@Resourceprivate ISeckillVoucherService seckillVoucherService;@Resourceprivate RedisIdWorker redisIdWorker;@Overridepublic Result seckillVoucher(Long voucherId) {//1.查询优惠卷SeckillVoucher voucher = seckillVoucherService.getById(voucherId);//2.判断秒杀是否开始if (voucher.getBeginTime().isAfter(LocalDateTime.now())){//开始时间在当前时间之后--未开始return Result.fail("秒杀尚未开始!");}//3.判断秒杀是否结束if (voucher.getEndTime().isBefore(LocalDateTime.now())){//结束时间在当前时间之前 -- 已经结束return Result.fail("秒杀已经结束!");}//4.库存判断if (voucher.getStock() < 1){//库存不足return Result.fail("库存不足!");}Long userId = UserHolder.getUser().getId();synchronized(userId.toString().intern()) {//需要拿到当前对象的代理对象//spring就能通过代理对象来进行事务管理IVoucherOrderService proxy = (IVoucherOrderService)AopContext.currentProxy();return proxy.reatVoucherOrder(voucherId);}}@Transactionalpublic Result creatVoucherOrder(Long voucherId){//5.一人一单Long userId = UserHolder.getUser().getId();//5.1.查询订单Integer count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();if(count>0){//5.2.判断是否存在//说明已经下过单了return Result.fail("该用户已经购买过一次!");}//6.扣减库存boolean success = seckillVoucherService.update().setSql("stock = stock - 1").eq("voucher_id", voucherId).gt("stock",0)//乐观锁,在进行扣减前查询数据库中数据是否发生改变.update();if(!success){//扣减失败return Result.fail("库存不足!");}//7.创建订单VoucherOrder voucherOrder = new VoucherOrder();//7.1.订单idlong orderId = redisIdWorker.nextId("order");voucherOrder.setId(orderId);//7.2.用户idvoucherOrder.setUserId(userId);//7.3.代金卷idvoucherOrder.setVoucherId(voucherId);save(voucherOrder);//8.返回订单idreturn Result.ok(voucherOrder);}
}
一人一单的并发安全问题:
同一个用户发送两个请求,在并发的两个服务中都会接收请求,不会锁住(锁只会在一个虚拟环境中生效)
四、分布式锁实现一人一单
分布式锁:
分布式锁的实现:
基于Redis的分布式锁:
案例--基于Redis实现分布式锁初级版本:
package com.hmdp.utils;public interface ILock {/*** 尝试获取锁* @param* @return*/boolean tryLock(Long timeoutSec);/*** 释放锁* */void unlock();
}
public class SimpleRedisLock implements ILock{private StringRedisTemplate stringRedisTemplate;private String name;private static final String KEY_PREFIX= "lock:";public SimpleRedisLock(String name,StringRedisTemplate stringRedisTemplate){this.stringRedisTemplate=stringRedisTemplate;this.name=name;}@Overridepublic boolean tryLock(Long timeoutSec) {//获取线程标识long threadId = Thread.currentThread().getId();//获取锁Boolean success = stringRedisTemplate.opsForValue().setIfAbsent(KEY_PREFIX + name, threadId + "", timeoutSec, TimeUnit.SECONDS);return Boolean.TRUE.equals(success); //预防空指针错误}@Overridepublic void unlock() {stringRedisTemplate.delete(KEY_PREFIX+name);}
}
@Overridepublic Result seckillVoucher(Long voucherId) {//1.查询优惠卷SeckillVoucher voucher = seckillVoucherService.getById(voucherId);//2.判断秒杀是否开始if (voucher.getBeginTime().isAfter(LocalDateTime.now())){//开始时间在当前时间之后--未开始return Result.fail("秒杀尚未开始!");}//3.判断秒杀是否结束if (voucher.getEndTime().isBefore(LocalDateTime.now())){//结束时间在当前时间之前 -- 已经结束return Result.fail("秒杀已经结束!");}//4.库存判断if (voucher.getStock() < 1){//库存不足return Result.fail("库存不足!");}Long userId = UserHolder.getUser().getId();//创建锁对象SimpleRedisLock lock= new SimpleRedisLock("order:"+userId,stringRedisTemplate);//获取锁boolean isLock = lock.tryLock(1200L);//判断是否获取锁成功if (!isLock){//获取失败,返回错误或者重试return Result.fail("不允许重复下单!");}try {//需要拿到当前对象的代理对象//spring就能通过代理对象来进行事务管理IVoucherOrderService proxy = (IVoucherOrderService)AopContext.currentProxy();return proxy.creatVoucherOrder(voucherId);} finally {//释放锁lock.unlock();}}
潜在问题:
解决办法--增加释放锁的标识:
案例--改进的Redis分布式锁:
public class SimpleRedisLock implements ILock{private StringRedisTemplate stringRedisTemplate;private String name;private static final String KEY_PREFIX= "lock:";private static final String ID_PREFIX = UUID.randomUUID().toString(true)+"-";public SimpleRedisLock(String name,StringRedisTemplate stringRedisTemplate){this.stringRedisTemplate=stringRedisTemplate;this.name=name;}@Overridepublic boolean tryLock(Long timeoutSec) {//获取线程标识String threadId =ID_PREFIX + Thread.currentThread().getId();//获取锁Boolean success = stringRedisTemplate.opsForValue().setIfAbsent(KEY_PREFIX + name, threadId, timeoutSec, TimeUnit.SECONDS);return Boolean.TRUE.equals(success); //预防空指针错误}@Overridepublic void unlock() {//获取线程标识String threadId=ID_PREFIX+Thread.currentThread().getId();//获取锁的中标识String id = stringRedisTemplate.opsForValue().get(KEY_PREFIX + name);//判断是否一致if (id.equals(threadId)){//一致--释放锁stringRedisTemplate.delete(KEY_PREFIX+name);}}
}
潜在问题:
原因: 判断锁标识 和 释放锁 不具有原子性 是两个操作
解决办法---Lua脚本:
Lua 教程 | 菜鸟教程
执行脚本:
带参数脚本:
Lua语言数组的下标从1开始
基于Lua脚本修改释放锁业务:
编写Lua脚本:
-- 获取锁中的线程标识 get key
local id = redis.call('get',KEYS[1])
-- 比较线程标识与锁中的标识是否一致
if(redis.call('get',KEYS[1]) == ARGV[1]) then-- 释放锁 del keyreturn redis.call('del', KEYS[1])
end
return 0
使用Java执行Lua脚本:
private static final DefaultRedisScript<Long> UNLOCK_SCRIPT;static {UNLOCK_SCRIPT = new DefaultRedisScript<>();UNLOCK_SCRIPT.setLocation(new ClassPathResource("unlock.lua"));UNLOCK_SCRIPT.setResultType(Long.class);}@Overridepublic void unlock(){//调用Lua脚本stringRedisTemplate.execute(UNLOCK_SCRIPT,Collections.singletonList(KEY_PREFIX + name),ID_PREFIX+Thread.currentThread().getId());}
潜在问题:
解决办法---Redisson
五、Redisson
Redisson | Valkey & Redis Java client. Ultimate Real-Time Data Platform
GitHub - redisson/redisson: Redisson - Valkey and Redis Java client. Real-Time Data Platform. Sync/Async/RxJava/Reactive API. Over 50 Valkey and Redis based Java objects and services: Set, Multimap, SortedSet, Map, List, Queue, Deque, Semaphore, Lock, AtomicLong, Map Reduce, Bloom filter, Spring, Tomcat, Scheduler, JCache API, Hibernate, RPC, local cache..
Redisson入门:
<!--Redisson--><dependency><groupId>org.redisson</groupId><artifactId>redisson</artifactId><version>3.13.6</version></dependency>
@Configuration
public class RedissonConfig {@Beanpublic RedissonClient redissonClient(){//配置Config config = new Config();config.useSingleServer().setAddress("redis://192.168.50.129:6379").setPassword("123");//创建RedissonClient对象return Redisson.create(config);}
}
@Resourceprivate RedissonClient redissonClient;
Redisson可重入锁原理:
可重入:一个线程里面允许多次获取锁
流程对应的Lua脚本:
获取锁
释放锁:
Redisson分布式锁原理:
主从一致性问题:
解决办法:
其他线程需要在所有的redis节点中都获取到锁才能进行
六、分布式锁总结
七、Redis优化秒杀
随机生成1000个token代表不同用户,模拟秒杀过程可以看到性能很差
@Testvoid testGeneratetokens() throws IOException {String name = "悟空";UserDTO userDTO = new UserDTO();String token = null;BufferedWriter bw=new BufferedWriter(new FileWriter("token.txt"));for(int i=0;i<1000;i++){//随机生成一个tokentoken = UUID.randomUUID().toString(true);bw.write(token);bw.newLine();User user = userService.getById(1 + i);userDTO = BeanUtil.copyProperties(user, UserDTO.class);Map<String, Object> userMap = BeanUtil.beanToMap(userDTO,new HashMap<>(),CopyOptions.create() //允许对Key和Value自定义.setIgnoreNullValue(true) //忽略一些空值.setFieldValueEditor((fieldName, fieldValue) -> fieldValue == null ? "" : fieldValue.toString()));stringRedisTemplate.opsForHash().putAll(LOGIN_USER_KEY+token,userMap);//设置有效期stringRedisTemplate.expire(LOGIN_USER_KEY+token,LOGIN_USER_TTL,TimeUnit.MINUTES);}bw.close();}
执行流程(串行):
改造目标(拆分业务,并行):
流程:
案例--改进秒杀业务,提高并发性能:
--1.参数列表
--1.1 优惠卷id
local voucherId = ARGV[1]
--1.2 用户id
local userId = ARGV[2]--2.数据key
--2.1 库存key
local stockKey = 'seckill:stock:' .. voucherId --lua中 .. 表示拼接
--2.2 订单key = 'seckill:order:' .. voucherId--3. 业务脚本
--3.1 判断库存是否重组 get stockKey
if (tonumber(redis.call('get',stockKey)) <= 0) then--3.2 库存不足 返回1return 1
end--3.2 判断用户是否下单 SISMEMER orderKey userId set数据结构的命令:判断 key名下是否存在userId这个值 返回1 说明存在
if(redis.call('sismember',orderKey,userId)==1) then--3.3 存在 说明重复下单 返回2return 2
end--3.4 扣redis中对应的库存 incrby stockKey -1
redis.call('incrby',stockKey,-1)
--3.5 下单(保存用户到Set集合中) sadd orderKey userId
redis.call('sadd',orderKey,userId)
@Slf4j
@Service
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {@Resourceprivate ISeckillVoucherService seckillVoucherService;@Resourceprivate RedisIdWorker redisIdWorker;@Resourceprivate StringRedisTemplate stringRedisTemplate;@Resourceprivate RedissonClient redissonClient;//阻塞队列特点:当一个线程尝试从其中获取元素时,如果没有元素就会被阻塞,直到队列中有元素才会被唤醒并获取这个元素private BlockingQueue<VoucherOrder> orderTasks = new ArrayBlockingQueue<>(1024*1024);private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();@PostConstructprivate void init(){SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());}private class VoucherOrderHandler implements Runnable{@Overridepublic void run() {try {//1.获取队列中的订单信息VoucherOrder voucherOrder = orderTasks.take();//2.创建订单handleVoucherOrder(voucherOrder);} catch (Exception e) {log.error("处理订单异常",e);}}}private void handleVoucherOrder(VoucherOrder voucherOrder) {//1.获取userIdLong userId = voucherOrder.getUserId();//2.创建锁对象RLock lock = redissonClient.getLock("lock:order:" + userId);//3.获取锁//boolean isLock = lock.tryLock(1200L);boolean isLock = lock.tryLock();//判断是否获取锁成功if (!isLock){//获取失败log.error("不允许重复下单");return ;}try {proxy.creatVoucherOrder(voucherOrder);} finally {//释放锁lock.unlock();}}private static final DefaultRedisScript<Long> SECKILL_SCRIPT;static {SECKILL_SCRIPT = new DefaultRedisScript<>();SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua"));SECKILL_SCRIPT.setResultType(Long.class);}private IVoucherOrderService proxy;@Overridepublic Result seckillVoucher(Long voucherId) {//拆分工作,优化秒杀//获取用户Long userId = UserHolder.getUser().getId();//1.执行Lua脚本Long result = stringRedisTemplate.execute(SECKILL_SCRIPT,Collections.emptyList(),voucherId.toString(),userId.toString());//2.判断结果是否为0int r = result.intValue();if(r != 0){//2.1 结果不为0 代表没有资格return Result.fail(r == 1?"库存不足":"不能重复下单");}//2.2 为0 代表有购买资格,将下单信息保存到阻塞队列//TODO 保存阻塞队列VoucherOrder voucherOrder = new VoucherOrder();//2.3.订单idlong orderId = redisIdWorker.nextId("order");voucherOrder.setId(orderId);//2.4.用户idvoucherOrder.setUserId(userId);//2.5.代金卷idvoucherOrder.setVoucherId(voucherId);//获取代理对象proxy = (IVoucherOrderService)AopContext.currentProxy();//2.6.放入阻塞队列orderTasks.add(voucherOrder);//3.返回订单idreturn Result.ok(orderId);}/* @Overridepublic Result seckillVoucher(Long voucherId) {//1.查询优惠卷SeckillVoucher voucher = seckillVoucherService.getById(voucherId);//2.判断秒杀是否开始if (voucher.getBeginTime().isAfter(LocalDateTime.now())){//开始时间在当前时间之后--未开始return Result.fail("秒杀尚未开始!");}//3.判断秒杀是否结束if (voucher.getEndTime().isBefore(LocalDateTime.now())){//结束时间在当前时间之前 -- 已经结束return Result.fail("秒杀已经结束!");}//4.库存判断if (voucher.getStock() < 1){//库存不足return Result.fail("库存不足!");}Long userId = UserHolder.getUser().getId();//创建锁对象//SimpleRedisLock lock= new SimpleRedisLock("order:"+userId,stringRedisTemplate);RLock lock = redissonClient.getLock("lock:order:" + userId);//获取锁//boolean isLock = lock.tryLock(1200L);boolean isLock = lock.tryLock();//判断是否获取锁成功if (!isLock){//获取失败,返回错误或者重试return Result.fail("不允许重复下单!");}try {//需要拿到当前对象的代理对象//spring就能通过代理对象来进行事务管理IVoucherOrderService proxy = (IVoucherOrderService)AopContext.currentProxy();return proxy.creatVoucherOrder(voucherId);} finally {//释放锁lock.unlock();}}*/@Transactional@Overridepublic void creatVoucherOrder(VoucherOrder voucherOrder){//5.一人一单Long userId = voucherOrder.getId();//5.1.查询订单Integer count = query().eq("user_id", userId).eq("voucher_id", voucherOrder.getVoucherId()).count();if(count>0){//5.2.判断是否存在//说明已经下过单了log.error("用户已经购买过一次!");return;// return Result.fail("用户已经购买过一次!");}//6.扣减库存boolean success = seckillVoucherService.update().setSql("stock = stock - 1").eq("voucher_id", voucherOrder.getVoucherId()).gt("stock",0)//乐观锁,在进行扣减前查询数据库中数据是否发生改变.update();if(!success){//扣减失败log.error("库存不足!");return;// return Result.fail("库存不足!");}//7.创建订单save(voucherOrder);//8.返回订单id
/* return Result.ok(voucherOrder);*/}
}
内存限制:阻塞队列存储内存可能不足
数据安全:jvm如果崩溃会导致阻塞队列中数据丢失、线程获取完数据后如果执行遭遇异常也会导致丢失数据无法找回
消息队列:
消息队列是jvm之外的一个独立服务,
不经能存储数据,相对于阻塞队列能够保证数据安全不丢失
在传递给消费者时需要进行确认,如果没有确认会持续存在于消息队列中,重复直到传递成功为止
Redis消息队列:
基于List结构模拟消息队列:
基于PubSub的消息队列:
基于Stream的消息队列:
基于Stream的消息队列---消费者组:
创建消费者组:
从消费者组读取消息:
消费者监听消息的基本思路:
案例-- 基于Redis的Stream结构作为消息队列,实现异步秒杀下单:
--1.参数列表
--1.1 优惠卷id
local voucherId = ARGV[1]
--1.2 用户id
local userId = ARGV[2]
--1.3 订单id
local orderId = ARGV[3]
--2.数据key
--2.1 库存key
local stockKey = 'seckill:stock:' .. voucherId --lua中 .. 表示拼接
--2.2 订单key = 'seckill:order:' .. orderId
local orderKey = 'seckill:order:' .. voucherId
--3. 业务脚本
--3.1 判断库存是否重组 get stockKey
if (tonumber(redis.call('get',stockKey)) <= 0) then--3.2 库存不足 返回1return 1
end--3.2 判断用户是否下单 SISMEMER orderKey userId set数据结构的命令:判断 key名下是否存在userId这个值 返回1 说明存在
if(redis.call('sismember',orderKey,userId)==1) then--3.3 存在 说明重复下单 返回2return 2
end--3.4 扣redis中对应的库存 incrby stockKey -1
redis.call('incrby',stockKey,-1)
--3.5 下单(保存用户到Set集合中) sadd orderKey userId
redis.call('sadd',orderKey,userId)
--3.6 发送消息到队列中 XADD stream.orders * k1 v1 k2 v2 ...
redis.call('xadd','stream.orders','*','userId',userId,'voucherId',voucherId,'id',orderId)
return 0
private IVoucherOrderService proxy;
@Overridepublic Result seckillVoucher(Long voucherId) {//拆分工作,优化秒杀//获取用户 订单Long userId = UserHolder.getUser().getId();long orderId = redisIdWorker.nextId("order");//1.执行Lua脚本Long result = stringRedisTemplate.execute(SECKILL_SCRIPT,Collections.emptyList(),voucherId.toString(), userId.toString(), String.valueOf(orderId));//2.判断结果是否为0int r = result.intValue();if(r != 0){//2.1 结果不为0 代表没有资格return Result.fail(r == 1?"库存不足":"不能重复下单");}//获取代理对象proxy = (IVoucherOrderService)AopContext.currentProxy();//3.返回订单idreturn Result.ok(orderId);}
private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();@PostConstructprivate void init(){SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());}private class VoucherOrderHandler implements Runnable{String queueName = "stream.orders";@Overridepublic void run() {while (true){try {//1.获取消息队列中的订单信息 XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS streams.orderList<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(Consumer.from("g1", "c1"),StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),StreamOffset.create(queueName, ReadOffset.lastConsumed()));//2.判断消息获取是否成功if (list == null || list.isEmpty()){//2.1.失败说明没有消息,继续下一次循环continue;}//3.解析消息中的订单信息//String 是 stream发送消息带的 id 剩下两个Object是传送的对应数据的键值对MapRecord<String, Object, Object> record = list.get(0);//提取键值对 封装进对应对象中Map<Object, Object> values = record.getValue();VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(values, new VoucherOrder(), true);//4.成功则可以下单handleVoucherOrder(voucherOrder);//5.ACK下单 SACK streams.order g1 idstringRedisTemplate.opsForStream().acknowledge(queueName,"g1",record.getId());} catch (Exception e) {log.error("处理订单异常",e);handlePendingList();}}}private void handlePendingList() {while (true){try {//1.获取pending-list中的订单信息 XREADGROUP GROUP g1 c1 COUNT 1 STREAMS streams.order 0List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(Consumer.from("g1", "c1"),StreamReadOptions.empty().count(1),StreamOffset.create(queueName, ReadOffset.from("0")));//2.判断消息获取是否成功if (list == null || list.isEmpty()){//2.1.失败 说明pending-list没有消息 结束循环break;}//3.解析消息中的订单信息//String 是 stream发送消息带的 id 剩下两个Object是传送的对应数据的键值对MapRecord<String, Object, Object> record = list.get(0);//提取键值对 封装进对应对象中Map<Object, Object> values = record.getValue();VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(values, new VoucherOrder(), true);//4.成功则可以下单handleVoucherOrder(voucherOrder);//5.ACK下单 SACK streams.order g1 idstringRedisTemplate.opsForStream().acknowledge(queueName,"g1",record.getId());} catch (Exception e) {log.error("处理pending-list异常",e);try {Thread.sleep(20);} catch (InterruptedException ex) {ex.printStackTrace();}}}}}