muduo源码阅读笔记(11、TcpClient)

muduo源码阅读笔记(11、TcpClient)

Muduo源码笔记系列:

muduo源码阅读笔记(0、下载编译muduo)

muduo源码阅读笔记(1、同步日志)

muduo源码阅读笔记(2、对C语言原生的线程安全以及同步的API的封装)

muduo源码阅读笔记(3、线程和线程池的封装)

muduo源码阅读笔记(4、异步日志)

muduo源码阅读笔记(5、Channel和Poller)

muduo源码阅读笔记(6、EvevntLoop和Thread)

muduo源码阅读笔记(7、EventLoopThreadPool)

muduo源码阅读笔记(8、定时器TimerQueue)

muduo源码阅读笔记(9、TcpServer)

muduo源码阅读笔记(10、TcpConnection)

muduo源码阅读笔记(11、TcpClient)

前言

本章新涉及的文件有:

  1. TcpClient.h/cc:和TcpServer不同的是,TcpClient位于客户端,主要是对客户发起的连接进行管理,TcpClient只有一个loop,也会和TcpConnection配合,将三次握手连接成功的sockfd交由TcpConnection管理。

  2. Connector.h/cc:Muduo将一个客户端的sock分成了两个阶段,分别是:连接阶段、读写阶段,Connector就是负责fd的连接阶段,当一个sockfd连接成功后,将sockfd传给TcpClient,由TcpClient将sockfd传给TcpConnection进行读写管理,Connector和TcpServer的Acceptor在设计上有这类似的思想,不同的是,Connector是可以针对同一个ip地址进行多次连接,产生不同的sockfd、而Acceptor是去读listen sock来接收连接,产生不同sockfd。

总体来说,TcpClient的实现是严格遵循TcpServer的实现的,

Connector的实现

提供的接口:

class Connector : noncopyable,public std::enable_shared_from_this<Connector>{
public:typedef std::function<void (int sockfd)> NewConnectionCallback;Connector(EventLoop* loop, const InetAddress& serverAddr);~Connector();void setNewConnectionCallback(const NewConnectionCallback& cb){ newConnectionCallback_ = cb; }void start();  // can be called in any threadvoid restart();  // must be called in loop threadvoid stop();  // can be called in any threadconst InetAddress& serverAddress() const { return serverAddr_; }private:enum States { kDisconnected, kConnecting, kConnected };static const int kMaxRetryDelayMs = 30*1000;static const int kInitRetryDelayMs = 500;void setState(States s) { state_ = s; }void startInLoop();void stopInLoop();void connect();void connecting(int sockfd);void handleWrite();void handleError();void retry(int sockfd);int removeAndResetChannel();void resetChannel();EventLoop* loop_; // 连接发起所在loopInetAddress serverAddr_;  // 连接到哪里bool connect_; // atomic  // 开始连接?States state_;  // FIXME: use atomic variable // 连接状态std::unique_ptr<Channel> channel_;  // fd读写以及读写事件管理,对epoll/poll/selectIO多路复用的抽象,方便跨平台。NewConnectionCallback newConnectionCallback_; // 一般是:TcpClient::newConnectionint retryDelayMs_;  // 连接重试毫秒数。
};

简单记录一下连接阶段启动流程:

调用Connector::start()->

  1. connect_ 赋值为 true。

  2. 在loop任务队列追加Connector::startInLoop()回调任务

    1. 执行回调任务:Connector::startInLoop()

    2. 调用Connector::connect()

      1. 创建非阻塞的连接sock

      2. ::connect(sock, …)

      3. 调用Connector::connecting(int sockfd)

        1. new channel(sockfd)赋值给channel_将Connector::handleWrite()和Connector::handleError()设置给cahnnel的写回调以及错误处理回调

        2. 使能Poller开始监听sockfd

当连接成功,会触发sockfd的写事件,从而调用Connector::handleWrite()->

  1. 将sockfd和channel_解绑,并将channel_ rest。

  2. 调用newConnectionCallback_(也即TcpClient::newConnection)将连接完成的sockfd传给TcpClient处理

感兴趣的读者,可以自行阅读源码,了解连接过程中,stop、retry的流程。

