メインコンテンツへ移動

Node.js におけるストリーム

無料2018-02-10#Node#nodejs stream#nodejs的流#nodejs stream highWaterMark#nodejs backpressure

Node プロセス間通信とストリームにはどのような関係があるのでしょうか?

1. ストリームの概念

ストリーム(stream)は、配列や文字列と同様にデータの集合です。しかし、ストリームはすべてのデータに一度にアクセスするのではなく、一部ずつ(チャンク単位で)送信または受信します。そのため、膨大なメモリを占有する必要がなく、特に大量の(外部)データを処理するシナリオに適しています。

ストリームはパイプライン(pipeline)の特性を持っています。例えば:

const grep = ... // grep 出力用のストリーム
const wc = ... // wc 入力用のストリーム
grep.pipe(wc)

多くの組み込みモジュールがストリームに基づいており、プロセスの stdin/stdout/stderr も含まれます:

native-stream-module

例えば、よくあるシナリオは以下の通りです:

const fs = require('fs');
const server = require('http').createServer();

server.on('request', (req, res) => {
  const src = fs.createReadStream('./big.file');
  src.pipe(res);
});

server.listen(8000);

ここで pipe メソッドは、読み取り可能ストリームの出力(データソース)を書き込み可能ストリームの入力(ターゲット)として扱います。ファイルの読み取りストリームを HTTP レスポンスの出力ストリームに直接接続することで、ファイル全体をメモリに読み込むことを避けています。

P.S. 日常的に使用する console.log()内部実装 も実はストリームです。

2. ストリームのタイプ

Node.js には4つの基本ストリームがあります:

  • Readable(読み取り可能)

    ソースの抽象化であり、そこからデータを消費できます。例:fs.createReadStream

  • Writable(書き込み可能)

    データを書き込むターゲットの抽象化です。例:fs.createWriteStream

  • Duplex(双方向)

    読み書きの両方が可能です。例:TCP ソケット。

  • Transform(変換)

    本質的には双方向ストリームですが、データの書き込みや読み取り時にデータを修正または変換するために使用されます。例:zlib.createGzip によるデータの Gzip 圧縮。

    変換ストリームは、書き込み可能ストリームを入力とし、読み取り可能ストリームを出力する関数と見なすことができます。

    P.S. 変換ストリームの一種に (Pass)Through Stream(スルー・ストリーム)があり、これは関数型プログラミングにおける identity = x => x に似ています。

3. パイプ(Pipe)

src.pipe(res) は、ソースが読み取り可能であり、ターゲットが書き込み可能であることを要求します。そのため、双方向ストリームに対してパイプ転送を行う場合は、Linux のパイプのようにメソッドチェーンで呼び出すことができます:

readableSrc
  .pipe(transformStream1)
  .pipe(transformStream2)
  .pipe(finalWrtitableDest)

pipe() メソッドはターゲットストリームを返すため、以下のようになります:

// a (readable), b and c (duplex), and d (writable)
a.pipe(b).pipe(c).pipe(d)
// 以下と同等です
a.pipe(b)
b.pipe(c)
c.pipe(d)
# Linux では以下と同等です
$ a | b | c | d

4. ストリームとイベント

イベント駆動は Node.js の設計上の重要な特徴です。多くの組み込みオブジェクトはイベントメカニズム(EventEmitter モジュール)に基づいて実装されており、ストリーム(stream モジュール)もその一つです:

Most of Node’s objects?—?like HTTP requests, responses, and streams?—?implement the EventEmitter module so they can provide a way to emit and listen to events.

すべてのストリームは EventEmitter のインスタンスであり、イベントメカニズムを通じてデータの読み書きを行います。例えば、前述の pipe() メソッドは以下と実質的に同じです:

// readable.pipe(writable)

readable.on('data', (chunk) => {
  writable.write(chunk);
});
readable.on('end', () => {
  writable.end();
});

P.S. pipe は他にも、エラー処理、EoF(ファイル末尾)、およびストリーム間の速度差(一方が速すぎる/遅すぎる場合)などの処理も行っています。

Readable と Writable ストリームの主なイベントとメソッドは以下の通りです:

stream-events-and-function

Readable の主なイベント:

  • data イベント:ストリームがチャンク(chunk)をコンシューマ(利用者)に渡すときに発生します。

  • end イベント:ストリームから取得(消費)すべきデータがなくなったときに発生します。

Writable の主なイベント:

  • drain イベント:書き込みバッファが空になり、Writable ストリームがさらにデータを受け取れるようになった合図です。

  • finish イベント:すべてのデータが下層のシステムにフラッシュ(書き出し)されたときに発生します。

5. Readable ストリームの2つのモード:Paused と Flowing

