Message bus for handling commands, events, and queries
npm install @auto-engineer/message-busType-safe message bus implementing the CQRS pattern with command handling and event publishing.
---
Without @auto-engineer/message-bus, you would have to implement your own command/event routing, handle handler registration, manage request/correlation ID propagation, and ensure proper error isolation between handlers.
This package provides the core messaging infrastructure for the Auto Engineer ecosystem. It enables decoupled communication through commands (write operations) and events (state changes) with robust debugging support.
---
``bash`
pnpm add @auto-engineer/message-bus
`typescript
import { createMessageBus, defineCommandHandler } from '@auto-engineer/message-bus';
const bus = createMessageBus();
const handler = defineCommandHandler({
name: 'CreateUser',
alias: 'create-user',
description: 'Creates a new user',
fields: {
name: { description: 'User name', required: true },
},
examples: [],
events: ['UserCreated'],
handle: async (cmd) => ({
type: 'UserCreated',
data: { userId: 'u-001', name: cmd.data.name },
}),
});
bus.registerCommandHandler(handler);
const result = await bus.sendCommand({
type: 'CreateUser',
data: { name: 'John' },
});
console.log(result);
// → { type: 'UserCreated', data: { userId: 'u-001', name: 'John' } }
`
---
`typescript
import { createMessageBus, defineCommandHandler } from '@auto-engineer/message-bus';
const handler = defineCommandHandler({
name: 'MyCommand',
alias: 'my-command',
description: 'Does something',
fields: {},
examples: [],
events: ['MyEvent'],
handle: async (cmd) => ({ type: 'MyEvent', data: {} }),
});
const bus = createMessageBus();
bus.registerCommandHandler(handler);
`
`typescript`
const result = await bus.sendCommand({
type: 'CreateUser',
data: { name: 'John', email: 'john@example.com' },
requestId: 'req-123',
});
`typescriptUser created: ${event.data.userId}
const subscription = bus.subscribeToEvent('UserCreated', {
name: 'UserCreatedNotifier',
handle: async (event) => {
console.log();
},
});
subscription.unsubscribe();
`
`typescript[${event.type}]
const subscription = bus.subscribeAll({
name: 'EventLogger',
handle: (event) => {
console.log(, event.data);`
},
});
`typescript`
const handler = defineCommandHandler({
name: 'BatchCreate',
// ...
handle: async (cmd) => {
return cmd.data.items.map(item => ({
type: 'ItemCreated',
data: item,
}));
},
});
---
`typescript
import {
createMessageBus,
defineCommandHandler,
} from '@auto-engineer/message-bus';
import type {
Command,
Event,
CommandHandler,
EventHandler,
EventSubscription,
MessageBus,
UnifiedCommandHandler,
FieldDefinition,
} from '@auto-engineer/message-bus';
`
#### createMessageBus(): MessageBus
Create a new message bus instance.
#### defineCommandHandler(options): UnifiedCommandHandler
Define a command handler with CLI metadata.
`typescript`
type Command
type: Type;
data: Readonly;
timestamp?: Date;
requestId?: string;
correlationId?: string;
}>;
`typescript`
type Event
type: Type;
data: Data;
timestamp?: Date;
requestId?: string;
correlationId?: string;
}>;
`typescript`
interface MessageBus {
registerCommandHandler(handler: CommandHandler): void;
registerEventHandler(eventType: string, handler: EventHandler): void;
sendCommand(command: Command): Promise
publishEvent(event: Event): Promise
subscribeToEvent(eventType: string, handler: EventHandler): EventSubscription;
subscribeAll(handler: EventHandler): EventSubscription;
getCommandHandlers(): Record
}
---
``
src/
├── index.ts
├── message-bus.ts
├── define-command.ts
└── types.ts
The following diagram shows the message flow:
`mermaid`
flowchart LR
A[sendCommand] --> B[CommandHandler]
B --> C[Event]
C --> D[publishEvent]
D --> E[EventHandlers]
Flow: Command is sent, handler processes it, returns event(s), events are published to subscribers.
- One handler per command type: Ensures deterministic command processing
- Multiple handlers per event type: Enables fan-out notification
- Request/Correlation ID propagation: Maintains traceability
- Error isolation: Handler failures don't affect other handlers
This package has no dependencies on other @auto-engineer/*` packages. It is a foundational package used throughout the monorepo.