Node.js Stream.pipeline() Method

Why Stream.pipeline

通过流我们可以将一大块数据拆分为一小部分一点一点的流动起来,而无需一次性全部读入,在 Linux 下我们可以通过 | 符号实现,类似的在 Nodejs 的 Stream 模块中同样也为我们提供了 pipe() 方法来实现。

未使用 Stream pipe 情况

在 Nodejs 中 I/O 操作都是异步的,先用 util 模块的 promisify 方法将 fs.readFile 的 callback 形式转为 Promise 形式,这块代码看似没问题,但是它的体验不是很好,因为它是将数据一次性读入内存再进行的返回,当数据文件很大的时候也是对内存的一种消耗,因此不推荐它。

const Koa = require('koa');
const fs = require('fs');
const app = new Koa();
const { promisify } = require('util');
const { resolve } = require('path');
const readFile = promisify(fs.readFile);app.use(async ctx => {try {ctx.body = await readFile(resolve(__dirname, 'test.json'));} catch(err) { ctx.body = err };
});app.listen(3000);

使用 Stream pipe 情况

下面,再看看怎么通过 Stream 的方式在 Koa 框架中响应数据

...
app.use(async ctx => {try {const readable = fs.createReadStream(resolve(__dirname, 'test.json'));ctx.body = readable;} catch(err) { ctx.body = err };
});

以上在 Koa 中直接创建一个可读流赋值给 ctx.body 就可以了,你可能疑惑了为什么没有 pipe 方法,因为框架给你封装好了,不要被表象所迷惑了,看下相关源码:

// https://github.com/koajs/koa/blob/master/lib/application.js#L256
function respond(ctx) {...let body = ctx.body;if (body instanceof Stream) return body.pipe(res);...
}

没有神奇之处,框架在返回的时候做了层判断,因为 res 是一个可写流对象,如果 body 也是一个 Stream 对象(此时的 Body 是一个可读流),则使用 body.pipe(res) 以流的方式进行响应。

使用 Stream VS 不使用 Stream

动图封面

动图封面

What Stream.pipeline function

The stream.pipeline() method is a module method that is used to the pipe by linking the streams passing on errors and accurately cleaning up and providing a callback function when the pipeline is done. 

Stream.pipeline() 方法是一种模块方法,用于通过链接传递错误的流并在管道完成时准确地清理并提供回调函数来用于管道。

Syntax:

stream.pipeline(...streams, callback)

Parameters: 

This method accepts two parameters as mentioned above and described below.该方法接受如上所述和如下所述的两个参数。

  • …streams: These are two or more streams that are to be piped together.这些是要通过管道连接在一起的两个或多个流。
  • callback: This function is called when the pipeline is fully done and it shows an ‘error’ if the pipeline is not accomplished.当管道完全完成时调用此函数,如果管道未完成,则会显示“错误”。

Return Value:

 It returns a cleanup function. 返回值:它返回一个清理函数。

The below examples illustrate the use of the stream.pipeline() method in Node.js: 

以下示例说明了 Node.js 中的

stream.pipeline() 方法的用法

Example 1: 

// Node.js program to demonstrate the   

// stream.pipeline() method

// Including fs and zlib module

const fs = require('fs');

const zlib = require('zlib');

// Constructing finished from stream

const { pipeline } = require('stream');

// Constructing promisify from

// util

const { promisify } = require('util');

// Defining pipelineAsync method

const pipelineAsync = promisify(pipeline);

// Constructing readable stream

const readable = fs.createReadStream("input.text");

// Constructing writable stream

const writable = fs.createWriteStream("output.text");

// Creating transform stream

const transform = zlib.createGzip();

// Async function

(async function run() {

    try {

        // pipelining three streams

        await pipelineAsync(

            readable,

            transform,

            writable

        );

        console.log("pipeline accomplished.");

    }

    // Shows error

    catch (err) {

        console.error('pipeline failed with error:', err);

    }

})();

