C#并行编程(4):基于任务的并行

C#中的任务Task

在C#编程中,实现并行可以直接使用线程,但使用起来很繁琐;也可以使用线程池,线程池很大程度上简化了线程的使用,但是也有着一些局限,比如我们不知道作业什么时候完成,也取不到作业的返回值;解决线程池局限性的方案是使用任务。本文将总结C#中Task的使用。

类似于线程池工作项对异步操作的封装,任务是对异步操作的另一种形式的封装,这种封装抽象层次更高,让我们能够对异步操作进行更多的控制。

任务启动后,通过任务调度器TaskScheduler来调度。.NET中提供两种任务调度器,一种是线程池任务调度器,也是默认调度器,它会将任务派发给线程池工作者线程;另一种是上下文同步任务调度器,它会将任务派发给当前上下文线程,例如GUI线程。此外,我们也能自定义任务调度器,例如可以将异步IO任务派发给线程池IO线程。

Task的使用方法

隐式使用

Parallel静态类除了提供并行循环的各种重载,还提供了一个方法Parallel.Invoke。这个方法可以创建并执行一个或多个异步任务,使用方法如下:




private static void DoWork(int workId = 0)
{
Console.WriteLine($"{DateTime.Now}=> Thread[{Thread.CurrentThread.ManagedThreadId}] started work[{workId}].");
Thread.Sleep(3000);
Console.WriteLine($"{DateTime.Now}=> Thread[{Thread.CurrentThread.ManagedThreadId}] done work[{workId}].");
}




public static void ImplicitUsingOfTask()
{
Parallel.Invoke(()=>DoWork(1),()=>DoWork(2),() => DoWork(3));
}

上例的运行结果如下:

2019/3/27 20:40:18=> Thread[9] started work[1].
2019/3/27 20:40:18=> Thread[12] started work[3].
2019/3/27 20:40:18=> Thread[10] started work[2].
2019/3/27 20:40:21=> Thread[9] done work[1].
2019/3/27 20:40:21=> Thread[12] done work[3].
2019/3/27 20:40:21=> Thread[10] done work[2].

对于简单的多任务并行,使用上述的方式很方便,但是这种方式与线程池一样,我们不能控制任务的执行或者获取任务返回值。

显式使用

相对于使用Parallel.Invoke执行并行操作,更常用的是使用TaskTask<T>提供的方法进行异步和并行处理。下面是任务最基本的使用:

Task.Run(() =>
{

});
Task.Factory.StartNew(() =>
{

});

任务的常用操作

获取任务的返回值

具有返回值的任务使用Task<T>,T可根据我们的需求指定,下面是获取任务返回值的方法。

Task<int> task = Task<int>.Factory.StartNew(() =>
{
Thread.Sleep(3000);
return DateTime.Now.Day;
});
int day = task.Result;

需要说明的是,获取任务的结果会阻塞当前线程。

等待任务完成

有时候,我们需要等待一些任务全部完成后才能执行后续操作,有时候只要多个任务中的一个完成了,就可以执行后续操作。Task提供了WaitWaitAllWaitAny等方法满足我们的需求。下面的例子展示了各种等待方法的使用。




public static void TaskWait()
{
Stopwatch watch = new Stopwatch();

#region 场景1:等待一个任务完成
Task task = Task.Run(() => DoWorkOfTask(1000));
Console.WriteLine("start wait. work duration: 1000");
watch.Start();
task.Wait();
watch.Stop();
Console.WriteLine($"end wait. time: {watch.ElapsedMilliseconds}");
#endregion

#region 场景2:等待多个任务完成
Task[] tasks = new Task[3]
{
Task.Run(() => DoWorkOfTask(1000)),
Task.Run(() => DoWorkOfTask(2000)),
Task.Run(() => DoWorkOfTask(3000)),
};

Console.WriteLine("start wait all. work duration: min 1000 max 3000.");
watch.Restart();
Task.WaitAll(tasks);
watch.Stop();
Console.WriteLine($"end wait. time: {watch.ElapsedMilliseconds}");
#endregion

#region 场景3:等待某个任务完成
tasks = new Task[3]
{
Task.Run(() => DoWorkOfTask(1000)),
Task.Run(() => DoWorkOfTask(2000)),
Task.Run(() => DoWorkOfTask(3000)),
};
Console.WriteLine("start wait any. work duration: min 1000 max 3000.");
watch.Restart();
Task.WaitAny(tasks);
watch.Stop();
Console.WriteLine($"end wait. time: {watch.ElapsedMilliseconds}");
#endregion
}





