redis实现消息队列redis发布订阅redis监听key

文章目录

  • Redis消息队列实现异步秒杀
    • 1. jvm阻塞队列问题
    • 2. 什么是消息队列
    • 3. Redis实现消息队列
      • 1. 基于List结构模拟消息队列
        • 操作
        • 优缺点
      • 2. 基于PubSub发布订阅的消息队列
        • 操作
        • 优缺点
        • spring 结合redis的pubsub使用示例
          • 1. 引入依赖
          • 2. 配置文件
          • 3. RedisConfig
          • 4. CustomizeMessageListener
          • 5. RedisMessageReceiver
          • 6. 监听原理简析
          • 7. 监听redis的key
            • 修改redis.conf
            • KeyspaceEventMessageListener
            • KeyExpirationEventMessageListener
            • 修改RedisConfig
      • 3. 基于Stream的消息队列
        • 1. 单消费者
          • xadd
          • xread
          • 操作示例
          • XREAD命令特点
        • 2. 消费者组
          • 特点
          • 要点
          • 创建消费者组
          • 从消费者组读取消息
          • ==图示操作过程==
          • 消费者监听消息的基本思路
          • XREADGROUP命令特点

Redis消息队列实现异步秒杀

1. jvm阻塞队列问题

java使用阻塞队列实现异步秒杀存在问题:

  1. jvm内存限制问题:jvm内存不是无限的,在高并发的情况下,当有大量的订单需要创建时,就有可能超出jvm阻塞队列的上限。
  2. 数据安全问题:jvm的内存没有持久化机制,当服务重启或宕机时,阻塞队列中的订单都会丢失。或者,当我们从阻塞队列中拿到订单任务,但是尚未处理时,如果此时发生了异常,这个订单任务就没有机会处理了,也就丢失了。

2. 什么是消息队列

消息队列Message Queue),字面意思就是存放消息的队列。最简单的消息队列模型包括3个角色:

  • 消息队列:存储和管理消息,也被称为消息代理(Message Broker)
  • 生产者:发送消息到消息队列
  • 消费者:从消息队列获取消息并处理消息

在这里插入图片描述

(正常下单,我们需要将订单消息写入数据库。但由于秒杀并发访问量大,数据库本身并发处理能力不强,因此,在处理秒杀业务时,可以将部分业务在生产者这边做校验,然后将消息写入消息队列,而消费者处理该消息队列中的消息,从而实现双方解耦,更快的处理秒杀业务)

3. Redis实现消息队列

我们可以使用一些现成的mq,比如kafka,rabbitmq等等,但是呢,如果没有安装mq,我们也可以直接使用redis提供的mq方案,降低我们的部署和学习成本。Redis提供了三种不同的方式来实现消息队列:

  • list结构:基于List结构模拟消息队列
  • PubSub:基本的点对点消息模型
  • Stream:比较完善的消息队列模型

1. 基于List结构模拟消息队列

消息队列(Message Queue),字面意思就是存放消息的队列。而Redis的list数据结构是一个双向链表,很容易模拟出队列效果。

队列是入口和出口不在一边,因此我们可以利用:LPUSH 结合 RPOP、或者 RPUSH 结合 LPOP来实现

不过要注意的是,当队列中没有消息时RPOP或LPOP操作会返回null,并不像JVM的阻塞队列那样会阻塞并等待消息。因此这里应该使用BRPOP或者BLPOP来实现阻塞效果

在这里插入图片描述

操作

命令介绍如下

在这里插入图片描述

在这里插入图片描述

优缺点

优点:

  • 利用Redis存储,不受限于JVM内存上限
  • 基于Redis的持久化机制,数据安全性有保证
  • 可以满足消息有序性

缺点:

  • 无法避免消息丢失(如果消费者获取消息后,然后立马就宕机了,这个消息就得不到处理,等同于丢失了)
  • 只支持单消费者(1个消息只能被1个消费者取走,其它消费者会收不到此消息)

2. 基于PubSub发布订阅的消息队列

PubSub(发布订阅)是Redis2.0版本引入的消息传递模型。顾名思义,消费者可以订阅一个或多个channel,生产者向对应channel发送消息后,所有订阅者都能收到相关消息。

  • SUBSCRIBE channel [channel] :订阅一个或多个频道
  • PUBLISH channel msg :向一个频道发送消息
  • PSUBSCRIBE pattern [pattern] :订阅与pattern格式匹配的所有频道
    • ?匹配1个字符:h?llo subscribes to hello, hallo and hxllo
    • *匹配0个或多个字符:h*llo subscribes to hllo and heeeello
    • []指定字符:h[ae]llo subscribes to hello and hallo, but not hillo

