parallel execution of ES6 Promises without Promise.all
`npm install promise-command`

It allows control of parallelism like async.parallelLimit, combined with better debugging capabilities and more flexibility.
It works only with Node 6 and 7, which have full ES6 support.
```javascript
const Controller = require("promise-command");

// Worker that the controller runs in parallel for each input object.
// It builds a promise chain: reset `message`, then hand the object
// to the controller's test function.
const crawler = (obj) => {
  const resetMessage = (item) => {
    item.message = null;
    return item;
  };
  // `controller` is declared below; it is only dereferenced when the worker runs.
  const runTest = (item) => controller.tester(item);
  return Promise.resolve(obj).then(resetMessage).then(runTest);
};

const controller = new Controller({
  parallel: 200, // maximum parallelism
  fun: crawler, // function to execute for every element
});

Promise.resolve()
  .then(() => {
    // Prepare the array of input data: 1000 objects with ids 0..999.
    const count = 1000;
    controller.daten = [];
    for (let id = 0; id < count; id += 1) {
      controller.daten.push({ id });
    }
    return controller.daten;
  })
  .then((daten) => controller.runner(daten)) // execute the function over the array
  .then((result) => {
    // Receives the results after completion.
    console.log("finished", result);
  })
  .catch((err) => {
    console.error("exit with error", err);
  });
```

Overriding the Controller class

```javascript
class Controller1 extends require("promise-command") {
  constructor(param) {
    super(param);
    // Results of completed calls, stored by pos.id (see objHandler).
    this.collector = [];
  }

  // Overrides the error handler.
  // Returns true when the run should stop, false otherwise.
  errHandler(pos, err) {
    debug("error", err, pos);
    // Delete the failing entry from the source data so that it can be repeated.
    delete this.daten[pos.pos];
    // Stop only after more than 30 errors have been collected.
    return this.errcollector.size > 30;
  }

  // User handling of every completed object.
  objHandler(pos, obj, input) {
    // Add the elapsed time since the call was started.
    const timed = Object.assign(obj, {
      diff: process.hrtime(pos.start),
    });
    // Store the end result in order of start, like Promise.all.
    this.collector[pos.id] = timed;
  }

  // Overrides the handling of the end results.
  endHandler() {
    const errorCount = this.errcollector.size;
    if (errorCount > 30) {
      // Too many errors: emit the collected errors as the first argument.
      debug("finished with error %d", errorCount);
      this.emit("ende", [...this.errcollector.entries()]);
      return;
    }
    // Otherwise emit the array of results.
    debug("finished with gen");
    this.emit("ende", null, this.collector);
  }

  // Overrides the data generation.
  *dataGenerator(res) {
    try {
      // Prepare the iterator function that iterates over the input data.
      const source = this.makeIteratorFun(res, null, this.fun);
      const nextItem = () => source.next();
      // Alternative: only one iteration over the data, with collector and
      // iterator kept separate:
      //   yield* this.startAll([], nextItem);
      // Here: repeat endlessly with the iterator over the data until the limit
      // is reached (30000 iterations); the iterator works on the collector.
      const finished = yield* this.startAll(res, nextItem, 30000);
      // `finished` now holds the executions that are done; the executions
      // that are still running are missing.
      debug("wait now until finished");
      // Wait until finished or until 3 seconds have passed.
      const remaining = yield* this.waiterFinished(3000, true);
      debug("now end %d", remaining.length);
      return finished.concat(remaining); // all results
    } catch (err) {
      debug("error generator", err);
    }
  }
}
```
It implements and uses the following features:

- After each function call is executed, it is brought into the promise chain of the controller and its results are collected.
- Errors are handled with a maximum number of allowed errors before termination.
- The controller only terminates when every executed function call has finished. There will be no dangling promises after termination.
- The generation of promises can be endless or limited to a finite number. Every promise must terminate; otherwise the controller can never end and the program must be terminated with `process.exit`.