07 | Swoole 源码分析之 Channel 通道模块

原文首发链接:Swoole 源码分析之 Channel 通道模块
大家好,我是码农先森。

引言

通道,用于协程间通讯,支持多生产者协程和多消费者协程。底层自动实现了协程的切换和调度。

通道与 PHP 的 Array 类似,仅占用内存,没有其他额外的资源申请,所有操作均为内存操作,无 IO 消耗。

底层使用 PHP 引用计数实现,无内存拷贝。即使是传递巨大字符串或数组也不会产生额外性能消耗 channel 基于引用计数实现,是零拷贝的。

源码拆解

Channel 通道需要在协程环境中使用,我们先看下面这段代码,使用 new Channel(1) 创建一个 channel 对象,然后在第一个协程中向通道中推送数据,在第二个协程获取到通道内的数据进行消费。

use Swoole\Coroutine;
use Swoole\Coroutine\Channel;
use function Swoole\Coroutine\run;run(function(){// 创建 channel 通道对象$channel = new Channel(1);Coroutine::create(function () use ($channel) {for($i = 0; $i < 10; $i++) {Coroutine::sleep(1.0);// 向通道内推送数据$channel->push(['rand' => rand(1000, 9999), 'index' => $i]);echo "{$i}\n";}});Coroutine::create(function () use ($channel) {while(1) {// 从通道中获取数据$data = $channel->pop(2.0);if ($data) {var_dump($data);} else {assert($channel->errCode === SWOOLE_CHANNEL_TIMEOUT);break;}}});
});

在分析源代码之前,我们可以提前看一下源码整体的调用逻辑图,以便我们有个大致的印象。

这段代码主要是在 Swoole 的协程环境中创建 Channel 对象并初始化其容量的逻辑。

// swoole-src/ext-src/swoole-channel.cc:132
static PHP_METHOD(swoole_channel_coro, __construct) {zend_long capacity = 1;// 解析传入的参数ZEND_PARSE_PARAMETERS_START_EX(ZEND_PARSE_PARAMS_THROW, 0, 1)Z_PARAM_OPTIONALZ_PARAM_LONG(capacity)ZEND_PARSE_PARAMETERS_END_EX(RETURN_FALSE);if (capacity <= 0) {capacity = 1;}// 当前对象对应的 ChannelObject 结构体指针ChannelObject *chan_t = php_swoole_channel_coro_fetch_object(Z_OBJ_P(ZEND_THIS));// 为该通道对象分配新的 Channel 实例,并设置其容量为传入的值。chan_t->chan = new Channel(capacity);zend_update_property_long(swoole_channel_coro_ce, SW_Z8_OBJ_P(ZEND_THIS), ZEND_STRL("capacity"), capacity);
}

这段代码主要是在 Swoole 的协程环境中向通道中推送数据并对返回结果进行处理的逻辑。

// swoole-src/ext-src/swoole-channel.cc:149
static PHP_METHOD(swoole_channel_coro, push) {// 获取当前对象的 Channel 实例Channel *chan = php_swoole_get_channel(ZEND_THIS);zval *zdata;double timeout = -1;// 解析传入的参数ZEND_PARSE_PARAMETERS_START_EX(ZEND_PARSE_PARAMS_THROW, 1, 2)Z_PARAM_ZVAL(zdata)Z_PARAM_OPTIONALZ_PARAM_DOUBLE(timeout)ZEND_PARSE_PARAMETERS_END_EX(RETURN_FALSE);Z_TRY_ADDREF_P(zdata);zdata = sw_zval_dup(zdata);// 向通道中推入数据if (chan->push(zdata, timeout)) {zend_update_property_long(swoole_channel_coro_ce, SW_Z8_OBJ_P(ZEND_THIS), ZEND_STRL("errCode"), Channel::ERROR_OK);RETURN_TRUE;} else {zend_update_property_long(swoole_channel_coro_ce, SW_Z8_OBJ_P(ZEND_THIS), ZEND_STRL("errCode"), chan->get_error());Z_TRY_DELREF_P(zdata);efree(zdata);RETURN_FALSE;}
}// swoole-src/coroutine/channel.cc:105
bool Channel::push(void *data, double timeout) {// 获取当前协程对象 current_coCoroutine *current_co = Coroutine::get_current_safe();// 如果通道已关闭if (closed) {// 设置错误并返回空指针error_ = ERROR_CLOSED;return false;}// 如果通道已满或生产者队列不为空,则设置超时消息,并根据传入的超时值添加定时器,等待生产者。if (is_full() || !producer_queue.empty()) {TimeoutMessage msg;msg.error = false;msg.timer = nullptr;if (timeout > 0) {msg.chan = this;msg.type = PRODUCER;msg.co = current_co;// 根据传入的超时值添加定时器msg.timer = swoole_timer_add(timeout, false, timer_callback, &msg);}// 挂起生产者协程yield(PRODUCER);// 如果设置了定时器,则在超时消息中删除定时器if (msg.timer) {swoole_timer_del(msg.timer);}// 如果当前协程被取消if (current_co->is_canceled()) {// 设置错误并返回空指针error_ = ERROR_CANCELED;return nullptr;}// 如果发生超时if (msg.error) {// 设置错误并返回空指针error_ = ERROR_TIMEOUT;return nullptr;}// 如果通道关闭且为空的情况if (closed && is_empty()) {// 设置相应的错误并返回空指针。error_ = ERROR_CLOSED;return nullptr;}}// 将数据压入数据队列。data_queue.push(data);swoole_trace_log(SW_TRACE_CHANNEL, "push data to channel, count=%ld", length());// 如果消费者队列不为空,则唤醒消费者协程。if (!consumer_queue.empty()) {Coroutine *co = pop_coroutine(CONSUMER);// 恢复消费者协程co->resume();}return true;
}