在这里插入图片描述

操作

在这里插入图片描述

优缺点

优点:

  • 采用发布订阅模型,支持多生产、多消费

缺点:

  • 不支持数据持久化(如果发送消息时,这个消息的频道没有被任何人订阅,那这个消息就丢失了,也消息就是不会被保存)
  • 无法避免消息丢失(发完了,没人收,直接就丢了)
  • 消息堆积有上限,超出时数据丢失(当我们发送消息时,如果有消费者在监听,消费者会有1个缓存区去缓存这个消息数据,如果消费者处理的慢,那么客户端的缓存区中的消息会不断堆积,而这个缓存区是有大小限制的,如果超出了就会丢失)
spring 结合redis的pubsub使用示例
1. 引入依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.1.8.RELEASE</version><relativePath/> <!-- lookup parent from repository --></parent><groupId>com.zzhua</groupId><artifactId>demo-redis-pubsub</artifactId><version>1.0-SNAPSHOT</version><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId></dependency><!-- 如果使用lettuce-core作为连接redis的实现, 不引入此依赖会报错: Caused by: java.lang.ClassNotFoundException:org.apache.commons.pool2.impl.GenericObjectPoolConfig --><dependency><groupId>org.apache.commons</groupId><artifactId>commons-pool2</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build></project>
2. 配置文件
spring:redis:host: 127.0.0.1port: 6379database: 0password:lettuce:pool:min-idle: 2max-active: 8max-idle: 8
3. RedisConfig

spring-data-redis提供了2种处理redis消息的方法:

  • 自己实现MessageListener接口

    public interface MessageListener {// 处理消息的方法// 第1个参数封装了: 消息发布到哪1个具体频道 和 消息的内容// 第2个参数封装了: //     1. 如果当前是通过普通模式去订阅的频道, 那么收到消息时该pattern就是消息发送的具体频道//     2. 如果当前是通过pattern通配符匹配去订阅的频道, 那么收到消息时, 该pattern就是订阅的频道void onMessage(Message message, @Nullable byte[] pattern);
    }
    
  • 指定MessageListenerAdapter适配器,该适配器指定特定对象的特定方法来处理消息(对特定的方法有参数方面的要求)

