AspNetCore结合Redis实践消息队列

这是年中首发在博客园上的文章,个人觉得是AspNetCore结合Redis做的一次比较优秀的消息队列重构,其中对于点对点/发布-订阅的思路应该也是面试必考题。

引言

  .Net TPL Dataflow是一个进程内数据流管道,应对高并发、低延迟的要求非常有效, 但在实际Docker部署的过程中, 有一个问题一直无法回避:

单体程序部署的瞬间(服务不可用)会有少量流量无法处理;

更糟糕的情况下,迭代部署的这个版本有问题,上线后无法工作, 导致更多流量没有处理。

    背负神圣使命(巨大压力)的程序猿心生一计,为何不将单体程序改成分布式:

增加服务ReceiverApp,ReceiverApp只接受数据,WebApp只处理数据。

知识储备

    消息队列和订阅发布作为老生常谈的两个知识点被反复提及,按照JMS的规范, 官方称为点对点(point to point, queue)和发布/订阅(publish/subscribe,topic)

点对点

    生产者发送消息到Message Queue中,然后消费者从队列中取出消息并消费。

队列会保留消息,直到他们被消费或超时; 

① MQ支持多消费者,但每个消息只能被一个消费者处理

② 发送者和消费者在时间上没有依赖性,当发送者发送消息之后,不管消费者有没有在运行(甚至不管有没有消费者),都不会影响到消息被发送到队列

③ 一般消费者在消费之后需要向队列应答成功

如果你希望发送的每个消息都应该被成功处理,你应该使用p2p模型

发布/订阅

  消息生产者将消息发布到Channel,在此之前已有多个消费者订阅该通道。

和点对点方式不同,发布到特定通道的消息会被通道订阅者收到。

通道没有暂存队列机制,发布的消息只能被当前收听的订阅者接收到

① 每个消息可以有多个订阅者

② 发布者和消费者有时间上依赖性:某通道的订阅者,必须先创建该通道订阅,才能收到消息

发布消息至通道,不关注订阅者是谁;订阅者可收听自己感兴趣的多个通道(类似于topic),也不关注发布者是谁。

③ 故如果没有订阅者,发布的消息将得不到处理;

头脑风暴

Redis内置的List数据结构能形成轻量级消息队列的效果;Redis原生支持发布/订阅 模型

如上分析, Pub/Sub模型在订阅者宕机的时候,发布的消息得不到处理,故此模型不能用于强业务的数据接收和处理。

本次采用的消息队列模型:

  • 解耦业务:新建ReceiverApp作为生产者,专注于接收并发送到队列;原有的WebApp作为消费者专注数据处理。

  • 起到削峰填谷的作用,若缩放出多个WebApp消费者容器,还能形成负载均衡的效果。 

需要关注Redis操作List结构的两个命令( 左进右出,右进左出同理):

    LPUSH  &  RPOP/BRPOP

Brpop中的B 表示"Block",是一个rpop命令的阻塞版本:若指定List没有新元素,在给定时间内,该命令会阻塞当前redis客户端连接,直到超时返回nil

AspNetCore编程实践

本次使用AspNetCore 完成RedisMQ的实践,引入Redis国产第三方开源库CSRedisCore

生产者ReceiverApp

生产者使用LPush命令向Redis List数据结构写入消息。

------------------截取自Startup.cs-------------------------
public void ConfigureServices(IServiceCollection services)
{// Redis客户端要定义成单例, 不然在大流量并发收数的时候, 会造成redis client来不及释放。另一方面也确认api控制器不是单例模式,var csredis = new CSRedisClient(Configuration.GetConnectionString("redis")+",name=receiver");RedisHelper.Initialization(csredis);services.AddSingleton(csredis);services.AddMvc();
}
------------------截取自数据接收Controller-------------------
[Route("batch")]
[HttpPost]
public async Task BatchPutEqidAndProfileIds([FromBody]List<EqidPair> eqidPairs)
{if (!ModelState.IsValid)throw new ArgumentException("Http Body Payload Error.");var redisKey = $"{DateTime.Now.ToString("yyyyMMdd")}"; eqidPairs = await EqidExtractor.EqidExtractAsync(eqidPairs);if (eqidPairs != null && eqidPairs.Any())RedisHelper.LPush(redisKey, eqidPairs.ToArray());await Task.CompletedTask;}

消费者WebApp

    根据以上RedisMQ思路,事件消费方式是拉取pull,故需要轮询Redis  List数据结构,这里使用AspNetCore内置的BackgroundService后台服务类后台轮询消费:

关注后台Job中的循环接收方法。

