Redis:原理速成+项目实战——Redis实战10(Redis消息队列实现异步秒杀)

👨‍🎓作者简介:一位大四、研0学生,正在努力准备大四暑假的实习
🌌上期文章:Redis:原理速成+项目实战——Redis实战9(秒杀优化)
📚订阅专栏:Redis:原理速成+项目实战
希望文章对你们有所帮助

上一节已经实现了异步秒杀,也就是将秒杀分为两个环节:
1、判断是否有抢单资格(库存量是否充足、是否满足一人一单)、
2、下单操作(优惠券表中的库存量-1,订单表增加相应信息)
其中,第一步的操作放在了Redis中,可以有效提高效率,而真正大幅度提高效率的点还是因为我们将下单的操作交给了另一个开辟的线程,因为对数据库的操作并不需要什么时效性。

异步执行所需要的信息被封装并保存到了阻塞队列中,上一节分析了这会造成的问题:
1、内存限制问题
2、数据安全问题

消息队列可以解决这个问题,一般建议用专业的消息中间件来使用,最主流的当然就是RabbitMQ了,但是这边也讲解一下用Redis里面的一些数据结构来模拟出消息队列的效果,实现的话我感觉也挺容易的,只演示基于Stream消息队列实现异步秒杀。

Redis消息队列实现异步秒杀

  • 认识消息队列
  • 基于List实现消息队列
  • PubSub实现消息队列
  • Stream的单消费模式
  • Stream的消费者组模式
  • 基于Stream消息队列实现异步秒杀

认识消息队列

消息队列,也就是存放消息的队列,最简单的消息队列包括3个角色:
(1)消息队列(代理):存储和管理信息
(2)生产者:发消息到消息队列
(3)消费者:从消息队列中获取消息并处理
因此,异步秒杀的思路为:
在这里插入图片描述

这个思路与上一节用阻塞队列的思路是差不多的,但是有2点重要区别:
1、消息队列是JVM以外的独立服务,不受JVM内存的限制
2、消息队列不仅仅做数据存储,还确保了数据安全,存到消息队列中的消息会做持久化处理,并要求消费者要做出消息的确认,否则会持续将消息传递给消费者,确保消息至少被“签收”一次

基于List实现消息队列

List是一种双向链表,很容易模拟出队列。
需要注意的是,当消息队列中没有消息的时候,我们应当要让线程等待,而不是直接返回Null,因此这儿要用BRPOPBLPOP来实现阻塞效果(B表示阻塞)

优点:
(1)利用Redis存储,不受限于JVM内存上限
(2)基于Redis的持久化机制,保证数据安全性
(3)满足消息有序性
缺点:
(1)无法避免消息丢失(消息会从队列直接移除)
(2)只支持单消费者

PubSub实现消息队列

PubSub(发布订阅)是Redis2.0引入的,消费者可以订阅一个或多个channel(频道),生产者向对应channel发送消息后,所有订阅者都能收到相关消息。

优点:采用发布订阅模型,支持多生产、多消费
缺点:
(1)不支持数据持久化
(2)无法避免消息丢失
(3)消息堆积有上线,超出时数据丢失

Stream的单消费模式

Stream是Redis5.0引入的一种新数据类型,可以实现功能完善的消息队列。
在这里插入图片描述
例如:
在这里插入图片描述
读取消息:XREAD
在这里插入图片描述
例如,用XREAD读第一个消息:

XREAD COUNT 1 STREAMS users 0

用XREAD阻塞方式读取最新消息:

XREAD COUNT 1 BLOCK STREAMS users $

所以,在开发的时候,可以循环调用XREAD阻塞方式来查询最新消息,从而实现持久监听队列。
但是,当指定起始ID为$读取最新消息,处理一条消息的过程中,又有超过1条以上的消息到达队列,则下次获取也只能获取到最新的一条,会出现消息漏读

特点:
(1)消息可回溯
(2)一个消息可以被多个消费者读取
(3)可以阻塞读取
(4)有消息漏读的风险

Stream的消费者组模式

这一部分命令还是麻烦了,理解就行,要使用就去看文档就好了。

