본문으로 건너뛰기

Step 소스코드 해석_Node 비동기 플로우 제어 2

무료2016-06-18#Node#StepJS#Step模块#Node异步控制#node step

Step 은 마치 마술과 같음, 라이브러리를 이렇게 설계할 수도 있나?

일.기본 용법

Step 의 가장 큰 특징은1 개의 API만 제공한다는 것으로, 용법은 한눈에 기억할 수 있습니다. 예시는 다음과 같습니다:

var fs = require('fs');

var Step = require('./step.js');

Step(
    function readDir() {
        fs.readdir(__dirname, this);
    },
    function readFiles(err, results) {
        if (err) throw err;
        // Create a new group
        var group = this.group();
        results.forEach(function(filename) {
            if (/\.js$/.test(filename)) {
                fs.readFile(__dirname + "/" + filename, 'utf8', group());
            }
        });
    },
    function showAll(err, files) {
        if (err) throw err;
        files.forEach(function(text) {
            console.log(text.slice(0, 20));
        });
        // console.dir(files);
    }
);

먼저 현재 디렉토리를 읽고, 그 후 모든 파일을 병렬로 읽습니다. 매우 간결하며, 마치 작은 예술품과 같습니다:

  • Step() 은 임의의 수의 함수를 받아 순차 실행을 나타냅니다

  • 각 함수 내의 this 는 내장 콜백 함수를 나타내며, 1 개의 결과를 수집하여 다음 작업에 전달합니다

  • 각 함수 내의 this.group() 는 결과 그룹화를 나타내며, 병렬 요청을 지원하기 위해 사용되고, 여러 결과를 수집한 후 다음 작업에 전달합니다

이.구현思路

###1.순차 실행

Step() 은 생성자가아니라, 단순히 일반 함수입니다. 공개된 유일한 API 로, 순차 실행이 필요한 작업을 등록하는 데 사용됩니다

Step 내부는 전달된 각 작업을 수집하고, 순서 실행을 제어하며, thisthis.group() 를 통해 결과 수집 로직을 주입합니다. 예를 들어:

Step(function() {
    var _callback = this;
    setTimeout(function() {
        _callback(null, 1);
    }, 50);
    setTimeout(function() {
        _callback(null, 2, 4);
    }, 30);
}, function(err, res) {
    if (err) throw err;
    console.log(res);
});

this_callback)는 결과를 수집하여 다음 작업에 전달하고, res2 를 받습니다. 2 가지 문제가 있습니다:

  • 4 가 손실됨

  • 1 이 출력되지 않음

4 가 손실된 이유는 Step 이 각 작업의 반환 결과를 1 개의 값으로 제한하기 때문입니다. 따라서 각 작업의 첫 번째 파라미터는 예외 err, 두 번째는 이전 작업의 반환 결과 res 입니다

1 이 출력되지 않는 이유는 this() 가 현재 작업 결과를 다음 링에 전달하는 것을 나타내며, 현재 작업 종료를 의미하기 때문입니다. 따라서 이후의 중복 this() 는 무효입니다. 더 나아가, this 가 여러 결과를 수집할 수 없기 때문에 this.group() 로 그룹을 생성하여 병렬 상황을 처리해야 합니다

###2.병렬 요청

병렬 요청을 처리하기 위해,专门 group 그룹 메서드를 추가했습니다. 이전 예시 관찰:

// Create a new group
var group = this.group();
results.forEach(function(filename) {
    if (/\.js$/.test(filename)) {
        fs.readFile(__dirname + "/" + filename, 'utf8', group());
    }
});

그룹을 사용하여 여러 결과를 수집하는 데는 3 단계가 있음을 알 수 있습니다:

  1. this.group() 로 group 을 초기화하여 여러 결과를 받을 준비

  2. group() 으로 병렬 요청 등록

  3. group()() 로 결과 수집 및 해당 등록 항목 제거, 모든 등록 항목 완료 대기 후 결과 배열을 다음 링에 전달

