TCP客户端
TcpConnect.h
// Fill out your copyright notice in the Description page of Project Settings.#pragma once#include "CoreMinimal.h"
#include "Common/UdpSocketReceiver.h"
#include "GameFramework/Actor.h"DECLARE_DELEGATE_TwoParams(FOnServerResponseReceived, const int32&, bool);class FTcpConnect : public FRunnable
{
public:// Sets default values for this actor's propertiesFTcpConnect();virtual ~FTcpConnect(){if (ReceiveThread != nullptr){ReceiveThread->Kill(true);delete ReceiveThread;}}// virtual FSingleThreadRunnable* GetSingleThreadInterface() override// {// return this;// }
public:// Called every framevoid ConnectToServer(FString ServerAddress, const int32 ServerPort);void SendMessage(const FString& Message);FOnServerResponseReceived& OnDataReceived(){return OnServerResponseReceived;}virtual bool Init() override;virtual void Stop() override;protected:virtual uint32 Run() override;virtual void Exit() override;private:FSocket* Socket;TSharedPtr<FInternetAddr> RemoteAddr;FIPv4Endpoint LocalEndpoint;TArray<uint8> ReceivedData;FUdpSocketReceiver* UDPReceiver;bool bIsReceiving;FRunnableThread* ReceiveThread;int64 StartMs;FOnServerResponseReceived OnServerResponseReceived;
};
TcpConnect.cpp
// Fill out your copyright notice in the Description page of Project Settings.#include "Subsystem/TcpConnect.h"// Sets default values
FTcpConnect::FTcpConnect()
{
}void FTcpConnect::ConnectToServer(FString ServerAddress, const int32 ServerPort)
{// Socket = ISocketSubsystem::Get(PLATFORM_SOCKETSUBSYSTEM)->CreateSocket(NAME_Stream, TEXT("default"), false);// RemoteAddr = ISocketSubsystem::Get(PLATFORM_SOCKETSUBSYSTEM)->CreateInternetAddr();bool bValid;RemoteAddr->SetIp(*ServerAddress, bValid);RemoteAddr->SetPort(ServerPort);if (Socket->Connect(*RemoteAddr)){UE_LOG(LogTemp, Display, TEXT("Connected to server"));// bIsReceiving = true;ReceiveThread = FRunnableThread::Create(this, TEXT("ReceiveThread"), 128 * 1024);}else{UE_LOG(LogTemp, Error, TEXT("Failed to connect to server"));}
}void FTcpConnect::SendMessage(const FString& Message)
{if (Socket){const TCHAR* MessageData = *Message;int32 BytesSent = 0;StartMs = FDateTime::Now().ToUnixTimestamp() * 1000.0 + FDateTime::Now().GetMillisecond();UE_LOG(LogTemp, Warning, TEXT(" start ms %d"), StartMs);Socket->Send((uint8*)TCHAR_TO_UTF8(MessageData), FCString::Strlen(MessageData), BytesSent);}
}// FOnServerResponseReceived& FTcpConnect::OnDataReceived()
//
// {
// // check(ReceiveThread == nullptr);
// return OnServerResponseReceived;
// }bool FTcpConnect::Init()
{if (ReceiveThread && Socket){return true;}return false;
}void FTcpConnect::Stop()
{bIsReceiving = false;if (ReceiveThread){ReceiveThread->WaitForCompletion();// ReceiveThread.re}if (Socket){Socket->Close();Socket = nullptr;// Socket.Reset();}
}uint32 FTcpConnect::Run()
{while (bIsReceiving){uint8 Data[1024];int32 BytesReceived = 0;if (Socket->Recv(Data, sizeof(Data), BytesReceived, ESocketReceiveFlags::None)){if (BytesReceived > 0){FString Message = FString(UTF8_TO_TCHAR((const char*)Data));UE_LOG(LogTemp, Warning, TEXT(" message %s"), *Message)int64 EndMs = FDateTime::Now().ToUnixTimestamp() * 1000.0 + FDateTime::Now().GetMillisecond();UE_LOG(LogTemp, Warning, TEXT(" start ms %d %d"), EndMs, EndMs- StartMs);auto r = OnServerResponseReceived.ExecuteIfBound(EndMs - StartMs, true);bIsReceiving = false;}}FPlatformProcess::Sleep(0.05f);}return 0;
}void FTcpConnect::Exit()
{// FRunnable::Exit();
}
tcp server
// Fill out your copyright notice in the Description page of Project Settings.#pragma once#include "CoreMinimal.h"
#include "HAL/Runnable.h"class FSocket;
class FInternetAddr;/*** */
class MYPROJECT2_API FTcpServer : public FRunnable
{
public:FTcpServer();~FTcpServer();// 初始化服务器virtual bool Init() override;// 开始监听连接bool StartListening(FString IpAddress, int32 Port);// 停止监听连接void StopListening();// FRunnable 接口virtual uint32 Run() override;virtual void Stop() override;void HandleTextMessage(const FString& Message);private:// 处理客户端连接void HandleConnection(FSocket* NewClientSocket, const TSharedRef<FInternetAddr>& ClientAddress);FString StringFromBinaryArray(const TArray<uint8>& BinaryArray);class FSocket* serverSocket;FSocket* ListenSocket;FRunnableThread* ServerThread;FThreadSafeBool bIsStopping;TMap<FSocket*, FString> ClientDataMap; // 用于存储每个客户端的数据};
cpp
// Fill out your copyright notice in the Description page of Project Settings.#include "Net/TcpServer.h"#include "Common/TcpSocketBuilder.h"
#include "Interfaces/IPv4/IPv4Endpoint.h"FTcpServer::FTcpServer() : ListenSocket(nullptr), ServerThread(nullptr), bIsStopping(false)
{ServerThread = FRunnableThread::Create(this, TEXT("TcpServerThread"));
}FTcpServer::~FTcpServer()
{StopListening();
}bool FTcpServer::Init()
{// 初始化网络模块// if (!ISocketSubsystem::Init())// {// UE_LOG(LogTemp, Error, TEXT("Failed to initialize socket subsystem."));// return false;// }return true;
}bool FTcpServer::StartListening(FString IpAddress, int32 Port)
{FString ServerIP = IpAddress;FIPv4Address ServerAddr;if (!FIPv4Address::Parse(ServerIP, ServerAddr)){UE_LOG(LogTemp, Error, TEXT("Server Ip %s is illegal"), *ServerIP);}ListenSocket = FTcpSocketBuilder(TEXT("Socket Listener")).AsReusable().AsBlocking().BoundToAddress(ServerAddr).BoundToPort(Port).Listening(8).WithReceiveBufferSize(1024).WithSendBufferSize(1024);if (ListenSocket){UE_LOG(LogTemp, Warning, TEXT("Server Create Success!"), *ServerIP);// SocketCreateDelegate.Broadcast(true);// GetWorld()->GetTimerManager().SetTimer(ConnectCheckHandler, this, &ATCPServer::ConnectCheck, 1, true);return false;}else{UE_LOG(LogTemp, Error, TEXT("Server Create Failed!"));// SocketCreateDelegate.Broadcast(false);}return false;
}void FTcpServer::StopListening()
{if (ListenSocket){ListenSocket->Close();ISocketSubsystem::Get(PLATFORM_SOCKETSUBSYSTEM)->DestroySocket(ListenSocket);ListenSocket = nullptr;}if (ServerThread){ServerThread->WaitForCompletion();delete ServerThread;ServerThread = nullptr;}
}uint32 FTcpServer::Run()
{ISocketSubsystem* SocketSubsystem = ISocketSubsystem::Get(PLATFORM_SOCKETSUBSYSTEM);if (!SocketSubsystem){UE_LOG(LogTemp, Error, TEXT("Socket subsystem not available."));return 1;}while (!bIsStopping){if (!ListenSocket) continue;TSharedRef<FInternetAddr> ClientAddress = ISocketSubsystem::Get(PLATFORM_SOCKETSUBSYSTEM)->CreateInternetAddr();FSocket* NewClientSocket = ListenSocket->Accept(*ClientAddress, TEXT("MyTcpServer Connection"));UE_LOG(LogTemp, Log, TEXT("Run"));if (NewClientSocket){// 新客户端连接处理HandleConnection(NewClientSocket, ClientAddress);}FPlatformProcess::Sleep(0.01);}return 0;
}void FTcpServer::Stop()
{UE_LOG(LogTemp, Warning, TEXT("St opped"));bIsStopping = true;
}void FTcpServer::HandleTextMessage(const FString& Message)
{UE_LOG(LogTemp, Warning, TEXT("Received message: %s"), *Message);
}void FTcpServer::HandleConnection(FSocket* NewClientSocket, const TSharedRef<FInternetAddr>& ClientAddress)
{// 在这里处理新客户端连接的逻辑UE_LOG(LogTemp, Warning, TEXT("Client connected: %s"), *ClientAddress->ToString(true));// 接收和处理消息while (NewClientSocket && !bIsStopping){TArray<uint8> ReceivedData;uint32 Size;// 接收数据while (NewClientSocket->HasPendingData(Size)){ReceivedData.Init(0, FMath::Min(Size, 65507u));int32 Read = 0;NewClientSocket->Recv(ReceivedData.GetData(), ReceivedData.Num(), Read);// 处理接收到的数据(这里假设消息以'\n'分隔)FString ReceivedString = StringFromBinaryArray(ReceivedData);TArray<FString> Messages;ReceivedString.ParseIntoArray(Messages, TEXT("\n"), true);for (const FString& Message : Messages){// 处理文本消息HandleTextMessage(Message);}}// 睡眠一段时间,以避免空循环造成CPU过度使用FPlatformProcess::Sleep(0.01);}// 断开客户端连接// UE_LOG(LogTemp, Warning, TEXT("Client disconnected: %s"), *ClientAddress->ToString(true));// NewClientSocket->Close();// ISocketSubsystem::Get(PLATFORM_SOCKETSUBSYSTEM)->DestroySocket(NewClientSocket);}FString FTcpServer::StringFromBinaryArray(const TArray<uint8>& BinaryArray)
{return FString(ANSI_TO_TCHAR(reinterpret_cast<const char*>(BinaryArray.GetData())));
}
FTcpServerReceive
h
// Fill out your copyright notice in the Description page of Project Settings.#pragma once#include "CoreMinimal.h"
#include "HAL/Runnable.h"/*** */
class MYPROJECT2_API FTcpServerReceive: public FRunnable
{
public:FTcpServerReceive(FSocket* InSocket);~FTcpServerReceive();
protected:virtual bool Init() override;virtual void Stop() override;FString StringFromBinaryArray(TArray<uint8> Array);void HandleTextMessage(const FString& String);virtual uint32 Run() override;
private:FSocket* ClientSocket;FRunnableThread* ServerReceiveThread;bool bIsStopping;
};
cpp
// Fill out your copyright notice in the Description page of Project Settings.#include "Net/FTcpServerReceive.h"#include "Sockets.h"
#include "SocketSubsystem.h"FTcpServerReceive::FTcpServerReceive(FSocket* InSocket)
{ClientSocket = InSocket;ServerReceiveThread = FRunnableThread::Create(this, TEXT("ServerReceiveThread"));bIsStopping = false;
}FTcpServerReceive::~FTcpServerReceive()
{if (ClientSocket){ClientSocket->Close();ISocketSubsystem::Get(PLATFORM_SOCKETSUBSYSTEM)->DestroySocket(ClientSocket);ClientSocket = nullptr;}if (ServerReceiveThread){ServerReceiveThread->WaitForCompletion();delete ServerReceiveThread;ServerReceiveThread = nullptr;}
}bool FTcpServerReceive::Init()
{if (ServerReceiveThread && ClientSocket){UE_LOG(LogTemp, Warning, TEXT(" %s start"), *FString(__FUNCTION__));return true;}return false;
}void FTcpServerReceive::Stop()
{bIsStopping = true;
}FString FTcpServerReceive::StringFromBinaryArray(TArray<uint8> BinaryArray)
{return FString(ANSI_TO_TCHAR(reinterpret_cast<const char*>(BinaryArray.GetData())));
}void FTcpServerReceive::HandleTextMessage(const FString& Message)
{UE_LOG(LogTemp, Warning, TEXT("Received message: %s"), *Message);
}uint32 FTcpServerReceive::Run()
{while (ClientSocket){TArray<uint8> ReceivedData;uint32 Size;// 接收数据while (ClientSocket->HasPendingData(Size)){ReceivedData.Init(0, FMath::Min(Size, 65507u));int32 Read = 0;ClientSocket->Recv(ReceivedData.GetData(), ReceivedData.Num(), Read);// 处理接收到的数据(这里假设消息以'\n'分隔)FString ReceivedString = StringFromBinaryArray(ReceivedData);TArray<FString> Messages;ReceivedString.ParseIntoArray(Messages, TEXT("\n"), true);for (const FString& Message : Messages){// 处理文本消息HandleTextMessage(Message);}}// 睡眠一段时间,以避免空循环造成CPU过度使用FPlatformProcess::Sleep(0.05);}return 0;
}
使用
在GameInstance
使用
void UMyGameInstance::HandleTextMessage(const FString& Message)
{UE_LOG(LogTemp, Warning, TEXT("Received message: %s"), *Message);
}bool UMyGameInstance::Connected(FSocket* Socket, const FIPv4Endpoint& FiPv4Endpoint)
{UE_LOG(LogTemp, Warning, TEXT("Connected %s"), *FiPv4Endpoint.ToString());FTcpServerReceive* ServerReceive = new FTcpServerReceive(Socket);return true;
}void UMyGameInstance::Init()
{Super::Init();FIPv4Address ServerAddr;FIPv4Address::Parse("0.0.0.0", ServerAddr);FIPv4Endpoint IPv4;IPv4.Address = ServerAddr;IPv4.Port = 6666;#if UE_SERVER/FTcpListener* TcpServer = new FTcpListener(IPv4);TcpServer->OnConnectionAccepted().BindUObject(this, &UMyGameInstance::Connected);#elseFTcpClient* TcpClient = new FTcpClient();TcpClient->ConnectToServer("192.168.1.6", 6666);FTimerHandle h;FTimerDelegate d = FTimerDelegate::CreateLambda([=](){FString sent = "aaaaaa";TcpClient->SendMessage(sent);});// GetWorld()->GetTimerManager().SetTimer(h, d, 1.0f, true, 3.0f);#endif
}FString UMyGameInstance::StringFromBinaryArray(const TArray<uint8>& BinaryArray)
{return FString(ANSI_TO_TCHAR(reinterpret_cast<const char*>(BinaryArray.GetData())));
}TSharedPtr<TArray<uint8>, ESPMode::ThreadSafe> UMyGameInstance::StringToByteArray(FString DataString)
{TArray<uint8> DataArray;FTCHARToUTF8 Converter(*DataString);DataArray.Append(reinterpret_cast<const uint8*>(Converter.Get()), Converter.Length());return MakeShared<TArray<uint8>, ESPMode::ThreadSafe>(MoveTemp(DataArray));
}