消费者组可以解决消息漏读的问题。
消费者组:将多个消费者划分到一个组中,监听同一个队列。

特点:
1、消息分流:队列中的消息会分流给组内不同消费者,而不是重复消费,从而加快消息处理速度
2、消息标识:消费者组会维护一个标识,记录最后一个被处理的消息,哪怕消费者宕机重启,还会从标识之后去读取消息,确保了每一个消息都会被消费
3、消息确认:消费者获取消息后,消息处于pending状态,存入pending-list,当处理完成后需要通过XACK来确认消息,标记消息为已处理,才会从pending-list中移出,可以解决消息丢失的问题

创建消费者组:
XGROUP CREATE key groupName ID [MKSTREAM]
删除指定消费者组
XGROUP DESTROY key groupName
给指定的消费者组添加消费者
XGROUP CREATECONSUMER key groupName consumerName
删除消费者组中的指定消费者
XGROUP DELCONSUMER key groupName consumerName

从消费者组读取消息:

XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key ID
其中,ID表示获取消息的起始ID:
(1)“>”:从下一个未消费的消息开始
(2)其它:根据指定id从pending-list中获取已消费但未确认的消息,例如0,是从pending-list中第一个消息开始

基于Stream消费者组,我们利用消费者监听消息的基本思路:
1、使用阻塞模式尝试监听队列,没消息就继续监听,有消息就开线程处理消息,并在完成后ACK。
2、若没有成功ACK,抛出异常,那么消息就会留在padding-list中,这时候就需要读取padding-list获取异常消息并处理。

STREAM类型消息队列的XREADGROUP命令特点:

1、消息可回溯
2、可以多消费者争抢消息,加快消费速度
3、可以阻塞读取
4、没有消息漏读的风险
5、有消息确认机制,保证消息至少被消费一次

基于Stream消息队列实现异步秒杀

1、创建Stream类型的消息队列stream.orders和消费者组:

XGROUP CREATE stream.orders g1 0 MKSTREAM # 组名g1,起始位置为0

在这里插入图片描述
2、修改之前秒杀下单的Lua脚本,在认定有抢购资格后,直接向stream.orders中添加消息,内容包括voucherId、userId、orderId:

-- 1 参数列表
-- 1.1 优惠券id
local voucherId = ARGV[1]
-- 1.2 用户id
local userId = ARGV[2]
-- 1.3 订单id
local orderId = ARGV[3]-- 2 数据key
-- 2.1 库存key
local stockKey = 'seckill:stock:' .. voucherId
-- 2.2 订单key
local orderKey = 'seckill:order:' .. voucherId-- 3 脚本业务
-- 3.1 判断库存是够充足
if(tonumber(redis.call('get', stockKey)) <= 0) then-- 3.2 库存不足,返回1return 1
end
-- 3.2 判断用户是否下单,即判断用户id是不是这个set集合的成员
if(redis.call('sismember', orderKey, userId) == 1) then-- 3.2 存在,说明重复下单return 2
end
-- 3.4 扣库存
redis.call('incrby', stockKey, -1)
-- 3.5 下单(保存用户)
redis.call('sadd', orderKey, userId)
-- 3.6 发送消息到队列中,orderId的key指定为Id更好,因为订单实体类是这么定义的
redis.call('xadd', 'stream.orders', '*', 'userId', userId, 'voucherId', voucherId, 'Id', orderId)
return 0

与上次的代码相比,我们多增加了一个参数,所以我们要修改一下函数的调用:
在这里插入图片描述
这个参数的增加,在后续的编写中会省去一些麻烦。

3、项目启动时,开启一个线程任务,尝试获取stream.orders中的消息,完成下单,整体的业务流程的代码如下:

