본문으로 건너뛰기

Node.js의 스트림

무료2018-02-10#Node#nodejs stream#nodejs的流#nodejs stream highWaterMark#nodejs backpressure

Node.js 프로세스 간 통신, 스트림과 어떤 관계가 있을까요?

1. 스트림의 개념

stream은 데이터 집합으로, 배열이나 문자열과 비슷합니다. 하지만 stream은 모든 데이터를 한꺼번에 액세스하지 않고 부분적으로 전송/수신(chunk 방식)하므로 그렇게 큰 메모리를 차지하지 않으며, 특히 대량의 (외부) 데이터를 처리하는 시나리오에 적합합니다.

stream은 파이프라인(pipeline) 특성을 가집니다. 예를 들어:

const grep = ... // A stream for the grep output
const wc = ... // A stream for the wc input
grep.pipe(wc)

프로세스의 stdin/stdout/stderr를 포함한 많은 네이티브 모듈이 stream을 기반으로 합니다.

native-stream-module

흔히 볼 수 있는 시나리오입니다:

const fs = require('fs');
const server = require('http').createServer();

server.on('request', (req, res) => {
  const src = fs.createReadStream('./big.file');
  src.pipe(res);
});

server.listen(8000);

여기서 pipe 메서드는 읽기 가능 스트림(Readable Stream)의 출력(데이터 소스)을 쓰기 가능 스트림(Writable Stream)의 입력(대상)으로 사용합니다. 파일 읽기 출력 스트림을 HTTP 응답 출력 스트림에 입력으로 직접 연결하여 전체 파일을 메모리에 읽어 들이는 것을 방지합니다.

P.S. 심지어 일상적으로 사용하는 console.log() 내부 구현 또한 stream입니다.

2. 스트림의 유형

Node.js에는 4가지 기본 스트림이 있습니다:

  • Readable

읽기 가능 스트림은 소스에 대한 추상화로, 데이터를 소비할 수 있습니다. 예: fs.createReadStream

  • Writable

쓰기 가능 스트림은 데이터를 쓸 수 있는 대상에 대한 추상화입니다. 예: fs.createWriteStream

  • Duplex(듀플렉스)

듀플렉스 스트림은 읽기와 쓰기가 모두 가능합니다. 예: TCP socket

  • Transform(변환)

변환 스트림은 본질적으로 듀플렉스 스트림이며, 데이터를 쓰거나 읽을 때 데이터를 수정하거나 변환하는 데 사용됩니다. 예: zlib.createGzip은 gzip을 사용하여 데이터를 압축합니다.

변환 스트림은 입력은 쓰기 가능 스트림이고 출력은 읽기 가능 스트림인 함수로 볼 수 있습니다.

P.S. FP의 identity = x => x와 유사하게 (Pass)Through Stream(통과 스트림)이라고 불리는 변환 스트림도 있습니다.

3. 파이프라인

src.pipe(res)는 소스가 반드시 읽기 가능해야 하고 대상이 쓰기 가능해야 함을 요구합니다. 따라서 듀플렉스 스트림에 대해 파이프라인 전송을 수행하면 Linux의 파이프라인처럼 체이닝 호출이 가능합니다:

readableSrc
  .pipe(transformStream1)
  .pipe(transformStream2)
  .pipe(finalWrtitableDest)

pipe() 메서드는 대상 스트림을 반환하므로:

// a (readable), b and c (duplex), and d (writable)
a.pipe(b).pipe(c).pipe(d)
// 다음과 같습니다.
a.pipe(b)
b.pipe(c)
c.pipe(d)
# Linux에서는 다음과 같습니다.
$ a | b | c | d

4. 스트림과 이벤트

이벤트 기반은 Node.js 설계의 중요한 특징 중 하나로, 스트림(stream 모듈)을 포함한 많은 Node.js 네이티브 객체가 이벤트 메커니즘(EventEmitter 모듈)을 기반으로 구현되어 있습니다:

Most of Node’s objects?—?like HTTP requests, responses, and streams?—?implement the EventEmitter module so they can provide a way to emit and listen to events.