public class BackgroundJob : BackgroundService
{private readonly IEqidPairHandler _eqidPairHandler;private readonly CSRedisClient[] _cSRedisClients;private readonly IConfiguration _conf;private readonly ILogger _logger;public BackgroundJob(IEqidPairHandler eqidPairHandler, CSRedisClient[] csRedisClients,IConfiguration conf,ILoggerFactory loggerFactory){_eqidPairHandler = eqidPairHandler;_cSRedisClients = csRedisClients;_conf = conf;_logger = loggerFactory.CreateLogger(nameof(BackgroundJob));}protected override async Task ExecuteAsync(CancellationToken stoppingToken){_logger.LogInformation("Service starting");if (_cSRedisClients[0] == null){_cSRedisClients[0] = new CSRedisClient(_conf.GetConnectionString("redis") + ",defaultDatabase=" + 0);}RedisHelper.Initialization(_cSRedisClients[0]);while (!stoppingToken.IsCancellationRequested){var key = $"eqidpair:{DateTime.Now.ToString("yyyyMMdd")}";var eqidpair = RedisHelper.BRPop(5, key);if (eqidpair != null)await _eqidPairHandler.AcceptEqidParamAsync(JsonConvert.DeserializeObject<EqidPair>(eqidpair));// 强烈建议无论如何休眠一段时间,防止突发大流量导致WebApp进程CPU满载,自行根据场景设置合理休眠时间await Task.Delay(10, stoppingToken);}_logger.LogInformation("Service stopping");}
}

迭代验证

使用docker-compose单机部署Nginx,ReceiverApp,WebApp容器。

docker-compose up指令默认只会重建[Service配置或Image变更]的容器。

If there are existing containers for a service, and the service’s configuration or image was changed after the container’s creation, docker-compose up picks up the changes by stopping and recreating the containers (preserving mounted volumes). To prevent Compose from picking up changes, use the --no-recreate flag.

做一次迭代验证,更新docke-compose.yml文件WebApp服务的镜像版本,

docker-compose up;

下图显示仅 数据处理容器 WebApp被Recreate:

Nice,分布式改造完成,效果很明显,现在可以放心安全的迭代核心WebApp数据处理程序。

+ https://redis.io/commands/brpop

+ https://redis.io/commands/lpush

文字+制图,均为原创,

扫码点赞,
让干货飞一会。
............

往期推荐  

TPL Dataflow组件应对高并发,低延迟要求

docker stack,docker-compose前世今生

点赞的朋友年后老板加鸡腿!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/312542.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

数据结构:链表(c语言)

1.链表简介&#xff1a; 链表和数组一样&#xff0c;都是存储数据&#xff0c;链表是非连续&#xff0c;非顺序的存储结构 链表是灵活的内存动态管理&#xff08;随机分配空间&#xff09;&#xff0c;删除创建结点非常方便 链表组成&#xff1a;由一系列结点组成 链表结点&…

提升Azure App Service的几个建议

本文介绍了6个技巧&#xff0c;这些技巧可以改善Azure App Service托管应用程序的性能。其中一些技巧是你现在就可以进行的配置变更&#xff0c;而其他技巧则可能需要对应用程序进行一些重新设计和重构&#xff0c; 本文的几个技巧对于常规企业部署依旧有指引作用。长话短说开发…

单向链表的逆转(数据结构)(c语言)

逆转单向链表的意思是&#xff1a;给定你一个单向链表&#xff0c;一个整数n&#xff08;n为要逆转的结点数&#xff09;&#xff0c;要求你把链表从头结点到第n个结点给逆转过来 图示&#xff1a; 给出一个单向链表&#xff0c;一个整数n4。也就是要求把该链表从头结点&#x…

广东职业教育信息化研究会2019年会暨区块链专题研讨会

兹定于2019年12月28日&#xff08;星期六&#xff09;上午9:30召开广东职业教育信息化研究会2019年会暨专题研讨会&#xff0c;本次会议由广东职业教育信息化研究会主办&#xff0c;华南师范大学网络教育学院协办。会议地址&#xff1a;广州市天河区中山大道西55号华南师范大学…

Serial.println()和Serial.print() (Arduino编程)

Arduino的输出基本就用到两个函数&#xff1a;print和println 区别是后者比前者多了回车换行 Serial.println(data)从串行端口输出数据&#xff0c;跟随一个回车&#xff08;ASCII 13或’r’&#xff09;和一个换行符&#xff08;ASCII 10或’n’&#xff09;&#xff0c;这个…

如何快速融入一个团队?

作者&#xff1a;邹溪源&#xff0c;长沙资深互联网从业者&#xff0c;架构师社区特邀嘉宾&#xff01;一我们难免需要离开一个圈子&#xff0c;加入一个陌生的集体。毋庸置疑&#xff0c;离开熟知的圈子&#xff0c;走向未知的圈子难免会产生许多畏惧甚至情怯&#xff0c;这都…

使用Arduino开发ESP32:wifi基本功能使用

1.建立网络&#xff08;AP&#xff09; 2.连接网络&#xff08;STA&#xff09; 3.扫描网络 1.建立网络&#xff08;AP&#xff09; 只需两步&#xff1a;1.引用WiFi库include<WiFi.h> 2.启动AP网络WiFi.softAP(ssid) 将下面代码上传到模块中&#xff1a; #include &l…

关于C#异步编程你应该了解的几点建议

前段时间写了一篇关于C#异步编程入门的文章&#xff0c;你可以点击《C#异步编程入门看这篇就够了》查看。这篇文章我们来讨论下关于C#异步编程几个不成文的建议&#xff0c;希望对你写出高性能的异步编程代码有所帮助。注&#xff1a;本文的很多内容都是学习《Effective C#》的…

数据库分区

一、分区原理分区并不是生成新的数据表&#xff0c;而是将表的数据均衡分摊到不同的硬盘&#xff0c;系统或是不同服务器存储介子中&#xff0c;实际上还是一张表。要实现这一功能&#xff0c;首先要了解数据库对水平分区表进行分区存储的原理。数据库分区和分表相似&#xff0…

(Arduino编程)Serial(串行通信)函数

串行端口用于Arduino和个人电脑或其他设备进行通信。所有Arduino控制器都有至少一个串行端口&#xff08;也称为UART或者USART&#xff09;。个人电脑可以通过USB端口与Arduino的引脚0(RX)和引脚1(TX) 进行通信。所以当Arduino的引脚0和引脚1用于串行通信功能时&#xff0c;Ard…

Arduino_esp32_WiFi代码

#include<WiFi.h>const char* ssid"BlackWalnut"; const char* password"blackwalnut";void setup() {Serial.begin(115200);while(WiFi.status()!WL_CONNECTED){delay(2500);WiFi.begin(ssid,password);Serial.println("正在连接...");}S…

如何在 C# 平台调用云开发?

▌关于作者苏震巍&#xff0c;云开发Linker计划成员&#xff0c;《微信开发深度解析》作者、Senparc.Weixin 微信 SDK 作者、微软最有价值专家&#xff08;MVP&#xff09;、盛派网络创始人兼首席架构师、微软 Ignite 技术大会讲师、从事软件及互联网研发已有26年&#xff0c;发…

如何打造组织级敏捷,你想知道的都在这里!

“敏捷是适应和响应变化的能力……敏捷组织将变化视为机遇&#xff0c;而不是威胁。” — Jim Highsmith注&#xff1a;Highsmith 在软件开发和 IT 行业有着超过 30 年的经验&#xff0c;曾是敏捷宣言的签署人之一&#xff0c;敏捷联盟的发起人和第一任理事&#xff0c;在很多行…

Azure DevOps Server CI - 自搭跨平台容器代理Agents

前言最近在地端(On-premises)幫團隊搭一套CI/CD流程&#xff0c;也順帶整理了一下從無到有的搭建過程&#xff0c;這次使用了docker技術來解決現有團隊使用CI/CD時讓現有CI/CD hosting環境過於複雜的問題。在開始之前&#xff0c;我先預備一下搭建的環境&#xff0c;如下:Windo…

python练习题:列表排序

Description 已知一个列表a [1,3,5,7,9]&#xff0c;系统会通过input()函数给你输入2、4、6、8中的任意一个数字&#xff0c;请你将这个数字和列表a中的数字重新排列&#xff0c;要求新列表中的数字依旧按从小到大的方式排列&#xff0c;您只需要输出新列表即可。 Input 系统会…

.Net Core 认证组件源码解析

不知不觉.Net Core已经推出到3.1了,大多数以.Net为技术栈的公司也开始逐步的切换到了Core,从业也快3年多了,一直坚持着.不管环境怎么变,坚持自己的当初的选择,坚持信仰 .Net Core是个非常优秀的框架,如果各位是从WebForm开始,一步步走到今天,自然而然就会发现.微软慢慢的开始将…

7-10 逆波兰表达式求值 (20 分)(c语言)(数据结构)

逆波兰表示法是一种将运算符&#xff08;operator&#xff09;写在操作数&#xff08;operand&#xff09;后面的描述程序&#xff08;算式&#xff09;的方法。举个例子&#xff0c;我们平常用中缀表示法描述的算式&#xff08;1 2&#xff09;*&#xff08;5 4&#xff09;…

在.NET Core下的机器学习--学习笔记

摘要.NET Core 在机器学习的应用场景&#xff0c;除了 ML .NET 还会介绍一个非常棒的開源技術 TensorFlow .NET &#xff0c; Keras .NET.讲师介绍本课内容人工智能介绍ML .NETICSharpCoreTensorFlow .NETKeras .NETSciSharp人工智能应用图像识别/物体识别自然语言/翻译搜索/知…

6-5 顺序表操作集 (20 分)(创建,查找,插入,删除)以及顺序表的理解

顺序表&#xff1a; 线性表的顺序存储 线性表的顺序存储是指在内存中用地址连续的一块存储空间顺序存放线性表的各元素 在程序设计语言中&#xff0c;一维数组在内存中占用的存储空间就是一组连续的存储区域&#xff0c;因此&#xff0c;用一维数组来表示顺序存储的数据区域是…

asp.net core 自定义基于 HttpContext 的 Serilog Enricher

asp.net core 自定义基于 HttpContext 的 Serilog EnricherIntro通过 HttpContext 我们可以拿到很多有用的信息&#xff0c;比如 Path/QueryString/RequestHeader 等请求信息, StatusCode/ResponseHeader 等响应信息&#xff0c;借助 HttpContext 我们可以在日志中记录很多有用…