.Net之延迟队列

介绍

具有队列的特性,再给它附加一个延迟消费队列消息的功能,也就是说可以指定队列中的消息在哪个时间点被消费。

使用场景

延迟队列在项目中的应用还是比较多的,尤其像电商类平台:

  1. 订单成功后,在30分钟内没有支付,自动取消订单

  2. 外卖平台发送订餐通知,下单成功后60s给用户推送短信。

  3. 如果订单一直处于某一个未完结状态时,及时处理关单,并退还库存

  4. 淘宝新建商户一个月内还没上传商品信息,将冻结商铺等

该介绍来自其他文章

方案

下面的例子没有进行封装,所以代码仅供参考

Redis过期事件

注意:

不保证在设定的过期时间立即删除并发送通知,数据量大的时候会延迟比较大

不保证一定送达

发送即忘策略,不包含持久化

但是比如有些场景,对这个时间不是那么看重,并且有其他措施双层保障,该实现方案是比较简单。

redis自2.8.0之后版本提供Keyspace Notifications功能,允许客户订阅Pub / Sub频道,以便以某种方式接收影响Redis数据集事件。

配置

需要修改配置启用过期事件,比如在windows客户端中,需要修改redis.windows.conf文件,在linux中需要修改redis.conf,修改内容是:

6a43c54fda240392d4b4e9892090fbbf.png
img
-- 取消注释
notify-keyspace-events Ex-- 注释
#notify-keyspace-events ""

然后重新启动服务器,比如windows

.\redis-server.exe  .\redis.windows.conf

或者linux中使用docker-compose重新部署redis

redis:container_name: redisimage: redishostname: redisrestart: alwaysports: - "6379:6379"volumes: - $PWD/redis/redis.conf:/etc/redis.conf- /root/common-docker-compose/redis/data:/datacommand: /bin/bash -c "redis-server /etc/redis.conf" #启动执行指定的redis.conf文件

然后使用客户端订阅事件

-- windows
.\redis-cli-- linux
docker exec -it 容器标识 redis-clipsubscribe __keyevent@0__:expired

控制台订阅

使用StackExchange.Redis组件订阅过期事件

var connectionMultiplexer = ConnectionMultiplexer.Connect(_redisConnection);
var db = connectionMultiplexer.GetDatabase(0);db.StringSet("orderno:123456", "订单创建", TimeSpan.FromSeconds(10));
Console.WriteLine("开始订阅");var subscriber = connectionMultiplexer.GetSubscriber();//订阅库0的过期通知事件
subscriber.Subscribe("__keyevent@0__:expired", (channel, key) =>
{Console.WriteLine($"key过期 channel:{channel} key:{key}");
});Console.ReadLine();

输出结果:

key过期 channel:keyevent@0:expired key:orderno:123456

如果启动多个客户端监听,那么多个客户端都可以收到过期事件。

WebApi中订阅

创建RedisListenService继承自:BackgroundService