구체적인 구현 방법은 EventProxy 중의 all() 과 유사하며, 내장 카운터로 순서대로 결과를 기록하고, 마지막으로 수집한 결과 배열을 전달합니다

###3.예외 처리

Step 의 예외 처리는 그리 아름답지 않습니다. 예시는 다음과 같습니다:

Step(function() {
    //...
}, function(err, res) {
    if (err) throw err;
    //...
}, function(err, res) {
    if (err) throw err;
    //...
});

제 1 링을 제외하고, 후속 모든 작업 내에서 수동으로 if...throw 를 배치해야 합니다. 즉, 예외 처리 메커니즘은 수동으로 err 를 뒤로 던지기를 요구하며, 그렇지 않으면 예외는 마지막 콜백 함수 (마지막 링) 에 전달되지 않습니다. 다음과 같습니다:

Step(function() {
    throw new Error('error occurs');
}, function(err) {
    // if (err) throw err;
    this(null, 'ok');
}, function(err, res) {
    if (!err) {
        console.log(res);   // ok
    }
});

제 1 링의 예외는 제 2 링에서 손실되고, 제 3 링은 예외를 받지 못했습니다. 예외를 잃는 것은 큰 일이 아니지만, 에러가 보고되지 않는 것은 조금 무섭습니다. 즉, 수동으로 if...throw 를 배치하는 것을 잊은 경우, 코드가 조용히 에러가 발생할 수 있으며, 이는매우 위험합니다

예외가 손실되는 근본적인 원인은 각 링 실행 시 모두 try 블록에 싸여 있으며, 예외가 있으면 다음 링에 전달되고, 파라미터를 통해 전달된 예외를 수동으로 다시 던지지 않으면 자연스럽게 손실되기 때문입니다

삼.소스코드 분석

기본 구조는 다음과 같습니다:

// 1.定義 Step() 接收一系列任务
    // 1.1 分离任务(steps)和回调(最后一环)
    // 1.2 初始化各个状态变量
// 2.定義 next() 作為內置的回調函數(供外部使用)
    // 2.1 從任務隊列中取出一個並放在 try 塊裡執行
    //    同時通過 this 注入 next 回調
    // 2.2 收集並傳遞結果/異常,執行下一環
// 3.定義 next.parallel(),僅供內部使用,配合 group
    // 3.1 計數器加加
    // 3.2 返回負責計數器減減的 callback
// 4.定義 next.group(),供外部使用
    // 4.1 初始化計數器及狀態變量
    // 4.2 返回負責收集結果的 callback
// 5.定義 Step.fn(),延長管道(支持添加第一環 leading 數據準備和額外最後一環 tailing 收尾)
    // 5.1 記錄參數傳入的各個任務
    // 5.2 返回新函數用來接收 leading 數據和最後的 tailing
    //    然後添上第一環和最後一環,並啟動 Step
// 6.暴露出 API
    // 6.1 按照 CommonJS 模塊定義的方式暴露出 Step()

제 1 단계는 진입 함수를 정의하고, 제 2 단계는 작업 순서 실행 지원 및 예외 처리를 제공하며, 제 3 과 4 단계는 병렬 요청 지원을 제공하고, 제 5 단계는 데이터 가져오기 지원 및 마무리 지원을 제공하여, 금상첨화입니다

구체적인 소스코드 (상세 주석 포함):

/*
Copyright (c) 2011 Tim Caswell <tim @creationix.com>

MIT License

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
*/

// Inspired by http://github.com/willconant/flow-js, but reimplemented and
// modified to fit my taste and the node.JS error handling system.

// 1.定義 Step() 接收一系列任务
    // 1.1 分离任务(steps)和回调(最后一环)
    // 1.2 初始化各个状态变量
// 2.定義 next() 作為內置的回調函數(供外部使用)
    // 2.1 從任務隊列中取出一個並放在 try 塊裡執行
    //    同時通過 this 注入 next 回調
    // 2.2 收集並傳遞結果/異常,執行下一環
