pgmq-style message queue on Durable Objects with single-alarm visibility timeout and Cap'n Web RPC

```bash
npm install @dotdo/pg-queue
```

A high-performance, pgmq-style message queue built on Cloudflare Durable Objects with SQLite storage. Designed for cost efficiency and low latency, with Cap'n Web RPC integration.
- Single Alarm for Multiple Visibility Timeouts - O(1) instead of O(n) alarm operations per dequeue, reducing costs by ~95%
- Cap'n Web RPC - Promise pipelining allows multiple operations in a single round-trip
- SKIP LOCKED Pattern - Atomic message claiming prevents concurrent consumers from receiving the same message
- Batch Operations - enqueueBatch, ackBatch, nackBatch for high-throughput scenarios
- Dead Letter Queue (DLQ) - Automatic DLQ routing after configurable retry attempts
- Visibility Timeout Management - Automatic message re-delivery if not acknowledged
- Archive Support - Optional message archiving for audit trails
```bash
npm install @dotdo/pg-queue
# or
pnpm add @dotdo/pg-queue
# or
yarn add @dotdo/pg-queue
```
Add the Durable Object binding to your wrangler.toml:
```toml
name = "my-worker"
main = "src/index.ts"
compatibility_date = "2024-12-30"
compatibility_flags = ["nodejs_compat"]
[[durable_objects.bindings]]
name = "QUEUE"
class_name = "QueueDO"
[[migrations]]
tag = "v1"
new_sqlite_classes = ["QueueDO"]
```
```typescript
import { QueueDO } from '@dotdo/pg-queue'

export { QueueDO }

interface Env {
  QUEUE: DurableObjectNamespace
}

export default {
  async fetch(request: Request, env: Env): Promise<Response> {
    const id = env.QUEUE.idFromName('my-queue')
    const stub = env.QUEUE.get(id)
    return stub.fetch(request)
  }
}
```
```typescript
// Enqueue a message
const response = await stub.fetch(new Request('https://queue/enqueue', {
  method: 'POST',
  headers: { 'Content-Type': 'application/json' },
  body: JSON.stringify({
    payload: { task: 'process-image', url: 'https://example.com/image.jpg' },
    delaySeconds: 0,
    maxRetries: 3
  })
}))

// Dequeue messages
const messages = await stub.fetch(new Request('https://queue/dequeue', {
  method: 'POST',
  headers: { 'Content-Type': 'application/json' },
  body: JSON.stringify({ vtSeconds: 30, qty: 10 })
}))
```
```typescript
interface Message<T = unknown> {
  msg_id: string
  payload: T
  status: 'pending' | 'processing' | 'completed' | 'dead'
  created_at: number
  vt: number            // Visibility timeout (Unix timestamp in ms)
  read_count: number
  max_retries: number
  worker_id: string | null
  archived_at: number | null
}

interface SendOptions {
  delaySeconds?: number // Delay before the message becomes visible (default: 0)
  maxRetries?: number   // Max retries before moving to the DLQ (default: 3)
}

interface ReadOptions {
  qty?: number          // Max messages to read (default: 1)
  workerId?: string     // Worker ID for tracking (auto-generated if not provided)
}

interface QueueMetrics {
  total: number
  pending: number
  processing: number
  completed: number
  dead: number
  archived: number
  oldest_pending_age_seconds: number | null
  next_alarm_at: number | null
  alarm_invocations: number
}
```
The QueueRpc interface defines all available queue operations:
```typescript
interface QueueRpc {
  // Single-message operations
  enqueue<T>(payload: T, options?: SendOptions): Promise<SendResult>
  dequeue<T>(vtSeconds: number, options?: ReadOptions): Promise<Message<T>[]>
  ack(msgId: string): Promise<DeleteResult>
  nack(msgId: string): Promise<NackResult>
  archive(msgId: string): Promise<ArchiveResult>
  extend(msgId: string, additionalSeconds: number): Promise<{ extended: boolean }>

  // Batch operations
  enqueueBatch(items: BatchEnqueueItem[]): Promise<BatchEnqueueResult>
  ackBatch(msgIds: string[]): Promise<BatchAckResult>
  nackBatch(msgIds: string[]): Promise<BatchNackResult>

  // Monitoring & management
  metrics(): Promise<QueueMetrics>
  listDLQ(limit?: number): Promise<Message[]>
  retryDLQ(msgId: string): Promise<NackResult>
  purge(): Promise<DeleteResult>
}
```
```typescript
import type { QueueRpc, Message } from '@dotdo/pg-queue'

interface MyTask {
  task: string
  url: string
}

// Producer: enqueue jobs
async function enqueueJob(stub: DurableObjectStub, task: MyTask) {
  const response = await stub.fetch(new Request('https://queue/enqueue', {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({ payload: task })
  }))
  const result = await response.json() as { data: { msg_id: string } }
  return result.data.msg_id
}

// Consumer: process jobs
async function processJobs(stub: DurableObjectStub) {
  // Dequeue up to 10 messages with a 30-second visibility timeout
  const response = await stub.fetch(new Request('https://queue/dequeue', {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({ vtSeconds: 30, qty: 10 })
  }))
  const { data: messages } = await response.json() as { data: Message<MyTask>[] }

  for (const message of messages) {
    try {
      await processTask(message.payload)
      // Acknowledge successful processing
      await stub.fetch(new Request(`https://queue/ack/${message.msg_id}`, {
        method: 'DELETE'
      }))
    } catch (error) {
      // Negative acknowledge - returns to queue or moves to DLQ
      const nackResponse = await stub.fetch(new Request(`https://queue/nack/${message.msg_id}`, {
        method: 'POST'
      }))
      const { data } = await nackResponse.json() as { data: { moved_to_dlq: boolean } }
      if (data.moved_to_dlq) {
        console.error(`Message ${message.msg_id} moved to DLQ after exhausting retries`)
      }
    }
  }
}
```
```typescript
// Schedule a job to run in 5 minutes
await stub.fetch(new Request('https://queue/enqueue', {
  method: 'POST',
  headers: { 'Content-Type': 'application/json' },
  body: JSON.stringify({
    payload: { task: 'send-reminder', userId: '123' },
    delaySeconds: 300 // 5 minutes
  })
}))

// Schedule a job to run in 1 hour with more retries
await stub.fetch(new Request('https://queue/enqueue', {
  method: 'POST',
  headers: { 'Content-Type': 'application/json' },
  body: JSON.stringify({
    payload: { task: 'daily-report' },
    delaySeconds: 3600,
    maxRetries: 5
  })
}))
```
```typescript
// Enqueue multiple messages efficiently
const batchResponse = await stub.fetch(new Request('https://queue/batch/enqueue', {
  method: 'POST',
  headers: { 'Content-Type': 'application/json' },
  body: JSON.stringify({
    items: [
      { payload: { task: 'job1' } },
      { payload: { task: 'job2' }, options: { delaySeconds: 60 } },
      { payload: { task: 'job3' }, options: { maxRetries: 5 } }
    ]
  })
}))
const { data: batchResult } = await batchResponse.json() as { data: { successCount: number } }
console.log(`Enqueued ${batchResult.successCount} messages`)

// Acknowledge multiple messages at once
const messages = await dequeueMessages(stub, 100)
const processedIds = await processAllMessages(messages)
await stub.fetch(new Request('https://queue/batch/ack', {
  method: 'POST',
  headers: { 'Content-Type': 'application/json' },
  body: JSON.stringify({ msgIds: processedIds })
}))
```
For long-running tasks, extend the visibility timeout to prevent re-delivery:
```typescript
async function processLongRunningTask(stub: DurableObjectStub, message: Message) {
  // Set up a heartbeat to extend visibility every 20 seconds
  const heartbeatInterval = setInterval(async () => {
    const response = await stub.fetch(new Request(`https://queue/extend/${message.msg_id}`, {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify({ additionalSeconds: 30 })
    }))
    const { data } = await response.json() as { data: { extended: boolean } }
    if (!data.extended) {
      // Message was already deleted or its visibility expired
      clearInterval(heartbeatInterval)
    }
  }, 20000)

  try {
    await longRunningTask(message.payload) // May take several minutes
    await stub.fetch(new Request(`https://queue/ack/${message.msg_id}`, {
      method: 'DELETE'
    }))
  } finally {
    clearInterval(heartbeatInterval)
  }
}
```
```typescript
// List messages in the DLQ
const dlqResponse = await stub.fetch(new Request('https://queue/dlq?limit=50'))
const { data: dlqMessages } = await dlqResponse.json() as { data: Message[] }

for (const msg of dlqMessages) {
  console.log(`Failed message: ${msg.msg_id}`)
  console.log(`  Payload: ${JSON.stringify(msg.payload)}`)
  console.log(`  Attempts: ${msg.read_count}`)
}

// Retry a specific DLQ message after fixing the issue
await stub.fetch(new Request(`https://queue/dlq/${msgId}/retry`, {
  method: 'POST'
}))

// Or delete it permanently
await stub.fetch(new Request(`https://queue/ack/${msgId}`, {
  method: 'DELETE'
}))
```
```typescript
async function checkQueueHealth(stub: DurableObjectStub) {
  const response = await stub.fetch(new Request('https://queue/metrics'))
  const { data: metrics } = await response.json() as { data: QueueMetrics }

  console.log('Queue Status:')
  console.log(`  Pending: ${metrics.pending}`)
  console.log(`  Processing: ${metrics.processing}`)
  console.log(`  Dead (DLQ): ${metrics.dead}`)
  console.log(`  Archived: ${metrics.archived}`)
  console.log(`  Oldest pending age: ${metrics.oldest_pending_age_seconds}s`)
  console.log(`  Alarm invocations: ${metrics.alarm_invocations}`)

  // Alert if the DLQ is growing
  if (metrics.dead > 100) {
    console.warn('DLQ has over 100 messages - investigate failures!')
  }
  // Alert if queue depth is high
  if (metrics.pending > 10000) {
    console.warn('High queue depth - consider scaling consumers!')
  }
}
```
For maximum efficiency, use Cap'n Web RPC with promise pipelining:
```typescript
import { createRpcClient } from 'capnweb/client'
import type { QueueRpc } from '@dotdo/pg-queue'

// WebSocket connection (hibernated for cost savings)
const ws = new WebSocket('wss://your-worker.example.com/rpc/my-queue')
const queue = createRpcClient<QueueRpc>(ws)

// Promise pipelining - all operations in a single round-trip
const [result1, result2, stats] = await Promise.all([
  queue.enqueue({ task: 'job1' }),
  queue.enqueue({ task: 'job2' }),
  queue.metrics()
])

// Batch operations for even higher throughput
const batchResult = await queue.enqueueBatch([
  { payload: { task: 'job1' } },
  { payload: { task: 'job2' }, options: { delaySeconds: 60 } },
  { payload: { task: 'job3' }, options: { maxRetries: 5 } }
])

// Process messages
const messages = await queue.dequeue(30, { qty: 10 })
await processAllTasks(messages)
await queue.ackBatch(messages.map(m => m.msg_id))
```
The queue exposes a REST API for operations:
| Method | Endpoint | Description |
|--------|----------|-------------|
| POST | `/enqueue` or `/send` | Enqueue a message |
| POST | `/dequeue` or `/read` | Dequeue messages with visibility timeout |
| DELETE | `/ack/:msgId` or `/delete/:msgId` | Acknowledge (delete) a message |
| POST | `/nack/:msgId` | Negative acknowledge (retry or DLQ) |
| POST | `/archive/:msgId` | Archive a message for audit trail |
| POST | `/extend/:msgId` | Extend visibility timeout |
Batch operations:

| Method | Endpoint | Description |
|--------|----------|-------------|
| POST | `/batch/enqueue` | Enqueue multiple messages |
| POST | `/batch/ack` | Acknowledge multiple messages |
| POST | `/batch/nack` | Nack multiple messages |
Monitoring and management:

| Method | Endpoint | Description |
|--------|----------|-------------|
| GET | `/metrics` | Get queue metrics |
| GET | `/dlq` | List dead letter queue |
| POST | `/dlq/:msgId/retry` | Retry a DLQ message |
| DELETE | `/purge` | Purge all messages (use with caution!) |
`SendOptions`:

| Option | Type | Default | Description |
|--------|------|---------|-------------|
| `delaySeconds` | number | 0 | Delay before message becomes visible |
| `maxRetries` | number | 3 | Maximum retry attempts before DLQ |
`ReadOptions`:

| Option | Type | Default | Description |
|--------|------|---------|-------------|
| `qty` | number | 1 | Maximum messages to read |
| `workerId` | string | auto-generated | Worker ID for tracking |
Traditional queue implementations schedule a separate alarm for each message's visibility timeout, resulting in O(n) alarm operations. This package uses a single-alarm approach:
1. One alarm is set to the earliest visibility timeout expiration
2. When the alarm fires, all expired messages are processed
3. The alarm is rescheduled to the next earliest expiration
This reduces Durable Object alarm costs by approximately 95% at scale.
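The bookkeeping behind the single-alarm approach can be sketched as two pure functions, independent of the Durable Object API (a minimal illustration, not the package's internals; `Pending`, `nextAlarmAt`, and `onAlarm` are hypothetical names):

```typescript
// A message currently claimed by a consumer; vt is the Unix-ms time at
// which its visibility timeout expires.
type Pending = { msgId: string; vt: number }

// The single alarm is always set to the earliest expiration, so dequeueing
// n messages costs one alarm write, not n.
function nextAlarmAt(inflight: Pending[]): number | null {
  if (inflight.length === 0) return null
  return Math.min(...inflight.map(m => m.vt))
}

// When the alarm fires at `now`, every expired message is returned to
// 'pending' in one pass, then exactly one new alarm is scheduled.
function onAlarm(inflight: Pending[], now: number): { expired: string[]; rescheduleAt: number | null } {
  const expired = inflight.filter(m => m.vt <= now).map(m => m.msgId)
  const remaining = inflight.filter(m => m.vt > now)
  return { expired, rescheduleAt: nextAlarmAt(remaining) }
}
```

In a real Durable Object, `rescheduleAt` would be passed to `ctx.storage.setAlarm()`.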
Messages are claimed atomically using SQLite's UPDATE with a subquery:
```sql
UPDATE messages
SET status = 'processing', vt = ?, worker_id = ?
WHERE msg_id IN (
SELECT msg_id FROM messages
WHERE status = 'pending' AND vt <= ?
ORDER BY created_at ASC
LIMIT ?
)
RETURNING *
```

This prevents race conditions when multiple consumers dequeue simultaneously.

```
pending -> processing -> (ack)                -> deleted
                      -> (nack)               -> pending (retry)
                      -> (nack + max retries) -> dead (DLQ)
                      -> (archive)            -> archived
                      -> (vt expires)         -> pending (auto-retry)
```
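The lifecycle above can be expressed as a small transition function (illustrative only; the package tracks state in SQLite, and `transition` is a hypothetical helper):

```typescript
type Status = 'pending' | 'processing' | 'completed' | 'dead'
type Event = 'dequeue' | 'ack' | 'nack' | 'archive' | 'vt_expired'

// Pure sketch of the state machine; 'deleted' and 'archived' are terminal
// outcomes rather than stored statuses.
function transition(
  status: Status,
  event: Event,
  readCount: number,
  maxRetries: number
): Status | 'deleted' | 'archived' {
  if (status === 'pending' && event === 'dequeue') return 'processing'
  if (status === 'processing') {
    switch (event) {
      case 'ack': return 'deleted'
      case 'archive': return 'archived'
      case 'vt_expired': return 'pending'                            // auto-retry
      case 'nack': return readCount >= maxRetries ? 'dead' : 'pending' // DLQ or retry
    }
  }
  return status // any other combination is a no-op
}
```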
If messages are not being dequeued:
1. Check queue metrics: Use /metrics to see if messages are pending
2. Verify visibility timeout: Messages reappear after the VT expires if not acked
3. Check the DLQ: Failed messages may have moved to the dead letter queue
If messages keep ending up in the DLQ:
1. Review error logs: Check why processing is failing
2. Inspect DLQ messages: Use /dlq to see failed payloads
3. Increase maxRetries: If failures are transient, allow more attempts
4. Fix and retry: After fixing the issue, use /dlq/:id/retry
If messages are processed more than once:
1. Visibility timeout too short: Increase vtSeconds for long tasks
2. Missing acknowledgment: Ensure ack() is called after processing
3. Use the heartbeat pattern: Call extend() for long-running tasks
The SQLite storage in Durable Objects has limits. For very high-volume queues:
1. Archive processed messages: Use /archive instead of /ack if an audit trail is needed
2. Purge completed messages: Periodically clean up old data
3. Use multiple queue instances: Shard by queue name
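Sharding (point 3) can be as simple as hashing a routing key to one of N Durable Object names; `shardQueueName` below is a hypothetical helper, not part of the package:

```typescript
// Deterministically map a routing key (user ID, tenant, etc.) to one of
// `shards` queue instance names, spreading storage and load.
function shardQueueName(key: string, shards: number): string {
  let h = 0
  for (const ch of key) h = (h * 31 + ch.charCodeAt(0)) >>> 0
  return `queue-${h % shards}`
}

// Usage inside a Worker:
//   const id = env.QUEUE.idFromName(shardQueueName(userId, 8))
//   const stub = env.QUEUE.get(id)
```

The same key always resolves to the same shard, so per-key ordering within a shard is preserved.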
```typescript
// Main Durable Object
export { QueueDO, handleQueueRpc } from '@dotdo/pg-queue'
// Store (for standalone/testing use)
export { QueueStore, QUEUE_SCHEMA } from '@dotdo/pg-queue/store'
// Visibility management utilities
export {
VisibilityAlarmManager,
parseMessageRow,
generateMsgId,
generateWorkerId,
VISIBILITY_QUERIES,
} from '@dotdo/pg-queue/visibility'
// Types
export type {
Message,
MessageRow,
MessageStatus,
SendOptions,
ReadOptions,
QueueMetrics,
SendResult,
DeleteResult,
ArchiveResult,
NackResult,
BatchEnqueueItem,
BatchEnqueueResult,
BatchAckResult,
BatchNackResult,
QueueEnv,
QueueRpc,
} from '@dotdo/pg-queue'
```
MIT