@Slf4j
@Configuration
public class RedisConfig {@Autowiredprivate RedisMessageReceiver redisMessageReceiver;@Autowiredprivate CustomizeMessageListener customizeMessageListener;@Beanpublic RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory) {RedisMessageListenerContainer container = new RedisMessageListenerContainer();container.setConnectionFactory(connectionFactory);// 监听order.q通道(不带通配符匹配channel)container.addMessageListener(customizeMessageListener, new ChannelTopic("order.q"));// 监听order.*通道(带通配符匹配channel)container.addMessageListener(listenerAdapter(), new PatternTopic("order.*"));return container;}@Beanpublic MessageListenerAdapter listenerAdapter() {// 交给receiver的receiveMessage方法, 对于这个方法的参数有如下要求:// (2个参数: 第一个参数是Object-即消息内容(默认由RedisSerializer#deserialize处理,见MessageListenerAdapter#onMessage), //           第二个参数是String-即订阅的通道, 详细看上面MessageListener接口中第二个参数的解释)// (1个参数: 参数是Object-即消息内容)return new MessageListenerAdapter(redisMessageReceiver, "receiveMessage");}}
4. CustomizeMessageListener
@Slf4j
@Component
public class CustomizeMessageListener implements MessageListener {@Overridepublic void onMessage(Message message, byte[] pattern) {byte[] bodyBytes = message.getBody();byte[] channelBytes = message.getChannel();log.info("order.q - 消息订阅频道: {}", new String(channelBytes));log.info("order.q - 消息内容: {}", new String(bodyBytes));log.info("order.q - 监听频道: {}", new String(channelBytes));}
}
5. RedisMessageReceiver
@Slf4j
@Component
public class RedisMessageReceiver {public void receiveMessage(String msg, String topic) {log.info("order.* - 消息的订阅频道: {}", topic);log.info("order.* - 消息的内容: {}", msg);}}
6. 监听原理简析

spring-data-redis的lettuce-core是基于netty的,消息监听处理过程如下:
PubSubCommandHandler(netty中的ChannelHandler处理器)->PubSubEndpoint(根据消息类型调用LettuceMessageListener 的不同方法)->LettuceMessageListener -> RedisMessageListenerContainer$DispatchMessageListener(如果是pattern,则从patternMapping中获取所有的listener;如果不是pattern,则从channelMapping中获取所有的listener。至于怎么判断是不是pattern?)->使用异步线程池对上一步获取的所有listener执行onMessage方法

至于怎么判断是不是pattern?这个是根据订阅关系来的,如果订阅的是pattern,那么如果这个向这个pattern中发送了消息,那么就会收到1次消息,并且是pattern。如果订阅的是普通channel,那么如果向这个普通channel发送了消息,那么又会收到1次消息不是pattern。如果向1个channel中发送消息,这个channel既符合订阅的pattern,也符合订阅的普通channel,那么会收到2次消息,并且这2次消息1次是pattern,1次不是pattern的

7. 监听redis的key

既然已经说到了监听redis发布消息了,那么也补充一下监听redis的key过期。因为监听redis的key过期也是通过redis的发布订阅实现的。

修改redis.conf
############################# EVENT NOTIFICATION ############################### Redis能够将在keyspace中发生的事件通知给 发布/订阅 客户端# Redis can notify Pub/Sub clients about events happening in the key space. 
# This feature is documented at http://redis.io/topics/notifications# 例如:如果开启了keyspace事件通知(注意了,必须是开启了keyspace事件通知才可以,开启的方式就是添加参数K),
#      一个客户端在数据库0对一个叫'foo'的key执行了删除操作,
#      那么redis将会通过 发布订阅 机制发布2条消息 
#        PUBLISH __keyspace@0__:foo del 
#        PUBLISH __keyevent@0__:del foo# For instance if keyspace events notification is enabled, and a client
# performs a DEL operation on key "foo" stored in the Database 0, two
# messages will be published via Pub/Sub:
#
# PUBLISH __keyspace@0__:foo del
# PUBLISH __keyevent@0__:del foo#  也可以指定一组 类名 来选择 Redis 会通知的一类事件。
#  每类事件 都通过一个字符定义# It is possible to select the events that Redis will notify among a set
# of classes. Every class is identified by a single character:#        keySpace事件   以 __keyspace@<数据库序号>__ 为前缀 发布事件
#  K     Keyspace events, published with __keyspace@<db>__ prefix.  #        Keyevent事件   以 __keyevent@<数据库序号>__ 为前缀 发布事件
#  E     Keyevent events, published with __keyevent@<db>__ prefix.        #        执行常规命令,比如del、expire、rename           
#  g     Generic commands (non-type specific) like DEL, EXPIRE, RENAME, ...      #        执行 String 命令
#  $     String commands  #        执行 List   命令                                                          
#  l     List commands  #        执行 Set    命令                                                         
#  s     Set commands  #        执行 Hash   命令                                                         
#  h     Hash commands                                                          执行 Hash   命令#        执行 ZSet   命令
#  z     Sorted set commands #        key过期事件(每个key失效都会触发这类事件)                                                   
#  x     Expired events (events generated every time a key expires) #        key驱逐事件(当key在内存满了被清除时生成)
#  e     Evicted events (events generated when a key is evicted for maxmemory)  #        A是g$lshzxe的别名,因此AKE就意味着所有的事件
#  A     Alias for g$lshzxe, so that the "AKE" string means all the events.     
##  配置中的notify-keyspace-events这个参数由0个或多个字符组成,
#  如果配置为空字符串表示禁用通知
#  The "notify-keyspace-events" takes as argument a string that is composed     
#  of zero or multiple characters. The empty string means that notifications
#  are disabled.
##  比如,要开启list命令和generic常规命令的事件通知,
#  应该配置成 notify-keyspace-events Elg
#  Example: to enable list and generic events, from the point of view of the    
#           event name, use:
#
#  notify-keyspace-events Elg
#
#  比如,订阅了__keyevent@0__:expired频道的客户端要收到key失效的时间,
#  应该配置成 notify-keyspace-events Ex
#  Example 2: to get the stream of the expired keys subscribing to channel   name __keyevent@0__:expired use:
#
#  notify-keyspace-events Ex
##  默认情况下,所有的通知都被禁用了,并且这个特性有性能上的开销。
#  注意,K和E必须至少指定其中一个,否则,将收不到任何事件。
#  By default all notifications are disabled because most users don't need      
#  this feature and the feature has some overhead. Note that if you don't       
#  specify at least one of K or E, no events will be delivered.
notify-keyspace-events "Ex"############################### ADVANCED CONFIG ###############################
KeyspaceEventMessageListener
  1. 通过实现InitializingBean接口,在afterPropertiesSet方法中,调用初始化init方法,从redis中获取notify-keyspace-events配置项对应的值,如果未设置任何值,则改为EA,结合上面的redis.conf节选可知,表示的是开启所有的事件通知
  2. 使用redisMessageListenerContainer,通过pattern通配符匹配的方式订阅__keyevent@*频道
  3. 它是个抽象类,实现了MessageListener接口,处理消息的方法是个抽象方法
  4. 它有1个子类KeyExpirationEventMessageListener,订阅的pattern的频道是:__keyevent@*__:expired,通过重写doRegister修改了订阅的频道。并且重写了处理消息的方法,通过将消息内容包装成RedisKeyExpiredEvent事件对象,然后通过事件发布器将事件发布出去。
public abstract class KeyspaceEventMessageListener implements MessageListener, InitializingBean, DisposableBean {private static final Topic TOPIC_ALL_KEYEVENTS = new PatternTopic("__keyevent@*");private final RedisMessageListenerContainer listenerContainer;private String keyspaceNotificationsConfigParameter = "EA";/*** Creates new {@link KeyspaceEventMessageListener}.** @param listenerContainer must not be {@literal null}.*/public KeyspaceEventMessageListener(RedisMessageListenerContainer listenerContainer) {Assert.notNull(listenerContainer, "RedisMessageListenerContainer to run in must not be null!");this.listenerContainer = listenerContainer;}/** (non-Javadoc)* @see org.springframework.data.redis.connection.MessageListener#onMessage(org.springframework.data.redis.connection.Message, byte[])*/@Overridepublic void onMessage(Message message, @Nullable byte[] pattern) {if (message == null || ObjectUtils.isEmpty(message.getChannel()) || ObjectUtils.isEmpty(message.getBody())) {return;}doHandleMessage(message);}/*** Handle the actual message** @param message never {@literal null}.*/protected abstract void doHandleMessage(Message message);/*** Initialize the message listener by writing requried redis config for {@literal notify-keyspace-events} and* registering the listener within the container.*/public void init() {if (StringUtils.hasText(keyspaceNotificationsConfigParameter)) {RedisConnection connection = listenerContainer.getConnectionFactory().getConnection();try {Properties config = connection.getConfig("notify-keyspace-events");if (!StringUtils.hasText(config.getProperty("notify-keyspace-events"))) {connection.setConfig("notify-keyspace-events", keyspaceNotificationsConfigParameter);}} finally {connection.close();}}doRegister(listenerContainer);}/*** Register instance within the container.** @param container never {@literal null}.*/protected void doRegister(RedisMessageListenerContainer container) {listenerContainer.addMessageListener(this, TOPIC_ALL_KEYEVENTS);}/** (non-Javadoc)* @see org.springframework.beans.factory.DisposableBean#destroy()*/@Overridepublic void destroy() throws Exception {listenerContainer.removeMessageListener(this);}/*** Set the configuration string to use for {@literal notify-keyspace-events}.** @param keyspaceNotificationsConfigParameter can be {@literal null}.* @since 1.8*/public void setKeyspaceNotificationsConfigParameter(String keyspaceNotificationsConfigParameter) {this.keyspaceNotificationsConfigParameter = keyspaceNotificationsConfigParameter;}/** (non-Javadoc)* @see org.springframework.beans.factory.InitializingBean#afterPropertiesSet()*/@Overridepublic void afterPropertiesSet() throws Exception {init();}
}
KeyExpirationEventMessageListener
public class KeyExpirationEventMessageListener extends KeyspaceEventMessageListener implementsApplicationEventPublisherAware {private static final Topic KEYEVENT_EXPIRED_TOPIC = new PatternTopic("__keyevent@*__:expired");private @Nullable ApplicationEventPublisher publisher;/*** Creates new {@link MessageListener} for {@code __keyevent@*__:expired} messages.** @param listenerContainer must not be {@literal null}.*/public KeyExpirationEventMessageListener(RedisMessageListenerContainer listenerContainer) {super(listenerContainer);}/** (non-Javadoc)* @see org.springframework.data.redis.listener.KeyspaceEventMessageListener#doRegister(org.springframework.data.redis.listener.RedisMessageListenerContainer)*/@Overrideprotected void doRegister(RedisMessageListenerContainer listenerContainer) {listenerContainer.addMessageListener(this, KEYEVENT_EXPIRED_TOPIC);}/** (non-Javadoc)* @see org.springframework.data.redis.listener.KeyspaceEventMessageListener#doHandleMessage(org.springframework.data.redis.connection.Message)*/@Overrideprotected void doHandleMessage(Message message) {publishEvent(new RedisKeyExpiredEvent(message.getBody()));}/*** Publish the event in case an {@link ApplicationEventPublisher} is set.** @param event can be {@literal null}.*/protected void publishEvent(RedisKeyExpiredEvent event) {if (publisher != null) {this.publisher.publishEvent(event);}}/** (non-Javadoc)* @see org.springframework.context.ApplicationEventPublisherAware#setApplicationEventPublisher(org.springframework.context.ApplicationEventPublisher)*/@Overridepublic void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {this.publisher = applicationEventPublisher;}
}
修改RedisConfig
@Slf4j
@Configuration
public class RedisConfig {@Autowiredprivate RedisMessageReceiver redisMessageReceiver;@Autowiredprivate CustomizeMessageListener customizeMessageListener;@Beanpublic RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory) {RedisMessageListenerContainer container = new RedisMessageListenerContainer();container.setConnectionFactory(connectionFactory);// 注意以下测试在redis.confi配置文件中设置了: notify-keyspace-events 为 AKE, // 也可以参照KeyspaceEventMessageListener在代码中设置这个配置项/*redis提供的事件通知发布消息示例如下:K =>  PUBLISH __keyspace@0__:foo delE =>  PUBLISH __keyevent@0__:del foo参照上述示例去写这个topic即可*/// 监听key删除事件container.addMessageListener(new MessageListener() {/*执行命令: del order:1234输出如下:监听key删除事件 - 消息的发布频道: __keyevent@0__:del监听key删除事件 - 消息内容: order:1234监听key删除事件 - 消息的订阅频道: __keyevent@*__:del*/@Overridepublic void onMessage(Message message, byte[] pattern) {byte[] bodyBytes = message.getBody();byte[] channelBytes = message.getChannel();log.info("监听key删除事件 - 消息的发布频道: {}", new String(channelBytes));log.info("监听key删除事件 - 消息内容: {}", new String(bodyBytes));log.info("监听key删除事件 - 消息的订阅频道: {}", new String(pattern));}}, new PatternTopic("__keyevent@*__:del"));// 监听指定前缀的keycontainer.addMessageListener(new MessageListener() {@Overridepublic void onMessage(Message message, byte[] pattern) {byte[] bodyBytes = message.getBody();byte[] channelBytes = message.getChannel();/*执行命令: set order:1234 a输出如下:监听指定前缀的key - 消息的发布频道: __keyspace@0__:order:1234监听指定前缀的key - 消息内容: set监听指定前缀的key - 消息的订阅频道: __keyspace@0__:order:**/log.info("监听指定前缀的key - 消息的发布频道: {}", new String(channelBytes));log.info("监听指定前缀的key - 消息内容: {}", new String(bodyBytes));log.info("监听指定前缀的key - 消息的订阅频道: {}", new String(pattern));}}, new PatternTopic("__keyspace@0__:order:*"));return container;}/* 借助了1. 这个KeyspaceEventMessageListener的bean中的对redis的配置修改2. 监听patter的topic*/@Beanpublic KeyspaceEventMessageListener keyspaceEventMessageListener(RedisMessageListenerContainer container) {return new KeyspaceEventMessageListener(container){/* __keyevent@* */@Overrideprotected void doHandleMessage(Message message) {log.info("监听所有key命令事件, 消息内容:{}, {}",// set name zzhua; expire name 5;// 消息内容就是key的名称, 比如: namenew String(message.getBody()),// 消息所发布的频道, 比如: __keyevent@0__:set, __keyevent@0__:expire等new String(message.getChannel()));}};}@Beanpublic KeyExpirationEventMessageListener keyExpirationEventMessageListener(RedisMessageListenerContainer container) {return new KeyExpirationEventMessageListener(container){/* __keyevent@*__:expired */@Overrideprotected void doHandleMessage(Message message) {log.info("监听所有key失效, 消息内容:{}, {}",// 消息内容就是key的名称, 比如: namenew String(message.getBody()),// 消息所发布的频道, 比如: __keyevent@0__:expirednew String(message.getChannel()));}};}}

3. 基于Stream的消息队列

Stream 是 Redis 5.0 引入的一种新的数据类型(因此支持持久化),可以实现一个功能非常完善的消息队列(专门为消息队列设计的),Redis streams官网介绍

1. 单消费者
xadd

发送消息的命令:

在这里插入图片描述

