node中的Stream-Readable和Writeable解读

在node中,只要涉及到文件IO的场景一般都会涉及到一个类-Stream。Stream是对IO设备的抽象表示,其在JAVA中也有涉及,主要体现在四个类-InputStream、Reader、OutputStream、Writer,其中InputStream和OutputStream类针对字节数据进行读写;Reader和Writer针对字符数据读写。同时Java中有多种针对这四种类型的扩展类,如节点流、缓冲流和转换流等。比较而言,node中Stream类型也和Java中的类似,同样提供了支持字节和字符读写的Readable和Writeable类,也存在转换流Transform类,本文主要分析node中Readable和Writeable的实现机制,从底层的角度更好的理解Readable和Writeable实现机制,解读在读写过程中发生的一些重要事件。

Readable类

Readable对应于Java中的InputStream和Reader两个类,针对Readable设置encode编码可完成内部数据由Buffer到字符的转换。Readable Stream有两种模式,即flowing和paused模式。这两种模式对于用户而言区别在于是否需要手动调用Readable.prototype.read(n),读取缓冲区的数据。查询node API文档可知触发flowing模式有三种方式:

  • 侦听data事件
  • readable.resume()
  • readable.pipe()
    而触发paused模式同样有几种方式:
  • 移除data事件
  • readable.pause()
  • readable.unpipe()
    可能这样讲解大家仍不明白Readable Stream这两种模式的区别,那么下文从更深层次分析两种模式的机制。

深入Readable的实现

Readable继承EventEmitter,大家也都知道。但是相信大家应该不怎么熟悉Readable的实例属性**_readableState**。该属性是一个ReadableState类型的对象,保存了Readable实例的重要信息,如读取模式(是否为对象模式)、highWaterMark(缓冲区存放的最大字节数)、缓冲区、flowing模式等。在Readable的实现中,处处使用ReadableState对象记录当前读取状态,并设置缓冲区保证读操作的顺利进行。

首先需要针对Readable.prototype.read方法进行特别解读:

  if (n === 0 &&state.needReadable &&(state.length >= state.highWaterMark || state.ended)) {debug('read: emitReadable', state.length, state.ended);if (state.length === 0 && state.ended)endReadable(this);elseemitReadable(this);return null;}

当读入的数据为0时,执行emitReadable操作。这意味着,针对Readable Stream执行read(0)方法会触发readable事件,但是不会读当前缓冲区。因此使用read(0)可以完成一些比较巧妙的事情,如在readable处理函数中可以使用read(0)触发下一次readable事件,可选的操作读缓冲区。

继续分析代码,如果读入的数据并不是0,则计算读取缓冲区的具体字节数,

