https://www.cnblogs.com/bwbfight/p/17594353.html
谈一谈linux下线程池 - 白伟碧一些小心得 - 博客园 (cnblogs.com)
谈一谈linux下线程池 - 白伟碧一些小心得 - 博客园 (cnblogs.com)
https://www.cnblogs.com/bwbfight/p/10901574.html
前面的设计,我们对asio的使用都是单线程模式,为了提升网络io并发处理的效率,这一次我们设计多线程模式下asio的使用方式。总体来说asio有两个多线程模型,第一个是启动多个线程,每个线程管理一个iocontext。第二种是只启动一个iocontext,被多个线程共享,后面的文章会对比两个模式的区别,这里先介绍第一种模式,多个线程,每个线程管理独立的iocontext服务。
单线程和多线程对比
之前的单线程模式图如下
我们设计的IOServicePool类型的多线程模型如下:
IOServicePool多线程模式特点
1 每一个io_context跑在不同的线程里,所以同一个socket会被注册在同一个io_context里,它的回调函数也会被单独的一个线程回调,那么对于同一个socket,他的回调函数每次触发都是在同一个线程里,就不会有线程安全问题,网络io层面上的并发是线程安全的。
2 但是对于不同的socket,回调函数的触发可能是同一个线程(两个socket被分配到同一个io_context),也可能不是同一个线程(两个socket被分配到不同的io_context里)。所以如果两个socket对应的上层逻辑处理,如果有交互或者访问共享区,会存在线程安全问题。比如socket1代表玩家1,socket2代表玩家2,玩家1和玩家2在逻辑层存在交互,比如两个玩家都在做公会任务,他们属于同一个公会,公会积分的增加就是共享区的数据,需要保证线程安全。可以通过加锁或者逻辑队列的方式解决安全问题,我们目前采取了后者。
3 多线程相比单线程,极大的提高了并发能力,因为单线程仅有一个io_context服务用来监听读写事件,就绪后回调函数在一个线程里串行调用, 如果一个回调函数的调用时间较长肯定会影响后续的函数调用,毕竟是串行调用。而采用多线程方式,可以在一定程度上减少前一个逻辑调用影响下一个调用的情况,比如两个socket被部署到不同的iocontext上,但是当两个socket部署到同一个iocontext上时仍然存在调用时间影响的问题。不过我们已经通过逻辑队列的方式将网络线程和逻辑线程解耦合了,不会出现前一个调用时间影响下一个回调触发的问题。
其中代码如下:
const.h
#pragma once

// Size in bytes of the per-session receive buffer. Parenthesized so the
// macro expands safely inside larger expressions (e.g. `x % MAX_LENGTH`).
#define MAX_LENGTH (1024 * 2)
// Total length of the message header, in bytes.
#define HEAD_TOTAL_LEN 4
// Length of the message-id field of the header, in bytes.
#define HEAD_ID_LEN 2
// Length of the data-length field of the header, in bytes.
#define HEAD_DATA_LEN 2
#define MAX_RECVQUE 10000
#define MAX_SENDQUE 1000

// Message identifiers understood by the logic layer.
enum MSG_IDS {
    MSG_HELLO_WORD = 1001
};
Singleton.h
#pragma once
#include <memory>
#include <mutex>
#include <iostream>
using namespace std;

// Base template that hands out one shared instance of T.
// The instance is created on the first GetInstance() call, guarded by
// std::call_once, and stored in a static shared_ptr.
template <typename T>
class Singleton {
protected:
    Singleton() = default;
    Singleton(const Singleton<T>&) = delete;
    Singleton& operator=(const Singleton<T>& st) = delete;

    // Holds the single instance; null until GetInstance() is first called.
    static std::shared_ptr<T> _instance;

public:
    // Returns the shared instance, constructing it on first use.
    static std::shared_ptr<T> GetInstance() {
        static std::once_flag init_flag;
        std::call_once(init_flag, []() {
            // `new T` is used here so that T may keep its constructor
            // non-public and grant access through friendship.
            _instance.reset(new T);
        });
        return _instance;
    }