@Slf4j
@Service
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {//注入秒杀优惠券的service@Resourceprivate ISeckillVoucherService seckillVoucherService;@Resourceprivate RedisIdWorker redisIdWorker;@Resourceprivate RedissonClient redissonClient;@Resourceprivate StringRedisTemplate stringRedisTemplate;public static final DefaultRedisScript<Long> SECKILL_SCRIPT;static {SECKILL_SCRIPT = new DefaultRedisScript<>();SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua"));SECKILL_SCRIPT.setResultType(Long.class);}//线程池private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();//线程任务,用户随时都能抢单,所以应该要在这个类被初始化的时候马上开始执行@PostConstruct  //该注解表示在当前类初始化完毕以后立即执行private void init(){SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());}private void handleVoucherOrder(VoucherOrder voucherOrder) {//获取用户,用户id不能再从UserHolder中取了,因为现在是从线程池获取的全新线程,不是主线程Long userId = voucherOrder.getUserId();//创建锁对象RLock lock = redissonClient.getLock("lock:order:" + userId);//获取锁boolean isLock = lock.tryLock();//判断是否获取锁成功if(!isLock){log.error("不允许重复下单");//理论上不会发生}try {proxy.createVoucherOrder(voucherOrder);} finally {lock.unlock();}}IVoucherOrderService proxy;private class VoucherOrderHandler implements Runnable{String queueName = "stream.orders";@Overridepublic void run() {while (true){try {//获取消息队列中的订单信息,XREADGROUP GROUP g1 c1 BLOCK 2000 STREAMS stream.ordersList<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(Consumer.from("g1", "c1"),StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),StreamOffset.create(queueName, ReadOffset.lastConsumed()));//判断消息获取是否成功if(list == null || list.isEmpty()) {//获取失败,说明没有消息,继续下一次循环continue;}//解析消息中的订单信息MapRecord<String, Object, Object> record = list.get(0);Map<Object, Object> values = record.getValue();//将其转变为VoucherOrder对象,忽略异常VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(values, new VoucherOrder(), true);//获取成功,下单handleVoucherOrder(voucherOrder);//ACK确认stringRedisTemplate.opsForStream().acknowledge(queueName, "g1", record.getId());} catch (Exception e) {//异常则表示没有被ACK确认,剩下的操作都是针对pending-list的log.error("处理订单异常", e);handlePendingList();}}}private void handlePendingList() {while (true){try {//获取pending-list中的订单信息 XREADGROUP g1 c1 COUNT 1 STREAMS stream.orders 0List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(Consumer.from("g1", "c1"), //消费者信息StreamReadOptions.empty().count(1),StreamOffset.create(queueName, ReadOffset.from("0")));if(list == null || list.isEmpty()) {//获取失败,说明没有消息,结束循环break;}//解析消息中的订单信息MapRecord<String, Object, Object> record = list.get(0);Map<Object, Object> values = record.getValue();//将其转变为VoucherOrder对象,忽略异常VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(values, new VoucherOrder(), true);//获取成功,下单handleVoucherOrder(voucherOrder);//ACK确认stringRedisTemplate.opsForStream().acknowledge(queueName, "g1", record.getId());} catch (Exception e) {log.error("处理pending-list订单异常", e);}}}}//秒杀优化,调用Lua的代码@Overridepublic Result seckillVoucher(Long voucherId) {//获取用户Long userId = UserHolder.getUser().getId();//获取订单idlong orderId = redisIdWorker.nextId("order");//执行Lua脚本Long result = stringRedisTemplate.execute(SECKILL_SCRIPT,Collections.emptyList(),voucherId.toString(), userId.toString(), String.valueOf(orderId));//判断结果是否为0int r = result.intValue();if(r != 0){//不为0,没有购买资格return Result.fail(r == 1 ? "库存不足" : "不能重复下单");}//获取代理对象proxy = (IVoucherOrderService) AopContext.currentProxy();//返回订单idreturn Result.ok(orderId);}@Transactional(rollbackFor = Exception.class)public void createVoucherOrder(VoucherOrder voucherOrder) {Long userId = voucherOrder.getUserId();Long voucherId = voucherOrder.getVoucherId();//查询订单int count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();//判断是否存在if (count > 0) {log.error("不可重复购买");}//扣减库存boolean success = seckillVoucherService.update().setSql("stock = stock - 1").eq("voucher_id", voucherId).gt("stock", 0).update();if (!success) {log.error("库存不足");}//保存订单this.save(voucherOrder);}
}

