C#高性能TCP服务的多种实现方式


哎~~ 想想大部分园友应该对 "高性能" 字样更感兴趣,为了吸引眼球所以标题中一定要突出,其实我更喜欢的标题是《猴赛雷,C#编写TCP服务的花样姿势!》

本篇文章的主旨是使用 .NET/C# 实现 TCP 高性能服务的不同方式,包括但不限于如下内容:

  • APM 方式,即 Asynchronous Programming Model

  • TAP 方式,即 Task-based Asynchronous Pattern

  • SAEA 方式,即 SocketAsyncEventArgs

  • RIO 方式,即 Registered I/O

在 .NET/C# 中对于 Socket 的支持均是基于 Windows I/O Completion Ports 完成端口技术的封装,通过不同的 Non-Blocking 封装结构来满足不同的编程需求。以上方式均已在 Cowboy.Sockets 中有完整实现,并且 APM 和 TAP 方式已经在实际项目中应用。Cowboy.Sockets 还在不断的进化和完善中,如有任何问题请及时指正。

虽然有这么多种实现方式,但抽象的看,它们是一样一样的,用两个 Loop 即可描述:Accept Loop和 Read Loop,如下图所示。(这里提及的 "Loop" 指的是一种循环方式,而非特指 while/for 等关键字。)

  • 在任何 TCP Server 的实现中,一定存在一个 Accept Socket Loop,用于接收 Client 端的 Connect 请求以建立 TCP Connection。

  • 在任何 TCP Server 的实现中,一定存在一个 Read Socket Loop,用于接收 Client 端 Write 过来的数据。

如果 Accept 循环阻塞,则会导致无法快速的建立连接,服务端 Pending Backlog 满,进而导致 Client 端收到 Connect Timeout 的异常。如果 Read 循环阻塞,则显然会导致无法及时收到 Client 端发过来的数据,进而导致 Client 端 Send Buffer 满,无法再发送数据。

从实现细节的角度看,能够导致服务阻塞的位置可能在:

  1. Accept 到新的 Socket,构建新的 Connection 需要分配各种资源,分配资源慢;

  2. Accept 到新的 Socket,没有及时触发下一次 Accept;

  3. Read 到新的 Buffer,判定 Payload 消息长度,判定过程长;

  4. Read 到新的 Buffer,发现 Payload 还没有收全,继续 Read,则 "可能" 会导致一次 Buffer Copy;

  5. Payload 接收完毕,进行 De-Serialization 转成可识别的 Protocol Message,反序列化慢;

  6. 由 Business Module 来处理相应的 Protocol Message,处理过程慢;

1-2 涉及到 Accept 过程和 Connection 的建立过程,3-4 涉及到 ReceiveBuffer 的处理过程,5-6 涉及到应用逻辑侧的实现。

Java 中著名的 Netty 网络库从 4.0 版本开始对于 Buffer 部分做了全新的尝试,采用了名叫 ByteBuf 的设计,实现 Buffer Zero Copy 以减少高并发条件下 Buffer 拷贝带来的性能损失和 GC 压力。DotNetty,Orleans ,Helios 等项目正在尝试在 C# 中进行类似的 ByteBuf 的实现。

APM 方式:TcpSocketServer

TcpSocketServer 的实现是基于 .NET Framework 自带的 TcpListener 和 TcpClient 的更进一步的封装,采用基于 APM 的 BeginXXX 和 EndXXX 接口实现。

TcpSocketServer 中的 Accept Loop 指的就是,

  • BeginAccept -> EndAccept-> BeginAccept -> EndAccept -> BeginAccept -> ...

每一个建立成功的 Connection 由 TcpSocketSession 来处理,所以 TcpSocketSession 中会包含 Read Loop,

  • BeginRead -> EndRead -> BeginRead -> EndRead -> BeginRead -> ...

