博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
流,理解流,简单的自己模拟的流
阅读量:6247 次
发布时间:2019-06-22

本文共 10570 字,大约阅读时间需要 35 分钟。

众所周知,NODEJS在高并发,I/O密集型应用操作的时候有很多优势,而这些都脱离不了“流”的支撑。请求流,响应流,文件流,Socket流,甚至console模块都使用了流。而流的实现,尤其是其内部的实现,在整个NODEJS的学习中就很有学习的必要。

流的模型可以总结为“生产者,消费者”模型。流的一端生产数据(可以理解为从水龙头放水),另一端消费数据(水放出来后你干嘛用不管,消费就好了)。当然,其内部代码实现会考虑很多细节的地方,这些可以通过调试进入源码查看很多细节的地方。

在源码中跟流相关的模块有:

  • lib/module.js
  • lib/stream_readable.js
  • lib/stream_writable.js
  • lib/stream_transform.js
  • lib/stream_duplex.js 源码非常清晰,这就对应流的四种类型,Readable流,Writable流,Transform流,Duplex流。 其中Readable和Writable是重点,这两个搞明白,Transform和Duplex就比较简单了。

Readable Stream Readable Stream有两种模式,一种是Flowing Mode,流动模式;另外一种是Paused Mode,暂停模式。 切换到流动模式的方式有:

  • 监听data时间 rs.on("data", (chunk)=>{});
  • 调用stream.resume方法
  • 调用stream.pipe方法将数据发送给writable stream

切换到暂停模式的方法有:

  • 调用stream.pause方法
  • 如果存在管道,调用stream.unpipe方法

tip: 这两种流是可以随时切换的

  • 流动模式和暂停模式有什么区别,为什么要这么设计。

流动模式就是像流水一样源源不断的读取数据(注意不是到缓存对象),不管你消费不消费。 暂停模式可以暂时不读取数据,关闭水龙头。

有一个用的比较多的词语叫背压,一般来说,读的速度会比写入的速度快,如果不暂停,还是源源不断的读取数据,会造成内存过大,消耗性能,这个时候最好的方式是消费多少,读取多少,由此引申出管道的概念。

最好的管道就是生产和消费同步,如果读的过快,先暂停读取,有需要再通知读取即可。

从上面的语义描述可以看出,流是基于事件的,事件在这里承担着消息的注册和发送。

上面的都是概念相关,现在来一个简单版的可读流,可写流。代码虽然简化了很多,但是对于理解整个流的流程,会非常有帮助。

先来看流动模式的可读流

