npm install parallel-stream
Transform and writable streams capable of processing chunks concurrently.
A concurrent transform stream
Parameters
- `work` (function): a function to process a single chunk. Its signature should be `process(chunk, enc, callback)`. When processing is finished, call the provided callback.
- `options` (object): options to pass to the transform stream. (optional, default undefined)
- `options.concurrency` (number): number of chunks to process concurrently. (optional, default 1)
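As a rough sketch of the `work` contract described above, the function below follows the `process(chunk, enc, callback)` signature and reports its result error-first, as the library's own example does with `callback(null, processedData)`. The name `upperCase` and the use of `setImmediate` as a stand-in for real asynchronous work are assumptions made for illustration only.

```javascript
// Hypothetical work function: upper-cases each chunk asynchronously.
function upperCase(chunk, enc, callback) {
  // setImmediate stands in for real asynchronous work (I/O, a network call, ...).
  setImmediate(function() {
    if (typeof chunk !== 'string' && !Buffer.isBuffer(chunk)) {
      // Report failures through the first callback argument.
      return callback(new TypeError('expected a string or Buffer chunk'));
    }
    // Report success with null as the error and the processed data second.
    callback(null, chunk.toString().toUpperCase());
  });
}

// Calling the function directly, outside of any stream, to show the contract.
upperCase('hello', 'utf8', function(err, result) {
  if (err) throw err;
  console.log(result); // HELLO
});
```

A function shaped like this can be passed as the first argument to `parallel.transform`.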
Examples
```javascript
var parallel = require('parallel-stream');

// `processAsync` and `readable` are placeholders for your own
// asynchronous operation and source stream.
var transform = parallel.transform(function(chunk, enc, callback) {
  processAsync(chunk)
    .on('done', function(processedData) {
      // Pass the processed chunk downstream.
      callback(null, processedData);
    });
}, { objectMode: true, concurrency: 15 });

readable.pipe(transform)
  .on('data', function(data) {
    console.log('got processed data: %j', data);
  })
  .on('end', function() {
    console.log('complete!');
  });
```
Returns `object`: a transform stream. Do not override the `._transform` function.
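To make the meaning of a concurrency limit concrete, here is a small standalone sketch that runs a `work` function over an array with at most `limit` calls in flight. It is an illustration of the idea only and is not parallel-stream's implementation; `runWithLimit`, its `peak` counter, and the index-ordered `results` array are all hypothetical.

```javascript
// Illustration only: run `work` over `items` with at most `limit` calls in flight.
function runWithLimit(items, limit, work, done) {
  var next = 0;      // index of the next item to start
  var active = 0;    // calls currently in flight
  var peak = 0;      // highest number of simultaneous calls observed
  var failed = false;
  var results = new Array(items.length);

  if (items.length === 0) return done(null, results, peak);

  function start(index) {
    active += 1;
    peak = Math.max(peak, active);
    work(items[index], 'utf8', function(err, result) {
      if (failed) return;
      if (err) {
        failed = true;
        return done(err);
      }
      active -= 1;
      results[index] = result;
      // Finish once nothing is left to start and nothing is in flight.
      if (next >= items.length && active === 0) return done(null, results, peak);
      startMore();
    });
  }

  function startMore() {
    // Fill free slots up to the limit.
    while (!failed && active < limit && next < items.length) {
      start(next++);
    }
  }

  startMore();
}

// Five chunks, at most two being processed at any moment.
runWithLimit(['a', 'b', 'c', 'd', 'e'], 2, function(chunk, enc, callback) {
  setImmediate(function() { callback(null, chunk + chunk); });
}, function(err, results, peak) {
  if (err) throw err;
  console.log('peak in flight: %d', peak); // peak in flight: 2
});
```

With a limit of 1 the chunks are handled strictly one at a time, which matches the documented default for `options.concurrency`.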
A concurrent writable stream
Parameters
- `work` (function): a function to process a single chunk. Its signature should be `process(chunk, enc, callback)`. When processing is finished, call the provided `callback`.
- `flush` (function): a function to run once all chunks have been processed, but before the stream emits a `finish` event. Its signature should be `flush(callback)`; call the provided callback when complete. (optional, default undefined)
- `options` (object): options to pass to the writable stream. (optional, default undefined)
- `options.concurrency` (number): number of chunks to process concurrently. (optional, default 1)
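The `work` and `flush` functions often need to share state, for example to report a summary once every chunk has been handled. The sketch below builds such a pair; `createTally` and its `tally` object are hypothetical names, and it assumes chunks are strings or Buffers.

```javascript
// Hypothetical helper: returns a work/flush pair that shares a running tally.
function createTally() {
  var tally = { chunks: 0, bytes: 0, flushed: false };

  // work: called once per chunk, following process(chunk, enc, callback).
  function work(chunk, enc, callback) {
    tally.chunks += 1;
    tally.bytes += Buffer.byteLength(chunk); // assumes a string or Buffer chunk
    setImmediate(callback); // signal completion asynchronously, with no error
  }

  // flush: called once after all chunks are processed, following flush(callback).
  function flush(callback) {
    tally.flushed = true;
    setImmediate(callback);
  }

  return { work: work, flush: flush, tally: tally };
}

// Exercising the pair directly, outside of any stream.
var pair = createTally();
pair.work('abc', 'utf8', function(err) {
  if (err) throw err;
  pair.flush(function(err) {
    if (err) throw err;
    console.log('%d chunk(s), %d byte(s)', pair.tally.chunks, pair.tally.bytes);
  });
});
```

Following the parameter order listed above, such a pair would be passed as `parallel.writable(pair.work, pair.flush, options)`.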
Examples
```javascript
var parallel = require('parallel-stream');

// `processAsync` and `readable` are placeholders for your own
// asynchronous operation and source stream.
var writable = parallel.writable(function(chunk, enc, callback) {
  processAsync(chunk)
    .on('done', callback);
}, { objectMode: true, concurrency: 15 });

readable.pipe(writable)
  .on('finish', function() {
    console.log('complete!');
  });
```
Returns `object`: a writable stream. Do not override the `._write` function.