【Muduo】TcpConnection类

Muduo网络库的TcpConnection类代表了TCP连接的一端,即服务器与远程对等端之间的连接。TcpConnection类知道自身和对端的InetAddress、封装了前面讲过的Socket类和Channel类,并且保有管理自己的subLoop指针,还有多种事件处理函数和回调,还有属于自己的接收Buffer/发送Buffer。为上层应用提供了简洁的接口来处理连接的生命周期和数据传输。

主要成员变量

  • socket:Socket类封装了底层的socket文件描述符,用于网络通信。
  • channel:与socket关联的Channel对象,用于在事件循环中注册读/写/错误等事件。
  • loopEventLoop指针,表示该连接所属的事件循环。
  • name:连接的名称或标识,通常用于日志记录。
  • state:连接的状态,如kConnecting、kConnected、kDisconnecting、kDisconnected等。
  • highWaterMark:高水位线,用于流量控制。
  • messageCallback:当接收到数据时调用的回调函数。
  • writeCompleteCallback:当数据发送完成时调用的回调函数。
  • connectionCallback:当连接状态发生变化(如建立、断开)时调用的回调函数。

主要功能

1. 连接管理
  • 建立连接:在mainLoop中的Acceptor接收到一个新连接请求的时候,回调TcpServer中的新连接处理函数,在其中选择一个subLoop并由此构造出一个TcpConnection对象,构造函数中设置好本连接的相关信息,mainLoop就会将这个新Connection放到subLoop中运行。在连接建立之后,会紧接着向EPollPoller注册感兴趣的事件,并设置连接状态。
  • 断开连接:当需要断开连接时,TcpConnection会关闭底层的socket文件描述符,并触发相关的回调函数。此外,它还会处理连接关闭时的清理工作,如取消事件注册、释放资源等。
2. 数据传输
  • 接收数据:当底层socket有数据可读时,事件循环会通知关联的Channel对象。TcpConnection会读取数据并调用注册的messageCallback回调函数来处理接收到的数据。
  • 发送数据:应用程序可以通过调用TcpConnection的发送接口(send和sendInLoop)来发送数据。这些数据会被缓存在内部缓冲区中,并通过底层socket逐步发送出去。当所有数据都发送完成时,TcpConnection会调用注册的writeCompleteCallback回调函数来通知应用程序。

    关于数据传输,需要在源码中仔细品味。

3. 状态通知
  • 连接状态变化:当连接状态发生变化时(如建立、断开),TcpConnection会触发注册的connectionCallback回调函数来通知应用程序。这样,应用程序可以根据连接状态的变化来执行相应的逻辑,如重新连接、关闭会话等。

设计思想

  • 封装底层细节TcpConnection类封装了底层的socket API和事件循环机制,为上层应用提供了简洁的接口来处理连接和数据传输。这样,应用程序可以专注于业务逻辑的实现,而无需关心底层的网络编程细节。
  • 事件驱动:通过使用事件循环和回调函数机制,TcpConnection类实现了基于事件驱动的编程模型。这种模型使得应用程序能够高效地处理多个并发连接和数据传输任务。
  • 状态管理TcpConnection类维护了连接的状态信息,并通过回调函数机制将状态变化通知给上层应用。这样,应用程序可以根据连接状态的变化来执行相应的逻辑操作。

源码