  • 不指定消息队列的的最大消息数量就是不限制消息数量

  • 消息唯一id建议使用*,让redis自动生成消息唯一id

  • (上面命令介绍中的:大写表示照着抄就行;小写的是需要我们自己提供的参数;中括号表示可选参数)

示例

## 创建名为 users 的队列,并向其中发送一个消息,内容是:{name=jack,age=21},并且使用Redis自动生成ID
127.0.0.1:6379> XADD users * name jack age 21
"1644805700523-0"
xread

读取消息的方式之一:

在这里插入图片描述

  • 不指定阻塞时间,就是直接返回(不阻塞);设置为0表示阻塞到有值为止;
  • stream中消息读取之后,不会被删除;
  • $ 表示读取最新的消息,但是如果之前消息都已经被读过了,那么当前继续去读的话,是读不到的(尽管当前stream中仍然有消息)

示例

## 从users的队列中读取1条消息, 从第1条开始读
127.0.0.1:6379> XREAD COUNT 1 STREAMS users 0
1) 1) "users"2) 1) 1) "1708522812423-0"2) 1) "name"2) "jack"3) "age"4) "21"
操作示例

查看当前redis版本是否支持stream数据结构

在这里插入图片描述

xadd与xread使用示例

在这里插入图片描述

在上面,还有1点没有体现出来:在stream中的每1个消息,被当前客户端读了1遍,还可以被当前客户端读1遍,然后,这个消息还可以被其它客户端读1遍。

