跳到主要內容
黯羽輕揚每天積累一點點

模擬 EventProxy_Node 非同步流程控制 1

免費2016-06-09#Node#EventProxy#EventProxy原理#Node EventProxy#Node异步控制

探究 EventProxy,並模擬實現

###寫在前面

非同步控制是理解 Node 的深水區,當然,如果只是要搞定非同步編程消除回撥金字塔的話,會用 async 模組就足夠應付一切了

Node 非同步流程控制專題希望貼近各個非同步控制庫(EventProxyStepWindasync 以及 ES6 PromiseES7 async/await),近距離瞭解其實現,而不只是使用它們。其實還有一個目的是:向 Wind 致敬

##零.EventEmitter

Node 的主旋律是非同步與事件,許多核心模組都是基於 events 模組實現的,因而出現了複雜的非同步場景和回撥金字塔。所以,先看根源。示例如下:

var EventEmitter = require('events');
var util = require('util');

function MyEmitter() {
    EventEmitter.call(this);
}
// 通過繼承機制建立自定義 EventEmitter
util.inherits(MyEmitter, EventEmitter);

// 建立例項
var myEmitter = new MyEmitter();
// 新增事件監聽器
myEmitter.on('myEvent', function(arg1, arg2) {
    console.log('myEvent occurs');
    console.log(arg1, arg2, this);
    // 1 3 { domain: null,
    //   _events: { myEvent: [Function] },
    //   _maxListeners: undefined }
});
// 觸發事件
myEmitter.emit('myEvent', 1, 3);

// 新增一次性事件監聽器
myEmitter.once('onceEvent', function() {
    console.log(myEmitter.listenerCount('onceEvent'));
    console.log(EventEmitter.defaultMaxListeners);
    console.log(MyEmitter.defaultMaxListeners);
    console.log(myEmitter.listenerCount('onceEvent'));
    console.log(myEmitter.getMaxListeners());
});
// console.log(EventEmitter.listenerCount(myEmitter, 'onceEvent'));    // 廢棄
console.log(myEmitter.listenerCount('onceEvent'));
// 新增監聽器,關注監聽被移除的事件(驗證 once)
myEmitter.on('removeListener', function(event, listener) {
    console.log(event);
    console.log(EventEmitter.listenerCount(myEmitter, 'onceEvent'));
});
myEmitter.emit('onceEvent');

P.S. 其中 util 模組提供的繼承機制是 [寄生組合式繼承](/articles/重新理解 js 的 6 種繼承方式/)

##一。非同步控制的關鍵

任何非同步控制庫或者方法都要解決 2 個問題:

  • 業務支援:處理非同步多級依賴,處理非同步請求同時發出以及回撥順序有依賴的情況

  • 異常處理:1.必須執行且只執行一次回撥函式 2.正確回傳異常供呼叫者判斷

在為業務提供支援,儘可能利用併發優勢的同時,提供友好的異常控制方式

此外,併發控制也是需要關注的問題,避免非同步請求因併發而大量堆積,在確保系統穩定的前提下充分利用併發優勢

比如 async 模組大而全,把這 3 個問題都解決了,而 EventProxy 模組解決了前兩個問題,併發控制由 BagPipe 模組負責。StepWind 等模組也沒有考慮併發控制的問題

##二。模擬 EventProxy

EventProxy 雖然沒有依賴內建的 events 模組,但其實現確實是基於事件的(事件訂閱/釋出模式)。因此,我們利用內建的 events 模組來模擬實現 EventProxy

P.S. 自定義事件機制不難實現,也不是本文的關注點,感興趣可以檢視 JS 學習筆記 11_高級技巧 8.觀察者模式

###1. 原理

提供一些介面接收外部傳入的非同步任務,內部管理這些任務的執行流程(順序/併發/依賴),並收集結果,最後把結果傳出去。此外,還需要管理任務執行過程中的異常

比如 EventProxy 提供的 alltailafter 是 3 種任務管理方式,而 faildone 負責處理異常

###2. 結構

首先自定義 EventEmitter,並進行簡單封裝,如下:

var EventEmitter = require('events');
var util = require('util');

