Node.js Stream - 基础篇

背景

在构建较复杂的系统时,通常将其拆解为功能独立的若干部分。这些部分的接口遵循一定的规范,通过某种方式相连,以共同完成较复杂的任务。譬如,shell通过管道|连接各部分,其输入输出的规范是文本流。

在Node.js中,内置的Stream模块也实现了类似功能,各部分通过.pipe()连接。

鉴于目前国内系统性介绍Stream的文章较少,而越来越多的开源工具都使用了Stream,本系列文章将从以下几方面来介绍相关内容:

  1. 流的基本类型,以及Stream模块的基本使用方法
  2. 流式处理与back pressure的工作原理
  3. 如何开发流式程序,包括对Gulp与Browserify的剖析,以及一个实战示例。

本文为系列文章的第一篇。

流的四种类型

Stream提供了以下四种类型的流:

var Stream = require('stream')var Readable = Stream.Readable
var Writable = Stream.Writable
var Duplex = Stream.Duplex
var Transform = Stream.Transform

使用Stream可实现数据的流式处理,如:

var fs = require('fs')
// `fs.createReadStream`创建一个`Readable`对象以读取`bigFile`的内容,并输出到标准输出
// 如果使用`fs.readFile`则可能由于文件过大而失败
fs.createReadStream(bigFile).pipe(process.stdout)

Readable

创建可读流。

实例:流式消耗迭代器中的数据。

'use strict'
const Readable = require('stream').Readableclass ToReadable extends Readable {constructor(iterable) {super()this.iterator = new function *() {yield * iterable}}// 子类需要实现该方法// 这是生产数据的逻辑_read() {const res = this.iterator.next()if (res.done) {// 数据源已枯竭,调用`push(null)`通知流this.push(null)} else {// 通过`push`方法将数据添加到流中this.push(res.value + '\n')}}
}module.exports = ToReadable

实际使用时,new ToReadable(iterable)会返回一个可读流,下游可以流式的消耗迭代器中的数据。

const iterable = function *(limit) {while (limit--) {yield Math.random()}
}(1e10)const readable = new ToReadable(iterable)// 监听`data`事件,一次获取一个数据
readable.on('data', data => process.stdout.write(data))// 所有数据均已读完
readable.on('end', () => process.stdout.write('DONE'))

执行上述代码,将会有100亿个随机数源源不断地写进标准输出流。

创建可读流时,需要继承Readable,并实现_read方法。

  • _read方法是从底层系统读取具体数据的逻辑,即生产数据的逻辑。
  • _read方法中,通过调用push(data)将数据放入可读流中供下游消耗。
  • _read方法中,可以同步调用push(data),也可以异步调用。
  • 当全部数据都生产出来后,必须调用push(null)来结束可读流。
  • 流一旦结束,便不能再调用push(data)添加数据。

可以通过监听data事件的方式消耗可读流。

  • 在首次监听其data事件后,readable便会持续不断地调用_read(),通过触发data事件将数据输出。
  • 第一次data事件会在下一个tick中触发,所以,可以安全地将数据输出前的逻辑放在事件监听后(同一个tick中)。
  • 当数据全部被消耗时,会触发end事件。

上面的例子中,process.stdout代表标准输出流,实际是一个可写流。下小节中介绍可写流的用法。

Writable

创建可写流。

前面通过继承的方式去创建一类可读流,这种方法也适用于创建一类可写流,只是需要实现的是_write(data, enc, next)方法,而不是_read()方法。

有些简单的情况下不需要创建一类流,而只是一个流对象,可以用如下方式去做:

const Writable = require('stream').Writableconst writable = Writable()
// 实现`_write`方法
// 这是将数据写入底层的逻辑
writable._write = function (data, enc, next) {// 将流中的数据写入底层process.stdout.write(data.toString().toUpperCase())// 写入完成时,调用`next()`方法通知流传入下一个数据process.nextTick(next)
}// 所有数据均已写入底层
writable.on('finish', () => process.stdout.write('DONE'))// 将一个数据写入流中
writable.write('a' + '\n')
writable.write('b' + '\n')
writable.write('c' + '\n')// 再无数据写入流时,需要调用`end`方法
writable.end()
  • 上游通过调用writable.write(data)将数据写入可写流中。write()方法会调用_write()data写入底层。
  • _write中,当数据成功写入底层后,必须调用next(err)告诉流开始处理下一个数据。
  • next的调用既可以是同步的,也可以是异步的。
  • 上游必须调用writable.end(data)来结束可写流,data是可选的。此后,不能再调用write新增数据。
  • end方法调用后,当所有底层的写操作均完成时,会触发finish事件。

