본문으로 건너뛰기

Nodejs 프로세스 간 통신

무료2018-02-16#Node#Node跨进程通信#Node创建进程#node ipc#node communication between processes

멀티코어/멀티머신 하의 멀티프로세스의 이점을 어떻게 충분히 활용하는가? 어떻게 프로세스를 건너 통신하는가?

一.시나리오

Node 는 싱글스레드로 실행되지만, 이는 멀티코어/멀티머신 하의 멀티프로세스의 이점을 이용할 수 없음을 의미하지 않습니다

사실, Node 는 처음부터 분산형 네트워크 시나리오를 고려하여 설계되었습니다:

Node is a single-threaded, single-process system which enforces shared-nothing design with OS process boundaries. It has rather good libraries for networking. I believe this to be a basis for designing very large distributed programs. The "nodes" need to be organized: given a communication protocol, told how to connect to each other. In the next couple months we are working on libraries for Node that allow these networks.

P.S.Node 가 왜 Node 라고 불리는지에 대해서는, Why is Node.js named Node.js? 참조

二.프로세스 생성

통신 방식은 프로세스 생성 방식과 관련되며, Node 에는 4 가지 프로세스 생성 방법이 있습니다: spawn(), exec(), execFile()fork()

spawn

const { spawn } = require('child_process');
const child = spawn('pwd');
// 带参数的形式
// const child = spawn('find', ['.', '-type', 'f']);

spawn()ChildProcess 인스턴스를 반환하며, ChildProcess 도 이벤트 메커니즘 (EventEmitter API) 에 기반하며, 몇 가지 이벤트를 제공합니다:

  • exit: 자프로세스가 종료될 때 트리거되며, 프로세스 종료 상태 (codesignal) 를 알 수 있습니다

  • disconnect: 부모 프로세스가 child.disconnect() 를 호출할 때 트리거됩니다

  • error: 자프로세스 생성이 실패했거나, kill 되었을 때 트리거됩니다

  • close: 자프로세스의 stdio 스트림 (표준 입출력 스트림) 이 닫힐 때 트리거됩니다

  • message: 자프로세스가 process.send() 로 메시지를 전송할 때 트리거되며, 부자 프로세스 간은 이내장된 메시지 메커니즘으로 통신할 수 있습니다

child.stdin, child.stdoutchild.stderr 를 통해 자프로세스의 stdio 스트림에 액세스할 수 있으며, 이러한 스트림이 닫힐 때, 자프로세스는 close 이벤트를 트리거합니다

P.S.closeexit 의 차이는 주로 멀티프로세스가 동일 stdio 스트림을 공유하는 시나리오에서 체현되며, 어떤 프로세스가 종료되었다고 해서 stdio 스트림이 닫힌 것을 의미하지는 않습니다

자프로세스 중, stdout/stderr 는 Readable 특성을 가지며, stdin 은 Writable 특성을 가지며, 주프로세스의 상황과 정반대입니다:

child.stdout.on('data', (data) => {
  console.log(`child stdout:\n${data}`);
});

child.stderr.on('data', (data) => {
  console.error(`child stderr:\n${data}`);
});

프로세스 stdio 스트림의 파이프 특성을 이용하여, 더 복잡한 일을 완료할 수 있습니다. 예를 들어:

const { spawn } = require('child_process');

const find = spawn('find', ['.', '-type', 'f']);
const wc = spawn('wc', ['-l']);

find.stdout.pipe(wc.stdin);

wc.stdout.on('data', (data) => {
  console.log(`Number of files ${data}`);
});

작용은 find . -type f | wc -l 과 동등하며, 현재 디렉토리의 파일 수를 재귀적으로 통계합니다

IPC 옵션

또한, spawn() 메서드의 stdio 옵션을 통해 IPC 메커니즘을 확립할 수 있습니다:

const { spawn } = require('child_process');

const child = spawn('node', ['./ipc-child.js'], { stdio: [null, null, null, 'ipc'] });
child.on('message', (m) => {
  console.log(m);
});
child.send('Here Here');

// ./ipc-child.js
process.on('message', (m) => {
  process.send(`< ${m}`);
  process.send('> 不要回答 x3');
});

spawn() 의 IPC 옵션의 상세 정보는, options.stdio 참조

exec

