06 | Swoole 源码分析之 Coroutine 协程模块

首发原文链接:Swoole 源码分析之 Coroutine 协程模块
大家好,我是码农先森。

引言

协程又称轻量级线程,但与线程不同的是;协程是用户级线程,不需要操作系统参与。由用户显式控制,可以在需要的时候挂起、或恢复执行。

通过协程程序可以在执行的过程中保存当前的状态,并在恢复后从该状态处继续执行,整体上来说创建、销毁、切换的成本低。

但在 Swoole 中的协程是无法利用多核 CPU 的,如果想利用多核 CPU 则需要依赖 Swoole 的多进程模型。

协程的出现为 Swoole 程序提升并发效率、及系统的处理能力,注入了强劲的动力;可以说是 Swoole 作为高性能通信框架的的核心模块。

源码拆解

这次我们以下面这段代码,来作为本次拆解源码的切入点。

// 协程容器
Swoole\Coroutine\run(function () {// Socket 协程客户端$socket = new Swoole\Coroutine\Socket(AF_INET, SOCK_STREAM, 0);// 建立连接,在建立连接的过程中会发生协程切换$retval = $socket->connect('127.0.0.1', 9601);if ($retval) {// 发送数据,在发送数据的过程中会发生协程切换$n = $socket->send('hello');var_dump($n);// 解释数据,在接收数据的过程中会发生协程切换$data = $socket->recv();var_dump($data);// 关闭连接$socket->close();}
});

这段代码主要是使用 Socket 的协程客户端与本地的 9601 端口建立连接,并且发送、接收数据。在分析源码之前,我对这次的源码做了一个图解梳理,把整个调用链路上的函数串联了起来。我们可以先对整体有个大致的了解,便于后面分析源代码。

Socket 协程客户端

Socket 协程客户端是专门用于 Swoole 在协程环境中使用的,可以实现在 IO 调用时切换协程,让出 CPU 的使用权。例如:在连接建立、发送数据、接收数据 等阶段会进行协程的切换。

这个函数主要是发起 Socket 连接的建立,并且在 wait_event 这个函数内部实现了协程的切换。