// 3.定義 next.parallel(),僅供內部使用,配合 group
    // 3.1 計數器加加
    // 3.2 返回負責計數器減減的 callback
// 4.定義 next.group(),供外部使用
    // 4.1 初始化計數器及狀態變量
    // 4.2 返回負責收集結果的 callback
// 5.定義 Step.fn(),延長管道(支持添加第一環 leading 數據準備和額外最後一環 tailing 收尾)
    // 5.1 記錄參數傳入的各個任務
    // 5.2 返回新函數用來接收 leading 數據和最後的 tailing
    //    然後添上第一環和最後一環,並啟動 Step
// 6.暴露出 API
    // 6.1 按照 CommonJS 模塊定義的方式暴露出 Step()
function Step() {
  var steps = Array.prototype.slice.call(arguments),
      pending, counter, results, lock;
  // steps 는 호출자가 전달한 일련의 함수, 마지막은 콜백, 그 이전은 비동기/동기 작업
  // pending 은 실행 대기 항목 수
  // counter 는 병렬 실행의 총 항목 수
  // results 는 수집된 병렬 실행 steps 의 결과, results[0] 는 err, 그 후 각 step 에 대응
  // lock 은 다음 step 처리/실행 중

  // Define the main callback that's given as `this` to the steps.
  // 주 콜백 함수, 즉 steps 중의 this
  function next() {
    counter = pending = 0;

    // Check if there are no steps left
    if (steps.length === 0) {
      // Throw uncaught errors
      if (arguments[0]) {
        throw arguments[0];
      }
      return;
    }

    // Get the next step to execute
    // 선두에서 step 을 1 개 취득
    var fn = steps.shift();
    results = [];

    // Run the step in a try..catch block so exceptions don't get out of hand.
    // steps 의 실행은 try 중에 놓여지며, 미포획의 예외는 next(e) 로 다음 step 에 전달
    try {
      lock = true;
      //!!! steps 중, this 가 next 를 가리키는 이유
      var result = fn.apply(next, arguments);
    } catch (e) {
      // Pass any exceptions on through the next callback
      next(e);
    }

    if (counter > 0 && pending == 0) {
      // If parallel() was called, and all parallel branches executed
      // synchronously, go on to the next step immediately.
      // steps 의 병렬 실행을 제어
      next.apply(null, results);
    } else if (result !== undefined) {
      // If a synchronous return is used, pass it to the callback
      // 동기 step 의 반환값을 다음 step 에 전달
      next(undefined, result);
    }

    lock = false;
  }

  // Add a special callback generator `this.parallel()` that groups stuff.
  next.parallel = function () {
    // 매번 호출 시 카운터와 실행 대기 항목 수를 업데이트
    var index = 1 + counter++;  // 첫 번째 빈 자리를 err 에 남기고, 결과는 results[1] 부터 뒤로 배치
    pending++;

    return function () {
      // 콜백 실행 시, 실행 대기 항목 수를 업데이트
      pending--;
      // Compress the error from any result to the first argument
      if (arguments[0]) {
        results[0] = arguments[0];
      }
      // Send the other results as arguments
      results[index] = arguments[1];
      //!!! 제 3 파라미터 및 그 이후의 모든 파라미터를 폐기
      if (!lock && pending === 0) {
        // When all parallel branches done, call the callback
        // 실행 대기 항목이 없어짐, 다음 step 실행
        next.apply(null, results);
      }
else {
    console.warn('pending ' + pending + ' lock ' + lock);///
}
    };
  };

  // Generates a callback generator for grouped results
  next.group = function () {
    var localCallback = next.parallel();
    var counter = 0;
    var pending = 0;
    var result = [];
    var error = undefined;

    function check() {
console.log('from ' + arguments[0] + ' pending ' + pending);///
      if (pending === 0) {
        // When group is done, call the callback
        localCallback(error, result);
      }
    }
    //! this.group() 후 group() 호출이 없어 마지막 콜백을 트리거 못하는 상황을 피함
    process.nextTick(check); // Ensures that check is called at least once

    // Generates a callback for the group
    return function () {
      // parallel 과 유사, 카운터와 실행 대기 항목 수를 업데이트
      var index = counter++;
      pending++;
      return function () {
        pending--;
        // Compress the error from any result to the first argument
        if (arguments[0]) {
          error = arguments[0];
        }
        // Send the other results as arguments
        result[index] = arguments[1];
        // if (!lock) { check(); }
if (!lock) { check('tail'); }///
      };
    };
  };

  // Start the engine an pass nothing to the first step.
  // 단순히 counter 와 padding 을 초기화하기 위함
  next();
}

