Preface
Asynchronous control is the deep end of understanding Node. Of course, if you only want to get asynchronous programming done and flatten the callback pyramid, the async module is enough for everyday work.
This series on Node asynchronous flow control aims to get close to the various asynchronous control libraries (EventProxy, Step, Wind, async, ES6 Promise and ES7 async/await, which was standardized in ES2017) and to understand how they are implemented, not just how to use them. There is also a second purpose: to pay tribute to Wind.
Zero. EventEmitter
Node's central themes are asynchrony and events. Many core modules are built on the events module, and that is where complex asynchronous scenarios and callback pyramids come from. So let's look at the root first. An example:
var EventEmitter = require('events');
var util = require('util');
function MyEmitter() {
  EventEmitter.call(this);
}
// Create a custom EventEmitter through inheritance
util.inherits(MyEmitter, EventEmitter);
// Create an instance
var myEmitter = new MyEmitter();
// Add an event listener
myEmitter.on('myEvent', function(arg1, arg2) {
  console.log('myEvent occurs');
  console.log(arg1, arg2, this);
  // 1 3 { domain: null,
  //   _events: { myEvent: [Function] },
  //   _maxListeners: undefined }
  // (the exact shape of the printed object depends on the Node version)
});
// Trigger the event
myEmitter.emit('myEvent', 1, 3);
// Add a one-time event listener
myEmitter.once('onceEvent', function() {
  console.log(myEmitter.listenerCount('onceEvent'));
  console.log(EventEmitter.defaultMaxListeners);
  // undefined: util.inherits links prototypes only, static properties are not inherited
  console.log(MyEmitter.defaultMaxListeners);
  console.log(myEmitter.listenerCount('onceEvent'));
  console.log(myEmitter.getMaxListeners());
});
// console.log(EventEmitter.listenerCount(myEmitter, 'onceEvent')); // Deprecated
console.log(myEmitter.listenerCount('onceEvent'));
// Listen for the removeListener event to verify that once removes its listener
myEmitter.on('removeListener', function(event, listener) {
  console.log(event);
  // Use the instance method; the static EventEmitter.listenerCount is deprecated
  console.log(myEmitter.listenerCount('onceEvent'));
});
myEmitter.emit('onceEvent');
P.S. The inheritance mechanism provided by the util module is [parasitic combination inheritance](/articles/重新理解 js 的 6 种继承方式/).
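For comparison, the same kind of emitter can also be declared with ES6 class syntax instead of util.inherits. The sketch below is only an illustration and not part of the original example: the event name `ping` and the `calls` array are made up for the demonstration. It also shows that a `once` listener is removed after its first run.

```javascript
const EventEmitter = require('events');

// ES6 class syntax: an alternative to util.inherits(MyEmitter, EventEmitter)
class MyEmitter extends EventEmitter {}

const emitter = new MyEmitter();
const calls = []; // hypothetical log, used only for this demonstration

emitter.once('ping', function (value) {
  calls.push(value);
});

const before = emitter.listenerCount('ping'); // one listener is registered
emitter.emit('ping', 'first');  // runs the listener, which is then removed
emitter.emit('ping', 'second'); // nobody is listening any more
const after = emitter.listenerCount('ping');

console.log(before, after, calls); // 1 0 [ 'first' ]
```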
I. Key to Asynchronous Control
Any asynchronous control library or method must solve 2 problems:
- Business support: handle multi-level asynchronous dependencies, and handle asynchronous requests that are issued at the same time but whose callbacks depend on each other's order
- Exception handling: 1. the callback must be executed, and executed only once; 2. exceptions must be passed back correctly so that the caller can check them
While supporting the business logic, a library should exploit concurrency as much as possible and offer a friendly way to handle exceptions.
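The "only once" half of the exception handling rule can be sketched with a small guard. This is a minimal illustration, not code from any of the libraries discussed; `onceOnly` and `buggyTask` are hypothetical names.

```javascript
// Wrap a Node-style callback so that it runs at most once.
// `onceOnly` is a hypothetical helper name, not part of any library.
function onceOnly(callback) {
  let called = false;
  return function (err, result) {
    if (called) {
      return; // ignore every later invocation
    }
    called = true;
    callback(err, result);
  };
}

// A deliberately buggy task that reports a result and then an error.
function buggyTask(callback) {
  callback(null, 'data');
  callback(new Error('late failure'));
}

const seen = [];
buggyTask(onceOnly(function (err, result) {
  seen.push(err ? 'error' : result);
}));
console.log(seen); // [ 'data' ]
```

The guard only prevents duplicate calls. Making sure the callback is called at all (the other half of the rule) still depends on the task itself.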
In addition, concurrency control needs attention: asynchronous requests should not pile up in large numbers, so concurrency should be exploited only as far as system stability allows.
For example, the async module is large and comprehensive and addresses all 3 problems, while the EventProxy module addresses the first two and leaves concurrency control to the BagPipe module. Step, Wind and other modules do not consider concurrency control either.
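To make the idea of concurrency control concrete, here is a minimal sketch of a queue that runs at most `limit` tasks at a time. It is an assumption-laden illustration: `LimitedQueue` is a hypothetical name, and the code does not reproduce the API of BagPipe or async.

```javascript
// A tiny task queue that runs at most `limit` tasks at the same time.
// `LimitedQueue` is a hypothetical name; this is not BagPipe's or async's API.
function LimitedQueue(limit) {
  this.limit = limit;
  this.running = 0;
  this.waiting = [];
}

// task is a function(done); it must call done() when it has finished.
LimitedQueue.prototype.push = function (task) {
  this.waiting.push(task);
  this._next();
};

LimitedQueue.prototype._next = function () {
  while (this.running < this.limit && this.waiting.length > 0) {
    const task = this.waiting.shift();
    this.running++;
    let finished = false;
    task(() => {
      if (finished) {
        return; // a task may release its slot only once
      }
      finished = true;
      this.running--;
      this._next(); // a slot is free, start the next waiting task
    });
  }
};

const queue = new LimitedQueue(2);
let peak = 0; // highest number of tasks observed running together
for (let i = 0; i < 5; i++) {
  queue.push(function (done) {
    peak = Math.max(peak, queue.running);
    setTimeout(done, 10);
  });
}
console.log(queue.running, queue.waiting.length); // 2 3
```

Five tasks are pushed, two start immediately and three wait until a running task calls `done`.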
II. Simulating EventProxy
Although EventProxy does not depend on the built-in events module, its implementation is still event-based (the publish/subscribe pattern). We can therefore use the built-in events module to simulate it.
P.S. A custom event mechanism is not hard to implement and is not the focus of this article. If you are interested, see JS Learning Notes 11_Advanced Techniques 8. Observer Pattern.
1. Principle
Provide interfaces that accept asynchronous tasks from outside, manage the execution flow of those tasks internally (sequence, concurrency, dependencies), collect the results, and finally pass the results back out. Exceptions raised while the tasks run must be managed as well.
For example, EventProxy provides three task management methods, all, tail and after, while fail and done are responsible for exception handling.
2. Structure
First define a custom EventEmitter and wrap it in a thin layer:
var EventEmitter = require('events');
var util = require('util');
function MyEmitter() {
  EventEmitter.call(this);
}
util.inherits(MyEmitter, EventEmitter);
// event proxy
var EP = function() {
  this.emitter = new MyEmitter();
};
EP.prototype.emit = function(event, data) {
  this.emitter.emit(event, event, data);
};
EP.prototype.on = function(event, callback) {
  this.emitter.on(event, (ev, data) => {
    if (ev === event) {
      callback(data);
    }
  });
};
The custom EventEmitter makes the event mechanism easier to extend later (although that does not show here). With the emitter wrapped in EP and the basic on/emit interface in place, the event mechanism is essentially complete.
3. Core Part
Next come the asynchronous control methods (the task management methods), for example all:
// Execute the callback after all dependent events have been triggered
EP.prototype.all = function() {
  // Split the arguments
  var args = args2arr(arguments);
  var callback = args.pop(); // The last argument is the callback
  var times = args.length; // The remaining arguments are event names (they match the callback's parameters)
  var _data = [];
  var _callback = after(times, data => {
    for (var event of args) {
      // Arrange the actual arguments in order
      _data.push(data[event]);
      // Unbind the event
      //! this refers to the EP instance because we are inside an arrow function
      this.emitter.removeListener(event, _callback);
    }
    callback.apply(null, _data);
  });
  for (var event of args) {
    this.emitter.on(event, _callback);
  }
};
The utility functions used above are:
// utils
// The function returned by after only runs fn once it has been called `times` times
var after = function(times, fn) {
  var data = {};
  if (times <= 0) {
    // Edge case: fn runs immediately without data, and no function is returned
    return fn();
  }
  return function(key, val) {
    times--;
    if (data.hasOwnProperty(key)) {
      // The same key arrived again: collect its values in an array
      if (!Array.isArray(data[key])) {
        data[key] = [data[key]];
      }
      data[key].push(val);
    }
    else {
      data[key] = val;
    }
    if (times === 0) {
      return fn(data);
    }
  };
};
// Convert an arguments object into a real array
var args2arr = function(args) {
  return Array.prototype.slice.call(args);
};
The simulated all works the same way as EventProxy's all for the usage below (exception handling aside):
var asyncTask = function(name, delay, fn) {
  setTimeout(function() {
    console.log('get ' + name + ' at ' + new Date().getTime());
    if (typeof fn === 'function') {
      fn();
    }
  }, delay);
};
// Use the custom EP (assumes ep.js ends with module.exports = EP)
var EP = require('./ep.js');
var ep = new EP();
var task = function(res, data) {
  data = data || 'this is ' + res;
  return function() {
    ep.emit(res, data);
  };
};
//- all
ep.all('res1', 'res2', 'res3', function(res1, res2, res3) {
  console.log(res1, res2, res3);
});
asyncTask('res1', 300, task('res1'));
asyncTask('res2', 350, task('res2'));
asyncTask('res3', 280, task('res3'));
// Trigger the res1 event again after everything has finished
asyncTask('res1', 400, task('res1'));
The key is the ep.emit(res, data) call inside task. First, all tells EP which asynchronous tasks to wait for and registers the final callback. Then, each time an asynchronous task completes, the outside code notifies EP through ep.emit(res, data). Once every task has completed, EP calls the final callback that all registered at the start.
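Note that the after utility counts calls rather than distinct events, so the simulated all relies on each event firing only once before the final callback runs. A variant that tracks which events are still pending could look like the sketch below. It is built directly on the built-in events module, and `whenAll` is a hypothetical helper rather than EventProxy's or the article's implementation.

```javascript
const EventEmitter = require('events');

// Call `callback` once every event in `events` has fired at least once.
// `whenAll` is a hypothetical helper; a repeated event overwrites its earlier
// data instead of being counted a second time.
function whenAll(emitter, events, callback) {
  const data = {};
  const pending = new Set(events);
  const listeners = {};

  events.forEach(function (event) {
    listeners[event] = function (value) {
      data[event] = value;
      pending.delete(event);
      if (pending.size === 0) {
        // Unbind everything before handing the results over
        events.forEach(function (ev) {
          emitter.removeListener(ev, listeners[ev]);
        });
        callback.apply(null, events.map(function (ev) { return data[ev]; }));
      }
    };
    emitter.on(event, listeners[event]);
  });
}

const emitter = new EventEmitter();
const results = [];
whenAll(emitter, ['res1', 'res2'], function (res1, res2) {
  results.push([res1, res2]);
});

emitter.emit('res1', 'a');
emitter.emit('res1', 'b'); // repeated event: not counted twice
emitter.emit('res2', 'c'); // last missing event: the callback fires
emitter.emit('res2', 'd'); // the listeners are gone, nothing happens
console.log(results); // [ [ 'b', 'c' ] ]
```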
tail and after can be implemented in the same way:
// Similar to all, but later data updates trigger the callback again
EP.prototype.tail = function() {
  // Split the arguments
  var args = args2arr(arguments);
  var callback = args.pop(); // The last argument is the callback
  var times = args.length; // The remaining arguments are event names (they match the callback's parameters)
  var _data = [];
  var _callback = after(times, data => {
    for (var event of args) {
      // Arrange the actual arguments in order
      _data.push(data[event]);
      // Unbind the event
      //! this refers to the EP instance because we are inside an arrow function
      this.emitter.removeListener(event, _callback);
    }
    callback.apply(null, _data);
    // Bind the follow-up callback
    var tailData = _data.slice();
    var tailCallback = function(key, val) {
      for (var i = 0; i < args.length; i++) {
        if (key === args[i]) {
          tailData[i] = val;
          break;
        }
      }
      callback.apply(null, tailData);
    };
    for (var event of args) {
      this.emitter.on(event, tailCallback);
    }
  });
  for (var event of args) {
    this.emitter.on(event, _callback);
  }
};
// Call the same interface several times and return the results as an array
EP.prototype.after = function(event, times, callback) {
  var _callback = after(times, data => {
    // The after utility stores a single result as a bare value,
    // so wrap it in an array when times is 1
    var results = Array.isArray(data[event]) ? data[event].slice() : [data[event]];
    // Unbind the event
    this.emitter.removeListener(event, _callback);
    callback.call(null, results);
  });
  this.emitter.on(event, _callback);
};
tail only makes a small change on top of all: after the first complete round, it runs the same callback again with the updated data every time new data arrives.
after is simple: it is a direct application of the after utility function.
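The intended behavior of after and tail can be seen in a simplified, self-contained sketch. The helpers `collectAfter` and `tailLike` are hypothetical names written against the built-in events module; they illustrate the semantics described above and are not the EP methods themselves.

```javascript
const EventEmitter = require('events');

// after-like: gather `times` values of one event, then call back once.
function collectAfter(emitter, event, times, callback) {
  const values = [];
  function listener(value) {
    values.push(value);
    if (values.length === times) {
      emitter.removeListener(event, listener);
      callback(values.slice());
    }
  }
  emitter.on(event, listener);
}

// tail-like: call back once every event has a value, then again on each update.
function tailLike(emitter, events, callback) {
  const latest = {};
  const seen = new Set();
  events.forEach(function (event) {
    emitter.on(event, function (value) {
      latest[event] = value;
      seen.add(event);
      if (seen.size === events.length) {
        callback.apply(null, events.map(function (ev) { return latest[ev]; }));
      }
    });
  });
}

const emitter = new EventEmitter();
const collected = [];
const tailed = [];

collectAfter(emitter, 'file', 2, function (files) { collected.push(files); });
tailLike(emitter, ['user', 'theme'], function (user, theme) {
  tailed.push(user + '/' + theme);
});

emitter.emit('file', 'a.txt');
emitter.emit('file', 'b.txt'); // second value: collectAfter fires
emitter.emit('file', 'c.txt'); // ignored, the listener was removed

emitter.emit('user', 'ann');
emitter.emit('theme', 'dark');  // both present: first call
emitter.emit('theme', 'light'); // update: called again with the new data

console.log(collected, tailed);
// [ [ 'a.txt', 'b.txt' ] ] [ 'ann/dark', 'ann/light' ]
```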
4. Exception Handling
The exception handling rule is fairly simple: once an exception occurs, remove all handlers and call the error event listener:
// Exception handling
EP.prototype.fail = function(callback) {
  this.emitter.on('error', (err) => {
    // Remove all handlers
    this.emitter.removeAllListeners();
    // Run the exception callback
    callback(err);
  });
};
EP.prototype.done = function(event) {
  return (err, result) => {
    if (err) {
      // Exceptions are handled uniformly through the error event
      return this.emitter.emit('error', err);
    }
    // Go through EP's own emit so that listeners receive (event, data),
    // which is what all, tail and after expect
    this.emit(event, result);
  };
};
done is a reworked version of the ep.emit(res, data) call in task, with exception handling added.
done is one of the cleverer parts of EventProxy. It spares business code the tedious if (err) {...} checks and hides the exception handling, which makes usage cleaner and safer.
That concludes the EventProxy simulation. The other asynchronous control methods it provides are implemented much like all and after; more complex control methods simply need more code. Carrying on would produce a collection of control methods similar to the async module, so we stop here.
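To see done and fail working together, here is a reduced, self-contained sketch. `MiniEP` is a hypothetical stand-in that keeps only what the two methods need, and `loadConfig` is a made-up Node-style API that calls back synchronously to keep the example short.

```javascript
const EventEmitter = require('events');

// Minimal stand-in for the EP object, reduced to what done/fail need.
function MiniEP() {
  this.emitter = new EventEmitter();
}
MiniEP.prototype.on = function (event, callback) {
  this.emitter.on(event, callback);
};
MiniEP.prototype.fail = function (callback) {
  this.emitter.on('error', (err) => {
    this.emitter.removeAllListeners(); // stop every pending handler
    callback(err);
  });
};
MiniEP.prototype.done = function (event) {
  return (err, result) => {
    if (err) {
      return this.emitter.emit('error', err);
    }
    this.emitter.emit(event, result);
  };
};

// Hypothetical Node-style API: fails for the name 'missing'.
function loadConfig(name, callback) {
  if (name === 'missing') {
    return callback(new Error('no such config: ' + name));
  }
  callback(null, { name: name });
}

const ep = new MiniEP();
const log = [];
ep.on('config', function (config) { log.push('loaded ' + config.name); });
ep.fail(function (err) { log.push('failed: ' + err.message); });

loadConfig('app', ep.done('config'));     // success reaches the 'config' listener
loadConfig('missing', ep.done('config')); // failure reaches the fail handler
loadConfig('app', ep.done('config'));     // ignored: all listeners were removed
console.log(log);
// [ 'loaded app', 'failed: no such config: missing' ]
```

The business code never writes an if (err) check itself; it only passes ep.done('config') as the callback.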
III. What EventProxy Actually Does
EventProxy does not actually do much. Its biggest feature is that it is minimally invasive, like a plugin.
Expanding the earlier example that uses the simulated all gives:
ep.all('res1', 'res2', 'res3', function(res1, res2, res3) {
  console.log(res1, res2, res3);
});
asyncTask('res1', 300, task('res1'));
// Expansion of asyncTask('res1', 300, task('res1'))
// asyncTask
setTimeout(function() {
  console.log('get ' + 'res1' + ' at ' + new Date().getTime());
  // task('res1')
  var data = 'this is ' + 'res1';
  ep.emit('res1', data);
}, 300);
As you can see, using EP means large blocks of business code with the occasional line of EP code in between (ep.all and ep.emit). The business blocks themselves are almost unaffected (at most one line of EP code is inserted where a block exits), and the business code does not have to be restructured to fit a framework. Compared with Promise, this difference is very noticeable.
Back to the question: what does EventProxy actually do?
EventProxy simplifies the use of the event mechanism to manage asynchronous tasks. Similar functionality is easy to build with the built-in events module, and the amount of code would not be much larger than EventProxy's own source. Because it intrudes so little on business code, it is more of a utility than a big framework.
IV. Summary
EventProxy is a clever, minimally invasive utility library that provides asynchronous control methods based on the event mechanism.
Compared with the async module, EventProxy is more cumbersome to use and less comprehensive, but being small and flexible is its highlight. It is a plugin-style tool that you can adopt or drop at any time, whereas abandoning a more "aggressive" framework takes courage.
Compared with Promise, the drawback of the publish/subscribe mechanism (EventProxy) is that branches must be registered in advance; a branch specified after the event has fired has no effect. Promise's biggest features are:
- Separation of the success path and the failure path (p.then(onFulfilled, onRejected))
- Delayed logic processing
Delayed logic processing means that branches do not have to be specified in advance: make the asynchronous call first and decide how to handle each branch later. Promise's drawback is that different APIs have to be wrapped for different scenarios, so there is a wrapping cost.
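The difference in timing can be illustrated with a short sketch. The event name `ready` and the arrays `fromEvent` and `fromPromise` are made up for this demonstration.

```javascript
const EventEmitter = require('events');

const emitter = new EventEmitter();
const fromEvent = [];
const fromPromise = [];

// Event: the result is published before anyone subscribes, so it is lost.
emitter.emit('ready', 'payload');
emitter.on('ready', function (value) {
  fromEvent.push(value);
});

// Promise: a handler can still be attached after the value has settled.
const p = Promise.resolve('payload');
p.then(function (value) {
  fromPromise.push(value);
  console.log(fromEvent, fromPromise); // [] [ 'payload' ]
});
```

The listener registered after emit never runs, while the then handler attached after resolution still receives the value.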
P.S. For details about Promise, see [Completely Understand Promise](/articles/完全理解 promise/).
Reference Materials
- "Deep Dive NodeJS"