NATS/JetStream on Cloudflare Durable Objects
npm install nats.doNATS/JetStream on Cloudflare Durable Objects
NATS.do implements NATS Core messaging and JetStream persistence using Cloudflare Workers and Durable Objects with SQLite storage. It provides a nats.js-compatible API accessible via JSON-RPC 2.0 over HTTP, WebSockets, or Cloudflare Workers Service Bindings.
- NATS Core: Publish/subscribe messaging with subject wildcards (*, >)
- JetStream Streams: Persistent message storage with configurable retention policies
- JetStream Consumers: Pull and push consumers with acknowledgment tracking
- MCP Integration: Model Context Protocol tools for AI agent access
- Edge-Native: Runs entirely on Cloudflare's global network
``bash`
npm install nats.do
`typescript
import { StringCodec, JSONCodec } from 'nats.do'
const sc = StringCodec()
const jc = JSONCodec()
// Publish a message
nc.publish('orders.new', sc.encode('Hello NATS!'))
// Subscribe with wildcards
const sub = nc.subscribe('orders.*')
for await (const msg of sub) {
console.log(Received: ${sc.decode(msg.data)})
}
// Request/Reply pattern
const response = await nc.request('api.users.get', jc.encode({ id: 123 }))
console.log(jc.decode(response.data))
`
`typescript
// Create a stream
const jsm = nc.jetstreamManager()
await jsm.streams.add({
name: 'ORDERS',
subjects: ['orders.*'],
retention: 'workqueue',
max_msgs: 10000,
max_age: 24 60 60 * 1_000_000_000, // 24 hours in nanoseconds
})
// Publish with acknowledgment
const js = nc.jetstream()
const ack = await js.publish('orders.new', sc.encode('{"id": 456}'))
console.log(Published to ${ack.stream} at seq ${ack.seq})`
`typescript
// Create a durable consumer
await jsm.consumers.add('ORDERS', {
durable_name: 'order-processor',
ack_policy: 'explicit',
deliver_policy: 'all',
})
// Fetch messages
const consumer = await js.consumers.get('ORDERS', 'order-processor')
const messages = await consumer.fetch({ max_messages: 10 })
for await (const msg of messages) {
console.log(Processing: ${sc.decode(msg.data)})`
msg.ack()
}
`typescript
// Connection options
interface ConnectionOptions {
servers: string | string[]
name?: string
token?: string
timeout?: number
}
// Message
interface Msg {
subject: string
data: Uint8Array
reply?: string
headers?: MsgHdrs
respond(data?: Uint8Array): boolean
}
// Subscription
interface Subscription extends AsyncIterable
getSubject(): string
unsubscribe(max?: number): void
drain(): Promise
}
`
`typescript
// Stream configuration
interface StreamConfig {
name: string
subjects: string[]
retention?: 'limits' | 'interest' | 'workqueue'
storage?: 'file' | 'memory'
max_msgs?: number
max_bytes?: number
max_age?: number // nanoseconds
discard?: 'old' | 'new'
}
// Consumer configuration
interface ConsumerConfig {
name?: string
durable_name?: string
ack_policy: 'none' | 'all' | 'explicit'
deliver_policy?: 'all' | 'last' | 'new' | 'by_start_sequence' | 'by_start_time'
filter_subject?: string
max_deliver?: number
ack_wait?: number // nanoseconds
}
// Publish acknowledgment
interface PubAck {
stream: string
seq: number
duplicate?: boolean
}
`
`typescript
import { StringCodec, JSONCodec, Empty } from 'nats.do'
const sc = StringCodec()
sc.encode('hello') // Uint8Array
sc.decode(data) // string
const jc = JSONCodec
jc.encode({ key: 'value' }) // Uint8Array
jc.decode(data) // MyType
Empty // Empty Uint8Array for messages without payload
`
NATS.do supports NATS subject wildcards for subscriptions:
- matches exactly one token: orders. matches orders.new but not orders.us.new>
- matches one or more tokens (must be last): orders.> matches orders.new and orders.us.new
`typescript
import { matchSubject, isValidSubject, isValidWildcard } from 'nats.do/utils'
matchSubject('orders.*', 'orders.new') // true
matchSubject('orders.*', 'orders.us.new') // false
matchSubject('orders.>', 'orders.us.new') // true
`
NATS.do uses three Durable Object classes:
| Durable Object | Scope | Responsibility |
|---------------|-------|----------------|
| NatsCoordinator | Global singleton | Stream registry, consumer discovery, cluster metadata |NatsPubSub
| | Per region | Core NATS pub/sub, WebSocket connections, request/reply |StreamDO
| | Per stream | Message storage, consumer state, ack tracking, retention |
NATS.do uses JSON-RPC 2.0 for communication:
`typescript
// Request
{
"jsonrpc": "2.0",
"method": "nats.publish",
"params": { "subject": "orders.new", "data": "base64..." },
"id": 1
}
// Response
{
"jsonrpc": "2.0",
"result": { "success": true },
"id": 1
}
`
NATS.do exposes MCP (Model Context Protocol) tools for AI agent integration:
| Tool | Description |
|------|-------------|
| nats_publish | Publish a message to a subject |nats_subscribe
| | Subscribe to a subject |nats_request
| | Send a request and wait for response |jetstream_publish
| | Publish to JetStream with acknowledgment |jetstream_stream_create
| | Create a new stream |jetstream_stream_info
| | Get stream information |jetstream_consumer_create
| | Create a consumer |jetstream_consumer_fetch
| | Fetch messages from a consumer |
`jsonc
{
"name": "nats.do",
"main": "src/index.ts",
"compatibility_date": "2024-01-01",
"compatibility_flags": ["nodejs_compat"],
"durable_objects": {
"bindings": [
{ "name": "NATS_COORDINATOR", "class_name": "NatsCoordinator" },
{ "name": "NATS_PUBSUB", "class_name": "NatsPubSub" },
{ "name": "STREAM_DO", "class_name": "StreamDO" }
]
},
"migrations": [
{
"tag": "v1",
"new_sqlite_classes": ["NatsCoordinator", "NatsPubSub", "StreamDO"]
}
]
}
`
`typescript
// In another Worker
export default {
async fetch(request: Request, env: Env) {
const id = env.NATS_COORDINATOR.idFromName('global')
const stub = env.NATS_COORDINATOR.get(id)
const response = await stub.fetch(new Request('http://internal/rpc', {
method: 'POST',
body: JSON.stringify({
jsonrpc: '2.0',
method: 'consumers.list',
params: { streamName: 'ORDERS' },
id: 1
})
}))
return response
}
}
`
`bashInstall dependencies
npm install
MIT