Robust message queue management with multiple backends, job processing, priority handling, automatic retry logic, and failure handling.
```bash
npm install @bernierllc/queue-manager
```

For Redis backend support, install the peer dependency:

```bash
npm install @bernierllc/queue-manager ioredis
```
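Once `ioredis` is installed, switching to the Redis backend only changes the `backend` option. A minimal sketch, assuming the package falls back to ioredis defaults (localhost:6379) when no explicit connection settings are provided:

```typescript
import { QueueManager } from '@bernierllc/queue-manager';

// Same API as the memory backend; only the backend option differs.
// Assumption: with no explicit connection settings, ioredis defaults
// (localhost:6379) apply; check QueueOptions for what the package exposes.
const queue = new QueueManager('email-queue', {
  backend: 'redis',
  concurrency: 5
});
```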
```typescript
import { QueueManager, JobPriority } from '@bernierllc/queue-manager';

// Create a queue with memory backend
const queue = new QueueManager('email-queue', {
  backend: 'memory',
  concurrency: 5,
  retryPolicy: {
    maxAttempts: 3,
    backoff: { type: 'exponential', delay: 1000, maxDelay: 30000 }
  }
});

// Register a job processor
queue.process('send-email', async (job) => {
  const { to, subject, body } = job.data;
  await sendEmail(to, subject, body);
  return { sent: true };
});

// Add a job to the queue
await queue.add('send-email', {
  to: 'user@example.com',
  subject: 'Welcome',
  body: 'Welcome to our service!'
}, {
  priority: JobPriority.HIGH
});
```
```typescript
import { QueueManager, JobPriority } from '@bernierllc/queue-manager';

const queue = new QueueManager('tasks', { backend: 'memory' });

// Add jobs with different priorities
await queue.add('task', { type: 'urgent' }, { priority: JobPriority.CRITICAL });
await queue.add('task', { type: 'important' }, { priority: JobPriority.HIGH });
await queue.add('task', { type: 'routine' }, { priority: JobPriority.NORMAL });
await queue.add('task', { type: 'background' }, { priority: JobPriority.LOW });

// Jobs are processed in priority order (CRITICAL > HIGH > NORMAL > LOW)
```
```typescript
// Schedule a job to run after 1 hour
await queue.add('reminder', {
  message: 'Time to review'
}, {
  delay: 3600000 // 1 hour in milliseconds
});

// Schedule with custom retry policy
await queue.add('api-call', {
  endpoint: '/users',
  method: 'POST'
}, {
  delay: 5000,
  attempts: 5,
  backoff: {
    type: 'exponential',
    delay: 2000,
    maxDelay: 60000
  }
});
```
```typescript
// Add multiple jobs efficiently
const jobs = await queue.addBulk([
  { type: 'process-user', data: { userId: 1 }, options: { priority: JobPriority.HIGH } },
  { type: 'process-user', data: { userId: 2 } },
  { type: 'process-user', data: { userId: 3 } }
]);

console.log(`Added ${jobs.length} jobs`);
```
```typescript
// Listen to queue events
queue.on('job:waiting', (job) => {
  console.log(`Job ${job.id} added to queue`);
});

queue.on('job:active', (job) => {
  console.log(`Processing job ${job.id}`);
});

queue.on('job:completed', (job) => {
  console.log(`Job ${job.id} completed`);
});

queue.on('job:failed', (job) => {
  console.error(`Job ${job.id} failed: ${job.error}`);
});

queue.on('job:progress', (job) => {
  console.log(`Job ${job.id} progress: ${job.progress}%`);
});
```
```typescript
// Get queue statistics
const stats = await queue.getStats();

console.log('Queue Stats:', {
  waiting: stats.waiting,
  active: stats.active,
  completed: stats.completed,
  failed: stats.failed,
  delayed: stats.delayed,
  totalProcessed: stats.totalProcessed,
  throughput: stats.throughput
});
```
```typescript
// Get a specific job
const job = await queue.getJob('job-id-123');

// Remove a job
await queue.removeJob('job-id-123');

// Get jobs by status
const waitingJobs = await queue.getJobs(JobStatus.WAITING);
const failedJobs = await queue.getJobs(JobStatus.FAILED);

// Clean up old completed jobs (older than 24 hours)
const removed = await queue.clean(86400000, JobStatus.COMPLETED);
console.log(`Removed ${removed} old jobs`);
```
```typescript
// Process up to 10 jobs concurrently
const queue = new QueueManager('high-throughput', {
  backend: 'memory',
  concurrency: 10
});

queue.process('task', async (job) => {
  // Job processing logic
  await processTask(job.data);
});
```
```typescript
// Create queue with manual start
const queue = new QueueManager('manual-queue', {
  backend: 'memory',
  autoStart: false // Don't auto-start processing
});

// Add jobs
await queue.add('task', { data: 'test' });

// Start processing when ready
await queue.start();

// Pause processing
await queue.pause();

// Resume processing
await queue.resume();

// Stop processing and close connections
await queue.close();
```
```typescript
const queue = new QueueManager('critical-tasks', {
  backend: 'memory',
  retryPolicy: {
    maxAttempts: 3,
    backoff: { type: 'exponential', delay: 1000, maxDelay: 30000 }
  },
  deadLetterQueue: 'failed-tasks'
});

// Jobs that fail after all retries are moved to the dead letter queue
queue.process('task', async (job) => {
  if (job.data.shouldFail) {
    throw new Error('Task failed');
  }
  return { success: true };
});

// Access dead letter queue for manual inspection
const dlq = new QueueManager('failed-tasks', { backend: 'memory' });
const failedJobs = await dlq.getJobs(JobStatus.FAILED);
```
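Beyond inspection, dead-lettered jobs can be pushed back onto the original queue with the same methods shown above. A minimal sketch, assuming a job's `type` and `data` are enough to retry it from scratch:

```typescript
// Re-queue each dead-lettered job on the original queue, then drop it from the DLQ.
const deadJobs = await dlq.getJobs(JobStatus.FAILED);
for (const dead of deadJobs) {
  await queue.add(dead.type, dead.data, { priority: dead.priority });
  await dlq.removeJob(dead.id);
}
```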
#### Constructor
```typescript
constructor(name: string, options: QueueOptions)
```

Creates a new queue manager instance.

Parameters:
- name - Unique queue name
- options - Queue configuration options
#### Methods
##### add(type: string, data: T, options?: JobOptions): Promise&lt;Job&lt;T&gt;&gt;
Add a single job to the queue.
Parameters:
- type - Job type identifier for processor routing
- data - Job payload data
- options - Optional job configuration (priority, delay, attempts, etc.)
Returns: Promise resolving to the created job
##### addBulk(jobs: Array&lt;{type: string, data: T, options?: JobOptions}&gt;): Promise&lt;Job&lt;T&gt;[]&gt;
Add multiple jobs to the queue efficiently.
Parameters:
- jobs - Array of job specifications
Returns: Promise resolving to array of created jobs
##### process(type: string, processor: JobProcessor&lt;T&gt;): void
Register a processor for a specific job type.
Parameters:
- type - Job type to process
- processor - Async function that processes the job
##### start(): Promise&lt;void&gt;
Start processing jobs from the queue.
##### pause(): Promise&lt;void&gt;
Pause job processing (current jobs complete, new jobs wait).
##### resume(): Promise&lt;void&gt;
Resume job processing after pause.
##### close(): Promise&lt;void&gt;
Stop processing and close backend connections.
##### getJob(id: string): Promise&lt;Job | null&gt;
Retrieve a specific job by ID.
Parameters:
- id - Job ID
Returns: Promise resolving to job or null if not found
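Since the result is nullable, handle the miss case explicitly (the ID below is a placeholder):

```typescript
const job = await queue.getJob('job-id-123');
if (job === null) {
  // Never added, already removed, or cleaned up
  console.log('Job not found');
} else {
  console.log(`Job ${job.id}: attempt ${job.attempts} of ${job.maxAttempts}`);
}
```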
##### getJobs(status: JobStatus): Promise&lt;Job[]&gt;
Get all jobs with a specific status.
Parameters:
- status - Job status filter (WAITING, ACTIVE, COMPLETED, FAILED, DELAYED)
Returns: Promise resolving to array of jobs
##### removeJob(id: string): Promise&lt;void&gt;
Remove a job from the queue.
Parameters:
- id - Job ID to remove
##### getStats(): Promise&lt;QueueStats&gt;
Get queue statistics.
Returns: Promise resolving to statistics object
##### clean(maxAge: number, status: JobStatus): Promise&lt;number&gt;
Remove old jobs with a specific status.
Parameters:
- maxAge - Maximum age in milliseconds
- status - Job status to clean
Returns: Promise resolving to number of jobs removed
#### QueueOptions
```typescript
interface QueueOptions {
  backend: 'memory' | 'redis';
  concurrency?: number;        // Default: 1
  defaultDelay?: number;       // Default: 0
  retryPolicy?: RetryPolicy;   // Default: 3 attempts, exponential backoff
  deadLetterQueue?: string;    // Default: '{name}-dlq'
  jobTimeout?: number;         // Default: 30000ms
  cleanupInterval?: number;    // Default: 3600000ms (1 hour)
  autoStart?: boolean;         // Default: true
}
```
#### JobOptions
```typescript
interface JobOptions {
  priority?: JobPriority;
  delay?: number;
  attempts?: number;
  timeout?: number;
  backoff?: BackoffStrategy;
  metadata?: Record<string, unknown>;
}
```
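For illustration, these options can be combined on a single job. A sketch with arbitrary values, assuming a per-job `timeout` overrides the queue-level `jobTimeout` for that job only:

```typescript
// Hypothetical job type and payload, for illustration only.
await queue.add('generate-report', { reportId: 42 }, {
  priority: JobPriority.HIGH,
  timeout: 10000,                        // assumed to override jobTimeout for this job
  metadata: { requestedBy: 'user-123' }  // free-form metadata carried on the job
});
```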
#### JobPriority
```typescript
enum JobPriority {
  LOW = 1,
  NORMAL = 5,
  HIGH = 10,
  CRITICAL = 20
}
```
#### JobStatus
```typescript
enum JobStatus {
  WAITING = 'waiting',
  ACTIVE = 'active',
  COMPLETED = 'completed',
  FAILED = 'failed',
  DELAYED = 'delayed'
}
```
#### Job
```typescript
interface Job<T = unknown> {
  id: string;
  type: string;
  data: T;
  priority: JobPriority;
  attempts: number;
  maxAttempts: number;
  delay?: number;
  createdAt: Date;
  processedAt?: Date;
  completedAt?: Date;
  failedAt?: Date;
  error?: string;
  progress?: number;
  metadata?: Record<string, unknown>;
}
```
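The timestamp fields make it easy to derive simple metrics in an event handler. A sketch that guards the optional fields before using them:

```typescript
// Log queue wait time and processing time for each completed job.
queue.on('job:completed', (job) => {
  if (job.processedAt && job.completedAt) {
    const waitMs = job.processedAt.getTime() - job.createdAt.getTime();
    const runMs = job.completedAt.getTime() - job.processedAt.getTime();
    console.log(`Job ${job.id}: waited ${waitMs}ms, ran ${runMs}ms`);
  }
});
```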
#### QueueStats
```typescript
interface QueueStats {
  waiting: number;
  active: number;
  completed: number;
  failed: number;
  delayed: number;
  totalProcessed: number;
  throughput: number;
}
```
This is a pure core utility package focused on queue management. Logger integration is not required as the package:
- Uses standard EventEmitter for job lifecycle events
- Provides comprehensive event system for external logging integration
- Allows consumers to add logging via event handlers
- Keeps the package lightweight and dependency-free
Applications using this package can integrate logging by listening to queue events:
```typescript
import { createLogger } from '@bernierllc/logger';

const logger = createLogger({ service: 'queue' });

queue.on('job:completed', (job) => {
  logger.info('Job completed', { jobId: job.id, type: job.type });
});

queue.on('job:failed', (job) => {
  logger.error('Job failed', { jobId: job.id, error: job.error });
});
```
Documentation format: Markdown with TypeScript examples
- Complete API reference with type definitions
- Comprehensive usage examples
- Integration patterns documented
This is a foundational core utility package that:
- Provides queue management primitives
- Has no runtime service dependencies
- Operates independently without service discovery needs
- Designed for embedding in higher-level service packages
NeverHub integration should be implemented in service-layer packages that orchestrate multiple core utilities, not in the atomic core packages themselves. This maintains clean separation of concerns per MECE architecture.
This is a pure utility core package with no runtime environment configuration. All configuration is provided programmatically through the constructor options.
```typescript
const queue = new QueueManager('my-queue', {
  backend: 'memory',           // or 'redis'
  concurrency: 5,              // Process 5 jobs concurrently
  defaultDelay: 0,             // Default job delay in ms
  retryPolicy: {
    maxAttempts: 3,
    backoff: {
      type: 'exponential',
      delay: 1000,
      maxDelay: 30000
    }
  },
  deadLetterQueue: 'my-queue-dlq',
  jobTimeout: 30000,           // Job timeout in ms
  cleanupInterval: 3600000,    // Cleanup interval in ms
  autoStart: true              // Auto-start processing
});
```
- Multiple Backends: Memory (development) and Redis (production) support
- Priority Queues: Process jobs based on priority (CRITICAL > HIGH > NORMAL > LOW)
- Delayed Jobs: Schedule jobs to run after a specified delay
- Automatic Retries: Configurable retry logic with exponential backoff using @bernierllc/retry-policy
- Dead Letter Queue: Failed jobs moved to DLQ after max retry attempts
- Concurrency Control: Process multiple jobs in parallel with configurable limits
- Job Lifecycle Events: Comprehensive event system for monitoring
- Statistics & Monitoring: Real-time queue metrics and throughput tracking
- Job Management: Get, remove, and clean jobs by status
- Type-Safe: Full TypeScript support with strict typing
- Timeout Protection: Configurable job timeouts to prevent stuck jobs
- Bulk Operations: Efficient bulk job addition
- `@bernierllc/retry-policy` - Exponential backoff and retry strategies used by this package
- `@bernierllc/message-queue` - Alternative message queue implementation
Copyright (c) 2025 Bernier LLC. All rights reserved.
This package is licensed under the MIT License.