TcpSocketServer 通过暴露 Event 来实现 Connection 的建立与断开和数据接收的通知。

  event EventHandler<TcpClientConnectedEventArgs> ClientConnected;  event EventHandler<TcpClientDisconnectedEventArgs> ClientDisconnected;  event EventHandler<TcpClientDataReceivedEventArgs> ClientDataReceived;

使用也是简单直接,直接订阅事件通知。

  private static void StartServer(){_server = new TcpSocketServer(22222);_server.ClientConnected += server_ClientConnected;_server.ClientDisconnected += server_ClientDisconnected;_server.ClientDataReceived += server_ClientDataReceived;_server.Listen();}  static void server_ClientConnected(object sender, TcpClientConnectedEventArgs e){Console.WriteLine(string.Format("TCP client {0} has connected {1}.", e.Session.RemoteEndPoint, e.Session));}  static void server_ClientDisconnected(object sender, TcpClientDisconnectedEventArgs e){Console.WriteLine(string.Format("TCP client {0} has disconnected.", e.Session));}  static void server_ClientDataReceived(object sender, TcpClientDataReceivedEventArgs e){      var text = Encoding.UTF8.GetString(e.Data, e.DataOffset, e.DataLength);Console.Write(string.Format("Client : {0} {1} --> ", e.Session.RemoteEndPoint, e.Session));Console.WriteLine(string.Format("{0}", text));_server.Broadcast(Encoding.UTF8.GetBytes(text));}

TAP 方式:AsyncTcpSocketServer

AsyncTcpSocketServer 的实现是基于 .NET Framework 自带的 TcpListener 和 TcpClient 的更进一步的封装,采用基于 TAP 的 async/await 的 XXXAsync 接口实现。

然而,实际上 XXXAsync 并没有创建什么神奇的效果,其内部实现只是将 APM 的方法转换成了 TAP 的调用方式。

  //************* Task-based async public methods *************************[HostProtection(ExternalThreading = true)]  public Task<Socket> AcceptSocketAsync(){      return Task<Socket>.Factory.FromAsync(BeginAcceptSocket, EndAcceptSocket, null);}[HostProtection(ExternalThreading = true)]  public Task<TcpClient> AcceptTcpClientAsync(){      return Task<TcpClient>.Factory.FromAsync(BeginAcceptTcpClient, EndAcceptTcpClient, null);}

AsyncTcpSocketServer 中的 Accept Loop 指的就是,

  while (IsListening){      var tcpClient = await _listener.AcceptTcpClientAsync();}

每一个建立成功的 Connection 由 AsyncTcpSocketSession 来处理,所以 AsyncTcpSocketSession 中会包含 Read Loop,

  while (State == TcpSocketConnectionState.Connected){      int receiveCount = await _stream.ReadAsync(_receiveBuffer, 0, _receiveBuffer.Length);}

为了将 async/await 异步到底,AsyncTcpSocketServer 所暴露的接口也同样是 Awaitable 的。

  public interface IAsyncTcpSocketServerMessageDispatcher{Task OnSessionStarted(AsyncTcpSocketSession session);Task OnSessionDataReceived(AsyncTcpSocketSession session, byte[] data, int offset, int count);Task OnSessionClosed(AsyncTcpSocketSession session);}

使用时仅需将一个实现了该接口的对象注入到 AsyncTcpSocketServer 的构造函数中即可。

  public class SimpleMessageDispatcher : IAsyncTcpSocketServerMessageDispatcher{      public async Task OnSessionStarted(AsyncTcpSocketSession session){Console.WriteLine(string.Format("TCP session {0} has connected {1}.", session.RemoteEndPoint, session));          await Task.CompletedTask;}  public async Task OnSessionDataReceived(AsyncTcpSocketSession session, byte[] data, int offset, int count){          var text = Encoding.UTF8.GetString(data, offset, count);Console.Write(string.Format("Client : {0} --> ", session.RemoteEndPoint));Console.WriteLine(string.Format("{0}", text));  await session.SendAsync(Encoding.UTF8.GetBytes(text));}  public async Task OnSessionClosed(AsyncTcpSocketSession session){Console.WriteLine(string.Format("TCP session {0} has disconnected.", session));          await Task.CompletedTask;}}

