본문으로 건너뛰기

EventProxy 모방_Node 비동기 플로우 제어 1

무료2016-06-09#Node#EventProxy#EventProxy原理#Node EventProxy#Node异步控制

EventProxy 를 탐구하고, 모방 구현

서두

비동기 제어는 Node 를 이해하는 심수구입니다. 물론, 단순히 비동기 프로그래밍을搞定하고 콜백 피라미드를消除하고 싶기만 하다면, async 모듈을 사용하는 것만으로 모든 것에 대응할 수 있습니다

Node 비동기 플로우 제어專題은 각 비동기 제어 라이브러리 (EventProxy, Step, Wind, asyncES6 PromiseES7 async/await) 에贴近하여, 그 구현을 가까운 거리에서 了解하는 것을 희망합니다. 사용하는 것뿐만 아니라. 실제로 또 하나의 목적은: Wind 에 경의를 표하는 것입니다

영.EventEmitter

Node 의 주선율은 비동기와 이벤트로, 많은 코어 모듈은 events 모듈을 기반으로 구현되어,そのため복잡한 비동기 시나리오와 콜백 피라미드가 나타났습니다. 따라서, 먼저 근원을 봅니다. 예는 다음과 같습니다:

var EventEmitter = require('events');
var util = require('util');

function MyEmitter() {
    EventEmitter.call(this);
}
// 通过继承机制创建自定义 EventEmitter
util.inherits(MyEmitter, EventEmitter);

// 创建实例
var myEmitter = new MyEmitter();
// 添加事件监听器
myEmitter.on('myEvent', function(arg1, arg2) {
    console.log('myEvent occurs');
    console.log(arg1, arg2, this);
    // 1 3 { domain: null,
    //   _events: { myEvent: [Function] },
    //   _maxListeners: undefined }
});
// 触发事件
myEmitter.emit('myEvent', 1, 3);

// 添加一次性事件监听器
myEmitter.once('onceEvent', function() {
    console.log(myEmitter.listenerCount('onceEvent'));
    console.log(EventEmitter.defaultMaxListeners);
    console.log(MyEmitter.defaultMaxListeners);
    console.log(myEmitter.listenerCount('onceEvent'));
    console.log(myEmitter.getMaxListeners());
});
// console.log(EventEmitter.listenerCount(myEmitter, 'onceEvent'));    // 废弃
console.log(myEmitter.listenerCount('onceEvent'));
// 添加监听器,关注监听被移除的事件(验证 once)
myEmitter.on('removeListener', function(event, listener) {
    console.log(event);
    console.log(EventEmitter.listenerCount(myEmitter, 'onceEvent'));
});
myEmitter.emit('onceEvent');

P.S. 그 중 util 모듈이 제공하는 상속 메커니즘은 [기생조합식 상속](/articles/重新理解 js 的 6 种继承方式/) 입니다

일.비동기 제어의 열쇠

어떠한 비동기 제어 라이브러리 또는 방법은 2 가지 문제를 해결해야 합니다:

  • 비즈니스 서포트: 비동기 다급 의존을 처리하고, 비동기 요청이 동시에 발생하며, 콜백 순서에 의존이 있는 상황을 처리

  • 예외 처리: 1. 반드시 실행하고且 1 회만 실행하는 콜백 함수 2. 올바르게 예외를 돌려보내 호출자가 판단할 수 있도록

비즈니스에 서포트를 제공하고, 가능한 한 병렬 우위를 이용함과 동시에, 우호적인 예외 제어 방식을 제공

게다가, 병렬 제어도 주목이 필요한 문제로, 비동기 요청이 병렬로 인해 대량으로堆积하는 것을避け, 시스템 안정을 확보하는 전제로 병렬 우위를充分利用

예를 들어 async 모듈은 크고 모두로, 이 3 가지 문제를 모두 해결하고, EventProxy 모듈은 전 2 가지 문제를 해결하며, 병렬 제어는 BagPipe 모듈이 담당합니다. Step, Wind 등의 모듈도 병렬 제어의 문제를 고려하지 않았습니다

이.EventProxy 모방

EventProxy 는 내장의 events 모듈에 의존하지 않지만, 그 구현은 확실히 이벤트에 기반한 것입니다 (이벤트 구독/发布 모드). 따라서, 내장의 events 모듈을 이용하여 EventProxy 를 모방 구현합니다

P.S. 커스텀 이벤트 메커니즘은 구현이 어렵지 않고, 본문의 주목점도 아닙니다. 관심이 있는 경우 JS 학습노트 11_고급기법 8. 관찰자 패턴 참조

###1. 원리

외부에서传入되는 비동기 태스크를 받아내는 몇 가지 인터페이스를 제공하고, 내부에서 이들 태스크의 실행 플로 (순서/병렬/의존) 를 관리하며, 결과를 수집하고, 마지막에 결과를传出합니다. 게다가, 태스크 실행 과정 중의 예외도 관리해야 합니다

