Redis Streams在Spring Boot中的应用:构建可靠的消息队列解决方案【redis实战 二】

欢迎来到我的博客,代码的世界里,每一行都是一个故事


在这里插入图片描述

Redis Streams在Spring Boot中的应用:构建可靠的消息队列解决方案

    • 引言
    • 前言
    • Redis Streams的基本概念和特性
      • 1. 日志数据结构
      • 2. 消息和字段
      • 3. 消费者组
      • 4. 消息ID
      • 5. 实时和历史数据处理
      • 6. 性能和可靠性
    • 实战
      • maven依赖
      • 配置StreamConfig(监听)
      • 配置生产者
      • 配置消费者(组)
      • 配置初始化方法
      • 实现效果
    • 基于List和专业消息队列对比
      • 相比于Redis List解决的痛点:
      • 相比于专业高级队列的不足:
    • 总结

引言

Redis Stream解密:探秘数据流处理的黑科技【一】

解锁Redis Stream新境界:高级用法大揭秘【二】

Redis List:打造高效消息队列的秘密武器【redis实战 一】

前言

在快节奏的技术世界中,消息队列是连接不同服务和组件的关键。而在这个领域,Redis Streams作为一种新兴的消息队列解决方案,以其高性能和易用性吸引了众多开发者的目光。当这项技术遇到了Spring Boot —— 当今最受欢迎的Java开发框架,它们的结合将如何开启新的可能性?让我们开始这趟探索之旅,深入了解如何将这两种强大的技术融合在一起,打造出优雅而强大的消息队列系统。

Redis Streams的基本概念和特性

Redis Streams是Redis数据库的一个强大类型,于Redis 5.0中引入。它主要用于消息队列和事件流的存储与传递,是一个高性能、持久化的日志数据结构。以下是Redis Streams的一些基本概念和核心特性:

1. 日志数据结构

  • 持久化的消息日志:Redis Streams是一个按时间排序的消息日志。每条消息都存储在它被插入时的顺序位置,并且有一个唯一的ID标识。
  • 可追溯性:由于其日志特性,Redis Streams允许你访问历史消息,这对于消息的追溯、重放和延迟处理非常有用。

2. 消息和字段

  • 消息结构:每条消息都可以包含一个或多个字段(field)和值(value)。这类似于一个小的哈希结构,使得每条消息可以携带多个相关的数据点。
  • 灵活的数据模型:你可以根据应用的需要自由定义每条消息包含的字段和数据格式。

3. 消费者组

  • 支持多消费者:Redis Streams可以被多个消费者或多个消费者组同时读取,每个消费者组都会跟踪其成员的进度。
  • 消息确认:消费者读取并处理消息后,可以发送确认,表示消息已被处理。未确认的消息可以被再次处理,确保消息不会因消费者失败而丢失。
  • 故障处理:支持挂起的消息列表和消费者超时检测,使得在消费者失败时可以由其他消费者接手处理消息。

4. 消息ID

  • 自动生成或指定:消息ID通常由Redis自动生成,保证了全局唯一性和顺序性。你也可以手动指定ID以实现更复杂的场景。
  • 组成结构:ID由一个时间戳部分和一个序列号部分组成,格式为<时间戳>-<序列号>

5. 实时和历史数据处理

  • 实时消息处理:通过XREADXREADGROUP命令,你可以实时监听并处理新添加到流中的消息。
  • 历史消息查询:通过XRANGEXREVRANGE等命令,可以查询流中的历史消息,这对于数据分析、审计和消息重放非常有用。

6. 性能和可靠性

  • 高性能:Redis Streams设计用于处理高吞吐量的消息,能够支持每秒数百万消息的读写。
  • 持久化:与Redis的其他数据类型一样,Streams的数据可以持久化到磁盘,保证了数据的持久性和可靠性。

实战

maven依赖

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId>
</dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId>
</dependency>

配置StreamConfig(监听)

