[计网底层小探索]:实现并部署多线程并发Tcp服务器框架(基于生产者消费者模型的线程池结构)

在这里插入图片描述

文章目录

  • 一.网络层与传输层协议
    • sockaddr结构体继承体系(Linux体系)
    • 贯穿计算机系统的网络通信架构图示:
  • 二.实现并部署多线程并发Tcp服务器框架
    • 线程池模块
    • 序列化反序列化工具模块
    • 通信信道建立模块
    • 服务器主体模块
    • 任务回调模块(根据具体应用场景可重构)
    • Tips:DebugC++代码过程中遇到的问题记录

在这里插入图片描述

一.网络层与传输层协议

  • 网络层与传输层内置于操作系统的内核中,网络层一般使用ip协议,传输层常用协议为Tcp协议和Udp协议,Tcp协议和Udp协议拥有各自的特点和应用场景:
    在这里插入图片描述

sockaddr结构体继承体系(Linux体系)

  • sockaddr_in结构体用于存储网络通信主机进程的ip和端口号等信息
    在这里插入图片描述

贯穿计算机系统的网络通信架构图示:

在这里插入图片描述

二.实现并部署多线程并发Tcp服务器框架

小项目的完整文件的gittee链接

  • Tcp服务器架构:
    在这里插入图片描述

线程池模块

#pragma once
#include <iostream>
#include <pthread.h>
#include "log.hpp"
#include <semaphore.h>
#include <vector>
#include <cstdio>template<class T>
class RingQueue{
private:pthread_mutex_t Clock_;pthread_mutex_t Plock_;sem_t Psem_;sem_t Csem_;std::vector<T> Queue_;int Pptr_;int Cptr_;int capacity_;
public:RingQueue(int capacity = 10) : Queue_(capacity),Pptr_(0),Cptr_(0),capacity_(capacity){sem_init(&Psem_,0,capacity);sem_init(&Csem_,0,0);pthread_mutex_init(&Clock_,nullptr);pthread_mutex_init(&Plock_,nullptr);}~RingQueue(){sem_destroy(&Psem_);sem_destroy(&Csem_);pthread_mutex_destroy(&Clock_);pthread_mutex_destroy(&Plock_);}T Pop(){sem_wait(&Csem_);pthread_mutex_lock(&Clock_);T tem = Queue_[Cptr_];Cptr_++;Cptr_ %= capacity_;pthread_mutex_unlock(&Clock_);sem_post(&Psem_);return tem;}void Push(T t){sem_wait(&Psem_);pthread_mutex_lock(&Plock_);Queue_[Pptr_] = t;Pptr_++;Pptr_%= capacity_;pthread_mutex_unlock(&Plock_);sem_post(&Csem_);}
};
#pragma once
#include "sem_cp.cpp"
#include <pthread.h>
#include <iostream>
#include <string>
#include <mutex>
#include "CalTask.cpp"template<class Task>
class Thread_Pool{struct Thread_Data{int Thread_num;pthread_t tid;};
private:RingQueue<Task> Queue_;  //线程安全的环形队列std::vector<Thread_Data> thread_arr; //管理线程的容器static std::mutex lock_;            //单例锁static Thread_Pool<Task> * ptr_;    //单例指针
private:Thread_Pool(int capacity_Of_queue = 20) : Queue_(capacity_Of_queue){}Thread_Pool(const Thread_Pool<Task>& Tp) = delete;Thread_Pool<Task>& operator=(const Thread_Pool<Task> & Tp) = delete;
public:~Thread_Pool(){}//获取线程池单例-->注意C++的类模板静态成员函数需要在类体外进行定义static Thread_Pool<Task> * Getinstance();//创建多线程void Create_thread(int thread_num = 10){Thread_Data T_data;for(int i = 0 ; i < thread_num ; ++i){//注意线程池对象的this指针传递给线程pthread_create(&T_data.tid,nullptr,Routine,this);T_data.Thread_num = i + 1;thread_arr.push_back(T_data);}}//线程等待void Thread_join(){for(int i = 0 ;i < thread_arr.size() ; ++i){pthread_join(thread_arr[i].tid,nullptr);}}//向线程池中加入任务void Push(Task T){Queue_.Push(T);}void Push(Task && T){Queue_.Push(std::forward<Task>(T));}
private://线程函数-->该函数没有在类外调用,所以无须在类体外定义static void* Routine(void * args){Thread_Pool<Task> * Pool = static_cast<Thread_Pool<Task> *>(args);while(true){std::cout << "Thread prepare to work\n" << std::endl;Task Thread_Task = Pool->Queue_.Pop();//要求Task类重载()-->用于执行具体任务Thread_Task();}return nullptr;}
};//初始化静态指针
template<class Task>
Thread_Pool<Task> * Thread_Pool<Task>::ptr_ = nullptr;
template<class Task>
std::mutex Thread_Pool<Task>::lock_;//注意C++的类模板静态成员函数需要在类体外进行定义
template<class Task>
Thread_Pool<Task> * Thread_Pool<Task>::Getinstance(){if(ptr_ == nullptr){lock_.lock();if(ptr_ == nullptr){ptr_ = new Thread_Pool<Task>;}lock_.unlock();}return ptr_;
}

