一.基本用法
Step の最大の特徴は1 つの APIのみを提供していることで、使い方は一目で覚えられます。示例は以下の通り:
var fs = require('fs');
var Step = require('./step.js');
Step(
function readDir() {
fs.readdir(__dirname, this);
},
function readFiles(err, results) {
if (err) throw err;
// Create a new group
var group = this.group();
results.forEach(function(filename) {
if (/\.js$/.test(filename)) {
fs.readFile(__dirname + "/" + filename, 'utf8', group());
}
});
},
function showAll(err, files) {
if (err) throw err;
files.forEach(function(text) {
console.log(text.slice(0, 20));
});
// console.dir(files);
}
);
まず現在のディレクトリを読み取り、その後すべてのファイルを並列で読み取ります。非常に簡潔で、まるで小さな芸術品のようです:
-
Step()は任意の数の関数を受け取り、順次実行を表します -
各関数内の
thisは内蔵コールバック関数を表し、1 つの結果を収集して次のタスクに渡します -
各関数内の
this.group()は結果グループ化を表し、並列リクエストをサポートするために使用され、複数の結果を収集してから次のタスクに渡します
二.実装思路
###1.順次実行
Step() はコンストラクタではなく、単なる通常の関数です。公開されている唯一の API で、順次実行が必要なタスクを登録するために使用されます
Step 内部は传入された各タスクを収集し、順序実行を制御し、this 及び this.group() を通じて結果収集ロジックを注入します。例えば:
Step(function() {
var _callback = this;
setTimeout(function() {
_callback(null, 1);
}, 50);
setTimeout(function() {
_callback(null, 2, 4);
}, 30);
}, function(err, res) {
if (err) throw err;
console.log(res);
});
this(_callback)は結果を収集して次のタスクに渡し、res は 2 を受け取ります。2 つの問題があります:
-
4が失われた -
1が出力されない
4 が失われたのは、Step が各タスクの戻り結果を 1 つの値に限定しているためです。そのため、各タスクの最初のパラメータは例外 err、2 番目は前のタスクの戻り結果 res です
1 が出力されないのは、this() が現在のタスク結果を次のリングに渡すことを表し、現在のタスクの終了を意味するためです。そのため、後からの重複 this() は無効です。さらに言えば、this が複数の結果を収集できないため、this.group() でグループを作成して並列状況を処理する必要があるのです
###2.並列リクエスト
並列リクエストを処理するために、专门 group グループメソッドを追加しました。前の示例を観察:
// Create a new group
var group = this.group();
results.forEach(function(filename) {
if (/\.js$/.test(filename)) {
fs.readFile(__dirname + "/" + filename, 'utf8', group());
}
});
グループを使用して複数の結果を収集するには 3 つのステップがあることがわかります:
-
this.group()で group を初期化し、複数の結果を受け取る準備 -
group()で並列リクエストを登録 -
group()()で結果を収集し、対応する登録項目を削除し、すべての登録項目が完了するのを待ってから結果配列を次のリングに渡す
具体的な実装方法は EventProxy 中の all() に類似し、内蔵カウンターで順序通りに結果を記録し、最後に収集した結果配列を渡します
###3.例外処理
Step の例外処理はあまり美しくありません。示例は以下の通り:
Step(function() {
//...
}, function(err, res) {
if (err) throw err;
//...
}, function(err, res) {
if (err) throw err;
//...
});
第 1 リングを除き、後続のすべてのタスク内で手動で if...throw を配置する必要があります。つまり、例外処理メカニズムは手動で err を後投げすることを要求し、そうでないと例外は最後のコールバック関数(最後のリング)に伝達されません。以下の通り:
Step(function() {
throw new Error('error occurs');
}, function(err) {
// if (err) throw err;
this(null, 'ok');
}, function(err, res) {
if (!err) {
console.log(res); // ok
}
});
第 1 リングの例外は第 2 リングで失われ、第 3 リングは例外を受け取りませんでした。例外を失うことは大したことではありませんが、エラーが報告されない のは少し怖いです。つまり、手動で if...throw を配置するのを忘れた場合、コードは静かにエラーが発生する可能性があり、これは非常に危険です
例外が失われる根本的な原因は、各リングの実行時がすべて try ブロックに包まれており、例外がある場合は次のリングに伝達され、パラメータを通じて伝達された例外を手動で再投げしないと自然に失われるからです
三.ソースコード分析
基本構造は以下の通り:
// 1.定義 Step() 接收一系列任务
// 1.1 分离任务(steps)和回调(最后一环)
// 1.2 初始化各个状态变量
// 2.定義 next() 作為內置的回調函數(供外部使用)
// 2.1 從任務隊列中取出一個並放在 try 塊裡執行
// 同時通過 this 注入 next 回調
// 2.2 收集並傳遞結果/異常,執行下一環
// 3.定義 next.parallel(),僅供內部使用,配合 group
// 3.1 計數器加加
// 3.2 返回負責計數器減減的 callback
// 4.定義 next.group(),供外部使用
// 4.1 初始化計數器及狀態變量
// 4.2 返回負責收集結果的 callback
// 5.定義 Step.fn(),延長管道(支持添加第一環 leading 數據準備和額外最後一環 tailing 收尾)
// 5.1 記錄參數傳入的各個任務
// 5.2 返回新函數用來接收 leading 數據和最後的 tailing
// 然後添上第一環和最後一環,並啟動 Step
// 6.暴露出 API
// 6.1 按照 CommonJS 模塊定義的方式暴露出 Step()
第 1 歩はエントリー関数を定義し、第 2 歩はタスク順序実行サポート及び例外処理を提供し、第 3 と 4 歩は並列リクエストサポートを提供し、第 5 歩はデータインポートサポート及びフィニッシュサポートを提供し、錦上に花を添えます
具体的なソースコード(詳細注釈付き):
/*
Copyright (c) 2011 Tim Caswell <tim @creationix.com>
MIT License
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
*/
// Inspired by http://github.com/willconant/flow-js, but reimplemented and
// modified to fit my taste and the node.JS error handling system.
// 1.定義 Step() 接收一系列任务
// 1.1 分离任务(steps)和回调(最后一环)
// 1.2 初始化各个状态变量
// 2.定義 next() 作為內置的回調函數(供外部使用)
// 2.1 從任務隊列中取出一個並放在 try 塊裡執行
// 同時通過 this 注入 next 回調
// 2.2 收集並傳遞結果/異常,執行下一環
// 3.定義 next.parallel(),僅供內部使用,配合 group
// 3.1 計數器加加
// 3.2 返回負責計數器減減的 callback
// 4.定義 next.group(),供外部使用
// 4.1 初始化計數器及狀態變量
// 4.2 返回負責收集結果的 callback
// 5.定義 Step.fn(),延長管道(支持添加第一環 leading 數據準備和額外最後一環 tailing 收尾)
// 5.1 記錄參數傳入的各個任務
// 5.2 返回新函數用來接收 leading 數據和最後的 tailing
// 然後添上第一環和最後一環,並啟動 Step
// 6.暴露出 API
// 6.1 按照 CommonJS 模塊定義的方式暴露出 Step()
function Step() {
var steps = Array.prototype.slice.call(arguments),
pending, counter, results, lock;
// steps は呼び出し者が传入した一連の関数、最後はコールバック、それ以前は非同期/同期タスク
// pending は実行待ち項目数
// counter は並列実行の総項目数
// results は収集された並列実行 steps の結果、results[0] は err、その後各 step に対応
// lock は次の step を処理/実行中
// Define the main callback that's given as `this` to the steps.
// 主コールバック関数、すなわち steps 中の this
function next() {
counter = pending = 0;
// Check if there are no steps left
if (steps.length === 0) {
// Throw uncaught errors
if (arguments[0]) {
throw arguments[0];
}
return;
}
// Get the next step to execute
// 先頭から step を 1 つ取得
var fn = steps.shift();
results = [];
// Run the step in a try..catch block so exceptions don't get out of hand.
// steps の実行は try 中に置かれ、未捕獲の例外は next(e) で次の step に伝達
try {
lock = true;
//!!! steps 中、this が next を指す理由
var result = fn.apply(next, arguments);
} catch (e) {
// Pass any exceptions on through the next callback
next(e);
}
if (counter > 0 && pending == 0) {
// If parallel() was called, and all parallel branches executed
// synchronously, go on to the next step immediately.
// steps の並列実行を制御
next.apply(null, results);
} else if (result !== undefined) {
// If a synchronous return is used, pass it to the callback
// 同期 step の戻り値を次の step に伝達
next(undefined, result);
}
lock = false;
}
// Add a special callback generator `this.parallel()` that groups stuff.
next.parallel = function () {
// 毎回呼び出し時にカウンターと実行待ち項目数を更新
var index = 1 + counter++; // 最初の空位を err に残し、結果は results[1] から後方に配置
pending++;
return function () {
// コールバック実行時、実行待ち項目数を更新
pending--;
// Compress the error from any result to the first argument
if (arguments[0]) {
results[0] = arguments[0];
}
// Send the other results as arguments
results[index] = arguments[1];
//!!! 第 3 パラメータ及びそれ以降のすべてのパラメータを破棄
if (!lock && pending === 0) {
// When all parallel branches done, call the callback
// 実行待ち項目がなくなった、次の step を実行
next.apply(null, results);
}
else {
console.warn('pending ' + pending + ' lock ' + lock);///
}
};
};
// Generates a callback generator for grouped results
next.group = function () {
var localCallback = next.parallel();
var counter = 0;
var pending = 0;
var result = [];
var error = undefined;
function check() {
console.log('from ' + arguments[0] + ' pending ' + pending);///
if (pending === 0) {
// When group is done, call the callback
localCallback(error, result);
}
}
//! this.group() 後に group() 呼び出しがないため最後のコールバックをトリガーできない状況を避ける
process.nextTick(check); // Ensures that check is called at least once
// Generates a callback for the group
return function () {
// parallel と類似、カウンターと実行待ち項目数を更新
var index = counter++;
pending++;
return function () {
pending--;
// Compress the error from any result to the first argument
if (arguments[0]) {
error = arguments[0];
}
// Send the other results as arguments
result[index] = arguments[1];
// if (!lock) { check(); }
if (!lock) { check('tail'); }///
};
};
};
// Start the engine an pass nothing to the first step.
// 単に counter と padding を初期化するため
next();
}
// Tack on leading and tailing steps for input and output and return
// the whole thing as a function. Basically turns step calls into function
// factories.
Step.fn = function StepFn() {
var steps = Array.prototype.slice.call(arguments);
// 戻り関数は受け取った最後の関数パラメータを追加の最後のリングとして
// 残りはパラメータとして第 1 リングに伝入
return function () {
var args = Array.prototype.slice.call(arguments);
// Insert a first step that primes the data stream
// データストリーム準備の 1 歩を挿入
var toRun = [function () {
this.apply(null, args);
}].concat(steps);
// If the last arg is a function add it as a last step
// すなわち tailing hook の実装
if (typeof args[args.length-1] === 'function') {
toRun.push(args.pop());
}
Step.apply(null, toRun);
}
}
// Hook into commonJS module systems
if (typeof module !== 'undefined' && "exports" in module) {
module.exports = Step;
}
すべての詳細は注釈中にあり、注意が必要なのは nextTick check の問題で、直感的には間違いがあるように感じられます。github にも友人が同じ疑問 を提起していますが、多次テストで nextTick check は一般的に問題ないことを確認しました(テスト中で二般的状況用例は見つかりませんでした)。しかも該ライブラリは相当長い時間検証を経ていますので、とりあえず安心して使用できます
Step は引き続き発展して「壮大」になることはなく、その小巧精致を維持することを決定しましたが、Step に存在する欠陥について、原作者にはいくつかの新しい改善想法があり、twostep と safereturn を提案しました。興味がある方は観摩研究できます。原作者の戦闘力は非常に高く、観摩は確実に儲かります
P.S. 原作者の非同期制御ライブラリの設計理念は:
The general idea is to not impose structure on the programmer, but rather help them write good callback based functions.
让大家用得轻松,而不是强加一堆结构让人去习惯
四.まとめ
Step はまるで手品のよう、ライブラリはこんな設計も可能なのか?
小巧簡単などの利点はさておき、Step には以下の問題が存在します:
-
thisが内部関数を指すため、各 step 中のネスト関数が不便に使用される(that = thisまたは ES6 アロー関数を使用) -
エラー処理メカニズムは各 step が
if...throwで開始することを要求し、そうでないとエラーは中間 step を通じて最後の callback に伝達されない -
状態破棄メカニズムは、複雑な依存状況に対処できないことを意味する
this の問題は実際には想像ほど深刻ではありません。エラー処理が手動で if...throw を配置することを要求するのは確かに少し辛いですが、状態破棄メカニズムは確かに複雑な状況には適用できません。しかし、大多数のシナリオには十分対応できます
参考資料
- 《深入浅出 NodeJS》
コメントはまだありません