A flexible task management library supporting TypeScript, allowing for task buffering, scheduling, and execution with dependency management.
> Modern TypeScript task orchestration with smart buffering, scheduling, labels, and real-time event streaming



For reporting bugs, issues, or security vulnerabilities, please visit community.foss.global/. This is the central community hub for all issue reporting. Developers who sign and comply with our contribution agreement and go through identification can also get a code.foss.global/ account to submit Pull Requests directly.
- 🎯 Type-Safe Task Management — Full TypeScript support with generics and type inference
- 📊 Real-Time Progress Tracking — Step-based progress with percentage weights
- ⚡ Smart Buffering — Intelligent request debouncing and batching
- ⏰ Cron Scheduling — Schedule tasks with cron expressions
- 🔗 Task Chains & Parallel Execution — Sequential and parallel task orchestration
- 🏷️ Labels — Attach arbitrary key-value metadata (`userId`, `tenantId`, etc.) for multi-tenant filtering
- 📡 Push-Based Events — rxjs Subject on every Task and TaskManager for real-time state change notifications
- 🛡️ Error Handling — Configurable error propagation with catchErrors, error tracking, and clear error state
- 🎨 Web Component Dashboard — Built-in Lit-based dashboard for real-time task visualization
- 🌐 Distributed Coordination — Abstract coordinator for multi-instance task deduplication
```bash
pnpm add @push.rocks/taskbuffer
# or
npm install @push.rocks/taskbuffer
```

```typescript
import { Task } from '@push.rocks/taskbuffer';

const greetTask = new Task({
  name: 'Greet',
  taskFunction: async (name) => {
    return `Hello, ${name}!`;
  },
});

const result = await greetTask.trigger('World');
console.log(result); // "Hello, World!"
```
```typescript
const deployTask = new Task({
  name: 'Deploy',
  steps: [
    { name: 'build', description: 'Building app', percentage: 30 },
    { name: 'test', description: 'Running tests', percentage: 20 },
    { name: 'deploy', description: 'Deploying to server', percentage: 40 },
    { name: 'verify', description: 'Verifying deployment', percentage: 10 },
  ] as const,
  taskFunction: async () => {
    deployTask.notifyStep('build');
    await buildApp();
    deployTask.notifyStep('test');
    await runTests();
    deployTask.notifyStep('deploy');
    await deployToServer();
    deployTask.notifyStep('verify');
    await verifyDeployment();
    return 'Deployment successful!';
  },
});

await deployTask.trigger();
console.log(deployTask.getProgress()); // 100
console.log(deployTask.getStepsMetadata()); // Step details with status
```
> Note: `notifyStep()` is fully type-safe. When you declare the `steps` array with `as const`, TypeScript only accepts the step names you declared.

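
The type-safety described in the note can be illustrated without the library. The sketch below is not taskbuffer's implementation: `StepName` and `progressOf` are hypothetical names, and summing the weights of completed steps is only one plausible way to derive a percentage. It shows how `as const` preserves the literal step names so that they can be used as a union type.

```typescript
// Illustrative only: these steps mirror the shape used in the example above.
const steps = [
  { name: 'build', description: 'Building app', percentage: 60 },
  { name: 'test', description: 'Running tests', percentage: 40 },
] as const;

// Because of `as const`, this resolves to the union 'build' | 'test'.
type StepName = (typeof steps)[number]['name'];

// Hypothetical helper: adds up the weights of the steps listed as completed.
function progressOf(completed: readonly StepName[]): number {
  let total = 0;
  for (const step of steps) {
    if (completed.indexOf(step.name) !== -1) {
      total += step.percentage;
    }
  }
  return total;
}

console.log(progressOf(['build'])); // 60
console.log(progressOf(['build', 'test'])); // 100
// progressOf(['deploy']); // rejected at compile time: 'deploy' is not a declared step
```

Without `as const`, the `name` fields would widen to `string` and the compiler could no longer reject undeclared step names.
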
Prevent overwhelming your system with rapid-fire requests:
```typescript
const apiTask = new Task({
  name: 'APIRequest',
  buffered: true,
  bufferMax: 5, // Maximum 5 concurrent executions
  execDelay: 100, // Minimum 100ms between executions
  taskFunction: async (endpoint) => {
    return await fetch(endpoint).then((r) => r.json());
  },
});

// Rapid fire 100 calls; only bufferMax execute concurrently
for (let i = 0; i < 100; i++) {
  apiTask.trigger(`/api/data/${i}`);
}
```
Buffer Behavior:
- First `bufferMax` calls execute immediately
- Additional calls are queued
- When buffer is full, new calls overwrite the last queued item
- Perfect for real-time data streams where only recent data matters
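
The "overwrite the last queued item" rule in the list above can be pictured with a small toy model. This is a sketch under assumptions, not the library's internal data structure: `LatestWinsQueue` is a hypothetical class that only demonstrates how a full buffer keeps its older entries and replaces the newest one.

```typescript
// Toy model: a bounded queue in which a full buffer replaces its newest entry.
class LatestWinsQueue<T> {
  private readonly items: T[] = [];

  constructor(private readonly capacity: number) {
    if (capacity < 1) {
      throw new Error('capacity must be at least 1');
    }
  }

  push(item: T): void {
    if (this.items.length < this.capacity) {
      this.items.push(item);
    } else {
      // Buffer full: overwrite the last queued item instead of growing.
      this.items[this.items.length - 1] = item;
    }
  }

  // Removes and returns the oldest queued item, if any.
  shift(): T | undefined {
    return this.items.shift();
  }

  // Returns a copy of the queued items, oldest first.
  snapshot(): T[] {
    return this.items.slice();
  }
}

const queue = new LatestWinsQueue<string>(2);
['a', 'b', 'c', 'd'].forEach((endpoint) => queue.push(endpoint));
console.log(queue.snapshot()); // ['a', 'd']
```

In this model, `'b'` and `'c'` are dropped because each later call replaced the newest slot, which matches the "only recent data matters" use case.
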
Build complex workflows with automatic data flow between tasks:
```typescript
import { Task, Taskchain } from '@push.rocks/taskbuffer';

const fetchTask = new Task({
  name: 'Fetch',
  taskFunction: async () => {
    const res = await fetch('/api/data');
    return res.json();
  },
});

const transformTask = new Task({
  name: 'Transform',
  taskFunction: async (data) => {
    return data.map((item) => ({ ...item, transformed: true }));
  },
});

const saveTask = new Task({
  name: 'Save',
  taskFunction: async (transformedData) => {
    await database.save(transformedData);
    return transformedData.length;
  },
});

// Each task receives the return value of the previous one
const pipeline = new Taskchain({
  name: 'DataPipeline',
  taskArray: [fetchTask, transformTask, saveTask],
});

const savedCount = await pipeline.trigger();
console.log(`Saved ${savedCount} items`);
```
Taskchain also supports dynamic mutation:
```typescript
pipeline.addTask(newTask); // Append to chain
pipeline.removeTask(oldTask); // Remove by reference (returns boolean)
pipeline.shiftTask(); // Remove & return first task
```
Error context is rich: a chain failure includes the chain name, the failing task name, and the task index, and preserves the original error as `.cause`.
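
The general pattern of wrapping a failure while keeping the original error reachable can be sketched as follows. The field names (`chainName`, `taskName`, `taskIndex`) and the message format are assumptions made for illustration; the library's actual error shape may differ.

```typescript
// Hypothetical context fields; only `cause` is named in the text above.
interface ChainFailureContext {
  chainName: string;
  taskName: string;
  taskIndex: number;
  cause: unknown;
}

// Builds a new Error carrying the context, with the original error attached as `cause`.
function wrapChainError(context: ChainFailureContext): Error & ChainFailureContext {
  const message =
    `Taskchain "${context.chainName}" failed at task ` +
    `"${context.taskName}" (index ${context.taskIndex})`;
  return Object.assign(new Error(message), context);
}

const original = new Error('connection refused');
const wrapped = wrapChainError({
  chainName: 'DataPipeline',
  taskName: 'Save',
  taskIndex: 2,
  cause: original,
});

console.log(wrapped.message); // Taskchain "DataPipeline" failed at task "Save" (index 2)
console.log(wrapped.cause === original); // true
```

Keeping the original error on `cause` lets callers log the high-level context while still inspecting the root failure.
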
Execute multiple tasks simultaneously:
```typescript
import { Taskparallel } from '@push.rocks/taskbuffer';

const parallel = new Taskparallel({
  taskArray: [emailTask, smsTask, pushNotificationTask, webhookTask],
});

await parallel.trigger(notificationData);
```
Coalesce rapid triggers into a single execution after a quiet period:
```typescript
import { TaskDebounced } from '@push.rocks/taskbuffer';

const searchTask = new TaskDebounced({
  name: 'Search',
  debounceTimeInMillis: 300,
  taskFunction: async (query) => {
    return await searchAPI(query);
  },
});

// Rapid calls: only the last triggers after 300ms of quiet
searchTask.trigger('h');
searchTask.trigger('he');
searchTask.trigger('hel');
searchTask.trigger('hello'); // ← this one fires
```
Ensure a task only runs once, regardless of how many times it's triggered:
```typescript
import { TaskOnce } from '@push.rocks/taskbuffer';

const initTask = new TaskOnce({
  name: 'Init',
  taskFunction: async () => {
    await setupDatabase();
    console.log('Initialized!');
  },
});

await initTask.trigger(); // Runs
await initTask.trigger(); // No-op
await initTask.trigger(); // No-op
console.log(initTask.hasTriggered); // true
```
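
The single-execution guard is a common pattern that can be sketched in a few lines. The `once` helper below is hypothetical and is not how `TaskOnce` is implemented; in particular, how the library behaves when the first run throws is not covered by this README, so the retry behaviour noted in the comment belongs to the sketch only.

```typescript
// Wraps a function so that its body runs at most once; later calls reuse the first result.
function once<T>(fn: () => T): () => T {
  let cache: { value: T } | undefined;
  return () => {
    if (!cache) {
      // If fn throws, nothing is cached and the next call tries again (a choice of this sketch).
      cache = { value: fn() };
    }
    return cache.value;
  };
}

let initCount = 0;
const init = once(() => {
  initCount += 1;
  return 'ready';
});

init();
init();
init();
console.log(initCount); // 1
```

For an async function the cached value is the promise itself, so concurrent callers would all await the same run.
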
Process a queue of tasks with a configurable parallelism limit:
```typescript
import { TaskRunner } from '@push.rocks/taskbuffer';

const runner = new TaskRunner();
runner.setMaxParallelJobs(3); // Run up to 3 tasks concurrently
await runner.start();

runner.addTask(taskA);
runner.addTask(taskB);
runner.addTask(taskC);
runner.addTask(taskD); // Queued until a slot opens

// When done:
await runner.stop();
```
Attach arbitrary key-value labels to any task for filtering, grouping, or multi-tenant isolation:
```typescript
const task = new Task({
  name: 'ProcessOrder',
  labels: { userId: 'u-42', tenantId: 'acme-corp', priority: 'high' },
  taskFunction: async (order) => {
    /* ... */
  },
});

// Manipulate labels at runtime
task.setLabel('region', 'eu-west');
task.getLabel('userId'); // 'u-42'
task.hasLabel('tenantId', 'acme-corp'); // true
task.removeLabel('priority'); // true

// Labels are included in metadata snapshots
const meta = task.getMetadata();
console.log(meta.labels); // { userId: 'u-42', tenantId: 'acme-corp', region: 'eu-west' }
```
```typescript
const manager = new TaskManager();
manager.addTask(orderTask1); // labels: { tenantId: 'acme' }
manager.addTask(orderTask2); // labels: { tenantId: 'globex' }
manager.addTask(orderTask3); // labels: { tenantId: 'acme' }

const acmeTasks = manager.getTasksByLabel('tenantId', 'acme');
// → [orderTask1, orderTask3]

const acmeMetadata = manager.getTasksMetadataByLabel('tenantId', 'acme');
// → [ITaskMetadata, ITaskMetadata]
```
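
Conceptually, a label query is an exact-match filter over each task's label map. The sketch below is library-independent and makes two assumptions: labels are modelled as `Record<string, string>`, and `filterByLabel` is a hypothetical helper rather than part of the taskbuffer API.

```typescript
// Minimal stand-in for anything that carries labels (assumed shape).
interface Labeled {
  name: string;
  labels: Record<string, string>;
}

// Returns the items whose label `key` is exactly `value`, in their original order.
function filterByLabel<T extends Labeled>(
  items: readonly T[],
  key: string,
  value: string,
): T[] {
  return items.filter((item) => item.labels[key] === value);
}

const orders: Labeled[] = [
  { name: 'order1', labels: { tenantId: 'acme' } },
  { name: 'order2', labels: { tenantId: 'globex' } },
  { name: 'order3', labels: { tenantId: 'acme' } },
];

const acmeOrders = filterByLabel(orders, 'tenantId', 'acme');
console.log(acmeOrders.map((order) => order.name)); // ['order1', 'order3']
```
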
Every Task exposes an rxjs Subject that emits events as the task progresses through its lifecycle:
```typescript
import { Task } from '@push.rocks/taskbuffer';
import type { ITaskEvent } from '@push.rocks/taskbuffer';

const task = new Task({
  name: 'DataSync',
  steps: [
    { name: 'fetch', description: 'Fetching data', percentage: 50 },
    { name: 'save', description: 'Saving data', percentage: 50 },
  ] as const,
  taskFunction: async () => {
    task.notifyStep('fetch');
    const data = await fetchData();
    task.notifyStep('save');
    await saveData(data);
  },
});

// Subscribe to individual task events
task.eventSubject.subscribe((event: ITaskEvent) => {
  console.log(`[${event.type}] ${event.task.name} @ ${new Date(event.timestamp).toISOString()}`);
  if (event.type === 'step') console.log(`  Step: ${event.stepName}`);
  if (event.type === 'failed') console.log(`  Error: ${event.error}`);
});

await task.trigger();
// [started] DataSync @ 2025-01-26T...
// [step] DataSync @ 2025-01-26T...
//   Step: fetch
// [step] DataSync @ 2025-01-26T...
//   Step: save
// [completed] DataSync @ 2025-01-26T...
```
| Type | When | Extra Fields |
| --- | --- | --- |
| `'started'` | Task begins execution | none |
| `'step'` | `notifyStep()` is called | `stepName` |
| `'completed'` | Task finishes successfully | none |
| `'failed'` | Task throws an error | `error` (message string) |

Every event includes a full `ITaskMetadata` snapshot (including labels) at the time of emission.
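
A consumer typically branches on the event type and reads the extra field that belongs to it. The sketch below uses a simplified, hypothetical `SketchEvent` type that follows the four event types and extra fields listed above; it is not the library's `ITaskEvent` definition, and `taskName` stands in for the metadata snapshot.

```typescript
type SketchEventType = 'started' | 'step' | 'completed' | 'failed';

// Simplified stand-in for a task event (assumed shape, for illustration only).
interface SketchEvent {
  type: SketchEventType;
  taskName: string;
  timestamp: number;
  stepName?: string; // expected on 'step' events
  error?: string; // expected on 'failed' events
}

// Turns an event into a one-line, human-readable summary.
function describeEvent(event: SketchEvent): string {
  switch (event.type) {
    case 'started':
      return `${event.taskName}: started`;
    case 'step':
      return `${event.taskName}: reached step ${event.stepName ?? '(unknown)'}`;
    case 'completed':
      return `${event.taskName}: completed`;
    case 'failed':
      return `${event.taskName}: failed with ${event.error ?? '(no message)'}`;
  }
}

console.log(describeEvent({ type: 'step', taskName: 'DataSync', timestamp: 0, stepName: 'fetch' }));
// DataSync: reached step fetch
```
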
`TaskManager` automatically aggregates events from all added tasks into a single `taskSubject`:
```typescript
const manager = new TaskManager();
manager.addTask(syncTask);
manager.addTask(reportTask);
manager.addTask(cleanupTask);

// Single subscription for ALL task events
manager.taskSubject.subscribe((event) => {
  sendToMonitoringDashboard(event);
});

// Events stop flowing for a task after removal
manager.removeTask(syncTask);
```
`manager.stop()` automatically cleans up all event subscriptions.

By default, `trigger()` rejects when the task function throws, so errors propagate naturally:
```typescript
const task = new Task({
  name: 'RiskyOp',
  taskFunction: async () => {
    throw new Error('something broke');
  },
});

try {
  await task.trigger();
} catch (err) {
  // `err` is `unknown` under strict TypeScript, so narrow it before reading `.message`
  console.error((err as Error).message); // "something broke"
}
```
Set `catchErrors: true` to swallow errors and return `undefined` instead of rejecting:
```typescript
const task = new Task({
  name: 'BestEffort',
  catchErrors: true,
  taskFunction: async () => {
    throw new Error('non-critical');
  },
});

const result = await task.trigger(); // undefined (no throw)
```
Regardless of `catchErrors`, the task tracks errors:
```typescript
console.log(task.lastError); // Error object (or undefined)
console.log(task.errorCount); // Number of failures across all runs
console.log(task.getMetadata().status); // 'failed'
task.clearError(); // Resets lastError to undefined (errorCount stays)
```
On a subsequent successful run, `lastError` is automatically cleared.
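
The error-tracking rules described above (a failure sets `lastError` and increments `errorCount`, a success or `clearError()` resets only `lastError`) can be modelled as a small state holder. `ErrorState` and its `recordFailure` / `recordSuccess` methods are hypothetical names used for this sketch, not part of the taskbuffer API.

```typescript
// Toy model of the bookkeeping described in the text; not the library's code.
class ErrorState {
  lastError: Error | undefined = undefined;
  errorCount = 0;

  // A failed run remembers the error and bumps the running total.
  recordFailure(error: Error): void {
    this.lastError = error;
    this.errorCount += 1;
  }

  // A successful run clears the last error but keeps the total.
  recordSuccess(): void {
    this.lastError = undefined;
  }

  // Manual reset of the last error; the total is left untouched.
  clearError(): void {
    this.lastError = undefined;
  }
}

const state = new ErrorState();
state.recordFailure(new Error('first'));
state.recordFailure(new Error('second'));
console.log(state.lastError?.message, state.errorCount); // second 2

state.recordSuccess();
console.log(state.lastError, state.errorCount); // undefined 2
```
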
```typescript
const manager = new TaskManager();

// Add tasks
manager.addTask(dataProcessor);
manager.addTask(deployTask);

// Schedule with cron expressions
manager.addAndScheduleTask(backupTask, '0 2 * * *'); // Daily at 2 AM
manager.addAndScheduleTask(healthCheck, '*/5 * * * *'); // Every 5 minutes

// Query metadata
const meta = manager.getTaskMetadata('Deploy');
console.log(meta);
// {
//   name: 'Deploy',
//   status: 'completed',
//   steps: [...],
//   currentProgress: 100,
//   runCount: 3,
//   labels: { env: 'production' },
//   lastError: undefined,
//   errorCount: 0,
//   ...
// }

// All tasks at once
const allMeta = manager.getAllTasksMetadata();

// Scheduled task info
const scheduled = manager.getScheduledTasks();
const nextRuns = manager.getNextScheduledRuns(5);

// Trigger by name
await manager.triggerTaskByName('Deploy');

// One-shot: add, execute, collect report, remove
const report = await manager.addExecuteRemoveTask(temporaryTask);
console.log(report);
// {
//   taskName: 'TempTask',
//   startTime: 1706284800000,
//   endTime: 1706284801523,
//   duration: 1523,
//   steps: [...],
//   stepsCompleted: ['step1', 'step2'],
//   progress: 100,
//   result: any
// }

// Lifecycle
await manager.start(); // Starts cron scheduling + distributed coordinator
await manager.stop(); // Stops scheduling, cleans up event subscriptions
```
```typescript
manager.removeTask(task); // Removes from map and unsubscribes event forwarding
manager.descheduleTaskByName('Deploy'); // Remove cron schedule only
```
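
When writing schedules for `addAndScheduleTask`, it can help to spell out what each cron field means. The sketch below assumes the standard five-field cron layout (minute, hour, day of month, month, day of week); `splitCron` is a hypothetical helper, and the scheduler used by the library may accept other forms that this sketch does not cover.

```typescript
interface CronFields {
  minute: string;
  hour: string;
  dayOfMonth: string;
  month: string;
  dayOfWeek: string;
}

// Splits a five-field cron expression into named fields; throws on any other field count.
function splitCron(expression: string): CronFields {
  const parts = expression.trim().split(/\s+/);
  if (parts.length !== 5) {
    throw new Error(`Expected 5 cron fields, got ${parts.length}`);
  }
  const [minute, hour, dayOfMonth, month, dayOfWeek] =
    parts as [string, string, string, string, string];
  return { minute, hour, dayOfMonth, month, dayOfWeek };
}

console.log(splitCron('0 2 * * *'));
// { minute: '0', hour: '2', dayOfMonth: '*', month: '*', dayOfWeek: '*' }  (daily at 02:00)
console.log(splitCron('*/5 * * * *').minute); // '*/5'  (every 5 minutes)
```
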
Visualize your tasks in real time with the included Lit-based web component.
The dashboard provides:
- 📊 Real-time progress bars with step indicators
- 📈 Task execution history and metadata
- ⏰ Scheduled task information with next-run times
- 🌓 Light/dark theme support
For multi-instance deployments, extend `AbstractDistributedCoordinator` to prevent duplicate task execution:
```typescript
import { TaskManager, distributedCoordination } from '@push.rocks/taskbuffer';

class RedisCoordinator extends distributedCoordination.AbstractDistributedCoordinator {
  async fireDistributedTaskRequest(request) {
    // Implement leader election / distributed lock via Redis
    return { shouldTrigger: true, considered: true, rank: 1, reason: 'elected', ...request };
  }
  async updateDistributedTaskRequest(request) {
    /* update status */
  }
  async start() {
    /* connect */
  }
  async stop() {
    /* disconnect */
  }
}

const manager = new TaskManager({
  distributedCoordinator: new RedisCoordinator(),
});
```
When a distributed coordinator is configured, scheduled tasks consult it before executing — only the elected instance runs the task.
Run setup/teardown tasks automatically:
```typescript
const mainTask = new Task({
  name: 'MainWork',
  preTask: new Task({
    name: 'Setup',
    taskFunction: async () => {
      console.log('Setting up...');
    },
  }),
  afterTask: new Task({
    name: 'Cleanup',
    taskFunction: async () => {
      console.log('Cleaning up...');
    },
  }),
  taskFunction: async () => {
    console.log('Doing work...');
    return 'done';
  },
});

await mainTask.trigger();
// Setting up... → Doing work... → Cleaning up...
```
Run an expensive initialization exactly once, before the first execution:
```typescript
const task = new Task({
  name: 'DBQuery',
  taskSetup: async () => {
    const pool = await createConnectionPool();
    return pool; // This becomes `setupValue`
  },
  taskFunction: async (input, pool) => {
    return await pool.query(input);
  },
});

await task.trigger('SELECT * FROM users'); // Setup runs here
await task.trigger('SELECT * FROM orders'); // Setup skipped, pool reused
```
Make one task wait for another to finish before executing:
```typescript
const initTask = new Task({
  name: 'Init',
  taskFunction: async () => {
    await initializeSystem();
  },
});

const workerTask = new Task({
  name: 'Worker',
  taskFunction: async () => {
    await doWork();
  },
});

workerTask.blockingTasks.push(initTask);

// Triggering worker will automatically wait for init to complete
initTask.trigger();
workerTask.trigger(); // Waits until initTask.finished resolves
```
```typescript
const migration = new Taskchain({
  name: 'DatabaseMigration',
  taskArray: [backupTask, validateSchemaTask, runMigrationsTask, verifyIntegrityTask],
});

try {
  await migration.trigger();
  console.log('Migration successful!');
} catch (error) {
  // error includes chain name, failing task name, index, and original cause
  console.error((error as Error).message);
  await rollbackTask.trigger();
}
```
Combine labels + events for a real-time multi-tenant dashboard:
```typescript
const manager = new TaskManager();

// Create tenant-scoped tasks
function createTenantTask(tenantId: string, taskName: string, fn: () => Promise<any>) {
  const task = new Task({
    name: `${tenantId}:${taskName}`,
    labels: { tenantId },
    taskFunction: fn,
  });
  manager.addTask(task);
  return task;
}

createTenantTask('acme', 'sync', async () => syncData('acme'));
createTenantTask('globex', 'sync', async () => syncData('globex'));

// Stream events to tenant-specific WebSocket channels
manager.taskSubject.subscribe((event) => {
  const tenantId = event.task.labels?.tenantId;
  if (tenantId) {
    wss.broadcast(tenantId, JSON.stringify(event));
  }
});

// Query tasks for a specific tenant
const acmeTasks = manager.getTasksMetadataByLabel('tenantId', 'acme');
```

| Class | Description |
| --- | --- |
| `Task` | Core task unit with optional step tracking, labels, and event streaming |
| `TaskManager` | Centralized orchestrator with scheduling, label queries, and aggregated events |
| `Taskchain` | Sequential task executor with data flow between tasks |
| `Taskparallel` | Concurrent task executor via `Promise.all()` |
| `TaskOnce` | Single-execution guard |
| `TaskDebounced` | Debounced task using rxjs |
| `TaskRunner` | Sequential queue with configurable parallelism |
| `TaskStep` | Step tracking unit (internal, exposed via metadata) |

| Method | Returns | Description |
| --- | --- | --- |
| `trigger(input?)` | `Promise` | Execute the task |
| `notifyStep(name)` | `void` | Advance to named step (type-safe) |
| `getProgress()` | `number` | Current progress 0–100 |
| `getStepsMetadata()` | `ITaskStep[]` | Step details with status |
| `getMetadata()` | `ITaskMetadata` | Full task metadata snapshot |
| `setLabel(key, value)` | `void` | Set a label |
| `getLabel(key)` | `string \| undefined` | Get a label value |
| `removeLabel(key)` | `boolean` | Remove a label |
| `hasLabel(key, value?)` | `boolean` | Check label existence / value |
| `clearError()` | `void` | Reset `lastError` to `undefined` |

| Property | Type | Description |
| --- | --- | --- |
| `name` | `string` | Task identifier |
| `running` | `boolean` | Whether the task is currently executing |
| `idle` | `boolean` | Inverse of `running` |
| `labels` | `Record` | Attached labels |
| `eventSubject` | `Subject` | rxjs Subject emitting lifecycle events |
| `lastError` | `Error \| undefined` | Last error encountered |
| `errorCount` | `number` | Total error count across all runs |
| `runCount` | `number` | Total execution count |
| `lastRun` | `Date \| undefined` | Timestamp of last execution |
| `blockingTasks` | `Task[]` | Tasks that must finish before this one starts |

| Method | Returns | Description |
| --- | --- | --- |
| `addTask(task)` | `void` | Register a task (wires event forwarding) |
| `removeTask(task)` | `void` | Remove task and unsubscribe events |
| `getTaskByName(name)` | `Task \| undefined` | Look up by name |
| `triggerTaskByName(name)` | `Promise` | Trigger by name |
| `addAndScheduleTask(task, cron)` | `void` | Register + schedule |
| `scheduleTaskByName(name, cron)` | `void` | Schedule existing task |
| `descheduleTaskByName(name)` | `void` | Remove schedule |
| `getTaskMetadata(name)` | `ITaskMetadata \| null` | Single task metadata |
| `getAllTasksMetadata()` | `ITaskMetadata[]` | All tasks metadata |
| `getScheduledTasks()` | `IScheduledTaskInfo[]` | Scheduled task info |
| `getNextScheduledRuns(limit?)` | `Array<{...}>` | Upcoming scheduled runs |
| `getTasksByLabel(key, value)` | `Task[]` | Filter tasks by label |
| `getTasksMetadataByLabel(key, value)` | `ITaskMetadata[]` | Filter metadata by label |
| `addExecuteRemoveTask(task, opts?)` | `Promise` | One-shot execution with report |
| `start()` | `Promise` | Start cron + coordinator |
| `stop()` | `Promise` | Stop cron + clean up subscriptions |

| Property | Type | Description |
| --- | --- | --- |
| `taskSubject` | `Subject` | Aggregated events from all added tasks |
| `taskMap` | `ObjectMap` | Internal task registry |

```typescript
import type {
  ITaskMetadata,
  ITaskExecutionReport,
  IScheduledTaskInfo,
  ITaskEvent,
  TTaskEventType,
  ITaskStep,
  ITaskFunction,
  StepNames,
} from '@push.rocks/taskbuffer';
```
This repository contains open-source code licensed under the MIT License. A copy of the license can be found in the license.md file.
Please note: The MIT License does not grant permission to use the trade names, trademarks, service marks, or product names of the project, except as required for reasonable and customary use in describing the origin of the work and reproducing the content of the NOTICE file.
This project is owned and maintained by Task Venture Capital GmbH. The names and logos associated with Task Venture Capital GmbH and any related products or services are trademarks of Task Venture Capital GmbH or third parties, and are not included within the scope of the MIT license granted herein.
Use of these trademarks must comply with Task Venture Capital GmbH's Trademark Guidelines or the guidelines of the respective third-party owners, and any usage must be approved in writing. Third-party trademarks used herein are the property of their respective owners and used only in a descriptive manner, e.g. for an implementation of an API or similar.
Task Venture Capital GmbH
Registered at District Court Bremen HRB 35230 HB, Germany
For any legal inquiries or further information, please contact us via email at hello@task.vc.
By using this repository, you acknowledge that you have read this section, agree to comply with its terms, and understand that the licensing of the code does not imply endorsement by Task Venture Capital GmbH of any derivative works.