序列化反序列化工具模块

  • 序列反序列化是保证通信过程中数据完整性的关键步骤,保证数据语义完整,结构完整

在这里插入图片描述

#pragma once
#include <iostream>
#include <string>// 自定义序列化反序列化协议
const std::string blank_space_sep = " ";
const std::string protocol_sep = "\n";
//封装报文
std::string Encode(std::string &content){//报文正文字节数std::string package = std::to_string(content.size());package += protocol_sep;package += content;    //用分隔符封装正文package += protocol_sep;return package;
}//解析报文package-->"正文长度"\n"正文"\n
bool Decode(std::string &package, std::string& content){size_t pos = package.find(protocol_sep);if(pos == std::string::npos) return false;//解析报文正文长度size_t Len = std::atoi(package.substr(0,pos).c_str());//确定报文是否完整size_t total_Len = pos + Len + 2;if(package.size() != total_Len) return false;//获取正文内容content = package.substr(pos+1,Len);package.erase(0,total_Len);return true;
}//用户层协议请求结构体
class Request{
public:int x;int y;char op; 
public:Request(int data1 , int data2 , char op): x(data1),y(data2),op(op){}Request(){}
public://请求结构体 序列化 成报文正文字符串 "x op y"bool Serialize(std::string& out){std::string content = std::to_string(x);content += blank_space_sep;content += op;content += blank_space_sep;content += std::to_string(y);out = content;return true;// 等价的jason代码// Json::Value root;// root["x"] = x;// root["y"] = y;// root["op"] = op;// // Json::FastWriter w;// Json::StyledWriter w;// out = w.write(root);// return true;}//报文正文字符串 反序列化 成请求结构体// "x op y"bool Deserialize(const std::string &in) {size_t left = in.find(blank_space_sep);if(left == std::string::npos)return false;x = std::stoi(in.substr(0,left).c_str());std::size_t right = in.rfind(blank_space_sep);if (right == std::string::npos)return false;y = std::atoi(in.substr(right + 1).c_str());if(left + 2 != right) return false;op = in[left+1];return true;// 等价的jason代码// Json::Value root;// Json::Reader r;// r.parse(in, root);// x = root["x"].asInt();// y = root["y"].asInt();// op = root["op"].asInt();// return true;}void DebugPrint(){std::cout << "新请求构建完成:  " << x << op << y << "=?" << std::endl;}
};//用户层协议请求回应结构体
class Response{
public:int result;int code; 
public:Response(int res , int c): result(res),code(c){}Response(){}
public://请求回应结构体 序列化 成报文正文字符串 "result code"bool Serialize(std::string& out){std::string s = std::to_string(result);s += blank_space_sep;s += std::to_string(code);out = s;return true;// 等价的jason代码// Json::Value root;// root["result"] = result;// root["code"] = code;// // Json::FastWriter w;// Json::StyledWriter w;// out = w.write(root);// return true;}//"result code"//报文正文字符串 反序列化 成请求回应结构体bool Deserialize(const std::string &in) {std::size_t pos = in.find(blank_space_sep);if (pos == std::string::npos)return false;if(pos == 0 || pos == in.size() - 1) return false;result = std::stoi(in.substr(0, pos).c_str());code = std::stoi(in.substr(pos+1).c_str());return true;// 等价的jason代码// Json::Value root;// Json::Reader r;// r.parse(in, root);// result = root["result"].asInt();// code = root["code"].asInt();// return true;}void DebugPrint(){std::cout << "结果响应完成, result: " << result << ", code: "<< code << std::endl;}
};

