网络程序 -- TCP版服务器

一 多进程版TCP服务器

1.1 核心功能

  对于之前编写的 字符串回响程序 来说,如果只有一个客户端进行连接并通信,是没有问题的,但如果有多个客户端发起连接请求,并尝试进行通信,服务器是无法应对的

  原因在于 服务器是一个单进程版本,处理连接请求 和 业务处理 是串行化执行的,如果想处理下一个连接请求,需要把当前的业务处理完成。

具体表现为下面这种情况:

 为什么客户端B会显示当前已经连接成功?

  这是因为是客户端是主动发起连接请求的一方,在请求发出后,如果出现连接错误,客户端就认为已经连接成功了,但实际上服务器还没有处理这个连接请求.

  这显然是服务器的问题,处理连接请求业务处理 应该交给两个不同的执行流完成,可以使用多进程或者多线程解决,这里先采用多进程的方案

  所以当前需要实现的网络程序核心功能为:当服务器成功处理连接请求后,fork 新建一个子进程,用于进行业务处理,原来的进程专注于处理连接请求。

1.2 创建子进程

注:当前的版本的修改只涉及 StartServer() 函数

创建子进程使用 fork() 函数,它的返回值含义如下

  • ret == 0 表示创建子进程成功,接下来执行子进程的代码
  • ret > 0 表示创建子进程成功,接下来执行父进程的代码
  • ret < 0 表示创建子进程失败

  子进程创建成功后,会继承父进程的文件描述符表,能轻而易举的获取客户端的 socket 套接字,从而进行网络通信

当然不止文件描述符表,得益于 写时拷贝 机制,子进程还会共享父进程的变量,当发生修改行为时,才会自己创建。

注意: 当子进程取走客户端的 socket 套接字进行通信后,父进程需要将其关闭(因为它不需要了),避免文件描述符泄漏

StartServer() 服务器启动函数 — 位于 server.hppTcpServer

// 进程创建、等待所需要的头文件
#include <unistd.h>
#include <sys/wait.h>
#include <sys/types.h>//启动服务器void StartServer(){// 忽略 SIGCHLD 信号//signal(SIGCHLD, SIG_IGN);while(!_quit){//1 处理连接请求struct sockaddr_in client;socklen_t len = sizeof(client);int sock = accept(_listensock,(struct sockaddr*)&client,&len);//2 如果连接失败 继续尝试连接if(sock == -1){std::cerr<< "Accept Fail!:"<<strerror(errno)<<std::endl;continue;}// 连接成功,获取客户端信息std::string clientip = inet_ntoa(client.sin_addr);uint16_t clientport = ntohs(client.sin_port);std::cout<<"Server accept"<<clientip + "-"<<clientport<<sock<<" from "<<_listensock << "success!"<<std::endl;//3 创建子进程 pid_t id=fork();if(id<0){// 创建子进程失败,暂时不与当前客户端建立通信会话close(sock);std::cerr<<"Fork Fail!"<<std::endl;}else if( 0 == id){//进入子进程// 子进程拥有父进程相同的文件描述符,建议把不用的关闭close(_listensock);// 执行业务处理函数//4 这里因为是字节流传递,一般而言我们会自己写一个函数Service(sock,clientip,clientport);exit(0);}else {// 父进程需要等待子进程pid_t ret = waitpid(id, nullptr, 0); // 默认为阻塞式等待//更改为非阻塞// pid_t ret = waitpid(id,nullptr,WNOHANG);if(ret == id){std::cout << "Wait " << id << " success!";}}}}

  虽然此时成功创建了子进程,但父进程(处理连接请求)仍然需要等待子进程退出后,才能继续运行,而不能和我们想象中一样单独进行处理连接请求函数,说白了就是 父进程现在处于阻塞等待状态,需要设置为 非阻塞等待.

1.3 设置非阻塞状态

设置父进程为非阻塞的方式有很多,这里来一一列举

方式一:通过参数设置为非阻塞等待(不推荐)

可以直接给 waitpid() 函数的参数3传递 WNOHANG,表示当前为 非阻塞等待.

pid_t ret = waitpid(id, nullptr, WNOHANG); // 设置为非阻塞式等待

  这种方法可行,但不推荐,原因如下:虽然设置成了非阻塞式等待,但父进程终究是需要通过 waitpid() 函数来尝试等待子进程,倘若父进程一直卡在 accept() 函数处,会导致子进程退出后暂时无人收尸,进而导致资源泄漏。

方式二:忽略 SIGCHLD 信号(推荐使用)

  这是一个子进程在结束后发出的信号,默认动作是什么都不做;父进程需要检测并回收子进程,我们可以直接忽略该信号,这里的忽略是个特例,只是父进程不对其进行处理,转而由 操作系统 对其负责,自动清理资源并进行回收,不会产生 僵尸进程。

 //启动服务器void StartServer(){// 忽略 SIGCHLD 信号signal(SIGCHLD, SIG_IGN);while(!_quit){//1 处理连接请求struct sockaddr_in client;socklen_t len = sizeof(client);int sock = accept(_listensock,(struct sockaddr*)&client,&len);//2 如果连接失败 继续尝试连接if(sock == -1){std::cerr<< "Accept Fail!:"<<strerror(errno)<<std::endl;continue;}// 连接成功,获取客户端信息std::string clientip = inet_ntoa(client.sin_addr);uint16_t clientport = ntohs(client.sin_port);std::cout<<"Server accept"<<clientip + "-"<<clientport<<sock<<" from "<<_listensock << "success!"<<std::endl;//3 创建子进程 pid_t id=fork();if(id<0){// 创建子进程失败,暂时不与当前客户端建立通信会话close(sock);std::cerr<<"Fork Fail!"<<std::endl;}else if( 0 == id){//进入子进程// 子进程拥有父进程相同的文件描述符,建议把不用的关闭close(_listensock);// 执行业务处理函数//4 这里因为是字节流传递,一般而言我们会自己写一个函数Service(sock,clientip,clientport);exit(0);}// else {//   // 父进程需要等待子进程//     //pid_t ret = waitpid(id, nullptr, 0); // 默认为阻塞式等待//     //更改为非阻塞//      pid_t ret = waitpid(id,nullptr,WNOHANG);//     if(ret == id){//      std::cout << "Wait " << id << " success!";//     }// }}}

