###寫在前面
非同步控制是理解 Node 的深水區,當然,如果只是要搞定非同步編程消除回撥金字塔的話,會用 async 模組就足夠應付一切了
Node 非同步流程控制專題希望貼近各個非同步控制庫(EventProxy、Step、Wind、async 以及 ES6 Promise 和 ES7 async/await),近距離瞭解其實現,而不只是使用它們。其實還有一個目的是:向 Wind 致敬
##零.EventEmitter
Node 的主旋律是非同步與事件,許多核心模組都是基於 events 模組實現的,因而出現了複雜的非同步場景和回撥金字塔。所以,先看根源。示例如下:
var EventEmitter = require('events');
var util = require('util');
function MyEmitter() {
EventEmitter.call(this);
}
// 通過繼承機制建立自定義 EventEmitter
util.inherits(MyEmitter, EventEmitter);
// 建立例項
var myEmitter = new MyEmitter();
// 新增事件監聽器
myEmitter.on('myEvent', function(arg1, arg2) {
console.log('myEvent occurs');
console.log(arg1, arg2, this);
// 1 3 { domain: null,
// _events: { myEvent: [Function] },
// _maxListeners: undefined }
});
// 觸發事件
myEmitter.emit('myEvent', 1, 3);
// 新增一次性事件監聽器
myEmitter.once('onceEvent', function() {
console.log(myEmitter.listenerCount('onceEvent'));
console.log(EventEmitter.defaultMaxListeners);
console.log(MyEmitter.defaultMaxListeners);
console.log(myEmitter.listenerCount('onceEvent'));
console.log(myEmitter.getMaxListeners());
});
// console.log(EventEmitter.listenerCount(myEmitter, 'onceEvent')); // 廢棄
console.log(myEmitter.listenerCount('onceEvent'));
// 新增監聽器,關注監聽被移除的事件(驗證 once)
myEmitter.on('removeListener', function(event, listener) {
console.log(event);
console.log(EventEmitter.listenerCount(myEmitter, 'onceEvent'));
});
myEmitter.emit('onceEvent');
P.S. 其中 util 模組提供的繼承機制是 [寄生組合式繼承](/articles/重新理解 js 的 6 種繼承方式/)
##一。非同步控制的關鍵
任何非同步控制庫或者方法都要解決 2 個問題:
-
業務支援:處理非同步多級依賴,處理非同步請求同時發出以及回撥順序有依賴的情況
-
異常處理:1.必須執行且只執行一次回撥函式 2.正確回傳異常供呼叫者判斷
在為業務提供支援,儘可能利用併發優勢的同時,提供友好的異常控制方式
此外,併發控制也是需要關注的問題,避免非同步請求因併發而大量堆積,在確保系統穩定的前提下充分利用併發優勢
比如 async 模組大而全,把這 3 個問題都解決了,而 EventProxy 模組解決了前兩個問題,併發控制由 BagPipe 模組負責。Step、Wind 等模組也沒有考慮併發控制的問題
##二。模擬 EventProxy
EventProxy 雖然沒有依賴內建的 events 模組,但其實現確實是基於事件的(事件訂閱/釋出模式)。因此,我們利用內建的 events 模組來模擬實現 EventProxy
P.S. 自定義事件機制不難實現,也不是本文的關注點,感興趣可以檢視 JS 學習筆記 11_高級技巧 8.觀察者模式
###1. 原理
提供一些介面接收外部傳入的非同步任務,內部管理這些任務的執行流程(順序/併發/依賴),並收集結果,最後把結果傳出去。此外,還需要管理任務執行過程中的異常
比如 EventProxy 提供的 all、tail、after 是 3 種任務管理方式,而 fail 和 done 負責處理異常
###2. 結構
首先自定義 EventEmitter,並進行簡單封裝,如下:
var EventEmitter = require('events');
var util = require('util');
function MyEmitter() {
EventEmitter.call(this);
}
util.inherits(MyEmitter, EventEmitter);
// event proxy
var EP = function() {
this.emitter = new MyEmitter();
};
EP.prototype.emit = function(event, data) {
this.emitter.emit(event, event, data);
};
EP.prototype.on = function(event, callback) {
this.emitter.on(event, (ev, data) => {
if (ev === event) {
callback(data);
}
});
};
自定義 EventEmitter 是為了便於擴充套件事件機制(雖然這裡體現不出來),把 emitter 包進 EP 並提供基礎介面 on/emit,事件機制就基本完整了
###3. 核心部分
接下來要提供各種非同步控制方式(任務管理方式),例如,all:
// 所有依賴事件都觸發後,執行回撥
EP.prototype.all = function() {
// 分解引數
var args = args2arr(arguments);
var callback = args.pop(); // 最後一個引數是 callback
var times = args.length; // 其餘引數是事件名(回撥函式的形參名)
var _data = [];
var _callback = after(times, data => {
for (var event of args) {
// 按順序排列實參
_data.push(data[event]);
// 解綁事件
//! this 指向 EP 例項,因為在箭頭函式中
this.emitter.removeListener(event, _callback);
}
callback.apply(null, _data);
});
for (var event of args) {
this.emitter.on(event, _callback);
}
};
其中用到的工具函式為:
// utils
// after 返回的函式在 times 次呼叫後才會真正執行 fn
var after = function(times, fn) {
var data = {};
if (times <= 0) {
return fn();
}
return function(key, val) {
times--;
if (data.hasOwnProperty(key)) {
if (!Array.isArray(data[key])) {
data[key] = [data[key]];
}
data[key].push(val);
}
else {
data[key] = val;
}
if (times === 0) {
return fn(data);
}
};
};
var args2arr = function(args) {
return Array.prototype.slice.call(args);
};
模擬的 all 與 EventProxy 的 all 功能完全一致(不考慮異常的話),用法如下:
var asyncTask = function(name, delay, fn) {
setTimeout(function() {
console.log('get ' + name + ' at ' + new Date().getTime());
if (typeof fn === 'function') {
fn();
}
}, delay);
};
// 利用自定義 EP 實現
var EP = require('./ep.js');
var ep = new EP();
var task = function(res, data) {
data = data || 'this is ' + res;
return function() {
ep.emit(res, data);
};
};
//- all
ep.all('res1', 'res2', 'res3', function(res1, res2, res3) {
console.log(res1, res2, res3);
});
asyncTask('res1', 300, task('res1'));
asyncTask('res2', 350, task('res2'));
asyncTask('res3', 280, task('res3'));
// 結束之後再次觸發 res1 事件
asyncTask('res1', 400, task('res1'));
使用的秘密在於 task 中的 ep.emit(res, data),先通過 all 告知 EP 記錄將要執行的非同步任務以及大回撥函式,然後外部每執行完畢一個非同步任務,都通過 ep.emit(res, data) 通知 EP,所有任務執行完畢時,EP 內部呼叫 all 當初記錄的大回撥函式
類似的,可以實現 tail 和 after:
// 與 all 類似,但能更新資料執行後續回撥
EP.prototype.tail = function() {
// 分解引數
var args = args2arr(arguments);
var callback = args.pop(); // 最後一個引數是 callback
var times = args.length; // 其餘引數是事件名(回撥函式的形參名)
var _data = [];
var _callback = after(times, data => {
for (var event of args) {
// 按順序排列實參
_data.push(data[event]);
// 解綁事件
//! this 指向 EP 例項,因為在箭頭函式中
this.emitter.removeListener(event, _callback);
}
callback.apply(null, _data);
// 綁定後續回撥
var tailData = _data.slice();
var tailCallback = function(key, val) {
for (var i = 0; i < args.length; i++) {
if (key === args[i]) {
tailData[i] = val;
break;
}
}
callback.apply(null, tailData);
};
for (var event of args) {
this.emitter.on(event, tailCallback);
}
});
for (var event of args) {
this.emitter.on(event, _callback);
}
};
// 多次呼叫同一介面,最後返回陣列
EP.prototype.after = function(event, times, callback) {
var _callback = after(times, data => {
if (Array.isArray(data[event])) {
// 解綁事件
this.emitter.removeListener(event, _callback);
callback.call(null, data[event].slice());
}
});
this.emitter.on(event, _callback);
};
tail 只在 all 的基礎上做了一點點改動,在每次新資料到來時用新資料執行舊回撥函式
after 非常簡單,只是對 after 工具函式的簡單應用
###4. 異常處理
異常處理規則比較簡單:一旦發生異常,就卸載所有處理函式並呼叫 error 事件監聽器。如下:
// 異常處理
EP.prototype.fail = function(callback) {
this.emitter.on('error', (err) => {
// 卸載所有處理函式
this.emitter.removeAllListeners();
// 執行異常回撥
callback(err);
});
};
EP.prototype.done = function(event) {
return (err, result) => {
if (err) {
// 異常統一交由 error 事件處理
return this.emitter.emit('error', err);
}
this.emitter.emit(event, result);
};
};
done 是之前 task 中 ep.emit(res, data) 的翻版,加入了異常處理
done 其實是 EventProxy 中比較精巧的部分,避免了 if (err) {...} 這樣繁瑣的異常處理操作,簡化了業務程式碼,同時隱藏了異常處理,用起來更清爽也更安全
至此,模擬 EventProxy 結束,它提供的其它非同步控制方式的實現與 all、after 類似,只是更複雜的控制方式可能需要更多的程式碼,如果繼續實現下去的話,最終結果就是類似於 async 模組的非同步控制方式大全,此處不再深究
##三.EventProxy 到底做了什麼
EventProxy 其實沒做什麼,它最大的特點就是侵入性小,像外掛一樣
展開之前使用模擬的 all 的例子,如下:
ep.all('res1', 'res2', 'res3', function(res1, res2, res3) {
console.log(res1, res2, res3);
});
asyncTask('res1', 300, task('res1'));
// 展開 asyncTask('res1', 300, task('res1'))
// asyncTask
setTimeout(function() {
console.log('get ' + 'res1' + ' at ' + new Date().getTime());
// task('res1')
var data = 'this is ' + 'res1';
ep.emit('res1', data);
}, 300);
可以看到,使用 EP 的過程是大片業務程式碼中偶爾穿插 1 句 EP 程式碼(ep.all 和 ep.emit),對業務塊本身幾乎沒有影響(只是可能要在業務塊出口插入一條 EP 程式碼),更不用重構業務程式碼去迎合框架,對比 Promise 的話,這一點很明顯
回到問題,EventProxy 到底做了什麼?
EventProxy 簡化了用事件機制管理非同步任務的過程。用內建的 events 模組也很容易實現類似的功能,而且程式碼量不會比 EventProxy 原始碼多很多。對業務程式碼的侵入性很小,因此更像是 util,而不是大隻的框架
##四。總結
EventProxy 是一個精巧的工具庫(侵入性小),提供了基於事件機制的非同步控制方法
對比 async 模組,EventProxy 用起來更麻煩,功能也不夠全面,但其小巧與靈活性是亮點,外掛式的工具,可以隨時選擇用或者不用,而對於稍微「強勢」一點的框架,棄用需要勇氣
對比 Promise,事件訂閱/釋出機制(EventProxy)的缺點是必須預先確定分支,否則事件發生後再指定分支就無效了。Promise 最大的特點是:
-
分離了正向用例和反向用例(
p.then(onFulfilled, onRejected)) -
延遲邏輯處理
延遲邏輯處理,即不用預先指定分支,先執行非同步呼叫,延遲指定分支處理。但 Promise 的缺點是要為不同的場景封裝不同的 API,存在包裝成本
P.S. 關於 Promise 的詳細資訊,請檢視 [完全理解 Promise](/articles/完全理解 promise/)
###參考資料
- 《深入淺出 NodeJS》
暫無評論,快來發表你的看法吧