通信信道建立模块

#pragma once
#include <iostream>
#include <string>
#include <sys/types.h>   
#include <sys/socket.h>
#include "log.hpp"
#include <memory.h>
#include <arpa/inet.h>
#include <netinet/in.h>namespace MySocket{//Tcp通讯构建器class TcpServer{enum{UsageError = 1,SocketError,BindError,ListenError,};private:int socketfd_ = -1;std :: string ip_;uint16_t port_;int backlog_ = 10;public:TcpServer(const std::string& ip = "172.19.29.44", uint16_t port = 8081) : ip_(ip) , port_(port){}~TcpServer(){if(socketfd_ > 0) close(socketfd_);}public://确定通信协议,建立文件描述符void BuildSocket(){socketfd_ = socket(AF_INET,SOCK_STREAM,0);if(socketfd_ < 0){lg(Fatal,"socket error,%s\n",strerror(errno));exit(SocketError);}}//文件描述符与服务器ip : 端口号绑定void SocketBind(){struct sockaddr_in addr;memset(&addr,0,sizeof(addr));addr.sin_port = htons(port_);addr.sin_family = AF_INET;addr.sin_addr.s_addr = inet_addr(ip_.c_str());if(bind(socketfd_,(const sockaddr*)&addr,sizeof(addr)) < 0){lg(Fatal,"socket bind error,%s\n",strerror(errno));exit(BindError);}lg(Info,"socket bind success\n");}//启动服务监听,等待客户端的连接void Socklisten(){if(socketfd_ <= 0){lg(Fatal,"socket error,%s\n",strerror(errno));exit(SocketError);}if(listen(socketfd_,backlog_) < 0){lg(Fatal, "listen error, %s: %d", strerror(errno), errno);exit(ListenError);}}//服务器接收客户端的连接-->并创建用于通信的文件描述符-->一个客户端连接对应一个文件描述符int SockAccept(std::string& cilent_ip, uint16_t& cilent_port){struct sockaddr_in client_addr;  // 输出型参数,用于获取用户的ip : 端口号memset(&client_addr,0,sizeof(client_addr));socklen_t Len = sizeof(client_addr);int newfd = accept(socketfd_,(struct sockaddr*)&client_addr,&Len);if(newfd < 0){lg(Warning, "accept error, %s: %d", strerror(errno), errno);return -1;}//提取客户端信息-->输出参数char ipstr[64];cilent_ip = inet_ntop(AF_INET,&client_addr.sin_addr,ipstr,sizeof(ipstr));cilent_ip = ipstr;cilent_port = ntohs(client_addr.sin_port);return newfd;}public:int Get_Server_fd(){return socketfd_;}void Close_fd(){if(socketfd_ > 0){close(socketfd_);socketfd_ = -1;}}};
};

服务器主体模块

在这里插入图片描述