private static void DoWorkOfTask(int workDuration)
{
Console.WriteLine($"{DateTime.Now}=> Thread[{Thread.CurrentThread.ManagedThreadId}] started task[{Task.CurrentId}].");
Thread.Sleep(workDuration);
Console.WriteLine($"{DateTime.Now}=> Thread[{Thread.CurrentThread.ManagedThreadId}] completed task[{Task.CurrentId}].");
}

使用WaitWaitAllWaitAny方法时,我们可以设置超时时间或者传入取消Token,以控制等待时间。但这些方法返回布尔值,只能表明是否等待成功;假如我们需要知道所等待的任务返回值,则可以使用WhenAllWhenAny方法,这两个方法不能控制等待时间,但会返回一个完成的任务。如下例:

Task<int>[] tasks = new Task<int>[3]
{
Task<int>.Factory.StartNew(() =>
{
Console.WriteLine($"task #{Task.CurrentId} run");
Thread.Sleep(100);
Console.WriteLine($"task #{Task.CurrentId} done");
return 100;
}),
Task<int>.Factory.StartNew(() =>
{
Console.WriteLine($"task #{Task.CurrentId} run");
Thread.Sleep(500);
Console.WriteLine($"task #{Task.CurrentId} done");
return 1000;
}),
Task<int>.Factory.StartNew(() =>
{
Console.WriteLine($"task #{Task.CurrentId} run");
Thread.Sleep(1000);
Console.WriteLine($"task #{Task.CurrentId} done");
return 10000;
}),
};




Task<int> task = Task.WhenAny(tasks).Result;
Console.WriteLine($"task #{task.Id}. result {task.Result}");

Task.WhenAll 和Task.WhenAny在等待结束时,都会创建一个完成状态的任务,WhenAll将等待的所有已完成任务的结果放入创建任务的结果中,WhenAny则将等待的已完成任务放到创建任务的结果中。

任务延续

有时候,我们需要在一个任务完成时开始另一个任务。对于这种需求,我们可以使用TaskContinueWith等方法来处理。

Task task = Task.Run(() => DoWorkOfTask(3000));
task.ContinueWith(t => DoWorkOfTask(1000));

运行结果:

2019/3/27 21:25:09=> Thread[10] started task[1].

2019/3/27 21:25:12=> Thread[10] completed task[1].

2019/3/27 21:25:12=> Thread[11] started task[2].

2019/3/27 21:25:13=> Thread[11] completed task[2].


我们还可以通过TaskContinuationOptions指定延续任务的执行条件,如任务取消时或者任务出现异常时才执行,等。

子任务的使用

有时候,我们要在一个任务里面创建一些其他任务,并且还要在任务里面等待创建的任务完成,此时我们可以使用子任务。

Task parent = Task.Factory.StartNew(() =>
{
Console.WriteLine($"parent task #{Task.CurrentId} run.");
for (int i = 0; i < 10; i++)
{
Task.Factory.StartNew(() =>
{
Console.WriteLine($"child task #{Task.CurrentId} run.");
Thread.Sleep(1000);
Console.WriteLine($"child task #{Task.CurrentId} done.");
}, TaskCreationOptions.AttachedToParent);
}
});
parent.Wait();
Console.WriteLine($"parent task #{parent.Id} done.");

在一个任务中创建的新任务,默认情况下与父级任务是分离的,各自的运行不受影响,除非在创建任务时显式附加到父级任务中。例如,上例中如果不指定TaskCreationOptions.AttachedToParent,parent.Wait()就不会持续到所有子任务都执行完成。