强烈推荐使用该方案,因为操作简单,并且没有后患之忧。

方式三:设置 SIGCHLD 信号的处理动作为子进程回收(不是很推荐)

  当子进程退出并发送该信号时,执行父进程回收子进程的操作。

  设置 SIGCHLD 信号的处理动作为 回收子进程后,父进程同样不必再考虑回收子进程的问题

  注意: 因为现在处于 TcpServer 类中,handler() 函数需要设置为静态(避免隐含的 this 指针),避免不符合 signal() 函数中信号处理函数的参数要求。

 // 需要设置为静态static void handler(int signo){printf("进程 %d 捕捉到了 %d 号信号\n", getpid(), signo);// 这里的 -1 表示父进程等待时,只要是已经退出了的子进程,都可以进行回收while (1){pid_t ret = waitpid(-1, NULL, WNOHANG);if (ret > 0)printf("父进程: %d 已经成功回收了 %d 号进程\n", getpid(), ret);elsebreak;}printf("子进程回收成功\n");}//启动服务器void StartServer(){// 设置 SIGCHLD 信号的处理动作signal(SIGCHLD, handler);// 忽略 SIGCHLD 信号// signal(SIGCHLD, SIG_IGN);while(!_quit){//1 处理连接请求struct sockaddr_in client;socklen_t len = sizeof(client);int sock = accept(_listensock,(struct sockaddr*)&client,&len);//2 如果连接失败 继续尝试连接if(sock == -1){std::cerr<< "Accept Fail!:"<<strerror(errno)<<std::endl;continue;}// 连接成功,获取客户端信息std::string clientip = inet_ntoa(client.sin_addr);uint16_t clientport = ntohs(client.sin_port);std::cout<<"Server accept"<<clientip + "-"<<clientport<<sock<<" from "<<_listensock << "success!"<<std::endl;//3 创建子进程 pid_t id=fork();if(id<0){// 创建子进程失败,暂时不与当前客户端建立通信会话close(sock);std::cerr<<"Fork Fail!"<<std::endl;}else if( 0 == id){//进入子进程// 子进程拥有父进程相同的文件描述符,建议把不用的关闭close(_listensock);// 执行业务处理函数//4 这里因为是字节流传递,一般而言我们会自己写一个函数Service(sock,clientip,clientport);exit(0);}else {// 父进程需要等待子进程pid_t ret = waitpid(id, nullptr, 0); // 默认为阻塞式等待//更改为非阻塞// pid_t ret = waitpid(id,nullptr,WNOHANG);if(ret == id){std::cout << "Wait " << id << " success!";}}}}

为什么不是很推荐这种方法?因为这种方法实现起来比较麻烦,不如直接忽略 SIGCHLD 信号

方式四:设置孙子进程(不是很推荐)

  众所周知,父进程只需要对子进程负责,至于孙子进程交给子进程负责,如果某个子进程的父进程终止运行了,那么它就会变成 孤儿进程,父进程会变成 1 号进程,也就是由操作系统领养,回收进程的重担也交给了操作系统

  可以利用该特性,在子进程内部再创建一个子进程(孙子进程),然后子进程退出,父进程可以直接回收(不必阻塞),子进程(孙子进程)的父进程变成 1 号进程

  这种实现方法比较巧妙,而且与我们后面即将学到的 守护进程 有关

  注意: 使用这种方式时,父进程是需要等待子进程退出的。

   这种方法代码也很简单,我们也不再做过多示例,但依旧不推荐,因为倘若连接请求变多,会导致孤儿进程变多,孤儿进程由操作系统接管,数量变多会给操作系统带来负担

  以上就是设置 非阻塞 的四种方式,推荐使用方式二:忽略 SIGCHLD 信号。

  至此我们的 字符串回响程序 可以支持多客户端了。

细节补充:当子进程取走 sock 套接字进行网络通信后,父进程就不需要使用 sock 套接字了,可以将其进行关闭,下次连接时继续使用,避免文件描述符不断增长。

StartServer() 服务器启动函数 — 位于 server.hpp 服务器头文件中的 TcpServer

// 启动服务器
void StartServer()
{// 忽略 SIGCHLD 信号signal(SIGCHLD, SIG_IGN);while (!_quit){// 1.处理连接请求// ...// 2.如果连接失败,继续尝试连接// ...// 连接成功,获取客户端信息// ...// 3.创建子进程// ...close(sock); // 父进程不再需要资源(建议关闭)}
}

  这个补丁可以减少资源消耗,建议加上,前面是忘记加了,并且不太好修改,server.hpp 服务器头文件完整代码如下:
 