xread读取最新数据要使用阻塞的方法才可以

在这里插入图片描述

我们发现,只有在阻塞期间,使用$才能读取到最新消息;如果不使用阻塞,想要读取最新数据是不可能的。

在业务开发中,我们可以循环的调用XREAD阻塞方式来查询最新消息,从而实现持续监听队列的效果,伪代码如下:

在这里插入图片描述

但是这会存在消息漏读的问题,由于:只有在阻塞期间,使用$才能读取到最新消息,假设在处理消息的时候,此时消息队列中发来了消息,那么这些消息就会被错过,只有当执行XREAD COUNT 1 BLOCK 2000 STREAMS users $开始时收到的第1个消息,才会被处理。

XREAD命令特点

STREAM类型消息队列的XREAD命令特点

  • 消息可回溯(消息读取完之后,不会消失,永久的保留在我们的队列当中,随时想看都可以回去读)
  • 一个消息可以被多个消费者读取(因为消息读取之后,不会消失)
  • 可以阻塞读取
  • 有消息漏读的风险(在处理消息的过程中,如果来了多条消息,则只能看到最后一条消息,即最新的那1条)
2. 消费者组

上面,我们知道通过xread命令你阻塞读取最新消息,有消息漏读的风险,下面,我们看看消费者组是如何解决这个问题的。