任务的取消

我们在启动任务时,传入取消令牌CancellationToken,当收到取消请求时,抛出取消异常并在等待任务完成时捕获异常TaskCanceledException。我们通过这种方式控制任务的取消。




public static void TaskCancle()
{
Console.WriteLine("Press any key to begin. Press 'c' to cancel. ");
Console.ReadKey(true);
Console.WriteLine();

CancellationTokenSource tokenSource = new CancellationTokenSource();
ConcurrentBag<Task> tasks = new ConcurrentBag<Task>();

Task task1 = Task.Factory.StartNew(() => DoWorkOfTask(5000, tokenSource.Token), tokenSource.Token);
tasks.Add(task1);


Task task2 = Task.Factory.StartNew(() =>
{
for (int i = 0; i < 10; i++)
{
int duration = 1000 * i;
tasks.Add(Task.Factory.StartNew(()=>DoWorkOfTask(duration, tokenSource.Token), tokenSource.Token));
}
DoWorkOfTask(5000,tokenSource.Token);
}, tokenSource.Token);
tasks.Add(task2);

char ch = Console.ReadKey().KeyChar;
if (ch == 'c' || ch == 'C')
{
tokenSource.Cancel();
Console.WriteLine($"{DateTime.Now}=> Task cancellation requested.");
}

try
{
Task.WaitAll(tasks.ToArray());
}
catch (AggregateException ae)
{
foreach (Exception ex in ae.InnerExceptions)
{
TaskCanceledException tce = ex as TaskCanceledException;
string cancelledTask = tce == null ? string.Empty : $"Task #{tce.Task.Id}";
Console.WriteLine($"Exception: {ex.GetType().Name}. {cancelledTask}");
}
}
finally
{
tokenSource.Dispose();
}

Console.WriteLine();

foreach (Task task in tasks)
{
Console.WriteLine($"Task: #{task.Id} now is {task.Status}");
}
}

private static void DoWorkOfTask(int workDuration, CancellationToken cancleToken)
{
if (cancleToken.IsCancellationRequested)
{
Console.WriteLine($"{DateTime.Now}=> Task #{Task.CurrentId} was cancelled before it got started.");
cancleToken.ThrowIfCancellationRequested();
}

Console.WriteLine($"{DateTime.Now}=> Thread[{Thread.CurrentThread.ManagedThreadId}] started task #{Task.CurrentId}.");
Thread.Sleep(workDuration);

if (cancleToken.IsCancellationRequested)
{
Console.WriteLine($"{DateTime.Now}=> Task #{Task.CurrentId} was cancelled.");
cancleToken.ThrowIfCancellationRequested();
}
Console.WriteLine($"{DateTime.Now}=> Thread[{Thread.CurrentThread.ManagedThreadId}] completed task #{Task.CurrentId}.");
}

任务的异常处理

上面提到通过取消令牌抛出TaskCanceledException的方式控制任务的取消,实际上,Task会把自身执行过程中的所有异常都包装到一个AggregateException中,并传回调用线程。我们在主线程中通过捕获AggregateException来进行异常处理。

简单的处理方式

我们可以在任务的调用线程捕获并遍历AggregateException的内部异常,或者使用AggregateException提供的Handle方法进行处理,如下:

Task task = Task.Run(() =>
{
throw new Exception($"Task #{Task.CurrentId} thrown an exception");
});
try
{
task.Wait();
}
catch (AggregateException ae)
{

foreach (Exception ex in ae.InnerExceptions)
{
Console.WriteLine($"foreach: {ex.Message}");
}


ae.Handle(ex=>
{
Console.WriteLine($"handle: {ex.Message}");
return true ;
});
}

使用延续任务处理任务的异常

有时候,我们可以给任务附加一个任务异常时才会执行的延续任务,并在延续任务中进行异常处理。

Task.Run(() => { throw new Exception($"Task #{Task.CurrentId} thrown an exception"); })
.ContinueWith(t =>
{
Console.WriteLine($"{t.Exception?.InnerException?.Message}");
}, TaskContinuationOptions.OnlyOnFaulted);