我觉得真的还是太麻烦了。。。而且我遇到了很多次bug,反正都跟线程池有关系,自己修改bug的能力一般,耽误了不少时间,这方面能力要提高。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/614455.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Eureka切换Nacos时发现两个注册中心的解决方法

报错信息如下&#xff0c;意思是发现了两个注册中心 Field autoServiceRegistration in org.springframework.cloud.client.serviceregistry.AutoServiceRegistrationAutoConfiguration required a single bean, but 2 were found: - nacosAutoServiceRegistration: defined…

数学经典教材有什么?

有本书叫做《自然哲学的数学原理》&#xff0c;是牛顿写的&#xff0c;读完之后你就会感叹牛顿的厉害之处! 原文完整版PDF&#xff1a;https://pan.quark.cn/s/5d5eac2e56af 那玩意真的是人写出来的么… 现代教材把牛顿力学简化成三定律&#xff0c;当然觉得很简单。只有读了原…

【高等数学之不定积分】

一、什么是不定积分? 我们可以简单地从英文层面来基础剖析一下&#xff0c;什么是不定积分? 1.1、基本概念 小tips: 二、不定积分运算法则 三、常用积分公式 四、第一类换元积分法 4.1、定义 4.2、常用凑微分公式 4.3、小calculate 五、第二类换元积分法 5.1、定义 …

JQuery异步加载表格选择记录

JQuery异步加载表格选择记录 JQuery操作表格 首先在页面中定义一个表格对象 <table id"insts" class"table"><thead><tr><th>列1</th><th>列2</th><th>例3</th><th></th></tr>…

国际化翻译系统V2正式上线了

1、前言 之前上线了移动端国际化翻译系统V1版本&#xff0c;其中有一些弊端&#xff0c;例如&#xff1a; 1、项目仅能适用于Android和iOS项目&#xff0c;针对前端项目&#xff0c;Flutter项目&#xff0c;和后端项目无法支持2、之前的桌面程序需要搭建本地java环境才能运行…

基于 NFS 的文件共享实现

NFS&#xff08;Network File System&#xff09;即网络文件系统&#xff0c;它允许网络中的计算机之间通过 TCP/IP 网络共享文件资源&#xff0c;服务端通过 NFS 共享文件目录&#xff0c;客户端将该文件目录挂载在本地文件系统中&#xff0c;就可以像操作本地文件一样读写服务…

PostgreSQL 配置文件、数据储存目录

文章目录 查询配置文件所在位置查询数据储存目录PostgreSQL的数据目录 查询配置文件所在位置 show config_file; -- 查询配置文件所在位置查询数据储存目录 show data_directory; -- 查询数据储存目录PostgreSQL的数据目录 在PostgreSQL的数据目录&#xff08;C:\Program…

数据采集卡:16通道16位250KHz AD,支持单点采集,程控增益

概述 USB-XM1603是一款性价比极高的多功能通用A/D板&#xff0c;经过精心设计&#xff0c;采用USB2.0总线支持热插拔&#xff0c;即插即用&#xff0c;无需地址跳线。适合测量变送器输出、直流电压等场合的测量应用。USB-XM1603具有16路单端16位程控增益模拟输入、4路16位模拟…

记录汇川:H5U与Fctory IO测试8

主程序&#xff1a; 子程序&#xff1a; IO映射 子程序&#xff1a; 出料程序 子程序&#xff1a; 重量程序 子程序&#xff1a; 自动程序 Fctory IO配置&#xff1a; HMI配置 实际动作如下&#xff1a; Fctory IO测试8

Element|Upload结合Progress实现上传展示进度条

背景 &#xff1a; 项目里的 附件上传 题型组件&#xff0c;用户在上传过程中&#xff0c;如果文件较大&#xff0c;上传过程较慢&#xff0c;而又没有一个类似 Loading... 的加载过程的话&#xff0c;会显得干愣愣的&#xff0c;用户体验较差&#xff0c;所以需要添加一个进度…

SpringBoot外部配置文件

✅作者简介&#xff1a;大家好&#xff0c;我是Leo&#xff0c;热爱Java后端开发者&#xff0c;一个想要与大家共同进步的男人&#x1f609;&#x1f609; &#x1f34e;个人主页&#xff1a;Leo的博客 &#x1f49e;当前专栏&#xff1a; 循序渐进学SpringBoot ✨特色专栏&…

