Linux 网络编程之TCP套接字

前言

上一期我们对UDP套接字进行了介绍并实现了简单的UDP网络程序,本期我们来介绍TCP套接字,以及实现简单的TCP网络程序!

🎉目录

前言

1、TCP 套接字API详解

1.1 socket

1.2 bind

1.3 listen

1.4 accept

1.5 connect

2、字符串回响

2.1 核心功能分析

2.2 单进程版

服务端

客户端

2.3 多进程版

设置非阻塞等待

2.4 多线程版

2.5 线程池版

3、多线程的远程命令执行


1、TCP 套接字API详解

下面介绍的 socket API 函数都是在 <sys/socket.h> 头文件中

1.1 socket

#include <sys/types.h>          /* See NOTES */
#include <sys/socket.h>int socket(int domain, int type, int protocol);

作用

socket 打开一个网络通信的端口,如果成功则会和 open 一样返回一个文件描述符UDP可以拿着文件描述符使用 read write 在网络上收发数据,而TCP是拿着给他获取连接的

注意:这里的文件描述符我们一般称为 监听套接字,具体原因见后面 accept

参数解析

domain : 指定通信类型,IPv4 就是 AF_INET

type TCP 协议是面向字节流的,所以指定为 SOCK_STREAM

procotol : 协议这里直接忽略,直接写 0 即可,会根据 type 自动推

返回值

成功,返回一个文件描述符;失败,返回 -1 

1.2 bind

#include <sys/types.h>          /* See NOTES */
#include <sys/socket.h>int bind(int sockfd, const struct sockaddr *addr,socklen_t addrlen);

作用

该函数用于将一个套接字(socket)和一个特殊的地址(ip+port)关联起来。该函数通常用于服务端(客户端OS自动绑定)。绑定之后,sockfd (这个用户网络通信的文件描述符) 监听 addr 所描述的 ip 端口号

参数解析

sockfd socket 的返回值,即文件描述符

addr :指向的结构体 struct sockaddr_in 的指针,存储的是需要绑定的 ipport信息

addrlen addr 指向结构体的大小

返回值

成功,0 被返回。失败,-1 被返回

关于结构体 struct sockaddr  和 struct sockaddr_in 以及 struct sockaddr_un 上一期UDP就已经详细介绍了,这里不在赘述了

1.3 listen

#include <sys/types.h>          /* See NOTES */
#include <sys/socket.h>int listen(int sockfd, int backlog);

作用

声明 服务端的 sockfd (监听套接字)处于监听状态

参数解析

sockfd :通过 sockfd 套接字进行 监听

backlog:全连接队列的长度

返回值

成功,0 被返回。失败,-1 被返回

注意backlog 我们一般设置为 5、8、16、32等 表示 全连接队列 的最大长度,关于 全连接队列 我们将在后面的 TCP协议原理 的博客中专门介绍

1.4 accept

#include <sys/types.h>          /* See NOTES */
#include <sys/socket.h>int accept(int sockfd, struct sockaddr *addr, socklen_t *addrlen);

作用

TCP面向连接的,当客户端发起请求时,TCP服务端经过3次握手后,调用 accept 接受连接;如果服务端调用 accept 时,还没有客户端连接请求,就会阻塞等待直到客户端连接上来

参数解析

sockfd socket 的返回值,即文件描述符(监听套接字)

addr :指向结构体 struct sockaddr_in 的指针,存储的是客户端连接的 ipport信息

addrlen addr 指向结构体的大小的指针

返回值

成功返回一个文件描述符,表示连接的套接字,这个套接字用于该连接的读写操作

失败:返回 -1 ,错误码被设置

介绍到这里,我们也就明白了为什么上面我们把 socket 那里的套接字称为 监听套接字 ,因为 socket 的 fd 是专门处理连接请求的,而真正的通信用的是 accept 的这个套接字

举个栗子:

假设你今天去杭州西湖玩,到了中午逛到了鱼庄,门口有个人(张三)就会问你帅哥/美女吃饭吗,我们这里的鱼是刚刚从西湖中打上来的,你和你的朋友就进去了,你进去之后,这个门口招呼的张三并没有进来,而是朝里面喊了一声“来客人了,来个人”,此时李四出来专门招待你们,张三又去门口拉客了,过了一会张三又拉了一桌,又朝里面喊“来客人了,来个人”,此时王五去招待那一桌了,张三继续在门口....此时,每一桌的点菜等服务操作就和张三没关系了,而是和你们进店接待你们的那个人(李四、王五)有关  ... ...

上面的例子中,张三就是 socket 创建的套接字,而李四、王五就是 accpet 之后返回的套接字,专门用于服务每一个新链接的IO操作

1.5 connect

#include <sys/types.h>          /* See NOTES */
#include <sys/socket.h>int connect(int sockfd, const struct sockaddr *addr,socklen_t addrlen);

作用

客户端需要调用 connect 向服务端发起连接

参数解析

sockfd :客户端创建的套接字

addr :指向的结构体 struct sockaddr_in 的指针,存储的是服务端ipport信息

addrlen addr 指向结构体的大小

返回值

成功,返回 0;失败,返回 -1

OK,有了上面的介绍,我们就可以写TCP的网络程序了!

2、字符串回响

我们还是和UDP一样先写一个的一个最简单的不加任何业务的TCP网络程序,目的是为了熟悉接口,然后在最基础的版本的基础上进行优化,然后加一些简单的业务处理!

2.1 核心功能分析

还是UDP那里的一样,客户端向服务端发送请求,服务端接收到请求之后,直接响应给用户,类似于我们指令部分的 echo 

OK,还是基于上述的先来搭建一个框架出来:

首先服务端是不能够拷贝的,我们可以在服务端的类里面把拷贝构造和赋值拷贝给禁用掉,但是这样做不够优雅,为了复用可以专门直接写一个类,让不能被拷贝的类继承即可

nocopy.hpp

#pragma once
class nocopy
{
public:nocopy() {}nocopy(const nocopy &) = delete;const nocopy &operator=(const nocopy &) = delete;~nocopy() {}
};