嵌套任务的异常处理

下面是一个3层嵌套的任务。

Task parent = Task.Factory.StartNew(() =>
{
for (int i = 0; i < 10; i++)
{
Task.Factory.StartNew(() =>
{
for (int j = 0; j < 10; j++)
{
Task.Factory.StartNew(() =>
{
throw new Exception($"Task #{Task.CurrentId} thrown an exception. ");
});
}

throw new Exception($"Task #{Task.CurrentId} thrown an exception. ");
});
}

throw new Exception($"Task #{Task.CurrentId} thrown an exception. ");
});
try
{
parent.Wait();
}
catch (AggregateException ae)
{
ae.Flatten().Handle(ex =>
{
Console.WriteLine(ex.Message);
return true;
});
}

运行上面的代码只会得到一行输出:

Task #1 thrown an exception.

看起来有点奇怪,为什么只捕获到一个异常呢?其实也是在情理之中的:任务默认只会把自身异常传递到它自己的调用线程,子任务是在父任务中调用的,其异常只会传递到父任务的执行线程,所以我们在父任务的调用线程,也就是我们的主线程中是捕获不到子任务的异常的。

取消上面代码的两处/*, TaskCreationOptions.AttachedToParent*/,就会捕获到所有异常。

任务调度器

.NET提供的任务调度器

任务是由TaskScheduler调度的,启动任务时,默认使用线程池任务调度器,任务将会被派发到线程池工作线程。线程池的调度前面已经总结过,这里不再展开。.NET提供的另一种任务调度器是同步上下文调度器,用TaskScheduler.FromCurrentSynchronizationContext()获取,这个调度器会把任务派发给当前的上下文线程,常用在GUI应用程序中。

例如,我们在一个窗体中新建一个ListBox,新建几个任务向其中添加项,代码如下:

this.lbxMsg.Items.Add($"{DateTime.Now:O}=>Current thread is thread #{Thread.CurrentThread.ManagedThreadId} .");
for (int i = 0; i < 10; i++)
{
new Task(() =>
{
for (int j = 0; j < 3; j++)
{
this.lbxMsg.Items.Add($"{DateTime.Now:O}=> Task #{Task.CurrentId} add an item with thread #{Thread.CurrentThread.ManagedThreadId}.");
}

}).Start(TaskScheduler.FromCurrentSynchronizationContext());
}

运行上面的代码可以发现创建的任务都是由界面线程执行的。这里如果使用默认的任务调度器将产生"线程间操作无效"的异常。

实际使用时,可以给一个异步任务添加延续任务,来处理异步任务的结果或者异常等。如下:

Task.Run(() =>
{
Thread.Sleep(3000);
return 1000;
}).ContinueWith(t =>
{
this.lbxMsg.Items.Add(t.Result);
}, TaskScheduler.FromCurrentSynchronizationContext());

自定义任务调度器

除了使用.NET提供的调度器外,我们能够继承类TaskScheduler来实现自己的任务调度器。这里不再展开,需要了解的可以参考Samples for Parallel Programming with the .NET Framework。

原文地址:https://www.cnblogs.com/chenbaoshun/p/10621819.html

.NET社区新闻,深度好文,欢迎访问公众号文章汇总 http://www.csharpkit.com 
640?wx_fmt=jpeg

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/316316.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

.net core 注入中的三种模式:Singleton、Scoped 和 Transient

从上篇内容不如题的文章《.net core 并发下的线程安全问题》扩展认识.net core注入中的三种模式&#xff1a;Singleton、Scoped 和 Transient我们都知道在 Startup 的 ConfigureServices 可以注入我们想要的服务&#xff0c;那么在注入的时候有三种模式可以选择&#xff0c;那么…

【西安活动】 | 4月20日「拥抱开源,又见.NET:云时代 • 新契机」

