【转】.Net中的异步编程总结

一直以来很想梳理下我在开发过程中使用异步编程的心得和体会,但是由于我是APM异步编程模式的死忠,当TAP模式和TPL模式出现的时候我并未真正的去接纳这两种模式,所以导致我一直没有花太多心思去整理这两部分异步编程模型。今天在CodeProject上面偶然间看到一篇关于异步编程的文章,概括总结的非常好,省去了我自己写的麻烦,索性翻译过来,以飨各位。

      在阻塞和并行编程过程中,异步和多线程是非常重要的特性。异步编程可以涉及到多线程,也可以不用涉及。如果我们能够把二者放到一起来讲解的话,我们会理解的更快一些。所以今天在这里,我主要想讲解的内容是:

  1. 异步编程
  2. 是否需要多线程
  3. TAP编程模型
  4. 并行编程

首先来说下异步编程

所谓的异步编程就是指 当前的操作独立于主操作流程。在C#编程中,我们总是从进入Main方法开始 ,Main方法返回结束。在开始和结束之间的所有操作,都会挨个的执行,下一个操作必须等到上一个操作完毕才开始执行,我们看看如下代码:

static void Main(string[] args){DoTaskOne();DoTaskTwo();}

“DoTaskOne”方法必须在“DoTaskTwo”方法之前执行。也就是发生了阻塞现象。

但是在异步编程中,方法的调用并不会阻塞主线程。当方法被调用后,主线程仍会继续执行其他的任务。这种情形,一般都是使用Thread或者是Task来实现的。

在上面的场景中,如果我们让方法“DoTaskOne”异步执行,那么主线程将会立即返回,然后执行“DoTaskTwo”方法。

在.NET中,我们可以利用Thread类或者异步模式来创建我们自己的线程来实现异步编程。在.NET中有三种不同的异步模型:

1.APM模型

2.EAP模型

但是,遗憾的是,上面的这两种模型都不是微软所推荐的,所以我们将不对其做具体的讨论。如果你对这两种模型感兴趣,请移步:

https://msdn.microsoft.com/en-us/library/ms228963(v=vs.110).aspx

https://msdn.microsoft.com/en-us/library/ms228969(v=vs.110).aspx

3.TAP模型:这是微软所推荐的,我们稍后将会详细的进行讲解。

我们是否需要启用多线程

如果我们在.NET 4.5版本中使用异步编程模型,绝大部分情况下我们无需手动创建线程。因为编译器已经为我们做了足够多的工作了。

创建一个新线程不仅耗费资源而且花费时间,除非我们真的需要去控制一个多线程,否则是不需要去创建的。因为TAP模型和TPL模型已经为异步编程和并行编程提供了绝好的支持。TAP模型和TPL模型使用Task来进行操作,而Task类使用线程池中的线程来完成操作的。

Task可以在如下场景中运行:

  1. 当前线程
  2. 新线程
  3. 线程池中的线程
  4. 没有线程

在我们使用Task过程中,我们无需担心线程的创建或者是使用,因为.NET framework已经为我们处理好了各种细节。但是如果我们需要控制线程的话,比如说以下的场景:

  1. 为线程设置名称
  2. 为线程设置优先级
  3. 设置线程为后台线程

那么我们不得不使用Thread类来控制。

async和await关键字

.NET framework引入了两个新的关键字“async”和“await”来执行异步编程。当在方法上使用await关键字的时候,我们需要同时使用async关键字。await关键字是在调用异步方法之前被使用的。它主要是使方法的后续执行编程异步的,例如:

复制代码

复制代码

private asyncstatic void CallerWithAsync()// async modifier is used 
{// await is used before a method call. It suspends //execution of CallerWithAsync() method and control returs to the calling thread that can//perform other task.string result = await GetSomethingAsync();// this line would not be executed before  GetSomethingAsync() //method completesConsole.WriteLine(result);
}

复制代码

复制代码

在这里,async关键字只能够被那些返回Task或者是void的方法所使用。它不能被用于Main入口方法上。

我们不能够在所有的方法上使用await关键字,因为一旦方法上有了await关键字,那么我们的返回类型就变成了“awaitable “类型。下面的几种类型是” awaitable “的:

  1. Task
  2. Task<T>

TAP模型

首先,我们需要一个能够返回Task或者Task<T>的异步方法。我们可以通过如下方式来创建Task:

  1. Task.Factory.StartNew方法:在.net 4.5版本之前(.net 4)中,这是默认的创建和组织task的方式。
  2. Task.Run或者Task.Run<T>方法:从.net 4.5版本开始,这个方法被引进来了。这个方法足以能够应付大多数的场景。
  3. Task.FromResult方法:如果方法已经执行完毕并返回结果,我们可以使用这个方法来创建一个task。

Task.Factory.StartNew还有一些高级应用场景,请移步:http://blogs.msdn.com/b/pfxteam/archive/2011/10/24/10229468.aspx

下面的链接则展示了创建Task的几种方式: 
http://dotnetcodr.com/2014/01/01/5-ways-to-start-a-task-in-net-c/

 

Task的创建和等待

我们接下来将会利用Task.Run<T>方法来创建我们自己的Task。它会将某个特殊的方法放入位于ThreadPool的执行队列中,然后会为这些方法返回一个task句柄。下面的步骤将会为你展示我们如何为一个同步方法创建异步的Task的:

  1. 我们有一个同步的方法,需要耗费一些时间来执行完毕:
   static string Greeting(string name){Thread.Sleep(3000);return string.Format("Hello, {0}", name);}
  1. 为了让此方法一步执行,我们需要对其进行异步方法的包装。这个异步方法我们暂定为“GreetingAsync“。

 

 