public class RedisListenService : BackgroundService
{private readonly ISubscriber _subscriber;public RedisListenService(IServiceScopeFactory serviceScopeFactory){using var scope = serviceScopeFactory.CreateScope();var configuration = scope.ServiceProvider.GetRequiredService<IConfiguration>();var connectionMultiplexer = ConnectionMultiplexer.Connect(configuration["redis"]);var db = connectionMultiplexer.GetDatabase(0);_subscriber = connectionMultiplexer.GetSubscriber();}protected override Task ExecuteAsync(CancellationToken stoppingToken){//订阅库0的过期通知事件_subscriber.Subscribe("__keyevent@0__:expired", (channel, key) =>{Console.WriteLine($"key过期 channel:{channel} key:{key}");});return Task.CompletedTask;}
}

注册该后台服务

services.AddHostedService<RedisListenService>();

启用项目,给redis指定库设置值,等过期后会接收到过期通知事件。

RabbitMq延迟队列

版本信息   Rabbitmq版本:3.10.5  Erlang版本:24.3.4.2

要使用rabbitmq做延迟是需要安装插件(rabbitmq_delayed_message_exchange)的

插件介绍:https://blog.rabbitmq.com/posts/2015/04/scheduling-messages-with-rabbitmq

下载地址:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases

将下载好的插件(d:/Download/rabbitmq_delayed_message_exchange-3.10.2.ez)映射到容器的plugins目录下:

docker run -d --name myrabbit -p 9005:15672 -p 5672:5672  -e RABBITMQ_DEFAULT_VHOST=customer -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=123456 -v d:/Download/rabbitmq_delayed_message_exchange-3.10.2.ez:/plugins/rabbitmq_delayed_message_exchange-3.10.2.ez  rabbitmq:3-management-alpine

进入容器

docker exec -it 容器名称/标识 bash

启用插件

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

查看是否启用

rabbitmq-plugins list

[E*]和[e*]表示启用,然后重启服务

rabbitmq-server restart

然后在管理界面添加交换机都可以看到

940d9c684ce4664ad0b16c61eb456085.png
img

生产消息

发送的消息类型是:x-delayed-message

[HttpGet("send/delay")]
public string SendDelayedMessage()
{var factory = new ConnectionFactory(){HostName = "localhost",//IP地址Port = 5672,//端口号UserName = "admin",//用户账号Password = "123456",//用户密码VirtualHost = "customer"};using var connection = factory.CreateConnection();using var channel = connection.CreateModel();var exchangeName = "delay-exchange";var routingkey = "delay.delay";var queueName = "delay_queueName";//设置Exchange队列类型var argMaps = new Dictionary<string, object>(){{"x-delayed-type", "topic"}};//设置当前消息为延时队列channel.ExchangeDeclare(exchange: exchangeName, type: "x-delayed-message", true, false, argMaps);channel.QueueDeclare(queueName, true, false, false, argMaps);channel.QueueBind(queueName, exchangeName, routingkey);var time = 1000 * 5;var message = $"发送时间为 {DateTime.Now:yyyy-MM-dd HH:mm:ss} 延时时间为:{time}";var body = Encoding.UTF8.GetBytes(message);var props = channel.CreateBasicProperties();//设置消息的过期时间props.Headers = new Dictionary<string, object>(){{  "x-delay", time }};channel.BasicPublish(exchange: exchangeName, routingKey: routingkey, basicProperties: props, body: body);Console.WriteLine("成功发送消息:" + message);return "success";
}

消费消息

消费消息我是弄了一个后台任务(RabbitmqDelayedHostService)在处理

public class RabbitmqDelayedHostService : BackgroundService
{private readonly IModel _channel;private readonly IConnection _connection;public RabbitmqDelayedHostService(){var connFactory = new ConnectionFactory//创建连接工厂对象{HostName = "localhost",//IP地址Port = 5672,//端口号UserName = "admin",//用户账号Password = "123456",//用户密码VirtualHost = "customer"};_connection = connFactory.CreateConnection();_channel = _connection.CreateModel();//交换机名称var exchangeName = "exchangeDelayed";var queueName = "delay_queueName";var routingkey = "delay.delay";var argMaps = new Dictionary<string, object>(){{"x-delayed-type", "topic"}};_channel.ExchangeDeclare(exchange: exchangeName, type: "x-delayed-message", true, false, argMaps);_channel.QueueDeclare(queueName, true, false, false, argMaps);_channel.QueueBind(queue: queueName, exchange: exchangeName, routingKey: routingkey);//声明为手动确认_channel.BasicQos(0, 1, false);}protected override Task ExecuteAsync(CancellationToken stoppingToken){var queueName = "delay_queueName";var consumer = new EventingBasicConsumer(_channel);consumer.Received += (model, ea) =>{var message = Encoding.UTF8.GetString(ea.Body.ToArray());var routingKey = ea.RoutingKey;Console.WriteLine($"接受到消息的时间为 {DateTime.Now:yyyy-MM-dd HH:mm:ss},routingKey:{routingKey} message:{message} ");//手动确认_channel.BasicAck(ea.DeliveryTag, true);};_channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);return Task.CompletedTask;}public override void Dispose(){_connection.Dispose();_channel.Dispose();base.Dispose();}
}

注册该后台任务

services.AddHostedService<RabbitmqDelayedHostService>();

输出结果

成功发送消息:发送时间为 2022-07-02 18:54:22 延时时间为:5000

成功发送消息:发送时间为 2022-07-02 18:54:22 延时时间为:5000

成功发送消息:发送时间为 2022-07-02 18:54:22 延时时间为:5000

成功发送消息:发送时间为 2022-07-02 18:54:23 延时时间为:5000

成功发送消息:发送时间为 2022-07-02 18:54:23 延时时间为:5000

成功发送消息:发送时间为 2022-07-02 18:54:23 延时时间为:5000

接受到消息的时间为 2022-07-02 18:54:27,routingKey:delay.delay message:发送时间为 2022-07-02 18:54:22 延时时间为:5000

接受到消息的时间为 2022-07-02 18:54:27,routingKey:delay.delay message:发送时间为 2022-07-02 18:54:22 延时时间为:5000

接受到消息的时间为 2022-07-02 18:54:27,routingKey:delay.delay message:发送时间为 2022-07-02 18:54:22 延时时间为:5000

接受到消息的时间为 2022-07-02 18:54:28,routingKey:delay.delay message:发送时间为 2022-07-02 18:54:23 延时时间为:5000

接受到消息的时间为 2022-07-02 18:54:28,routingKey:delay.delay message:发送时间为 2022-07-02 18:54:23 延时时间为:5000

接受到消息的时间为 2022-07-02 18:54:28,routingKey:delay.delay message:发送时间为 2022-07-02 18:54:23 延时时间为:5000

其他方案

  • Hangfire延迟队列

BackgroundJob.Schedule(() => Console.WriteLine("Delayed!"),TimeSpan.FromDays(7));
  • 时间轮

  • Redisson DelayQueue

  • 计时管理器

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/285305.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

C语言试题134之画直线

📃个人主页:个人主页 🔥系列专栏:C语言试题200例 💬推荐一款模拟面试、刷题神器👉 点击跳转进入网站 ✅作者简介:大家好,我是码莎拉蒂,CSDN博客专家(全站排名Top 50),阿里云博客专家、51CTO博客专家、华为云享专家 1、题目 题目:用 line 画直线 2 、温馨提示…

KeyMob应用开发者服务平台帮助开发者推广和盈利

为什么80%的码农都做不了架构师&#xff1f;>>> 2014年10月&#xff0c;新版KeyMob移动广告聚合平台正式上线&#xff0c;登入KeyMob移动广告聚合平台即可看到聚合功能&#xff0c;目前新KeyMob整合了国内外多家主流的广告平台&#xff0c;实实在在的为应用开发者赚…

Android GIS开发系列-- 入门季(13)Gdal简单写个shp文件

Gdal是用来读写栅格与矢量数据的&#xff0c;在Gdal官网&#xff0c;可以下载相关的资源进行平台的编译。其实Arcgis底层也是用Gdal来读取shp文件的&#xff0c;那在Android中可以直接读写shp文件吗&#xff0c;是可以的。这里已经有人编译了Android端的so &#xff0c;下载地址…

VS code 搭建Vue 项目

必备工具&#xff1a;Vs Code、NodeJs 1、新建一文件目录来存放工程文件 2、右键用VS Code打开 3、Ctr 打开terminal面板&#xff0c;建议先将设置镜像服务器&#xff0c;可能否则安装比较慢。 npm config set registry https://registry.npm.taobao.org --globalnpm config…

C语言试题135之画方形

📃个人主页:个人主页 🔥系列专栏:C语言试题200例 💬推荐一款模拟面试、刷题神器👉 点击跳转进入网站 ✅作者简介:大家好,我是码莎拉蒂,CSDN博客专家(全站排名Top 50),阿里云博客专家、51CTO博客专家、华为云享专家 1、题目 题目:用 rectangle 画方形 2 、温…

SpringMVC4零配置--web.xml

servlet3.0规范后&#xff0c;允许servlet&#xff0c;filter&#xff0c;listener不必声明在web.xml中&#xff0c;而是以硬编码的方式存在&#xff0c;实现容器的零配置。 ServletContainerInitializer&#xff1a;启动容器时负责加载相关配置 Java代码 package javax.servl…

GoldenGate介绍

Oracle Golden Gate软件是一种基于日志的结构化数据复制备份软件&#xff0c;它通过解析源数据库在线日志或归档日志获得数据的增量变化&#xff0c;再将这些变化应用到目标数据库&#xff0c;从而实现源数据库与目标数据库同步。Oracle Golden Gate可以在异构的IT基础结构&…

如何获取GC(垃圾回收器)的STW(暂停)时间?

前言在现代的容器化和微服务应用中&#xff0c;因为分布式的环境和错综复杂的调用关系&#xff0c;APM&#xff08;Application Performance Monitoring 应用性能监控&#xff09;显得尤为重要&#xff0c;它通过采集应用程序各种指标和请求链路&#xff0c;让你知道系统当前的…

C语言试题136之打印出杨辉三角形(要求打印出 10 行如下图)

📃个人主页:个人主页 🔥系列专栏:C语言试题200例 💬推荐一款模拟面试、刷题神器👉 点击跳转进入网站 ✅作者简介:大家好,我是码莎拉蒂,CSDN博客专家(全站排名Top 50),阿里云博客专家、51CTO博客专家、华为云享专家 1、题目 题目:打印出杨辉三角形(要求打印…

【ArcGIS风暴】ArcGIS矢量数据分层设色后导出或裁剪后颜色分类丢失完美解决办法

在利用ArcGIS做土地利用现状图或者规划图时,事先费了好大劲把每个地类对应的图斑进行了符号化(用不同的颜色表示),后来把符号化好的数据导出,并进行加载,发现颜色分类完全丢失了,同样,在利用某个重点区域范围裁剪后,颜色变成了单一的颜色。 符号化成果: 导出后加载矢…

如何使用Cmder替换cmd

一、cmder介绍 cmder是一款Windows环境下非常简洁美观易用的cmd替代者&#xff0c;它支持了大部分的Linux命令。支持ssh连接linux&#xff0c;使用起来非常方便。比起cmd、powershell、conEmu&#xff0c;其界面美观简洁&#xff0c;功能强大。 二、下载地址 地址&#xff1a…

C语言试题137之画点

📃个人主页:个人主页 🔥系列专栏:C语言试题200例 💬推荐一款模拟面试、刷题神器👉 点击跳转进入网站 ✅作者简介:大家好,我是码莎拉蒂,CSDN博客专家(全站排名Top 50),阿里云博客专家、51CTO博客专家、华为云享专家 1、题目 题目:利用putpixel 画点。 2 、温…

Delphi WinExec ShellExec 用法介绍

2019独角兽企业重金招聘Python工程师标准>>> Delphi WinExec ShellExec 用法介绍 在Windows程序设计中WinAPI也为我们提供了类似的函数&#xff0c;它们就是WinExec()和ShellExecute()&#xff0c;下面就来讨论一下这两个函数的用法。 1)WinExec() 函数原型&#x…

各大主流编程语言性能PK,结果出乎意料

出品 | OSC开源社区&#xff08;ID&#xff1a;oschina2013&#xff09;“什么编程语言速度最快”&#xff0c;为了回答这个问题&#xff0c;The Benchmarks Game 专门面向主流编程语言设计了性能测试。测试的项目包括&#xff08;可点击文末阅读原文查看详情&#xff09;&…

【MapBox】5种地图(底图)样式自由切换附源码

文章目录 一、不同地图显示样式1. 卫星2. 浅色3. 深色4. 街道5. 户外二、完整源码一、不同地图显示样式 MapBox提供了5种不同形式的地图显示样式: 1. 卫星 2. 浅色

[转]vue项目中,main.js,App.vue,index.html如何调用

1、main.js是我们的入口文件&#xff0c;主要作用是初始化vue实例&#xff0c;并引入所需要的插件 2、App.vue是我们的主组件&#xff0c;所有页面都是在App.vue下进行切换的。其实你也可以理解为所有的路由也是App.vue的子组件。所以我将router标示为App.vue的子组件。 index…

C语言试题138之画椭圆

📃个人主页:个人主页 🔥系列专栏:C语言试题200例 💬推荐一款模拟面试、刷题神器👉 点击跳转进入网站 ✅作者简介:大家好,我是码莎拉蒂,CSDN博客专家(全站排名Top 50),阿里云博客专家、51CTO博客专家、华为云享专家 1、题目 题目:画椭圆 ellipse 2 、温馨提示…

光纤熔接过程详细说明

在我们实际的网络工作当中会遇到光纤的熔接&#xff0c;由于目前光纤熔接的工艺已经成熟完善&#xff0c;对于一根六芯光纤整个熔接过程需要2名工程师&#xff0c;大约需要花费1个小时的时间就可以完成。我们首先考虑的是为光纤线缆留出足够的长度来&#xff0c;多了好办&#…

Playwright之录制

前言前段时间看了大佬分享的关于Playwright.NET的文章感觉挺有意思&#xff0c;想要阅读点击&#xff1a;此处&#xff0c;然后跟随大佬的脚步&#xff0c;学习了一点自动化玩&#xff0c;其中有一个录制功能感觉挺好玩&#xff0c;下面就来简单看看介绍手动操作浏览器&#xf…

【MapBox】在地图上创建多个Marker点,设置鼠标经过提示信息,单击跳转到链接

本文讲述在MapBox地图上创建多个Point点,然后设置鼠标经过每个点提示信息,再让每个点鼠标点击跳转到指定的不同的链接地址。 任务描述: 显示MapBox地图添加Marker设置鼠标经过提示设置Marker单击链接效果展示: 点击Marker跳转到指定的链接地址。 源码赠送: <!DOCTYPE …