Type-safe DAG execution engine for AI workflows
npm install @dagengine/core🚀 Type-Safe DAG Engine for AI Workflows
Define task dependencies. Get automatic parallelization, cost tracking, and 10x speedup.



🚀 Quick Start •
📖 Documentation •
💬 Discussions •
🐛 Issues •
📦 Examples
---
dagengine is a TypeScript DAG engine that turns sequential AI workflows into parallel ones automatically.
typescript
// ❌ What most developers do (sequential, slow, expensive)
for (const item of items) {
const sentiment = await ai.analyze(item); // Wait...
const topics = await ai.extract(item); // Wait...
const summary = await ai.summarize(item); // Wait...
}
// Result: 100 items × 15 seconds = 25 minutes, $15
`$3
`typescript
// ✅ With dagengine (parallel, fast, cheap)
const engine = new DagEngine({ plugin: new MyPlugin() });
const result = await engine.process(items);
// Result: 100 items in 2.5 minutes, $5
`10x faster. 67% cheaper. Zero orchestration code.
Define dependencies → get automatic parallelization.
---
🚀 5-Minute Quick Start
$3
`bash
npm install @dagengine/core
`Requirements: Node.js ≥ 18.0.0, TypeScript ≥ 5.0 (recommended)
$3
`typescript
import { DagEngine, Plugin, type PromptContext, type ProviderSelection } from '@dagengine/core';// Define result types (optional but helps with TypeScript)
interface SentimentResult {
sentiment: "positive" | "negative" | "neutral";
score: number;
}
interface TopicsResult {
topics: string[];
}
// 1. Define your workflow
class ReviewAnalyzer extends Plugin {
constructor() {
super('analyzer', 'Review Analyzer', 'Analyzes reviews');
this.dimensions = ['sentiment', 'topics', 'summary'];
}
defineDependencies(): Record {
return {
summary: ['sentiment', 'topics']
};
}
createPrompt(context: PromptContext): string {
const content = context.sections[0]?.content || '';
if (context.dimension === 'sentiment') {
return
Analyze sentiment: "${content}";
} if (context.dimension === 'topics') {
return
Extract topics: "${content}";
} if (context.dimension === 'summary') {
const sentiment = context.dependencies.sentiment?.data as SentimentResult;
const topics = context.dependencies.topics?.data as TopicsResult;
return
Create a ${sentiment.sentiment} summary covering ${topics.topics.join(', ')}:;
} throw new Error(
Unknown dimension: ${context.dimension});
} selectProvider(): ProviderSelection {
return {
provider: 'anthropic',
options: { model: 'claude-3-5-haiku-20241022' }
};
}
}
// 2. Process your data
async function main(): Promise {
// Validate API key
if (!process.env.ANTHROPIC_API_KEY) {
console.error('❌ Missing ANTHROPIC_API_KEY environment variable');
console.error('Set it with: export ANTHROPIC_API_KEY="your-key"');
process.exit(1);
}
// Create engine
const engine = new DagEngine({
plugin: new ReviewAnalyzer(),
providers: {
anthropic: { apiKey: process.env.ANTHROPIC_API_KEY },
}
});
// Prepare reviews
const reviews = [
{ content: 'Great product!', metadata: { id: 1 } },
{ content: 'Not good.', metadata: { id: 2 } }
];
// Process
const result = await engine.process(reviews);
// Display results
console.log(JSON.stringify(result.sections[0]?.results, null, 4));
}
// 3. Run with error handling
main().catch((error: Error) => {
console.error('❌ Processing failed:', error.message);
process.exit(1);
});
`What just happened?
- ✅
sentiment and topics ran in parallel (both have no dependencies)
- ✅ summary waited for both to complete
- ✅ All sections processed in parallel
- ✅ 2 reviews × 3 dimensions = 6 AI calls, all optimized automaticallyNext: Full Documentation • Examples • Production Guide
---
📊 Why Choose dagengine?
| Feature | DIY Code | LangChain | dagengine |
|---------|----------|-----------|-----------|
| Setup | Manual loops | Learn LCEL | 2 methods |
| Parallelization | Manual | Manual | Automatic |
| Cost Tracking | Manual calc | Manual calc | Built-in |
| TypeScript | ✅ Full | ⚠️ Partial | ✅ Full |
| Code (100 items) | 150 lines | 80 lines | 25 lines |
| Best For | Small scripts | RAG/Agents | Orchestration |
Use dagengine when:
- ✅ Processing 100+ items with multiple AI analyses
- ✅ Want automatic parallelization without complexity
- ✅ Need built-in cost tracking
- ✅ TypeScript projects
Skip dagengine when:
- ❌ Single AI calls (overkill)
- ❌ Need RAG/agents (use LangChain)
- ❌ Python projects (we're TypeScript-only)
---
⚡ Key Features
$3
Define task dependencies once. Engine handles execution order, parallelization, and coordination automatically. No queues, workers, or complex orchestration code.$3
Skip low-value processing with conditional execution. Route tasks to optimal models. Track costs per dimension in real-time with automatic token counting.$3
Automatic retry with exponential backoff. Provider fallback chains. Graceful error recovery with partial results. Battle-tested reliability.
$3
Use Anthropic Claude, OpenAI GPT, Google Gemini with a unified interface. Switch providers per dimension. Mix models in one workflow.$3
Full async/await support. Integrate databases, caches, APIs at every processing stage. Transform data mid-pipeline. Complete control when you need it.$3
Built-in cost and token tracking per dimension and provider. Progress callbacks with throughput metrics. Detailed breakdowns in results.
---
💡 Core Concepts
$3
`typescript
const sections = [
{
content: 'Customer review text here',
metadata: { id: 1, userId: 123, productId: 'SKU-789' }
}
];
`Sections are the pieces of data you analyze (reviews, emails, documents, etc.).
$3
`typescript
this.dimensions = ['sentiment', 'topics', 'summary'];
`Dimensions are the analyses you run. Each dimension processes all sections.
$3
`typescript
defineDependencies() {
return {
sentiment: [], // No dependencies (runs first)
topics: [], // No dependencies (runs first)
summary: ['sentiment', 'topics'] // Waits for both
};
}
`Dependencies control execution order. Engine automatically parallelizes independent tasks.
`
Execution Plan:
sentiment ──┐
├─→ Both run in parallel → summary
topics ─────┘
`$3
Section Dimensions (default) - Analyze each item independently:
`typescript
this.dimensions = ['sentiment']; // Runs once per section
`Global Dimensions - Analyze all items together:
`typescript
this.dimensions = [
{ name: 'categorize', scope: 'global' } // Runs once for all sections
];
`---
🎨 Advanced Features
$3
`typescript
class SmartAnalyzer extends Plugin {
dimensions = ['quality_check', 'deep_analysis'];
defineDependencies() {
return { deep_analysis: ['quality_check'] };
} shouldSkipSectionDimension(context) {
if (context.dimension === 'deep_analysis') {
const quality = context.dependencies.quality_check.data;
return quality.score < 0.7; // Skip low-quality items
}
return false;
}
selectProvider(dimension) {
if (dimension === 'quality_check') {
return {
provider: 'anthropic',
options: { model: 'claude-3-5-haiku-20241022' } // Cheap model
};
}
return {
provider: 'anthropic',
options: { model: 'claude-3-7-sonnet-20250219' } // Expensive model
};
}
}
`Result: 100 items → 40 high-quality → 60% fewer expensive API calls
$3
`typescript
selectProvider() {
return {
provider: 'anthropic',
options: { model: 'claude-sonnet-4-5-20250929' },
fallbacks: [
{ provider: 'openai', options: { model: 'gpt-4o' } },
{ provider: 'gemini', options: { model: 'gemini-2.5-pro' } }
]
};
}
`Automatic failover: If Anthropic fails, automatically tries OpenAI, then Gemini.
$3
`typescript
class CategoryAnalyzer extends Plugin {
dimensions = [
'classify',
{ name: 'group_by_category', scope: 'global' },
'analyze_category'
]; transformSections(context) {
if (context.dimension === 'group_by_category') {
const categories = context.result.data.categories;
// Transform: 100 sections → 5 category groups
return categories.map(cat => ({
content: cat.items.join('\n\n'),
metadata: { category: cat.name, count: cat.items.length }
}));
}
}
}
`Result: Analyze 5 category groups instead of 100 individual items (95% fewer API calls)
$3
`typescript
class DatabaseIntegratedPlugin extends Plugin {
async beforeProcessStart(context) {
// Initialize connections
await this.db.connect();
} async shouldSkipSectionDimension(context) {
// Check cache before processing
const cached = await this.redis.get(
${context.section.id}:${context.dimension});
if (cached) return true;
return false;
} async afterDimensionExecute(context) {
// Save results to database
await this.db.results.insert({
section: context.section.id,
dimension: context.dimension,
data: context.result.data
});
}
async afterProcessComplete(context) {
// Cleanup
await this.db.disconnect();
}
}
`All 18 hooks support async/await for seamless external service integration.
---
📚 Documentation
$3
- Quick Start - Get started in 5 minutes
- Core Concepts - Understand sections, dimensions, dependencies
- Examples - Complete working examples
$3
1. Hello World - Your first plugin
2. Dependencies - Control execution order
3. Section vs Global - Two dimension types
4. Transformations - Reshape data mid-pipeline
5. Skip Logic - Optimize costs
6. Multi-Provider - Route to different models
7. Async Hooks - Database integration
8. Error Handling - Graceful recovery
$3
- Portkey Gateway - Rate limit protection & caching
- Production Quickstart - Complete production workflow
$3
- Configuration - Engine config options
- Lifecycle Hooks - All 18 hooks explained
- Type Definitions - TypeScript interfaces
---
🌐 Supported Providers
| Provider | Description | Best For | Docs |
|----------|-------------|----------|------|
| Anthropic | Claude models for reasoning and analysis | Complex tasks, deep reasoning | Docs |
| OpenAI | GPT models for general-purpose tasks | Fast responses, versatile workflows | Docs |
| Google Gemini | Gemini models for high-speed processing | High throughput, multimodal inputs | Docs |
Mix and match: Route different dimensions to different providers in the same workflow.
`typescript
selectProvider(dimension) {
if (dimension === 'quality_check') {
return { provider: 'gemini', options: { model: 'gemini-1.5-flash' } };
}
if (dimension === 'deep_analysis') {
return { provider: 'anthropic', options: { model: 'claude-sonnet-4-5-20250929' } };
}
}
`---
🔄 Gateway Support
dagengine supports Portkey as a unified AI gateway for advanced features:
| Feature | Direct Mode | With Portkey Gateway |
|---------|-------------|---------------------|
| Automatic Retries | ✅ Engine-level | ✅ Gateway-level with smart backoff |
| Rate Limit Handling | ⚠️ Manual | ✅ Automatic with queuing |
| Semantic Caching | ❌ | ✅ Reduce costs and latency |
| Load Balancing | ❌ | ✅ Multi-provider routing |
| Observability | ✅ Basic | ✅ Full dashboard & analytics |
Enable Portkey:
`typescript
providers: {
anthropic: {
apiKey: process.env.ANTHROPIC_API_KEY,
gateway: 'portkey',
gatewayApiKey: process.env.PORTKEY_API_KEY,
gatewayConfig: 'pc-my-config-id' // Optional: retry/cache config
}
}
`Learn more: Portkey Integration Guide • Portkey Docs
---
📦 Configuration
`typescript
const engine = new DagEngine({
plugin: new MyPlugin(),
// Provider credentials
providers: {
anthropic: { apiKey: process.env.ANTHROPIC_API_KEY },
openai: { apiKey: process.env.OPENAI_API_KEY }
},
// Execution settings
execution: {
concurrency: 10, // Max parallel operations
maxRetries: 3, // Retry attempts
retryDelay: 1000, // Base delay (ms)
timeout: 60000, // Default timeout
continueOnError: true // Process partial results
},
// Cost tracking
pricing: {
models: {
'claude-sonnet-4-5-20250929': {
inputPer1M: 3.00,
outputPer1M: 15.00
}
}
},
// Progress display
progressDisplay: {
display: 'bar', // 'simple' | 'bar' | 'multi' | 'none'
showDimensions: true
}
});
`---
🛠️ Development
`bash
Clone repository
git clone https://github.com/dagengine/dagengine.git
cd dagengineInstall dependencies
npm installRun tests
npm testRun tests with coverage
npm run test:coverageType check
npm run type-checkBuild
npm run buildRun all checks
npm run validate
`---
🤝 Contributing
We welcome contributions! Here's how to get started:
$3
1. Fork the repository on GitHub
2. Clone your fork:
git clone https://github.com/YOUR_USERNAME/dagengine.git
3. Create a branch: git checkout -b feature/your-feature-name
4. Make your changes and add tests
5. Run validation: npm run validate
6. Commit: git commit -m "feat: add your feature"
7. Push: git push origin feature/your-feature-name
8. Open a Pull Request on GitHub$3
- Code Style: We use Prettier and ESLint (run
npm run format && npm run lint:fix)
- Tests: Add tests for new features (run npm test)
- Types: Maintain full TypeScript coverage (run npm run type-check`)- 💬 Start a discussion
- 🐛 Report a bug
- 💡 Request a feature
See CONTRIBUTING.md for detailed guidelines.
---
1. 📖 Check the docs - Documentation
2. 🔍 Search existing Q&A - GitHub Discussions
3. 💬 Ask a question - Start a discussion
4. 🐛 Found a bug? - Open an issue
- 📦 npm Package - Install and updates
- ⭐ GitHub - Star for updates
- 💬 Discussions - Feature requests, Q&A
---
We take security seriously. See SECURITY.md for our security policy.
Never report security issues through public GitHub issues.
Use GitHub's private vulnerability reporting or email the maintainers directly.
---
Apache License 2.0 © dagengine contributors
Licensed under the Apache License, Version 2.0. See LICENSE for the full license text.
This license includes an explicit patent grant (Section 3), protecting users from patent litigation. See LICENSE for details.
---
Built with:
- TypeScript - Type safety
- @dagrejs/graphlib - DAG algorithms
- p-queue - Concurrency control
---
!GitHub stars
!npm downloads
!GitHub issues
!GitHub pull requests
!GitHub contributors
---
- 🌐 Homepage: dagengine.ai
- 📖 Documentation: https://www.dagengine.ai/guide/quick-start.html
- 📦 npm Package: @dagengine/core
- 💬 Community: GitHub Discussions
- 🐛 Issue Tracker: GitHub Issues
- 🔐 Security: Security Policy
---
⭐ Star us on GitHub — it helps the project grow!
Made with ❤️ by the dagengine community