使用 Redis Stream 实现消息队列
Intro
Redis 5.0 中增加了 Stream 的支持,利用 Stream 我们可以实现可靠的消息队列,并且支持一个消息被多个消费者所消费,可以很好的实现消息队列
Simple Usage
首先我们来看一个简单版本的 Stream 使用,我们在代码里使用一个发布者,一个消费者来模拟一个简单的消息队列的场景
来看下面的测试代码:
private const string StreamKey = "test-simple-stream";public static async Task MainTest()
{await RedisHelper.GetDatabase().KeyDeleteAsync(StreamKey);// register background consumer_ = Task.Factory.StartNew(Consume).ConfigureAwait(false);//await Publish();
}private static async Task Publish()
{Console.WriteLine("Press Enter to publish messages, Press Q to exit");var input = Console.ReadLine();while (input is not "q" and not "Q"){var redis = RedisHelper.GetDatabase();for (var i = 0; i < 10; i++){await redis.StreamAddAsync(StreamKey, "message", $"test_message_{i}");}input = Console.ReadLine();}
}private static async Task Consume()
{var lastMsgId = "0-0";while (true){await InvokeHelper.TryInvokeAsync(async () =>{var redis = RedisHelper.GetDatabase();var entries = await redis.StreamReadAsync(StreamKey, lastMsgId, 2);if (entries.Length == 0){return;}foreach (var entry in entries){Console.WriteLine(entry.Id);entry.Values.Dump();// delete message if you want// redis.StreamDelete(StreamKey, new[] { entry.Id });}lastMsgId = entries[^1].Id;});await Task.Delay(200);}
}
上面的代码会使用一个后台线程来运行一个 Consumer 来从 Stream 中读取消息,有两种消费消息的模式,一种是自己维护一个处理的消息 offset,每次从这个 offset 之后读取新消息,另外一种模式不需要维护本地的 offset,可以在处理完消息之后直接删掉消息,默认消息是不会删消息的,所以如果不删消息的话需要维护
Publisher 每次会发布 10 条消息,Consumer 每次会读取两条消息,处理之后会等待 200 ms,之后再查询消息
来看一下运行效果吧:
Consumer Group
上面的示例会相对来说比较简单,只有一个 Consumer,但是在比较常用的场景下往往会有多个消费者处理,
比如说用户注册成功之后,发布一条消息可能会有多个 Consumer 同时给用户发邮件或短信以及给用户加积分等操作,这种场景下使用上面的模式就不合适了,Redis Stream 中增加了 Consumer Group 的概念(有的人甚至称 Redis 内置了一个 Kafka),在创建了 Consumer Group 之后,向 Stream 发布消息的时候会广播到各个 Consumer Group 中,每个 Consumer Group 的消息消费是独立的,不同的 Consumer Group 的消费速度可以不一致,一个 Consumer Group 也可以有多个 Consumer 同时运行,同一个 Group 内的多个 Consumer 是会共享一个 Consumer Group 的消息消费,而且我们可以手动进行消息的 ACK
来看下面的示例代码吧:
private const string StreamKey = "test-stream-group";
private static int _consumerCount;public static async Task MainTest()
{await RedisHelper.GetDatabase().KeyDeleteAsync(StreamKey);// register background consumer_ = await Task.Factory.StartNew(Consume).ConfigureAwait(false);_ = await Task.Factory.StartNew(Consume).ConfigureAwait(false);//await Publish();
}private static async Task Publish()
{Console.WriteLine("Press Enter to publish messages, Press Q to exit");var input = Console.ReadLine();while (input is not "q" and not "Q"){var redis = RedisHelper.GetDatabase();for (var i = 0; i < 10; i++){await redis.StreamAddAsync(StreamKey, "message", $"test_message_{i}");}input = Console.ReadLine();}
}private static async Task Consume()
{Interlocked.Increment(ref _consumerCount);var groupName = $"group-{_consumerCount}";var consumerName = $"consumer-{_consumerCount}";var redis = RedisHelper.GetDatabase();redis.StreamCreateConsumerGroup(StreamKey, groupName);while (true){await InvokeHelper.TryInvokeAsync(async () =>{var messages = await redis.StreamReadGroupAsync(StreamKey, groupName, consumerName, count: SecurityHelper.Random.Next(1, 4));if (messages.Length == 0){return;}foreach (var message in messages){Console.WriteLine($"{groupName}-{message.Id}-{message.Values.ToJson()}");await redis.StreamAcknowledgeAsync(StreamKey, groupName, message.Id);}});await Task.Delay(200);}
}
上面的示例代码会先注册两个 Consumer Group,两个 Consumer Group 内各有一个 consumer,你也可以使用多个 consumer,为了体现各个 Consumer Group 是独立的,每次获取消息的 Count 是会随机指定的,在读取的消息之后会输出消息内容来代替处理消息的逻辑,处理完成之后进行消息的 ACK,消息的发布逻辑和上面的示例是类似的
上述代码执行输出示例:
可以看到我们发布的消息,每一个 consumer group 都会处理消息,而且处理消息的速度是独立的,互不影响
通过 XINFO 命令我们可以对 Stream 做一些监控
More
利用 Redis 的 Stream 我们可以实现可靠的一个消息机制,stream 的每一条消息都会有一个消息 Id,默认是两个部分,一个部分是时间戳,另一个部分是一个序列号,消息 Id 可以自定义,但是通常情况下推荐用默认的 id
Redis 中的 List、HashSet、Set、ZSet 这些数据类型中没有元素的时候会把对应的 Key 也会删掉,但是 Stream 是不会的,Stream 允许没有消息的时候依然存在
Redis Stream 使用的时候需要注意我们是可以指定 Stream 的消息长度的,如果我们指定了最大消息长度 10000,超出 10000 的时候旧消息就会被挤出队列,可能会出现消息的丢失,需要对 Stream 做必要的监控和报警
References
https://redis.io/topics/streams-intro
https://redis.io/commands
https://github.com/WeihanLi/SamplesInPractice/tree/master/RedisSample