一.流的概念
stream 是數據集合,與陣列、字串差不多。但 stream 不一次性存取全部數據,而是一部分一部分發送/接收(chunk 式的),所以不必佔用那麼大塊記憶體,尤其適用於處理大量(外部)數據的場景
stream 具有管道(pipeline)特性,例如:
const grep = ... // A stream for the grep output
const wc = ... // A stream for the wc input
grep.pipe(wc)
很多原生模組都是基於 stream 的,包括程序的 stdin/stdout/stderr :
例如常見的場景:
const fs = require('fs');
const server = require('http').createServer();
server.on('request', (req, res) => {
const src = fs.createReadStream('./big.file');
src.pipe(res);
});
server.listen(8000);
其中 pipe 方法把可讀流的輸出(數據源)作為可寫流的輸入(目標),直接把讀檔案的輸出流作為輸入連接到 HTTP 回應的輸出流,從而避免把整個檔案讀入記憶體
P.S. 甚至日常使用的 console.log() 內部實作 也是 stream
二.流的類型
Node 中有 4 種基礎流:
- Readable
可讀流是對源的抽象,從中可以消耗數據,如 fs.createReadStream
- Writable
可寫流是對可寫入數據的目標的抽象,如 fs.createWriteStream
- Duplex(雙工)
雙工流既可讀又可寫,如 TCP socket
- Transform(轉換)
轉換流本質上是雙工流,用於在寫入和讀取數據時對其進行修改或轉換,如 zlib.createGzip 用 gzip 壓縮數據
轉換流可以看作一個輸入可寫流,輸出可讀流的函數
P.S. 有一種轉換流叫 (Pass)Through Stream(通過流),類似於 FP 中的 identity = x => x
三.管道
src.pipe(res) 要求源必須可讀,目標必須可寫,所以,如果是對雙工流進行管道傳輸,就可以像 Linux 的管道一樣鏈式呼叫:
readableSrc
.pipe(transformStream1)
.pipe(transformStream2)
.pipe(finalWrtitableDest)
pipe() 方法回傳目標流,所以:
// a (readable), b and c (duplex), and d (writable)
a.pipe(b).pipe(c).pipe(d)
// 等效於
a.pipe(b)
b.pipe(c)
c.pipe(d)
# Linux 下,等效於
$ a | b | c | d
四.流與事件
事件驅動是 Node 在設計上的一個重要特點,很多 Node 原生物件都是基於事件機制( EventEmitter 模組)實作的,包括流( stream 模組):
Most of Node’s objects?—?like HTTP requests, responses, and streams?—?implement the EventEmitter module so they can provide a way to emit and listen to events.
所有 stream 都是 EventEmitter 實例,透過事件機制來讀寫數據,例如上面提到的 pipe() 方法相當於:
// readable.pipe(writable)
readable.on('data', (chunk) => {
writable.write(chunk);
});
readable.on('end', () => {
writable.end();
});
P.S. pipe 還處理了一些別的事情,比如錯誤處理,EoF 以及某個流的速度較快/較慢的情況
Readable 與 Writable stream 的主要事件和方法如下:
Readable 的主要事件有:
-
data事件:stream 把一個 chunk 傳遞給使用者時觸發 -
end事件:再沒有要從 stream 中獲取(consume)的數據時觸發
Writable 的主要事件有:
-
drain事件,斷流了,這是 Writable stream 可以接收更多數據的信號 -
finish事件,當所有數據都已 flush 到下層系統時觸發
五.Readable stream 的兩種模式:Paused 與 Flowing
一個 Readable stream 要麼流動(Flowing)要麼暫停(Paused),也被稱為拉(pull)和推(push)兩種模式
建立出來後預設處於 Paused 狀態,可以透過 read() 方法讀取數據。如果處於 Flowing 狀態,數據會持續地流出來,此時只需要透過監聽事件來使用這些數據,如果沒有使用者的話,數據會丟失,所以都會監聽 Readable stream 的 data 事件,實際上監聽 data 事件會把 Readable stream 從 Paused 狀態切換到 Flowing,移除 data 事件監聽會再切回來。需要手動切換的話,可以透過 resume() 和 pause() 來做
使用 pipe() 方式時不用關心這些,都會自動處理妥當:
-
Readable 觸發
data事件,直到 Writable 忙不過來了 -
pipe收到信號後呼叫Readable.pause(),進入 Paused 模式 -
Writable 再幹一會兒壓力不大了的時候,會觸發
drain事件,此時pipe呼叫Readable.resume()進入 Flowing 模式,讓 Readable 接著觸發data事件
highWaterMark 與 backpressure
其實 drain 事件就是用來應對 Backpressure 現象 的,簡單地說,Backpressure 就是下游的消費速度限制了傳輸,造成下游向上游的反向壓力
如果消費速度慢於生產速度,會在下游產生堆積,來不及處理的數據會存放到 Writable 的 buffer 裡,如果不加(限流)處理,這個 buffer 會持續增長,可能溢出進而造成錯誤或數據丟失
Backpressure 現象發生的標誌是 Writable.write() 回傳了 false ,說明來自上游的待處理數據量已經觸及 highWaterMark(高水位線,預設 16kb):
Buffer level when stream.write() starts returning false. Defaults to 16384 (16kb), or 16 for objectMode streams.
這是下游開始有點緊張了(todo 項足夠忙一陣子了)的信號。建議在此時對上游限流,即呼叫 Readable.pause() 先給停了,給下游多點時間處理堆積的數據,下游覺得輕鬆了會觸發 darin 事件,表示此時有能力處理更多數據了,所以這時候應該開閘放水( Readable.resume() )
注意,Readable 的數據會存放在快取中,直到有個 Writable 來消耗這些數據。所以 Paused 狀態只是說不往下流了,已經快取的數據還在 Readable 的 buffer 裡。所以如果不限流,來不及處理的數據就快取在下游,並持續堆積,限流的話,這部分數據被快取在上游,因為限流了而不再持續堆積
另外,Readable 也有 highWaterMark 的概念:
The maximum number of bytes to store in the internal buffer before ceasing to read from the underlying resource. Defaults to 16384 (16kb), or 16 for objectMode streams
是對從實際數據源讀取速度的限制(比如從磁碟讀檔案),防止生產速度太快引發快取堆積(比如一頓猛 push() )。所以 Flowing Readable 的正常工作方式是被 push() - push() - push() ...誒,發現 buffer 裡的量已經攢夠一個 chunk 了,吐給下游。同樣,Readable 觸及 highWaterMark 的標誌是 push() 回傳 false ,說明 Readable 的 buffer 不那麼十分空了,此時如果還持續 push() ,沒錯,也會出現 BackPressure(Readable 消費能力限制了從數據源到 Readable 的傳輸速度):
快-------------慢
數據源-------->Readable------->Writable
快--------------慢
只要上游(生產)快,下游(消費)慢就會出現 BackPressure,所以在 readable.pipe(writable) 的簡單場景,可能會出現上面兩段 BackPressure
六.範例
Writable stream
常見的造大檔案:
const fs = require('fs');
const file = fs.createWriteStream('./big.file');
for(let i=0; i<= 1e6; i++) {
file.write('Lorem ipsum dolor sit amet, consectetur adipisicing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat. Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt mollit anim id est laborum.\n');
}
file.end();
透過 fs.createWriteStream() 建立指向檔案的 Writable stream,透過 write() 填充數據,寫完後 end()
或者更一般的,直接 new 一個 Writable :
const { Writable } = require('stream');
const outStream = new Writable({
write(chunk, encoding, callback) {
console.log(chunk.toString());
// nowrap version
// process.stdout.write(chunk.toString());
callback();
}
});
process.stdin.pipe(outStream);
一個最簡單的 echo 實作,把目前程序的標準輸入接到自定義輸出流 outStream ,像日誌中介軟體一樣(標準輸入流經 outStream ,再該幹嘛幹嘛去 callback ):
cc
oo
nn
ss
oo
ll
ee
Console {
log: [Function: bound consoleCall],
...
}
write() 方法的 3 個參數中, chunk 是個 Buffer, encoding 在某些場景下需要,大多數時候可以忽略, callback 是應該在 chunk 處理完畢後呼叫的通知函數,表明寫入成功與否(失敗的話,傳 Error 物件進去),類似於尾觸發機制中的 next()
或者更簡單的 echo 實作:
process.stdin.pipe(process.stdout);
直接把標準輸入流連接到標準輸出流
Readable stream
const { Readable } = require('stream');
const inStream = new Readable();
inStream.push('ABCDEFGHIJKLM');
inStream.push('NOPQRSTUVWXYZ');
inStream.push(null); // No more data
inStream.pipe(process.stdout);
透過 push 向 Readable stream 裡填充數據, push(null) 表示結束。上例中把所有數據都讀進來,然後才交給標準輸出,實際上有更高效的方式(按需推數據給使用者):
const { Readable } = require('stream');
const inStream = new Readable({
read(size) {
this.push(String.fromCharCode(this.currentCharCode++));
if (this.currentCharCode > 90) {
this.push(null);
}
}
});
inStream.currentCharCode = 65;
inStream.pipe(process.stdout);
read() 方法每次吐一個字元,使用者從 Readable stream 取數據的時候, read() 會持續觸發
Duplex/Transform stream
Duplex stream 兼具 Readable 和 Writable 的特點:既可以作為數據源(生產者),也可以作為目標(消費者)。例如:
const { Duplex } = require('stream');
const inoutStream = new Duplex({
write(chunk, encoding, callback) {
console.log(chunk.toString());
callback();
},
read(size) {
this.push(String.fromCharCode(this.currentCharCode++));
if (this.currentCharCode > 90) {
this.push(null);
}
}
});
inoutStream.currentCharCode = 65;
process.stdin.pipe(inoutStream).pipe(process.stdout);
上例把前 2 個例子結合起來了, inoutStream 被連接到標準輸出流了, A-Z 會作為數據源傳遞給標準輸出(打印出來),同時標準輸入流被接到 inoutStream ,來自標準輸入的所有數據會被 log 出來,效果如下:
ABCDEFGHIJKLMNOPQRSTUVWXYZcc
oo
nn
ss
oo
ll
ee
Console {
log: [Function: bound consoleCall],
...
}
P.S. 先輸出 A-Z 是因為 pipe() 會把 Readable stream 切換到 Flowing 模式,所以一開始就把 A-Z 「流」出來了
注意, Duplex stream 的 Readable 與 Writable 部分是完全獨立的,讀寫互不影響, Duplex 只是把兩個特性組合成一個物件了,就像兩根筷子一樣綁在一起的單向管道
Transform stream 是一種有意思的 Duplex stream:其輸出是根據輸入計算得來的。所以不用分別實作 read/write() 方法,只實作一個 transform() 方法就夠了:
const { Transform } = require('stream');
const upperCaseTr = new Transform({
// 函數簽名與 write 一致
transform(chunk, encoding, callback) {
this.push(chunk.toString().toUpperCase());
callback();
}
});
process.stdin.pipe(upperCaseTr).pipe(process.stdout);
同樣, Transform stream 的 Readable 與 Writable 部分也是獨立的(不手動 push 就不會自動傳遞到 Readable 部分),只是形式上結合起來了
P.S. 另外, stream 之間除了可以傳遞 Buffer/String,還可以傳遞 Object(包括 Array),具體見 Streams Object Mode
Node 提供了一些原生 Transform stream,例如 zlib 和 crypto stream:
const fs = require('fs');
const zlib = require('zlib');
const file = process.argv[2];
fs.createReadStream(file)
.pipe(zlib.createGzip())
.pipe(fs.createWriteStream(file + '.gz'));
簡單的命令列工具, gzip 壓縮。更多範例見 Node’s built-in transform streams


暫無評論,快來發表你的看法吧