모든 stream은 EventEmitter의 인스턴스이며, 이벤트 메커니즘을 통해 데이터를 읽고 씁니다. 예를 들어 앞서 언급한 pipe() 메서드는 다음과 같습니다:

// readable.pipe(writable)

readable.on('data', (chunk) => {
  writable.write(chunk);
});
readable.on('end', () => {
  writable.end();
});

P.S. pipe는 에러 처리, EoF, 그리고 특정 스트림의 속도가 빠르거나 느린 경우와 같은 다른 일들도 처리합니다.

Readable과 Writable 스트림의 주요 이벤트와 메서드는 다음과 같습니다:

stream-events-and-function

Readable의 주요 이벤트:

  • data 이벤트: 스트림이 청크(chunk)를 소비자에게 전달할 때 발생합니다.

  • end 이벤트: 스트림에서 더 이상 가져올(consume) 데이터가 없을 때 발생합니다.

Writable의 주요 이벤트:

  • drain 이벤트: 흐름이 끊겼을 때 발생하며, 이는 Writable 스트림이 더 많은 데이터를 받을 수 있다는 신호입니다.

  • finish 이벤트: 모든 데이터가 하위 시스템으로 플러시(flush)되었을 때 발생합니다.

5. Readable 스트림의 두 가지 모드: Paused와 Flowing

Readable 스트림은 흐름(Flowing) 상태이거나 일시 정지(Paused) 상태입니다. 이는 각각 끌어오기(pull)와 밀어넣기(push) 모드라고도 불립니다.

생성 직후에는 기본적으로 Paused 상태이며, read() 메서드를 통해 데이터를 읽을 수 있습니다. Flowing 상태라면 데이터가 지속적으로 흘러나오며, 이때는 이벤트를 수신하여 데이터를 사용하기만 하면 됩니다. 소비자가 없다면 데이터가 손실되므로 보통 Readable 스트림의 data 이벤트를 수신합니다. 실제로 data 이벤트를 수신하면 Readable 스트림이 Paused 상태에서 Flowing 상태로 전환되며, data 이벤트 리스너를 제거하면 다시 전환됩니다. 수동으로 전환하려면 resume()pause()를 사용하면 됩니다.

pipe() 방식을 사용할 때는 이러한 사항을 신경 쓸 필요 없이 자동으로 처리됩니다:

  1. Readable이 data 이벤트를 발생시키다가 Writable이 감당하기 힘들 때까지 계속합니다.

  2. pipe가 신호를 받으면 Readable.pause()를 호출하여 Paused 모드로 진입합니다.

  3. Writable이 작업을 더 처리하여 부하가 줄어들면 drain 이벤트를 발생시키고, 이때 pipeReadable.resume()을 호출하여 Flowing 모드로 진입시켜 Readable이 다시 data 이벤트를 발생시키게 합니다.

highWaterMark와 백프레셔(backpressure)

사실 drain 이벤트는 백프레셔(backpressure) 현상에 대응하기 위한 것입니다. 간단히 말해, 백프레셔는 하류의 소비 속도가 전송을 제한하여 하류에서 상류로 향하는 반대 방향의 압력을 형성하는 것입니다.

소비 속도가 생산 속도보다 느리면 하류에 데이터가 쌓이게 되고, 처리되지 못한 데이터는 Writable의 버퍼에 보관됩니다. (유량 제한) 처리를 하지 않으면 이 버퍼는 계속 커져서 오버플로가 발생하거나 데이터 손실로 이어질 수 있습니다.

백프레셔 현상이 발생했다는 신호는 Writable.write()false를 반환하는 것입니다. 이는 상류로부터 온 처리 대기 데이터량이 highWaterMark(고수위선, 기본 16kb)에 도달했음을 의미합니다:

Buffer level when stream.write() starts returning false. Defaults to 16384 (16kb), or 16 for objectMode streams.

이는 하류가 조금 바빠지기 시작했다는 신호입니다. 이때 상류의 유량을 제한하는 것, 즉 Readable.pause()를 호출하여 일시 정지시키고 하류가 쌓인 데이터를 처리할 시간을 더 주는 것이 좋습니다. 하류가 여유가 생기면 drain 이벤트를 발생시켜 더 많은 데이터를 처리할 수 있음을 알리고, 이때 다시 밸브를 열어주어야(Readable.resume()) 합니다.

