Redis:原理速成+项目实战——Redis实战10(Redis消息队列实现异步秒杀)

👨‍🎓作者简介:一位大四、研0学生,正在努力准备大四暑假的实习
🌌上期文章:Redis:原理速成+项目实战——Redis实战9(秒杀优化)
📚订阅专栏:Redis:原理速成+项目实战
希望文章对你们有所帮助

上一节已经实现了异步秒杀,也就是将秒杀分为两个环节:
1、判断是否有抢单资格(库存量是否充足、是否满足一人一单)、
2、下单操作(优惠券表中的库存量-1,订单表增加相应信息)
其中,第一步的操作放在了Redis中,可以有效提高效率,而真正大幅度提高效率的点还是因为我们将下单的操作交给了另一个开辟的线程,因为对数据库的操作并不需要什么时效性。

异步执行所需要的信息被封装并保存到了阻塞队列中,上一节分析了这会造成的问题:
1、内存限制问题
2、数据安全问题

消息队列可以解决这个问题,一般建议用专业的消息中间件来使用,最主流的当然就是RabbitMQ了,但是这边也讲解一下用Redis里面的一些数据结构来模拟出消息队列的效果,实现的话我感觉也挺容易的,只演示基于Stream消息队列实现异步秒杀。

Redis消息队列实现异步秒杀

  • 认识消息队列
  • 基于List实现消息队列
  • PubSub实现消息队列
  • Stream的单消费模式
  • Stream的消费者组模式
  • 基于Stream消息队列实现异步秒杀

认识消息队列

消息队列,也就是存放消息的队列,最简单的消息队列包括3个角色:
(1)消息队列(代理):存储和管理信息
(2)生产者:发消息到消息队列
(3)消费者:从消息队列中获取消息并处理
因此,异步秒杀的思路为:
在这里插入图片描述

这个思路与上一节用阻塞队列的思路是差不多的,但是有2点重要区别:
1、消息队列是JVM以外的独立服务,不受JVM内存的限制
2、消息队列不仅仅做数据存储,还确保了数据安全,存到消息队列中的消息会做持久化处理,并要求消费者要做出消息的确认,否则会持续将消息传递给消费者,确保消息至少被“签收”一次

基于List实现消息队列

List是一种双向链表,很容易模拟出队列。
需要注意的是,当消息队列中没有消息的时候,我们应当要让线程等待,而不是直接返回Null,因此这儿要用BRPOPBLPOP来实现阻塞效果(B表示阻塞)

优点:
(1)利用Redis存储,不受限于JVM内存上限
(2)基于Redis的持久化机制,保证数据安全性
(3)满足消息有序性
缺点:
(1)无法避免消息丢失(消息会从队列直接移除)
(2)只支持单消费者

PubSub实现消息队列

PubSub(发布订阅)是Redis2.0引入的,消费者可以订阅一个或多个channel(频道),生产者向对应channel发送消息后,所有订阅者都能收到相关消息。

优点:采用发布订阅模型,支持多生产、多消费
缺点:
(1)不支持数据持久化
(2)无法避免消息丢失
(3)消息堆积有上线,超出时数据丢失

Stream的单消费模式

Stream是Redis5.0引入的一种新数据类型,可以实现功能完善的消息队列。
在这里插入图片描述
例如:
在这里插入图片描述
读取消息:XREAD
在这里插入图片描述
例如,用XREAD读第一个消息:

XREAD COUNT 1 STREAMS users 0

用XREAD阻塞方式读取最新消息:

XREAD COUNT 1 BLOCK STREAMS users $

所以,在开发的时候,可以循环调用XREAD阻塞方式来查询最新消息,从而实现持久监听队列。
但是,当指定起始ID为$读取最新消息,处理一条消息的过程中,又有超过1条以上的消息到达队列,则下次获取也只能获取到最新的一条,会出现消息漏读

特点:
(1)消息可回溯
(2)一个消息可以被多个消费者读取
(3)可以阻塞读取
(4)有消息漏读的风险

Stream的消费者组模式