    // Prints the raw address of the stored instance to stdout.
    void PrintAddress() {
        std::cout << _instance.get() << std::endl;
    }

    ~Singleton() {
        std::cout << "this is singleton destruct" << std::endl;
    }
};

template <typename T>
std::shared_ptr<T> Singleton<T>::_instance = nullptr;
MsgNode.h
#pragma once #include <string> #include "const.h" #include <iostream> #include <boost/asio.hpp> using namespace std; using boost::asio::ip::tcp; class LogicSystem; class MsgNode { public:MsgNode(short max_len) :_total_len(max_len), _cur_len(0) {_data = new char[_total_len + 1]();_data[_total_len] = '\0';}~MsgNode() {std::cout << "destruct MsgNode" << endl;delete[] _data;}void Clear() {::memset(_data, 0, _total_len);_cur_len = 0;}short _cur_len;short _total_len;char* _data; };class RecvNode :public MsgNode {friend class LogicSystem; public:RecvNode(short max_len, short msg_id); private:short _msg_id; };class SendNode:public MsgNode {friend class LogicSystem; public:SendNode(const char* msg,short max_len, short msg_id); private:short _msg_id; };
MsgNode.cpp
#include "MsgNode.h"

// Receive node: a buffer of max_len bytes tagged with the message id.
RecvNode::RecvNode(short max_len, short msg_id)
    : MsgNode(max_len), _msg_id(msg_id) {
}

