A lightweight async workflow engine for composing, running, and controlling complex task pipelines with retries, parallelism, timeouts, and cancellation.
npm install quantam-asyncA lightweight async workflow engine that lets developers compose, run, and control complex task pipelines with retries, parallel execution, timeouts, and cancellation using a simple fluent API.
Writing reliable async code is hard. Without a framework, you end up with:
- Messy Promise chains — impossible to read after 3 levels
- Nested try/catch — error handling scattered everywhere
- Broken retries — manual retry logic that fails silently
- Race conditions — concurrent tasks stepping on each other
- No cancellation — long-running jobs can't be stopped
- Lost context — data and state disappear between steps
- Non-deterministic behavior — same input, different output
Quantam gives you one clean, powerful API:
``typescript`
const result = await quantam()
.step(fetchUser)
.step(enrichUserData)
.parallel([saveToCache, logAnalytics])
.retry(3)
.timeout(5000)
.run(userId);
- ✅ Fluent API — read like prose, write like a plan
- ✅ Step-by-step execution — functions run in order with data flowing between them
- ✅ Parallel execution — run multiple steps concurrently with parallel()retryCount
- ✅ Automatic retries — built-in exponential backoff with in contextwithSignal()
- ✅ Timeouts — enforce SLA with global or per-step limits
- ✅ Cancellation — AbortController integration for clean shutdown or runMany()
- ✅ Batch processing — run many inputs efficiently with
- ✅ Error propagation — catch errors once at the end
- ✅ Deterministic — same input always produces same behavior
- ✅ Testable — mock steps, assert flows
`bash`
npm install quantam-async
`typescript
import { quantam } from 'quantam-async';
// Define your async functions
async function fetchUser(id: string) {
return { id, name: 'Alice' };
}
async function fetchOrders(user: any) {
return { user, orders: [1, 2, 3] };
}
async function enrichData(data: any) {
return { ...data, enriched: true };
}
// Compose and run
const result = await quantam()
.step(fetchUser)
.step(fetchOrders)
.step(enrichData)
.run('user-123');
console.log(result);
// { user: { id: 'user-123', name: 'Alice' }, orders: [...], enriched: true }
`
Creates a new flow.
`typescript`
const flow = quantam();
Add a single async step. Each step receives the output of the previous step.
`typescript`
flow.step(async (input) => { / process / return output; })
Run multiple steps concurrently. All functions receive the same input. Results are collected in an array.
`typescript`
flow.parallel([
async (input) => { / task A / },
async (input) => { / task B / },
])
Retry the last step on failure with exponential backoff.
`typescript`
flow.retry(3, 100); // Retry 3 times, start with 100ms delay
Enforce a timeout on the entire pipeline.
`typescript`
flow.timeout(5000); // 5 second limit
Apply a timeout only to the most recent step.
`typescript`
flow
.step(fetchUser)
.stepTimeout(200); // only this step is limited to 200ms
Assign a name to the most recent step for better error messages and debugging.
`typescript
flow
.step(validateInput)
.name('validateInput')
.step(processData)
.name('processData');
// If validateInput fails, error will include "(at step 'validateInput')"
// The step name is also available in the context: context.stepName
`
Bind an AbortSignal to the flow for reuse.
`typescript
const controller = new AbortController();
const flow = quantam()
.withSignal(controller.signal)
.step(longTask);
`
Execute the pipeline with the given input. Returns a Promise.
`typescript`
const result = await flow.run(data);
Run the same flow for many inputs with optional concurrency control.
`typescript`
const inputs = Array.from({ length: 100000 }, (_, i) => i);
const results = await flow.runMany(inputs, { concurrency: 512 });
Errors bubble up to a single promise rejection:
`typescript`
try {
const result = await quantam()
.step(riskyOperation)
.run(input);
} catch (error) {
console.error('Pipeline failed:', error.message);
}
Abort a running pipeline:
`typescript
const controller = new AbortController();
const promise = quantam()
.step(longTask)
.run(input, { signal: controller.signal });
// Later...
controller.abort();
``
v0.1.1 — Added step naming for better error messages and debugging.
v0.1.0 — Core features only. API subject to change.
MIT