TcpServer.hpp

服务端这里,不需要具体的ip,需要指定一个端口号

#pragma once#include <iostream>
#include <string>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>#include "nocopy.hpp"static const int g_sockfd = -1; // 缺省的监听套接字class TcpServer : public nocopy
{
public:TcpServer(uint16_t port): _listen_sockfd(g_sockfd), _port(port), _isrunning(false){}// 初始化服务器void InitServer(){}// 启动服务器void StartServer(){}// 任务处理void Service(int sockfd, Inet_Addr& addr){}~TcpServer(){if(_listen_sockfd > g_sockfd)::close(_listen_sockfd);}private:int _listen_sockfd; // 监听套接字uint16_t _port;     // 端口号bool _isrunning;    // 服务端的状态
};

TcpServerMain.cc

这里我们还是采用命令行参数,将端口号给传进来

#include "TcpServer.hpp"
#include <memory>// ./tcpserver local-port
int main(int argc, char* argv[])
{if(argc != 2){std::cout << "Usage: " << argv[0] << " local-port" << std::endl;exit(1);}uint16_t port = std::stoi(argv[1]);std::unique_ptr<TcpServer> tsvr = std::make_unique<TcpServer>(port);// C==14tsvr->InitServer();tsvr->StartServer();return 0;
}

TcpClient.hpp

#pragma once#include <iostream>
#include <string>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>static const int g_sockfd = -1; // 缺省的监听套接字
class TcpClient
{
public:TcpClient(std::string ip, uint16_t port): _sockfd(g_sockfd),_server_ip(ip), _server_port(port){}void InitClient(){}void StartClient(){}~TcpClient(){if (_sockfd > g_sockfd)::close(_sockfd);}private:int _sockfd;                // 套接字文件描述符uint16_t _server_port;      // 服务端端口号std::string _server_ip;     // 服务端 ipstruct sockaddr_in _server; // 存储服务端信息的结构体
};

TcpClientMain.cc 

#include "TcpClient.hpp"
#include <memory>int main(int argc, char* argv[])
{if(argc != 3){std::cerr << "Usage: " << argv[0] << " server-ip server-port" << std::endl;exit(1);}std::string ip = argv[1];uint16_t port = std::stoi(argv[2]);std::unique_ptr<TcpClient> tsvr = std::make_unique<TcpClient>(ip, port);// C==14tsvr->InitClient();tsvr->StartClient();return 0;
}

Makefile

为了后面快速的编译和清理,我么这里写一个makefile

.PHONY: all
all : tcpserver tcpclienttcpserver: TcpServerMain.ccg++ -o $@ $^ -std=c++14tcpclient: TcpClientMain.ccg++ -o $@ $^ -std=c++14.PHONY:clean
clean:rm -f  tcpserver tcpclient

2.2 单进程版

有了上面的简单的框架,我们下面的主要任务就是完善服务端和客户端的接口:

服务端

首先为了后续的信息打印,我们引入 日志Inet_Addr 因为这些都是之前写过的这里直接引入了

初始化服务端这里,前两步和UDP一样,但是由于TCP面向连接的传输协议,所以还得 设置服务器为 监听状态,监听客户端的连接请求

// 初始化服务器
void InitServer()
{// 1、创建监听socket_listen_sockfd = ::socket(AF_INET, SOCK_STREAM, 0);if (_listen_sockfd < 0){LOG(FATAL, "sockfd create error\n");exit(SOCKET_ERROR);}LOG(INFO, "socket create success, sockfd is %d\n", _listen_sockfd);// 2、bind ip 和 portstruct sockaddr_in local;memset(&local, 0, sizeof(local));   // 清空local.sin_family = AF_INET;         // 通信类型 IPv4local.sin_addr.s_addr = INADDR_ANY; // 服务端绑定任意ip地址local.sin_port = htons(_port);      // 将主机序列转为网络序列// 绑定 套接字 和 localif (::bind(_listen_sockfd, (struct sockaddr *)&local, sizeof(local)) < 0){LOG(FATAL, "bind error\n");exit(BIND_ERROR);}LOG(INFO, "bind success\n");// 3、监听if (::listen(_listen_sockfd, g_backlog) < 0){LOG(INFO, "listen success\n");exit(LISTEN_ERROR);}LOG(INFO, "listen success\n");
}

服务器启动是一个长服务。首先我们得通过 监听 套接字,获取客户端的链接并返回一个sockfd,然后可以拿着这个 sockfd 进行网络IO了,为了后面打印看起来方便,我们构建一个 Inet_Addr对象(获取主机序列),然后将 sockfd Inet_Addr对象给Service即可

// 启动服务器
void StartServer()
{_isrunning = true;// 长服务while (true){// 3、接收链接struct sockaddr_in peer;socklen_t len = sizeof(peer);int sockfd = ::accept(_listen_sockfd, (struct sockaddr *)&peer, &len);if (sockfd < 0){LOG(WARNING, "accept error\n");}LOG(INFO, "accept success\n");// 业务处理Inet_Addr addr(peer);Service(sockfd, addr);// 业务处理函数}_isrunning = false;
}

Service 就是进行收发数据和业务处理的地方,这里的业务处理很简单,收到客户端的消息,然后返回给用户即可