Duplex

创建可读可写流。

Duplex实际上就是继承了ReadableWritable的一类流。
所以,一个Duplex对象既可当成可读流来使用(需要实现_read方法),也可当成可写流来使用(需要实现_write方法)。

var Duplex = require('stream').Duplexvar duplex = Duplex()// 可读端底层读取逻辑
duplex._read = function () {this._readNum = this._readNum || 0if (this._readNum > 1) {this.push(null)} else {this.push('' + (this._readNum++))}
}// 可写端底层写逻辑
duplex._write = function (buf, enc, next) {// a, bprocess.stdout.write('_write ' + buf.toString() + '\n')next()
}// 0, 1
duplex.on('data', data => console.log('ondata', data.toString()))duplex.write('a')
duplex.write('b')duplex.end()

上面的代码中实现了_read方法,所以可以监听data事件来消耗Duplex产生的数据。
同时,又实现了_write方法,可作为下游去消耗数据。

因为它既可读又可写,所以称它有两端:可写端和可读端。
可写端的接口与Writable一致,作为下游来使用;可读端的接口与Readable一致,作为上游来使用。

Transform

在上面的例子中,可读流中的数据(0, 1)与可写流中的数据('a', 'b')是隔离开的,但在Transform中可写端写入的数据经变换后会自动添加到可读端。
Tranform继承自Duplex,并已经实现了_read_write方法,同时要求用户实现一个_transform方法。

