はじめに
非同期制御は Node を理解する深水区です。もちろん、単に非同期プログラミングを搞定してコールバックピラミッドを消除したいだけなら、async モジュールを使うだけで全てに対応できます
Node 非同期フロー制御專題は各非同期制御ライブラリ(EventProxy、Step、Wind、async および ES6 Promise と ES7 async/await)に贴近し、その実装を近距离で了解することを希望しています。使用だけでなく。実際にもう 1 つの目的は:Wind に敬意を表することです
零.EventEmitter
Node の主旋律は非同期とイベントで、多くのコアモジュールは events モジュールを基に実装されており、そのため複雑な非同期シナリオとコールバックピラミッドが現れました。したがって、まず根源を見ます。例は以下の通り:
var EventEmitter = require('events');
var util = require('util');
function MyEmitter() {
EventEmitter.call(this);
}
// 通过继承机制创建自定义 EventEmitter
util.inherits(MyEmitter, EventEmitter);
// 创建实例
var myEmitter = new MyEmitter();
// 添加事件监听器
myEmitter.on('myEvent', function(arg1, arg2) {
console.log('myEvent occurs');
console.log(arg1, arg2, this);
// 1 3 { domain: null,
// _events: { myEvent: [Function] },
// _maxListeners: undefined }
});
// 触发事件
myEmitter.emit('myEvent', 1, 3);
// 添加一次性事件监听器
myEmitter.once('onceEvent', function() {
console.log(myEmitter.listenerCount('onceEvent'));
console.log(EventEmitter.defaultMaxListeners);
console.log(MyEmitter.defaultMaxListeners);
console.log(myEmitter.listenerCount('onceEvent'));
console.log(myEmitter.getMaxListeners());
});
// console.log(EventEmitter.listenerCount(myEmitter, 'onceEvent')); // 废弃
console.log(myEmitter.listenerCount('onceEvent'));
// 添加监听器,关注监听被移除的事件(验证 once)
myEmitter.on('removeListener', function(event, listener) {
console.log(event);
console.log(EventEmitter.listenerCount(myEmitter, 'onceEvent'));
});
myEmitter.emit('onceEvent');
P.S. その中 util モジュールが提供する継承メカニズムは [寄生組合せ式継承](/articles/重新理解 js 的 6 种继承方式/) です
一.非同期制御の鍵
あらゆる非同期制御ライブラリまたは方法は 2 つの問題を解決する必要があります:
-
ビジネスサポート:非同期多級依存を処理し、非同期リクエストが同時に発生し、コールバック順序に依存がある状況を処理
-
例外処理:1. 必ず実行し且つ 1 回のみ実行するコールバック関数 2. 正しく例外を戻して呼び出し者が判断できるように
ビジネスにサポートを提供し、可能な限り並列優位を利用すると同時に、友好的な例外制御方式を提供
さらに、並列制御も注目が必要な問題で、非同期リクエストが並列により大量に堆积するのを避け、システム安定を確保する前提で並列優位を充分利用
例えば async モジュールは大きく全てで、この 3 つの問題をすべて解決し、EventProxy モジュールは前 2 つの問題を解決し、並列制御は BagPipe モジュールが担当します。Step、Wind などのモジュールも並列制御の問題を考慮していません
二.EventProxy を模倣
EventProxy は内蔵の events モジュールに依存していませんが、その実装は確かにイベントに基づくものです(イベント購読/发布モード)。したがって、内蔵の events モジュールを利用して EventProxy を模倣実装します
P.S. カスタムイベントメカニズムは実装が難しくなく、本文の注目点でもありません。興味がある場合は JS 学習ノート 11_高度テクニック 8. 観察者パターン を参照
###1. 原理
外部から传入される非同期タスクを受け取るいくつかのインターフェースを提供し、内部でこれらのタスクの実行フロー(順序/並列/依存)を管理し、結果を収集し、最後に結果を传出します。さらに、タスク実行過程中の例外も管理する必要があります
例えば EventProxy が提供する all、tail、after は 3 種のタスク管理方式で、fail と done は例外処理を担当
###2. 構造
まずカスタム EventEmitter を作成し、簡単に封鎖します。以下の通り:
var EventEmitter = require('events');
var util = require('util');
function MyEmitter() {
EventEmitter.call(this);
}
util.inherits(MyEmitter, EventEmitter);
// event proxy
var EP = function() {
this.emitter = new MyEmitter();
};
EP.prototype.emit = function(event, data) {
this.emitter.emit(event, event, data);
};
EP.prototype.on = function(event, callback) {
this.emitter.on(event, (ev, data) => {
if (ev === event) {
callback(data);
}
});
};
カスタム EventEmitter はイベントメカニズムを拡張しやすくするためです(ここでは体现されませんが)、emitter を EP に包み込み基礎インターフェース on/emit を提供し、イベントメカニズムは基本的に完全です
###3. コア部分
次に各種非同期制御方式(タスク管理方式)を提供します。例えば、all:
// 所有依赖事件都触发后,执行回调
EP.prototype.all = function() {
// 分解参数
var args = args2arr(arguments);
var callback = args.pop(); // 最后一个参数是 callback
var times = args.length; // 其余参数是事件名(回调函数的形参名)
var _data = [];
var _callback = after(times, data => {
for (var event of args) {
// 按顺序排列实参
_data.push(data[event]);
// 解绑事件
//! this 指向 EP 实例,因为在箭头函数中
this.emitter.removeListener(event, _callback);
}
callback.apply(null, _data);
});
for (var event of args) {
this.emitter.on(event, _callback);
}
};
その中で使用するツール関数は:
// utils
// after 返回的函数在 times 次调用后才会真正执行 fn
var after = function(times, fn) {
var data = {};
if (times <= 0) {
return fn();
}
return function(key, val) {
times--;
if (data.hasOwnProperty(key)) {
if (!Array.isArray(data[key])) {
data[key] = [data[key]];
}
data[key].push(val);
}
else {
data[key] = val;
}
if (times === 0) {
return fn(data);
}
};
};
var args2arr = function(args) {
return Array.prototype.slice.call(args);
};
模倣の all は EventProxy の all 機能と完全に一致します(例外を考慮しない場合)。用法は以下の通り:
var asyncTask = function(name, delay, fn) {
setTimeout(function() {
console.log('get ' + name + ' at ' + new Date().getTime());
if (typeof fn === 'function') {
fn();
}
}, delay);
};
// 利用自定义 EP 实现
var EP = require('./ep.js');
var ep = new EP();
var task = function(res, data) {
data = data || 'this is ' + res;
return function() {
ep.emit(res, data);
};
};
//- all
ep.all('res1', 'res2', 'res3', function(res1, res2, res3) {
console.log(res1, res2, res3);
});
asyncTask('res1', 300, task('res1'));
asyncTask('res2', 350, task('res2'));
asyncTask('res3', 280, task('res3'));
// 结束之后再次触发 res1 事件
asyncTask('res1', 400, task('res1'));
使用の秘密は task 中の ep.emit(res, data) にあり、まず all を通じて EP に実行予定の非同期タスクと大コールバック関数を記録させ、然后外部が 1 つの非同期タスクを実行完毕するごとに、ep.emit(res, data) を通じて EP に通知し、すべてのタスク実行完毕时、EP 内部が all が当初記録した大コールバック関数を呼び出します
類似して、tail と after を実装できます:
// 与 all 类似,但能更新数据执行后续回调
EP.prototype.tail = function() {
// 分解参数
var args = args2arr(arguments);
var callback = args.pop(); // 最后一个参数是 callback
var times = args.length; // 其余参数是事件名(回调函数的形参名)
var _data = [];
var _callback = after(times, data => {
for (var event of args) {
// 按顺序排列实参
_data.push(data[event]);
// 解绑事件
//! this 指向 EP 实例,因为在箭头函数中
this.emitter.removeListener(event, _callback);
}
callback.apply(null, _data);
// 绑定后续回调
var tailData = _data.slice();
var tailCallback = function(key, val) {
for (var i = 0; i < args.length; i++) {
if (key === args[i]) {
tailData[i] = val;
break;
}
}
callback.apply(null, tailData);
};
for (var event of args) {
this.emitter.on(event, tailCallback);
}
});
for (var event of args) {
this.emitter.on(event, _callback);
}
};
// 多次调用同一接口,最后返回数组
EP.prototype.after = function(event, times, callback) {
var _callback = after(times, data => {
if (Array.isArray(data[event])) {
// 解绑事件
this.emitter.removeListener(event, _callback);
callback.call(null, data[event].slice());
}
});
this.emitter.on(event, _callback);
};
tail は all の基礎上で一点点の改动をしただけで、毎回新データが到来する時に新データで旧コールバック関数を実行
after は非常にシンプルで、ただ after ツール関数のシンプルな応用
###4. 例外処理
例外処理規則は比較的シンプル:一旦发生例外、すべての処理関数をアンインストールし error イベントリスナーを呼び出します。以下の通り:
// 异常处理
EP.prototype.fail = function(callback) {
this.emitter.on('error', (err) => {
// 卸载所有处理函数
this.emitter.removeAllListeners();
// 执行异常回调
callback(err);
});
};
EP.prototype.done = function(event) {
return (err, result) => {
if (err) {
// 异常统一交由 error 事件处理
return this.emitter.emit('error', err);
}
this.emitter.emit(event, result);
};
};
done は之前 task 中の ep.emit(res, data) の翻版で、例外処理を追加
done は実際 EventProxy 中で比較精巧な部分で、if (err) {...} のような煩雑な例外処理操作を避け、ビジネスコードを簡素化し、同時に例外処理を隠蔽し、使用起来より清爽でより安全
至此、模倣 EventProxy 終了。それが提供する其它非同期制御方式の実装は all、after と類似で、ただより複雑な制御方式はより多くのコードが必要かもしれません。もし继续実装下去的话、最終結果は async モジュールに類似の非同期制御方式大全になり、此处不再深究
三.EventProxy は結局何をしたのか
EventProxy は実際何もしていません。その最大の特徴は侵入性が小さく、プラグインのようなことです
之前使用模倣の all の例を展開します。以下の通り:
ep.all('res1', 'res2', 'res3', function(res1, res2, res3) {
console.log(res1, res2, res3);
});
asyncTask('res1', 300, task('res1'));
// 展开 asyncTask('res1', 300, task('res1'))
// asyncTask
setTimeout(function() {
console.log('get ' + 'res1' + ' at ' + new Date().getTime());
// task('res1')
var data = 'this is ' + 'res1';
ep.emit('res1', data);
}, 300);
可以看到、EP を使用する過程は大片ビジネスコード中に偶尔 1 句 EP コード(ep.all と ep.emit)が穿插するだけで、ビジネスブロック自体にはほとんど影響がなく(ただビジネスブロック出口に 1 条 EP コードを挿入する必要があるかもしれません)、さらにビジネスコードをリファクタリングしてフレームワークに迎合する必要はありません。Promise と比較すると、この点は明らかです
問題に戻ります。EventProxy は結局何をしたのか?
EventProxy はイベントメカニズムで非同期タスクを管理する過程を簡素化しました。内蔵の events モジュールでも類似の機能を簡単に実装でき、コード量も EventProxy ソースコードより多くありません。ビジネスコードへの侵入性が非常に小さいため、より util に似ており、大きなフレームワークではありません
四. まとめ
EventProxy は精巧なツールライブラリ(侵入性が小さい)で、イベントメカニズムに基づく非同期制御方法を提供
async モジュールと比較すると、EventProxy は使用起来より面倒で、機能も十分に包括的ではありませんが、その小巧と柔軟性は亮点で、プラグイン式のツールで、いつでも使用するか使用しないかを選択でき、少し「強勢」なフレームワークに対して、棄用には勇気が必要
Promise と比較すると、イベント購読/发布メカニズム(EventProxy)の欠点は必ず事前にブランチを確定することで、否则イベント発生後にブランチを指定しても無効です。Promise の最大の特徴は:
-
正向用例と反向用例を分離(
p.then(onFulfilled, onRejected)) -
ロジック処理を遅延
ロジック処理を遅延、つまり事前にブランチを指定する必要がなく、まず非同期呼び出しを実行し、遅延してブランチ処理を指定。しかし Promise の欠点は異なるシナリオのために異なる API を封鎖する必要があり、包装コストが存在すること
P.S. Promise に関する詳細情報は、[完全理解 Promise](/articles/完全理解 promise/) を参照
参考資料
- 《深入浅出 NodeJS》
コメントはまだありません