package fun.bo.config;import fun.bo.consumer.MessageConsumer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.ObjectRecord;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.stream.StreamMessageListenerContainer;import java.time.Duration;/*** @author xiaobo*/
@Configuration
public class StreamConfig {@Beanpublic StreamMessageListenerContainer<String, ObjectRecord<String, String>> streamMessageListenerContainer(RedisConnectionFactory connectionFactory, MessageConsumer messageConsumer) {// 用于配置消息监听容器的选项。在这个方法中,通过设置不同的选项,如轮询超时时间和消息的目标类型,可以对消息监听容器进行个性化的配置。StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, ObjectRecord<String, String>> options =StreamMessageListenerContainer.StreamMessageListenerContainerOptions.builder()// 设置了轮询超时的时间为100毫秒。这意味着当没有新的消息时,容器将每隔100毫秒进行一次轮询。.pollTimeout(Duration.ofMillis(100))// 指定了消息的目标类型为 String。这意味着容器会将接收到的消息转换为 String 类型,以便在后续的处理中使用。.targetType(String.class).build();// 创建一个可用于监听Redis流的消息监听容器。StreamMessageListenerContainer<String, ObjectRecord<String, String>> listenerContainer =StreamMessageListenerContainer.create(connectionFactory, options);// 方法配置了容器来接收来自特定消费者组和消费者名称的消息。它还指定了要读取消息的起始偏移量,以确定从哪里开始读取消息。listenerContainer.receive(Consumer.from("your-consumer-group", "your-consumer-name"),StreamOffset.create("your-stream-name", ReadOffset.lastConsumed()), messageConsumer);// 方法启动了消息监听容器,使其开始监听消息。一旦容器被启动,它将开始接收并处理来自Redis流的消息。listenerContainer.start();return listenerContainer;}
}

配置生产者

package fun.bo.produce;import lombok.RequiredArgsConstructor;
import org.springframework.data.redis.connection.stream.RecordId;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;import java.util.HashMap;
import java.util.Map;/*** @author xiaobo*/
@Service
@RequiredArgsConstructor
public class MessageProducer {private final RedisTemplate<String, String> redisTemplate;public void sendMessage(String streamKey, String messageKey, String message) {Map<String, String> messageMap = new HashMap<>();messageMap.put(messageKey, message);RecordId recordId = redisTemplate.opsForStream().add(streamKey, messageMap);if (recordId != null) {System.out.println("Message sent to Stream '" + streamKey + "' with RecordId: " + recordId);}}
}

配置消费者(组)

package fun.bo.consumer;import org.springframework.data.redis.connection.stream.ObjectRecord;
import org.springframework.data.redis.stream.StreamListener;
import org.springframework.stereotype.Service;/*** @author xiaobo*/
@Service
public class MessageConsumer implements StreamListener<String, ObjectRecord<String, String>> {@Overridepublic void onMessage(ObjectRecord<String, String> message) {String stream = message.getStream();String messageId = message.getId().toString();String messageBody = message.getValue();System.out.println("Received message from Stream '" + stream + "' with messageId: " + messageId);System.out.println("Message body: " + messageBody);}
}

配置初始化方法

如果是已经存在stream,则可以不配置,这个主要是为了防止启动报错,org.springframework.data.redis.RedisSystemException: Error in execution; nested exception is io.lettuce.core.RedisCommandExecutionException: NOGROUP No such key ‘your-stream-name’ or consumer group ‘your-consumer-group’ in XREADGROUP with GROUP option

public void initializeStream() {StreamOperations<String, Object, Object> streamOperations = redisTemplate.opsForStream();// 创建一个流try {streamOperations.createGroup("your-stream-name", ReadOffset.from("0"), "your-consumer-group");} catch (Exception e) {// 流可能已存在,忽略异常}
}

实现效果

在这里插入图片描述

基于List和专业消息队列对比

Redis Streams作为消息队列相比于使用传统的Redis List类型,引入了一系列改进和新功能,同时也与专业的高级消息队列系统(如RabbitMQ、Kafka等)相比存在一些差距。以下是详细的分析:

相比于Redis List解决的痛点:

  1. 更好的消息顺序保证

    • List:虽然List可以保持插入顺序,但在高并发情况下,确保生产者和消费者的顺序一致性较为复杂。
    • Streams:提供了全局唯一的、基于时间的ID来标识消息,确保了消息的全局顺序。
  2. 消费者组支持

    • List:原生List类型不支持消费者组的概念,实现多消费者协调处理同一任务队列较为复杂。
    • Streams:原生支持消费者组,允许多个消费者共享负载,并跟踪各自的进度。
  3. 消息持久化和读取

    • List:读取或消费消息后,需要显式删除,否则会一直保留在List中,处理大量消息时可能会导致内存问题。
    • Streams:消息即使被消费,仍然保留在Stream中,可以随时查询历史消息,且不会因消费而被移除。
  4. 复杂的读取操作

    • List:List提供的操作相对简单,复杂的读取逻辑(如按时间范围查询)需要额外的逻辑来实现。
    • Streams:提供了复杂的查询命令,如XRANGEXREVRANGE,可以按ID范围(时间范围)查询消息。
  5. 消息确认和重试

    • List:需要手动实现消息确认和重试机制,管理起来较为复杂。
    • Streams:提供了消息确认(XACK)和挂起消息查询(XPENDING)的功能,使得消息的重试和故障处理更加容易。

相比于专业高级队列的不足:

  1. 事务和消息持久性保证

    • Redis Streams:虽然提供持久化,但在处理复杂事务和确保消息持久性方面不如一些专业的消息队列系统(如Kafka的WAL日志)。
  2. 集群和分区

    • Redis Streams:在集群环境下使用稍显复杂,且对于数据分区和扩展性的支持不如专业的消息队列系统(如Kafka的分区机制)。
  3. 管理和监控工具

    • Redis Streams:虽然有基本的监控命令,但没有一些高级消息队列系统提供的丰富的管理界面和监控工具。
  4. 高级消息路由和过滤

    • Redis Streams:缺乏一些高级消息队列提供的消息路由和过滤功能(如RabbitMQ的Exchange和Binding)。
  5. 消息传递语义

    • Redis Streams:提供了基础的至少一次处理语义,但可能不像某些系统那样支持严格的只处理一次语义。

总结

Redis Streams提供了一个轻量级、高性能且功能丰富的消息队列实现,解决了使用List作为队列时的许多痛点,特别适合需要快速部署、低延迟和简单可靠的场景。然而,对于需要复杂事务处理、高级路由和过滤、或更丰富管理工具的复杂应用场景,专业的消息队列系统可能更加适合。选择哪种方案,应根据你的具体需求、资源和技术栈来决定。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/580108.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

7.3 uvm_config_db in UVM

uvm_config_db类派生自uvm_resource_db类。它是uvm_resource_db顶部的另一层便利层&#xff0c;简化了用于uvm_component实例的基本接口&#xff08;资源库的访问方法&#xff09;。 下面uvm_config_db类的代码段取自uvm源代码。 class uvm_config_db#(type Tint) extends uv…

html之为什么使用表单,常用表单元素使用?

文章目录 一、为什么使用表单呢&#xff1f;二、常用表单元素使用三、总结 一、为什么使用表单呢&#xff1f; 为什么使用表单呢&#xff0c;使用表单是为了更好的收集用户数据&#xff0c;并且安全 二、常用表单元素使用 1、password密码框 密码框&#xff1a;会隐藏数据&a…

网络摄像头爆破实战

*** 重要说明&#xff1a;仅用于交流网络安全测试技术&#xff0c;并唤起大家对网络安全的重视&#xff0c;如用本文的技术干违法的事情&#xff0c;博主概不负责。*** 文章目录 前言1. 发现摄像头2. 发现端口3. 确定品牌信息4. 确定RTSP地址5. 获取视频流6. 获取密码7. 再次获…

flutter学习-day20-使用SafeArea组件处理各机型的安全距离

&#x1f4da; 目录 介绍分析示例和效果图特殊情况 1. 介绍 安全区域&#xff0c;指的是移动端设备的可视窗口范围。处于安全区域的内容不受圆角、刘海屏、iPhone 小黑条、状态栏等的影响&#xff0c;也就是说&#xff0c;我们要做好适配&#xff0c;必须保证页面可视、可操作…

【资源】stable diffusion常用checkpoint

翻墙下载实在太慢了&#xff0c;还不稳定&#xff0c;就把常用的一些checkpoint传网盘了&#xff0c;需要自取~ clip-vit-large-patch14 脸书&#xff1a;openai/clip-vit-large-patch14 Hugging Face 链接&#xff1a;https://pan.baidu.com/s/1dg3XQmcYMoHtNKLqlrBVzQ?p…

亚马逊鲲鹏系统全自动化操作注册下单更快捷

亚马逊鲲鹏系统的强大崛起&#xff0c;让买家号的注册、养号、下单留评等繁琐任务迎来了一场全新的自动化革命。这一创新性软件系统的横空出世&#xff0c;为广大亚马逊卖家提供了一种高效、智能的解决方案&#xff0c;成功摆脱了繁重的手动操作。 在这一系统中&#xff0c;买家…

安卓恢复指南:五种安卓数据恢复软件推荐

我们的手机随身携带。我们抓住他们快速拍照、发送消息并保持娱乐。有时我们对它们过于冒险&#xff0c;将它们扔在混凝土或水中&#xff0c;安装我们不应该安装的软件&#xff0c;然后将它们留在电影中或公园的长椅上。 如果您要在任何地方丢失重要数据&#xff0c;很可能是在…

【Qt-数据库】

Qt编程指南 ■ SQLite■ CSV■ JSON ■ SQLite Qt 提供了很多操作数据库的类&#xff0c; SQLite 是非常小的&#xff0c;是轻量级的&#xff0c;完全配置时小于 400KiB&#xff0c;省略可选功能配置时小于 250KiB。 SQLite 是一个进程内的库&#xff0c;实现了自给自足的、无…

Cesium问题汇总

引入图片报错&#xff0c;Error loading image for billboard: [object Event] 解决方法&#xff1a;可以import或require引入一下再应用

C# WPF上位机开发(扩展上位机之外的技能)

【 声明&#xff1a;版权所有&#xff0c;欢迎转载&#xff0c;请勿用于商业用途。 联系信箱&#xff1a;feixiaoxing 163.com】 如果把c# wpf只是看成是一个做界面的框架&#xff0c;那确实有点狭隘了。单独的上位机软件&#xff0c;如果不需要上下游的支持&#xff0c;没有与…

hive中struct相关函数总结

目录 hive官方函数解释示例实战 hive官方函数解释 hive官网函数大全地址&#xff1a;添加链接描述 Return TypeNameDescriptionstructstruct(val1, val2, val3, …)Creates a struct with the given field values. Struct field names will be col1, col2, …structnamed_str…

linux中top参数详解

top命令是Linux下常用的性能分析工具&#xff0c;能够实时显示系统中各个进程的资源占用状况&#xff0c;类似于Windows的任务管理器 top参数详解 第一行&#xff0c;任务队列信息&#xff0c;同 uptime 命令的执行结果 系统时间&#xff1a;07:27:05 运行时间&#xff1a;up …

Oracle查询重复数据取第二行,好用来删除重复数据

Oracle查询重复数据取第二行&#xff0c;好用来删除重复数据 SELECT * FROM ( SELECT e.* , ROW_NUMBER() over(PARTITION BY product_category_id,model_size_id ORDER BY product_category_id,model_size_id) rn FROM equ_check_rules e ) s WHERE rn 2;

微信小程序的bindtap和catchtap的区别

一. 事件 1.事件是视图层到逻辑层的通讯方式。 2. 事件可以将用户的行为反馈到逻辑层进行处理。 3. 事件可以绑定在组件上&#xff0c;当达到触发事件&#xff0c;就会执行逻辑层中对应的事件处理函数。 二. 如何使用事件 1. 简单来说就是将事件绑定到组件上面&#xff0c;bi…

Plantuml之序列图语法介绍(十七)

简介&#xff1a; CSDN博客专家&#xff0c;专注Android/Linux系统&#xff0c;分享多mic语音方案、音视频、编解码等技术&#xff0c;与大家一起成长&#xff01; 优质专栏&#xff1a;Audio工程师进阶系列【原创干货持续更新中……】&#x1f680; 优质专栏&#xff1a;多媒…

阿里云服务器华北3(张家口)暂时无法办理经营性ICP许可证

阿里云服务器的华北 3&#xff08;张家口&#xff09;地域暂时无法办理经营性ICP许可证&#xff0c;如有经营性ICP业务请勿选择此地域。如果需要办理经营性ICP业务的用户&#xff0c;不需要选择华北3&#xff08;张家口&#xff09;地域&#xff0c;可以选择华北2&#xff08;北…

springboot对接WebSocket实现消息推送

1.修改pom文件 <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-websocket</artifactId></dependency> 2.增加配置WebSocketConfig.java import org.springframework.context.annotation.Bean…

实训4---硬件部分---点灯实验--按键控制灯实验--uart串口实验

目录 三、硬件部分 【1】点灯实验 【2】按键控制灯实验 【3】uart串口实验 核心代码&#xff1a; 实验视频 实现流水灯 uart串口实验 三、硬件部分 GPIO 【1】点灯实验 1.首先找到要点的灯&#xff0c;在板子上看到对应的白色丝印&#xff0c;比如绿灯D10.然后打开底板…

服务器数据恢复-raid6离线磁盘强制上线后分区打不开的数据恢复案例

服务器数据恢复环境&#xff1a; 服务器上有一组由12块硬盘组建的raid6磁盘阵列&#xff0c;raid6阵列上层有一个lun&#xff0c;映射到WINDOWS系统上使用&#xff0c;WINDOWS系统划分了一个GPT分区。 服务器故障&分析&#xff1a; 服务器在运行过程中突然无法访问。对服务…

【开题报告】基于SpringBoot的篮球社团管理系统设计与实现

1.研究背景 基于Spring Boot的篮球社团管理系统设计与实现的选题背景可以从以下几个方面展开&#xff1a; 社团管理需求&#xff1a; 在大学校园和社区中&#xff0c;篮球社团是一个非常活跃的组织形式&#xff0c;拥有众多会员和参与者。然而&#xff0c;传统的社团管理方式…