// Send node: builds the wire frame [id][length][payload] inside the buffer.
// The buffer is sized for the payload plus HEAD_TOTAL_LEN header bytes.
SendNode::SendNode(const char* msg, short max_len, short msg_id)
    : MsgNode(max_len + HEAD_TOTAL_LEN), _msg_id(msg_id) {
    namespace ops = boost::asio::detail::socket_ops;
    char* cursor = _data;

    // Message id first, converted to network byte order.
    short id_net = ops::host_to_network_short(msg_id);
    memcpy(cursor, &id_net, HEAD_ID_LEN);
    cursor += HEAD_ID_LEN;

    // Then the payload length, also in network byte order.
    short len_net = ops::host_to_network_short(max_len);
    memcpy(cursor, &len_net, HEAD_DATA_LEN);
    cursor += HEAD_DATA_LEN;

    // Finally the payload itself.
    memcpy(cursor, msg, max_len);
}
LogicSystem.h
#pragma once #include "Singleton.h" #include <queue> #include <thread> #include "CSession.h" #include <queue> #include <map> #include <functional> #include "const.h" #include <json/json.h> #include <json/value.h> #include <json/reader.h>typedef function<void(shared_ptr<CSession>, const short &msg_id, const string &msg_data)> FunCallBack; class LogicSystem:public Singleton<LogicSystem> {friend class Singleton<LogicSystem>; public:~LogicSystem();void PostMsgToQue(shared_ptr < LogicNode> msg); private:LogicSystem();void DealMsg();void RegisterCallBacks();void HelloWordCallBack(shared_ptr<CSession>, const short &msg_id, const string &msg_data);std::thread _worker_thread;std::queue<shared_ptr<LogicNode>> _msg_que;std::mutex _mutex;std::condition_variable _consume;bool _b_stop;std::map<short, FunCallBack> _fun_callbacks; };
LogicSystem.cpp
#include "LogicSystem.h"using namespace std;LogicSystem::LogicSystem():_b_stop(false){RegisterCallBacks();_worker_thread = std::thread (&LogicSystem::DealMsg, this); }LogicSystem::~LogicSystem(){_b_stop = true;_consume.notify_one();_worker_thread.join(); }void LogicSystem::PostMsgToQue(shared_ptr < LogicNode> msg) {std::unique_lock<std::mutex> unique_lk(_mutex);_msg_que.push(msg);//由0变为1则发送通知信号if (_msg_que.size() == 1) {unique_lk.unlock();_consume.notify_one();} }void LogicSystem::DealMsg() {for (;;) {std::unique_lock<std::mutex> unique_lk(_mutex);//判断队列为空则用条件变量阻塞等待,并释放锁while (_msg_que.empty() && !_b_stop) {_consume.wait(unique_lk);}//判断是否为关闭状态,把所有逻辑执行完后则退出循环if (_b_stop ) {while (!_msg_que.empty()) {auto msg_node = _msg_que.front();cout << "recv_msg id is " << msg_node->_recvnode->_msg_id << endl;auto call_back_iter = _fun_callbacks.find(msg_node->_recvnode->_msg_id);if (call_back_iter == _fun_callbacks.end()) {_msg_que.pop();continue;}call_back_iter->second(msg_node->_session, msg_node->_recvnode->_msg_id,std::string(msg_node->_recvnode->_data, msg_node->_recvnode->_cur_len));_msg_que.pop();}break;}//如果没有停服,且说明队列中有数据auto msg_node = _msg_que.front();cout << "recv_msg id is " << msg_node->_recvnode->_msg_id << endl;auto call_back_iter = _fun_callbacks.find(msg_node->_recvnode->_msg_id);if (call_back_iter == _fun_callbacks.end()) {_msg_que.pop();continue;}call_back_iter->second(msg_node->_session, msg_node->_recvnode->_msg_id, std::string(msg_node->_recvnode->_data, msg_node->_recvnode->_cur_len));_msg_que.pop();} }void LogicSystem::RegisterCallBacks() {_fun_callbacks[MSG_HELLO_WORD] = std::bind(&LogicSystem::HelloWordCallBack, this,placeholders::_1, placeholders::_2, placeholders::_3); }void LogicSystem::HelloWordCallBack(shared_ptr<CSession> session, const short &msg_id, const string &msg_data) {Json::Reader reader;Json::Value root;reader.parse(msg_data, root);std::cout << "recevie msg id is " << root["id"].asInt() << " msg data is "<< root["data"].asString() << 
endl;root["data"] = "server has received msg, msg data is " + root["data"].asString();std::string return_str = root.toStyledString();session->Send(return_str, root["id"].asInt()); }
CSession.h
#pragma once #include <boost/asio.hpp> #include <boost/uuid/uuid_io.hpp> #include <boost/uuid/uuid_generators.hpp> #include <queue> #include <mutex> #include <memory> #include "const.h" #include "MsgNode.h" using namespace std;using boost::asio::ip::tcp; class CServer; class LogicSystem;class CSession: public std::enable_shared_from_this<CSession> { public:CSession(boost::asio::io_context& io_context, CServer* server);~CSession();tcp::socket& GetSocket();std::string& GetUuid();void Start();void Send(char* msg, short max_length, short msgid);void Send(std::string msg, short msgid);void Close();std::shared_ptr<CSession> SharedSelf(); private:void HandleRead(const boost::system::error_code& error, size_t bytes_transferred, std::shared_ptr<CSession> shared_self);void HandleWrite(const boost::system::error_code& error, std::shared_ptr<CSession> shared_self);tcp::socket _socket;std::string _uuid;char _data[MAX_LENGTH];CServer* _server;bool _b_close;std::queue<shared_ptr<SendNode> > _send_que;std::mutex _send_lock;//收到的消息结构std::shared_ptr<RecvNode> _recv_msg_node;bool _b_head_parse;//收到的头部结构std::shared_ptr<MsgNode> _recv_head_node; };class LogicNode {friend class LogicSystem; public:LogicNode(shared_ptr<CSession>, shared_ptr<RecvNode>); private:shared_ptr<CSession> _session;shared_ptr<RecvNode> _recvnode; };
CSession.cpp
#include "CSession.h"
#include "CServer.h"
#include <cstring>
#include <iostream>
#include <limits>
#include <sstream>
#include <json/json.h>
#include <json/value.h>
#include <json/reader.h>
#include "LogicSystem.h"

// Creates the socket on the given io_context, assigns a random uuid to the
// session and allocates the node used to reassemble message headers.
CSession::CSession(boost::asio::io_context& io_context, CServer* server)
    : _socket(io_context), _server(server), _b_close(false), _b_head_parse(false) {
    boost::uuids::uuid a_uuid = boost::uuids::random_generator()();
    _uuid = boost::uuids::to_string(a_uuid);
    _recv_head_node = make_shared<MsgNode>(HEAD_TOTAL_LEN);
}

