跳到主要內容
黯羽輕揚每天積累一點點

Step 原始碼解讀_Node 非同步流程控制 2

免費2016-06-18#Node#StepJS#Step模块#Node异步控制#node step

Step 更像是個魔術,庫還能這麼設計?

##一。基本用法

Step 最大的特色是只提供了1 個 API,用法看一眼就能記住,示例如下:

var fs = require('fs');

var Step = require('./step.js');

Step(
    function readDir() {
        fs.readdir(__dirname, this);
    },
    function readFiles(err, results) {
        if (err) throw err;
        // Create a new group
        var group = this.group();
        results.forEach(function(filename) {
            if (/\.js$/.test(filename)) {
                fs.readFile(__dirname + "/" + filename, 'utf8', group());
            }
        });
    },
    function showAll(err, files) {
        if (err) throw err;
        files.forEach(function(text) {
            console.log(text.slice(0, 20));
        });
        // console.dir(files);
    }
);

先讀當前目錄,再併發讀取所有檔案。特別簡潔,像是一件小巧的藝術品:

  • Step() 接受任意個函式表示順序執行

  • 各函式中的 this 表示內建回撥函式,收集 1 個結果並傳遞到下一個任務

  • 各函式中的 this.group() 表示結果分組,用來支援併發請求,收集多個結果再傳遞到下一個任務

##二。實現思路

###1. 順序執行

Step()不是建構函式,只是一個普通函式。暴露出來的唯一 API,用來註冊需要順序執行的任務

Step 內部收集傳入的各個任務,控制順序執行,並通過 thisthis.group() 注入結果收集邏輯,例如:

Step(function() {
    var _callback = this;
    setTimeout(function() {
        _callback(null, 1);
    }, 50);
    setTimeout(function() {
        _callback(null, 2, 4);
    }, 30);
}, function(err, res) {
    if (err) throw err;
    console.log(res);
});

this_callback)收集結果並傳入下一個任務,res 接到 2。存在 2 個問題:

  • 4 丟了

  • 1 沒有輸出

4 丟了是因為 Step 限定每個任務的返回結果都是 1 個值,所以每個任務的第一個引數是異常 err,第二個是上一個任務的返回結果 res

1 沒有輸出是因為 this() 表示傳遞當前任務結果給下一環,標誌著當前任務結束,因此後來的重複 this() 無效。更進一步,正因為 this 無法收集多個���果,所以需要 this.group() 建立分組來處理併發情況

###2. 併發請求

為了處理併發請求,專門添了一個 group 分組方法,觀察之前的示例:

// Create a new group
var group = this.group();
results.forEach(function(filename) {
    if (/\.js$/.test(filename)) {
        fs.readFile(__dirname + "/" + filename, 'utf8', group());
    }
});

可以發現,利用分組收集多個結果有 3 個步驟:

  1. this.group() 初始化一個 group,準備接收多個結果

  2. group() 註冊併發請求

  3. group()() 收集結果,並移除對應的註冊項,等待所有註冊項都完成後把結果陣列傳給下一環

具體實現方法類似於 EventProxy 中的 all(),內建計數器按順序記錄結果,最後把收集到的結果陣列傳遞下去

###3. 異常處理

Step 的異常處理比較醜,示例如下:

Step(function() {
    //...
}, function(err, res) {
    if (err) throw err;
    //...
}, function(err, res) {
    if (err) throw err;
    //...
});

除第一環外,後續所有任務內都要人工置頂 if...throw。也就是說,異常處理機制要求手動後拋 err,否則異常無法傳入最後的回撥函式(最後一環)。如下:

Step(function() {
    throw new Error('error occurs');
}, function(err) {
    // if (err) throw err;
    this(null, 'ok');
}, function(err, res) {
    if (!err) {
        console.log(res);   // ok
    }
});

第一環的異常在第二環弄丟了,第三環沒有收到異常。丟個異常沒什麼,但不會報錯有點嚇人,也就是說,如果忘記了人工置頂 if...throw 的話,程式碼可能靜默出錯,這很危險

丟失異常的根本原因是每一個環節執行時都被包在 try 塊裡,如有異常則傳入下一環,通過引數傳入的異常不手動再拋自然就丟失了

##三。原始碼分析

基本結構如下:

// 1.定義 Step() 接收一系列任務
    // 1.1 分離任務(steps)和回撥(最後一環)
    // 1.2 初始化各個狀態變數
