TPL Dataflow .Net 数据流组件,了解一下?

回顾上文

  作为单体程序,依赖的第三方服务虽不多,但是2C的程序还是有不少内容可讲; 作为一个常规互联网系统,无外乎就是接受请求、处理请求,输出响应。

由于业务渐渐增长,数据处理的过程会越来越复杂和冗长,【连贯高效的处理数据】 越来越被看重,  .Net 提供了TPL  Dataflow组件使我们更高效的实现基于数据流和 流水线操作的代码

    下图是单体程序中 数据处理的用例图。

 

 程序中用到的TPL Dataflow 组件,Dataflow是微软前几年给出的数据处理库, 是由不同的处理块组成,可将这些块组装成一个处理管道,"块"对应处理管道中的"阶段", 可类比AspNetCore 中Middleware 和pipeline.。

  • TPL Dataflow库为消息传递和并行化CPU密集型和I / O密集型应用程序提供了编程基础,这些应用程序具有高吞吐量和低延迟。它还可以让您明确控制数据的缓冲方式并在系统中移动。

  • 为了更好地理解数据流编程模型,请考虑从磁盘异步加载图像并创建这些图像的应用程序。
    •   传统的编程模型通常使用回调和同步对象(如锁)来协调任务和访问共享数据, 从宏观看传统模型: 任务是一步步紧接着完成的

    •   通过使用数据流编程模型,您可以创建在从磁盘读取图像时处理图像的数据流对象。在数据流模型下,您可以声明数据在可用时的处理方式以及数据之间的依赖关系。由于运行时管理数据之间的依赖关系,因此通常可以避免同步访问共享数据的要求。此外,由于运行时调度基于数据的异步到达而工作,因此数据流可以通过有效地管理底层线程来提高响应性和吞吐量。    也就是说: 你定义的是任务内容和任务之间的依赖,不关注数据什么时候流到这个任务 。

  •    需要注意的是:TPL Dataflow 非分布式数据流,消息在进程内传递,   使用nuget引用 System.Threading.Tasks.Dataflow 包。

TPL Dataflow 核心概念

 1.  Buffer & Block

TPL Dataflow 内置的Block覆盖了常见的应用场景,当然如果内置块不能满足你的要求,你也可以自定“块”。

Block可以划分为下面3类:

  • Buffering Only    【Buffer不是缓存Cache的概念, 而是一个缓冲区的概念】

  • Execution

  • Grouping 

使用以上块混搭处理管道, 大多数的块都会执行一个操作,有些时候需要将消息分发到不同Block,这时可使用特殊类型的缓冲块给管道“”分叉”。

2. Execution Block

可执行的块有两个核心组件:
  • 输入、输出消息的缓冲区(一般称为Input,Output队列)

  • 在消息上执行动作的委托

  消息在输入和输出时能够被缓冲:当Func委托的运行速度比输入的消息速度慢时,后续消息将在到达时进行缓冲;当下一个块的输入缓冲区中没有容量时,将在输出时缓冲。

每个块我们可以配置:

  • 缓冲区的总容量, 默认无上限

  • 执行操作委托的并发度, 默认情况下块按照顺序处理消息,一次一个。

我们将块链接在一起形成一个处理管道,生产者将消息推向管道。

TPL Dataflow有一个基于pull的机制(使用Receive和TryReceive方法),但我们将在管道中使用块连接和推送机制。

  • TransformBlock(Execution category)-- 由输入输出缓冲区和一个Func<TInput, TOutput>委托组成,消费的每个消息,都会输出另外一个,你可以使用这个Block去执行输入消息的转换,或者转发输出的消息到另外一个Block。

  • TransformManyBlock (Execution category) -- 由输入输出缓冲区和一个Func<TInput, IEnumerable<TOutput>>委托组成, 它为输入的每个消息输出一个 IEnumerable<TOutput>

  • BroadcastBlock (Buffering category)-- 由只容纳1个消息的缓冲区和Func<T, T>委托组成。缓冲区被每个新传入的消息所覆盖,委托仅仅为了让你控制怎样克隆这个消息,不做消息转换。

            该块可以链接到多个块(管道的分叉),虽然它一次只缓冲一条消息,但它一定会在该消息被覆盖之前将该消息转发到链接块(链接块还有缓冲区)。

  • ActionBlock (Execution category)-- 由缓冲区和Action<T>委托组成,他们一般是管道的结尾,他们不再给其他块转发消息,他们只会处理输入的消息。

  • BatchBlock (Grouping category)-- 告诉它你想要的每个批处理的大小,它将累积消息,直到它达到那个大小,然后将它作为一组消息转发到下一个块。

  还有一下其他的Block类型:BufferBlock、WriteOnceBlock、JoinBlock、BatchedJoinBlock,我们暂时不会深入。