function MyEmitter() {
    EventEmitter.call(this);
}
util.inherits(MyEmitter, EventEmitter);

// event proxy
var EP = function() {
    this.emitter = new MyEmitter();
};
EP.prototype.emit = function(event, data) {
    this.emitter.emit(event, event, data);
};
EP.prototype.on = function(event, callback) {
    this.emitter.on(event, (ev, data) => {
        if (ev === event) {
            callback(data);
        }
    });
};

自定義 EventEmitter 是為了便於擴充套件事件機制(雖然這裡體現不出來),把 emitter 包進 EP 並提供基礎介面 on/emit,事件機制就基本完整了

###3. 核心部分

接下來要提供各種非同步控制方式(任務管理方式),例如,all

// 所有依賴事件都觸發後,執行回撥
EP.prototype.all = function() {
    // 分解引數
    var args = args2arr(arguments);
    var callback = args.pop();  // 最後一個引數是 callback
    var times = args.length;    // 其餘引數是事件名(回撥函式的形參名)

    var _data = [];

    var _callback = after(times, data => {
        for (var event of args) {
            // 按順序排列實參
            _data.push(data[event]);
            // 解綁事件
            //! this 指向 EP 例項,因為在箭頭函式中
            this.emitter.removeListener(event, _callback);
        }
        callback.apply(null, _data);
    });
    for (var event of args) {
        this.emitter.on(event, _callback);
    }
};

其中用到的工具函式為:

// utils
// after 返回的函式在 times 次呼叫後才會真正執行 fn
var after = function(times, fn) {
    var data = {};

    if (times <= 0) {
        return fn();
    }
    return function(key, val) {
        times--;
        if (data.hasOwnProperty(key)) {
            if (!Array.isArray(data[key])) {
                data[key] = [data[key]];
            }
            data[key].push(val);
        }
        else {
            data[key] = val;
        }

        if (times === 0) {
            return fn(data);
        }
    };
};
var args2arr = function(args) {
    return Array.prototype.slice.call(args);
};

模擬的 allEventProxyall 功能完全一致(不考慮異常的話),用法如下:

var asyncTask = function(name, delay, fn) {
    setTimeout(function() {
        console.log('get ' + name + ' at ' + new Date().getTime());
        if (typeof fn === 'function') {
            fn();
        }
    }, delay);
};

// 利用自定義 EP 實現
var EP = require('./ep.js');
var ep = new EP();

var task = function(res, data) {
    data = data || 'this is ' + res;
    return function() {
        ep.emit(res, data);
    };
};

//- all
ep.all('res1', 'res2', 'res3', function(res1, res2, res3) {
    console.log(res1, res2, res3);
});
asyncTask('res1', 300, task('res1'));
asyncTask('res2', 350, task('res2'));
asyncTask('res3', 280, task('res3'));
// 結束之後再次觸發 res1 事件
asyncTask('res1', 400, task('res1'));

使用的秘密在於 task 中的 ep.emit(res, data),先通過 all 告知 EP 記錄將要執行的非同步任務以及大回撥函式,然後外部每執行完畢一個非同步任務,都通過 ep.emit(res, data) 通知 EP,所有任務執行完畢時,EP 內部呼叫 all 當初記錄的大回撥函式

類似的,可以實現 tailafter

// 與 all 類似,但能更新資料執行後續回撥
EP.prototype.tail = function() {
    // 分解引數
    var args = args2arr(arguments);
    var callback = args.pop();  // 最後一個引數是 callback
    var times = args.length;    // 其餘引數是事件名(回撥函式的形參名)

    var _data = [];

    var _callback = after(times, data => {
        for (var event of args) {
            // 按順序排列實參
            _data.push(data[event]);
            // 解綁事件
            //! this 指向 EP 例項,因為在箭頭函式中
            this.emitter.removeListener(event, _callback);
        }
        callback.apply(null, _data);
        // 綁定後續回撥
        var tailData = _data.slice();
        var tailCallback = function(key, val) {
            for (var i = 0; i < args.length; i++) {
                if (key === args[i]) {
                    tailData[i] = val;
                    break;
                }
            }
            callback.apply(null, tailData);
        };
        for (var event of args) {
            this.emitter.on(event, tailCallback);
        }
    });
    for (var event of args) {
        this.emitter.on(event, _callback);
    }
};
// 多次呼叫同一介面,最後返回陣列
EP.prototype.after = function(event, times, callback) {
    var _callback = after(times, data => {
        if (Array.isArray(data[event])) {
            // 解綁事件
            this.emitter.removeListener(event, _callback);
            callback.call(null, data[event].slice());
        }
    });
    this.emitter.on(event, _callback);
};