예를 들어 EventProxy 가 제공하는 all, tail, after 는 3 종의 태스크 관리 방식이고, faildone 는 예외 처리를 담당

###2. 구조

먼저 커스텀 EventEmitter 를 생성하고, 간단하게 봉쇄합니다. 다음과 같습니다:

var EventEmitter = require('events');
var util = require('util');

function MyEmitter() {
    EventEmitter.call(this);
}
util.inherits(MyEmitter, EventEmitter);

// event proxy
var EP = function() {
    this.emitter = new MyEmitter();
};
EP.prototype.emit = function(event, data) {
    this.emitter.emit(event, event, data);
};
EP.prototype.on = function(event, callback) {
    this.emitter.on(event, (ev, data) => {
        if (ev === event) {
            callback(data);
        }
    });
};

커스텀 EventEmitter 는 이벤트 메커니즘을 확장하기 쉽게 하기 위함입니다 (여기서는体现되지 않지만), emitterEP 에包み基礎인터페이스 on/emit 를 제공하면, 이벤트 메커니즘은 기본적으로 완전합니다

###3. 코어 부분

다음에 각종 비동기 제어 방식 (태스크 관리 방식) 을 제공합니다. 예를 들어, all:

// 所有依赖事件都触发后,执行回调
EP.prototype.all = function() {
    // 分解参数
    var args = args2arr(arguments);
    var callback = args.pop();  // 最后一个参数是 callback
    var times = args.length;    // 其余参数是事件名(回调函数的形参名)

    var _data = [];

    var _callback = after(times, data => {
        for (var event of args) {
            // 按顺序排列实参
            _data.push(data[event]);
            // 解绑事件
            //! this 指向 EP 实例,因为在箭头函数中
            this.emitter.removeListener(event, _callback);
        }
        callback.apply(null, _data);
    });
    for (var event of args) {
        this.emitter.on(event, _callback);
    }
};

그 중에서 사용하는 툴 함수는:

// utils
// after 返回的函数在 times 次调用后才会真正执行 fn
var after = function(times, fn) {
    var data = {};

    if (times <= 0) {
        return fn();
    }
    return function(key, val) {
        times--;
        if (data.hasOwnProperty(key)) {
            if (!Array.isArray(data[key])) {
                data[key] = [data[key]];
            }
            data[key].push(val);
        }
        else {
            data[key] = val;
        }

        if (times === 0) {
            return fn(data);
        }
    };
};
var args2arr = function(args) {
    return Array.prototype.slice.call(args);
};

모방의 allEventProxyall 기능과 완전히 일치합니다 (예외를 고려하지 않는 경우). 용법은 다음과 같습니다:

var asyncTask = function(name, delay, fn) {
    setTimeout(function() {
        console.log('get ' + name + ' at ' + new Date().getTime());
        if (typeof fn === 'function') {
            fn();
        }
    }, delay);
};

// 利用自定义 EP 实现
var EP = require('./ep.js');
var ep = new EP();

var task = function(res, data) {
    data = data || 'this is ' + res;
    return function() {
        ep.emit(res, data);
    };
};

//- all
ep.all('res1', 'res2', 'res3', function(res1, res2, res3) {
    console.log(res1, res2, res3);
});
asyncTask('res1', 300, task('res1'));
asyncTask('res2', 350, task('res2'));
asyncTask('res3', 280, task('res3'));
// 结束之后再次触发 res1 事件
asyncTask('res1', 400, task('res1'));

사용의 비밀은 task 중의 ep.emit(res, data) 에 있으며, 먼저 all 을 통해 EP 에 실행 예정인 비동기 태스크와 큰 콜백 함수를 기록하게 하고,然后외부가 1 개의 비동기 태스크를 실행完毕할 때마다, ep.emit(res, data) 를 통해 EP 에 통지하며, 모든 태스크 실행完毕时, EP 내부가 all 이 당초 기록한 큰 콜백 함수를 호출합니다

유사하게, tailafter 를 구현할 수 있습니다:

// 与 all 类似,但能更新数据执行后续回调
EP.prototype.tail = function() {
    // 分解参数
    var args = args2arr(arguments);
    var callback = args.pop();  // 最后一个参数是 callback
    var times = args.length;    // 其余参数是事件名(回调函数的形参名)

    var _data = [];

    var _callback = after(times, data => {
        for (var event of args) {
            // 按顺序排列实参
            _data.push(data[event]);
            // 解绑事件
            //! this 指向 EP 实例,因为在箭头函数中
            this.emitter.removeListener(event, _callback);
        }
        callback.apply(null, _data);
        // 绑定后续回调
        var tailData = _data.slice();
        var tailCallback = function(key, val) {
            for (var i = 0; i < args.length; i++) {
                if (key === args[i]) {
                    tailData[i] = val;
                    break;
                }
            }
            callback.apply(null, tailData);
        };
        for (var event of args) {
            this.emitter.on(event, tailCallback);
        }
    });
    for (var event of args) {
        this.emitter.on(event, _callback);
    }
};
// 多次调用同一接口,最后返回数组
EP.prototype.after = function(event, times, callback) {
    var _callback = after(times, data => {
        if (Array.isArray(data[event])) {
            // 解绑事件
            this.emitter.removeListener(event, _callback);
            callback.call(null, data[event].slice());
        }
    });
    this.emitter.on(event, _callback);
};

