Redis - 消息队列 Stream

一、概述

消息队列

  1. 定义
    1. 消息队列模型:一种分布式系统中的消息传递方案,由消息队列、生产者和消费者组成
    2. 消息队列:负责存储和管理消息的中间件,也称为消息代理(Message Broker)
    3. 生产者:负责 产生并发送 消息到队列的应用程序
    4. 消费者:负责从队列 获取并处理 消息的应用程序
  2. 功能:实现消息发送和处理的解耦,支持异步通信,提高系统的可扩展性和可靠性
  3. 主流消息队列解决方案
    1. RabbitMQ:轻量级,支持多种协议,适合中小规模应用
    2. RocketMQ:阿里开源,高性能,适合大规模分布式应用

Stream

  1. 定义:Stream:Redis 5.0 引入的一种数据类型,用于处理高吞吐量的消息流、事件流等场景
  2. 功能:按时间顺序 ”添加、读取、消费“ 消息,支持消费者组、消息确认等功能

二、Stream 工作流程

  1. 写入消息
    1. 生产者通过 XADD 向 Stream 中添加消息。每条消息自动获得唯一的 ID,按时间顺序存入 Stream。
  2. 创建消费者组
    1. 如果使用消费者组,首先需要通过 XGROUP CREATE 创建消费者组。
    2. 消费者组会根据时间顺序将消息分配给组内的消费者。
  3. 读取消息
    1. 消费者使用 XREADGROUP 命令读取 Stream 中的消息。
    2. 消息按规则分配给不同消费者处理,每个消费者读取到不同的消息。
  4. 确认消息
    1. 消费者在处理完消息后,使用 XACK 命令确认消息,表示该消息已成功处理。
    2. 如果消息未确认(例如消费者崩溃或超时),它将保持在 Pending 状态,等待重新分配给其他消费者。
  5. 重新分配未确认消息
    1. 如果消息在一定时间内没有被确认,其他消费者可以读取未确认的消息并进行处理。
    2. 可通过 XPENDING 命令查看未确认消息,或在消费者组中设置时间阈值自动重新分配。
  6. 删除消费者组
    1. 不再需要消费者组时,使用 XGROUP DESTROY 命令删除消费者组

三、Stream 实现

消费者组模式

  1. 定义:Redis Streams 的一部分,用于处理消息的分布式消费
  2. 优点
    1. 消息分流:多消费者争抢消息,加快消费速度,避免消息堆积
    2. 消息标示:避免消息漏读,消费者读取消息后不马上销毁,加入 consumerGroup 维护的 pending list 队列等待 ACK
    3. 消息确认:通过消息 ACK 机制,保证消息至少被消费一次
    4. 可以阻塞读取,避免盲等
  3. 实现方法 :通过 Stream 数据类型实现消息队列,命令以 “X” 开头

常用命令

XGROUP CREATE key groupName ID [MKSTREAM]

  1. 功能:创建消费者组
  2. 参数
    1. key:队列名称
    2. groupName:组名称
    3. ID:起始 ID 标识,$ 表示队列中最后一个消息,0 表示队列中第一个消息
    4. MKSTREAM:队列不存在则创建队列

XGROUP DESTORY key groupName

  1. 功能:删除指定消费者组

XGROUP CREATECONSUMER key groupName consumerName

  1. 功能:添加组中消费者

XGROUP DELCONSUMER key groupName consumerName

  1. 功能:删除组中消费者

XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key …] ID [ID …]

  1. 功能:读取组中的消息
  2. gourp:消费者组名称
  3. consumer:消费者名称(不存在则自动创建)
  4. count:本次查询的最大数量
  5. BLOCK milliseconds:当没有消息时最长等待时间
  6. NOACK:无需手动 ACK,获取到消息后自动确认
  7. STREAMS KEY:指定队列名称
  8. ID:获取消息的起始 ID,> 表示从下一个未消费消息开始 (常用)

XPENDING key group [ [ IDLE min-idle-time ] start end count [consumer] ]

  1. 功能:获取 pending-list 中的消息
  2. IDLE:获取消息后、确认消息前的这段时间,空闲时间超过 min-idle-time 则取出
  3. start:获取的最小目标 ID
  4. end:获取的最大目标 ID
  5. count:获取的数量
  6. consumer:获取 consumer 的 pending-list

XACK key group ID [ ID … ]

  1. 功能:确认从组中读取的消息已被处理
  2. key:队列名称
  3. group:组名称
  4. ID:消息的 ID