Output:

Promise {  }
pipeline accomplished.

Example 2: 

// Node.js program to demonstrate the   

// stream.pipeline() method

// Including fs and zlib module

const fs = require('fs');

const zlib = require('zlib');

// Constructing finished from stream

const { pipeline } = require('stream');

// Constructing promisify from

// util

const { promisify } = require('util');

// Defining pipelineAsync method

const pipelineAsync = promisify(pipeline);

// Constructing readable stream

const readable = fs.createReadStream("input.text");

// Constructing writable stream

const writable = fs.createWriteStream("output.text");

// Creating transform stream

const transform = zlib.createGzip();

// Async function

(async function run() {

    try {

        // pipelining three streams

        await pipelineAsync(

            readable,

            writable,

            transform

        );

        console.log("pipeline accomplished.");

    }

    // Shows error

    catch (err) {

        console.error('pipeline failed with error:', err);

    }

})();

Output: Here, the order of streams is not proper while piping so an error occurs.

Promise {  }
pipeline failed with error: Error [ERR_STREAM_CANNOT_PIPE]: Cannot pipe, not readableat WriteStream.Writable.pipe (_stream_writable.js:243:24)at pipe (internal/streams/pipeline.js:57:15)at Array.reduce ()at pipeline (internal/streams/pipeline.js:88:18)at Promise (internal/util.js:274:30)at new Promise ()at pipeline (internal/util.js:273:12)at run (/home/runner/ThirstyTimelyKey/index.js:33:11)at /home/runner/ThirstyTimelyKey/index.js:45:5at Script.runInContext (vm.js:133:20)

解析Stream.PipeLine

在应用层我们调用了 fs.createReadStream() 这个方法,顺藤摸瓜找到这个方法创建的可读流对象的 pipe 方法实现,以下仅列举核心代码实现,基于 Nodejs v12.x 源码。

2.1.1 /lib/fs.js

导出一个 createReadStream 方法,在这个方法里面创建了一个 ReadStream 可读流对象,且 ReadStream 来自 internal/fs/streams 文件,继续向下找。

// https://github.com/nodejs/node/blob/v12.x/lib/fs.js
// 懒加载,主要在用到的时候用来实例化 ReadStream、WriteStream ... 等对象
function lazyLoadStreams() {if (!ReadStream) {({ ReadStream, WriteStream } = require('internal/fs/streams'));[ FileReadStream, FileWriteStream ] = [ ReadStream, WriteStream ];}
}function createReadStream(path, options) {lazyLoadStreams();return new ReadStream(path, options); // 创建一个可读流
}module.exports = fs = {createReadStream, // 导出 createReadStream 方法...
}

2.1.2 /lib/internal/fs/streams.js

这个方法里定义了构造函数 ReadStream,且在原型上定义了 open、_read、_destroy 等方法,并没有我们要找的 pipe 方法。

但是呢通过 ObjectSetPrototypeOf 方法实现了继承,ReadStream 继承了 Readable 在原型中定义的函数,接下来继续查找 Readable 的实现

// https://github.com/nodejs/node/blob/v12.x/lib/internal/fs/streams.js
const { Readable, Writable } = require('stream');function ReadStream(path, options) {if (!(this instanceof ReadStream))return new ReadStream(path, options);...Readable.call(this, options);...
}
ObjectSetPrototypeOf(ReadStream.prototype, Readable.prototype);
ObjectSetPrototypeOf(ReadStream, Readable);ReadStream.prototype.open = function() { ... };ReadStream.prototype._read = function(n) { ... };;ReadStream.prototype._destroy = function(err, cb) { ... };
...module.exports = {ReadStream,WriteStream
};

2.1.3 /lib/stream.js

在 stream.js 的实现中,有条注释:在 Readable/Writable/Duplex/... 之前导入 Stream,原因是为了避免 cross-reference(require),为什么会这样?