TcpConnection.h
#pragma once#include "noncopyable.h"
#include "Buffer.h"
#include "Timestamp.h"
#include "InetAddress.h"
#include "Callbacks.h"#include <memory>
#include <string>
#include <atomic>class Channel;
class EventLoop;
class Socket;// 已连接socket的通信链路
/*** TcpServer => Acceptor => 有一个新用户连接,通过accept函数拿到connfd* => TcpConnetion 设置回调 => Channel => Poller => Channel的回调操作* 专门描述一个已建立连接的相应信息 
*/
class TcpConnection: noncopyable,public std::enable_shared_from_this<TcpConnection> // 运行本类的对象产生智能指针
{
public:TcpConnection(EventLoop* loop,const std::string& name,int sockfd,const InetAddress& localAddr,const InetAddress& peerAddr);~TcpConnection();EventLoop* getLoop() const { return loop_; }const std::string& name() const { return name_; }const InetAddress& localAddress() const { return localAddr_; }const InetAddress& peerAddress() const { return peerAddr_; }bool connected() const { return state_ == kConnected; }bool disconnected() const { return state_ == kDisconnected; }std::string getTcpInfoString() const;// 用户调用,给客户端发送数据void send(const std::string& message);// 关闭连接void shutdown();void setConnectionCallback(const ConnectionCallback& cb){ connectionCallback_ = cb; }void setMessageCallback(const MessageCallback& cb){ messageCallback_ = cb; }void setWriteCompleteCallback(const WriteCompleteCallback& cb){ writeCompleteCallback_ = cb; }void setHighWaterMarkCallback(const HighWaterMarkCallback& cb, size_t highWaterMark){ highWaterMarkCallback_ = cb; highWaterMark_ = highWaterMark; }void setCloseCallback(const CloseCallback& cb){ closeCallback_ = cb; }// called when TcpServer accepts a new connection// 连接建立void connectEstablished();   // should be called only once// called when TcpServer has removed me from its map// 连接销毁void connectDestroyed();  // should be called only onceprivate://            初始化       建立连接    调用shutdown      关闭完底层soketenum StateE { kConnecting, kConnected, kDisconnecting,  kDisconnected};void handleRead(Timestamp receiveTime);void handleWrite();void handleClose();void handleError();void sendInLoop(const void* message, size_t len);void shutdownInLoop();const char* stateToString() const;// void startReadInLoop();// void stopReadInLoop();void setState(StateE s) { state_ = s; }EventLoop* loop_; // TcpConnection都是在subLoop里管理的const std::string name_;std::atomic_int state_; bool reading_;// 这里和mainLoop中的Acceptor很像,而TcpConnection在subLoop中std::unique_ptr<Socket> socket_;std::unique_ptr<Channel> channel_;const InetAddress localAddr_;const InetAddress peerAddr_;ConnectionCallback connectionCallback_;MessageCallback messageCallback_;WriteCompleteCallback writeCompleteCallback_;HighWaterMarkCallback highWaterMarkCallback_;CloseCallback closeCallback_; // 来自TcpServer的回调,用于在Server中删除本connsize_t highWaterMark_; // 防止本端发送过快对面接收不及Buffer inputBuffer_; // 接收数据的缓冲区Buffer outputBuffer_; // 发送数据的缓冲区
};
TcpConnection.cc
#include "TcpConnection.h"
#include "LogStream.h"
#include "Socket.h"
#include "Channel.h"
#include "EventLoop.h"#include <functional>
#include <sys/types.h> 
#include <sys/socket.h>static EventLoop *checkLoopNotNull(EventLoop *loop)
{if (loop == nullptr){LOG_FATAL << "subLoop is null!";}return loop;
}TcpConnection::TcpConnection(EventLoop *loop, const std::string &nameArg, int sockfd,const InetAddress &localAddr, const InetAddress &peerAddr): loop_(checkLoopNotNull(loop)),name_(nameArg),state_(kConnecting),reading_(true),socket_(new Socket(sockfd)),channel_(new Channel(loop, sockfd)),localAddr_(localAddr),peerAddr_(peerAddr),highWaterMark_(64 * 1024 * 1024) // 64M
{// 给channel设置相应的回调函数// poller给channel通知感兴趣的事件发生了,channel会回调相应的操作函数channel_->setReadCallback(std::bind(&TcpConnection::handleRead, this, std::placeholders::_1));channel_->setWriteCallback(std::bind(&TcpConnection::handleWrite, this));channel_->setCloseCallback(std::bind(&TcpConnection::handleClose, this));channel_->setErrorCallback(std::bind(&TcpConnection::handleError, this));LOG_DEBUG << "TcpConnection::ctor[" << name_ << "] at " << this<< " fd=" << sockfd;socket_->setKeepAlive(true);
}TcpConnection::~TcpConnection()
{LOG_DEBUG << "TcpConnection::dtor[" << name_ << "] at " << this<< " fd=" << channel_->fd()<< " state=" << stateToString();
}void TcpConnection::send(const std::string& message)
{LOG_DEBUG << "state_:" << stateToString() << "  loop_=" << loop_;if (state_ == kConnected){if (loop_->isInLoopThread()){sendInLoop(message.c_str(), message.size());}else{loop_->runInLoop(std::bind(&TcpConnection::sendInLoop,this,message.c_str(),message.size()));}}
}/*** 发送数据,应用写得快,而内核发送数据慢,需要将待发送数据写入缓冲区,而且设置水位回调
*/
void TcpConnection::sendInLoop(const void *data, size_t len)
{LOG_DEBUG << "TcpConnection::sendInLoop()  send len:" << len;ssize_t nwrote = 0;size_t remaining = len; // 剩余字节数bool faultError = false;if(state_ == kDisconnected){LOG_ERROR << "disconnected, give up writing";return;}// 刚开始对写事件还不感兴趣// 表示channe_第一次开始写数据,而且缓冲区没有待发送的数据,尝试直接发送if(!channel_->isWriting() && outputBuffer_.writableBytes() == 0){nwrote = ::write(channel_->fd(), data, len); // 返回具体发送个数if(nwrote >= 0){remaining = len - nwrote;if(remaining == 0 && writeCompleteCallback_){ // 全部发送完了,执行用户的写完回调loop_->queueInLoop(std::bind(writeCompleteCallback_, shared_from_this()));}}else // nwrote < 0{nwrote = 0;if (errno != EWOULDBLOCK) // EWOULDBLOCK 表示由于非阻塞没有数据的一个正常返回{LOG_ERROR << "TcpConnection::sendInLoop";if (errno == EPIPE || errno == ECONNRESET) // 接收到对端的重置{faultError = true;}}}}// 说明当前这次write没有把数据全部发送出去,剩余的数据需要保存到缓冲区中// 然后给channel注册epollout事件,poller发现tcp的发送缓冲区有空间,会通知相应的sock-channel,// 调用writeCallback_回调方法,也就是调用TcpConnction::handleWrite方法,把发送缓冲区中的数据全部发送完成if (!faultError && remaining > 0) {//  目前发送缓冲区剩余待发送数据长度size_t oldLen = outputBuffer_.readableBytes();if (oldLen + remaining >= highWaterMark_&& oldLen < highWaterMark_&& highWaterMarkCallback_){loop_->queueInLoop(std::bind(highWaterMarkCallback_, shared_from_this(), oldLen + remaining));}outputBuffer_.append(static_cast<const char*>(data)+nwrote, remaining);if (!channel_->isWriting()){// 一定要注册channel的写事件,不然poller不会给channl通知epolloutchannel_->enableWriting();}}
}// 用户调用的
void TcpConnection::shutdown()
{if (state_ == kConnected){setState(kDisconnecting);loop_->runInLoop(std::bind(&TcpConnection::shutdownInLoop, this));}
}void TcpConnection::shutdownInLoop()
{if (!channel_->isWriting()) // 说明outputBuffer中的数据已经全部发送完成{socket_->shutdownWrite(); // 关闭写端,触发epllHUP,调用closeCB,最终执行handleClose}
}const char *TcpConnection::stateToString() const
{switch (state_){case kDisconnected:return "kDisconnected";case kConnecting:return "kConnecting";case kConnected:return "kConnected";case kDisconnecting:return "kDisconnecting";default:return "unknown state";}
}void TcpConnection::connectEstablished()
{setState(kConnected);channel_->tie(shared_from_this()); // Channel绑定本connection,防止TcpConnection对象析构之后Channel还要执行其回调操作channel_->enableReading(); // 向poller注册EPOLLIN事件// 连接建立,执行回调connectionCallback_(shared_from_this());
}void TcpConnection::connectDestroyed()
{if (state_ == kConnected){setState(kDisconnected);channel_->disableAll(); // 从Epoll中移除所有感兴趣事件connectionCallback_(shared_from_this());}channel_->remove();
}void TcpConnection::handleRead(Timestamp receiveTime)
{int savedErrno = 0;ssize_t n = inputBuffer_.readFd(channel_->fd(), &savedErrno);if (n > 0){LOG_DEBUG << "TcpConnection::handleRead()  read bytes from buf:" << n;// 已建立连接的用户,有可读事件发生了,调用用户传入的回调onMessage// 从本对象产生一个智能指针messageCallback_(shared_from_this(), &inputBuffer_, receiveTime);}else if (n == 0){handleClose();}else{errno = savedErrno;LOG_ERROR << "TcpConnection::handleRead";handleError();}
}// 有EpollOut事件的时候才执行
void TcpConnection::handleWrite()
{if (channel_->isWriting()){int saveError = 0;ssize_t n = outputBuffer_.writeFd(channel_->fd(), &saveError);if (n > 0){// n个数据写入成功,恢复n个字节outputBuffer_.retrieve(n);// ==0:已经发送完成了;!=0:还没发送完成if (outputBuffer_.readableBytes() == 0){channel_->disableWriting(); // ??if (writeCompleteCallback_){// 唤醒subLoop对应的线程,在其中执行回调函数// 其实调用 TcpConnection::handleWrite() 函数的时候,已经在这个subLoop中了loop_->queueInLoop(std::bind(writeCompleteCallback_, shared_from_this()));}// 可能client在接收到数据后就会shutdownif (state_ == kDisconnecting){shutdownInLoop();}}}else{LOG_ERROR << "TcpConnection::handleWrite";}}// 上一轮监听中,已经设置了对写事件的不感兴趣else{LOG_INFO << "Connection fd = " << channel_->fd()<< " is down, no more writing";}
}void TcpConnection::handleClose()
{LOG_INFO << "fd = " << channel_->fd() << " state = " << stateToString();// we don't close fd, leave it to dtor, so we can find leaks easily.setState(kDisconnected);channel_->disableAll(); // 在poller上删除Channel所有的感兴趣事件TcpConnectionPtr guardThis(shared_from_this());if(connectionCallback_){connectionCallback_(guardThis); // 执行用户注册的连接回调,用户会在其中判断链接状态}if(closeCallback_){closeCallback_(guardThis); // 关闭连接的回调,调用到TcpServer::removeConnection} 
}void TcpConnection::handleError()
{int optval;socklen_t optlen = sizeof optval;if(::getsockopt(channel_->fd(), SOL_SOCKET, SO_ERROR, &optval, &optlen) < 0){LOG_ERROR << "TcpConnection::handleError [" << name_<< "] - errno = " << errno;}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/14452.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【搜索】BFS

#include <iostream> #include <cstring> #include <queue>using namespace std;const int N 110;typedef pair<int, int> PII;int n, m; int g[N][N], d[N][N];//存放地图//存每一个点到起点的距离int bfs() {queue< PII > q;q.push({0, 0});m…

C语言什么是位段?其优点是什么?

一、问题 在内存中&#xff0c;1byte 8bit&#xff0c;即 1 字节等于 8 位。位由两个值组成&#xff0c;即 0 和 1 。因此&#xff0c;存储在计算机中的 1 字节&#xff0c;可以看成是8个⼆进制数字&#xff08;0 和1&#xff09;组成的串。了解了内存空间的最⼩单位&#xff…

16.js数学方法和进制转换

数学方法 &#xff08;1&#xff09;Math.random() 默认生成0-1的随机数 var resMath.random() console.log(res) &#xff08;2&#xff09;Math.round(数字) 取整&#xff1a;正数-四舍五入 负数-5舍6入 var resMath.round(11)console.log(res) //11var res1Math.round(1…

Aerospike设置日志按日期保存及日志保存日期

配置文件位置&#xff1a;/etc/aerospike/aerospike.conf 是Aerospike的主配置文件&#xff0c;其中包含了日志配置以及其他各种设置。 日志配置&#xff1a;在aerospike.conf文件中&#xff0c;找到logging部分进行配置。以下是一个示例配置&#xff1a; logging { # 日志文…

CentOS7安装内网穿透实现远程推送镜像到本地Docker Registry

文章目录 前言1. 部署Docker Registry2. 本地测试推送镜像3. Linux 安装cpolar4. 配置Docker Registry公网访问地址5. 公网远程推送Docker Registry6. 固定Docker Registry公网地址 前言 本文主要介绍如何部署Docker Registry 本地镜像仓库,简单几步结合cpolar内网穿透工具实现…

网络安全之重发布与路由策略详解

重发布&#xff1b;import &#xff08;路由导入&#xff09; 将不同方式&#xff08;直连、静态、缺省、其他协议&#xff09;的路由器重发布进入RIP&#xff0c;OSPF中。 注意&#xff1a;1、华为中不能将缺省路由重发布进入RUO协议&#xff08;思科也是一样&#xff09;。…

Mac下QT开发环境搭建详细教程

QT Qt是一个跨平台的C应用程序框架&#xff0c;用于开发具有图形用户界面&#xff08;GUI&#xff09;的应用程序&#xff0c;同时也可用于开发非GUI程序&#xff0c;比如控制台工具和服务器。Qt是设计成通用、可移植和高效的&#xff0c;它广泛应用于全球的企业和开发者社区中…

青少年 CTF 练习平台:Misc(一)

前言 当然&#xff0c;我可以更详细地介绍一下青少年CTF练习平台。 青少年CTF练习平台是一个专为青少年设计的网络安全竞赛和训练平台。该平台由思而听&#xff08;山东&#xff09;网络科技有限公司与克拉玛依市思而听网络科技有限公司共同建设&#xff0c;自2018年创建以来…

图论定理汇总(二)

第六章 平面图 (一)、平面图的概念 定义1 如果能把图 G G G画在平面上&#xff0c;使得除顶点外&#xff0c;边与边之间没有交叉&#xff0c;称 G G G可嵌入平面&#xff0c;或称 G G G是可平面图。可平面图 G G G的边不交叉的一种画法&#xff0c;称为 G G G的一种平面嵌入&…

入门四认识HTML

一、HTML介绍 1、Web前端三大核心技术 HTML&#xff1a;负责网页的架构 CSS&#xff1a;负责网页的样式、美化 JS&#xff1a;负责网页的行动 2、什么是HTML HTML是用来描述网页的一种语言。 3、Html标签 单标签<html> 双标签<h>内容</h> 4、标…

spring boot整合j2cache 关闭二级缓存

我们整合了 j2cache 的项目启动 日志会输出 一级缓存 二级缓存 一级是 EhCacheProvider 二级是 SpringRedisProvider 如果 我们不想用二级缓存 在 j2cache.properties 中 加上 j2cache.12-cache-open配置 值为 true/false true是启用二级缓存 false 是不起用 默认 true 所以 …

多输入多输出 | Matlab实现GA-CNN遗传算法优化卷积神经网络多输入多输出预测

多输入多输出 | Matlab实现GA-CNN遗传算法优化卷积神经网络多输入多输出预测 目录 多输入多输出 | Matlab实现GA-CNN遗传算法优化卷积神经网络多输入多输出预测预测效果基本介绍程序设计参考资料 预测效果 基本介绍 Matlab实现GA-CNN遗传算法优化卷积神经网络多输入多输出预测&…

微服务技术框架-注册中心-负载均衡

应用层的负载均衡可以选择依赖注册中心&#xff0c;也可以不依赖注册中心。以下是两种情况的详细说明&#xff1a; 1. 不依赖注册中心的负载均衡 在没有注册中心的情况下&#xff0c;应用层负载均衡可以通过配置静态服务器列表或动态检测服务器健康状态来实现。以下是一些常见…

企业防泄密软件有哪些,哪个排名最好

机密数据的泄密对于企业而言&#xff0c;已成为最大的信息安全威胁之一。近年来企业面对的最大威胁来自于内部&#xff0c;以利益为出发点的互联网信息犯罪及案件&#xff0c;在世界各地不断传出&#xff0c;因此&#xff0c;信息保护与管控将逐渐成为企业信息安全重点部署项目…

VMware 安装Windows 7 SP1

1.下载镜像 迅雷&#xff1a;ed2k://|file|cn_windows_7_enterprise_with_sp1_x64_dvd_u_677685.iso|3265574912|E9DB2607EA3B3540F3FE2E388F8C53C4|/ 2.安装过程 自定义名字&#xff0c;点击【浏览】自定义安装路径 点击【浏览】&#xff0c;选择下载镜像的路径 结束啦~

html+css绘制自定义样式输入框

效果&#xff1a; 代码&#xff1a; html部分&#xff1a; <div class"box"> <div class"newbox"><input type"text" required><div class"name">Username</div></div> </div>css部分 …

投骰子——(随机游戏的控制)

精华点在于&#xff1a;利用封装&#xff0c;函数之间的良好调用&#xff0c;从而清晰明了的解决问题。 #define _CRT_SECURE_NO_WARNINGS #include <stdio.h> # include<stdlib.h> # include<time.h> # include"math.h" # define ARR_LEN 10 # d…

hpc中查看显存占用,等效nvidia-smi

nvidia-smi在hpc中无法使用&#xff0c; 但是可以通过以下方法查看应用程序占用的显存 先执行程序&#xff0c;之后 bjobs输出 可以看到使用的是gpu01节点 之后 ssh gpu01

react antd中transfer穿梭框组件中清除搜索框内容

如图&#xff1a;需要清除search搜索框内容 antd的transfer穿梭框组件未提供入口修改input框的值。 2种方法修改。 1、直接操作dom元素设置值&#xff08;不推荐&#xff09; useEffect(() > {const searchInput document.querySelector(.ant-transfer-list-search input)…

Proteus仿真小技巧(隔空连线)

用了好几天Proteus了.总结一下使用的小技巧. 目录 一.隔空连线 1.打开添加网络标号 2.输入网络标号 二.常用元件 三.运行仿真 四.总结 一.隔空连线 引出一条线,并在末尾点一下. 1.打开添加网络标号 选择添加网络标号, 也可以先点击按钮,再去选择线(注意不要点端口) 2.…