static Task<string> GreetingAsync(string name)
{return Task.Run<string>(() =>{return Greeting(name);});
}

 

 

现在我们可以使用await关键字来调用GreetingAsync方法

 

 

private asyncstatic void CallWithAsync(){//some other tasksstring result = awaitGreetingAsync("Bulbul");//We can add  multiple ‘await’ in same ‘async’ method//string result1 = await GreetingAsync(“Ahmed”);//string result2 = await GreetingAsync(“Every Body”);Console.WriteLine(result);}

 

 

当“CallWithAsync”方法被调用的时候,它首先像正常的同步方法被调用,直到遇到await关键字。当它遇到await关键字的时候,它会暂定方法的执行,然后等待“GreetingAsync(“Bulbul”)”方法执行完毕。在此期间,这个控制流程会返回到“CallWithAsync”方法上,然后调用者就可以做其他的任务了。

当“GreetingAsync(“Bulbul”)”方法执行完毕以后,“CallWithAsync”方法就会唤醒在await关键字后面的其他的方法,所以在这里它会继续执行“Console.WriteLine(result)”方法。

  1. 任务的继续执行: “ContinueWith”方法则表明任务的持续执行。

复制代码

复制代码

private static void CallWithContinuationTask(){Task<string> t1 = GreetingAsync("Bulbul");t1.ContinueWith(t =>{string result = t.Result;Console.WriteLine(result);});}

复制代码

复制代码

当我们使用ContinueWith方法的时候,我们无需使用await关键字。因为编译器会自动的将await关键字放到正确的位置上。

等待多个异步方法

让我们先看下面的代码:

复制代码

复制代码

  private asyncstatic void CallWithAsync(){string result = awaitGreetingAsync("Bulbul");string result1 = awaitGreetingAsync("Ahmed");Console.WriteLine(result);Console.WriteLine(result1);}

复制代码

复制代码

在这里,我们在顺序的等待两个方法被执行。GreetingAsync("Ahmed")方法将会在GreetingAsync("Bulbul")执行后,再执行。但是,如果result和result1彼此不是独立的,那么await关键字这样用是不合适的。

在上面的场景中,我们其实是无需使用await关键字的。所以方法可以被改成如下的样子:

复制代码

复制代码

 private async static void MultipleAsyncMethodsWithCombinators(){Task<string> t1 = GreetingAsync("Bulbul");Task<string> t2 = GreetingAsync("Ahmed");await Task.WhenAll(t1, t2);Console.WriteLine("Finished both methods.\n " +"Result 1: {0}\n Result 2: {1}", t1.Result, t2.Result);}

复制代码

复制代码

在这里我们用了Task.WhenAll方法,它主要是等待所有的Task都完成工作后再触发。Task类还有另一个方法:WhenAny,它主要是等待任何一个Task完成就会触发。

异常处理

当进行错误处理的时候,我们不得不将await关键字放到try代码块中。

复制代码

复制代码

private asyncstatic void CallWithAsync()
{try{string result = awaitGreetingAsync("Bulbul");}catch (Exception ex){Console.WriteLine(&ldquo;handled {0}&rdquo;, ex.Message);}
}

复制代码

复制代码

但是,如果我们有多个await关键字存在于try代码快中,那么只有第一个错误被处理,因为第二个await是无法被执行的。如果我们想要所有的方法都被执行,甚至当其中一个抛出异常的时候,我们不能使用await关键字来调用他们,我们需要使用Task.WhenAll方法来等待所有的task执行。

复制代码

复制代码

private asyncstatic void CallWithAsync(){try{Task<string> t1 = GreetingAsync("Bulbul");Task<string> t2 = GreetingAsync("Ahmed");await Task.WhenAll(t1, t2);}catch (Exception ex){Console.WriteLine(&ldquo;handled {0}&rdquo;, ex.Message);}}

复制代码

复制代码

尽管所有的任务都会完成,但是我们可以从第一个task那里看到错误。虽然它不是第一个抛出错误的,但是它是列表中的第一个任务。

如果想要从所有的任务中获取错误,那么有一个方式就是将其在try代码块外部进行声明。然后检查Task方法的“IsFaulted”属性。如果有错误抛出,那么其“IsFaulted”属性为true。

示例如下:

 

复制代码

复制代码

 static async void ShowAggregatedException(){Task taskResult = null;try {Task<string> t1 = GreetingAsync("Bulbul");Task<string> t2 = GreetingAsync("Ahmed");await (taskResult = Task.WhenAll(t1, t2));}catch (Exception ex){Console.WriteLine("handled {0}", ex.Message);foreach (var innerEx in taskResult.Exception.InnerExceptions){Console.WriteLine("inner exception {0}", nnerEx.Message);}}}

复制代码

复制代码

Task的取消执行

如果直接使用ThreadPool中的Thread,我们是无法进行取消操作的。但是现在Task类提供了一种基于CancellationTokenSource类的方式来取消任务的执行,可以按照如下步骤来进行:

  1. 异步方法需要附带一个“CancellationToken”类型。
  2. 创建CancellationTokenSource类的实例:
  3. 将CancellationToken传递给异步方法:
  4. 对于长时间执行的方法,如果想取消的话,我们需要调用CancellationToken的ThrowIfCancellationRequested方法。
  5. 捕捉Task抛出的 OperationCanceledException。
  6. 现在,如果我们通过调用CancellationTokenSource的cancel方法来取消当前的操作的话,对于那些长时间运行的操作,将会抛出OperationCanceledException错误。我们也可以通过设置超时时间来取消任务。更多关于CancellationTokenSource类的信息,请移步:https://msdn.microsoft.com/en-us/library/system.threading.cancellationtokensource%28v=vs.110%29.aspx

复制代码

复制代码

var cts = new CancellationTokenSource();Task<string> t1 = GreetingAsync("Bulbul", cts.Token);
static string Greeting(string name, CancellationToken token)
{Thread.Sleep(3000);token.ThrowIfCancellationRequested();return string.Format("Hello, {0}", name);
}

复制代码

复制代码

下面让我们看看如何设置超时时间来取消任务的执行:

复制代码

复制代码

static void Main(string[] args){CallWithAsync();Console.ReadKey();           }async static void CallWithAsync(){try{CancellationTokenSource source = new CancellationTokenSource();source.CancelAfter(TimeSpan.FromSeconds(1));var t1 = await GreetingAsync("Bulbul", source.Token);}catch (OperationCanceledException ex){Console.WriteLine(ex.Message);}}static Task<string> GreetingAsync(string name, CancellationToken token){return Task.Run<string>(() =>{return Greeting(name, token);});}static string Greeting(string name, CancellationToken token){Thread.Sleep(3000);token.ThrowIfCancellationRequested();return string.Format("Hello, {0}", name);}

复制代码

复制代码

并行编程

在.net 4.5中,存在一个叫做“Parallel”的类,这个类可以进行并行操作。当然这种并行和那些充分利用cpu计算能力的Thread 是有差别的,简单说起来,它有两种表现方式:

1.数据并行。 如果我们有很多的数据需要计算,我们需要他们并行的进行,那么我们可以使用For或者ForEach方法来进行:

复制代码

复制代码

ParallelLoopResult result =Parallel.For(0, 100, async (int i) =>{Console.WriteLine("{0}, task: {1}, thread: {2}", i,Task.CurrentId, Thread.CurrentThread.ManagedThreadId);await Task.Delay(10);});

复制代码

复制代码

如果我们在计算的过程中,想要中断并行,我们可以把ParallelLoopState当做参数传递进去,我们就可以实现认为和中断这种循环:

复制代码

复制代码

ParallelLoopResult result =Parallel.For(0, 100, async (int i, ParallelLoopState pls) =>{Console.WriteLine("{0}, task: {1}, thread: {2}", i,Task.CurrentId, Thread.CurrentThread.ManagedThreadId);await Task.Delay(10);if (i > 5) pls.Break();});

复制代码

复制代码

需要注意的是,当我们需要中断循环的时候,由于其运行在诸多个线程之上,如果线程数多于我们设定的中断数时,上述的执行可能就不太准确。

  1. 任务并行。如果我们想要多个任务并行处理,那么我们可以使用Parallel.Invoke方法来接受Action委托数组。例如:
static void ParallelInvoke()
{Parallel.Invoke(MethodOne, MethodTwo);
}

 参考文章:http://www.codeproject.com/Articles/996857/Asynchronous-programming-and-Threading-in-Csharp-N?bmkres=exist#_articleTop

 

引用:https://www.cnblogs.com/scy251147/p/4597615.html

 

 

异步编程总结

     最近在为公司的分布式服务框架做支持异步调用的开发,这种新特性的上线需要进行各种严格的测试。在并发性能测试时,性能一直非常差,而且非常的不稳定。经过不断的分析调优,发现Socket通信和多线程异步回调存在较为严重的性能问题。经过多方优化,性能终于达标。下面是原版本、支持异步最初版本和优化后版本的性能比较。差异还是非常巨大的。另外说明一下,总耗时是指10000次请求累计执行时间。

Image

     从上图可以看到,支持异步的版本,在单线程模式下,性能的表现与老版本差异并不明显,但是10线程下差异就非常巨大,而100线程的测试结果反而有所好转。通过分析,两个版本的性能差异如此巨大,主要是因为:

  1. 同步模式会阻塞客户端请求,说白了,在线程内就是串行请求的。但是在异步模式中,线程内的请求不再阻塞,网络流量、后台计算压力瞬间暴涨,峰值是同步模式的100倍。网络传输变成瓶颈点。
  2. 在压力暴涨的情况下,CPU资源占用也会突变, 并且ThreadPool、Task、异步调用的执行都将变慢。

     在网络通信方面,把原先半异步的模式调整为了SocketAsyncEventArgs 模式。下面是Socket通信的几种模型的介绍和示例,总结一下,与大家分享。下次再与大家分享,并发下异步调用的性能优化方案。

APM方式: Asynchronous Programming Model

    异步编程模型是一种模式,该模式允许用更少的线程去做更多的操作,.NET Framework很多类也实现了该模式,同时我们也可以自定义类来实现该模式。NET Framework中的APM也称为Begin/End模式。此种模式下,调用BeginXXX方法来启动异步操作,然后返回一个IAsyncResult 对象。当操作执行完成后,系统会触发IAsyncResult 对象的执行。 具体可参考: https://docs.microsoft.com/en-us/dotnet/standard/asynchronous-programming-patterns/asynchronous-programming-model-apm

     .net中的Socket异步模式也支持APM,与同步模式或Blocking模式相比,可以更好的利用网络带宽和系统资源编写出具有更高性能的程序。参考具体代码如下:

服务端监听:

    

Socket serverSocket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);

