项目服务器登录部分

从连接池中获取上下文,接受连接。

async_accept 这个函数本质上是监听和接受客户端连接的结合操作

void CServer::StartAccept()
{auto& io_context = AsioIOServicePool::GetInstance()->GetIOService();std::shared_ptr<CSession> new_session = make_shared<CSession>(io_context, this);_acceptor.async_accept(new_session->GetSocket(), std::bind(&CServer::HandleAccept, this, new_session, placeholders::_1));
}

启动会话,将生成的会话id和会话绑定在一起(在 CSession 构造函数中生成会话 id),插入 _sessionsMap 中。**注意:**不要忘记递归调用接收函数 StartAccept()

void CServer::HandleAccept(shared_ptr<CSession> new_session, const boost::system::error_code& error)
{if (!error) {new_session->Start();std::lock_guard<mutex> lock(_mutex);_sessions.insert(make_pair(new_session->GetSessionId(), new_session));}StartAccept();
}

首先读取头部字节大小,包括消息 id 和消息长度

void CSession::Start() {// 先读取数据的头部AsyncReadHead(HEAD_TOTAL_LEN);
}

指定读取头部节点的大小(4 字节)

接下来详细解释读取头部回调函数中的内容,这是已经将发生过来的数据完整的读取下来了

首先,若发生错误,则关闭套接字连接,从 Map 中移除会话 id 和会话的绑定。

若无错误,先需要清空接收头部节点,保证接受头部节点中没有之前的数据,然后将接收的数据复制到接收头部节点中。

然后从接收头部节点中获取消息 id 和消息长度,每次从头部节点取出数据前需要将网络字节序转化为本地字节序。

若获取消息id和消息长度无错误,则创建接收消息内容节点然后调用 AsyncReadBody() 函数

