nodejs中stream(流)学习分享

为什么需要stream

做为一个前端猿,一直对后端每天可以任性的操作服务器文件感到羡慕。最近学习node的fs模块玩的也是意犹未尽好不满足。fs.readFile方法在读取大文件时无法胜任,所以流出现了。

stream(流)

我的理解
我们在网上下载资源时,稍微大一点就要下载个把小时。我们的带宽远远不能满足从服务端把资源整个端走,所以要把资源拆分成小块,一块一块的运输。资源就像水一样流到了我们本地。

While it is important for all Node.js users to understand how streams work.(官网说流很重要!)

流的类型

nodejs有四种基本的流类型:

  • 1 Readable 可读流
  • 2 Writable 可写流
  • 3 Duplex 可读可写流
  • 4 Transform 在读写过程中可以修改和变换数据的Duplex流

1.Readable

可读流的功能是作为上游,提供数据给下游。

创建与使用

let {Readable} = require('stream');
let readable = Readable();
/*
Readable({read(){})
*/
let source = ['a','b','c'];
readable.setEncoding('utf8');
readable._read = function () {
    let data = source.shift()||null;
    console.log('read:',data);
    this.push(data);
}
readable.on('end', function () {
  console.log('end')
})
readable.on('data', function (data) {
    console.log(data)
})
/*
输出:
read: a
read: b
a
read: c
b
read: null
c
end
*/

Readable通过实例_read或read方法,在需要数据时,_read()方法会自动调用。

  • _read方法结束的标志是this.push(null)。
  • 当readable绑定data时_read()方法自动调用

data事件:当读入数据时触发data事件传入回调读到内容。
end事件:“消耗完”,需要满足两个条件:

  • 已经调用push(null),声明不会再有任何新的数据产生
  • 缓存中的数据也被读取完

两种模式(flowing 和 paused)

流动(flowing)模式

flowing 模式下, 可读流自动从系统底层读取数据,并通过 EventEmitter 接口的事件尽快将数据提供给应用。
以下条件均可以使readable进入flowing模式:

  • 调用resume方法
  • 如果之前未调用pause方法进入paused模式,则监听data事件也会调用resume方法。
  • readable.pipe(writable)。pipe中会监听data事件。

暂停(paused)模式

paused 模式下,必须显式调用 stream.read() 方法来从流中读取数据片段。
可读流可以通过下面途径切换到 paused 模式:

  • 如果不存在管道目标(pipe destination),可以通过调用 stream.pause() 方法实现。
  • 如果存在管道目标,可以通过取消 'data' 事件监听,并调用 stream.unpipe() 方法移除所有管道目标来实现。

Readable流程

  1. 通过read或_read方法读入数据
  2. 将数据push到当前对象
  3. 触发emit data方法

深入理解readable
众所周知node i/o操作通常采用异步操作。在读取数据时会把数据写入缓存区。

stream-how-data-comes-out.png

read方法

在创建流时,会设置一个highWaterMark参数创建最高水位线。

stream-read.png

自定义可读流

var stream = require('stream');
var util = require('util');
util.inherits(Counter, stream.Readable);
function Counter(options) {
    stream.Readable.call(this, options);
    this._index = 0;
}
Counter.prototype._read = function() {
    if(this._index++<3){
        this.push(this._index+'');
    }else{
        this.push(null);
    }
};
var counter = new Counter();

counter.on('data', function(data){
    console.log("读到数据: " + data.toString());//no maybe
});
counter.on('end', function(data){
    console.log("读完了");
});

2.Writable

可写流是对数据写入'目的地'的一种抽象。可写流的功能是作为下游,消耗上游提供的数据。

let {Writable} = require('stream');
var writable = Writable({
    write: function (data,_,next) {
        console.log(data);
        next&&next();
    }
})
/*
Writable._write(data,_,next)
*/
writable.write('a');
writable.write('b');
writable.write('c');
writable.end();
/*
<Buffer 61>
<Buffer 62>
<Buffer 63>
*/

write方法

  • write()或_write()的第三个参数next为回调函数,调用next()表示写入完成,开始写下一个数据。
  • 必须调用end()方法来告诉writable,所有数据均已写入。

自定义可写流

var stream = require('stream');
var util = require('util');
util.inherits(Writer, stream.Writable);
let stock = [];
function Writer(opt) {
    stream.Writable.call(this, opt);
}
Writer.prototype._write = function(chunk, encoding, callback) {
    setTimeout(()=>{
        stock.push(chunk.toString('utf8'));
        console.log("增加: " + chunk);
        callback();
    },500)
};
var w = new Writer();
for (var i=1; i<=5; i++){
    w.write("项目:" + i, 'utf8');
}
w.end("结束写入",function(){
    console.log(stock);
});

3.管道流

从readable读出数据,writeable写入数据

创建与使用

const stream = require('stream')

var index = 0;
const readable = stream.Readable({
    highWaterMark: 2,
    read: function () {
        process.nextTick(() => {
            console.log('push', ++index)
            this.push(index+'');
        })
    }
})

const writable = stream.Writable({
    highWaterMark: 2,
    write: function (chunk, encoding, next) {
        console.log('写入:', chunk.toString())
    }
})

readable.pipe(writable);

4.Duplex

const {Duplex} = require('stream');
const inoutStream = new Duplex({
    write(chunk, encoding, callback) {
        console.log(chunk.toString());
        callback();
    },
    read(size) {
        this.push((++this.index)+'');
        if (this.index > 3) {
            this.push(null);
        }
    }
});

inoutStream.index = 0;
process.stdin.pipe(inoutStream).pipe(process.stdout);

5.Transform

const {Transform} = require('stream');

const upperCase = new Transform({
    transform(chunk, encoding, callback) {
        this.push(chunk.toString().toUpperCase());
        callback();
    }
});

process.stdin.pipe(upperCase).pipe(process.stdout);
/*
实现大小写转换
inpt: a
A
*/

最近在学习nodejs,经过整理写下学习笔记。欢迎同学沟通学习心得,如有错误的地方也欢迎指正。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 211,042评论 6 490
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 89,996评论 2 384
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 156,674评论 0 345
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,340评论 1 283
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,404评论 5 384
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,749评论 1 289
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,902评论 3 405
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,662评论 0 266
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,110评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,451评论 2 325
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,577评论 1 340
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,258评论 4 328
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,848评论 3 312
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,726评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,952评论 1 264
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,271评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,452评论 2 348

推荐阅读更多精彩内容

  • stream 流是一个抽象接口,在 Node 里被不同的对象实现。例如 request to an HTTP se...
    明明三省阅读 3,398评论 1 10
  • 什么流 通俗的说就是一种,有起点和终点的字节数据传输手段,把数据从一个地方传到另一个地方。流(Stream)是一个...
    JOKER_HAN阅读 1,135评论 0 3
  • 流是Node中最重要的组件和模式之一。在社区里有一句格言说:让一切事务流动起来。这已经足够来描述在Node中流...
    宫若石阅读 544评论 0 0
  • Buffer Buffer的构成 Buffer对象类似数组,它的元素位16进制的两位数,即0到255的数值。主要是...
    人失格阅读 1,810评论 0 0
  • 为什么应该使用流 你可能看过这样的代码。 这段代码中,服务器每收到一次请求,就会先把data.txt读入到内存中,...
    饥人谷_xxxxx阅读 10,795评论 1 12