特点

消费者组(Consumer Group):将多个消费者划分到一个组中,监听同一个队列。具备下列特点:

在这里插入图片描述

消息分流

队列中的消息会分流给组内不同消费者,而重复消费,从而加快消息处理的速度

消息标示

消费者组会维护一个标示记录最后一个被处理的消息,哪怕消费者宕机重启,还会从标示之后读取消息。确保每一个消息都会被消费

消息确认

消费者获取消息后,消息处于pending状态,并存入一个pending-list。当处理完成后需要通过XACK来确认消息,标记消息为已处理,才会从pending-list移除。

要点

redis服务器维护了多个消费者组

可以给1个stream指定多个消费者组

  • 把这里的消费者组当成上节中的消费者即可
  • 1个stream绑定的的多个消费者组都会收到消息

消息发给消费者组

  • 即多个消费者共同加入到消费者组中,形成1个消费者,而消息就分给消费者中中的消息来消费

消费者加入消费者组

消费者从消费者组中拉取消息,拉取到的消息进入消费者组中的pending-list

消费者消费完消息后,向消费者组确认消息已处理,已确认处理的消息会从pending-list中删除

消费者组总会用1个标识,来记录最后1个被处理的消息

创建消费者组
XGROUP CREATE  key groupName ID [MKSTREAM]
  • key:队列名称
  • groupName:消费者组名称
  • ID:起始ID标示,$代表队列中最后一个消息,0则代表队列中第一个消息
    • 建议:如果不想处理队列中已存在的消息,就可以使用$;如果要处理已存在的消息,就是用0)
  • MKSTREAM:队列不存在时自动创建队列;不指定的话,当不存在时,不会创建

其它常见命令:

# 删除指定的消费者组
XGROUP DESTORY key groupName# 给指定的消费者组添加消费者
#(一般情况下,我们并不需要自己添加消费者,因为当我们从这个消费者组当中指定1个消费者,
#                                   并且监听消息的时候,如果这个消费者不存在,则会自动创建消费者)
XGROUP CREATECONSUMER key groupname consumername# 删除消费者组中的指定消费者
XGROUP DELCONSUMER key groupname consumername
从消费者组读取消息
XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...]
  • group:消费组名称
  • consumer:消费者名称,如果消费者不存在,会自动创建一个消费者
  • count:本次查询的最大数量
  • BLOCK milliseconds:当没有消息时最长等待时间(若未指定,则不阻塞)
  • NOACK:无需手动ACK,即获取到消息后自动确认(一般不建议使用)
  • STREAMS key:指定队列名称
  • ID:获取消息的起始ID:
    • “>”:从消费者组的标记找到最后1个处理的消息(注意:不是已处理的消息,是处理的消息,也就是说它有可能被消费者获取了,但还没被消费者确认掉),的下一个未处理的消息开始
    • 其它(除了">"以外的所有):根据指定id从pending-list中获取已消费但未确认的消息。例如0,是从pending-list中的第一个消息开始(一直拿0,就是一直从pending-list中拿第1个消息)
