liunx网络套接字 | 实现基于tcp协议的echo服务

        前言:本节讲述linux网络下的tcp协议套接字相关内容。博主以实现tcp服务为主线,穿插一些小知识点。以先粗略实现,后精雕细琢为思路讲述实现服务的过程。下面开始我们的学习吧。

        ps:本节内容建议了解网络端口号的友友们观看哦。

目录

实现内容

线程池版本整体代码

准备文件

makefile

tcpserver.hpp

main.cc

tcpclient

version1运行结果

version2版本

version3版本

version4版本


实现内容

        本篇内容将要实现一个服务端, 一个客户端。 然后客户端用来链接服务端, 向服务端发送消息, 然后服务端能够接收到消息并将消息返回给客户端。 

        实现的版本有四个:

  •         version1:实现单执行流的客户服务echo服务, 就是服务端只为一个服务端进行服务。 
  •         version2:在version1的版本上, 添加进程, 实现多进程的客户服务echo服务, 就是服务端为多个客户端进行服务, 但是因为是多进程,所以开销大。
  •         version3:改进version2版本, 将多进程改成多线程。实现多线程的echo服务。 但是当用户很多的时候, 线程量太大, 无法控制。
  •         version4:终极版本, 改进version3, 以线程池为基础, 实现可控的多线程echo服务。 控制线程个数, 既保证了并发性, 又防止了用户太多,线程爆满的问题。

        博主先实现version1, 然后在version1的基础上进行改版。下面开始实现: 

线程池版本整体代码

tcpserver


