一直以来很想梳理下我在开发过程中使用异步编程的心得和体会,但是由于我是APM异步编程模式的死忠,当TAP模式和TPL模式出现的时候我并未真正的去接纳这两种模式,所以导致我一直没有花太多心思去整理这两部分异步编程模型。今天在CodeProject上面偶然间看到一篇关于异步编程的文章,概括总结的非常好,省去了我自己写的麻烦,索性翻译过来,以飨各位。
在阻塞和并行编程过程中,异步和多线程是非常重要的特性。异步编程可以涉及到多线程,也可以不用涉及。如果我们能够把二者放到一起来讲解的话,我们会理解的更快一些。所以今天在这里,我主要想讲解的内容是:
- 异步编程
- 是否需要多线程
- TAP编程模型
- 并行编程
首先来说下异步编程
所谓的异步编程就是指 当前的操作独立于主操作流程。在C#编程中,我们总是从进入Main方法开始 ,Main方法返回结束。在开始和结束之间的所有操作,都会挨个的执行,下一个操作必须等到上一个操作完毕才开始执行,我们看看如下代码:
static void Main(string[] args){DoTaskOne();DoTaskTwo();}
“DoTaskOne”方法必须在“DoTaskTwo”方法之前执行。也就是发生了阻塞现象。
但是在异步编程中,方法的调用并不会阻塞主线程。当方法被调用后,主线程仍会继续执行其他的任务。这种情形,一般都是使用Thread或者是Task来实现的。
在上面的场景中,如果我们让方法“DoTaskOne”异步执行,那么主线程将会立即返回,然后执行“DoTaskTwo”方法。
在.NET中,我们可以利用Thread类或者异步模式来创建我们自己的线程来实现异步编程。在.NET中有三种不同的异步模型:
1.APM模型
2.EAP模型
但是,遗憾的是,上面的这两种模型都不是微软所推荐的,所以我们将不对其做具体的讨论。如果你对这两种模型感兴趣,请移步:
https://msdn.microsoft.com/en-us/library/ms228963(v=vs.110).aspx
https://msdn.microsoft.com/en-us/library/ms228969(v=vs.110).aspx
3.TAP模型:这是微软所推荐的,我们稍后将会详细的进行讲解。
我们是否需要启用多线程
如果我们在.NET 4.5版本中使用异步编程模型,绝大部分情况下我们无需手动创建线程。因为编译器已经为我们做了足够多的工作了。
创建一个新线程不仅耗费资源而且花费时间,除非我们真的需要去控制一个多线程,否则是不需要去创建的。因为TAP模型和TPL模型已经为异步编程和并行编程提供了绝好的支持。TAP模型和TPL模型使用Task来进行操作,而Task类使用线程池中的线程来完成操作的。
Task可以在如下场景中运行:
- 当前线程
- 新线程
- 线程池中的线程
- 没有线程
在我们使用Task过程中,我们无需担心线程的创建或者是使用,因为.NET framework已经为我们处理好了各种细节。但是如果我们需要控制线程的话,比如说以下的场景:
- 为线程设置名称
- 为线程设置优先级
- 设置线程为后台线程
那么我们不得不使用Thread类来控制。
async和await关键字
.NET framework引入了两个新的关键字“async”和“await”来执行异步编程。当在方法上使用await关键字的时候,我们需要同时使用async关键字。await关键字是在调用异步方法之前被使用的。它主要是使方法的后续执行编程异步的,例如:
private asyncstatic void CallerWithAsync()// async modifier is used {// await is used before a method call. It suspends //execution of CallerWithAsync() method and control returs to the calling thread that can//perform other task.string result = await GetSomethingAsync();// this line would not be executed before GetSomethingAsync() //method completesConsole.WriteLine(result); }
在这里,async关键字只能够被那些返回Task或者是void的方法所使用。它不能被用于Main入口方法上。
我们不能够在所有的方法上使用await关键字,因为一旦方法上有了await关键字,那么我们的返回类型就变成了“awaitable “类型。下面的几种类型是” awaitable “的:
- Task
- Task<T>
TAP模型
首先,我们需要一个能够返回Task或者Task<T>的异步方法。我们可以通过如下方式来创建Task:
- Task.Factory.StartNew方法:在.net 4.5版本之前(.net 4)中,这是默认的创建和组织task的方式。
- Task.Run或者Task.Run<T>方法:从.net 4.5版本开始,这个方法被引进来了。这个方法足以能够应付大多数的场景。
- Task.FromResult方法:如果方法已经执行完毕并返回结果,我们可以使用这个方法来创建一个task。
Task.Factory.StartNew还有一些高级应用场景,请移步:http://blogs.msdn.com/b/pfxteam/archive/2011/10/24/10229468.aspx
下面的链接则展示了创建Task的几种方式:
http://dotnetcodr.com/2014/01/01/5-ways-to-start-a-task-in-net-c/
Task的创建和等待
我们接下来将会利用Task.Run<T>方法来创建我们自己的Task。它会将某个特殊的方法放入位于ThreadPool的执行队列中,然后会为这些方法返回一个task句柄。下面的步骤将会为你展示我们如何为一个同步方法创建异步的Task的:
- 我们有一个同步的方法,需要耗费一些时间来执行完毕:
static string Greeting(string name){Thread.Sleep(3000);return string.Format("Hello, {0}", name);}
- 为了让此方法一步执行,我们需要对其进行异步方法的包装。这个异步方法我们暂定为“GreetingAsync“。
static Task<string> GreetingAsync(string name) {return Task.Run<string>(() =>{return Greeting(name);}); }
现在我们可以使用await关键字来调用GreetingAsync方法
private asyncstatic void CallWithAsync(){//some other tasksstring result = awaitGreetingAsync("Bulbul");//We can add multiple ‘await’ in same ‘async’ method//string result1 = await GreetingAsync(“Ahmed”);//string result2 = await GreetingAsync(“Every Body”);Console.WriteLine(result);}
当“CallWithAsync”方法被调用的时候,它首先像正常的同步方法被调用,直到遇到await关键字。当它遇到await关键字的时候,它会暂定方法的执行,然后等待“GreetingAsync(“Bulbul”)”方法执行完毕。在此期间,这个控制流程会返回到“CallWithAsync”方法上,然后调用者就可以做其他的任务了。
当“GreetingAsync(“Bulbul”)”方法执行完毕以后,“CallWithAsync”方法就会唤醒在await关键字后面的其他的方法,所以在这里它会继续执行“Console.WriteLine(result)”方法。
- 任务的继续执行: “ContinueWith”方法则表明任务的持续执行。
private static void CallWithContinuationTask(){Task<string> t1 = GreetingAsync("Bulbul");t1.ContinueWith(t =>{string result = t.Result;Console.WriteLine(result);});}
当我们使用ContinueWith方法的时候,我们无需使用await关键字。因为编译器会自动的将await关键字放到正确的位置上。
等待多个异步方法
让我们先看下面的代码:
private asyncstatic void CallWithAsync(){string result = awaitGreetingAsync("Bulbul");string result1 = awaitGreetingAsync("Ahmed");Console.WriteLine(result);Console.WriteLine(result1);}
在这里,我们在顺序的等待两个方法被执行。GreetingAsync("Ahmed")方法将会在GreetingAsync("Bulbul")执行后,再执行。但是,如果result和result1彼此不是独立的,那么await关键字这样用是不合适的。
在上面的场景中,我们其实是无需使用await关键字的。所以方法可以被改成如下的样子:
private async static void MultipleAsyncMethodsWithCombinators(){Task<string> t1 = GreetingAsync("Bulbul");Task<string> t2 = GreetingAsync("Ahmed");await Task.WhenAll(t1, t2);Console.WriteLine("Finished both methods.\n " +"Result 1: {0}\n Result 2: {1}", t1.Result, t2.Result);}
在这里我们用了Task.WhenAll方法,它主要是等待所有的Task都完成工作后再触发。Task类还有另一个方法:WhenAny,它主要是等待任何一个Task完成就会触发。
异常处理
当进行错误处理的时候,我们不得不将await关键字放到try代码块中。
private asyncstatic void CallWithAsync() {try{string result = awaitGreetingAsync("Bulbul");}catch (Exception ex){Console.WriteLine(“handled {0}”, ex.Message);} }
但是,如果我们有多个await关键字存在于try代码快中,那么只有第一个错误被处理,因为第二个await是无法被执行的。如果我们想要所有的方法都被执行,甚至当其中一个抛出异常的时候,我们不能使用await关键字来调用他们,我们需要使用Task.WhenAll方法来等待所有的task执行。
private asyncstatic void CallWithAsync(){try{Task<string> t1 = GreetingAsync("Bulbul");Task<string> t2 = GreetingAsync("Ahmed");await Task.WhenAll(t1, t2);}catch (Exception ex){Console.WriteLine(“handled {0}”, ex.Message);}}
尽管所有的任务都会完成,但是我们可以从第一个task那里看到错误。虽然它不是第一个抛出错误的,但是它是列表中的第一个任务。
如果想要从所有的任务中获取错误,那么有一个方式就是将其在try代码块外部进行声明。然后检查Task方法的“IsFaulted”属性。如果有错误抛出,那么其“IsFaulted”属性为true。
示例如下:
static async void ShowAggregatedException(){Task taskResult = null;try {Task<string> t1 = GreetingAsync("Bulbul");Task<string> t2 = GreetingAsync("Ahmed");await (taskResult = Task.WhenAll(t1, t2));}catch (Exception ex){Console.WriteLine("handled {0}", ex.Message);foreach (var innerEx in taskResult.Exception.InnerExceptions){Console.WriteLine("inner exception {0}", nnerEx.Message);}}}
Task的取消执行
如果直接使用ThreadPool中的Thread,我们是无法进行取消操作的。但是现在Task类提供了一种基于CancellationTokenSource类的方式来取消任务的执行,可以按照如下步骤来进行:
- 异步方法需要附带一个“CancellationToken”类型。
- 创建CancellationTokenSource类的实例:
- 将CancellationToken传递给异步方法:
- 对于长时间执行的方法,如果想取消的话,我们需要调用CancellationToken的ThrowIfCancellationRequested方法。
- 捕捉Task抛出的 OperationCanceledException。
- 现在,如果我们通过调用CancellationTokenSource的cancel方法来取消当前的操作的话,对于那些长时间运行的操作,将会抛出OperationCanceledException错误。我们也可以通过设置超时时间来取消任务。更多关于CancellationTokenSource类的信息,请移步:https://msdn.microsoft.com/en-us/library/system.threading.cancellationtokensource%28v=vs.110%29.aspx
var cts = new CancellationTokenSource();Task<string> t1 = GreetingAsync("Bulbul", cts.Token); static string Greeting(string name, CancellationToken token) {Thread.Sleep(3000);token.ThrowIfCancellationRequested();return string.Format("Hello, {0}", name); }
下面让我们看看如何设置超时时间来取消任务的执行:
static void Main(string[] args){CallWithAsync();Console.ReadKey(); }async static void CallWithAsync(){try{CancellationTokenSource source = new CancellationTokenSource();source.CancelAfter(TimeSpan.FromSeconds(1));var t1 = await GreetingAsync("Bulbul", source.Token);}catch (OperationCanceledException ex){Console.WriteLine(ex.Message);}}static Task<string> GreetingAsync(string name, CancellationToken token){return Task.Run<string>(() =>{return Greeting(name, token);});}static string Greeting(string name, CancellationToken token){Thread.Sleep(3000);token.ThrowIfCancellationRequested();return string.Format("Hello, {0}", name);}
并行编程
在.net 4.5中,存在一个叫做“Parallel”的类,这个类可以进行并行操作。当然这种并行和那些充分利用cpu计算能力的Thread 是有差别的,简单说起来,它有两种表现方式:
1.数据并行。 如果我们有很多的数据需要计算,我们需要他们并行的进行,那么我们可以使用For或者ForEach方法来进行:
ParallelLoopResult result =Parallel.For(0, 100, async (int i) =>{Console.WriteLine("{0}, task: {1}, thread: {2}", i,Task.CurrentId, Thread.CurrentThread.ManagedThreadId);await Task.Delay(10);});
如果我们在计算的过程中,想要中断并行,我们可以把ParallelLoopState当做参数传递进去,我们就可以实现认为和中断这种循环:
ParallelLoopResult result =Parallel.For(0, 100, async (int i, ParallelLoopState pls) =>{Console.WriteLine("{0}, task: {1}, thread: {2}", i,Task.CurrentId, Thread.CurrentThread.ManagedThreadId);await Task.Delay(10);if (i > 5) pls.Break();});
需要注意的是,当我们需要中断循环的时候,由于其运行在诸多个线程之上,如果线程数多于我们设定的中断数时,上述的执行可能就不太准确。
- 任务并行。如果我们想要多个任务并行处理,那么我们可以使用Parallel.Invoke方法来接受Action委托数组。例如:
static void ParallelInvoke() {Parallel.Invoke(MethodOne, MethodTwo); }
参考文章:http://www.codeproject.com/Articles/996857/Asynchronous-programming-and-Threading-in-Csharp-N?bmkres=exist#_articleTop
引用:https://www.cnblogs.com/scy251147/p/4597615.html
异步编程总结
最近在为公司的分布式服务框架做支持异步调用的开发,这种新特性的上线需要进行各种严格的测试。在并发性能测试时,性能一直非常差,而且非常的不稳定。经过不断的分析调优,发现Socket通信和多线程异步回调存在较为严重的性能问题。经过多方优化,性能终于达标。下面是原版本、支持异步最初版本和优化后版本的性能比较。差异还是非常巨大的。另外说明一下,总耗时是指10000次请求累计执行时间。
从上图可以看到,支持异步的版本,在单线程模式下,性能的表现与老版本差异并不明显,但是10线程下差异就非常巨大,而100线程的测试结果反而有所好转。通过分析,两个版本的性能差异如此巨大,主要是因为:
- 同步模式会阻塞客户端请求,说白了,在线程内就是串行请求的。但是在异步模式中,线程内的请求不再阻塞,网络流量、后台计算压力瞬间暴涨,峰值是同步模式的100倍。网络传输变成瓶颈点。
- 在压力暴涨的情况下,CPU资源占用也会突变, 并且ThreadPool、Task、异步调用的执行都将变慢。
在网络通信方面,把原先半异步的模式调整为了SocketAsyncEventArgs 模式。下面是Socket通信的几种模型的介绍和示例,总结一下,与大家分享。下次再与大家分享,并发下异步调用的性能优化方案。
APM方式: Asynchronous Programming Model
异步编程模型是一种模式,该模式允许用更少的线程去做更多的操作,.NET Framework很多类也实现了该模式,同时我们也可以自定义类来实现该模式。NET Framework中的APM也称为Begin/End模式。此种模式下,调用BeginXXX方法来启动异步操作,然后返回一个IAsyncResult 对象。当操作执行完成后,系统会触发IAsyncResult 对象的执行。 具体可参考: https://docs.microsoft.com/en-us/dotnet/standard/asynchronous-programming-patterns/asynchronous-programming-model-apm
.net中的Socket异步模式也支持APM,与同步模式或Blocking模式相比,可以更好的利用网络带宽和系统资源编写出具有更高性能的程序。参考具体代码如下:
服务端监听:
Socket serverSocket =
new
Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
//本机预使用的IP和端口
IPEndPoint serverIP =
new
IPEndPoint(IPAddress.Any, 9050);
//绑定服务端设置的IP
serverSocket.Bind(serverIP);
//设置监听个数
serverSocket.Listen(1);
//异步接收连接请求
serverSocket.BeginAccept(ar =>
{
base
.communicateSocket = serverSocket.EndAccept(ar);
AccessAciton();
},
null
);
客户端连接:
var communicateSocket =
new
Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
communicateSocket.Bind(
new
IPEndPoint(IPAddress.Any, 9051));
//服务器的IP和端口
IPEndPoint serverIP;
try
{
serverIP =
new
IPEndPoint(IPAddress.Parse(IP), 9050);
}
catch
{
throw
new
Exception(String.Format(
"{0}不是一个有效的IP地址!"
, IP));
}
//客户端只用来向指定的服务器发送信息,不需要绑定本机的IP和端口,不需要监听
try
{
c
ommunicateSocket.BeginConnect(serverIP, ar =>
{
AccessAciton();
},
null
);
}
catch
{
throw
new
Exception(
string
.Format(
"尝试连接{0}不成功!"
, IP));
}
客户端请求:
if
(communicateSocket.Connected ==
false
)
{
throw
new
Exception(
"还没有建立连接, 不能发送消息"
);
}
Byte[] msg = Encoding.UTF8.GetBytes(message);
communicateSocket.BeginSend(msg,0, msg.Length, SocketFlags.None,
ar => {
},
null
);
服务端响应:
Byte[] msg =
new
byte
[1024];
//异步的接受消息
communicateSocket.BeginReceive(msg, 0, msg.Length, SocketFlags.None,
ar => {
//对方断开连接时, 这里抛出Socket Exception
communicateSocket.EndReceive(ar);
ReceiveAction(Encoding.UTF8.GetString(msg).Trim(
'\0'
,
' '
));
Receive(ReceiveAction);
},
null
);
注意:异步模式虽好,但是如果进行大量异步套接字操作,是要付出很高代价的。针对每次操作,都必须创建一个IAsyncResult对象,而且该对象不能被重复使用。由于大量使用对象分配和垃圾收集,这会影响系统性能。如需要更好的理解APM模式,最了解EAP模式:Event-based Asynchronous Pattern:https://docs.microsoft.com/en-us/dotnet/standard/asynchronous-programming-patterns/event-based-asynchronous-pattern-eap 。
TAP 方式: Task-based Asynchronous Pattern
基于任务的异步模式,该模式主要使用System.Threading.Tasks.Task和Task<T>类来完成异步编程,相对于APM 模式来讲,TAP使异步编程模式更加简单(因为这里我们只需要关注Task这个类的使用),同时TAP也是微软推荐使用的异步编程模式。APM与TAP的本质区别,请参考我的一篇历史博客:http://www.cnblogs.com/vveiliang/p/7943003.html
TAP模式与APM模式是两种异步模式的实现,从性能上看没有本质的差别。TAP的资料可参考:https://docs.microsoft.com/en-us/dotnet/standard/asynchronous-programming-patterns/task-based-asynchronous-pattern-tap 。参考具体代码如下:
服务端:
publicclassStateContext
{
// Client socket.
publicSocketWorkSocket =null;
// Size of receive buffer.
publicconstintBufferSize = 1024;
// Receive buffer.
publicbyte[] buffer =newbyte[BufferSize];
// Received data string.
publicStringBuildersb =newStringBuilder(100);
}
publicclassAsynchronousSocketListener
{
// Thread signal.
publicstaticManualResetEventreSetEvent =newManualResetEvent(false);
publicAsynchronousSocketListener()
{
}
publicstaticvoidStartListening()
{
// Data buffer for incoming data.
byte[] bytes =newByte[1024];
// Establish the local endpoint for the socket.
IPAddressipAddress =IPAddress.Parse("127.0.0.1");
IPEndPointlocalEndPoint =newIPEndPoint(ipAddress, 11000);
// Create a TCP/IP socket.
Socketlistener =newSocket(AddressFamily.InterNetwork,SocketType.Stream,ProtocolType.Tcp);
// Bind the socket to the local
try
{
listener.Bind(localEndPoint);
listener.Listen(100);
while(true)
{
// Set the event to nonsignaled state.
reSetEvent.Reset();
// Start an asynchronous socket to listen for connections.
Console.WriteLine("Waiting for a connection...");
listener.BeginAccept(newAsyncCallback(AcceptCallback), listener);
// Wait until a connection is made before continuing.
reSetEvent.WaitOne();
}
}
catch(Exceptione)
{
Console.WriteLine(e.ToString());
}
Console.WriteLine("\nPress ENTER to continue...");
Console.Read();
}
publicstaticvoidAcceptCallback(IAsyncResultar)
{
// Signal the main thread to continue.
reSetEvent.Set();
// Get the socket that handles the client request.
Socketlistener = (Socket)ar.AsyncState;
Sockethandler = listener.EndAccept(ar);
// Create the state object.
StateContextstate =newStateContext();
state.WorkSocket = handler;
handler.BeginReceive(state.buffer, 0,StateContext.BufferSize, 0,newAsyncCallback(ReadCallback), state);
}
publicstaticvoidReadCallback(IAsyncResultar)
{
Stringcontent =String.Empty;
StateContextstate = (StateContext)ar.AsyncState;
Sockethandler = state.WorkSocket;
// Read data from the client socket.
intbytesRead = handler.EndReceive(ar);
if(bytesRead > 0)
{
// There might be more data, so store the data received so far.
state.sb.Append(Encoding.ASCII.GetString(state.buffer, 0, bytesRead));
// Check for end-of-file tag. If it is not there, read
// more data.
content = state.sb.ToString();
if(content.IndexOf("<EOF>") > -1)
{
Console.WriteLine("读取 {0} bytes. \n 数据: {1}", content.Length, content);
Send(handler, content);
}
else
{
handler.BeginReceive(state.buffer, 0,StateContext.BufferSize, 0,newAsyncCallback(ReadCallback), state);
}
}
}
privatestaticvoidSend(Sockethandler,Stringdata)
{
byte[] byteData =Encoding.ASCII.GetBytes(data);
handler.BeginSend(byteData, 0, byteData.Length, 0,newAsyncCallback(SendCallback), handler);
}
privatestaticvoidSendCallback(IAsyncResultar)
{
try
{
Sockethandler = (Socket)ar.AsyncState;
intbytesSent = handler.EndSend(ar);
Console.WriteLine("发送 {0} bytes.", bytesSent);
handler.Shutdown(SocketShutdown.Both);
handler.Close();
}
catch(Exceptione)
{
Console.WriteLine(e.ToString());
}
}
publicstaticintMain(String[] args)
{
StartListening();
return0;
}
客户端:
publicclassAsynchronousClient
{
// The port number for the remote device.
privateconstintport = 11000;
// ManualResetEvent instances signal completion.
privatestaticManualResetEventconnectResetEvent =newManualResetEvent(false);
privatestaticManualResetEventsendResetEvent =newManualResetEvent(false);
privatestaticManualResetEventreceiveResetEvent =newManualResetEvent(false);
privatestaticStringresponse =String.Empty;
privatestaticvoidStartClient()
{
try
{
IPAddressipAddress =IPAddress.Parse("127.0.0.1");
IPEndPointremoteEP =newIPEndPoint(ipAddress, port);
// Create a TCP/IP socket.
Socketclient =newSocket(AddressFamily.InterNetwork,SocketType.Stream,ProtocolType.Tcp);
// Connect to the remote endpoint.
client.BeginConnect(remoteEP,newAsyncCallback(ConnectCallback), client);
connectResetEvent.WaitOne();
Send(client,"This is a test<EOF>");
sendResetEvent.WaitOne();
Receive(client);
receiveResetEvent.WaitOne();
Console.WriteLine("Response received : {0}", response);
// Release the socket.
client.Shutdown(SocketShutdown.Both);
client.Close();
Console.ReadLine();
}
catch(Exceptione)
{
Console.WriteLine(e.ToString());
}
}
privatestaticvoidConnectCallback(IAsyncResultar)
{
try
{
Socketclient = (Socket)ar.AsyncState;
client.EndConnect(ar);
Console.WriteLine("Socket connected to {0}", client.RemoteEndPoint.ToString());
connectResetEvent.Set();
}
catch(Exceptione)
{
Console.WriteLine(e.ToString());
}
}
privatestaticvoidReceive(Socketclient)
{
try
{
StateContextstate =newStateContext();
state.WorkSocket = client;
client.BeginReceive(state.buffer, 0,StateContext.BufferSize, 0,newAsyncCallback(ReceiveCallback), state);
}
catch(Exceptione)
{
Console.WriteLine(e.ToString());
}
}
privatestaticvoidReceiveCallback(IAsyncResultar)
{
try
{
StateContextstate = (StateContext)ar.AsyncState;
Socketclient = state.WorkSocket;
intbytesRead = client.EndReceive(ar);
if(bytesRead > 0)
{
state.sb.Append(Encoding.ASCII.GetString(state.buffer, 0, bytesRead));
client.BeginReceive(state.buffer, 0,StateContext.BufferSize, 0,newAsyncCallback(ReceiveCallback), state);
}
else
{
if(state.sb.Length > 1)
{
response = state.sb.ToString();
}
receiveResetEvent.Set();
}
}
catch(Exceptione)
{
Console.WriteLine(e.ToString());
}
}
privatestaticvoidSend(Socketclient,Stringdata)
{
byte[] byteData =Encoding.ASCII.GetBytes(data);
client.BeginSend(byteData, 0, byteData.Length, 0,newAsyncCallback(SendCallback), client);
}
privatestaticvoidSendCallback(IAsyncResultar)
{
try
{
Socketclient = (Socket)ar.AsyncState;
intbytesSent = client.EndSend(ar);
Console.WriteLine("Sent {0} bytes to server.", bytesSent);
sendResetEvent.Set();
}
catch(Exceptione)
{
Console.WriteLine(e.ToString());
}
}
publicstaticintMain(String[] args)
{
StartClient();
return0;
}
}
SAEA方式: SocketAsyncEventArgs
APM模式、TAP模式虽然解决了Socket的并发问题,但是在大并发下还是有较大性能问题的。这主要是因为上述两种模式都会生产 IAsyncResult 等对象 ,而大量垃圾对象的回收会非常影响系统的性能。为此,微软推出了 SocketAsyncEventArgs 。SocketAsyncEventArgs 是 .NET Framework 3.5 开始支持的一种支持高性能 Socket 通信的实现。SocketAsyncEventArgs 相比于 APM 方式的主要优点可以描述如下,无需每次调用都生成 IAsyncResult 等对象,向原生 Socket 更靠近一些。这是官方的解释:
The main feature of these enhancements is the avoidance of the repeated allocation and synchronization of objects during high-volume asynchronous socket I/O. The Begin/End design pattern currently implemented by the Socket class for asynchronous socket I/O requires a System.IAsyncResult object be allocated for each asynchronous socket operation.
SocketAsyncEventArgs主要为高性能网络服务器应用程序而设计,避免了在异步套接字 I/O 量非常大时,大量垃圾对象创建与回收。使用此类执行异步套接字操作的模式包含以下步骤,具体说明可参考:https://msdn.microsoft.com/en-us/library/system.net.sockets.socketasynceventargs(v=vs.110).aspx 。
- 分配一个新的 SocketAsyncEventArgs 上下文对象,或者从应用程序池中获取一个空闲的此类对象。
- 将该上下文对象的属性设置为要执行的操作(例如,完成回调方法、数据缓冲区、缓冲区偏移量以及要传输的最大数据量)。
- 调用适当的套接字方法 (xxxAsync) 以启动异步操作。
- 如果异步套接字方法 (xxxAsync) 返回 true,则在回调中查询上下文属性来获取完成状态。
- 如果异步套接字方法 (xxxAsync) 返回 false,则说明操作是同步完成的。 可以查询上下文属性来获取操作结果。
- 将该上下文重用于另一个操作,将它放回到应用程序池中,或者将它丢弃。
下面是封装的一个组件代码:
classBufferManager
{
intm_numBytes; // the total number of bytes controlled by the buffer pool
byte[] m_buffer; // the underlying byte array maintained by the Buffer Manager
Stack<int> m_freeIndexPool; //
intm_currentIndex;
intm_bufferSize;
publicBufferManager(inttotalBytes,intbufferSize)
{
m_numBytes = totalBytes;
m_currentIndex = 0;
m_bufferSize = bufferSize;
m_freeIndexPool =newStack<int>();
}
// Allocates buffer space used by the buffer pool
publicvoidInitBuffer()
{
// create one big large buffer and divide that
// out to each SocketAsyncEventArg object
m_buffer =newbyte[m_numBytes];
}
// Assigns a buffer from the buffer pool to the
// specified SocketAsyncEventArgs object
//
// <returns>true if the buffer was successfully set, else false</returns>
publicboolSetBuffer(SocketAsyncEventArgsargs)
{
if(m_freeIndexPool.Count > 0)
{
args.SetBuffer(m_buffer, m_freeIndexPool.Pop(), m_bufferSize);
}
else
{
if((m_numBytes - m_bufferSize) < m_currentIndex)
{
returnfalse;
}
args.SetBuffer(m_buffer, m_currentIndex, m_bufferSize);
m_currentIndex += m_bufferSize;
}
returntrue;
}
// Removes the buffer from a SocketAsyncEventArg object.
// This frees the buffer back to the buffer pool
publicvoidFreeBuffer(SocketAsyncEventArgsargs)
{
m_freeIndexPool.Push(args.Offset);
args.SetBuffer(null, 0, 0);
}
}
///<summary>
///This class is used to communicate with a remote application over TCP/IP protocol.
///</summary>
classTcpCommunicationChannel
{
#regionPrivate fields
///<summary>
///Size of the buffer that is used to receive bytes from TCP socket.
///</summary>
privateconstintReceiveBufferSize = 8 * 1024;//4KB
///<summary>
///This buffer is used to receive bytes
///</summary>
privatereadonlybyte[] _buffer;
///<summary>
///Socket object to send/reveice messages.
///</summary>
privatereadonlySocket_clientSocket;
///<summary>
///A flag to control thread's running
///</summary>
privatevolatilebool_running;
///<summary>
///This object is just used for thread synchronizing (locking).
///</summary>
privatereadonlyobject_syncLock;
privateBufferManagerreceiveBufferManager;
privateSocketAsyncEventArgsreceiveBuff =null;
#endregion
#regionConstructor
///<summary>
///Creates a new TcpCommunicationChannel object.
///</summary>
///<param name="clientSocket">A connected Socket object that is
///used to communicate over network</param>
publicTcpCommunicationChannel(SocketclientSocket)
{
_clientSocket = clientSocket;
_clientSocket.Blocking =false;
_buffer =newbyte[ReceiveBufferSize];
_syncLock =newobject();
Init();
}
privatevoidInit()
{
//初始化接收Socket缓存数据
receiveBufferManager =newBufferManager(ReceiveBufferSize*2, ReceiveBufferSize);
receiveBufferManager.InitBuffer();
receiveBuff =newSocketAsyncEventArgs();
receiveBuff.Completed += ReceiveIO_Completed;
receiveBufferManager.SetBuffer(receiveBuff);
//初始化发送Socket缓存数据
}
#endregion
#regionPublic methods
///<summary>
///Disconnects from remote application and closes channel.
///</summary>
publicvoidDisconnect()
{
_running =false;
receiveBuff.Completed -= ReceiveIO_Completed;
receiveBuff.Dispose();
if(_clientSocket.Connected)
{
_clientSocket.Close();
}
_clientSocket.Dispose();
}
#endregion
publicvoidStartReceive()
{
_running =true;
boolresult = _clientSocket.ReceiveAsync(receiveBuff);
}
privatevoidReceiveIO_Completed(objectsender,SocketAsyncEventArgse)
{
if(e.BytesTransferred > 0 && e.SocketError ==SocketError.Success && _clientSocket.Connected ==true&& e.LastOperation ==SocketAsyncOperation.Receive)
{
if(!_running)
{
return;
}
//Get received bytes count
DateTimereceiveTime =DateTime.Now;
//Copy received bytes to a new byte array
varreceivedBytes =newbyte[e.BytesTransferred];
Array.Copy(e.Buffer, 0, receivedBytes, 0, e.BytesTransferred);
//处理消息....
if(_running)
{
StartReceive();
}
}
}
///<summary>
///Sends a message to the remote application.
///</summary>
///<param name="message">Message to be sent</param>
publicvoidSendMessage(byte[] messageBytes)
{
//Send message
if(_clientSocket.Connected)
{
SocketAsyncEventArgsdata =newSocketAsyncEventArgs();
data.SocketFlags =SocketFlags.None;
data.Completed += (s, e) =>
{
e.Dispose();
};
data.SetBuffer(messageBytes, 0, messageBytes.Length);
//Console.WriteLine("发送:" + messageBytes.LongLength);
_clientSocket.SendAsync(data);
}
}
}