前言
之前我们学习过socket之tcp通信,知道了使用tcp建立连接的一系列操作,并通过write与read函数能让客户端与服务端进行通信,但是tcp是面向字节流的,有可能我们write时只写入了部分数据,此时另一端就来read了,可能会导致读取的数据不完整的问题。这就引入到tcp通信的一个重要操作了——序列化与反序列化!
一、使用结构体进行传输
在我们之前学习的代码中,一般想传递很多数据,都会使用到结构体。
比如pthread_create()需要传递函数地址,你如果需要传递很多参数,就得创建一个结构体,将他们组织起来,传递这个结构体指针给到pthread_create(),因为他第四个参数只能接受void*指针。不是可变参数包。
如下,制作一个网络计算器,我们客户端创建结构体数据并发送
服务端对数据进行接受
这样服务器就可以接受到数据并做相应的处理了,这里我们显示出来表示接收到了数据,处理起来也就很简单了。
结构体代码链接
二、自定义序列化与反序列化
tcp中的数据使用结构体传输在网络压力很大,或者传输内容很多时,就会发生问题。同样在不同的编译环境下,结构体内存对齐也不一致,并且有可能客户端是使用其他代码进行编写,C/C++的结构体也不会适合。
- 如果我们将传输的数据统一处理成字符串,让字符串在tcp中进行传输,同时设置一些固定的字段,代表当前数据包的结束,是不是能解决上述问题呢?
- 这就是数据序列化后进行传输,接收方将数据进行反序列化后提取再做处理。序列化的本质就是对字符串作处理。
一样的计算器,一样的需要传递两个整形数字与一个字符。
- 如果我们将该结构体转为字符串,如 "x oper y\n" ,这样我们就可以通过读取空格来表示取到的整形,字符,整形,如果一直没读到"\n"证明当前数据包还没读完,读到"\n"证明取到了一个完整的数据包,这样就可以完美解决问题了。
- 但是我们这个写法没有普适性,因为如果后面如果结构体内容比较复杂,带了一个"\n",就会导致序列化代码需要重新设计。如果在前面添加一个报头len,代表后面内容的长度,这样无论后面代码是什么,都可以统一看待处理。
- 于是,我们得到如下的序列化公式。
"len\nx op y"
这里len后面的"\n"代表我们读取到了len数据,后面就是真正要传输并处理的数据了。
了解了这些,那么我们进行网络计算器的序列化与反序列化也就不难了,代码如下
Protocol.hpp 自定义协议
#pragma once#include <iostream>
#include <memory>
using namespace std;namespace Protocol
{const string ProtSep = " ";const string LineBreakSep = "\n";// message 现在我要给他头部添加len\n 尾部添加\n// 如"x op y"-> "len\nx op y\n" "result code"-> "len\nresult code\n"string Encode(string &message){string len = to_string(message.size());string package = len + LineBreakSep + message + LineBreakSep;return package;}// 从一个完整的package进行解析,"len\nx op y\n"->"x op y"bool Decode(string &package, string *message){int left = package.find(LineBreakSep);if (left == string::npos)return false;int len = stoi(package.substr(0, left));int total = left + len + 2 * LineBreakSep.size();if (package.size() < total)return false;// 到这里至少有一个完整的报文 截取出来一个报文写入message,然后删除该报文*message = package.substr(left + LineBreakSep.size(), len);package.erase(0, total);return true;}class Request{public:Request() : _data_x(0), _data_y(0), _oper(0){}Request(int x, int y, char op): _data_x(x), _data_y(y), _oper(op){}void Debug(){cout << "_data_x: " << _data_x << endl;cout << "_data_y: " << _data_y << endl;cout << "_oper: " << _oper << endl;}void Inc(){_data_x++;_data_y++;}// 序列化 变成这种字符串"x op y"void Serialize(string *out){*out = to_string(_data_x) + ProtSep + _oper + ProtSep + to_string(_data_y);}// 反序列化 "x op y"字符串分别取出bool Deserialize(string &in){size_t left = in.find(ProtSep);if (left == string::npos)return false;size_t right = in.rfind(ProtSep);if (right == string::npos)return false;_data_x = stoi(in.substr(0, left));_data_y = stoi(in.substr(right + ProtSep.size()));string op = in.substr(left + ProtSep.size(), right - left - ProtSep.size());if (op.size() != 1)return false;_oper = op[0];return true;}int Getx(){return _data_x;}int Gety(){return _data_y;}char Getop(){return _oper;}string To_string(){return to_string(_data_x)+" "+_oper+ " "+to_string(_data_y)+" = ";}private:// x oper y 比如 10 + 20int _data_x;int _data_y;char _oper; // 符号};class Response{public:Response() : _result(0), _code(0){}Response(int result, int code): _result(result), _code(code){}void Serialize(string *out){//_result _code*out = to_string(_result) + ProtSep + to_string(_code);}bool Deserialize(string &in){size_t pos = in.find(ProtSep);if (pos == string::npos)return false;_result = stoi(in.substr(0, pos));_code = stoi(in.substr(pos + ProtSep.size()));return true;}int Getresult(){return _result;}int Getcode(){return _code;}void Setresult(int result){_result = result;}void Setcode(int code){_code = code;}private:int _result; // 运算结果int _code; // 运算状态};// 工厂模式class Factory{public:shared_ptr<Request> BuildRequest(){return make_shared<Request>();}shared_ptr<Request> BuildRequest(int x, int y, char op){return make_shared<Request>(x, y, op);}shared_ptr<Response> BuildResponse(){return make_shared<Response>();}shared_ptr<Response> BuildResponse(int result, int code){return make_shared<Response>(result, code);}};
}
Socket.hpp 套接字函数封装
#pragma once#include <iostream>
#include <string>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <cstring>
#include <unistd.h>
using namespace std;
namespace Net_Work
{static const int default_backlog = 5;static const int default_sockfd = -1;using namespace std;enum{SocketError = 1,BindError,ListenError,ConnectError,};// 封装套接字接口基类class Socket{public:// 封装了socket相关方法virtual ~Socket() {}virtual void CreateSocket() = 0;virtual void BindSocket(uint16_t port) = 0;virtual void ListenSocket(int backlog) = 0;virtual bool ConnectSocket(string &serverip, uint16_t serverport) = 0;virtual Socket *AcceptSocket(string *peerip, uint16_t *peerport) = 0;virtual int GetSockFd() = 0;virtual void SetSockFd(int sockfd) = 0;virtual void CloseSocket() = 0;virtual bool Recv(string *buff, int size) = 0;virtual void Send(string &send_string) = 0;// 方法的集中在一起使用public:void BuildListenSocket(uint16_t port, int backlog = default_backlog){CreateSocket();BindSocket(port);ListenSocket(backlog);}bool BuildConnectSocket(string &serverip, uint16_t serverport){CreateSocket();return ConnectSocket(serverip, serverport);}void BuildNormalSocket(int sockfd){SetSockFd(sockfd);}};class TcpSocket : public Socket{public:TcpSocket(int sockfd = default_sockfd): _sockfd(sockfd){}~TcpSocket() {}void CreateSocket() override{_sockfd = socket(AF_INET, SOCK_STREAM, 0);if (_sockfd < 0)exit(SocketError);}void BindSocket(uint16_t port) override{int opt = 1;setsockopt(_sockfd, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &opt, sizeof(opt));struct sockaddr_in local;memset(&local, 0, sizeof(local));local.sin_family = AF_INET;local.sin_port = htons(port);local.sin_addr.s_addr = INADDR_ANY;int n = bind(_sockfd, (struct sockaddr *)&local, sizeof(local));if (n < 0)exit(BindError);}void ListenSocket(int backlog) override{int n = listen(_sockfd, backlog);if (n < 0)exit(ListenError);}bool ConnectSocket(string &serverip, uint16_t serverport) override{struct sockaddr_in addr;memset(&addr, 0, sizeof(addr));addr.sin_family = AF_INET;addr.sin_port = htons(serverport);// addr.sin_addr.s_addr = inet_addr(serverip.c_str());inet_pton(AF_INET, serverip.c_str(), &addr.sin_addr);int n = connect(_sockfd, (sockaddr *)&addr, sizeof(addr));if (n == 0)return true;return false;}Socket *AcceptSocket(string *peerip, uint16_t *peerport) override{struct sockaddr_in addr;socklen_t len = sizeof(addr);int newsockfd = accept(_sockfd, (sockaddr *)&addr, &len);if (newsockfd < 0)return nullptr;// *peerip = inet_ntoa(addr.sin_addr);// INET_ADDRSTRLEN 是一个定义在头文件中的宏,表示 IPv4 地址的最大长度char ip_str[INET_ADDRSTRLEN];inet_ntop(AF_INET, &addr.sin_addr, ip_str, INET_ADDRSTRLEN);*peerip = ip_str;*peerport = ntohs(addr.sin_port);Socket *s = new TcpSocket(newsockfd);return s;}int GetSockFd() override{return _sockfd;}void SetSockFd(int sockfd) override{_sockfd = sockfd;}void CloseSocket() override{if (_sockfd > default_sockfd)close(_sockfd);}bool Recv(string *buff, int size) override{char inbuffer[size];ssize_t n = recv(_sockfd, inbuffer, size - 1, 0);if (n > 0){inbuffer[n] = 0;*buff += inbuffer;return true;}elsereturn false;}void Send(string &send_string) override{send(_sockfd, send_string.c_str(),send_string.size(),0);}private:int _sockfd;string _ip;uint16_t _port;};
}
Calculate.hpp 计算业务封装
#pragma once#include <iostream>
#include <memory>
#include "Protocol.hpp"enum ErrCode
{Sucess = 0,DivZeroErr,ModZeroErr,UnKnowOper,
};//计算业务
class Calculate
{
public:Calculate(){}shared_ptr<Protocol::Response> Cal(shared_ptr<Protocol::Request> req){shared_ptr<Protocol::Response> resp = factory.BuildResponse();resp->Setcode(Sucess);switch (req->Getop()){case '+':resp->Setresult(req->Getx() + req->Gety());break;case '-':resp->Setresult(req->Getx() - req->Gety());break;case '*':resp->Setresult(req->Getx() * req->Gety());break;case '/':{if (req->Gety() == 0)resp->Setcode(DivZeroErr);elseresp->Setresult(req->Getx() / req->Gety());}break;case '%':{if (req->Gety() == 0)resp->Setcode(ModZeroErr);elseresp->Setresult(req->Getx() % req->Gety());}break;default:resp->Setcode(UnKnowOper);break;}return resp;}private:Protocol::Factory factory;
};
TcpServer.hpp tcp服务端封装
#pragma once#include "Protocol.hpp"
#include "Socket.hpp"
#include <thread>
#include <functional>using func_t = function<string(string &inbufferstream, bool *error_code)>;class TcpServer
{
public:TcpServer(uint16_t port, func_t hander_request): _port(port), _listen_socket(new Net_Work::TcpSocket()), _handler_request(hander_request){_listen_socket->BuildListenSocket(port);}void ThreadRun(Net_Work::Socket *s){// cout<< " in HandlerRequest !"<<endl;string inbufferstream;while (true){bool resultcode = true;// 1.读取报文if (!s->Recv(&inbufferstream, 1024))break;// 2.报文处理string send_string = _handler_request(inbufferstream, &resultcode);if (resultcode){// 发送数据if (!send_string.empty()){s->Send(send_string);}}else{break;}}s->CloseSocket();delete s;}void Loop(){while (true){string peerip;uint16_t peerport;Net_Work::Socket *newsockfd = _listen_socket->AcceptSocket(&peerip, &peerport);if (newsockfd == nullptr)continue;cout << "获取一个新链接, sockfd: " << newsockfd->GetSockFd() << " ,client info: " << peerip << ":" << peerport << endl;thread t1(&TcpServer::ThreadRun, this, newsockfd);t1.detach();}}~TcpServer(){delete _listen_socket;}private:int _port;Net_Work::Socket *_listen_socket;func_t _handler_request;
};
Main.cc 服务端实现
#include "TcpServer.hpp"
#include "Calculate.hpp"
#include <unistd.h>
#include <memory>string HandlerRequest(string &inbufferstream, bool *error_code)
{Calculate calculate;// 1.创建请求对象unique_ptr<Protocol::Factory> factory(new Protocol::Factory());shared_ptr<Protocol::Request> req = factory->BuildRequest();string total_resp_string;// 2.判断字节流是否读取到了一个完整的报文 把报文读完string message;while (Protocol::Decode(inbufferstream, &message)){// 3.这里已经读到完整的报文,需要进行反序列化 反序列化失败证明是非法请求if (!req->Deserialize(message)){// 反序列化失败 不能忍受,因为客户端的请求是不合法的*error_code = false;return string();}// 4.业务处理shared_ptr<Protocol::Response> resp = calculate.Cal(req);// 5.序列化string send_string;resp->Serialize(&send_string);// 6.添加报头send_string = Protocol::Encode(send_string);total_resp_string+=send_string;}return total_resp_string;
}int main(int argc, char *argv[])
{if (argc != 2){cout << "Usage : " << argv[0] << " port" << endl;return 0;}uint16_t localport = stoi(argv[1]);unique_ptr<TcpServer> tsvr(new TcpServer(localport, HandlerRequest));tsvr->Loop();return 0;
}
TcpClient.cc 服务端实现
#include <iostream>
#include <string>
#include <unistd.h>
#include <ctime>
#include <cstdlib>
#include "Socket.hpp"
#include "Protocol.hpp"using namespace std;
int main(int argc, char *argv[])
{if (argc != 3){cout << "Usage : " << argv[0] << " serverip serverport" << endl;return 0;}string serverip = argv[1];uint16_t serverport = stoi(argv[2]);// 创建套接字对象Net_Work::Socket *s = new Net_Work::TcpSocket();if (!s->BuildConnectSocket(serverip, serverport)){cerr << "connect [" << serverip << ":" << serverport << "] failed" << endl;return Net_Work::ConnectError;}cout << "connect [" << serverip << ":" << serverport << "] success" << endl;// 创建工厂unique_ptr<Protocol::Factory> factory(new Protocol::Factory());srand(time(nullptr) ^ getpid());const string opers = "+-*/%^&=";while (true){// 1.构建一个请求int x = rand() % 100;usleep(rand() % 5555);int y = rand() % 100;int oper = opers[rand() % opers.size()];shared_ptr<Protocol::Request> req = factory->BuildRequest(x, y, oper);// 2.序列化该请求string requeststr;req->Serialize(&requeststr);// 3.添加报头信息requeststr = Protocol::Encode(requeststr);// 4.发送请求s->Send(requeststr);string message;while (true){// 5.读取响应string responsestr;s->Recv(&responsestr, 1024);// 6.解析报文 收到的"len\n result code" 现在开始解析if(!Protocol::Decode(responsestr,&message))continue;// 7.拆分数据 目前是"result code" 现在开始写入结构体shared_ptr<Protocol::Response> resp = factory->BuildResponse();resp->Deserialize(message);// 8.resp已经得到的数据cout<<req->To_string()<<resp->Getresult()<<"\tcode: "<<resp->Getcode()<<endl;break;}sleep(1);}s->CloseSocket();return 0;
}
Makefile
.PHONY:all
all:tcpserver tcpclienttcpserver:Main.ccg++ -o $@ $^ -std=c++11 -lpthread
tcpclient:TcpClient.ccg++ -o $@ $^ -std=c++11 -lpthread.PHONY:clean
clean:rm -f tcpclient tcpserver
代码链接
运行结果吐下