3. Pipeline Chain React

  当输入缓冲区达到上限容量,为其供货的上游块的输出缓冲区将开始填充,当输出缓冲区已满时,该块必须暂停处理,直到缓冲区有空间,这意味着一个Block的处理瓶颈可能导致所有前面的块的缓冲区被填满。

  但是不是所有的块变满时,都会暂停,BroadcastBlock 有允许1个消息的缓冲区,每个消息都会被覆盖, 因此如果这个广播块不能将消息转发到下游,则在下个消息到达的时候消息将丢失,这在某种意义上是一种限流(比较生硬).

编程实践

    将按照上图实现TPL Dataflow 

①  定义Dataflow  pipeline
        public EqidPairHandler(IHttpClientFactory httpClientFactory, RedisDatabase redisCache, IConfiguration con, LogConfig logConfig, ILoggerFactory loggerFactory){_httpClient = httpClientFactory.CreateClient("bce-request");_redisDB0 = redisCache[0];_redisDB = redisCache;_logger = loggerFactory.CreateLogger(nameof(EqidPairHandler));var option = new DataflowLinkOptions { PropagateCompletion = true };publisher = _redisDB.RedisConnection.GetSubscriber();_eqid2ModelTransformBlock = new TransformBlock<EqidPair, EqidModel>(// redis piublih 没有做在TransformBlock fun里面, 因为publih失败可能影响后续的block传递eqidPair => EqidResolverAsync(eqidPair),new ExecutionDataflowBlockOptions{MaxDegreeOfParallelism = con.GetValue<int>("MaxDegreeOfParallelism")});// https://docs.microsoft.com/en-us/dotnet/standard/parallel-programming/walkthrough-creating-a-dataflow-pipeline_logBatchBlock = new LogBatchBlock<EqidModel>(logConfig, loggerFactory);_logPublishBlock = new ActionBlock<EqidModel>(x => PublishAsync(x) );_broadcastBlock = new BroadcastBlock<EqidModel>(x => x); // 由只容纳一个消息的缓存区和拷贝函数组成
            _broadcastBlock.LinkTo(_logBatchBlock.InputBlock, option);_broadcastBlock.LinkTo(_logPublishBlock, option);_eqid2ModelTransformBlock.LinkTo(_broadcastBlock, option);}
public class LogBatchBlock<T> : ILogDestination<T> where T : IModelBase{private readonly string _dirPath;private readonly Timer _triggerBatchTimer;private readonly Timer _openFileTimer;private DateTime? _nextCheckpoint;private TextWriter _currentWriter;private readonly LogHead _logHead;private readonly object _syncRoot = new object();private readonly ILogger _logger;private readonly BatchBlock<T> _packer;private readonly ActionBlock<T[]> batchWriterBlock;private readonly TimeSpan _logFileIntervalTimeSpan;/// <summary>/// Generate  request log file./// </summary>public LogBatchBlock(LogConfig logConfig, ILoggerFactory loggerFactory){_logger = loggerFactory.CreateLogger<LogBatchBlock<T>>();_dirPath = logConfig.DirPath;if (!Directory.Exists(_dirPath)){Directory.CreateDirectory(_dirPath);}_logHead = logConfig.LogHead;_packer = new BatchBlock<T>(logConfig.BatchSize);batchWriterBlock = new ActionBlock<T[]>(models => WriteToFile(models));     _packer.LinkTo(batchWriterBlock, new DataflowLinkOptions { PropagateCompletion = true });_triggerBatchTimer = new Timer(state =>{_packer.TriggerBatch();}, null, TimeSpan.Zero, TimeSpan.FromSeconds(logConfig.Period));_logFileIntervalTimeSpan = TimeSpan.Parse(logConfig.LogFileInterval);_openFileTimer = new Timer(state =>{AlignCurrentFileTo(DateTime.Now);}, null, TimeSpan.Zero, _logFileIntervalTimeSpan);}public ITargetBlock<T> InputBlock => _packer;private void AlignCurrentFileTo(DateTime dt){if (!_nextCheckpoint.HasValue){OpenFile(dt);}if (dt >= _nextCheckpoint.Value){CloseFile();OpenFile(dt);}}private void OpenFile(DateTime now, string fileSuffix = null){string filePath = null;try{var currentHour = now.Date.AddHours(now.Hour);_nextCheckpoint = currentHour.Add(_logFileIntervalTimeSpan);int hourConfiguration = _logFileIntervalTimeSpan.Hours;int minuteConfiguration = _logFileIntervalTimeSpan.Minutes;filePath = $"{_dirPath}/u_ex{now.ToString("yyMMddHH")}{fileSuffix}.log";var appendHead = !File.Exists(filePath);if (filePath != null){var stream = new FileStream(filePath, FileMode.Append, FileAccess.Write);var sw = new StreamWriter(stream, Encoding.Default);if (appendHead){sw.Write(GenerateHead());}_currentWriter = sw;_logger.LogDebug($"{currentHour} TextWriter has been created.");}}catch (UnauthorizedAccessException ex){_logger.LogWarning("I/O error or specific type of scecurity error,{0}", ex);throw;}catch (Exception e){if (fileSuffix == null){_logger.LogWarning($"OpenFile failed:{e.StackTrace.ToString()}:{e.Message}.", e.StackTrace);OpenFile(now, $"-{Guid.NewGuid()}");}else{_logger.LogError($"OpenFile failed after retry: {filePath}", e);throw;}}}private void CloseFile(){if (_currentWriter != null){_currentWriter.Flush();_currentWriter.Dispose();_currentWriter = null;_logger.LogDebug($"{DateTime.Now} TextWriter has been disposed.");}_nextCheckpoint = null;}private string GenerateHead(){StringBuilder head = new StringBuilder();head.AppendLine("#Software: " + _logHead.Software).AppendLine("#Version: " + _logHead.Version).AppendLine($"#Date: {DateTime.UtcNow.ToString("yyyy-MM-dd HH:mm:ss")}").AppendLine("#Fields: " + _logHead.Fields);return head.ToString();}private void WriteToFile(T[] models){try{lock (_syncRoot){var flag = false;foreach (var model in models){if (model == null)continue;flag = true;AlignCurrentFileTo(model.ServerLocalTime);_currentWriter.WriteLine(model.ToString());}if (flag)_currentWriter.Flush();}}catch (Exception ex){_logger.LogError("WriteToFile Error : {0}", ex.Message);}}public bool AcceptLogModel(T model){return _packer.Post(model);}public string GetDirPath(){return _dirPath;}public async Task CompleteAsync(){_triggerBatchTimer.Dispose();_openFileTimer.Dispose();_packer.TriggerBatch();_packer.Complete();await InputBlock.Completion;lock (_syncRoot){CloseFile();}}}
仿IIS日志存储代码

② 异常处理

  上述程序在部署时就遇到相关的坑位,在测试环境_eqid2ModelTransformBlock 内Func委托稳定执行,程序并未出现异样;

  部署到生产之后, 该Pipeline会运行一段时间就停止工作,一直很困惑, 后来通过监测_eqid2ModelTransformBlock.Completion 属性,该块提前进入“完成态”   :   程序在执行某次Func委托时报错,Block提前进入完成态

TransfomrBlock.Completion 一个Task对象,当TPL Dataflow不再处理消息并且能保证不再处理消息的时候,就被定义为完成态, Task对象的TaskStatus枚举值将标记此Block进入完成态的真实原因

- TaskStatus.RanToCompletion       根据Block定义的任务成功完成

- TaskStatus.Fault                            因为未处理的异常 导致"过早的完成"

- TaskStatus.Cancled                       因为取消操作 导致 "过早的完成"

  我们需要小心处理异常, 一般情况下我们使用try、catch包含所有的执行代码以确保所有的异常都被处理。

 

    可将TPL Dataflow 做为进程内消息队列,本文只是一个入门参考,更多复杂用法还是看官网, 你需要记住的是, 这是一个.Net 进程内数据流组件, 能让你专注于流程。

 

作者:JulianHuang

感谢您的认真阅读,如有问题请大胆斧正;觉得有用,请下方或加关注。

本文欢迎转载,但请保留此段声明,且在文章页面明显位置注明本文的作者及原文链接。

转载于:https://www.cnblogs.com/JulianHuang/p/11177766.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/569150.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

推荐系统实例

协同过滤与隐语义模型 在机器学习问题中&#xff0c;我们见到的数据集通常是如下的格式&#xff1a; input target ... ... &#xff0c;一个输入向量的集合以及对应的数据集合,就是我们想要去预测的值。 对于…

【转】深入理解JavaScript闭包(closure)

文章来源&#xff1a;http://www.felixwoo.com/archives/247 最近在网上查阅了不少Javascript闭包(closure)相关的资料&#xff0c;写的大多是非常的学术和专业。对于初学者来说别说理解闭包了&#xff0c;就连文字叙述都很难看懂。撰写此文的目的就是用最通俗的文字揭开Java…

从头开始建立神经网络翻译及扩展

目录翻译从头开始建立神经网络-简介导包和配置生成一个数据集实现用来展示决策边界的辅助函数Logistic Regression训练一个神经网络我们的神经网络如何进行预测学习神经网络的参数实现神经网络训练一个隐层有3个神经元的神经网络验证隐层神经元个数对神经网络的影响练习练习题解…

对比 C++ 和 Python,谈谈指针与引用

花下猫语&#xff1a;本文是学习群内 樱雨楼 小姐姐的投稿。之前已发布过她的一篇作品《当谈论迭代器时&#xff0c;我谈些什么&#xff1f;》&#xff0c;大受好评。本文依然是对比 C 与 Python&#xff0c;来探讨编程语言中极其重要的概念。祝大家读有所获&#xff0c;学有所…

《吴恩达深度学习》第一课第四周任意层的神经网络实现及BUG处理

目录一、实现1、吴恩达提供的工具函数sigmoidsigmoid求导relurelu求导2、实现代码导包和配置初始化参数前向运算计算损失后向运算更新参数组装模型3、问题及思考一、实现 1、吴恩达提供的工具函数 这几个函数这里只是展示一下&#xff0c;这是吴恩达写好的工具类&#xff0c;…

球形坐标和Cartesian 坐标的转换 spherical coordinate

spherical coordinate 和cartesian坐标的转换&#xff0c; 个人认为在控制camera的时候最为有用&#xff0c;比如CS中的操作方式&#xff0c; 鼠标负责方向的改变&#xff0c;其恰恰就是球形坐标的改变。而camera的位置改变就是cartesian的改变&#xff0c;所以这两者的转换就必…

【HANA系列】SAP HANA Studio出现Fetching Children...问题

公众号&#xff1a;SAP Technical本文作者&#xff1a;matinal原文出处&#xff1a;http://www.cnblogs.com/SAPmatinal/ 原文链接&#xff1a;【ABAP系列】SAP HANA Studio出现"Fetching Children..."问题前言部分 大家可以关注我的公众号&#xff0c;公众号里的排版…

朴素Bayse新闻分类实践

目录1、信息增益&#xff08;互信息&#xff09;介绍&#xff08;1&#xff09;西瓜书中的信息增益[^1]&#xff08;2&#xff09;PRML中的互信息[^2]&#xff08;3&#xff09; 其实他们是一个东西2、朴素Bayse新闻分类[^3]&#xff08;1&#xff09;常量及辅助函数&#xff0…

【数据仓库】OLTP系统和OLAP系统区别

OLTP&#xff1a;联机事务处理系统(OnLine Transaction Processing) OLAP&#xff1a;联机分析处理系统(OnLine Analytical Processing) 参考文档&#xff1a; 操作数据库系统(OLTP)和联机分析处理系统(OLAP)的区别转载于:https://www.cnblogs.com/badboy200800/p/11189478.htm…

Good Numbers(HDU5447+唯一分解)

题目链接 传送门 题面 题意 首先定义对于\(k\)的好数\(u\)&#xff1a;如果\(u\leq k\)且\(u\)的所有质因子与\(k\)的质因子一样则称\(u\)对于\(k\)是一个好数。 现给你两个数\(k1,k2(1\leq k1,k2\leq 10^{24})\)&#xff0c;要你求\(k1,k2\)的好数个数&#xff0c;对于\(k1,k2…

从机器码到面向对象

1.从机器码到面向对象 本章节主要探讨是什么驱动着编程从机器码发展到了汇编语言&#xff0c;又从汇编语言发展到了面向过程编程&#xff0c;最后从面向过程编程发展到面向对象编程。通过这些探讨最终明确多年来的软件工程发展我们都解决了哪些棘手的问题。 1.1机器码 在真正…

spfa_队列

spfa:1.当给定的图存在负权边时&#xff0c;Dijkstra等算法便没有了用武之地&#xff0c;而Bellman-Ford算法的复杂度又过高&#xff0c;SPFA算法便派上用场了.2.我们约定有向加权图G不存在负权回路&#xff0c;即最短路径一定存在3.思路&#xff1a;用数组d记录每个结点的最短…

Tomcat配置解析

Tomcat文件配置 tomcat解压后目录 bin&#xff1a;可执行文件&#xff08;startup.bat shutdown.bat) conf&#xff1a;配置文件&#xff08;server.xml&#xff09; lib&#xff1a;tomcat依赖的jar文件 log&#xff1a;日志文件&#xff08;记录出错等信息&#xff09; temp&…

教你配置安全的ProFTPD服务器(中)

二、 基本加固ProFTPD服务器步骤 1.升级版本 注&#xff1a;如果当前版本已经是最新版本&#xff0c;可以跳过第一步。 升级陈旧的ProFTPD版本&#xff0c;因为早期的ProFTPD版本存在的安全漏洞。对于一个新配置的ProFTPD服务器来说使用最新稳定版本是最明智的选择&#xff0c;…

Java 将Word转为PDF、PNG、SVG、RTF、XPS、TXT、XML

同一文档在不同的编译或阅读环境中&#xff0c;需要使用特定的文档格式来打开&#xff0c;通常需要通过转换文档格式的方式来实现。下面将介绍在Java程序中如何来转换Word文档为其他几种常见文档格式&#xff0c;如PDF、图片png、svg、xps、rtf、txt、xml等。 使用工具&#xf…

CentOS7上GitLab的使用

生成SSH Keys 生成root账号的ssh key # ssh-keygen -t rsa -C "adminexample.com" 显示pub key的值 # cat ~/.ssh/id_rsa.pub 复制显示出来的 pub key 以root账号登陆gitlab&#xff0c;点击 "profile settings" 然后点击 "SSH Keys" 将复制的pu…

数据库:除运算

除运算 设关系R除以关系S的结果为关系T&#xff0c;则T包含所有在R但不在S中的属性及其值&#xff0c;则T的原则与S的元组的所有组合都在R中。用象集来定义除法&#xff1a;给定关系R&#xff08;X&#xff0c;Y&#xff09;和S&#xff08;Y&#xff0c;Z&#xff09;。其中X&…

[图解tensorflow源码] 入门准备工作附常用的矩阵计算工具[转]

[图解tensorflow源码] 入门准备工作附常用的矩阵计算工具[转] Link: https://www.cnblogs.com/yao62995/p/5773142.html tensorflow使用了自动化构建工具bazel、脚本语言调用c或cpp的包裹工具swig、使用EIGEN作为矩阵处理工具、Nvidia-cuBLAS GPU加速计算库、结构化数据存储格式…

现共收到 5 个分组,其目的地址分别为: (1) 128.96.40.10 (2) 128.96.41.12 (3) 128.96.41.151 (4) 192.4.123.17 (5) 192.4.

计算目的地址的下一跳&#xff1a; 设某路由器建立了如表 1 所示路由表。现共收到 5 个分组&#xff0c;其目的地址分别为&#xff1a;(1) 128.96.40.10(2) 128.96.41.12(3) 128.96.41.151(4) 192.4.123.17(5) 192.4.123.90试分别计算下一跳解答&#xff1a; 用目的IP地址和路由…

【转】Docker学习_本地/容器文件互传(5)

1、查找所有容器 #docker ps a 2、找出我们想要的容器名字并查找容器长ID #docker inspect -f {{.ID}} python 3、拷贝本地文件到容器 docker cp 本地路径 容器长ID:容器路径docker cp /Users/xubowen/Desktop/auto-post-advance.py 38ef22f922704b32cf2650407e16b146bf61c221…