云计算日渐兴起&#xff0c;成为提升企业效率和生产力的最终解决方案&#xff0c;而云时代也为软件开发模式带来了翻天覆地的变化。可以说 .NET Core就是这个时代催生的产物。自2016年 .NET Core 1.0 发布以来&#xff0c;其强大的生命力让越来越多技术爱好者对她的未来满怀憧憬…

C#并行编程(5):需要知道的异步

异步与并行的联系大家知道“并行”是利用CPU的多个核心或者多个CPU同时执行不同的任务&#xff0c;我们不关心这些任务之间的依赖关系。但是在我们实际的业务中&#xff0c;很多任务之间是相互影响的&#xff0c;比如统计车间全年产量的运算要依赖于各月产量的统计结果。假如你…

从壹开始 [ Id4 ] 之一║ 授权服务器 IdentityServer4 开篇讲计划书

哈喽大家周四好&#xff01;时间过的很快&#xff0c;现在已经是三月份了&#xff0c;我的 IdentityServer4 教程也拖了一定的时间了&#xff0c;正好最近有精力学新东西了&#xff0c;主要中间被小伙伴要求写一个管理后台&#xff0c;目前1.0已经上线&#xff08;《权限后台系…

.NET 泛型,详细介绍

今天的文章是因为再给一个朋友讲这个的时候随手记录下整理出来的。说白了就是把前辈们曾经给我吹过的我又吹了出去。泛型&#xff1a;是C# FrameWork 2.0 时代 加入进来的&#xff0c;可以说对与Net开发人员来说泛型是无处不再的&#xff0c;喜欢看源码的同学&#xff0c;可能会…

P2257 YY的GCD

P2257 YY的GCD 题意&#xff1a; 求 1≤x≤N,1≤y≤M1 \leq x \leq N,1 \leq y \leq M1≤x≤N,1≤y≤M 且gcd(x, y) 为质数的 (x,y) 有多少对。 题解&#xff1a; 莫比乌斯反演 代码&#xff1a; #include <bits/stdc.h> #include <unordered_map> #define…

程序员修神之路--问世间异步为何物?

菜菜哥&#xff0c;今天天气挺热的&#xff0c;我都穿裙子了说吧&#xff0c;什么事&#xff1f;&#xff1f;苦笑一下..... 老大说把所有的接口都改成异步操作异步好呀&#xff0c;最少比同步能提高吞吐量异步是怎么回事呢&#xff0c;能讲讲不&#xff1f;来&#xff0c;凑近…

P3455 [POI2007]ZAP-Queries

P3455 [POI2007]ZAP-Queries 题意&#xff1a; 求满足1≤x≤a,1≤y≤b1\leq x\leq a,1\leq y\leq b1≤x≤a,1≤y≤b&#xff0c;且gcd(x,y)dgcd(x,y)dgcd(x,y)d的二元组(x,y)的数量 题解&#xff1a; 莫比乌斯反演板子 代码&#xff1a; // Problem: P3455 [POI2007]ZAP…

.NET Core 使用MailKit发送电子邮件

点击上方蓝字关注“汪宇杰博客”发送邮件通知的功能在各种系统里都很常见。我的博客也能在有新评论、新回复&#xff0c;或者文章被其他网站引用时向管理员发送邮件。那么在.NET Core里&#xff0c;如何实现发送电子邮件呢&#xff1f;准备工作我的案例会利用微软outlook.com的…

P3327 [SDOI2015]约数个数和

P3327 [SDOI2015]约数个数和 题意&#xff1a; 设 d(x) 为 x 的约数个数&#xff0c;给定 n,m&#xff0c;求 ∑i1n∑j1md(i,j)\sum_{i1}^{n}\sum_{j1}^{m}d(i,j)∑i1n​∑j1m​d(i,j) 题解&#xff1a; 代码&#xff1a; // Problem: P3327 [SDOI2015]约数个数和 // Conte…

C#并行编程(6):线程同步面面观

理解线程同步线程的数据访问在并行&#xff08;多线程&#xff09;环境中&#xff0c;不可避免地会存在多个线程同时访问某个数据的情况。多个线程对共享数据的访问有下面3种情形&#xff1a;多个线程同时读取数据&#xff1b;单个线程更新数据&#xff0c;此时其他线程读取数据…