// 任务处理
void Service(int sockfd, Inet_Addr &addr)
{char buffer[1024];while (true){// 接收消息ssize_t n = ::read(sockfd, buffer, sizeof(buffer) - 1);if (n > 0){buffer[n] = 0;LOG(DEBUG, "read success\n");// 业务处理std::string message = "[" + addr.AddrStr() + "]";message += buffer;std::cout << message << std::endl;// 响应给用户n = ::write(sockfd, message.c_str(), message.size());if (n < 0){LOG(FATAL, "write error\n");break;}LOG(INFO, "write success\n");}else if (n == 0){LOG(INFO, "read the end of file\n");break;}else{LOG(INFO, "read error\n");break;}}::close(sockfd);
}

客户端

初始化客户端很简单,前两步还是和 UDP 的一样,TCP面向连接所以得向服务端发送链接请求!但是注意的时,客户端不一定一次就连接成功,所以在客户端这里,我们需要设置重连策略!

void InitClient()
{// 1、创建套接字_sockfd = ::socket(AF_INET, SOCK_STREAM, 0);if (_sockfd < 0){std::cerr << "sockfd create error" << std::endl;exit(1);}// 2、填充 server的ip和端口号memset(&_server, 0, sizeof(_server));                      // 清空/初始化_server.sin_family = AF_INET;                              // 通信类型Ipv4_server.sin_port = htons(_server_port);                    // 主机转网络序列inet_pton(AF_INET, _server_ip.c_str(), &_server.sin_addr); // 将点分十进制的ip地址转为整数// 3、获取连接int n = ::connect(_sockfd, (struct sockaddr *)&_server, sizeof(_server));if (n < 0){std::cerr << "connect error" << std::endl;exit(2);}
}

这里我们可以测试一下,断线重连的情况:

先启动客户端,服务端没有启动

过几秒之后在启动服务端就会连接成功

这种重连的机制是很常见的,甚至你都可能碰到过

客户端启动,还是和UDP的类似,显示向服务端请求,然后接收到服务端的响应

void StartClient()
{char buffer[1024];while (true){std::cout << "Please Enter# ";std::string message;std::getline(std::cin, message);// 向服务器发送请求ssize_t n = ::write(_sockfd, message.c_str(), message.size());if (n < 0){std::cerr << "write error" << std::endl;break;}// 接收响应n = ::read(_sockfd, buffer, sizeof(buffer) - 1);if (n > 0){buffer[n] = 0;std::cout << buffer << std::endl;}else if (n == 0){std::cerr << "read the end of file" << std::endl;break;}else{std::cerr << "read error" << std::endl;break;}}
}

OK,测试一下:

全部原码:tcp_echo_server_v1单进程版


2.3 多进程版

上面的代码单个客户端测试下似乎没有问题,那如果是多个客户端呢?

我们看到,当两个客户端时,第一个连接的 客户端可以通信,而第二个客户端是不能通信的!

而在我们把第一个客户端关闭掉之后,第二个客户端才会有获得链接,进行通信

这是为啥呢?我们仔细分析一下代码就知道:

我们服务端的启动服务是长服务,执行业务处理的Service函数也是长服务,服务端启动是单进程的,所以他一旦连接成功一个客户端之后就会去执行业务处理,不在接受客户端的连接了(也就是客户端的链接阻塞住了)!等一个客户端的业务处理完之后在进行继续链接,执行业务。。。。

对于一个服务器来说这固然是不被允许的,所以我们需要将他进行改造!我们可以把他改为多进程的,然后改成多线程、线程池的!

首先还是来改造成多进程版本的:当我们服务端接收到链接之后,创建一个子进程去执行业务处理就好了,不用自己亲自去执行了!

创建子进程使用 fork() 函数,它的返回值含义如下

ret == 0 表示创建子进程成功,接下来执行子进程的代码
ret > 0   表示创建子进程成功,接下来执行父进程的代码
ret < 0   表示创建子进程失败
子进程创建成功后,会继承父进程的文件描述符表,能轻而易举的获取客户端的 socket 套接字,从而进行网络通信

当然不止文件描述符表,得益于 写时拷贝 机制,子进程还会共享父进程的变量,当发生修改行为时,才会自己创建

// 启动服务器
void StartServer()
{_isrunning = true;// 长服务while (true){// 3、接收链接struct sockaddr_in peer;socklen_t len = sizeof(peer);int sockfd = ::accept(_listen_sockfd, (struct sockaddr *)&peer, &len);if (sockfd < 0){LOG(WARNING, "accept error\n");}LOG(INFO, "accept success, sockfd is %d\n", sockfd);// 业务处理Inet_Addr addr(peer);pid_t id = fork();if (id == 0){// child::close(_listen_sockfd);Service(sockfd, addr);exit(0);}// father::close(sockfd);pid_t n = waitpid(id, nullptr, 0);// 等待子进程退出if (n < 0){LOG(WARNING, "wait failed\n");}LOG(WARNING, "wait success\n");}_isrunning = false;
}

此时虽然创建了子进程但是,父进程需要等待子进程退出,所以子进程不退出他依然在等待那里阻塞式的等待着!所以此时本质上还是一个单进程的代码,所以此时就需要设置父进程为非阻塞等了

设置非阻塞等待

非阻塞这里我们实现两种方式,1、采用孙子进程 2、采用信号

方式一:采用子孙进程(不太推荐)

众所周知,父进程只需要对子进程负责,至于孙子进程交给子进程负责,如果某个子进程的父进程终止运行了,那么它就会变成 孤儿进程,父进程会变成 1 号进程,也就是由操作系统领养,回收进程的重担也交给了操作系统

可以利用该特性,在子进程内部再创建一个子进程(孙子进程),然后子进程退出,父进程可以直接回收(不必阻塞),子进程(孙子进程)的父进程变成 1 号进程

这种实现方法比较巧妙,而且与我们后面的 守护进程 有关

注意: 使用这种方式时,父进程是需要等待子进程退出的

// 启动服务器
void StartServer()
{_isrunning = true;// 长服务while (true){// 3、接收链接struct sockaddr_in peer;socklen_t len = sizeof(peer);int sockfd = ::accept(_listen_sockfd, (struct sockaddr *)&peer, &len);if (sockfd < 0){LOG(WARNING, "accept error\n");}LOG(INFO, "accept success, sockfd is %d\n", sockfd);// 业务处理Inet_Addr addr(peer);pid_t id = fork(); // 创建子进程if (id == 0){// child::close(_listen_sockfd);if(fork() > 0)exit(0);// 子进程退出,孙子进程执行业务Service(sockfd, addr);exit(0);}// father::close(sockfd);pid_t n = waitpid(id, nullptr, 0); // 等待子进程if (n < 0){LOG(WARNING, "wait failed\n");}LOG(WARNING, "wait %d success\n", n);}_isrunning = false;
}

此时就支持多个客户端的通信了!

方法二:使用信号(推荐

我们以前在信号部分介绍过,子进程结束的时候是需要向父进程发送 17 号信号SIFCHLD 的,父进程收到该信号后需要检测并回收子进程,我们可以直接忽略该信号,这里的忽略是个特例,只是父进程不对其进行处理,转而由 操作系统 对其负责,自动清理资源并进行回收,不会产生 僵尸进程

直接在 StartServer() 服务器启动函数刚开始时,使用 signal() 函数设置 SIGCHLD 信号的执行动作为 忽略

忽略了该信号后,就不需要父进程等待子进程退出了(由操作系统承担)

// 启动服务器void StartServer(){signal(SIGCHLD, SIG_IGN);// 忽略子进程退出_isrunning = true;// 长服务while (true){// 3、接收链接struct sockaddr_in peer;socklen_t len = sizeof(peer);int sockfd = ::accept(_listen_sockfd, (struct sockaddr *)&peer, &len);if (sockfd < 0){LOG(WARNING, "accept error\n");}LOG(INFO, "accept success, sockfd is %d\n", sockfd);// 业务处理Inet_Addr addr(peer);pid_t id = fork(); // 创建子进程if (id == 0){// child::close(_listen_sockfd);if(fork() > 0)exit(0);// 子进程退出,孙子进程执行业务Service(sockfd, addr);exit(0);}// // father::close(sockfd);// pid_t n = waitpid(id, nullptr, 0); // 等待子进程// if (n < 0)// {//     LOG(WARNING, "wait failed\n");// }// LOG(WARNING, "wait %d success\n", n);}_isrunning = false;}

此时多客户端通信也是没有问题的! 

细节问题:这里因为子进程是继承了父进程的文件描述符表的,所以子进程中的文件描述符有用于监听的,也有 通信 用的,为了避免文件描述符的增长,我们可以将父子进程中的不需要的文件描述符给关掉!当子进程创建后,父进程就不需要关心accept 的返回的fd了,所以父进程关掉它;同理子进程也不需要关心监听的fd也将他关掉!

全部源码:tcp_echo_server_v2多进程版


2.4 多线程版

上面的多进程虽然已经可以实现效果了,但是我们知道创建进程的代价还是蛮大的,这种情况一般可以采用线程来完成,所以接下来我们就把多进程换成多线程的

我们这里采用原生的线程库中的接口实现!也就是 pthread_create 它的参数有4个,第一个是线程的 tid,第二个线程的详细信息(忽略),第三个线程执行的函数,第四个执行函数的参数

这里最重要的是第三个和第四个:因为第三个的参数是 void* 返回值 void*

也就是说,我们线程是无法调到 Service 函数的(无this),这里就很和我们线程部分的一样,我们加一层,然线程去执行void*(void*)的函数,然后再其内部调用 Service 即可,但是如何传递 Service 的参数呢?很简单在创建一个类,里面存放 Service 的参数,然后把这个类的对象的地址给线程的执行函数的参数即可 

这里采用内部类:

// 内部类
class ThreadData
{
public:ThreadData(int sockfd, Inet_Addr &addr, TcpServer *self): _sockfd(sockfd), _addr(addr), _self(self){}public:int _sockfd;Inet_Addr _addr;TcpServer *_self;
};

线程执行的函数

这里因为在类里面,所以是static  为了避免类似于僵尸进程的那种情况,我们直接把线程给分离了

static void* Execute(void* args)
{pthread_detach(pthread_self());// 将自己给分离了,避免主线程等待,以及出现类似于僵尸的问题ThreadData* td = static_cast<ThreadData*>(args);td->_self->Service(td->_sockfd, td->_addr);delete td;return nullptr;
}

注意:这里线程的话不需要关闭 socket 了,因为这些资源线程间共享!

全部源码:tcp_echo_server_v3多线程版


2.5 线程池版

使用 原生线程库 过于单薄了,并且这种方式存在问题:连接都准备好了,才创建线程,如果创建线程所需要的资源较多,会拖慢服务器整体连接效率

为此可以改用之前实现的 线程池

线程池这里的话,我们可以直接把以前的那个线程池给拿过来

ThreadPool.hpp

#ifndef _M_T_P_
#define _M_T_P_#include "Thread.hpp"
#include "Log.hpp"
#include "BlockingQueue.hpp"
#include "LockGuard.hpp"
#include <pthread.h>
#include <vector>
#include <queue>
#include <iostream>
#include <unistd.h>using namespace ThreadModule;
using namespace LogModule;const static int g_default = 5;void test()
{while (true){std::cout << "thread is running..." << std::endl;sleep(1);}
}template <class T>
class ThreadPool
{
private:// 给任务队列加锁void LockQueue(){pthread_mutex_lock(&_mutex);}// 给任务队列解锁void UnLockQueue(){pthread_mutex_unlock(&_mutex);}// 在 _cond 条件下阻塞等待void Sleep(){pthread_cond_wait(&_cond, &_mutex);}// 唤醒一个休眠的线程void WakeUp(){pthread_cond_signal(&_cond);}// 唤醒所有休眠的线程void WakeUpAll(){pthread_cond_broadcast(&_cond);}// 判断任务队列是否为空bool IsEmpty(){// return _task_queue.empty();return _task_queue.IsEmpty();}// 处理任务 -> 消费者void HandlerTask(const std::string &name){while (true){LockQueue();// 任务队列为空while (IsEmpty() && _is_running){LOG(INFO, "%s sleep begin\n", name.c_str());_sleep_thread_num++;Sleep(); // 阻塞等待_sleep_thread_num--;LOG(INFO, "%s wake up\n", name.c_str());}// 如果任务队列为空 && 线程池的状态为 退出if (IsEmpty() && !_is_running){UnLockQueue();LOG(INFO, "%s quit...\n", name.c_str());break;}// 获取任务// T t = _task_queue.front();// _task_queue.pop();T t;_task_queue.Pop(&t);UnLockQueue();// 处理任务t(); // 注意这里的处理任务不应该放在临界区因为处理任务也费时间// std::cout << name << ": " << t.result() << std::endl;// LOG(DEBUG, "%s handler task: %s\n", name.c_str(), t.result().c_str());}}// 私有化构造ThreadPool(int thread_num = g_default): _thread_num(thread_num), _sleep_thread_num(0), _is_running(false){pthread_mutex_init(&_mutex, nullptr);pthread_cond_init(&_cond, nullptr);}// 删除或禁用赋值拷贝和拷贝构造ThreadPool(const ThreadPool &tp) = delete;ThreadPool &operator=(const ThreadPool &tp) = delete;public:~ThreadPool(){pthread_mutex_destroy(&_mutex);pthread_cond_destroy(&_cond);}// 创建获取单例对象的句柄静态函数 -> 懒汉式static ThreadPool *getInstance(){// 双重检查加锁if (_tp == nullptr){// 加锁 -> RAII风格LockGuard lock(&_static_mutex);if (_tp == nullptr){_tp = new ThreadPool<T>();_tp->Init();_tp->Start();LOG(INFO, "Create ThreadPool...\n");}else{LOG(INFO, "Get ThreadPool...\n");}}return _tp;}void Init(){func_t func = std::bind(&ThreadPool::HandlerTask, this, std::placeholders::_1);for (int i = 0; i < _thread_num; i++){std::string threadname = "thread_" + std::to_string(i + 1);_threads.emplace_back(threadname, func);LOG(INFO, "%s is init success!\n", threadname.c_str());}}void Start(){LockQueue();_is_running = true;UnLockQueue();for (auto &t : _threads){t.start();LOG(INFO, "%s is start...\n", t.get_name().c_str());}}void Stop(){LockQueue();LOG(INFO, "threadpool is stop...\n");_is_running = false;WakeUpAll();UnLockQueue();}// 向任务队列推送任务 -> 生产者void PushTask(T &task){LockQueue();// 当线程池是启动的时候才允许推送任务if (_is_running){_task_queue.Push(task);if (_sleep_thread_num > 0){WakeUp();}}UnLockQueue();}private:int _thread_num;              // 线程的数目std::vector<Thread> _threads; // 管理线程的容器// std::queue<T> _task_queue;    // 任务队列BlockingQueue<T> _task_queue; // 阻塞队列int _sleep_thread_num;        // 休眠线程的数目bool _is_running;             // 线程池的状态pthread_mutex_t _mutex;       // 互斥锁pthread_cond_t _cond;         // 条件变量static ThreadPool<T> *_tp;            // 单例模式static pthread_mutex_t _static_mutex; // 单例锁
};// 类外初始化
template <class T>
ThreadPool<T> *ThreadPool<T>::_tp = nullptr;template <class T>
pthread_mutex_t ThreadPool<T>::_static_mutex = PTHREAD_MUTEX_INITIALIZER;#endif

这里用的是我们当时写的 阻塞队列,这里就不在一一的粘贴了,后面有源码的链接!

线程池这里很简单,只需要包装一个可执行的对象,然后放到线程池中即可!

看似程序已经很完善了,其实隐含着一个大问题:当前线程池中的线程,本质上是在回调一个 while(true) 死循环函数,当连接的客户端大于线程池中的最大线程数时,会导致所有线程始终处于满负载状态,直接影响就是连接成功后,无法再创建通信会话(倘若客户端不断开连接,线程池中的线程就无力处理其他客户端的会话)

说白了就是 线程池 比较适合用于处理任务,对于当前的场景来说,线程池 不适合建立持久通信会话 这里只是演示一下线程池的接入

全部源码:tcp_echo_server_v4线程池版


3、多线程的远程命令执行

这里我们在上面的多线程版本的基础上在加一个业务,实现本地输入适当的指令给服务器,服务器执行完成之后,将结果返回给用户!类似于 Xshell 的效果

为了降低耦合度,我们还是将执行指令(任务)的函数单独封装成一个类 Command.hpp

然后在 TcpServerMain.cc 中绑定一个可调用对象给 TcpServe.hpp 就OK了!

TcpServer中只需要接受链接就好,接收到链接之后创建一个线程,线程执行的函数内部去回调_server 的函数对象即可

所以修改后的TcpServer类如下:

TcpServer.hpp

#pragma once#include <iostream>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <string>
#include <cstring>
#include <unistd.h>
#include <sys/wait.h>
#include <pthread.h>
#include <functional>#include "Log.hpp"
#include "Com_ERR.hpp"
#include "Inet_Addr.hpp"const static int g_sockfd = -1;
const static int g_backlog = 8; // 连接队列的大小using namespace LogModule;using service_t = std::function<void(int, Inet_Addr)>;// 包装一个可调用的函数对象类型class TcpServer
{
private:static void *Execute(void *args){pthread_detach(pthread_self()); // 将自己给分离了,避免主线程等待,以及出现类似于僵尸的问题ThreadData *td = static_cast<ThreadData *>(args);td->_self->_service(td->_sockfd, td->_addr);// 线程回调任务函数::close(td->_sockfd);delete td;return nullptr;}public:TcpServer(uint16_t port, service_t service): _listensocket(g_sockfd), _port(port), _isrunning(false),_service(service){}void InitServer(){// 1、创建监听套接字_listensocket = ::socket(AF_INET, SOCK_STREAM, 0);if (_listensocket < 0){LOG(FATAL, "socket create error\n");exit(SOCKET_ERROR);}LOG(INFO, "socket create success, sockfd is %d\n", _listensocket);// 2、绑定主机的信息struct sockaddr_in local;memset(&local, 0, sizeof(local));local.sin_family = AF_INET;         // IPV4local.sin_port = htons(_port);      // 设置端口local.sin_addr.s_addr = INADDR_ANY; // 任意 ipif (::bind(_listensocket, (struct sockaddr *)&local, sizeof(local))){LOG(FATAL, "bind error\n");exit(BIND_ERROR);}LOG(INFO, "bind success\n");// 3、设置监听int n = ::listen(_listensocket, g_backlog);if (n < 0){LOG(FATAL, "listen error");exit(LISTEN_ERROR);}LOG(INFO, "listen success");}// 内部类class ThreadData{public:ThreadData(int sockfd, Inet_Addr &addr, TcpServer *self): _sockfd(sockfd), _addr(addr), _self(self){}public:int _sockfd;Inet_Addr _addr;TcpServer *_self;};void Start(){_isrunning = true;while (_isrunning){// 4、获取链接struct sockaddr_in peer;socklen_t len = sizeof(peer);int sockfd = ::accept(_listensocket, (struct sockaddr *)&peer, &len);if (sockfd < 0){LOG(WARNING, "accept error\n");}LOG(INFO, "accept success\n");// 处理业务Inet_Addr addr(peer);// version 2 多线程版pthread_t tid;ThreadData *td = new ThreadData(sockfd, addr, this);pthread_create(&tid, nullptr, Execute, td);}_isrunning = false;}~TcpServer(){if (_listensocket > g_sockfd){::close(_listensocket);}}private:int _listensocket;  // 监听套接字uint16_t _port;     // 端口号bool _isrunning;    // 服务端状态service_t _service; // 业务回调函数
};

所以下面的只要任务就是在 Command.hpp 的实现上面了

1、因为我们只能让用户执行适当的指令,所以我们得对执行的指令进行判断和存储,所以使用一个set集合存储,如果不限制用户执行的指令,他万一给你 rm -rf/* 咋办

2、可以提供一个判断是否是安全指令的函数,方便在 执行用户指令时检查

3、可以在构造时将合法的指令插入到set(内存级);也可以搞一个文件(持久化存储)在构造时加载然后到set,这里采用前者

#pragma once#include <iostream>
#include <string>
#include <cstdio>
#include <set>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>#include "Inet_Addr.hpp"
#include "Log.hpp"using namespace LogModule;class Command
{
private:// 判断当前的指令是否是安全的bool IsSafeCommand(const std::string& cmdstr){for(auto & cmd : _safe_command){if(strncmp(cmd.c_str(), cmdstr.c_str(), cmd.size())){return true;}}return false;}public:Command(){_safe_command.insert("ls");_safe_command.insert("touch"); // touch filename_safe_command.insert("pwd");_safe_command.insert("whoami");_safe_command.insert("which"); // which pwd}~Command(){}// 处理指令的函数void HandlerCommand(int sockfd, Inet_Addr addr){}private:std::set<std::string> _safe_command; // 安全指令集
};

剩下的主要任务就是实现处理指令函数了!

1、首先处理的第一步是先得接收到用户的指令,所以显示接受用户输入的指令

前面接受客户端的数据都是使用 read 来接受的,这里可以换一个函数 recv 

#include <sys/types.h>
#include <sys/socket.h>ssize_t recv(int sockfd, void *buf, size_t len, int flags);

参数解析

sockfdIO 的套接字

buf :存储接收到消息的缓冲区

len : 存储接收数据缓冲区的大小

flag :阻塞/非阻塞,一般置为 0 即可

返回值

ret > 0    表示就接收到的字节数

ret == 0  表示读取到了文件结尾

ret < 0    表示读取失败

同样发送消息,这里也不使用 write 而是使用 send 

#include <sys/types.h>
#include <sys/socket.h>ssize_t send(int sockfd, const void *buf, size_t len, int flags);

参数解析

sockfdIO 的套接字

buf :发送的内容缓冲区

len :发送的内容缓冲区的大小

flag :阻塞/非阻塞,一般置为 0 即可

返回值

成功,返回发送成功的字节数;失败,返回 -1 

注意:这样两个接口只适用于 TCP 套接字

所以 HandlerCommand 大致的框架如下:

void HandlerCommand(int sockfd, Inet_Addr addr)
{while (true){char comBuffer[1024];// 读取指令字符串ssize_t n = ::recv(sockfd, comBuffer, sizeof(comBuffer) - 1, 0);if (n > 0){comBuffer[n] = 0;LOG(INFO, "get command from client: %s, command is : %s\n", addr.AddrStr().c_str(), comBuffer);// 处理命令// ...// 返回给客户端//::send();}else if (n == 0){LOG(INFO, "client %s quit\n", addr.AddrStr().c_str());break;}else{LOG(FATAL, "%s read error\n", addr.AddrStr().c_str());break;}}
}

这里的重点就成了如何将用户的指令字符串在服务端执行,并拿到结果

这里将用户的字符指令,在服务端执行,我们单独设计一个函数Execute实现,这个函数会将结果以字符串的形式返回

所以,HandlerCommand  函数就是这样:

void HandlerCommand(int sockfd, Inet_Addr addr)
{while (true){char comBuffer[1024];// 读取指令字符串ssize_t n = ::recv(sockfd, comBuffer, sizeof(comBuffer) - 1, 0);if (n > 0){comBuffer[n] = 0;LOG(INFO, "get command from client: %s, command is : %s\n", addr.AddrStr().c_str(), comBuffer);std::string result = Execute(comBuffer);// 处理命令::send(sockfd, result.c_str(), result.size(), 0);// 返回给客户端}else if (n == 0){LOG(INFO, "client %s quit\n", addr.AddrStr().c_str());break;}else{LOG(FATAL, "%s read error\n", addr.AddrStr().c_str());break;}}
}

接下来的主要任务就是实现 Execute 函数了

1、首先我们拿到用户的指令后先得判断是否合法,可以用上面提供的IsSafeCommand判断

2、使用 poopen 函数 对合法的指令进行处理

3、读取poopen 处理的结果,并处理成一个字符串返回

这里,我们就得介绍一下 poopen  函数了

popen 和 pclose POSIX 标准中定义的函数,用于在程序中执行外部命令,并允许程序与这个外部命令进行输入输出(IO)操作。这两个函数在 <stdio.h> 头文件中声明。

#include <stdio.h>FILE *popen(const char *command, const char *type);int pclose(FILE *stream);

作用

popen 函数用于创建一个管道,并运行一个指定的命令,这个命令在子进程中执行。通过管道,父进程可以与子进程进行通信。

pclose 函数用于关闭由 popen 打开的文件流,并等待子进程结束

参数解析

command:要执行的命令,通常是一个 shell 命令字符串

type:决定管道的方向,可以是 r(从命令读取输出)或 w(向命令写入输入)

stream:由 popen 返回的文件流指针。

返回值 

popen

成功,返回值是一个 FILE * 指针,指向一个文件流,这个文件流可以用来读取或写入数据。

如果失败,返回 NULL

pclose

成功,返回值是子进程的退出状态。如果失败,返回 -1

所以,我们只需要将很安全的指令给 popen 让他执行,最后使用 fgets 读取他的 fd 即可,并将它读取到的结果拼接成一个字符串,最后返回即可! 

std::string Execute(const std::string &cmdstr)
{if(!IsSafeCommand(cmdstr)){return "unsafe";}std::string result;FILE *fp = popen(cmdstr.c_str(), "r");if (fp){char line[1024];while (fgets(line, sizeof(line), fp)){result += line;}return result.empty() ? "success" : result;}pclose(fp);return "exexute error";
}

Command.hpp的全部源码如下

#pragma once#include <iostream>
#include <string>
#include <cstdio>
#include <set>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>#include "Inet_Addr.hpp"
#include "Log.hpp"using namespace LogModule;class Command
{
private:std::string Execute(const std::string &cmdstr){if(!IsSafeCommand(cmdstr)){return "unsafe";}std::string result;FILE *fp = popen(cmdstr.c_str(), "r");if (fp){char line[1024];while (fgets(line, sizeof(line), fp)){result += line;}return result.empty() ? "success" : result;}pclose(fp);return "exexute error";}bool IsSafeCommand(const std::string& cmdstr){for(auto & cmd : _safe_command){if(strncmp(cmd.c_str(), cmdstr.c_str(), cmd.size())){return true;}}return false;}public:Command(){_safe_command.insert("ls");_safe_command.insert("touch"); // touch filename_safe_command.insert("pwd");_safe_command.insert("whoami");_safe_command.insert("which"); // which pwd}~Command(){}void HandlerCommand(int sockfd, Inet_Addr addr){while (true){char comBuffer[1024];// 读取指令字符串ssize_t n = ::recv(sockfd, comBuffer, sizeof(comBuffer) - 1, 0);if (n > 0){comBuffer[n] = 0;LOG(INFO, "get command from client: %s, command is : %s\n", addr.AddrStr().c_str(), comBuffer);std::string result = Execute(comBuffer);// 处理命令::send(sockfd, result.c_str(), result.size(), 0);// 返回给客户端}else if (n == 0){LOG(INFO, "client %s quit\n", addr.AddrStr().c_str());break;}else{LOG(FATAL, "%s read error\n", addr.AddrStr().c_str());break;}}}private:std::set<std::string> _safe_command; // 安全指令
};

OK,接下来我们只需要在 TcpServerMain.cc 中将 HandlerCommand 函数包装成一个可调用对象,给 TcpServer 即可

#include "TcpServer.hpp"
#include "Command.hpp"
#include <memory>// ./tcpserver local-port
int main(int argc, char *argv[])
{if (argc != 2){std::cout << "Usage: " << argv[0] << " local-port" << std::endl;exit(1);}uint16_t port = std::stoi(argv[1]);// 包装一个可调用对象,给服务端Command cmd;service_t service = std::bind(&Command::HandlerCommand, &cmd, std::placeholders::_1, std::placeholders::_2);std::unique_ptr<TcpServer> tsvr = std::make_unique<TcpServer>(port, service);tsvr->InitServer();tsvr->Start();return 0;
}

OK,测试一下:

OK,这就是我们的预期效果!

全部源码:tcp_command多线程版本


OK,本期内容就介绍到这里,我是 cp 我们下期再见!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/61789.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

AI/ML 基础知识与常用术语全解析

目录 一.引言 二.AI/ML 基础知识 1.人工智能&#xff08;Artificial Intelligence&#xff0c;AI&#xff09; (1).定义 (2).发展历程 (3).应用领域 2.机器学习&#xff08;Machine Learning&#xff0c;ML&#xff09; (1).定义 (2).学习方式 ①.监督学习 ②.无监督…

计算机网络常见面试题总结(上)

计算机网络基础 网络分层模型 OSI 七层模型是什么&#xff1f;每一层的作用是什么&#xff1f; OSI 七层模型 是国际标准化组织提出的一个网络分层模型&#xff0c;其大体结构以及每一层提供的功能如下图所示&#xff1a; 每一层都专注做一件事情&#xff0c;并且每一层都需…

蓝桥杯准备训练(lesson1,c++方向)

前言 报名参加了蓝桥杯&#xff08;c&#xff09;方向的宝子们&#xff0c;今天我将与大家一起努力参赛&#xff0c;后序会与大家分享我的学习情况&#xff0c;我将从最基础的内容开始学习&#xff0c;带大家打好基础&#xff0c;在每节课后都会有练习题&#xff0c;刚开始的练…

Unity类银河战士恶魔城学习总结(P156 Audio Settings音频设置)

【Unity教程】从0编程制作类银河恶魔城游戏_哔哩哔哩_bilibili 教程源地址&#xff1a;https://www.udemy.com/course/2d-rpg-alexdev/ 本章节实现了音频的大小设置与保存加载 音频管理器 UI_VolumeSlider.cs 定义了 UI_VolumeSlider 类&#xff0c;用于处理与音频设置相关的…

如何为 ext2/ext3/ext4 文件系统的 /dev/centos/root 增加 800G 空间

如何为 ext2/ext3/ext4 文件系统的 /dev/centos/root 增加 800G 空间 一、引言二、检查当前磁盘和分区状态1. 使用 `df` 命令检查磁盘使用情况2. 使用 `lsblk` 命令查看分区结构3. 使用 `fdisk` 或 `parted` 命令查看详细的分区信息三、扩展逻辑卷(如果使用 LVM)1. 检查 LVM …

java调用ai模型:使用国产通义千问完成基于知识库的问答

整体介绍&#xff1a; 基于RAG&#xff08;Retrieval-Augmented Generation&#xff09;技术&#xff0c;可以实现一个高效的Java智能问答客服机器人。核心思路是将预先准备的问答QA文档&#xff08;例如Word格式文件&#xff09;导入系统&#xff0c;通过数据清洗、向量化处理…

【C++boost::asio网络编程】有关异步Server样例以及伪闭包延长连接生命周期方法的笔记

异步Server 客户端源码Session类start函数handle_readhandle_write Server类构造函数start_accepthandle_accept 可能会造成的隐患利用伪闭包延长连接的生命周期 客户端源码 #include <iostream> #include <boost/asio.hpp> #include <string> int main() {…

力扣hot100道【贪心算法后续解题方法心得】(三)

力扣hot100道【贪心算法后续解题方法心得】 十四、贪心算法关键解题思路1、买卖股票的最佳时机2、跳跃游戏3、跳跃游戏 | |4、划分字母区间 十五、动态规划什么是动态规划&#xff1f;关键解题思路和步骤1、打家劫舍2、01背包问题3、完全平方式4、零钱兑换5、单词拆分6、最长递…

【linux】(23)对象存储服务-MinIo

MinIO 是一个高性能的对象存储服务&#xff0c;兼容 Amazon S3 API。 Docker安装MinIo 前提条件 确保您的系统已经安装了 Docker。如果还没有安装 Docker&#xff0c;可以参考 Docker 官方文档进行安装。 1. 拉取 MinIO Docker 镜像 首先&#xff0c;从 Docker Hub 拉取 Mi…

MySQL有哪些日志?

MySQL主要有三种日志&#xff1a;undo log、redo log、binlog。前两种是InnoDB特有的&#xff0c;binlog是MySQL的Server层中的。 Buffer Pool buffer pool是MySQL的缓冲池&#xff0c;里面存储了数据页、索引页、undo页等&#xff08;与数据库不一致的即为脏页&#xff09;。…

机器学习周志华学习笔记-第13章<半监督学习>

机器学习周志华学习笔记-第13章&#xff1c;半监督学习&#xff1e; 卷王&#xff0c;请看目录 13半监督学习13.1 生成式方法13.2 半监督SVM13.3 基于分歧的方法13.4 半监督聚类 13半监督学习 前面我们一直围绕的都是监督学习与无监督学习&#xff0c;监督学习指的是训练样本包…

SpringCloud框架学习(第六部分:Sentinel实现熔断与限流)

目录 十四、SpringCloud Alibaba Sentinel实现熔断与限流 1.简介 2.作用 3.下载安装 4.微服务 8401 整合 Sentinel 入门案例 5.流控规则 &#xff08;1&#xff09;基本介绍 &#xff08;2&#xff09;流控模式 Ⅰ. 直接 Ⅱ. 关联 Ⅲ. 链路 &#xff08;3&#xff0…

【Java基础面试题009】Java的I/O流是什么?

相关知识补充&#xff1a;黑马-字符集、IO流&#xff08;一&#xff09;.pdf Autism_Btkrsr/Blog_md_to_pdf - 码云 - 开源中国 (gitee.com) 黑马-IO流&#xff08;二&#xff09;.pdf Autism_Btkrsr/Blog_md_to_pdf - 码云 - 开源中国 (gitee.com) 回答重点 Java的I/O&…

第六届国际科技创新学术交流会暨管理科学信息化与经济创新发展(MSIEID 2024)

重要信息 大会官网&#xff1a;msieid2024.iaecst.org &#xff08;点击了解大会&#xff0c;参会等内容&#xff09; 大会时间&#xff1a;2024年12月6-8日 大会地点&#xff1a;中国-广州 大会简介 随着全球化和信息化的不断深入&#xff0c;管理科学、信息化和经济发展…

python学opencv|读取视频(一)灰度视频制作和保存

【1】引言 上一次课学习了用opencv读取图像&#xff0c;掌握了三个函数&#xff1a;cv.imread()、cv.imshow()、cv.imwrite() 相关链接如下&#xff1a; python学opencv|读取图像-CSDN博客 这次课我们继续&#xff0c;来学习用opencv读取视频。 【2】学习资源 首先是官网…

题外话 (火影密令)

哥们&#xff01; 玩火影不&#xff01; 村里人全部评论&#xff01; 不评论的忍战李全保底&#xff01; 哥们&#xff01; 密令领了不&#xff01; “1219村里人集合”领了吗&#xff01; 100金币&#xff01; 哥们&#xff01; 我粉丝没人能上影&#xff01; 老舅说的…

人形机器人训练、机器臂远程操控、VR游戏交互、影视动画制作,一副手套全部解决!

广州虚拟动力基于自研技术推出了多节点mHand Pro动捕数据手套&#xff0c;其最大的特点就是功能集成与高精度捕捉&#xff0c;可以用于人形机器人训练、机器臂远程操控、VR游戏交互、影视动画制作等多种场景。 一、人形机器人训练 mHand Pro动捕数据手套双手共装配16个9轴惯性…

vue3+view-ui-plus+vite+less 实现自定义iview样式

首先是结论&#xff1a; "less": "^2.7.3", "less-loader": "^4.1.0", vite.config.js resolve: {alias: {// 设置路径~: path.resolve(__dirname, ./),// 设置别名: path.resolve(__dirname, ./src)},extensions: [.mjs, .js, .ts…

SpringMVC接收数据

一、访问路径设置: RequestMapping注解的作用就是将请求的URL地址和处理请求的方式(handler方法)关联起来&#xff0c;建立映射关系;SpringMVC接收到指定的请求&#xff0c;就会来找到在映射关系中对应的方法来处理这个请求 1.精准路径匹配: 在RequestMapping注解指定URL地址…

【微服务】Docker

一、Docker基础 1、依赖的兼容问题&#xff1a;Docker允许开发中将应用、依赖、函数库、配置一起打包&#xff0c;形成可移植镜像Docker应用运行在容器中&#xff0c;使用沙箱机制&#xff0c;相互隔离。 2、如何解决开发、测试、生产环境有差异的问题&#xff1a;Docker镜像…