//本机预使用的IP和端口

IPEndPoint serverIP = new IPEndPoint(IPAddress.Any, 9050);

//绑定服务端设置的IP

serverSocket.Bind(serverIP);

//设置监听个数

serverSocket.Listen(1);

//异步接收连接请求

serverSocket.BeginAccept(ar =>

{

    base.communicateSocket = serverSocket.EndAccept(ar);

   AccessAciton();

 }, null);

客户端连接:

var communicateSocket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);

   communicateSocket.Bind(new IPEndPoint(IPAddress.Any, 9051));

             

        //服务器的IP和端口

        IPEndPoint serverIP;

        try

        {

            serverIP = new IPEndPoint(IPAddress.Parse(IP), 9050);

        }

        catch

        {

            throw new Exception(String.Format("{0}不是一个有效的IP地址!", IP));

        }

             

        //客户端只用来向指定的服务器发送信息,不需要绑定本机的IP和端口,不需要监听

        try

        {

           communicateSocket.BeginConnect(serverIP, ar =>

            {

                AccessAciton();

            }, null);

        }

        catch

        {

            throw new Exception(string.Format("尝试连接{0}不成功!", IP));

        }

客户端请求:

    

if (communicateSocket.Connected == false)

        {

            throw new Exception("还没有建立连接, 不能发送消息");

        }

        Byte[] msg = Encoding.UTF8.GetBytes(message);

        communicateSocket.BeginSend(msg,0, msg.Length, SocketFlags.None,

            ar => {

                 

            }, null);

 