void CSession::AsyncReadHead(int total_len)
{auto self = shared_from_this();asyncReadFull(HEAD_TOTAL_LEN, [self, this](const boost::system::error_code& ec, std::size_t bytes_transfered) {try {if (ec) {std::cout << "handle read failed, error is " << ec.what() << endl;Close();_server->ClearSession(_session_id);return;}// 不足一个头部大小if (bytes_transfered < HEAD_TOTAL_LEN) {std::cout << "read length not match, read [" << bytes_transfered << "] , total ["<< HEAD_TOTAL_LEN << "]" << endl;Close();_server->ClearSession(_session_id);return;}_recv_head_node->Clear();memcpy(_recv_head_node->_data, _data, bytes_transfered);// 获取头部MSGID数据short msg_id = 0;memcpy(&msg_id, _recv_head_node->_data, HEAD_ID_LEN);// 网络字节序转化为本地字节序msg_id = boost::asio::detail::socket_ops::network_to_host_short(msg_id);std::cout << "msg_id is " << msg_id << endl;// id非法if (msg_id > MAX_LENGTH) {std::cout << "invalid msg_id is " << msg_id << endl;_server->ClearSession(_session_id);return;}short msg_len = 0;memcpy(&msg_len, _recv_head_node->_data + HEAD_ID_LEN, HEAD_DATA_LEN);// 网络字节序转化为本地字节序msg_len = boost::asio::detail::socket_ops::network_to_host_short(msg_len);std::cout << "msg_len is " << msg_len << endl;// id非法if (msg_len > MAX_LENGTH) {std::cout << "invalid data length is " << msg_len << endl;_server->ClearSession(_session_id);return;}_recv_msg_node = make_shared<RecvNode>(msg_len, msg_id);AsyncReadBody(msg_len);}catch (std::exception& e) {std::cout << "Exception code is " << e.what() << endl;}});
}

该函数指定读取长度大小,传入需要读取的长度和执行的回调

每次都清空 _data 所以每次读取执行数据的读取时,只需要从头开始即可

// 读取完整长度
void CSession::asyncReadFull(std::size_t maxLength, std::function<void(const boost::system::error_code&, std::size_t)> handler)
{::memset(_data, 0, MAX_LENGTH);asyncReadLen(0, maxLength, handler);
}

第一个参数是起始地址(可以看作已经读取的字节长度),第二个是总长度,第三个是回调函数。

也需要获取共享指针延长生命周期,若无错误则继续读取消息内容(消息头部已经读完了)

若将数据读取完毕就会调用回调函数

传给 asyncReadLen() 函数的参数为 read_len + bytesTransfered (已经读取的长度 + async_read_some 读取的长度), total_len (总长度), handler (回调函数)

// 读取指定字节数
void CSession::asyncReadLen(std::size_t read_len, std::size_t total_len,std::function<void(const boost::system::error_code&, std::size_t)> handler)
{auto self = shared_from_this();_socket.async_read_some(boost::asio::buffer(_data + read_len, total_len - read_len),[read_len, total_len, handler, self](const boost::system::error_code& ec, std::size_t  bytesTransfered) {if (ec) {// 出现错误,调用回调函数handler(ec, read_len + bytesTransfered);return;}// 实际上不会大于总长度,因为async_read_some中指定读取的长度为// total_len - read_len所以是不会超过指定读取的长度的if (read_len + bytesTransfered >= total_len) {// 长度够了就调用回调函数handler(ec, read_len + bytesTransfered);return;}// 没有错误,且长度不足则继续读取self->asyncReadLen(read_len + bytesTransfered, total_len, handler);});
}

读取消息的内容

也是在其中定义一个回调函数:

若没有错误则将数据读取出来,并添加结束标识符 \0, 然后利用 shared_from_this() 和接收消息内容节点创建逻辑节点,加入到逻辑队列中。逻辑队列主要是保证数据的有序性还松耦合。

注意:

最后一定不要忘记继续执行 AsyncReadHead(HEAD_TOTAL_LEN) 接收后续传来的数据

// 读取数据主体
void CSession::AsyncReadBody(int total_len)
{auto self = shared_from_this();asyncReadFull(total_len, [self, this, total_len](const boost::system::error_code& ec, std::size_t bytes_transfered) {try {if (ec) {std::cout << "handle read failed, error is " << ec.what() << endl;Close();_server->ClearSession(_session_id);return;}if (bytes_transfered < total_len) {std::cout << "read length not match, read [" << bytes_transfered << "] , total ["<< total_len << "]" << endl;Close();_server->ClearSession(_session_id);return;}memcpy(_recv_msg_node->_data, _data, bytes_transfered);_recv_msg_node->_cur_len += bytes_transfered;_recv_msg_node->_data[_recv_msg_node->_total_len] = '\0';cout << "receive data is " << _recv_msg_node->_data << endl;// 此处将消息投递到逻辑队列中LogicSystem::GetInstance()->PostMsgToQue(make_shared<LogicNode>(shared_from_this(), _recv_msg_node));// 继续监听头部接受事件AsyncReadHead(HEAD_TOTAL_LEN);}catch (std::exception& e) {std::cout << "Exception code is " << e.what() << endl;}});
}

若队列中有数据就通知线程去队列中去取数据

void LogicSystem::PostMsgToQue(shared_ptr < LogicNode> msg) {std::unique_lock<std::mutex> unique_lk(_mutex);_msg_que.push(msg);// 由0变为1则发送通知信号if (_msg_que.size() == 1) {unique_lk.unlock();_consume.notify_one();}
}
  • 逻辑层的构造函数中注册回调函数和开辟一个工作线程取处理消息
  • **注意:**这里线程里面执行的是成员函数所以需要指定成员函数的对象,在线程中执行成员函数时,必须显式地传递对象的指针或引用,以便线程知道它应该在哪个对象上调用该成员函数。
  • 因为当逻辑层退出时,可能逻辑队列中数据还没有处理完成,所以需要唤醒线程去处理。为了防止主线程(逻辑层)先于工作线程结束,所以执行 join() 阻塞等待工作线程执行完毕。
LogicSystem::LogicSystem() : _b_stop(false) {// 注册回调函数RegisterCallBacks();// 调用工作线程去处理消息_worker_thread = std::thread (&LogicSystem::DealMsg, this);
}LogicSystem::~LogicSystem() {_b_stop = true;_consume.notify_one();_worker_thread.join();
}

这些注册的回调函数在后面再详细的展开

void LogicSystem::RegisterCallBacks() {_fun_callbacks[MSG_CHAT_LOGIN] = std::bind(&LogicSystem::LoginHandler, this,placeholders::_1, placeholders::_2, placeholders::_3);_fun_callbacks[ID_SEARCH_USER_REQ] = std::bind(&LogicSystem::SearchInfo, this,placeholders::_1, placeholders::_2, placeholders::_3);_fun_callbacks[ID_ADD_FRIEND_REQ] = std::bind(&LogicSystem::AddFriendApply, this,placeholders::_1, placeholders::_2, placeholders::_3);_fun_callbacks[ID_AUTH_FRIEND_REQ] = std::bind(&LogicSystem::AuthFriendApply, this,placeholders::_1, placeholders::_2, placeholders::_3);_fun_callbacks[ID_TEXT_CHAT_MSG_REQ] = std::bind(&LogicSystem::DealChatTextMsg, this,placeholders::_1, placeholders::_2, placeholders::_3);
}

处理消息的函数:

  • 若逻辑队列中为空则用条件变量阻塞等待,并释放锁
  • 首先判断逻辑层是否为关闭状态,若为关闭状态则需要将逻辑队列中的消息处理完成再退出,处理过程为:通过接收的消息 id 先在内存中看 _fun_callbacks 中是否存在注册的回调函数,若存在则执行并将逻辑节点弹出队列
  • 若逻辑层为运行状态,同样是从队列中取出逻辑节点来执行,只不过和上面不同的是每次循环只取出一个
void LogicSystem::DealMsg() {for (;;) {std::unique_lock<std::mutex> unique_lk(_mutex);// 判断队列为空则用条件变量阻塞等待,并释放锁while (_msg_que.empty() && !_b_stop) {_consume.wait(unique_lk);}// 判断是否为关闭状态,把所有逻辑执行完后则退出循环if (_b_stop) {while (!_msg_que.empty()) {auto msg_node = _msg_que.front();cout << "recv_msg id  is " << msg_node->_recvnode->_msg_id << endl;auto call_back_iter = _fun_callbacks.find(msg_node->_recvnode->_msg_id);if (call_back_iter == _fun_callbacks.end()) {_msg_que.pop();continue;}call_back_iter->second(msg_node->_session, msg_node->_recvnode->_msg_id,std::string(msg_node->_recvnode->_data, msg_node->_recvnode->_cur_len));_msg_que.pop();}break;}// 如果没有停服,且说明队列中有数据auto msg_node = _msg_que.front();cout << "recv_msg id  is " << msg_node->_recvnode->_msg_id << endl;auto call_back_iter = _fun_callbacks.find(msg_node->_recvnode->_msg_id);if (call_back_iter == _fun_callbacks.end()) {_msg_que.pop();std::cout << "msg id [" << msg_node->_recvnode->_msg_id << "] handler not found" << std::endl;continue;}call_back_iter->second(msg_node->_session, msg_node->_recvnode->_msg_id, std::string(msg_node->_recvnode->_data, msg_node->_recvnode->_cur_len));_msg_que.pop();}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/877778.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

JS中encodeURIComponent和encodeURI函数的区别

1、encodeURIComponent函数用于对完整的URL组件进行编码&#xff0c;包括查询参数、路径片段等。 它会对所有非字母数字字符进行编码&#xff0c;并将其替换为相应的URL编码形式。这包括对特殊字符&#xff08;如冒号、斜杠、问号、等号、加号等&#xff09;的编码。encodeURI…

拍抖音在哪里去水印,三招教你快速掌握去水印技巧

在抖音上&#xff0c;我们经常会看到一些精彩的内容&#xff0c;想要保存下来&#xff0c;但往往视频上会有水印。本文将分享五个免费且高效的去除抖音视频水印的技巧&#xff0c;帮助你轻松保存无水印的视频。 技巧一&#xff1a;奈斯水印助手(小程序) 奈斯水印助手是一款专…

GIS空间数据库,基本概念

文章目录 一、前言二、空间概念三、地理空间四、空间数据五、数据库六、空间数据库七、空间数据库与传统数据库的差异7.1 信息描述差异7.2 数据管理差异7.3 数据操作差异7.3 数据更新差异7.3 服务应用差异 一、前言 数据是指客观事物的属性、数量、位置及其相互关系等的符号描…

JavaScript(30)——解构

数组解构 数组解构是将数组的单元值快速批量赋值给一系列变量的简洁语法 基本语法&#xff1a; 赋值运算符左侧的[]用于批量声明变量&#xff0c;右侧数组的单元值将被赋值给左侧变量变量的顺序对应数组单元值的位置依次进行赋值操作 const arr [1, 2, 3, 4, 5]const [a, b…

云渲染的三个条件是指什么!哪三点最重要!

云渲染技术以其灵活性和效率&#xff0c;让创意人士和企业无论身处何地&#xff0c;都能通过网络接入强大的远程服务器&#xff0c;轻松完成复杂的图形渲染任务&#xff0c;但要发挥其魔力&#xff0c;我们得满足一些关键条件。 一、网络连接&#xff1a;云渲染的桥梁 首先&am…

PHP伪协议漏洞详解(附案例讲解)

文章目录 引言什么是PHP伪协议常见的PHP伪协议PHP伪协议漏洞原理实际案例分析案例一&#xff1a;利用php://filter读取敏感文件源码案例二&#xff1a;利用zip://协议执行压缩包中的恶意代码案例三&#xff1a;利用data://协议执行任意代码 引言 PHP伪协议是PHP中一种用于访问…

Go RPC 和 gRPC 技术详解

引言 在分布式系统中&#xff0c;服务之间的通信是非常重要的组成部分。远程过程调用 (RPC) 是一种广泛使用的通信方式&#xff0c;它允许程序在不同的计算机上执行函数或过程&#xff0c;就像调用本地函数一样。随着微服务架构的流行&#xff0c;RPC 成为了连接各个服务的重要…

协作新选择:即时白板在线白板软件分享

在团队合作中&#xff0c;产品经理扮演着至关重要的角色&#xff0c;他们不仅是产品与用户之间的纽带&#xff0c;更是产品性能和用户需求的桥梁。他们需要深入参与产品的研发过程&#xff0c;并与研发团队保持紧密的沟通。因此&#xff0c;产品经理需要一款高效的协作工具来提…

arthas源码刨析:arthas 命令粗谈(3)

文章目录 dashboardwatchretransform 前面介绍了 arthas 启动相关的代码并聊了聊怎么到一个 shellserver 的建立。 本篇我们来探讨一下几个使用频次非常高的命令是如何实现的。 dashboard 想看这个命令的主要原因是编程这些年来从来没有开发过 terminal 的这种比较花哨的界面&a…

php生成json字符串,python解析json字符串

<?php $nodes []; $_tmp[title] 标题1; $_tmp[titlekey] actt; $_tmp[child] [acww.zip, acww21.zip, tta.zip]; $nodes[] $_tmp;$_tmp2[title] 标题2; $_tmp2[titlekey] kfij; $_tmp2[child] [KL7SHR47.zip, fdgfdg.zip, qweqw.zip]; $nodes[] $_tmp2;// 构建调用…

SpringBoot集成kafka-获取生产者发送的消息(阻塞式和非阻塞式获取)

说明 CompletableFuture对象需要的SpringBoot版本为3.X.X以上&#xff0c;需要的kafka依赖版本为3.X.X以上&#xff0c;需要的jdk版本17以上。 1、阻塞式&#xff08;等待式&#xff09;获取生产者发送的消息 生产者&#xff1a; package com.power.producer;import org.ap…

【html+css 绚丽Loading】 000014 三元波动盘

前言&#xff1a;哈喽&#xff0c;大家好&#xff0c;今天给大家分享htmlcss 绚丽Loading&#xff01;并提供具体代码帮助大家深入理解&#xff0c;彻底掌握&#xff01;创作不易&#xff0c;如果能帮助到大家或者给大家一些灵感和启发&#xff0c;欢迎收藏关注哦 &#x1f495…

JVM系列--初始JVM

根据《黑马程序员JVM虚拟机入门到实战全套视频教程》整理 1 什么是JVM JVM 全称是 Java Virtual Machine&#xff0c;中文译名 Java虚拟机。JVM 本质上是一个运行在计算机上的程序&#xff0c;他的职责是运行Java字节码文件。 Java源代码执行流程如下&#xff1a; 分为三个步…

代码随想录day52 101孤岛的总面积 102沉没孤岛 103水流问题 104建造最大岛屿

代码随想录day52 101孤岛的总面积 102沉没孤岛 103水流问题 104建造最大岛屿 101孤岛的总面积 代码随想录 #include <iostream> #include <vector>using namespace std; int count 0; int dir[4][2] {{1, 0}, {0, 1}, {-1 ,0}, {0, -1}};void dfs(vector<v…

书生大模型实战营第三期基础岛第二课——8G 显存玩转书生大模型 Demo

8G 显存玩转书生大模型 Demo 基础任务进阶作业一&#xff1a;进阶作业二&#xff1a; 基础任务 使用 Cli Demo 完成 InternLM2-Chat-1.8B 模型的部署&#xff0c;并生成 300 字小故事&#xff0c;记录复现过程并截图。 创建conda环境 # 创建环境 conda create -n demo pytho…

[Meachines] [Easy] Legacy nmap 漏洞扫描脚本深度发现+MS08-067

信息收集 IP AddressOpening Ports10.10.10.4TCP:135,139,445 $ nmap -p- 10.10.10.4 --min-rate 1000 -sC -sV -Pn PORT STATE SERVICE VERSION 135/tcp open msrpc Microsoft Windows RPC 139/tcp open netbios-ssn Microsoft Windows n…

Docker私人学习笔记

俗话说“好记性不如烂笔头”&#xff0c;编程的海洋如此的浩大&#xff0c;养成做笔记的习惯是成功的一步&#xff01; 此笔记主要是antlr4.13版本的笔记&#xff0c;并且笔记都是博主自己一字一字编写和记录&#xff0c;有错误的地方欢迎大家指正。 一、基础概念&#xff1a;…

Tomcat 服务器详解与优化实践

文章目录 Tomcat 服务器详解与优化实践一、Tomcat 简介1.1 什么是 Tomcat1.2 Tomcat 的核心组件1.3 什么是 Servlet 和 JSP 二、Tomcat 的核心组件结构2.1 Connector2.2 Container2.3 Tomcat 请求处理过程 三、Tomcat 服务部署3.1 安装准备3.2 安装 JDK3.3 安装和启动 Tomcat3.…

Java二十三种设计模式-责任链模式(17/23)

责任链模式&#xff1a;实现请求处理的灵活流转 引言 在这篇博客中&#xff0c;我们深入探讨了责任链模式的精髓&#xff0c;从其定义和用途到实现方法&#xff0c;再到使用场景、优缺点、与其他模式的比较&#xff0c;以及最佳实践和替代方案&#xff0c;旨在指导开发者如何…

SAP BW:QUERY数据结果写入ADSO

作者 idan lian 如需转载备注出处 如果对你有帮助&#xff0c;请点赞收藏~~~ 需求背景 客户基于QUERY进行报表展示&#xff0c;现需迁移到永洪报表平台&#xff0c;query中的变量参数&#xff0c;公式等无法直接生成视图&#xff0c;query相对复杂&#xff0c;不想直接在视图…