Redis实战案例21-消息队列

1. 基于JVM的阻塞队列的局限

  1. JVM内存限制问题,大量订单出现时,可能会超过JVM阻塞队列上限;
  2. 阻塞队列并不能持久化,因为内存不能持久化,出现异常或者宕机之类的故障时,出现数据丢失;

所以引出消息队列的概念

消息队列的两个优点:

  1. 消息队列在JVM以外的独立服务,不受JVM的内存限制;
  2. 消息队列不仅仅做数据存储,确保数据安全,会做数据的持久化,并且消费者取数据要做消息确认;如果没有确认,那么消息会在队列中依旧存在,下一次会再投递给消费者,让它继续处理,直到确认为止,确保消息至少消费一次;

在这里插入图片描述

在这里插入图片描述

2. 基于List结构模拟消息队列

在这里插入图片描述

  1. 假设从队列里取到消息,取到还未处理就发生了异常,这是消息就无法处理了,因为pop相当于remove;
  2. 发送消息,一旦被消费者拿走之后,别的消费者就无法获得了,无法解决一条消息多个消费者使用;

在这里插入图片描述

3. 基于PubSub的消息队列(不建议使用)

在这里插入图片描述

PSUBSCRIBE 订阅的格式匹配的三种规则

在这里插入图片描述
在这里插入图片描述

PubSub消息队列在传递消息时并不会将消息持久化到硬盘上,而是将消息存储在内存中,当服务重启或者发生故障时,可能会导致消息丢失。

在这里插入图片描述

4. 基于Stream的消息队列

在这里插入图片描述
在这里插入图片描述

$:返回最新的消息,前提是该条信息并没有被消费者读过,否则就是返回nil

在这里插入图片描述

阻塞等待读取最新的消息,阻塞时间设置为0表示永久等待直到有新等待消息

在这里插入图片描述

在这里插入图片描述

重点:这种读取方式存在着弊端,当指定其实ID为$时,代表读取最新等待消息,此时处理一条消息的过程中,又来了一条以上的信息到队列,则下次获取也只能获取最新的一条,可能就会出现漏读消息的问题;

当消费者读取一次之后,再生产k4、k5,此时消费者再次阻塞读取最新的消息,再生产消息k6,此时消费者只能获取消息k6,出现了消息漏读;
在这里插入图片描述

5. 基于Stream的消息队列问题优化

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
示例:

在这里插入图片描述
消息确认(k1…k5都在pending-list中等待确认)、

在这里插入图片描述
查看pending-list中所有未确认的元素

在这里插入图片描述
从pendin-list确认未确认的消息,此时消息的起始ID为0

在这里插入图片描述

所以可以得出处理消息的大致流程:先利用>的方式去获取所有未消费的消息,然后确认,如果出现异常,在Java中catch采用0的方法去获取pending-list的消息(异常消息),处理完毕再确认,pending-list清空;之后再使用>的方式继续获取未消费的消息,直到阻塞时间过后返回为nil;

在这里插入图片描述

6. 基于Redis的Stream结构实现异步秒杀

在这里插入图片描述

创建消息队列

在这里插入图片描述

修改lua脚本,认定可以抢购直接发送消息到消息队列中

注意

如果 redis.call('get', stockKey) 返回的结果是空值(nil),那么尝试将空值转换为数字时会出现错误。
因为无法将空值转换为数字。
为了避免这种错误,可以在进行比较之前,先检查返回结果是否为非空值。
这样,如果 redis.call('get', stockKey) 返回的结果是空值,就不会进行比较,从而避免了错误。