주의할 점은, Readable의 데이터는 Writable이 소비할 때까지 캐시에 보관된다는 것입니다. 따라서 Paused 상태는 하류로 흘려보내지 않는다는 것일 뿐, 이미 캐싱된 데이터는 여전히 Readable의 버퍼에 남아 있습니다. 유량 제한을 하지 않으면 처리되지 못한 데이터가 하류에 캐싱되어 계속 쌓이게 되지만, 유량 제한을 하면 이 데이터가 상류에 캐싱되어 더 이상 쌓이지 않게 됩니다.

또한, Readable에도 highWaterMark 개념이 있습니다:

The maximum number of bytes to store in the internal buffer before ceasing to read from the underlying resource. Defaults to 16384 (16kb), or 16 for objectMode streams

이는 실제 데이터 소스(예: 디스크 파일)에서 읽어오는 속도에 대한 제한으로, 생산 속도가 너무 빨라 캐시가 쌓이는 것을 방지합니다. 따라서 Flowing Readable의 정상적인 작동 방식은 push()가 계속되다가 버퍼의 양이 청크 하나를 채울 만큼 모이면 하류로 내보내는 것입니다. 마찬가지로 Readable이 highWaterMark에 도달했다는 신호는 push()false를 반환하는 것입니다. 이는 Readable의 버퍼가 꽤 찼음을 의미하며, 이때도 계속 push()를 한다면 백프레셔가 발생합니다(Readable의 소비 능력이 데이터 소스에서 Readable로의 전송 속도를 제한함).

  빠름-------------느림
데이터 소스-------->Readable------->Writable
                 빠름--------------느림

상류(생산)가 빠르고 하류(소비)가 느리면 백프레셔가 발생하므로, readable.pipe(writable)의 단순한 시나리오에서도 위의 두 구간에서 백프레셔가 나타날 수 있습니다.

6. 예제

Writable 스트림

흔한 대용량 파일 생성 예제:

const fs = require('fs');
const file = fs.createWriteStream('./big.file');

for(let i=0; i<= 1e6; i++) {
  file.write('Lorem ipsum dolor sit amet, consectetur adipisicing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat. Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt mollit anim id est laborum.\n');
}

file.end();

fs.createWriteStream()을 통해 파일을 가리키는 Writable 스트림을 생성하고, write()로 데이터를 채운 뒤 작업이 끝나면 end()를 호출합니다.

또는 더 일반적인 방법으로, 직접 Writable을 생성합니다:

const { Writable } = require('stream');
const outStream = new Writable({
  write(chunk, encoding, callback) {
    console.log(chunk.toString());
    // nowrap version
    // process.stdout.write(chunk.toString());
    callback();
  }
});

process.stdin.pipe(outStream);

현재 프로세스의 표준 입력을 사용자 정의 출력 스트림 outStream에 연결하는 아주 간단한 echo 구현입니다. 로그 미들웨어처럼 표준 입력이 outStream을 거쳐 callback을 통해 다음 작업을 수행합니다.

write() 메서드의 3개 인자 중 chunk는 Buffer이고, encoding은 특정 상황에서 필요하지만 대부분 무시해도 됩니다. callbackchunk 처리가 완료된 후 호출해야 하는 알림 함수로, 쓰기 성공 여부(실패 시 Error 객체 전달)를 나타내며 테일 트리거 메커니즘의 next()와 유사합니다.

더 간단한 echo 구현:

process.stdin.pipe(process.stdout);

표준 입력 스트림을 표준 출력 스트림에 직접 연결합니다.

Readable 스트림

const { Readable } = require('stream'); 
const inStream = new Readable();
inStream.push('ABCDEFGHIJKLM');
inStream.push('NOPQRSTUVWXYZ');
inStream.push(null); // No more data
inStream.pipe(process.stdout);