#pragma once
#include "ThreadPool.cpp"
#include "TcpServer.cpp"
#include "CalTask.cpp"
#include "log.hpp"
#include <signal.h>//构建计算器服务器
class CalServer{const int size = 2048;
private:Thread_Pool<CalTask> * Pool_ptr_;MySocket::TcpServer Socket_;int Socket_fd_ = -1;
public:CalServer(const std::string& de_ip = "172.19.29.44",uint16_t de_port = 8081): Socket_(de_ip,de_port){Pool_ptr_ = Thread_Pool<CalTask>::Getinstance();if(Pool_ptr_ == nullptr){lg(Fatal,"Pool_ptr_ is nullptr\n");return;}Pool_ptr_->Create_thread();}~CalServer(){}
public://建立Tcp连接条件bool Init(){Socket_.BuildSocket();Socket_fd_ = Socket_.Get_Server_fd();if(Socket_fd_ < 0){lg(Fatal,"BuildSocket failed\n");return true;}Socket_.SocketBind();Socket_.Socklisten();lg(Info, "init server .... done");return true;}//启动服务器void Start(){signal(SIGCHLD, SIG_IGN);signal(SIGPIPE, SIG_IGN);char ReadBuffer[size];while(true){//接受用户请求std::string client_ip;uint16_t client_port;int client_fd = Socket_.SockAccept(client_ip,client_port);if(client_fd < 0){lg(Warning,"SockAccept error\n");continue;}lg(Info, "accept a new link, sockfd: %d, clientip: %s, clientport: %d", client_fd, client_ip.c_str(), client_port);int n = read(client_fd,ReadBuffer,sizeof(ReadBuffer));ReadBuffer[n] = 0;  std::string TaskStr(ReadBuffer);printf("receives mess from client : %s",ReadBuffer);if(n < 0){lg(Warning,"read error\n");break;}CalTask task(client_fd,client_ip,client_port,TaskStr);Pool_ptr_->Push(task);}}
};

任务回调模块(根据具体应用场景可重构)

#pragma once
#include <string>
#include "ThreadPool.cpp"
#include "Protocol.cpp"enum{Div_Zero = 1,Mod_Zero,Other_Oper
};class CalTask{
private:int socketfd_;                //网络通信文件描述符std :: string ip_;            //客户端ipuint16_t port_;               //客户端端口号std::string package_;         //客户请求字符串
public:CalTask(int socketfd,const std::string& ip , uint16_t & port,std::string & str): socketfd_(socketfd),ip_(ip),port_(port),package_(str){}CalTask(){}//类一定要有默认构造函数~CalTask(){}
public://执行计算任务并将结果发送给用户void operator() (){std::cout << "Task Running ... \n" << std::endl;std::string content;//将用户发送的报文进行解包获取正文bool r = Decode(package_, content);if (!r)return;//将报文正文进行反序列化Request req;r = req.Deserialize(content);if (!r)return ;req.DebugPrint();content = ""; //构建计算结果                         Response resp = CalculatorHelper(req);resp.DebugPrint();//计算结果序列化成字符串resp.Serialize(content);//字符串正文封装成报文发送给用户std::string ResStr = Encode(content);write(socketfd_,ResStr.c_str(),ResStr.size());if(socketfd_ > 0)close(socketfd_);}private:Response CalculatorHelper(const Request &req){//构建请求回应结构体Response resp(0, 0);switch (req.op){case '+':resp.result = req.x + req.y;break;case '-':resp.result = req.x - req.y;break;case '*':resp.result = req.x * req.y;break;case '/':{if (req.y == 0)resp.code = Div_Zero;elseresp.result = req.x / req.y;}break;case '%':{if (req.y == 0)resp.code = Mod_Zero;elseresp.result = req.x % req.y;}break;default:resp.code = Other_Oper;break;}return resp;}
};

Tips:DebugC++代码过程中遇到的问题记录

  • 使用C++类模板时,若在类模板中定义了静态成员函数,且该静态成员函数在类外被调用,则该静态成员函数必须定义在类外,不然链接器无法找到函数体.
  • 注意类模板静态成员的声明格式需要加关键字temlpate<>
  • 声明类模板静态成员无需特化模版类型参数
  • 跨主机并发通信测试:
    在这里插入图片描述
    在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/695189.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

el-table同时固定左列和右列时,出现错误情况