CSession::~CSession() {
    std::cout << "~CSession destruct" << endl;
}

tcp::socket& CSession::GetSocket() {
    return _socket;
}

std::string& CSession::GetUuid() {
    return _uuid;
}

// Clears the receive buffer and issues the first asynchronous read. A
// shared_ptr to this session is bound into the handler to keep it alive.
void CSession::Start() {
    ::memset(_data, 0, MAX_LENGTH);
    _socket.async_read_some(boost::asio::buffer(_data, MAX_LENGTH),
        std::bind(&CSession::HandleRead, this, std::placeholders::_1, std::placeholders::_2, SharedSelf()));
}

// Queues `msg` for sending with id `msgid`. The message is dropped (and
// logged) when the send queue is full or the payload cannot be framed.
// A write is only started here when the queue was empty; otherwise the
// in-flight write's completion handler picks up the next frame.
void CSession::Send(std::string msg, short msgid) {
    // The frame length is carried in a signed 16-bit value and the node is
    // sized as payload + header, so larger payloads would overflow.
    const size_t max_body = static_cast<size_t>(std::numeric_limits<short>::max() - HEAD_TOTAL_LEN);
    if (msg.length() > max_body) {
        std::cout << "session: " << _uuid << " message too long to send, length is " << msg.length() << endl;
        return;
    }

    std::lock_guard<std::mutex> lock(_send_lock);
    int send_que_size = _send_que.size();
    if (send_que_size > MAX_SENDQUE) {
        std::cout << "session: " << _uuid << " send que fulled, size is " << MAX_SENDQUE << endl;
        return;
    }

    _send_que.push(make_shared<SendNode>(msg.c_str(), static_cast<short>(msg.length()), msgid));
    if (send_que_size > 0) {
        return;
    }
    auto& msgnode = _send_que.front();
    boost::asio::async_write(_socket, boost::asio::buffer(msgnode->_data, msgnode->_total_len),
        std::bind(&CSession::HandleWrite, this, std::placeholders::_1, SharedSelf()));
}

// Same as the std::string overload, for a raw buffer of `max_length` bytes.
void CSession::Send(char* msg, short max_length, short msgid) {
    // Reject inputs that cannot be framed: null buffer, negative length, or a
    // length that would overflow once the header size is added.
    if (msg == nullptr || max_length < 0 ||
        max_length > std::numeric_limits<short>::max() - HEAD_TOTAL_LEN) {
        std::cout << "session: " << _uuid << " invalid message to send, length is " << max_length << endl;
        return;
    }

    std::lock_guard<std::mutex> lock(_send_lock);
    int send_que_size = _send_que.size();
    if (send_que_size > MAX_SENDQUE) {
        std::cout << "session: " << _uuid << " send que fulled, size is " << MAX_SENDQUE << endl;
        return;
    }

    _send_que.push(make_shared<SendNode>(msg, max_length, msgid));
    if (send_que_size > 0) {
        return;
    }
    auto& msgnode = _send_que.front();
    boost::asio::async_write(_socket, boost::asio::buffer(msgnode->_data, msgnode->_total_len),
        std::bind(&CSession::HandleWrite, this, std::placeholders::_1, SharedSelf()));
}

void CSession::Close() {
    _socket.close();
    _b_close = true;
}

std::shared_ptr<CSession> CSession::SharedSelf() {
    return shared_from_this();
}

// Completion handler for async_write. On success the frame just sent is
// removed and the next queued frame, if any, is written. On failure the
// session is closed and removed from the server.
void CSession::HandleWrite(const boost::system::error_code& error, std::shared_ptr<CSession> shared_self) {
    try {
        if (!error) {
            std::lock_guard<std::mutex> lock(_send_lock);
            _send_que.pop();
            if (!_send_que.empty()) {
                auto& msgnode = _send_que.front();
                boost::asio::async_write(_socket, boost::asio::buffer(msgnode->_data, msgnode->_total_len),
                    std::bind(&CSession::HandleWrite, this, std::placeholders::_1, shared_self));
            }
        }
        else {
            std::cout << "handle write failed, error is " << error.what() << endl;
            Close();
            _server->ClearSession(_uuid);
        }
    }
    catch (std::exception& e) {
        std::cerr << "Exception code : " << e.what() << endl;
    }
}