-- 1.参数列表
-- 1.1 优惠券id
local voucherId = ARGV[1]
-- 1.2 用户id
local userId = ARGV[2]
-- 1.3 订单id
local orderId = ARGV[3]-- 2.数据key
-- 2.1 库存key
local stockKey = 'seckill:stock' .. voucherId
-- 2.2 订单key
local orderKey = 'seckill:order' .. voucherId-- 3.脚本业务
-- 3.1 判断库存是否充足
local stock = redis.call('get', stockKey)
if stock and tonumber(stock) <= 0 then-- 3.2 库存不足 返回1return 1
end
-- 3.2 判断用户是否下单
if(redis.call('sismember', orderKey, userId) == 1) then-- 3.3 存在,说明是重复下单,返回2return 2
end
-- 3.4 扣库存
redis.call('incrby', stockKey, -1)
-- 3.5 下单(保存用户)
redis.call('sadd', orderKey, userId)
-- 3.6 发送消息到队列中,XDD stream.orders * k1 v1 k2 v2...
redis.call('xadd', 'stream.orders', '*', 'userId', userId, 'voucherId', voucherId, 'id', orderId)return 0

秒杀的逻辑修改(修改之前的阻塞队列方法)

@Override
public Result seckillVoucher(Long voucherId) {// 获取用户idLong userId = UserHolder.getUser().getId();// 订单id(生成唯一ID)long orderId = redisIdWorker.nextId("order");//1. 执行lua脚本(判断购买资格,发送订单信息到消息队列)Long result = stringRedisTemplate.execute(SECKILL_SCRIPT,Collections.emptyList(),// key参数为0,所以参数传空集合voucherId.toString(),userId.toString(),String.valueOf(orderId));//2. 判断是否为0int i = result.intValue();if(i != 0) {//2.1 不为0,没有购买资格return Result.fail(i == 1 ? "库存不足" : "不能重复下单");}//3. 获取代理对象proxy = (IVoucherOrderService) AopContext.currentProxy();//4. 返回订单idreturn Result.ok(0);
}

开启线程任务