#pragma once
#include "Log.hpp"
#include <iostream>
#include <string.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include "ThreadPool.h"
#include "Task.h"
#include <sys/wait.h>
#include <unistd.h>
using namespace std;const int defaultfd = -1;
const int defaultport = 8080;
const string defaultip = "0.0.0.0";
const int backlog = 10; // 直接用,一般不要设置的太大。class TcpServer;  //声明class ThreadData
{
public:ThreadData(int fd, string ip, uint16_t port, TcpServer* const t): sockfd_(fd), clientip_(ip), clientport_(port), t_(t){}public:int sockfd_;string clientip_;uint16_t clientport_;
public:TcpServer* const t_;
};Log lg;enum
{SockError = 2,BindError,ListenError
};class TcpServer
{
public:TcpServer(int port = defaultport, string ip = defaultip, int sockfd = defaultfd): listensockfd_(sockfd), ip_(ip), port_(port){}void InitServer(){listensockfd_ = socket(AF_INET, SOCK_STREAM, 0);if (listensockfd_ < 0){lg(Fatal, "create socket, errno: %d, strerror: %s", errno, strerror(errno));exit(SockError);}//lg(Info, "create socket success, sockfd: %d", listensockfd_);struct sockaddr_in local;memset(&local, 0, sizeof(local));local.sin_family = AF_INET;local.sin_port = htons(port_);inet_aton(ip_.c_str(), &(local.sin_addr)); // 主机序列转网络学列。 inet_aton是一个线程安全的函数。// 绑定if (bind(listensockfd_, (sockaddr *)&local, sizeof(local)) < 0){lg(Fatal, "bind error, errno: %d, strerror: %s", errno, strerror(errno));exit(BindError);}// tcp面向字节流, 是被动的, 所以要将对应的socket设置为监听状态。if (listen(listensockfd_, backlog) < 0) // backlock表示的是底层全连接队列的长度。 这个参数对意思, 不做解释。{lg(Fatal, "Listen error, errno: %d, strerror: %s", errno, strerror(errno));exit(ListenError);}lg(Info, "Listen has success");}void Start(){ThreadPool<Task>::GetInstance()->Start();lg(Info, "tcpServer is running...");for (;;) // tcp协议也是一种一直处于运行的服务{// tcp是面向连接的, 所以他比udp还多了一步accept, 先将客户端与服务端连接起来。accept的返回值成功返回整数文件描述符,否则-1被返回, 错误码被设置// 1、获取新连接,struct sockaddr_in client; // 获取的是客户端的addrsocklen_t len = sizeof(client);int sockfd = accept(listensockfd_, (sockaddr *)&client, &len); // accept成功, 就能知道是谁连接的我。if (sockfd < 0)                                                // 关于这两个套接字, sockfd_的核心工作就只是把链接获取上来, 未来进行IO操作, 看的是sockfd。{lg(Waring, "listen error, errno: %d, strerror: %s", errno, strerror(errno));continue;}uint16_t clientport = ntohs(client.sin_port); // 网络序列转主机序列char clientip[32];inet_ntop(sockfd, &client.sin_addr, clientip, sizeof(clientip));// 2、根据新连接进行通信lg(Info, "get a new link..., sockfd:%d, clientport: %d, clientip: %s", sockfd, clientport, clientip);//version--4线程池版本Task task_(sockfd, clientip, clientport);ThreadPool<Task>::GetInstance()->Push(task_);}}~TcpServer(){}private:int listensockfd_; // 监听套接字, 只用来升起服务器, 接收链接uint16_t port_;string ip_;
};

 main.cc

#include"tcpserver.hpp"
#include<memory>int main(int argc, char* argv[])
{if (argc != 2){cout << "has return" << endl;return 1;}//uint16_t port = stoi(argv[1]);unique_ptr<TcpServer> tcpsvr(new TcpServer(port));tcpsvr->InitServer();tcpsvr->Start();return 0;
}

 tcpclient

#include<iostream>
using namespace std;
#include<unistd.h>
#include<sys/types.h>
#include<sys/socket.h>
#include<arpa/inet.h>
#include<string.h>
#include<netinet/in.h>int main(int argc, char* argv[])
{    //处理argc, argv[]if (argc != 3){cout << "has return " << endl;return 1;}//uint16_t serverport = stoi(argv[2]);string serverip = argv[1];//创建addr结构体, 设置端口号ip地址int sockfd = socket(AF_INET, SOCK_STREAM, 0);if (sockfd < 0){cout << "socket error" << endl;return 1;}sockaddr_in server;memset(&server, 0, sizeof(server));server.sin_family = AF_INET;server.sin_port = htons(serverport);inet_pton(AF_INET, serverip.c_str(), &(server.sin_addr));//1、客户端要绑定端口号, 但是不需要显示的绑定, 而是系统进行随机端口的绑定。 int n = connect(sockfd,(sockaddr*)&server, sizeof(server));if (n < 0) {cerr << "connect error..." << endl;return 2;}//2、发送信息, 接收信息。string message;while (true){cout << "Please Enter# ";getline(cin, message);//write(sockfd, message.c_str(), message.size());char inbuffer[4096];int n = read(sockfd, inbuffer, sizeof(inbuffer) - 1);if (n > 0){inbuffer[n] = 0;cout << inbuffer << endl;}}return 0;
}

ThreadPool


#pragma once
#include<iostream>
#include<pthread.h>
#include<vector>
#include<string>
using namespace std;
#include<queue>
#include<ctime>
#include<unistd.h>//对线程的属性做一下封装, 有利于线程池的保存以及后面的处理
struct ThreadInfo
{pthread_t tid_;string name_;   
};template<class T>
class ThreadPool
{static const int defaultnum = 5;  //默认的线程池的大小(线程池的大小就是里面包含的线程的数量)private://加锁解锁void Lock(){pthread_mutex_lock(&mutex_);}void Unlock(){pthread_mutex_unlock(&mutex_);}//唤醒线程, 线程是可以被挂起的(就比如信号量)。 当任务没有的时候,线程就要被挂起, 有任务后再唤醒void Wakeup(){pthread_cond_signal(&cond_);}void ThreadSleep(){pthread_cond_wait(&cond_, &mutex_);}bool IsQueueEmpty(){return tasks_.empty();}public://线程要执行的函数static void* HandlerTask(void* args){ThreadPool<T>* tp = static_cast<ThreadPool<T>*>(args);while(true) {tp->Lock();while (tp->tasks_.empty()){tp->ThreadSleep(); //如果队列里面没有任务了, 就让线程去休眠。}//否则就去拿到tasks里面的任务T t = tp->tasks_.front();tp->tasks_.pop();//tp->Unlock();t();  //每一个线程先对任务进行消费, 消费完成之后处理任务。    }}//运行这个线程池, 也就是先将线程创建出来。 然后去运行线程void Start(){int num = threads_.size();for (int i = 0; i < num; i++){threads_[i].name_ = "thread-";threads_[i].name_ += to_string(i);   pthread_create(&(threads_[i].tid_), nullptr, HandlerTask, this);}}//主线程给线程池发送任务, 注意, 这个任务一定是可以被储存起来的。 因为当我们的任务很多很多的时候, 我们的线程池内的线程要一个一个地对这些任务进行处理void Push(const T& t){Lock();tasks_.push(t);Wakeup();Unlock();}//获取单例//改编成单例的步骤里面只有这里要说一下, 就是为什么我们要套双层判断。 其实这里的最外面的一层的判断是我们另外加上去的。 为什么//要加这个判断呢? 就是如果我们不加最外层这一层判断。 那么每一个线程获取单例都要申请所,加锁。 不就是相当于所有的线程都在串行执行? 这就有效率问题。 //解决方案就是这个再加一层判断。 这样假如有四个线程。 那么一开始四个线程都在判断, 那么它们四个线程都进入了if里面。 然后就都申请锁, 但是只有第一个线程能够//进入第二层里面, 其他的进入不了。 那么当这一轮的四个线程都申请一次锁候就都退出了函数, 然后就都去做自己的事情了。 问题是, 当下次它们再来申请单例对象的时候它们连//第一层判断都成功不了了, 也就都不用加锁解锁了, 这就大大提高了效率!!!static ThreadPool<T>* GetInstance(int num = defaultnum){if (tp_ == nullptr){pthread_mutex_lock(&tp_->lock_);if (tp_ == nullptr) {tp_ = new ThreadPool<T>(num);}pthread_mutex_unlock(&tp_->lock_);}return tp_;}private://构造函数私有化, 只有Getinstance里面才能创建。 ThreadPool(int num = defaultnum):threads_(num){pthread_mutex_init(&mutex_, nullptr);pthread_cond_init(&cond_, nullptr);}//单例模式只有一个对象, 所以要将拷贝构造和拷贝赋值封住, 为了防止有人在外部重新拷贝一个对象。 ThreadPool(const ThreadPool<T>& tp) = delete;const ThreadPool<T>& operator=(const ThreadPool<T>& tp) = delete;~ThreadPool(){pthread_mutex_destroy(&mutex_);pthread_cond_destroy(&cond_);}private:vector<ThreadInfo> threads_;   //线程都维护在vector当中, 这个就是线程池里面的线程的个数,queue<T> tasks_ ;              //向线程池中发送任务, 这个队列里面保存的就是我们的任务的数目。 pthread_mutex_t mutex_;        //锁,用来生产者线程(本份代码只是主线程)给线程池发送任务时候加锁使用以及消费者线程抢夺任务时加锁使用 pthread_cond_t cond_;          //条件变量, 用来没有任务的时候,消费者要挂起。 static pthread_mutex_t lock_;         //锁, 这个锁是为了在获取单例的时候能够让线程原子性的访问if (tp_ == nullptr)。static ThreadPool<T>* tp_;    //tp指针, 这就是唯一个单例对象。 };template<class T>
ThreadPool<T>* ThreadPool<T>::tp_ = nullptr;template<class T>
pthread_mutex_t ThreadPool<T>::lock_ = PTHREAD_MUTEX_INITIALIZER;

Task

#include <iostream>
using namespace std;
#include"Log.hpp"
#include <vector>
#include <string>
extern Log lg;// Task.h文件里面包含了任务类, 这个是我们线程池要执行的任务
class Task
{
public:// 构造函数, 第一个参数data1, 第二个参数data2, 第三个参数是加减乘除的符号。 这个任务就是进行四则运算Task(int sockfd, string clientip, uint16_t clientport):sockfd_(sockfd), clientip_(clientip), clientport_(clientport){}~Task() {}// 执行任务的接口run(), 这个方法对三个变量进行判断, 然后进行运算。void run(){char buffer[4096];string temp = clientip_;while (true){ssize_t n = read(sockfd_, buffer, sizeof(buffer));if (n > 0){buffer[n] = 0;cout << "client say#: " << buffer << endl;string echo_string("tcpserver echo#  " + (string)buffer);write(sockfd_, echo_string.c_str(), echo_string.size());}else if (n == 0){lg(Info, "quit sockfd:%d ", sockfd_);exit(1);}else{lg(Waring, "Waring, sockfd:%d, clientport: %d, clientip: %s", sockfd_, clientport_, temp.c_str());}//}}// 仿函数, 为了方便我们的对象能够像函数一样使用。void operator()(){run();}private:// 每一个任务对象里面都有三个参数, 一个data1, 一个data2, 最后一个op_int sockfd_;string clientip_;uint16_t clientport_;
};

 

准备文件

先准备好文件。 tcpserver.hpp实现服务的接口, main.cc运行服务端。 然后tcpclient.cc运行客户端。 

 

makefile

makefile不解释, 直接上代码(这里加上-g是为了后续方便调试, 也可以不加)

.PHONY:all
all: tcpserver.exe tcpclient.exetcpserver.exe:main.ccg++ -o $@ $^ -std=c++11 -lpthread -g
tcpclient.exe:tcpclient.ccg++ -o $@ $^ -std=c++11 -lpthread -g.PHONY:clean
clean:rm -rf tcpserver.exe tcpclient.exe

 tcpserver.hpp

先来看框架

#pragma once
#include "Log.hpp"
#include <iostream>
#include <string.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include"ThreadPool.h"
#include"Task.h"
#include <sys/wait.h>
#include <unistd.h>
using namespace std;const int defaultfd = -1;
const int defaultport = 8080;
const string defaultip = "0.0.0.0";
const int backlog = 10; // 直接用,一般不要设置的太大。Log lg;enum
{SockError = 2,BindError,ListenError
};class TcpServer
{
public://构造函数, 将服务端的端口号, ip地址传过来TcpServer(int port = defaultport, string ip = defaultip, int sockfd = defaultfd): listensockfd_(sockfd), ip_(ip), port_(port){}//初始化服务端,分两步:绑定和监听void InitServer(){//绑定//监听}//运行服务端void Start(){}//执行相应的服务void Service(int sockfd, const string &clientip, uint16_t clientport){} //析构函数~TcpServer(){}private:int listensockfd_; // 监听套接字, 只用来升起服务器, 接收链接uint16_t port_;string ip_;
};

然后初始化就是创建sockaddr结构体, 创建套接字, 然后绑定。 因为tcp是面向字节流的。 所以还要对网卡进行监听。下面是初始化服务。

//对服务端进行初始化void InitServer(){listensockfd_ = socket(AF_INET, SOCK_STREAM, 0);if (listensockfd_ < 0){lg(Fatal, "create socket, errno: %d, strerror: %s", errno, strerror(errno));exit(SockError);}//lg(Info, "create socket success, sockfd: %d", listensockfd_);struct sockaddr_in local;memset(&local, 0, sizeof(local));local.sin_family = AF_INET;local.sin_port = htons(port_);inet_aton(ip_.c_str(), &(local.sin_addr)); // 主机序列转网络学列。 inet_aton是一个线程安全的函数。// 绑定if (bind(listensockfd_, (sockaddr *)&local, sizeof(local)) < 0){lg(Fatal, "bind error, errno: %d, strerror: %s", errno, strerror(errno));exit(BindError);}// tcp面向字节流, 是被动的, 所以要将对应的socket设置为监听状态。if (listen(listensockfd_, backlog) < 0) // backlock表示的是底层全连接队列的长度。 这个参数对意思, 不做解释。{lg(Fatal, "Listen error, errno: %d, strerror: %s", errno, strerror(errno));exit(ListenError);}lg(Info, "Listen has success");}

        然后是运行服务, 运行服务也是分两步: accept与客户端建立连接。 然后执行服务。

    void Start(){lg(Info, "tcpServer is running...");for (;;) // tcp协议也是一种一直处于运行的服务{// tcp是面向连接的, 所以他比udp还多了一步accept, 先将客户端与服务端连接起来。accept的返回值成功返回整数文件描述符,否则-1被返回, 错误码被设置// 1、获取新连接,struct sockaddr_in client; // 获取的是客户端的addrsocklen_t len = sizeof(client);int sockfd = accept(listensockfd_, (sockaddr *)&client, &len); // accept成功, 就能知道是谁连接的我。if (sockfd < 0)                                                // 关于这两个套接字, sockfd_的核心工作就只是把链接获取上来, 未来进行IO操作, 看的是sockfd。{lg(Waring, "listen error, errno: %d, strerror: %s", errno, strerror(errno));continue;}uint16_t clientport = ntohs(client.sin_port); // 网络序列转主机序列char clientip[32];inet_ntop(sockfd, &client.sin_addr, clientip, sizeof(clientip));// 2、根据新连接进行通信lg(Info, "get a new link..., sockfd:%d, clientport: %d, clientip: %s", sockfd, clientport, clientip);//version--1 单进程版Service(sockfd, clientip, clientport);close(sockfd);}}

        然后执行的服务是echo服务, 就是先接受客户端发来的信息, 然后将信息加工一下发回去。

void Service(int sockfd, const string &clientip, uint16_t clientport){char buffer[4096];while (true){ssize_t n = read(sockfd, buffer, sizeof(buffer) - 1);if (n > 0){buffer[n] = 0;cout << "client say#: " << buffer << endl;string message("tcpserver echo#  " + string(buffer));//write(sockfd, message.c_str(), message.size());}else if (n == 0){lg(Info, "quit sockfd:%d ", sockfd);exit(1);}else{lg(Waring, "Waring, sockfd:%d, clientport: %d, clientip: %s", sockfd, clientport, clientip.c_str());}}}

         上面就是整个的代码。

        我们这里说一下accept这个代码 

        accept代码的第一个参数是sockfd, 就是网卡的文件fd。 然后第二个参数和第三个参数都是输出型参数。 能够将对方也就是客户端的sockaddr带出来。 

main.cc

        主函数就是接收到传来的端口号, 创建服务端然后初始化并运行起来。

#include"tcpserver.hpp"
#include<memory>int main(int argc, char* argv[])
{if (argc != 2){cout << "has return" << endl;return 1;}//uint16_t port = stoi(argv[1]);unique_ptr<TcpServer> tcpsvr(new TcpServer(port));tcpsvr->InitServer();tcpsvr->Start();return 0;
}   

 tcpclient

        先看客户端的框架, 就是先链接服务端。 然后就给服务端发信息,接收信息。 (接收到的这个信息是被服务端处理过的)

#include<iostream>
using namespace std;
#include<unistd.h>
#include<sys/types.h>
#include<sys/socket.h>
#include<arpa/inet.h>
#include<string.h>
#include<netinet/in.h>int main(int argc, char* argv[])
{//处理argc, argv[]//先连接//再发数据接受数据return 0;
}

处理argc的时候, 因为我们的参数一定是三个。即, 一个程序名一个端口号, 一个IP地址。 所以如果argc不是等于3的话直接返回。 为3的话就将端口号以及ip地址保存一下。 其中ip地址是要连接到的服务端的ip地址, 端口号是要连接到服务端的端口号。

    //处理argc, argv[]if (argc != 3){cout << "has return " << endl;return 1;}//uint16_t serverport = stoi(argv[2]);string serverip = argv[1];

然后创建addr结构,连接到服务端。 

    //创建addr结构体, 设置端口号ip地址int sockfd = socket(AF_INET, SOCK_STREAM, 0);if (sockfd < 0){cout << "socket error" << endl;return 1;}sockaddr_in server;memset(&server, 0, sizeof(server));server.sin_family = AF_INET;server.sin_port = htons(serverport);inet_pton(AF_INET, serverip.c_str(), &(server.sin_addr));//1、客户端要绑定端口号, 但是不需要显示的绑定, 而是系统进行随机端口的绑定。 int n = connect(sockfd,(sockaddr*)&server, sizeof(server));if (n < 0) {cerr << "connect error..." << endl;return 2;}

最后就是收发消息, 这里创建循环, 让我们可以执行多次服务, 多次收发消息。

    //2、发送信息, 接收信息。string message;while (true){cout << "Please Enter# ";getline(cin, message);//write(sockfd, message.c_str(), message.size());char inbuffer[4096];int n = read(sockfd, inbuffer, sizeof(inbuffer) - 1);if (n > 0){inbuffer[n] = 0;cout << inbuffer << endl;}}

然后我们就能够运行了。下面是运行结果。

version1运行结果

可以看到已经可以收发消息。

version2版本

        version2版本是创建多进程。 但是, 要知道, 我们的父进程创建子进程后要进行等待。 否则会造成内存泄漏。 但是我们父进程等待, 父进程又不能向下运行代码了, 就不能继续创建子进程了。 所以, 为了解决这个问题。 我们就可以创建一个孤儿进程。 让子进程创建好孙子进程后直接退出, 将孙子进程托孤。 父进程等待子进程后就继续向下执行。 这样就能创建一批孙子进程并发访问!!!

void Start(){lg(Info, "tcpServer is running...");for (;;) // tcp协议也是一种一直处于运行的服务{// tcp是面向连接的, 所以他比udp还多了一步accept, 先将客户端与服务端连接起来。accept的返回值成功返回整数文件描述符,否则-1被返回, 错误码被设置// 1、获取新连接,struct sockaddr_in client; // 获取的是客户端的addrsocklen_t len = sizeof(client);int sockfd = accept(listensockfd_, (sockaddr *)&client, &len); // accept成功, 就能知道是谁连接的我。if (sockfd < 0)                                                // 关于这两个套接字, sockfd_的核心工作就只是把链接获取上来, 未来进行IO操作, 看的是sockfd。{lg(Waring, "listen error, errno: %d, strerror: %s", errno, strerror(errno));continue;}uint16_t clientport = ntohs(client.sin_port); // 网络序列转主机序列char clientip[32];inet_ntop(sockfd, &client.sin_addr, clientip, sizeof(clientip));// 2、根据新连接进行通信lg(Info, "get a new link..., sockfd:%d, clientport: %d, clientip: %s", sockfd, clientport, clientip);// // version--1 单进程版// Service(sockfd, clientip, clientport);// close(sockfd);//version--2 多进程版pid_t id = fork();if (id == 0){close(listensockfd_);if (fork() > 0){exit(0);}Service(sockfd, clientip, clientport);close(sockfd);exit(0);//child}close(sockfd);pid_t rid = waitpid(id, nullptr, 0);}}

 version3版本

        第三版本是多线程版本, 多线程同样有主线程等待的问题,我们的主线程一旦要等待子线程, 那么就不能向后执行了。 所以为了能够并发, 就要对子线程进行分离。 

        下面是代码的改动:

        第一个改动地方就是创建一个ThreadData类:


class TcpServer;  //声明//创建这个类是为了能够将服务端对象传给线程去执行
class ThreadData
{
public:ThreadData(int fd, string ip, uint16_t port, TcpServer* const t): sockfd_(fd), clientip_(ip), clientport_(port), t_(t){}public:int sockfd_;string clientip_;uint16_t clientport_;
public:TcpServer* const t_;
};

        然后第二个改动的地方就是线程要执行的动作:

static void *pthrun(void *args){pthread_detach(pthread_self()); // 子线程直接分离// 一个进程打开的所有的文件描述符表, 其他进程能看到呢?ThreadData* td = static_cast<ThreadData*>(args);td->t_->Service(td->sockfd_, td->clientip_, td->clientport_);}

        第三个改动就是start函数里面执行服务的代码部分:

    void Start(){lg(Info, "tcpServer is running...");for (;;) // tcp协议也是一种一直处于运行的服务{// tcp是面向连接的, 所以他比udp还多了一步accept, 先将客户端与服务端连接起来。accept的返回值成功返回整数文件描述符,否则-1被返回, 错误码被设置// 1、获取新连接,struct sockaddr_in client; // 获取的是客户端的addrsocklen_t len = sizeof(client);int sockfd = accept(listensockfd_, (sockaddr *)&client, &len); // accept成功, 就能知道是谁连接的我。if (sockfd < 0)                                                // 关于这两个套接字, sockfd_的核心工作就只是把链接获取上来, 未来进行IO操作, 看的是sockfd。{lg(Waring, "listen error, errno: %d, strerror: %s", errno, strerror(errno));continue;}uint16_t clientport = ntohs(client.sin_port); // 网络序列转主机序列char clientip[32];inet_ntop(sockfd, &client.sin_addr, clientip, sizeof(clientip));// 2、根据新连接进行通信lg(Info, "get a new link..., sockfd:%d, clientport: %d, clientip: %s", sockfd, clientport, clientip);// // version--1 单进程版// Service(sockfd, clientip, clientport);// close(sockfd);//version--2 多进程版// pid_t id = fork();// if (id == 0)// {//     close(listensockfd_);//     if (fork() > 0)//     {//         exit(0);//     }//     Service(sockfd, clientip, clientport);//     close(sockfd);//     exit(0);//     //child// }// close(sockfd);// pid_t rid = waitpid(id, nullptr, 0);// version--3多线程版本ThreadData* td = new ThreadData(sockfd, clientip, clientport, this);pthread_t tid;pthread_create(&tid, nullptr, pthrun, td);}}

运行结果

一开始一个server服务, 然后我们添加一个client就增加一个server服务, 增加一个client就增加一个服务。

version4版本

        version4线程池版本, version4的线程池版本就是我们创建好线程池。 然后创建一个任务类, 将我们要执行的服务作为任务类的一个结构。 这就要求我们的这个任务类里面必须有我们的ip, port, sockfd这样的。 字段。 并且我们还要引入几个头文件, 下面为代码:

首先要有两个新文件, 一个包含task类, 一个包含线程池类。

我们让task类里面包含服务的方法。就是run方法。


#include <iostream>
using namespace std;
#include"Log.hpp"
#include <vector>
#include <string>
extern Log lg;// Task.h文件里面包含了任务类, 这个是我们线程池要执行的任务
class Task
{
public:// 构造函数, 第一个参数data1, 第二个参数data2, 第三个参数是加减乘除的符号。 这个任务就是进行四则运算Task(int sockfd, string clientip, uint16_t clientport):sockfd_(sockfd), clientip_(clientip), clientport_(clientport){}~Task() {}// 执行任务的接口run(), 这个方法对三个变量进行判断, 然后进行运算。void run(){char buffer[4096];string temp = clientip_;while (true){ssize_t n = read(sockfd_, buffer, sizeof(buffer));if (n > 0){buffer[n] = 0;cout << "client say#: " << buffer << endl;string echo_string("tcpserver echo#  " + (string)buffer);write(sockfd_, echo_string.c_str(), echo_string.size());}else if (n == 0){lg(Info, "quit sockfd:%d ", sockfd_);exit(1);}else{lg(Waring, "Waring, sockfd:%d, clientport: %d, clientip: %s", sockfd_, clientport_, temp.c_str());}//}}// 仿函数, 为了方便我们的对象能够像函数一样使用。void operator()(){run();}private:// 每一个任务对象里面都有三个参数, 一个data1, 一个data2, 最后一个op_int sockfd_;string clientip_;uint16_t clientport_;
};

 然后线程池代友友们如果没写过可以自己实现一个,或者直接用博主的, 如下:


#pragma once
#include<iostream>
#include<pthread.h>
#include<vector>
#include<string>
using namespace std;
#include<queue>
#include<ctime>
#include<unistd.h>//对线程的属性做一下封装, 有利于线程池的保存以及后面的处理
struct ThreadInfo
{pthread_t tid_;string name_;   
};template<class T>
class ThreadPool
{static const int defaultnum = 5;  //默认的线程池的大小(线程池的大小就是里面包含的线程的数量)private://加锁解锁void Lock(){pthread_mutex_lock(&mutex_);}void Unlock(){pthread_mutex_unlock(&mutex_);}//唤醒线程, 线程是可以被挂起的(就比如信号量)。 当任务没有的时候,线程就要被挂起, 有任务后再唤醒void Wakeup(){pthread_cond_signal(&cond_);}void ThreadSleep(){pthread_cond_wait(&cond_, &mutex_);}bool IsQueueEmpty(){return tasks_.empty();}public://线程要执行的函数static void* HandlerTask(void* args){ThreadPool<T>* tp = static_cast<ThreadPool<T>*>(args);while(true) {tp->Lock();while (tp->tasks_.empty()){tp->ThreadSleep(); //如果队列里面没有任务了, 就让线程去休眠。}//否则就去拿到tasks里面的任务T t = tp->tasks_.front();tp->tasks_.pop();//tp->Unlock();t();  //每一个线程先对任务进行消费, 消费完成之后处理任务。    }}//运行这个线程池, 也就是先将线程创建出来。 然后去运行线程void Start(){int num = threads_.size();for (int i = 0; i < num; i++){threads_[i].name_ = "thread-";threads_[i].name_ += to_string(i);   pthread_create(&(threads_[i].tid_), nullptr, HandlerTask, this);}}//主线程给线程池发送任务, 注意, 这个任务一定是可以被储存起来的。 因为当我们的任务很多很多的时候, 我们的线程池内的线程要一个一个地对这些任务进行处理void Push(const T& t){Lock();tasks_.push(t);Wakeup();Unlock();}//获取单例//改编成单例的步骤里面只有这里要说一下, 就是为什么我们要套双层判断。 其实这里的最外面的一层的判断是我们另外加上去的。 为什么//要加这个判断呢? 就是如果我们不加最外层这一层判断。 那么每一个线程获取单例都要申请所,加锁。 不就是相当于所有的线程都在串行执行? 这就有效率问题。 //解决方案就是这个再加一层判断。 这样假如有四个线程。 那么一开始四个线程都在判断, 那么它们四个线程都进入了if里面。 然后就都申请锁, 但是只有第一个线程能够//进入第二层里面, 其他的进入不了。 那么当这一轮的四个线程都申请一次锁候就都退出了函数, 然后就都去做自己的事情了。 问题是, 当下次它们再来申请单例对象的时候它们连//第一层判断都成功不了了, 也就都不用加锁解锁了, 这就大大提高了效率!!!static ThreadPool<T>* GetInstance(int num = defaultnum){if (tp_ == nullptr){pthread_mutex_lock(&tp_->lock_);if (tp_ == nullptr) {tp_ = new ThreadPool<T>(num);}pthread_mutex_unlock(&tp_->lock_);}return tp_;}private://构造函数私有化, 只有Getinstance里面才能创建。 ThreadPool(int num = defaultnum):threads_(num){pthread_mutex_init(&mutex_, nullptr);pthread_cond_init(&cond_, nullptr);}//单例模式只有一个对象, 所以要将拷贝构造和拷贝赋值封住, 为了防止有人在外部重新拷贝一个对象。 ThreadPool(const ThreadPool<T>& tp) = delete;const ThreadPool<T>& operator=(const ThreadPool<T>& tp) = delete;~ThreadPool(){pthread_mutex_destroy(&mutex_);pthread_cond_destroy(&cond_);}private:vector<ThreadInfo> threads_;   //线程都维护在vector当中, 这个就是线程池里面的线程的个数,queue<T> tasks_ ;              //向线程池中发送任务, 这个队列里面保存的就是我们的任务的数目。 pthread_mutex_t mutex_;        //锁,用来生产者线程(本份代码只是主线程)给线程池发送任务时候加锁使用以及消费者线程抢夺任务时加锁使用 pthread_cond_t cond_;          //条件变量, 用来没有任务的时候,消费者要挂起。 static pthread_mutex_t lock_;         //锁, 这个锁是为了在获取单例的时候能够让线程原子性的访问if (tp_ == nullptr)。static ThreadPool<T>* tp_;    //tp指针, 这就是唯一个单例对象。 };template<class T>
ThreadPool<T>* ThreadPool<T>::tp_ = nullptr;template<class T>
pthread_mutex_t ThreadPool<T>::lock_ = PTHREAD_MUTEX_INITIALIZER;

        然后就是start运行函数

void Start(){ThreadPool<Task>::GetInstance()->Start();lg(Info, "tcpServer is running...");for (;;) // tcp协议也是一种一直处于运行的服务{// tcp是面向连接的, 所以他比udp还多了一步accept, 先将客户端与服务端连接起来。accept的返回值成功返回整数文件描述符,否则-1被返回, 错误码被设置// 1、获取新连接,struct sockaddr_in client; // 获取的是客户端的addrsocklen_t len = sizeof(client);int sockfd = accept(listensockfd_, (sockaddr *)&client, &len); // accept成功, 就能知道是谁连接的我。if (sockfd < 0)                                                // 关于这两个套接字, sockfd_的核心工作就只是把链接获取上来, 未来进行IO操作, 看的是sockfd。{lg(Waring, "listen error, errno: %d, strerror: %s", errno, strerror(errno));continue;}uint16_t clientport = ntohs(client.sin_port); // 网络序列转主机序列char clientip[32];inet_ntop(sockfd, &client.sin_addr, clientip, sizeof(clientip));// 2、根据新连接进行通信lg(Info, "get a new link..., sockfd:%d, clientport: %d, clientip: %s", sockfd, clientport, clientip);// // version--1 单进程版// Service(sockfd, clientip, clientport);// close(sockfd);//version--2 多进程版// pid_t id = fork();// if (id == 0)// {//     close(listensockfd_);//     if (fork() > 0)//     {//         exit(0);//     }//     Service(sockfd, clientip, clientport);//     close(sockfd);//     exit(0);//     //child// }// close(sockfd);// pid_t rid = waitpid(id, nullptr, 0);// // version--3多线程版本// ThreadData* td = new ThreadData(sockfd, clientip, clientport, this);// pthread_t tid;// pthread_create(&tid, nullptr, pthrun, td);//version--4线程池版本Task task_(sockfd, clientip, clientport);ThreadPool<Task>::GetInstance()->Push(task_);}}

 运行结果:

我们可以看到, 只要已启动服务端的瞬间, 就能创建出6个线程(一个主线程, 五个分线程)

 

 ——————以上就是本节全部内容哦, 如果对友友们有帮助的话可以关注博主, 方便学习更多知识哦!!!   

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/58782.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

uni-app 运行HarmonyOS项目

1. uni-app 运行HarmonyOS项目 文档中心 1.1. HarmonyOS端 1.1.1. 准备工作 &#xff08;1&#xff09;下载DevEco Studio开发工具。   &#xff08;2&#xff09;在 DevEco Studio 中打开任意一个项目&#xff08;也可以新建一个空项目&#xff09;。   &#xff08;3&…

WPF+MVVM案例实战(十三)- 封装一个自定义消息弹窗控件(上)

文章目录 1、案例效果2、功能实现1、创建文件2、资源文件获取3、枚举实现3、弹窗实现1、界面样式实现2、功能代码实现4、总结1、案例效果 2、功能实现 1、创建文件 打开 Wpf_Examples 项目,我们在用户控件类库中创建一个窗体文件 SMessageBox.xaml,同时创建枚举文件夹 Enum…

Unity BesHttp插件修改Error log的格式

实现代码 找到插件的 UnityOutput.cs 然后按照需求替换为下面的代码即可。如果提示 void ILogOutput.Flush() { } 接口不存在&#xff0c;删除这行代码即可。 using Best.HTTP.JSON.LitJson; using System; using System.Collections.Generic; using UnityEngine; using Syst…

Python热化学固态化学模型模拟

&#x1f3af;要点 使用热化学方式&#xff0c;从材料项目数据库获得热力学数据构建固态材料无机合成模拟模型。固态反应网络是热力学相空间的模型&#xff0c;使得能够纳入简单的反应动力学行为。反应坐标图可视为加权有向图&#xff0c;其表示出热力学相空间的密集连接模型。…

详解软件设计中分库分表的几种实现以及应用示例

详解软件设计中分库分表的几种实现以及应用示例https://mp.weixin.qq.com/s?__bizMzkzMTY0Mjc0Ng&mid2247485108&idx1&sn8b3b803c120c163092c70fa65fe5541e&chksmc266aaa1f51123b7af4d7a3113fe7c25daa938a04ced949fb71a8b7773e861fb93d907435386#rd

简缩极化模型+简缩极化求解用优化的方法,也需要保证方程和未知数个数

一个定标器可以得到一个复数矢量&#xff0c;4个实数方程 而模型中我们有&#xff0c;每个定标器有不同的A和φ (两个实数)和相同的R和δc &#xff08;4个复数&#xff09;

多浏览器同步测试工具的设计与实现

在做Web兼容测试时&#xff0c;测试人员往往需要在不同浏览器上重复执行相同的操作。 现有自动化录制手段&#xff0c;其实是后置的对比&#xff0c;效率与反馈都存在延迟&#xff0c;执行过程相对是黑盒的&#xff0c;过程中如果测试人员没细化到具体的校验点&#xff0c;即使…

Google Recaptcha V2 简单使用

最新的版本是v3&#xff0c;但是一直习惯用v2&#xff0c;就记录一下v2 的简单用法&#xff0c;以免将来忘记了 首先在这里注册你域名&#xff0c;如果是本机可以直接直接填 localhost 或127.0.0.1 https://www.google.com/recaptcha/about/ 这是列子 网站密钥&#xff1a;是…

【初识Linux】

寻不到花的折翼枯叶蝶&#xff0c;永远也看不见凋谢............................................................................. 文章目录 前言 一、【基本指令】 1、ls 2、pwd 3、cd 4. touch 5.mkdir 6.rmdir 7、rm 8.man 9.cp 10、mv 11、cat 12、tac 13、more 14、le…

操作系统知识要点

一.操作系统的特性 1.并发性 在多道程序环境下&#xff0c;并发性是指在一段时间内&#xff0c;宏观上有多个程序同时运行&#xff0c;但实际上在单CPU的运行环境&#xff0c;每一个时刻只有一个程序在执行。 因此&#xff0c;从微观上来说&#xff0c;各个程序是交替、轮流…

jenkins搭建及流水线配置

1.安装docker curl https://mirrors.aliyun.com/repo/Centos-7.repo >> CentOS-Base-Aliyun.repomv CentOS-Base-Aliyun.repo /etc/yum.repos.d/yum -y install yum-utils device-mapper-persistent-data lvm2yum-config-manager --add-repo http://mirrors.aliyun.com/…

混沌接口压测利器Fortio:从TCP/UDP到gRPC,全方位覆盖云原生应用性能测试

#作者&#xff1a; 西门吹雪 文章目录 Fortio 安装docker 安装:MacOS安装&#xff1a;linux安装:对于http负载生成最重要的标志:Fortio server 功能 TCPUDPgRPC负载测试gRPC 负载测试在k8s或者容器中使用fortio进行压测fortio 直接在docker中作为sidecar使用 Fortio是一个微服…

【笔记】数据结构与算法

参考链接&#xff1a;数据结构(全) 参考链接&#xff1a;数据结构与算法学习笔记 一些PPT的整理&#xff0c;思路很不错&#xff0c;主要是理解角度吧&#xff0c;自己干啃书的时候结合一下会比较不错 0.总论 1.数据 注&#xff1a;图是一种数据结构&#xff01;&#xff01;…

leetcode - 684. 冗余连接

684. 冗余连接 解决思路 大致上的思路就是将元素加入到 并查集 中&#xff0c;那么在遍历到边的时候先去判断的边的两个端点的 根节点 是否相等&#xff0c;如果相等&#xff0c;那么就代表此刻把这条边加上去就形成了环【可以这么理解&#xff0c;如果形成了环&#xff0c;那…

【力扣打卡系列】二叉树·灵活运用递归

坚持按题型打卡&刷&梳理力扣算法题系列&#xff0c;语言为go&#xff0c;Day16 相同的树 题目描述 解题思路 边界条件&#xff0c;其中一个节点为空&#xff0c;return 只有p和q均为空才返回true&#xff0c;因此可以简写为pqreturn&#xff0c;先判断节点值是否一样&…

创建一个基于SSM框架的药品商超管理系统

创建一个基于SSM&#xff08;Spring Spring MVC MyBatis&#xff09;框架的药品商超管理系统是一个涉及多个步骤的过程。以下是一个详细的开发指南&#xff0c;包括项目结构、数据库设计、配置文件、Mapper接口、Service层、Controller层和前端页面的示例。 1. 需求分析 明…

二十七、Python基础语法(面向对象-上)

面向对象&#xff08;oop&#xff09;&#xff1a;是一种程序设计的方法&#xff0c;面向对象关注的是结果。 一、类和对象 类和对象&#xff1a;是面向对象编程中非常重要的两个概念。 类&#xff1a;具有相同特征或者行为的一类事物&#xff08;指多个&#xff09;的统称&…

UML图之对象图详解

~犬&#x1f4f0;余~ “我欲贱而贵&#xff0c;愚而智&#xff0c;贫而富&#xff0c;可乎&#xff1f; 曰&#xff1a;其唯学乎” 零、什么是对象图 对象图&#xff08;Object Diagram&#xff09;是UML中一种重要的静态结构图&#xff0c;它用于表示在特定时间点上系统中的对…

同三维T80004EHH-4K30W 4K超清HDMI编解码器

1路HDMI输入1路3.5音频输入&#xff0c;1路HDMI输出1路3.5音频输出&#xff0c;1个USB1个TF卡槽&#xff0c;带RS485 支持4K30&#xff0c;支持2路解码2路转码&#xff0c;可选配WEBRTC/NDI协议&#xff0c;可选配硬件WEBRTC解码&#xff0c;编码、解码、转码、导播、录制多功…

设计一个灵活的RPC架构

RPC架构 RPC本质上就是一个远程调用&#xff0c;需要通过网络来传输数据。传输协议可以有多种选择&#xff0c;但考虑到可靠性&#xff0c;一般默认采用TCP协议。为了屏蔽网络传输的复杂性&#xff0c;需要封装一个单独的数据传输模块用来收发二进制数据&#xff0c;这个单独模…