// Completion handler for async_read_some. Walks the bytes just received and
// reassembles frames of the form [id:2][length:2][payload:length], which may
// be split across reads or packed several to a read. Every complete message
// is posted to the LogicSystem queue. Parsing state lives in _b_head_parse,
// _recv_head_node and _recv_msg_node between calls.
void CSession::HandleRead(const boost::system::error_code& error, size_t bytes_transferred, std::shared_ptr<CSession> shared_self) {
    try {
        if (error) {
            std::cout << "handle read failed, error is " << error.what() << endl;
            Close();
            _server->ClearSession(_uuid);
            return;
        }

        // Clears the receive buffer and waits for more bytes.
        auto read_more = [this, &shared_self]() {
            ::memset(_data, 0, MAX_LENGTH);
            _socket.async_read_some(boost::asio::buffer(_data, MAX_LENGTH),
                std::bind(&CSession::HandleRead, this, std::placeholders::_1, std::placeholders::_2, shared_self));
        };

        // Hands the completed message to the logic queue and resets the
        // parser so that the next bytes are treated as a new header.
        auto deliver_message = [this, &shared_self]() {
            _recv_msg_node->_data[_recv_msg_node->_total_len] = '\0';
            LogicSystem::GetInstance()->PostMsgToQue(make_shared<LogicNode>(shared_self, _recv_msg_node));
            _b_head_parse = false;
            _recv_head_node->Clear();
        };

        // Drops a session whose header failed validation.
        auto drop_session = [this]() {
            Close();
            _server->ClearSession(_uuid);
        };

        // Offset into _data of the first byte not yet consumed.
        int copy_len = 0;
        while (bytes_transferred > 0) {
            if (!_b_head_parse) {
                // Not enough bytes yet to complete the header: stash them.
                if (bytes_transferred + _recv_head_node->_cur_len < HEAD_TOTAL_LEN) {
                    memcpy(_recv_head_node->_data + _recv_head_node->_cur_len, _data + copy_len, bytes_transferred);
                    _recv_head_node->_cur_len += bytes_transferred;
                    read_more();
                    return;
                }

                // Complete the header with the bytes it is still missing.
                int head_remain = HEAD_TOTAL_LEN - _recv_head_node->_cur_len;
                memcpy(_recv_head_node->_data + _recv_head_node->_cur_len, _data + copy_len, head_remain);
                copy_len += head_remain;
                bytes_transferred -= head_remain;

                // Message id, converted from network byte order.
                short msg_id = 0;
                memcpy(&msg_id, _recv_head_node->_data, HEAD_ID_LEN);
                msg_id = boost::asio::detail::socket_ops::network_to_host_short(msg_id);
                std::cout << "msg_id is " << msg_id << endl;
                if (msg_id > MAX_LENGTH) {
                    std::cout << "invalid msg_id is " << msg_id << endl;
                    drop_session();
                    return;
                }

                // Payload length, converted from network byte order.
                short msg_len = 0;
                memcpy(&msg_len, _recv_head_node->_data + HEAD_ID_LEN, HEAD_DATA_LEN);
                msg_len = boost::asio::detail::socket_ops::network_to_host_short(msg_len);
                std::cout << "msg_len is " << msg_len << endl;
                // The length is a signed value supplied by the peer. A negative
                // length must be rejected too: it would otherwise size the
                // receive node incorrectly and, once converted to size_t in
                // the comparisons below, allow copies past the end of it.
                if (msg_len < 0 || msg_len > MAX_LENGTH) {
                    std::cout << "invalid data length is " << msg_len << endl;
                    drop_session();
                    return;
                }

                _recv_msg_node = make_shared<RecvNode>(msg_len, msg_id);
                const size_t body_len = static_cast<size_t>(msg_len);

                // Only part of the payload has arrived: keep it and wait.
                if (bytes_transferred < body_len) {
                    memcpy(_recv_msg_node->_data + _recv_msg_node->_cur_len, _data + copy_len, bytes_transferred);
                    _recv_msg_node->_cur_len += bytes_transferred;
                    _b_head_parse = true;
                    read_more();
                    return;
                }

                // The whole payload is available in this read.
                memcpy(_recv_msg_node->_data + _recv_msg_node->_cur_len, _data + copy_len, body_len);
                _recv_msg_node->_cur_len += msg_len;
                copy_len += msg_len;
                bytes_transferred -= body_len;
                deliver_message();

                if (bytes_transferred == 0) {
                    read_more();
                    return;
                }
                continue;
            }

            // Header already parsed: continue filling the pending payload.
            const size_t remain_msg = static_cast<size_t>(_recv_msg_node->_total_len - _recv_msg_node->_cur_len);
            if (bytes_transferred < remain_msg) {
                memcpy(_recv_msg_node->_data + _recv_msg_node->_cur_len, _data + copy_len, bytes_transferred);
                _recv_msg_node->_cur_len += bytes_transferred;
                read_more();
                return;
            }

            memcpy(_recv_msg_node->_data + _recv_msg_node->_cur_len, _data + copy_len, remain_msg);
            _recv_msg_node->_cur_len += remain_msg;
            bytes_transferred -= remain_msg;
            copy_len += remain_msg;
            deliver_message();

            if (bytes_transferred == 0) {
                read_more();
                return;
            }
            continue;
        }
    }
    catch (std::exception& e) {
        std::cout << "Exception code is " << e.what() << endl;
    }
}

