Real-time graph-based stream processing for Cloudflare Workers and Durable Objects
npm install @fluxgraph/coreš Real-time graph-based stream processing and AI orchestration for Cloudflare Workers
FluxGraph is a lightweight, high-performance stream processing library with built-in AI workflow capabilities, designed specifically for edge computing environments. Build complex data pipelines and AI agents that run directly on Cloudflare's global network - combining the power of LangGraph-style orchestration with real-time stream processing in a package that's 10x smaller than alternatives.
- š Real-time Processing - Process data streams with millisecond latency
- š Graph-based Architecture - Create complex topologies with parallel and conditional paths
- š¤ AI-Native - Built-in LLM, tool calling, and memory nodes for AI workflows
- š Built-in Aggregations - Time, count, and session-based windowing
- š Backpressure Handling - Automatic buffering and flow control
- š”ļø Error Resilience - Retry policies and error recovery strategies
- šÆ Type-safe - Full TypeScript support with comprehensive types
- āļø Edge-native - Optimized for Cloudflare Workers and Durable Objects
- š Agent Loops - Support for cyclic graphs enabling ReAct and autonomous agents
- š¾ State Management - Built-in memory and checkpointing for long-running workflows
``bash`
npm install @fluxgraph/core
`typescript
import { Graph, nodes } from '@fluxgraph/core';
// Define your graph
const graph = new Graph({
name: 'Transaction Processor',
nodes: [
nodes.source('webhook', {
type: 'websocket',
url: 'wss://api.example.com/transactions'
}),
nodes.transform('normalize', {
function: (data) => ({
...data,
amount: data.amount / 100
})
}),
nodes.filter('large-only', {
function: (data) => data.amount > 100
}),
nodes.aggregate('hourly-summary', {
window: 'time',
duration: 3600,
function: (packets) => ({
total: packets.reduce((sum, p) => sum + p.data.amount, 0),
count: packets.length
})
}),
nodes.sink('alerts', {
type: 'http',
url: 'https://alerts.example.com/webhook'
})
],
edges: [
['webhook', 'normalize'],
['normalize', 'large-only'],
['large-only', 'hourly-summary'],
['hourly-summary', 'alerts']
]
});
// Start processing
await graph.start();
// Inject data manually
await graph.inject('webhook', { amount: 15000, currency: 'USD' });
// Subscribe to outputs
graph.subscribe('alerts', (packet) => {
console.log('Alert triggered:', packet.data);
});
`
FluxGraph now includes powerful AI workflow capabilities, making it a lightweight alternative to popular AI orchestration frameworks:
| Feature | FluxGraph | LangGraph | Pydantic AI | LlamaIndex | CrewAI |
|---------|-----------|-----------|-------------|------------|--------|
| Stream Processing | ā
Excellent (RxJS-based) | ā
Good | ā ļø Limited | ā ļø Limited | ā No |
| Graph Architecture | ā
Yes | ā
Yes | ā ļø Chain-based | ā ļø Chain-based | ā
Yes |
| AI-specific Nodes | ā
LLM, Tool, Memory | ā
Yes | ā
Yes | ā
Yes | ā
Agents |
| State Management | ā
Built-in + Durable Objects | ā
Built-in | ā
Built-in | ā
Built-in | ā
Yes |
| Cycles/Agent Loops | ā
Yes (ReAct, etc.) | ā
Yes | ā
Yes | ā ļø Limited | ā
Yes |
| Tool Calling | ā
Parallel + Sequential | ā
Yes | ā
Yes | ā
Yes | ā
Yes |
| Memory Types | ā
Conversation, Semantic, Hybrid | ā
Yes | ā ļø Basic | ā
Yes | ā
Yes |
| Checkpointing | ā
Yes | ā
Yes | ā
Yes | ā ļø Limited | ā ļø Limited |
| Edge Runtime | ā
Cloudflare Workers | ā No | ā No | ā No | ā No |
| Bundle Size | ā
~179KB | ā ~1.7MB | ā Python only | ā Python only | ā Python only |
| Streaming LLM | ā
Native | ā
Yes | ā ļø Limited | ā
Yes | ā ļø Limited |
| TypeScript | ā
First-class | ā
Yes | ā Python | ā Python | ā Python |
| Real-time Data | ā
Excellent | ā ļø Limited | ā No | ā No | ā No |
| Production Ready | ā
Yes | ā
Yes | ā
Yes | ā
Yes | ā ļø Beta |
- š Edge-Native: Only framework that runs on Cloudflare Workers
- ā” Lightweight: ~179KB vs 1.7MB for LangGraph (10x smaller)
- š Streaming-First: Built on RxJS for excellent real-time performance
- š§ Flexible: Combine AI with real-time data processing
- š¾ Durable: Native integration with Durable Objects for persistence
typescript
import { GraphRunner } from '@fluxgraph/core';
import { reactAgentTemplate } from '@fluxgraph/core/templates';// Create an autonomous agent
const agent = new GraphRunner(reactAgentTemplate);
await agent.initialize();
await agent.start();
// Give it a task
agent.inject('input', {
task: 'Research and summarize the latest AI trends'
});
`$3
`typescript
import { GraphBuilder, LLMNode, MemoryNode } from '@fluxgraph/core';const ragPipeline = GraphBuilder.create('RAG Pipeline')
.nodes(
{
id: 'vectorDB',
type: 'memory',
name: 'Vector Store',
memoryType: 'semantic',
embeddingDimension: 1536
},
{
id: 'llm',
type: 'llm',
name: 'GPT-4',
model: 'gpt-4',
systemPrompt: 'Answer based on the provided context.',
streaming: true
}
)
.flow('vectorDB', 'llm')
.build();
`$3
`typescript
const multiAgent = GraphBuilder.create('Multi-Agent System')
.allowCycles() // Enable agent communication loops
.nodes(
{
id: 'coordinator',
type: 'llm',
name: 'Coordinator',
model: 'gpt-4',
systemPrompt: 'You coordinate multiple specialist agents.'
},
{
id: 'researcher',
type: 'llm',
name: 'Research Agent',
model: 'gpt-3.5-turbo',
systemPrompt: 'You are a research specialist.'
},
{
id: 'analyst',
type: 'llm',
name: 'Analysis Agent',
model: 'gpt-3.5-turbo',
systemPrompt: 'You analyze data and provide insights.'
}
)
.build();
`Use Cases
$3
`typescript
const financialGraph = templates.financial.createAnomalyDetector({
thresholds: {
amount: 1000,
frequency: 10 // transactions per minute
},
alertUrl: 'https://your-webhook.com'
});
`$3
`typescript
const iotGraph = templates.iot.createSensorAggregator({
sensors: ['temperature', 'humidity', 'pressure'],
aggregateWindow: 60, // seconds
outputFormat: 'prometheus'
});
`$3
`typescript
const analyticsGraph = templates.analytics.createEventProcessor({
events: ['click', 'view', 'purchase'],
sessionTimeout: 1800, // 30 minutes
enrichment: {
geoip: true,
userAgent: true
}
});
`Durable Object Integration
`typescript
export class StreamProcessor extends DurableObject {
private graph: Graph; async fetch(request: Request) {
if (!this.graph) {
this.graph = new Graph(graphConfig);
await this.graph.start();
}
const url = new URL(request.url);
if (url.pathname === '/inject') {
const data = await request.json();
await this.graph.inject('input', data);
return new Response('OK');
}
if (url.pathname === '/metrics') {
return Response.json(this.graph.getMetrics());
}
return new Response('Not found', { status: 404 });
}
}
`Node Types
$3
- WebSocket - Real-time data streams
- HTTP - Polling or webhook endpoints
- Timer - Scheduled data generation
- Manual - Programmatic injection$3
- Data mapping and enrichment
- Format conversion
- Calculations and derived fields$3
- Conditional routing
- Data validation
- Sampling and rate limiting$3
- Time-based windows
- Count-based windows
- Session windows
- Custom aggregation functions$3
- WebSocket output
- HTTP webhooks
- Database writes
- Custom outputsAdvanced Features
$3
`typescript
const graph = new Graph({
// ...
errorStrategy: 'continue', // or 'stop', 'retry'
retryPolicy: {
maxRetries: 3,
backoffMultiplier: 2,
initialDelay: 1000
}
});
`$3
`typescript
const metrics = graph.getMetrics();
console.log({
processed: metrics.packetsProcessed,
dropped: metrics.packetsDropped,
latency: metrics.averageLatency
});graph.on('error', (event) => {
console.error('Graph error:', event);
});
`$3
`typescript
// Save graph state to Durable Object storage
const state = graph.getState();
await this.storage.put('graph-state', state);// Restore on restart
const savedState = await this.storage.get('graph-state');
if (savedState) {
graph.restore(savedState);
}
``Streamflow is designed for high-throughput, low-latency processing:
- Process 10,000+ events/second per Durable Object
- Sub-millisecond processing latency
- Automatic backpressure handling
- Memory-efficient buffering
We welcome contributions! Please see CONTRIBUTING.md for details.
MIT License - see LICENSE for details.
- š Documentation
- š¬ Discord Community
- š Issue Tracker
- š§ Email Support
---
Built with ā¤ļø for the edge computing community