第一步 stream.js 这里将 require('internal/streams/legacy') 导出复制给了 Stream。

在之后的 _stream_readable、Writable、Duplex ... 模块也会反过来引用 stream.js 文件,具体实现下面会看到。

Stream 导入了 internal/streams/legacy

上面 /lib/internal/fs/streams.js 文件从 stream 模块获取了一个 Readable 对象,就是下面的 Stream.Readable 的定义。

// https://github.com/nodejs/node/blob/v12.x/lib/stream.js
// Note: export Stream before Readable/Writable/Duplex/...
// to avoid a cross-reference(require) issues
const Stream = module.exports = require('internal/streams/legacy');Stream.Readable = require('_stream_readable');
Stream.Writable = require('_stream_writable');
Stream.Duplex = require('_stream_duplex');
Stream.Transform = require('_stream_transform');
Stream.PassThrough = require('_stream_passthrough');
...

2.1.4 /lib/internal/streams/legacy.js

上面的 Stream 等于 internal/streams/legacy,首先继承了 Events 模块,之后呢在原型上定义了 pipe 方法,刚开始看到这里的时候以为实现是在这里了,但后来看 _stream_readable 的实现之后,发现 _stream_readable 继承了 Stream 之后自己又重新实现了 pipe 方法,那么疑问来了这个模块的 pipe 方法是干嘛的?什么时候会被用?翻译文件名 “legacy=遗留”?有点没太理解,难道是遗留了?有清楚的大佬可以指点下,也欢迎在公众号 “Nodejs技术栈” 后台加我微信一块讨论下!

// https://github.com/nodejs/node/blob/v12.x/lib/internal/streams/legacy.js
const {ObjectSetPrototypeOf,
} = primordials;
const EE = require('events');
function Stream(opts) {EE.call(this, opts);
}
ObjectSetPrototypeOf(Stream.prototype, EE.prototype);
ObjectSetPrototypeOf(Stream, EE);Stream.prototype.pipe = function(dest, options) {...
};module.exports = Stream;

2.1.5 /lib/_stream_readable.js

在 _stream_readable.js 的实现里面定义了 Readable 构造函数,且继承于 Stream,这个 Stream 正是我们上面提到的 /lib/stream.js 文件,而在 /lib/stream.js 文件里加载了 internal/streams/legacy 文件且重写了里面定义的 pipe 方法。

经过上面一系列的分析,终于找到可读流的 pipe 在哪里,同时也更进一步的认识到了在创建一个可读流时的执行调用过程,下面将重点来看这个方法的实现。

module.exports = Readable;
Readable.ReadableState = ReadableState;const EE = require('events');
const Stream = require('stream');ObjectSetPrototypeOf(Readable.prototype, Stream.prototype);
ObjectSetPrototypeOf(Readable, Stream);function Readable(options) {if (!(this instanceof Readable))return new Readable(options);...Stream.call(this, options); // 继承自 Stream 构造函数的定义
}
...

2.2 _stream_readable 实现分析

2.2.1 声明构造函数 Readable

声明构造函数 Readable 继承 Stream 的构造函数和原型。

Stream 是 /lib/stream.js 文件,上面分析了,这个文件继承了 events 事件,此时也就拥有了 events 在原型中定义的属性,例如 on、emit 等方法。

const Stream = require('stream');
ObjectSetPrototypeOf(Readable.prototype, Stream.prototype);
ObjectSetPrototypeOf(Readable, Stream);function Readable(options) {if (!(this instanceof Readable))return new Readable(options);...Stream.call(this, options);
}

2.2.2 声明 pipe 方法,订阅 data 事件

在 Stream 的原型上声明 pipe 方法,订阅 data 事件,src 为可读流对象,dest 为可写流对象。

我们在使用 pipe 方法的时候也是监听的 data 事件,一边读取数据一边写入数据。