// 2.定義 next() 作為內建的回撥函式(供外部使用)
    // 2.1 從任務佇列中取出一個並放在 try 塊裡執行
    //    同時通過 this 注入 next 回撥
    // 2.2 收集並傳遞結果/異常,執行下一環
// 3.定義 next.parallel(),僅供內部使用,配合 group
    // 3.1 計數器加加
    // 3.2 返回負責計數器減減的 callback
// 4.定義 next.group(),供外部使用
    // 4.1 初始化計數器及狀態變數
    // 4.2 返回負責收集結果的 callback
// 5.定義 Step.fn(),延長管道(支援新增第一環 leading 資料準備和額外最後一環 tailing 收尾)
    // 5.1 記錄引數傳入的各個任務
    // 5.2 返回新函式用來接收 leading 資料和最後的 tailing
    //    然後添上第一環和最後一環,並啟動 Step
// 6.暴露出 API
    // 6.1 按照 CommonJS 模組定義的方式暴露出 Step()

第 1 步定義入口函式,第 2 步提供任務順序執行支援以及異常處理,第 3 和 4 步提供併發請求支援,第 5 步提供資料匯入支援和收尾支援,錦上添花

具體原始碼(附詳細註釋):

/*
Copyright (c) 2011 Tim Caswell <tim @creationix.com>

MIT License

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
*/

// Inspired by http://github.com/willconant/flow-js, but reimplemented and
// modified to fit my taste and the node.JS error handling system.

// 1.定義 Step() 接收一系列任務
    // 1.1 分離任務(steps)和回撥(最後一環)
    // 1.2 初始化各個狀態變數
// 2.定義 next() 作為內建的回撥函式(供外部使用)
    // 2.1 從任務佇列中取出一個並放在 try 塊裡執行
    //    同時通過 this 注入 next 回撥
    // 2.2 收集並傳遞結果/異常,執行下一環
// 3.定義 next.parallel(),僅供內部使用,配合 group
    // 3.1 計數器加加
    // 3.2 返回負責計數器減減的 callback
// 4.定義 next.group(),供外部使用
    // 4.1 初始化計數器及狀態變數
    // 4.2 返回負責收集結果的 callback
// 5.定義 Step.fn(),延長管道(支援新增第一環 leading 資料準備和額外最後一環 tailing 收尾)
    // 5.1 記錄引數傳入的各個任務
    // 5.2 返回新函式用來接收 leading 資料和最後的 tailing
    //    然後添上第一環和最後一環,並啟動 Step
// 6.暴露出 API
    // 6.1 按照 CommonJS 模組定義的方式暴露出 Step()
