在.NET 4.0中,并行计算与多线程得到了一定程度的加强,这主要体现在并行对象Parallel,多线程Task,与PLinq。这里对这些相关的特性一起总结一下。
使用Thread方式的线程无疑是比较麻烦的,于是在这个版本中有了改善的版本Task。除了运行效率等方面的提升,Task还与并行计算紧紧联系在了一起,这些线程充分的利用了多核的优势,在一定的场合下,大幅的提高了程序的运行效率。屏蔽了运行细节的Task和Parallel方式使得程序员们完全不用编写任何针对多核的程序,只需要使用标准的类库完成任务就可以了,其它的CLR会去处理。
这一篇中先看第一个利器:多线程Task。
Task类为把线程类进行改良,使之使用起来更简便,更加容易。要想开启一个新的线程执行任务,只要调用Task.Factory.StartNew方法就可以了,执行完这个语句后线程就开始运行了。当然了,使用new初始化一个Task,然后适时调用其Start方法开始运行也是很不错的一个选择。
using System;
using System.Threading.Tasks;class Program
{static void Main(string[] args){ var task = Task.Factory.StartNew(() =>{for (int i = 0; i < 100; i++) { Console.Write('B'); }});task.ContinueWith(t =>{Console.WriteLine();Console.WriteLine("sub task {0} done", t.Id);});for (int i = 0; i < 100; i++) { Console.Write('A'); }task.Wait();}
}
注意最后的task.Wait(),调用这个方法是等待子线程执行结束,当需要等待子线程结果的时候,它最有用。
task对象还有很多有用的方法,从它们的名字就可以知道它们各自的用途了,其中实例方法如上面的ContinueWith方法,它会在task执行完毕后执行其参数指定的行为;静态方法如WaitAny,WaitAll等,它们指定了在执行多个task时主线程的等待条件。
对于多线程编程来说,启动线程并等待其自然结束是最常见的一种应用,处理也比较简单。相比而言,线程的中途终止和异常的处理要麻烦的多,难以预计的隐藏bug会出现在线程程序中。
线程的终止类型
线程执行的任务结束以后,线程就正常结束了。这里线程任务结束通常有3种情况:任务正常执行完,任务被取消,任务被异常打断结束。查询Task结束时的状态类型就是调用相关的属性,如下面的例子:
static void Main(string[] args)
{Task t = new Task(() =>{Console.WriteLine("任务开始工作……");//模拟工作过程Thread.Sleep(5000);});t.Start();t.ContinueWith((task) =>{Console.WriteLine("任务完成,完成时候的状态为:");Console.WriteLine("IsCanceled={0}\tIsCompleted={1}\tIsFaulted={2}", task.IsCanceled, task.IsCompleted, task.IsFaulted);});Console.ReadKey();
}
Task对象的三个属性IsCompleted,IsCanceled,IsFaulted就是查询线程的执行情况,不过需要注意,只要线程结束了,不管是以什么方式,IsCompleted始终返回true。IsCanceled代表程序被主动取消了,IsFaulted代表线程出现异常被动结束,这两个值都为false,代表线程是正常执行完结束的。
正常结束的情况比较简单,这里就不多说了,这里看一下子线程任务的取消问题。这在编程中是很常见的一个需求,启动了一个线程以后,发现某些条件具备了,就不需要线程继续运行了,这个时候就需要取消线程任务。
主动取消/中止线程的标准做法
在以前的版本中,我基本上是都通Thread的Abort方法强行的中止线程。在C# 4.0中,标准的取消一个线程任务的做法是使用协作式取消(Cooperative Cancellation)。协作式取消的机制是,如果线程需要被停止,那么线程自身就得负责开放给调用者这样的接口:Cancled,然后线程在工作的同时,不断以某种频率检测Cancled标识(通常是把任务主体包装到循环中),若检测到Cancled,线程自己负责退出。
下面是一个最基础的协作式取消的样例:
// 设定取消标识
CancellationTokenSource cts = new CancellationTokenSource();
Thread t = new Thread(() =>{while (true){// 检查取消标识if (cts.Token.IsCancellationRequested){Console.WriteLine("线程被终止!");break;}Console.WriteLine(DateTime.Now.ToString());Thread.Sleep(1000);}});t.Start();
Console.ReadLine();
// 主线程申请取消
cts.Cancel();
调用者使用CancellationTokenSource的Cancle方法通知工作线程退出。工作线程则以一定的的频率一边工作,一边检查是否有外界传入进来的Cancel信号。若有这样的信号,则负责退出。可以看到,在正确停止线程的机制中,真正起到主要作用的是线程本身,它负责检测相关信号并退出。
协作式取消中的关键类型是CancellationTokenSource。它有一个关键属性Token,Token是一个名为CancellationToken的值类型。CancellationToken继而进一步提供了布尔值的属性IsCancellationRequested作为需要取消工作的标识。CancellationToken还有一个方法尤其值得注意,那就是Register方法。它负责传递一个Action委托,在线程停止的时候被回调,使用方法如:
cts.Token.Register(() =>
{Console.WriteLine("工作线程被终止了。");
});
而且Task对象对CancellationTokenSource对象是天生支持的,在构造Task对象的时候就可以传进去CancellationTokenSource的实例。看一个网上的小例子:
static void Main(string[] args)
{CancellationTokenSource cts = new CancellationTokenSource();Task<int> t = new Task<int>(() => Add(cts.Token), cts.Token);t.Start();t.ContinueWith(TaskEnded);//等待按下任意一个键取消任务Console.ReadKey();cts.Cancel();Console.ReadKey();
}static void TaskEnded(Task<int> task)
{Console.WriteLine("任务完成,完成时候的状态为:");Console.WriteLine("IsCanceled={0}\tIsCompleted={1}\tIsFaulted={2}", task.IsCanceled, task.IsCompleted, task.IsFaulted);Console.WriteLine("任务的返回值为:{0}", task.Result);
}static int Add(CancellationToken ct)
{Console.WriteLine("任务开始……");int result = 0;while (!ct.IsCancellationRequested){result++;Thread.Sleep(1000);}return result;
}
不过需要注意,像上面这么写,使用ct.IsCancellationRequested判断一下,如果取消信号被设定了,则退出任务,这种情况CLR会认为是成功结束的,这切实反应了程序员的期望,IsCanceled会返回false。如果想出现IsCanceled为true的情况,那么程序就要改写成:
static void Main(string[] args)
{CancellationTokenSource cts = new CancellationTokenSource();Task<int> t = new Task<int>(() => AddCancleByThrow(cts.Token), cts.Token);t.Start();t.ContinueWith(TaskEndedByCatch);//等待按下任意一个键取消任务Console.ReadKey();cts.Cancel();Console.ReadKey();
}static void TaskEndedByCatch(Task<int> task)
{Console.WriteLine("任务完成,完成时候的状态为:");Console.WriteLine("IsCanceled={0}\tIsCompleted={1}\tIsFaulted={2}", task.IsCanceled, task.IsCompleted, task.IsFaulted);try{Console.WriteLine("任务的返回值为:{0}", task.Result);}catch (AggregateException e){e.Handle((err) => err is OperationCanceledException);}
}static int AddCancleByThrow(CancellationToken ct)
{Console.WriteLine("任务开始……");int result = 0;while (true){ct.ThrowIfCancellationRequested();result++;Thread.Sleep(1000);}return result;
}
在任务结束求值的方法TaskEndedByCatch中,如果任务是通过ThrowIfCancellationRequested方法结束的,对任务求结果值将会抛出异常OperationCanceledException,而不是得到抛出异常前的结果值。这意味着任务是通过异常的方式被取消掉的,所以可以注意到上面代码的输出中,状态IsCancled为true。同时你会发现IsFaulted状态却还是等于false。这是因为ThrowIfCancellationRequested是协作式取消方式类型CancellationTokenSource的一个方法,CLR进行了特殊的处理。CLR知道这一行程序开发者有意为之的代码,所以不把它看作是一个异常(它被理解为取消)。要得到IsFaulted等于true的状态,自己手动在一个地方抛出一个异常试试就可以了。
此外,CancellationTokenSource就是可以被多个Task共享的,这样可以取消一组任务。取消一组任务最简单的就是使用任务工厂。任务工厂支持多个任务之间共享相同的状态,如取消类型。通过使用任务工厂,可以同时取消一组任务:
static void Main(string[] args)
{CancellationTokenSource cts = new CancellationTokenSource();//等待按下任意一个键取消任务TaskFactory taskFactory = new TaskFactory();Task[] tasks = new Task[]{taskFactory.StartNew(() => Add(cts.Token)),taskFactory.StartNew(() => Add(cts.Token)),taskFactory.StartNew(() => Add(cts.Token))};//CancellationToken.None指示TasksEnded不能被取消taskFactory.ContinueWhenAll(tasks, TasksEnded, CancellationToken.None);Console.ReadKey();cts.Cancel();Console.ReadKey();
}static void TasksEnded(Task[] tasks)
{Console.WriteLine("所有任务已完成!");
}
好了,看完线程的正常取消,再来看一下线程的异常问题,这个在上面也简单说了一下,这是一种程序员不期望的线程结束的方式。
线程的异常处理
先看下面的例子:
Task.Factory.StartNew(() =>
{throw new Exception();
});
运行这段程序,你会发现根本没有异常抛出,线程中的异常会被线程忽略掉,这个是我们不需要的,我们需要知道异常发生了,并进行相应的处理。
跟踪这种问题,通常记日志是一种常用方法。此外通过技术手段去捕获这些异常时另一种方式,这是这里讨论的重点。
Task线程中未捕获的异常会在垃圾回收时终结器执行线程中被抛出。我们可以通过GC.Collect来强制垃圾回收从而引发终结器处理线程,此时Task的未捕获异常会被抛出。例如:
//在Task中抛出异常
Task.Factory.StartNew(() =>
{throw new Exception();
});
//确保任务完成
Thread.Sleep(100);
//强制垃圾回收
GC.Collect();
//等待终结器处理
GC.WaitForPendingFinalizers();
好了,异常抛出,程序崩溃了。不过这个行为在.NET 4.5中又有所改变,直接运行这个程序并不会抛出异常,而在App.config中添加如下配置以后,异常才会抛出:
<configuration><runtime><ThrowUnobservedTaskExceptions enabled="true"/></runtime>
</configuration>
抛出异常,程序崩溃并不是程序员想要的行为,我们期望的是可以捕获异常并处理之。要达到这个目的,针对Task对象,我们可以采用的手段有这么几个:调用Task.Wait/WaitAll,或者引用Task<T>.Result属性(这个在上面的例子中已经使用了),或者最简单的引用Task.Exception属性来捕获Task的异常。
例如通过Task.Wait手动捕获AggregateException:
try
{Task.WaitAll(Task.Factory.StartNew(() =>{throw new Exception();}));
}
catch (AggregateException)
{// 处理异常//......
}
这样我们就捕获到了异常并可以处理它了。
当然最简单的就是直接引用一下Task.Exception属性:
Task.Factory.StartNew(() =>
{throw new Exception();
}).ContinueWith(t => { var exp = t.Exception;// 处理异常//......
});
同样的,我们捕获了异常,并且处理掉异常就可以了,像上面例子中的处理方式就是忽略线程异常,没做任何处理。
另外,可以通过TaskContinuationOptions.OnlyOnFaulted来使得只有在发生异常时才去执行ContinueWith中指定的行为,代码如下:
Task.Factory.StartNew(() =>
{throw new Exception();
}).ContinueWith(t => { var exp = t.Exception; }, TaskContinuationOptions.OnlyOnFaulted);
最后需要说明的是TaskScheduler.UnobservedTaskException事件,该事件是所有未捕获被抛出前的最后可以将其捕获的方法。通过UnobservedTaskExceptionEventArgs.SetObserved方法来将异常标记为已捕获。
TaskScheduler.UnobservedTaskException += (s, e) =>
{//设置所有未捕获异常被捕获e.SetObserved();
};Task.Factory.StartNew(() =>
{throw new Exception();
});
好了,Task的有关问题就总结到这里了,下面将总结一下并行计算方面的知识,它们与Task对象之间其实存在着千丝万缕的联系。