看下 ondata() 方法里的几个核心实现:

  • dest.write(chunk):接收 chunk 写入数据,如果内部的缓冲小于创建流时配置的 highWaterMark,则返回 true,否则返回 false 时应该停止向流写入数据,直到 'drain' 事件被触发
  • src.pause():可读流会停止 data 事件,意味着此时暂停数据写入了。

之所以调用 src.pause() 是为了防止读入数据过快来不及写入,什么时候知道来不及写入呢,要看 dest.write(chunk) 什么时候返回 false,是根据创建流时传的 highWaterMark 属性,默认为 16384 (16kb),对象模式的流默认为 16。

Readable.prototype.pipe = function(dest, options) {const src = this;src.on('data', ondata);function ondata(chunk) {const ret = dest.write(chunk);if (ret === false) {...src.pause();}}...
};

2.2.3 订阅 drain 事件,继续流动数据

上面提到在 data 事件里,如果调用 dest.write(chunk) 返回 false,就会调用 src.pause() 停止数据流动,什么时候再次开启呢?

如果说可以继续写入事件到流时会触发 drain 事件,也是在 dest.write(chunk) 等于 false 时,如果 ondrain 不存在则注册 drain 事件。

Readable.prototype.pipe = function(dest, options) {const src = this;src.on('data', ondata);function ondata(chunk) {const ret = dest.write(chunk);if (ret === false) {...if (!ondrain) {// When the dest drains, it reduces the awaitDrain counter// on the source.  This would be more elegant with a .once()// handler in flow(), but adding and removing repeatedly is// too slow.ondrain = pipeOnDrain(src);dest.on('drain', ondrain);}src.pause();}}...
};// 当可写入流 dest 耗尽时,它将会在可读流对象 source 上减少 awaitDrain 计数器
// 为了确保所有需要缓冲的写入都完成,即 state.awaitDrain === 0 和 src 可读流上的 data 事件存在,切换流到流动模式
function pipeOnDrain(src) {return function pipeOnDrainFunctionResult() {const state = src._readableState;debug('pipeOnDrain', state.awaitDrain);if (state.awaitDrain)state.awaitDrain--;if (state.awaitDrain === 0 && EE.listenerCount(src, 'data')) {state.flowing = true;flow(src);}};
}// stream.read() 从内部缓冲拉取并返回数据。如果没有可读的数据,则返回 null。在可读流上 src 还有一个 readable 属性,如果可以安全地调用 readable.read(),则为 true
function flow(stream) {const state = stream._readableState;debug('flow', state.flowing);while (state.flowing && stream.read() !== null);
}

2.2.4 触发 data 事件

调用 readable 的 resume() 方法,触发可读流的 'data' 事件,进入流动模式。

Readable.prototype.pipe = function(dest, options) {const src = this;// Start the flow if it hasn't been started already.if (!state.flowing) {debug('pipe resume');src.resume();}...

然后实例上的 resume(Readable 原型上定义的)会在调用 resume() 方法,在该方法内部又调用了 resume_(),最终执行了 stream.read(0) 读取了一次空数据(size 设置的为 0),将会触发实例上的 _read() 方法,之后会在触发 data 事件。

function resume(stream, state) {...process.nextTick(resume_, stream, state);
}function resume_(stream, state) {debug('resume', state.reading);if (!state.reading) {stream.read(0);}...
}

2.2.5 订阅 end 事件

end 事件:当可读流中没有数据可供消费时触发,调用 onend 函数,执行 dest.end() 方法,表明已没有数据要被写入可写流,进行关闭(关闭可写流的 fd),之后再调用 stream.write() 会导致错误。

Readable.prototype.pipe = function(dest, options) {...const doEnd = (!pipeOpts || pipeOpts.end !== false) &&dest !== process.stdout &&dest !== process.stderr;const endFn = doEnd ? onend : unpipe;if (state.endEmitted)process.nextTick(endFn);elsesrc.once('end', endFn);dest.on('unpipe', onunpipe);...function onend() {debug('onend');dest.end();}
}

2.2.6 触发 pipe 事件

在 pipe 方法里面最后还会触发一个 pipe 事件,传入可读流对象

Readable.prototype.pipe = function(dest, options) {...const source = this;dest.emit('pipe', src);...
};

在应用层使用的时候可以在可写流上订阅 pipe 事件,做一些判断,具体可参考官网给的这个示例 stream_event_pipe[1]

2.2.7 支持链式调用

最后返回 dest,支持类似 unix 的用法:A.pipe(B).pipe(C)

Readable.prototype.pipe = function(dest, options) {return dest;
};

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/637219.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Pyro —— Understanding how pyro works

目录 Simulation fields Inside the Pyro Solver Colliders Pressure projection Simulation fields Pyro是纯体积流体解算器,表示流体状态的数据存于各种标量场和矢量场;Smoke Object会创建这些场,且能可视化这些场; vel场&…

【JavaEE】网络原理:网络中的一些基本概念

目录 1. 网络通信基础 1.1 IP地址 1.2 端口号 1.3 认识协议 1.4 五元组 1.5 协议分层 什么是协议分层 分层的作用 OSI七层模型 TCP/IP五层(或四层)模型 网络设备所在分层 网络分层对应 封装和分用 1. 网络通信基础 1.1 IP地址 概念:IP地址…

C语言/c++指针详细讲解【超详细】【由浅入深】

指针用法简单介绍 指针,是内存单元的编号。 内存条分好多好多小单元,一个小单元有 8 位,可以存放 8 个 0 或 1;也就是说,内存的编号不是以位算的,而是以字节算的,不是一个 0 或 1 是一个编号&…

立体视觉几何(一)

1.什么是立体视觉几何 立体视觉对应重建: • 对应:给定一幅图像中的点pl,找到另一幅图像中的对应点pr。 • 重建:给定对应关系(pl, pr),计算空间中相应点的3D 坐标P。 立体视觉:从图像中的投影恢复场景中点…

list下

文章目录 注意:const迭代器怎么写?运用场合? inserterase析构函数赋值和拷贝构造区别?拷贝构造不能写那个swap,为什么?拷贝构造代码 面试问题什么是迭代器失效?vector、list的区别? 完整代码 注…

qt学习:QT对话框+颜色+文件+字体+输入

目录 概述 继承图 QColorDialog 颜色对话框 QFileDialog 文件对话框 保存文件对话框 QFontDialog 字体对话框 QInputDialog 输入对话框 概述 对于对话框的功能,在GUI图形界面开发过程,使用是非常多,那么Qt也提供了丰富的对话框类QDia…

网络:FTP

1. FTP 文件传输协议,FTP是用来传输文件的协议。使用FTP实现远程文件传输的同时,还可以保证数据传输的可靠性和高效性。 2. 特点 明文传输。 作用:可以从服务器上下载文件,或将本地文件上传到服务器。 3. FTP原理 FTP有控制层面…

坦克大战游戏代码

坦克大战游戏 主函数战场面板开始界面坦克父类敌方坦克我方坦克子弹爆炸效果数据存盘及恢复图片 主函数 package cn.wenxiao.release9;import java.awt.event.ActionEvent; import java.awt.event.ActionListener;import javax.swing.JFrame; import javax.swing.JMenu; impor…

RS-485通讯

RS-485通讯协议简介 与CAN类似,RS-485是一种工业控制环境中常用的通讯协议,它具有抗干扰能力强、传输距离远的特点。RS-485通讯协议由RS-232协议改进而来,协议层不变,只是改进了物理层,因而保留了串口通讯协议应用简单…

【HarmonyOS】掌握布局组件,提升应用体验

从今天开始,博主将开设一门新的专栏用来讲解市面上比较热门的技术 “鸿蒙开发”,对于刚接触这项技术的小伙伴在学习鸿蒙开发之前,有必要先了解一下鸿蒙,从你的角度来讲,你认为什么是鸿蒙呢?它出现的意义又是…

【RT-DETR有效改进】华为 | GhostnetV2移动端的特征提取网络效果完爆MobileNet系列

前言 大家好,这里是RT-DETR有效涨点专栏。 本专栏的内容为根据ultralytics版本的RT-DETR进行改进,内容持续更新,每周更新文章数量3-10篇。 专栏以ResNet18、ResNet50为基础修改版本,同时修改内容也支持ResNet32、ResNet101和PP…

自动控制原理——数学模型建立

目标 1.数学模型概念 描述系统输入、输出变量以及内部个变量之间的关系的数学表达式 2.建模方法 解析法(机理解析法): 根据系统工作所依据的物理定律写运动方程 实验法(系统辨识法): 给系统施加某种测试信号&am…

万户 ezOFFICE wf_process_attrelate_aiframe.jsp SQL注入漏洞复现

0x01 产品简介 万户OA ezoffice是万户网络协同办公产品多年来一直将主要精力致力于中高端市场的一款OA协同办公软件产品,统一的基础管理平台,实现用户数据统一管理、权限统一分配、身份统一认证。统一规划门户网站群和协同办公平台,将外网信息维护、客户服务、互动交流和日…

Intel开发环境Quartus、Eclipse与WSL的安装

PC :win10 64bit 安装顺序:先安装Quartus 21.4,接着Eclipse或者WSL(Windows Subsystem for Linux),Eclipse与WSL的安装不分先后。 为什么要安装Eclipse? 因为Eclipse可以开发基于Nios II的C/…

SwiftUI 框架有哪些主要优势

SwiftUI是苹果公司在2019年推出的一种用于构建用户界面的框架,它使用Swift语言编写,并且与iOS、iPadOS、macOS、watchOS和tvOS等平台兼容。下面简单的看下有哪些主要的优势。 声明式的界面描述 使用声明式编程风格,通过简洁的代码描述用户界…

力扣645.错误的集合

一点一点地刷,慢慢攻克力扣!! 王子公主请看题 集合 s 包含从 1 到 n 的整数。不幸的是,因为数据错误,导致集合里面某一个数字复制了成了集合里面的另外一个数字的值,导致集合 丢失了一个数字 并且 有一个数…

C++:基于C的语法优化

C:基于C的语法优化 命名空间命名空间域域作用限定符展开命名空间域 输入输出缺省参数全缺省参数半缺省参数 函数重载参数类型不同参数个数不同参数类型的顺序不同 引用基本语法按引用传递返回引用引用与指针的区别 内联函数autoauto与指针和引用结合 范围for循环nul…

红队打靶练习:W34KN3SS: 1

目录 信息收集 1、arp 2、nmap 3、nikto 4、gobuster 5、dirsearch WEB web信息收集 目录探测 漏洞利用 openssl密钥碰撞 SSH登录 提权 get user.txt get passwd 信息收集 1、arp ┌──(root㉿ru)-[~/kali] └─# arp-scan -l Interface: eth0, type: EN10MB…

羊驼系列大模型LLaMa、Alpaca、Vicuna

羊驼系列大模型:大模型的安卓系统 GPT系列:类比ios系统,不开源 LLaMa让大模型平民化 LLaMa优势 用到的数据:大部分英语、西班牙语,少中文 模型下载地址 https://huggingface.co/meta-llama Alpaca模型 Alpaca是斯…

java枚举详细解释

枚举的基本认识 我们一般直接定义一个单独的枚举类 public enum 枚举类名{枚举项1,枚举项2,枚举项3 } 可以通过 枚举类名.枚举项 来访问该枚举项的 - 可以理解为 枚举项就是我们自己定义的一个数据类型,是独一无二的 接下来我们直接用一个例子来完全理解 加深理解 这里…