spawn() 메서드는 기본적으로传入된 명령을 실행하기 위해 shell 을 생성하지 않습니다 (그래서퍼포먼스 상에서 조금 좋음), exec() 메서드는 shell 을 생성합니다. 또한, exec() 는 stream 베이스가 아니며,传入된 명령의 실행 결과를 buffer 에 임시 저장한 후, 한꺼번에 콜백 함수에 전달합니다

exec() 메서드의 특징은완전히 shell 구문을 서포트하며, 임의의 shell 스크립트를 직접传入할 수 있다는 것입니다. 예를 들어:

const { exec } = require('child_process');

exec('find . -type f | wc -l', (err, stdout, stderr) => {
  if (err) {
    console.error(`exec error: ${err}`);
    return;
  }

  console.log(`Number of files ${stdout}`);
});

그러나 exec() 메서드도そのため 명령어 인젝션 의 보안 리스크가 존재하며, 사용자 입력 등의 동적 콘텐츠를 포함한 시나리오에서는 특히 주의가 필요합니다. 따라서, exec() 메서드의 적용 시나리오는: shell 구문을 직접 사용하고 싶고, 예상 출력 데이터량이 크지 않은 (메모리 압력이 없는) 경우입니다

그렇다면, shell 구문을 서포트하며, stream IO 의 이점도 가진 방식이 있는가?

있습니다. 양전기미의 방식은 다음과 같습니다:

const { spawn } = require('child_process');
const child = spawn('find . -type f | wc -l', {
  shell: true
});
child.stdout.pipe(process.stdout);

spawn()shell 옵션을 켜고, pipe() 메서드를 통해 자프로세스의 표준 출력을 간단히 현재 프로세스의 표준 입력에 연결하여, 명령 실행 결과를 볼 수 있도록 합니다. 실제로는 더 쉬운 방식이 있습니다:

const { spawn } = require('child_process');
process.stdout.on('data', (data) => {
  console.log(data);
});
const child = spawn('find . -type f | wc -l', {
  shell: true,
  stdio: 'inherit'
});

stdio: 'inherit' 는 자프로세스가 현재 프로세스의 표준 입출력을 상속하는 것을 허용 (stdin, stdoutstderr 를 공유) 하므로, 상기 예는 현재 프로세스 process.stdoutdata 이벤트를 감시함으로써 자프로세스의 출력 결과를 얻을 수 있습니다

또한, stdioshell 옵션 외에, spawn() 은 몇 가지 다른 옵션도 서포트합니다. 예를 들어:

const child = spawn('find . -type f | wc -l', {
  stdio: 'inherit',
  shell: true,
  // 환경 변수를 변경, 기본은 process.env
  env: { HOME: '/tmp/xxx' },
  // 현재 작업 디렉토리를 변경
  cwd: '/tmp',
  // 독립 프로세스로 존재
  detached: true
});

주의, env 옵션은 환경 변수 형식으로 자프로세스에 데이터를 전달하는 외에, 샌드박스식의 환경 변수 격리를 실현하기 위해서도 사용할 수 있으며, 기본적으로 process.env 를 자프로세스의 환경 변수 세트로 하고, 자프로세스는 현재 프로세스와 마찬가지로 모든 환경 변수에 액세스할 수 있습니다. 상기 예처럼 커스텀 오브젝트를 자프로세스의 환경 변수 세트로 지정한 경우, 자프로세스는 다른 환경 변수에 액세스할 수 없습니다

따라서, 환경 변수를 증/감하고 싶다면, 이렇게 해야 합니다:

var spawn_env = JSON.parse(JSON.stringify(process.env));

// remove those env vars
delete spawn_env.ATOM_SHELL_INTERNAL_RUN_AS_NODE;
delete spawn_env.ELECTRON_RUN_AS_NODE;

var sp = spawn(command, ['.'], {cwd: cwd, env: spawn_env});

detached 옵션은 더 재미있습니다:

const { spawn } = require('child_process');

const child = spawn('node', ['stuff.js'], {
  detached: true,
  stdio: 'ignore'
});

child.unref();

이 방식으로 생성된 독립 프로세스의 동작은 오퍼레이팅 시스템에 의존하며, Windows 상에서는 detached 자프로세스는 자체 console 윈도우를 가지며, Linux 상에서는 해당 프로세스는새로운 process group 을 생성합니다 (이 특성은 자프로세스 족을 관리하고, tree-kill 에 유사한 특성을 실현하는 데 사용할 수 있습니다)

