# pipeline-processor

A modular data processing framework with nestable pipelines, processors, parallel execution, batch processing, and contract validation.
A modular Pipeline-Processor pattern library for structured data processing with optional contract validation.
## Features

- Modular Architecture: Easily build processing pipelines with reusable processors
- Event-Based Progress: Track progress with built-in event system
- Contract Validation: Optional contract system to validate processor compatibility
- Type Safety: Written in TypeScript with full type definitions
## Installation

```bash
npm install pipeline-processor
```
## Quick Start

```typescript
import { BasePipeline, BaseProcessor } from 'pipeline-processor';

// Create a processor
class MyProcessor extends BaseProcessor<InputType, OutputType> {
  public async execute(options: InputType): Promise<OutputType> {
    // Implement your processing logic here
    this.updateProgress(0.5, 'Processing halfway done');

    // Always check if aborted to allow clean cancellation
    this.checkAborted();

    // Return processed result
    return result;
  }
}

// Create a pipeline
const pipeline = new BasePipeline();

// Register processors
pipeline.register(new MyProcessor());

// Add event listeners
pipeline.on('progress', (data) => {
  console.log(`Progress: ${data.progress * 100}%`);
});

// Initialize and execute
await pipeline.initialize();
const result = await pipeline.execute(inputData);
```
## Contract Validation

```typescript
import {
BasePipeline,
BaseProcessor,
GlobalContractManager,
ContractResolveType
} from 'pipeline-processor';
class MyProcessor extends BaseProcessor<InputType, OutputType> {
constructor() {
super();
// Setup contracts for this processor
const inputContracts = [
GlobalContractManager.createRequiredInputCT(
ContractResolveType.OBJECT_HAS_PROPERTIES,
'input_data',
(value, context) => {
// Validate input data
if (!value || !value.requiredProperty) {
context.message = 'Missing required property';
return false;
}
return true;
},
undefined,
'Input must include required property'
)
];
// Register contracts with the contract manager
GlobalContractManager.registerEntityContracts(
this.constructor.name,
'processor',
inputContracts
);
}
public async execute(options: InputType): Promise<OutputType> {
// Implementation
}
}
```
## Documentation

For full API documentation, examples, and guides, visit the documentation site
(...)

## License

MIT
## Real-World Example

Now let's demonstrate a real-world example of using the library:

```typescript
import {
BasePipeline,
BaseProcessor,
ProcessorEvent,
PipelineEvent
} from 'pipeline-processor';
// Define processor input and output types
interface ImageProcessorInput {
imagePath: string;
outputPath: string;
scale?: number;
}
interface ImageProcessorOutput {
outputPath: string;
dimensions: { width: number; height: number };
processingTime: number;
}
// Create a specialized processor
class ImageResizeProcessor extends BaseProcessor<ImageProcessorInput, ImageProcessorOutput> {
public async execute(options: ImageProcessorInput): Promise<ImageProcessorOutput> {
const startTime = Date.now();
this.updateProgress(0, 'Starting image resize process');
// Check for abort signals throughout processing
this.checkAborted();
// Simulate image processing steps
await this.simulateProcessing(0.3, 'Loading image');
this.checkAborted();
await this.simulateProcessing(0.6, 'Resizing image');
this.checkAborted();
await this.simulateProcessing(1.0, 'Saving image');
const processingTime = (Date.now() - startTime) / 1000;
// Return the result
return {
outputPath: options.outputPath,
dimensions: { width: 800, height: 600 },
processingTime
};
}
private async simulateProcessing(progressTarget: number, message: string): Promise<void> {
// Simulate a processing step
const delay = 500; // 500ms
await new Promise(resolve => setTimeout(resolve, delay));
this.updateProgress(progressTarget, message);
}
}
// Create a pipeline for image processing
const pipeline = new BasePipeline();
// Register the processor
pipeline.register(new ImageResizeProcessor());
// Add pipeline event listeners
pipeline.on(PipelineEvent.PROGRESS, (data) => {
console.log(`Pipeline progress: ${Math.round(data.progress * 100)}%`);
});
pipeline.on(PipelineEvent.PIPELINE_COMPLETE, (data) => {
console.log('Processing complete. Result:', data.result);
});
// Initialize and execute the pipeline
const run = async () => {
await pipeline.initialize();
const result = await pipeline.execute({
imagePath: '/path/to/input.jpg',
outputPath: '/path/to/output.jpg',
scale: 0.5
});
console.log('Final result:', result);
};
run().catch(console.error);
```

## Testing

The library includes a comprehensive test suite that covers the core functionality:

```bash
# Run all tests
npm test
```
The tests cover the following areas (see the sketch after this list for a minimal example):
- Pipeline execution and events
- Processor execution and progress tracking
- Contract validation and resolution
- Integration between components
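As a rough illustration of what such a test can look like, here is a minimal sketch assuming a Jest-style runner; `DoubleProcessor` is a hypothetical processor created only for this example, and only the `BasePipeline`/`BaseProcessor` APIs shown above are used.

```typescript
import { BasePipeline, BaseProcessor } from 'pipeline-processor';

// Hypothetical processor used only in this sketch: doubles a numeric value
class DoubleProcessor extends BaseProcessor<{ value: number }, { value: number }> {
  public async execute(options: { value: number }): Promise<{ value: number }> {
    return { value: options.value * 2 };
  }
}

test('pipeline executes registered processors', async () => {
  const pipeline = new BasePipeline();
  pipeline.register(new DoubleProcessor());
  await pipeline.initialize();

  // Execute and check the processed result
  const result = (await pipeline.execute({ value: 21 })) as { value: number };
  expect(result.value).toBe(42);
});
```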
## Validators

The library includes several built-in validators for common data types:

### Object and Value Validators

```typescript
import {
objectHasProperties,
valueInRange,
valueType
} from 'pipeline-processor';

// Check if an object has specified properties
const hasRequiredProps = objectHasProperties(['id', 'name', 'type']);
// Check if a number is within a range
const isInRange = valueInRange(0, 100);
// Check if a value is of a specified type
const isString = valueType('string');
const isNumber = valueType('number');
const isBoolean = valueType('boolean');
const isObject = valueType('object');
const isArray = valueType('array');
```

### String Validators

```typescript
import {
emailFormat,
urlFormat,
stringLength
} from 'pipeline-processor';

// Check if a string is a valid email
const isEmail = emailFormat();
// Check if a string is a valid URL
const isUrl = urlFormat();
// Check if a string has a specific length range
const hasValidLength = stringLength(5, 100);
```

### Array Validators

```typescript
import {
arrayLength,
arrayItemType
} from 'pipeline-processor';

// Check if an array has a specific length range
const hasValidLength = arrayLength(1, 10);
// Check if all array items are of a specific type
const hasStringItems = arrayItemType('string');
const hasNumberItems = arrayItemType('number');
```

### Number Validators

```typescript
import {
isInteger,
valueInRange
} from 'pipeline-processor';

// Check if a number is an integer
const isIntegerValue = isInteger();
// Check if a number is within a range
const isInRange = valueInRange(0, 100);
```

### Using Validators in Contracts

```typescript
import {
GlobalContractManager,
ContractResolveType,
emailFormat
} from 'pipeline-processor';

// Create a contract requiring a valid email
const emailContract = GlobalContractManager.createRequiredInputCT(
ContractResolveType.STRING_FORMAT,
'email',
emailFormat(),
undefined,
'Input must include a valid email'
);
// Register the contract with a processor
GlobalContractManager.registerEntityContracts(
'MyProcessor',
'processor',
[emailContract]
);
```

## Batch Processing for Large Datasets
For processing large datasets efficiently, the library provides batch processing capabilities:
```typescript
import { BasePipeline, BaseProcessor } from 'pipeline-processor';

// Define a processor for handling chunks of data
class DataProcessor extends BaseProcessor<{ data: any[] }, { data: any[] }> {
public async execute(options: { data: any[] }): Promise<{ data: any[] }> {
// Process the current batch of data
const processedData = options.data.map(item => {
// Your processing logic here
return { ...item, processed: true };
});
return { ...options, data: processedData };
}
}
// Create and configure the pipeline
const pipeline = new BasePipeline();
pipeline.register(new DataProcessor());
await pipeline.initialize();
// Large dataset to process
const largeDataset = Array.from({ length: 10000 }, (_, i) => ({ id: i, value: `Item ${i}` }));

// Process in batches of 500 items
const result = await pipeline.executeBatch(
{ data: largeDataset },
500,
(batchResult, batchIndex) => {
console.log(`Processed batch ${batchIndex + 1}`);
}
);

console.log(`Processed ${result.data.length} items in total`);
```

### Benefits of Batch Processing
- Memory Efficient: Processes large datasets in manageable chunks
- Progress Tracking: Provides detailed progress updates per batch
- Cancellation: Can be aborted at any batch boundary (see the sketch after this list)
- Customizable: Override the `combineBatchResults` method to customize how batch results are merged
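As a rough sketch of batch-boundary cancellation (this assumes the pipeline exposes the `abort()` method referenced in the roadmap at the end of this README; the exact API may differ), a long batch run could be cancelled from the outside:

```typescript
// Sketch only: assumes pipeline.abort() exists (see the roadmap entry on abort handling).
// Abort the batch run if it has not finished within 5 seconds.
const timer = setTimeout(() => pipeline.abort(), 5000);

try {
  const result = await pipeline.executeBatch({ data: largeDataset }, 500);
  console.log(`Processed ${result.data.length} items`);
} catch (error) {
  // An aborted run is treated here as a cancellation rather than a hard failure
  console.warn('Batch run aborted or failed:', error);
} finally {
  clearTimeout(timer);
}
```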
### Customizing Batch Result Combination

You can extend BasePipeline to customize how batch results are combined:
```typescript
class CustomPipeline extends BasePipeline {
protected combineBatchResults(batchResults: any[]): any {
// Custom logic to combine batch results
const allData = batchResults.flatMap(result => result.data || []);
// Calculate aggregated statistics
const total = allData.reduce((sum, item) => sum + item.value, 0);
const average = total / allData.length;
return {
data: allData,
stats: {
total,
average,
count: allData.length
}
};
}
}
```

## Advanced Performance Optimizations
The library includes several advanced features for high-performance processing of large datasets:
### Memory-Optimized Processing
For extremely large datasets where memory usage is a concern:
```typescript
// Process a large dataset with memory optimization
const result = await pipeline.executeBatch(
{ data: largeDataset },
500,
(batchResult, batchIndex) => {
console.log(`Processed batch ${batchIndex + 1}`);
},
{
memoryOptimized: true // Only keep final result, discard intermediate batch results
}
);
```

### Parallel Batch Execution

For faster processing on multi-core systems:

```typescript
// Process batches in parallel using multiple CPU cores
const result = await pipeline.executeBatch(
{ data: largeDataset },
500,
undefined, // No batch callback needed
{
parallelBatches: 4, // Process 4 batches in parallel
memoryOptimized: true // Optional memory optimization
}
);
```

### Early Termination

For cases where you might need to stop processing once certain conditions are met:

```typescript
// Stop processing early if a condition is met
const result = await pipeline.executeBatch(
{ data: largeDataset },
500,
undefined,
{
earlyTermination: true,
terminationCheck: (batchResult) => {
// Check if this batch result meets termination criteria
// For example, if we found what we're looking for
return batchResult.data.some(item => item.isSpecialItem);
}
}
);
```

### Streaming Processing

For processing data as it arrives, without loading the entire dataset into memory:

```typescript
// Create an async data source
async function* generateData() {
for (let i = 0; i < 10000; i++) {
// Simulate data arriving over time
await new Promise(resolve => setTimeout(resolve, 10));
yield { id: i, value: `Item ${i}` };
}
}

// Process data in a streaming fashion
const processingStream = pipeline.createProcessingStream(generateData(), 100);
// Process results as they become available
for await (const result of processingStream) {
console.log(`Processed batch with ${result.data.length} items`);
// You can do something with each batch result immediately
await saveResults(result.data);
}
```

This streaming approach is ideal for:
- Processing data from network sources in real-time
- Handling infinitely large datasets
- Minimizing memory consumption
- Displaying incremental results as they become available
## Parallel Processing
The library supports parallel execution of independent processors, which can significantly improve performance for complex processing pipelines:
```typescript
import { BasePipeline, BaseProcessor } from 'pipeline-processor';

// Create processors
class PreprocessorA extends BaseProcessor {
// implementation
}
class PreprocessorB extends BaseProcessor {
// implementation
}
class Combiner extends BaseProcessor {
// implementation that uses results from both preprocessors
}
// Create pipeline with parallel processing
const pipeline = new BasePipeline({ parallelProcessingEnabled: true });
// First two processors can run in parallel (no dependencies)
pipeline.register(new PreprocessorA()); // index 0
pipeline.register(new PreprocessorB()); // index 1
// Combiner depends on both preprocessors
pipeline.register(new Combiner(), {}, [0, 1]); // depends on indices 0 and 1
// Alternative way to enable parallel processing
pipeline.setParallelProcessing(true);
// Execute the pipeline
await pipeline.initialize();
const result = await pipeline.execute(inputData);
```

### How It Works
1. By default, processors execute sequentially in the order they are registered
2. When parallel processing is enabled, the pipeline analyzes processor dependencies
3. Processors with the same dependencies will execute in parallel when possible
4. Dependencies can be specified when registering a processor
5. If no dependencies are specified, a processor depends on the previous processor
6. Circular dependencies are detected and will cause an error
### When to Use Parallel Processing
Parallel processing is most beneficial when:
- You have multiple independent preprocessing steps
- Some processors are CPU-intensive but don't depend on each other
- Your pipeline has natural points of parallelism (e.g., data enrichment from multiple sources)
For simple linear pipelines, sequential processing may be more appropriate.
## Nested Pipelines
The library supports nested pipelines, allowing you to use a pipeline as a processor within another pipeline. This enables building complex processing workflows with reusable components:
```typescript
import {
BasePipeline,
BaseProcessor,
PipelineProcessor
} from 'pipeline-processor';

// Create an inner pipeline
const innerPipeline = new BasePipeline();
innerPipeline.register(new ProcessorA());
innerPipeline.register(new ProcessorB());
await innerPipeline.initialize();
// Wrap the inner pipeline as a processor
const pipelineProcessor = new PipelineProcessor(innerPipeline);
// Create an outer pipeline
const outerPipeline = new BasePipeline();
outerPipeline.register(new PreProcessor());
outerPipeline.register(pipelineProcessor); // Use the pipeline as a processor
outerPipeline.register(new PostProcessor());
await outerPipeline.initialize();
// Execute the outer pipeline, which will also execute the inner pipeline
const result = await outerPipeline.execute(inputData);
```

### Benefits of Nested Pipelines
Nested pipelines offer several advantages:
1. Reusability: Create reusable pipeline modules for common processing tasks
2. Encapsulation: Hide the complexity of multi-step processes behind a single processor
3. Maintainability: Break down complex workflows into smaller, manageable units
4. Composability: Compose complex workflows from simpler building blocks
### Event Propagation

Events from inner pipelines are automatically forwarded to the wrapping PipelineProcessor (see the sketch after this list):
- Progress events are forwarded as processor progress updates
- Error events are forwarded as processor errors
- Completion events are forwarded as processor completion
- Abort signals propagate to nested pipelines
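As a small sketch of listening for forwarded events on the wrapping processor (the string event names mirror the 'progress' event used in the Quick Start; check the ProcessorEvent constants in your version of the library for the exact names):

```typescript
// Sketch: observe events forwarded from the inner pipeline on the wrapping processor.
// Event names are assumed to match the string events used earlier in this README.
pipelineProcessor.on('progress', (data) => {
  console.log(`Inner pipeline progress: ${Math.round(data.progress * 100)}%`);
});

pipelineProcessor.on('error', (error) => {
  console.error('Inner pipeline failed:', error);
});
```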
## Enhanced Type Inference
The library has been enhanced with improved TypeScript type inference to provide better type safety across processor chains.
### Typed Processors and Pipelines

```typescript
import {
TypedBaseProcessor,
TypedBasePipeline,
TypedPipelineBuilder,
ProcessorEvent,
PipelineEvent
} from 'pipeline-processor';

// Define your data types
interface InputData {
id: string;
data: string[];
}
interface EnrichedData extends InputData {
metadata: Record<string, string>;
}
interface OutputData {
result: string;
processedAt: Date;
}
// Create strongly-typed processors
class EnrichmentProcessor extends TypedBaseProcessor<InputData, EnrichedData> {
async execute(input: InputData): Promise<EnrichedData> {
// Implementation...
return {
...input,
metadata: { processedBy: 'EnrichmentProcessor' }
};
}
}
class TransformProcessor extends TypedBaseProcessor<EnrichedData, OutputData> {
async execute(input: EnrichedData): Promise<OutputData> {
// Implementation...
return {
result: input.data.join(','),
processedAt: new Date()
};
}
}
// Method 1: Create a pipeline with type checking across processor chain
const pipeline = new TypedBasePipeline();
pipeline.register(new EnrichmentProcessor());
pipeline.register(new TransformProcessor());
// Type-safe event handling
pipeline.on(PipelineEvent.PIPELINE_COMPLETE, (data) => {
// TypeScript knows data.result is OutputData
console.log(`Completed at: ${data.result.processedAt}`);
});

// Method 2: Use the fluent builder API with type inference
const pipeline2 = TypedPipelineBuilder
.create()
.withFirstProcessor(new EnrichmentProcessor())
.pipe(new TransformProcessor())
.build();
// Execute with properly typed input and output
const result = await pipeline.execute({
id: '123',
data: ['a', 'b', 'c']
});
// Result is properly typed as OutputData
console.log(result.result); // Type-safe access
```

### Benefits of Type Inference

- Catch Errors Earlier: TypeScript catches type incompatibilities between processors at compile time (see the sketch after this list)
- Self-Documenting Code: Types clearly show the data flow through the pipeline
- IDE Support: Get proper code completion and hover information
- Refactoring Safety: When changing processor interfaces, TypeScript identifies all impacted areas
- Type-Safe Events: Event handlers receive properly typed event data
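As a sketch of the compile-time check, reusing the processors defined above (the exact compiler message will vary):

```typescript
// Sketch: chaining processors in the wrong order fails type checking.
// TransformProcessor produces OutputData, but EnrichmentProcessor expects InputData,
// so TypeScript flags the pipe() call below as incompatible.
const invalidPipeline = TypedPipelineBuilder
  .create()
  .withFirstProcessor(new TransformProcessor())
  // @ts-expect-error OutputData is not assignable to InputData
  .pipe(new EnrichmentProcessor())
  .build();
```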
### Utility Types
The library provides several utility types to help with type inference:
```typescript
import {
ProcessorInput,
ProcessorOutput,
CanChain,
ChainOutput,
EnsureCompatibleChain
} from 'pipeline-processor';

// Get the input type of a processor
type InputType = ProcessorInput<ProcessorA>;
// Get the output type of a processor
type OutputType = ProcessorOutput<ProcessorA>;
// Check if processors can be chained
type CanBeChained = CanChain<ProcessorA, ProcessorB>; // true or false
// Get the final output type from a chain of processors
type FinalOutputType = ChainOutput<[ProcessorA, ProcessorB, ProcessorC]>;
// Check if an entire chain is compatible
type IsChainValid = EnsureCompatibleChain<[ProcessorA, ProcessorB, ProcessorC]>;
```

## Schema-Based Validation
The library now supports schema-based validation for complex data structures using JSON Schema and composable validators:
```typescript
import {
BaseProcessor,
jsonSchema,
complexObject,
allOf,
anyOf,
conditional,
stringLength,
emailFormat,
arrayLength,
valueType,
GlobalContractManager,
ContractResolveType
} from 'pipeline-processor';
import { JSONSchemaType } from 'ajv';

// Define a schema for your data type
interface User {
id: string;
name: string;
email: string;
age: number;
}
const userSchema: JSONSchemaType<User> = {
type: 'object',
properties: {
id: { type: 'string' },
name: { type: 'string', minLength: 2 },
email: { type: 'string', format: 'email' },
age: { type: 'number', minimum: 18 }
},
required: ['id', 'name', 'email', 'age'],
additionalProperties: false
};
// Use in a processor
class UserProcessor extends BaseProcessor {
constructor() {
super();
// Create a contract using JSON Schema
const schemaContract = GlobalContractManager.createRequiredInputCT(
ContractResolveType.JSON_SCHEMA,
'user',
jsonSchema(userSchema),
undefined,
'Input must include a valid user object'
);
// Create complex validation with nested property validation
const complexValidation = GlobalContractManager.createRequiredInputCT(
ContractResolveType.COMPLEX_OBJECT,
'data',
complexObject({
'user.name': stringLength(2, 50),
'user.email': emailFormat(),
'metadata.tags': arrayLength(1, 10)
}),
undefined,
'Complex validation for user data'
);
// Use conditional validation
const conditionalValidation = GlobalContractManager.createRequiredInputCT(
ContractResolveType.CONDITIONAL,
'conditionalData',
conditional(
value => value.type === 'premium',
// For premium users
complexObject({
'subscription.level': valueType('number'),
'subscription.expiresAt': (value) => value instanceof Date
}),
// For regular users
valueType('object')
),
undefined,
'Conditional validation based on user type'
);
// Register all contracts
GlobalContractManager.registerEntityContracts(
this.constructor.name,
'processor',
[schemaContract, complexValidation, conditionalValidation]
);
}
// Implementation...
}
```

### Composing Validators
You can compose validators to create complex validation logic (see the sketch after this list):
- `allOf([validator1, validator2, ...])`: All validators must pass
- `anyOf([validator1, validator2, ...])`: At least one validator must pass
- `conditional(predicate, thenValidator, elseValidator?)`: Apply different validators based on a condition
- `complexObject({ 'prop.path': validator })`: Apply a different validator to each object property
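For instance, the helpers can be combined; this is a sketch that uses only validators documented in this README, applied to hypothetical property paths:

```typescript
import {
  allOf,
  anyOf,
  conditional,
  complexObject,
  valueType,
  valueInRange,
  emailFormat,
  stringLength
} from 'pipeline-processor';

// Every nested property must pass its own validator
const contactValidator = complexObject({
  'profile.email': emailFormat(),
  'profile.displayName': allOf([valueType('string'), stringLength(2, 50)])
});

// Accept either a percentage in range or a short status string
const statusValidator = anyOf([valueInRange(0, 100), stringLength(1, 20)]);

// Apply stricter rules only when strictMode is enabled
const scoreValidator = conditional(
  (value) => value.strictMode === true,
  complexObject({ 'score': valueInRange(0, 1) }),
  valueType('object')
);
```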
### JSON Schema Support

Full JSON Schema validation is supported through Ajv:
```typescript
const validator = jsonSchema({
type: 'object',
properties: {
name: { type: 'string' },
age: { type: 'number', minimum: 0 }
},
required: ['name', 'age']
});
```

This enables:
- Complex nested validation with a declarative approach
- Type constraints, string formats, patterns, and ranges
- Custom formats and keywords
- Detailed validation error messages
## Advanced Error Recovery
The library includes advanced error recovery mechanisms to build resilient processing pipelines:
### Automatic Retry
Automatically retry failed operations with configurable attempts and backoff:
```typescript
import {
BaseProcessor,
withRetry,
createNetworkRetryConfig
} from 'pipeline-processor';

// Create a processor that might fail
class NetworkProcessor extends BaseProcessor {
// implementation
}
// Add retry capabilities with exponential backoff
const processor = withRetry(
new NetworkProcessor(),
createNetworkRetryConfig(3) // Retry up to 3 times with exponential backoff
);
// Standard retry with custom options
const customRetryProcessor = withRetry(
new NetworkProcessor(),
{
maxAttempts: 5,
delayMs: 1000,
useExponentialBackoff: true,
backoffMultiplier: 2,
maxBackoffDelayMs: 30000,
isRetryable: (error) => {
// Custom logic to determine if an error is retryable
return error.message.includes('timeout');
}
}
);
```

### Fallback Processors
Specify fallback data sources when primary processors fail:
```typescript
import {
BaseProcessor,
withFallback
} from 'pipeline-processor';

// Primary and fallback processors
class PrimaryDataSource extends BaseProcessor {
// implementation that might fail
}
class CachedDataSource extends BaseProcessor {
// implementation providing fallback data
}
// Create processor with fallback capabilities
const processor = withFallback(
new PrimaryDataSource(),
new CachedDataSource(),
(error) => {
// Optional: Only use fallback for specific errors
return error.message.includes('network');
}
);
```

### Circuit Breaker
Prevent cascading failures by automatically "opening the circuit" when error thresholds are reached:
```typescript
import {
BaseProcessor,
withCircuitBreaker,
createCircuitBreakerConfig
} from 'pipeline-processor';

// Create processor with circuit breaker
const processor = withCircuitBreaker(
new UnreliableServiceProcessor(),
createCircuitBreakerConfig({
failureThreshold: 5, // Open after 5 failures
failureWindowMs: 60000, // Within a 1-minute window
resetTimeoutMs: 30000 // Try again after 30 seconds
})
);
// Listen for circuit state changes
processor.on('circuitStateChange', (data) => {
console.log(`Circuit changed from ${data.from} to ${data.to}`);
});
```

### Partial Completion
Allow batch operations to succeed partially when some items fail:
```typescript
import {
BaseProcessor,
withPartialCompletion
} from 'pipeline-processor';

// Create a processor for batch operations
class BatchProcessor extends BaseProcessor<{data: any[]}, {data: any[]}> {
// implementation that processes multiple items
}
// Add partial completion capabilities
const processor = withPartialCompletion(
new BatchProcessor(),
{
allowPartialCompletion: true,
minCompletionThreshold: 0.7, // Require at least 70% success
isItemSuccessful: (item, error) => {
// Custom logic to determine if an item with error should be considered successful
return error?.message.includes('non-critical');
},
combinePartialResults: (results) => {
// Custom logic to combine partial results
return {
success: true,
data: results,
partialSuccess: true
};
}
}
);
```

### Combining Recovery Mechanisms
Recovery mechanisms can be combined for sophisticated error handling:
```typescript
import {
withRetry,
withFallback,
withCircuitBreaker
} from 'pipeline-processor';

// Start with base processor
const baseProcessor = new DataProcessor();
// Add retry capabilities
const withRetries = withRetry(baseProcessor, {
maxAttempts: 3,
delayMs: 1000
});
// Add circuit breaker
const withCircuit = withCircuitBreaker(withRetries, {
failureThreshold: 5,
resetTimeoutMs: 30000
});
// Finally add fallback
const resilientProcessor = withFallback(
withCircuit,
new FallbackProcessor()
);
// The result is a processor with all three recovery mechanisms
pipeline.register(resilientProcessor);
```

These error recovery mechanisms help build robust pipelines that can handle real-world failure scenarios gracefully.
---
## Roadmap
- [x] ISS_001: Fix event recursion issues in abort functionality when an event listener calls `abort()` while processing pipeline events