State change event protocol for Durable Streams
npm install @durable-streams/stateBuilding blocks for transmitting structured state over Durable Streams. Use these primitives for any real-time protocol: AI token streams, presence updates, collaborative editing, or database sync.
``bash`
pnpm add @durable-streams/state @tanstack/db
> Note: @tanstack/db is a peer dependency that must be installed alongside this package. This ensures type compatibility when using StreamDB collections with TanStack DB's query utilities like useLiveQuery from @tanstack/react-db.
This package provides flexible primitives for streaming structured state. You choose how much structure you need:
- Simple state updates: Stream JSON payloads and track current values
- Typed collections: Add schemas and primary keys for structured entities
- Reactive queries: Build on TanStack DB for subscriptions and optimistic updates
Stream whatever state your protocol requires.
Stream structured JSON and query current values:
`typescript
import { MaterializedState } from "@durable-streams/state"
const state = new MaterializedState()
// Apply any structured change
state.apply({
type: "token",
key: "stream-1",
value: { content: "Hello", model: "claude-3" },
headers: { operation: "insert" },
})
// Query current state
const token = state.get("token", "stream-1")
const allTokens = state.getType("token")
`
Add schemas and validation for structured entities:
`typescript
import { createStateSchema, createStreamDB } from "@durable-streams/state"
// Define your schema
const schema = createStateSchema({
users: {
schema: userSchema, // Standard Schema validator
type: "user", // Event type field
primaryKey: "id", // Primary key field name
},
messages: {
schema: messageSchema,
type: "message",
primaryKey: "id",
},
})
// Create a stream-backed database
const db = createStreamDB({
streamOptions: {
url: "https://api.example.com/streams/my-stream",
contentType: "application/json",
},
state: schema,
})
// Load initial data
await db.preload()
// Reactive queries with useLiveQuery
import { useLiveQuery } from "@tanstack/react-db" // or solid-db, vue-db
import { eq } from "@tanstack/db"
const userQuery = useLiveQuery((q) =>
q
.from({ users: db.collections.users })
.where(({ users }) => eq(users.id, "123"))
.findOne()
)
const allUsersQuery = useLiveQuery((q) =>
q.from({ users: db.collections.users })
)
`
The Durable Streams State Protocol defines a standard format for state change events:
- Change Events: insert, update, delete operations on entitiessnapshot-start
- Control Events: , snapshot-end, reset signals
- Entity Types: Discriminator field that routes events to collections
- Primary Keys: Unique identifiers extracted from entity values
See STATE-PROTOCOL.md for the full specification.
Simple in-memory state container for basic use cases:
`typescript
import { MaterializedState } from "@durable-streams/state"
const state = new MaterializedState()
// Apply change events
state.apply({
type: "user",
key: "1",
value: { name: "Kyle" },
headers: { operation: "insert" },
})
// Query state
const user = state.get("user", "1")
const allUsers = state.getType("user")
`
Stream-backed database with TanStack DB collections. Provides reactive queries, subscriptions, and optimistic updates.
Define your application state structure:
`typescript`
const schema = createStateSchema({
users: {
schema: userSchema, // Standard Schema validator
type: "user", // Event type for routing
primaryKey: "id", // Field to use as primary key
},
messages: {
schema: messageSchema,
type: "message",
primaryKey: "id",
},
})
Uses Standard Schema for validation, supporting multiple libraries:
`typescript
// Zod
import { z } from "zod"
const userSchema = z.object({
id: z.string(),
name: z.string(),
email: z.string().email(),
})
// Valibot
import * as v from "valibot"
const userSchema = v.object({
id: v.string(),
name: v.string(),
email: v.pipe(v.string(), v.email()),
})
// Manual Standard Schema
const userSchema = {
"~standard": {
version: 1,
vendor: "my-app",
validate: (value) => {
// Your validation logic
if (isValid(value)) {
return { value }
}
return { issues: [{ message: "Invalid user" }] }
},
},
}
`
Schema provides typed event creation helpers:
`typescript
// Insert
const insertEvent = schema.users.insert({
value: { id: "1", name: "Kyle", email: "kyle@example.com" },
key: "1", // Optional, defaults to value[primaryKey]
})
// Update
const updateEvent = schema.users.update({
value: { id: "1", name: "Kyle Mathews", email: "kyle@example.com" },
oldValue: { id: "1", name: "Kyle", email: "kyle@example.com" }, // Optional
})
// Delete
const deleteEvent = schema.users.delete({
key: "1",
oldValue: { id: "1", name: "Kyle", email: "kyle@example.com" }, // Optional
})
// Custom headers
const eventWithTxId = schema.users.insert({
value: { id: "1", name: "Kyle" },
headers: {
txid: crypto.randomUUID(),
timestamp: new Date().toISOString(),
},
})
`
`typescript
const db = createStreamDB({
streamOptions: {
url: "https://api.example.com/streams/my-stream",
contentType: "application/json",
// All DurableStream options supported
headers: { Authorization: "Bearer token" },
batching: true,
},
state: schema,
})
// The stream is created lazily when preload() is called
await db.preload()
`
StreamDB collections are TanStack DB collections. Use TanStack DB's query builder for filtering, sorting, aggregation, and joins with differential dataflow - dramatically faster than JavaScript filtering:
`typescript
import { useLiveQuery } from "@tanstack/[framework]-db" // react-db, solid-db, etc
import { eq, gt, and, count } from "@tanstack/db"
// Simple collection access
const query = useLiveQuery((q) => q.from({ users: db.collections.users }))
// Filtering with WHERE
const activeQuery = useLiveQuery((q) =>
q
.from({ users: db.collections.users })
.where(({ users }) => eq(users.active, true))
)
// Complex conditions
const query = useLiveQuery((q) =>
q
.from({ users: db.collections.users })
.where(({ users }) => and(eq(users.active, true), gt(users.age, 18)))
)
// Sorting and limiting
const topUsersQuery = useLiveQuery((q) =>
q
.from({ users: db.collections.users })
.orderBy(({ users }) => users.lastSeen, "desc")
.limit(10)
)
// Aggregation with GROUP BY and ordering
const langStatsQuery = useLiveQuery((q) => {
const languageCounts = q
.from({ events: db.collections.events })
.groupBy(({ events }) => events.language)
.select(({ events }) => ({
language: events.language,
total: count(events.id),
}))
return q
.from({ stats: languageCounts })
.orderBy(({ stats }) => stats.total, "desc")
})
// Joins across collections
const query = useLiveQuery((q) =>
q
.from({ messages: db.collections.messages })
.join({ users: db.collections.users }, ({ messages, users }) =>
eq(messages.userId, users.id)
)
.select(({ messages, users }) => ({
messageId: messages.id,
text: messages.text,
userName: users.name,
}))
)
`
Why use the query builder?
- Differential dataflow: Incremental updates only recompute affected results
- Dramatically faster: Push filtering/sorting into the DB engine vs JavaScript
- Reactive: Queries automatically update when data changes
- Type-safe: Full TypeScript support with autocomplete
Framework integration: See TanStack DB docs for framework-specific guides:
- @tanstack/react-db
- @tanstack/solid-db
- @tanstack/vue-db
`typescript
// Load all data until up-to-date
await db.preload()
// Stop syncing and cleanup
db.close()
// Wait for a transaction to be confirmed
await db.utils.awaitTxId("txid-uuid", 5000) // 5 second timeout
`
Define actions with optimistic updates and server confirmation:
`typescript
const db = createStreamDB({
streamOptions: { url: streamUrl, contentType: "application/json" },
state: schema,
actions: ({ db, stream }) => ({
addUser: {
// Optimistic update (runs immediately)
onMutate: (user) => {
db.collections.users.insert(user)
},
// Server mutation (runs async)
mutationFn: async (user) => {
const txid = crypto.randomUUID()
await stream.append(
schema.users.insert({
value: user,
headers: { txid },
})
)
// Wait for confirmation
await db.utils.awaitTxId(txid)
},
},
updateUser: {
onMutate: ({ id, updates }) => {
db.collections.users.update(id, (draft) => {
Object.assign(draft, updates)
})
},
mutationFn: async ({ id, updates }) => {
const txid = crypto.randomUUID()
const current = await db.collections.users.get(id)
await stream.append(
schema.users.update({
value: { ...current, ...updates },
oldValue: current,
headers: { txid },
})
)
await db.utils.awaitTxId(txid)
},
},
}),
})
// Call actions
await db.actions.addUser({ id: "1", name: "Kyle", email: "kyle@example.com" })
await db.actions.updateUser({ id: "1", updates: { name: "Kyle Mathews" } })
`
Use TanStack DB's framework adapters for reactive queries:
`typescript
import { useLiveQuery } from '@tanstack/react-db'
import { eq } from '@tanstack/db'
function UserProfile({ userId }: { userId: string }) {
const userQuery = useLiveQuery((q) =>
q.from({ users: db.collections.users })
.where(({ users }) => eq(users.id, userId))
.findOne()
)
if (userQuery.isLoading()) return
return (
{userQuery.data.email}
See @tanstack/react-db docs for more.
$3
`typescript
import { useLiveQuery } from '@tanstack/solid-db'
import { eq } from '@tanstack/db'function MessageList() {
const messagesQuery = useLiveQuery((q) =>
q.from({ messages: db.collections.messages })
.orderBy(({ messages }) => messages.timestamp, 'desc')
.limit(50)
)
return (
{(message) => }
)
}
`See @tanstack/solid-db docs for more.
Common Patterns
$3
`typescript
const schema = createStateSchema({
config: {
schema: configSchema,
type: "config",
primaryKey: "key",
},
})// Set value
await stream.append(
schema.config.insert({
value: { key: "theme", value: "dark" },
})
)
// Query value reactively
const themeQuery = useLiveQuery((q) =>
q
.from({ config: db.collections.config })
.where(({ config }) => eq(config.key, "theme"))
.findOne()
)
`$3
`typescript
const schema = createStateSchema({
presence: {
schema: presenceSchema,
type: "presence",
primaryKey: "userId",
},
})// Update presence
await stream.append(
schema.presence.update({
value: {
userId: "kyle",
status: "online",
lastSeen: Date.now(),
},
})
)
// Query presence with TanStack DB
const presenceQuery = useLiveQuery((q) =>
q
.from({ presence: db.collections.presence })
.where(({ presence }) => eq(presence.status, "online"))
)
`$3
`typescript
const schema = createStateSchema({
users: { schema: userSchema, type: "user", primaryKey: "id" },
messages: { schema: messageSchema, type: "message", primaryKey: "id" },
reactions: { schema: reactionSchema, type: "reaction", primaryKey: "id" },
typing: { schema: typingSchema, type: "typing", primaryKey: "userId" },
})// Different types coexist in the same stream
await stream.append(JSON.stringify(schema.users.insert({ value: user })))
await stream.append(JSON.stringify(schema.messages.insert({ value: message })))
await stream.append(
JSON.stringify(schema.reactions.insert({ value: reaction }))
)
`Best Practices
$3
StreamDB requires object values (not primitives) for the primary key pattern:
`typescript
// ❌ Won't work
{ type: 'count', key: 'views', value: 42 }// ✅ Works
{ type: 'count', key: 'views', value: { count: 42 } }
`$3
`typescript
useEffect(() => {
const db = createStreamDB({ streamOptions, state: schema }) return () => db.close() // Cleanup on unmount
}, [])
`$3
Use Standard Schema to validate data at system boundaries:
`typescript
const userSchema = z.object({
id: z.string().uuid(),
email: z.string().email(),
age: z.number().min(0).max(150),
})
`$3
For critical operations, always use transaction IDs to ensure confirmation:
`typescript
const txid = crypto.randomUUID()
await stream.append(
JSON.stringify(schema.users.insert({ value: user, headers: { txid } }))
)
await db.utils.awaitTxId(txid, 10000) // Wait up to 10 seconds
`$3
`typescript
try {
await db.actions.addUser(user)
} catch (error) {
if (error.message.includes("Timeout")) {
// Handle timeout
} else {
// Handle other errors
}
}
`API Reference
$3
`typescript
export type Operation = "insert" | "update" | "delete"export interface ChangeEvent {
type: string
key: string
value?: T
old_value?: T
headers: ChangeHeaders
}
export interface ChangeHeaders {
operation: Operation
txid?: string
timestamp?: string
}
export interface ControlEvent {
headers: {
control: "snapshot-start" | "snapshot-end" | "reset"
offset?: string
}
}
export type StateEvent = ChangeEvent | ControlEvent
`$3
`typescript
// Create a state schema with typed collections and event helpers
export function createStateSchema<
T extends Record,
>(collections: T): StateSchema// Create a stream-backed database
export async function createStreamDB<
TDef extends StreamStateDefinition,
TActions extends Record>,
>(
options: CreateStreamDBOptions
): Promise | StreamDBWithActions>
`$3
`typescript
export class MaterializedState {
apply(event: ChangeEvent): void
applyBatch(events: ChangeEvent[]): void
get(type: string, key: string): T | undefined
getType(type: string): Map
clear(): void
readonly typeCount: number
readonly types: string[]
}
``Apache-2.0
- STATE-PROTOCOL.md - Full protocol specification
- Durable Streams Protocol - Base protocol
- Standard Schema - Schema validation
- TanStack DB - Reactive collections