// server.hpp#pragma once#include <signal.h>
#include<iostream>
#include<string>
#include<functional>
#include<sys/types.h>
#include<sys/socket.h>
#include<netinet/in.h>
#include<arpa/inet.h>
#include"err.hpp"
#include<cstring>
#include<unistd.h>
#include<cerrno>
#include <sys/types.h>
#include <sys/wait.h>namespace My_server{// 默认端口号const uint16_t default_port = 8088;//全连接队列的最大长度const int backlog = 32;using func_t =std::function<std::string(std::string)>;class server{private:/* data *///套接字int _listensock;//端口号uint16_t _port;// 判断服务器是否结束运行bool _quit;// 外部传入的回调函数func_t _func;public:server(const func_t &func,const uint16_t &port = default_port):_func(func),_port(port),_quit(false){}~server(){}//初始化服务器void InitServer(){//1 创建套接字_listensock = socket(AF_INET,SOCK_STREAM,0);if(_listensock == -1){//绑定失败std::cerr<<"Create Socket Fail:"<<strerror(errno)<<std::endl;exit(SOCKET_ERR);}std::cout<<"Create Socket Success!" <<_listensock<<std::endl;//2 绑定端口号和IP地址struct sockaddr_in local;bzero(&local,sizeof(local));local.sin_family = AF_INET;local.sin_port = htons(_port);local.sin_addr.s_addr = INADDR_ANY;if(bind(_listensock,(const sockaddr*)&local,sizeof(local))){std::cerr << "Bind IP&&Port Fali" << strerror(errno) << std::endl;exit(BIND_ERR);}//3 开始监听if(listen(_listensock,backlog)== -1){std::cerr<<"Listen Fail: "<< strerror(errno) << std::endl;//新增一个报错exit(LISTEN_ERR);}std::cout<<"Listen Success!"<<std::endl;}// // 需要设置为静态//     static void handler(int signo){//         printf("进程 %d 捕捉到了 %d 号信号\n", getpid(), signo);//         // 这里的 -1 表示父进程等待时,只要是已经退出了的子进程,都可以进行回收//         while (1){//             pid_t ret = waitpid(-1, NULL, WNOHANG);//             if (ret > 0)//                 printf("父进程: %d 已经成功回收了 %d 号进程\n", getpid(), ret);//             else//                 break;//         }//         printf("子进程回收成功\n");//     }//启动服务器void StartServer(){// 设置 SIGCHLD 信号的处理动作//signal(SIGCHLD, handler);// 忽略 SIGCHLD 信号signal(SIGCHLD, SIG_IGN);while(!_quit){//1 处理连接请求struct sockaddr_in client;socklen_t len = sizeof(client);int sock = accept(_listensock,(struct sockaddr*)&client,&len);//2 如果连接失败 继续尝试连接if(sock == -1){std::cerr<< "Accept Fail!:"<<strerror(errno)<<std::endl;continue;}// 连接成功,获取客户端信息std::string clientip = inet_ntoa(client.sin_addr);uint16_t clientport = ntohs(client.sin_port);std::cout<<"Server accept"<<clientip + "-"<<clientport<<sock<<" from "<<_listensock << "success!"<<std::endl;//3 创建子进程 pid_t id=fork();if(id<0){// 创建子进程失败,暂时不与当前客户端建立通信会话close(sock);std::cerr<<"Fork Fail!"<<std::endl;}else if( 0 == id){//进入子进程// 子进程拥有父进程相同的文件描述符,建议把不用的关闭close(_listensock);// 执行业务处理函数//4 这里因为是字节流传递,一般而言我们会自己写一个函数Service(sock,clientip,clientport);exit(0);}// else {//   // 父进程需要等待子进程//     pid_t ret = waitpid(id, nullptr, 0); // 默认为阻塞式等待//     //更改为非阻塞//     // pid_t ret = waitpid(id,nullptr,WNOHANG);//     if(ret == id){//      std::cout << "Wait " << id << " success!";//     }// }close(sock); // 父进程不再需要资源(建议关闭)}}void Service(int sock,std::string &clientip,const uint16_t &clientport){char buff[1024];std::string who = clientip + "-" + std::to_string(clientport);while(true){// 以字符串格式读取,预留\0的位置ssize_t n = read(sock,buff,sizeof(buff)-1);if(n>0){//读取成功buff[n]='\0';std::cout<<"Server get: "<< buff <<" from "<<who<<std::endl;//实际处理可以交给上层逻辑指定std::string respond = _func(buff);write(sock,buff,strlen(buff));}else if(n==0){//表示当前读到了文件末尾,结束读取std::cout<<"Client "<<who<<" "<<sock<<" quit!"<<std::endl;close(sock);break;}else{// 读取出问题(暂时)std::cerr << "Read Fail!" << strerror(errno) << std::endl;close(sock); // 关闭文件描述符break;}    }}};}

二 多线程版服务器

2.1 核心功能

通过多线程,实现支持多客户端同时通信的服务器

核心功能:服务器与客户端成功连接后,创建一个线程,服务于客户端的业务处理

'这里先通过 原生线程库 模拟实现.

2.2 使用原生线程库

  线程的回调函数中需要 Service() 业务处理函数中的所有参数,同时也需要具备访问 Service() 业务处理函数的能力,单凭一个 void* 的参数是无法解决的,为此可以创建一个类,里面可以包含我们所需要的参数。

