TPL Dataflow组件应对高并发,低延迟要求

长话短说

2C互联网业务增长,单机多核的共享内存模式带来的排障问题、编程困难;随着多核时代和分布式系统的到来,共享模型已经不太适合并发编程,因此actor-based模型又重新受到了人们的重视。

---------------------------调试过多线程的都懂-----------------------------

  • 传统编程模型通常使用回调和同步对象(如锁)来协调任务和访问共享数据,从宏观看:若任务的执行需要某些共享资源,不可避免该任务需要关注并抢占资源。

  • actor-based模型是一种流水线模型,actor-based模型share nothing。所有的线程(或进程)通过消息传递的方式进行合作,这些线程(或进程)称为参与者actor,预先定义任务流水线后,不关注数据什么时候流到这个任务,专注完成当前任务本身。

  .Net TPL Dataflow组件帮助我们快速实现actor-based模型,当有多个必须异步通信的操作或要等待数据可用再进一步处理时,Dataflow组件非常有用。

TPL Dataflow是微软前几年给出的数据处理库, 内置常见的处理块,可将这些块组装成一个处理管道,"块"对应处理管道中的"阶段任务",可类比AspNetCore 中Middleware和Pipeline。

  • TPL Dataflow库为消息传递、CPU密集型/I-O密集型应用程序提供了编程基础, 可更明确控制数据的暂存方式、移动路线,达到高吞吐量和低延迟。

  • 需要注意的是:TPL Dataflow非分布式数据流,消息在进程内传递 。

TPL Dataflow核心概念

TPL Dataflow 内置的Block覆盖了常见的应用场景,如果内置块不能满足你的要求,你也可以自定“块”。

Block可以划分为下面3类:

  • Buffering Only   [Buffer不是缓存Cache的概念,而是一个暂存区的概念]

  • Execution

  • Grouping 

使用以上块混搭处理管道, 大多数的块都会执行一个操作,有些时候需要将消息分发到不同Block,这时可使用特殊类型的缓冲块给管道“”分叉”。

Execution Block

可执行的块有两个核心组件:

  • 输入、输出消息的暂存区(一般称为Input,Output队列)

  • 在消息上执行动作的委托

消息在输入和输出时能够被暂存:

当输入的消息速度比Func委托的执行速度比快,后续消息将在到达时暂存;

当下一个块的输入暂存区中无可用空间,将在当前块输出时暂存。

每个块我们可以配置:

  • 暂存区的总容量,默认无上限

  • 执行操作委托的并发度,默认情况下块按照顺序处理消息,一次一个。

将块链接在一起形成处理管道,生产者将消息推向管道。

TPL Dataflow有一个基于pull的机制(使用Receive和TryReceive方法),但我们将在管道中使用块连接和推送机制。

  • TransformBlock(Execution category)-- 由输入输出暂存区和一个Func<TInput, TOutput>委托组成,输入的每个消息,都会输为出另一个,可以使用这个Block去执行消息的转换,或者转发输出的消息到另外一个Block

  • TransformManyBlock (Execution category) -- 由输入输出暂存区和一个Func<TInput, IEnumerable<TOutput>>委托组成, 它为输入的每个消息输出一个IEnumerable<TOutput>

  • BroadcastBlock (Buffering category)-- 只容纳最多1个消息的暂存区和Func<T, T>委托组成(新消息到达会覆盖原消息),委托仅仅为了让你控制怎样克隆这个消息,不做消息转换

该块在需要将消息广播给多个块时很有用(管道分叉)

  • ActionBlock (Execution category)-- 由缓冲区和Action<T>委托组成,它们不再给其他块转发消息,只处理输入的消息,一般作为管道结尾

  • BatchBlock (Grouping category)-- 告诉它你想要的每个批处理的大小,它将累积消息,直到它达到那个大小,然后将它作为一组消息转发到下一个块

其他内建Block类型:BufferBlock、WriteOnceBlock、JoinBlock、BatchedJoinBlock,暂时不会深入。

管道连锁反应

  当B块输入缓冲区达到上限容量,为其供货的上游A块的输出暂存区将开始被填充,当A块输出暂存区已满时,该块必须暂停处理,直到暂存区有空间,这意味着一个Block的处理瓶颈可能导致所有前面的块的暂存区被填满。

    但是不是所有的块暂存区满时都会暂停,BroadcastBlock有1个消息的暂存区,每个消息都会被覆盖, 因此如果这个广播块不能及时将消息转发到下游,则在下个消息到达的时候消息将丢失,某种意义上达到一种限流效果(比较残暴).