// Tack on leading and tailing steps for input and output and return
// the whole thing as a function.  Basically turns step calls into function
// factories.
Step.fn = function StepFn() {
  var steps = Array.prototype.slice.call(arguments);

  // 반환 함수는 받은 마지막 함수 파라미터를 추가의 마지막 링으로
  // 나머지는 파라미터로 제 1 링에 전달
  return function () {
    var args = Array.prototype.slice.call(arguments);

    // Insert a first step that primes the data stream
    // 데이터 스트림 준비의 1 단계를 삽입
    var toRun = [function () {
      this.apply(null, args);
    }].concat(steps);

    // If the last arg is a function add it as a last step
    // 즉 tailing hook 의 구현
    if (typeof args[args.length-1] === 'function') {
      toRun.push(args.pop());
    }


    Step.apply(null, toRun);
  }
}


// Hook into commonJS module systems
if (typeof module !== 'undefined' && "exports" in module) {
  module.exports = Step;
}

모든 세부사항은 주석 중에 있으며, 주의가 필요한 것은 nextTick check 의 문제로, 직감적으로는 오류가 있는 것처럼 느껴집니다. github 에도 친구가 같은 의문 을 제기했지만, 여러 번 테스트로 nextTick check 은 일반적으로 문제없음을 확인했습니다 (테스트 중에 이례적인 상황用例는 찾지 못했습니다). 게다가 해당 라이브러리는 상당히 긴 시간 검증을 거쳤으므로, 당분간안심하고 사용할 수 있습니다

Step 은 계속 발전하여「장대」해지는 것은 없고, 그小巧精致을 유지하기로 결정했지만, Step 에 존재하는 결함에 대해, 원저자에게는 몇 가지 새로운 개선 생각이 있어, twostepsafereturn 을 제안했습니다. 관심이 있는 분은관찰 연구할 수 있습니다. 원저자의 전투력은 매우 높아, 관찰은 확실히 이득입니다

P.S. 원저자의 비동기 제어 라이브러리 설계 이념은:

The general idea is to not impose structure on the programmer, but rather help them write good callback based functions.

让大家用得轻松,而不是强加一堆结构让人去习惯

사.정리

Step 은 마치 마술과 같음, 라이브러리를 이렇게 설계할 수도 있나?

小巧簡単などの 장점은 제쳐두고, Step 에는 다음과 같은 문제가 존재합니다:

  • this 가 내부 함수를 가리키므로, 각 step 중의 중첩 함수가 불편하게 사용됨 (that = this 또는 ES6 화살표 함수 사용)

  • 에러 처리 메커니즘은 각 step 이 if...throw 로 시작할 것을 요구하며, 그렇지 않으면 에러는 중간 step 을 통해 마지막 callback 에 전달되지 않음

  • 상태 폐기 메커니즘은, 복잡한 의존 상황에 대처할 수 없음을 의미함

this 의 문제는 실제로는 상상만큼 심각하지 않습니다. 에러 처리가 수동으로 if...throw 를 배치할 것을 요구하는 것은 확실히 조금 괴롭지만, 상태 폐기 메커니즘은 확실히 복잡한 상황에는 적용할 수 없습니다. 그러나 대부분의 시나리오에는 충분히 대응할 수 있습니다

참고 자료

  • 《深入浅出 NodeJS》

댓글

아직 댓글이 없습니다

댓글 작성