ThreadData 类 — 位于 server.hpp 服务器头文件中。

   //包含我们所需参数的类型class ThreadData{public:ThreadData(int sock,const std::string&ip,const uint16_t&port,server*ptr):_sock(sock),_clientip(ip),_clientport(port),_current(ptr){}public:int _sock;std::string _clientip;uint16_t _clientport;server* _current;};

接下来就可以考虑如何借助多线程了

线程创建后,需要关闭不必要的 socket 套接字吗?

  • 不需要,线程之间是可以共享这些资源的,无需关闭

如何设置主线程不必等待次线程退出?

  • 可以把次线程进行分离

  所以接下来我们需要在连接成功后,创建次线程,利用已有信息构建 ThreadData 对象,为次线程编写回调函数(最终目的是为了执行 Service() 业务处理函数)

注意: 因为当前在类中,线程的回调函数需要使用 static 设置为静态函数。

server.hpp 服务器头文件

// server.hpp#pragma once#include<iostream>
#include<string>
#include<functional>
#include<sys/types.h>
#include<sys/socket.h>
#include<netinet/in.h>
#include<arpa/inet.h>
#include"err.hpp"
#include<cstring>
#include<unistd.h>
#include<cerrno>namespace My_server{// 默认端口号const uint16_t default_port = 8088;//全连接队列的最大长度const int backlog = 32;using func_t = std::function<std::string(std::string)>;//前置声明class server;//包含我们所需参数的类型class ThreadData{public:ThreadData(int sock,const std::string&ip,const uint16_t&port,server*ptr):_sock(sock),_clientip(ip),_clientport(port),_current(ptr){}public:int _sock;std::string _clientip;uint16_t _clientport;server* _current;};class server{private:/* data *///套接字int _listensock;//端口号uint16_t _port;// 判断服务器是否结束运行bool _quit;// 外部传入的回调函数func_t _func;public:server(const func_t &func,const uint16_t &port = default_port):_func(func),_port(port),_quit(false){}~server(){}//初始化服务器void InitServer(){//1 创建套接字_listensock = socket(AF_INET,SOCK_STREAM,0);if(_listensock == -1){//绑定失败std::cerr<<"Create Socket Fail:"<<strerror(errno)<<std::endl;exit(SOCKET_ERR);}std::cout<<"Create Socket Success!" <<_listensock<<std::endl;//2 绑定端口号和IP地址struct sockaddr_in local;bzero(&local,sizeof(local));local.sin_family = AF_INET;local.sin_port = htons(_port);local.sin_addr.s_addr = INADDR_ANY;if(bind(_listensock,(const sockaddr*)&local,sizeof(local))){std::cerr << "Bind IP&&Port Fali" << strerror(errno) << std::endl;exit(BIND_ERR);}//3 开始监听if(listen(_listensock,backlog)== -1){std::cerr<<"Listen Fail: "<< strerror(errno) << std::endl;//新增一个报错exit(LISTEN_ERR);}std::cout<<"Listen Success!"<<std::endl;}//启动服务器void StartServer(){while(!_quit){//1 处理连接请求struct sockaddr_in client;socklen_t len = sizeof(client);int sock = accept(_listensock,(struct sockaddr*)&client,&len);//2 如果连接失败 继续尝试连接if(sock == -1){std::cerr<< "Accept Fail!:"<<strerror(errno)<<std::endl;continue;}// 连接成功,获取客户端信息std::string clientip = inet_ntoa(client.sin_addr);uint16_t clientport = ntohs(client.sin_port);std::cout<<"Server accept"<< clientip + "-"<< clientport <<sock<<" from "<<_listensock << "success!"<<std::endl;// 3.创建线程及所需要的线程信息类ThreadData* td = new ThreadData(sock, clientip, clientport, this);pthread_t p;pthread_create(&p, nullptr, Routine, td);}}// 线程回调函数static void* Routine(void* args){// 线程分离pthread_detach(pthread_self());ThreadData* td = static_cast<ThreadData*>(args);// 调用业务处理函数td->_current->Service(td->_sock, td->_clientip, td->_clientport);// 销毁对象delete td;return nullptr;}void Service(int sock,std::string &clientip,const uint16_t &clientport){char buff[1024];std::string who = clientip + "-" + std::to_string(clientport);while(true){// 以字符串格式读取,预留\0的位置ssize_t n = read(sock,buff,sizeof(buff)-1);if(n>0){//读取成功buff[n]='\0';std::cout<<"Server get: "<< buff <<" from "<<who<<std::endl;//实际处理可以交给上层逻辑指定std::string respond = _func(buff);write(sock,buff,strlen(buff));}else if(n==0){//表示当前读到了文件末尾,结束读取std::cout<<"Client "<<who<<" "<<sock<<" quit!"<<std::endl;close(sock);break;}else{// 读取出问题(暂时)std::cerr << "Read Fail!" << strerror(errno) << std::endl;close(sock); // 关闭文件描述符break;}    }}};}

因为当前使用了 原生线程库,所以在编译时,需要加上 -lpthread

Makefile 文件

.PHONY:all
all:server clientserver:server.ccg++ -o $@ $^ -std=c++11 -lpthreadclient:client.ccg++ -o $@ $^ -std=c++11 -lpthread.PHONY:clean
clean:rm -rf server client

  使用 原生线程库 过于单薄了,并且这种方式存在问题:连接都准备好了,才创建线程,如果创建线程所需要的资源较多,会拖慢服务器整体连接效率

