黑马Redis(三)黑马点评项目

优惠卷秒杀

一、全局唯一ID

基于Redis实现全局唯一ID的策略:

@Component
@RequiredArgsConstructor
public class RedisIdWorker {private static final Long BEGIN_TIMESTAMP=1713916800L;private static final int COUNT_BITS = 32;@Resourceprivate final StringRedisTemplate stringRedisTemplate;public long nextId(String keyPrefix){//1.生成时间戳LocalDateTime now =LocalDateTime.now();long nowSecond = now.toEpochSecond(ZoneOffset.UTC);long timestamp = nowSecond - BEGIN_TIMESTAMP;//2.生成序列号//2.1. 获取当天的日期String date = now.format(DateTimeFormatter.ofPattern("yyyy:MM:dd"));Long count = stringRedisTemplate.opsForValue().increment("icr:" + keyPrefix + ":" + date);//3.拼接并返回  时间戳 左移32位  随后与  count 或运算 实现拼接return timestamp<<COUNT_BITS | count;}}

二、实现优惠卷秒杀下单

@Service
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {@Resourceprivate ISeckillVoucherService seckillVoucherService;@Resourceprivate RedisIdWorker redisIdWorker;@Override@Transactionalpublic Result seckillVoucher(Long voucherId) {//1.查询优惠卷SeckillVoucher voucher = seckillVoucherService.getById(voucherId);//2.判断秒杀是否开始if (voucher.getBeginTime().isAfter(LocalDateTime.now())){//开始时间在当前时间之后--未开始return Result.fail("秒杀尚未开始!");}//3.判断秒杀是否结束if (voucher.getEndTime().isBefore(LocalDateTime.now())){//结束时间在当前时间之前 --  已经结束return Result.fail("秒杀已经结束!");}//4.库存判断if (voucher.getStock() < 1){//库存不足return Result.fail("库存不足!");}//5.扣减库存boolean success = seckillVoucherService.update().setSql("stock = stock - 1").eq("voucher_id", voucherId).update();if(!success){//扣减失败return Result.fail("库存不足!");}//6.创建订单VoucherOrder voucherOrder = new VoucherOrder();//6.1.订单idlong orderId = redisIdWorker.nextId("order");voucherOrder.setId(orderId);//6.2.用户idLong userId = UserHolder.getUser().getId();voucherOrder.setUserId(userId);//6.3.代金卷idvoucherOrder.setVoucherId(voucherId);save(voucherOrder);//7.返回订单idreturn Result.ok(voucherOrder);}
}

超卖问题:

解决问题--加锁:

 乐观锁:

实现--版本号法:

实现--CAS法:

使用对应数据代替版本号进行查询

 业务修改:

乐观锁的判断只针对库存是否>0,如果库存发现已经=0,则终止

        boolean success = seckillVoucherService.update().setSql("stock = stock - 1").eq("voucher_id", voucherId).gt("stock",0)//乐观锁,在进行扣减前查询数据库中数据是否发生改变.update();

 

三、实现一人一单

 //5.一人一单Long userId = UserHolder.getUser().getId();//5.1.查询订单Integer count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();if(count>0){//5.2.判断是否存在//说明已经下过单了return Result.fail("该用户已经购买过一次!");}

还是有多单成功 

解决办法--加锁:

版本1(优缺点):

    @Overridepublic Result seckillVoucher(Long voucherId) {//1.查询优惠卷SeckillVoucher voucher = seckillVoucherService.getById(voucherId);//2.判断秒杀是否开始if (voucher.getBeginTime().isAfter(LocalDateTime.now())){//开始时间在当前时间之后--未开始return Result.fail("秒杀尚未开始!");}//3.判断秒杀是否结束if (voucher.getEndTime().isBefore(LocalDateTime.now())){//结束时间在当前时间之前 --  已经结束return Result.fail("秒杀已经结束!");}//4.库存判断if (voucher.getStock() < 1){//库存不足return Result.fail("库存不足!");}return  creatVoucherOrder(voucherId);}@Transactionalpublic  Result creatVoucherOrder(Long voucherId){//5.一人一单Long userId = UserHolder.getUser().getId();synchronized(userId.toString().intern()){//5.1.查询订单Integer count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();if(count>0){//5.2.判断是否存在//说明已经下过单了return Result.fail("该用户已经购买过一次!");}//6.扣减库存boolean success = seckillVoucherService.update().setSql("stock = stock - 1").eq("voucher_id", voucherId).gt("stock",0)//乐观锁,在进行扣减前查询数据库中数据是否发生改变.update();if(!success){//扣减失败return Result.fail("库存不足!");}//7.创建订单VoucherOrder voucherOrder = new VoucherOrder();//7.1.订单idlong orderId = redisIdWorker.nextId("order");voucherOrder.setId(orderId);//7.2.用户idvoucherOrder.setUserId(userId);//7.3.代金卷idvoucherOrder.setVoucherId(voucherId);save(voucherOrder);//8.返回订单idreturn Result.ok(voucherOrder);}}

改进版:

    @Overridepublic Result seckillVoucher(Long voucherId) {//1.查询优惠卷SeckillVoucher voucher = seckillVoucherService.getById(voucherId);//2.判断秒杀是否开始if (voucher.getBeginTime().isAfter(LocalDateTime.now())){//开始时间在当前时间之后--未开始return Result.fail("秒杀尚未开始!");}//3.判断秒杀是否结束if (voucher.getEndTime().isBefore(LocalDateTime.now())){//结束时间在当前时间之前 --  已经结束return Result.fail("秒杀已经结束!");}//4.库存判断if (voucher.getStock() < 1){//库存不足return Result.fail("库存不足!");}Long userId = UserHolder.getUser().getId();synchronized(userId.toString().intern()) {return creatVoucherOrder(voucherId);}}@Transactionalpublic  Result creatVoucherOrder(Long voucherId){//5.一人一单Long userId = UserHolder.getUser().getId();//5.1.查询订单Integer count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();if(count>0){//5.2.判断是否存在//说明已经下过单了return Result.fail("该用户已经购买过一次!");}//6.扣减库存boolean success = seckillVoucherService.update().setSql("stock = stock - 1").eq("voucher_id", voucherId).gt("stock",0)//乐观锁,在进行扣减前查询数据库中数据是否发生改变.update();if(!success){//扣减失败return Result.fail("库存不足!");}//7.创建订单VoucherOrder voucherOrder = new VoucherOrder();//7.1.订单idlong orderId = redisIdWorker.nextId("order");voucherOrder.setId(orderId);//7.2.用户idvoucherOrder.setUserId(userId);//7.3.代金卷idvoucherOrder.setVoucherId(voucherId);save(voucherOrder);//8.返回订单idreturn Result.ok(voucherOrder);}

不能将锁加在方法上:会造成串行操作,多个用户不能并行调用该方法

因此将锁加载用户id上面,根据用户id不同的特点来实现多个用户并行

注意1:

用户id调用ToString方法时,底层代码仍然是通过new 实现,因此即便同一个用户id仍然会有不同的toString值,因此调用 intern( )  方法,通过往字符串池中寻找是否存在对应的字符串,避免new导致的不同。

注意2:

两个版本的锁位置不一样,前者的锁会出现以下并发安全问题:当锁中内容执行完毕释放锁之后,事务可能还没有提交,此时具有相同id的线程可能会重新调用方法,导致问题进而使得事务失败回滚

因此锁在进阶版中加入到了调用这个方法的部分(既锁住了整个函数,又没有影响函数被其他线程调用)

新问题:

可以看到进阶版代码中虽然通过悲观锁预防了并发安全问题,但是也引出了另一个问题 ,在进阶代码中 createVoucherOrder 方法的@Transactional 注释并不会生效:

原因: 代码中的 return creatVoucherOrder(voucherId);

      等价于  return   this.creatVoucherOrder(voucherId);  即 调用的是整个Service实现类的方法(方法属性),而不是代理对象(方法本身),spring实现事务是通过对这个方法进行动态代理,用代理对象去实现事务处理,因此如果通过service实现类调用方法无法实现事务功能。

最终 版本:

 引入依赖:

        <!--aspectj--><dependency><groupId>org.aspectj</groupId><artifactId>aspectjweaver</artifactId></dependency>

配置启动项注释--开启暴露代理对象:

@EnableAspectJAutoProxy(exposeProxy = true) //设置暴露代理对象 -- true
@MapperScan("com.hmdp.mapper")
@SpringBootApplication
public class HmDianPingApplication {public static void main(String[] args) {SpringApplication.run(HmDianPingApplication.class, args);}}
@Service
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {@Resourceprivate ISeckillVoucherService seckillVoucherService;@Resourceprivate RedisIdWorker redisIdWorker;@Overridepublic Result seckillVoucher(Long voucherId) {//1.查询优惠卷SeckillVoucher voucher = seckillVoucherService.getById(voucherId);//2.判断秒杀是否开始if (voucher.getBeginTime().isAfter(LocalDateTime.now())){//开始时间在当前时间之后--未开始return Result.fail("秒杀尚未开始!");}//3.判断秒杀是否结束if (voucher.getEndTime().isBefore(LocalDateTime.now())){//结束时间在当前时间之前 --  已经结束return Result.fail("秒杀已经结束!");}//4.库存判断if (voucher.getStock() < 1){//库存不足return Result.fail("库存不足!");}Long userId = UserHolder.getUser().getId();synchronized(userId.toString().intern()) {//需要拿到当前对象的代理对象//spring就能通过代理对象来进行事务管理IVoucherOrderService proxy = (IVoucherOrderService)AopContext.currentProxy();return proxy.reatVoucherOrder(voucherId);}}@Transactionalpublic  Result creatVoucherOrder(Long voucherId){//5.一人一单Long userId = UserHolder.getUser().getId();//5.1.查询订单Integer count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();if(count>0){//5.2.判断是否存在//说明已经下过单了return Result.fail("该用户已经购买过一次!");}//6.扣减库存boolean success = seckillVoucherService.update().setSql("stock = stock - 1").eq("voucher_id", voucherId).gt("stock",0)//乐观锁,在进行扣减前查询数据库中数据是否发生改变.update();if(!success){//扣减失败return Result.fail("库存不足!");}//7.创建订单VoucherOrder voucherOrder = new VoucherOrder();//7.1.订单idlong orderId = redisIdWorker.nextId("order");voucherOrder.setId(orderId);//7.2.用户idvoucherOrder.setUserId(userId);//7.3.代金卷idvoucherOrder.setVoucherId(voucherId);save(voucherOrder);//8.返回订单idreturn Result.ok(voucherOrder);}
}

 一人一单的并发安全问题:

同一个用户发送两个请求,在并发的两个服务中都会接收请求,不会锁住(锁只会在一个虚拟环境中生效)

 四、分布式锁实现一人一单

分布式锁:

分布式锁的实现:

 基于Redis的分布式锁:

 案例--基于Redis实现分布式锁初级版本:

package com.hmdp.utils;public interface ILock {/*** 尝试获取锁* @param* @return*/boolean tryLock(Long timeoutSec);/*** 释放锁* */void unlock();
}
public class SimpleRedisLock implements ILock{private StringRedisTemplate stringRedisTemplate;private String name;private static final String KEY_PREFIX= "lock:";public SimpleRedisLock(String name,StringRedisTemplate stringRedisTemplate){this.stringRedisTemplate=stringRedisTemplate;this.name=name;}@Overridepublic boolean tryLock(Long timeoutSec) {//获取线程标识long threadId = Thread.currentThread().getId();//获取锁Boolean success = stringRedisTemplate.opsForValue().setIfAbsent(KEY_PREFIX + name, threadId + "", timeoutSec, TimeUnit.SECONDS);return Boolean.TRUE.equals(success); //预防空指针错误}@Overridepublic void unlock() {stringRedisTemplate.delete(KEY_PREFIX+name);}
}
    @Overridepublic Result seckillVoucher(Long voucherId) {//1.查询优惠卷SeckillVoucher voucher = seckillVoucherService.getById(voucherId);//2.判断秒杀是否开始if (voucher.getBeginTime().isAfter(LocalDateTime.now())){//开始时间在当前时间之后--未开始return Result.fail("秒杀尚未开始!");}//3.判断秒杀是否结束if (voucher.getEndTime().isBefore(LocalDateTime.now())){//结束时间在当前时间之前 --  已经结束return Result.fail("秒杀已经结束!");}//4.库存判断if (voucher.getStock() < 1){//库存不足return Result.fail("库存不足!");}Long userId = UserHolder.getUser().getId();//创建锁对象SimpleRedisLock lock= new SimpleRedisLock("order:"+userId,stringRedisTemplate);//获取锁boolean isLock = lock.tryLock(1200L);//判断是否获取锁成功if (!isLock){//获取失败,返回错误或者重试return Result.fail("不允许重复下单!");}try {//需要拿到当前对象的代理对象//spring就能通过代理对象来进行事务管理IVoucherOrderService proxy = (IVoucherOrderService)AopContext.currentProxy();return proxy.creatVoucherOrder(voucherId);} finally {//释放锁lock.unlock();}}

潜在问题:

解决办法--增加释放锁的标识: 

案例--改进的Redis分布式锁:

public class SimpleRedisLock implements ILock{private StringRedisTemplate stringRedisTemplate;private String name;private static final String KEY_PREFIX= "lock:";private static final String ID_PREFIX = UUID.randomUUID().toString(true)+"-";public SimpleRedisLock(String name,StringRedisTemplate stringRedisTemplate){this.stringRedisTemplate=stringRedisTemplate;this.name=name;}@Overridepublic boolean tryLock(Long timeoutSec) {//获取线程标识String threadId =ID_PREFIX + Thread.currentThread().getId();//获取锁Boolean success = stringRedisTemplate.opsForValue().setIfAbsent(KEY_PREFIX + name, threadId, timeoutSec, TimeUnit.SECONDS);return Boolean.TRUE.equals(success); //预防空指针错误}@Overridepublic void unlock() {//获取线程标识String threadId=ID_PREFIX+Thread.currentThread().getId();//获取锁的中标识String id = stringRedisTemplate.opsForValue().get(KEY_PREFIX + name);//判断是否一致if (id.equals(threadId)){//一致--释放锁stringRedisTemplate.delete(KEY_PREFIX+name);}}
}

潜在问题:

 原因: 判断锁标识 和 释放锁 不具有原子性  是两个操作

解决办法---Lua脚本:

Lua 教程 | 菜鸟教程

 执行脚本:

带参数脚本:

Lua语言数组的下标从1开始 

 基于Lua脚本修改释放锁业务:

编写Lua脚本:


-- 获取锁中的线程标识 get key
local id = redis.call('get',KEYS[1])
-- 比较线程标识与锁中的标识是否一致
if(redis.call('get',KEYS[1]) == ARGV[1]) then-- 释放锁 del keyreturn redis.call('del', KEYS[1])
end
return 0

使用Java执行Lua脚本:

    private static final DefaultRedisScript<Long> UNLOCK_SCRIPT;static {UNLOCK_SCRIPT = new DefaultRedisScript<>();UNLOCK_SCRIPT.setLocation(new ClassPathResource("unlock.lua"));UNLOCK_SCRIPT.setResultType(Long.class);}@Overridepublic void unlock(){//调用Lua脚本stringRedisTemplate.execute(UNLOCK_SCRIPT,Collections.singletonList(KEY_PREFIX + name),ID_PREFIX+Thread.currentThread().getId());}

潜在问题: 

解决办法---Redisson

五、Redisson

Redisson | Valkey & Redis Java client. Ultimate Real-Time Data Platform

GitHub - redisson/redisson: Redisson - Valkey and Redis Java client. Real-Time Data Platform. Sync/Async/RxJava/Reactive API. Over 50 Valkey and Redis based Java objects and services: Set, Multimap, SortedSet, Map, List, Queue, Deque, Semaphore, Lock, AtomicLong, Map Reduce, Bloom filter, Spring, Tomcat, Scheduler, JCache API, Hibernate, RPC, local cache..

Redisson入门:

        <!--Redisson--><dependency><groupId>org.redisson</groupId><artifactId>redisson</artifactId><version>3.13.6</version></dependency>


@Configuration
public class RedissonConfig {@Beanpublic RedissonClient redissonClient(){//配置Config config = new Config();config.useSingleServer().setAddress("redis://192.168.50.129:6379").setPassword("123");//创建RedissonClient对象return Redisson.create(config);}
}
    @Resourceprivate RedissonClient redissonClient;

Redisson可重入锁原理:

可重入:一个线程里面允许多次获取锁 

流程对应的Lua脚本:

获取锁

 释放锁:

 Redisson分布式锁原理:

主从一致性问题:

解决办法: 

其他线程需要在所有的redis节点中都获取到锁才能进行

 六、分布式锁总结

 七、Redis优化秒杀

随机生成1000个token代表不同用户,模拟秒杀过程可以看到性能很差

    @Testvoid testGeneratetokens() throws IOException {String name = "悟空";UserDTO userDTO = new UserDTO();String token = null;BufferedWriter bw=new BufferedWriter(new FileWriter("token.txt"));for(int i=0;i<1000;i++){//随机生成一个tokentoken = UUID.randomUUID().toString(true);bw.write(token);bw.newLine();User user = userService.getById(1 + i);userDTO = BeanUtil.copyProperties(user, UserDTO.class);Map<String, Object> userMap = BeanUtil.beanToMap(userDTO,new HashMap<>(),CopyOptions.create() //允许对Key和Value自定义.setIgnoreNullValue(true)  //忽略一些空值.setFieldValueEditor((fieldName, fieldValue) -> fieldValue == null ? "" : fieldValue.toString()));stringRedisTemplate.opsForHash().putAll(LOGIN_USER_KEY+token,userMap);//设置有效期stringRedisTemplate.expire(LOGIN_USER_KEY+token,LOGIN_USER_TTL,TimeUnit.MINUTES);}bw.close();}

执行流程(串行): 

改造目标(拆分业务,并行): 

流程:

案例--改进秒杀业务,提高并发性能:

--1.参数列表
--1.1 优惠卷id
local voucherId = ARGV[1]
--1.2 用户id
local userId = ARGV[2]--2.数据key
--2.1 库存key
local stockKey = 'seckill:stock:' .. voucherId  --lua中 .. 表示拼接
--2.2 订单key = 'seckill:order:' .. voucherId--3. 业务脚本
--3.1 判断库存是否重组 get stockKey
if (tonumber(redis.call('get',stockKey)) <= 0) then--3.2 库存不足 返回1return 1
end--3.2 判断用户是否下单 SISMEMER orderKey userId   set数据结构的命令:判断 key名下是否存在userId这个值 返回1 说明存在
if(redis.call('sismember',orderKey,userId)==1) then--3.3 存在 说明重复下单 返回2return 2
end--3.4 扣redis中对应的库存 incrby stockKey -1
redis.call('incrby',stockKey,-1)
--3.5 下单(保存用户到Set集合中) sadd orderKey userId
redis.call('sadd',orderKey,userId)
@Slf4j
@Service
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {@Resourceprivate ISeckillVoucherService seckillVoucherService;@Resourceprivate RedisIdWorker redisIdWorker;@Resourceprivate StringRedisTemplate stringRedisTemplate;@Resourceprivate RedissonClient redissonClient;//阻塞队列特点:当一个线程尝试从其中获取元素时,如果没有元素就会被阻塞,直到队列中有元素才会被唤醒并获取这个元素private BlockingQueue<VoucherOrder> orderTasks = new ArrayBlockingQueue<>(1024*1024);private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();@PostConstructprivate void init(){SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());}private class VoucherOrderHandler implements Runnable{@Overridepublic void run() {try {//1.获取队列中的订单信息VoucherOrder voucherOrder = orderTasks.take();//2.创建订单handleVoucherOrder(voucherOrder);} catch (Exception e) {log.error("处理订单异常",e);}}}private void handleVoucherOrder(VoucherOrder voucherOrder) {//1.获取userIdLong userId = voucherOrder.getUserId();//2.创建锁对象RLock lock = redissonClient.getLock("lock:order:" + userId);//3.获取锁//boolean isLock = lock.tryLock(1200L);boolean isLock = lock.tryLock();//判断是否获取锁成功if (!isLock){//获取失败log.error("不允许重复下单");return ;}try {proxy.creatVoucherOrder(voucherOrder);} finally {//释放锁lock.unlock();}}private static final DefaultRedisScript<Long> SECKILL_SCRIPT;static {SECKILL_SCRIPT = new DefaultRedisScript<>();SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua"));SECKILL_SCRIPT.setResultType(Long.class);}private IVoucherOrderService proxy;@Overridepublic Result seckillVoucher(Long voucherId) {//拆分工作,优化秒杀//获取用户Long userId = UserHolder.getUser().getId();//1.执行Lua脚本Long result = stringRedisTemplate.execute(SECKILL_SCRIPT,Collections.emptyList(),voucherId.toString(),userId.toString());//2.判断结果是否为0int r = result.intValue();if(r != 0){//2.1 结果不为0 代表没有资格return Result.fail(r == 1?"库存不足":"不能重复下单");}//2.2 为0 代表有购买资格,将下单信息保存到阻塞队列//TODO 保存阻塞队列VoucherOrder voucherOrder = new VoucherOrder();//2.3.订单idlong orderId = redisIdWorker.nextId("order");voucherOrder.setId(orderId);//2.4.用户idvoucherOrder.setUserId(userId);//2.5.代金卷idvoucherOrder.setVoucherId(voucherId);//获取代理对象proxy = (IVoucherOrderService)AopContext.currentProxy();//2.6.放入阻塞队列orderTasks.add(voucherOrder);//3.返回订单idreturn Result.ok(orderId);}/* @Overridepublic Result seckillVoucher(Long voucherId) {//1.查询优惠卷SeckillVoucher voucher = seckillVoucherService.getById(voucherId);//2.判断秒杀是否开始if (voucher.getBeginTime().isAfter(LocalDateTime.now())){//开始时间在当前时间之后--未开始return Result.fail("秒杀尚未开始!");}//3.判断秒杀是否结束if (voucher.getEndTime().isBefore(LocalDateTime.now())){//结束时间在当前时间之前 --  已经结束return Result.fail("秒杀已经结束!");}//4.库存判断if (voucher.getStock() < 1){//库存不足return Result.fail("库存不足!");}Long userId = UserHolder.getUser().getId();//创建锁对象//SimpleRedisLock lock= new SimpleRedisLock("order:"+userId,stringRedisTemplate);RLock lock = redissonClient.getLock("lock:order:" + userId);//获取锁//boolean isLock = lock.tryLock(1200L);boolean isLock = lock.tryLock();//判断是否获取锁成功if (!isLock){//获取失败,返回错误或者重试return Result.fail("不允许重复下单!");}try {//需要拿到当前对象的代理对象//spring就能通过代理对象来进行事务管理IVoucherOrderService proxy = (IVoucherOrderService)AopContext.currentProxy();return proxy.creatVoucherOrder(voucherId);} finally {//释放锁lock.unlock();}}*/@Transactional@Overridepublic void creatVoucherOrder(VoucherOrder voucherOrder){//5.一人一单Long userId = voucherOrder.getId();//5.1.查询订单Integer count = query().eq("user_id", userId).eq("voucher_id", voucherOrder.getVoucherId()).count();if(count>0){//5.2.判断是否存在//说明已经下过单了log.error("用户已经购买过一次!");return;// return Result.fail("用户已经购买过一次!");}//6.扣减库存boolean success = seckillVoucherService.update().setSql("stock = stock - 1").eq("voucher_id", voucherOrder.getVoucherId()).gt("stock",0)//乐观锁,在进行扣减前查询数据库中数据是否发生改变.update();if(!success){//扣减失败log.error("库存不足!");return;// return Result.fail("库存不足!");}//7.创建订单save(voucherOrder);//8.返回订单id
/*            return Result.ok(voucherOrder);*/}
}

内存限制:阻塞队列存储内存可能不足

数据安全:jvm如果崩溃会导致阻塞队列中数据丢失、线程获取完数据后如果执行遭遇异常也会导致丢失数据无法找回

 消息队列:

消息队列是jvm之外的一个独立服务,

不经能存储数据,相对于阻塞队列能够保证数据安全不丢失

在传递给消费者时需要进行确认,如果没有确认会持续存在于消息队列中,重复直到传递成功为止

Redis消息队列:

 基于List结构模拟消息队列:

 基于PubSub的消息队列:

基于Stream的消息队列: 

基于Stream的消息队列---消费者组:

创建消费者组: 

从消费者组读取消息: 

消费者监听消息的基本思路:

案例-- 基于Redis的Stream结构作为消息队列,实现异步秒杀下单:

 

--1.参数列表
--1.1 优惠卷id
local voucherId = ARGV[1]
--1.2 用户id
local userId = ARGV[2]
--1.3 订单id
local orderId = ARGV[3]
--2.数据key
--2.1 库存key
local stockKey = 'seckill:stock:' .. voucherId  --lua中 .. 表示拼接
--2.2 订单key = 'seckill:order:' .. orderId
local orderKey = 'seckill:order:' .. voucherId
--3. 业务脚本
--3.1 判断库存是否重组 get stockKey
if (tonumber(redis.call('get',stockKey)) <= 0) then--3.2 库存不足 返回1return 1
end--3.2 判断用户是否下单 SISMEMER orderKey userId   set数据结构的命令:判断 key名下是否存在userId这个值 返回1 说明存在
if(redis.call('sismember',orderKey,userId)==1) then--3.3 存在 说明重复下单 返回2return 2
end--3.4 扣redis中对应的库存 incrby stockKey -1
redis.call('incrby',stockKey,-1)
--3.5 下单(保存用户到Set集合中) sadd orderKey userId
redis.call('sadd',orderKey,userId)
--3.6 发送消息到队列中 XADD stream.orders * k1 v1 k2 v2 ...
redis.call('xadd','stream.orders','*','userId',userId,'voucherId',voucherId,'id',orderId)
return 0

 

 private IVoucherOrderService proxy;  
@Overridepublic Result seckillVoucher(Long voucherId) {//拆分工作,优化秒杀//获取用户 订单Long userId = UserHolder.getUser().getId();long orderId = redisIdWorker.nextId("order");//1.执行Lua脚本Long result = stringRedisTemplate.execute(SECKILL_SCRIPT,Collections.emptyList(),voucherId.toString(), userId.toString(), String.valueOf(orderId));//2.判断结果是否为0int r = result.intValue();if(r != 0){//2.1 结果不为0 代表没有资格return Result.fail(r == 1?"库存不足":"不能重复下单");}//获取代理对象proxy = (IVoucherOrderService)AopContext.currentProxy();//3.返回订单idreturn Result.ok(orderId);}

 

 private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();@PostConstructprivate void init(){SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());}private class VoucherOrderHandler implements Runnable{String queueName = "stream.orders";@Overridepublic void run() {while (true){try {//1.获取消息队列中的订单信息 XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS streams.orderList<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(Consumer.from("g1", "c1"),StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),StreamOffset.create(queueName, ReadOffset.lastConsumed()));//2.判断消息获取是否成功if (list == null || list.isEmpty()){//2.1.失败说明没有消息,继续下一次循环continue;}//3.解析消息中的订单信息//String 是 stream发送消息带的 id   剩下两个Object是传送的对应数据的键值对MapRecord<String, Object, Object> record = list.get(0);//提取键值对  封装进对应对象中Map<Object, Object> values = record.getValue();VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(values, new VoucherOrder(), true);//4.成功则可以下单handleVoucherOrder(voucherOrder);//5.ACK下单  SACK streams.order g1 idstringRedisTemplate.opsForStream().acknowledge(queueName,"g1",record.getId());} catch (Exception e) {log.error("处理订单异常",e);handlePendingList();}}}private void handlePendingList() {while (true){try {//1.获取pending-list中的订单信息 XREADGROUP GROUP g1 c1 COUNT 1 STREAMS streams.order 0List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(Consumer.from("g1", "c1"),StreamReadOptions.empty().count(1),StreamOffset.create(queueName, ReadOffset.from("0")));//2.判断消息获取是否成功if (list == null || list.isEmpty()){//2.1.失败  说明pending-list没有消息 结束循环break;}//3.解析消息中的订单信息//String 是 stream发送消息带的 id   剩下两个Object是传送的对应数据的键值对MapRecord<String, Object, Object> record = list.get(0);//提取键值对  封装进对应对象中Map<Object, Object> values = record.getValue();VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(values, new VoucherOrder(), true);//4.成功则可以下单handleVoucherOrder(voucherOrder);//5.ACK下单  SACK streams.order g1 idstringRedisTemplate.opsForStream().acknowledge(queueName,"g1",record.getId());} catch (Exception e) {log.error("处理pending-list异常",e);try {Thread.sleep(20);} catch (InterruptedException ex) {ex.printStackTrace();}}}}}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/78851.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

flume----初步安装与配置

目录标题 **flume的简单介绍**⭐flume的**核心组件**⭐**核心特点** **安装部署**1&#xff09;**解压安装包**2&#xff09;**修改名字** **&#xff08;配置文件时&#xff0c;更方便&#xff09;****3&#xff09;⭐⭐配置文件**4&#xff09;**兼容Hadoop**5&#xff09;**…

深度整合Perforce P4+Jira+Confluence:游戏开发团队协作工具链搭建指南

现场对话 游戏开发团队最头疼的版本管理问题是什么&#xff1f; SVN宕机&#xff1f; Git仓库爆炸&#xff1f; 还是美术资源管理一团乱&#xff1f; 在4月11-12日的GGS 2025全球游戏峰会上&#xff0c;Perforce中国授权合作伙伴-龙智的销售和技术支持团队&#xff0c;与行业…

k8s基本概念-YAML

YAML介绍 YAML是“YAML Aint a Markup Language” (YAML不是一种置标语言)的递归缩进写,早先YAML的意思其实是:“Yet Another Markup Language”(另一种置标语言) YAML是一个类似XML、JSON的标记性语言。YAML强调以数据为中心,并不是以标识语言为重点。因而YAML本身的定义…

ECharts散点图-散点图20,附视频讲解与代码下载

引言&#xff1a; ECharts散点图是一种常见的数据可视化图表类型&#xff0c;它通过在二维坐标系或其它坐标系中绘制散乱的点来展示数据之间的关系。本文将详细介绍如何使用ECharts库实现一个散点图&#xff0c;包括图表效果预览、视频讲解及代码下载&#xff0c;让你轻松掌握…

Infrared Finance:Berachain 生态的流动性支柱

在加密市场中&#xff0c;用户除了参与一级和二级交易&#xff0c;还有一种低门槛参与的就是空投。从 2021 年 DeFi 成为主流开始&#xff0c;空投一直都是“以小搏大”的机会&#xff0c;通过参与项目早期的链上交互和任务以获取空投奖励&#xff0c;近几年已成为一种广受欢迎…

附1:深度解读:《金融数据安全 数据安全分级指南》——数据分类的艺术专栏系列

文章目录 一、文件背景与意义1.1 文件背景1.2 文件意义 二、文件结构与核心内容2.1 文件结构概述2.2 核心内容解析2.2.1 范围与适用对象2.2.2 数据安全定级目标与原则2.2.3 数据安全定级要素2.2.4 要素识别2.2.5 数据安全级别划分 三、定级方法与流程3.1 定级流程3.2 级别变更机…

vue mixin混入与hook

mixin混入是 ‌选项式 API‌&#xff0c;在vue3-Composition API <script setup> 中无法直接使用&#xff0c;需通过 setup() 函数转换 vue2、vue3选项式API: // mixins/mixin.js export const mixin {methods: {courseType(courseLevel) {const levelMap {1: 初级,…

Excel如何安装使用EPM插件并且汉化?

Excel如何使用EPM插件 Excel如何使用EPM插件一、安装EPM插件二、启动EPM插件三、插件汉化设置 Excel如何使用EPM插件 一、安装EPM插件 在安装EPM插件时&#xff0c;若运行安装包后出现报错提示&#xff0c;通常是因为系统缺少 Visual Studio 2010 组件&#xff0c;需先安装该…

vue3-springboot-mysql的docker部署

Docker配置原理与部署文档 概述 本文档详细说明RuoYi-Vue与BladeX集成项目的Docker部署原理&#xff0c;包括配置文件的作用、相互关系及数据流动。通过三个核心配置文件&#xff08;docker-compose.yml、Dockerfile和docker-entrypoint.sh&#xff09;&#xff0c;实现了应用…

第十二天 使用Unity Test Framework进行自动化测试 性能优化:Profiler分析、内存管理

前言 在完成游戏核心功能开发后,如何确保项目质量并成功发布到各大平台?本文将从自动化测试到商店上架,手把手教你构建完整的游戏开发闭环。使用Unity 2022 LTS版本进行演示,所有代码均经过实际项目验证。 一、自动化测试实战(Unity Test Framework) 1.1 测试框架搭建 …

【专题四】前缀和(3)

&#x1f4dd;前言说明&#xff1a; 本专栏主要记录本人的基础算法学习以及LeetCode刷题记录&#xff0c;按专题划分每题主要记录&#xff1a;&#xff08;1&#xff09;本人解法 本人屎山代码&#xff1b;&#xff08;2&#xff09;优质解法 优质代码&#xff1b;&#xff…

深度解析:TextRenderManager——Cocos Creator艺术字体渲染核心类

一、类概述 TextRenderManager 是 Cocos Creator 中实现动态艺术字体渲染的核心单例类。它通过整合资源加载、缓存管理、异步队列和自动布局等功能&#xff0c;支持普通字符模式和图集模式两种渲染方案&#xff0c;适用于游戏中的动态文本&#xff08;如聊天内容、排行榜&…

【漫话机器学习系列】229.特征缩放对梯度下降的影响(The Effect Of Feature Scaling Gradient Descent)

特征缩放对梯度下降的影响&#xff1a;为什么特征标准化如此重要&#xff1f; 在机器学习和深度学习中&#xff0c;梯度下降是最常用的优化算法之一。然而&#xff0c;很多人在训练模型时会遇到收敛速度慢、训练不稳定的问题&#xff0c;其中一个重要原因就是特征未进行适当的…

【神经网络与深度学习】批标准化(Batch Normalization)和层标准化(Layer Normalization)

引言 在深度学习中&#xff0c;标准化技术&#xff08;Normalization&#xff09;是提高神经网络训练效率和性能的重要工具。其中&#xff0c;批标准化&#xff08;Batch Normalization, BN&#xff09;和层标准化&#xff08;Layer Normalization, LN&#xff09;是两种常用的…

OpenHarmony之电源管理子系统公共事件定义

OpenHarmony之电源管理子系统公共事件定义 电源管理子系统面向应用发布如下系统公共事件&#xff0c;应用如需订阅系统公共事件&#xff0c;请参考公共事件接口文档。 COMMON_EVENT_BATTERY_CHANGED 表示电池充电状态、电平和其他信息发生变化的公共事件的动作。 值&#x…

linux 环境下 c++ 程序打印 core dump 信息

linux 信号机制 软中断信号 Signal&#xff0c;简称信号&#xff0c;用来通知进程发生了异步事件&#xff0c;进程之间可以互相通过系统调用 kill 等函数来发送软中断信号。内核也可以因为内部事件而给进程发送信号&#xff0c;通知进程发生了某个事件。 进程对信号的处理 进…

Qt开发环境的安装与问题的解决(2)

文章目录 1. Qt开发环境安装的说明2. 通过安装包进行安装3. 通过在线下载程序 解决问题下载 https....网路错误问题解决开始安装--第一部分开始安装--第二部分 4. 建议配置环境变量&#xff08;非必须&#xff09;配置环境变量的意义 简介&#xff1a;这篇文章主要分享Qt开发环…

【每日EDA行业分析】2025年4月25日

深度总结&#xff1a;EDA 软件行业现状与发展趋势 一、引言 在半导体产业的复杂生态中&#xff0c;EDA 软件宛如一颗闪耀的明珠&#xff0c;它是集成电路设计的核心工具&#xff0c;贯穿芯片从设计构思到最终封装测试的全流程&#xff0c;其重要性不言而喻&#xff0c;被誉为…

flutter实践:比例对比线图实现

需求&#xff1a;flutter实现一个左右对比线图,带有动画效果 效果图&#xff1a; Widget _buildTop() {return Container(height: themeData.heightXl,padding: EdgeInsets.symmetric(horizontal: themeData.hSpacingMd),child: Row(mainAxisAlignment: MainAxisAlignment.spa…

测试基础笔记第十五天

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录 一、集合1.集合的定义二、使用集合列表去重 导包二、函数1.函数介绍2.定义函数3.调用函数4.函数实现登录案例5.函数的返回值 三、模块和包1.模块的概念(Module)2.模…