个人主页:C++忠实粉丝
欢迎 点赞👍 收藏✨ 留言✉ 加关注💓本文由 C++忠实粉丝 原创计算机网络socket编程(5)_TCP网络编程实现echo_server
收录于专栏【计算机网络】
本专栏旨在分享学习计算机网络的一点学习笔记,欢迎大家在评论区交流讨论💌
目录
功能介绍
InetAddr.hpp
LockGuard.hpp
Log.hpp
Thread.hpp
ThreadPool.hpp
TcpServer.hpp
TcpServerMain.cc
TcpClientMain.cc
效果展示
功能介绍
和上回 UDP 网络编程一样, 实现简单的 echo_server, 不过, 这里我们 TCP 网络编程使用了 多线程, 不过大体都差不多~
还有就是网络编程代码真的是又多又杂, 有的时候我自己都烦, 没办法网络部分就是这样的, 我最近会尽快更完这个 socket 编程, 提早进入概念部分, 一直编程感觉少了什么~ 还得跟概念结合起来看, 感兴趣的宝子们不要忘记了点赞关注哦! 我现在在网络部分真的待不了一点, 希望我能尽快挣脱网络, 更新数据库 MySQL 的东西吧!
InetAddr.hpp
这个类封装了 sockaddr_in 结构体,用于简化对 IP 地址和端口的处理。其核心功能是将网络字节序的 sockaddr_in 地址转换为易于操作的主机字节序的 IP 地址字符串和端口号,并提供相关的成员函数来获取这些信息。
#pragma once#include <iostream>
#include <string>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/in.h>class InetAddr
{
private:void ToHost(const struct sockaddr_in &addr){_port = ntohs(addr.sin_port);// _ip = inet_ntoa(addr.sin_addr);char ip_buf[32];// inet_p to n// p: process// n: net// inet_pton(int af, const char *src, void *dst);// inet_pton(AF_INET, ip.c_str(), &addr.sin_addr.s_addr);::inet_ntop(AF_INET, &addr.sin_addr, ip_buf, sizeof(ip_buf));_ip = ip_buf;}public:InetAddr(const struct sockaddr_in &addr):_addr(addr){ToHost(addr);}InetAddr(){}bool operator == (const InetAddr &addr){return (this->_ip == addr._ip && this->_port == addr._port);}std::string Ip(){return _ip;}uint16_t Port(){return _port;}struct sockaddr_in Addr(){return _addr;}std::string AddrStr(){return _ip + ":" + std::to_string(_port);}~InetAddr(){}private:std::string _ip;uint16_t _port;struct sockaddr_in _addr;
};
私有成员变量
_ip:存储 IP 地址的字符串(如 "192.168.0.1")。
_port:存储端口号。
_addr:存储一个 sockaddr_in 结构体,用于保存 IP 地址和端口。
私有成员函数 ToHost
ToHost 函数的作用是将一个 sockaddr_in 地址结构转换为 InetAddr 类的成员 _ip 和 _port。
ntohs(addr.sin_port):将网络字节顺序的端口号(从 sockaddr_in 中获取)转换为主机字节顺序。网络字节顺序是大端模式,而主机字节顺序通常取决于平台。
inet_ntop(AF_INET, &addr.sin_addr, ip_buf, sizeof(ip_buf)):将 sockaddr_in 中的 IP 地址(以二进制形式存储)转换为点分十进制字符串表示(如 "192.168.0.1")。
这里 inet_ntop 和 ntohs 用于处理网络字节序和主机字节序的转换,确保 IP 地址和端口在不同环境下的正确性。
构造函数 InetAddr(const struct sockaddr_in &addr)
该构造函数接受一个 sockaddr_in 类型的参数 addr,并调用 ToHost 方法将其转换为 InetAddr 类内部的 _ip 和 _port。
_addr(addr):将 sockaddr_in 结构体存储在 _addr 中。
默认构造函数 InetAddr()
默认构造函数没有做任何事情。它用于创建一个空的 InetAddr 对象
运算符重载 ==
重载了 == 运算符,用于比较两个 InetAddr 对象是否相等。它通过比较 ip 和 port 字段来判断是否相同。
成员函数 Ip
返回当前 InetAddr 对象的 IP 地址。
成员函数 Port
返回当前 InetAddr 对象的端口号。
成员函数 Addr
返回存储的 sockaddr_in 结构体。sockaddr_in 包含了完整的 IP 地址和端口信息。
成员函数 AddrStr
返回一个格式化的字符串,表示 IP 地址和端口,格式为 "ip:port"(例如 "192.168.0.1:8080")。
LockGuard.hpp
#pragma once#include <pthread.h>class LockGuard
{
public:LockGuard(pthread_mutex_t *mutex):_mutex(mutex){pthread_mutex_lock(_mutex);}~LockGuard(){pthread_mutex_unlock(_mutex);}
private:pthread_mutex_t *_mutex;
};
构造函数 LockGuard(pthread_mutex_t *mutex)
构造函数接收一个 pthread_mutex_t* 类型的指针作为参数,并在构造时通过 pthread_mutex_lock 来锁定该互斥量。
mutex 参数是指向一个 pthread_mutex_t 类型的指针,这个互斥量将用来保护临界区。
_mutex(mutex) 是初始化成员变量 _mutex 的成员初始化列表,它将构造函数的参数 mutex 的值赋给 _mutex 成员变量。
pthread_mutex_lock(_mutex) 调用会尝试锁定互斥量 _mutex。如果互斥量已经被其他线程锁定,当前线程会被阻塞,直到该互斥量变为可用。
析构函数 ~LockGuard()
析构函数负责在 LockGuard 对象生命周期结束时自动解锁互斥量。
当 LockGuard 对象的作用域结束时,析构函数会自动被调用。
pthread_mutex_unlock(_mutex) 会释放锁,即解锁互斥量。这样可以确保即使在发生异常或提前返回的情况下,互斥量也能被正确解锁,从而避免死锁。
成员变量 _mutex
_mutex 是一个指向 pthread_mutex_t 类型的指针,它保存了传递给构造函数的互斥量地址。这个指针将用于在构造和析构中对互斥量进行锁定和解锁操作。
关键特点:
自动锁定:当 LockGuard 对象被创建时,构造函数自动锁定互斥量。
自动解锁:当 LockGuard 对象超出作用域时,析构函数自动解锁互斥量。
简化代码:使用 LockGuard 类可以避免手动调用 pthread_mutex_lock 和 pthread_mutex_unlock,并且保证解锁操作一定会发生。
Log.hpp
#pragma once#include <iostream>
#include <sys/types.h>
#include <unistd.h>
#include <ctime>
#include <cstdarg>
#include <fstream>
#include <cstring>
#include <pthread.h>
#include "LockGuard.hpp"namespace log_ns
{enum{DEBUG = 1,INFO,WARNING,ERROR,FATAL};std::string LevelToString(int level){switch (level){case DEBUG:return "DEBUG";case INFO:return "INFO";case WARNING:return "WARNING";case ERROR:return "ERROR";case FATAL:return "FATAL";default:return "UNKNOWN";}}std::string GetCurrTime(){time_t now = time(nullptr);struct tm *curr_time = localtime(&now);char buffer[128];snprintf(buffer, sizeof(buffer), "%d-%02d-%02d %02d:%02d:%02d",curr_time->tm_year + 1900,curr_time->tm_mon + 1,curr_time->tm_mday,curr_time->tm_hour,curr_time->tm_min,curr_time->tm_sec);return buffer;}class logmessage{public:std::string _level;pid_t _id;std::string _filename;int _filenumber;std::string _curr_time;std::string _message_info;};#define SCREEN_TYPE 1
#define FILE_TYPE 2const std::string glogfile = "./log.txt";pthread_mutex_t glock = PTHREAD_MUTEX_INITIALIZER;// log.logMessage("", 12, INFO, "this is a %d message ,%f, %s hellwrodl", x, , , );class Log{public:Log(const std::string &logfile = glogfile) : _logfile(logfile), _type(SCREEN_TYPE){}void Enable(int type){_type = type;}void FlushLogToScreen(const logmessage &lg){printf("[%s][%d][%s][%d][%s] %s",lg._level.c_str(),lg._id,lg._filename.c_str(),lg._filenumber,lg._curr_time.c_str(),lg._message_info.c_str());}void FlushLogToFile(const logmessage &lg){std::ofstream out(_logfile, std::ios::app);if (!out.is_open())return;char logtxt[2048];snprintf(logtxt, sizeof(logtxt), "[%s][%d][%s][%d][%s] %s",lg._level.c_str(),lg._id,lg._filename.c_str(),lg._filenumber,lg._curr_time.c_str(),lg._message_info.c_str());out.write(logtxt, strlen(logtxt));out.close();}void FlushLog(const logmessage &lg){// 加过滤逻辑 --- TODOLockGuard lockguard(&glock);switch (_type){case SCREEN_TYPE:FlushLogToScreen(lg);break;case FILE_TYPE:FlushLogToFile(lg);break;}}void logMessage(std::string filename, int filenumber, int level, const char *format, ...){logmessage lg;lg._level = LevelToString(level);lg._id = getpid();lg._filename = filename;lg._filenumber = filenumber;lg._curr_time = GetCurrTime();va_list ap;va_start(ap, format);char log_info[1024];vsnprintf(log_info, sizeof(log_info), format, ap);va_end(ap);lg._message_info = log_info;// 打印出来日志FlushLog(lg);}~Log(){}private:int _type;std::string _logfile;};Log lg;#define LOG(Level, Format, ...) \do \{ \lg.logMessage(__FILE__, __LINE__, Level, Format, ##__VA_ARGS__); \} while (0)
#define EnableScreen() \do \{ \lg.Enable(SCREEN_TYPE); \} while (0)
#define EnableFILE() \do \{ \lg.Enable(FILE_TYPE); \} while (0)
};
日志系统, 我们的老演员了, 这里就不再多介绍了~~
Thread.hpp
#pragma once
#include <iostream>
#include <string>
#include <functional>
#include <pthread.h>namespace ThreadMoudle
{// 线程要执行的方法,后面我们随时调整// typedef void (*func_t)(ThreadData *td); // 函数指针类型// typedef std::function<void()> func_t;using func_t = std::function<void(const std::string&)>;class Thread{public:void Excute(){_isrunning = true;_func(_name);_isrunning = false;}public:Thread(const std::string &name, func_t func):_name(name), _func(func){}static void *ThreadRoutine(void *args) // 新线程都会执行该方法!{Thread *self = static_cast<Thread*>(args); // 获得了当前对象self->Excute();return nullptr;}bool Start(){int n = ::pthread_create(&_tid, nullptr, ThreadRoutine, this);if(n != 0) return false;return true;}std::string Status(){if(_isrunning) return "running";else return "sleep";}void Stop(){if(_isrunning){::pthread_cancel(_tid);_isrunning = false;}}void Join(){::pthread_join(_tid, nullptr);}std::string Name(){return _name;}~Thread(){}private:std::string _name;pthread_t _tid;bool _isrunning;func_t _func; // 线程要执行的回调函数};
} // namespace ThreadModle
线程函数类型 func_t
using func_t = std::function<void(const std::string&)>;
这里使用了 std::function 来定义线程要执行的函数类型。func_t 是一个函数对象类型,它表示接受一个 std::string 类型参数并返回 void 的函数。使用 std::function 的好处是,它可以适配普通函数、lambda 表达式以及成员函数等,使得线程任务的定义更加灵活。
Thread 类
Thread 类是该代码的核心,封装了 POSIX 线程的管理操作,包括线程的创建、执行、停止、等待和状态查询。
成员变量
_name: 线程的名称,用于标识线程。
_tid: 线程标识符 (pthread_t 类型),用于标识线程。
_isrunning: 布尔变量,表示线程是否正在运行。
_func: 线程执行的任务(即回调函数),使用 func_t 类型存储。
构造函数
Thread 类的构造函数接收线程名称 (name) 和线程任务 (func) 作为参数,并初始化相关成员变量。_isrunning 被初始化为 false,表示线程在创建时默认处于非运行状态。
线程执行方法 Excute
Excute 方法是线程执行的主体部分,它首先将 _isrunning 设置为 true,然后执行通过构造函数传入的任务函数 _func。执行完后,将 _isrunning 设置为 false,表示线程已经结束执行。
线程例程 ThreadRoutine
ThreadRoutine 是一个静态方法,它会作为线程的入口函数。当创建线程时,系统会调用这个函数。
在 ThreadRoutine 中,首先通过 static_cast 将传入的 void* 类型的参数转换为 Thread* 类型,这样我们就可以访问到线程的成员变量和方法。
然后调用 self->Excute(),即执行线程实际的工作。
启动线程 Start
Start 方法通过 pthread_create 创建一个新的线程。pthread_create 会接受线程标识符 _tid、线程属性(这里是 nullptr,即默认属性)、线程入口函数(这里是 ThreadRoutine)以及线程传递的参数(这里是 this,即当前对象)。
如果线程创建成功,返回 true;否则返回 false。
查询线程状态 Status
Status 方法返回当前线程的状态。如果线程正在执行(_isrunning == true),返回 "running",否则返回 "sleep"。
停止线程 Stop
Stop 方法用于停止正在运行的线程。它调用 pthread_cancel 来请求终止指定线程 _tid。然后将 _isrunning 设置为 false,表示线程已停止。
等待线程完成 Join
Join 方法用于等待线程执行完毕。它通过 pthread_join 阻塞当前线程,直到指定线程 _tid 执行完毕。
获取线程名称 Name
Name 方法返回线程的名称。
ThreadPool.hpp
这段代码实现了一个 线程池 模式,提供了多线程处理任务的能力,能够管理多个线程执行任务,并且通过线程池单例模式(Singleton)来确保只会创建一个线程池实例。
#pragma once#include <iostream>
#include <unistd.h>
#include <string>
#include <vector>
#include <queue>
#include <functional>
#include "Thread.hpp"
#include "Log.hpp"
#include "LockGuard.hpp"using namespace ThreadMoudle;
using namespace log_ns;static const int gdefaultnum = 10;void test()
{while (true){std::cout << "hello world" << std::endl;sleep(1);}
}template <typename T>
class ThreadPool
{
private:void LockQueue(){pthread_mutex_lock(&_mutex);}void UnlockQueue(){pthread_mutex_unlock(&_mutex);}void Wakeup(){pthread_cond_signal(&_cond);}void WakeupAll(){pthread_cond_broadcast(&_cond);}void Sleep(){pthread_cond_wait(&_cond, &_mutex);}bool IsEmpty(){return _task_queue.empty();}void HandlerTask(const std::string &name) // this{while (true){// 取任务LockQueue();while (IsEmpty() && _isrunning){_sleep_thread_num++;LOG(INFO, "%s thread sleep begin!\n", name.c_str());Sleep();LOG(INFO, "%s thread wakeup!\n", name.c_str());_sleep_thread_num--;}// 判定一种情况if (IsEmpty() && !_isrunning){UnlockQueue();LOG(INFO, "%s thread quit\n", name.c_str());break;}// 有任务T t = _task_queue.front();_task_queue.pop();UnlockQueue();// 处理任务t(); // 处理任务,此处不用/不能在临界区中处理// std::cout << name << ": " << t.result() << std::endl;// LOG(DEBUG, "hander task done, task is : %s\n", t.result().c_str());}}void Init(){func_t func = std::bind(&ThreadPool::HandlerTask, this, std::placeholders::_1);for (int i = 0; i < _thread_num; i++){std::string threadname = "thread-" + std::to_string(i + 1);_threads.emplace_back(threadname, func);LOG(DEBUG, "construct thread %s done, init success\n", threadname.c_str());}}void Start(){_isrunning = true;for (auto &thread : _threads){LOG(DEBUG, "start thread %s done.\n", thread.Name().c_str());thread.Start();}}ThreadPool(int thread_num = gdefaultnum): _thread_num(thread_num), _isrunning(false), _sleep_thread_num(0){pthread_mutex_init(&_mutex, nullptr);pthread_cond_init(&_cond, nullptr);}ThreadPool(const ThreadPool<T> &) = delete;void operator=(const ThreadPool<T> &) = delete;public:void Stop(){LockQueue();_isrunning = false;WakeupAll();UnlockQueue();LOG(INFO, "Thread Pool Stop Success!\n");}// 如果是多线程获取单例呢?static ThreadPool<T> *GetInstance(){if (_tp == nullptr){LockGuard lockguard(&_sig_mutex);if (_tp == nullptr){LOG(INFO, "create threadpool\n");// thread-1 thread-2 thread-3...._tp = new ThreadPool<T>();_tp->Init();_tp->Start();}else{LOG(INFO, "get threadpool\n");}}return _tp;}void Equeue(const T &in){LockQueue();if (_isrunning){_task_queue.push(in);if (_sleep_thread_num > 0)Wakeup();}UnlockQueue();}~ThreadPool(){pthread_mutex_destroy(&_mutex);pthread_cond_destroy(&_cond);}private:int _thread_num;std::vector<Thread> _threads;std::queue<T> _task_queue;bool _isrunning;int _sleep_thread_num;pthread_mutex_t _mutex;pthread_cond_t _cond;// 单例模式// volatile static ThreadPool<T> *_tp;static ThreadPool<T> *_tp;static pthread_mutex_t _sig_mutex;
};template <typename T>
ThreadPool<T> *ThreadPool<T>::_tp = nullptr;
template <typename T>
pthread_mutex_t ThreadPool<T>::_sig_mutex = PTHREAD_MUTEX_INITIALIZER;
类定义:ThreadPool<T>
ThreadPool 类是一个模板类,能够处理类型为 T 的任务。ThreadPool 负责管理一组线程,这些线程会从任务队列中取出任务并执行。
成员变量:
线程池大小:_thread_num 表示线程池中线程的数量。
线程队列:_threads 用来存储所有创建的线程。
任务队列:_task_queue 是一个 std::queue,存储待处理的任务。
运行状态:_isrunning 标志线程池是否正在运行。
空闲线程数:_sleep_thread_num 表示当前空闲的线程数量。
同步机制:
_mutex:用于保护任务队列的互斥锁。
_cond:用于线程同步的条件变量,确保线程池在没有任务时可以进入等待状态。
单例模式:
_tp:一个静态指针,用于存储线程池的唯一实例。
_sig_mutex:一个静态互斥锁,用于控制对 _tp 的访问,确保线程池的单例实现是线程安全的。
成员函数:
(1) 同步操作:
LockQueue() 和 UnlockQueue():这些是保护任务队列的互斥锁方法,确保在访问任务队列时线程是同步的,避免并发冲突。
Wakeup() 和 WakeupAll():分别是唤醒一个或所有线程的函数。当有任务加入时,如果某些线程在等待任务,这些函数可以用来通知线程继续工作。
Sleep():如果任务队列为空,线程会调用这个函数进入等待状态,直到有新的任务被加入到队列中。
IsEmpty():检查任务队列是否为空。
(2) 任务处理:
HandlerTask():这是线程执行的主要任务。每个线程会不断地从任务队列中获取任务并执行,直到线程池被停止。
线程首先会尝试从任务队列中取出任务。
如果队列为空且线程池仍然在运行,线程将会休眠,直到有新任务到来。
如果线程池已停止并且队列为空,线程将退出。
如果队列非空,线程会执行任务。
(3) 初始化和启动:
Init():为线程池中的每个线程创建一个 Thread 对象,并绑定任务处理函数 HandlerTask(),然后将线程添加到 _threads 向量中。
Start():启动所有线程。
(4) 停止:
Stop():停止线程池,首先设置 _isrunning = false,然后唤醒所有处于等待状态的线程。
(5) 单例实现:
GetInstance():这是一个线程安全的单例实现,使用双重检查锁定(Double-Checked Locking)来确保线程池实例 _tp 只会被创建一次。_sig_mutex 用于同步对 _tp 的访问。
(6) 任务队列:
Equeue():将任务 in 添加到任务队列中。任务加入后,如果有空闲线程,某些线程会被唤醒来处理这些任务。
(7) 析构函数:
pthread_mutex_destroy(&_mutex) 和 pthread_cond_destroy(&_cond) 用于销毁互斥锁和条件变量,释放相关资源。
线程池的工作流程:
初始化和启动线程池:
通过 ThreadPool<T>::GetInstance() 获取线程池的唯一实例(如果还没有创建)。
调用 ThreadPool::Start() 启动线程池中的所有线程,每个线程执行 HandlerTask() 函数。
任务处理:
任务通过 ThreadPool::Equeue() 被加入到任务队列 _task_queue 中。
线程在 HandlerTask() 中从队列中取任务并执行。
线程阻塞与唤醒:
如果队列为空,线程会调用 Sleep() 进入等待状态,直到有新任务被添加。
当任务被加入队列时,如果有空闲线程,它们会被唤醒执行任务。
停止线程池:
调用 ThreadPool::Stop() 停止线程池,设置 _isrunning = false,并唤醒所有等待中的线程。每个线程在执行完当前任务后退出。
单例模式的线程安全性分析:
ThreadPool<T>::GetInstance() 中使用了双重检查锁定(Double-Checked Locking)来实现线程安全的单例模式:
第一次检查 _tp == nullptr 是为了减少锁的竞争。
第二次检查 _tp == nullptr 是为了确保在锁定后 _tp 仍然没有被创建(避免其他线程已经创建了 ThreadPool 实例)。
LockGuard 用于确保在操作 _tp 时,访问是线程安全的。
TcpServer.hpp
这个 TcpServer 类是一个简单的 TCP 服务器实现,它能够接受客户端的连接,并处理客户端发送的消息。它使用线程池来处理每个客户端的连接,避免了多进程和单线程模型的缺点。
#pragma once
#include <iostream>
#include <cstring>
#include <functional>
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/wait.h>
#include <pthread.h>
#include "Log.hpp"
#include "InetAddr.hpp"
#include "ThreadPool.hpp"using namespace log_ns;enum
{SOCKET_ERROR = 1,BIND_ERROR,LISTEN_ERR
};const static int gport = 8888;
const static int gsock = -1;
const static int gblcklog = 8;using task_t = std::function<void()>;class TcpServer
{
public:TcpServer(uint16_t port = gport): _port(port),_listensockfd(gsock),_isrunning(false){}void InitServer(){// 1. 创建socket_listensockfd = ::socket(AF_INET, SOCK_STREAM, 0);if (_listensockfd < 0){LOG(FATAL, "socket create error\n");exit(SOCKET_ERROR);}LOG(INFO, "socket create success, sockfd: %d\n", _listensockfd); // 3struct sockaddr_in local;memset(&local, 0, sizeof(local));local.sin_family = AF_INET;local.sin_port = htons(_port);local.sin_addr.s_addr = INADDR_ANY;// 2. bind sockfd 和 Socket addrif (::bind(_listensockfd, (struct sockaddr *)&local, sizeof(local)) < 0){LOG(FATAL, "bind error\n");exit(BIND_ERROR);}LOG(INFO, "bind success\n");// 3. 因为tcp是面向连接的,tcp需要未来不断地能够做到获取连接if (::listen(_listensockfd, gblcklog) < 0){LOG(FATAL, "listen error\n");exit(LISTEN_ERR);}LOG(INFO, "listen success\n");}class ThreadData{public:int _sockfd;TcpServer *_self;InetAddr _addr;public:ThreadData(int sockfd, TcpServer *self, const InetAddr &addr):_sockfd(sockfd), _self(self), _addr(addr){}};void Loop(){// signal(SIGCHLD, SIG_IGN);_isrunning = true;while (_isrunning){struct sockaddr_in client;socklen_t len = sizeof(client);// 4. 获取新连接int sockfd = ::accept(_listensockfd, (struct sockaddr *)&client, &len);if (sockfd < 0){LOG(WARNING, "accept error\n");continue;}InetAddr addr(client);LOG(INFO, "get a new link, client info : %s, sockfd is : %d\n", addr.AddrStr().c_str(), sockfd);// version 0 --- 不靠谱版本// Service(sockfd, addr);// version 1 --- 多进程版本// pid_t id = fork();// if (id == 0)// {// // child// ::close(_listensockfd); // 建议!// if(fork() > 0) exit(0);// Service(sockfd, addr);// exit(0);// }// // father// ::close(sockfd);// int n = waitpid(id, nullptr, 0);// if (n > 0)// {// LOG(INFO, "wait child success.\n");// }// version 2 ---- 多线程版本 --- 不能关闭fd了,也不需要了// pthread_t tid;// ThreadData *td = new ThreadData(sockfd, this, addr);// pthread_create(&tid, nullptr, Execute, td); // 新线程进行分离// version 3 ---- 线程池版本 int sockfd, InetAddr addrtask_t t = std::bind(&TcpServer::Service, this, sockfd, addr);ThreadPool<task_t>::GetInstance()->Equeue(t);}_isrunning = false;}static void *Execute(void *args){pthread_detach(pthread_self());ThreadData *td = static_cast<ThreadData *>(args);td->_self->Service(td->_sockfd, td->_addr);delete td;return nullptr;}void Service(int sockfd, InetAddr addr){// 长服务while (true){char inbuffer[1024]; // 当做字符串ssize_t n = ::read(sockfd, inbuffer, sizeof(inbuffer) - 1);if (n > 0){inbuffer[n] = 0;LOG(INFO, "get message from client %s, message: %s\n", addr.AddrStr().c_str(), inbuffer);std::string echo_string = "[server echo] #";echo_string += inbuffer;write(sockfd, echo_string.c_str(), echo_string.size());}else if (n == 0){LOG(INFO, "client %s quit\n", addr.AddrStr().c_str());break;}else{LOG(ERROR, "read error: %s\n", addr.AddrStr().c_str());break;}}::close(sockfd);}~TcpServer() {}private:uint16_t _port;int _listensockfd;bool _isrunning;
};
1. 类结构和成员变量
成员变量:
_port:服务器监听的端口号,默认为 8888。
_listensockfd:服务器的监听套接字描述符。
_isrunning:标志服务器是否在运行,控制服务器的生命周期。
枚举常量:
SOCKET_ERROR:表示创建套接字失败时的错误码。
BIND_ERROR:表示绑定地址失败时的错误码。
LISTEN_ERR:表示监听失败时的错误码。
常量:
gport:默认的监听端口号。
gsock:默认的套接字标识符,表示未初始化的套接字。
gblcklog:用于 listen() 调用的 backlog 参数,指定操作系统允许的最大连接数。
task_t:使用 std::function<void()> 定义的任务类型,代表线程池中将要执行的任务。这里用来封装客户端连接的处理工作。
2. TcpServer 类的主要函数
TcpServer::InitServer()
该函数用于初始化服务器,完成以下工作:
创建套接字:
使用 ::socket() 创建一个 TCP 套接字。
如果创建失败,输出错误日志并退出。
绑定地址:
使用 ::bind() 将套接字与指定的地址和端口绑定。
如果绑定失败,输出错误日志并退出。
监听连接:
使用 ::listen() 将套接字设为监听状态,等待客户端连接。
如果监听失败,输出错误日志并退出。
TcpServer::Loop()
该函数是服务器的主循环,它负责接受客户端的连接并将连接交给线程池处理:
在循环中,调用 ::accept() 接受来自客户端的连接请求。
如果连接成功,创建一个 InetAddr 对象以保存客户端的 IP 地址和端口信息。
将处理任务(客户端连接的处理)封装为一个 task_t,然后将任务提交给线程池。
TcpServer::Service()
这是处理客户端请求的核心函数:
该函数会从客户端套接字中读取数据(通过 ::read())。
如果读取成功,则将客户端发送的消息打印出来,并以 "server echo" 开头将数据返回给客户端。
如果读取到的数据为空(客户端关闭连接),则输出日志并关闭套接字。
如果发生读取错误,则输出错误日志并关闭套接字。
TcpServer::Execute()
这是一个静态函数,用于在线程池中执行处理任务。该函数被线程池中的线程调用,处理客户端请求:
它首先将线程标记为分离状态,以便线程结束后自动释放资源。
然后它调用 Service() 来处理客户端请求。
处理完成后,销毁 ThreadData 对象。
3. 线程池的使用
服务器使用线程池来处理每个客户端的连接。每当有新连接时,创建一个 task_t(封装了 TcpServer::Service() 方法),然后将任务提交给线程池:
这样线程池中的线程会并行处理这些任务,避免了每次都创建新线程的开销。
4. ThreadData 类
ThreadData 类用于封装每个客户端连接的相关信息:
sockfd:客户端的套接字。
self:指向当前 TcpServer 对象的指针。
addr:客户端的地址信息。
该类主要用于在线程中传递参数,它的生命周期由线程池中的线程控制。
5. 错误处理
在整个过程中,如果发生错误(如创建套接字失败、绑定失败、监听失败等),会通过 LOG() 打印详细错误信息,并通过 exit() 终止程序。可以在实际应用中根据需求改进错误处理逻辑(如重新尝试,或者返回错误状态)。
6. 程序退出
服务器的退出主要由 Loop() 中的 _isrunning 控制。当前 _isrunning 为 false 时,Loop() 会退出。程序会继续执行后续的清理工作,并最终终止。
TcpServerMain.cc
#include "TcpServer.hpp"#include <memory>// ./tcpserver 8888
int main(int argc, char *argv[])
{if(argc != 2){std::cerr << "Usage: " << argv[0] << " local-port" << std::endl;exit(0);}uint16_t port = std::stoi(argv[1]);std::unique_ptr<TcpServer> tsvr = std::make_unique<TcpServer>(port);tsvr->InitServer();tsvr->Loop();return 0;
}
TcpClientMain.cc
#include <iostream>
#include <cstring>
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>// ./tcpclient server-ip server-port
int main(int argc, char *argv[])
{if (argc != 3){std::cerr << "Usage: " << argv[0] << " server-ip server-port" << std::endl;exit(0);}std::string serverip = argv[1];uint16_t serverport = std::stoi(argv[2]);// 1. 创建socketint sockfd = ::socket(AF_INET, SOCK_STREAM, 0);if (sockfd < 0){std::cerr << "create socket error" << std::endl;exit(1);}// 注意:不需要显示的bind,但是一定要有自己的IP和port,所以需要隐式的bind,OS会自动bind sockfd,用自己的IP和随机端口号// 什么时候进行自动bind?If the connection or binding succeedsstruct sockaddr_in server;memset(&server, 0, sizeof(server));server.sin_family = AF_INET;server.sin_port = htons(serverport);::inet_pton(AF_INET, serverip.c_str(), &server.sin_addr);int n = ::connect(sockfd, (struct sockaddr *)&server, sizeof(server));if (n < 0){std::cerr << "connect socket error" << std::endl;exit(2);}while(true){std::string message;std::cout << "Enter #";std::getline(std::cin, message);write(sockfd, message.c_str(), message.size());char echo_buffer[1024];n = read(sockfd, echo_buffer, sizeof(echo_buffer));if(n > 0){echo_buffer[n] = 0;std::cout << echo_buffer << std::endl;}else{break;}}::close(sockfd);return 0;
}
效果展示
虽然做的有点粗糙, 但是完成的还不错!
下一章还是 TCP socket 编程, 实现命令处理的功能, 处理从客户端接收到的命令,检查这些命令的安全性,并执行这些命令。好了, 篇幅已经很长了, 我们下期在见~