这段代码主要是在 Swoole 的协程环境中从通道中取出数据并对返回结果进行处理的逻辑。

// swoole-src/ext-src/swoole-channel.cc:175
static PHP_METHOD(swoole_channel_coro, pop) {// 获取当前对象的 Channel 实例Channel *chan = php_swoole_get_channel(ZEND_THIS);// 设置超时变量为-1double timeout = -1;// 解析一个超时参数ZEND_PARSE_PARAMETERS_START_EX(ZEND_PARSE_PARAMS_THROW, 0, 1)Z_PARAM_OPTIONALZ_PARAM_DOUBLE(timeout)ZEND_PARSE_PARAMETERS_END_EX(RETURN_FALSE);// 从通道中取出数据,并返回一个 zval 指针zval *zdata = (zval *) chan->pop(timeout);// 如果返回的 zval 指针不为空if (zdata) {// 将其返回给 PHP 脚本,并释放内存RETVAL_ZVAL(zdata, 0, 0);efree(zdata);zend_update_property_long(swoole_channel_coro_ce, SW_Z8_OBJ_P(ZEND_THIS), ZEND_STRL("errCode"), Channel::ERROR_OK);} else {zend_update_property_long(swoole_channel_coro_ce, SW_Z8_OBJ_P(ZEND_THIS), ZEND_STRL("errCode"), chan->get_error());RETURN_FALSE;}
}// swoole-src/coroutine/channel.cc:55
void *Channel::pop(double timeout) {// 获取当前协程对象 current_coCoroutine *current_co = Coroutine::get_current_safe();// 如果通道已关闭且为空if (closed && is_empty()) {// 设置错误并返回空指针error_ = ERROR_CLOSED;return nullptr;}// 如果通道为空或者消费者队列不为空if (is_empty() || !consumer_queue.empty()) {TimeoutMessage msg;msg.error = false;msg.timer = nullptr;if (timeout > 0) {msg.chan = this;msg.type = CONSUMER;msg.co = current_co;// 根据传入的超时值添加定时器msg.timer = swoole_timer_add(timeout, false, timer_callback, &msg);}// 挂起消费者协程yield(CONSUMER);// 如果设置了定时器,则在超时消息中删除定时器if (msg.timer) {swoole_timer_del(msg.timer);}// 如果当前协程被取消if (current_co->is_canceled()) {// 设置错误并返回空指针error_ = ERROR_CANCELED;return nullptr;}// 如果发生超时if (msg.error) {// 设置错误并返回空指针error_ = ERROR_TIMEOUT;return nullptr;}// 如果通道关闭且为空的情况if (closed && is_empty()) {// 设置相应的错误并返回空指针。error_ = ERROR_CLOSED;return nullptr;}}// 从数据队列中弹出数据,并返回该数据。void *data = data_queue.front();data_queue.pop();// 如果生产者队列不为空,则唤醒生产者协程if (!producer_queue.empty()) {Coroutine *co = pop_coroutine(PRODUCER);// 恢复到生产者协程co->resume();}return data;
}

这段代码一是针对超时回调处理的处理逻辑,并恢复相关的协程操作。二是实现了协程的挂起操作,并根据不同的类型将当前协程放入不同的队列中,以便后续根据需要恢复执行。