图示操作过程

在这里插入图片描述

在这里插入图片描述

消费者监听消息的基本思路

在这里插入图片描述

XREADGROUP命令特点
  • 消息可回溯
  • 可以多消费者争抢消息,加快消费速度
  • 可以阻塞读取
  • 没有消息漏读的风险
  • 有消息确认机制,保证消息至少被消费一次
  • (内存不受jvm限制,消息可做持久化,消息确认机制)

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/697091.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

大语言模型的开山之作—探秘GPT系列:GPT-1-GPT2-GPT-3的进化之路

模型模型参数创新点评价GPT1预训练微调&#xff0c; 创新点在于Task-specific input transformations。GPT215亿参数预训练PromptPredict&#xff0c; 创新点在于Zero-shotZero-shot新颖度拉满&#xff0c;但模型性能拉胯GPT31750亿参数预训练PromptPredict&#xff0c; 创新点…

pclpy 可视化点云(多窗口可视化、单窗口多点云可视化)

pclpy 可视化点云&#xff08;多窗口可视化、单窗口多点云可视化&#xff09; 一、算法原理二、代码三、结果1.多窗口可视化结果2.单窗口多点云可视化 四、相关数据五、问题与解决方案1.问题2.解决 一、算法原理 原理看一下代码写的很仔细的。。目前在同一个窗口最多建立2个窗…

ESP8266智能家居(3)——单片机数据发送到mqtt服务器

1.主要思想 前期已学习如何用ESP8266连接WIFI&#xff0c;并发送数据到服务器。现在只需要在单片机与nodeMCU之间建立起串口通信&#xff0c;这样单片机就可以将传感器测到的数据&#xff1a;光照&#xff0c;温度&#xff0c;湿度等等传递给8266了&#xff0c;然后8266再对数据…

【AI应用】SoraWebui——在线文生视频工具

SoraWebui 是一个开源项目&#xff0c;允许用户使用 OpenAI 的 Sora 模型使用文本在线生成视频&#xff0c;从而简化视频创建&#xff0c;并具有轻松的一键网站部署功能 在 Vercel 上部署 1. 克隆项目 git clone gitgithub.com:SoraWebui/SoraWebui.git 2. 安装依赖 cd Sor…

【Java EE初阶二十】http的简单理解(一)

1. 初识http HTTP 最新的版本应该是 HTTP/3.0&#xff0c;目前大规模使用的版本 HTTP/1.1&#xff1b; 下面来简单说明一下使用 HTTP 协议的场景: 1、浏览器打开网站 (基本上) 2、手机 APP 访问对应的服务器 (大概率) 前面的 TCP与UDP 和http不同&#xff0c;HTTP 的报文格式&a…

React基础-webpack+creact-react-app创建项目

学习视频&#xff1a;学习视频 2节&#xff1a;webpack工程化创建项目 2.1.webpack工程化工具&#xff1a;vite/rollup/turbopak; 实现组件的合并、压缩、打包等&#xff1b; 代码编译、兼容、校验等&#xff1b; 2.2.React工程化/组件开发 我们可以基于webpack自己去搭建…

sql-labs25-28a

一、环境 网上都有不过多阐述 二、sql-labs第25关 它说你的OR和and属于它,那就是过滤了OR和and 注入尝试 不用or和and进行爆破注入,很明显是有注入点的 ?id-1 union select 1,2,3-- 查看数据库 ok&#xff0c;此道题算是解了但是如果我们用了and了呢 ?id-1 and updatex…

浅谈集群的分类

本文主要介绍集群部署相关的知识&#xff0c;介绍集群部署的基础&#xff0c;集群的分类、集群的负载均衡技术&#xff0c;集群的可用性以及集群的容错机制。随后介绍Redis-Cluster以及Mysql的架构以及主从复制原理。 集群介绍 单台服务器本身会受到带宽、内存、处理器等多方面…

STM32-串口通信(串口的接收和发送)