/*** 异步线程,从消息队列中取出订单信息,执行保存订单到数据库*/
private class VoucherOrderHandler implements Runnable{String queueName = "stream.orders";@Overridepublic void run() {while (true){try {// 1. 获取消息队列中的订单信息// XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS stream.order >List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(Consumer.from("g1", "c1"),StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),StreamOffset.create(queueName, ReadOffset.lastConsumed()));// 2. 判断消息获取是否成功if(list == null || list.isEmpty()) {// 2.1 如果获取失败,说明没有消息,继续下一次循环continue;}// 3. 解析消息中的订单信息,键值类型参考脚本中的'userId', userId, 'voucherId', voucherId, 'id', orderId// 前者String指的是消息ID, <Object, Object>指的是上述键值对MapRecord<String, Object, Object> record = list.get(0);Map<Object, Object> values = record.getValue();// 3.1 转为voucherOrder对象VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(values, new VoucherOrder(), true);// 4. 如果有,获取成功,可以下单handleVoucherOrder(voucherOrder);// 5. ACK确认// SACK stream.orders g1 idstringRedisTemplate.opsForStream().acknowledge(queueName, "g1", record.getId());} catch (Exception e) {log.error("订单异常信息",e);handlePendingList();}}}/*** 订单异常消息。采用0的方法去处理pending-list中的消息,进行ACK确认*/private void handlePendingList() {while (true){try {// 1. 获取消息队列中的订单信息// XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS stream.order 0List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(Consumer.from("g1", "c1"),StreamReadOptions.empty().count(1),StreamOffset.create(queueName, ReadOffset.from("0")));// 2. 判断消息获取是否成功if(list == null || list.isEmpty()) {// 2.1 如果获取失败,说明pending-list没有异常消息,继续下一次循环break;}// 3. 解析消息中的订单信息,键值类型参考脚本中的'userId', userId, 'voucherId', voucherId, 'id', orderId// 前者String指的是消息ID, <Object, Object>指的是上述键值对MapRecord<String, Object, Object> record = list.get(0);Map<Object, Object> values = record.getValue();// 3.1 转为voucherOrder对象VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(values, new VoucherOrder(), true);// 4. 如果有,获取成功,可以下单handleVoucherOrder(voucherOrder);// 5. ACK确认// SACK stream.orders g1 idstringRedisTemplate.opsForStream().acknowledge(queueName, "g1", record.getId());} catch (Exception e) {log.error("订单异常信息",e);try {Thread.sleep(20);} catch (InterruptedException ex) {throw new RuntimeException(ex);}}}}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/3057.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

关于GPT、AI绘画、AI提词器等AI技术的探讨

目前的AI潮流非常火热&#xff0c;CHATGPT可谓是目前大模型人工智能的代表&#xff0c;刚开始听说chatGPT可以写代码&#xff0c;写作&#xff0c;写方案&#xff0c;无所不能。还有AI绘画也很&#xff2e;&#xff22;作为一个程序员&#xff0c;为了体验这些&#xff21;&…

基于深度学习的高精度线路板瑕疵目标检测系统(PyTorch+Pyside6+YOLOv5模型)

摘要&#xff1a;基于深度学习的高精度线路板瑕疵目标检测系统可用于日常生活中来检测与定位线路板瑕疵目标&#xff0c;利用深度学习算法可实现图片、视频、摄像头等方式的线路板瑕疵目标检测识别&#xff0c;另外支持结果可视化与图片或视频检测结果的导出。本系统采用YOLOv5…

物理机传输大文件到虚拟机

物理机快速传输大文件到虚拟机 测试使用Tabby传输大文件到虚拟机 1.1 准备大文件 1.2 通过Tabby上传文件到Linux 总耗时约&#xff1a;7分钟 1.3 通过EveryThing配置服务 打开EveryThing&#xff0c;点击工具—> 选项—>http服务器 启用HTTP服务器&#xff0c;配置…

Hyper-V安装Ubuntu-18.04

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录 前言一、准备工作&#xff1f;二、下载指定的Ubuntu ISO镜像三、开始配置1.点击快速创建2.选择安装源 四、开始安装五、配置启动项总结 前言 最近有个很扯淡的问题…

如何快速入门C#编程?

学习C#需要持续努力和实践&#xff0c;但是在一周内入门是有可能的&#xff0c;前提是你愿意付出足够的时间和精力。以下是一周内入门C#的步骤和建议&#xff1a; 设定学习目标&#xff1a; 在一周内学习C#需要专注于基础知识。明确你的学习目标&#xff0c;例如了解语法、变量…

基于Java+SpringBoot+Vue+Uniapp前后端分离考试学习一体机设计与实现(视频讲解,已发布上线)

博主介绍&#xff1a;✌全网粉丝3W&#xff0c;全栈开发工程师&#xff0c;从事多年软件开发&#xff0c;在大厂呆过。持有软件中级、六级等证书。可提供微服务项目搭建与毕业项目实战&#xff0c;博主也曾写过优秀论文&#xff0c;查重率极低&#xff0c;在这方面有丰富的经验…

《面试1v1》Kafka基础

&#x1f345; 作者简介&#xff1a;王哥&#xff0c;CSDN2022博客总榜Top100&#x1f3c6;、博客专家&#x1f4aa; &#x1f345; 技术交流&#xff1a;定期更新Java硬核干货&#xff0c;不定期送书活动 &#x1f345; 王哥多年工作总结&#xff1a;Java学习路线总结&#xf…

React和Vue生命周期、渲染顺序

主要就是命名不同 目录 React 组件挂载 挂载前constructor() 挂载时render() 挂载后componentDidMount()&#xff1a;初始化节点 更新 更新时render()&#xff1a;prop/state改变 更新后componentDidUpdate() 卸载 卸载前componentWillUnmount()&#xff1a;清理 V…

基于Redisson的Redis结合布隆过滤器使用

一、场景 缓存穿透问题 一般情况下&#xff0c;先查询Redis缓存&#xff0c;如果Redis中没有&#xff0c;再查询MySQL。当某一时刻访问redis的大量key都在redis中不存在时&#xff0c;所有查询都要访问数据库&#xff0c;造成数据库压力顿时上升&#xff0c;这就是缓存穿透。…

【已解决】ModuleNotFoundError: No module named ‘timm.models.layers.helpers‘

文章目录 错误信息原因解决方法专栏&#xff1a;神经网络精讲与实战AlexNetVGGNetGoogLeNetInception V2——V4ResNetDenseNet 错误信息 在使用timm库的时候出现了ModuleNotFoundError: No module named timm.models.layers.helpers’的错误&#xff0c;详情如下&#xff1a; …

CANoe如何配置Master/Slave模式

系列文章目录 文章目录 系列文章目录前言一、CANoe配置端口二、CANoe配置Master模式三、CANoe配置Slave模式前言 随着智能电动汽车的行业的发展,车载以太网的应用越来越广泛,最近很多朋友在问CANoe Master/Slave模式如何设置,车载以太网物理层也有一项是测试Master/Slave模式…

云曦暑期学习第一周——sql注入

1浅谈sql注入 1.1sql注入 sql注入是指web应用程序对用户输入数据的合法性没有判断&#xff0c;前端传入后端的参数是攻击者可控的&#xff0c;并且参数带入数据库查询&#xff0c;攻击者可以通过构造不同的sql语句来实现对数据库的任意操作 1.2原理 条件&#xff1a; 1.参…

C# 同构字符串

205 同构字符串 给定两个字符串 s 和 t &#xff0c;判断它们是否是同构的。 如果 s 中的字符可以按某种映射关系替换得到 t &#xff0c;那么这两个字符串是同构的。 每个出现的字符都应当映射到另一个字符&#xff0c;同时不改变字符的顺序。不同字符不能映射到同一个字符…

GO 语言GC

目录 写屏障 读屏障 GO语言GC准备 堆内存结构: GC内存分配: GC触发&#xff1a; P的作用: 写屏障 实现强弱三色不式,为了避免误删,则实现写屏障. 写屏障是在写操作中插入指令,目的是把数据对象的修改通知到GC GO语言支持两种写屏障 读屏障 非移动垃圾回收(例如 三色)天…

职责链模式:如何实现可灵活扩展算法的敏感信息过滤框架?

今天&#xff0c;我们主要讲解职责链模式的原理和实现。除此之外&#xff0c;我还会利用职责链模式&#xff0c;带你实现一个可以灵活扩展算法的敏感词过滤框架。下一节课&#xff0c;我们会更加贴近实战&#xff0c;通过剖析Servlet Filter、Spring Interceptor来看&#xff0…

对链表进行插入排序

给定单个链表的头 head &#xff0c;使用 插入排序 对链表进行排序&#xff0c;并返回 排序后链表的头 。 插入排序 算法的步骤: 插入排序是迭代的&#xff0c;每次只移动一个元素&#xff0c;直到所有元素可以形成一个有序的输出列表。 每次迭代中&#xff0c;插入排序只从输…

9、PHP超级全局变量$_REQUEST 、$_POST、$_GET

1、PHP $_REQUEST 、$_POST用于收集HTML表单提交的数据。 以下代码演示了一个输入字段&#xff08;input&#xff09;及提交按钮(submit)的表单(form)。 当用户通过点击 "Submit" 按钮提交表单数据时, 表单数据将发送至<form>标签中 action 属性中指定的脚本文…

Word 常用操作总结

文章目录 【 1. 公式篇 】1.1 编号右居中自动编号1.2 多行公式对齐编号右靠下编号右居中 1.3 公式引用1.4 更新编号1.5 Mathtype公式编辑器自动编号右居中多行公式换行以及等号对齐更新编号 【 1. 公式篇 】 简述&#xff1a;通过“#换行”的方式使编号右对齐&#xff0c;通过…

01. Docker基础环境构建

目录 1、前言 2、关于Docker 2.1、几个术语 2.2、Docker容器化的价值 3、搭建基础环境 3.1、安装VMware 3.2、安装Doker 3.3、启动 3.4、验证Docker环境 4、小结 1、前言 在这里我们将学习关于Docker的一些技能知识&#xff0c;那么首先我们应该怼Docker有一个基础的…

centos7.6下安装mysql

1.下载yum源&#xff1a; wget https://dev.mysql.com/get/mysql80-community-release-el7-5.noarch.rpm2.执行安装&#xff1a; rpm -ivh mysql80-community-release-el7-5.noarch.rpm3.开始安装 yum install -y mysql-server4.启动mysql服务 systemctl start mysqld5.查看…