Unity处理Socket粘包拆包

现在游戏协议的数据格式基本上都是用protobuf协议格式,而protobuf最后会转换为二进制,所以这个例子实现的逻辑的也是二进制的处理。

处理粘包拆包的逻辑主要是在DecodePackage方法中。

using System;
using System.Collections;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Net;
using System.Net.Sockets;
using System.Threading;
using System.Threading.Tasks;
using Google.Protobuf;
using Msg;
using UnityEngine;
using UnityEngine.Events;namespace Billiard.Server
{public class SocketMgr : SingleMgr<SocketMgr>{private bool m_nYesInit = false;private Queue<byte[]> m_SendQueue = new Queue<byte[]>();private Queue<byte[]> m_ReceiveQueue = new Queue<byte[]>();private MemoryStream m_ReceiveStream = null;private bool m_IsConnected = false;private string m_IP = string.Empty;private int m_Port = int.MaxValue;private Socket m_Socket = null;private IPEndPoint endPoint;private bool sendCompletedAsync = true;private bool receiveCompletedAsync = true;private readonly object IncomingMessageLock = new object();private readonly object OutcomingMessageLock = new object();private byte[] tempBufferArr = new byte[0];private Thread sendThread;private Thread receiveThread;private SocketAsyncEventArgsPool m_readWritePool;public void Init(){if (!m_nYesInit){OnMsg<object>((int) ENetworkMessage.Net_SyncEntityID, OnSyncEntityID);OnMsg<object>((int) ENetworkMessage.Net_HeartBeat, OnHeartBeat);OnMsg<object>((int) ENetworkMessage.Net_HadConnect, OnNetHadConnect);}m_nYesInit = true;}public void Connect(string ip, int port){m_IP = ip;m_Port = port;m_Socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);endPoint = new IPEndPoint(IPAddress.Parse(m_IP), m_Port);try{SocketAsyncEventArgs args = new SocketAsyncEventArgs(); //创建连接参数对象args.RemoteEndPoint = endPoint;args.Completed += OnConnectedCompleted; //添加连接创建成功监听m_Socket.ConnectAsync(args);m_ReceiveStream = new MemoryStream();}catch (Exception e){Debug.LogWarning("连接错误! " + e.Message);CloseSocket();}}private void OnConnectedCompleted(object sender, SocketAsyncEventArgs args){try{if (args.SocketError != SocketError.Success){CloseSocket();}else{Debug.Log($"连接服务器成功! ip:{args.ConnectSocket.LocalEndPoint} ThreadId:{Thread.CurrentThread.ManagedThreadId.ToString()}");m_IsConnected = true;m_readWritePool = new SocketAsyncEventArgsPool(10, IoCompleted);sendThread = new Thread(SendMsgThread);sendThread.IsBackground = true;sendThread.Start();receiveThread = new Thread(ReceiveMsgThread);receiveThread.IsBackground = true;receiveThread.Start();doSyncEntityID();}}catch (Exception e){Debug.Log("开启接收数据异常" + e.Message);CloseSocket();}}private void ReceiveMsgThread(object obj){while (m_IsConnected){if (m_Socket.Available > 0){if (!receiveCompletedAsync ){Debug.LogWarning("---<<< 异步任务还未处理完成!");return;}try{SocketAsyncEventArgs e = m_readWritePool.Pop();byte[] buffer = new byte[8192];e.SetBuffer(buffer, 0, buffer.Length);receiveCompletedAsync = m_Socket.ReceiveAsync(e);}catch (Exception e){Debug.LogWarning($"<<< error! {e} | 当前容量:{m_readWritePool.Count}");}}}}public void ReceiveCallback(SocketAsyncEventArgs e){if (e.BytesTransferred > 0 && e.SocketError == SocketError.Success){if (e.BytesTransferred <= 0){Debug.Log("---<<< 空数据不处理!");return;}byte[] bytes = new byte[e.BytesTransferred];Buffer.BlockCopy(e.Buffer, 0, bytes, 0, bytes.Length);lock (IncomingMessageLock){m_ReceiveQueue.Enqueue(bytes);}}else{//TODO }}private void SendMsgThread(object obj){while (m_IsConnected){//如果消息队列中有消息,则发送消息if (m_SendQueue.Count > 0){if (!sendCompletedAsync){Debug.LogWarning("--->>> 异步发送io还未处理完成!");return;}lock (OutcomingMessageLock){try{byte[] sendBuffer = m_SendQueue.Dequeue();SocketAsyncEventArgs e = m_readWritePool.Pop();e.SetBuffer(sendBuffer, 0, sendBuffer.Length);sendCompletedAsync = m_Socket.SendAsync(e);}catch (Exception e){Debug.LogWarning($">>> error! {e.Message}");}}}}}private void SendCallback(SocketAsyncEventArgs e){if (!m_Socket.Connected){OnClose();return;}if (e.SocketError == SocketError.Success){sendCompletedAsync = true;}else{Debug.LogWarning($"--->>> 发送数据失败!");}}void IoCompleted(object sender, SocketAsyncEventArgs e){switch (e.LastOperation){case SocketAsyncOperation.Receive:ReceiveCallback(e);m_readWritePool.Push(e);break;case SocketAsyncOperation.Send:SendCallback(e);m_readWritePool.Push(e);break;default:throw new ArgumentException("The last operation completed on the socket was not a receive or send");}}public void MsgUpdate(){if (m_ReceiveQueue.Count > 0){lock (IncomingMessageLock){DecodePackage(m_ReceiveQueue.Dequeue());}}}private void DecodePackage(byte[] buffer){int length = buffer.Length;//处理拆包、粘包if (length >= OFFSET_SIZE + BYTE_SIZE + BYTE_SIZE){if (tempBufferArr.Length > 0){buffer = tempBufferArr.Concat(buffer).ToArray();length = buffer.Length;//Array.Clear(temgBufferArr,0,temgBufferArr.Length);tempBufferArr = new byte[0];}m_ReceiveStream.Write(buffer, 0, length);m_ReceiveStream.Position = 0;while (m_ReceiveStream.Length != 0){int oneMsgLength = (int) BitConverter.ToUInt32(buffer,(int) m_ReceiveStream.Position + OFFSET_SIZE + BYTE_SIZE);if (length >= OFFSET_SIZE + BYTE_SIZE + BYTE_SIZE + oneMsgLength){int oneBufferLength = OFFSET_SIZE + BYTE_SIZE + BYTE_SIZE + oneMsgLength;byte[] oneMsgBuffer = new byte[oneBufferLength];m_ReceiveStream.Read(oneMsgBuffer, 0, oneBufferLength);try{ReceivePacket(oneMsgBuffer);}catch (Exception e){//do nothing 不处理业务层异常逻辑}}else{//Debug.LogWarning($"---<<< 本次消息包长度不够,暂不处理!");int leftLength = length - (int) m_ReceiveStream.Position;tempBufferArr = new byte[leftLength];Buffer.BlockCopy(buffer, (int) m_ReceiveStream.Position, tempBufferArr, 0, leftLength);break;}if (m_ReceiveStream.Position == m_ReceiveStream.Length){break;}}}else{Debug.LogWarning($" 包大小 : {length} 消息号 : {BitConverter.ToUInt32(buffer, OFFSET_SIZE)}");//TODO }m_ReceiveStream.SetLength(0);m_ReceiveStream.Seek(0, SeekOrigin.Begin);receiveCompletedAsync = true;}private const int OFFSET_SIZE = 8;private const int BYTE_SIZE = 4;private byte[] msgIDBytes = new byte[BYTE_SIZE];private byte[] msgLenBytes = new byte[BYTE_SIZE];public void ReceivePacket(byte[] readBuff){for (int i = 0; i < BYTE_SIZE; i++){msgIDBytes[i] = readBuff[OFFSET_SIZE + i];}uint msgID = BitConverter.ToUInt32(msgIDBytes, 0);for (int i = 0; i < BYTE_SIZE; i++){msgLenBytes[i] = readBuff[OFFSET_SIZE + BYTE_SIZE + i];}uint msgLen = BitConverter.ToUInt32(msgLenBytes, 0);if (msgID != 100006)Debug.Log("接收消息 :" + msgID + " 消息 :" + msgLen);IMessage rspPacket = null;if (msgID == (uint) ENetworkMessage.Net_SyncEntityID|| msgID == (uint) ENetworkMessage.Net_HeartBeat|| msgID == (uint) ENetworkMessage.Net_HadConnect){//特殊处理,不处理回包仅转发}else{int StartIdx = OFFSET_SIZE + BYTE_SIZE + BYTE_SIZE;int PacketSize = readBuff.Length - StartIdx;// 长度校验if (StartIdx + msgLen != readBuff.Length){Debug.LogError($"pack size error! msgID : {msgID} StartIdx : {StartIdx} msgLen : {msgLen} BuffLegnth : {readBuff.Length}");return;}rspPacket = GameServerMgr.DecodeMsg((MsgTile) msgID, StartIdx, PacketSize, readBuff);if (rspPacket == null){Debug.LogWarning(" Unpack Error! MsgID :" + msgID);return;}}DispatchMsgData<object>((int) msgID, rspPacket);}public void SendMsg(MsgTile networkMessage, byte[] packet){if (m_Socket == null){return;}//判断Websocket状态if (!m_Socket.Connected){return;}byte[] offset1 = MiniConverter.IntToBytes(2 + 2 + 4 + 4 + packet.Length);byte[] offset2 = MiniConverter.IntToBytes(1);byte[] offset3 = MiniConverter.IntToBytes(2);byte[] networkMessageBytes = MiniConverter.IntToBytes((int) networkMessage);byte[] offset4 = networkMessageBytes;byte[] offset5 = MiniConverter.IntToBytes(networkMessageBytes.Length);byte[] sendBuffer = new byte[4 + 2 + 2 + 4 + 4 + (int) packet.Length];Array.Copy(offset1, 0, sendBuffer, 0, 4);Array.Copy(offset2, 0, sendBuffer, 4, 2);Array.Copy(offset3, 0, sendBuffer, 6, 2);Array.Copy(offset4, 0, sendBuffer, 8, 4);Array.Copy(offset5, 0, sendBuffer, 12, 4);Array.Copy(packet, 0, sendBuffer, 16, packet.Length);Debug.Log("发送消息 : " + (int) networkMessage + " 包长度 :" + packet.Length);lock (OutcomingMessageLock){m_SendQueue.Enqueue(sendBuffer);}}private Dictionary<int, IEventInfo> m_msgMap = new Dictionary<int, IEventInfo>(); //消息集合public void OnMsg<T>(int msgkey, UnityAction<T> callbackFun){if (m_msgMap.ContainsKey(msgkey)){(m_msgMap[msgkey] as EventInfo<T>).actions += callbackFun;}else{m_msgMap.Add(msgkey, new EventInfo<T>(callbackFun));}}public void OnRemoveMsg<T>(int msgkey, UnityAction<T> callbackFun){if (m_msgMap.ContainsKey(msgkey)){(m_msgMap[msgkey] as EventInfo<T>).actions -= callbackFun;}}public void DispatchMsgData<T>(int msgID, T info){if (m_msgMap.ContainsKey(msgID)){(m_msgMap[msgID] as EventInfo<T>).actions?.Invoke(info);}else{if (msgID != 100006)Debug.LogWarning(" No msg regiter ! id :" + msgID);}}public void doSyncEntityID(){byte[] b = MiniConverter.IntToBytes(PlayerData.EntityID);byte[] sendBuffer = new byte[4];Array.Copy(b, 0, sendBuffer, 0, b.Length);SendMsg((MsgTile) ENetworkMessage.Net_SyncEntityID, sendBuffer);}public void OnSyncEntityID(object obj){EventMgr.GetInstance().DispatchEvent(WSEvent.HandShake);}//发送心跳包public void OnHeartBeat(object obj){byte[] b = MiniConverter.IntToBytes(PlayerData.EntityID);byte[] sendBuffer = new byte[4];Array.Copy(b, 0, sendBuffer, 0, b.Length);SendMsg((MsgTile) ENetworkMessage.Net_HeartBeat, sendBuffer);}//该用户已经在登录中public void OnNetHadConnect(object obj){UICommon.GetInstance().ShowDialogYes("提示", "该用户正在登录中!");}public void OnClose(){UICommon.GetInstance().ShowDialogYesAndNo("提示", "服务器已经断开连接!",() => { SceneMgr.GetInstance().EnterLoginScene(); },() => { SceneMgr.GetInstance().EnterLoginScene(); },null,null);}public void CloseSocket(){if (m_Socket != null && m_Socket.Connected){m_IsConnected = false;m_ReceiveQueue.Clear();m_ReceiveQueue.Clear();m_Socket?.Shutdown(SocketShutdown.Both);m_Socket?.Close();m_Socket?.Dispose();m_ReceiveStream?.Close();m_ReceiveStream?.Dispose();sendThread.Abort();receiveThread.Abort();sendCompletedAsync = true;receiveCompletedAsync = true;m_readWritePool.Clear();}}}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/31797.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

2.XSS-存储型

储存型XSS 或持久型 XSS 交互的数据会被存在在数据库里面,永久性存储,具有很强的稳定性。 在留言板里面进行测试一下是否有做过滤 "<>?&66666点击提交 查看元素代码&#xff0c;已经提交完成&#xff0c;并且没有做任何的过滤措施 接下来写一个javascrip…

select实现超时保护机制

1、使用channel优雅地关闭服务 package mainimport ("context""fmt""net/http""os""os/signal""syscall""time" )func IndexHandler(w http.ResponseWriter, r *http.Request) {if r.Method ! http.Me…

软件工程学系统设计

一、概述 软件设计阶段用比较抽象概括的方式确定目标系统如何完成预定的任务&#xff0c;即确定系统的物理模型。 回答系统 “做什么”。 软件设计是将需求转化为最终产品的唯一途径&#xff0c;是后续开发和维护工作的基础。 1、软件设计过程 从工程管理角度&#xff0c;…

AI 语录(一)

Midjourney创始人谈AI Midjourney创始人关于AI的一些看法给了我新的输入&#xff0c;AI越来越智能化给人了一种脊背发凉的感觉&#xff0c;但是从新水源和冲浪板的角度看&#xff0c;学习驾驭AI和AI共存的方式更加积极也更加合理。 "当计算机比99%的人善于视觉想象的时候…

AI网络爬虫:用deepseek批量提取gptstore.ai上的gpts数据

网站首页&#xff1a;https://gptstore.ai/gpts/categories/finance 翻页规律如下&#xff1a; https://gptstore.ai/_next/data/S9vKNrHo4K82xWjuXpw-O/en/gpts/categories/finance.json?slugfinance&page2 https://gptstore.ai/_next/data/S9vKNrHo4K82xWjuXpw-O/en/g…

Python | Leetcode Python题解之第172题阶乘后的零

题目&#xff1a; 题解&#xff1a; class Solution:def trailingZeroes(self, n: int) -> int:ans 0while n:n // 5ans nreturn ans

目标检测经典算法及其应用

目标检测是计算机视觉领域中的一项核心技术,它旨在让计算机能够像人眼一样识别和定位图像或视频中的物体。具体来说,目标检测不仅需要识别出图像或视频中有哪些对象,还要确定它们在图像或视频中的位置(通常以边界框的形式表示)以及它们的类别。 目标检测的基本框架通常包括…

MySQL与SQLite的区别

MySQL 和 SQLite 是两种常见的关系型数据库管理系统&#xff0c;但它们在设计目标、架构和使用场景上有显著的区别。以下是它们的主要区别&#xff1a; 1. 架构与模式 MySQL&#xff1a; 客户端/服务器模式&#xff1a;MySQL 采用 C/S 架构&#xff0c;数据库服务器运行在一…

【六】【QT开发应用】信号和信号槽的五种写法

第一种写法 第二种写法 第三种写法 第四种写法 第五种写法 完整代码 mainwindow.h #ifndef MAINWINDOW_H #define MAINWINDOW_H#include <QMainWindow>QT_BEGIN_NAMESPACE namespace Ui { class MainWindow; } QT_END_NAMESPACEclass MainWindow : public QMainWindow …

Golang | Leetcode Golang题解之第172题阶乘后的零

题目&#xff1a; 题解&#xff1a; func trailingZeroes(n int) (ans int) {for n > 0 {n / 5ans n}return }

用全志T113做了块多功能卡片电脑,成本只要60块

FunnyPi-T113是一款基于全志T113-S3/D1S处理器的完全开源多功能开发板&#xff0c;设计FunnyPi最初的目的是想借此T113卡片电脑来满足日常学习&#xff0c;并结合T113高效能和低功耗的特点&#xff0c;来满足像语音助手&#xff0c;智能家居屏幕、桌面摆件屏、博客服务器等嵌入…

React@16.x(35)动画(下)封装动画组件需要注意的问题

目录 1&#xff0c;封装举例2&#xff0c;问题2.1&#xff0c;timeout2.2&#xff0c;配合 SwitchTransition / TransitionGroup 接上篇文章 React动画&#xff08;中&#xff09; 1&#xff0c;封装举例 封装一个渐入渐出效果的动画组件 import { CSSTransition } from &qu…

Maven笔记(更新中)

一、Maven简介 Maven是一款为Java项目构建,依赖管理的工具(软件),使用Maven可以自动化构建,测试,打包和发布项目,大大提高了开发效率和质量 Maven主要作用理解 依赖管理 Maven可以管理项目的依赖,包括自动下载所需依赖库,自动下载依赖所需的依赖并且保证版本没有冲突,依赖版…

(五)数据采集与处理基础练习题(17道选择题)

本文整理了数据采集与处理基础相关的练习题&#xff0c;共17道&#xff0c;适用于想巩固理论基础的同学。来源&#xff1a;如荷学数据科学题库&#xff08;CDA二级-第二三四章&#xff09;。 1&#xff09; 2&#xff09; 3&#xff09; 4&#xff09; 5&#xff09; 6&#x…

电脑系统重装怎么操作?分享四个win10重装系统方法

“我遇到了一些笔记本电脑的问题&#xff0c;别人告诉我解决这个问题需要重新安装Win10电脑系统。但我不记得我把光盘放在哪里了&#xff0c;我能否在不丢失文件的情况下重新安装操作系统&#xff1f;电脑系统重装怎么操作&#xff1f;”虽然电脑自带系统中有多种方法可供选择&…

工业边缘计算网关

1 介绍 HINETG系列边缘计算网关&#xff08;Linux操作系统&#xff09;&#xff0c;是华辰智通的—款面向工业现场设备接入、数据采集、设备监控的工业级边缘计算网关。采用ARM Cortex-A7 800MHz高性能CPU,拥有以太网、串口、CAN口、IO口等丰富的接口&#xff0c;支持以太网、…

项目实践---Windows11中安装Zookeeper/Hadoop/Hive的部分问题解决

一.Hadoop与Hive兼容版本选择 正常来说&#xff0c;Hadoop与Hive版本不兼容会出现很多问题导致hive安装失败&#xff0c;可以先确定HIve的版本&#xff0c;比如&#xff1a;要用Hive3.1.2版本&#xff0c;该如何确定使用Hadoop的版本呢&#xff0c;需要我们在hive源码中找到对…

开发 Golang 项目的 Docker 化案例

开发 Golang 项目的 Docker 化案例 在这个案例中&#xff0c;我们将展示如何使用 Docker 容器化一个简单的 Golang Web 应用。我们将创建一个基于 Go 的 Hello World 应用&#xff0c;并使用 Docker 和 Docker Compose 管理容器化环境。 1. 创建 Golang Web 应用 首先&#…

C++ 最小生成树

描述 一个有 n 户人家的村庄&#xff0c;有 m 条路相互连接着。村里现在要修路&#xff0c;每条路都有一个成本价格&#xff0c;现在请你帮忙计算下&#xff0c;最少需要花费多少钱&#xff0c;就能让这 n 户人家连接起来。cost 为一个二维数组&#xff0c;每个元素是一个长度…

数据挖掘的基本介绍以及Python、pandas的基本应用

1. 介绍与准备 1.1 数据挖掘是什么&#xff1f; 1.1.1 什么是数据挖掘 数据挖掘是寻找数据中隐含的知识并用于产生商业价值的过程。它通过分析大量数据&#xff0c;揭示其中的模式和关系&#xff0c;帮助企业做出更明智的决策。 1.1.2 为什么要做数据挖掘&#xff1f; 数据挖…