服务端响应:

Byte[] msg = new byte[1024];

        //异步的接受消息

        communicateSocket.BeginReceive(msg, 0, msg.Length, SocketFlags.None,

            ar => {

                //对方断开连接时, 这里抛出Socket Exception              

                    communicateSocket.EndReceive(ar);

                ReceiveAction(Encoding.UTF8.GetString(msg).Trim('\0',' '));

                Receive(ReceiveAction);

            }, null);

 

      注意:异步模式虽好,但是如果进行大量异步套接字操作,是要付出很高代价的。针对每次操作,都必须创建一个IAsyncResult对象,而且该对象不能被重复使用。由于大量使用对象分配和垃圾收集,这会影响系统性能。如需要更好的理解APM模式,最了解EAP模式:Event-based Asynchronous Pattern:https://docs.microsoft.com/en-us/dotnet/standard/asynchronous-programming-patterns/event-based-asynchronous-pattern-eap 。

 

TAP 方式: Task-based Asynchronous Pattern

      基于任务的异步模式,该模式主要使用System.Threading.Tasks.Task和Task<T>类来完成异步编程,相对于APM 模式来讲,TAP使异步编程模式更加简单(因为这里我们只需要关注Task这个类的使用),同时TAP也是微软推荐使用的异步编程模式。APM与TAP的本质区别,请参考我的一篇历史博客:http://www.cnblogs.com/vveiliang/p/7943003.html

     TAP模式与APM模式是两种异步模式的实现,从性能上看没有本质的差别。TAP的资料可参考:https://docs.microsoft.com/en-us/dotnet/standard/asynchronous-programming-patterns/task-based-asynchronous-pattern-tap 。参考具体代码如下:

服务端:

publicclassStateContext

{

   // Client socket.   

   publicSocketWorkSocket =null;

   // Size of receive buffer.   

   publicconstintBufferSize = 1024;

   // Receive buffer.   

   publicbyte[] buffer =newbyte[BufferSize];

   // Received data string.   

   publicStringBuildersb =newStringBuilder(100);

}

publicclassAsynchronousSocketListener