《Training language models to follow instructions》论文解读--训练语言模型遵循人类反馈的指令

目录 1摘要 2介绍 方法及实验细节 3.1高层次方法论 3.2数据集 3.3任务 3.4人体数据收集 3.5模型 3.6评价 4 结果 4.1 API分布结果 4.2公共NLP数据集的结果 4.3定性结果 问题 1.什么是rm分数 更多资料 1摘要 使语言模型更大并不能使它们更好地遵循用户的意图。例…

if单分支,二分支,多分支,语句嵌套,while语句,for语句(Python实现)

一、主要目的&#xff1a; 1&#xff0e;熟悉程序设计结构的三种方式 2.掌握if单分支语句、if二分支语句、if多分支语句及if语句嵌套的使用方法 3.掌握while语句的使用方法 4.掌握for语句的使用方法 5.掌握循环嵌套的使用方法 二、主要内容和结果展现&#xff1a; 1&…

Spark on Hive及 Spark SQL的运行机制

Spark on Hive 集成原理 HiveServer2的主要作用: 接收SQL语句&#xff0c;进行语法检查&#xff1b;解析SQL语句&#xff1b;优化&#xff1b;将SQL转变成MapReduce程序&#xff0c;提交到Yarn集群上运行SparkSQL与Hive集成&#xff0c;实际上是替换掉HiveServer2。是SparkSQL…

Handsfree_ros_imu:ROS机器人IMU模块ARHS姿态传感器(A9)Liunx系统Ubuntu20.04学习启动和运行教程

这个是篇学习 Handsfree_ros_imu 传感器的博客记录 官方教程链接见&#xff1a; https://docs.taobotics.com/docs/hfi-imu/ 产品功能 IMU 内有 加速度计&#xff0c;陀螺仪&#xff0c;磁力计这些传感器&#xff0c;通过固定 imu 到物体上后&#xff0c;可以获取物体在运动…

Python OpenCv中调用cv2.selectROI( )函数提取图像中指定区域(高效抠图)

目录 一、cv2.selectROI()函数参数二、代码三、提取结果四、总结 一、cv2.selectROI()函数参数 下面是cv2.seletROI()函数中各个参数的解析&#xff1a; selectROI(windowName, img, showCrosshairNone, fromCenterNone):. 参数windowName&#xff1a;选择的区域被显示在的…

(Java企业 / 公司项目)配置Gateway + Nacos应用名路由转发?

首先看项目的gateway&#xff0c; 没有进行路由转发的时候的缺点 在gateway模块中的配置的路径都是写死的&#xff0c;到时候我们更改了IP地址又要改这个代码&#xff0c;会很麻烦所以我们应该怎么样做才能使得请求更加方便&#xff1f;这是子模块 在我们请求模块member中配置…

x-cmd pkg | llm - 用于与 OPENAI 交互的命令行工具

目录 简介首次用户功能特点进一步探索 简介 llm 是一个命令行工具和 Python 库&#xff0c;用于与大型语言模型&#xff08;Large Language Models&#xff0c;简称 LLMs&#xff09;交互&#xff0c;既可以通过远程 API 访问&#xff0c;也可以在本地机器上运行安装的模型。由…

蚁群算法解决旅行商问题的完整Python实现

蚁群算法&#xff08;Ant Colony Optimization&#xff0c;简称ACO&#xff09;是一种模拟蚂蚁觅食行为的启发式优化算法。它通过模拟蚂蚁在寻找食物时释放信息素的行为&#xff0c;来解决组合优化问题&#xff0c;特别是旅行商问题&#xff08;TSP&#xff09;。 蚁群算法的基…

C#.Net学习笔记——设计模式六大原则

***************基础介绍*************** 1、单一职责原则 2、里氏替换原则 3、依赖倒置原则 4、接口隔离原则 5、迪米特法原则 6、开闭原则 一、单一职责原则 举例&#xff1a;类T负责两个不同的职责&#xff1a;职责P1&#xff0c;职责P2。当由于职责P1需求发生改变而需要修…