let EventEmitter = require('events');let fs = require('fs');class ReadStream extends EventEmitter {    constructor(path, options) {        super(path, options);        this.path = path;        this.flags = options.flags || 'r';        this.mode = options.mode || 0o666;        this.highWaterMark = options.highWaterMark || 64 * 1024;        this.pos = this.start = options.start || 0;        this.end = options.end;        this.encoding = options.encoding;        this.flowing = null;        this.buffer = Buffer.alloc(this.highWaterMark);        this.open();//准备打开文件读取        //当给这个实例添加了任意的监听函数时会触发newListener        this.on('newListener',(type,listener)=>{            //如果监听了data事件,流会自动切换的流动模式            if(type == 'data'){              this.flowing = true;              this.read();            }        });    }    read(){        if(typeof this.fd != 'number'){            return this.once('open',()=>this.read());        }        let howMuchToRead = this.end?Math.min(this.end - this.pos + 1,this.highWaterMark):this.highWaterMark;        //this.buffer并不是缓存区        console.log('howMuchToRead',howMuchToRead);        fs.read(this.fd,this.buffer,0,howMuchToRead,this.pos,(err,bytes)=>{
//bytes是实际读到的字节数 if(err){ if(this.autoClose) this.destroy(); return this.emit('error',err); } if(bytes){ let data = this.buffer.slice(0,bytes); this.pos += bytes; data = this.encoding?data.toString(this.encoding):data; this.emit('data',data); if(this.end && this.pos > this.end){ return this.endFn(); }else{ if(this.flowing) this.read(); } }else{ return this.endFn(); } }) } endFn(){ this.emit('end'); this.destroy(); } open() { fs.open(this.path,this.flags,this.mode,(err,fd)=>{ if(err){ if(this.autoClose){ this.destroy(); return this.emit('error',err); } } this.fd = fd; this.emit('open'); }) } destroy(){ fs.close(this.fd,()=>{ this.emit('close'); }); } pipe(dest){ this.on('data',data=>{ let flag = dest.write(data); if(!flag){ this.pause(); } }); dest.on('drain',()=>{ this.resume(); }); } //可读流会进入流动模式,当暂停的时候, pause(){ this.flowing = false; } resume(){ this.flowing = true; this.read(); }}module.exports = ReadStream;复制代码

暂停模式的可读流

let fs = require('fs');let EventEmitter = require('events');class ReadStream extends EventEmitter {    constructor(path, options) {        super(path, options);        this.path = path;        this.highWaterMark = options.highWaterMark || 64 * 1024;        this.buffer = Buffer.alloc(this.highWaterMark);        this.flags = options.flags || 'r';        this.encoding = options.encoding;        this.mode = options.mode || 0o666;        this.start = options.start || 0;        this.end = options.end;        this.pos = this.start;        this.autoClose = options.autoClose || true;        this.bytesRead = 0;        this.closed = false;        this.flowing;        this.needReadable = false;        this.length = 0;        this.buffers = [];        this.on('end', function () {            if (this.autoClose) {                this.destroy();            }        });        this.on('newListener', (type) => {            if (type == 'data') {                this.flowing = true;                this.read();            }            if (type == 'readable') {                this.read(0);            }        });        this.open();    }    open() {        fs.open(this.path, this.flags, this.mode, (err, fd) => {            if (err) {                if (this.autoClose) {                    this.destroy();                    return this.emit('error', err);                }            }            this.fd = fd;            this.emit('open');        });    }    read(n) {        if (typeof this.fd != 'number') {            return this.once('open', () => this.read());        }        n = parseInt(n, 10);        if (n != n) {            n = this.length;        }        if (this.length == 0)            this.needReadable = true;        let ret;        if (0 < n < this.length) {            ret = Buffer.alloc(n);            let b;            let index = 0;            while (null != (b = this.buffers.shift())) {                for (let i = 0; i < b.length; i++) {                    ret[index++] = b[i];                    if (index == ret.length) {                        this.length -= n;                        b = b.slice(i + 1);                        this.buffers.unshift(b);                        break;                    }                }            }            if (this.encoding) ret = ret.toString(this.encoding);        }        let _read = () => {            let m = this.end ? Math.min(this.end - this.pos + 1, this.highWaterMark) : this.highWaterMark;            fs.read(this.fd, this.buffer, 0, m, this.pos, (err, bytesRead) => {                if (err) {                    return                }                let data;                if (bytesRead > 0) {                    data = this.buffer.slice(0, bytesRead);                    this.pos += bytesRead;                    this.length += bytesRead;                    if (this.end && this.pos > this.end) {                        if (this.needReadable) {                            this.emit('readable');                        }                        this.emit('end');                    } else {                        this.buffers.push(data);                        if (this.needReadable) {                            this.emit('readable');                            this.needReadable = false;                        }                    }                } else {                    if (this.needReadable) {                        this.emit('readable');                    }                    return this.emit('end');                }            })        }        if (this.length == 0 || (this.length < this.highWaterMark)) {            _read(0);        }        return ret;    }    destroy() {        fs.close(this.fd, (err) => {            this.emit('close');        });    }    pause() {        this.flowing = false;    }    resume() {        this.flowing = true;        this.read();    }    pipe(dest) {        this.on('data', (data) => {            let flag = dest.write(data);            if (!flag) this.pause();        });        dest.on('drain', () => {            this.resume();        });        this.on('end', () => {            dest.end();        });    }}module.exports = ReadStream;复制代码

可写流

let fs = require('fs');let EventEmitter = require('events');class WriteStream extends EventEmitter {    constructor(path, options) {        super(path, options);        this.path = path;        this.flags = options.flags || 'w';        this.mode = options.mode || 0o666;        this.start = options.start || 0;        this.pos = this.start;//文件的写入索引        this.encoding = options.encoding || 'utf8';        this.autoClose = options.autoClose;        this.highWaterMark = options.highWaterMark || 16 * 1024;        this.buffers = [];//缓存区        this.writing = false;//表示内部正在写入数据        this.length = 0;//表示缓存区字节的长度        this.open();    }    open() {        fs.open(this.path, this.flags, this.mode, (err, fd) => {            if (err) {                if (this.autoClose) {                    this.destroy();                }                return this.emit('error', err);            }            this.fd = fd;            this.emit('open');        });    }    //如果底层已经在写入数据的话,则必须当前要写入数据放在缓冲区里    write(chunk, encoding, cb) {        chunk = Buffer.isBuffer(chunk)?chunk:Buffer.from(chunk,this.encoding);        let len = chunk.length;        //缓存区的长度加上当前写入的长度        this.length += len;        //判断当前最新的缓存区是否小于最高水位线        let ret = this.length < this.highWaterMark;        if (this.writing) {
//表示正在向底层写数据,则当前数据必须放在缓存区里 this.buffers.push({ chunk, encoding, cb }); } else {
//直接调用底层的写入方法进行写入 //在底层写完当前数据后要清空缓存区 this.writing = true; this._write(chunk, encoding, () => this.clearBuffer()); } return ret; } clearBuffer() { //取出缓存区中的第一个buffer //8 7 let data = this.buffers.shift(); if(data){ this._write(data.chunk,data.encoding,()=>this.clearBuffer()) }else{ this.writing = false; //缓存区清空了 this.emit('drain'); } } _write(chunk, encoding, cb) { if(typeof this.fd != 'number'){ return this.once('open',()=>this._write(chunk, encoding, cb)); } fs.write(this.fd,chunk,0,chunk.length,this.pos,(err,bytesWritten)=>{ if(err){ if(this.autoClose){ this.destroy(); this.emit('error',err); } } this.pos += bytesWritten; //写入多少字母,缓存区减少多少字节 this.length -= bytesWritten; cb && cb(); }) } destroy() { fs.close(this.fd, () => { this.emit('close'); }) }}module.exports = WriteStream;复制代码

转载地址:http://ocmia.baihongyu.com/

你可能感兴趣的文章
作业四:个人项目---小学四则运算
查看>>
漂亮的按钮样式-button
查看>>
post请求方式的翻页爬取内容及思考
查看>>
VC++ MFC如何生成一个可串行化的类
查看>>
php 变量引用,函数引用
查看>>
NET生成缩略图
查看>>
微软企业库5.0 学习之路——第二步、使用VS2010+Data Access模块建立多数据库项目...
查看>>
渗流稳定性分析(MATLAB实现)
查看>>
POJ2253 Frogger(最短路径)
查看>>
动画总结?
查看>>
HDU 2044 一只小蜜蜂 *
查看>>
Java 斜杠 与 反斜杠
查看>>
垂直居中
查看>>
idea下maven项目,样式css、js更新后,页面不显示更新内容
查看>>
bzoj 1001 平面图转对偶图 最短路求图最小割
查看>>
php 记住密码自动登录
查看>>
NSThread创建线程的三种方法
查看>>
Logger.getLogger与LogFactory.getLog
查看>>
HDU4671 Backup Plan(构造序列-多校七)
查看>>
一些难得一见的代码问题
查看>>