tailall 의基礎上에서一点点의改动을 했을 뿐으로, 매번 새 데이터가 도래할 때 새 데이터로 구 콜백 함수를 실행

after 는 매우シンプル하며, 다만 after 툴 함수의シンプルな 응용

###4. 예외 처리

예외 처리 규칙은 비교적シンプル:一旦发生예외, 모든 처리 함수를アンインストール하고 error 이벤트리스너를 호출합니다. 다음과 같습니다:

// 异常处理
EP.prototype.fail = function(callback) {
    this.emitter.on('error', (err) => {
        // 卸载所有处理函数
        this.emitter.removeAllListeners();
        // ���行异常回调
        callback(err);
    });
};
EP.prototype.done = function(event) {
    return (err, result) => {
        if (err) {
            // 异常统一交由 error 事件处理
            return this.emitter.emit('error', err);
        }
        this.emitter.emit(event, result);
    };
};

done 은之前 task 중의 ep.emit(res, data) 의翻版으로, 예외 처리를 추가

done 은 실제 EventProxy 중에서 비교적精巧한 부분으로, if (err) {...} 와 같은 번잡한 예외 처리 조작을避け, 비즈니스 코드를簡素化하고, 동시에 예외 처리를隠蔽하여,使用起来보다清爽하고보다 안전

至此, 모방 EventProxy 종료. 그것이 제공하는其它비동기 제어 방식의 구현은 all, after 와 유사하며, 다만 보다 복잡한 제어 방식은 보다 많은 코드가 필요할 수 있습니다. 만약继续구현下去的话, 최종 결과는 async 모듈에 유사한 비동기 제어 방식 대전이 되며,此处不再深究

삼.EventProxy 는結局무엇을 했는가

EventProxy 는 실제 아무것도 하지 않았습니다. 그 최대의 특징은침입성이 작고, 플러그인처럼이라는 것입니다

之前사용 모방의 all 의 예를 전개합니다. 다음과 같습니다:

ep.all('res1', 'res2', 'res3', function(res1, res2, res3) {
    console.log(res1, res2, res3);
});
asyncTask('res1', 300, task('res1'));

// 展开 asyncTask('res1', 300, task('res1'))
// asyncTask
setTimeout(function() {
    console.log('get ' + 'res1' + ' at ' + new Date().getTime());

    // task('res1')
    var data = 'this is ' + 'res1';
    ep.emit('res1', data);
}, 300);

볼 수 있듯이, EP 를 사용하는 과정은大片비즈니스 코드 중에偶尔 1 구 EP 코드 (ep.allep.emit) 가穿插될 뿐이며, 비즈니스 블록 자체에는 거의 영향이 없고 (다만 비즈니스 블록出口에 1 条 EP 코드를挿入할 필요가 있을 뿐입니다), 더욱이 비즈니스 코드를 리팩토링하여 프레임워크에迎合할 필요도 없습니다. Promise 와 비교하면, 이 점은 명확합니다

문제로 돌아갑니다. EventProxy 는結局무엇을 했는가?

EventProxy 는 이벤트 메커니즘으로 비동기 태스크를 관리하는 과정을簡素化했습니다. 내장의 events 모듈에서도 유사한 기능을쉽게구현할 수 있고, 코드량도 EventProxy 소스 코드보다많지않습니다. 비즈니스 코드에의침입성이매우작으므로, 보다 util 에닮았으며, 큰 프레임워크가아닙니다

사.정리

EventProxy 는精巧한툴라이브러리 (침입성이작음) 로, 이벤트메커니즘에기반한비동기제어방법을제공

async 모듈과비교하면, EventProxy 는使用起来보다번거롭고, 기능도충분히포괄적이지않지만, 그小巧와유연성은亮点로, 플러그인식의툴로, 언제든지사용할지사용하지않을지선택할수있고, 조금「강세」한프레임워크에대해서는, 棄用에는용기가필요

Promise 와비교하면, 이벤트구독/发布메커니즘 (EventProxy) 의단점은반드시사전에도브런치를확정하는것으로,否则이벤트발생후에도브런치를지정해도무효입니다. Promise 의최대의특징은:

  • 정향用例와반향用例를분리 (p.then(onFulfilled, onRejected))

  • 로직처리를지연

로직처리를지연, 즉사전에브런치를지정할필요가없고, 먼저비동기호출을실행하고, 지연하여브런치처리를지정. 그러나 Promise 의단점은다른시나리오를위해다른 API 를封鎖할필요가있고, 포장비용이존재하는것

P.S. Promise 에관한상세정보는, [완전이해 Promise](/articles/완전이해 promise/) 참조

참고자료

  • 《얕고깊게 NodeJS》

댓글

아직 댓글이 없습니다

댓글 작성