tail 只在 all 的基礎上做了一點點改動,在每次新資料到來時用新資料執行舊回撥函式

after 非常簡單,只是對 after 工具函式的簡單應用

###4. 異常處理

異常處理規則比較簡單:一旦發生異常,就卸載所有處理函式並呼叫 error 事件監聽器。如下:

// 異常處理
EP.prototype.fail = function(callback) {
    this.emitter.on('error', (err) => {
        // 卸載所有處理函式
        this.emitter.removeAllListeners();
        // 執行異常回撥
        callback(err);
    });
};
EP.prototype.done = function(event) {
    return (err, result) => {
        if (err) {
            // 異常統一交由 error 事件處理
            return this.emitter.emit('error', err);
        }
        this.emitter.emit(event, result);
    };
};

done 是之前 taskep.emit(res, data) 的翻版,加入了異常處理

done 其實是 EventProxy 中比較精巧的部分,避免了 if (err) {...} 這樣繁瑣的異常處理操作,簡化了業務程式碼,同時隱藏了異常處理,用起來更清爽也更安全

至此,模擬 EventProxy 結束,它提供的其它非同步控制方式的實現與 allafter 類似,只是更複雜的控制方式可能需要更多的程式碼,如果繼續實現下去的話,最終結果就是類似於 async 模組的非同步控制方式大全,此處不再深究

##三.EventProxy 到底做了什麼

EventProxy 其實沒做什麼,它最大的特點就是侵入性小,像外掛一樣

展開之前使用模擬的 all 的例子,如下:

ep.all('res1', 'res2', 'res3', function(res1, res2, res3) {
    console.log(res1, res2, res3);
});
asyncTask('res1', 300, task('res1'));

// 展開 asyncTask('res1', 300, task('res1'))
// asyncTask
setTimeout(function() {
    console.log('get ' + 'res1' + ' at ' + new Date().getTime());

    // task('res1')
    var data = 'this is ' + 'res1';
    ep.emit('res1', data);
}, 300);

可以看到,使用 EP 的過程是大片業務程式碼中偶爾穿插 1 句 EP 程式碼(ep.allep.emit),對業務塊本身幾乎沒有影響(只是可能要在業務塊出口插入一條 EP 程式碼),更不用重構業務程式碼去迎合框架,對比 Promise 的話,這一點很明顯

回到問題,EventProxy 到底做了什麼?

EventProxy 簡化了用事件機制管理非同步任務的過程。用內建的 events 模組也很容易實現類似的功能,而且程式碼量不會比 EventProxy 原始碼多很多。對業務程式碼的侵入性很小,因此更像是 util,而不是大隻的框架

##四。總結

EventProxy 是一個精巧的工具庫(侵入性小),提供了基於事件機制的非同步控制方法

對比 async 模組,EventProxy 用起來更麻煩,功能也不夠全面,但其小巧與靈活性是亮點,外掛式的工具,可以隨時選擇用或者不用,而對於稍微「強勢」一點的框架,棄用需要勇氣

對比 Promise,事件訂閱/釋出機制(EventProxy)的缺點是必須預先確定分支,否則事件發生後再指定分支就無效了。Promise 最大的特點是:

  • 分離了正向用例和反向用例(p.then(onFulfilled, onRejected)

  • 延遲邏輯處理

延遲邏輯處理,即不用預先指定分支,先執行非同步呼叫,延遲指定分支處理。但 Promise 的缺點是要為不同的場景封裝不同的 API,存在包裝成本

P.S. 關於 Promise 的詳細資訊,請檢視 [完全理解 Promise](/articles/完全理解 promise/)

###參考資料

  • 《深入淺出 NodeJS》

評論

暫無評論,快來發表你的看法吧

提交評論