# kafka.do

Kafka-compatible streaming platform on Cloudflare Workers with Durable Object SQLite.
kafka.do brings the familiar Kafka programming model to the edge, running entirely on Cloudflare's global network. Each topic partition is backed by a Durable Object with SQLite storage, providing strong consistency and durability without managing any infrastructure.
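To make the architecture concrete: every produce for a given topic/partition is serialized through that partition's Durable Object. The sketch below is illustrative only; kafka.do's real routing and DO naming are internal, and 'orders:0' and the /append route are hypothetical:

```typescript
// Hypothetical illustration of the architecture, not kafka.do's actual code:
// route a produce request to the Durable Object backing partition 0 of 'orders'.
const id = env.TOPIC_PARTITION.idFromName('orders:0') // hypothetical DO name
const stub = env.TOPIC_PARTITION.get(id)
await stub.fetch('https://partition/append', { // hypothetical internal route
  method: 'POST',
  body: JSON.stringify({ key: 'order-123', value: { amount: 99.99 } })
})
```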
- Kafka-compatible API - Familiar producer, consumer, and admin interfaces
- Edge-native - Runs on Cloudflare Workers with global distribution
- Durable storage - Messages stored in Durable Object SQLite
- Consumer groups - Coordinated consumption with automatic partition assignment
- Offset management - Automatic and manual offset commits
- HTTP Client SDK - Access kafka.do from any JavaScript runtime
- Partitioning - Key-based partitioning for message ordering
- Batch operations - Efficient batch produce and consume
```bash
npm install kafka.do
```
Send messages to topics using the Producer API within a Cloudflare Worker:
```typescript
import { createProducer } from 'kafka.do'

export default {
  async fetch(request: Request, env: Env) {
    const producer = createProducer(env)

    // Send a single message
    const metadata = await producer.send({
      topic: 'orders',
      key: 'order-123',
      value: { orderId: '123', amount: 99.99 },
      headers: { source: 'web' }
    })
    console.log(`Message sent to partition ${metadata.partition} at offset ${metadata.offset}`)

    // Send multiple messages in a batch
    const results = await producer.sendBatch([
      { topic: 'orders', key: 'order-124', value: { orderId: '124', amount: 49.99 } },
      { topic: 'orders', key: 'order-125', value: { orderId: '125', amount: 149.99 } }
    ])

    await producer.close()
    return new Response('Messages sent')
  }
}
```
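Producer behavior can be tuned at creation time. A minimal sketch using the config options from the API reference below (batchSize, lingerMs, acks, retries); the values here are illustrative, not recommendations:

```typescript
import { createProducer } from 'kafka.do'

// Buffer up to 100 messages or wait 50ms before sending a batch,
// wait for full acknowledgment, and retry failed sends up to 3 times.
const producer = createProducer(env, {
  clientId: 'checkout-service',
  batchSize: 100,
  lingerMs: 50,
  acks: 'all',
  retries: 3
})
```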
Consume messages from topics using the Consumer API:
```typescript
import { createConsumer } from 'kafka.do'

export default {
  async fetch(request: Request, env: Env) {
    const consumer = createConsumer(env, {
      groupId: 'order-processor',
      clientId: 'worker-1',
      autoCommit: true,
      fromBeginning: false
    })

    // Subscribe to topics
    await consumer.subscribe(['orders'])

    // Poll for messages
    const records = await consumer.poll(1000)
    for (const record of records) {
      console.log(`Received: ${record.key} = ${JSON.stringify(record.value)}`)
      console.log(`Topic: ${record.topic}, Partition: ${record.partition}, Offset: ${record.offset}`)
    }

    // Manual commit (if autoCommit is false)
    await consumer.commit()

    await consumer.close()
    return new Response(`Processed ${records.length} messages`)
  }
}
```
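For at-least-once processing, disable auto-commit and only record progress after the work succeeds. A sketch built from the commitSync() and seek() methods documented below; handleOrder and the rewind-on-error strategy are illustrative assumptions, not prescribed by the library:

```typescript
const consumer = createConsumer(env, {
  groupId: 'order-processor',
  autoCommit: false // commit explicitly, only after processing succeeds
})
await consumer.subscribe(['orders'])

const records = await consumer.poll(1000)
try {
  for (const record of records) {
    await handleOrder(record.value) // hypothetical processing function
  }
  // Mark progress only once the whole batch has been processed
  await consumer.commitSync()
} catch (err) {
  // On failure, rewind to the first offset of the batch so it is retried
  if (records.length > 0) {
    await consumer.seek(records[0].partition, records[0].offset)
  }
} finally {
  await consumer.close()
}
```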
#### Using Async Iterator
```typescript
const consumer = createConsumer(env, { groupId: 'my-group' })
await consumer.subscribe(['orders'])

for await (const record of consumer) {
  console.log(`Processing: ${record.key}`)
  // Process each record as it arrives
}
```
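To stop consuming, break out of the loop and call consumer.close() so the member leaves the group cleanly.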
Manage topics and consumer groups:
```typescript
import { createAdmin } from 'kafka.do'

export default {
  async fetch(request: Request, env: Env) {
    const admin = createAdmin(env)

    // Create a topic with 3 partitions
    await admin.createTopic({
      topic: 'orders',
      partitions: 3,
      config: {
        'retention.ms': '604800000' // 7 days
      }
    })

    // List all topics
    const topics = await admin.listTopics()
    console.log('Topics:', topics)

    // Describe a topic
    const metadata = await admin.describeTopic('orders')
    console.log('Partitions:', metadata.partitions.length)

    // Add more partitions
    await admin.createPartitions('orders', 6)

    // List consumer groups
    const groups = await admin.listGroups()

    // Describe a consumer group
    const groupInfo = await admin.describeGroup('order-processor')
    console.log('Group state:', groupInfo.state)
    console.log('Members:', groupInfo.members.length)

    // Get partition offsets
    const offsets = await admin.listOffsets('orders')
    for (const [partition, info] of offsets) {
      console.log(`Partition ${partition}: earliest=${info.earliest}, latest=${info.latest}`)
    }

    // Delete a topic
    await admin.deleteTopic('old-topic')

    // Delete a consumer group
    await admin.deleteGroup('old-group')

    await admin.close()
    return new Response('Admin operations complete')
  }
}
```
Access kafka.do from any JavaScript environment using the HTTP Client SDK:
```typescript
import { KafkaClient } from 'kafka.do/client'

// Create client pointing to your kafka.do deployment
const client = new KafkaClient({
  baseUrl: 'https://kafka.your-domain.workers.dev',
  clientId: 'my-app',
  timeout: 30000,
  headers: {
    'Authorization': 'Bearer your-token'
  }
})

// Check service health
const health = await client.health()
console.log('Status:', health.status)

// Producer operations
const producer = client.producer({ defaultTopic: 'events' })

await producer.send({
  key: 'user-123',
  value: { type: 'page_view', page: '/home' }
})

await producer.sendBatch([
  { key: 'user-123', value: { type: 'click', button: 'signup' } },
  { key: 'user-456', value: { type: 'page_view', page: '/about' } }
])

// Consumer operations
const consumer = client.consumer({
  groupId: 'analytics-processor',
  topics: ['events'],
  autoCommit: true
})

// Connect and join consumer group
const joinResult = await consumer.connect()
console.log('Member ID:', joinResult.memberId)

// Fetch messages from a partition
const { messages } = await consumer.fetch('events', 0, { offset: 0, limit: 100 })
for (const msg of messages) {
  console.log(`${msg.key}: ${JSON.stringify(msg.value)}`)
}

// Commit offsets
await consumer.commit()

// Get partition offsets
const offsets = await consumer.getOffsets('events', 0)
console.log(`Earliest: ${offsets.earliest}, Latest: ${offsets.latest}`)

// Disconnect from consumer group
await consumer.disconnect()

// Admin operations
const admin = client.admin()

await admin.createTopic({ topic: 'logs', partitions: 5 })
const topics = await admin.listTopics()
const topicInfo = await admin.describeTopic('logs')
const groups = await admin.listGroups()
const groupInfo = await admin.describeGroup('analytics-processor')
await admin.addPartitions('logs', 10)
await admin.deleteTopic('old-logs')
await admin.deleteGroup('old-group')
```
#### createProducer(env, config?)
Creates a new producer instance.
| Config Option | Type | Default | Description |
|---------------|------|---------|-------------|
| clientId | string | undefined | Client identifier for tracking |
| batchSize | number | undefined | Number of messages to batch |
| lingerMs | number | undefined | Time to wait for batch to fill |
| acks | 0 \| 1 \| 'all' | undefined | Acknowledgment mode |
| retries | number | undefined | Number of retry attempts |
#### Producer Methods
- send(record) - Send a single message, returns RecordMetadata
- sendBatch(records) - Send multiple messages, returns RecordMetadata[]
- flush() - Flush buffered messages
- close() - Close the producer
#### createConsumer(env, config, rebalanceListener?)
Creates a new consumer instance.
| Config Option | Type | Default | Description |
|--------------|------|---------|-------------|
| groupId | string | required | Consumer group ID |
| clientId | string | 'do-consumer' | Client identifier |
| sessionTimeoutMs | number | 30000 | Session timeout |
| heartbeatIntervalMs | number | 3000 | Heartbeat interval |
| maxPollRecords | number | 500 | Max records per poll |
| autoCommit | boolean | true | Enable auto-commit |
| autoCommitIntervalMs | number | 5000 | Auto-commit interval |
| fromBeginning | boolean | false | Start from beginning |
| rebalanceTimeoutMs | number | 60000 | Rebalance timeout |
#### Consumer Methods
- subscribe(topics) - Subscribe to topics
- unsubscribe() - Unsubscribe from all topics
- poll(timeout?) - Poll for records
- commit() - Commit current offsets
- commitSync(offsets?) - Commit specific offsets
- seek(partition, offset) - Seek to offset
- pause(partitions) - Pause consumption
- resume(partitions) - Resume consumption
- close() - Close consumer and leave group
#### Rebalance Listener
```typescript
const consumer = createConsumer(env, config, {
  async onPartitionsAssigned(partitions) {
    console.log('Assigned:', partitions)
  },
  async onPartitionsRevoked(partitions) {
    console.log('Revoked:', partitions)
  }
})
```
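A common use for onPartitionsRevoked is committing any in-flight offsets before ownership moves, so the next assignee resumes without reprocessing; whether that is needed depends on your autoCommit setting.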
#### createAdmin(env, config?)
Creates a new admin client.
| Config Option | Type | Default | Description |
|--------------|------|---------|-------------|
| clientId | string | undefined | Client identifier |
| requestTimeoutMs | number | undefined | Request timeout |
#### Admin Methods
- createTopic(config) - Create a new topic
- deleteTopic(topic) - Delete a topic
- listTopics() - List all topics
- describeTopic(topic) - Get topic metadata
- createPartitions(topic, count) - Add partitions
- listGroups() - List consumer groups
- describeGroup(groupId) - Get group details
- deleteGroup(groupId) - Delete a consumer group
- listOffsets(topic) - Get partition offsets
- close() - Close admin client
#### KafkaClient
| Config Option | Type | Default | Description |
|--------------|------|---------|-------------|
| baseUrl | string | required | kafka.do service URL |
| clientId | string | auto-generated | Client identifier |
| timeout | number | 30000 | Request timeout (ms) |
| headers | object | {} | Default headers |
| fetch | function | globalThis.fetch | Custom fetch implementation |
kafka.do includes pre-built integrations for common data sources.
```typescript
import {
  KafkaPipeline,
  createKafkaPipeline,
  R2EventBridge,
  createR2EventBridge
} from 'kafka.do/integrations'
```
Stream MongoDB change events to kafka.do topics using the KafkaPipeline adapter. This integrates with MongoDB change streams to capture insert, update, and delete operations in real-time.
```typescript
import { createKafkaPipeline, type CDCEvent } from 'kafka.do/integrations'

// Create a pipeline that routes events to topics based on database/collection
const pipeline = createKafkaPipeline({
  env,
  topicPattern: 'cdc.{db}.{coll}' // e.g., cdc.mydb.users
})

// Send a CDC event (typically from a MongoDB change stream)
await pipeline.send({
  eventId: 'evt-123',
  operationType: 'insert',
  ns: { db: 'mydb', coll: 'users' },
  documentKey: { _id: 'user-456' },
  fullDocument: { _id: 'user-456', name: 'Alice', email: 'alice@example.com' },
  timestamp: new Date().toISOString()
})

// Send multiple events in a batch
await pipeline.sendBatch(cdcEvents)
```
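The CDCEvent fields map naturally from MongoDB's change stream documents. A hedged sketch of such a mapping, using the official mongodb driver types; deriving eventId from the serialized resume token is an assumption, not something kafka.do prescribes:

```typescript
import type { ChangeStreamDocument } from 'mongodb'
import type { CDCEvent } from 'kafka.do/integrations'

// Map a raw MongoDB change stream document to the CDCEvent shape shown above.
// Only insert/update/delete events carry a namespace and document key.
function toCDCEvent(change: ChangeStreamDocument): CDCEvent | null {
  if (
    change.operationType !== 'insert' &&
    change.operationType !== 'update' &&
    change.operationType !== 'delete'
  ) {
    return null
  }
  return {
    eventId: JSON.stringify(change._id), // resume token serialized as an ID (assumption)
    operationType: change.operationType,
    ns: { db: change.ns.db, coll: change.ns.coll },
    documentKey: change.documentKey,
    fullDocument: 'fullDocument' in change ? change.fullDocument : undefined,
    timestamp: new Date().toISOString()
  }
}
```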
Factory functions:
- createKafkaPipeline(config) - Full configuration with custom topic patterns
- createFixedTopicPipeline(env, topic) - All events go to a single topic
- createDatabaseTopicPipeline(env) - Topics per database (cdc.{db})
- createCollectionTopicPipeline(env) - Topics per collection (cdc.{db}.{coll})
Consumer helpers:
```typescript
import { processCDCMessage, isInsertEvent } from 'kafka.do/integrations'

// Process CDC messages with typed handlers
await processCDCMessage(message, {
  database: 'mydb',
  collection: 'users',
  groupId: 'cdc-processor',
  onInsert: async (event) => console.log('New document:', event.fullDocument),
  onUpdate: async (event) => console.log('Updated:', event.updateDescription),
  onDelete: async (event) => console.log('Deleted:', event.documentKey)
})
```
Stream R2 object events (creates, deletes) to kafka.do topics. Use this as a Queue consumer to capture R2 event notifications.
```typescript
import { createR2EventBridge, R2EventBridge } from 'kafka.do/integrations'

// Create an event bridge
const bridge = createR2EventBridge({
  env,
  topicPattern: 'r2.{bucket}', // e.g., r2.my-bucket
  bucketFilter: 'my-bucket', // Optional: filter by bucket
  keyPrefixFilter: 'uploads/' // Optional: filter by key prefix
})

// Process R2 events (typically in a Queue consumer)
export default {
  async queue(batch: MessageBatch, env: Env) {
    const bridge = createR2EventBridge({ env })
    for (const message of batch.messages) {
      const event = R2EventBridge.parseQueueMessage(message)
      if (event) {
        await bridge.processEvent(event)
      }
    }
  }
}
```
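For the queue handler above to receive anything, the Worker needs a Queues consumer binding pointing at the queue that your R2 bucket's event notifications target. A sketch of the wrangler.toml addition, assuming a queue named r2-events (the name is yours to choose):

```toml
[[queues.consumers]]
queue = "r2-events" # hypothetical queue wired to the bucket's event notifications
```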
Consumer helpers:
```typescript
import { processR2Event, isR2ObjectCreated } from 'kafka.do/integrations'

// Process R2 events with typed handlers
await processR2Event(message, {
  bucketFilter: 'my-bucket',
  onObjectCreated: async (event) => {
    console.log('Created:', event.object.key, event.object.size)
  },
  onObjectDeleted: async (event) => {
    console.log('Deleted:', event.key)
  }
})
```
Add the following to your wrangler.toml:
```toml
name = "my-kafka-app"
main = "src/index.ts"
compatibility_date = "2024-01-01"
compatibility_flags = ["nodejs_compat"]

[durable_objects]
bindings = [
  { name = "TOPIC_PARTITION", class_name = "TopicPartitionDO" },
  { name = "CONSUMER_GROUP", class_name = "ConsumerGroupDO" },
  { name = "CLUSTER_METADATA", class_name = "ClusterMetadataDO" }
]

[[migrations]]
tag = "v1"
new_sqlite_classes = ["TopicPartitionDO", "ConsumerGroupDO", "ClusterMetadataDO"]
```
Then declare the matching bindings on your Env type:

```typescript
interface Env {
  TOPIC_PARTITION: DurableObjectNamespace
  CONSUMER_GROUP: DurableObjectNamespace
  CLUSTER_METADATA: DurableObjectNamespace
}
```
kafka.do exposes a REST API for external access:
- POST /topics/:topic/produce - Produce a single message
- POST /topics/:topic/produce-batch - Produce multiple messages
- GET /topics/:topic/partitions/:partition/messages - Read messages
- GET /topics/:topic/partitions/:partition/offsets - Get partition offsets
- POST /consumer-groups/:groupId/join - Join a consumer group
- POST /consumer-groups/:groupId/heartbeat - Send heartbeat
- POST /consumer-groups/:groupId/commit - Commit offsets
- POST /consumer-groups/:groupId/leave - Leave consumer group
- GET /consumer-groups/:groupId - Describe consumer group
- GET /admin/topics - List topics
- POST /admin/topics - Create topic
- GET /admin/topics/:topic - Describe topic
- DELETE /admin/topics/:topic - Delete topic
- POST /admin/topics/:topic/partitions - Add partitions
- GET /admin/topics/:topic/offsets - Get topic offsets
- GET /admin/groups - List consumer groups
- GET /admin/groups/:groupId - Describe consumer group
- DELETE /admin/groups/:groupId - Delete consumer group
- GET / - Service info
- GET /health - Health check
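As a rough illustration of calling the REST API directly from TypeScript; the JSON body shape here mirrors the SDK's send() arguments and is an assumption, so prefer the HTTP Client SDK above, which owns the wire format:

```typescript
// Produce one message over HTTP (endpoint from the list above; body shape assumed)
const res = await fetch('https://kafka.your-domain.workers.dev/topics/orders/produce', {
  method: 'POST',
  headers: {
    'Content-Type': 'application/json',
    'Authorization': 'Bearer your-token'
  },
  body: JSON.stringify({
    key: 'order-123',
    value: { orderId: '123', amount: 99.99 }
  })
})
console.log(res.status, await res.json())
```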
Requirements:

- Cloudflare Workers environment
- Durable Objects with SQLite storage enabled
- Node.js 18+ (for local development)
```bash
# Install dependencies
npm install
```
License: MIT