为此可以改用之前实现的 线程池

三 线程池版服务器

3.1 ThreadPool.hpp 线程池头文件

#pragma once#include <vector>
#include <string>
#include <memory>
#include <functional>
#include <unistd.h>
#include <pthread.h>
#include "Task.hpp"
#include "Thread.hpp"
#include "BlockingQueue.hpp" // CP模型namespace My_pool{const int THREAD_NUM = 10;template<class T>class ThreadPool{private:ThreadPool(int num = THREAD_NUM):_num(num){}~ThreadPool(){// 等待线程退出for(auto &t : _threads)t.join();}// 删除拷贝构造ThreadPool(const ThreadPool<T> &) = delete;public:static ThreadPool<T>* getInstance(){// 双检查if(_inst == nullptr){// 加锁LockGuard lock(&_mtx);if(_inst == nullptr){// 创建对象_inst = new ThreadPool<T>();// 初始化及启动服务_inst->init();_inst->start();}}return _inst;}public:void init(){// 创建一批线程for(int i = 0; i < _num; i++)_threads.push_back(Thread(i, threadRoutine, this));}void start(){// 启动线程for(auto &t : _threads)t.run();}// 提供给线程的回调函数(已修改返回类型为 void)static void threadRoutine(void *args){// 避免等待线程,直接剥离pthread_detach(pthread_self());auto ptr = static_cast<ThreadPool<T>*>(args);while (true){// 从CP模型中获取任务T task = ptr->popTask();task(); // 回调函数}}// 装载任务void pushTask(const T& task){_blockqueue.Push(task);}protected:T popTask(){T task;_blockqueue.Pop(&task);return task;}private:std::vector<Thread> _threads;int _num; // 线程数量My_Queue::BlockingQueue<T> _blockqueue; // 阻塞队列// 创建静态单例对象指针及互斥锁static ThreadPool<T> *_inst;static pthread_mutex_t _mtx;};// 初始化指针template<class T>ThreadPool<T>* ThreadPool<T>::_inst = nullptr;// 初始化互斥锁template<class T>pthread_mutex_t ThreadPool<T>::_mtx = PTHREAD_MUTEX_INITIALIZER;
}

3.2 Thread.hpp 封装实现的线程库头文件

