node那点事(二) -- Writable streams(可写流)、自定义流

可写流(Writable Stream)

可写流是对数据写入'目的地'的一种抽象。

可写流的原理其实与可读流类似,当数据过来的时候会写入缓存池,当写入的速度很慢或者写入暂停时候,数据流便会进入到队列池缓存起来,当然即使缓存池满了,剩余的数据也是存在内存

可写流的简单用法如下代码

let fs = require('fs');
let path = require('path');
let ws = fs.createWriteStream(path.join(__dirname,'1.txt'),{highWaterMark:3,autoClose:true,flags:'w',encoding:'utf8',mode:0o666,start:0,
}); 
let i = 9;
function write(){let flag = true;while(i>0&&flag){flag = ws.write(--i '','utf8',()=>{console.log('ok')});console.log(flag)}
}
write();
// drain只有当缓存区充满后 并且被消费后触发
ws.on('drain',function(){console.log('抽干')write();
});

实现原理

现在就让我们来实现一个简单的可写流,来研究可写流的内部原理,可写流有很多方法与可读流类似,这里不在重复了首先要有一个构造函数来定义一些基本选项属性,然后调用一个open放法打开文件,并且有一个destroy方法来处理关闭逻辑

let EventEmitter = require('events');
let fs = require('fs');class WriteStream extends EventEmitter {constructor(path,options) {super();this.path = path;this.highWaterMark = options.highWaterMark || 16 * 1024;this.autoClose = options.autoClose || true;this.mode = options.mode;this.start = options.start || 0;this.flags = options.flags || 'w';this.encoding = options.encoding || 'utf8';// 可写流 要有一个缓存区,当正在写入文件是,内容要写入到缓存区中// 在源码中是一个链表 => []this.buffers = [];// 标识 是否正在写入this.writing = false;// 是否满足触发drain事件this.needDrain = false;// 记录写入的位置this.pos = 0;// 记录缓存区的大小this.length = 0;this.open();}destroy() {if (typeof this.fd !== 'number') {return this.emit('close');}fs.close(this.fd, () => {this.emit('close')});}open() {fs.open(this.path, this.flags, this.mode, (err,fd) => {if (err) {this.emit('error', err);if (this.autoClose) {this.destroy();}return;}this.fd = fd;this.emit('open');})}
}module.exports = WriteStream;

接着我们实现write方法来让可写流对象调用,在write方法中我们首先将数据转化为buffer,接着实现一些事件的触发条件的逻辑,如果现在没有正在写入的话我们就要真正的进行写入操作了,这里我们实现一个_write方法来实现写入操作,否则则代表文件正在写入,那我们就将流传来的数据先放在缓存区中,保证写入数据不会同时进行。

write(chunk,encoding=this.encoding,callback=()=>{}){chunk = Buffer.isBuffer(chunk)?chunk:Buffer.from(chunk,encoding);// write 返回一个boolean类型 this.length =chunk.length; let ret = this.length<this.highWaterMark; // 比较是否达到了缓存区的大小this.needDrain = !ret; // 是否需要触发needDrain// 判断是否正在写入 如果是正在写入 就写入到缓存区中if(this.writing){this.buffers.push({encoding,chunk,callback}); // []}else{// 专门用来将内容 写入到文件内this.writing = true;this._write(chunk,encoding,()=>{callback();this.clearBuffer();}); // 8}return ret;
}_write(chunk,encoding,callback){if(typeof this.fd !== 'number'){return this.once('open',()=>this._write(chunk,encoding,callback));}fs.write(this.fd,chunk,0,chunk.length,this.pos,(err,byteWritten)=>{this.length -= byteWritten;this.pos  = byteWritten;callback(); // 清空缓存区的内容});
}

_write写入之后的回调中我们会调用传入回调函数clearBuffer,这个方法会去buffers中继续递归地把数据取出,然后继续调用_write方法去写入,直到全部buffer中的数据取出后,这样就清空了buffers。

clearBuffer(){let buffer = this.buffers.shift();if(buffer){this._write(buffer.chunk,buffer.encoding,()=>{buffer.callback();this.clearBuffer()});}else{this.writing = false;if(this.needDrain){ // 是否需要触发drain 需要就发射drain事件this.needDrain = false;this.emit('drain');}}
}

最后附上完整的代码

let EventEmitter = require('events');
let fs = require('fs');
class WriteStream extends EventEmitter{constructor(path,options){super();this.path = path;this.highWaterMark = options.highWaterMark||16*1024;this.autoClose = options.autoClose||true;this.mode = options.mode;this.start = options.start||0;this.flags = options.flags||'w';this.encoding = options.encoding || 'utf8';// 可写流 要有一个缓存区,当正在写入文件是,内容要写入到缓存区中// 在源码中是一个链表 => []this.buffers = [];// 标识 是否正在写入this.writing = false;// 是否满足触发drain事件this.needDrain = false;// 记录写入的位置this.pos = 0;// 记录缓存区的大小this.length = 0;this.open();}destroy(){if(typeof this.fd !=='number'){return this.emit('close');}fs.close(this.fd,()=>{this.emit('close')})}open(){fs.open(this.path,this.flags,this.mode,(err,fd)=>{if(err){this.emit('error',err);if(this.autoClose){this.destroy();}return}this.fd = fd;this.emit('open');})}write(chunk,encoding=this.encoding,callback=()=>{}){chunk = Buffer.isBuffer(chunk)?chunk:Buffer.from(chunk,encoding);// write 返回一个boolean类型 this.length =chunk.length; let ret = this.length<this.highWaterMark; // 比较是否达到了缓存区的大小this.needDrain = !ret; // 是否需要触发needDrain// 判断是否正在写入 如果是正在写入 就写入到缓存区中if(this.writing){this.buffers.push({encoding,chunk,callback}); // []}else{// 专门用来将内容 写入到文件内this.writing = true;this._write(chunk,encoding,()=>{callback();this.clearBuffer();}); // 8}return ret;}clearBuffer(){let buffer = this.buffers.shift();if(buffer){this._write(buffer.chunk,buffer.encoding,()=>{buffer.callback();this.clearBuffer()});}else{this.writing = false;if(this.needDrain){ // 是否需要触发drain 需要就发射drain事件this.needDrain = false;this.emit('drain');}}}_write(chunk,encoding,callback){if(typeof this.fd !== 'number'){return this.once('open',()=>this._write(chunk,encoding,callback));}fs.write(this.fd,chunk,0,chunk.length,this.pos,(err,byteWritten)=>{this.length -= byteWritten;this.pos  = byteWritten;callback(); // 清空缓存区的内容});}
}module.exports = WriteStream;

Pipe管道流

前面我们了解了可读流与可写流,那么怎么让二者结合起来使用呢,node给我们提供好了方法--Pipe管道,流顾名思义,就是在可读流与可写流中间加入一个管道,实现一边读取,一边写入,读一点写一点。

Pipe的使用方法如下

let fs = require('fs');
let path = require('path');
let ReadStream = require('./ReadStream');
let WriteStream = require('./WriteStream');let rs = new ReadStream(path.join(__dirname, './1.txt'), {highWaterMark: 4
});
let ws = new WriteStream(path.join(__dirname, './2.txt'), {highWaterMark: 1
});
// 4 1
rs.pipe(ws); 

实现原理

Pipe的原理比较简单,简单说监听可读流的data事件来持续获取文件中的数据,然后我们就会去调用写流的write方法。如果可写流缓存区已满,那么当我们得到调用可读流的pause方法来暂停读取,然后等到写流的缓存区已经全部写入并且触发drain事件时,我们就会调用resume重新开启读取的流程。上代码

pipe(ws) {this.on('data', (chunk) => {let flag = ws.write(chunk);if (!flag) {this.pause();}});ws.on('drain', () => {this.resume();})
}

自定义流

Node允许我们自定义流,读流继承于Readable接口,写流则继承于Writable接口,所以我们其实是可以自定义一个流模块,只要继承stream模块对应的接口即可。

自定义可读流

如果我们要自定义读流的话,那我们就需要继承Readable,Readable里面有一个read()方法,默认调用_read(),所以我们只要复写了_read()方法就可实现读取的逻辑,同时Readable中也提供了一个push方法,调用push方法就会触发data事件,push中的参数就是data事件回调函数的参数,当push传入的参数为null的时候就代表读流停止,上代码

let { Readable } = require('stream');// 想实现什么流 就继承这个流
// Readable里面有一个read()方法,默认掉_read()
// Readable中提供了一个push方法你调用push方法就会触发data事件
let index = 9;
class MyRead extends Readable {_read() {// 可读流什么时候停止呢? 当push null的时候停止if (index-- > 0) return this.push('123');this.push(null);}
}let mr = new MyRead();
mr.on('data', function(data) {console.log(data);
});

自定义可写流

与自定义读流类似,自定义写流需要继承Writable接口,并且实现一个_write()方法,这里注意的是_write中可以传入3个参数,chunk, encoding, callback,chunk就是代表写入的数据,通常是一个buffer,encoding是编码类型,通常不会用到,最后的callback要注意,它并不是我们用这个自定义写流调用write时的回调,而是我们上面讲到写流实现时的clearBuffer函数。

let { Writable } = require('stream');// 可写流实现_write方法
// 源码中默认调用的是Writable中的write方法
class MyWrite extends Writable {_write(chunk, encoding, callback) {console.log(chunk.toString());callback(); // clearBuffer}
}let mw = new MyWrite();
mw.write('111', 'utf8', () => {console.log(1);
})
mw.write('222', 'utf8', () => {console.log(1);
});

Duplex 双工流

双工流其实就是结合了上面我们说的自定义读流和自定义写流,它既能读也能写,同时可以做到读写之间互不干扰

let { Duplex } =  require('stream');// 双工流 又能读 又能写,而且读取可以没关系(互不干扰)
let d = Duplex({read() {this.push('hello');this.push(null);},write(chunk, encoding, callback) {console.log(chunk);callback();}
});d.on('data', function(data) {console.log(data);
});
d.write('hello');

Transform 转换流

转换流的本质就是双工流,唯一不同的是它并不需要像上面提到的双工流一样实现read和write,它只需要实现一个transform方法用于转换

let { Transform } =  require('stream');// 它的参数和可写流一样
let tranform1 = Transform({transform(chunk, encoding, callback) {this.push(chunk.toString().toUpperCase()); // 将输入的内容放入到可读流中callback();}
});
let tranform2 = Transform({transform(chunk, encoding, callback){console.log(chunk.toString());callback();}
});// 等待你的输入
// rs.pipe(ws);
// 希望将输入的内容转化成大写在输出出来
process.stdin.pipe(tranform1).pipe(tranform2);
// 对象流 可读流里只能放buffer或者字符串 对象流里可以放对象

对象流

默认情况下,流处理的数据是Buffer/String类型的值。对象流的特点就是它有一个objectMode标志,我们可以设置它让流可以接受任何JavaScript对象。上代码

const { Transform } = require('stream');let fs = require('fs');
let rs = fs.createReadStream('./users.json');rs.setEncoding('utf8');let toJson = Transform({readableObjectMode: true,transform(chunk, encoding, callback) {this.push(JSON.parse(chunk));callback();}
});let jsonOut = Transform({writableObjectMode: true,transform(chunk, encoding, callback) {console.log(chunk);callback();}
});
rs.pipe(toJson).pipe(jsonOut);

更多专业前端知识,请上 【猿2048】www.mk2048.com

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/366429.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

第16章-使用Spring MVC创建REST API

1 了解REST 1.1 REST的基础知识 REST与RPC几乎没有任何关系。RPC是面向服务的&#xff0c;并关注于行为和动作&#xff1b;而REST是面向资源的&#xff0c;强调描述应用程序的事物和名词。 为了理解REST是什么&#xff0c;我们将它的首字母缩写拆分为不同的构成部分&#xf…

使用Apache Mahout创建在线推荐系统

最近&#xff0c; 我们一直在为Yap.TV实施推荐系统&#xff1a;在安装应用程序并转到“ Just for you”选项卡后&#xff0c;您可以看到它的运行情况。 我们以Apache Mahout为基础进行建议。 Mahout是一个“可扩展的机器学习库”&#xff0c;其中包含使用协作过滤算法的基于用户…

linux mono mysql_LJMM平台( Linux +Jexus+MySQL+mono) 上使用MySQL的简单总结

近准备把PDF.NET框架的开源项目“超市管理系统”移植到Linux上跑(演示地址&#xff1a;http://221.123.142.196)&#xff0c;使用Jexus服务器和MySQL数据库&#xff0c;相对使用SQLite而言&#xff0c;用MySQL问题比较多&#xff0c;但最后还是一一解决了&#xff0c;先总结如下…

node中的缓存机制

缓存是node开发中一个很重要的概念&#xff0c;它应用在很多地方&#xff0c;例如&#xff1a;浏览器有缓存、DNS有缓存、包括服务器也有缓存。 一、缓存作用 那缓存是为了做什么呢&#xff1f; 1.为了提高速度&#xff0c;提高效率。 2.减少数据传输&#xff0c;节省网费。 …

template里面要做数据渲染,但是数据还没有出来

<el-dialog title"企业详情" :visible.sync"showEditPayment" close"closeDialog" v-if"detail"><el-tabs type"border-card"><el-tab-pane label"客户信息"><el-row><el-col class&q…

《H5 移动营销设计指南》 读书笔记整理

一个前端工程师最近迷上了营销类的H5页面&#xff0c;被五花八门的H5页面迷的眼花缭乱&#xff0c;兴趣使然&#xff0c;于是买了一本《H5 营销设计指南》&#xff0c;看完以后对营销类的H5页面有了更深的理解&#xff0c;感觉很实在&#xff0c;所以参考读书笔记整理成PPT分享…

Stacktraces告诉了事实。 但事实并非如此。

我们公司致力于使软件错误的原因对开发人员和运营透明。 与替代解决方案相反&#xff0c; 我们将问题的位置浮出水面&#xff0c;使您指向源代码中的恶意行。 即使我们目前以检测内存泄漏的能力而闻名&#xff0c;但我们也正在扩展到其他领域。 为了给您一些有关我们研究方向的…

mysql-plus多数据库_IDEA项目搭建九——MybatisPlus多数据库实现

一、简介MybatisPlus中引用多数据库时&#xff0c;传统的配置就失效了&#xff0c;需要单独写配置来实现&#xff0c;下面就说一下具体应该如何操作二、引入MybatisPlus多数据源配置还是先看一下我的项目结构&#xff0c;Model是单独的模块&#xff0c;请自行创建1、创建一个Ma…

写一个函数的程序,判断是否是浮点数

算法&#xff1a; 0.先把小数&#xff0c;转换成str类型&#xff0c;才能调以下方法判断&#xff1b; 1.先判断数值中&#xff0c;是否有小数点&#xff0c;用count计数器&#xff1b; 2.是小数的&#xff0c;需要以‘.’分割小数&#xff1b; 3.小数点左侧若是负数&#xff0c…

数字逻辑基础篇1

1. 双阈值准则在模拟条件下&#xff0c;假设点亮灯泡需要1.7V以上电压。抽象为数字电路&#xff0c;可以认为&#xff1a; U>1.7V U1 U<1.7V U0 这种条件称之为单阈值&#xff08;1.7&#xff09;&#xff0c;但是单阈值导致的问题是&#xff1a; 电压在1.7V附近…

Neo4j:在Neo4j浏览器的帮助下探索新数据集

当我查看一个新的Neo4j数据库时&#xff0c;发现困难之一是确定其中包含的数据的结构。 我习惯于关系数据库&#xff0c;在该数据库中您可以轻松地获取表列表和外键&#xff0c;从而使它们彼此连接。 传统上&#xff0c;使用Neo4j时很难做到这一点&#xff0c;但是随着Neo4j浏…

V8 —— 你需要知道的垃圾回收机制

前言V8 blog近日发布了文章描述了“并发标记”的新技术&#xff0c;提升标记过程的效率。并发标记是一个主要用新的平行和并发的垃圾收集器替换旧的垃圾回收器的项目&#xff0c;现在Chrome 64和Node.js v10已经默认启用并发标记。讲解之前我们先回顾一下基本知识点。基本概念 …

词法分析器java_Java代码到底是如何编译成机器指令的。

原文地址&#xff1a;https://mp.weixin.qq.com/s/XH-JajAne0O7_yCYE5wBbg作者&#xff1a;Hollis在《Java代码的编译与反编译》中&#xff0c;有过关于Java语言的编译和反编译的介绍。我们可以通过javac命令将Java程序的源代码编译成Java字节码&#xff0c;即我们常说的class文…

python中的PEP是什么?怎么理解?(转)

PEP是什么&#xff1f; PEP的全称是Python Enhancement Proposals&#xff0c;其中Enhancement是增强改进的意思&#xff0c;Proposals则可译为提案或建议书&#xff0c;所以合起来&#xff0c;比较常见的翻译是Python增强提案或Python改进建议书。 我个人倾向于前一个翻译&…

Java方法中的参数太多,第6部分:方法返回

在当前的系列文章中&#xff0c;我正在致力于减少调用Java方法和构造函数所需的参数数量&#xff0c;到目前为止&#xff0c;我一直专注于直接影响参数本身的方法&#xff08; 自定义类型 &#xff0c; 参数对象 &#xff0c; 构建器模式 &#xff0c; 方法重载和方法命名 &…

2017前端技术大盘点

前言 临近2017的尾声&#xff0c;总是希望来盘点一下这一年中前端的发展。到目前为止&#xff0c;前端的井喷期也快临近尾声了。并不像几年前一样&#xff0c;总是会有层出不穷的新东西迸发出来。同时&#xff0c;前端技术也慢慢的趋于稳固&#xff0c;自成一套体系。如果你喜…

jenkins pipeline api获取stage的详细信息_Jenkins + Docker 助力 Serverless 应用构建与部署...

本文来源&#xff1a; ServerlessLife 公众号近日&#xff0c;使用 Serverless 开发了一个应用。其中 CI/CD&#xff0c;是需要考虑的一个问题。这里用到了 Jenkins 和 Docker。并且 Jenkins Pipeline 运行在容器中。本文将介绍如何使用 Jenkins 和 Docker 构建并部署 Serverle…

BZOJ 1305 [CQOI2009]dance跳舞

这是一道最大流的模版题 一定要记住不能开出来重点呀 #include <queue> #include <cstdio> #include <cstring> #include <iostream> #include <algorithm> using namespace std; const int MAXN205; const int MAXM6005; const int inf0x3f3f3f…

项目本地部署

1.将数据库导出&#xff0c;并导入到本地 exp dgpdg/pass192.168.1.33/ORCL fileD:\gd_base.dmp logD:\gd_base.log&#xff08;不要加fully&#xff0c;会把整个数据库下所有用户的表倒下来&#xff09; imp dgpdg/pass127.0.0.1/orcl file"D:\gd_base.dmp" log&quo…

Java方法中的参数太多,第7部分:可变状态

在我的系列文章的第七篇中&#xff0c;有关解决Java方法或构造函数中过多参数的问题 &#xff0c;我着眼于使用状态来减少传递参数的需要。 我等到本系列的第七篇文章来解决这个问题的原因之一是&#xff0c;它是我最不喜欢的减少传递给方法和构造函数的参数的方法之一。 也就是…