秒杀优化(异步秒杀,基于redis-stream实现消息队列)

目录

  • 秒杀优化
    • 一:异步秒杀
      • 1:思路
      • 2:实现
    • 二:redis实现消息队列
      • 1:什么是消息队列
      • 2:基于list结构实现消息队列
      • 3:基于pubsub实现消息队列
      • 4:基于stream实现消息队列
      • 5:stream的消费者组模式
    • 三:基于redis的stream结构实现消息队列

秒杀优化

一:异步秒杀

1:思路

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

原本我们每一个请求都是串行执行,从头到尾执行完了才算一个请求处理成功,这样过于耗时,我们看到执行的操作中查询优惠券,查询订单,减库存,创建订单都是数据库操作,而数据库的性能又不是很好,我们可以将服务拆分成两部分,将判断优惠券信息和校验一人一单的操作提取出来,先执行判断优惠券和校验操作,然后直接返回订单id,我们在陆续操作数据库减库存和创建订单,这样前端响应的会非常快,并且我们可以将优惠券和一人一单的操作放在redis中去执行,这样又能提高性能,然后我们将优惠券信息,用户信息,订单信息,先保存在队列里,先返回给前端数据,在慢慢的根据队列的信息去存入数据

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

我们之前说将查询和校验功能放在redis中实现,那么用什么结构呢,查询订单很简单,只要查询相应的优惠券的库存是否大于0就行,我们就可以是否字符串结构,key存优惠券信息,value存库存;那么校验呢,因为是一人一单,所以我们可以使用set,这样就能保证用户的唯一性;

我们执行的具体步骤是:先判断库存是否充足,不充足直接返回,充足判断是否有资格购买,没有返回,有就可以减库存,然后将用户加入集合中,在返回,因为我们执行这些操作时要保证命令的原子性,所以这些操作我们都使用lua脚本来编写;

具体的执行流程就是,先执行lua脚本,如果结果不是0那么直接返回,如果不是0,那么就将信息存入阻塞队列然后返回订单id;

2:实现

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

1:新增时添加到redis

stringRedisTemplate.opsForValue().set(RedisConstants.SECKILL_STOCK_KEY+voucher.getId(),voucher.getStock().toString());

2:lua脚本编写:

local stock =tonumber(redis.call('get', 'seckill:stock:' .. ARGV[1]))
if (stock<=0) thenreturn 1
end
local userId=ARGV[2]
local isok=tonumber(redis.call('sadd','seckill:order:'..ARGV[1],userId))
if isok==0 thenreturn 2
end
redis.call('incrby','seckill:stock:'..ARGV[1],-1)
return 0

然后就能改变之前的代码,在redis中实现异步下单:

@Override
public Result seckilOrder(Long voucherId) throws InterruptedException {Long id = UserHolder.getUser().getId();Long res = (Long) stringRedisTemplate.execute(SECKIL_ORDER_LUA, Collections.emptyList(), voucherId.toString(), id.toString());if (res!=0){return Result.fail(res==1?"库存不足":"一人只能购买一单");}long orderID = redisIDWork.nextId("order");return Result.ok(orderID);}

初始化lua脚本文件

@Resource
private RedissonClient redissonClient2;
public static final DefaultRedisScript SECKIL_ORDER_LUA;
static {//初始化SECKIL_ORDER_LUA=new DefaultRedisScript<>();//定位到lua脚本的位置SECKIL_ORDER_LUA.setLocation(new ClassPathResource("seckill.lua"));//设置lua脚本的返回值SECKIL_ORDER_LUA.setResultType(Long.class);
}

还剩一个阻塞队列没有实现:

阻塞队列的功能就是异步的将订单信息存入数据库;

阻塞队列可以使用blockdeque

BlockingQueue<VoucherOrder> blockingQueue = new ArrayBlockingQueue<VoucherOrder>(1024*1024);

在类上直接初始化

然后使用的时候就是,将订单添加到阻塞队列,让另一个线程去执行,往数据库中添加阻塞队列中的订单信息:

blockingQueue.add(voucherOrder);

然后就要开出一个线程,然后执行往数据库添加元素的任务了:

 //创建一个线程private ExecutorService SECKILL_ORDER_EXECUTOR=Executors.newSingleThreadExecutor();//注解PostConstruct,添加这个注解的方法就是在类初始化完成之后就会执行;@PostConstructprivate void init(){//提交任务SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandle());}//定义一个任务内部类,实现Runnable,然后需要实现run方法,run方法中就是我们的任务private class VoucherOrderHandle implements Runnable {@Overridepublic void run() {try {//从阻塞队列中取出订单VoucherOrder voucherOrder = blockingQueue.take();//执行方法handleVoucherOrder(voucherOrder);} catch (InterruptedException e) {log.info("下单业务异常",e);}}}