这一部分命令还是麻烦了,理解就行,要使用就去看文档就好了。

消费者组可以解决消息漏读的问题。
消费者组:将多个消费者划分到一个组中,监听同一个队列。

特点:
1、消息分流:队列中的消息会分流给组内不同消费者,而不是重复消费,从而加快消息处理速度
2、消息标识:消费者组会维护一个标识,记录最后一个被处理的消息,哪怕消费者宕机重启,还会从标识之后去读取消息,确保了每一个消息都会被消费
3、消息确认:消费者获取消息后,消息处于pending状态,存入pending-list,当处理完成后需要通过XACK来确认消息,标记消息为已处理,才会从pending-list中移出,可以解决消息丢失的问题

创建消费者组:
XGROUP CREATE key groupName ID [MKSTREAM]
删除指定消费者组
XGROUP DESTROY key groupName
给指定的消费者组添加消费者
XGROUP CREATECONSUMER key groupName consumerName
删除消费者组中的指定消费者
XGROUP DELCONSUMER key groupName consumerName

从消费者组读取消息:

XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key ID
其中,ID表示获取消息的起始ID:
(1)“>”:从下一个未消费的消息开始
(2)其它:根据指定id从pending-list中获取已消费但未确认的消息,例如0,是从pending-list中第一个消息开始

基于Stream消费者组,我们利用消费者监听消息的基本思路:
1、使用阻塞模式尝试监听队列,没消息就继续监听,有消息就开线程处理消息,并在完成后ACK。
2、若没有成功ACK,抛出异常,那么消息就会留在padding-list中,这时候就需要读取padding-list获取异常消息并处理。

STREAM类型消息队列的XREADGROUP命令特点:

1、消息可回溯
2、可以多消费者争抢消息,加快消费速度
3、可以阻塞读取
4、没有消息漏读的风险
5、有消息确认机制,保证消息至少被消费一次

基于Stream消息队列实现异步秒杀

1、创建Stream类型的消息队列stream.orders和消费者组:

XGROUP CREATE stream.orders g1 0 MKSTREAM # 组名g1,起始位置为0

在这里插入图片描述
2、修改之前秒杀下单的Lua脚本,在认定有抢购资格后,直接向stream.orders中添加消息,内容包括voucherId、userId、orderId:

-- 1 参数列表
-- 1.1 优惠券id
local voucherId = ARGV[1]
-- 1.2 用户id
local userId = ARGV[2]
-- 1.3 订单id
local orderId = ARGV[3]-- 2 数据key
-- 2.1 库存key
local stockKey = 'seckill:stock:' .. voucherId
-- 2.2 订单key
local orderKey = 'seckill:order:' .. voucherId-- 3 脚本业务
-- 3.1 判断库存是够充足
if(tonumber(redis.call('get', stockKey)) <= 0) then-- 3.2 库存不足,返回1return 1
end
-- 3.2 判断用户是否下单,即判断用户id是不是这个set集合的成员
if(redis.call('sismember', orderKey, userId) == 1) then-- 3.2 存在,说明重复下单return 2
end
-- 3.4 扣库存
redis.call('incrby', stockKey, -1)
-- 3.5 下单(保存用户)
redis.call('sadd', orderKey, userId)
-- 3.6 发送消息到队列中,orderId的key指定为Id更好,因为订单实体类是这么定义的
redis.call('xadd', 'stream.orders', '*', 'userId', userId, 'voucherId', voucherId, 'Id', orderId)
return 0

与上次的代码相比,我们多增加了一个参数,所以我们要修改一下函数的调用:
在这里插入图片描述
这个参数的增加,在后续的编写中会省去一些麻烦。

3、项目启动时,开启一个线程任务,尝试获取stream.orders中的消息,完成下单,整体的业务流程的代码如下:

@Slf4j
@Service
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {//注入秒杀优惠券的service@Resourceprivate ISeckillVoucherService seckillVoucherService;@Resourceprivate RedisIdWorker redisIdWorker;@Resourceprivate RedissonClient redissonClient;@Resourceprivate StringRedisTemplate stringRedisTemplate;public static final DefaultRedisScript<Long> SECKILL_SCRIPT;static {SECKILL_SCRIPT = new DefaultRedisScript<>();SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua"));SECKILL_SCRIPT.setResultType(Long.class);}//线程池private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();//线程任务,用户随时都能抢单,所以应该要在这个类被初始化的时候马上开始执行@PostConstruct  //该注解表示在当前类初始化完毕以后立即执行private void init(){SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());}private void handleVoucherOrder(VoucherOrder voucherOrder) {//获取用户,用户id不能再从UserHolder中取了,因为现在是从线程池获取的全新线程,不是主线程Long userId = voucherOrder.getUserId();//创建锁对象RLock lock = redissonClient.getLock("lock:order:" + userId);//获取锁boolean isLock = lock.tryLock();//判断是否获取锁成功if(!isLock){log.error("不允许重复下单");//理论上不会发生}try {proxy.createVoucherOrder(voucherOrder);} finally {lock.unlock();}}IVoucherOrderService proxy;private class VoucherOrderHandler implements Runnable{String queueName = "stream.orders";@Overridepublic void run() {while (true){try {//获取消息队列中的订单信息,XREADGROUP GROUP g1 c1 BLOCK 2000 STREAMS stream.ordersList<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(Consumer.from("g1", "c1"),StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),StreamOffset.create(queueName, ReadOffset.lastConsumed()));//判断消息获取是否成功if(list == null || list.isEmpty()) {//获取失败,说明没有消息,继续下一次循环continue;}//解析消息中的订单信息MapRecord<String, Object, Object> record = list.get(0);Map<Object, Object> values = record.getValue();//将其转变为VoucherOrder对象,忽略异常VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(values, new VoucherOrder(), true);//获取成功,下单handleVoucherOrder(voucherOrder);//ACK确认stringRedisTemplate.opsForStream().acknowledge(queueName, "g1", record.getId());} catch (Exception e) {//异常则表示没有被ACK确认,剩下的操作都是针对pending-list的log.error("处理订单异常", e);handlePendingList();}}}private void handlePendingList() {while (true){try {//获取pending-list中的订单信息 XREADGROUP g1 c1 COUNT 1 STREAMS stream.orders 0List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(Consumer.from("g1", "c1"), //消费者信息StreamReadOptions.empty().count(1),StreamOffset.create(queueName, ReadOffset.from("0")));if(list == null || list.isEmpty()) {//获取失败,说明没有消息,结束循环break;}//解析消息中的订单信息MapRecord<String, Object, Object> record = list.get(0);Map<Object, Object> values = record.getValue();//将其转变为VoucherOrder对象,忽略异常VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(values, new VoucherOrder(), true);//获取成功,下单handleVoucherOrder(voucherOrder);//ACK确认stringRedisTemplate.opsForStream().acknowledge(queueName, "g1", record.getId());} catch (Exception e) {log.error("处理pending-list订单异常", e);}}}}//秒杀优化,调用Lua的代码@Overridepublic Result seckillVoucher(Long voucherId) {//获取用户Long userId = UserHolder.getUser().getId();//获取订单idlong orderId = redisIdWorker.nextId("order");//执行Lua脚本Long result = stringRedisTemplate.execute(SECKILL_SCRIPT,Collections.emptyList(),voucherId.toString(), userId.toString(), String.valueOf(orderId));//判断结果是否为0int r = result.intValue();if(r != 0){//不为0,没有购买资格return Result.fail(r == 1 ? "库存不足" : "不能重复下单");}//获取代理对象proxy = (IVoucherOrderService) AopContext.currentProxy();//返回订单idreturn Result.ok(orderId);}@Transactional(rollbackFor = Exception.class)public void createVoucherOrder(VoucherOrder voucherOrder) {Long userId = voucherOrder.getUserId();Long voucherId = voucherOrder.getVoucherId();//查询订单int count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();//判断是否存在if (count > 0) {log.error("不可重复购买");}//扣减库存boolean success = seckillVoucherService.update().setSql("stock = stock - 1").eq("voucher_id", voucherId).gt("stock", 0).update();if (!success) {log.error("库存不足");}//保存订单this.save(voucherOrder);}
}