P2522 [HAOI2011]Problem b

P2522 [HAOI2011]Problem b 题意&#xff1a; 对于给出的 n 个询问&#xff0c;每次求有多少个数对 (x,y)&#xff0c;满足 a≤x≤b&#xff0c;c≤y≤d&#xff0c;且 gcd(x,y)k&#xff0c;gcd(x,y) 函数为 x 和 y 的最大公约数。 题解&#xff1a; 这个题跟P3455 [POI20…

.NET Core/Framework 创建委托以大幅度提高反射调用的性能

都知道反射伤性能&#xff0c;但不得不反射的时候又怎么办呢&#xff1f;当真的被问题逼迫的时候还是能找到解决办法的。为反射得到的方法创建一个委托&#xff0c;此后调用此委托将能够提高近乎直接调用方法本身的性能。&#xff08;当然 Emit 也能够帮助我们显著提升性能&…

[省选联考 2020 A/B 卷] 冰火战士(树状数组上二分)

文章目录problemsolution(10pts)code(10pts)solution(30pts)code(30pts)solution(60pts)code(60pts)solution(100pts)code(100pts)problem luogu-P6619 一场比赛即将开始。 每位战士有两个属性&#xff1a;温度和能量。 有两派战士&#xff1a; 冰系战士的技能会对周围造成…

P1829 [国家集训队]Crash的数字表格 / JZPTAB

P1829 [国家集训队]Crash的数字表格 / JZPTAB 题意&#xff1a; 求∑i1n∑j1mlcm(i,j)\sum_{i1}^{n}\sum_{j1}^{m}lcm(i,j)∑i1n​∑j1m​lcm(i,j) 1<n<m<1e7 结果mod20101009 题解&#xff1a; 跟这个题P3911 最小公倍数之和很相近&#xff0c;但是本题数据范围大…

C# 跨设备前后端开发探索

每个人都拥有 好奇心&#xff0c;好奇心驱使着我们总是去尝试做一些有趣的事情。带起你的好奇心&#xff0c;本文将使用 C# 开发各种各样好玩的东西。每个人都拥有 好奇心&#xff0c;好奇心驱使着我们总是去尝试做一些有趣的事情。比如这件事&#xff1a;在好奇心的驱使下&…

Docker最全教程之Python爬网实战(二十二)

Python目前是流行度增长最快的主流编程语言&#xff0c;也是第二大最受开发者喜爱的语言&#xff08;参考Stack Overflow 2019开发者调查报告发布&#xff09;。笔者建议.NET、Java开发人员可以将Python发展为第二语言&#xff0c;一方面Python在某些领域确实非常犀利&#xff…

P2870 [USACO07DEC]Best Cow Line G

P2870 [USACO07DEC]Best Cow Line G 题意&#xff1a; 给你一个字符串&#xff0c;每次从首或尾取一个字符组成字符串&#xff0c;问所有能够组成的字符串中字典序最小的一个。 题解&#xff1a; 现在要组成字典序最小的&#xff0c;那我们每次就尽可能取小的 我们从两端开…

中间件是什么?在.NET Core中的工作原理又是怎样的呢?

本文出自《从零开始学ASP.NET CORE MVC》推荐文章&#xff1a;ASP.NET Core appsettings.json文件ASP.NET Core 中的中间件(Middleware)在这个视频中&#xff0c;我们将了解&#xff0c;ASP.NET Core 中的中间件是 什么&#xff1f;中间件很重要&#xff0c;尤其是在你想当架构…

从高德采集最新的省市区三级坐标和行政区域边界,用js在浏览器中运行

本文描述的是对国家统计局于2019-01-31发布的《2018年统计用区划代码和城乡划分代码(截止2018年10月31日)》中省市区三级的坐标和行政区域边界的采集。本文更新&#xff08;移步查阅&#xff09;&#xff1a;19-04-15 新采集了2018的省市区三级的坐标和行政区域边界数据csv格式…