从连接池中获取上下文,接受连接。
async_accept
这个函数本质上是监听和接受客户端连接的结合操作。
void CServer::StartAccept()
{auto& io_context = AsioIOServicePool::GetInstance()->GetIOService();std::shared_ptr<CSession> new_session = make_shared<CSession>(io_context, this);_acceptor.async_accept(new_session->GetSocket(), std::bind(&CServer::HandleAccept, this, new_session, placeholders::_1));
}
启动会话,将生成的会话id和会话绑定在一起(在
CSession
构造函数中生成会话id
),插入_sessionsMap
中。**注意:**不要忘记递归调用接收函数StartAccept()
。
void CServer::HandleAccept(shared_ptr<CSession> new_session, const boost::system::error_code& error)
{if (!error) {new_session->Start();std::lock_guard<mutex> lock(_mutex);_sessions.insert(make_pair(new_session->GetSessionId(), new_session));}StartAccept();
}
首先读取头部字节大小,包括消息
id
和消息长度
void CSession::Start() {// 先读取数据的头部AsyncReadHead(HEAD_TOTAL_LEN);
}
指定读取头部节点的大小(
4
字节)接下来详细解释读取头部回调函数中的内容,这是已经将发生过来的数据完整的读取下来了
首先,若发生错误,则关闭套接字连接,从
Map
中移除会话id
和会话的绑定。若无错误,先需要清空接收头部节点,保证接受头部节点中没有之前的数据,然后将接收的数据复制到接收头部节点中。
然后从接收头部节点中获取消息
id
和消息长度,每次从头部节点取出数据前需要将网络字节序转化为本地字节序。若获取消息id和消息长度无错误,则创建接收消息内容节点然后调用
AsyncReadBody()
函数
void CSession::AsyncReadHead(int total_len)
{auto self = shared_from_this();asyncReadFull(HEAD_TOTAL_LEN, [self, this](const boost::system::error_code& ec, std::size_t bytes_transfered) {try {if (ec) {std::cout << "handle read failed, error is " << ec.what() << endl;Close();_server->ClearSession(_session_id);return;}// 不足一个头部大小if (bytes_transfered < HEAD_TOTAL_LEN) {std::cout << "read length not match, read [" << bytes_transfered << "] , total ["<< HEAD_TOTAL_LEN << "]" << endl;Close();_server->ClearSession(_session_id);return;}_recv_head_node->Clear();memcpy(_recv_head_node->_data, _data, bytes_transfered);// 获取头部MSGID数据short msg_id = 0;memcpy(&msg_id, _recv_head_node->_data, HEAD_ID_LEN);// 网络字节序转化为本地字节序msg_id = boost::asio::detail::socket_ops::network_to_host_short(msg_id);std::cout << "msg_id is " << msg_id << endl;// id非法if (msg_id > MAX_LENGTH) {std::cout << "invalid msg_id is " << msg_id << endl;_server->ClearSession(_session_id);return;}short msg_len = 0;memcpy(&msg_len, _recv_head_node->_data + HEAD_ID_LEN, HEAD_DATA_LEN);// 网络字节序转化为本地字节序msg_len = boost::asio::detail::socket_ops::network_to_host_short(msg_len);std::cout << "msg_len is " << msg_len << endl;// id非法if (msg_len > MAX_LENGTH) {std::cout << "invalid data length is " << msg_len << endl;_server->ClearSession(_session_id);return;}_recv_msg_node = make_shared<RecvNode>(msg_len, msg_id);AsyncReadBody(msg_len);}catch (std::exception& e) {std::cout << "Exception code is " << e.what() << endl;}});
}
该函数指定读取长度大小,传入需要读取的长度和执行的回调
每次都清空
_data
所以每次读取执行数据的读取时,只需要从头开始即可
// 读取完整长度
void CSession::asyncReadFull(std::size_t maxLength, std::function<void(const boost::system::error_code&, std::size_t)> handler)
{::memset(_data, 0, MAX_LENGTH);asyncReadLen(0, maxLength, handler);
}
第一个参数是起始地址(可以看作已经读取的字节长度),第二个是总长度,第三个是回调函数。
也需要获取共享指针延长生命周期,若无错误则继续读取消息内容(消息头部已经读完了)
若将数据读取完毕就会调用回调函数
传给
asyncReadLen()
函数的参数为read_len + bytesTransfered
(已经读取的长度 +async_read_some
读取的长度),total_len
(总长度),handler
(回调函数)
// 读取指定字节数
void CSession::asyncReadLen(std::size_t read_len, std::size_t total_len,std::function<void(const boost::system::error_code&, std::size_t)> handler)
{auto self = shared_from_this();_socket.async_read_some(boost::asio::buffer(_data + read_len, total_len - read_len),[read_len, total_len, handler, self](const boost::system::error_code& ec, std::size_t bytesTransfered) {if (ec) {// 出现错误,调用回调函数handler(ec, read_len + bytesTransfered);return;}// 实际上不会大于总长度,因为async_read_some中指定读取的长度为// total_len - read_len所以是不会超过指定读取的长度的if (read_len + bytesTransfered >= total_len) {// 长度够了就调用回调函数handler(ec, read_len + bytesTransfered);return;}// 没有错误,且长度不足则继续读取self->asyncReadLen(read_len + bytesTransfered, total_len, handler);});
}
读取消息的内容
也是在其中定义一个回调函数:
若没有错误则将数据读取出来,并添加结束标识符
\0
, 然后利用shared_from_this()
和接收消息内容节点创建逻辑节点,加入到逻辑队列中。逻辑队列主要是保证数据的有序性还松耦合。注意:
最后一定不要忘记继续执行
AsyncReadHead(HEAD_TOTAL_LEN)
接收后续传来的数据
// 读取数据主体
void CSession::AsyncReadBody(int total_len)
{auto self = shared_from_this();asyncReadFull(total_len, [self, this, total_len](const boost::system::error_code& ec, std::size_t bytes_transfered) {try {if (ec) {std::cout << "handle read failed, error is " << ec.what() << endl;Close();_server->ClearSession(_session_id);return;}if (bytes_transfered < total_len) {std::cout << "read length not match, read [" << bytes_transfered << "] , total ["<< total_len << "]" << endl;Close();_server->ClearSession(_session_id);return;}memcpy(_recv_msg_node->_data, _data, bytes_transfered);_recv_msg_node->_cur_len += bytes_transfered;_recv_msg_node->_data[_recv_msg_node->_total_len] = '\0';cout << "receive data is " << _recv_msg_node->_data << endl;// 此处将消息投递到逻辑队列中LogicSystem::GetInstance()->PostMsgToQue(make_shared<LogicNode>(shared_from_this(), _recv_msg_node));// 继续监听头部接受事件AsyncReadHead(HEAD_TOTAL_LEN);}catch (std::exception& e) {std::cout << "Exception code is " << e.what() << endl;}});
}
若队列中有数据就通知线程去队列中去取数据
void LogicSystem::PostMsgToQue(shared_ptr < LogicNode> msg) {std::unique_lock<std::mutex> unique_lk(_mutex);_msg_que.push(msg);// 由0变为1则发送通知信号if (_msg_que.size() == 1) {unique_lk.unlock();_consume.notify_one();}
}
- 逻辑层的构造函数中注册回调函数和开辟一个工作线程取处理消息
- **注意:**这里线程里面执行的是成员函数所以需要指定成员函数的对象,在线程中执行成员函数时,必须显式地传递对象的指针或引用,以便线程知道它应该在哪个对象上调用该成员函数。
- 因为当逻辑层退出时,可能逻辑队列中数据还没有处理完成,所以需要唤醒线程去处理。为了防止主线程(逻辑层)先于工作线程结束,所以执行
join()
阻塞等待工作线程执行完毕。
LogicSystem::LogicSystem() : _b_stop(false) {// 注册回调函数RegisterCallBacks();// 调用工作线程去处理消息_worker_thread = std::thread (&LogicSystem::DealMsg, this);
}LogicSystem::~LogicSystem() {_b_stop = true;_consume.notify_one();_worker_thread.join();
}
这些注册的回调函数在后面再详细的展开
void LogicSystem::RegisterCallBacks() {_fun_callbacks[MSG_CHAT_LOGIN] = std::bind(&LogicSystem::LoginHandler, this,placeholders::_1, placeholders::_2, placeholders::_3);_fun_callbacks[ID_SEARCH_USER_REQ] = std::bind(&LogicSystem::SearchInfo, this,placeholders::_1, placeholders::_2, placeholders::_3);_fun_callbacks[ID_ADD_FRIEND_REQ] = std::bind(&LogicSystem::AddFriendApply, this,placeholders::_1, placeholders::_2, placeholders::_3);_fun_callbacks[ID_AUTH_FRIEND_REQ] = std::bind(&LogicSystem::AuthFriendApply, this,placeholders::_1, placeholders::_2, placeholders::_3);_fun_callbacks[ID_TEXT_CHAT_MSG_REQ] = std::bind(&LogicSystem::DealChatTextMsg, this,placeholders::_1, placeholders::_2, placeholders::_3);
}
处理消息的函数:
- 若逻辑队列中为空则用条件变量阻塞等待,并释放锁
- 首先判断逻辑层是否为关闭状态,若为关闭状态则需要将逻辑队列中的消息处理完成再退出,处理过程为:通过接收的消息
id
先在内存中看_fun_callbacks
中是否存在注册的回调函数,若存在则执行并将逻辑节点弹出队列- 若逻辑层为运行状态,同样是从队列中取出逻辑节点来执行,只不过和上面不同的是每次循环只取出一个
void LogicSystem::DealMsg() {for (;;) {std::unique_lock<std::mutex> unique_lk(_mutex);// 判断队列为空则用条件变量阻塞等待,并释放锁while (_msg_que.empty() && !_b_stop) {_consume.wait(unique_lk);}// 判断是否为关闭状态,把所有逻辑执行完后则退出循环if (_b_stop) {while (!_msg_que.empty()) {auto msg_node = _msg_que.front();cout << "recv_msg id is " << msg_node->_recvnode->_msg_id << endl;auto call_back_iter = _fun_callbacks.find(msg_node->_recvnode->_msg_id);if (call_back_iter == _fun_callbacks.end()) {_msg_que.pop();continue;}call_back_iter->second(msg_node->_session, msg_node->_recvnode->_msg_id,std::string(msg_node->_recvnode->_data, msg_node->_recvnode->_cur_len));_msg_que.pop();}break;}// 如果没有停服,且说明队列中有数据auto msg_node = _msg_que.front();cout << "recv_msg id is " << msg_node->_recvnode->_msg_id << endl;auto call_back_iter = _fun_callbacks.find(msg_node->_recvnode->_msg_id);if (call_back_iter == _fun_callbacks.end()) {_msg_que.pop();std::cout << "msg id [" << msg_node->_recvnode->_msg_id << "] handler not found" << std::endl;continue;}call_back_iter->second(msg_node->_session, msg_node->_recvnode->_msg_id, std::string(msg_node->_recvnode->_data, msg_node->_recvnode->_cur_len));_msg_que.pop();}
}