unref() 메서드는 관계를 단절하는 데 사용되며, 이렇게 하여 "부모" 프로세스는 독립적으로 종료할 수 있습니다 (자프로세스가 함께 종료되는 것을 일으키지 않음). 그러나 이 때 자프로세스의 stdio 도 "부모" 프로세스로부터 독립되어 있어야 함에 주의하십시오.否则 "부모" 프로세스가 종료된 후에도 자프로세스는 영향을 받습니다

execFile

const { execFile } = require('child_process');
const child = execFile('node', ['--version'], (error, stdout, stderr) => {
  if (error) {
    throw error;
  }
  console.log(stdout);
});

exec() 메서드와 유사하지만, shell 을 통해 실행하지 않습니다 (그래서 퍼포먼스가 조금 좋음). 따라서실행 가능 파일을传入해야 합니다. Windows 하에서는 일부 파일은 직접 실행할 수 없습니다. 예를 들어 .bat.cmd 로, 이러한 파일은 execFile() 로 실행할 수 없으며, exec() 또는 shell 옵션을 켠 spawn() 에 의지해야 합니다

P.S.exec() 와 마찬가지로stream 베이스가 아니며, 마찬가지로 출력 데이터량의 리스크가 존재합니다

xxxSync

spawn, execexecFile 에는 모두 대응하는 동기 블로킹 버전이 있으며, 자프로세스가 종료할 때까지 기다립니다

const { 
  spawnSync, 
  execSync, 
  execFileSync,
} = require('child_process');

동기 메서드는 스크립트 태스크를 간소화하는 데 사용됩니다. 예를 들어 기동 플로우에서, 다른 때에는 이러한 메서드의 사용을 피해야 합니다

fork

fork()spawn() 의 변종으로, Node 프로세스를 생성하는 데 사용되며, 최대의 특징은 부자 프로세스가 자체적으로 통신 메커니즘 (IPC 파이프) 을 가지고 있다는 것입니다:

The child_process.fork() method is a special case of child_process.spawn() used specifically to spawn new Node.js processes. Like child_process.spawn(), a ChildProcess object is returned. The returned ChildProcess will have an additional communication channel built-in that allows messages to be passed back and forth between the parent and child. See subprocess.send() for details.

예를 들어:

var n = child_process.fork('./child.js');
n.on('message', function(m) {
  console.log('PARENT got message:', m);
});
n.send({ hello: 'world' });

// ./child.js
process.on('message', function(m) {
  console.log('CHILD got message:', m);
});
process.send({ foo: 'bar' });

fork() 가 자체적으로 통신 메커니즘을 가진 이점 때문에, 특히 시간이 걸리는 로직을 분할하는 데 적합합니다. 예를 들어:

const http = require('http');
const longComputation = () => {
  let sum = 0;
  for (let i = 0; i < 1e9; i++) {
    sum += i;
  };
  return sum;
};
const server = http.createServer();
server.on('request', (req, res) => {
  if (req.url === '/compute') {
    const sum = longComputation();
    return res.end(`Sum is ${sum}`);
  } else {
    res.end('Ok')
  }
});

server.listen(3000);

이렇게 하는 치명적인 문제는 누군가가 /compute 에 액세스하면, 후속 리퀘스트가 모두 적시에 처리될 수 없게 되는 것으로, 이벤트 루프가 아직 longComputation 에 의해 블로킹되어 있으며, 시간이 걸리는 계산이 종료될 때까지 서비스 능력을 회복할 수 없기 때문입니다

시간이 걸리는 조작이 주프로세스의 이벤트 루프를 블로킹하는 것을 피하기 위해, longComputation() 을 자프로세스에 분할할 수 있습니다:

// compute.js
const longComputation = () => {
  let sum = 0;
  for (let i = 0; i < 1e9; i++) {
    sum += i;
  };
  return sum;
};

// 스위치, 메시지를 받아서 시작
process.on('message', (msg) => {
  const sum = longComputation();
  process.send(sum);
});

주프로세스는 자프로세스를 기동하여 longComputation 을 실행:

const http = require('http');
const { fork } = require('child_process');

const server = http.createServer();

server.on('request', (req, res) => {
  if (req.url === '/compute') {
    const compute = fork('compute.js');
    compute.send('start');
    compute.on('message', sum => {
      res.end(`Sum is ${sum}`);
    });
  } else {
    res.end('Ok')
  }
});

