# @cloudamqp/a2a-amqp

AMQP EventBus adapter for the A2A protocol, with LavinMQ streams support.

AMQP-backed EventBus and WorkQueue for scaling A2A agents with long-running tasks.

```bash
npm install @cloudamqp/a2a-amqp
```
A2A agents often need to handle long-running tasks (LLM calls, complex processing, etc.). Running these tasks inline in HTTP handlers causes:
- Timeout issues: HTTP connections timeout on long tasks
- Scaling problems: Single server bottlenecks
- Resource waste: Servers blocked waiting for tasks to complete
This library solves these problems by:
1. Queuing tasks via AMQP instead of processing inline
2. Distributing work across multiple worker processes
3. Event sourcing all task events for replay and recovery
4. Streaming results back via SSE while workers process in the background
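The enqueue-then-consume pattern behind steps 1 and 2 can be sketched without a broker at all. The code below uses an in-memory queue as a stand-in for AMQP; every name here (`ToyQueue`, `TaskMessage`, `demo`) is illustrative and not part of this library's API.

```typescript
// Toy sketch of the enqueue-then-consume pattern. An in-memory queue
// stands in for the AMQP broker; names are illustrative only.

type TaskMessage = { taskId: string; payload: string };

class ToyQueue {
  private items: TaskMessage[] = [];
  private waiters: Array<(t: TaskMessage) => void> = [];

  // "HTTP handler" side: enqueue and return immediately
  enqueue(task: TaskMessage): void {
    const waiter = this.waiters.shift();
    if (waiter) waiter(task);
    else this.items.push(task);
  }

  // "Worker" side: an async generator, analogous to a work-queue consumer
  async *consume(): AsyncGenerator<TaskMessage> {
    while (true) {
      const next = this.items.shift();
      if (next) yield next;
      else yield await new Promise<TaskMessage>((resolve) => this.waiters.push(resolve));
    }
  }
}

async function demo(): Promise<string[]> {
  const queue = new ToyQueue();
  queue.enqueue({ taskId: "t1", payload: "summarize report" });
  queue.enqueue({ taskId: "t2", payload: "translate document" });

  const processed: string[] = [];
  for await (const task of queue.consume()) {
    processed.push(task.taskId); // long-running work would happen here
    if (processed.length === 2) break;
  }
  return processed;
}

demo().then((order) => console.log(order.join(",")));
```

The HTTP handler only ever calls `enqueue` and returns; the worker drains tasks at its own pace. The real library replaces the in-memory queue with a durable AMQP queue, so tasks survive restarts and can be consumed by many workers.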
```
HTTP Request → Server (enqueues task) → Returns immediately
                        ↓
                   AMQP Queue
                        ↓
        Worker Pool (scales horizontally)
                        ↓
        Process task & publish events
                        ↓
        AMQP Stream (event sourcing)
                        ↓
        Client streams results via SSE
```

## Installation

```bash
# Installing using bun
bun add @cloudamqp/a2a-amqp @a2a-js/sdk @cloudamqp/amqp-client
```

Requires LavinMQ or RabbitMQ with stream support:
```bash
docker run -d -p 5672:5672 -p 15672:15672 cloudamqp/lavinmq:latest
```

## Quick Start
### 1. HTTP Server
```typescript
import { AMQPAgentBackend, QueuingRequestHandler } from "@cloudamqp/a2a-amqp";
import { A2AExpressApp } from "@a2a-js/sdk/server/express";
import express from "express";

// Create AMQP backend
const backend = await AMQPAgentBackend.create({
  url: "amqp://localhost:5672",
  agentName: "my-agent",
});

// Create request handler (handles task queuing + event projection);
// agentCard is your agent's AgentCard definition (not shown here)
const requestHandler = new QueuingRequestHandler(agentCard, backend);
await requestHandler.initialize();

// Setup Express with A2A routes
const app = express();
new A2AExpressApp(requestHandler).setupRoutes(app, "/");
app.listen(3000);
```

### 2. Worker Process
```typescript
import { AMQPAgentBackend, WorkerEventBus } from "@cloudamqp/a2a-amqp";
import { AgentExecutor, ExecutionEventBus, RequestContext } from "@a2a-js/sdk/server";

// Create backend with the same agent name as the server
const backend = await AMQPAgentBackend.create({
  url: "amqp://localhost:5672",
  agentName: "my-agent",
});

// Initialize work queue
await backend.workQueue.initialize();

class MyExecutor implements AgentExecutor {
  async execute(context: RequestContext, eventBus: ExecutionEventBus) {
    // Your long-running task logic here
    eventBus.publish({
      kind: "status-update",
      taskId: context.taskId,
      contextId: context.contextId,
      status: { state: "working", timestamp: new Date().toISOString() },
      final: false,
    });

    // ... do work ...

    eventBus.publish({
      kind: "status-update",
      taskId: context.taskId,
      contextId: context.contextId,
      status: { state: "completed", timestamp: new Date().toISOString() },
      final: true,
    });
    eventBus.finished();
  }
}

const executor = new MyExecutor();

// Start consuming with async generator pattern
const messages = backend.workQueue.start();
for await (const taskMessage of messages) {
  const { taskId, contextId, requestContext } = taskMessage;

  // Create request context
  const context = new RequestContext(
    requestContext.userMessage,
    taskId,
    contextId,
    requestContext.task,
    requestContext.referenceTasks
  );

  // Create event bus for publishing task events
  const eventBus = new WorkerEventBus(backend.amqpConnection, taskId, contextId);

  // Execute task
  await executor.execute(context, eventBus);
}
```

### 3. Scale Horizontally
Run multiple workers to process tasks in parallel:
```bash
# Terminal 1: HTTP server
bun run server

# Terminals 2-N: workers (scale as needed)
bun run worker
bun run worker # Add more workers for higher throughput
```

## Features
- Work Queue: Distribute tasks across multiple worker processes
- Event Sourcing: All task events stored in AMQP streams for replay
- In-Memory Projection: Fast task lookups with automatic recovery from streams
- SSE Streaming: Automatic streaming of task events back to clients
- Horizontal Scaling: Add more workers to increase throughput
- Graceful Shutdown: Clean consumer and connection handling
- Type-Safe: Full TypeScript support with Zod validation
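The graceful-shutdown behavior above can be approximated in your own worker loop. This sketch shows the general technique of cooperatively stopping an async-iterator consumer with an `AbortController`; `taskStream` and `runWorker` are invented stand-ins, so consult the library's actual shutdown API before relying on specifics.

```typescript
// Sketch: cooperatively stopping an async-iterator consumer loop.
// taskStream() is a stand-in for a work-queue generator, not this
// library's API.

async function* taskStream(signal: AbortSignal): AsyncGenerator<number> {
  let n = 0;
  while (!signal.aborted) {
    yield n++; // hand the next "task" to the consumer
    await new Promise((resolve) => setTimeout(resolve, 1));
  }
}

async function runWorker(): Promise<string> {
  const controller = new AbortController();
  // In a real worker, wire this to process signals instead of a timer:
  // process.once("SIGINT", () => controller.abort());
  setTimeout(() => controller.abort(), 20);

  for await (const _task of taskStream(controller.signal)) {
    // process the task; the loop exits once the signal fires
  }
  return "worker stopped cleanly";
}

runWorker().then(console.log);
```

The key point is that the generator checks the abort flag between yields, so the loop finishes the in-flight task and then exits, rather than being killed mid-task.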
## Configuration
```typescript
interface AMQPAgentBackendConfig {
  url: string;                     // AMQP broker URL
  agentName: string;               // Agent identifier
  streamRetention?: string;        // Event retention (default: "7d")
  streamMaxBytes?: number;         // Max stream size (default: 1 GB)
  workQueueName?: string;          // Custom work queue name
  exchangeName?: string;           // Custom exchange name
  logger?: Logger;                 // Custom logger
  connection?: {
    heartbeat?: number;            // Heartbeat interval in seconds
    reconnectDelay?: number;       // Reconnection delay in ms
    maxReconnectAttempts?: number; // Max reconnection attempts
  };
  publishing?: {
    persistent?: boolean;          // Persistent messages (default: true)
    confirmMode?: boolean;         // Publisher confirms (default: true)
    messageTtl?: number;           // Message TTL in ms (0 = no expiration)
  };
}
```
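A filled-in configuration might look like the following. The values are illustrative; in particular, the `"24h"` retention string assumes the same duration format as the documented `"7d"` default, and the interface is reproduced inline (trimmed to the fields used) so the snippet stands alone.

```typescript
// Illustrative configuration object; values are examples, not defaults.
// A trimmed copy of the config interface keeps this snippet self-contained.
interface AMQPAgentBackendConfig {
  url: string;
  agentName: string;
  streamRetention?: string;
  streamMaxBytes?: number;
  connection?: { heartbeat?: number; reconnectDelay?: number; maxReconnectAttempts?: number };
  publishing?: { persistent?: boolean; confirmMode?: boolean; messageTtl?: number };
}

const config: AMQPAgentBackendConfig = {
  url: "amqp://localhost:5672",
  agentName: "my-agent",
  streamRetention: "24h",            // keep events for one day instead of the 7d default
  streamMaxBytes: 512 * 1024 * 1024, // cap the event stream at 512 MB
  connection: { heartbeat: 30, reconnectDelay: 2000, maxReconnectAttempts: 10 },
  publishing: { persistent: true, confirmMode: true, messageTtl: 0 },
};

console.log(config.agentName);
```

Pass the object to `AMQPAgentBackend.create(config)`; any optional field left unset falls back to the defaults listed in the interface above.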
## Examples
See complete working examples:
- `src/examples/http-server.ts` - HTTP server with queuing
- `src/examples/worker.ts` - Worker process

```bash
# Run the example
bun run server # Terminal 1
bun run worker # Terminal 2

# Send a request
curl -X POST http://localhost:3000/ \
-H "Content-Type: application/json" \
-d '{"jsonrpc":"2.0","id":1,"method":"messages/send","params":{"message":{"kind":"message","role":"user","messageId":"1","contextId":"ctx-1","parts":[{"kind":"text","text":"Hello"}]}}}'
```

## Testing

```bash
bun run test # Run all tests (unit + integration)
bun run test:unit # Unit tests only
bun run test:integration # Integration tests only
bun run test:watch # Watch mode
bun run test:coverage # With coverage
```

## License

MIT