我觉得真的还是太麻烦了。。。而且我遇到了很多次bug,反正都跟线程池有关系,自己修改bug的能力一般,耽误了不少时间,这方面能力要提高。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/614455.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Eureka切换Nacos时发现两个注册中心的解决方法

报错信息如下&#xff0c;意思是发现了两个注册中心 Field autoServiceRegistration in org.springframework.cloud.client.serviceregistry.AutoServiceRegistrationAutoConfiguration required a single bean, but 2 were found: - nacosAutoServiceRegistration: defined…

数学经典教材有什么?

有本书叫做《自然哲学的数学原理》&#xff0c;是牛顿写的&#xff0c;读完之后你就会感叹牛顿的厉害之处! 原文完整版PDF&#xff1a;https://pan.quark.cn/s/5d5eac2e56af 那玩意真的是人写出来的么… 现代教材把牛顿力学简化成三定律&#xff0c;当然觉得很简单。只有读了原…

【高等数学之不定积分】

一、什么是不定积分? 我们可以简单地从英文层面来基础剖析一下&#xff0c;什么是不定积分? 1.1、基本概念 小tips: 二、不定积分运算法则 三、常用积分公式 四、第一类换元积分法 4.1、定义 4.2、常用凑微分公式 4.3、小calculate 五、第二类换元积分法 5.1、定义 …

JQuery异步加载表格选择记录

JQuery异步加载表格选择记录 JQuery操作表格 首先在页面中定义一个表格对象 <table id"insts" class"table"><thead><tr><th>列1</th><th>列2</th><th>例3</th><th></th></tr>…

Doris 数据模型—Duplicate 模型

Doris 数据模型—Duplicate 模型 文章目录 Doris 数据模型—Duplicate 模型有排序的 Duplicate 模型无排序列 Duplicate 模型总结这是我们关于Doris 数据模型 的最后一节,也就是说到这里我们关于Doris 数据模型介绍就完了,其实Duplicate 模型模型的主要业务场景就是业务上数据…

vue-计算属性

介绍&#xff1a; 在JavaScript中&#xff0c;计算属性&#xff08;Computed Property&#xff09;是一种特殊类型的属性&#xff0c;其值是根据其他属性的值动态计算出来的。计算属性的名称通常以方括号 [] 包围&#xff0c;并且它们的值是根据一个或多个其他属性的值计算得出…

java常见面试题:如何使用Java进行文件操作?

在Java中&#xff0c;你可以使用java.io包中的类来进行文件操作。下面是一些常见的文件操作&#xff0c;我会详细解释并提供相应的示例代码。 读取文件 使用java.io.FileReader和java.io.BufferedReader来读取文本文件。 java复制代码 import java.io.BufferedReader; impor…

CodeParrot数据集

CodeParrot 是一个用于研究和开发自然语言编程接口的数据集。这个数据集包含了大量的编程任务以及与之对应的自然语言描述&#xff0c;这些描述可以被用来训练和评估自然语言理解&#xff08;NLU&#xff09;和代码生成模型。CodeParrot 数据集的目标是促进研究者在自然语言编程…

基于51单片机的万年历系统设计

标题&#xff1a;基于单片机的万年历系统设计与实现 摘要&#xff1a; 本文主要研究和实现了基于51系列单片机&#xff08;如AT89S51&#xff09;为核心的电子万年历系统的设计方案。系统通过集成DS1302时钟芯片作为实时时间基准&#xff0c;结合液晶显示模块、按键输入模块及…

国际化翻译系统V2正式上线了

1、前言 之前上线了移动端国际化翻译系统V1版本&#xff0c;其中有一些弊端&#xff0c;例如&#xff1a; 1、项目仅能适用于Android和iOS项目&#xff0c;针对前端项目&#xff0c;Flutter项目&#xff0c;和后端项目无法支持2、之前的桌面程序需要搭建本地java环境才能运行…