server.listen(3000);

주프로세스의 이벤트 루프는 더 이상 시간이 걸리는 계산에 의해 블로킹되지 않지만, 프로세스 수량은 더욱 제한할 필요가 있으며,否则 리소스가 프로세스에 의해 소진되었을 때 서비스 능력은 여전히 영향을 받습니다

P.S.실제로, cluster 모듈은 멀티프로세스 서비스 능력의 캡슐화로, 思路는 이 간단한 예시와 유사합니다

三.통신 방식

1.stdin/stdout 를 통해 json 을 전달

stdin/stdout and a JSON payload

가장 직접적인 통신 방식으로, 자프로세스의 handle 를 취득한 후, 그 stdio 스트림에 액세스할 수 있으며, 그 후 일종의 message 형식을 약정하여 유쾌하게 통신:

const { spawn } = require('child_process');

child = spawn('node', ['./stdio-child.js']);
child.stdout.setEncoding('utf8');
// 父进程 - 发
child.stdin.write(JSON.stringify({
  type: 'handshake',
  payload: '你好吖'
}));
// 父进程 - 收
child.stdout.on('data', function (chunk) {
  let data = chunk.toString();
  let message = JSON.parse(data);
  console.log(`${message.type} ${message.payload}`);
});

자프로세스도 이와 유사:

// ./stdio-child.js
// 子进程 - 收
process.stdin.on('data', (chunk) => {
  let data = chunk.toString();
  let message = JSON.parse(data);
  switch (message.type) {
    case 'handshake':
      // 子进程 - 发
      process.stdout.write(JSON.stringify({
        type: 'message',
        payload: message.payload + ' : hoho'
      }));
      break;
    default:
      break;
  }
});

P.S.VS Code 프로세스 간 통신은 이 방식을 채택하고 있으며, 상세는 access electron API from vscode extension 참조

명확한제한은 "자" 프로세스의 handle 를 취득할 필요가 있으며, 2 개의 완전히 독립된 프로세스 간은 이 방식으로 통신할 수 없습니다 (예를 들어 크로스 애플리케이션, 심지어 크로스 머신의 시나리오)

P.S.stream 및 pipe 의 상세 정보는, Node 중의 스트림 참조

2.네이티브 IPC 서포트

spawn()fork() 의 예시처럼, 프로세스 간은 내장된 IPC 메커니즘을 빌려 통신할 수 있습니다

부모 프로세스:

  • process.on('message') 수신

  • child.send() 송신

자프로세스:

  • process.on('message') 수신

  • process.send() 송신

제한은 상기 동일하며, 마찬가지로 일방이 타방의 handle 를 취득할 수 있어야 합니다

3.sockets

네트워크를 빌려 프로세스 간 통신을 완료하며, 프로세스를 건너는 것뿐만 아니라, 머신도 건널 수 있습니다

node-ipc 는 이 방안을 채택하고 있으며, 예를 들어:

// server
const ipc=require('../../../node-ipc');

ipc.config.id = 'world';
ipc.config.retry= 1500;
ipc.config.maxConnections=1;

ipc.serveNet(
    function(){
        ipc.server.on(
            'message',
            function(data,socket){
                ipc.log('got a message : ', data);
                ipc.server.emit(
                    socket,
                    'message',
                    data+' world!'
                );
            }
        );

        ipc.server.on(
            'socket.disconnected',
            function(data,socket){
                console.log('DISCONNECTED\n\n',arguments);
            }
        );
    }
);
ipc.server.on(
    'error',
    function(err){
        ipc.log('Got an ERROR!',err);
    }
);
ipc.server.start();

// client
const ipc=require('node-ipc');

ipc.config.id = 'hello';
ipc.config.retry= 1500;

ipc.connectToNet(
    'world',
    function(){
        ipc.of.world.on(
            'connect',
            function(){
                ipc.log('## connected to world ##', ipc.config.delay);
                ipc.of.world.emit(
                    'message',
                    'hello'
                );
            }
        );
        ipc.of.world.on(
            'disconnect',
            function(){
                ipc.log('disconnected from world');
            }
        );
        ipc.of.world.on(
            'message',
            function(data){
                ipc.log('got a message from world : ', data);
            }
        );
    }
);