push를 통해 Readable 스트림에 데이터를 채우고, push(null)로 종료를 알립니다. 위의 예제는 모든 데이터를 읽어 들인 후 표준 출력에 전달하지만, 실제로는 더 효율적인 방식(소비자에게 필요한 만큼 데이터를 밀어주는 방식)이 있습니다:

const { Readable } = require('stream'); 
const inStream = new Readable({
  read(size) {
    this.push(String.fromCharCode(this.currentCharCode++));
    if (this.currentCharCode > 90) {
      this.push(null);
    }
  }
});

inStream.currentCharCode = 65;
inStream.pipe(process.stdout);

read() 메서드는 호출될 때마다 문자 하나를 내보냅니다. 소비자가 Readable 스트림에서 데이터를 가져올 때 read()가 지속적으로 발생합니다.

Duplex/Transform 스트림

Duplex 스트림은 Readable과 Writable의 특징을 모두 갖추고 있습니다. 데이터 소스(생산자)이자 대상(소비자)이 될 수 있습니다. 예:

const { Duplex } = require('stream');

const inoutStream = new Duplex({
  write(chunk, encoding, callback) {
    console.log(chunk.toString());
    callback();
  },

  read(size) {
    this.push(String.fromCharCode(this.currentCharCode++));
    if (this.currentCharCode > 90) {
      this.push(null);
    }
  }
});

inoutStream.currentCharCode = 65;
process.stdin.pipe(inoutStream).pipe(process.stdout);

위의 예제는 앞의 두 예제를 결합한 것입니다. inoutStream이 표준 출력 스트림에 연결되어 A-Z가 데이터 소스로서 표준 출력에 전달(출력)되고, 동시에 표준 입력 스트림이 inoutStream에 연결되어 표준 입력으로부터 오는 모든 데이터가 log로 출력됩니다. 결과는 다음과 같습니다:

ABCDEFGHIJKLMNOPQRSTUVWXYZcc
oo
nn
ss
oo
ll
ee

Console {
  log: [Function: bound consoleCall],
  ...
}

P.S. A-Z가 먼저 출력되는 이유는 pipe()가 Readable 스트림을 Flowing 모드로 전환하기 때문에 처음부터 A-Z가 "흘러나오기" 때문입니다.

주의할 점은 Duplex 스트림의 Readable과 Writable 부분은 완전히 독립적이라는 것입니다. 읽기와 쓰기가 서로 영향을 주지 않으며, Duplex는 단지 두 기능을 하나의 객체로 묶은 것입니다. 마치 두 개의 젓가락을 하나로 묶어놓은 단방향 파이프라인과 같습니다.

Transform 스트림은 흥미로운 Duplex 스트림의 일종으로, 출력이 입력에 따라 계산되어 나옵니다. 따라서 read/write() 메서드를 각각 구현할 필요 없이 transform() 메서드 하나만 구현하면 됩니다:

const { Transform } = require('stream');

const upperCaseTr = new Transform({
  // 함수 시그니처는 write와 동일
  transform(chunk, encoding, callback) {
    this.push(chunk.toString().toUpperCase());
    callback();
  }
});

process.stdin.pipe(upperCaseTr).pipe(process.stdout);

마찬가지로 Transform 스트림의 Readable과 Writable 부분 또한 독립적이며(수동으로 push하지 않으면 Readable 부분으로 자동 전달되지 않음), 형태상으로만 결합되어 있습니다.

P.S. 또한 스트림 간에는 Buffer/String 외에도 Object(Array 포함)를 전달할 수 있습니다. 자세한 내용은 Streams Object Mode를 참조하세요.

Node.js는 zlib이나 crypto 스트림과 같은 몇 가지 네이티브 Transform 스트림을 제공합니다:

const fs = require('fs');
const zlib = require('zlib');
const file = process.argv[2];

fs.createReadStream(file)
  .pipe(zlib.createGzip())
  .pipe(fs.createWriteStream(file + '.gz'));

단순한 명령줄 도구인 gzip 압축 예제입니다. 더 많은 예제는 Node’s built-in transform streams를 참조하세요.

참고 자료

댓글

아직 댓글이 없습니다

댓글 작성