Durable Object event emitter for Uploadista
Cloudflare Durable Objects-based event emitter for Uploadista. Provides globally consistent event emission with real-time WebSocket coordination.
The Durable Objects event emitter uses Cloudflare Durable Objects for strongly-consistent event emission. Perfect for:
- Edge Deployment: Events coordinated globally at Cloudflare edge
- Real-Time Coordination: Strong consistency across all operations
- WebSocket Integration: Native persistent connection support
- Global Subscribers: Serve events to clients worldwide
- Transactional Events: ACID guarantees for complex workflows
``bash`
npm install @uploadista/event-emitter-durable-objector
pnpm add @uploadista/event-emitter-durable-object
- Cloudflare Workers with Durable Objects enabled
- @cloudflare/workers-types for type definitionswrangler.toml
- Durable Objects bindings in
`typescript
import { uploadEventEmitterDurableObjectStore } from "@uploadista/event-emitter-durable-object";
import type { UploadEvent } from "@uploadista/core/types";
import { Effect } from "effect";
export interface Env {
EVENT_EMITTER_DO: DurableObjectNamespace;
}
export default {
async fetch(request: Request, env: Env) {
const program = Effect.gen(function* () {
// Event emitter is available
});
return Effect.runPromise(
program.pipe(
Effect.provide(
uploadEventEmitterDurableObjectStore({
durableObject: env.EVENT_EMITTER_DO,
})
)
)
);
},
};
`
- ✅ Strong Consistency: ACID properties for events
- ✅ Global Edge: Events coordinated at 300+ edge locations
- ✅ WebSocket Native: Built-in persistent connections
- ✅ Real-Time: Sub-10ms event delivery globally
- ✅ Transactional: Multiple operations as one unit
- ✅ Durable: Events persisted automatically
#### uploadEventEmitterDurableObjectStore(config): Layer
Creates an Effect layer for emitting upload events via Durable Objects.
`typescript
import { uploadEventEmitterDurableObjectStore } from "@uploadista/event-emitter-durable-object";
const layer = uploadEventEmitterDurableObjectStore({
durableObject: env.EVENT_EMITTER_DO,
});
`
#### makeEventEmitterDurableObjectStore
Factory function for typed event emitter.
`typescript`
const emitter = makeEventEmitterDurableObjectStore
durableObject: env.EVENT_EMITTER_DO,
});
#### emit(key: string, event: T): Effect
Emit event to all subscribers globally.
`typescript`
const program = Effect.gen(function* () {
yield* emitter.emit("upload:abc123", {
type: "completed",
duration: 45000,
size: 1048576,
});
});
#### subscribe(key: string, connection): Effect
Subscribe a WebSocket connection to events.
`typescript`
const program = Effect.gen(function* () {
yield* emitter.subscribe("upload:abc123", wsConnection);
});
#### unsubscribe(key: string): Effect
Unsubscribe from events.
`typescript`
const program = Effect.gen(function* () {
yield* emitter.unsubscribe("upload:abc123");
});
`toml
name = "uploadista-worker"
main = "src/index.ts"
[[durable_objects.bindings]]
name = "EVENT_EMITTER_DO"
class_name = "EventEmitterDurableObject"
[env.production]
durable_objects = { bindings = [{name = "EVENT_EMITTER_DO", class_name = "EventEmitterDurableObject"}] }
`
`typescript
import { uploadEventEmitterDurableObjectStore } from "@uploadista/event-emitter-durable-object";
export interface Env {
EVENT_EMITTER_DO: DurableObjectNamespace;
}
export default {
async fetch(request: Request, env: Env) {
const program = Effect.gen(function* () {
// Use event emitter
});
return Effect.runPromise(
program.pipe(
Effect.provide(
uploadEventEmitterDurableObjectStore({
durableObject: env.EVENT_EMITTER_DO,
})
)
)
);
},
};
`
Server emits progress, clients receive globally:
`typescript
import { uploadEventEmitterDurableObjectStore } from "@uploadista/event-emitter-durable-object";
import type { UploadEvent } from "@uploadista/core/types";
const trackUploadProgress = (uploadId: string) =>
Effect.gen(function* () {
// Emit start event
yield* emitter.emit(uploadId, {
type: "started",
timestamp: new Date().toISOString(),
});
// Simulate progress
for (let i = 0; i <= 100; i += 25) {
yield* Effect.sleep("1 second");
yield* emitter.emit(uploadId, {
type: "progress",
progress: i / 100,
bytesReceived: Math.floor((i / 100) * 1048576),
timestamp: new Date().toISOString(),
});
}
// Emit completion
yield* emitter.emit(uploadId, {
type: "completed",
timestamp: new Date().toISOString(),
duration: 4000,
});
});
`
Client-side (browser, anywhere globally):
`typescript
const ws = new WebSocket("wss://uploadista.example.com/events");
ws.onmessage = (event) => {
const message = JSON.parse(event.data);
if (message.type === "progress") {
updateProgressBar(message.progress);
} else if (message.type === "completed") {
showSuccess("Upload complete!");
}
};
`
`typescript
const executeUploadWorkflow = (uploadId: string) =>
Effect.gen(function* () {
// Step 1: Validate
yield* emitter.emit(uploadId, {
type: "workflow",
step: "validating",
details: "Checking file format...",
});
// Step 2: Process
yield* emitter.emit(uploadId, {
type: "workflow",
step: "processing",
details: "Resizing images...",
});
// Step 3: Store
yield* emitter.emit(uploadId, {
type: "workflow",
step: "storing",
details: "Uploading to R2...",
});
// Completion
yield* emitter.emit(uploadId, {
type: "workflow",
step: "completed",
details: "All done!",
});
});
`
`typescript
// Upgrade HTTP to WebSocket
if (request.headers.get("Upgrade") === "websocket") {
const { 0: client, 1: server } = new WebSocketPair();
const uploadId = new URL(request.url).searchParams.get("uploadId");
const program = Effect.gen(function* () {
// Subscribe this client
yield* emitter.subscribe(uploadId, server);
});
await Effect.runPromise(program);
return new Response(null, { status: 101, webSocket: client });
}
`
| Operation | Latency | Range |
|-----------|---------|-------|
| emit() | 5-10ms | Global |
| subscribe() | 1-3ms | Global |
| unsubscribe() | 1-3ms | Global |
| Message delivery | 10-50ms | Any edge location |
Strong consistency: All subscribers see same events in same order.
| Limit | Value |
|-------|-------|
| Events per object | Unlimited |
| Simultaneous WebSockets | 128 per object |
| Storage | 128 MB per object |
| Event size | 512 KB recommended |
Partition large event streams across multiple objects if needed.
`typescript
// Good: Specific upload ID
"upload:abc123"
"flow:job:xyz"
"user:upload:456"
// Avoid: Generic
"events", "status", "updates"
`
`typescript
// Client connects
yield* emitter.subscribe(uploadId, wsConnection);
// Client disconnects
ws.addEventListener("close", () => {
yield* emitter.unsubscribe(uploadId);
});
`
`typescript`
interface UploadEvent {
type: "started" | "progress" | "completed" | "error";
timestamp: string;
progress?: number;
error?: string;
metadata?: Record
}
`bashDeploy to Cloudflare
wrangler publish
$3
`yaml
name: Deployon:
push:
branches: [main]
jobs:
deploy:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- uses: cloudflare/wrangler-action@v3
with:
apiToken: ${{ secrets.CLOUDFLARE_API_TOKEN }}
`Monitoring
$3
`typescript
const emitWithMetrics = (key: string, event: UploadEvent) =>
Effect.gen(function* () {
const start = Date.now(); yield* emitter.emit(key, event);
const duration = Date.now() - start;
console.log(
Event emitted: ${key} (${duration}ms));
});
`$3
Use Cloudflare Dashboard:
- "Durable Objects" in Workers analytics
- Monitor storage usage
- Track request rate
Troubleshooting
$3
Ensure binding defined in
wrangler.toml:`toml
[[durable_objects.bindings]]
name = "EVENT_EMITTER_DO"
class_name = "EventEmitterDurableObject"
`$3
Implement reconnect logic:
`typescript
const reconnectWebSocket = () => {
ws = new WebSocket(wss://${host}/events?uploadId=${uploadId}); ws.onopen = () => {
console.log("Reconnected");
};
ws.onclose = () => {
setTimeout(reconnectWebSocket, 1000);
};
};
`$3
Partition events across multiple objects:
`typescript
const getObjectId = (uploadId: string) => {
// Deterministic partitioning
const hash = uploadId.charCodeAt(0);
return events-${hash % 10};
};
`Integration Patterns
$3
`typescript
// Emit to Durable Objects
yield* emitter.emit(uploadId, event);// Also cache in KV
yield* kv.put(
event:${uploadId}:latest, JSON.stringify(event));
`$3
`typescript
// Emit event
yield* emitter.emit(uploadId, { type: "completed" });// Store result in R2
await env.R2.put(
uploads/${uploadId}/result.json, resultData);
``- @uploadista/core - Core types
- @uploadista/kv-store-cloudflare-do - Durable Objects KV
- @uploadista/adapters-hono - Hono for Workers
- @uploadista/event-emitter-websocket - WebSocket emitter
See LICENSE in the main repository.
- EVENT_SYSTEM.md - Architecture guide
- Cloudflare Durable Objects - Official docs
- Server Setup Guide - Deployment guide
- WebSocket Emitter - Alternative for single-region