// swoole-src/coroutine/channel.cc:22
void Channel::timer_callback(Timer *timer, TimerNode *tnode) {TimeoutMessage *msg = (TimeoutMessage *) tnode->data;msg->error = true;msg->timer = nullptr;if (msg->type == CONSUMER) {// 从消费者队列中移除该协程msg->chan->consumer_remove(msg->co);} else {// 从生产者队列中移除该协程msg->chan->producer_remove(msg->co);}// 恢复协程msg->co->resume();
}// swoole-src/coroutine/channel.cc:34
void Channel::yield(enum Opcode type) {// 获取当前协程Coroutine *co = Coroutine::get_current_safe();if (type == PRODUCER) {// 将当前协程放入到生产者队列producer_queue.push_back(co);swoole_trace_log(SW_TRACE_CHANNEL, "producer cid=%ld", co->get_cid());} else {// 将当前协程放入到消费者队列consumer_queue.push_back(co);swoole_trace_log(SW_TRACE_CHANNEL, "consumer cid=%ld", co->get_cid());}// 挂起被取消,则调用该函数Coroutine::CancelFunc cancel_fn = [this, type](Coroutine *co) {if (type == CONSUMER) {consumer_remove(co);} else {producer_remove(co);}co->resume();return true;};// 挂起当前协程co->yield(&cancel_fn);
}

总结

  1. Channel 通道需要在协程的环境中进行使用,通道是纯内存操作,没有 IO 消耗,非常高效。
  2. 底层使用 Channel::yield 函数实现了协程的自动切换和调度,如果通道处理超时则会自动调用 Channel::timer_callback 函数。
  3. Channel 通道是跨协程直接通信的一大利器,在实际的场景中使用起来十分的便利、高效。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/798337.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Hive安装配置

1 在conf目录下vim 创建hive-site.xml <?xml version"1.0"?> <?xml-stylesheet type"text/xsl" href"configuration.xsl"?> <configuration><property><name>javax.jdo.option.ConnectionURL</name>&l…

Open3D (C++) 计算点云的特征值特征向量

目录 一、算法原理二、代码实现三、结果展示本文由CSDN点云侠原创,原文链接。如果你不是在点云侠的博客中看到该文章,那么此处便是不要脸的爬虫与GPT。 一、算法原理 针对整个点云 P = { p i } i

互联网架构实践心得 (六)—给飞机换引擎和安全意识十原则

文章目录 给飞行中的飞机换引擎安全意识十原则开发层面产品层面运维层面给飞行中的飞机换引擎 所谓给飞行中的飞机(或飞驰的汽车)换引擎,说的是我们需要对一个正在飞速发展的系统进行大幅度的架构改造,比如把 All-in-one 的架构改造成微服务架构,尽可能减少或者消除停服的…

华为USG6000v

1、安全区域 一个及或多个接口的集合 默认的安全区域 Trust --- 优先级85&#xff0c;一般连接内网 Untrust --- 优先级5&#xff0c; 一般连接外网 Dmz --- 优先级50&#xff0c;一般连接服务器、 Local --- 优先级100&#xff0c;防火墙接口所在区的区域 2…

力扣 --组合

给定两个整数 n 和 k&#xff0c;返回范围 [1, n] 中所有可能的 k 个数的组合。 你可以按 任何顺序 返回答案。 示例 1&#xff1a; 输入&#xff1a;n 4, k 2 输出&#xff1a; [[2,4],[3,4],[2,3],[1,2],[1,3],[1,4], ] 示例 2&#xff1a; 输入&#xff1a;n 1, k …

2024.4.7作业

//登陆界面 this->setWindowTitle("传奇霸业"); this->setWindowIcon(QIcon("C:\\Users\\l1693\\Desktop\\pictures\\1.png")); this->resize(400,300); this->setFixedSize(400,300); //登录界面修饰 //底图 QLabel *lab5 new QLabel(this);…

nginx的正向代理是什么?如何结合Java实现nginx的正向代理?

1、nginx的正向代理是什么&#xff1f; 正向代理是客户端&#xff08;如浏览器&#xff09;通过代理服务器发送请求到目标服务器&#xff0c;然后代理服务器将响应返回给客户端的过程。在这个场景中&#xff0c;客户端知道它正在使用代理&#xff0c;并且所有的请求都首先被发…

揭秘!接口自动化测试应该做什么?

在软件开发过程中&#xff0c;接口测试是一个至关重要的环节&#xff0c;它确保了系统或组件之间的数据交换、传递和控制管理过程以及相互逻辑依赖关系的正确性。传统的瀑布软件流程中&#xff0c;测试人员在做某个系统的手工功能测试时&#xff0c;会首先从业务人员或开发人员…

每日一题 第七十三期 洛谷 [蓝桥杯 2013 省 B] 带分数

[蓝桥杯 2013 省 B] 带分数 题目描述 100 100 100 可以表示为带分数的形式&#xff1a; 100 3 69258 714 100 3 \frac{69258}{714} 100371469258​。 还可以表示为&#xff1a; 100 82 3546 197 100 82 \frac{3546}{197} 100821973546​。 注意特征&#xff1a;带分…

Java集合框架概览

Java 集合&#xff0c; 也叫作容器&#xff0c;主要是由两大接口派生而来&#xff1a;一个是 Collection接口&#xff0c;主要用于存放单一元素&#xff1b;另一个是 Map 接口&#xff0c;主要用于存放键值对。 对于Collection 接口&#xff0c;下面有三个主要的子接口&#x…

16个Python接单平台,做私活爽歪歪!(附100个爬虫源码)

一、python爬虫是可以做副业的&#xff0c;主要是爬取网站、小程序或者APP的数据&#xff0c;对数据进行分析与处理&#xff0c;或者直接向客户提供爬虫程序与技术支持。 当初学会Python那会儿&#xff0c;有朋友来介绍我去接私活&#xff0c;是为一家公司做网站&#xff0c;那…

background背景图参数边渐变CSS中创建背景图像的渐变效果

效果:可以看到灰色边边很难受,希望和背景融为一体 原理: 可以使用线性渐变&#xff08;linear-gradient&#xff09;或径向渐变&#xff08;radial-gradient&#xff09;。以下是一个使用线性渐变作为背景图像 代码: background: linear-gradient(to top, rgba(255,255,255,0)…

Collection与数据结构 Stack与Queue(二):队列与Queue

1. 队列 1.1 概念 只允许在一端进行插入数据操作&#xff0c;在另一端进行删除数据操作的特殊线性表&#xff0c;队列具有先进先出FIFO(First In First Out) 入队列&#xff1a;进行插入操作的一端称为队尾&#xff08;Tail/Rear&#xff09; 出队列&#xff1a;进行删除操作…

C语言分支语句

一、什么是语句 C语句可分为以下五类&#xff1a; 表达式语句 函数调用语句 控制语句 复合语句 空语句 本周后面介绍的是控制语句。 控制语句用于控制程序的执行流程&#xff0c;以实现程序的各种结构方式&#xff0c;它们由特定的语句定义符组成&#xff0c;C语 言有…

android 资源文件混淆

AGP7.0以上引用AndResGuard有坑 记录下 在项目的build.gradle中添加如下 buildscript {ext.kotlin_version "1.4.31"repositories {google()jcenter()maven {url "https://s01.oss.sonatype.org/content/repositories/snapshots/"}}dependencies {class…

C++实现更改8位无符号整形的第n比特位值为1或0

value为8位无符号整形&#xff0c;如何更改其第n比特位的值&#xff1f;比如&#xff1a;value为243&#xff0c;二进制表示为&#xff1a; 1111 0011 如何将value更改为&#xff1a; 1011 0011 即在不改变其它比特位值的情况下&#xff0c;仅仅通过更改需要更改的比特位的…

树莓派游戏简单应用实例

树莓派是一款小巧的单板电脑&#xff0c;其工作原理是通过将电子元件如处理器、内存、存储器、输入输出接口等集成在一块电路板上&#xff0c;通过外部连接器与外部设备进行通信。 树莓派设备的工作原理主要包括以下几个方面&#xff1a; 处理器&#xff1a;树莓派采用ARM架构…

JQuery(二)---【使用JQuery对HTML、CSS进行操作】

零.前言 JQuery(一)---【JQuery简介、安装、初步使用、各种事件】-CSDN博客 一.使用JQuery对HTML操作 1.1获取元素内容、属性 使用JQ可以操作元素的“内容” text()&#xff1a;设置或返回元素的文本内容html()&#xff1a;设置或返回元素的内容(包括HTML标记)val()&#…

Flask Python Flask-SQLAlchemy中数据库的数据类型、flask中数据可的列约束配置

Flask Python Flask-SQLAlchemy中数据库的数据类型、flask中数据可的列约束配置 SQLAlchemy官方文档地址实战的代码分享数据类型列约束配置自定义方法 SQLAlchemy官方文档地址 SQLAlchemy官方文档地址 实战的代码分享 Flask-SQLAlchemy框架为创建数据库的实例提供了一个基类…

数据库系统概论

数据库系统概论 一、引言 数据库系统作为现代信息技术的重要组成部分&#xff0c;已经深入到社会生活的各个领域。无论是商务领域、科技发展&#xff0c;还是国家政府部门&#xff0c;数据库系统都发挥着举足轻重的作用。通过高效、稳定、安全的数据存储和管理&#xff0c;数…