文章目录 STM32的串口通信一、STM32里的串口通信二、串口的发送和接收串口发送串口接收 三、串口在STM32中的配置四、串口接收的两种实现方式1. 需要更改的地方2. 查询RXNE标志位3. 使用中断 总结 STM32的串口通信 本文在于记录自己的学习过程中遇到的问题和总结&#xff0c;各…

大型语言模型的语义搜索(一):关键词搜索

关键词搜索(Keyword Search)是文本搜索种一种常用的技术&#xff0c;很多知名的应用app比如Spotify、YouTube 或 Google map等都会使用关键词搜索的算法来实现用户的搜索任务&#xff0c;关键词搜索是构建搜索系统最常用的方法&#xff0c;最常用的搜索算法是Okapi BM25&#x…

Liunx使用nginx和http搭建yum-server仓库

文章目录 1. yum-server的搭建方式2. nginx搭建yum-server仓库2.1. 安装配置nginx2.2 配置yum-server的rpm2.3. 同步yum源相关包2.3.1 rsync同步源3.3.1 reposync同步源 2.4. 配置客户端访问yum配置2.5. 验证测试 3. http服务搭建yum-server仓库3.1. 安装配置http3.2 配置yum-s…

Firewalld防火墙

Firewalld概述 Firewalld firewalld防火墙是centos7系统默认防火墙的防火墙管理工具&#xff0c;取代了之前的iptables防火墙&#xff0c;也是工作在网络层&#xff0c;属于包过滤防火墙。 支持网络区域所定义的网络链接以及接口安全等级的动态防火墙管理工具至此IPv4、IPv6…

studio one 6正版多少钱?怎么购买studio one 更便宜,有优惠券哦

Presonus Studio One Studio One是由美国PreSonus公司开发的数字音频工作站&#xff0c;作为DAW届的新人&#xff0c;功能强大且全面&#xff0c;虽然它不像其他DAW那样拥有历史和声誉&#xff0c;但它是一个可爱的软件&#xff0c;包含许多其它DAW所不具备的实用功能&#xff…

web基础及http协议 (二)----------Apache相关配置与优化

一、httpd 安装组成 http 服务基于 C/S 结构 1 .常见http 服务器程序 httpd apache&#xff0c;存在C10K&#xff08;10K connections&#xff09;问题 nginx 解决C10K问题lighttpd IIS .asp 应用程序服务器 tomcat .jsp 应用程序服务器 jetty 开源的servlet容器&#xf…

选择 Python IDE(VSCode、Spyder、Visual Studio 2022和 PyCharm)

前言 当选择 Python 开发工具时&#xff0c;你需要考虑自己的需求、偏好和项目类型。下面是对VSCode、Spyder、Visual Studio 2022和 PyCharm的对比推荐总结&#xff1a; 结论 1、如果你专注于“数据科学”&#xff0c;选择SpyDer没错。 内容 Visual Studio Code (VS Code)…

CleanMyMac2024苹果电脑清理工具最新使用全面评价

作为软件评价专家&#xff0c;我对CleanMyMac X进行了全面的评估&#xff0c;以下是我的详细评价&#xff1a; CleanMyMac X4.14.6全新版下载如下: https://wm.makeding.com/iclk/?zoneid49983 一、功能 CleanMyMac X的功能相当全面&#xff0c;几乎涵盖了Mac电脑清理所需的…

nginx 具体介绍

一&#xff0c;nginx 介绍 &#xff08;一&#xff09;nginx 与apache 1&#xff0c; Apache event 模型 相对于 prefork 模式 可以同时处理更多的请求 相对于 worker 模式 解决了keepalive场景下&#xff0c;长期被占用的线程的资源浪费问题 因为有监听线程&#…

【数据结构】链式队列

链式队列实现&#xff1a; 1.创建一个空队列 2.尾插法入队 3.头删法出队 4.遍历队列 一、main函数 #include <stdio.h> #include "./3.linkqueue.h" int main(int…

运维SRE-19 网站Web中间件服务-http-nginx

Ans自动化流程 1.网站集群核心协议&#xff1a;HTTP 1.1概述 web服务&#xff1a;网站服务&#xff0c;网站协议即可. 协议&#xff1a;http协议,https协议 服务&#xff1a;Nginx服务&#xff0c;Tengine服务....1.2 HTTP协议 http超文本传输协议&#xff0c;负责数据在网站…

更高效的构建工具-vite

更高效的构建工具-vite 前言Vite是什么Vite和webpack的比较1. 运行原理2. 使用成本 Vite的初体验 前言 首先我们要认识什么时构建工具&#xff1f; 企业级项目都具备什么功能呢&#xff1f; Typescript&#xff1a;如果遇到ts文件&#xff0c;我们需要使用tsc将typescript代码…