最近遇到一个问题,就是需求是要求表格同时固定序号列和操作列,我们用的是饿了么组件库的el-table,如下图,出现了错误情况: 解决方法就是使用doLayout方法: 如果使用了keep-alive,可以在activated里执行doLayout方法: activated() {this.$nextTick(() => {this.$ref…

【Crypto | CTF】BUUCTF RSA2

天命&#xff1a;密码学越来越难了&#xff0c;看别人笔记都不知道写啥 天命&#xff1a;莫慌&#xff0c;虽然我不会推演法&#xff0c;但我可以用归纳法 虽然我不知道解题的推演&#xff0c;但我可以背公式啊哈哈哈 虽然我不会这题&#xff0c;但是我也能做出来 公式我不知…

机器学习基本概念(李宏毅课程)

目录 一、概念:1、机器学习概念:2、深度学习概念&#xff1a; 二、深度学习中f(.)的输入和输出&#xff1a;1、输入&#xff1a;2、输出&#xff1a; 三、三种机器学习任务&#xff1a;1、Regression回归任务介绍&#xff1a;2、Classification分类任务介绍&#xff1a;3、Stru…

计算以10为底的对数 math.log10(x)

【小白从小学Python、C、Java】 【计算机等考500强证书考研】 【Python-数据分析】 计算以10为底的对数 math.log10(x) [太阳]选择题 以下代码的输出结果中正确的是? import math print("【执行】math.log10(10)") print(math.log10(10)) print("【执行】math…

【elasticsearch实战】知识库文件系统检索工具FSCrawler

需求背景 最近有一个需求需要建设一个知识库文档检索系统&#xff0c;这些知识库物料附件的文档居多&#xff0c;有较多文档格式如&#xff1a;PDF, Open Office, MS Office等&#xff0c;需要将这些格式的文件转化成文本格式&#xff0c;写入elasticsearch 的全文检索索引&am…

进度条小程序

文章目录 铺垫回车换行缓冲区概述强制冲刷缓冲区 简单实现倒计时功能进度条小程序版本一实例代码效果展示分析 版本二 铺垫 回车换行 回车和换行是两个独立的动作 回车是将光标移动到当前行的最开始&#xff08;最左侧&#xff09; 换行是竖直向下平移一行 在C语言中&…

怿星科技测试实验室(EPT LABS)服务介绍

据中国汽车工业协会数据&#xff0c;2023年我国汽车产销量分别达3016.1万辆和3009.4万辆&#xff0c;年产销量双双创历史新高&#xff0c;汽车行业进入了新时代。新汽车时代下的OEM竞争更激烈&#xff0c;汽车电子架构更复杂&#xff0c;研发周期更短&#xff0c;软件迭代更快&…

VSCODE中使用Django处理后端data和data models

链接&#xff1a; Python and Django tutorial in Visual Studio Code MVC的理解 在实际的程序中采用MVC的方式进行任务拆分。 Model&#xff08;模型&#xff09;负责封装应用程序的数据和业务逻辑部分。Model包含数据结构&#xff0c;数据处理逻辑以及相关的操作方法&#…

NetSuite 中Inventory Adjustment批次物料CSV导入分析二

上一篇最后我们有一个遗留问题是说&#xff0c;调增和调减的操作是否能在一个CSV导入模版中进行操作&#xff0c;经过测试后发现&#xff0c;是可以的&#xff0c;只是External ID需要在设置的时候注意对应好就OK。这里建议大家先查看上一篇文章&#xff0c;因为有一些完全重复…

Day14-Linux系统基础权限知识精讲

Day14-Linux系统基础权限知识精讲 1. chattr2. Linux系统权限。2.1 基础权限介绍2.2 画图讲解2.3 文件和目录权限细节总结2.4 建环境测试2.5 数字权限设置2.6 字符权限设置 给文件加特殊属性&#xff0c;实现特殊功能的命令。 1. chattr a 只能追加内容&#xff0c;不能删除。…

UE蓝图 入口(FunctionEntry)节点和源码

系列文章目录 UE蓝图 Get节点和源码 UE蓝图 Set节点和源码 UE蓝图 Cast节点和源码 UE蓝图 分支(Branch)节点和源码 UE蓝图 入口(FunctionEntry)节点和源码 文章目录 系列文章目录一、FunctionEntry节点功能二、入口节点用法1. 创建函数2. 命名函数3. 定义参数4. 编写函数逻辑5…

Git合并固定分支的某一部分至当前分支

在 Git 中&#xff0c;通常使用 git merge 命令来将一个分支的更改合并到另一个分支。如果你只想合并某个分支的一部分代码&#xff0c;可以使用以下两种方法&#xff1a; 1.批量文件合并 1.1.创建并切换到一个新的临时分支 首先&#xff0c;从要合并的源分支&#xff08;即要…

C++面向对象程序设计-北京大学-郭炜【课程笔记(四)】

C面向对象程序设计-北京大学-郭炜【课程笔记&#xff08;四&#xff09;】 1、this指针1.1、this指针的作用1.2、this指针和静态成员函数 2、静态成员变量和静态成员函数2.1、基本概念2.2、基本概念总结2.3、如何访问静态成员2.4、静态成员变量的使用场景&#xff08;重要&…

浏览器---浏览器/http相关面试题

1.localStorage和sessionStorage 共同点&#xff1a;二者都是以key-value的键值对方式存储在浏览器端&#xff0c;大小大概在5M。 区别&#xff1a; &#xff08;1&#xff09;数据有效期不同&#xff1a;sessionStorage仅在当前浏览器窗口关闭之前有效&#xff1b;localStorag…

Eigen:Vector3d 变量初始化遇到的问题

Eigen:Vector3d 变量初始化遇到的问题 2024.2.22 日 &#xff0c;在使用 Eigen:Vector3d 这个类型的 变量&#xff0c;在类中进行初始化时 遇到了如下问题&#xff1a; 首先在类的声明内部&#xff0c;是不能声明完&#xff0c;再给变量赋值的&#xff0c;不管是 Eigen:Vector…

【 Flutter】安装、运行坑记录

运行demo报错 Exception in thread “main” java.net.ConnectException: Connection timed out: connect原因&#xff1a;网络问题&#xff0c;gradle包未能下载 解决方案&#xff1a;配置android studio代理&#xff0c;重新打开项目&#xff0c;as会自动下载缺失依赖

(done) 如何判断一个矩阵是否可逆?

参考视频&#xff1a;https://www.bilibili.com/video/BV15H4y1y737/?spm_id_from333.337.search-card.all.click&vd_source7a1a0bc74158c6993c7355c5490fc600 这个视频里还暗含了一些引理 1.若 AX XB 且 X 和 A,B 同阶可逆&#xff0c;那么 A 和 B 相似。原因&#xff1…

安卓开发:挑战每天发布一个封装类02--Wav录音封装类AudioChannel 1.0

简介 库名称&#xff1a;AudioChannel 版本:1.0 由于项目需求录音并base64编码存到服务器中&#xff0c;就顺手改装了一个别人的封装类 原封装类地址:Android AudioRecord音频录制wav文件输出 - 简书 (jianshu.com) 描述&#xff1a;此封装类基于AudioRecord实现wav的音频…

WEB相关工具(wget、curl、ab)

目录 一、wget 1、wget基本语法 2、wget帮助的更多选项 二、curl 1、curl基本语法 2、curl命令下载 3、curl命令基本用法 3.1 curl伪装 3.2 提取状态码 3.3 提取本地IP地址 3.4 提取远端服务器IP地址 3.5 提取本地端口 3.6 提取远端服务器端口 三、压力测试工具…

Unity xLua开发环境搭建与基础进阶

Unity是一款非常流行的游戏开发引擎&#xff0c;而xLua是一个为Unity开发者提供的Lua框架&#xff0c;可以让开发者使用Lua语言来进行游戏开发。在本文中&#xff0c;我们将介绍如何搭建Unity xLua开发环境&#xff0c;并进行基础进阶的学习。 环境搭建 首先&#xff0c;我们需…