C# 使用.NET的SocketAsyncEventArgs实现高效能多并发TCPSocket通信

简介:

 SocketAsyncEventArgs是一个套接字操作得类,主要作用是实现socket消息的异步接收和发送,跟Socket的BeginSend和BeginReceive方法异步处理没有多大区别,它的优势在于完成端口的实现来处理大数据的并发情况。

  • BufferManager类, 管理传输流的大小
  • SocketEventPool类: 管理SocketAsyncEventArgs的一个应用池. 有效地重复使用.
  •  AsyncUserToken类: 这个可以根据自己的实际情况来定义.主要作用就是存储客户端的信息.
  • SocketManager类: 核心,实现Socket监听,收发信息等操作.
  • 额外功能   1.自动检测无效连接并断开    2.自动释放资源

BufferManager类

using System;  
using System.Collections.Generic;  
using System.Linq;  
using System.Net.Sockets;  
using System.Text;  namespace Plates.Service  
{  class BufferManager  {  int m_numBytes;                 // the total number of bytes controlled by the buffer pool  byte[] m_buffer;                // the underlying byte array maintained by the Buffer Manager  Stack<int> m_freeIndexPool;     //   int m_currentIndex;  int m_bufferSize;  public BufferManager(int totalBytes, int bufferSize)  {  m_numBytes = totalBytes;  m_currentIndex = 0;  m_bufferSize = bufferSize;  m_freeIndexPool = new Stack<int>();  }  // Allocates buffer space used by the buffer pool  public void InitBuffer()  {  // create one big large buffer and divide that   // out to each SocketAsyncEventArg object  m_buffer = new byte[m_numBytes];  }  // Assigns a buffer from the buffer pool to the   // specified SocketAsyncEventArgs object  //  // <returns>true if the buffer was successfully set, else false</returns>  public bool SetBuffer(SocketAsyncEventArgs args)  {  if (m_freeIndexPool.Count > 0)  {  args.SetBuffer(m_buffer, m_freeIndexPool.Pop(), m_bufferSize);  }  else  {  if ((m_numBytes - m_bufferSize) < m_currentIndex)  {  return false;  }  args.SetBuffer(m_buffer, m_currentIndex, m_bufferSize);  m_currentIndex += m_bufferSize;  }  return true;  }  // Removes the buffer from a SocketAsyncEventArg object.    // This frees the buffer back to the buffer pool  public void FreeBuffer(SocketAsyncEventArgs args)  {  m_freeIndexPool.Push(args.Offset);  args.SetBuffer(null, 0, 0);  }  }  
}  

    

SocketEventPool类:

using System;  
using System.Collections.Generic;  
using System.Linq;  
using System.Net.Sockets;  
using System.Text;  namespace Plates.Service  
{  class SocketEventPool  {  Stack<SocketAsyncEventArgs> m_pool;  public SocketEventPool(int capacity)  {  m_pool = new Stack<SocketAsyncEventArgs>(capacity);  }  public void Push(SocketAsyncEventArgs item)  {  if (item == null) { throw new ArgumentNullException("Items added to a SocketAsyncEventArgsPool cannot be null"); }  lock (m_pool)  {  m_pool.Push(item);  }  }  // Removes a SocketAsyncEventArgs instance from the pool  // and returns the object removed from the pool  public SocketAsyncEventArgs Pop()  {  lock (m_pool)  {  return m_pool.Pop();  }  }  // The number of SocketAsyncEventArgs instances in the pool  public int Count  {  get { return m_pool.Count; }  }  public void Clear()  {  m_pool.Clear();  }  }  
}  

 AsyncUserToken类

using System;  
using System.Collections;  
using System.Collections.Generic;  
using System.Linq;  
using System.Net;  
using System.Net.Sockets;  
using System.Text;  namespace Plates.Service  
{  class AsyncUserToken  {  /// <summary>  /// 客户端IP地址  /// </summary>  public IPAddress IPAddress { get; set; }  /// <summary>  /// 远程地址  /// </summary>  public EndPoint Remote { get; set; }  /// <summary>  /// 通信SOKET  /// </summary>  public Socket Socket { get; set; }  /// <summary>  /// 连接时间  /// </summary>  public DateTime ConnectTime { get; set; }  /// <summary>  /// 所属用户信息  /// </summary>  public UserInfoModel UserInfo { get; set; }  /// <summary>  /// 数据缓存区  /// </summary>  public List<byte> Buffer { get; set; }  public AsyncUserToken()  {  this.Buffer = new List<byte>();  }  }  
}  

  SocketManager类

using Plates.Common;  
using System;  
using System.Collections;  
using System.Collections.Generic;  
using System.Linq;  
using System.Net;  
using System.Net.Sockets;  
using System.Text;  
using System.Threading;  namespace Plates.Service  
{  class SocketManager  {  private int m_maxConnectNum;    //最大连接数  private int m_revBufferSize;    //最大接收字节数  BufferManager m_bufferManager;  const int opsToAlloc = 2;  Socket listenSocket;            //监听Socket  SocketEventPool m_pool;  int m_clientCount;              //连接的客户端数量  Semaphore m_maxNumberAcceptedClients;  List<AsyncUserToken> m_clients; //客户端列表  #region 定义委托  /// <summary>  /// 客户端连接数量变化时触发  /// </summary>  /// <param name="num">当前增加客户的个数(用户退出时为负数,增加时为正数,一般为1)</param>  /// <param name="token">增加用户的信息</param>  public delegate void OnClientNumberChange(int num, AsyncUserToken token);  /// <summary>  /// 接收到客户端的数据  /// </summary>  /// <param name="token">客户端</param>  /// <param name="buff">客户端数据</param>  public delegate void OnReceiveData(AsyncUserToken token, byte[] buff);  #endregion  #region 定义事件  /// <summary>  /// 客户端连接数量变化事件  /// </summary>  public event OnClientNumberChange ClientNumberChange;  /// <summary>  /// 接收到客户端的数据事件  /// </summary>  public event OnReceiveData ReceiveClientData;  #endregion  #region 定义属性  /// <summary>  /// 获取客户端列表  /// </summary>  public List<AsyncUserToken> ClientList { get { return m_clients; } }  #endregion  /// <summary>  /// 构造函数  /// </summary>  /// <param name="numConnections">最大连接数</param>  /// <param name="receiveBufferSize">缓存区大小</param>  public SocketManager(int numConnections, int receiveBufferSize)  {  m_clientCount = 0;  m_maxConnectNum = numConnections;  m_revBufferSize = receiveBufferSize;  // allocate buffers such that the maximum number of sockets can have one outstanding read and   //write posted to the socket simultaneously    m_bufferManager = new BufferManager(receiveBufferSize * numConnections * opsToAlloc, receiveBufferSize);  m_pool = new SocketEventPool(numConnections);  m_maxNumberAcceptedClients = new Semaphore(numConnections, numConnections);  }  /// <summary>  /// 初始化  /// </summary>  public void Init()  {  // Allocates one large byte buffer which all I/O operations use a piece of.  This gaurds   // against memory fragmentation  m_bufferManager.InitBuffer();  m_clients = new List<AsyncUserToken>();  // preallocate pool of SocketAsyncEventArgs objects  SocketAsyncEventArgs readWriteEventArg;  for (int i = 0; i < m_maxConnectNum; i++)  {  readWriteEventArg = new SocketAsyncEventArgs();  readWriteEventArg.Completed += new EventHandler<SocketAsyncEventArgs>(IO_Completed);  readWriteEventArg.UserToken = new AsyncUserToken();  // assign a byte buffer from the buffer pool to the SocketAsyncEventArg object  m_bufferManager.SetBuffer(readWriteEventArg);  // add SocketAsyncEventArg to the pool  m_pool.Push(readWriteEventArg);  }  }  /// <summary>  /// 启动服务  /// </summary>  /// <param name="localEndPoint"></param>  public bool Start(IPEndPoint localEndPoint)  {  try  {  m_clients.Clear();  listenSocket = new Socket(localEndPoint.AddressFamily, SocketType.Stream, ProtocolType.Tcp);  listenSocket.Bind(localEndPoint);  // start the server with a listen backlog of 100 connections  listenSocket.Listen(m_maxConnectNum);  // post accepts on the listening socket  StartAccept(null);  return true;  }  catch (Exception)  {  return false;  }  }  /// <summary>  /// 停止服务  /// </summary>  public void Stop()  {  foreach (AsyncUserToken token in m_clients)  {  try  {  token.Socket.Shutdown(SocketShutdown.Both);  }  catch (Exception) { }  }  try  {  listenSocket.Shutdown(SocketShutdown.Both);  }  catch (Exception) { }  listenSocket.Close();  int c_count = m_clients.Count;  lock (m_clients) { m_clients.Clear(); }  if (ClientNumberChange != null)  ClientNumberChange(-c_count, null);  }  public void CloseClient(AsyncUserToken token)  {  try  {  token.Socket.Shutdown(SocketShutdown.Both);  }  catch (Exception) { }  }  // Begins an operation to accept a connection request from the client   //  // <param name="acceptEventArg">The context object to use when issuing   // the accept operation on the server's listening socket</param>  public void StartAccept(SocketAsyncEventArgs acceptEventArg)  {  if (acceptEventArg == null)  {  acceptEventArg = new SocketAsyncEventArgs();  acceptEventArg.Completed += new EventHandler<SocketAsyncEventArgs>(AcceptEventArg_Completed);  }  else  {  // socket must be cleared since the context object is being reused  acceptEventArg.AcceptSocket = null;  }  m_maxNumberAcceptedClients.WaitOne();  if (!listenSocket.AcceptAsync(acceptEventArg))  {  ProcessAccept(acceptEventArg);  }  }  // This method is the callback method associated with Socket.AcceptAsync   // operations and is invoked when an accept operation is complete  //  void AcceptEventArg_Completed(object sender, SocketAsyncEventArgs e)  {  ProcessAccept(e);  }  private void ProcessAccept(SocketAsyncEventArgs e)  {  try  {  Interlocked.Increment(ref m_clientCount);  // Get the socket for the accepted client connection and put it into the   //ReadEventArg object user token  SocketAsyncEventArgs readEventArgs = m_pool.Pop();  AsyncUserToken userToken = (AsyncUserToken)readEventArgs.UserToken;  userToken.Socket = e.AcceptSocket;  userToken.ConnectTime = DateTime.Now;  userToken.Remote = e.AcceptSocket.RemoteEndPoint;  userToken.IPAddress = ((IPEndPoint)(e.AcceptSocket.RemoteEndPoint)).Address;  lock (m_clients) { m_clients.Add(userToken); }  if (ClientNumberChange != null)  ClientNumberChange(1, userToken);  if (!e.AcceptSocket.ReceiveAsync(readEventArgs))  {  ProcessReceive(readEventArgs);  }  }  catch (Exception me)  {  RuncomLib.Log.LogUtils.Info(me.Message + "\r\n" + me.StackTrace);  }  // Accept the next connection request  if (e.SocketError == SocketError.OperationAborted) return;  StartAccept(e);  }  void IO_Completed(object sender, SocketAsyncEventArgs e)  {  // determine which type of operation just completed and call the associated handler  switch (e.LastOperation)  {  case SocketAsyncOperation.Receive:  ProcessReceive(e);  break;  case SocketAsyncOperation.Send:  ProcessSend(e);  break;  default:  throw new ArgumentException("The last operation completed on the socket was not a receive or send");  }  }  // This method is invoked when an asynchronous receive operation completes.   // If the remote host closed the connection, then the socket is closed.    // If data was received then the data is echoed back to the client.  //  private void ProcessReceive(SocketAsyncEventArgs e)  {  try  {  // check if the remote host closed the connection  AsyncUserToken token = (AsyncUserToken)e.UserToken;  if (e.BytesTransferred > 0 && e.SocketError == SocketError.Success)  {  //读取数据  byte[] data = new byte[e.BytesTransferred];  Array.Copy(e.Buffer, e.Offset, data, 0, e.BytesTransferred);  lock (token.Buffer)  {  token.Buffer.AddRange(data);  }  //注意:你一定会问,这里为什么要用do-while循环?   //如果当客户发送大数据流的时候,e.BytesTransferred的大小就会比客户端发送过来的要小,  //需要分多次接收.所以收到包的时候,先判断包头的大小.够一个完整的包再处理.  //如果客户短时间内发送多个小数据包时, 服务器可能会一次性把他们全收了.  //这样如果没有一个循环来控制,那么只会处理第一个包,  //剩下的包全部留在token.Buffer中了,只有等下一个数据包过来后,才会放出一个来.  do  {  //判断包的长度  byte[] lenBytes = token.Buffer.GetRange(0, 4).ToArray();  int packageLen = BitConverter.ToInt32(lenBytes, 0);  if (packageLen > token.Buffer.Count - 4)  {   //长度不够时,退出循环,让程序继续接收  break;  }  //包够长时,则提取出来,交给后面的程序去处理  byte[] rev = token.Buffer.GetRange(4, packageLen).ToArray();  //从数据池中移除这组数据  lock (token.Buffer)  {  token.Buffer.RemoveRange(0, packageLen + 4);  }  //将数据包交给后台处理,这里你也可以新开个线程来处理.加快速度.  if(ReceiveClientData != null)  ReceiveClientData(token, rev);  //这里API处理完后,并没有返回结果,当然结果是要返回的,却不是在这里, 这里的代码只管接收.  //若要返回结果,可在API处理中调用此类对象的SendMessage方法,统一打包发送.不要被微软的示例给迷惑了.  } while (token.Buffer.Count > 4);  //继续接收. 为什么要这么写,请看Socket.ReceiveAsync方法的说明  if (!token.Socket.ReceiveAsync(e))  this.ProcessReceive(e);  }  else  {  CloseClientSocket(e);  }  }  catch (Exception xe)  {  RuncomLib.Log.LogUtils.Info(xe.Message + "\r\n" + xe.StackTrace);  }  }  // This method is invoked when an asynchronous send operation completes.    // The method issues another receive on the socket to read any additional   // data sent from the client  //  // <param name="e"></param>  private void ProcessSend(SocketAsyncEventArgs e)  {  if (e.SocketError == SocketError.Success)  {  // done echoing data back to the client  AsyncUserToken token = (AsyncUserToken)e.UserToken;  // read the next block of data send from the client  bool willRaiseEvent = token.Socket.ReceiveAsync(e);  if (!willRaiseEvent)  {  ProcessReceive(e);  }  }  else  {  CloseClientSocket(e);  }  }  //关闭客户端  private void CloseClientSocket(SocketAsyncEventArgs e)  {  AsyncUserToken token = e.UserToken as AsyncUserToken;  lock (m_clients) { m_clients.Remove(token); }  //如果有事件,则调用事件,发送客户端数量变化通知  if (ClientNumberChange != null)  ClientNumberChange(-1, token);  // close the socket associated with the client  try  {  token.Socket.Shutdown(SocketShutdown.Send);  }  catch (Exception) { }  token.Socket.Close();  // decrement the counter keeping track of the total number of clients connected to the server  Interlocked.Decrement(ref m_clientCount);  m_maxNumberAcceptedClients.Release();  // Free the SocketAsyncEventArg so they can be reused by another client  e.UserToken = new AsyncUserToken();  m_pool.Push(e);  }  /// <summary>  /// 对数据进行打包,然后再发送  /// </summary>  /// <param name="token"></param>  /// <param name="message"></param>  /// <returns></returns>  public void SendMessage(AsyncUserToken token, byte[] message)  {  if (token == null || token.Socket == null || !token.Socket.Connected)  return;  try  {  //对要发送的消息,制定简单协议,头4字节指定包的大小,方便客户端接收(协议可以自己定)  byte[] buff = new byte[message.Length + 4];  byte[] len = BitConverter.GetBytes(message.Length);  Array.Copy(len, buff, 4);  Array.Copy(message, 0, buff, 4, message.Length);  //token.Socket.Send(buff);  //这句也可以发送, 可根据自己的需要来选择  //新建异步发送对象, 发送消息  SocketAsyncEventArgs sendArg = new SocketAsyncEventArgs();  sendArg.UserToken = token;  sendArg.SetBuffer(buff, 0, buff.Length);  //将数据放置进去.  token.Socket.SendAsync(sendArg);  }  catch (Exception e){  RuncomLib.Log.LogUtils.Info("SendMessage - Error:" + e.Message);  }  }  }  
}  

使用方法:

SocketManager m_socket = new SocketManager(200, 1024);  

m_socket.Init();  

m_socket.Start(new IPEndPoint(IPAddress.Any, 13909));  

//m_socket.Stop();

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/127012.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

服务熔断保护实践--Sentinal

目录 概述 环境说明 步骤 Sentinel服务端 Sentinel客户端 依赖 在客户端配置sentinel参数 测试 保护规则设置 设置资源名 设置默认的熔断规则 RestTemplate的流控规则 Feign的流控规则 概述 微服务有很多互相调用的服务&#xff0c;构成一系列的调用链路&#xf…

蜜罐系统HFish的部署与功能实测

1. 引入 根据参考1对蜜罐的定义&#xff1a; 蜜罐&#xff08;Honeypot&#xff09;是一个计算机科学领域的术语&#xff0c;指用于检测或防御未经授权的行为或黑客攻击的陷阱。其名称来源于其工作原理类似于用来诱捕昆虫的蜜罐。蜜罐通常伪装成看似有利用价值的网路、资料、…

3.15每日一题(分部积分求不定积分)

解法一&#xff1a;令lnx等于t&#xff1b;求出x与t的关系&#xff0c;带入f(lnx)的式子中&#xff1b;通过凑微分&#xff0c;分部积分等方法求出答案 注&#xff1a;在分部积分后&#xff0c;求不定积分时 &#xff08;1&#xff09;可以加项减项拆的方法求&#xff08;常规…

61. 旋转链表、Leetcode的Python实现

博客主页&#xff1a;&#x1f3c6;李歘歘的博客 &#x1f3c6; &#x1f33a;每天不定期分享一些包括但不限于计算机基础、算法、后端开发相关的知识点&#xff0c;以及职场小菜鸡的生活。&#x1f33a; &#x1f497;点关注不迷路&#xff0c;总有一些&#x1f4d6;知识点&am…

小程序day02

目标 WXML模板语法 数据绑定 事件绑定 那麽問題來了&#xff0c;一次點擊會觸發兩個組件事件的話&#xff0c;該怎么阻止事件冒泡呢&#xff1f; 文本框和data的双向绑定 注意点: 只在标签里面用value“{{info}}”&#xff0c;只会是info到文本框的单向绑定&#xff0c;必须在…

【项目源码解析】某3C产品自动光学检测系统

解决方案源码解析思维导图 一、带有桁架机械手的自动光学检测系统介绍 二、关于机械手运动控制&#xff08;是否需要机器人学方面的知识&#xff09; 机械手的运动控制不需要深入了解机器人学方面的知识的情况包括&#xff1a; 预配置和任务单一性&#xff1a;如果机械手已经预…

Unity地面交互效果——2、动态法线贴图实现轨迹效果

Unity引擎动态法线贴图制作球滚动轨迹 大家好&#xff0c;我是阿赵。   之前说了一个使用局部UV采样来实现轨迹的方法。这一篇在之前的基础上&#xff0c;使用法线贴图进行凹凸轨迹的绘制。 一、实现的目标 先来回顾一下&#xff0c;上一篇最终我们已经绘制了一个轨迹的贴图…

NI‑9237国产化50 kS/s/ch,桥模拟输入,4通道C系列应变/桥输入模块

50 kS/s/ch&#xff0c;桥模拟输入&#xff0c;4通道C系列应变/桥输入模块 NI‑9237提供了所有的信号调理功能来实现多达四个基于桥的传感器的供电和测量。该模块提供通道间零相位延迟的应变或负载测量。它还具有60 VDC隔离和1&#xff0c;000 Vrms瞬态隔离&#xff0c;提供高…

java基础--多线程学习

写在前面&#xff1a; 多线程在面试中问的很多&#xff0c;之前没有过系统的学习&#xff0c;现在来进行一个系统的总结学习 文章目录 基础java多线程实现无参无返回值线程快速创建start和run方法的探讨run方法线程状态 有返回值线程线程池执行小结关于抛出异常的扩展 线程方…

云安全—kubelet攻击面

0x00 前言 虽然说总结的是kubelet的攻击面&#xff0c;但是在总结攻击面之前还是需要去了解kubelet的基本原理&#xff0c;虽然说我们浅尝即止&#xff0c;但是还是要以能给别人讲出来为基本原则。 其他文章: 云安全—K8s APi Server 6443 攻击面云安全—K8S API Server 未授…

解决【spring boot】Process finished with exit code 0的问题

文章目录 1. 复现错误2. 分析错误3. 解决问题 1. 复现错误 今天从https://start.spring.io下载配置好的spring boot项目&#xff1a; 启动后却报出如下错误&#xff1a; 即Process finished with exit code 0 2. 分析错误 Process finished with exit code 0翻译成中文进程已完…

USART HMI串口屏+GPS模块显示时间和经纬度

USART HMI串口屏GPS模块显示时间和经纬度 &#x1f4cd;相关篇《基于u-box GPS模块通过串口指令调整输出信息》 &#x1f4cb;在不使用其他单片机做数据中转处理情况下&#xff0c;利用USART HMI串口屏主动解析模式&#xff0c;来接收并解析GPS模块数据并显示&#xff0c;功能包…

k8s-调度约束

目录 工作机制 调度过程 指定调度节点 Kubernetes 是通过 List-Watch 的机制进行每个组件的协作&#xff0c;保持数据同步的&#xff0c;每个组件之间的设计实现了解耦。 用户是通过 kubectl 根据配置文件&#xff0c;向 APIServer 发送命令&#xff0c;在 Node 节点上面…

Linux 命令速查

Network ping ping -c 3 -i 0.01 127.0.0.1 # -c 指定次数 # -i 指定时间间隔 日志 一般存放位置&#xff1a; /var/log&#xff0c;包含&#xff1a;系统连接日志 进程统计 错误日志 常见日志文件说明 日志功能access-logweb服务访问日志acct/pacct用户命令btmp记录失…

打造中国汽车出海新名片,比亚迪亮相东京车展

作为全球知名的国际车展&#xff0c;东京车展向来都被业界人士誉为“亚洲汽车风向标”。2023年10月25日&#xff0c;第47届东京车展&#xff08;自2023年更名为“日本移动出行展”&#xff09;在东京国际展览中心如期揭幕。 作为中国车企的代表品牌&#xff0c;比亚迪携海豹、海…

el-date-picker 控件,获取到的日期比选择的日期少1天

element-ui的el-date-picker日期控件日期少一天_el-date-picker 时间误差一个小时-CSDN博客 加一个日期控件时&#xff0c;发现实际获取到的值比选中的日期少一天。 解决方法&#xff1a;加一个 value-format"yyyy-MM-dd" <el-col :span"24" class&qu…

8+双疾病+WGCNA+多机器学习筛选疾病的共同靶点并验证表达

今天给同学们分享一篇双疾病WGCNA多机器学习的生信文章“Shared diagnostic genes and potential mechanism between PCOS and recurrent implantation failure revealed by integrated transcriptomic analysis and machine learning”&#xff0c;这篇文章于2023年5月16日发表…

Springboot使用EasyExcel导入导出Excel文件

1&#xff0c;准备Excel文件和数据库表结果 2&#xff0c;导入代码 1&#xff0c;引入依赖 <!-- https://mvnrepository.com/artifact/com.alibaba/easyexcel --><dependency><groupId>com.alibaba</groupId><artifactId>easyexcel</artifac…

STM32—PWM开发SG90舵机

目录 PWM介绍 PWM输出模式&#xff1a; ​编辑PWM占空比&#xff1a; PWM周期与频率公式&#xff1a;​编辑 SG90舵机介绍 1. 什么是舵机 2. 怎么控制舵机 SG90舵机介绍实战 1. 在 SYS 选项里&#xff0c;将 Debug 设为 Serial Wire​编辑 2. 将 RCC 里的 HSE 设置为 …

代码随想录算法训练营第四十天丨 动态规划part03

343. 整数拆分 思路 看到这道题目&#xff0c;都会想拆成两个呢&#xff0c;还是三个呢&#xff0c;还是四个.... 来看一下如何使用动规来解决。 动态规划 动规五部曲&#xff0c;分析如下&#xff1a; 确定dp数组&#xff08;dp table&#xff09;以及下标的含义 dp[i]…