计算机网络socket编程(3)_UDP网络编程实现简单聊天室

个人主页:C++忠实粉丝
欢迎 点赞👍 收藏✨ 留言✉ 加关注💓本文由 C++忠实粉丝 原创

计算机网络socket编程(3)_UDP网络编程实现简单聊天室

收录于专栏【计算机网络】
本专栏旨在分享学习计算机网络的一点学习笔记,欢迎大家在评论区交流讨论💌 
  

目录

功能介绍 : 

InetAddr.hpp

LockGuard.hpp

Log.hpp

nocopy.hpp

Thread.hpp

Thread Pool.hpp

Route.hpp 

UdpServer.hpp

UdpServerMain.cc

UdpClientMain.cc


书接上回~ 上回我们使用 UDP 网络编程实现了简单的网络字典, 这次, 我们继续开始我们 UDP 网络编程之旅 --- UDP 网络编程实现简单聊天室

功能介绍 : 

UDP 网络编程实现简单聊天室 

InetAddr.hpp

这个代码定义了一个 InetAddr 类,主要用于处理与网络地址(IPv4地址和端口)相关的操作。

#pragma once#include <iostream>
#include <string>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/in.h>class InetAddr
{
private:void ToHost(const struct sockaddr_in &addr){_port = ntohs(addr.sin_port);// _ip = inet_ntoa(addr.sin_addr);char ip_buf[32];// inet_p to n// p: process// n: net// inet_pton(int af, const char *src, void *dst);// inet_pton(AF_INET, ip.c_str(), &addr.sin_addr.s_addr);::inet_ntop(AF_INET, &addr.sin_addr, ip_buf, sizeof(ip_buf));_ip = ip_buf;}public:InetAddr(const struct sockaddr_in &addr):_addr(addr){ToHost(addr);}bool operator == (const InetAddr &addr){return (this->_ip == addr._ip && this->_port == addr._port);}std::string Ip(){return _ip;}uint16_t Port(){return _port;}struct sockaddr_in Addr(){return _addr;}std::string AddrStr(){return _ip + ":" + std::to_string(_port);}~InetAddr(){}private:std::string _ip;uint16_t _port;struct sockaddr_in _addr;
};

InetAddr 类成员变量

_ip:用于存储 IP 地址,以 std::string 类型存储。

_port:用于存储端口号,使用 uint16_t(16 位无符号整数)。

_addr:存储 sockaddr_in 结构体,表示一个 IPv4 地址和端口。

构造函数

构造函数接收一个 sockaddr_in 类型的地址。它将该地址保存到成员变量 _addr 中,并调用 ToHost 方法将其转换为可读的 IP 地址和端口。

ToHost 方法

ToHost 方法负责将 sockaddr_in 结构中的信息提取并转换为可读的形式:

ntohs(addr.sin_port):将网络字节序的端口号转换为主机字节序,并赋值给 _port。

inet_ntop(AF_INET, &addr.sin_addr, ip_buf, sizeof(ip_buf)):将 IP 地址(addr.sin_addr)从网络字节序转换为点分十进制格式的字符串,并保存在 ip_buf 中。然后将该字符串赋给 _ip。

inet_ntop 是一个用于将网络地址转换为文本字符串的标准函数。它与 inet_ntoa 类似,但是支持多种地址族(如 IPv6 和 IPv4),因此 inet_ntop 更加通用。

成员函数

operator == (const InetAddr &addr):重载了 == 操作符,用于比较两个 InetAddr 对象是否相等。相等的条件是它们的 IP 地址和端口号都相同。

Ip():返回 IP 地址的字符串表示。

Port():返回端口号(uint16_t 类型)。

Addr():返回 sockaddr_in 结构体,可以用于与底层 socket API 进行交互。

AddrStr():将 IP 地址和端口组合成一个字符串,例如 "192.168.1.1:8080"。

析构函数

析构函数为空,当前类没有动态分配资源,因此不需要做额外的清理工作。

LockGuard.hpp

这个头文件在上回已经出现过了, 是通用性的, 直接被我拷贝下来了~

#pragma once#include <pthread.h>class LockGuard
{
public:LockGuard(pthread_mutex_t *mutex):_mutex(mutex){pthread_mutex_lock(_mutex);}~LockGuard(){pthread_mutex_unlock(_mutex);}
private:pthread_mutex_t *_mutex;
};

Log.hpp

这个是我们的日志系统, 肯定不会换, 所以我还是直接拷贝下来~

#pragma once#include <iostream>
#include <sys/types.h>
#include <unistd.h>
#include <ctime>
#include <cstdarg>
#include <fstream>
#include <cstring>
#include <pthread.h>
#include "LockGuard.hpp"namespace log_ns
{enum{DEBUG = 1,INFO,WARNING,ERROR,FATAL};std::string LevelToString(int level){switch (level){case DEBUG:return "DEBUG";case INFO:return "INFO";case WARNING:return "WARNING";case ERROR:return "ERROR";case FATAL:return "FATAL";default:return "UNKNOWN";}}std::string GetCurrTime(){time_t now = time(nullptr);struct tm *curr_time = localtime(&now);char buffer[128];snprintf(buffer, sizeof(buffer), "%d-%02d-%02d %02d:%02d:%02d",curr_time->tm_year + 1900,curr_time->tm_mon + 1,curr_time->tm_mday,curr_time->tm_hour,curr_time->tm_min,curr_time->tm_sec);return buffer;}class logmessage{public:std::string _level;pid_t _id;std::string _filename;int _filenumber;std::string _curr_time;std::string _message_info;};#define SCREEN_TYPE 1
#define FILE_TYPE 2const std::string glogfile = "./log.txt";pthread_mutex_t glock = PTHREAD_MUTEX_INITIALIZER;// log.logMessage("", 12, INFO, "this is a %d message ,%f, %s hellwrodl", x, , , );class Log{public:Log(const std::string &logfile = glogfile) : _logfile(logfile), _type(SCREEN_TYPE){}void Enable(int type){_type = type;}void FlushLogToScreen(const logmessage &lg){printf("[%s][%d][%s][%d][%s] %s",lg._level.c_str(),lg._id,lg._filename.c_str(),lg._filenumber,lg._curr_time.c_str(),lg._message_info.c_str());}void FlushLogToFile(const logmessage &lg){std::ofstream out(_logfile, std::ios::app);if (!out.is_open())return;char logtxt[2048];snprintf(logtxt, sizeof(logtxt), "[%s][%d][%s][%d][%s] %s",lg._level.c_str(),lg._id,lg._filename.c_str(),lg._filenumber,lg._curr_time.c_str(),lg._message_info.c_str());out.write(logtxt, strlen(logtxt));out.close();}void FlushLog(const logmessage &lg){// 加过滤逻辑 --- TODOLockGuard lockguard(&glock);switch (_type){case SCREEN_TYPE:FlushLogToScreen(lg);break;case FILE_TYPE:FlushLogToFile(lg);break;}}void logMessage(std::string filename, int filenumber, int level, const char *format, ...){logmessage lg;lg._level = LevelToString(level);lg._id = getpid();lg._filename = filename;lg._filenumber = filenumber;lg._curr_time = GetCurrTime();// va_list 是一个结构体类型,专门用于存储指向可变参数列表的指针。当你使用可变参数时,首先要通过 va_start 宏初始化一个 va_list 变量,然后用它来访问每个可变参数。va_list ap;va_start(ap, format);char log_info[1024];vsnprintf(log_info, sizeof(log_info), format, ap);va_end(ap);lg._message_info = log_info;// 打印出来日志FlushLog(lg);}~Log(){}private:int _type;std::string _logfile;};Log lg;#define LOG(Level, Format, ...)                                        \do                                                                 \{                                                                  \lg.logMessage(__FILE__, __LINE__, Level, Format, ##__VA_ARGS__); \} while (0)
#define EnableScreen()          \do                          \{                           \lg.Enable(SCREEN_TYPE); \} while (0)
#define EnableFILE()          \do                        \{                         \lg.Enable(FILE_TYPE); \} while (0)
};

nocopy.hpp

定义一个 nocopy 的类, 通过 C++11 delete 关键字的特性, 阻止该类的拷贝构造和拷贝赋值, 这个也是一样, 直接拷贝下来~

#pragma onceclass nocopy
{
public:nocopy(){}~nocopy(){}nocopy(const nocopy&) = delete;const nocopy& operator=(const nocopy&) = delete;
};

Thread.hpp

这段代码定义了一个 C++ 的 Thread 类,它封装了 POSIX 线程(pthread)相关的功能,使用了 std::function 类型的回调函数来执行线程任务。该类支持启动、停止、检查状态、连接线程等基本的线程操作。(这个类也是在我 Linux 专栏的线程一章拷贝过来的), 大家想看关于这个类的详细讲解, 可以去专栏寻找~~ 这里也会简单介绍一下~

#pragma once
#include <iostream>
#include <string>
#include <functional>
#include <pthread.h>namespace ThreadMoudle
{// 线程要执行的方法,后面我们随时调整// typedef void (*func_t)(ThreadData *td); // 函数指针类型// typedef std::function<void()> func_t;using func_t = std::function<void(const std::string&)>;class Thread{public:void Excute(){_isrunning = true;_func(_name);_isrunning = false;}public:Thread(const std::string &name, func_t func):_name(name), _func(func){}static void *ThreadRoutine(void *args) // 新线程都会执行该方法!{Thread *self = static_cast<Thread*>(args); // 获得了当前对象self->Excute();return nullptr;}bool Start(){int n = ::pthread_create(&_tid, nullptr, ThreadRoutine, this);if(n != 0) return false;return true;}std::string Status(){if(_isrunning) return "running";else return "sleep";}void Stop(){if(_isrunning){::pthread_cancel(_tid);_isrunning = false;}}void Join(){::pthread_join(_tid, nullptr);}std::string Name(){return _name;}~Thread(){}private:std::string _name;pthread_t _tid;bool _isrunning;func_t _func; // 线程要执行的回调函数};
} // namespace ThreadModle

func_t 类型定义

using func_t = std::function<void(const std::string&)>;

func_t 是一个类型别名,代表了一个接受 std::string 类型参数并返回 void 的函数类型。std::function 是一个非常灵活的 C++ 标准库模板,它可以存储任何可以匹配给定签名的函数、函数指针、成员函数指针、或者 Lambda 表达式。在这里,func_t 表示线程要执行的回调函数,这些函数接受一个 std::string 参数。

Thread 类的成员变量和构造函数

_name:保存线程的名称,这是一个 std::string 类型的变量。

_tid:类型为 pthread_t,用于标识线程的 ID。这个值由 pthread_create 创建时生成。

_isrunning:一个布尔值,表示线程是否正在运行。

_func:一个 func_t 类型的函数对象,表示线程启动后执行的任务。

构造函数接受线程名称和一个回调函数(func_t 类型),并将它们初始化给对应的成员变量。_isrunning 初始值为 false。

Excute 方法

Excute 方法是线程执行的主要操作。当线程启动时,这个方法会被调用。它的功能是:

将 _isrunning 设置为 true,表示线程正在运行。

调用回调函数 _func,并传递线程的名称 _name。

最后将 _isrunning 设置为 false,表示线程执行完毕。

ThreadRoutine 静态方法

ThreadRoutine 是一个静态函数,是每个线程创建时要执行的函数。pthread_create 会将一个 void* 类型的参数传递给线程的执行函数,在这里我们传递了当前 Thread 对象的指针。为了方便使用,我们通过 static_cast<Thread*>(args) 将 args 转换为 Thread* 类型,然后调用线程对象的 Excute 方法来执行线程任务。

Start 方法

Start 方法用于启动线程。它通过调用 pthread_create 来创建一个新线程:

_tid:保存新创建线程的 ID。

nullptr:该参数可以用来设置线程属性,传入 nullptr 表示使用默认属性。

ThreadRoutine:线程的执行入口函数,即上面提到的静态函数。

this:将当前 Thread 对象的指针作为参数传递给 ThreadRoutine。

如果线程创建失败,pthread_create 会返回非零值,因此返回 false 表示创建失败。否则,返回 true

Status 方法

Status 方法检查线程是否正在运行,并返回当前状态:

如果 _isrunning 为 true,返回 "running"。

否则,返回 "sleep",表示线程没有在运行。

Stop 方法

Stop 方法用于停止线程。如果线程正在运行(即 _isrunning 为 true),它通过调用 pthread_cancel 来请求取消线程的执行,并将 _isrunning 设置为 false。需要注意,pthread_cancel 只是发送一个取消请求,线程的具体取消机制和时机依赖于线程的实现。

Join 方法

Join 方法用来等待线程执行完毕。pthread_join 会阻塞当前线程,直到指定的线程执行完毕。传入的 _tid 是目标线程的 ID。

Name 方法

Name 方法返回线程的名称(_name)。

析构函数

析构函数为空,表明该类没有显式的资源释放要求,pthread_t 会在程序结束时自动清理。如果有需要释放的资源,可以在此添加清理代码。

Thread Pool.hpp

#pragma once#include <iostream>
#include <unistd.h>
#include <string>
#include <vector>
#include <queue>
#include <functional>
#include "Thread.hpp"
#include "Log.hpp"
#include "LockGuard.hpp"using namespace ThreadMoudle;
using namespace log_ns;static const int gdefaultnum = 5;void test()
{while (true){std::cout << "hello world" << std::endl;sleep(1);}
}template <typename T>
class ThreadPool
{
private:void LockQueue(){pthread_mutex_lock(&_mutex);}void UnlockQueue(){pthread_mutex_unlock(&_mutex);}void Wakeup(){pthread_cond_signal(&_cond);}void WakeupAll(){pthread_cond_broadcast(&_cond);}void Sleep(){pthread_cond_wait(&_cond, &_mutex);}bool IsEmpty(){return _task_queue.empty();}void HandlerTask(const std::string &name) // this{while (true){// 取任务LockQueue();while (IsEmpty() && _isrunning){_sleep_thread_num++;LOG(INFO, "%s thread sleep begin!\n", name.c_str());Sleep();LOG(INFO, "%s thread wakeup!\n", name.c_str());_sleep_thread_num--;}// 判定一种情况if (IsEmpty() && !_isrunning){UnlockQueue();LOG(INFO, "%s thread quit\n", name.c_str());break;}// 有任务T t = _task_queue.front();_task_queue.pop();UnlockQueue();// 处理任务t(); // 处理任务,此处不用/不能在临界区中处理// std::cout << name << ": " << t.result() << std::endl;// LOG(DEBUG, "hander task done, task is : %s\n", t.result().c_str());}}void Init(){func_t func = std::bind(&ThreadPool::HandlerTask, this, std::placeholders::_1);for (int i = 0; i < _thread_num; i++){std::string threadname = "thread-" + std::to_string(i + 1);_threads.emplace_back(threadname, func);LOG(DEBUG, "construct thread %s done, init success\n", threadname.c_str());}}void Start(){_isrunning = true;for (auto &thread : _threads){LOG(DEBUG, "start thread %s done.\n", thread.Name().c_str());thread.Start();}}ThreadPool(int thread_num = gdefaultnum): _thread_num(thread_num), _isrunning(false), _sleep_thread_num(0){pthread_mutex_init(&_mutex, nullptr);pthread_cond_init(&_cond, nullptr);}ThreadPool(const ThreadPool<T> &) = delete;void operator=(const ThreadPool<T> &) = delete;public:void Stop(){LockQueue();_isrunning = false;WakeupAll();UnlockQueue();LOG(INFO, "Thread Pool Stop Success!\n");}// 如果是多线程获取单例呢?static ThreadPool<T> *GetInstance(){if (_tp == nullptr){LockGuard lockguard(&_sig_mutex);if (_tp == nullptr){LOG(INFO, "create threadpool\n");// thread-1 thread-2 thread-3...._tp = new ThreadPool<T>();_tp->Init();_tp->Start();}else{LOG(INFO, "get threadpool\n");}}return _tp;}void Equeue(const T &in){LockQueue();if (_isrunning){_task_queue.push(in);if (_sleep_thread_num > 0)Wakeup();}UnlockQueue();}~ThreadPool(){pthread_mutex_destroy(&_mutex);pthread_cond_destroy(&_cond);}private:int _thread_num;std::vector<Thread> _threads;std::queue<T> _task_queue;bool _isrunning;int _sleep_thread_num;pthread_mutex_t _mutex;pthread_cond_t _cond;// 单例模式// volatile static ThreadPool<T> *_tp;static ThreadPool<T> *_tp;static pthread_mutex_t _sig_mutex;
};template <typename T>
ThreadPool<T> *ThreadPool<T>::_tp = nullptr;
template <typename T>
pthread_mutex_t ThreadPool<T>::_sig_mutex = PTHREAD_MUTEX_INITIALIZER;

这段代码实现了一个线程池(ThreadPool)类,它能够管理一组工作线程,并使得这些线程从任务队列中获取任务并执行。任务可以是任何类型的可调用对象(例如 std::function<void()>)。该类还使用了 单例模式 来确保全局只有一个线程池实例。

静态常量

gdefaultnum:线程池的默认线程数,默认为 5 个线程。

ThreadPool 类模板定义

ThreadPool<T> 是一个模板类,接受任务类型 T,任务类型必须是可以调用的类型,例如 std::function<void()>。

成员变量

_thread_num:指定线程池中的线程数。

_threads:存储线程池中的线程对象。

_task_queue:任务队列,存储待处理的任务。

_isrunning:线程池是否处于运行状态,控制线程池的生命周期。

_sleep_thread_num:记录当前处于休眠状态的线程数。

_mutex:用于保护任务队列的互斥锁,确保线程安全。

_cond:条件变量,用于在任务队列为空时让线程休眠,等待任务的到来。

_tp:线程池单例对象的指针。

_sig_mutex:用于保护线程池单例的互斥锁。

主要成员方法

LockQueue() 和 UnlockQueue()

LockQueue() 和 UnlockQueue():分别用于加锁和解锁任务队列的互斥锁。确保对任务队列的操作是线程安全的。

Wakeup() 和 WakeupAll()

Wakeup():唤醒一个等待线程。

WakeupAll():唤醒所有等待线程。通常用于停止线程池时,确保所有线程被唤醒。

Sleep()

Sleep():让线程在条件变量上等待,并释放锁。当条件满足时,线程会被唤醒。

IsEmpty()

IsEmpty():检查任务队列是否为空。

HandlerTask(const std::string &name)

HandlerTask():这是工作线程的主函数。每个工作线程会不断从任务队列中取任务并执行。如果任务队列为空,线程会休眠,直到有新的任务被添加到队列中。

睡眠与唤醒机制:

如果队列为空且线程池正在运行,线程会进入休眠状态并等待条件变量的通知。

如果队列不为空,线程会取出任务并执行。

Init()

Init():初始化线程池。通过 std::bind 将 HandlerTask 方法绑定到工作线程中,并将每个线程的任务(处理任务的方法)分配给 Thread 类实例。

每个线程会被命名为 thread-1, thread-2, ...,并启动。

Start()

Start():启动线程池中的所有线程。通过调用每个线程对象的 Start() 方法,让它们开始执行任务。

Stop()

Stop():停止线程池。设置 _isrunning 为 false,并通过 WakeupAll() 唤醒所有线程,使它们退出循环,最终结束工作。

Equeue(const T &in)

Equeue():将任务 in 添加到任务队列。如果有线程在休眠,唤醒一个线程去执行任务。

单例模式的实现

单例模式保证了类只有一个实例,并提供了一个全局访问点。该实现中的 ThreadPool<T> 使用了双重检查锁定(Double-Checked Locking)来确保线程池的实例在多线程环境下安全地创建。

_tp 和 _sig_mutex 的定义:

_tp:这是一个静态指针,指向 ThreadPool<T> 的唯一实例。最初,它被初始化为 nullptr,表示线程池尚未被创建。

_sig_mutex:这是一个静态互斥锁,保证在创建单例实例时,多个线程不会同时进入创建实例的代码区块,从而避免竞态条件。它通过 PTHREAD_MUTEX_INITIALIZER 进行初始化。

GetInstance() 方法

GetInstance() 方法实现了线程池的 单例 逻辑:

第一次检查(if (_tp == nullptr)):通过检查静态指针 _tp 是否为 nullptr,来判断是否已经创建了线程池。如果 nullptr,说明线程池还没有被创建。

加锁:当线程池尚未创建时,使用 LockGuard 对 _sig_mutex 锁进行加锁,保证在多线程环境中只有一个线程可以进入到创建线程池的代码区域。这是防止竞态条件的发生。

第二次检查(if (_tp == nullptr)):由于加锁后,其他线程可能已经创建了线程池,因此需要再次检查 _tp 是否为 nullptr。这是典型的双重检查锁定(Double-Checked Locking)模式。

创建线程池实例:如果线程池还没有创建(_tp == nullptr),则使用 new 创建线程池实例,并调用 Init() 和 Start() 方法初始化并启动线程池。

返回线程池实例:最后,无论线程池是否已创建,都返回线程池实例 _tp。

Route.hpp 

这段代码定义了一个 Route 类,用于管理在线用户并处理消息转发。它使用了线程池、互斥锁、以及其他辅助类(如 InetAddr、ThreadPool 和 LockGuard)。 

#pragma once
#include <iostream>
#include <string>
#include <vector>
#include <functional>
#include <sys/types.h>
#include <sys/socket.h>
#include <pthread.h>
#include "InetAddr.hpp"
#include "ThreadPool.hpp"
#include "LockGuard.hpp"// class user
// {};
using task_t = std::function<void()>;class Route
{
public:Route(){pthread_mutex_init(&_mutex, nullptr);}void CheckOnlineUser(InetAddr &who){LockGuard lockguard(&_mutex);for (auto &user : _online_user){if (user == who){LOG(DEBUG, "%s is exists\n", who.AddrStr().c_str());return;}}LOG(DEBUG, "%s is not exists, add it\n", who.AddrStr().c_str());_online_user.push_back(who);}// for testvoid Offline(InetAddr &who){LockGuard lockguard(&_mutex);auto iter = _online_user.begin();for (; iter != _online_user.end(); iter++){if (*iter == who){LOG(DEBUG, "%s is offline\n", who.AddrStr().c_str());_online_user.erase(iter);break;}}}void ForwardHelper(int sockfd, const std::string message, InetAddr who){LockGuard lockguard(&_mutex);std::string send_message = "[" + who.AddrStr() + "]# " + message;for (auto &user : _online_user){struct sockaddr_in peer = user.Addr();LOG(DEBUG, "Forward message to %s, message is %s\n", user.AddrStr().c_str(), send_message.c_str());::sendto(sockfd, send_message.c_str(), send_message.size(), 0, (struct sockaddr *)&peer, sizeof(peer));}}void Forward(int sockfd, const std::string &message, InetAddr &who){// 1. 该用户是否在 在线用户列表中呢?如果在,什么都不做,如果不在,自动添加到_online_userCheckOnlineUser(who);// 1.1 message == "QUIT" "Q"if (message == "QUIT" || message == "Q"){Offline(who);}// 2. who 一定在_online_user列表里面// ForwardHelper(sockfd, message);task_t t = std::bind(&Route::ForwardHelper, this, sockfd, message, who);ThreadPool<task_t>::GetInstance()->Equeue(t);}~Route(){pthread_mutex_destroy(&_mutex);}private:std::vector<InetAddr> _online_user;pthread_mutex_t _mutex;
};

类成员与依赖

_online_user:存储在线用户的列表,类型为 std::vector<InetAddr>,每个 InetAddr 代表一个用户的 IP 地址和端口。

_mutex:互斥锁,确保在线用户列表 _online_user 在多线程环境下安全访问。线程在访问 Route 中的共享资源时会加锁,防止数据竞争。

构造函数与析构函数

构造函数:初始化互斥锁 _mutex,以确保在多线程环境下可以安全地访问 _online_user。

析构函数:销毁互斥锁,释放资源。

CheckOnlineUser 方法

作用:检查用户 who 是否已经在 _online_user 中,如果已经存在,不做任何处理;如果不存在,则将其添加到 _online_user 中。

加锁:使用 LockGuard 来保证 _online_user 的访问是线程安全的,即确保不会在多线程环境下发生竞争条件。

遍历:通过迭代器遍历 _online_user,比较每个 user 是否等于 who(InetAddr 的 operator== 比较函数)。

日志:使用 LOG(DEBUG, ...) 打印调试信息。

Offline 方法

作用:将用户 who 从 _online_user 中移除,表示该用户下线。

加锁:使用 LockGuard 加锁,确保多线程环境下 _online_user 的安全修改。

遍历:遍历 _online_user,如果找到了与 who 匹配的用户,则移除该用户并输出调试日志。

ForwardHelper 方法

作用:将 message 转发给所有在线用户。

加锁:再次使用 LockGuard 来保证多线程安全,避免并发访问 _online_user。

构造发送消息:构造一个格式化的消息,包含发送者的地址和消息内容。

发送消息:遍历 _online_user,对每个在线用户通过 sendto 发送消息。sendto 是一个 socket 函数,用于发送数据报(UDP)。

Forward 方法

作用:处理消息转发的逻辑。

检查用户是否在线:调用 CheckOnlineUser 确保 who 在 _online_user 中。如果不在,则会被自动添加。

退出消息处理:如果 message 是 "QUIT" 或 "Q",则调用 Offline 方法让该用户下线。

任务转发:将转发消息的任务包装成一个 task_t(即 std::function<void()> 类型的任务),并提交给线程池。

线程池:使用 ThreadPool<task_t>::GetInstance()->Equeue(t) 将消息转发任务异步地提交到线程池进行处理。这意味着消息的发送不会阻塞主线程,而是由线程池中的工作线程去执行。

UdpServer.hpp

#pragma once#include <iostream>
#include <unistd.h>
#include <string>
#include <cstring>
#include <functional>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>#include "nocopy.hpp"
#include "Log.hpp"
#include "InetAddr.hpp"using namespace log_ns;static const int gsockfd = -1;
static const uint16_t glocalport = 8888;enum
{SOCKET_ERROR = 1,BIND_ERROR
};using service_t = std::function<void(int, const std::string &message, InetAddr &who)>;// UdpServer user("192.1.1.1", 8899);
// 一般服务器主要是用来进行网络数据读取和写入的。IO的
// 服务器IO逻辑 和 业务逻辑 解耦
class UdpServer : public nocopy
{
public:UdpServer(service_t func, uint16_t localport = glocalport): _func(func),_sockfd(gsockfd),_localport(localport),_isrunning(false){}void InitServer(){// 1. 创建socket文件_sockfd = ::socket(AF_INET, SOCK_DGRAM, 0);if (_sockfd < 0){LOG(FATAL, "socket error\n");exit(SOCKET_ERROR);}LOG(DEBUG, "socket create success, _sockfd: %d\n", _sockfd); // 3// 2. bindstruct sockaddr_in local;memset(&local, 0, sizeof(local));local.sin_family = AF_INET;local.sin_port = htons(_localport);// local.sin_addr.s_addr = inet_addr(_localip.c_str()); // 1. 需要4字节IP 2. 需要网络序列的IP -- 暂时local.sin_addr.s_addr = INADDR_ANY; // 服务器端,进行任意IP地址绑定int n = ::bind(_sockfd, (struct sockaddr *)&local, sizeof(local));if (n < 0){LOG(FATAL, "bind error\n");exit(BIND_ERROR);}LOG(DEBUG, "socket bind success\n");}void Start(){_isrunning = true;char message[1024];while (_isrunning){struct sockaddr_in peer;socklen_t len = sizeof(peer);ssize_t n = recvfrom(_sockfd, message, sizeof(message) - 1, 0, (struct sockaddr *)&peer, &len);if (n > 0){InetAddr addr(peer);message[n] = 0;LOG(DEBUG, "[%s]# %s\n", addr.AddrStr().c_str(), message);_func(_sockfd, message, addr);LOG(DEBUG, "return udpserver\n");}else{std::cout << "recvfrom ,  error" << std::endl;}}_isrunning = false;}~UdpServer(){if (_sockfd > gsockfd)::close(_sockfd);}private:int _sockfd; // 读写都用同一个sockfd, 反应说明:UDP是全双工通信的!uint16_t _localport;// std::string _localip; // TODO:后面专门要处理一下这个IPbool _isrunning;service_t _func;
};

UdpServerMain.cc

#include "UdpServer.hpp"
#include "Route.hpp"#include <memory>// ./udp_server local-port
// ./udp_server 8888
int main(int argc, char *argv[])
{if(argc != 2){std::cerr << "Usage: " << argv[0] << " local-port" << std::endl;exit(0);}uint16_t port = std::stoi(argv[1]);EnableScreen();Route messageRoute;service_t message_route = std::bind(&Route::Forward,\&messageRoute, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3);std::unique_ptr<UdpServer> usvr = std::make_unique<UdpServer>(message_route, port); //C++14的标准usvr->InitServer();usvr->Start();return 0;
}

UdpClientMain.cc

#include <iostream>
#include <string>
#include <cstring>
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include "Thread.hpp"using namespace ThreadMoudle;int InitClient()
{int sockfd = ::socket(AF_INET, SOCK_DGRAM, 0);if (sockfd < 0){std::cerr << "create socket error" << std::endl;exit(1);}return sockfd;
}void RecvMessage(int sockfd, const std::string &name)
{while (true){struct sockaddr_in peer;socklen_t len = sizeof(peer);char buffer[1024];int n = recvfrom(sockfd, buffer, sizeof(buffer) - 1, 0, (struct sockaddr *)&peer, &len);if (n > 0){buffer[n] = 0;std::cerr << buffer << std::endl;}else{std::cerr << "recvfrom error" << std::endl;break;}}
}void SendMessage(int sockfd, std::string serverip, uint16_t serverport, const std::string &name)
{struct sockaddr_in server;memset(&server, 0, sizeof(server));server.sin_family = AF_INET;server.sin_port = htons(serverport);server.sin_addr.s_addr = inet_addr(serverip.c_str());std::string cli_profix = name + "# "; // sender-thread# 你好while (true){std::string line;std::cout << cli_profix;std::getline(std::cin, line);int n = sendto(sockfd, line.c_str(), line.size(), 0, (struct sockaddr *)&server, sizeof(server));if (n <= 0)break;}
}int main(int argc, char *argv[])
{if (argc != 3){std::cerr << "Usage: " << argv[0] << " server-ip server-port" << std::endl;exit(0);}std::string serverip = argv[1];uint16_t serverport = std::stoi(argv[2]);int sockfd = InitClient();Thread recver("recver-thread", std::bind(&RecvMessage, sockfd, std::placeholders::_1));Thread sender("sender-thread", std::bind(&SendMessage, sockfd, serverip, serverport, std::placeholders::_1));recver.Start();sender.Start();recver.Join();sender.Join();::close(sockfd);return 0;
}

 最后实在写不下去了, 下期再见吧~~

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/61588.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

英文版本-带EXCEL函数的数据分析

一、问题&#xff1a; 二、表格内容 三、分析结果 四、具体的操作步骤&#xff1a; 销售工作表公式设计与数据验证 类别&#xff08;Category&#xff09;列公式&#xff1a; 在Category列&#xff08;假设为D列&#xff09;&#xff0c;根据ProductCode在Catalogue工作表中查找…

三层交换机静态路由实验

1、前置知识 2、实验目的 3、实验器材&#xff1a; 3560-23PS交换机2台、主机4台、交叉线1根和直通网线4根。 4、实验规划及拓扑 实验要求&#xff1a; &#xff08;1&#xff09;在交换机A和交换机B上分别划分基于端口的VLAN&#xff1a; 交换机 VLAN 端口成员 交换机…

PLC与PLC跨网段通讯的几种方法:厂区组网实践

PLC通常通过以太网或其他工业网络协议&#xff08;如PROFINET、Modbus TCP等&#xff09;进行通信。当PLC位于不同的网段时&#xff0c;它们不能直接通信&#xff0c;需要特殊的配置或设备来实现通信&#xff0c;不同网段的PLC通讯变得尤为重要。 随着工业网络的发展和工业4.0概…

观察者模式和订阅模式

观察者模式和订阅模式在概念上是相似的&#xff0c;它们都涉及到一个对象&#xff08;通常称为“主题”或“发布者”&#xff09;和多个依赖对象&#xff08;称为“观察者”或“订阅者”&#xff09;之间的关系。然而&#xff0c;尽管它们有相似之处&#xff0c;但在某些方面也…

HarmonyOs鸿蒙开发实战(20)=>一文学会基础使用组件导航Navigation

敲黑板&#xff0c;以下是重点技巧。文章末尾有实战项目效果截图及代码截图可参考 1.概要 Navigation是路由导航的根视图容器Navigation组件主要包含​导航页&#xff08;NavBar&#xff09;和子页&#xff08;NavDestination&#xff09;&#xff0c;导航页不存在页面栈中&am…

DevOps-Jenkins-新手入门级

1. Jenkins概述 1. Jenkins是一个开源持续集成的工具&#xff0c;是由JAVA开发而成 2. Jenkins是一个调度平台&#xff0c;本身不处理任何事情&#xff0c;调用插件来完成所有的工作 1.1 什么是代码部署 代码发布/部署>开发书写的程序代码---->部署测试/生产环境 web服务…

在win10下搭建ftp服务器

1 说明 本文档在win10下实现。 2 安装ftp服务器 打开“控制面板/程序和功能”&#xff0c;如下&#xff1a; 点击“启用或关闭windows功能”&#xff0c;如下&#xff1a; 安装“ftp服务器”&#xff0c;将下图红色圈中部分打勾&#xff0c;如下&#xff1a; 必须勾选…

数据结构C语言描述4(图文结合)--栈的实现,中序转后序表达式的实现

前言 这个专栏将会用纯C实现常用的数据结构和简单的算法&#xff1b;有C基础即可跟着学习&#xff0c;代码均可运行&#xff1b;准备考研的也可跟着写&#xff0c;个人感觉&#xff0c;如果时间充裕&#xff0c;手写一遍比看书、刷题管用很多&#xff0c;这也是本人采用纯C语言…

vue中路由缓存

vue中路由缓存 问题描述及截图解决思路关键代码及打印信息截图 问题描述及截图 在使用某一平台时发现当列表页码切换后点击某一卡片进入详情页后&#xff0c;再返回列表页时页面刷新了。这样用户每次看完详情回到列表页都得再重新输入自己的查询条件&#xff0c;或者切换分页到…

如何在 UniApp 中实现 iOS 版本更新检测

随着移动应用的不断发展&#xff0c;保持应用程序的更新是必不可少的&#xff0c;这样用户才能获得更好的体验。本文将帮助你在 UniApp 中实现 iOS 版的版本更新检测和提示&#xff0c;适合刚入行的小白。我们将分步骤进行说明&#xff0c;每一步所需的代码及其解释都会一一列出…

移动充储机器人“小奥”的多场景应用(上)

一、高速公路服务区应用 在高速公路服务区&#xff0c;新能源汽车的充电需求得到“小奥”机器人的及时响应。该机器人配备有储能电池和自动驾驶技术&#xff0c;能够迅速定位至指定充电点&#xff0c;为待充电的新能源汽车提供服务。得益于“小奥”的机动性&#xff0c;其服务…

Redis 的代理类注入失败,连不上 redis

在测试 redis 是否成功连接时&#xff0c;发现 bean 没有被创建成功&#xff0c;导致报错 根据报错提示&#xff0c;需要我们添加依赖&#xff1a; <dependency><groupId>org.apache.commons</groupId><artifactId>commons-pool2</artifactId>&l…

桌面怎么快速添加便签?适合桌面记事的便签小工具

在数字化时代&#xff0c;我们每天面对电脑处理大量任务&#xff0c;无论是工作计划、会议纪要还是个人生活琐事&#xff0c;都需要一个可靠的桌面记事工具来帮助我们记录和整理。因此&#xff0c;一款适合桌面使用的便签软件成为了我们不可或缺的助手。 敬业签就是这样一款功…

UE5 腿部IK 解决方案 footplacement

UE5系列文章目录 文章目录 UE5系列文章目录前言一、FootPlacement 是什么&#xff1f;二、具体实现 前言 在Unreal Engine 5 (UE5) 中&#xff0c;腿部IK&#xff08;Inverse Kinematics&#xff0c;逆向运动学&#xff09;是一个重要的动画技术&#xff0c;用于实现角色脚部准…

KLV6008固态继电器:高压应用的理想紧凑方案

在当今快节奏的电子领域&#xff0c;找到平衡性能、可靠性和安全性的组件至关重要。CRIA Semiconductor的KLV6008固态继电器(SSR)正是满足了这一要求。这款紧凑型继电器专为高压、低电流切换而设计&#xff0c;是适用于各种应用的多功能解决方案。 为什么选择KLV6008&#xff1…

在 Swift 中实现字符串分割问题:以字典中的单词构造句子

文章目录 前言摘要描述题解答案题解代码题解代码分析示例测试及结果时间复杂度空间复杂度总结 前言 本题由于没有合适答案为以往遗留问题&#xff0c;最近有时间将以往遗留问题一一完善。 LeetCode - #140 单词拆分 II 不积跬步&#xff0c;无以至千里&#xff1b;不积小流&…

HarmonyOs鸿蒙开发实战(21)=>组件间通信@ohos/liveeventbus

1.简介 LiveEventBus是一款消息总线&#xff0c;具有生命周期感知能力&#xff0c;支持Sticky&#xff0c;支持跨进程&#xff0c;支持跨APP发送消息。 2.下载安装 ohpm install ohos/liveeventbus 3.订阅&#xff0c;注册监听 4.发送事件 5. 完成 > 记得关注博主&#xff…

OpenCV和Qt坐标系不一致问题

“ OpenCV和QT坐标系导致绘图精度下降问题。” OpenCV和Qt常用的坐标系都是笛卡尔坐标系&#xff0c;但是细微处有些不同。 01 — OpenCV坐标系 OpenCV是图像处理库&#xff0c;是以图像像素为一个坐标位置&#xff0c;即一个像素对应一个坐标&#xff0c;所以其坐标系也叫图像…

nohup java -jar supporterSys.jar --spring.profiles.active=prod

文章目录 1、ps -ef | grep java2、kill 13713、ps -ef | grep java4、nohup java -jar supporterSys.jar --spring.profiles.activeprod &5、ps -ef | grep java1. 启动方式进程 1371进程 19994 2. 主要区别3. 可能的原因4. 建议 1、ps -ef | grep java rootshipper:~# p…

Ubuntu上安装MySQL并且实现远程登录

目录 下载网络工具 查看网络连接 更新系统软件包&#xff1b; 安装mysql数据库 查看mysql数据库状态 以数字ip形式显示mysql的监听状态。&#xff08;默认监听端口是3306&#xff09; 查看安装mysql数据库时系统创建的目录信息。 根据查询到的系统用户名以及随机密码&a…