#pragma once #include<iostream>
#include<pthread.h>
#include<string>//代表线程状态
enum class Status{NEW = 0,RUNNING ,EXIT
};// 参数。返回值为void* 返回值的函数类型
typedef void (*func_t)(void*);class Thread
{
private:pthread_t _tid; // 线程 IDstd::string _name; // 线程名Status _status; // 线程状态func_t _func; // 线程回调函数void* _args; // 传递给回调函数的参数
public:Thread(int num=0,func_t func = nullptr,void *args = nullptr):_tid(num),_func(func),_status(Status::NEW),_args(args){char name[1024];snprintf(name,sizeof(name),"thread - %d",num);_name = name;}~Thread(){}//获取线程名std::string getName() const{return _name;}// 获取状态Status getStatus() const{return _status;}// 回调方法static void* runHelper(void *args){Thread * myThis = static_cast<Thread*>(args);myThis->_func(myThis->_args);return nullptr;}//启动线程void run(){int ret = pthread_create(&_tid,nullptr,runHelper,this);if(0 != ret){std::cerr << "Thread create fail!"<<std::endl;exit(1);}_status = Status::RUNNING;}// 线程等待void join(){int ret = pthread_join(_tid,nullptr);if(0 != ret){if(0 != ret){std::cerr << "Thread join fail!"<<std::endl;exit(1);}}_status = Status::EXIT;}};

3.3 BlockingQueue.hpp 生产者消费者模型头文件

#pragma once#include <queue>
#include <mutex>
#include <pthread.h>
#include "LockGuard.hpp"namespace My_Queue{const int DEF_SIZE = 10;template<class T>class BlockingQueue{private:// 任务队列std::queue<T> _queue;size_t _cap; // 阻塞队列的容量pthread_mutex_t _mtx; // 互斥锁pthread_cond_t _pro_cond; // 生产者条件变量pthread_cond_t _con_cond; // 消费者条件变量public:BlockingQueue(size_t cap = DEF_SIZE):_cap(cap){// 初始化锁与条件变量pthread_mutex_init(&_mtx,nullptr);pthread_cond_init(&_pro_cond,nullptr);pthread_cond_init(&_con_cond,nullptr);}~BlockingQueue(){//销毁锁与条件变量pthread_mutex_destroy(&_mtx);pthread_cond_destroy(&_pro_cond);pthread_cond_destroy(&_con_cond);}// 生产数据(入队)void Push(const T& inData){// 加锁(RAII风格)LockGuard lock(&_mtx);// 循环判断条件是否满足while(IsFull()){pthread_cond_wait(&_pro_cond, &_mtx);}_queue.push(inData);// 可以加策略唤醒,比如生产一半才唤醒消费者pthread_cond_signal(&_con_cond);// 自动解锁}// 消费数据(出队)void Pop(T* outData){// 加锁(RAII 风格)LockGuard lock(&_mtx);// 循环判读条件是否满足while(IsEmpty()) {pthread_cond_wait(&_con_cond, &_mtx);}*outData = _queue.front();_queue.pop();// 可以加策略唤醒,比如消费完后才唤醒生产者pthread_cond_signal(&_pro_cond);// 自动解锁}private://判断是否为满bool IsFull(){return _queue.size() == _cap;}//判断是否为空bool IsEmpty(){return _queue.empty();}};}

3.4 LockGuard.hpp 自动化锁头文件

#pragma once#include<pthread.h>class LockGuard
{
private:pthread_mutex_t* _pmtx;
public:LockGuard(pthread_mutex_t *pmtx):_pmtx(pmtx){//加锁pthread_mutex_lock(_pmtx);}~LockGuard(){//解锁pthread_mutex_unlock(_pmtx);}
};

 3.5 Task.hpp 任务类

  现在需要修改 Task.hpp 任务头文件中的 Task 任务类,将其修改为一个服务于 网络通信中业务处理 的任务类(也就是 Service() 业务处理函数)

   在 Service() 业务处理函数中,需要包含 socket 套接字、客户端 IP、客户端端口号 等必备信息,除此之外,我们还可以将 可调用对象(Service() 业务处理函数) 作为参数传递给 Task 对象.

#pragma once#include <string>
#include <functional>namespace My_task{// Service() 业务处理函数的类型using cb_t = std::function<void(int, std::string, uint16_t)>;class Task{public:// 可以再提供一个默认构造(防止部分场景中构建对象失败)Task(){}Task(int sock, const std::string& ip, const uint16_t& port, const cb_t& cb):_sock(sock),_ip(ip),_port(port),_cb(cb){}// 重载运算操作,用于回调 [业务处理函数]void operator()(){// 直接回调 cb [业务处理函数] 即可_cb(_sock, _ip, _port);}private:int _sock;std::string _ip;uint16_t _port;cb_t _cb; // 回调函数};
}

3.6 server.hpp 头文件

准备工作完成后,接下来就是往 server.hpp 服务器头文件中添加组件了

注意:

  • 在构建 Task 对象时,需要使用 bind 绑定类内函数,避免参数不匹配
  • 当前的线程池是单例模式,在 Task 任务对象构建后,通过线程池操作句柄 push 对象即可
// server.hpp#pragma once#include<iostream>
#include<string>
#include<functional>
#include<sys/types.h>
#include<sys/socket.h>
#include<netinet/in.h>
#include<arpa/inet.h>
#include"err.hpp"
#include<cstring>
#include<unistd.h>
#include<cerrno>
#include"ThreadPool.hpp"
#include"Task.hpp"namespace My_server{// 默认端口号const uint16_t default_port = 1111;//全连接队列的最大长度const int backlog = 32;using func_t = std::function<std::string(std::string)>;//前置声明class server;//包含我们所需参数的类型class ThreadData{public:ThreadData(int sock,const std::string&ip,const uint16_t&port,server*ptr):_sock(sock),_clientip(ip),_clientport(port),_current(ptr){}public:int _sock;std::string _clientip;uint16_t _clientport;server* _current;};class server{private:/* data *///套接字int _listensock;//端口号uint16_t _port;// 判断服务器是否结束运行bool _quit;// 外部传入的回调函数func_t _func;public:server(const func_t &func,const uint16_t &port = default_port):_func(func),_port(port),_quit(false){}~server(){}//初始化服务器void InitServer(){//1 创建套接字_listensock = socket(AF_INET,SOCK_STREAM,0);if(_listensock == -1){//绑定失败std::cerr<<"Create Socket Fail:"<<strerror(errno)<<std::endl;exit(SOCKET_ERR);}std::cout<<"Create Socket Success!" <<_listensock<<std::endl;//2 绑定端口号和IP地址struct sockaddr_in local;bzero(&local,sizeof(local));local.sin_family = AF_INET;local.sin_port = htons(_port);local.sin_addr.s_addr = INADDR_ANY;if(bind(_listensock,(const sockaddr*)&local,sizeof(local))){std::cerr << "Bind IP&&Port Fali" << strerror(errno) << std::endl;exit(BIND_ERR);}//3 开始监听if(listen(_listensock,backlog)== -1){std::cerr<<"Listen Fail: "<< strerror(errno) << std::endl;//新增一个报错exit(LISTEN_ERR);}std::cout<<"Listen Success!"<<std::endl;}//启动服务器void StartServer(){while(!_quit){//1 处理连接请求struct sockaddr_in client;socklen_t len = sizeof(client);int sock = accept(_listensock,(struct sockaddr*)&client,&len);//2 如果连接失败 继续尝试连接if(sock == -1){std::cerr<< "Accept Fail!:"<<strerror(errno)<<std::endl;continue;}// 连接成功,获取客户端信息std::string clientip = inet_ntoa(client.sin_addr);uint16_t clientport = ntohs(client.sin_port);std::cout<<"Server accept"<< clientip + "-"<< clientport <<sock<<" from "<<_listensock << "success!"<<std::endl;// 3.构建任务对象 注意:使用 bind 绑定 this 指针My_task::Task t(sock, clientip, clientport, std::bind(&server::Service, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));// 4.通过线程池操作句柄,将任务对象 push 进线程池中处理//s//std::cout<<std::endl<<"push Task"<<std::endl;My_pool::ThreadPool<My_task::Task>::getInstance()->pushTask(t);}}void Service(int sock,const std::string &clientip,const uint16_t &clientport){char buff[1024];std::string who = clientip + "-" + std::to_string(clientport);while(true){// 以字符串格式读取,预留\0的位置ssize_t n = read(sock,buff,sizeof(buff)-1);if(n>0){//读取成功buff[n]='\0';std::cout<<"Server get: "<< buff <<" from "<<who<<std::endl;//实际处理可以交给上层逻辑指定std::string respond = _func(buff);write(sock,buff,strlen(buff));}else if(n==0){//表示当前读到了文件末尾,结束读取std::cout<<"Client "<<who<<" "<<sock<<" quit!"<<std::endl;close(sock);break;}else{// 读取出问题(暂时)std::cerr << "Read Fail!" << strerror(errno) << std::endl;close(sock); // 关闭文件描述符break;}    }}};}

接下来编译并运行程序,当服务器启动后(此时无客户端连接),只有一个线程,这是因为我们当前的 线程池 是基于 懒汉模式 实现的,只有当第一次使用时,才会创建线程.

接下来启动客户端,可以看到确实创建了一批次线程(十个)

  看似程序已经很完善了,其实隐含着一个大问题:当前线程池中的线程,本质上是在回调一个 while(true) 死循环函数,当连接的客户端大于线程池中的最大线程数时,会导致所有线程始终处于满负载状态,直接影响就是连接成功后,无法再创建通信会话(倘若客户端不断开连接,线程池中的线程就无力处理其他客户端的会话)

  说白了就是 线程池 比较适合用于处理短任务,对于当前的场景来说,线程池 不适合建立持久通信会话,应该将其用于处理 read 读取、write 写入 任务.

  如果想解决这个问题,有两个方向:Service() 函数中支持一次 [收 / 发],或者多线程+线程池,多线程用于构建通信会话,线程池则用于处理 [收 / 发] 任务

前者实现起来比较简单,无非就是把 Service() 业务处理函数中的 while(true) 循环去掉

Service() 业务处理函数

void Service(int sock,const std::string &clientip,const uint16_t &clientport){char buff[1024];std::string who = clientip + "-" + std::to_string(clientport);// 以字符串格式读取,预留\0的位置ssize_t n = read(sock,buff,sizeof(buff)-1);if(n>0){//读取成功buff[n]='\0';std::cout<<"Server get: "<< buff <<" from "<<who<<std::endl;//实际处理可以交给上层逻辑指定std::string respond = _func(buff);write(sock,buff,strlen(buff));}else if(n==0){//表示当前读到了文件末尾,结束读取std::cout<<"Client "<<who<<" "<<sock<<" quit!"<<std::endl;close(sock);}else{// 读取出问题(暂时)std::cerr << "Read Fail!" << strerror(errno) << std::endl;close(sock); // 关闭文件描述符}    }

至于后者就比较麻烦了,需要结合 高级IO 相关知识,这里不再阐述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/4138.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

WIFI加密方式对无线速率的影响

文章目录 无线加密三种选择&#xff1a;WEP、WPA和WPA2测试平台和测试方法非加密和WEP加密测试 结果差别巨大非加密条件下 300M无线路由实测WEP加密条件下 300M无线路由实测 TKIP加密算法&#xff1a;WPA与WPA2成绩低迷WPA加密&#xff08;TKIP加密算法&#xff09;条件下 300M…

【C++】---STL之list详解

【C】---STL之list详解 一、了解list的基本信息二、成员函数1、构造2、迭代器3、empty()4、size()5、front()6、back()7、push_front()8、pop_front()9、push_back()10、pop_back()11、insert()12、erase()13、swap()14、sort()15、reverse() 一、了解list的基本信息 1、库里面…

Java同时使用@RequestBody和@RequestParam传参在postman中执行请求报错:Unsupported Media Type

天行健&#xff0c;君子以自强不息&#xff1b;地势坤&#xff0c;君子以厚德载物。 每个人都有惰性&#xff0c;但不断学习是好好生活的根本&#xff0c;共勉&#xff01; 文章均为学习整理笔记&#xff0c;分享记录为主&#xff0c;如有错误请指正&#xff0c;共同学习进步。…

C语言数据结构之栈

目录 1.栈的概念及结构2.栈的实现3.栈的代码实现4.相关例题 •͈ᴗ•͈ 个人主页&#xff1a;御翮 •͈ᴗ•͈ 个人专栏&#xff1a;C语言数据结构 •͈ᴗ•͈ 欢迎大家关注和订阅!!! 1.栈的概念及结构 栈&#xff1a;一种特殊的线性表&#xff0c;其只允许在固定的一端进行插…

ElasticSearch集群

我们ES集群主要解决的是这两个问题&#xff1a;海量数据存储问题、单点故障问题 海量数据存储问题&#xff1a;单机的ES&#xff0c;数据存储能力是有上限的 单点故障问题&#xff1a;如果单机上的Elasticsearch节点发生故障&#xff0c;整个系统会停止服务&#xff0c;导致数据…

Xcode隐私协议适配

1. Privacy manifest files 1.1 简介 自己App或三方SDK&#xff08;通过XCFrameworks|Swift packages|Xcode projects集成的&#xff09;需要包含一个隐私清单文件&#xff08;privacy manifest&#xff09;叫作 PrivacyInfo.xcprivacy。它是一个属性列表&#xff0c;记录了A…

[C++基础学习]----03-程序流程结构之循环结构详解

前言 在C程序中&#xff0c;循环结构在用于重复执行一段代码块&#xff0c;直到满足某个条件为止。循环结构有多种形式&#xff0c;包括while循环、do-while循环和for循环。 正文 01-循环结构简介 1、while循环语句&#xff1a; while循环在每次循环开始前检查条件是否为真&a…

【利兹】XJCO3910/COMP391001 Combinatorial Optimisation组合优化/运筹学 cw考试资料辅导

COMP391001| XJCO3910 (36642) 西交利兹院 Combinatorial Optimisation组合优化/运筹学 资料or辅导 需要请私聊 1.独家近年考试题 包你高分 2. cw and 官方标准答案 3. worksheets and solutions

【docker】安装openjdk

查看可用的 openjdk版本 docker hub 查看地址&#xff1a;https://hub.docker.com/_/openjdk 此图片已被正式弃用&#xff0c;建议所有用户尽快找到并使用合适的替代品。其他官方形象替代品的一些例子&#xff08;按字母顺序列出&#xff0c;没有有意或暗示的偏好&#xff09;…

机器学习:深入解析SVM的核心概念(问题与解答篇)【一、间隔与支持向量】

直接阅读原始论文可能有点难和复杂&#xff0c;所以导师直接推荐我阅读周志华的《西瓜书》&#xff01;&#xff01;然后仔细阅读其中的第六章&#xff1a;支持向量机 间隔与支持向量 问题一&#xff1a;什么叫法向量&#xff1f;为什么是叫法向量 在这个线性方程中&#xff…

给大一大二师生的忠告,如何在校招中脱颖而出做到降维打击

&#x1f4e2;&#x1f4e2;&#x1f4e2;&#x1f4e3;&#x1f4e3;&#x1f4e3; 哈喽&#xff01;大家好&#xff0c;我是「奇点」&#xff0c;江湖人称 singularity。刚工作几年&#xff0c;想和大家一同进步&#x1f91d;&#x1f91d; 一位上进心十足的【Java ToB端大厂…

[超详细]Java子父类树形结构查询和删除[小白]

目录 前言 1、查询子父类通过树形结构封装起来 一、创建实体类 二、创建mapper类 三、创建service和serviceImpl类 四、创建controller类 2、删除该父类下的所有子类&#xff0c;并且删除自己 controller层 service和serviceImpl层 总结 前言 [超详细]Java子父类树形…

STM32与Proteus的串口仿真详细教程与源程序

资料下载地址&#xff1a;STM32与Proteus的串口仿真详细教程与源程序 资料内容 包含LCD1602显示&#xff0c;串口发送接收&#xff0c;完美实现。 文档内容齐全&#xff0c;包含使用说明&#xff0c;相关驱动等。 解决了STM32的Proteus串口收发问题。 注意&#xff1a;每输…

Datart 扩装下载功能之PDF和图片下载

Datart 扩装下载功能之PDF和图片下载 首先下载依赖 yum install mesa-libOSMesa-devel gnu-free-sans-fonts wqy-zenhei-fonts -y 然后下载安装chrome yum install https://dl.google.com/linux/direct/google-chrome-stable_current_x86_64.rpm 查看chrome版本号 google…

基于Vue3实现的 宫格 图片摆放

一个可以支持无限宫格的 vue3实现 本来要参考微信群头像的规则实现&#xff0c;网上找到一大堆类似的需求&#xff0c;奈何XXX折磨人&#xff0c;九宫格已经不能满足ta了。 当前代码实现了………… 好多东西(可以多宫格).具体的看效果图 code <style scoped langless> .…

AI预测体彩排列3第2套算法实战化测试第5弹2024年4月27日第5次测试

今天继续进行新算法的测试&#xff0c;今天是第5次测试。好了&#xff0c;废话不多说了&#xff0c;直接上图上结果。 2024年4月27日体彩排3预测结果 6码定位方案如下&#xff1a; 百位&#xff1a;6、2、1、7、8、9 十位&#xff1a;8、9、4、3、1、0 个位&#xff1a;3、7、8…

【SpringBoot整合系列】SpringBoot整合Redis[附redis工具类源码]

目录 SpringBoot整合Redis1.下载和安装Redis2.新建工程&#xff0c;导入依赖3.添加配置4.先来几个基本的示例测试代码输出结果用redis客户端查看一下存储内容 5.封装redis工具类RedisKeyUtilRedisStringUtilRedisHashUtilRedisListUtilRedisSetUtilRedisZsetUtil备注 6.测试通用…

node.js 解析post请求 方法二

前提&#xff1a;以前面发的node.js解析post请求方法一为模板&#xff0c;具体见 http://t.csdnimg.cn/ABaIn 此文我们运用第二种方法&#xff1a;使用第三方模块formidable对post请求进行解析。 1》代码难点 *** 在Node.js中使用formidable模块来解析POST请求主要涉及到处理…

IO流基础

IO流介绍 1.什么是IO流&#xff1f; 流是一种抽象概念&#xff0c;它代表了数据的无结构化传递。按照流的方式进行输入输出&#xff0c;数据被当成无结构的字节序列或字符序列。从流中取得数据的操作称为提取操作&#xff0c;而向流中添加数据的操作称为插入操作。用来进行输入…

JVM (Micrometer)监控SpringBoot(AWS EKS版)

问题 怎样使用JVM (Micrometer)面板&#xff0c;监控Spring&#xff1f;这里不涉及Prometheus和Grafana&#xff0c;重点介绍与Micrometer与Springboot&#xff0c;k8s怎样集成。 pom.xml 引入依赖&#xff0c;如下&#xff1a; <properties><micrometer.version&…