文章目录
- Redis消息队列实现异步秒杀
- 1. jvm阻塞队列问题
- 2. 什么是消息队列
- 3. Redis实现消息队列
- 1. 基于List结构模拟消息队列
- 操作
- 优缺点
- 2. 基于PubSub发布订阅的消息队列
- 操作
- 优缺点
- spring 结合redis的pubsub使用示例
- 1. 引入依赖
- 2. 配置文件
- 3. RedisConfig
- 4. CustomizeMessageListener
- 5. RedisMessageReceiver
- 6. 监听原理简析
- 7. 监听redis的key
- 修改redis.conf
- KeyspaceEventMessageListener
- KeyExpirationEventMessageListener
- 修改RedisConfig
- 3. 基于Stream的消息队列
- 1. 单消费者
- xadd
- xread
- 操作示例
- XREAD命令特点
- 2. 消费者组
- 特点
- 要点
- 创建消费者组
- 从消费者组读取消息
- ==图示操作过程==
- 消费者监听消息的基本思路
- XREADGROUP命令特点
Redis消息队列实现异步秒杀
1. jvm阻塞队列问题
java使用阻塞队列实现异步秒杀存在问题:
- jvm内存限制问题:jvm内存不是无限的,在高并发的情况下,当有大量的订单需要创建时,就有可能超出jvm阻塞队列的上限。
- 数据安全问题:jvm的内存没有持久化机制,当服务重启或宕机时,阻塞队列中的订单都会丢失。或者,当我们从阻塞队列中拿到订单任务,但是尚未处理时,如果此时发生了异常,这个订单任务就没有机会处理了,也就丢失了。
2. 什么是消息队列
消息队列(Message Queue),字面意思就是存放消息的队列。最简单的消息队列模型包括3个角色:
- 消息队列:存储和管理消息,也被称为消息代理(Message Broker)
- 生产者:发送消息到消息队列
- 消费者:从消息队列获取消息并处理消息
(正常下单,我们需要将订单消息写入数据库。但由于秒杀并发访问量大,数据库本身并发处理能力不强,因此,在处理秒杀业务时,可以将部分业务在生产者这边做校验,然后将消息写入消息队列,而消费者处理该消息队列中的消息,从而实现双方解耦,更快的处理秒杀业务)
3. Redis实现消息队列
我们可以使用一些现成的mq,比如kafka,rabbitmq等等,但是呢,如果没有安装mq,我们也可以直接使用redis提供的mq方案,降低我们的部署和学习成本。Redis提供了三种不同的方式来实现消息队列:
- list结构:基于List结构模拟消息队列
- PubSub:基本的点对点消息模型
- Stream:比较完善的消息队列模型
1. 基于List结构模拟消息队列
消息队列(Message Queue),字面意思就是存放消息的队列。而Redis的list数据结构是一个双向链表,很容易模拟出队列效果。
队列是入口和出口不在一边,因此我们可以利用:LPUSH 结合 RPOP、或者 RPUSH 结合 LPOP来实现。
不过要注意的是,当队列中没有消息时RPOP或LPOP操作会返回null,并不像JVM的阻塞队列那样会阻塞并等待消息。因此这里应该使用BRPOP或者BLPOP来实现阻塞效果。
操作
命令介绍如下
优缺点
优点:
- 利用Redis存储,不受限于JVM内存上限
- 基于Redis的持久化机制,数据安全性有保证
- 可以满足消息有序性
缺点:
- 无法避免消息丢失(如果消费者获取消息后,然后立马就宕机了,这个消息就得不到处理,等同于丢失了)
- 只支持单消费者(1个消息只能被1个消费者取走,其它消费者会收不到此消息)
2. 基于PubSub发布订阅的消息队列
PubSub(发布订阅)是Redis2.0版本引入的消息传递模型。顾名思义,消费者可以订阅一个或多个channel,生产者向对应channel发送消息后,所有订阅者都能收到相关消息。
- SUBSCRIBE channel [channel] :订阅一个或多个频道
- PUBLISH channel msg :向一个频道发送消息
- PSUBSCRIBE pattern [pattern] :订阅与pattern格式匹配的所有频道
- ?匹配1个字符:
h?llo
subscribes tohello
,hallo
andhxllo
- *匹配0个或多个字符:
h*llo
subscribes tohllo
andheeeello
- []指定字符:
h[ae]llo
subscribes tohello
andhallo,
but nothillo
- ?匹配1个字符:
操作
优缺点
优点:
- 采用发布订阅模型,支持多生产、多消费
缺点:
- 不支持数据持久化(如果发送消息时,这个消息的频道没有被任何人订阅,那这个消息就丢失了,也消息就是不会被保存)
- 无法避免消息丢失(发完了,没人收,直接就丢了)
- 消息堆积有上限,超出时数据丢失(当我们发送消息时,如果有消费者在监听,消费者会有1个缓存区去缓存这个消息数据,如果消费者处理的慢,那么客户端的缓存区中的消息会不断堆积,而这个缓存区是有大小限制的,如果超出了就会丢失)
spring 结合redis的pubsub使用示例
1. 引入依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.1.8.RELEASE</version><relativePath/> <!-- lookup parent from repository --></parent><groupId>com.zzhua</groupId><artifactId>demo-redis-pubsub</artifactId><version>1.0-SNAPSHOT</version><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId></dependency><!-- 如果使用lettuce-core作为连接redis的实现, 不引入此依赖会报错: Caused by: java.lang.ClassNotFoundException:org.apache.commons.pool2.impl.GenericObjectPoolConfig --><dependency><groupId>org.apache.commons</groupId><artifactId>commons-pool2</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build></project>
2. 配置文件
spring:redis:host: 127.0.0.1port: 6379database: 0password:lettuce:pool:min-idle: 2max-active: 8max-idle: 8
3. RedisConfig
spring-data-redis提供了2种处理redis消息的方法:
-
自己实现MessageListener接口
public interface MessageListener {// 处理消息的方法// 第1个参数封装了: 消息发布到哪1个具体频道 和 消息的内容// 第2个参数封装了: // 1. 如果当前是通过普通模式去订阅的频道, 那么收到消息时该pattern就是消息发送的具体频道// 2. 如果当前是通过pattern通配符匹配去订阅的频道, 那么收到消息时, 该pattern就是订阅的频道void onMessage(Message message, @Nullable byte[] pattern); }
-
指定MessageListenerAdapter适配器,该适配器指定特定对象的特定方法来处理消息(对特定的方法有参数方面的要求)
@Slf4j
@Configuration
public class RedisConfig {@Autowiredprivate RedisMessageReceiver redisMessageReceiver;@Autowiredprivate CustomizeMessageListener customizeMessageListener;@Beanpublic RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory) {RedisMessageListenerContainer container = new RedisMessageListenerContainer();container.setConnectionFactory(connectionFactory);// 监听order.q通道(不带通配符匹配channel)container.addMessageListener(customizeMessageListener, new ChannelTopic("order.q"));// 监听order.*通道(带通配符匹配channel)container.addMessageListener(listenerAdapter(), new PatternTopic("order.*"));return container;}@Beanpublic MessageListenerAdapter listenerAdapter() {// 交给receiver的receiveMessage方法, 对于这个方法的参数有如下要求:// (2个参数: 第一个参数是Object-即消息内容(默认由RedisSerializer#deserialize处理,见MessageListenerAdapter#onMessage), // 第二个参数是String-即订阅的通道, 详细看上面MessageListener接口中第二个参数的解释)// (1个参数: 参数是Object-即消息内容)return new MessageListenerAdapter(redisMessageReceiver, "receiveMessage");}}
4. CustomizeMessageListener
@Slf4j
@Component
public class CustomizeMessageListener implements MessageListener {@Overridepublic void onMessage(Message message, byte[] pattern) {byte[] bodyBytes = message.getBody();byte[] channelBytes = message.getChannel();log.info("order.q - 消息订阅频道: {}", new String(channelBytes));log.info("order.q - 消息内容: {}", new String(bodyBytes));log.info("order.q - 监听频道: {}", new String(channelBytes));}
}
5. RedisMessageReceiver
@Slf4j
@Component
public class RedisMessageReceiver {public void receiveMessage(String msg, String topic) {log.info("order.* - 消息的订阅频道: {}", topic);log.info("order.* - 消息的内容: {}", msg);}}
6. 监听原理简析
spring-data-redis的lettuce-core是基于netty的,消息监听处理过程如下:
PubSubCommandHandler(netty中的ChannelHandler处理器)->PubSubEndpoint(根据消息类型调用LettuceMessageListener 的不同方法)->LettuceMessageListener -> RedisMessageListenerContainer$DispatchMessageListener(如果是pattern,则从patternMapping中获取所有的listener;如果不是pattern,则从channelMapping中获取所有的listener。至于怎么判断是不是pattern?)->使用异步线程池对上一步获取的所有listener执行onMessage方法
至于怎么判断是不是pattern?这个是根据订阅关系来的,如果订阅的是pattern,那么如果这个向这个pattern中发送了消息,那么就会收到1次消息,并且是pattern。如果订阅的是普通channel,那么如果向这个普通channel发送了消息,那么又会收到1次消息不是pattern。如果向1个channel中发送消息,这个channel既符合订阅的pattern,也符合订阅的普通channel,那么会收到2次消息,并且这2次消息1次是pattern,1次不是pattern的
7. 监听redis的key
既然已经说到了监听redis发布消息了,那么也补充一下监听redis的key过期。因为监听redis的key过期也是通过redis的发布订阅实现的。
修改redis.conf
############################# EVENT NOTIFICATION ############################### Redis能够将在keyspace中发生的事件通知给 发布/订阅 客户端# Redis can notify Pub/Sub clients about events happening in the key space.
# This feature is documented at http://redis.io/topics/notifications# 例如:如果开启了keyspace事件通知(注意了,必须是开启了keyspace事件通知才可以,开启的方式就是添加参数K),
# 一个客户端在数据库0对一个叫'foo'的key执行了删除操作,
# 那么redis将会通过 发布订阅 机制发布2条消息
# PUBLISH __keyspace@0__:foo del
# PUBLISH __keyevent@0__:del foo# For instance if keyspace events notification is enabled, and a client
# performs a DEL operation on key "foo" stored in the Database 0, two
# messages will be published via Pub/Sub:
#
# PUBLISH __keyspace@0__:foo del
# PUBLISH __keyevent@0__:del foo# 也可以指定一组 类名 来选择 Redis 会通知的一类事件。
# 每类事件 都通过一个字符定义# It is possible to select the events that Redis will notify among a set
# of classes. Every class is identified by a single character:# keySpace事件 以 __keyspace@<数据库序号>__ 为前缀 发布事件
# K Keyspace events, published with __keyspace@<db>__ prefix. # Keyevent事件 以 __keyevent@<数据库序号>__ 为前缀 发布事件
# E Keyevent events, published with __keyevent@<db>__ prefix. # 执行常规命令,比如del、expire、rename
# g Generic commands (non-type specific) like DEL, EXPIRE, RENAME, ... # 执行 String 命令
# $ String commands # 执行 List 命令
# l List commands # 执行 Set 命令
# s Set commands # 执行 Hash 命令
# h Hash commands 执行 Hash 命令# 执行 ZSet 命令
# z Sorted set commands # key过期事件(每个key失效都会触发这类事件)
# x Expired events (events generated every time a key expires) # key驱逐事件(当key在内存满了被清除时生成)
# e Evicted events (events generated when a key is evicted for maxmemory) # A是g$lshzxe的别名,因此AKE就意味着所有的事件
# A Alias for g$lshzxe, so that the "AKE" string means all the events.
## 配置中的notify-keyspace-events这个参数由0个或多个字符组成,
# 如果配置为空字符串表示禁用通知
# The "notify-keyspace-events" takes as argument a string that is composed
# of zero or multiple characters. The empty string means that notifications
# are disabled.
## 比如,要开启list命令和generic常规命令的事件通知,
# 应该配置成 notify-keyspace-events Elg
# Example: to enable list and generic events, from the point of view of the
# event name, use:
#
# notify-keyspace-events Elg
#
# 比如,订阅了__keyevent@0__:expired频道的客户端要收到key失效的时间,
# 应该配置成 notify-keyspace-events Ex
# Example 2: to get the stream of the expired keys subscribing to channel name __keyevent@0__:expired use:
#
# notify-keyspace-events Ex
## 默认情况下,所有的通知都被禁用了,并且这个特性有性能上的开销。
# 注意,K和E必须至少指定其中一个,否则,将收不到任何事件。
# By default all notifications are disabled because most users don't need
# this feature and the feature has some overhead. Note that if you don't
# specify at least one of K or E, no events will be delivered.
notify-keyspace-events "Ex"############################### ADVANCED CONFIG ###############################
KeyspaceEventMessageListener
- 通过实现InitializingBean接口,在afterPropertiesSet方法中,调用初始化init方法,从redis中获取
notify-keyspace-events
配置项对应的值,如果未设置任何值,则改为EA
,结合上面的redis.conf节选可知,表示的是开启所有的事件通知 - 使用redisMessageListenerContainer,通过pattern通配符匹配的方式订阅
__keyevent@*
频道 - 它是个抽象类,实现了MessageListener接口,处理消息的方法是个抽象方法
- 它有1个子类KeyExpirationEventMessageListener,订阅的pattern的频道是:
__keyevent@*__:expired
,通过重写doRegister修改了订阅的频道。并且重写了处理消息的方法,通过将消息内容包装成RedisKeyExpiredEvent事件对象,然后通过事件发布器将事件发布出去。
public abstract class KeyspaceEventMessageListener implements MessageListener, InitializingBean, DisposableBean {private static final Topic TOPIC_ALL_KEYEVENTS = new PatternTopic("__keyevent@*");private final RedisMessageListenerContainer listenerContainer;private String keyspaceNotificationsConfigParameter = "EA";/*** Creates new {@link KeyspaceEventMessageListener}.** @param listenerContainer must not be {@literal null}.*/public KeyspaceEventMessageListener(RedisMessageListenerContainer listenerContainer) {Assert.notNull(listenerContainer, "RedisMessageListenerContainer to run in must not be null!");this.listenerContainer = listenerContainer;}/** (non-Javadoc)* @see org.springframework.data.redis.connection.MessageListener#onMessage(org.springframework.data.redis.connection.Message, byte[])*/@Overridepublic void onMessage(Message message, @Nullable byte[] pattern) {if (message == null || ObjectUtils.isEmpty(message.getChannel()) || ObjectUtils.isEmpty(message.getBody())) {return;}doHandleMessage(message);}/*** Handle the actual message** @param message never {@literal null}.*/protected abstract void doHandleMessage(Message message);/*** Initialize the message listener by writing requried redis config for {@literal notify-keyspace-events} and* registering the listener within the container.*/public void init() {if (StringUtils.hasText(keyspaceNotificationsConfigParameter)) {RedisConnection connection = listenerContainer.getConnectionFactory().getConnection();try {Properties config = connection.getConfig("notify-keyspace-events");if (!StringUtils.hasText(config.getProperty("notify-keyspace-events"))) {connection.setConfig("notify-keyspace-events", keyspaceNotificationsConfigParameter);}} finally {connection.close();}}doRegister(listenerContainer);}/*** Register instance within the container.** @param container never {@literal null}.*/protected void doRegister(RedisMessageListenerContainer container) {listenerContainer.addMessageListener(this, TOPIC_ALL_KEYEVENTS);}/** (non-Javadoc)* @see org.springframework.beans.factory.DisposableBean#destroy()*/@Overridepublic void destroy() throws Exception {listenerContainer.removeMessageListener(this);}/*** Set the configuration string to use for {@literal notify-keyspace-events}.** @param keyspaceNotificationsConfigParameter can be {@literal null}.* @since 1.8*/public void setKeyspaceNotificationsConfigParameter(String keyspaceNotificationsConfigParameter) {this.keyspaceNotificationsConfigParameter = keyspaceNotificationsConfigParameter;}/** (non-Javadoc)* @see org.springframework.beans.factory.InitializingBean#afterPropertiesSet()*/@Overridepublic void afterPropertiesSet() throws Exception {init();}
}
KeyExpirationEventMessageListener
public class KeyExpirationEventMessageListener extends KeyspaceEventMessageListener implementsApplicationEventPublisherAware {private static final Topic KEYEVENT_EXPIRED_TOPIC = new PatternTopic("__keyevent@*__:expired");private @Nullable ApplicationEventPublisher publisher;/*** Creates new {@link MessageListener} for {@code __keyevent@*__:expired} messages.** @param listenerContainer must not be {@literal null}.*/public KeyExpirationEventMessageListener(RedisMessageListenerContainer listenerContainer) {super(listenerContainer);}/** (non-Javadoc)* @see org.springframework.data.redis.listener.KeyspaceEventMessageListener#doRegister(org.springframework.data.redis.listener.RedisMessageListenerContainer)*/@Overrideprotected void doRegister(RedisMessageListenerContainer listenerContainer) {listenerContainer.addMessageListener(this, KEYEVENT_EXPIRED_TOPIC);}/** (non-Javadoc)* @see org.springframework.data.redis.listener.KeyspaceEventMessageListener#doHandleMessage(org.springframework.data.redis.connection.Message)*/@Overrideprotected void doHandleMessage(Message message) {publishEvent(new RedisKeyExpiredEvent(message.getBody()));}/*** Publish the event in case an {@link ApplicationEventPublisher} is set.** @param event can be {@literal null}.*/protected void publishEvent(RedisKeyExpiredEvent event) {if (publisher != null) {this.publisher.publishEvent(event);}}/** (non-Javadoc)* @see org.springframework.context.ApplicationEventPublisherAware#setApplicationEventPublisher(org.springframework.context.ApplicationEventPublisher)*/@Overridepublic void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {this.publisher = applicationEventPublisher;}
}
修改RedisConfig
@Slf4j
@Configuration
public class RedisConfig {@Autowiredprivate RedisMessageReceiver redisMessageReceiver;@Autowiredprivate CustomizeMessageListener customizeMessageListener;@Beanpublic RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory) {RedisMessageListenerContainer container = new RedisMessageListenerContainer();container.setConnectionFactory(connectionFactory);// 注意以下测试在redis.confi配置文件中设置了: notify-keyspace-events 为 AKE, // 也可以参照KeyspaceEventMessageListener在代码中设置这个配置项/*redis提供的事件通知发布消息示例如下:K => PUBLISH __keyspace@0__:foo delE => PUBLISH __keyevent@0__:del foo参照上述示例去写这个topic即可*/// 监听key删除事件container.addMessageListener(new MessageListener() {/*执行命令: del order:1234输出如下:监听key删除事件 - 消息的发布频道: __keyevent@0__:del监听key删除事件 - 消息内容: order:1234监听key删除事件 - 消息的订阅频道: __keyevent@*__:del*/@Overridepublic void onMessage(Message message, byte[] pattern) {byte[] bodyBytes = message.getBody();byte[] channelBytes = message.getChannel();log.info("监听key删除事件 - 消息的发布频道: {}", new String(channelBytes));log.info("监听key删除事件 - 消息内容: {}", new String(bodyBytes));log.info("监听key删除事件 - 消息的订阅频道: {}", new String(pattern));}}, new PatternTopic("__keyevent@*__:del"));// 监听指定前缀的keycontainer.addMessageListener(new MessageListener() {@Overridepublic void onMessage(Message message, byte[] pattern) {byte[] bodyBytes = message.getBody();byte[] channelBytes = message.getChannel();/*执行命令: set order:1234 a输出如下:监听指定前缀的key - 消息的发布频道: __keyspace@0__:order:1234监听指定前缀的key - 消息内容: set监听指定前缀的key - 消息的订阅频道: __keyspace@0__:order:**/log.info("监听指定前缀的key - 消息的发布频道: {}", new String(channelBytes));log.info("监听指定前缀的key - 消息内容: {}", new String(bodyBytes));log.info("监听指定前缀的key - 消息的订阅频道: {}", new String(pattern));}}, new PatternTopic("__keyspace@0__:order:*"));return container;}/* 借助了1. 这个KeyspaceEventMessageListener的bean中的对redis的配置修改2. 监听patter的topic*/@Beanpublic KeyspaceEventMessageListener keyspaceEventMessageListener(RedisMessageListenerContainer container) {return new KeyspaceEventMessageListener(container){/* __keyevent@* */@Overrideprotected void doHandleMessage(Message message) {log.info("监听所有key命令事件, 消息内容:{}, {}",// set name zzhua; expire name 5;// 消息内容就是key的名称, 比如: namenew String(message.getBody()),// 消息所发布的频道, 比如: __keyevent@0__:set, __keyevent@0__:expire等new String(message.getChannel()));}};}@Beanpublic KeyExpirationEventMessageListener keyExpirationEventMessageListener(RedisMessageListenerContainer container) {return new KeyExpirationEventMessageListener(container){/* __keyevent@*__:expired */@Overrideprotected void doHandleMessage(Message message) {log.info("监听所有key失效, 消息内容:{}, {}",// 消息内容就是key的名称, 比如: namenew String(message.getBody()),// 消息所发布的频道, 比如: __keyevent@0__:expirednew String(message.getChannel()));}};}}
3. 基于Stream的消息队列
Stream 是 Redis 5.0 引入的一种新的数据类型(因此支持持久化),可以实现一个功能非常完善的消息队列(专门为消息队列设计的),Redis streams官网介绍
1. 单消费者
xadd
发送消息的命令:
-
不指定消息队列的的最大消息数量就是不限制消息数量
-
消息唯一id建议使用
*
,让redis自动生成消息唯一id -
(上面命令介绍中的:大写表示照着抄就行;小写的是需要我们自己提供的参数;中括号表示可选参数)
示例
## 创建名为 users 的队列,并向其中发送一个消息,内容是:{name=jack,age=21},并且使用Redis自动生成ID
127.0.0.1:6379> XADD users * name jack age 21
"1644805700523-0"
xread
读取消息的方式之一:
- 不指定阻塞时间,就是直接返回(不阻塞);设置为0表示阻塞到有值为止;
- stream中消息读取之后,不会被删除;
- $ 表示读取最新的消息,但是如果之前消息都已经被读过了,那么当前继续去读的话,是读不到的(尽管当前stream中仍然有消息)
示例
## 从users的队列中读取1条消息, 从第1条开始读
127.0.0.1:6379> XREAD COUNT 1 STREAMS users 0
1) 1) "users"2) 1) 1) "1708522812423-0"2) 1) "name"2) "jack"3) "age"4) "21"
操作示例
查看当前redis版本是否支持stream数据结构
xadd与xread使用示例
在上面,还有1点没有体现出来:在stream中的每1个消息,被当前客户端读了1遍,还可以被当前客户端读1遍,然后,这个消息还可以被其它客户端读1遍。
xread读取最新数据要使用阻塞的方法才可以
我们发现,只有在阻塞期间,使用$才能读取到最新消息;如果不使用阻塞,想要读取最新数据是不可能的。
在业务开发中,我们可以循环的调用XREAD阻塞方式来查询最新消息,从而实现持续监听队列的效果,伪代码如下:
但是这会存在消息漏读的问题,由于:只有在阻塞期间,使用$才能读取到最新消息,假设在处理消息的时候,此时消息队列中发来了消息,那么这些消息就会被错过,只有当执行XREAD COUNT 1 BLOCK 2000 STREAMS users $开始时收到的第1个消息,才会被处理。
XREAD命令特点
STREAM类型消息队列的XREAD命令特点
- 消息可回溯(消息读取完之后,不会消失,永久的保留在我们的队列当中,随时想看都可以回去读)
- 一个消息可以被多个消费者读取(因为消息读取之后,不会消失)
- 可以阻塞读取
- 有消息漏读的风险(在处理消息的过程中,如果来了多条消息,则只能看到最后一条消息,即最新的那1条)
2. 消费者组
上面,我们知道通过xread命令你阻塞读取最新消息,有消息漏读的风险,下面,我们看看消费者组是如何解决这个问题的。
特点
消费者组(Consumer Group):将多个消费者划分到一个组中,监听同一个队列。具备下列特点:
消息分流
队列中的消息会分流给组内的不同消费者,而不是重复消费,从而加快消息处理的速度
消息标示
消费者组会维护一个标示,记录最后一个被处理的消息,哪怕消费者宕机重启,还会从标示之后读取消息。确保每一个消息都会被消费
消息确认
消费者获取消息后,消息处于pending状态,并存入一个pending-list。当处理完成后需要通过XACK来确认消息,标记消息为已处理,才会从pending-list移除。
要点
redis服务器维护了多个消费者组
可以给1个stream指定多个消费者组
- 把这里的消费者组当成上节中的消费者即可
- 1个stream绑定的的多个消费者组都会收到消息
消息发给消费者组
- 即多个消费者共同加入到消费者组中,形成1个消费者,而消息就分给消费者中中的消息来消费
消费者加入消费者组
消费者从消费者组中拉取消息,拉取到的消息进入消费者组中的pending-list
消费者消费完消息后,向消费者组确认消息已处理,已确认处理的消息会从pending-list中删除
消费者组总会用1个标识,来记录最后1个被处理的消息
创建消费者组
XGROUP CREATE key groupName ID [MKSTREAM]
- key:队列名称
- groupName:消费者组名称
- ID:起始ID标示,$代表队列中最后一个消息,0则代表队列中第一个消息
- 建议:如果不想处理队列中已存在的消息,就可以使用$;如果要处理已存在的消息,就是用0)
- MKSTREAM:队列不存在时自动创建队列;不指定的话,当不存在时,不会创建
其它常见命令:
# 删除指定的消费者组
XGROUP DESTORY key groupName# 给指定的消费者组添加消费者
#(一般情况下,我们并不需要自己添加消费者,因为当我们从这个消费者组当中指定1个消费者,
# 并且监听消息的时候,如果这个消费者不存在,则会自动创建消费者)
XGROUP CREATECONSUMER key groupname consumername# 删除消费者组中的指定消费者
XGROUP DELCONSUMER key groupname consumername
从消费者组读取消息
XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...]
- group:消费组名称
- consumer:消费者名称,如果消费者不存在,会自动创建一个消费者
- count:本次查询的最大数量
- BLOCK milliseconds:当没有消息时最长等待时间(若未指定,则不阻塞)
- NOACK:无需手动ACK,即获取到消息后自动确认(一般不建议使用)
- STREAMS key:指定队列名称
- ID:获取消息的起始ID:
- “>”:从消费者组的标记找到最后1个处理的消息(注意:不是已处理的消息,是处理的消息,也就是说它有可能被消费者获取了,但还没被消费者确认掉),的下一个未处理的消息开始
- 其它(除了">"以外的所有):根据指定id从pending-list中获取已消费但未确认的消息。例如0,是从pending-list中的第一个消息开始(一直拿0,就是一直从pending-list中拿第1个消息)
图示操作过程
消费者监听消息的基本思路
XREADGROUP命令特点
- 消息可回溯
- 可以多消费者争抢消息,加快消费速度
- 可以阻塞读取
- 没有消息漏读的风险
- 有消息确认机制,保证消息至少被消费一次
- (内存不受jvm限制,消息可做持久化,消息确认机制)