n = howMuchToRead(n, state);function howMuchToRead(n, state) {if (state.length === 0 && state.ended)return 0;if (state.objectMode)return n === 0 ? 0 : 1;if (n === null || isNaN(n)) {// only flow one buffer at a timeif (state.flowing && state.buffer.length)return state.buffer[0].length;// 若是paused状态,则读全部的缓冲区elsereturn state.length;}if (n <= 0)return 0;if (n > state.highWaterMark)state.highWaterMark = computeNewHighWaterMark(n);// don't have that much.  return null, unless we've ended.if (n > state.length) {if (!state.ended) {state.needReadable = true;return 0;} else {return state.length;}}return n;
}

针对对象模式的读取,每次只读一个;对于处在flowing模式下的读取,每次只读缓冲区中第一个buffer的长度;在paused模式下则读取全部缓冲区的长度;若读取的字节数大于设置的缓冲区最大值,则适当扩大缓冲区的大小(默认为16k,最大为8m);若读取的长度大于当前缓冲区的大小,设置needReadable属性并准备数据等待下一次读取。

接下来,判断是否需要准备数据。在这里,依赖于needReadable的值,

var doRead = state.needReadable;debug('need readable', doRead);if (state.length === 0 || state.length - n < state.highWaterMark) {doRead = true;debug('length less than watermark', doRead);}// reading, then it's unnecessary.if (state.ended || state.reading) {doRead = false;debug('reading or ended', doRead);}

如果当前缓冲区为空,或者缓冲区并未超出我们设定的最大值,那么就可以继续准备数据;如果此时正在准备数据或者已经结束读取,那么就放弃准备数据。一旦doRead为true,那么进入准备数据阶段,

if (doRead) {debug('do read');state.reading = true;state.sync = true;// if the length is currently zero, then we *need* a readable event.if (state.length === 0)state.needReadable = true;// call internal read method// 默认Readable未实现_read,抛出Error// 针对自定义的Readable子类,_read可修改state.buffer的数量,进行预处理,// 然后由下面的fromList读出去缓存中的相关数据this._read(state.highWaterMark);state.sync = false;}

接下来设置相关的标志位,进行_read处理。针对这个私有方法_read,文档上有特殊说明,自定义的Readable实现类需要实现这个方法,在该方法中手动添加数据到Readable对象的读缓冲区,然后进行Readable的读取。可以理解为_read函数为读取数据前的准备工作(准备数据),针对的是流的实现者而言。

  if (doRead && !state.reading)n = howMuchToRead(nOrig, state);var ret;if (n > 0)ret = fromList(n, state);elseret = null;if (ret === null) {state.needReadable = true;n = 0;}state.length -= n;if (state.length === 0 && !state.ended)state.needReadable = true;if (nOrig !== n && state.ended && state.length === 0)endReadable(this);// flowing模式下的数据读取依赖于 read函数// data事件触发的次数,依赖于howMuchToRead计算的次数if (ret !== null)this.emit('data', ret);

一旦在_read中更新了缓冲区,那么我们需要重新计算(消费者,即可写流)读取的字节数。fromList方法完成了读缓冲区的slice,如果是objectMode下的读,则只读缓冲区的第一个对象;针对未传参数的read方法而言,默认读取全部缓冲区等等。从读缓冲区读取完数据之后设置相关flag,如needReadable,最终,触发data事件,结束!

上节提到,设置data事件的执行函数会进入flowing模式的读,而上文看到正是read方法触发了data事件,而默认条件下Readable处于paused状态,因此在paused状态读取数据需要手动执行read函数,每次read读取完毕触发一次data事件。从这点看出,flowing和paused状态区别在于是否需要手动执行read()来获取数据。flowing状态下,我们无需执行read,仅需要设置data事件处理函数或者设定导流目标pipe;而在paused状态下,不仅仅是简单的执行read方法,因为读缓冲区的内容时刻在改变,一旦读缓冲区又有新数据,简单执行read()就没法满足需求(因为我们无法知道是否又有新数据到来),因此需要侦听读缓冲区的相关事件,即readable事件,在该事件处理函数中进行read相关数据。

那么,什么情况下会触发readable事件呢?在实现_read私有方法中,我们使用stream.push(chunk)或stream.unshift(chunk)方法注入数据到读缓冲区,那么push和unshift方法都实现了下面的逻辑,

if (state.flowing && state.length === 0 && !state.sync) {stream.emit('data', chunk);stream.read(0);
} else {// update the buffer info.state.length += state.objectMode ? 1 : chunk.length;if (addToFront)state.buffer.unshift(chunk);elsestate.buffer.push(chunk);if (state.needReadable)emitReadable(stream);
}function emitReadable(stream) {var state = stream._readableState;state.needReadable = false;if (!state.emittedReadable) {debug('emitReadable', state.flowing);state.emittedReadable = true;if (state.sync)process.nextTick(emitReadable_, stream);elseemitReadable_(stream);}
}function emitReadable_(stream) {debug('emit readable');stream.emit('readable');flow(stream);
}
// 在flowing状态下,自动读取流(替代paused状态下手动read)
function flow(stream) {var state = stream._readableState;debug('flow', state.flowing);if (state.flowing) {do {var chunk = stream.read();} while (null !== chunk && state.flowing);}
}

一旦处于flowing模式并且当前缓冲区没有数据,那么就立即将预处理的push(unshift)数据传递给data事件处理函数,并执行stream.read(0)。前文已经交代过,read(0)仅仅用来触发readable事件,并不读取缓冲区,这就是触发readable的第一种情况。

第二种则是第一种情况之外的所有情景,即根据操作(push、unshift)的不同将数据插入读缓冲区的不同位置。最后执行emitReadable函数,触发readable事件。针对emitReadable函数,它的作用就是异步触发readable事件,并执行flow函数。flow函数则针对flowing状态的Readable做自适应读取,免去了手动执行read函数和何时执行read函数的苦恼。

这样,对于Readable的实现者,一旦在_read函数插入有效数据到读缓冲区,都会触发readable事件,在paused状态下,设置readable事件处理函数并手动执行read函数,便可完成数据的读取;而在flowing状态下,通过设置data事件处理函数或者定义pipe目标流同样可以实现读取。

既然pipe同样可以触发Readable进入flowing状态,那么pipe方法具体做了什么呢?其实pipe针对Readable和Writeable做了限流,首先针对Readable的data事件进行侦听,并执行Writeable的write函数,当Writeable的写缓冲区大于一个临界值(highWaterMark),导致write函数返回false(此时意味着Writeable无法匹配Readable的速度,Writeable的写缓冲区已经满了),此时,pipe修改了Readable模式,执行pause方法,进入paused模式,停止读取读缓冲区。而同时Writeable开始刷新写缓冲区,刷新完毕后异步触发drain事件,在该事件处理函数中,设置Readable为flowing状态,并继续执行flow函数不停的刷新读缓冲区,这样就完成了pipe限流。需要注意的是,Readable和Writeable各自维护了一个缓冲区,在实现的上有区别:Readable的缓冲区是一个数组,存放Buffer、String和Object类型;而Writeable则是一个有向链表,依次存放需要写入的数据。

Writeable解读

Writeable对应Java的OutputStream和Writer类,实现字节和字符数据的写。与Readable类似,Writeable的实例对象同样维护了一个状态对象-WriteableState,记录了当前输出流的状态信息,如写缓冲区的最大值(hightWaterMark)、缓冲区(有向链表)和缓冲区长度等信息。在本节中,主要分析输出流的关键方法write和事件drain,并解析输出流的实现者需要实现的方法**_writewrite**的关系。

function write
----------------------------
if (state.ended)writeAfterEnd(this, cb);else if (validChunk(this, state, chunk, cb)) {state.pendingcb++;ret = writeOrBuffer(this, state, chunk, encoding, cb);}return ret;

在write方法中,判断写入数据的格式并执行writeOrBuffer函数,并返回执行结果,该返回值标示当前写缓冲区是否已满。真正执行写入逻辑的是writeOrBuffer函数,该函数的作用在于刷新或者更新写缓冲区,下面看看主要做了什么,

function writeOrBuffer(stream, state, chunk, encoding, cb) {chunk = decodeChunk(state, chunk, encoding);if (chunk instanceof Buffer)encoding = 'buffer';var len = state.objectMode ? 1 : chunk.length;state.length += len;// 如果缓存的长度大于highWaterMark,需要刷新缓冲,所以设置needDrain标志var ret = state.length < state.highWaterMark;// we must ensure that previous needDrain will not be reset to false.if (!ret)state.needDrain = true;// 缓存未处理的写请求,在clearBuffer中执行缓存// 由此看出,Readable和Writeable都有缓存,Readable 中缓存的方式是数组(项为Buffer,字符串或对象),Writeable的// 缓存则是对象链表if (state.writing || state.corked) {var last = state.lastBufferedRequest;state.lastBufferedRequest = new WriteReq(chunk, encoding, cb);if (last) {last.next = state.lastBufferedRequest;} else {state.bufferedRequest = state.lastBufferedRequest;}state.bufferedRequestCount += 1;} else {doWrite(stream, state, false, len, chunk, encoding, cb);}return ret;
}

writeOrBuffer首先针对数据进行编码,字符串转换成Buffer类型,如果设置了Writeable的ObjectMode模式则仍为Object类型;接下来更新写缓冲区的长度,并判断写缓冲区长度是否超过设定的Writeable的最大值(默认16k),如果超过超过则ret=false并更新WriteableState的属性needDrain=true。ret的结果其实就是write方法返回值,因此一旦write返回值为false,意味着当前写缓冲区已满,需要停止继续写入数据。

在Readable的pipe方法中,涉及到了Writeable的drain事件。该事件的触发意味着写缓冲区已可以继续缓存数据,可见drain事件与写缓冲区严格相关。继续分析writeOrBuffer函数,若当前输出流正在写数据,那么则当前数据缓存至写缓冲区(创建WriteReq对象);否则执行doWrite函数,刷新缓冲区。

function doWrite(stream, state, writev, len, chunk, encoding, cb) {state.writelen = len;state.writecb = cb;state.writing = true;state.sync = true;if (writev)stream._writev(chunk, state.onwrite);elsestream._write(chunk, encoding, state.onwrite);state.sync = false;
}

doWrite函数设置了需要写入数据的长度、写入状态等信息,并执行输出流实现者需要实现的_write函数。在_write函数中,针对数据流向做最后的处理,这里分析_write函数的具体实现。_write函数有三个参数,分别为chunk,encoding和state.onwrite回调函数,对该回调函数稍后分析,先着重讲解_write函数的实现。在node的fs模块中,可以通过fs.createWriteStream创建Writeable实例,通过执行

var writeStream = fs.createWriteStream('./output',{decodeStrings: false});
console.log(writeStream._write.toString());-----------------输出-----------------function (data, encoding, cb) {if (!(data instanceof Buffer))return this.emit('error', new Error('Invalid data'));if (typeof this.fd !== 'number')return this.once('open', function() {this._write(data, encoding, cb);});var self = this;fs.write(this.fd, data, 0, data.length, this.pos, function(er, bytes) {if (er) {self.destroy();return cb(er);}self.bytesWritten += bytes;cb();});if (this.pos !== undefined)this.pos += data.length;
}

看出,在_write实现中,只接受Buffer类型的数据,接着执行fs.write操作,写入到对应文件描述符fd对应的文件中,写入成功或失败后执行回调函数,即state.onwrite函数。

function onwrite(stream, er) {var state = stream._writableState;var sync = state.sync;var cb = state.writecb;onwriteStateUpdate(state);// 默认未重写_write方法,会收到er值if (er)onwriteError(stream, state, sync, er, cb);else {// Check if we're actually ready to finish, but don't emit yetvar finished = needFinish(state);// 写缓存的数据if (!finished &&!state.corked &&!state.bufferProcessing &&state.bufferedRequest) {clearBuffer(stream, state);}// 异步触发drain事件if (sync) {process.nextTick(afterWrite, stream, state, finished, cb);} else {afterWrite(stream, state, finished, cb);}}
}

在state.onwrite函数中主要工作有两个:

  • 写缓冲区的数据
  • 写完缓冲区的数据后,异步触发drain事件

第一步,在clearBuffer函数中,就是取出写缓冲区(有向链表)的第一个WriteReq对象,执行doWrite函数,写入缓冲区的第一个数据;这样循环往复最终清空写缓冲区,重置一些标志位。

第二步,异步执行afterWrite函数,触发drain事件,并判断是否写操作完毕触发“finish”事件。这里之所以强调异步触发drain事件,是因为为了保证先获得write()返回值为false,给用户绑定drain处理函数的时隙,然后再触发drain事件。

至此,Writeable的重要流程已全部走通。可以看出来,在核心的write()中,判断写缓冲区是否已满并返回该值,在适当条件下缓存数据或调用_write()写数据,在Writeable实现者需要实现的** _write() 中,主要任务是数据写入方向控制,完成最基本的任务**。

总结

对比Readable的read()和_read(),我总结了下这四个函数在“读写过程”中的执行顺序与关系,如下图所示:
Readable和Writeable的函数执行顺序

转载于:https://www.cnblogs.com/accordion/p/5560531.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/395816.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

SQL Server读写分离之发布订阅

一、发布 上面有多种发布方式&#xff0c;这里我选择事物发布&#xff0c;具体区别请自行百度。 点击下一步、然后继续选择需要发布的对象。 如果需要筛选发布的数据点击添加。 根据自己的计划选择发布的时间。 点击安全设置&#xff0c;设置代理信息。 最后单击完成系统会自动…

码农和程序员的几个重要区别!

如果一个企业老板大声嚷嚷说&#xff0c;“我要招个程序员”&#xff0c;那么十之八九指的是“码农”——一种纯粹为了钱而写代码的技术人员。这其实是一种非常狭隘和错误的做法&#xff0c;原因么&#xff0c;且听我一一道来。1、码农写代码&#xff0c;程序员写系统从本质上讲…

移动端工程架构与后端工程架构的思想摩擦之旅(1)

此文已由作者黎星授权网易云社区发布。欢迎访问网易云社区&#xff0c;了解更多网易技术产品运营经验记资源投放后端工程的架构调整与优化 架构思考一直以来对软件工程架构有着极大的兴趣&#xff0c;无论是之前负责的移动端Android工程&#xff0c;亦或是现在转到后端开发后维…

logging记录日志

日志是一个系统的重要组成部分&#xff0c;用以记录用户操作、系统运行状态和错误信息。日志记录的好坏直接关系到系统出现问题时定位的速度。logging模块Python2.3版本开始成为Python标准库的一部分。 日志级别 在最简单的使用中&#xff0c;我们直接导入logging模块&#xff…

C#编程之接口

1.定义 接口是把公共方法和属性组合起来&#xff0c;以封装特定功能的一个集合。&#xff08;一旦定义了接口&#xff0c;就可以在类中实现它。这样类就可以支持接口所指定的所有属性和成员&#xff09; 注意1&#xff1a;接口不能单独存在。不能像实例化一个类那样实例化一个接…

supervisor守护进程

2019独角兽企业重金招聘Python工程师标准>>> supervisor 是一个client/server系统,把不是守护进程的进程变成守护进程,并监控和控制类 Unix 操作系统上的进程。 upervisor就是用Python开发的一套通用的进程管理程序&#xff0c;能将一个普通的命令行进程变为后台dae…

【学习笔记】深入理解js原型和闭包(11)——执行上下文栈

继续上文的内容。 执行全局代码时&#xff0c;会产生一个执行上下文环境&#xff0c;每次调用函数都又会产生执行上下文环境。当函数调用完成时&#xff0c;这个上下文环境以及其中的数据都会被消除&#xff0c;再重新回到全局上下文环境。处于活动状态的执行上下文环境只有一个…

Java基础--访问权限控制符

今天我们来探讨一下访问权限控制符。 使用场景一&#xff1a;攻城狮A编写了ClassA&#xff0c;但是他不想所有的攻城狮都可以使用该类&#xff0c;应该怎么办&#xff1f; 使用场景二&#xff1a;攻城狮A编写了ClassA&#xff0c;里面有func1方法和func2方法&#xff0c;但是他…

Dubbo简单介绍及实例

1、概念 Dubbo是一个分布式服务框架&#xff0c;以及阿里巴巴内部的SOA服务化治理方案的核心框架。其功能主要包含&#xff1a;高性能NIO通讯及多协议集成。服务动态寻址与路由。软负载均衡与容错&#xff0c;依赖分析与降级等。 说通俗点&#xff0c;就是首先将程序组件化成一…

bzoj1116: [POI2008]CLO

传送门&#xff1a;http://www.lydsy.com/JudgeOnline/problem.php?id1116 题目大意&#xff1a;Byteotia城市有n个 towns m条双向roads. 每条 road 连接 两个不同的 towns ,没有重复的road. 你要把其中一些road变成单向边使得&#xff1a;每个town都有且只有一个入度 题解&am…

java排序算法大全_各种排序算法的分析及java实现

排序一直以来都是让我很头疼的事&#xff0c;以前上《数据结构》打酱油去了&#xff0c;整个学期下来才勉强能写出个冒泡排序。由于要找工作了&#xff0c;也知道排序算法的重要性(据说是面试必问的知识点)&#xff0c;所以又花了点时间重新研究了一下。排序大的分类可以分为两…

6/12 Sprint2 看板和燃尽图

转载于:https://www.cnblogs.com/queenjuan/p/5578551.html

转:PHP应用性能优化指南

程序员都喜欢最新的PHP 7&#xff0c;因为它使PHP成为执行最快的脚本语言之一&#xff08;参考PHP 7 vs HHVM 比较&#xff09;。但是保持最佳性能不仅需要快速执行代码&#xff0c;更需要我们知道影响性能的问题点&#xff0c;以及这些问题的解决方案。本文涵盖了保障PHP应用平…

java list集合增删改_Java中集合类list的增删改查

今天给大家带来的是Java中list类的使用&#xff0c;java.util 包提供了list类来对线性数据操作List接口是Collection接口的子接口&#xff0c;List有一个重要的实现类--ArrayList类&#xff0c;List中的元素是有序排列的而且可重复&#xff0c;所以被称为是序列List可以精确的控…

IIS6、IIS7和IIS8各版本的差别

一、写在前面 目前市面上所用的IIS版本估计都是>6.0的.所以我们主要以下面三个版本进行讲解 服务器版本IIS默认版本server20036.0server20087.0server20128.0二、IIS6的请求过程 由图可知,所有的请求会被服务器中的http.sys组件监听到,它会根据IIS中的 Metabase 查看基于该 …

Android Studio 插件的使用

1、GsonFormat https://github.com/zzz40500/GsonFormat 2、Android SelectorChapek http://blog.csdn.net/weifei554287925/article/details/41727541

安卓Java虚拟机大小_虚拟机为安卓流畅度背锅,是因为关系数十万程序员饭碗?...

导读&#xff1a;虚拟机相当于应用程序在不同运行环境中的翻译。说起谷歌安卓系统的“虚拟机”&#xff0c;很多人爱拿它和苹果iOS做比较&#xff0c;结果&#xff0c;安卓的很多短腿儿都让虚拟机背了锅&#xff0c;比如安卓手机运存容量是iPhone的两到三倍&#xff0c;流畅度却…

AppCompatActivity实现全屏的问题

前言&#xff1a;我的 Activity 是继承 BaseActivity , 而 BaseActivity 继承 AppCompatActivity 。 BaseActivity 的继承 /*** 应用程序的基类**/ public class BaseActivity extends AppCompatActivity {}HomeActivity 的继承 public class HomeActivity extends BaseActivit…

Cinder 组件详解 - 每天5分钟玩转 OpenStack(47)

本节我们将详细讲解 Cinder 的各个子服务。 cinder-api cinder-api 是整个 Cinder 组件的门户&#xff0c;所有 cinder 的请求都首先由 nova-api 处理。cinder-api 向外界暴露若干 HTTP REST API 接口。在 keystone 中我们可以查询 cinder-api 的 endponits。 客户端可以将请…

RedHat Enterprise Linux 6 配置Xmanager ,实现图形界面连接

我们经常见到的几种最为常用的windows下远程管理Linux服务器的方法&#xff0c;基本上都是利用SecureCRT,或者是PUTTY等客户端工具通过ssh服务来实现Windows下管理Linux服务器的&#xff0c;这些客户端工具几乎不需要什么配置&#xff0c;使用简单&#xff0c;但是它们都无法启…