A lightweight TypeScript CQRS and Event Sourcing framework with pluggable database adapters
npm install cqrskitA lightweight TypeScript CQRS (Command Query Responsibility Segregation) and Event Sourcing framework with pluggable database adapters.
- Full CQRS Implementation - Separate command and query models with clear boundaries
- Event Sourcing - Rebuild aggregate state from historical events
- Pluggable Database Adapters - Easy integration with any event store (Genesis DB, EventStoreDB, PostgreSQL, etc.)
- TypeScript-First - Full type safety and excellent IDE support
- Decorator-Based API - Clean, intuitive syntax for defining handlers
- State Caching - Optional LRU cache for reconstructed aggregates
- Event Upcasting - Handle event schema evolution gracefully
- Async Event Processing - Background event handlers with partitioning and progress tracking
- Testing Utilities - Fluent Given-When-Then API for testing command handlers
- Zero Dependencies (except database adapters)
``bash`
npm install cqrskit
For Genesis DB support:
`bash`
npm install cqrskit genesisdb
Before diving in, here are the core concepts used throughout this framework:
aggregate with properties like id, title, status$3
An instruction to perform an action that may change state:
- Immutable data structure
- Examples: CreateTaskCommand, AssignTaskCommand
- Can succeed or fail based on business rules$3
A fact that has already happened and cannot be changed:
- Immutable data structure
- Past tense naming: TaskCreatedEvent, TaskAssignedEvent
- Contains all data needed to update state$3
Business logic that processes a command:
- Validates the command
- Checks business rules
- Publishes one or more events if valid
- Example: handleCreate(), handleAssign()$3
Logic that applies an event to recreate aggregate state:
- Takes previous state and an event
- Returns new state
- Pure function (no side effects)
- Example: onTaskCreated(), onTaskAssigned()$3
Asynchronous logic that reacts to events:
- Updates read models (projections)
- Sends notifications
- Triggers external actions
- Runs in background, separate from commands$3
A unique identifier for an aggregate instance:
- Hierarchical path format: /task/task-1, /user/user-123
- Used to query and store events
- Supports recursive queries (e.g., /task/*)$3
TypeScript annotation that adds metadata to classes/methods:
- @CommandHandling() - Marks command handler methods
- @StateRebuilding() - Marks state rebuilding methods
- @EventHandling() - Marks event handler methods
- Simplifies configuration and registration$3
Strategy for loading events when executing a command:
- NONE - Don't load any events (for stateless commands)
- LOCAL - Load events for exact subject only
- RECURSIVE - Load events for subject and all children (for parent-child relationships)$3
Converting old event versions to new versions:
- Handles event schema evolution
- Example: Renaming fields, adding required fields
- Allows changing event structure without breaking old data
- Applied automatically when reading events$3
Optimistic locking mechanism:
- Checks expected last event ID before publishing
- Prevents concurrent modification conflicts
- Example: Ensure task hasn't been modified since we loaded it$3
Database that stores events:
- Append-only (events never modified/deleted)
- Supports querying by subject and timestamp
- Examples: Genesis DB, PostgreSQL, MySQL
- CQRSKit supports custom adapters$3
Denormalized view of data optimized for queries:
- Built from events
- Can be rebuilt anytime from event history
- Example: Task list, user profile view
- Updated by event handlers$3
Splitting event processing across multiple workers:
- Enables parallel processing
- Each partition handles subset of events
- Ensures events for same subject stay in order$3
Tracks position in event stream:
- Remembers last processed event
- Enables resuming after restart
- Per partition and event handler group$3
Additional context attached to events:
- Not part of event data itself
- Examples: correlationId, userId, timestamp
- Can be propagated from commands to events$3
In-memory storage of reconstructed aggregate state:
- Improves performance
- LRU eviction policy (removes least recently used)
- Optional optimizationQuick Start
$3
`typescript
// domain/Task.ts
export enum TaskStatus {
TODO = 'TODO',
IN_PROGRESS = 'IN_PROGRESS',
COMPLETED = 'COMPLETED'
}export class Task {
constructor(
public id: string,
public title: string,
public assignee: string | null,
public status: TaskStatus
) {}
}
`$3
`typescript
// commands/CreateTaskCommand.ts
import { Command, SubjectCondition } from 'cqrskit';export class CreateTaskCommand implements Command {
constructor(
public taskId: string,
public title: string
) {}
getSubject(): string {
return
/task/${this.taskId};
} getSubjectCondition(): SubjectCondition {
return SubjectCondition.NEW; // Must not exist
}
}
`$3
`typescript
// events/TaskCreatedEvent.ts
export class TaskCreatedEvent {
constructor(
public taskId: string,
public title: string,
public createdAt: string
) {}
}
`$3
`typescript
// handlers/TaskHandlers.ts
import { CommandEventPublisher, SourcingMode } from 'cqrskit';
import { CommandHandling, StateRebuilding } from 'cqrskit';export class TaskHandlers {
@CommandHandling({ sourcingMode: SourcingMode.LOCAL })
async handleCreate(
command: CreateTaskCommand,
publisher: CommandEventPublisher
): Promise {
publisher.publish(
new TaskCreatedEvent(command.taskId, command.title, new Date().toISOString())
);
return command.taskId;
}
@StateRebuilding()
onTaskCreated(task: Task | null, event: TaskCreatedEvent): Task {
return new Task(event.taskId, event.title, null, TaskStatus.TODO);
}
}
`$3
`typescript
import {
CommandRouter,
GenesisDBAdapter,
ConfiguredEventTypeResolver,
JsonEventDataMarshaller,
InMemoryStateCache
} from 'cqrskit';// Configure database adapter
const eventStore = new GenesisDBAdapter({
apiUrl: 'http://localhost:8080',
authToken: 'your-token',
source: 'my-app'
});
// Configure event type resolver
const eventTypeResolver = new ConfiguredEventTypeResolver()
.register('app.TaskCreatedEvent', TaskCreatedEvent);
// Create command router
const commandRouter = new CommandRouter({
eventStore,
eventTypeResolver,
eventDataMarshaller: new JsonEventDataMarshaller(),
commandHandlers: [/ your handler definitions /],
stateRebuildingHandlers: [/ your state rebuilding definitions /],
eventSource: 'my-app',
cache: new InMemoryStateCache(1000)
});
// Send commands
const taskId = await commandRouter.send(
new CreateTaskCommand('task-1', 'Implement login feature')
);
`Core Concepts
$3
Commands are immutable instructions to change aggregate state. They implement the
Command interface:`typescript
interface Command {
getSubject(): string;
getSubjectCondition?(): SubjectCondition;
}
`Subject Conditions:
-
NONE - No validation (default)
- NEW - Subject must not exist (for creation)
- EXISTS - Subject must exist (for updates)$3
Events represent facts that have occurred in the system. They are plain TypeScript classes:
`typescript
class TaskCreatedEvent {
constructor(
public taskId: string,
public title: string,
public createdAt: string
) {}
}
`$3
Command handlers contain business logic. They receive commands, validate them, and publish events:
`typescript
@CommandHandling({ sourcingMode: SourcingMode.RECURSIVE })
async handleCommand(
instance: MyAggregate, // Reconstructed state
command: MyCommand, // The command
publisher: CommandEventPublisher
): Promise {
// Validate
if (!instance.canDoSomething()) {
throw new Error('Cannot do something');
} // Publish events
publisher.publish(new SomethingDoneEvent(...));
return result;
}
`Sourcing Modes:
-
NONE - No event sourcing
- LOCAL - Load events for exact subject only
- RECURSIVE - Load events for subject and children$3
State rebuilding handlers reconstruct aggregate state from events:
`typescript
@StateRebuilding()
onEventOccurred(
instance: MyAggregate | null,
event: EventOccurred
): MyAggregate {
// Return new state
return new MyAggregate(...);
}
`$3
Event handlers process events asynchronously (for read models, notifications, etc.):
`typescript
@EventHandling({ group: 'my-projector' })
async onEventOccurred(event: EventOccurred): Promise {
// Update read model
await database.save(...);
}
`Pluggable Database Adapters
CQRSKit supports any event store through the
EventStoreAdapter interface:`typescript
interface EventStoreAdapter {
streamEvents(subject: string, options?: StreamOptions, recursive?: boolean): AsyncIterable;
observeEvents(subject: string, options?: StreamOptions, recursive?: boolean): AsyncIterable;
publishEvents(events: EventToPublish[], preconditions?: Precondition[]): Promise;
ping?(): Promise;
}
`$3
#### Genesis DB
`typescript
import { GenesisDBAdapter } from 'cqrskit';const adapter = new GenesisDBAdapter({
apiUrl: process.env.GENESISDB_API_URL,
authToken: process.env.GENESISDB_AUTH_TOKEN,
source: 'my-app'
});
`$3
See the
examples/custom-adapter/ directory for complete examples:
- InMemoryAdapter.ts - Simple in-memory implementation
- MySQLAdapter.ts - MySQL database adapter with polling
- PostgreSQLAdapter.ts - PostgreSQL adapter with LISTEN/NOTIFYExample custom adapter:
`typescript
export class MyCustomAdapter implements EventStoreAdapter {
async *streamEvents(subject: string, options?: StreamOptions): AsyncIterable {
// Fetch historical events from your database
const events = await myDatabase.query(...);
for (const event of events) {
yield mapToRawEvent(event);
}
} async *observeEvents(subject: string, options?: StreamOptions): AsyncIterable {
// Stream historical events first
for await (const event of this.streamEvents(subject, options)) {
yield event;
}
// Then subscribe to new events
await myDatabase.subscribe(...);
}
async publishEvents(events: EventToPublish[]): Promise {
// Store events atomically
return await myDatabase.insertEvents(events);
}
}
`Testing
CQRSKit provides a fluent Given-When-Then API for testing:
`typescript
import { CommandHandlingTestFixture } from 'cqrskit';it('should create a task', async () => {
await CommandHandlingTestFixture
.create()
.withStateRebuildingHandlers([...])
.using(Task, handlers.handleCreate)
.given()
.givenNothing()
.when(new CreateTaskCommand('task-1', 'Implement login feature'))
.then(expect => {
expect
.expectSuccessfulExecution()
.expectSingleEvent(new TaskCreatedEvent(...))
.expectResult('task-1');
});
});
`Test API:
-
givenNothing() - Start with no state
- givenEvents(...) - Start with historical events
- givenState(instance) - Start with explicit state
- when(command) - Execute command
- expectSuccessfulExecution() - Assert no errors
- expectFailure(message?) - Assert error occurred
- expectResult(value) - Assert return value
- expectEventCount(n) - Assert event count
- expectEvent(event) - Assert next event matches
- expectNoEvents() - Assert no events publishedAdvanced Features
$3
Handle event schema evolution:
`typescript
import { EventUpcaster, UpcasterResult } from 'cqrskit';class TaskEventUpcaster implements EventUpcaster {
canUpcast(event: RawEvent): boolean {
return event.type === 'app.TaskAddedEvent.v1';
}
upcast(event: RawEvent): UpcasterResult[] {
return [{
type: 'app.TaskCreatedEvent',
data: {
...event.data,
createdAt: event.data.timestamp || new Date().toISOString() // Rename field
}
}];
}
}
`$3
Improve performance with in-memory caching:
`typescript
import { InMemoryStateCache } from 'cqrskit';const cache = new InMemoryStateCache(1000); // Cache 1000 aggregates
const commandRouter = new CommandRouter({
// ...
cache
});
`$3
Propagate metadata (correlation IDs, user context, etc.):
`typescript
import { MetadataPropagationMode } from 'cqrskit';const commandRouter = new CommandRouter({
// ...
metadataPropagation: {
mode: MetadataPropagationMode.SHALLOW,
keys: ['correlationId', 'userId']
}
});
await commandRouter.send(
new MyCommand(),
{ correlationId: '123', userId: 'john' }
);
`Propagation Modes:
-
NONE - Don't propagate
- SHALLOW - Propagate specific keys
- DEEP - Propagate all metadata$3
Process events in parallel with partitioning:
`typescript
import {
EventHandlingProcessor,
DefaultPartitionKeyResolver,
PerSubjectEventSequenceResolver
} from 'cqrskit';const processor = new EventHandlingProcessor({
group: 'my-group',
partition: 0, // Process partition 0
partitionKeyResolver: new DefaultPartitionKeyResolver(10), // 10 partitions
eventSequenceResolver: new PerSubjectEventSequenceResolver(),
// ...
});
processor.start();
`Examples
See the
examples/ directory for complete working examples:-
examples/tasks/ - Full task management system
- examples/custom-adapter/ - Custom in-memory adapter
- examples/testing/ - Test examples with Given-When-ThenRun the task management example:
`bash
cd examples/tasks
npm install
npx tsx app.ts
`Architecture
CQRSKit follows the CQRS and Event Sourcing patterns:
`
┌─────────────┐
│ Command │
└──────┬──────┘
│
v
┌─────────────────┐ ┌──────────────┐
│ CommandRouter │─────→│ Event Store │
└─────────────────┘ └──────┬───────┘
│ │
│ Rebuild State │ Observe
v v
┌─────────────────┐ ┌──────────────────┐
│ State Handlers │ │ Event Processors │
└─────────────────┘ └──────────────────┘
│
v
┌──────────────┐
│ Read Model │
└──────────────┘
`API Reference
$3
-
Command - Base interface for commands
- SubjectCondition - Validation for aggregate existence
- SourcingMode - Event loading strategy
- RawEvent - Event as stored in database
- EventToPublish - Event to be published
- Precondition - Conditional event publishing$3
-
CommandRouter - Routes commands to handlers
- CommandHandler - Processes commands
- StateRebuildingHandler - Rebuilds state from events
- CommandEventPublisher - Publishes events during command execution$3
-
EventHandler - Processes events asynchronously
- EventHandlingProcessor - Background event processing
- ProgressTracker - Tracks processing position
- PartitionKeyResolver - Maps events to partitions$3
-
EventStoreAdapter - Abstract database interface
- GenesisDBAdapter - Genesis DB implementation
- StreamOptions - Options for streaming events$3
-
EventTypeResolver - Maps classes to event types
- EventDataMarshaller - Serializes events
- EventUpcaster - Migrates event schemas
- StateRebuildingCache` - Caches reconstructed stateContributions welcome! Please open an issue or PR.
- Genesis DB: Genesis DB - The GDPR-ready event sourcing database
MIT