当然,对于接口的实现也不是强制了,也可以在构造函数中直接注入方法的实现。

  public AsyncTcpSocketServer(IPEndPoint listenedEndPoint,Func<AsyncTcpSocketSession, byte[], int, int, Task> onSessionDataReceived = null,Func<AsyncTcpSocketSession, Task> onSessionStarted = null,Func<AsyncTcpSocketSession, Task> onSessionClosed = null,AsyncTcpSocketServerConfiguration configuration = null){}

SAEA 方式:TcpSocketSaeaServer

SAEA 是 SocketAsyncEventArgs 的简写。SocketAsyncEventArgs 是 .NET Framework 3.5 开始支持的一种支持高性能 Socket 通信的实现。SocketAsyncEventArgs 相比于 APM 方式的主要优点可以描述如下:

The main feature of these enhancements is the avoidance of the repeated allocation and synchronization of objects during high-volume asynchronous socket I/O. The Begin/End design pattern currently implemented by the Socket class for asynchronous socket I/O requires a System.IAsyncResult object be allocated for each asynchronous socket operation.

也就是说,优点就是无需为每次调用都生成 IAsyncResult 等对象,向原生 Socket 更靠近一些。

使用 SocketAsyncEventArgs 的推荐步骤如下:

  1. Allocate a new SocketAsyncEventArgs context object, or get a free one from an application pool.

  2. Set properties on the context object to the operation about to be performed (the callback delegate method and data buffer, for example).

  3. Call the appropriate socket method (xxxAsync) to initiate the asynchronous operation.

  4. If the asynchronous socket method (xxxAsync) returns true in the callback, query the context properties for completion status.

  5. If the asynchronous socket method (xxxAsync) returns false in the callback, the operation completed synchronously. The context properties may be queried for the operation result.

  6. Reuse the context for another operation, put it back in the pool, or discard it.

重点在于池化(Pooling),池化的目的就是为了重用和减少运行时分配和垃圾回收的压力。

TcpSocketSaeaServer 即是对 SocketAsyncEventArgs 的应用和封装,并实现了 Pooling 技术。TcpSocketSaeaServer 中的重点是 SaeaAwaitable 类,SaeaAwaitable 中内置了一个 SocketAsyncEventArgs,并通过 GetAwaiter 返回 SaeaAwaiter 来支持 async/await 操作。同时,通过 SaeaExtensions 扩展方法对来扩展 SocketAsyncEventArgs 的 Awaitable 实现。

  public static SaeaAwaitable AcceptAsync(this Socket socket, SaeaAwaitable awaitable)  public static SaeaAwaitable ConnectAsync(this Socket socket, SaeaAwaitable awaitable)  public static SaeaAwaitable DisonnectAsync(this Socket socket, SaeaAwaitable awaitable)  public static SaeaAwaitable ReceiveAsync(this Socket socket, SaeaAwaitable awaitable)  public static SaeaAwaitable SendAsync(this Socket socket, SaeaAwaitable awaitable)

SaeaPool 则是一个 QueuedObjectPool<SaeaAwaitable> 的衍生实现,用于池化 SaeaAwaitable 实例。同时,为了减少 TcpSocketSaeaSession 的构建过程,也实现了 SessionPool 即 QueuedObjectPool<TcpSocketSaeaSession>。

TcpSocketSaeaServer 中的 Accept Loop 指的就是,

  while (IsListening){      var saea = _acceptSaeaPool.Take();  var socketError = await _listener.AcceptAsync(saea);      if (socketError == SocketError.Success){          var acceptedSocket = saea.Saea.AcceptSocket;}_acceptSaeaPool.Return(saea);}