表格版命令

  1. 命令

    命令功能
    XGROUP CREATE key groupName ID [MKSTREAM]创建消费者组
    XGROUP DESTORY key groupName删除指定消费者组
    XGROUP CREATECONSUMER key groupName consumerName添加组中消费者
    XGROUP DELCONSUMER key groupName consumerName删除组中消费者
    XREADGROUP GROUP groupName consumerName [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key …] ID [ID …]读取组中的消息,ID 填写 “>” 则读取第一条未读消息
    XACK key group ID [ ID … ]确认从组中读取的消息已被处理
  2. 属性

    属性名定义
    key队列名称
    groupName消费者组名称
    ID起始 ID 标示,$ 代表队列中最后一个消息,0 代表第一个消息
    MKSTREAM队列不存在时自动创建队列
    BLOCK milliseconds没有消息时的最大等待时长
    NOACK无需手动 ACK,获取到消息后自动确认
    STREAMS key指定队列名称

运行逻辑

while(true) {// 尝试监听队列,使用阻塞模式,最长等待 2000 msObject msg = redis.call("XREADGROUP GROUP group1 consumer1 COUNT 1 BLOCK 2000 STREAMS s1 >");if(msg == null) {continue;}try {// 处理消息,完成后一定要 ACKhandleMessage(msg);} catch (Exception e) {while(true) {// 重新读取阻塞队列消息Object msg = redis.call("XREADGROUP GROUP group1 consumer1 COUNT 1 STREAM S1 0");if(msg == null)               // 如果阻塞队中的消息已经全部处理则退出pending-listbreak;try {handleMessage(msg);    			// 重新处理 pending-list 中的消息} catch (Exception e){continue;                   // 如果还出错, 则继续重新读取}}}
}