编程实践

生产者投递消息

 可使用Post或者SendAsync方法向首块投递消息:

  • Post方法即时返回true/false,True意味着消息被block接收(暂存区有空余),false意味着拒绝了消息(暂存区已满或者Block已经出错)。

  • SendAsync方法返回一个Task<bool>, 将会以异步的方式阻塞直到块接收、拒绝、块出错。

Post、SendAsync的不同点在于SendAsync可以延迟投递(后置管道的输入buffer不空,得到异步通知后再投递)。

定义流水线管道

按照上图业务定义流水线:

public EqidPairHandler(IHttpClientFactory httpClientFactory, RedisDatabase redisCache, IConfiguration con, LogConfig logConfig, ILoggerFactory loggerFactory){_httpClient = httpClientFactory.CreateClient("bce-request");_redisDB0 = redisCache[0];_redisDB = redisCache;_logger = loggerFactory.CreateLogger(nameof(EqidPairHandler));var option = new DataflowLinkOptions { PropagateCompletion = true };publisher = _redisDB.RedisConnection.GetSubscriber();_eqid2ModelTransformBlock = new TransformBlock<EqidPair, EqidModel>(// redis piublih 没有做在TransformBlock fun里面, 因为publih失败可能影响后续的block传递eqidPair => EqidResolverAsync(eqidPair),new ExecutionDataflowBlockOptions{MaxDegreeOfParallelism = con.GetValue<int>("MaxDegreeOfParallelism")});// https://docs.microsoft.com/en-us/dotnet/standard/parallel-programming/walkthrough-creating-a-dataflow-pipeline_logBatchBlock = new LogBatchBlock<EqidModel>(logConfig, loggerFactory);_logPublishBlock = new ActionBlock<EqidModel>(x => PublishAsync(x) );_broadcastBlock = new BroadcastBlock<EqidModel>(x => x); // 由只容纳一个消息的缓存区和拷贝函数组成_broadcastBlock.LinkTo(_logBatchBlock.InputBlock, option);_broadcastBlock.LinkTo(_logPublishBlock, option);_eqid2ModelTransformBlock.LinkTo(_broadcastBlock, option);}

 仿IIS日志写入组件

异常处理

上述程序在生产部署时遇到相关的坑位:

在测试环境_eqid2ModelTransformBlock块委托函数稳定执行,程序并未出现异样; 

部署到生产之后,该Pipeline运行一段时间就停止工作,一直很困惑。

后来通过监测_eqid2ModelTransformBlock.Completion属性,发现该块在执行某次委托时报错,提前进入完成态。

当TPL Dataflow不再处理消息且保证不再处理消息的时候,就被定义为 "完成态", IDataflow.Completion属性(Task对象)标记该状态,Task对象的TaskStatus枚举值描述此Block进入完成态的真实原因。

  •  TaskStatus.RanToCompletion    "成功完成" 在Block中定义的任务  

  •  TaskStatus.Fault    因未处理的异常导致"过早的完成"

  •  TaskStatus.Canceled    因取消操作导致 "过早的完成"

官方资料表明:某块进入Fault、Cancel状态,都会导致该块提前进入“完成态”,但因Fault、Canceled进入的“完成态”会导致输入暂存区和输出暂存区被清空。

After Fault has been called on a dataflow block, that block will complete, and its Completion task will enter a final state. Faulting a block, as with canceling a block, causes buffered messages (unprocessed input messages as well as unoffered output messages) to be lost.

故需要严肃对待异常,一般情况下我们使用try、catch包含所有的执行代码以确保所有的异常都被处理。

    本文作为TPL Dataflow的入门指南(代码较多建议左下角转向原文)

微软技术栈的可持续关注actor-based模型的流水线处理组件,应对单体程序中高并发,低延迟相当巴适。

+ https://docs.microsoft.com/en-us/dotnet/api/system.threading.tasks.dataflow.broadcastblock-1?view=netcore-3.1

+ https://docs.microsoft.com/en-us/dotnet/api/system.threading.tasks.dataflow.idataflowblock.fault?redirectedfrom=MSDN&view=netcore-2.2#System_Threading_Tasks_Dataflow_IDataflowBlock_Fault_System_Exception_

文字+制图,均为原创,

扫一扫左边二维码,

让干货飞一会。
............

历史推荐

AspNetCore应用注意这一点,CTO会对你刮目相看

手撕公司SSO登录原理

.Net线程同步技术解读

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/312587.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

abp模块生命周期设计思路剖析

abp中将生命周期事件抽象为4个接口&#xff1a;//预初始化 public interface IOnPreApplicationInitialization {void OnPreApplicationInitialization([NotNull] ApplicationInitializationContext context); }//初始化 public interface IOnApplicationInitialization {void …

.Net Core + 微信赋能企业级智能客服系统--学习笔记

摘要围绕目前需求猛增的微信及移动端企业智能客服业务&#xff0c;利用 .NET Core 的一系列优秀特性及 SignalR 模块打造全双工、跨微信/QQ/钉钉等应用平台、跨系统平台、跨终端、支持企业级并发的移动端客服系统。讲师介绍目录微信应用生态简介微信小程序基础通讯原理Senparc.…

基于docker-compose的Gitlab CI/CD实践排坑指南

长话短说经过长时间实操验证&#xff0c;终于完成基于Gitlab的CI/CD实践&#xff0c;本次实践的坑位很多&#xff0c; 实操过程尽量接近最佳实践&#xff08;不做hack, 不做骚操作&#xff09;&#xff0c;记录下来加深理解。看过博客园《docker-compose真香》一文的园友留意到…

Is It a Complete AVL Tree AVL树

思路&#xff1a; 考察的点是建立AVL树以及如何判断是否为满二叉树。 建立AVL树需要搞清楚LL、LR、RR、RL四种情况如何左旋和右旋&#xff0c;如下&#xff1a; 类型BF条件操作LLBF(root)2,BF(root->lchild)1root右旋LRBF(root)2,BF(root->lchild)-1先root->lchild左…

AcWing 删减 栈思想

思路&#xff1a; 这道题要是不卡时间复杂度&#xff0c;是道大水题&#xff0c;然而字符串的长度到了6次方&#xff0c;若使用string中的erase函数&#xff0c;看似时间复杂度不高&#xff0c;其实&#xff0c;每次删除子字符串后&#xff0c;后边的字符串需要移动到前面来&am…

读《可复制的领导力》

最近很忙&#xff0c;是特别忙&#xff0c;连上厕所的时间都在回复着各种消息&#xff0c;但还是挤时间看完了《可复制的领导力》&#xff0c;这本书也是领导推荐的。说起领导力&#xff0c;大多数人都会觉得得靠悟&#xff0c;并不能做到言传身教&#xff0c;但书名中却提到了…

AcWing 构造数组 区间合并

思路&#xff1a; 这道题第一眼来看以为是动态规划类型的题目&#xff0c;然而尝试了用dp的方法做&#xff0c;然而超时了&#xff0c;过了差不多一半的测试店&#xff0c;显示的是超时。那么应该来说动态规划是可以做的&#xff0c;但数据卡的比较严。在看其他同学的评论后&am…

为什么需要动态SQL

为什么需要动态SQL在使用 EF或者写 SQL语句时&#xff0c;查询条件往往是这样一种非常常见的逻辑&#xff1a;如果客户填了查询信息&#xff0c;则查询该条件&#xff1b;如果客户没填&#xff0c;则返回所有数据。我常常看到很多人解决这类问题时使用了错误的静态 SQL的解决办…

【好文】为什么必须学好.Net Core?怎样弯道超车新年高薪?这样做,一周就够了!(文末彩蛋)...

都2020了你还不会.Net Core&#xff1f;恕我直言&#xff0c;2020年还不会.Net Core是会被淘汰的&#xff01;12月3号&#xff0c;.Net Core3.1的LTS版正式发布&#xff0c;4年来7个正式版本和几十个Preview版本&#xff0c;热烈可见一斑&#xff01;越来越多的互联网软件公司开…

基于 Kubernetes 的基础设施即代码

11 月 9、10 号两天&#xff0c;.NET 社区第一次以“.NET 大会”为品牌在上海召开了第一届峰会&#xff0c;现场与会者达到 600 人规模。大会的第 1 天是各类演讲分享&#xff0c;第 2 天有多个动手实践课。张善友队长、 刘腾飞 和我一起策划了基于 Kubernetes 的 .NET Core 微…

Steeltoe 2.4新增代码生成工具、全新入门指南等,助力.NET微服务开发

Steeltoe框架现可帮助.NET开发人员创建云原生应用。随着其功能的扩充&#xff0c;该框架越来越受欢迎&#xff0c;下载量达到580万&#xff08;并且仍在增加&#xff09;&#xff0c;这其中大部分的功能创新都源自于用户反馈、社区贡献和.NET运行环境各方面的改进。但这些还不够…

2019 年终回顾:不忘初心,负重前行

点击上方蓝字关注“汪宇杰博客”导语2019 年就要接近尾声&#xff0c;这一年对于我来说&#xff0c;有许多有意义的事件。我成长了许多&#xff0c;并依然保持着对技术的热情。在辞旧迎新之际&#xff0c;我想回顾一下我这一年中有意义的事件与收获&#xff0c;期待与大家一起在…

【C#】设计模式的学习征途系列文章目录(2019版)

Photo &#xff1a;Design Patterns文 | Edison Zhou2017年&#xff0c;我开始系统学习设计模式&#xff0c;参考了《大话设计模式》、《设计模式的艺术》等书籍&#xff0c;并通过C#语言写了各种模式的代码示例&#xff08;已经放到了我的github上并收获了120个star&#xff0…

原创 | 为什么年终奖是一个彻头彻尾的职场圈套?

0前言之前写过几篇职场专题的文章&#xff0c;反响不错&#xff0c;也先后被不少公众号转载过&#xff0c;这几天来了不少新朋友&#xff0c;如果之前没阅读过&#xff0c;可以在后台回复“职场”2个字&#xff0c;查看系列文章。转眼又到年底了&#xff0c;不知道有多少人在心…

Blazor 机制初探以及什么是前后端分离,还不赶紧上车?

上一篇文章发了一个 BlazAdmin 的尝鲜版基于 Blazui 的 Blazor 后台管理模板 BlazAdmin 正式尝鲜&#xff0c;这一次主要聊聊 Blazor 是如何做到用 C# 来写前端的&#xff0c;传送门&#xff1a;https://www.cnblogs.com/wzxinchen/p/12057171.html飚车前需要说明的一点是&…

云原生

一、云原生概念的诞生云原生&#xff08;Cloud Native&#xff09;的概念&#xff0c;由来自Pivotal的MattStine于2013年首次提出&#xff0c;被一直延续使用至今。这个概念是Matt Stine根据其多年的架构和咨询经验总结出来的一个思想集合&#xff0c;并得到了社区的不断完善&a…

基于 Kubernetes 的 CICD 基础设施即代码

在上一篇基于 Kubernetes 的基础设施即代码一文中&#xff0c;我概要地介绍了基于 Kubernetes 的 .NET Core 微服务和 CI/CD 动手实践工作坊 使用的基础设施是如何使用代码描述的&#xff0c;以及它的自动化执行过程。如果要查看基于 Kubernetes 的基础设施即代码架构全图&…

Azure Arc:微软是怎么玩多云游戏的?

混合云在竞争性云提供商的基础上提供了来自云提供商的服务&#xff0c;从而使组织能够以不同方式一起使用来自不同供应商的云服务。例如&#xff0c;组织可以使用将数据存储在一个云中存储上的功能&#xff0c;而另一个云服务商则在该应用程序或数据之上运行。因此&#xff0c;…

当我们在谈 .NET Core 跨平台时,我们在谈些什么?--学习笔记

摘要.NET Framework在过去十多年在跨平台上的尝试。.NET Core跨平台的实现有何不同&#xff1f;基于 .NET Standard的平台兼容性是如何实现的&#xff1f;讲师介绍历史枷锁.NET Framework FCL CLR"跨平台"的 .NET Framework完全独立&#xff0c;各自为政复用之殇由…

IdentityServer4学习笔记汇总(实现传送门在底部)

前言互联网时代,对信息和资源的保护越发苛刻,在所有应用中授权和认证是必不可少缺少的一部分。如果一个应用没有授权和认证那么这个应用就是不完整或者说不安全的应用。在.Net平台给我们提供了一套完整的授权认证框架,那就是IdentityServer4。它实现了OpenId Connect和OAuth2.0…