A TypeScript library for building event-sourced applications in Node.js. This library provides the foundational building blocks for implementing CQRS (Command Query Responsibility Segregation) and Event Sourcing patterns.
npm install @robinmalfait/event-sourceA TypeScript library for building event-sourced applications in Node.js. This library provides the foundational building blocks for implementing CQRS (Command Query Responsibility Segregation) and Event Sourcing patterns.
``bash`
npm install @robinmalfait/event-sourceor
pnpm add @robinmalfait/event-source
Events are immutable records of something that happened in your domain. They contain an aggregate ID, payload, metadata, and version information.
`typescript
import { Event } from '@robinmalfait/event-source'
function accountOpened(id: string, owner: string) {
return Event('ACCOUNT_OPENED', id, { owner })
}
function moneyDeposited(id: string, amount: number) {
return Event('MONEY_DEPOSITED', id, { amount })
}
`
Commands represent an intent to perform an action. They carry a type and payload.
`typescript
import { Command } from '@robinmalfait/event-source'
function openAccount(id: string, owner: string) {
return Command('OPEN_ACCOUNT', { id, owner })
}
function depositMoney(accountId: string, amount: number) {
return Command('DEPOSIT_MONEY', { accountId, amount })
}
`
Aggregates are domain entities that emit events and rebuild their state from event history. They extend the base Aggregate class and define apply handlers for each event type.
`typescript
import { Aggregate, type ApplyEvents, abort } from '@robinmalfait/event-source'
class Account extends Aggregate {
private owner: string = ''
private balance: number = 0
private closed: boolean = false
// Static factory methods for creating new aggregates
static open(id: string, owner: string) {
return new Account().recordThat(accountOpened(id, owner))
}
// Instance methods for operations on existing aggregates
deposit(amount: number) {
if (this.closed) {
abort('Cannot deposit to a closed account')
}
return this.recordThat(moneyDeposited(this.aggregateId, amount))
}
// Apply handlers rebuild state from events
apply: ApplyEvents<
typeof accountOpened | typeof moneyDeposited | typeof accountClosed
> = {
ACCOUNT_OPENED: (event) => {
this.owner = event.payload.owner
},
MONEY_DEPOSITED: (event) => {
this.balance += event.payload.amount
},
ACCOUNT_CLOSED: () => {
this.closed = true
},
}
}
`
The EventSource class is the central coordinator that connects everything together. Use the builder pattern to configure it:
`typescript
import { EventSource } from '@robinmalfait/event-source'
const eventSource = EventSource.builder(myEventStore)
.addCommandHandler('OPEN_ACCOUNT', openAccountHandler)
.addCommandHandler('DEPOSIT_MONEY', depositMoneyHandler)
.addProjector(new BalanceProjector())
.addEventHandler(sendNotificationHandler)
.metadata(() => ({ userId: getCurrentUserId() }))
.build()
// Dispatch commands
await eventSource.dispatch(openAccount('acc-123', 'John Doe'))
await eventSource.dispatch(depositMoney('acc-123', 1000))
`
Command handlers receive a command and the event source instance, and return an aggregate to persist:
`typescript
import type { CommandHandler } from '@robinmalfait/event-source'
const openAccountHandler: CommandHandler<
ReturnType
> = async (command, es) => {
return es.persist(Account.open(command.payload.id, command.payload.owner))
}
const depositMoneyHandler: CommandHandler<
ReturnType
> = async (command, es) => {
let account = await es.load(Account, command.payload.accountId)
return es.persist(account.deposit(command.payload.amount))
}
`
To use the library, implement the EventStore interface with your preferred storage:
`typescript
import type { EventStore, EventType } from '@robinmalfait/event-source'
class MyEventStore implements EventStore {
async persist(events: EventType[]): Promise
// Store events in your database
}
async load(aggregateId: string): Promise
// Load all events for an aggregate
}
async loadEvents(): Promise
// Load all events (for rebuilding projections)
}
}
`
See the examples/mysql-event-store directory for a MySQL implementation using Knex.js.
Projectors build read models from events. They process events sequentially and maintain derived state.
Lifecycle:
- Normal operation: When events are persisted via es.persist(), each projector's apply handlers are called for the new events only.es.resetProjections()
- Full rebuild: When is called, reset() is called first (to clear existing state), then all events are replayed through apply.
This means projectors work well in serverless environments - projections are persisted to your database and don't need rebuilding on every cold start.
`typescript
import {
Projector,
type ApplyProjectorEvents,
} from '@robinmalfait/event-source'
class BalanceProjector extends Projector {
name = 'balance-projector'
private balances = new Map
async reset() {
// Called only during resetProjections() - clear state before full rebuild
this.balances.clear()
}
apply: ApplyProjectorEvents
ACCOUNT_OPENED: (event, es) => {
this.balances.set(event.aggregateId, 0)
},
MONEY_DEPOSITED: (event, es) => {
let current = this.balances.get(event.aggregateId) ?? 0
this.balances.set(event.aggregateId, current + event.payload.amount)
},
}
getBalance(accountId: string) {
return this.balances.get(accountId) ?? 0
}
}
`
#### Accessing Aggregate State in Projectors
Each apply handler receives the EventSource instance as the second argument. This allows projectors to reconstruct aggregate state when needed — useful when an event doesn't contain all the data required for the projection.
`typescript
class TransactionHistoryProjector extends Projector {
name = 'transaction-history-projector'
apply: ApplyProjectorEvents
ACCOUNT_CLOSED: async (event, es) => {
// Reconstruct the aggregate to access its full state
let account = await es.load(new Account(), event.aggregateId)
await db.transactionHistory.insert({
accountId: event.aggregateId,
type: 'CLOSED',
finalBalance: account.balance, // Data not in the event
closedAt: event.recordedAt,
})
},
}
}
`
The library provides BDD-style testing utilities with a given/when/then pattern:
`typescript
import { createTestEventStore } from '@robinmalfait/event-source'
describe('deposit money', () => {
let { given, when, then, ___ } = createTestEventStore({
DEPOSIT_MONEY: depositMoneyHandler,
})
it('should deposit money to an open account', async () => {
await given([accountOpened('acc-123', 'John Doe')])
await when(depositMoney('acc-123', 500))
await then([moneyDeposited('acc-123', 500)])
})
it('should fail when depositing to a closed account', async () => {
await given([
accountOpened('acc-123', 'John Doe'),
accountClosed('acc-123'),
])
await when(depositMoney('acc-123', 500))
await then(new Error('Cannot deposit to a closed account'))
})
it('should use placeholders for values we do not care about', async () => {
await given([accountOpened('acc-123', 'John Doe')])
await when(depositMoney('acc-123', 500))
// Use ___ as a placeholder for any value
await then([Event('MONEY_DEPOSITED', ___, { amount: 500 })])
})
})
`
Throw errors with clean stack traces and custom attributes:
`typescript
import { abort } from '@robinmalfait/event-source'
if (balance < amount) {
abort('Insufficient funds', { balance, requested: amount })
}
`
#### ApplyEvents / ApplyProjectorEvents
Type-safe handlers for applying events to aggregates and projectors:
`typescript
import type {
ApplyEvents,
ApplyProjectorEvents,
} from '@robinmalfait/event-source'
// For Aggregates - handlers receive only the event
apply: ApplyEvents
ACCOUNT_OPENED: (event) => {
/ ... /
},
MONEY_DEPOSITED: (event) => {
/ ... /
},
}
// For Projectors - handlers receive the event and EventSource
apply: ApplyProjectorEvents
ACCOUNT_OPENED: (event, es) => {
/ ... /
},
MONEY_DEPOSITED: (event, es) => {
/ ... /
},
}
`
#### PayloadOf / TypeOf
Extract types from events and commands:
`typescript
import type { PayloadOf, TypeOf } from '@robinmalfait/event-source'
type AccountOpenedPayload = PayloadOf
// { owner: string }
type AccountOpenedType = TypeOf
// 'ACCOUNT_OPENED'
`
See the examples/bank directory for a complete bank account domain implementation demonstrating:
- Domain events and commands
- Account aggregate with business rules
- Command handlers
- Test cases using the given/when/then pattern
- Node.js 24+ (see .nvmrc)
- pnpm
| Command | Description |
| ------------- | ----------------------------------------------------- |
| pnpm start | Build in watch mode for development |pnpm build
| | Production build (ESM + CJS + TypeScript definitions) |pnpm test
| | Run all tests |pnpm tdd
| | Run tests in watch mode |pnpm format` | Format code with Prettier |
|
MIT