P.S.더 많은 예시는 RIAEvangelist/node-ipc 참조

물론,单机 시나리오에서 네트워크를 통해 프로세스 간 통신을 완료하는 것은 조금 퍼포먼스를 낭비하지만, 네트워크 통신의이점은 크로스 환경의 호환성과 더 나아간 RPC 시나리오입니다

4.message queue

부자 프로세스는 모두 외부 메시지 메커니즘을 통해 통신하며, 프로세스를 건너는 능력은 MQ 서포트에 의존

즉 프로세스 간은 직접 통신하지 않고, 중간층 (MQ) 을 통해, 제어층을 1 개 추가함으로써 더 많은 유연성과 이점을 획득:

  • 안정성: 메시지 메커니즘은 강력한 안정성 보장을 제공. 예를 들어 배달 확인 (메시지 회신 ACK), 실패 재송신/중복 송신 방지 등

  • 우선도 제어: 메시지 응답 순서를 조정 가능

  • 오프라인 능력: 메시지는 캐시 가능

  • 사무성 메시지 처리: 관련 메시지를 사무에 조합하여, 배달 순서 및 완전성을 보증

P.S.구현이 어렵다? 1 층 캡슐화로 해결할 수 있는가, 안 되면 2 층 캡슐화……

비교적 인기 있는 것은 smrchy/rsmq 로, 예를 들어:

// init
RedisSMQ = require("rsmq");
rsmq = new RedisSMQ( {host: "127.0.0.1", port: 6379, ns: "rsmq"} );
// create queue
rsmq.createQueue({qname:"myqueue"}, function (err, resp) {
    if (resp===1) {
      console.log("queue created")
    }
});
// send message
rsmq.sendMessage({qname:"myqueue", message:"Hello World"}, function (err, resp) {
  if (resp) {
    console.log("Message sent. ID:", resp);
  }
});
// receive message
rsmq.receiveMessage({qname:"myqueue"}, function (err, resp) {
  if (resp.id) {
    console.log("Message received.", resp)	
  }
  else {
    console.log("No messages for me...")
  }
});

Redis server 를 기동하며, 기본 원리는 다음과 같습니다:

Using a shared Redis server multiple Node.js processes can send / receive messages.

메시지의 수신/송신/캐시/영속화는 Redis 가 제공하는 능력에 의존하며, 이 기초 위에서 완전한 큐 메커니즘을 실현

5.Redis

기본思路는 message queue 와 유사:

Use Redis as a message bus/broker.

Redis 는 자체적으로 Pub/Sub 메커니즘 (즉发布 - 購読 모드) 을 가지며, 간단한 통신 시나리오에 적합하며, 예를 들어 1 대 1 또는 1 대 다로메시지 신뢰성을気に하지 않는시나리오

또한, Redis 에는 list 구조가 있으며, 메시지 큐로 사용할 �� 있어, 이로써 메시지 신뢰성을 향상. 일반적인做法는 생산자가 LPUSH 메시지, 소비자가 BRPOP 메시지. 메시지 신뢰성을 요구하는 간단한 통신 시나리오에 적합하지만, 단점은 메시지가 상태를 가지지 않으며, ACK 메커니즘이 없어, 복잡한 통신 니즈를 충족할 수 없는 것입니다

P.S.Redis 의 Pub/Sub 예시는 What's the most efficient node.js inter-process communication library/method? 참조

四.정리

Node 프로세스 간 통신에는 4 가지 방식이 있습니다:

  • stdin/stdout 를 통해 json 을 전달: 가장 직접적인 방식으로, "자" 프로세스의 handle 를 취득할 수 있는 시나리오에 적합하며, 관련 프로세스 간 통신에 적합하며, 머신을 건널 수 없습니다

  • Node 네이티브 IPC 서포트: 가장 native(地道?) 한 방식으로, 위一种보다 "정규"하며, 동일한 국한성을 가집니다

  • sockets 를 통해: 가장通用的인 방식으로, 양호한 크로스 환경 능력을 가지지만, 네트워크의 퍼포먼스 손실이 존재

  • message queue 를 빌려: 가장 강력한 방식으로, 통신할 필요가 있고, 시나리오도 복잡하다면, 차라리 1 층 메시지 미들웨어를 확장하여, 깔끔하게 각종 통신 문제를 해결

참고 자료

댓글

아직 댓글이 없습니다

댓글 작성