四、示例

  1. 目标:消息队列实现数据库异步修改数据库,将下单 message 缓存在 redis 中,减小下单操作对数据库的冲击

  2. 项目结构

    1. RedisConfig 配置类:创建消费者组是一次性的操作,适合放在配置类中
    2. VoucherOrderHandler 内部类:消费者的逻辑和订单业务相关,因此适合放在 VoucherOrderServiceImpl 中
    3. 多线程启动逻辑:消费者线程的启动与订单业务密切相关,直接放在 VoucherOrderServiceImpl 类中更符合职责分离原则
    src/main/java
    ├── com/example
    │   ├── config
    │   │   └── RedisConfig.java                  // Redis 配置类,包含消费者组初始化
    │   ├── service
    │   │   ├── VoucherOrderService.java
    │   │   └── impl
    │   │       └── VoucherOrderServiceImpl.java  // 包含 VoucherOrderHandler 内部类
    │   ├── entity
    │   │   └── VoucherOrder.java                 // 优惠券订单实体
    │   ├── utils
    │   │   └── BeanUtil.java                     // 用于 Map 转 Bean 的工具类
    │   └── controller
    │       └── VoucherOrderController.java       // 如果有 Controller
    
  3. 创建消费者组(config.RedisConfig)

    @Bean
    public void initStreamGroup() {// 检查是否存在消费者组 g1try {stringRedisTemplate.opsForStream().createGroup("stream.orders", "g1");} catch (RedisSystemException e) {// 如果 group 已存在,抛出异常,可忽略log.warn("消费者组 g1 已存在");}
    }
    
  4. 创建消费者线程

    1. 位置:作为 VoucherOrderServiceImpl 内的预构造部分
    @PostConstruct
    public void startConsumers() {for (int i = 0; i < 5; i++) { // 5 个线程,模拟多个消费者new Thread(new VoucherOrderHandler()).start();}
    }
    
  5. 添加消息到消息队列 (src/main/resources/lua/SECKILL_SCRIPT.lua)

    --1. 参数列表
    --1.1. 优惠券id
    local voucherId = ARGV[1]
    --1.2. 用户id
    local userId = ARGV[2]
    --1.3. 订单id
    local orderId = ARGV[3]--2. 数据key
    local stockKey = 'seckill:stock:' .. voucherId          --2.1. 库存key
    local orderKey = 'seckill:order' .. voucherId           --2.2. 订单key--3. 脚本业务
    --3.1. 判断库存是否充足 get stockKey
    if( tonumber( redis.call('GET', stockKey) ) <= 0 ) thenreturn 1
    end
    --3.2. 判断用户是否重复下单 SISMEMBER orderKey userId
    if( redis.call( 'SISMEMBER', orderKey, userId ) == 1 ) thenreturn 2
    end
    --3.4 扣库存 incrby stockKey -1
    redis.call( 'INCRBY', stockKey, -1 )
    --3.5 下单(保存用户) sadd orderKey userId
    redis.call( 'SADD', orderKey, userId )
    -- 3.6. 发送消息到队列中
    redis.call( 'XADD', 'stream.orders', '*', 'userId', userId, 'voucherId', voucherId, 'id', orderId )
    
  6. 创建消费者类(ServiceImpl)

    1. 位置:作为 VoucherOrderServiceImpl 内的私有类
    // 在ServiceImpl中创建一个VoucherOrderHandler消费者类,专门用于处理消息队列中的消息
    private class VoucherOrderHandler implements Runnable {@Overridepublic void run() {while (true) {try {// 1. 获取消息队列中的订单信息List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(Consumer.from("g1", "c1"),StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),StreamOffset.create( "stream.order", ReadOffset.lastConsumed()));// 2. 没有消息则重新监听if (list == null || list.isEmpty() ) continue;// 3. 获取消息中的 voucherOrderMapRecord<String, Object, Object> record = list.get(0);Map<Object, Object> value = record.getValue();VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);// 4. 创建订单createVoucherOrder(voucherOrder);// 5. 确认当前消息已消费 XACKstringRedisTemplate.opsForStream().acknowledge("s1", "g1", record.getId());} catch ( Exception e) {log.error("处理订单异常", e);// 6. 处理订单失败则消息会加入pending-list,继续处理pending-listhandlePendingList();}}}// 处理pending-list中的消息private void handlePendingList() {while(true) {try {// 1. 消费pending-list中的消息List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(Consumer.from("g1", "c1"),                                   // 消费者此消息的消费者StreamReadOptions.empty().count(1),                          // StreamOffset.create("stream.order", ReadOffset.from("0"))     // 从pending-list的第一条消息开始读);// 2. 退出条件, list 为空 -> pending-list 已全部处理if(list == null || list.isEmpty()) break;// 3. 获取消息中的 voucherOrderMapRecord<String, Object, Object> record = list.get(0);Map<Object, Object> value = record.getValue();VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);// 4. 创建订单createVoucherOrder(voucherOrder);// 5. 确认消息已消费(XACK)stringRedisTemplate.opsForStream().acknowledge("s1", "g1", record.getId());} catch (Exception e) {log.error("处理pendding订单异常", e);try{Thread.sleep(20);     // 如果发生异常则休眠一会再重新消费pending-list中的消息} catch (Exception e2) {e.printStackTrace(); }}}}
    }
    
  7. 创建消息方法

    1. 目标:用户通过这个方法发送一条创建订单的 Message 给 Redis Stream
    // 创建Lua脚本对象
    private static final DefaultRedisScript<Long> SECKILL_SCRIPT;// Lua脚本初始化 (通过静态代码块)
    static {SECKILL_SCRIPT = new DefaultRedisScript<>();SECKILL_SCRIPT.setLocation(new ClassPathResource("lua/SECKILL_SCRIPT.lua"));SECKILL_SCRIPT.setResultType(Long.class);
    }@Override
    public void createVoucherOrder(Long voucherId, Long userId) {// 生成订单 ID(模拟)long orderId = System.currentTimeMillis();// 执行 Lua 脚本Long result = stringRedisTemplate.execute(SECKILL_SCRIPT,Collections.emptyList(),                    // 使用空的 key 列表voucherId.toString(), userId.toString(), String.valueOf(orderId));// 根据 Lua 脚本返回结果处理if (result == 1) {throw new RuntimeException("库存不足!");} else if (result == 2) {throw new RuntimeException("不能重复下单!");}// 如果脚本执行成功,则订单消息会进入 Redis Stream,消费者组会自动处理System.out.println("订单创建成功!");
    }
    

(缺陷) 单消费者模式

  1. 常用命令
    1. XADD key [NOMKSTREAM] [MAXLEN | MINID [=|~] threshold [LIMIT count] * | ID field value [field value …]
    2. XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key …] ID [ ID … ]
  2. 缺陷:有消息漏读风险

五、其他消息队列方案

(缺陷) List 实现

  1. 优点
    1. 不受 JVM 内存上限限制:因为利用 Redis 存储
    2. 数据安全 :因为基于 List 结构本身是数据存储,基于 Redis 持久化机制
    3. 消息有序性:通过 List 结构的 LPUSH & BRPOP 命令实现顺序
  2. 缺点
    1. 消息丢失:BRPOP 的时候如果宕机则消息会丢失
    2. 只支持单消费者

(缺陷) PubSub 实现

  1. 定义
    1. Publish & Subscribe 模型,一种消息队列模型
    2. 生产者向指定的 channel 来 public 消息
    3. 消费者从 subscribe 的 channel 中接收消息
  2. 功能:支持多消费者模式,多个消费者可以同时 subscribe 一个 channel
  3. 优点:采用发布订阅模型,支持多生产者、消费者
  4. 缺点
    1. 不支持数据持久化
    2. 无法避免消息丢失
    3. 消息堆积有上限,超出时数据丢失

三种消息队列对比


本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/63983.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

C语言数组和字符串笔记

C语言数组和字符串笔记 1. 数组及其相关概念 1.1 为什么需要使用数组&#xff1f; 数组是一个有序的、类型相同的数据集合。这些数据被称为数组的元素。每个数组都有一个名字&#xff0c;数组名代表数组的起始地址。数组的元素通过索引或下标访问&#xff0c;索引从0开始。 …

双目摄像头标定方法

打开matlab 找到这个标定 将双目左右目拍的图像上传&#xff08;左右目最好不少于20张&#xff09; 等待即可 此时已经完成标定&#xff0c;左下角为反投影误差&#xff0c;右边为外参可视化 把这些误差大的删除即可。 点击导出 此时回到主页面&#xff0c;即可看到成功导出 Ca…

数据结构开始——时间复杂度和空间复杂度知识点笔记总结

好了&#xff0c;经过了漫长的时间学习c语言语法知识&#xff0c;现在我们到了数据结构的学习。 首先&#xff0c;我们得思考一下 什么是数据结构&#xff1f; 数据结构(Data Structure)是计算机存储、组织数据的方式&#xff0c;指相互之间存在一种或多种特定关系的数据元素…

什么是MMD Maximum Mean Discrepancy 最大均值差异?

9多次在迁移学习看到了&#xff0c;居然还是Bernhard Schlkopf大佬的论文&#xff0c;仔细看看。 一.什么是MMD&#xff1f; 1. MMD要做什么&#xff1f; 判断两个样本&#xff08;族&#xff09;是不是来自于同一分布 2.怎么做&#xff1f;&#xff08;直观上&#xff09;…

电梯内电动车识别数据集,可准确识别电梯内是否有电动车 支持YOLO,COCO,VOC三种格式的标注 7111张图片

电梯内电动车识别数据集&#xff0c;可识别电梯内是否有电动车 支持YOLO&#xff0c;COCO&#xff0c;VOC三种格式的标注 7111张图片 7111总图像数 数据集分割 训练组 74&#xff05; 5291图片 有效集 16% 1168图片 测试集 9&#xff05; 652…

Collection接口

目录 一. Collection基本介绍 二. Collection中的方法及其使用 1. 添加元素 (1) 添加单个元素 (2) 添加另一集合中的所有元素 2. 删除元素 (1) 删除单个元素 (2) 删除某个集合中包含在其他集合中的元素 (3) 保留两个集合中的交集部分, 删除其他元素. 3. 遍历元素 (1) …

Mybatis Plus 3.0 快速入门

1、简介 MyBatis-Plus (简称 MP)是一个 MyBatis 的增强工具,在 MyBatis 的基础上只做增强不做改变,为简化开发、提高效率而生。 2、创建并初始化数据库 2.1、创建数据库 mybatis_plus 2.2、创建 User 表 其表结构如下: idnameageemail1Jone18test1@baomidou.com2Jack…

Verilog实现图像处理的行缓存Line Buffer

在图像处理中&#xff0c;难免会遇到对图像进行卷积或者模板的局部处理&#xff0c;例如ISP中的一些算法&#xff0c;很大部分都需要一个窗口&#xff0c;在实时视频处理中&#xff0c;可以利用行缓存Line buffer可以暂存几行数据&#xff0c;然后同时输出每行中的对应列的像素…

【银河麒麟高级服务器操作系统】有关dd及cp测试差异的现象分析详解

了解更多银河麒麟操作系统全新产品&#xff0c;请点击访问 麒麟软件产品专区&#xff1a;https://product.kylinos.cn 开发者专区&#xff1a;https://developer.kylinos.cn 文档中心&#xff1a;https://documentkylinos.cn dd现象 使用银河麒麟高级服务器操作系统执行两次…

ORACLE逗号分隔的字符串字段,关联表查询

使用场景如下&#xff1a; oracle12 以前的写法&#xff1a; selectt.pro_ids,wm_concat(t1.name) pro_names from info t,product t1 where instr(,||t.pro_ids|| ,,,|| t1.id|| ,) > 0 group by pro_ids oracle12 以后的写法&#xff1a; selectt.pro_ids,listagg(DIS…

记录2024-leetcode-字符串DP

10. 正则表达式匹配 - 力扣&#xff08;LeetCode&#xff09;

微信开发者工具(小程序)的版本管理,Git Push 和 Pull

微信开发者工具&#xff08;小程序&#xff09;的版本管理&#xff0c;Git Push 和 Pull 一、设置 第一次用微信开发者工具自带的版本管理的拉取和推送功能&#xff0c;稍稍的研究了下。 1、首先要先设置 “用户”&#xff0c;名字和邮箱&#xff0c;不一定要真名&#xff0c…

2020-12-07 光棍数

由光棍数的特征可推导其商的个位数不存在偶数且只有1、3、7、9这4个数。一个数可匹配多个光棍数且必定是中间隔着0的循环数。 void 光棍数(int n) {//缘由http://ask.csdn.net/questions/3444069 做乘法运行时间超长int w 0; long long x 111111111111111, j 0;//j x*n;/…

【Linux系统】—— 初识 shell 与 Linux 中的用户

【Linux系统】—— 初识shell 与 Linux 中的用户 1 Xshell 运行原理1.1 命令行的组成1.2 外壳程序 2 Linux中的用户2.1 两种用户2.2 创建普通用户2.3 用户切换2.3.1 普通->超级2.3.2 超级->普通 3 指令的短暂提权3.1 为什么要提权3.2 sudo 指令3.3 人人都能提权吗 1 Xshe…

.NET平台使用C#设置Excel单元格数值格式

设置Excel单元格的数字格式是创建、修改和格式化Excel文档的关键步骤之一&#xff0c;它不仅确保了数据的正确表示&#xff0c;还能够增强数据的可读性和专业性。正确的数字格式可以帮助用户更直观地理解数值的意义&#xff0c;减少误解&#xff0c;并且对于自动化报告生成、财…

Android显示系统(10)- SurfaceFlinger内部结构

一、前言: 之前讲述了native层如何使用SurfaceFlinger,我们只是看到了简单的API调用,从本文开始,我们逐步进行SurfaceFlinger内部结构的分析。话不多说,莱茨狗~ 二、类图: 2.1、总体架构: 先看下SurfaceFlinger的关键成员和我们BootAnimation侧关键成员如何对应起来…

深度学习中的多通道卷积与偏置过程详解

目录 ​编辑 多通道卷积的深入理解 &#x1f50d; 卷积核的多维特性 &#x1f30c; 卷积操作的细节 &#x1f527; 多通道卷积的优势 &#x1f31f; 偏置过程的深入理解 &#x1f3af; 偏置的两种实现方式 &#x1f6e0;️ 偏置的作用与重要性 &#x1f308; 多通道卷…

易语言鼠标轨迹算法(游戏防检测算法)

一.简介 鼠标轨迹算法是一种模拟人类鼠标操作的程序&#xff0c;它能够模拟出自然而真实的鼠标移动路径。 鼠标轨迹算法的底层实现采用C/C语言&#xff0c;原因在于C/C提供了高性能的执行能力和直接访问操作系统底层资源的能力。 鼠标轨迹算法具有以下优势&#xff1a; 模拟…

【蓝桥杯选拔赛真题93】Scratch青蛙过河 第十五届蓝桥杯scratch图形化编程 少儿编程创意编程选拔赛真题解析

目录 Scratch青蛙过河 一、题目要求 编程实现 二、案例分析 1、角色分析 2、背景分析 3、前期准备 三、解题思路 1、思路分析 2、详细过程 四、程序编写 五、考点分析 六、推荐资料 1、入门基础 2、蓝桥杯比赛 3、考级资料 4、视频课程 5、python资料 Scratc…

手机实时提取SIM卡打电话的信令声音--社会价值(一、方案解决了什么问题)

手机实时提取SIM卡打电话的信令声音 --社会价值(一、方案解决了什么问题) 一、前言 这段时间&#xff0c;我们在技术范围之外陷入了一个自证或者说下定义的怪圈&#xff0c;即要怎么样去介绍或者描述&#xff1a;我们是一个什么样的产品。它在当前这个世界上&#xff0c;处于…