介绍
具有队列的特性,再给它附加一个延迟消费队列消息的功能,也就是说可以指定队列中的消息在哪个时间点被消费。
使用场景
延迟队列在项目中的应用还是比较多的,尤其像电商类平台:
订单成功后,在30分钟内没有支付,自动取消订单
外卖平台发送订餐通知,下单成功后60s给用户推送短信。
如果订单一直处于某一个未完结状态时,及时处理关单,并退还库存
淘宝新建商户一个月内还没上传商品信息,将冻结商铺等
该介绍来自其他文章
方案
下面的例子没有进行封装,所以代码仅供参考
Redis过期事件
注意:
不保证在设定的过期时间立即删除并发送通知,数据量大的时候会延迟比较大
不保证一定送达
发送即忘策略,不包含持久化
但是比如有些场景,对这个时间不是那么看重,并且有其他措施双层保障,该实现方案是比较简单。
redis自2.8.0之后版本提供Keyspace Notifications功能,允许客户订阅Pub / Sub频道,以便以某种方式接收影响Redis数据集事件。
配置
需要修改配置启用过期事件,比如在windows客户端中,需要修改redis.windows.conf文件,在linux中需要修改redis.conf,修改内容是:
-- 取消注释
notify-keyspace-events Ex-- 注释
#notify-keyspace-events ""
然后重新启动服务器,比如windows
.\redis-server.exe .\redis.windows.conf
或者linux中使用docker-compose重新部署redis
redis:container_name: redisimage: redishostname: redisrestart: alwaysports: - "6379:6379"volumes: - $PWD/redis/redis.conf:/etc/redis.conf- /root/common-docker-compose/redis/data:/datacommand: /bin/bash -c "redis-server /etc/redis.conf" #启动执行指定的redis.conf文件
然后使用客户端订阅事件
-- windows
.\redis-cli-- linux
docker exec -it 容器标识 redis-clipsubscribe __keyevent@0__:expired
控制台订阅
使用StackExchange.Redis组件订阅过期事件
var connectionMultiplexer = ConnectionMultiplexer.Connect(_redisConnection);
var db = connectionMultiplexer.GetDatabase(0);db.StringSet("orderno:123456", "订单创建", TimeSpan.FromSeconds(10));
Console.WriteLine("开始订阅");var subscriber = connectionMultiplexer.GetSubscriber();//订阅库0的过期通知事件
subscriber.Subscribe("__keyevent@0__:expired", (channel, key) =>
{Console.WriteLine($"key过期 channel:{channel} key:{key}");
});Console.ReadLine();
输出结果:
key过期 channel:keyevent@0:expired key:orderno:123456
如果启动多个客户端监听,那么多个客户端都可以收到过期事件。
WebApi中订阅
创建RedisListenService继承自:BackgroundService
public class RedisListenService : BackgroundService
{private readonly ISubscriber _subscriber;public RedisListenService(IServiceScopeFactory serviceScopeFactory){using var scope = serviceScopeFactory.CreateScope();var configuration = scope.ServiceProvider.GetRequiredService<IConfiguration>();var connectionMultiplexer = ConnectionMultiplexer.Connect(configuration["redis"]);var db = connectionMultiplexer.GetDatabase(0);_subscriber = connectionMultiplexer.GetSubscriber();}protected override Task ExecuteAsync(CancellationToken stoppingToken){//订阅库0的过期通知事件_subscriber.Subscribe("__keyevent@0__:expired", (channel, key) =>{Console.WriteLine($"key过期 channel:{channel} key:{key}");});return Task.CompletedTask;}
}
注册该后台服务
services.AddHostedService<RedisListenService>();
启用项目,给redis指定库设置值,等过期后会接收到过期通知事件。
RabbitMq延迟队列
版本信息 Rabbitmq版本:3.10.5 Erlang版本:24.3.4.2
要使用rabbitmq做延迟是需要安装插件(rabbitmq_delayed_message_exchange)的
插件介绍:https://blog.rabbitmq.com/posts/2015/04/scheduling-messages-with-rabbitmq
下载地址:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases
将下载好的插件(d:/Download/rabbitmq_delayed_message_exchange-3.10.2.ez)映射到容器的plugins目录下:
docker run -d --name myrabbit -p 9005:15672 -p 5672:5672 -e RABBITMQ_DEFAULT_VHOST=customer -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=123456 -v d:/Download/rabbitmq_delayed_message_exchange-3.10.2.ez:/plugins/rabbitmq_delayed_message_exchange-3.10.2.ez rabbitmq:3-management-alpine
进入容器
docker exec -it 容器名称/标识 bash
启用插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
查看是否启用
rabbitmq-plugins list
[E*]和[e*]表示启用,然后重启服务
rabbitmq-server restart
然后在管理界面添加交换机都可以看到
生产消息
发送的消息类型是:x-delayed-message
[HttpGet("send/delay")]
public string SendDelayedMessage()
{var factory = new ConnectionFactory(){HostName = "localhost",//IP地址Port = 5672,//端口号UserName = "admin",//用户账号Password = "123456",//用户密码VirtualHost = "customer"};using var connection = factory.CreateConnection();using var channel = connection.CreateModel();var exchangeName = "delay-exchange";var routingkey = "delay.delay";var queueName = "delay_queueName";//设置Exchange队列类型var argMaps = new Dictionary<string, object>(){{"x-delayed-type", "topic"}};//设置当前消息为延时队列channel.ExchangeDeclare(exchange: exchangeName, type: "x-delayed-message", true, false, argMaps);channel.QueueDeclare(queueName, true, false, false, argMaps);channel.QueueBind(queueName, exchangeName, routingkey);var time = 1000 * 5;var message = $"发送时间为 {DateTime.Now:yyyy-MM-dd HH:mm:ss} 延时时间为:{time}";var body = Encoding.UTF8.GetBytes(message);var props = channel.CreateBasicProperties();//设置消息的过期时间props.Headers = new Dictionary<string, object>(){{ "x-delay", time }};channel.BasicPublish(exchange: exchangeName, routingKey: routingkey, basicProperties: props, body: body);Console.WriteLine("成功发送消息:" + message);return "success";
}
消费消息
消费消息我是弄了一个后台任务(RabbitmqDelayedHostService)在处理
public class RabbitmqDelayedHostService : BackgroundService
{private readonly IModel _channel;private readonly IConnection _connection;public RabbitmqDelayedHostService(){var connFactory = new ConnectionFactory//创建连接工厂对象{HostName = "localhost",//IP地址Port = 5672,//端口号UserName = "admin",//用户账号Password = "123456",//用户密码VirtualHost = "customer"};_connection = connFactory.CreateConnection();_channel = _connection.CreateModel();//交换机名称var exchangeName = "exchangeDelayed";var queueName = "delay_queueName";var routingkey = "delay.delay";var argMaps = new Dictionary<string, object>(){{"x-delayed-type", "topic"}};_channel.ExchangeDeclare(exchange: exchangeName, type: "x-delayed-message", true, false, argMaps);_channel.QueueDeclare(queueName, true, false, false, argMaps);_channel.QueueBind(queue: queueName, exchange: exchangeName, routingKey: routingkey);//声明为手动确认_channel.BasicQos(0, 1, false);}protected override Task ExecuteAsync(CancellationToken stoppingToken){var queueName = "delay_queueName";var consumer = new EventingBasicConsumer(_channel);consumer.Received += (model, ea) =>{var message = Encoding.UTF8.GetString(ea.Body.ToArray());var routingKey = ea.RoutingKey;Console.WriteLine($"接受到消息的时间为 {DateTime.Now:yyyy-MM-dd HH:mm:ss},routingKey:{routingKey} message:{message} ");//手动确认_channel.BasicAck(ea.DeliveryTag, true);};_channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);return Task.CompletedTask;}public override void Dispose(){_connection.Dispose();_channel.Dispose();base.Dispose();}
}
注册该后台任务
services.AddHostedService<RabbitmqDelayedHostService>();
输出结果
成功发送消息:发送时间为 2022-07-02 18:54:22 延时时间为:5000
成功发送消息:发送时间为 2022-07-02 18:54:22 延时时间为:5000
成功发送消息:发送时间为 2022-07-02 18:54:22 延时时间为:5000
成功发送消息:发送时间为 2022-07-02 18:54:23 延时时间为:5000
成功发送消息:发送时间为 2022-07-02 18:54:23 延时时间为:5000
成功发送消息:发送时间为 2022-07-02 18:54:23 延时时间为:5000
接受到消息的时间为 2022-07-02 18:54:27,routingKey:delay.delay message:发送时间为 2022-07-02 18:54:22 延时时间为:5000
接受到消息的时间为 2022-07-02 18:54:27,routingKey:delay.delay message:发送时间为 2022-07-02 18:54:22 延时时间为:5000
接受到消息的时间为 2022-07-02 18:54:27,routingKey:delay.delay message:发送时间为 2022-07-02 18:54:22 延时时间为:5000
接受到消息的时间为 2022-07-02 18:54:28,routingKey:delay.delay message:发送时间为 2022-07-02 18:54:23 延时时间为:5000
接受到消息的时间为 2022-07-02 18:54:28,routingKey:delay.delay message:发送时间为 2022-07-02 18:54:23 延时时间为:5000
接受到消息的时间为 2022-07-02 18:54:28,routingKey:delay.delay message:发送时间为 2022-07-02 18:54:23 延时时间为:5000
其他方案
Hangfire延迟队列
BackgroundJob.Schedule(() => Console.WriteLine("Delayed!"),TimeSpan.FromDays(7));
时间轮
Redisson DelayQueue
计时管理器