每一个建立成功的 Connection 由 TcpSocketSaeaSession 来处理,所以 TcpSocketSaeaSession 中会包含 Read Loop,

  var saea = _saeaPool.Take();saea.Saea.SetBuffer(_receiveBuffer, 0, _receiveBuffer.Length);  while (State == TcpSocketConnectionState.Connected){saea.Saea.SetBuffer(0, _receiveBuffer.Length);  var socketError = await _socket.ReceiveAsync(saea);      if (socketError != SocketError.Success)          break;  var receiveCount = saea.Saea.BytesTransferred;      if (receiveCount == 0)          break;}

同样,TcpSocketSaeaServer 对外所暴露的接口也同样是 Awaitable 的。

  public interface ITcpSocketSaeaServerMessageDispatcher{Task OnSessionStarted(TcpSocketSaeaSession session);Task OnSessionDataReceived(TcpSocketSaeaSession session, byte[] data, int offset, int count);Task OnSessionClosed(TcpSocketSaeaSession session);}

使用起来也是简单直接:

  public class SimpleMessageDispatcher : ITcpSocketSaeaServerMessageDispatcher{      public async Task OnSessionStarted(TcpSocketSaeaSession session){Console.WriteLine(string.Format("TCP session {0} has connected {1}.", session.RemoteEndPoint, session));          await Task.CompletedTask;}  public async Task OnSessionDataReceived(TcpSocketSaeaSession session, byte[] data, int offset, int count){          var text = Encoding.UTF8.GetString(data, offset, count);Console.Write(string.Format("Client : {0} --> ", session.RemoteEndPoint));Console.WriteLine(string.Format("{0}", text));  await session.SendAsync(Encoding.UTF8.GetBytes(text));}  public async Task OnSessionClosed(TcpSocketSaeaSession session){Console.WriteLine(string.Format("TCP session {0} has disconnected.", session));          await Task.CompletedTask;}}

RIO 方式:TcpSocketRioServer

从 Windows 8.1 / Windows Server 2012 R2 开始,微软推出了 Registered I/O Networking Extensions来支持高性能 Socket 服务的实现,简称 RIO。

The following functions are supported for Windows Store apps on Windows 8.1, Windows Server 2012 R2, and later. Microsoft Visual Studio 2013 Update 3 or later is required for Windows Store apps.

  • RIOCloseCompletionQueue

  • RIOCreateCompletionQueue

  • RIOCreateRequestQueue

  • RIODequeueCompletion

  • RIODeregisterBuffer

  • RIONotify

  • RIOReceive

  • RIOReceiveEx

  • RIORegisterBuffer

  • RIOResizeCompletionQueue

  • RIOResizeRequestQueue

  • RIOSend

  • RIOSendEx

到目前为止,.NET Framework 还没有推出对 RIO 的支持,所以若想在 C# 中实现 RIO 则只能通过 P/Invoke 方式,RioSharp 是开源项目中的一个比较完整的实现。

Cowboy.Sockets 直接引用了 RioSharp 的源代码,放置在 Cowboy.Sockets.Experimental 名空间下,以供实验和测试使用。

同样,通过 TcpSocketRioServer 来实现 Accept Loop,

_listener.OnAccepted = (acceptedSocket) =>{Task.Run(async () =>{        await Process(acceptedSocket);}).Forget();
};

通过 TcpSocketRioSession 来处理 Read Loop,

  while (State == TcpSocketConnectionState.Connected){      int receiveCount = await _stream.ReadAsync(_receiveBuffer, 0, _receiveBuffer.Length);      if (receiveCount == 0)          break;}