function Step() {
  var steps = Array.prototype.slice.call(arguments),
      pending, counter, results, lock;
  // steps 是呼叫者傳入的一系列函式,最後一個是回撥,之前的都是非同步/同步任務
  // pending 表示待執行項數
  // counter 表示併發執行的總項數
  // results 表示收集到的併發執行的 steps 的結果,results[0] 為 err,再往後與各 step 對應
  // lock 表示正在處理/執行下一個 step

  // Define the main callback that's given as `this` to the steps.
  // 主回撥函式,即 steps 中的 this
  function next() {
    counter = pending = 0;

    // Check if there are no steps left
    if (steps.length === 0) {
      // Throw uncaught errors
      if (arguments[0]) {
        throw arguments[0];
      }
      return;
    }

    // Get the next step to execute
    // 從頭取一個 step
    var fn = steps.shift();
    results = [];

    // Run the step in a try..catch block so exceptions don't get out of hand.
    // steps 的執行被放到了 try 中,未捕獲的異常通過 next(e) 傳遞給下一個 step
    try {
      lock = true;
      //!!! steps 中,this 指向 next 的原因
      var result = fn.apply(next, arguments);
    } catch (e) {
      // Pass any exceptions on through the next callback
      next(e);
    }

    if (counter > 0 && pending == 0) {
      // If parallel() was called, and all parallel branches executed
      // synchronously, go on to the next step immediately.
      // 控制 steps 併發執行
      next.apply(null, results);
    } else if (result !== undefined) {
      // If a synchronous return is used, pass it to the callback
      // 把同步 step 的返回值傳入下一個 step
      next(undefined, result);
    }

    lock = false;
  }

  // Add a special callback generator `this.parallel()` that groups stuff.
  next.parallel = function () {
    // 每次呼叫更新計數器與待執行項數
    var index = 1 + counter++;  // 第一個空位留給 err,結果從 results[1] 開始往後放
    pending++;

    return function () {
      // 回撥執行時,更新待執行項數
      pending--;
      // Compress the error from any result to the first argument
      if (arguments[0]) {
        results[0] = arguments[0];
      }
      // Send the other results as arguments
      results[index] = arguments[1];
      //!!! 丟棄第 3 個引數及之後的所有引數
      if (!lock && pending === 0) {
        // When all parallel branches done, call the callback
        // 沒有待執行項了,執行下一個 step
        next.apply(null, results);
      }
else {
    console.warn('pending ' + pending + ' lock ' + lock);///
}
    };
  };

  // Generates a callback generator for grouped results
  next.group = function () {
    var localCallback = next.parallel();
    var counter = 0;
    var pending = 0;
    var result = [];
    var error = undefined;

    function check() {
console.log('from ' + arguments[0] + ' pending ' + pending);///
      if (pending === 0) {
        // When group is done, call the callback
        localCallback(error, result);
      }
    }
    //! 避免因為 this.group() 之後無 group() 呼叫而無法觸發最後一個回撥的情況
    process.nextTick(check); // Ensures that check is called at least once

    // Generates a callback for the group
    return function () {
      // 類似於 parallel,更新計數器和待執行項數
      var index = counter++;
      pending++;
      return function () {
        pending--;
        // Compress the error from any result to the first argument
        if (arguments[0]) {
          error = arguments[0];
        }
        // Send the other results as arguments
        result[index] = arguments[1];
        // if (!lock) { check(); }
if (!lock) { check('tail'); }///
      };
    };
  };

  // Start the engine an pass nothing to the first step.
  // 僅僅為了初始化 counter 和 padding
  next();
}

// Tack on leading and tailing steps for input and output and return
// the whole thing as a function.  Basically turns step calls into function
// factories.
Step.fn = function StepFn() {
  var steps = Array.prototype.slice.call(arguments);

  // 返回的函式會把接收到的最後一個函式引數作為額外的最後一環
  // 其餘的作為引數傳入第一環
  return function () {
    var args = Array.prototype.slice.call(arguments);

    // Insert a first step that primes the data stream
    // 插入一步資料流準備
    var toRun = [function () {
      this.apply(null, args);
    }].concat(steps);

    // If the last arg is a function add it as a last step
    // 即 tailing hook 的實現
    if (typeof args[args.length-1] === 'function') {
      toRun.push(args.pop());
    }


    Step.apply(null, toRun);
  }
}


// Hook into commonJS module systems
if (typeof module !== 'undefined' && "exports" in module) {
  module.exports = Step;
}

所有細節都在註釋裡,需要注意的是 nextTick check 的問題,直覺感覺有錯,github 也有朋友提出了 相同的疑問,但經過多次測試確認 nextTick check 一般情況下沒問題(測試中沒有找到二般情況用例),而且該庫已經經過相當長的時間檢驗了,姑且可以放心使用

Step 沒有繼續發展“壯大”,決定保持其小巧精緻,但對於 Step 存在的缺陷,原作者有一些新的改進想法,提出了 twostepsafereturn,有興趣可以觀摩研究,原作者戰鬥力很高,觀摩穩賺不賠

P.S. 原作者對於非同步控制庫的設計理念是:

The general idea is to not impose structure on the programmer, but rather help them write good callback based functions.

讓大家用得輕鬆,而不是強加一堆結構讓人去習慣

##四。總結

Step 更像是個魔術,庫還能這麼設計?

撇開小巧簡單等優點不談,Step 存在以下問題:

  • this 指向內部函式,導致各個 step 中巢狀函式不便使用(that = this 或者使用 ES6 箭頭函式)

  • 錯誤處理機制要求每個 step 以 if...throw 開始,否則錯誤無法通過中間 step 傳遞至最後的 callback

  • 狀態丟棄機制,註定無法應對複雜依賴的情況

this 的問題其實沒有想象的那麼嚴重,錯誤處理要求人工置頂 if...throw 確實有點難受,狀態丟棄機制必然不適用於複雜情況,但足夠應付大多數場景了

##參考資料

  • 《深入淺出 NodeJS》

評論

暫無評論,快來發表你的看法吧

提交評論