Workflow orchestration library with state machines, queue services, and ETL framework for building complex workflows and background job processing.
npm install @egintegrations/workflowWorkflow orchestration library with state machines, queue services, and ETL framework for building complex workflows and background job processing.
- State Machine: Generic state machine with guards, actions, and history tracking
- Queue Service: Background job queue with priorities, retries, and concurrency control
- ETL Framework: Extract-Transform-Load pipelines with batch processing and progress tracking
``bash`
npm install @egintegrations/workflow
Create workflows with state transitions, guards, and actions:
`typescript
import { StateMachine } from '@egintegrations/workflow';
enum OrderState {
DRAFT = 'DRAFT',
SUBMITTED = 'SUBMITTED',
APPROVED = 'APPROVED',
REJECTED = 'REJECTED',
}
enum OrderEvent {
SUBMIT = 'SUBMIT',
APPROVE = 'APPROVE',
REJECT = 'REJECT',
}
const orderMachine = new StateMachine({
initialState: OrderState.DRAFT,
transitions: [
{ from: OrderState.DRAFT, to: OrderState.SUBMITTED, event: OrderEvent.SUBMIT },
{
from: OrderState.SUBMITTED,
to: OrderState.APPROVED,
event: OrderEvent.APPROVE,
guard: (ctx) => ctx.amount < 10000, // Only auto-approve small orders
action: (from, to, event, ctx) => {
console.log(Order ${ctx.orderId} approved);Transitioned from ${from} to ${to}
},
},
{ from: OrderState.SUBMITTED, to: OrderState.REJECTED, event: OrderEvent.REJECT },
],
context: { orderId: '123', amount: 5000 },
enableHistory: true,
onTransition: (from, to, event, ctx) => {
console.log();
},
});
// Check if transition is possible
if (orderMachine.can(OrderEvent.SUBMIT)) {
await orderMachine.transition(OrderEvent.SUBMIT);
}
console.log(orderMachine.getCurrentState()); // SUBMITTED
console.log(orderMachine.getHistory()); // Array of state transitions
`
#### State Machine Features
- Generic Types: Type-safe states, events, and context
- Guards: Conditional transitions based on context
- Actions: Execute code during transitions
- History: Track all state transitions with timestamps
- Async Support: Guards and actions can be async
- Context Management: Store and update workflow data
Process background jobs with priorities, retries, and concurrency control:
`typescript
import { InMemoryQueue } from '@egintegrations/workflow';
const queue = new InMemoryQueue({
pollInterval: 1000, // Check for jobs every second
concurrency: 5, // Process up to 5 jobs concurrently
enableDeadLetter: true, // Move failed jobs to dead letter queue
});
// Register job handler
queue.process<{ email: string; subject: string }>('send-email', async (job) => {
console.log(Sending email to ${job.data.email});
await sendEmail(job.data.email, job.data.subject);
});
// Enqueue jobs
const jobId = await queue.enqueue(
'send-email',
{
email: 'user@example.com',
subject: 'Welcome!',
},
{
priority: 10, // Higher priority jobs run first
maxRetries: 3, // Retry up to 3 times on failure
}
);
// Check job status
const job = await queue.getStatus(jobId);
console.log(job?.status); // PENDING, PROCESSING, COMPLETED, FAILED, CANCELLED
// Manage jobs
await queue.cancel(jobId); // Cancel a pending job
await queue.retry(failedJobId); // Retry a failed job
await queue.clearCompleted(); // Remove completed jobs
await queue.stop(); // Stop processing jobs
`
#### Queue Features
- Priority Queuing: Higher priority jobs run first
- Automatic Retries: Configurable retry attempts with exponential backoff
- Concurrency Control: Limit concurrent job execution
- Job Management: Cancel, retry, and query jobs
- Status Tracking: Track job lifecycle (PENDING → PROCESSING → COMPLETED/FAILED)
- Dead Letter Queue: Failed jobs moved to separate queue
- Type Safety: Typed job data and handlers
Build data extraction, transformation, and loading pipelines:
`typescript
import {
ETLPipeline,
ArrayExtractor,
FunctionTransformer,
ArrayLoader,
} from '@egintegrations/workflow';
interface User {
id: number;
name: string;
email: string;
age: number;
}
interface TransformedUser {
userId: number;
displayName: string;
contact: string;
}
const sourceData: User[] = [
{ id: 1, name: 'Alice', email: 'alice@example.com', age: 30 },
{ id: 2, name: 'Bob', email: 'bob@example.com', age: 25 },
];
const pipeline = new ETLPipeline({
name: 'user-import',
extractor: new ArrayExtractor(sourceData),
transformer: new FunctionTransformer
userId: user.id,
displayName: user.name.toUpperCase(),
contact: user.email,
})),
loader: new ArrayLoader
batchSize: 100, // Process in batches of 100
onProgress: (stats) => {
console.log(Loaded ${stats.loaded}/${stats.extracted} records);Failed to process record:
},
onError: (error, record) => {
console.error(, error);
},
});
const result = await pipeline.run();
console.log(result);
// {
// pipelineName: 'user-import',
// status: 'completed',
// extracted: 2,
// transformed: 2,
// filtered: 0,
// loaded: 2,
// failed: 0,
// duration: 15,
// startTime: Date,
// endTime: Date
// }
`
#### Built-in Extractors
- ArrayExtractor: Extract from in-memory array
- JSONFileExtractor: Extract from JSON file
- FunctionExtractor: Extract using custom function
#### Built-in Transformers
- IdentityTransformer: Pass data through unchanged
- FunctionTransformer: Transform using custom function
- FilterTransformer: Filter records based on predicate
- MapTransformer: Map fields from input to output
- ComposeTransformer: Chain multiple transformers
#### Built-in Loaders
- ArrayLoader: Load into in-memory array
- JSONFileLoader: Load into JSON file
- ConsoleLoader: Log records to console (debugging)
- FunctionLoader: Load using custom function
- BatchCallbackLoader: Execute callback for each batch
#### Custom ETL Components
Create custom extractors, transformers, and loaders:
`typescript
import { Extractor, Transformer, Loader } from '@egintegrations/workflow';
// Custom extractor
class DatabaseExtractor implements Extractor
async extract(): Promise
return db.query('SELECT * FROM users');
}
}
// Custom transformer
class UppercaseNameTransformer implements Transformer
async transform(user: User): Promise
return { ...user, name: user.name.toUpperCase() };
}
}
// Custom loader
class DatabaseLoader implements Loader
async load(users: User[]): Promise
await db.insertMany(users);
return users.length;
}
}
`
#### Constructor
`typescript`
new StateMachine({
initialState: S,
transitions: Transition[],
context?: C,
onTransition?: (from: S, to: S, event: E, context: C) => void | Promise
enableHistory?: boolean,
})
#### Methods
- getCurrentState(): S - Get current stategetContext(): C | undefined
- - Get contextsetContext(context: C): void
- - Update contextcan(event: E): boolean
- - Check if transition is possiblegetPossibleEvents(): E[]
- - Get all possible events from current statetransition(event: E): Promise
- - Execute state transitiongetHistory(): StateHistoryEntry
- [] - Get transition historyclearHistory(): void
- - Clear historyreset(initialState?: S): void
- - Reset to initial state
#### Constructor
`typescript`
new InMemoryQueue({
pollInterval?: number, // Default: 1000ms
concurrency?: number, // Default: 5
enableDeadLetter?: boolean, // Default: false
})
#### Methods
- enqueue - Add job to queueprocess
- - Register job handlergetStatus(jobId: string): Promise
- - Get job statuscancel(jobId: string): Promise
- - Cancel pending jobretry(jobId: string): Promise
- - Retry failed jobgetJobsByStatus(status: JobStatus): Promise
- - Get jobs by statusclearCompleted(olderThan?: Date): Promise
- - Clear completed jobsstop(): Promise
- - Stop processing
#### Constructor
`typescript`
new ETLPipeline({
name: string,
extractor: Extractor
transformer: Transformer
loader: Loader
batchSize?: number, // Default: 100
onProgress?: (stats: ETLProgress) => void,
onError?: (error: Error, record?: any) => void,
})
#### Methods
- run(): Promise - Execute the ETL pipeline
Extracted from FTA:
- State Machine: backend/app/services/state_machine.py - Generic state machine patternbackend/app/services/queue.py
- Queue Service: - Background job queue patternbackend/app/services/etl.py
- ETL Framework: - Extract-Transform-Load pattern
Refactored from Python to TypeScript with:
- Generic type support
- Pluggable extractors, transformers, and loaders
- Modern async/await patterns
- Comprehensive test coverage
- @egintegrations/core-utils` - Retry logic, error handling, health checks
MIT
See the main egi-comp-library repository for contribution guidelines.