基于 NFS 的文件共享实现

NFS&#xff08;Network File System&#xff09;即网络文件系统&#xff0c;它允许网络中的计算机之间通过 TCP/IP 网络共享文件资源&#xff0c;服务端通过 NFS 共享文件目录&#xff0c;客户端将该文件目录挂载在本地文件系统中&#xff0c;就可以像操作本地文件一样读写服务…

PostgreSQL 配置文件、数据储存目录

文章目录 查询配置文件所在位置查询数据储存目录PostgreSQL的数据目录 查询配置文件所在位置 show config_file; -- 查询配置文件所在位置查询数据储存目录 show data_directory; -- 查询数据储存目录PostgreSQL的数据目录 在PostgreSQL的数据目录&#xff08;C:\Program…

大模型技术的未来

大模型技术是当前人工智能领域的研究热点&#xff0c;其应用范围不断扩大&#xff0c;未来发展前景广阔。以下是大模型技术未来的发展趋势&#xff1a; 持续增长的数据规模&#xff1a;随着数据的不断积累和丰富&#xff0c;大模型将会继续保持增长态势&#xff0c;数据规模将…

Ubuntu 24.04 Preview 版安装 libtinfo5

Ubuntu 24.04 Preview 版安装 libtinfo5 0. 背景1. 安装 libtinfo52. 安装 cuda 0. 背景 Ubuntu 24.04 Preview 版安装 Cuda 时报确实 libtinfo5 的错误。 1. 安装 libtinfo5 wget http://archive.ubuntu.com/ubuntu/pool/universe/n/ncurses/libtinfo5_6.4-2_amd64.deb dpk…

数据采集卡:16通道16位250KHz AD,支持单点采集,程控增益

概述 USB-XM1603是一款性价比极高的多功能通用A/D板&#xff0c;经过精心设计&#xff0c;采用USB2.0总线支持热插拔&#xff0c;即插即用&#xff0c;无需地址跳线。适合测量变送器输出、直流电压等场合的测量应用。USB-XM1603具有16路单端16位程控增益模拟输入、4路16位模拟…

记录汇川:H5U与Fctory IO测试8

主程序&#xff1a; 子程序&#xff1a; IO映射 子程序&#xff1a; 出料程序 子程序&#xff1a; 重量程序 子程序&#xff1a; 自动程序 Fctory IO配置&#xff1a; HMI配置 实际动作如下&#xff1a; Fctory IO测试8

力扣_数组25—柱状图中最大的矩形

题目 给定 n 个非负整数&#xff0c;用来表示柱状图中各个柱子的高度。每个柱子彼此相邻&#xff0c;且宽度为 1 。 求在该柱状图中&#xff0c;能够勾勒出来的矩形的最大面积。 思路 暴力法&#xff1a; 有下述事实&#xff1a;最大矩形的高度一定等于某个柱子的高度遍历每…

简易学生管理系统-----------查看学生信息

code: --------------------------------- import java.util.ArrayList; import java.util.Scanner;public class StudentManager {public static void main(String[] args) {//创建集合对象&#xff0c;用于存储学生数据】ArrayList<Student> array new ArrayList<…

ReactHooks:渲染与useState

渲染和提交 组件显示到屏幕之前&#xff0c;必须被 React 渲染。主要需要经历以下三个步骤&#xff1a; 步骤1&#xff1a; 触发一次渲染 有两种原因会导致组件的渲染&#xff1a; 组件的初次渲染组件&#xff08;或其父组件&#xff09;的状态发生改变而触发重新渲染 当应…

Element|Upload结合Progress实现上传展示进度条

背景 &#xff1a; 项目里的 附件上传 题型组件&#xff0c;用户在上传过程中&#xff0c;如果文件较大&#xff0c;上传过程较慢&#xff0c;而又没有一个类似 Loading... 的加载过程的话&#xff0c;会显得干愣愣的&#xff0c;用户体验较差&#xff0c;所以需要添加一个进度…