Skip to main content

Simulating EventProxy_Node Asynchronous Flow Control 1

Free2016-06-09#Node#EventProxy#EventProxy原理#Node EventProxy#Node异步控制

Explore EventProxy, and simulate implementation

Preface

Asynchronous control is the deep water area of understanding Node, of course, if just want to handle asynchronous programming and eliminate callback pyramid, using async module is enough to handle everything

Node asynchronous flow control topic hopes to be close to various asynchronous control libraries (EventProxy, Step, Wind, async and ES6 Promise and ES7 async/await), closely understand their implementation, not just use them. Actually there's another purpose: pay tribute to Wind

Zero. EventEmitter

Node's main theme is asynchronous and events, many core modules are implemented based on events module, thus complex asynchronous scenarios and callback pyramids appear. So, first look at the root. Example as follows:

var EventEmitter = require('events');
var util = require('util');

function MyEmitter() {
    EventEmitter.call(this);
}
// Create custom EventEmitter through inheritance mechanism
util.inherits(MyEmitter, EventEmitter);

// Create instance
var myEmitter = new MyEmitter();
// Add event listener
myEmitter.on('myEvent', function(arg1, arg2) {
    console.log('myEvent occurs');
    console.log(arg1, arg2, this);
    // 1 3 { domain: null,
    //   _events: { myEvent: [Function] },
    //   _maxListeners: undefined }
});
// Trigger event
myEmitter.emit('myEvent', 1, 3);

// Add one-time event listener
myEmitter.once('onceEvent', function() {
    console.log(myEmitter.listenerCount('onceEvent'));
    console.log(EventEmitter.defaultMaxListeners);
    console.log(MyEmitter.defaultMaxListeners);
    console.log(myEmitter.listenerCount('onceEvent'));
    console.log(myEmitter.getMaxListeners());
});
// console.log(EventEmitter.listenerCount(myEmitter, 'onceEvent'));    // Deprecated
console.log(myEmitter.listenerCount('onceEvent'));
// Add listener, pay attention to listener being removed event (verify once)
myEmitter.on('removeListener', function(event, listener) {
    console.log(event);
    console.log(EventEmitter.listenerCount(myEmitter, 'onceEvent'));
});
myEmitter.emit('onceEvent');

P.S. Among them inheritance mechanism provided by util module is [parasitic combination inheritance](/articles/重新理解 js 的 6 种继承方式/)

I. Key to Asynchronous Control

Any asynchronous control library or method must solve 2 problems:

  • Business support: Handle asynchronous multi-level dependencies, handle asynchronous requests issued simultaneously and callback order has dependencies

  • Exception handling: 1. Must execute callback function and only execute once 2. Correctly pass back exception for caller to judge

While providing support for business, make full use of concurrency advantages as much as possible, provide friendly exception control method

Additionally, concurrency control is also a problem needing attention, avoid asynchronous requests accumulating in large quantities due to concurrency, fully utilize concurrency advantages under premise of ensuring system stability

For example async module is large and comprehensive, solves all 3 problems, while EventProxy module solves first two problems, concurrency control is handled by BagPipe module. Step, Wind and other modules also didn't consider concurrency control problem

II. Simulating EventProxy

EventProxy although doesn't depend on built-in events module, but its implementation is indeed based on events (event subscription/publish pattern). Therefore, we use built-in events module to simulate implementation of EventProxy

P.S. Custom event mechanism is not difficult to implement, also not the focus of this article, interested can check JS Learning Notes 11_Advanced Techniques 8. Observer Pattern

1. Principle

Provide some interfaces to receive asynchronous tasks passed in from external, internally manage execution flow of these tasks (sequence/concurrency/dependency), and collect results, finally pass results out. Additionally, also need to manage exceptions during task execution process

For example EventProxy provides all, tail, after are 3 task management methods, while fail and done are responsible for handling exceptions

2. Structure

First customize EventEmitter, and perform simple encapsulation, as follows:

var EventEmitter = require('events');
var util = require('util');

function MyEmitter() {
    EventEmitter.call(this);
}
util.inherits(MyEmitter, EventEmitter);

// event proxy
var EP = function() {
    this.emitter = new MyEmitter();
};
EP.prototype.emit = function(event, data) {
    this.emitter.emit(event, event, data);
};
EP.prototype.on = function(event, callback) {
    this.emitter.on(event, (ev, data) => {
        if (ev === event) {
            callback(data);
        }
    });
};

Customize EventEmitter is for ease of extending event mechanism (although can't reflect here), wrap emitter into EP and provide basic interface on/emit, event mechanism is basically complete

3. Core Part

Next need to provide various asynchronous control methods (task management methods), for example, all:

// Execute callback after all dependent events are triggered
EP.prototype.all = function() {
    // Decompose parameters
    var args = args2arr(arguments);
    var callback = args.pop();  // Last parameter is callback
    var times = args.length;    // Rest parameters are event names (callback function parameter names)

    var _data = [];

    var _callback = after(times, data => {
        for (var event of args) {
            // Arrange actual parameters in order
            _data.push(data[event]);
            // Unbind event
            //! this points to EP instance, because in arrow function
            this.emitter.removeListener(event, _callback);
        }
        callback.apply(null, _data);
    });
    for (var event of args) {
        this.emitter.on(event, _callback);
    }
};

Utility functions used among them are:

// utils
// Function returned by after will only truly execute fn after times calls
var after = function(times, fn) {
    var data = {};

    if (times <= 0) {
        return fn();
    }
    return function(key, val) {
        times--;
        if (data.hasOwnProperty(key)) {
            if (!Array.isArray(data[key])) {
                data[key] = [data[key]];
            }
            data[key].push(val);
        }
        else {
            data[key] = val;
        }

        if (times === 0) {
            return fn(data);
        }
    };
};
var args2arr = function(args) {
    return Array.prototype.slice.call(args);
};

Simulated all and EventProxy's all function completely consistent (not considering exceptions), usage as follows:

var asyncTask = function(name, delay, fn) {
    setTimeout(function() {
        console.log('get ' + name + ' at ' + new Date().getTime());
        if (typeof fn === 'function') {
            fn();
        }
    }, delay);
};

// Use custom EP to implement
var EP = require('./ep.js');
var ep = new EP();

var task = function(res, data) {
    data = data || 'this is ' + res;
    return function() {
        ep.emit(res, data);
    };
};

//- all
ep.all('res1', 'res2', 'res3', function(res1, res2, res3) {
    console.log(res1, res2, res3);
});
asyncTask('res1', 300, task('res1'));
asyncTask('res2', 350, task('res2'));
asyncTask('res3', 280, task('res3'));
// Trigger res1 event again after finish
asyncTask('res1', 400, task('res1'));

Secret of use lies in ep.emit(res, data) in task, first inform EP through all to record asynchronous tasks to be executed and big callback function, then externally every time an asynchronous task completes, notify EP through ep.emit(res, data), when all tasks complete, EP internally calls big callback function recorded by all initially

Similarly, can implement tail and after:

// Similar to all, but can update data to execute subsequent callbacks
EP.prototype.tail = function() {
    // Decompose parameters
    var args = args2arr(arguments);
    var callback = args.pop();  // Last parameter is callback
    var times = args.length;    // Rest parameters are event names (callback function parameter names)

    var _data = [];

    var _callback = after(times, data => {
        for (var event of args) {
            // Arrange actual parameters in order
            _data.push(data[event]);
            // Unbind event
            //! this points to EP instance, because in arrow function
            this.emitter.removeListener(event, _callback);
        }
        callback.apply(null, _data);
        // Bind subsequent callbacks
        var tailData = _data.slice();
        var tailCallback = function(key, val) {
            for (var i = 0; i < args.length; i++) {
                if (key === args[i]) {
                    tailData[i] = val;
                    break;
                }
            }
            callback.apply(null, tailData);
        };
        for (var event of args) {
            this.emitter.on(event, tailCallback);
        }
    });
    for (var event of args) {
        this.emitter.on(event, _callback);
    }
};
// Call same interface multiple times, finally return array
EP.prototype.after = function(event, times, callback) {
    var _callback = after(times, data => {
        if (Array.isArray(data[event])) {
            // Unbind event
            this.emitter.removeListener(event, _callback);
            callback.call(null, data[event].slice());
        }
    });
    this.emitter.on(event, _callback);
};

tail only made a little change on basis of all, execute old callback function with new data every time new data arrives

after is very simple, just simple application of after utility function

4. Exception Handling

Exception handling rules are relatively simple: once exception occurs, uninstall all processing functions and call error event listener. As follows:

// Exception handling
EP.prototype.fail = function(callback) {
    this.emitter.on('error', (err) => {
        // Uninstall all processing functions
        this.emitter.removeAllListeners();
        // Execute exception callback
        callback(err);
    });
};
EP.prototype.done = function(event) {
    return (err, result) => {
        if (err) {
            // Exceptions uniformly handled by error event
            return this.emitter.emit('error', err);
        }
        this.emitter.emit(event, result);
    };
};

done is previous ep.emit(res, data) in task remake, added exception handling

done is actually relatively clever part in EventProxy, avoids tedious exception handling operations like if (err) {...}, simplifies business code, meanwhile hides exception handling, use is cleaner and safer

At this point, simulating EventProxy ends, its provided other asynchronous control method implementations are similar to all, after, just more complex control methods may need more code, if continue to implement, final result would be similar to async module asynchronous control method collection, no longer delve deeper here

III. What EventProxy Actually Does

EventProxy actually didn't do much, its biggest feature is small invasiveness, like a plugin

Expand previous example using simulated all, as follows:

ep.all('res1', 'res2', 'res3', function(res1, res2, res3) {
    console.log(res1, res2, res3);
});
asyncTask('res1', 300, task('res1'));

// Expand asyncTask('res1', 300, task('res1'))
// asyncTask
setTimeout(function() {
    console.log('get ' + 'res1' + ' at ' + new Date().getTime());

    // task('res1')
    var data = 'this is ' + 'res1';
    ep.emit('res1', data);
}, 300);

Can see, using EP process is large pieces of business code occasionally interspersed with 1 line of EP code (ep.all and ep.emit), almost no impact on business blocks themselves (just may need to insert one EP code at business block exit), no need to refactor business code to accommodate framework, comparing Promise, this point is very obvious

Back to question, what does EventProxy actually do?

EventProxy simplifies process of using event mechanism to manage asynchronous tasks. Using built-in events module also easy to implement similar functionality, and code volume won't be much more than EventProxy source code. Very small invasiveness to business code, therefore more like util, rather than big framework

IV. Summary

EventProxy is a clever tool library (small invasiveness), provides asynchronous control methods based on event mechanism

Comparing async module, EventProxy is more troublesome to use, functionality also not comprehensive enough, but its small and flexible is highlight, plugin-style tool, can choose to use or not use at any time, while for slightly more "aggressive" frameworks, abandoning requires courage

Comparing Promise, event subscription/publish mechanism (EventProxy)'s disadvantage is must pre-determine branches, otherwise specifying branches after event occurs is invalid. Promise's biggest features are:

  • Separated positive use cases and negative use cases (p.then(onFulfilled, onRejected))

  • Delayed logic processing

Delayed logic processing, that is no need to pre-specify branches, first execute asynchronous call, delay specifying branch processing. But Promise's disadvantage is need to encapsulate different APIs for different scenarios, exists packaging cost

P.S. For detailed information about Promise, please check [Completely Understand Promise](/articles/完全理解 promise/)

Reference Materials

  • "Deep Dive NodeJS"

Comments

No comments yet. Be the first to share your thoughts.

Leave a comment