{

   // Thread signal.   

   publicstaticManualResetEventreSetEvent =newManualResetEvent(false);

   publicAsynchronousSocketListener()

    {

    }

   publicstaticvoidStartListening()

    {

       // Data buffer for incoming data.   

       byte[] bytes =newByte[1024];

       // Establish the local endpoint for the socket.   

       IPAddressipAddress =IPAddress.Parse("127.0.0.1");

       IPEndPointlocalEndPoint =newIPEndPoint(ipAddress, 11000);

       // Create a TCP/IP socket.   

       Socketlistener =newSocket(AddressFamily.InterNetwork,SocketType.Stream,ProtocolType.Tcp);

       // Bind the socket to the local   

       try

        {

            listener.Bind(localEndPoint);

            listener.Listen(100);

           while(true)

            {

               // Set the event to nonsignaled state.   

                reSetEvent.Reset();

               // Start an asynchronous socket to listen for connections.   

               Console.WriteLine("Waiting for a connection...");

                listener.BeginAccept(newAsyncCallback(AcceptCallback), listener);

               // Wait until a connection is made before continuing.   

                reSetEvent.WaitOne();

            }

        }

       catch(Exceptione)

        {

           Console.WriteLine(e.ToString());

        }

       Console.WriteLine("\nPress ENTER to continue...");

       Console.Read();

    }

   publicstaticvoidAcceptCallback(IAsyncResultar)

    {

       // Signal the main thread to continue.   

        reSetEvent.Set();

       // Get the socket that handles the client request.   

       Socketlistener = (Socket)ar.AsyncState;

       Sockethandler = listener.EndAccept(ar);

       // Create the state object.   

       StateContextstate =newStateContext();

        state.WorkSocket = handler;

        handler.BeginReceive(state.buffer, 0,StateContext.BufferSize, 0,newAsyncCallback(ReadCallback), state);

    }

   publicstaticvoidReadCallback(IAsyncResultar)

    {

       Stringcontent =String.Empty;

       StateContextstate = (StateContext)ar.AsyncState;

       Sockethandler = state.WorkSocket;

       // Read data from the client socket.   

       intbytesRead = handler.EndReceive(ar);

       if(bytesRead > 0)

        {

           // There might be more data, so store the data received so far.   

            state.sb.Append(Encoding.ASCII.GetString(state.buffer, 0, bytesRead));

           // Check for end-of-file tag. If it is not there, read   

           // more data.   

            content = state.sb.ToString();

           if(content.IndexOf("<EOF>") > -1)

            {

               Console.WriteLine("读取 {0} bytes. \n 数据: {1}", content.Length, content);

                Send(handler, content);

            }

           else

            {

                handler.BeginReceive(state.buffer, 0,StateContext.BufferSize, 0,newAsyncCallback(ReadCallback), state);

            }

        }

    }

   privatestaticvoidSend(Sockethandler,Stringdata)

    {

       byte[] byteData =Encoding.ASCII.GetBytes(data);

        handler.BeginSend(byteData, 0, byteData.Length, 0,newAsyncCallback(SendCallback), handler);

    }

   privatestaticvoidSendCallback(IAsyncResultar)

    {

       try

        {

           Sockethandler = (Socket)ar.AsyncState;

           intbytesSent = handler.EndSend(ar);

           Console.WriteLine("发送 {0} bytes.", bytesSent);

            handler.Shutdown(SocketShutdown.Both);

            handler.Close();

        }

       catch(Exceptione)

        {

           Console.WriteLine(e.ToString());

        }

    }

   publicstaticintMain(String[] args)

    {

        StartListening();

       return0;

    }

客户端:

publicclassAsynchronousClient

{

   // The port number for the remote device.   

   privateconstintport = 11000;

   // ManualResetEvent instances signal completion.   

   privatestaticManualResetEventconnectResetEvent =newManualResetEvent(false);

   privatestaticManualResetEventsendResetEvent =newManualResetEvent(false);

   privatestaticManualResetEventreceiveResetEvent =newManualResetEvent(false);

   privatestaticStringresponse =String.Empty;

   privatestaticvoidStartClient()

    {

       try

        {

         

           IPAddressipAddress =IPAddress.Parse("127.0.0.1");

           IPEndPointremoteEP =newIPEndPoint(ipAddress, port);

           // Create a TCP/IP socket.   

           Socketclient =newSocket(AddressFamily.InterNetwork,SocketType.Stream,ProtocolType.Tcp);

           // Connect to the remote endpoint.   

            client.BeginConnect(remoteEP,newAsyncCallback(ConnectCallback), client);

            connectResetEvent.WaitOne();

            Send(client,"This is a test<EOF>");

            sendResetEvent.WaitOne();

            Receive(client);

            receiveResetEvent.WaitOne();

           Console.WriteLine("Response received : {0}", response);

           // Release the socket.   

            client.Shutdown(SocketShutdown.Both);

            client.Close();

           Console.ReadLine();

        }

       catch(Exceptione)

        {

           Console.WriteLine(e.ToString());

        }

    }

   privatestaticvoidConnectCallback(IAsyncResultar)

    {

       try

        {

           Socketclient = (Socket)ar.AsyncState;

            client.EndConnect(ar);

           Console.WriteLine("Socket connected to {0}", client.RemoteEndPoint.ToString());

            connectResetEvent.Set();

        }

       catch(Exceptione)

        {

           Console.WriteLine(e.ToString());

        }

    }

   privatestaticvoidReceive(Socketclient)

    {

       try

        {

           StateContextstate =newStateContext();

            state.WorkSocket = client;

            client.BeginReceive(state.buffer, 0,StateContext.BufferSize, 0,newAsyncCallback(ReceiveCallback), state);

        }

       catch(Exceptione)

        {

           Console.WriteLine(e.ToString());

        }

    }

   privatestaticvoidReceiveCallback(IAsyncResultar)

    {

       try

        {

           StateContextstate = (StateContext)ar.AsyncState;

           Socketclient = state.WorkSocket;

           intbytesRead = client.EndReceive(ar);

           if(bytesRead > 0)

            {

                state.sb.Append(Encoding.ASCII.GetString(state.buffer, 0, bytesRead));

                client.BeginReceive(state.buffer, 0,StateContext.BufferSize, 0,newAsyncCallback(ReceiveCallback), state);

            }

           else

            {

               if(state.sb.Length > 1)

                {

                    response = state.sb.ToString();

                }

                receiveResetEvent.Set();

            }

        }

       catch(Exceptione)

        {

           Console.WriteLine(e.ToString());

        }

    }

   privatestaticvoidSend(Socketclient,Stringdata)

    {

       byte[] byteData =Encoding.ASCII.GetBytes(data);

        client.BeginSend(byteData, 0, byteData.Length, 0,newAsyncCallback(SendCallback), client);

    }

   privatestaticvoidSendCallback(IAsyncResultar)

    {

       try

        {

           Socketclient = (Socket)ar.AsyncState;

           intbytesSent = client.EndSend(ar);

           Console.WriteLine("Sent {0} bytes to server.", bytesSent);

            sendResetEvent.Set();

        }

       catch(Exceptione)

        {

           Console.WriteLine(e.ToString());

        }

    }

   publicstaticintMain(String[] args)

    {

        StartClient();

       return0;

    }

}

SAEA方式: SocketAsyncEventArgs

      APM模式、TAP模式虽然解决了Socket的并发问题,但是在大并发下还是有较大性能问题的。这主要是因为上述两种模式都会生产 IAsyncResult 等对象 ,而大量垃圾对象的回收会非常影响系统的性能。为此,微软推出了 SocketAsyncEventArgs 。SocketAsyncEventArgs 是 .NET Framework 3.5 开始支持的一种支持高性能 Socket 通信的实现。SocketAsyncEventArgs 相比于 APM 方式的主要优点可以描述如下,无需每次调用都生成 IAsyncResult 等对象,向原生 Socket 更靠近一些。这是官方的解释:

The main feature of these enhancements is the avoidance of the repeated allocation and synchronization of objects during high-volume asynchronous socket I/O. The Begin/End design pattern currently implemented by the Socket class for asynchronous socket I/O requires a System.IAsyncResult object be allocated for each asynchronous socket operation.

      SocketAsyncEventArgs主要为高性能网络服务器应用程序而设计,避免了在异步套接字 I/O 量非常大时,大量垃圾对象创建与回收。使用此类执行异步套接字操作的模式包含以下步骤,具体说明可参考:https://msdn.microsoft.com/en-us/library/system.net.sockets.socketasynceventargs(v=vs.110).aspx 。

  1. 分配一个新的 SocketAsyncEventArgs 上下文对象,或者从应用程序池中获取一个空闲的此类对象。
  2. 将该上下文对象的属性设置为要执行的操作(例如,完成回调方法、数据缓冲区、缓冲区偏移量以及要传输的最大数据量)。
  3. 调用适当的套接字方法 (xxxAsync) 以启动异步操作。
  4. 如果异步套接字方法 (xxxAsync) 返回 true,则在回调中查询上下文属性来获取完成状态。
  5. 如果异步套接字方法 (xxxAsync) 返回 false,则说明操作是同步完成的。 可以查询上下文属性来获取操作结果。
  6. 将该上下文重用于另一个操作,将它放回到应用程序池中,或者将它丢弃。

    下面是封装的一个组件代码:

classBufferManager

    {

       intm_numBytes;                // the total number of bytes controlled by the buffer pool

       byte[] m_buffer;               // the underlying byte array maintained by the Buffer Manager

       Stack<int> m_freeIndexPool;    //

       intm_currentIndex;

       intm_bufferSize;

       publicBufferManager(inttotalBytes,intbufferSize)

        {

            m_numBytes = totalBytes;

            m_currentIndex = 0;

            m_bufferSize = bufferSize;

            m_freeIndexPool =newStack<int>();

        }

       // Allocates buffer space used by the buffer pool

       publicvoidInitBuffer()

        {

           // create one big large buffer and divide that

           // out to each SocketAsyncEventArg object

            m_buffer =newbyte[m_numBytes];

        }

       // Assigns a buffer from the buffer pool to the

       // specified SocketAsyncEventArgs object

       //

       // <returns>true if the buffer was successfully set, else false</returns>

       publicboolSetBuffer(SocketAsyncEventArgsargs)

        {

           if(m_freeIndexPool.Count > 0)

            {

                args.SetBuffer(m_buffer, m_freeIndexPool.Pop(), m_bufferSize);

            }

           else

            {

               if((m_numBytes - m_bufferSize) < m_currentIndex)

                {

                   returnfalse;

                }

                args.SetBuffer(m_buffer, m_currentIndex, m_bufferSize);

                m_currentIndex += m_bufferSize;

            }

           returntrue;

        }

       // Removes the buffer from a SocketAsyncEventArg object.

       // This frees the buffer back to the buffer pool

       publicvoidFreeBuffer(SocketAsyncEventArgsargs)

        {

            m_freeIndexPool.Push(args.Offset);

            args.SetBuffer(null, 0, 0);

        }

    }

   ///<summary>

   ///This class is used to communicate with a remote application over TCP/IP protocol.

   ///</summary>

   classTcpCommunicationChannel

    {

      

       #regionPrivate fields

       ///<summary>

       ///Size of the buffer that is used to receive bytes from TCP socket.

       ///</summary>

       privateconstintReceiveBufferSize = 8 * 1024;//4KB

       ///<summary>

       ///This buffer is used to receive bytes

       ///</summary>

       privatereadonlybyte[] _buffer;

       ///<summary>

       ///Socket object to send/reveice messages.

       ///</summary>

       privatereadonlySocket_clientSocket;

       ///<summary>

       ///A flag to control thread's running

       ///</summary>

       privatevolatilebool_running;

       ///<summary>

       ///This object is just used for thread synchronizing (locking).

       ///</summary>

       privatereadonlyobject_syncLock;

       privateBufferManagerreceiveBufferManager;

       privateSocketAsyncEventArgsreceiveBuff =null;

       #endregion

       #regionConstructor

       ///<summary>

       ///Creates a new TcpCommunicationChannel object.

       ///</summary>

       ///<param name="clientSocket">A connected Socket object that is

       ///used to communicate over network</param>

       publicTcpCommunicationChannel(SocketclientSocket)

        {

            _clientSocket = clientSocket;

            _clientSocket.Blocking =false;

            _buffer =newbyte[ReceiveBufferSize];

            _syncLock =newobject();

            Init();

        }

       privatevoidInit()

        {

           //初始化接收Socket缓存数据

            receiveBufferManager =newBufferManager(ReceiveBufferSize*2, ReceiveBufferSize);

            receiveBufferManager.InitBuffer();

            receiveBuff =newSocketAsyncEventArgs();

            receiveBuff.Completed += ReceiveIO_Completed;

            receiveBufferManager.SetBuffer(receiveBuff);

           //初始化发送Socket缓存数据

        }

       #endregion

       #regionPublic methods

       ///<summary>

       ///Disconnects from remote application and closes channel.

       ///</summary>

       publicvoidDisconnect()

        {

            _running =false;

            receiveBuff.Completed -= ReceiveIO_Completed;

            receiveBuff.Dispose();

           if(_clientSocket.Connected)

            {

                _clientSocket.Close();

            }

            _clientSocket.Dispose();

        }

       #endregion

     

       publicvoidStartReceive()

        {

            _running =true;

           boolresult = _clientSocket.ReceiveAsync(receiveBuff);

        }

       privatevoidReceiveIO_Completed(objectsender,SocketAsyncEventArgse)

        {

           if(e.BytesTransferred > 0 && e.SocketError ==SocketError.Success && _clientSocket.Connected ==true&& e.LastOperation ==SocketAsyncOperation.Receive)

            {

               if(!_running)

                {

                   return;

                }

               //Get received bytes count

               DateTimereceiveTime =DateTime.Now;

               //Copy received bytes to a new byte array

               varreceivedBytes =newbyte[e.BytesTransferred];

               Array.Copy(e.Buffer, 0, receivedBytes, 0, e.BytesTransferred);

               //处理消息....

               if(_running)

                {

                    StartReceive();

                }

            }

        }

       ///<summary>

       ///Sends a message to the remote application.

       ///</summary>

       ///<param name="message">Message to be sent</param>

       publicvoidSendMessage(byte[] messageBytes)

        {

           //Send message

           if(_clientSocket.Connected)

            {

               SocketAsyncEventArgsdata =newSocketAsyncEventArgs();

                data.SocketFlags =SocketFlags.None;

                data.Completed += (s, e) =>

                {

                    e.Dispose();

                };

                data.SetBuffer(messageBytes, 0, messageBytes.Length);

               //Console.WriteLine("发送:" + messageBytes.LongLength);

                _clientSocket.SendAsync(data);

            }

        }

    }

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/437223.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

学习swing鼠标点击事件心得体会_西门子COMOS软件开发定制学习8-查询列表间的数据交互...

​本篇在西门子COMOS软件开发定制学习6-管理界面定制基础上定制&#xff0c;简单介绍两个查询列表之间的数据交互。实现效果&#xff1a;在左侧列表中选择某一设备&#xff0c;右侧列表自动根据所选设备&#xff0c;显示该设备相关的设计图纸(如PID图纸或电气图纸等)&#xff1…

【转】异步编程:.NET 4.5 基于任务的异步编程模型(TAP)

最近我为大家陆续介绍了“IAsyncResult异步编程模型 (APM)”和“基于事件的异步编程模式(EAP)”两种异步编程模型。在.NET4.0 中Microsoft又为我们引入了新的异步编程模型“基于任务的异步编程模型(TAP)”&#xff0c;并且推荐我们在开发新的多线程应用程序中首选TAP&#xff0…

python实验题_python实验二

安徽工程大学 Python 程序设计 实验报告 班级 物流191 姓名 王凡 学号 3190505102 成绩____________ 日期20200322 指导教师 修宇 【实验名称】 实验二 顺序结构程序设计 【思考题】 &#xff08;注意不要漏答&#xff09; 1、影响题1和题3计算准确性的因素有哪些&#xff1f;如…

解决:VS中进行Qt开发,编译时报错:打不开QWidgets.h等文件的问题

1. 先检查是否通过Qt VS Tools添加了Qt路径和配置了正确的Qt版本&#xff0c;这一步基本上都没问题。 2. 再检查此项目是否配置了正确的Qt版本&#xff0c;右键项目&#xff0c;选择Qt Project Settings&#xff0c;选择你需要的Qt版本即可&#xff0c;不可以为空。 3. 完成上面…

【转】1:C#的三种异步的详细介绍及实现

一、介绍异步的前世今生&#xff1a; 异步编程模型 (APM&#xff0c;Asynchronous Programming Model) 模式&#xff08;也称 IAsyncResult 模式&#xff09;&#xff0c;在此模式中异步操作需要 Begin 和 End 方法&#xff08;比如用于异步写入操作的 BeginWrite 和 EndWrite…

elasticsearch date_Elasticsearch在日志分析领域应用和运维实践

主要讲述了&#xff1a;基于ELK Kafka 的日志分析系统Elasticsearch 优化经验Elasticsearch 运维实践ElasticSearch介绍分布式实时分析搜索引擎&#xff0c;优点包括&#xff1a;查询近实时内存消耗小&#xff0c;搜索速度快可扩展性强高可用数据结构FST(Finite State Transdu…

【转】2:C#TPL探秘

理论&#xff1a; 1、 只要方法是 Task类型的返回值&#xff0c;都可以用 await 来等待调用获取返回值。 2、 如果一个返回 Task类型的方法被标记了 async&#xff0c;那么只要方法内部直接 return T 这个 类型的实例就可以。 3、 一个返回 Task类型的方法没有被标记了 asyn…

Qt添加翻译文件无效或部分无效

原因&#xff1a; QTranslator::load路径错误qApp->installTranslator调用时机应该在所有界面起来之前。命名空间宏导致的问题。如果一个类有命名空间宏&#xff0c;则宏不会转换为命名空间&#xff0c;导致ts文件内的上下文不包含命名空间。上下文对不上导致部分窗口翻译失…

linux shell脚本攻略第3版_「技术干货」师傅说不会写shell脚本的网安不是一个好黑客,实战...

shell脚本&#xff1f;在说什么是shell脚本之前&#xff0c;先说说什么是shell。shell是外壳的意思&#xff0c;就是操作系统的外壳。我们可以通过shell命令来操作和控制操作系统&#xff0c;比如Linux中的Shell命令就包括ls、cd、pwd等等。总结来说&#xff0c;Shell是一个命令…

【转】3:C#异步WaitAll的使用

编写界面如图&#xff1a; private async void button1_Click(object sender, EventArgs e){#region 单个执行的异步&#xff0c;效率慢HttpClient wc new HttpClient();string s1 await wc.GetStringAsync(textBox1.Text);label1.Text s1.Length.ToString();string s2 awa…

Qt实现QTextEdit背景透明

QTextEdit为什么要拿出来单独说&#xff0c;因为它继承自QAbstractScrollArea&#xff0c;一般的设置无效。滚动区域ScrollArea内部有一个widget&#xff0c;需要同时设置ScrollArea和viewport两个窗口才能实现透明。代码如下&#xff1a; m_text_editor->setWindowFlags(Q…

python断点调试_「Python调试器」,快速定位各种疑难杂症!!!

在很多的编辑器其实都带着「调试程序」的功能&#xff0c;比如写 c/c 的 codeblocks&#xff0c;写 Python 的 pycharm&#xff0c;这种图形界面的使用和显示都相当友好&#xff0c;简单方便易学&#xff0c;这个不是我这篇文章要讲的重点。今天主要是想给大家介绍一下 「 Pyth…

【转】C# 温故而知新:Stream篇(—)

目录&#xff1a; 什么是Stream? 什么是字节序列&#xff1f; Stream的构造函数 Stream的重要属性及方法 Stream的示例 Stream异步读写 Stream 和其子类的类图 本章总结 什么是Stream? MSDN 中的解释太简洁了: 提供字节序列的一般视图 &#xff08;我可不想这么理解…

python 画树 递归_数据结构 - python如何递归生成树?

问 题 class Tree: def __init__(self, label): self.root label self.child {} def set_child(self, label, relate): self.child[label] relate def get_root(self): return self.root def get_child(self): return self.child 这么一颗树结构&#xff0c;该如何写 def cr…

java integer valueof_一文读懂什么是Java中的自动拆装箱

本文主要介绍Java中的自动拆箱与自动装箱的有关知识。基本数据类型基本类型&#xff0c;或者叫做内置类型&#xff0c;是Java中不同于类(Class)的特殊类型。它们是我们编程中使用最频繁的类型。Java是一种强类型语言&#xff0c;第一次申明变量必须说明数据类型&#xff0c;第一…

【转】面试:一个单例模式,足以把你秒成渣

去面试&#xff08;对&#xff0c;又去面试&#xff09; 问&#xff1a;单例模式了解吧&#xff0c;来&#xff0c;拿纸和笔写一下单例模式。 我心想&#xff0c;这TM不是瞧不起人吗&#xff1f;我编程十年&#xff0c;能不知道单例模式。 答&#xff1a;&#xff08;.net 平…

【转】SQL 语句执行顺序

From&#xff1a;http://www.jellythink.com/archives/924 Oracle-SQL语句执行原理和完整过程详解&#xff1a;https://wenku.baidu.com/view/398bc427964bcf84b8d57b00.html 详解一条 SQL 语句的执行过程&#xff1a;http://www.cnblogs.com/cdf-opensource-007/p/6502556.h…

堆和栈的概念和区别 python_堆和栈的概念和区别

在说堆和栈之前&#xff0c;我们先说一下JVM&#xff08;虚拟机&#xff09;内存的划分&#xff1a; Java程序在运行时都要开辟空间&#xff0c;任何软件在运行时都要在内存中开辟空间&#xff0c;Java虚拟机运行时也是要开辟空间的。JVM运行时在内存中开辟一片内存区域&#x…

【手算】哈夫曼编码—树形倒置快速画法

哈夫曼编码的原理 参考文章&#xff1a;哈夫曼编码详解——图解真能看了秒懂 简单总结其原理&#xff1a; 需求&#xff1a;对重复出现的元素进行二进制编码&#xff0c;最高效的编码方式是哈夫曼编码。 方法&#xff1a;按照元素出现的频率大小构造一棵树&#xff0c;出现次…

【转】Web API项目中使用Area对业务进行分类管理

在之前开发的很多Web API项目中&#xff0c;为了方便以及快速开发&#xff0c;往往把整个Web API的控制器放在基目录的Controllers目录中&#xff0c;但随着业务越来越复杂&#xff0c;这样Controllers目录中的文件就增加很快&#xff0c;难以管理&#xff0c;而且如果有不同业…