LogicNode::LogicNode(shared_ptr<CSession> session, shared_ptr<RecvNode> recvnode)
    : _session(session), _recvnode(recvnode) {
}
CServer.h
#pragma once #include <boost/asio.hpp> #include "CSession.h" #include <memory.h> #include <map> #include <mutex> using namespace std; using boost::asio::ip::tcp; class CServer { public:CServer(boost::asio::io_context& io_context, short port);~CServer();void ClearSession(std::string); private:void HandleAccept(shared_ptr<CSession>, const boost::system::error_code & error);void StartAccept();boost::asio::io_context &_io_context;short _port;tcp::acceptor _acceptor;std::map<std::string, shared_ptr<CSession>> _sessions;std::mutex _mutex; };
CServer.cpp
#include "CServer.h"
#include <iostream>
#include "AsioIOServicePool.h"

// Binds the acceptor to `port` on all IPv4 interfaces and starts accepting.
CServer::CServer(boost::asio::io_context& io_context, short port)
    : _io_context(io_context), _port(port),
      _acceptor(io_context, tcp::endpoint(tcp::v4(), port)) {
    cout << "Server start success, listen on port : " << _port << endl;
    StartAccept();
}

CServer::~CServer() {
    cout << "Server destruct listen on port : " << _port << endl;
}

// Completion handler for async_accept. A successfully accepted session is
// registered and started; in every case the next accept is then issued.
void CServer::HandleAccept(shared_ptr<CSession> new_session, const boost::system::error_code& error) {
    if (!error) {
        {
            // Register the session BEFORE starting it. The session's socket
            // belongs to a pool io_context that runs on another thread, so its
            // handlers may call ClearSession() as soon as Start() returns; if
            // the insert came afterwards, that erase would find nothing and
            // the dead session would stay in the map.
            lock_guard<mutex> lock(_mutex);
            _sessions.insert(make_pair(new_session->GetUuid(), new_session));
        }
        new_session->Start();
    }
    else {
        cout << "session accept failed, error is " << error.what() << endl;
    }

    StartAccept();
}

// Creates a session on the next io_context handed out by the pool and waits
// for a connection to accept into it.
void CServer::StartAccept() {
    auto &io_context = AsioIOServicePool::GetInstance()->GetIOService();
    shared_ptr<CSession> new_session = make_shared<CSession>(io_context, this);
    _acceptor.async_accept(new_session->GetSocket(),
        std::bind(&CServer::HandleAccept, this, new_session, placeholders::_1));
}

