Worker utilities for consuming messages using amqp-contract
Type-safe AMQP worker for consuming messages using amqp-contract with Future/Result error handling.





```bash
pnpm add @amqp-contract/worker
```
- Type-safe message consumption: Handlers are fully typed based on your contract
- Automatic validation: Messages are validated before reaching your handlers
- Automatic retry with exponential backoff: Built-in retry mechanism using RabbitMQ TTL+DLX pattern
- Prefetch configuration: Control message flow with per-consumer prefetch settings
- Batch processing: Process multiple messages at once for better throughput
- Automatic reconnection: Built-in connection management with failover support
```typescript
import { TypedAmqpWorker, RetryableError } from "@amqp-contract/worker";
import type { Logger } from "@amqp-contract/core";
import { Future } from "@swan-io/boxed";
import { contract } from "./contract";

// Optional: Create a logger implementation
const logger: Logger = {
  debug: (message, context) => console.debug(message, context),
  info: (message, context) => console.info(message, context),
  warn: (message, context) => console.warn(message, context),
  error: (message, context) => console.error(message, context),
};

// Create worker from contract with handlers (automatically connects and starts consuming)
const worker = await TypedAmqpWorker.create({
  contract,
  handlers: {
    processOrder: ({ payload }) => {
      console.log("Processing order:", payload.orderId);
      // Your business logic here
      return Future.fromPromise(Promise.all([processPayment(payload), updateInventory(payload)]))
        .mapOk(() => undefined)
        .mapError((error) => new RetryableError("Order processing failed", error));
    },
  },
  urls: ["amqp://localhost"],
  logger, // Optional: logs message consumption and errors
});

// Worker is already consuming messages
// Clean up when needed
// await worker.close();
```
For advanced features like prefetch configuration, batch processing, and automatic retry with exponential backoff, see the Worker Usage Guide.
#### Retry with Exponential Backoff
Retry is configured at the queue level in your contract definition. Add retry to your queue definition:
```typescript
import { defineQueue, defineExchange, defineContract } from "@amqp-contract/contract";

const dlx = defineExchange("orders-dlx", "topic", { durable: true });

// Configure retry at queue level
const orderQueue = defineQueue("order-processing", {
  deadLetter: { exchange: dlx },
  retry: {
    mode: "ttl-backoff",
    maxRetries: 3, // Retry up to 3 times (default: 3)
    initialDelayMs: 1000, // Start with 1 second delay (default: 1000)
    maxDelayMs: 30000, // Max 30 seconds between retries (default: 30000)
    backoffMultiplier: 2, // Double the delay each time (default: 2)
    jitter: true, // Add randomness to prevent thundering herd (default: true)
  },
});
```
Then use RetryableError in your handlers:
```typescript
import { TypedAmqpWorker, RetryableError } from "@amqp-contract/worker";
import { Future } from "@swan-io/boxed";

const worker = await TypedAmqpWorker.create({
  contract,
  handlers: {
    processOrder: ({ payload }) =>
      // If this fails with RetryableError, message is automatically retried
      Future.fromPromise(processPayment(payload))
        .mapOk(() => undefined)
        .mapError((error) => new RetryableError("Payment failed", error)),
  },
  urls: ["amqp://localhost"],
});
```
The retry mechanism uses RabbitMQ's native TTL and Dead Letter Exchange pattern, so it doesn't block the consumer during retry delays. See the Error Handling and Retry section in the guide for complete details.
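To get a feel for the delays a `ttl-backoff` configuration like the one above could produce, here is a minimal sketch of a capped exponential backoff calculation. The function `retryDelayMs`, the `BackoffOptions` interface, and the jitter formula are hypothetical and written only for illustration; the library's internal calculation may differ.

```typescript
// Hypothetical helper, not part of @amqp-contract/worker.
interface BackoffOptions {
  initialDelayMs: number;
  maxDelayMs: number;
  backoffMultiplier: number;
  jitter: boolean;
}

// Returns the delay before retry number `attempt` (0-based).
function retryDelayMs(
  attempt: number,
  options: BackoffOptions,
  random: () => number = Math.random,
): number {
  // Grow the delay geometrically, then cap it at maxDelayMs.
  const exponential = options.initialDelayMs * Math.pow(options.backoffMultiplier, attempt);
  const capped = Math.min(exponential, options.maxDelayMs);
  if (!options.jitter) {
    return capped;
  }
  // Assumed jitter scheme: pick a value between 50% and 100% of the capped delay.
  return Math.round(capped * (0.5 + random() * 0.5));
}

const options: BackoffOptions = {
  initialDelayMs: 1000,
  maxDelayMs: 30000,
  backoffMultiplier: 2,
  jitter: false, // disabled here so the schedule is deterministic
};

// Delays for three retries (maxRetries: 3)
const schedule = [0, 1, 2].map((attempt) => retryDelayMs(attempt, options));
console.log(schedule); // → [ 1000, 2000, 4000 ]
```

With jitter enabled, consumers that fail at the same moment would not all retry at the same instant, which is the "thundering herd" problem the `jitter` option is meant to avoid.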
You can define handlers outside of the worker creation using defineHandler and defineHandlers for better code organization. See the Worker API documentation for details.
Worker handlers return Future for explicit error handling:
```typescript
import { RetryableError, NonRetryableError } from "@amqp-contract/worker";
import { Future, Result } from "@swan-io/boxed";

handlers: {
  processOrder: ({ payload }) => {
    // Validation errors - non-retryable
    if (payload.amount <= 0) {
      return Future.value(Result.Error(new NonRetryableError("Invalid amount")));
    }

    // Transient errors - retryable
    return Future.fromPromise(process(payload))
      .mapOk(() => undefined)
      .mapError((error) => new RetryableError("Processing failed", error));
  },
}
```
Error Types:
Worker defines error classes:
- `TechnicalError` - Runtime failures (parsing, processing)
- `MessageValidationError` - Message fails schema validation
- `RetryableError` - Signals that the error is transient and should be retried
- `NonRetryableError` - Signals permanent failure, message goes to DLQ
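The difference between the two handler-level error types can be sketched as a small decision function. This is an illustration only: the classes below are local stand-ins so the snippet runs on its own (the real ones come from `@amqp-contract/worker`), `decideOutcome` and `Outcome` are hypothetical names, and sending a message to the DLQ once retries are exhausted is an assumption rather than documented behavior.

```typescript
// Local stand-ins for illustration; the real classes are exported by @amqp-contract/worker.
class RetryableError extends Error {
  readonly retryable = true;
}
class NonRetryableError extends Error {
  readonly retryable = false;
}

type HandlerError = RetryableError | NonRetryableError;
type Outcome = "ack" | "retry" | "dead-letter";

// Hypothetical decision function, not part of the library's API.
function decideOutcome(
  error: HandlerError | undefined,
  retryCount: number,
  maxRetries: number,
): Outcome {
  if (error === undefined) {
    return "ack"; // handler succeeded
  }
  if (!error.retryable) {
    return "dead-letter"; // permanent failure: retrying would not help
  }
  // Transient failure: retry while attempts remain (assumed: DLQ afterwards).
  return retryCount < maxRetries ? "retry" : "dead-letter";
}

console.log(decideOutcome(new RetryableError("timeout"), 0, 3)); // → retry
console.log(decideOutcome(new NonRetryableError("invalid amount"), 0, 3)); // → dead-letter
```

The practical rule for handlers follows from this: wrap failures that may succeed later (network timeouts, temporary unavailability) in `RetryableError`, and use `NonRetryableError` for input that will never become valid.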
For complete API documentation, see the Worker API Reference.
Read the full documentation.
License: MIT