实现的伪代码:


void Connector::start(){connect_ = true;loop_->runInLoop(std::bind(&Connector::startInLoop, this)); // FIXME: unsafe
}void Connector::startInLoop(){loop_->assertInLoopThread();assert(state_ == kDisconnected);if (connect_){connect();}else{LOG_DEBUG << "do not connect";}
}void Connector::stop(){connect_ = false;loop_->queueInLoop(std::bind(&Connector::stopInLoop, this)); // FIXME: unsafe// FIXME: cancel timer
}void Connector::stopInLoop(){loop_->assertInLoopThread();if (state_ == kConnecting){setState(kDisconnected);int sockfd = removeAndResetChannel();retry(sockfd);}
}void Connector::connect(){int sockfd = sockets::createNonblockingOrDie(serverAddr_.family());int ret = sockets::connect(sockfd, serverAddr_.getSockAddr());int savedErrno = (ret == 0) ? 0 : errno;switch (savedErrno){case 0:case EINPROGRESS:case EINTR:case EISCONN:connecting(sockfd);break;/*...*/}
}void Connector::connecting(int sockfd){setState(kConnecting);assert(!channel_);channel_.reset(new Channel(loop_, sockfd));channel_->setWriteCallback(std::bind(&Connector::handleWrite, this)); // FIXME: unsafechannel_->setErrorCallback(std::bind(&Connector::handleError, this)); // FIXME: unsafe// channel_->tie(shared_from_this()); is not working,// as channel_ is not managed by shared_ptrchannel_->enableWriting();
}int Connector::removeAndResetChannel(){channel_->disableAll();channel_->remove();int sockfd = channel_->fd();// Can't reset channel_ here, because we are inside Channel::handleEventloop_->queueInLoop(std::bind(&Connector::resetChannel, this)); // FIXME: unsafereturn sockfd;
}void Connector::resetChannel(){channel_.reset();
}void Connector::handleWrite(){LOG_TRACE << "Connector::handleWrite " << state_;if (state_ == kConnecting){int sockfd = removeAndResetChannel();int err = sockets::getSocketError(sockfd);if (err){LOG_WARN << "Connector::handleWrite - SO_ERROR = "<< err << " " << strerror_tl(err);retry(sockfd);}else{setState(kConnected);if (connect_){newConnectionCallback_(sockfd);}else{sockets::close(sockfd);}}}else{// what happened?assert(state_ == kDisconnected);}
}void Connector::handleError(){LOG_ERROR << "Connector::handleError state=" << state_;if (state_ == kConnecting){int sockfd = removeAndResetChannel();int err = sockets::getSocketError(sockfd);LOG_TRACE << "SO_ERROR = " << err << " " << strerror_tl(err);retry(sockfd);}
}void Connector::retry(int sockfd){sockets::close(sockfd);setState(kDisconnected);if (connect_){LOG_INFO << "Connector::retry - Retry connecting to " << serverAddr_.toIpPort()<< " in " << retryDelayMs_ << " milliseconds. ";loop_->runAfter(retryDelayMs_/1000.0, // 稍后重试std::bind(&Connector::startInLoop, shared_from_this()));retryDelayMs_ = std::min(retryDelayMs_ * 2, kMaxRetryDelayMs);  // 超时加倍}else{LOG_DEBUG << "do not connect";}
}

TcpClient的实现

提供的接口:

class TcpClient : noncopyable
{
public:// TcpClient(EventLoop* loop);// TcpClient(EventLoop* loop, const string& host, uint16_t port);TcpClient(EventLoop* loop,const InetAddress& serverAddr,const string& nameArg);~TcpClient();  // force out-line dtor, for std::unique_ptr members.void connect();void disconnect();void stop();TcpConnectionPtr connection() const{MutexLockGuard lock(mutex_);return connection_;}EventLoop* getLoop() const { return loop_; }bool retry() const { return retry_; }void enableRetry() { retry_ = true; }const string& name() const{ return name_; }/// Set connection callback./// Not thread safe.void setConnectionCallback(ConnectionCallback cb){ connectionCallback_ = std::move(cb); }/// Set message callback./// Not thread safe.void setMessageCallback(MessageCallback cb){ messageCallback_ = std::move(cb); }/// Set write complete callback./// Not thread safe.void setWriteCompleteCallback(WriteCompleteCallback cb){ writeCompleteCallback_ = std::move(cb); }private:/// Not thread safe, but in loopvoid newConnection(int sockfd);/// Not thread safe, but in loopvoid removeConnection(const TcpConnectionPtr& conn);EventLoop* loop_; // 运行在那个loopConnectorPtr connector_; // avoid revealing Connector // 连接器const string name_; // TcpClient名ConnectionCallback connectionCallback_;   // 连接建立和断开回调MessageCallback messageCallback_;   // 可读回调WriteCompleteCallback writeCompleteCallback_;   // 写完回调bool retry_;   // atomic  重连bool connect_; // atomic  // 已经连接?// always in loop threadint nextConnId_;  // 字面意思mutable MutexLock mutex_;TcpConnectionPtr connection_ GUARDED_BY(mutex_);  // 连接读写管理器
};

TcpClient核心函数TcpClient::newConnection,该函数会作为连接器的回调,当sockfd连接成功后,该函数被调用,设置必要信息后,为该sockfd产生一个TcpConnection对象,后续该fd的读写,全权交由TcpConnection处理。逻辑比较简单,实现如下:

实现的伪代码:

TcpClient::TcpClient(EventLoop* loop,const InetAddress& serverAddr,const string& nameArg): loop_(CHECK_NOTNULL(loop)),connector_(new Connector(loop, serverAddr)),name_(nameArg),connectionCallback_(defaultConnectionCallback),messageCallback_(defaultMessageCallback),retry_(false),connect_(true),nextConnId_(1){connector_->setNewConnectionCallback(std::bind(&TcpClient::newConnection, this, _1));// FIXME setConnectFailedCallbackLOG_INFO << "TcpClient::TcpClient[" << name_<< "] - connector " << get_pointer(connector_);
}void TcpClient::connect(){// FIXME: check stateLOG_INFO << "TcpClient::connect[" << name_ << "] - connecting to "<< connector_->serverAddress().toIpPort();connect_ = true;connector_->start();
}void TcpClient::disconnect(){connect_ = false;{MutexLockGuard lock(mutex_);if (connection_){connection_->shutdown();}}
}void TcpClient::stop(){connect_ = false;connector_->stop();
}void TcpClient::newConnection(int sockfd){loop_->assertInLoopThread();InetAddress peerAddr(sockets::getPeerAddr(sockfd));char buf[32];snprintf(buf, sizeof buf, ":%s#%d", peerAddr.toIpPort().c_str(), nextConnId_);++nextConnId_;string connName = name_ + buf;InetAddress localAddr(sockets::getLocalAddr(sockfd));// FIXME poll with zero timeout to double confirm the new connection// FIXME use make_shared if necessaryTcpConnectionPtr conn(new TcpConnection(loop_,connName,sockfd,localAddr,peerAddr));conn->setConnectionCallback(connectionCallback_);conn->setMessageCallback(messageCallback_);conn->setWriteCompleteCallback(writeCompleteCallback_);conn->setCloseCallback(std::bind(&TcpClient::removeConnection, this, _1)); // FIXME: unsafe{MutexLockGuard lock(mutex_);connection_ = conn;}conn->connectEstablished(); // 同一loop,可以直接调用
}void TcpClient::removeConnection(const TcpConnectionPtr& conn){loop_->assertInLoopThread();assert(loop_ == conn->getLoop());{MutexLockGuard lock(mutex_);assert(connection_ == conn);connection_.reset();}loop_->queueInLoop(std::bind(&TcpConnection::connectDestroyed, conn));if (retry_ && connect_){LOG_INFO << "TcpClient::connect[" << name_ << "] - Reconnecting to "<< connector_->serverAddress().toIpPort();connector_->restart();}
}

细节明细:

疑问

在TcpConnection::handleClose()实现当中,为什么没有调用close,关闭sockfd?也看了一下TcpConnection的析构、TcpConnection::connectDestroyed(),没有一个地方调用了close来关闭sockfd

解答

向github上提交了discuss,待更新。。。

总结

Muduo设计的TcpServer和TcpClient代码思想及其统一,一些算法题也是需要这样的抽象思维,所以我认为这也是以后从事it最重要的品质,可以避免很多不必要的bug。。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/651580.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

C++设计模式介绍:优雅编程的艺术

物以类聚 人以群分 文章目录 简介为什么有设计模式&#xff1f; 设计模式七大原则单一职责原则&#xff08;Single Responsibility Principle - SRP&#xff09;开放封闭原则&#xff08;Open/Closed Principle - OCP&#xff09;里氏替换原则&#xff08;Liskov Substitution …

1.27学习总结

今天做了些队列的题&#xff1a; 1.逛画展&#xff08;单调队列&#xff09; 2.打印队列 Printer Queue&#xff08;优先队列&#xff09; 3.[NOIP2010 提高组] 机器翻译(模拟队列) 4.求m区间内的最小值(单调队列板子题) 5.日志统计(滑动窗口&#xff0c;双指针) 总结一下&…

3分钟 docker搭建 帕鲁服务器

1. 安装docker 1.安装依赖环境 yum -y install yum-utils device-mapper-persistent-data lvm22.设置镜像源 yum-config-manager --add-repo http://mirrors.aliyun.com/docker-ce/linux/centos/docker-ce.repo3.安装docker 3.1 yum makecache fast yum install docker-ce …

MongoDB:从容器使用到 Mongosh、Python/Node.js 数据操作

文章目录 1. 容器与应用之间的关系介绍2. 使用 Docker 容器安装 MongoDB3. Mongosh 操作3.1 Mongosh 连接到 MongoDB3.2 基础操作与 CRUD 4. Python 操作 MongoDB5. Nodejs 操作 MongoDB参考文献 1. 容器与应用之间的关系介绍 MongoDB 的安装有时候并不是那么容易的&#xff0…

消息中间件及java线程池

1. ActiveMQ是什么&#xff1f; Apache ActiveMQ是一个开源的消息中间件&#xff08;Message Oriented Middleware, MOM&#xff09;&#xff0c;它遵循Java消息服务&#xff08;Java Message Service, JMS&#xff09;规范&#xff0c;提供高效、可靠和异步的消息传递功能。Ac…

【从浅到深的算法技巧】初级排序算法 上

5.排序 5.1 初级排序算法 作为对排序算法领域的第一次探索&#xff0c; 我们将学习两种初级的排序算法以及其中种的一个变体。深入学习这些相对简单的算法的原因在于:第一,我们将通过它们熟悉些术语和简单的技巧 第二&#xff0c;这些简单的算法在某些情况下比我们之后将会讨论…

《HelloGitHub》第 94 期

兴趣是最好的老师&#xff0c;HelloGitHub 让你对编程感兴趣&#xff01; 简介 HelloGitHub 分享 GitHub 上有趣、入门级的开源项目。 https://github.com/521xueweihan/HelloGitHub 这里有实战项目、入门教程、黑科技、开源书籍、大厂开源项目等&#xff0c;涵盖多种编程语言 …

Redis6基础知识梳理~

初识NOSQL&#xff1a; NOSQL是为了解决性能问题而产生的技术&#xff0c;在最初&#xff0c;我们都是使用单体服务器架构&#xff0c;如下所示&#xff1a; 随着用户访问量大幅度提升&#xff0c;同时产生了大量的用户数据&#xff0c;单体服务器架构面对着巨大的压力 NOSQL解…

openssl3.2 - 测试程序的学习 - test\acvp_test.c

文章目录 openssl3.2 - 测试程序的学习 - test\acvp_test.c概述笔记要单步学习的测试函数备注END openssl3.2 - 测试程序的学习 - test\acvp_test.c 概述 openssl3.2 - 测试程序的学习 将test*.c 收集起来后, 就不准备看makefile和make test的日志参考了. 按照收集的.c, 按照…

换个思维方式快速上手UML和 plantUML——类图

和大多数朋友一样&#xff0c;Jeffrey 在一开始的时候也十分的厌烦软件工程的一系列东西&#xff0c;对工程化工具十分厌恶&#xff0c;觉得它繁琐&#xff0c;需要记忆很多没有意思的东西。 但是之所以&#xff0c;肯定有是因为。对工程化工具的不理解和不认可主要是基于两个逻…

【c++】类对象模型

1.如何计算类对象的大小 class A { public:void PrintA(){cout<<_a<<endl;} private:char _a; }; 问题&#xff1a;类中既可以有成员变量&#xff0c;又可以有成员函数&#xff0c;那么一个类的对象中包含了什么&#xff1f;如何计算一个类的大小&#xff1f; 2…

C++算法之枚举、模拟与排序

1.AcWing 1210.连号区间数 分析思路 由题意是在 1∼N 的某个排列中有多少个连号区间&#xff0c;所以每个数出现并且不重复&#xff01; 如果是连续的&#xff0c;那么Max-Minj-i&#xff08;[i,j]&#xff09; 代码实现 #include<iostream> #include<algorithm>…

关于AOP的@Around特殊处理RequestBody的使用小结

目录 1. 概述 1.1 背景 1.2 源码 2. 测试 2.1 Controller 2.2 SpecialName配置 2.3 RequestConverter 2.4 测试 最近项目上遇到一个这样的需求&#xff1a;用户请求的时候传过来A&#xff0c;在api处理过程中要把A当成B去处理&#xff0c;但是返回的标识中又必须是A作为…

freeswitch智能外呼系统搭建流程

1.获取实时音频数据 media_bug &#xff08;好多mrcp方式也崩溃所以用以下方式&#xff09; 可以参考 方式可以通过socket或者webscoket freeswitch[1.05]用websocket发送mediabug语音流到ASRProxy实现实时质检和坐席辅助 - 知乎 2.webscoket 好多c的库放模块容易崩溃 可以…

2. MySQL 多实例

重点&#xff1a; MySQL 的 三种安装方式&#xff1a;包安装&#xff0c;二进制安装&#xff0c;源码编译安装。 MySQL 的 基本使用 MySQL 多实例 DDLcreate alter drop DML insert update delete DQL select 2.5&#xff09;通用 二进制格式安装 MySQL 2.5.1&#xff…

Linux BIO如何下发到HDD?

在Linux操作系统中&#xff0c;当创建一个Block I/O请求&#xff08;BIO&#xff09;时&#xff0c;它会被封装成适合硬件交互的数据结构&#xff0c;并通过内核存储子系统传递到对应的硬件控制器上&#xff0c;如SAS&#xff08;Serial Attached SCSI&#xff09;HBA&#xff…

Linux ---- Shell编程之函数与数组

目录 一、函数 1、函数的基本格式 2、查看函数列表 3、删除函数 4、函数的传参数 5、函数返回值 实验&#xff1a; 1.判断输入的ip地址正确与否 2. 判断是否为管理员用户登录 6、函数变量的作用范围 7、函数递归&#xff08;重要、难点&#xff09; 实验&#xff1…

《Q年文峰》GPT应用的交互式非线性体验

Phoncent博客创始人庄泽峰把自己的小说《Q年文峰》做成GPT应用&#xff0c;显然这是一件值得探索且具有创新意义的事情。 因为传统的阅读体验是线性的&#xff0c;读者只能按照固定的情节顺序进行阅读&#xff0c;而把小说制作成GPT应用后&#xff0c;读者阅读小说的方式是非线…

力扣0085——最大矩形

最大矩形 难度&#xff1a;困难 题目描述 给定一个仅包含 0 和 1 、大小为 rows x cols 的二维二进制矩阵&#xff0c;找出只包含 1 的最大矩形&#xff0c;并返回其面积。 示例1 输入&#xff1a;matrix [["1","0","1","0",&qu…

分布式id-雪花算法

一、雪花算法介绍 Snowflake&#xff0c;雪花算法是有Twitter开源的分布式ID生成算法&#xff0c;以划分命名空间的方式将64bit位分割成了多个部分&#xff0c;每个部分都有具体的不同含义&#xff0c;在Java中64Bit位的整数是Long类型&#xff0c;所以在Java中Snowflake算法生…