'use strict'const Transform = require('stream').Transformclass Rotate extends Transform {constructor(n) {super()// 将字母旋转`n`个位置this.offset = (n || 13) % 26}// 将可写端写入的数据变换后添加到可读端_transform(buf, enc, next) {var res = buf.toString().split('').map(c => {var code = c.charCodeAt(0)if (c >= 'a' && c <= 'z') {code += this.offsetif (code > 'z'.charCodeAt(0)) {code -= 26}} else if (c >= 'A' && c <= 'Z') {code += this.offsetif (code > 'Z'.charCodeAt(0)) {code -= 26}}return String.fromCharCode(code)}).join('')// 调用push方法将变换后的数据添加到可读端this.push(res)// 调用next方法准备处理下一个next()}}var transform = new Rotate(3)
transform.on('data', data => process.stdout.write(data))
transform.write('hello, ')
transform.write('world!')
transform.end()// khoor, zruog!

objectMode

前面几节的例子中,经常看到调用data.toString()。这个toString()的调用是必需的吗?
本节介绍完如何控制流中的数据类型后,自然就有了答案。

在shell中,用管道(|)连接上下游。上游输出的是文本流(标准输出流),下游输入的也是文本流(标准输入流)。在本文介绍的流中,默认也是如此。

对于可读流来说,push(data)时,data只能是StringBuffer类型,而消耗时data事件输出的数据都是Buffer类型。对于可写流来说,write(data)时,data只能是StringBuffer类型,_write(data)调用时传进来的data都是Buffer类型。

也就是说,流中的数据默认情况下都是Buffer类型。产生的数据一放入流中,便转成Buffer被消耗;写入的数据在传给底层写逻辑时,也被转成Buffer类型。

但每个构造函数都接收一个配置对象,有一个objectMode的选项,一旦设置为true,就能出现“种瓜得瓜,种豆得豆”的效果。

Readable未设置objectMode时:

const Readable = require('stream').Readableconst readable = Readable()readable.push('a')
readable.push('b')
readable.push(null)readable.on('data', data => console.log(data))

输出:

<Buffer 61>
<Buffer 62>

Readable设置objectMode后:

const Readable = require('stream').Readableconst readable = Readable({ objectMode: true })readable.push('a')
readable.push('b')
readable.push({})
readable.push(null)readable.on('data', data => console.log(data))

输出:

a
b
{}

可见,设置objectMode后,push(data)的数据被原样地输出了。此时,可以生产任意类型的数据。

系列文章

  • 第一部分:《Node.js Stream - 基础篇》,介绍Stream接口的基本使用。
  • 第二部分:《Node.js Stream - 进阶篇》,重点剖析Stream底层如何支持流式数据处理,及其back pressure机制。
  • 第三部分:《Node.js Stream - 实战篇》,介绍如何使用Stream进行程序设计。从BrowserifyGulp总结出两种设计模式,并基于Stream构建一个为Git仓库自动生成changelog的应用作为示例。

 

参考文献

  • GitHub,substack/browserify-handbook
  • GitHub,zoubin/streamify-your-node-program

来自:http://tech.meituan.com/stream-basics.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/283917.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Axure RP使用攻略--动态面板的用途(8)

写了几个Axure教程之后发现&#xff0c;可能教程的起点有些高了&#xff0c;过分的去讲效果的实现&#xff0c;而忽略了axure功能以及基础元件的使用&#xff0c;那么从这个教程开始&#xff0c;把这些逐渐的展开讲解。 关于动态面板 动态面板是axure原型制作中使用非常频繁的一…

ABP 6.0.0-rc.1的新特性

2022-07-26官方发布ABP 6.0.0-rc.1版本&#xff0c;本文挑选了几个新特性进行了介绍&#xff0c;主要包括LeptonX Lite默认主题、OpenIddict模块&#xff0c;以及如何将Identity Server迁移到OpenIddict。据ABP官方公众号介绍&#xff0c;ABP 6.0.0稳定版的计划发布日期为2022-…

Java并发包--线程池框架

转载请注明出处&#xff1a;http://www.cnblogs.com/skywang12345/p/3509903.html 线程池架构图 线程池的架构图如下&#xff1a; 1. Executor 它是"执行者"接口&#xff0c;它是来执行任务的。准确的说&#xff0c;Executor提供了execute()接口来执行已提交的 Runna…

c 试水解码jpeg图片比特流(已成功解码)

找到一张采用霍夫曼通用DC,AC编码表的图片&#xff0c;提取出此图片的比特流准备对它解码&#xff0c;再反推怎样编码。 下图是此图片比特流前100个字节。解码是每次读一字节&#xff0c;对这8比特解码&#xff0c;如8比特不能解码&#xff0c;再读入一字节。因为霍夫曼表最多…

Raft算法详解

Raft算法属于Multi-Paxos算法&#xff0c;它是在Multi-Paxos思想的基础上&#xff0c;做了一些简化和限制&#xff0c;比如增加了日志必须是连续的&#xff0c;只支持领导者、跟随者和候选人三种状态&#xff0c;在理解和算法实现上都相对容易许多 从本质上说&#xff0c;Raft算…

淘宝弹性布局方案lib-flexible研究

1. lib-flexible不能与响应式布局兼容 先说说响应式布局的一些基本认识&#xff1a; 响应式布局的表现是&#xff1a;网页通过css媒介查询判断可视区域的宽度&#xff0c;在不同的范围应用不同的样式&#xff0c;以便在不同尺寸的设备上呈现最佳的界面效果。典型的例子是&#…

[No0000DB]C# FtpClientHelper Ftp客户端上传下载重命名 类封装

using System; using System.Diagnostics; using System.IO; using System.Text; using Shared;namespace Helpers {public static class FileHelper{#region Methods/// <summary>/// 向文本文件的尾部追加内容/// </summary>/// <param name"filePa…

WPF效果第一百九十四篇之伸缩面板

前面一篇玩耍了一下登录实现效果;今天在原来的基础上来玩耍一下伸缩面板的效果;闲话不多扯直接看效果:1、关于前台简单布局:2、左侧面板伸缩动画&#xff1a;<Storyboard x:Key"ShowConfigSb"><ThicknessAnimationUsingKeyFrames Storyboard.TargetProperty…

你不知道的JavaScript(二)

第三章 原生函数 JS有很多原生函数&#xff0c;为基本的数据类型值提供了封装对象&#xff0c;String&#xff0c;Number&#xff0c;Boolean等。我们可以通过{}.call.toString()来查看所有typeof返回object的对象的内置属性[[class]],这个属性无法直接访问。我们基本类型调用的…

[转]guava快速入门

Guava工程包含了若干被Google的 Java项目广泛依赖 的核心库&#xff0c;例如&#xff1a;集合 [collections] 、缓存 [caching] 、原生类型支持 [primitives support] 、并发库 [concurrency libraries] 、通用注解 [common annotations] 、字符串处理 [string processing] 、I…

数据库编程1 Oracle 过滤 函数 分组 外连接 自连接

【本文谢绝转载原文来自http://990487026.blog.51cto.com】<大纲>数据库编程1 Oracle 过滤 函数 分组 外连接 自连接本文实验基于的数据表:winsows安装好Oracle11g之后,开始实验SQLplus 登陆 ORaclesqlplus 退出的方式查看用户之下有什么表查看表的所有记录&#xff0c;不…

【.NET 6】开发minimal api以及依赖注入的实现和代码演示

前言&#xff1a;.net 6 LTS版本发布已经有一段时间了。此处做一个关于使用.net 6 开发精简版webapi&#xff08;minimal api&#xff09;的入门教程演示。1、新建一个项目。此处就命名为 SomeExample:2、选择 .net6版本&#xff0c;并且此处先去掉HTTPS配置以及去掉使用控制器…

(转载)VS2010/MFC编程入门之四(MFC应用程序框架分析)

上一讲鸡啄米讲的是VS2010应用程序工程中文件的组成结构&#xff0c;可能大家对工程的运行原理还是很模糊&#xff0c;理不出头绪&#xff0c;毕竟跟C编程入门系列中的例程差别太大。这一节鸡啄米就为大家分析下MFC应用程序框架的运行流程。 一.SDK应用程序与MFC应用程序运行过…

个人博客开发-开篇

迈出第一步&#xff1a; 很久以前就有这个想法&#xff0c;自己动手开发一套个人博客系统&#xff0c;终于&#xff0c;现在开始迈出了第一步。做这件事一点是做一个有个人风格的博客系统&#xff0c;第二点是对做这件事所使用的技术栈进行学习&#xff0c;所谓最好的学习就是实…

2022年中国中小学教育信息化行业研究报告

教育信息化丨研究报告 核心摘要&#xff1a; 背景篇 目前&#xff0c;我国中小学教育主要呈现信息时代教育的特征&#xff0c;智能时代教育特征初露端倪&#xff1b;中小学教育信息化正从量变迈向质变&#xff0c;创新引领与生态变革成为行业纵深的主旋律&#xff1b; 2021年…

使用curl指令发起websocket请求

昨日的文章没指出websocket请求协商切换的精髓&#xff0c;删除重发。前文相关&#xff1a;• .NET WebSockets 核心原理初体验[1]• SignalR 从开发到生产部署避坑指南[2]tag&#xff1a;浏览器--->nginx--> server其中提到nginx默认不会为客户端转发Upgrade、Connectio…

Yii 2 的安装 之 踩坑历程

由于刚接触yii2 ,决定先装个试试&#xff1b;可是这一路安装差点整吐血&#xff0c;可能还是水平有限吧&#xff0c; 但还是想把这个过程分享出来&#xff0c;让遇到同样问题的同学有个小小的参考&#xff0c;好了言归正传&#xff01;&#xff01; <(~.~)> 下面是安装流…

设计模式之代理模式(上) 静态代理与JDK动态代理

2019独角兽企业重金招聘Python工程师标准>>> 代理模式 给某一个对象提供一个代理&#xff0c;并由代理对象控制对原对象的引用。静态代理 静态代理是由我们编写好的类&#xff0c;在程序运行之前就已经编译好的的类&#xff0c;此时就叫静态代理。 说理论还是比较懵…

mysql 分页查询

使用limit函数 limit关键字的用法&#xff1a; LIMIT [offset,] rows offset指定要返回的第一行的偏移量&#xff0c;rows第二个指定返回行的最大数目。初始行的偏移量是0(不是1)。转载于:https://www.cnblogs.com/xping/p/6703986.html