测试代码一如既往的类似:

  public class SimpleMessageDispatcher : ITcpSocketRioServerMessageDispatcher{      public async Task OnSessionStarted(TcpSocketRioSession session){          //Console.WriteLine(string.Format("TCP session {0} has connected {1}.", session.RemoteEndPoint, session));Console.WriteLine(string.Format("TCP session has connected {0}.", session));          await Task.CompletedTask;}  public async Task OnSessionDataReceived(TcpSocketRioSession session, byte[] data, int offset, int count){          var text = Encoding.UTF8.GetString(data, offset, count);          //Console.Write(string.Format("Client : {0} --> ", session.RemoteEndPoint));Console.Write(string.Format("Client : --> "));Console.WriteLine(string.Format("{0}", text));  await session.SendAsync(Encoding.UTF8.GetBytes(text));}  public async Task OnSessionClosed(TcpSocketRioSession session){Console.WriteLine(string.Format("TCP session {0} has disconnected.", session));          await Task.CompletedTask;}}



友情公众号推荐:


玩闹天津


每日都发布最新资讯消息,各种搞笑娱乐新闻等你来转发~带你玩转天津酷炫不能停!微信号:daqin_NBTJ玩闹天津!奏是要闹起来!长按识别图中二维码,或者扫描下方二维码,即可关注哟~


官方微信扫我们


点击下方阅读原文有惊喜等着你~




内容转载自公众号

程序员联盟
程序员联盟
了解更多

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/327158.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

mysql中 where in 用法详解

https://blog.csdn.net/haibo0668/article/details/52584307 sssss mysql中 where in 用法详解 我是高手高手高高手 2016-09-19 11:33:29 102915 收藏 14 分类专栏&#xff1a; php MY SQL &#xfeff;&#xfeff; 这里分两种情况来介绍 WHERE column IN (value1,valu…

Servlet使用适配器模式进行增删改查案例(Dept.java)

package org.entity;/*** * * 项目名称&#xff1a;test_BaseDao * 类名称&#xff1a;Dept * 类描述&#xff1a; 部门表的实体类 * 创建人&#xff1a;Mu Xiongxiong * 创建时间&#xff1a;2017-9-13 上午8:44:37 * 修改人&#xff1a;Mu Xiongxiong *…

HashSet的存储方式

存储结构 存储过程

使用org.apache.commons.io.FileUtils,IOUtils工具类操作文件

转载自 使用org.apache.commons.io.FileUtils,IOUtils;工具类操作文件 File src new File("G:/2012/portal/login.jsp"); File tar new File("G:/2012/portal/loginZs.jsp"); File tarDir new File("G:/2012/portal/center/"); FileUti…

写给新手的WebAPI实践

此篇是写给新手的Demo&#xff0c;用于参考和借鉴&#xff0c;用于发散思路。老鸟可以忽略了。 自己经常有这种情况&#xff0c;遇到一个新东西或难题&#xff0c;在了解和解决之前总是说“等搞定了一定要写篇文章记录下来”&#xff0c;但是当掌握了之后&#xff0c;就感觉好简…

用limit 实现java的简单分页

https://blog.csdn.net/xinyuezitang/article/details/84324359 用limit 实现java的简单分页 xinyuezitang 2018-11-21 16:01:13 4447 收藏 9 分类专栏&#xff1a; Java 小Demo 文章标签&#xff1a; 分页 limit mysql 实现java分页 版权 一 mysql 中limit 用法 select …

Map集合的遍历

COPY/*** Map接口的使用* 特点&#xff1a;1.存储键值对 2.键不能重复&#xff0c;值可以重复 3.无序*/ public class Demo1 {public static void main(String[] args) {Map<String,Integer> mapnew HashMap<String, Integer>();//1.添加元素map.put("tang&qu…

Servlet使用适配器模式进行增删改查案例(IBaseDaoUtil.java)

/*** */ package org.dao;import java.util.List;/*** * * 项目名称&#xff1a;test_BaseDao * 类名称&#xff1a;IBaseDaoUtil * 类描述&#xff1a; 公共接口 * 创建人&#xff1a;Mu Xiongxiong * 创建时间&#xff1a;2017-9-10 上午11:02:57 * 修改人…

Mybatis使用IN语句查询

https://blog.csdn.net/u011781521/article/details/79669180 Mybatis使用IN语句查询 lfendo 2018-03-23 16:45:03 201525 收藏 140 分类专栏&#xff1a; MyBatis 文章标签&#xff1a; mybatis JAVA sql 版权 一、简介 在SQL语法中如果我们想使用in的话直接可以像如下一…

IdentityServer4 实现 OpenID Connect 和 OAuth 2.0

关于 OAuth 2.0 的相关内容&#xff0c;点击查看&#xff1a;ASP.NET WebApi OWIN 实现 OAuth 2.0 OpenID 是一个去中心化的网上身份认证系统。对于支持 OpenID 的网站&#xff0c;用户不需要记住像用户名和密码这样的传统验证标记。取而代之的是&#xff0c;他们只需要预先在一…

Java动态代理机制详解(JDK 和CGLIB,Javassist,ASM)

转载自 Java动态代理机制详解&#xff08;JDK 和CGLIB&#xff0c;Javassist&#xff0c;ASM&#xff09; class文件简介及加载 Java编译器编译好Java文件之后&#xff0c;产生.class 文件在磁盘中。这种class文件是二进制文件&#xff0c;内容是只有JVM虚拟机能够识别的机器码…

Servlet使用适配器模式进行增删改查案例(IDeptDao.java和IEmpDao.java)

这两个接口进行放置emp和dept单独接口 /*** */ package org.dao;import org.entity.Dept;/*** * * 项目名称&#xff1a;test_BaseDao * 类名称&#xff1a;IDeptDao * 类描述&#xff1a; 部门的私有接口&#xff0c;此接口专属于部门独立的功能公共的都在IBaseDaoUt…

彻底理解JAVA动态代理

转载自 彻底理解JAVA动态代理 代理设计模式 定义&#xff1a;为其他对象提供一种代理以控制对这个对象的访问。 代理模式的结构如下图所示。 动态代理使用 java动态代理机制以巧妙的方式实现了代理模式的设计理念。 代理模式示例代码 public interface Subject { pu…

docker 安装redis 挂载到宿主机

1.首先去redis获取对应版本的配置文件redis.conf&#xff1a; http://download.redis.io/releases/ 我选择的是 6.0.9 解压以后 有一个redis.conf 2.将 bind 127.0.0.1注释&#xff0c;daemonize yes注释掉&#xff0c;如果需要redis密码则找到 requirepass 并填上你的密码 …

Servlet使用适配器模式进行增删改查案例(BaseDaoUtilImpl.java)

/*** */ package org.dao.impl;import java.sql.ResultSet; import java.sql.SQLException; import java.util.ArrayList; import java.util.List;import org.dao.BaseDao; import org.entity.Dept; import org.entity.Emp;/*** * * 项目名称&#xff1a;test_BaseDao …

搭建consul 集群

上图是官网提供的一个事例系统图&#xff0c;图中的Server是consul服务端高可用集群&#xff0c;Client是consul客户端。consul客户端不保存数据&#xff0c;客户端将接收到的请求转发给响应的Server端。Server之间通过局域网或广域网通信实现数据一致性。每个Server或Client都…

Java类加载的那些事

转载自 Java类加载的那些事 前言 Java源代码被编译成class字节码&#xff0c;最终需要加载到虚拟机中才能运行。整个生命周期包括&#xff1a;加载、验证、准备、解析、初始化、使用和卸载7个阶段。 加载 1、通过一个类的全限定名获取描述此类的二进制字节流&#xff1b; …

JAVA集合(笔记)

集合简介 概念&#xff1a;对象的容器&#xff0c;定义了对多个对象进项操作的的常用方法。可实现数组的功能。和数组的区别&#xff1a; 数组长度固定&#xff0c;集合长度不固定。数组可以存储基本类型和引用类型&#xff0c;集合只能存储引用类型。 位置&#xff1a; jav…

部署kafka kafka的service容器和zookeeper kafka客户端 Elasticsearch的客户端

创建network docker network create -d overlay --attachable loc_net docker stack up -c kafka.yml kafka docker stack up -c kafdrop.yml kafdrop docker stack up -c els.yml els docker stack ls docker service ls 查看service的启动错误原因 docker service ps d…