当类加载是就会一直提交任务,只要阻塞队列里有订单,就会将订单取出然后调用方法将订单存入数据库

调用的方法是尝试获取锁的方法,而获取锁其实并不需要,因为我们自己开出来的线程只有一个是单线程,而且在lua脚本中已经对一人一单还有超卖问题进行处理,这里只是为了更加保险

 @Transactionalpublic void handleVoucherOrder(VoucherOrder voucherOrder) throws InterruptedException {
//        SimpleRedisLock simpleRedisLock = new SimpleRedisLock("order" + id, stringRedisTemplate);Long userId = voucherOrder.getUserId();RLock simpleRedisLock = redissonClient2.getLock("lock:order" + userId);boolean trylock = simpleRedisLock.tryLock(1L, TimeUnit.SECONDS);if (!trylock){log.info("获取锁失败");}try {orderService.createVoucherOrder(voucherOrder);} catch (IllegalStateException e) {throw new RuntimeException(e);}finally {simpleRedisLock.unlock();}}

然后获取锁成功后就会调用方法执行数据库操作,但是这个方法是带有事务的,我们单独开出来的子线程无法使事务生效,只能在方法的外部声明一个代理对象,然后通过代理对象去调用方法使事务生效;

 @Transactionalpublic void createVoucherOrder(VoucherOrder voucherOrder) {Integer count = query().eq("user_id", voucherOrder.getUserId()).eq("voucher_id", voucherOrder.getVoucherId()).count();if (count > 0) {log.info("一个用户只能下一单");}//进行更新,库存减一boolean success = seckillVoucherService.update().setSql("stock = stock - 1") // set stock = stock - 1.eq("voucher_id", voucherOrder.getVoucherId()).gt("stock", 0).update();// where id = ? and stock > 0//扣减失败,返回错误信息;if (!success) {log.info("扣减失败");}save(voucherOrder);}

因为我们是开出来的子线程调用的方法,所以不能从线程中获取值,只能从我们传入的订单对象获取,然后就是减库存和存入订单的操作了;

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

总结:

我们使用异步操作,将下单和存入订单分开来执行,大大提高了执行的销量,在redis中完成超卖和一人一单的问题;

然后使用阻塞队列,开出一个子线程异步存入数据库下单;

问题:

我们的阻塞队列是在jvm中的,jvm中内存是有上线的,超过上限就会有异常,还有就是我们的数据都是存放在内存中,要是出现了一些事故会导致数据丢失

二:redis实现消息队列

1:什么是消息队列

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

消息队列由三个角色构成:

1:生产者:发送消息到消息队列

2:消息队列:存储和管理消息队列,也被称为消息代理

3:消费者:从消息队列中获取消息并处理

好的消息队列有这几个特点:

1:有独立的服务,独立的内存;

2:可以做到数据的持久化

3:能够发送消息给消费者并且确保消息处理完成

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

2:基于list结构实现消息队列

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

使用brpop可以实现阻塞获取

3:基于pubsub实现消息队列

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

4:基于stream实现消息队列

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

stream发送消息的方式xadd key * msg

key是指消息队列的名称,* 是发送消息的名称由redis来生成,后面的msg就是键值对,我们要发宋的消息

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

xread是读取消息的命令:count指定读取消息的数量,block指定阻塞时间,不指定就是不阻塞,指定0就是无限等待,sreams 是消息队列的名称,可以是多个,id是消息的id,0是从0开始读,$是从最新的开始读

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

但是有个问题就是,指定$是获取最新的消息,但是只是获取使用这个命令之后最新的消息,而如果一次性发多条,只会获取最后一个,就会出现漏消息;

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

5:stream的消费者组模式

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

消费者组就是将消费者划分到一个组中监听一个消息队列:

有这些好处:

1:消息分流:消息发送到消费者组中,消费者会处于竞争关系,会争夺消息来处理,这个发送多个消息就会实现分流,就会由不同的消费者来处理,加快了处理速度;

2:消息标识:在读取消息后会记录最后一个被处理的消息,这样就不会出现消息漏读的情况;

3:消息确认:消息发出去会,消息会处于pending状态,会等待消息处理完毕,这个时候会将消息存入pendinglist中,当处理完后才会从pending中移除;确保了消息的安全性,保证消息不会丢失,就算再消息发出去后,服务宕机了,也能知道该消息没有被处理,这个功能的作用就是确保消息至少被消费一次;

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

三:基于redis的stream结构实现消息队列

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

首先再redis客户端中输入命令创建一个队列和接受这个队列消息的组

然后修改秒杀下单的lua脚本,直接在redis中通过消息队列将消息发送给消费者:

local orderId=ARGV[3]
local stock =tonumber(redis.call('get', 'seckill:stock:' .. ARGV[1]))
if (stock<=0) thenreturn 1
end
local userId=ARGV[2]
local isok=tonumber(redis.call('sadd','seckill:order:'..ARGV[1],userId))
if isok==0 thenreturn 2
end
redis.call('incrby','seckill:stock:'..ARGV[1],-1)
--将消息发送给stream.orders队列
redis.call('xadd','stream.orders','*','userId',userId,'id',orderId,'voucherId',ARGV[1])
return 0

这里发送的是优惠券id,用户id还有订单id,正是我们存入数据库中所需要的参数

然后就可以去修改前面秒杀下单的逻辑,不用去将消息放到阻塞队列,我们直接从redis的队列中取出就行;

@Override
public Result seckilOrder(Long voucherId) throws InterruptedException {long orderId = redisIDWork.nextId("order");Long userId = UserHolder.getUser().getId();Long res = (Long) stringRedisTemplate.execute(SECKIL_ORDER_LUA,Collections.emptyList(), voucherId.toString(),userId.toString(),String.valueOf(orderId));if (res != 0) {return Result.fail(res == 1 ? "库存不足" : "一人只能购买一单");}orderService = (IVoucherOrderService) AopContext.currentProxy();return Result.ok(orderId);
}

这里我们需要将订单id作为lua脚本的参数传入进去,然后将订单信息存入阻塞队列的操作可以省略,因为我们已经将订单信息存入了redis中的消息队列;

然后这里我们需要单独开出一个线程去将队列中的消息存入数据库:

private class VoucherOrderHandle implements Runnable {String ququeName="stream.orders";@Overridepublic void run() {try {//从消息队列中取出订单while (true){//xreadgroup GROUP group consumer count(1) block(2000) streams key  >List<MapRecord<String, Object, Object>> msg = stringRedisTemplate.opsForStream().read(Consumer.from("g1", "c1"),StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)), StreamOffset.create(ququeName, ReadOffset.lastConsumed()));//如果消息为空就继续等待接收if (msg==null||msg.isEmpty()){continue;}//因为每次读取一个消息,所以我们获取第一个消息MapRecord<String, Object, Object> entries = msg.get(0);//获取消息的值,是一些我们传入的键值对Map<Object, Object> value = entries.getValue();//将map转成voucherorder对象VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value,new VoucherOrder(),false);//执行方法handleVoucherOrder(voucherOrder);//确认消息已经处理stringRedisTemplate.opsForStream().acknowledge(ququeName,"g1",entries.getId());}} catch (InterruptedException e) {log.info("下单业务异常",e);handleVoucherOrderError();}}

我们要做的就是接受消息,然后再将消息存入数据库:

我们调用stream的方法,作为消费者从队列中读取消息,阻塞时间是2秒,每次读取一个消息,从下一个未消费的消息读取,如果读取的消息为空那么就继续循环读取消息,如果有消息就将消息取出,然后将其转成对象map,再将其转成对象,然后再去做确认消息的处理,如果不确认消息,消息就会存在待处理的队列中;如果出现的异常,那么我们取出的消息可能没有进行确认,没有确认的会存入待处理队列,我们就要从队列里取出然后进行处理;

出错只会执行的方法:

 private void handleVoucherOrderError() {try {//从消息队列中取出订单while (true){//xreadgroup GROUP group consumer count(1)  streams key  0,表示从第一个未处理的消息开始读取List<MapRecord<String, Object, Object>> msg = stringRedisTemplate.opsForStream().read(Consumer.from("g1", "c1"),StreamReadOptions.empty().count(1), StreamOffset.create(ququeName, ReadOffset.from("0")));//如果为空就说明没有待处理的消息结束就行if (msg==null||msg.isEmpty()){break;}//因为每次读取一个消息,所以我们获取第一个消息MapRecord<String, Object, Object> entries = msg.get(0);//获取消息的值,是一些我们传入的键值对Map<Object, Object> value = entries.getValue();//将map转成voucherorder对象VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value,new VoucherOrder(),false);//执行方法handleVoucherOrder(voucherOrder);}} catch (InterruptedException e) {log.info("下单业务异常",e);}}
}

这里因为是再待处理中直接取出,所以不用阻塞处理,然后从待消费队列中第一个消息开始读,如果为空,那么就说明没有待处理的消息,我们直接返回就行,如果不为空我们再处理

这样使用redis中的消息队列就实现了:1:独立的服务,足够的内存;2:有确认机制,避免消息漏读;3:消息持久化

BeanUtil.fillBeanWithMap(value,new VoucherOrder(),false);
//执行方法
handleVoucherOrder(voucherOrder);
}
} catch (InterruptedException e) {
log.info(“下单业务异常”,e);
}
}
}


> 这里因为是再待处理中直接取出,所以不用阻塞处理,然后从待消费队列中第一个消息开始读,如果为空,那么就说明没有待处理的消息,我们直接返回就行,如果不为空我们再处理这样使用redis中的消息队列就实现了:1:独立的服务,足够的内存;2:有确认机制,避免消息漏读;3:消息持久化![外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传](https://i-blog.csdnimg.cn/direct/9d63425d09764b7c8b385a64615924a2.png)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/885089.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

小新学习k8s第六天之pod详解

一、资源限制 Pod是k8s中的最小的资源管理组件&#xff0c;pod也是最小化运行容器化应用的资源对象。一个Pod代表着集群中运行的一个进程。k8s中其他大多数组件都是围绕着Pod来进行支撑和扩展Pod功能的&#xff0c;例如&#xff0c;用于管理Pod运行的StatefulSet和Deployment等…

Solana 代币 2022 — Transfer Hook

从零到英雄的 Solana 代币 2022 — Transfer Hook Token 2022 计划引入了几项令人兴奋的扩展&#xff0c;增强了铸造和代币账户的功能。在这些功能中&#xff0c;我个人最喜欢的是Transfer Hook &#xff08;转账钩子&#xff09; 。 想象时间 让我们戴上想象的帽子&#xf…

自定义类型:结构体(一)

一 . 结构体的相关概念 结构体&#xff0c;无需多言&#xff0c;是我们的老朋友了&#xff0c;我们之前就学习过一些有关结构体的知识&#xff0c;今天我们就来正式认识一下这个朋友 结构体属于一种自定义类型&#xff0c;在我们C语言中&#xff1a;自定义类型并非只有结构体…

使用匿名管道时出现程序一直运行问题

父进程创建两个子进程&#xff0c;父子进程之间利用管道进行通信。要求能显示父进程、子进程各自的信息&#xff0c;体现通信效果。(源程序pipe_1.c) 一开始&#xff0c;我忘了初始化pipe,很傻*的直接把fd当管道使&#xff0c;出现了儿子喊爸爸"i am your father."的…

协程4 --- 一个特殊的栈溢出例子

文章目录 代码运行结果分析 代码 先看下面这个程序流程&#xff1a; 有个长度位24的字符数组buffer&#xff0c;前面16个字符初始化。 把attack函数的地址复制到后面8个字符&#xff08;编译成64位程序&#xff0c;指针大小为8Byte&#xff09;。 打印信息&#xff1a;do Some…

C++用string实现字符串相加

. - 力扣&#xff08;LeetCode&#xff09; -》》》》》题目链接 实现思路&#xff1a;计算数字符串长度并用数组的方式计算出字符位置&#xff0c;用字符的ask码‘0’计算出字符本身。 class Solution { public:string addStrings(string num1, string num2) {string str;int…

03 Oracle进程秘籍:深度解析Oracle后台进程体系

文章目录 Oracle进程秘籍&#xff1a;深度解析Oracle后台进程体系一、Oracle后台进程概览1.1 DBWn&#xff08;Database Writer Process&#xff09;1.2 LGWR&#xff08;Log Writer Process&#xff09;1.3 SMON&#xff08;System Monitor Process&#xff09;1.4 PMON&#…

【大数据学习 | kafka高级部分】文件清除原理

2. 两种文件清除策略 kafka数据并不是为了做大量存储使用的&#xff0c;主要的功能是在流式计算中进行数据的流转&#xff0c;所以kafka中的数据并不做长期存储&#xff0c;默认存储时间为7天 那么问题来了&#xff0c;kafka中的数据是如何进行删除的呢&#xff1f; 在Kafka…

浏览器存储策略解析(三)Local/sessionStorage实战:如何查看本地浏览器上数据

物理意义上的localStorage/sessionStorage在哪里 我们都知道&#xff0c;localStorage存于本地&#xff0c;sessionStorage存于会话&#xff0c;这是见名知意可以得到的。但是在物理层面他们究竟存储在哪里呢&#xff1f; localStorage和sessionStorage一样&#xff0c;是存储…

设计模式讲解02—责任链模式(Chain)

1. 概述 定义&#xff1a;责任链模式是一种行为型模式&#xff0c;在这个模式中&#xff0c;通常创建了一个接收者对象的链来处理请求&#xff0c;该请求沿着链的顺序传递。直到有对象处理该请求为止&#xff0c;从而达到解耦请求发送者和请求处理者的目的。 解释&#xff1a;责…

判断二叉搜索树(递归)

给你一个二叉树的根节点 root &#xff0c;判断其是否是一个有效的二叉搜索树。binary search tree (BST) 有效 二叉搜索树定义如下&#xff1a; 节点的左子树只包含 小于 当前节点的数。节点的右子树只包含 大于 当前节点的数。所有左子树和右子树自身必须也是二叉搜索树。 …

私有化视频平台EasyCVR海康大华宇视视频平台视频诊断技术是如何实时监测视频质量的?

在现代视频监控系统中&#xff0c;确保视频流的质量和稳定性至关重要。随着技术的进步&#xff0c;视频诊断技术已经成为实时监测视频质量的关键工具。这种技术通过智能分析算法对视频流进行实时评估和处理&#xff0c;能够自动识别视频中的各种质量问题&#xff0c;并给出相应…

大语言模型(LLM)量化基础知识(一)

请大家关注我的知乎博客&#xff1a;- 派神 - - 知乎 随着大型语言模型 (LLM) 的参数数量的增长,与其支持硬件&#xff08;加速器内存&#xff09;增长速度之间的差距越来越大&#xff0c;如下图所示&#xff1a; 上图显示&#xff0c;从 2017 年到 2022 年&#xff0c;语言模…

【comfyui教程】ComfyUI 现已支持 Stable Diffusion 3.5 Medium!人人都能轻松上手的图像生成利器

前言 ComfyUI 现已支持 Stable Diffusion 3.5 Medium&#xff01;人人都能轻松上手的图像生成利器 大家翘首以盼的Stable Diffusion 3.5 Medium模型终于来了&#xff01;就在今天&#xff0c;Stability AI 正式推出了这款“亲民版”平衡模型&#xff0c;让创作者们得以在消费…

大模型微调技术 --> LoRA 系列之 AdaLoRA

AdaLoRA 1.摘要 之前的微调方法(如低秩更新)通常将增量更新的预算均匀地分布在所有预训练的权重矩阵上&#xff0c;并且忽略了不同权重参数的不同重要性。结果&#xff0c;微调结果不是最优的。 为了弥补这一差距&#xff0c;我们提出了AdaLoRA&#xff0c;它根据权重矩阵的…

带你搞懂红黑树的插入和删除

文章目录 1. 红黑树1.1 红黑树的概念1.2 红黑树的性质1.3 红黑树节点的定义1.4 红黑树的插入找到插入的位置调节平衡 1.5 红黑树的删除删除节点平衡调整 1.6 红黑树和AVL树的比较 1. 红黑树 1.1 红黑树的概念 红黑树也是一种二叉搜索树,但是在每一个节点上增加了一个存储位表…

揭秘全向轮运动学:机动艺术与上下位机通信的智慧桥梁

✨✨ Rqtz 个人主页 : 点击✨✨ &#x1f308;Qt系列专栏:点击 &#x1f388;Qt智能车上位机专栏: 点击&#x1f388; 本篇文章介绍的是有关于全向轮运动学分析&#xff0c;单片机与上位机通信C代码以及ROS里程计解算的内容。 目录 大纲 ROS&#xff08;机器人操作系统&…

移远通信推出八款天线新品,覆盖5G、4G、Wi-Fi和LoRa领域

近日&#xff0c;全球领先的物联网整体解决方案供应商移远通信宣布&#xff0c;再次推出八款高性能天线新品&#xff0c;进一步丰富其天线产品阵容&#xff0c;更好地满足全球客户对高品质天线的更多需求。具体包括5G超宽带天线YECT005W1A和YECT004W1A、5G天线YECT028W1A、4G天…

【设计模式系列】桥接模式(十三)

一、什么是桥接模式 桥接模式&#xff08;Bridge Pattern&#xff09;是一种结构型设计模式&#xff0c;其核心目的是将抽象部分与实现部分分离&#xff0c;使它们可以独立地变化。这种模式主要用于处理那些在设计时无法确定实现细节的场合&#xff0c;或者需要在多个实现之间…

Java多态和继承(下篇)

今天接着学习多态和继承 目录 1 继承1.1 再谈初始化1.2 protect关键字1.3 继承方式1.4 final 关键字1.5 组合 2 多态2.1 多态的概念2.2 多态实现条件2.3 重写2.4 向上转型和向下转型2.4.1 向上转型2.4.2 向下转型 2.5 多态的优缺点2.6 避免在构造方法中使用重写的方法 总结 1 继…