##I. Basic Usage
Step's biggest feature is that it only provides 1 API, the usage can be remembered at a glance, example as follows:
var fs = require('fs');
var Step = require('./step.js');
Step(
function readDir() {
fs.readdir(__dirname, this);
},
function readFiles(err, results) {
if (err) throw err;
// Create a new group
var group = this.group();
results.forEach(function(filename) {
if (/\.js$/.test(filename)) {
fs.readFile(__dirname + "/" + filename, 'utf8', group());
}
});
},
function showAll(err, files) {
if (err) throw err;
files.forEach(function(text) {
console.log(text.slice(0, 20));
});
// console.dir(files);
}
);
First read current directory, then concurrently read all files. Very concise, like a small work of art:
-
Step()accepts any number of functions indicating sequential execution -
thisin each function represents built-in callback function, collects 1 result and passes to next task -
this.group()in each function represents result grouping, used to support concurrent requests, collects multiple results then passes to next task
##II. Implementation Ideas
###1. Sequential Execution
Step() is not a constructor, just a normal function. The only exposed API, used to register tasks that need sequential execution
Step internally collects various tasks passed in, controls sequential execution, and injects result collection logic through this and this.group(), for example:
Step(function() {
var _callback = this;
setTimeout(function() {
_callback(null, 1);
}, 50);
setTimeout(function() {
_callback(null, 2, 4);
}, 30);
}, function(err, res) {
if (err) throw err;
console.log(res);
});
this (_callback) collects results and passes to next task, res receives 2. There are 2 problems:
-
4is lost -
1is not output
4 is lost because Step limits each task's return result to 1 value, so each task's first parameter is exception err, second is previous task's return result res
1 is not output because this() represents passing current task result to next ring, marks current task end, therefore subsequent repeated this() is invalid. Furthermore, precisely because this cannot collect multiple results, this.group() is needed to create groups to handle concurrent situations
###2. Concurrent Requests
To handle concurrent requests, a group grouping method was specially added, observe the previous example:
// Create a new group
var group = this.group();
results.forEach(function(filename) {
if (/\.js$/.test(filename)) {
fs.readFile(__dirname + "/" + filename, 'utf8', group());
}
});
Can discover, using grouping to collect multiple results has 3 steps:
-
this.group()initializes a group, prepares to receive multiple results -
group()registers concurrent requests -
group()()collects results, and removes corresponding registration item, waits for all registration items to complete then passes result array to next ring
Specific implementation method is similar to all() in EventProxy, built-in counter records results in order, finally passes collected result array down
###3. Exception Handling
Step's exception handling is quite ugly, example as follows:
Step(function() {
//...
}, function(err, res) {
if (err) throw err;
//...
}, function(err, res) {
if (err) throw err;
//...
});
Except first ring, all subsequent tasks must manually top-place if...throw. That is, exception handling mechanism requires manually re-throwing err, otherwise exception cannot be passed to final callback function (last ring). As follows:
Step(function() {
throw new Error('error occurs');
}, function(err) {
// if (err) throw err;
this(null, 'ok');
}, function(err, res) {
if (!err) {
console.log(res); // ok
}
});
First ring's exception is lost in second ring, third ring doesn't receive exception. Losing an exception is nothing, but no error reported is a bit scary, that is, if you forget to manually top-place if...throw, code may silently error, this is very dangerous
The root cause of lost exceptions is that every link execution is wrapped in try block, if there's exception then pass to next ring, exceptions passed through parameters naturally get lost if not manually re-thrown
##III. Source Code Analysis
Basic structure as follows:
// 1.Define Step() to receive a series of tasks
// 1.1 Separate tasks (steps) and callback (last ring)
// 1.2 Initialize various state variables
// 2.Define next() as built-in callback function (for external use)
// 2.1 Take one from task queue and execute in try block
// Simultaneously inject next callback through this
// 2.2 Collect and pass results/exceptions, execute next ring
// 3.Define next.parallel(), for internal use only, cooperate with group
// 3.1 Counter increment
// 3.2 Return callback responsible for counter decrement
// 4.Define next.group(), for external use
// 4.1 Initialize counter and state variables
// 4.2 Return callback responsible for collecting results
// 5.Define Step.fn(), extend pipeline (support adding first ring leading data preparation and extra last ring tailing wrap-up)
// 5.1 Record various tasks passed in parameters
// 5.2 Return new function to receive leading data and final tailing
// Then add first ring and last ring, and start Step
// 6.Expose API
// 6.1 Expose Step() according to CommonJS module definition method
Step 1 defines entry function, Step 2 provides task sequential execution support and exception handling, Steps 3 and 4 provide concurrent request support, Step 5 provides data import support and wrap-up support, icing on the cake
Specific source code (with detailed comments):
/*
Copyright (c) 2011 Tim Caswell <tim @creationix.com>
MIT License
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
*/
// Inspired by http://github.com/willconant/flow-js, but reimplemented and
// modified to fit my taste and the node.JS error handling system.
// 1.Define Step() to receive a series of tasks
// 1.1 Separate tasks (steps) and callback (last ring)
// 1.2 Initialize various state variables
// 2.Define next() as built-in callback function (for external use)
// 2.1 Take one from task queue and execute in try block
// Simultaneously inject next callback through this
// 2.2 Collect and pass results/exceptions, execute next ring
// 3.Define next.parallel(), for internal use only, cooperate with group
// 3.1 Counter increment
// 3.2 Return callback responsible for counter decrement
// 4.Define next.group(), for external use
// 4.1 Initialize counter and state variables
// 4.2 Return callback responsible for collecting results
// 5.Define Step.fn(), extend pipeline (support adding first ring leading data preparation and extra last ring tailing wrap-up)
// 5.1 Record various tasks passed in parameters
// 5.2 Return new function to receive leading data and final tailing
// Then add first ring and last ring, and start Step
// 6.Expose API
// 6.1 Expose Step() according to CommonJS module definition method
function Step() {
var steps = Array.prototype.slice.call(arguments),
pending, counter, results, lock;
// steps is a series of functions passed by caller, last one is callback, previous ones are async/sync tasks
// pending indicates number of items waiting to execute
// counter indicates total number of items executing concurrently
// results indicates collected results of concurrently executing steps, results[0] is err, afterwards corresponds to each step
// lock indicates processing/executing next step
// Define the main callback that's given as `this` to the steps.
// Main callback function, i.e., this in steps
function next() {
counter = pending = 0;
// Check if there are no steps left
if (steps.length === 0) {
// Throw uncaught errors
if (arguments[0]) {
throw arguments[0];
}
return;
}
// Get the next step to execute
// Take one step from beginning
var fn = steps.shift();
results = [];
// Run the step in a try..catch block so exceptions don't get out of hand.
// steps execution is put in try, uncaught exceptions passed to next step through next(e)
try {
lock = true;
//!!! Reason why this in steps points to next
var result = fn.apply(next, arguments);
} catch (e) {
// Pass any exceptions on through the next callback
next(e);
}
if (counter > 0 && pending == 0) {
// If parallel() was called, and all parallel branches executed
// synchronously, go on to the next step immediately.
// Control steps parallel execution
next.apply(null, results);
} else if (result !== undefined) {
// If a synchronous return is used, pass it to the callback
// Pass sync step return value to next step
next(undefined, result);
}
lock = false;
}
// Add a special callback generator `this.parallel()` that groups stuff.
next.parallel = function () {
// Update counter and pending items each call
var index = 1 + counter++; // First slot left for err, results start from results[1] onwards
pending++;
return function () {
// When callback executes, update pending items
pending--;
// Compress the error from any result to the first argument
if (arguments[0]) {
results[0] = arguments[0];
}
// Send the other results as arguments
results[index] = arguments[1];
//!!! Discard 3rd parameter and all subsequent parameters
if (!lock && pending === 0) {
// When all parallel branches done, call the callback
// No pending items, execute next step
next.apply(null, results);
}
else {
console.warn('pending ' + pending + ' lock ' + lock);///
}
};
};
// Generates a callback generator for grouped results
next.group = function () {
var localCallback = next.parallel();
var counter = 0;
var pending = 0;
var result = [];
var error = undefined;
function check() {
console.log('from ' + arguments[0] + ' pending ' + pending);///
if (pending === 0) {
// When group is done, call the callback
localCallback(error, result);
}
}
//! Avoid situation where last callback cannot be triggered because no group() call after this.group()
process.nextTick(check); // Ensures that check is called at least once
// Generates a callback for the group
return function () {
// Similar to parallel, update counter and pending items
var index = counter++;
pending++;
return function () {
pending--;
// Compress the error from any result to the first argument
if (arguments[0]) {
error = arguments[0];
}
// Send the other results as arguments
result[index] = arguments[1];
// if (!lock) { check(); }
if (!lock) { check('tail'); }///
};
};
};
// Start the engine an pass nothing to the first step.
// Just for initializing counter and padding
next();
}
// Tack on leading and tailing steps for input and output and return
// the whole thing as a function. Basically turns step calls into function
// factories.
Step.fn = function StepFn() {
var steps = Array.prototype.slice.call(arguments);
// Returned function takes last function parameter received as extra last ring
// Rest as parameters passed to first ring
return function () {
var args = Array.prototype.slice.call(arguments);
// Insert a first step that primes the data stream
// Insert one step for data stream preparation
var toRun = [function () {
this.apply(null, args);
}].concat(steps);
// If the last arg is a function add it as a last step
// i.e., tailing hook implementation
if (typeof args[args.length-1] === 'function') {
toRun.push(args.pop());
}
Step.apply(null, toRun);
}
}
// Hook into commonJS module systems
if (typeof module !== 'undefined' && "exports" in module) {
module.exports = Step;
}
All details are in comments, need to note nextTick check issue, intuition feels there's error, github also has friends who raised same question, but after multiple tests confirmed nextTick check is generally no problem (didn't find second situation test cases in testing), and this library has been tested for quite a long time, can be used with confidence for now
Step didn't continue to develop and "grow", decided to keep it small and exquisite, but for Step's existing defects, original author has some new improvement ideas, proposed twostep and safereturn, interested parties can observe and study, original author's combat capability is very high, observing is definitely worth it
P.S. Original author's design philosophy for asynchronous control libraries is:
The general idea is to not impose structure on the programmer, but rather help them write good callback based functions.
Make it easy for everyone to use, rather than imposing a bunch of structure for people to get used to
##IV. Summary
Step is more like a magic trick, can libraries be designed like this?
Putting aside advantages like small and simple, Step has the following problems:
-
thispoints to internal function, causing nested functions in each step to be inconvenient to use (that = thisor use ES6 arrow functions) -
Error handling mechanism requires each step to start with
if...throw, otherwise errors cannot pass through intermediate steps to final callback -
State discarding mechanism, destined to be unable to handle complex dependency situations
The this problem is actually not as serious as imagined, error handling requiring manual top-placement of if...throw is indeed a bit uncomfortable, state discarding mechanism is inevitably not suitable for complex situations, but sufficient to handle most scenarios
##Reference Materials
- "Deep Dive NodeJS"
No comments yet. Be the first to share your thoughts.