Readable ストリームは、Flowing(流動)モードか Paused(一時停止)モードのいずれかの状態にあります。これは「プル(pull)」モードと「プッシュ(push)」モードとも呼ばれます。

作成直後はデフォルトで Paused 状態にあり、read() メソッドを使用してデータを読み取ることができます。Flowing 状態になるとデータは継続的に流れ出し、イベントをリッスンすることでそのデータを利用します。コンシューマがいない場合、データは失われます。そのため、通常は Readable ストリームの data イベントをリッスンします。実際、data イベントをリッスンすると Readable ストリームは Paused 状態から Flowing 状態に切り替わり、リスナーを削除すると元に戻ります。手動で切り替えるには resume()pause() を使用します。

pipe() を使用する場合、これらは自動的に適切に処理されるため気にする必要はありません:

  1. Readable が data イベントを発生させ続け、Writable が処理しきれなくなるまで続けます。

  2. pipe が信号を受け取ると Readable.pause() を呼び出し、Paused モードに入ります。

  3. Writable が溜まったデータを処理して余裕ができると、drain イベントが発生します。このとき pipeReadable.resume() を呼び出して Flowing モードに戻し、Readable が再び data イベントを発生させるようにします。

highWaterMark と backpressure

実は drain イベントは Backpressure(背圧)現象 に対応するためのものです。簡単に言えば、Backpressure とは下流の消費速度が転送速度を制限し、下流から上流へとかかる逆方向の圧力のことです。

消費速度が生産速度よりも遅い場合、下流でデータが滞留します。処理しきれないデータは Writable のバッファに保存されます。これに対処(流量制限)しないと、バッファは増大し続け、オーバーフローによるエラーやデータの消失を招く可能性があります。

Backpressure 現象が発生した合図は Writable.write()false を返すことです。これは上流からの未処理データ量が highWaterMark(高水位線、デフォルトは 16kb)に達したことを意味します:

Buffer level when stream.write() starts returning false. Defaults to 16384 (16kb), or 16 for objectMode streams.

これは下流が手一杯になり始めた(しばらく忙しくなる)という合図です。この時点で上流の流量を制限すること、つまり Readable.pause() を呼び出して一時停止させ、下流に滞留データを処理する時間を与えることが推奨されます。下流に余裕ができると drain イベントが発生し、再びデータを受け入れ可能であることを示します。このとき、水門を開けて放水(Readable.resume())すべきです。

注意点として、Readable のデータは Writable によって消費されるまでキャッシュに保持されます。Paused 状態は「これ以上流さない」ということであり、すでにキャッシュされたデータは Readable のバッファに残ります。流量制限を行わない場合、処理しきれないデータは下流でキャッシュされ続け、蓄積していきます。流量制限を行えば、そのデータは上流でキャッシュされ、制限によってそれ以上蓄積されることはありません。

また、Readable にも highWaterMark の概念があります:

The maximum number of bytes to store in the internal buffer before ceasing to read from the underlying resource. Defaults to 16384 (16kb), or 16 for objectMode streams

これは実際のリソース(ディスクからの読み取りなど)からの読み取り速度に対する制限であり、生産速度が速すぎてキャッシュが溜まりすぎるのを防ぎます(例えば猛烈な勢いで push() するなど)。Flowing Readable の正常な動作は、push() が繰り返され、バッファが1チャンク分溜まったら下流に吐き出す、というものです。同様に Readable が highWaterMark に達した合図は push()false を返すことです。これは Readable のバッファに余裕がなくなったことを示しており、この状態で push() を続けると、ここでも Backpressure が発生します(Readable の消費能力が、データソースから Readable への転送速度を制限します):

  速-------------遅
データソース-------->Readable------->Writable
                 速--------------遅

上流(生産)が速く、下流(消費)が遅ければ常に Backpressure が発生します。そのため、readable.pipe(writable) という単純な構成でも、上記の2箇所で Backpressure が発生する可能性があります。

6. 例

Writable ストリーム

大きなファイルを作成する例:

const fs = require('fs');
const file = fs.createWriteStream('./big.file');

for(let i=0; i<= 1e6; i++) {
  file.write('Lorem ipsum dolor sit amet, consectetur adipisicing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat. Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt mollit anim id est laborum.\n');
}

file.end();

fs.createWriteStream() でファイルへの Writable ストリームを作成し、write() でデータを流し込み、最後に end() を呼び出します。

あるいは、より一般的に Writablenew する方法:

const { Writable } = require('stream');
const outStream = new Writable({
  write(chunk, encoding, callback) {
    console.log(chunk.toString());
    // 改行なしバージョン
    // process.stdout.write(chunk.toString());
    callback();
  }
});

process.stdin.pipe(outStream);