// swoole-src/src/coroutine/socket.cc:595
bool Socket::connect(const struct sockaddr *addr, socklen_t addrlen) {if (sw_unlikely(!is_available(SW_EVENT_RDWR))) {return false;}int retval;do {// 发起连接建立retval = ::connect(sock_fd, addr, addrlen);} while (retval < 0 && errno == EINTR);if (retval < 0) {if (errno != EINPROGRESS) {set_err(errno);return false;} else {TimerController timer(&write_timer, connect_timeout, this, timer_callback);// wait_event 这个函数内部实现了协程的切换if (!timer.start() || !wait_event(SW_EVENT_WRITE)) {if (is_closed()) {set_err(ECONNABORTED);}return false;} else {if (socket->get_option(SOL_SOCKET, SO_ERROR, &errCode) < 0 || errCode != 0) {set_err(errCode);return false;}}}}connected = true;set_err(0);return true;
}

再看看 wait_event 函数的内部实现,先是获取到当前的协程,然后根据事件的类型调用函数 add_event 将事件添加到事件管理的结构体中,最后将当前的协程切换出去,让出其 CPU 的控制权。

// swoole-src/src/coroutine/socket.cc:147
bool Socket::wait_event(const EventType event, const void **__buf, size_t __n) {EventType added_event = event;// 获取到当前的协程Coroutine *co = Coroutine::get_current_safe();if (!co) {return false;}if (sw_unlikely(socket->close_wait)) {set_err(SW_ERROR_CO_SOCKET_CLOSE_WAIT);return false;}// clear the last errCodeset_err(0);
#ifdef SW_USE_OPENSSL// 根据事件的类型调用函数 add_event 将事件添加到事件管理的结构体中if (sw_unlikely(socket->ssl && ((event == SW_EVENT_READ && socket->ssl_want_write) ||(event == SW_EVENT_WRITE && socket->ssl_want_read)))) {if (sw_likely(socket->ssl_want_write && add_event(SW_EVENT_WRITE))) {want_event = SW_EVENT_WRITE;} else if (socket->ssl_want_read && add_event(SW_EVENT_READ)) {want_event = SW_EVENT_READ;} else {return false;}added_event = want_event;} else
#endifif (sw_unlikely(!add_event(event))) {return false;}swoole_trace_log(SW_TRACE_SOCKET,"socket#%d blongs to cid#%ld is waiting for %s event",sock_fd,co->get_cid(),get_wait_event_name(this, event));Coroutine::CancelFunc cancel_fn = [this, event](Coroutine *co) { return cancel(event); };// 将当前的协程切换出去,让出其 CPU 的控制权if (sw_likely(event == SW_EVENT_READ)) {read_co = co;read_co->yield(&cancel_fn);read_co = nullptr;} else if (event == SW_EVENT_WRITE) {if (sw_unlikely(!zero_copy && __n > 0 && *__buf != get_write_buffer()->str)) {write_buffer->clear();if (write_buffer->append((const char *) *__buf, __n) != SW_OK) {set_err(ENOMEM);goto _failed;}*__buf = write_buffer->str;}write_co = co;write_co->yield(&cancel_fn);write_co = nullptr;} else {assert(0);return false;}
_failed:
#ifdef SW_USE_OPENSSL// maybe read_co and write_co are all waiting for the same event when we use SSLif (sw_likely(want_event == SW_EVENT_NULL || !has_bound()))
#endif{Reactor *reactor = SwooleTG.reactor;if (sw_likely(added_event == SW_EVENT_READ)) {reactor->remove_read_event(socket);} else {reactor->remove_write_event(socket);}}
#ifdef SW_USE_OPENSSLwant_event = SW_EVENT_NULL;
#endifswoole_trace_log(SW_TRACE_SOCKET,"socket#%d blongs to cid#%ld trigger %s event",sock_fd,co->get_cid(),get_trigger_event_name(this, added_event));return !is_closed() && !errCode;
}

同理 send()recv() 函数,也和 connect() 函数是一样的实现方式。

// swoole-src/src/coroutine/socket.cc:847
ssize_t Socket::send(const void *__buf, size_t __n) {if (sw_unlikely(!is_available(SW_EVENT_WRITE))) {return -1;}ssize_t retval;TimerController timer(&write_timer, write_timeout, this, timer_callback);do {// 发送数据retval = socket->send(__buf, __n, 0);} while (retval < 0 && socket->catch_write_error(errno) == SW_WAIT && timer.start() &&wait_event(SW_EVENT_WRITE, &__buf, __n));check_return_value(retval);return retval;
}// swoole-src/src/coroutine/socket.cc:874
ssize_t Socket::recv(void *__buf, size_t __n) {if (sw_unlikely(!is_available(SW_EVENT_READ))) {return -1;}ssize_t retval;TimerController timer(&read_timer, read_timeout, this, timer_callback);do {// 接收数据retval = socket->recv(__buf, __n, 0);} while (retval < 0 && socket->catch_read_error(errno) == SW_WAIT && timer.start() && wait_event(SW_EVENT_READ));check_return_value(retval);return retval;
}

也是调用 wait_event() 函数来实现当前的协程切换,唯一的区别就是事件的类型不同,一个是读事件,一个是写事件。

Run 协程容器

在 Swoole 中要想使用协程,那么必须要在协程的环境中使用协程的客户端,或者支持 Hook 的原生 PHP 函数。才能享受到 Swoole 中协程带来的高性能,不然和普通的 PHP 执行程序没有什么区别,变成了同步阻塞。

在源码中协程容器主要是实现了事件循环的初始化、协程上下文的创建管理、事件循环的 IO 事件监听,接下来我们会主要分析关于事件管理的部分内容。

// swoole-src/src/coroutine/base.cc:210
namespace coroutine {bool run(const CoroutineFunc &fn, void *arg) {// 事件循环的初始化if (swoole_event_init(SW_EVENTLOOP_WAIT_EXIT) < 0) {return false;}// 协程上下文的创建管理Coroutine::activate();long cid = Coroutine::create(fn, arg);// 事件循环的 IO 事件监听swoole_event_wait();Coroutine::deactivate();return cid > 0;}
}

Event 事件初始化

Event 事件初始化主要是定义一些事件的回调函数,便于在事件被触发时恢复对应的协程进行后续的逻辑处理,例如:读事件回调函数 readable_event_callback、写事件回调函数 writable_event_callback 等。

// swoole-src/src/wrapper/event.cc:37
int swoole_event_init(int flags) {if (!SwooleG.init) {std::unique_lock<std::mutex> lock(init_lock);swoole_init();}// 创建一个 Reactor 实例对象Reactor *reactor = new Reactor(SW_REACTOR_MAXEVENTS);if (!reactor->ready()) {return SW_ERR;}if (flags & SW_EVENTLOOP_WAIT_EXIT) {reactor->wait_exit = 1;}// Socket 事件初始化coroutine::Socket::init_reactor(reactor);coroutine::System::init_reactor(reactor);network::Client::init_reactor(reactor);SwooleTG.reactor = reactor;return SW_OK;
}
// swoole-src/include/swoole_coroutine_sokcet.h:157
static inline void init_reactor(Reactor *reactor) {// 定义对应事件的回调函数reactor->set_handler(SW_FD_CO_SOCKET | SW_EVENT_READ, readable_event_callback);reactor->set_handler(SW_FD_CO_SOCKET | SW_EVENT_WRITE, writable_event_callback);reactor->set_handler(SW_FD_CO_SOCKET | SW_EVENT_ERROR, error_event_callback);
}
// swoole-src/src/coroutine/socket.c:48
int Socket::readable_event_callback(Reactor *reactor, Event *event) {Socket *socket = (Socket *) event->socket->object;socket->set_err(0);
#ifdef SW_USE_OPENSSLif (sw_unlikely(socket->want_event != SW_EVENT_NULL)) {if (socket->want_event == SW_EVENT_READ) {// 恢复对应的协程socket->write_co->resume();}} else
#endif{if (socket->recv_barrier && (*socket->recv_barrier)() && !event->socket->event_hup) {return SW_OK;}// 恢复对应的协程socket->read_co->resume();}return SW_OK;
}

Event 事件监听

Event 事件监听主要是针对被加入到事件循环中的 Socket 进行 IO 事件的监听,如果有读或写 IO 事件的触发,则回调到对应的处理函数上进行执行。

// swoole-src/src/warpper/event.cc:84
int swoole_event_wait() {Reactor *reactor = SwooleTG.reactor;int retval = 0;if (!reactor->wait_exit or !reactor->if_exit()) {// 事件循环等待调用retval = reactor->wait(nullptr);}swoole_event_free();return retval;
}
// swoole-src/src/reactor/epoll.cc:153
int ReactorEpoll::wait(struct timeval *timeo) {Event event;ReactorHandler handler;int i, n, ret;int reactor_id = reactor_->id;int max_event_num = reactor_->max_event_num;if (reactor_->timeout_msec == 0) {if (timeo == nullptr) {reactor_->timeout_msec = -1;} else {reactor_->timeout_msec = timeo->tv_sec * 1000 + timeo->tv_usec / 1000;}}reactor_->before_wait();while (reactor_->running) {if (reactor_->onBegin != nullptr) {reactor_->onBegin(reactor_);}// 监听 IO 事件n = epoll_wait(epfd_, events_, max_event_num, reactor_->get_timeout_msec());if (n < 0) {if (!reactor_->catch_error()) {swoole_sys_warning("[Reactor#%d] epoll_wait failed", reactor_id);return SW_ERR;} else {goto _continue;}} else if (n == 0) {reactor_->execute_end_callbacks(true);SW_REACTOR_CONTINUE;}for (i = 0; i < n; i++) {event.reactor_id = reactor_id;event.socket = (Socket *) events_[i].data.ptr;event.type = event.socket->fd_type;event.fd = event.socket->fd;if (events_[i].events & (EPOLLRDHUP | EPOLLERR | EPOLLHUP)) {event.socket->event_hup = 1;}// read 读事件,这里的 handler 对应 readable_event_callbackif ((events_[i].events & EPOLLIN) && !event.socket->removed) {handler = reactor_->get_handler(SW_EVENT_READ, event.type);ret = handler(reactor_, &event);if (ret < 0) {swoole_sys_warning("EPOLLIN handle failed. fd=%d", event.fd);}}// write 写事件,这里的 handler 对应 writable_event_callbackif ((events_[i].events & EPOLLOUT) && !event.socket->removed) {handler = reactor_->get_handler(SW_EVENT_WRITE, event.type);ret = handler(reactor_, &event);if (ret < 0) {swoole_sys_warning("EPOLLOUT handle failed. fd=%d", event.fd);}}// error 错误处理,这里的 handler 对应 error_event_callbackif ((events_[i].events & (EPOLLRDHUP | EPOLLERR | EPOLLHUP)) && !event.socket->removed) {// ignore ERR and HUP, because event is already processed at IN and OUT handler.if ((events_[i].events & EPOLLIN) || (events_[i].events & EPOLLOUT)) {continue;}handler = reactor_->get_error_handler(event.type);ret = handler(reactor_, &event);if (ret < 0) {swoole_sys_warning("EPOLLERR handle failed. fd=%d", event.fd);}}if (!event.socket->removed && (event.socket->events & SW_EVENT_ONCE)) {reactor_->_del(event.socket);}}_continue:reactor_->execute_end_callbacks(false);SW_REACTOR_CONTINUE;}return 0;
}

总结

  • 协程又称轻量级线程,协程是用户级线程;不需要操作系统参与,创建切换成本低。
  • Swoole 中的协程是无法利用多核 CPU 的,如果想利用多核 CPU 则需要依赖 Swoole 的多进程模型。
  • Swoole 中协程的是利用的 Event 事件循环进行调度的,将遇到 IO 操作的 Socket 统一加入到事件循环中。
  • 本次的源码分析旨在了解整个协程在 Swoole 中的运行逻辑,打开我们的思路,便于我们更好的体会到协程所带来的高性能价值。

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/787968.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Redis中的复制功能(三)

复制 服务器运行ID 除了复制偏移量和复制积压缓冲区之外&#xff0c;实现部分重同步还需要用到服务器运行ID(run ID): 1.每隔Redis服务器&#xff0c;不论主服务器还是从服务&#xff0c;都会有自己的运行ID2.运行ID在服务器启动时自动生成&#xff0c;由40个随机的十六进制…

迈向数字化医疗:互联网医院APP开发中的设计思路与技术要点

在开发互联网医院APP时&#xff0c;需要综合考虑设计思路和技术要点&#xff0c;确保用户体验和医疗服务质量的提升。接下来&#xff0c;小编将从设计思路和技术要点两个方面进行讲解。 一、设计思路 用户导向&#xff1a;在设计互联网医院APP时&#xff0c;需要将用户体验放在…

RocketMQ 消费者源码解读:消费过程、负载原理、顺序消费原理

B站学习地址 上一遍学习了三种常见队列的消费原理&#xff0c;本次我们来从源码的角度来证明上篇中的理论。 1、准备 RocketMQ 版本 <!-- RocketMQ --> <dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-s…

vs2022断点找bug出错(打上100个断点)

初步分析&#xff1a;故障出自-具体功能模块 进一步分析&#xff1a;故障出自-该功能代码流程 进一步分析&#xff1a;从该功能起点-终点&#xff0c;一路打100个断点

ICLR 2024 | 鸡生蛋蛋生鸡?再论生成数据能否帮助模型训练

ChatGPT狂飙160天&#xff0c;世界已经不是之前的样子。 新建了人工智能中文站https://ai.weoknow.com 每天给大家更新可用的国内可用chatGPT资源 发布在https://it.weoknow.com 更多资源欢迎关注 随着生成模型&#xff08;如 ChatGPT、扩散模型&#xff09;飞速发展&#x…

Nomad Web更新没有最快只有更快

大家好&#xff0c;才是真的好。 很长时间没介绍运行在浏览器中的Notes客户端即Nomad Web更新情况。 不用安装&#xff0c;直接使用&#xff0c;还可以完美地兼容适应各种操作系统&#xff0c;Nomad Web一定是Notes/Domino产品现在和将来重点发展的用户访问模式。 不过&…

【CKA模拟题】一文教你用StorageClass轻松创建PV

题干 For this question, please set this context (In exam, diff cluster name) kubectl config use-context kubernetes-adminkubernetesYour task involves setting up storage components in a Kubernetes cluster. Follow these steps: Step 1: Create a Storage Class…

书生 浦语 大模型趣味 Demo

目录 一. 部署 InternLM2-Chat-1.8B 模型进行智能对话 1. 环境准备 2. 下载模型参数 3. 运行Demo 二. 部署实战营 八戒-Chat-1.8B 模型 1. 下载Demo仓库 2. 启动web服务端加载八戒模型&#xff1a; 3. 将SSH远程端口映射到本地 4. 在本地浏览器打开&#xff1a;http:/…

Python抓取抖音直播间数据:技术探索与实践

目录 一、引言 二、技术准备 三、分析抖音直播间网页结构 四、编写爬虫代码 五、处理反爬虫机制 六、数据清洗与存储 七、总结 一、引言 随着互联网的快速发展&#xff0c;直播行业已成为当下的热门领域。抖音作为其中的佼佼者&#xff0c;吸引了大量的用户和主播。对于…

元宇宙虚拟空间的场景构造(二)

前言 该文章主要讲元宇宙虚拟空间的场景构造&#xff0c;基本核心技术点&#xff0c;不多说&#xff0c;直接引入正题。 场景的构造 使用引入的天空模块 this.sky new Sky(this); 在Sky模块里&#xff0c;有设置对其中的阳光进行不同时间段的光线处理。而天空又是怎么样的…

STM32 DWT数据观察触发器作为延时函数的使用

STM32 DWT数据观察触发器作为延时函数的使用 &#x1f4d1;DWT(Data Watchpoint and Trace数据观察触发器&#xff09;描述 &#x1f4dd;DWT是属于处理器内核单元中的调试组件之一&#xff0c;由四个比较器组成。它们可配置为&#xff1a;硬件监视点或对ETM或PC采样器或数据地…

dcoker 下redis设置密码

修改Docker里面Redis密码 Redis是一个开源的内存数据结构存储系统&#xff0c;常用于缓存、消息队列和数据持久化等场景。在使用Docker部署Redis时&#xff0c;默认情况下是没有设置密码的&#xff0c;这可能会导致安全隐患。因此&#xff0c;为了保证数据的安全性&…

蓝桥杯真题Day44 倒计时10天 练了六道真题 !

[蓝桥杯 2020 省 B2] 平面切分 题目描述 平面上有 N 条直线, 其中第 i 条直线是 yAi​⋅xBi​ 。请计算这些直线将平面分成了几个部分。 输入格式 第一行包含一个整数 N。 以下 N 行, 每行包含两个整数 Ai​,Bi​。 输出格式 一个整数代表答案。 代码表示 #include<…

基于SpringBoot的图书馆管理系统设计与实现

介绍 基于&#xff1a;java8 SpringBoot thymeleaf MySQL8.0.17 mybatis-plus maven Xadmin 实现图书馆管理系统 系统要实现如下的基本管理功能&#xff1a; &#xff08;1&#xff09;用户分为两类&#xff1a;管理员&#xff0c;一般用户。 &#xff08;2&#xff09…

Day57:WEB攻防-SSRF服务端请求Gopher伪协议无回显利用黑白盒挖掘业务功能点

目录 SSRF-原理&挖掘&利用&修复 SSRF无回显解决办法 SSRF漏洞挖掘 SSRF协议利用 http:// &#xff08;常用&#xff09; file:/// &#xff08;常用&#xff09; dict:// &#xff08;常用&#xff09; sftp:// ldap:// tftp:// gopher:// &#xff08;…

群晖NAS使用Docker部署大语言模型Llama 2结合内网穿透实现公网访问本地GPT聊天服务

文章目录 1. 拉取相关的Docker镜像2. 运行Ollama 镜像3. 运行Chatbot Ollama镜像4. 本地访问5. 群晖安装Cpolar6. 配置公网地址7. 公网访问8. 固定公网地址 随着ChatGPT 和open Sora 的热度剧增,大语言模型时代,开启了AI新篇章,大语言模型的应用非常广泛&#xff0c;包括聊天机…

Nginx漏洞之未授权访问和源码泄漏漏洞处理

一、漏洞描述 某次安全扫描&#xff0c;发现某平台存在资源&#xff1a;未授权访问和源码泄漏&#xff1b;攻击者可能获取到网站的配置文件、敏感数据存储位置和访问凭证等信息。这意味着攻击者可以获得对网站的完全或部分控制权&#xff0c;进而进行恶意篡改、删除或添加恶意…

6.8物联网RK3399项目开发实录-驱动开发之RTC实时时钟的使用(wulianjishu666)

90款行业常用传感器单片机程序及资料【stm32,stc89c52,arduino适用】 链接&#xff1a;https://pan.baidu.com/s/1M3u8lcznKuXfN8NRoLYtTA?pwdc53f RTC 使用 简介 AIO-3399J 开发板上有 一个集成于 RK808 上的RTC(Real Time Clock)&#xff0c;主要功能有时钟&#xff0c…

【PowerDesigner】PGSQL反向工程过程已中断

问题 反向工程过程已中断,原因是某些字符无法通过ANSI–&#xff1e;UTF-16转换进行映射。pg导入sql时报错&#xff0c;一查询是power designer 反向工程过程已中断&#xff0c;某些字符无法通过ANSI–>UTF-16转换进行映射&#xff08;会导致数据丢失&#xff09; 处理 注…

代码随想录第28天| 131.分割回文串 93.复原IP地址 78.子集

131.分割回文串 131. 分割回文串 - 力扣&#xff08;LeetCode&#xff09; 代码随想录 (programmercarl.com) 带你学透回溯算法-分割回文串&#xff08;对应力扣题目&#xff1a;131.分割回文串&#xff09;| 回溯法精讲&#xff01;_哔哩哔哩_bilibili 给你一个字符串 s&…