一.シナリオ
Node はシングルスレッドで実行されますが、これはマルチコア/マルチマシン下のマルチプロセスの利点を利用できないことを意味しません
実際、Node は最初から分散型ネットワークシナリオを考慮して設計されていました:
Node is a single-threaded, single-process system which enforces shared-nothing design with OS process boundaries. It has rather good libraries for networking. I believe this to be a basis for designing very large distributed programs. The "nodes" need to be organized: given a communication protocol, told how to connect to each other. In the next couple months we are working on libraries for Node that allow these networks.
P.S.Node がなぜ Node と呼ばれるかについては、Why is Node.js named Node.js? を参照
二.プロセスの作成
通信方式はプロセスの生成方式に関連し、Node には 4 種類のプロセス作成方法があります:spawn()、exec()、execFile() および fork()
spawn
const { spawn } = require('child_process');
const child = spawn('pwd');
// 带参数的形式
// const child = spawn('find', ['.', '-type', 'f']);
spawn() は ChildProcess インスタンスを返し、ChildProcess もイベント機制(EventEmitter API)に基づき、いくつかのイベントを提供します:
-
exit:子プロセスが終了する時にトリガーされ、プロセス終了状態(codeとsignal)を知ることができます -
disconnect:親プロセスがchild.disconnect()を呼び出す時にトリガーされます -
error:子プロセスの作成が失敗した時、またはkillされた時にトリガーされます -
close:子プロセスのstdioストリーム(標準入出力ストリーム)が閉じられる時にトリガーされます -
message:子プロセスがprocess.send()でメッセージを送信する時にトリガーされ、親子プロセス間はこの内蔵のメッセージ機制で通信できます
child.stdin、child.stdout および child.stderr を通じて子プロセスの stdio ストリームにアクセスでき、これらのストリームが閉じられる時、子プロセスは close イベントをトリガーします
P.S.close と exit の違いは主にマルチプロセスが同一 stdio ストリームを共有するシナリオで体现され、あるプロセスが終了したからといって stdio ストリームが閉じられたことを意味しません
子プロセス中、stdout/stderr は Readable 特性を持ち、stdin は Writable 特性を持ち、主プロセスの状況と正反対です:
child.stdout.on('data', (data) => {
console.log(`child stdout:\n${data}`);
});
child.stderr.on('data', (data) => {
console.error(`child stderr:\n${data}`);
});
プロセス stdio ストリームのパイプ特性を利用して、より複雑なことを完了できます。例えば:
const { spawn } = require('child_process');
const find = spawn('find', ['.', '-type', 'f']);
const wc = spawn('wc', ['-l']);
find.stdout.pipe(wc.stdin);
wc.stdout.on('data', (data) => {
console.log(`Number of files ${data}`);
});
作用は find . -type f | wc -l と同等で、現在のディレクトリのファイル数を再帰的に統計します
IPC オプション
さらに、spawn() メソッドの stdio オプションを通じて IPC 機制を確立できます:
const { spawn } = require('child_process');
const child = spawn('node', ['./ipc-child.js'], { stdio: [null, null, null, 'ipc'] });
child.on('message', (m) => {
console.log(m);
});
child.send('Here Here');
// ./ipc-child.js
process.on('message', (m) => {
process.send(`< ${m}`);
process.send('> 不要回答 x3');
});
spawn() の IPC オプションの詳細情報については、options.stdio を参照
exec
spawn() メソッドはデフォルトで传入されたコマンドを実行するために shell を作成しません(そのためパフォーマンス上で少し良い)が、exec() メソッドは shell を作成します。さらに、exec() は stream ベースではなく、传入されたコマンドの実行結果を buffer に一時保存し、その後まとめてコールバック関数に渡します
exec() メソッドの特徴は完全に shell 構文をサポートし、任意の shell スクリプトを直接传入できることです。例えば:
const { exec } = require('child_process');
exec('find . -type f | wc -l', (err, stdout, stderr) => {
if (err) {
console.error(`exec error: ${err}`);
return;
}
console.log(`Number of files ${stdout}`);
});
しかし exec() メソッドもそのため コマンドインジェクション のセキュリティリスクが存在し、ユーザー入力などの動的コンテンツを含むシナリオでは特に注意が必要です。したがって、exec() メソッドの適用シナリオは:shell 構文を直接使用したい、かつ予想出力データ量が大きくない(メモリ圧力がない)場合です
では、shell 構文をサポートし、かつ stream IO の利点を持つ方式はあるか?
あります。両全其美の方式は以下の通り:
const { spawn } = require('child_process');
const child = spawn('find . -type f | wc -l', {
shell: true
});
child.stdout.pipe(process.stdout);
spawn() の shell オプションをオンにし、pipe() メソッドを通じて子プロセスの標準出力を簡単に現在のプロセスの標準入力に接続し、コマンド実行結果を見られるようにします。実際にはさらに簡単な方式があります:
const { spawn } = require('child_process');
process.stdout.on('data', (data) => {
console.log(data);
});
const child = spawn('find . -type f | wc -l', {
shell: true,
stdio: 'inherit'
});
stdio: 'inherit' は子プロセスが現在のプロセスの標準入出力を継承することを許可(stdin、stdout および stderr を共有)するため、上記例は現在のプロセス process.stdout の data イベントを監視することで子プロセスの出力結果を取得できます
さらに、stdio および shell オプションの他に、spawn() はいくつかの他のオプションもサポートします。例えば:
const child = spawn('find . -type f | wc -l', {
stdio: 'inherit',
shell: true,
// 環境変数を変更、デフォルトは process.env
env: { HOME: '/tmp/xxx' },
// 現在の作業ディレクトリを変更
cwd: '/tmp',
// 独立プロセスとして存在
detached: true
});
注意、env オプションは環境変数形式で子プロセスにデータを伝達する他に、サンドボックス式の環境変数隔離を実現するためにも使用でき、デフォルトで process.env を子プロセスの環境変数セットとし、子プロセスは現在のプロセスと同じようにすべての環境変数にアクセスできます。上記例のようにカスタムオブジェクトを子プロセスの環境変数セットとして指定した場合、子プロセスは他の環境変数にアクセスできません
したがって、環境変数を増/減したい場合、このようにする必要があります:
var spawn_env = JSON.parse(JSON.stringify(process.env));
// remove those env vars
delete spawn_env.ATOM_SHELL_INTERNAL_RUN_AS_NODE;
delete spawn_env.ELECTRON_RUN_AS_NODE;
var sp = spawn(command, ['.'], {cwd: cwd, env: spawn_env});
detached オプションはさらに面白いです:
const { spawn } = require('child_process');
const child = spawn('node', ['stuff.js'], {
detached: true,
stdio: 'ignore'
});
child.unref();
この方式で作成された独立プロセスの動作はオペレーティングシステムに依存し、Windows 上では detached 子プロセスは独自の console ウィンドウを持ち、Linux 上では該プロセスは新しい process group を作成します(この特性は子プロセス族を管理し、tree-kill に類似した特性を実現するために使用できます)
unref() メソッドは関係を断絶するために使用され、こうして「親」プロセスは独立して終了できます(子プロセスが一緒に終了することを引き起こさない)が、この時子プロセスの stdio も「親」プロセスから独立しているべきであることに注意してください。否则「親」プロセスが終了した後も子プロセスは影響を受けます
execFile
const { execFile } = require('child_process');
const child = execFile('node', ['--version'], (error, stdout, stderr) => {
if (error) {
throw error;
}
console.log(stdout);
});
exec() メソッドと類似していますが、shell を通じて実行しません(そのためパフォーマンスが少し良い)ので、実行可能ファイルを传入する必要があります。Windows 下では一部のファイルは直接実行できません。例えば .bat と .cmd で、これらのファイルは execFile() で実行できず、exec() または shell オプションをオンにした spawn() に頼る必要があります
P.S.exec() と同様にstream ベースではなく、同様に出力データ量のリスクが存在します
xxxSync
spawn、exec および execFile にはすべて対応する同期ブロッキングバージョンがあり、子プロセスが終了するまで待ち続けます
const {
spawnSync,
execSync,
execFileSync,
} = require('child_process');
同期メソッドはスクリプトタスクを簡素化するために使用されます。例えば起動フローで、他の時にはこれらのメソッドの使用を避けるべきです
fork
fork() は spawn() の変種で、Node プロセスを作成するために使用され、最大の特點は親子プロセスが自前で通信機制(IPC パイプ)を持っていることです:
The child_process.fork() method is a special case of child_process.spawn() used specifically to spawn new Node.js processes. Like child_process.spawn(), a ChildProcess object is returned. The returned ChildProcess will have an additional communication channel built-in that allows messages to be passed back and forth between the parent and child. See subprocess.send() for details.
例えば:
var n = child_process.fork('./child.js');
n.on('message', function(m) {
console.log('PARENT got message:', m);
});
n.send({ hello: 'world' });
// ./child.js
process.on('message', function(m) {
console.log('CHILD got message:', m);
});
process.send({ foo: 'bar' });
fork() が自前で通信機制を持つ利点のため、特に時間がかかるロジックを分割するために適しています。例えば:
const http = require('http');
const longComputation = () => {
let sum = 0;
for (let i = 0; i < 1e9; i++) {
sum += i;
};
return sum;
};
const server = http.createServer();
server.on('request', (req, res) => {
if (req.url === '/compute') {
const sum = longComputation();
return res.end(`Sum is ${sum}`);
} else {
res.end('Ok')
}
});
server.listen(3000);
このようにする致命的な問題は、誰かが /compute にアクセスすると、後続のリクエストがすべてタイムリーに処理できなくなることで、イベントループがまだ longComputation によってブロックされており、時間がかかる計算が終了するまでサービス能力を回復できないためです
時間がかかる操作が主プロセスのイベントループをブロックするのを避けるために、longComputation() を子プロセスに分割できます:
// compute.js
const longComputation = () => {
let sum = 0;
for (let i = 0; i < 1e9; i++) {
sum += i;
};
return sum;
};
// スイッチ、メッセージを受け取ってから開始
process.on('message', (msg) => {
const sum = longComputation();
process.send(sum);
});
主プロセスは子プロセスを起動して longComputation を実行:
const http = require('http');
const { fork } = require('child_process');
const server = http.createServer();
server.on('request', (req, res) => {
if (req.url === '/compute') {
const compute = fork('compute.js');
compute.send('start');
compute.on('message', sum => {
res.end(`Sum is ${sum}`);
});
} else {
res.end('Ok')
}
});
server.listen(3000);
主プロセスのイベントループはもう時間がかかる計算によってブロックされませんが、プロセス数量はさらに制限する必要があり、否则リソースがプロセスによって消耗し尽くされた時、サービス能力は依然として影響を受けます
P.S.実際、cluster モジュールはマルチプロセスサービス能力のカプセル化で、思路はこの简单な示例と類似しています
三.通信方式
1.stdin/stdout を通じて json を伝達
stdin/stdout and a JSON payload
最も直接的な通信方式で、子プロセスの handle を取得した後、その stdio ストリームにアクセスでき、その後一種の message 形式を約定して愉快に通信:
const { spawn } = require('child_process');
child = spawn('node', ['./stdio-child.js']);
child.stdout.setEncoding('utf8');
// 父进程 - 发
child.stdin.write(JSON.stringify({
type: 'handshake',
payload: '你好吖'
}));
// 父进程 - 收
child.stdout.on('data', function (chunk) {
let data = chunk.toString();
let message = JSON.parse(data);
console.log(`${message.type} ${message.payload}`);
});
子プロセスもこれと類似:
// ./stdio-child.js
// 子进程 - 收
process.stdin.on('data', (chunk) => {
let data = chunk.toString();
let message = JSON.parse(data);
switch (message.type) {
case 'handshake':
// 子进程 - 发
process.stdout.write(JSON.stringify({
type: 'message',
payload: message.payload + ' : hoho'
}));
break;
default:
break;
}
});
P.S.VS Code プロセス間通信はこの方式を採用しており、詳細は access electron API from vscode extension を参照
明らかな制限は「子」プロセスの handle を取得する必要があり、2 つの完全に独立したプロセス間はこの方式で通信できません(例えばクロスアプリケーション、さらにはクロスマシンのシナリオ)
P.S.stream および pipe の詳細情報については、Node 中のストリーム を参照
2.ネイティブ IPC サポート
spawn() および fork() の示例の如く、プロセス間は内蔵の IPC 機制を借りて通信できます
親プロセス:
-
process.on('message')受信 -
child.send()送信
子プロセス:
-
process.on('message')受信 -
process.send()送信
制限は上記と同じで、同様に一方が他方の handle を取得できる必要があります
3.sockets
ネットワークを借りてプロセス間通信を完了し、プロセスを跨ぐだけでなく、マシンも跨げます
node-ipc はこの方案を採用しており、例えば:
// server
const ipc=require('../../../node-ipc');
ipc.config.id = 'world';
ipc.config.retry= 1500;
ipc.config.maxConnections=1;
ipc.serveNet(
function(){
ipc.server.on(
'message',
function(data,socket){
ipc.log('got a message : ', data);
ipc.server.emit(
socket,
'message',
data+' world!'
);
}
);
ipc.server.on(
'socket.disconnected',
function(data,socket){
console.log('DISCONNECTED\n\n',arguments);
}
);
}
);
ipc.server.on(
'error',
function(err){
ipc.log('Got an ERROR!',err);
}
);
ipc.server.start();
// client
const ipc=require('node-ipc');
ipc.config.id = 'hello';
ipc.config.retry= 1500;
ipc.connectToNet(
'world',
function(){
ipc.of.world.on(
'connect',
function(){
ipc.log('## connected to world ##', ipc.config.delay);
ipc.of.world.emit(
'message',
'hello'
);
}
);
ipc.of.world.on(
'disconnect',
function(){
ipc.log('disconnected from world');
}
);
ipc.of.world.on(
'message',
function(data){
ipc.log('got a message from world : ', data);
}
);
}
);
P.S.より多くの示例は RIAEvangelist/node-ipc を参照
もちろん、単機シナリオでネットワークを通じてプロセス間通信を完了するのは少しパフォーマンスを浪費しますが、ネットワーク通信の利点はクロス環境の互換性とさらに進んだ RPC シナリオです
4.message queue
親子プロセスはすべて外部メッセージ機制を通じて通信し、プロセスを跨ぐ能力は MQ サポートに依存
つまりプロセス間は直接通信せず、中間層(MQ)を通じて、制御層を 1 つ追加することでより多くの柔軟性と利点を獲得:
-
安定性:メッセージ機制は強力な安定性保証を提供。例えば配達確認(メッセージ返信 ACK)、失敗再送信/多重送信防止など
-
優先度制御:メッセージ応答順序を調整可能
-
オフライン能力:メッセージはキャッシュ可能
-
事務性メッセージ処理:関連メッセージを事務に組み合わせ、配達順序及び完全性を保証
P.S.実装が難しい?1 層カプセル化で解決できますか。ダメなら 2 層カプセル化……
比較的人気があるのは smrchy/rsmq で、例えば:
// init
RedisSMQ = require("rsmq");
rsmq = new RedisSMQ( {host: "127.0.0.1", port: 6379, ns: "rsmq"} );
// create queue
rsmq.createQueue({qname:"myqueue"}, function (err, resp) {
if (resp===1) {
console.log("queue created")
}
});
// send message
rsmq.sendMessage({qname:"myqueue", message:"Hello World"}, function (err, resp) {
if (resp) {
console.log("Message sent. ID:", resp);
}
});
// receive message
rsmq.receiveMessage({qname:"myqueue"}, function (err, resp) {
if (resp.id) {
console.log("Message received.", resp)
}
else {
console.log("No messages for me...")
}
});
Redis server を起動し、基本原理は以下の通り:
Using a shared Redis server multiple Node.js processes can send / receive messages.
メッセージの受信/送信/キャッシュ/永続化は Redis が提供する能力に依存し、この基礎上で完全なキュー機制を実現
5.Redis
基本思路は message queue と類似:
Use Redis as a message bus/broker.
Redis 自前で Pub/Sub 機制(つまり发布 - 購読モード)を持ち、简单な通信シナリオに適しており、例えば 1 対 1 または 1 対多でメッセージ信頼性を気にしないシナリオ
さらに、Redis には list 構造があり、メッセージキューとして使用でき、これによりメッセージ信頼性を向上。一般的な做法は生産者が LPUSH メッセージ、消費者が BRPOP メッセージ。メッセージ信頼性を要求する简单な通信シナリオに適していますが、欠点はメッセージが状態を持たず、ACK 機制がなく、複雑な通信ニーズを満たせないことです
P.S.Redis の Pub/Sub 示例は What's the most efficient node.js inter-process communication library/method? を参照
四.まとめ
Node プロセス間通信には 4 種類の方式があります:
-
stdin/stdout を通じて json を伝達:最も直接的な方式で、「子」プロセスの handle を取得できるシナリオに適し、関連プロセス間通信に適し、マシンを跨げません
-
Node ネイティブ IPC サポート:最も native(地道?)な方式で、上一种より「正規」で、同様の局限性を持ちます
-
sockets を通じて:最も通用的な方式で、良好なクロス環境能力を持ちますが、ネットワークのパフォーマンス損失が存在
-
message queue を借りて:最も強力な方式で、通信する必要があり、シナリオも複雑なら、いっそ 1 層メッセージ中間件を拡張し、綺麗に各種通信問題を解決
コメントはまだありません