跳到主要內容
黯羽輕揚每天積累一點點

Nodejs 進程間通信

免費2018-02-16#Node#Node跨进程通信#Node创建进程#node ipc#node communication between processes

如何充分利用多核/多機下多進程的優勢?如何跨進程通信?

一。場景

Node 運行在單線程下,但這並不意味著無法利用多核/多機下多進程的優勢

事實上,Node 最初從設計上就考慮了分佈式網絡場景:

Node is a single-threaded, single-process system which enforces shared-nothing design with OS process boundaries. It has rather good libraries for networking. I believe this to be a basis for designing very large distributed programs. The "nodes" need to be organized: given a communication protocol, told how to connect to each other. In the next couple months we are working on libraries for Node that allow these networks.

P.S. 關於 Node 之所以叫 Node,見 Why is Node.js named Node.js?

二。創建進程

通信方式與進程產生方式有關,而 Node 有 4 種創建進程的方式:spawn()exec()execFile()fork()

spawn

const { spawn } = require('child_process');
const child = spawn('pwd');
// 帶參數的形式
// const child = spawn('find', ['.', '-type', 'f']);

spawn() 返回 ChildProcess 實例,ChildProcess 同樣基於事件機制(EventEmitter API),提供了一些事件:

  • exit:子進程退出時觸發,可以得知進程退出狀態(codesignal

  • disconnect:父進程調用 child.disconnect() 時觸發

  • error:子進程創建失敗,或被 kill 時觸發

  • close:子進程的 stdio 流(標準輸入輸出流)關閉時觸發

  • message:子進程通過 process.send() 發送消息時觸發,父子進程之間可以通過這種內置的消息機制通信

可以通過 child.stdinchild.stdoutchild.stderr 訪問子進程的 stdio 流,這些流被關閉的時,子進程會觸發 close 事件

P.S.closeexit 的區別主要體現在多進程共享同一 stdio 流的場景,某個進程退出了並不意味著 stdio 流被關閉了

在子進程中,stdout/stderr 具有 Readable 特性,而 stdin 具有 Writable 特性,與主進程的情況正好相反

child.stdout.on('data', (data) => {
  console.log(`child stdout:\n${data}`);
});

child.stderr.on('data', (data) => {
  console.error(`child stderr:\n${data}`);
});

利用進程 stdio 流的管道特性,就可以完成更複雜的事情,例如:

const { spawn } = require('child_process');

const find = spawn('find', ['.', '-type', 'f']);
const wc = spawn('wc', ['-l']);

find.stdout.pipe(wc.stdin);

wc.stdout.on('data', (data) => {
  console.log(`Number of files ${data}`);
});

作用等價於 find . -type f | wc -l,遞歸統計當前目錄文件數量

IPC 選項

另外,通過 spawn() 方法的 stdio 選項可以建立 IPC 機制:

const { spawn } = require('child_process');

const child = spawn('node', ['./ipc-child.js'], { stdio: [null, null, null, 'ipc'] });
child.on('message', (m) => {
  console.log(m);
});
child.send('Here Here');

// ./ipc-child.js
process.on('message', (m) => {
  process.send(`< ${m}`);
  process.send('> 不要回答 x3');
});

關於 spawn() 的 IPC 選項的詳細信息,請查看 options.stdio

exec

spawn() 方法默認不會創建 shell 去執行傳入的命令(所以性能上稍微好一點),而 exec() 方法會創建一個 shell。另外,exec() 不是基於 stream 的,而是把傳入命令的執行結果暫存到 buffer 中,再整個傳遞給回調函數

exec() 方法的特點是完全支持 shell 語法,可以直接傳入任意 shell 腳本,例如:

const { exec } = require('child_process');

exec('find . -type f | wc -l', (err, stdout, stderr) => {
  if (err) {
    console.error(`exec error: ${err}`);
    return;
  }

  console.log(`Number of files ${stdout}`);
});

exec() 方法也因此存在 命令注入 的安全風險,在含有用戶輸入等動態內容的場景要特別注意。所以,exec() 方法的適用場景是:希望直接使用 shell 語法,並且預期輸出數據量不大(不存在內存壓力)

那麼,有沒有既支持 shell 語法,還具有 stream IO 優勢的方式?

有。兩全其美的方式如下:

const { spawn } = require('child_process');
const child = spawn('find . -type f | wc -l', {
  shell: true
});
child.stdout.pipe(process.stdout);

開啟 spawn()shell 選項,並通過 pipe() 方法把子進程的標準輸出簡單地接到當前進程的標準輸入上,以便看到命令執行結果。實際上還有更容易的方式:

const { spawn } = require('child_process');
process.stdout.on('data', (data) => {
  console.log(data);
});
const child = spawn('find . -type f | wc -l', {
  shell: true,
  stdio: 'inherit'
});

stdio: 'inherit' 允許子進程繼承當前進程的標準輸入輸出(共享 stdinstdoutstderr),所以上例能夠通過監聽當前進程 process.stdoutdata 事件拿到子進程的輸出結果

另外,除了 stdioshell 選項,spawn() 還支持一些其它選項,如:

const child = spawn('find . -type f | wc -l', {
  stdio: 'inherit',
  shell: true,
  // 修改環境變量,默認 process.env
  env: { HOME: '/tmp/xxx' },
  // 改變當前工作目錄
  cwd: '/tmp',
  // 作為獨立進程存在
  detached: true
});

注意env 選項除了以環境變量形式向子進程傳遞數據外,還可以用來實現沙箱式的環境變量隔離,默認把 process.env 作為子進程的環境變量集,子進程與當前進程一樣能夠訪問所有環境變量,如果像上例中指定自定義對象作為子進程的環境變量集,子進程就無法訪問其它環境變量

所以,想要增/刪環境變量的話,需要這樣做:

var spawn_env = JSON.parse(JSON.stringify(process.env));

// remove those env vars
delete spawn_env.ATOM_SHELL_INTERNAL_RUN_AS_NODE;
delete spawn_env.ELECTRON_RUN_AS_NODE;

var sp = spawn(command, ['.'], {cwd: cwd, env: spawn_env});

detached 選項更有意思:

const { spawn } = require('child_process');

const child = spawn('node', ['stuff.js'], {
  detached: true,
  stdio: 'ignore'
});

child.unref();

以這種方式創建的獨立進程行為取決於操作系統,Windows 上 detached 子進程將擁有自己的 console 窗口,而 Linux 上該進程會創建新的 process group(這個特性可以用來管理子進程族,實現類似於 tree-kill 的特性)

unref() 方法用來斷絕關係,這樣「父」進程可以獨立退出(不會導致子進程跟著退出),但要注意這時子進程的 stdio 也應該獨立於「父」進程,否則「父」進程退出後子進程仍會受到影響

execFile

const { execFile } = require('child_process');
const child = execFile('node', ['--version'], (error, stdout, stderr) => {
  if (error) {
    throw error;
  }
  console.log(stdout);
});

exec() 方法類似,但不通過 shell 來執行(所以性能稍好一點),所以要求傳入可執行文件。Windows 下某些文件無法直接執行,比如 .bat.cmd,這些文件就不能用 execFile() 來執行,只能借助 exec() 或開啟了 shell 選項的 spawn()

P.S. 與 exec() 一樣也不是基於 stream 的,同樣存在輸出數據量風險

xxxSync

spawnexecexecFile 都有對應的同步阻塞版本,一直等到子進程退出

const { 
  spawnSync, 
  execSync, 
  execFileSync,
} = require('child_process');

同步方法用來簡化腳本任務,比如啟動流程,其它時候應該避免使用這些方法

fork

fork()spawn() 的變體,用來創建 Node 進程,最大的特點是父子進程自帶通信機制(IPC 管道):

The child_process.fork() method is a special case of child_process.spawn() used specifically to spawn new Node.js processes. Like child_process.spawn(), a ChildProcess object is returned. The returned ChildProcess will have an additional communication channel built-in that allows messages to be passed back and forth between the parent and child. See subprocess.send() for details.

例如:

var n = child_process.fork('./child.js');
n.on('message', function(m) {
  console.log('PARENT got message:', m);
});
n.send({ hello: 'world' });

// ./child.js
process.on('message', function(m) {
  console.log('CHILD got message:', m);
});
process.send({ foo: 'bar' });

因為 fork() 自帶通信機制的優勢,尤其適合用來拆分耗時邏輯,例如:

const http = require('http');
const longComputation = () => {
  let sum = 0;
  for (let i = 0; i < 1e9; i++) {
    sum += i;
  };
  return sum;
};
const server = http.createServer();
server.on('request', (req, res) => {
  if (req.url === '/compute') {
    const sum = longComputation();
    return res.end(`Sum is ${sum}`);
  } else {
    res.end('Ok')
  }
});

server.listen(3000);

這樣做的致命問題是一旦有人訪問 /compute,後續請求都無法及時處理,因為事件循環還被 longComputation 阻塞著,直到耗時計算結束才能恢復服務能力

為了避免耗時操作阻塞主進程的事件循環,可以把 longComputation() 拆分到子進程中:

// compute.js
const longComputation = () => {
  let sum = 0;
  for (let i = 0; i < 1e9; i++) {
    sum += i;
  };
  return sum;
};

// 開關,收到消息才開始做
process.on('message', (msg) => {
  const sum = longComputation();
  process.send(sum);
});

主進程開啟子進程執行 longComputation

const http = require('http');
const { fork } = require('child_process');

const server = http.createServer();

server.on('request', (req, res) => {
  if (req.url === '/compute') {
    const compute = fork('compute.js');
    compute.send('start');
    compute.on('message', sum => {
      res.end(`Sum is ${sum}`);
    });
  } else {
    res.end('Ok')
  }
});

server.listen(3000);

主進程的事件循環不會再被耗時計算阻塞,但進程數量還需要進一步限制,否則資源被進程消耗殆盡時服務能力仍會受到影響

P.S. 實際上,cluster 模塊就是對多進程服務能力的封裝,思路與這個簡單示例類似

三。通信方式

1. 通過 stdin/stdout 傳遞 json

stdin/stdout and a JSON payload

最直接的通信方式,拿到子進程的 handle 後,可以訪問其 stdio 流,然後約定一種 message 格式開始愉快地通信:

const { spawn } = require('child_process');

child = spawn('node', ['./stdio-child.js']);
child.stdout.setEncoding('utf8');
// 父進程 - 發
child.stdin.write(JSON.stringify({
  type: 'handshake',
  payload: '你好吖'
}));
// 父進程 - 收
child.stdout.on('data', function (chunk) {
  let data = chunk.toString();
  let message = JSON.parse(data);
  console.log(`${message.type} ${message.payload}`);
});

子進程與之類似:

// ./stdio-child.js
// 子進程 - 收
process.stdin.on('data', (chunk) => {
  let data = chunk.toString();
  let message = JSON.parse(data);
  switch (message.type) {
    case 'handshake':
      // 子進程 - 發
      process.stdout.write(JSON.stringify({
        type: 'message',
        payload: message.payload + ' : hoho'
      }));
      break;
    default:
      break;
  }
});

P.S. VS Code 進程間通信就採用了這種方式,具體見 access electron API from vscode extension

明顯的限制是需要拿到「子」進程的 handle,兩個完全獨立的進程之間無法通過這種方式來通信(比如跨應用,甚至跨機器的場景)

P.S. 關於 stream 及 pipe 的詳細信息,請查看 Node 中的流

2. 原生 IPC 支持

spawn()fork() 的例子,進程之間可以借助內置的 IPC 機制通信

父進程:

  • process.on('message')

  • child.send()

子進程:

  • process.on('message')

  • process.send()

限制同上,同樣要有一方能夠拿到另一方的 handle 才行

3. sockets

借助網絡來完成進程間通信,不僅能跨進程,還能跨機器

node-ipc 就採用這種方案,例如:

// server
const ipc=require('../../../node-ipc');

ipc.config.id = 'world';
ipc.config.retry= 1500;
ipc.config.maxConnections=1;

ipc.serveNet(
    function(){
        ipc.server.on(
            'message',
            function(data,socket){
                ipc.log('got a message : ', data);
                ipc.server.emit(
                    socket,
                    'message',
                    data+' world!'
                );
            }
        );

        ipc.server.on(
            'socket.disconnected',
            function(data,socket){
                console.log('DISCONNECTED\n\n',arguments);
            }
        );
    }
);
ipc.server.on(
    'error',
    function(err){
        ipc.log('Got an ERROR!',err);
    }
);
ipc.server.start();

// client
const ipc=require('node-ipc');

ipc.config.id = 'hello';
ipc.config.retry= 1500;

ipc.connectToNet(
    'world',
    function(){
        ipc.of.world.on(
            'connect',
            function(){
                ipc.log('## connected to world ##', ipc.config.delay);
                ipc.of.world.emit(
                    'message',
                    'hello'
                );
            }
        );
        ipc.of.world.on(
            'disconnect',
            function(){
                ipc.log('disconnected from world');
            }
        );
        ipc.of.world.on(
            'message',
            function(data){
                ipc.log('got a message from world : ', data);
            }
        );
    }
);

P.S. 更多示例見 RIAEvangelist/node-ipc

當然,單機場景下通過網絡來完成進程間通信有些浪費性能,但網絡通信的優勢在於跨環境的兼容性與更進一步的 RPC 場景

4. message queue

父子進程都通過外部消息機制來通信,跨進程的能力取決於 MQ 支持

即進程間不直接通信,而是通過中間層(MQ),加一個控制層就能獲得更多靈活性和優勢:

  • 穩定性:消息機制提供了強大的穩定性保證,比如確認送達(消息回執 ACK),失敗重發/防止多發等等

  • 優先級控制:允許調整消息響應次序

  • 離線能力:消息可以被緩存

  • 事務性消息處理:把關聯消息組合成事務,保證其送達順序及完整性

P.S. 不好實現?包一層能解決嗎,不行就包兩層……

比較受歡迎的有 smrchy/rsmq,例如:

// init
RedisSMQ = require("rsmq");
rsmq = new RedisSMQ( {host: "127.0.0.1", port: 6379, ns: "rsmq"} );
// create queue
rsmq.createQueue({qname:"myqueue"}, function (err, resp) {
    if (resp===1) {
      console.log("queue created")
    }
});
// send message
rsmq.sendMessage({qname:"myqueue", message:"Hello World"}, function (err, resp) {
  if (resp) {
    console.log("Message sent. ID:", resp);
  }
});
// receive message
rsmq.receiveMessage({qname:"myqueue"}, function (err, resp) {
  if (resp.id) {
    console.log("Message received.", resp)	
  }
  else {
    console.log("No messages for me...")
  }
});

會起一個 Redis server,基本原理如下:

Using a shared Redis server multiple Node.js processes can send / receive messages.

消息的收/發/緩存/持久化依靠 Redis 提供的能力,在此基礎上實現完整的隊列機制

5. Redis

基本思路與 message queue 類似:

Use Redis as a message bus/broker.

Redis 自帶 Pub/Sub 機制(即發布 - 訂閱模式),適用於簡單的通信場景,比如一對一或一對多並且不關注消息可靠性的場景

另外,Redis 有 list 結構,可以用作消息隊列,以此提高消息可靠性。一般做法是生產者 LPUSH 消息,消費者 BRPOP 消息。適用於要求消息可靠性的簡單通信場景,但缺點是消息不具狀態,且沒有 ACK 機制,無法滿足複雜的通信需求

P.S. Redis 的 Pub/Sub 示例見 What's the most efficient node.js inter-process communication library/method?

四。總結

Node 進程間通信有 4 種方式:

  • 通過 stdin/stdout 傳遞 json:最直接的方式,適用於能夠拿到「子」進程 handle 的場景,適用於關聯進程之間通信,無法跨機器

  • Node 原生 IPC 支持:最 native(地道?)的方式,比上一種「正規」一些,具有同樣的局限性

  • 通過 sockets:最通用的方式,有良好的跨環境能力,但存在網絡的性能損耗

  • 借助 message queue:最強大的方式,既然要通信,場景還複雜,不妨擴展出一層消息中間件,漂亮地解決各種通信問題

參考資料

評論

暫無評論,快來發表你的看法吧

提交評論