最もシンプルな echo の実装です。現在のプロセスの標準入力を自作の出力ストリーム outStream に繋いでいます。ログ用ミドルウェアのように機能します(標準入力が outStream を通り、本来の処理を続けるために callback を呼びます):

cc
oo
nn
ss
oo
ll
ee

Console {
  log: [Function: bound consoleCall],
  ...
}

write() メソッドの3つの引数のうち、chunk は Buffer、encoding は特定の場面で必要ですが多くの場合無視できます。callback はチャンクの処理が完了した後に呼び出すべき通知関数で、書き込みの成否(失敗時は Error オブジェクトを渡す)を示します。これはテールトリガー・メカニズムにおける next() に似ています。

さらにシンプルな echo の実装:

process.stdin.pipe(process.stdout);

標準入力ストリームを標準出力ストリームに直接接続します。

Readable ストリーム

const { Readable } = require('stream'); 
const inStream = new Readable();
inStream.push('ABCDEFGHIJKLM');
inStream.push('NOPQRSTUVWXYZ');
inStream.push(null); // これ以上のデータなし
inStream.pipe(process.stdout);

push を通じて Readable ストリームにデータを流し込み、push(null) で終了を知らせます。上の例ではすべてのデータを一気に読み込んでから標準出力に渡していますが、実際にはより効率的な方法(コンシューマの要求に応じてデータをプッシュする)があります:

const { Readable } = require('stream'); 
const inStream = new Readable({
  read(size) {
    this.push(String.fromCharCode(this.currentCharCode++));
    if (this.currentCharCode > 90) {
      this.push(null);
    }
  }
});
inStream.currentCharCode = 65;
inStream.pipe(process.stdout);

read() メソッドが毎回1文字ずつ吐き出し、コンシューマが Readable ストリームからデータを取得するたびに read() が継続的にトリガーされます。

Duplex/Transform ストリーム

双方向(Duplex)ストリームは Readable と Writable の両方の特徴を併せ持ちます。データソース(生産者)にもターゲット(消費者)にもなれます。例えば:

const { Duplex } = require('stream');

const inoutStream = new Duplex({
  write(chunk, encoding, callback) {
    console.log(chunk.toString());
    callback();
  },

  read(size) {
    this.push(String.fromCharCode(this.currentCharCode++));
    if (this.currentCharCode > 90) {
      this.push(null);
    }
  }
});

inoutStream.currentCharCode = 65;
process.stdin.pipe(inoutStream).pipe(process.stdout);

この例は前の2つの例を組み合わせたものです。inoutStream は標準出力ストリームに接続されており、A-Z がデータソースとして標準出力に渡されプリントされます。同時に標準入力ストリームが inoutStream に繋がれ、標準入力からのすべてのデータが log として出力されます。結果は以下の通りです:

ABCDEFGHIJKLMNOPQRSTUVWXYZcc
oo
nn
ss
oo
ll
ee

Console {
  log: [Function: bound consoleCall],
  ...
}

P.S. A-Z が先に出力されるのは、pipe() が Readable ストリームを Flowing モードに切り替えるため、最初に A-Z が「流れ出した」からです。

注意点として、双方向ストリームの Readable 部分と Writable 部分は完全に独立しており、読み書きは互いに影響しません。Duplex は単に2つの特性を1つのオブジェクトにまとめたものであり、2本の箸を束ねたような単方向パイプのようなものです。

変換(Transform)ストリームは面白い Duplex ストリームで、その出力は入力に基づいて計算されます。そのため read/write() メソッドを個別に実装する必要はなく、transform() メソッドを1つ実装するだけで済みます:

const { Transform } = require('stream');

const upperCaseTr = new Transform({
  // 関数シグネチャは write と同じです
  transform(chunk, encoding, callback) {
    this.push(chunk.toString().toUpperCase());
    callback();
  }
});

process.stdin.pipe(upperCaseTr).pipe(process.stdout);

同様に、Transform ストリームの Readable 部分と Writable 部分も独立しています(手動で push しない限り、自動的に Readable 部分へは伝わりません)。ただ形式的に組み合わされているだけです。

P.S. ストリーム間では Buffer/String 以外にも、オブジェクト(配列を含む)をやり取りすることもできます。詳細は Streams Object Mode を参照してください。

Node.js は zlibcrypto ストリームなど、いくつかの組み込み変換ストリームを提供しています:

const fs = require('fs');
const zlib = require('zlib');
const file = process.argv[2];

fs.createReadStream(file)
  .pipe(zlib.createGzip())
  .pipe(fs.createWriteStream(file + '.gz'));

シンプルなコマンドラインツールでの Gzip 圧縮の例です。更なる例は Node’s built-in transform streams を参照してください。

参考文献

コメント

コメントはまだありません

コメントを書く