// Removes the session with the given uuid from the session map.
void CServer::ClearSession(std::string uuid) {
    lock_guard<mutex> lock(_mutex);
    _sessions.erase(uuid);
}
AsioIOServicePool.h
#pragma once #include <vector> #include <boost/asio.hpp> #include "Singleton.h" class AsioIOServicePool:public Singleton<AsioIOServicePool> {friend Singleton<AsioIOServicePool>; public:using IOService = boost::asio::io_context;using Work = boost::asio::io_context::work;using WorkPtr = std::unique_ptr<Work>;~AsioIOServicePool();AsioIOServicePool(const AsioIOServicePool&) = delete;AsioIOServicePool& operator=(const AsioIOServicePool&) = delete;// 使用 round-robin 的方式返回一个 io_serviceboost::asio::io_context& GetIOService();void Stop(); private:AsioIOServicePool(std::size_t size = std::thread::hardware_concurrency());std::vector<IOService> _ioServices;std::vector<WorkPtr> _works;std::vector<std::thread> _threads;std::size_t _nextIOService; };
AsioIOServicePool.cpp
#include "AsioIOServicePool.h"
#include <exception>
#include <iostream>
using namespace std;

namespace {
// std::thread::hardware_concurrency() is allowed to return 0; an empty pool
// would make GetIOService() index an empty vector, so use at least one.
std::size_t EffectivePoolSize(std::size_t requested) {
    return requested == 0 ? 1 : requested;
}
}  // namespace

// Creates the io_contexts, attaches a work object to each so that run() does
// not return while idle, and starts one thread per io_context.
AsioIOServicePool::AsioIOServicePool(std::size_t size)
    : _ioServices(EffectivePoolSize(size)), _works(EffectivePoolSize(size)), _nextIOService(0) {
    for (std::size_t i = 0; i < _ioServices.size(); ++i) {
        _works[i] = std::unique_ptr<Work>(new Work(_ioServices[i]));
    }

    for (std::size_t i = 0; i < _ioServices.size(); ++i) {
        _threads.emplace_back([this, i]() {
            _ioServices[i].run();
        });
    }
}

// Makes sure the pool threads are stopped and joined: destroying a
// std::thread that is still joinable calls std::terminate.
AsioIOServicePool::~AsioIOServicePool() {
    try {
        Stop();
    }
    catch (const std::exception& e) {
        std::cerr << "AsioIOServicePool stop failed, exception is " << e.what() << endl;
    }
    std::cout << "AsioIOServicePool destruct" << endl;
}

// Returns the io_contexts one after another, wrapping around at the end.
boost::asio::io_context& AsioIOServicePool::GetIOService() {
    auto& service = _ioServices[_nextIOService++];
    if (_nextIOService == _ioServices.size()) {
        _nextIOService = 0;
    }
    return service;
}

// Stops every io_context, releases the work objects and joins the threads.
// Releasing the work object alone is not enough: an io_context that still has
// pending read/write operations keeps running until it is stopped explicitly.
// Safe to call more than once.
void AsioIOServicePool::Stop() {
    for (auto& service : _ioServices) {
        service.stop();
    }
    for (auto& work : _works) {
        work.reset();
    }
    for (auto& t : _threads) {
        if (t.joinable()) {
            t.join();
        }
    }
}
main.cpp
#include <iostream> #include "CServer.h" #include "Singleton.h" #include "LogicSystem.h" #include <csignal> #include <thread> #include <mutex> #include "AsioIOServicePool.h" using namespace std; bool bstop = false; std::condition_variable cond_quit; std::mutex mutex_quit;int main() {try {auto pool = AsioIOServicePool::GetInstance();boost::asio::io_context io_context;boost::asio::signal_set signals(io_context, SIGINT, SIGTERM);signals.async_wait([&io_context,pool](auto, auto) {io_context.stop();pool->Stop();});CServer s(io_context, 10086);io_context.run();}catch (std::exception& e) {std::cerr << "Exception: " << e.what() << endl;}}