Transactional middleware for AI agents - Saga pattern with automatic rollback
npm install @saga-engine/coreTransactional middleware for AI agents — Ctrl+Z for the real world.




Saga Engine implements the Saga pattern for AI agents and autonomous systems. When your agent books a flight, reserves a hotel, and then fails to rent a car — Saga Engine automatically cancels the hotel and flight in reverse order.
AI agents are increasingly performing real-world actions: booking travel, processing payments, managing infrastructure. But what happens when step 5 of 7 fails?
Without Saga Engine: Manual cleanup, inconsistent state, frustrated users.
With Saga Engine: Automatic rollback, consistent state, reliable operations.
``typescript
const result = await orchestrator.execute(bookTripSaga, {
flight: { from: 'NYC', to: 'LAX' },
hotel: { city: 'Los Angeles', nights: 3 },
car: { pickup: 'LAX' }
});
if (!result.success) {
// Flight and hotel automatically cancelled
console.log('Booking failed, all changes rolled back');
}
`
- Automatic Compensation — Define rollback logic once, executed automatically on failure
- Type-Safe — Full TypeScript support with generics for input/output types
- Pluggable Storage — In-memory for dev, Redis/Postgres for production
- Observable — Rich event system for monitoring and debugging
- Crash Recovery — Resume interrupted sagas after process restart
- Battle-Tested — 193+ tests covering edge cases and production scenarios
`bash`
npm install @saga-engine/coreor
pnpm add @saga-engine/coreor
yarn add @saga-engine/core
`typescript
import { Saga, SagaOrchestrator, InMemoryStore } from '@saga-engine/core';
// 1. Define your saga with steps and compensations
const orderSaga = new Saga<{ orderId: string; amount: number }>('process-order')
.step({
name: 'reserve-inventory',
execute: async (ctx) => {
const reservation = await inventoryService.reserve(ctx.input.orderId);
return { reservationId: reservation.id };
},
compensate: async (ctx, result) => {
await inventoryService.release(result.reservationId);
}
})
.step({
name: 'charge-payment',
execute: async (ctx) => {
const charge = await paymentService.charge(ctx.input.amount);
return { chargeId: charge.id };
},
compensate: async (ctx, result) => {
await paymentService.refund(result.chargeId);
}
})
.step({
name: 'send-confirmation',
execute: async (ctx) => {
await emailService.send(ctx.input.orderId, 'Order confirmed!');
return { sent: true };
}
});
// 2. Create orchestrator with storage
const orchestrator = new SagaOrchestrator({
store: new InMemoryStore()
});
// 3. Execute the saga
const result = await orchestrator.execute(orderSaga, {
orderId: 'ORD-123',
amount: 99.99
});
if (result.success) {
console.log('Order processed:', result.stepResults);
} else {
console.log('Order failed:', result.error.message);
console.log('Compensated:', result.compensated);
}
`
A saga is a sequence of steps that form a logical transaction. Each step can have:
- execute — The action to perform
- compensate — The rollback action (optional)
`typescript`
const saga = new Saga
.step({ name: 'step-1', execute: async (ctx) => {...}, compensate: async (ctx, result) => {...} })
.step({ name: 'step-2', execute: async (ctx) => {...} });
Each step receives a context with:
- sagaId — Unique execution IDinput
- — The input provided when starting the sagastepResults
- — Results from previous steps (Map)
`typescript`
execute: async (ctx) => {
const previousResult = ctx.stepResults.get('previous-step');
return { processed: ctx.input.data };
}
When a step fails, compensation runs in reverse order for all completed steps:
`
Step 1: reserve-inventory ✅
Step 2: charge-payment ✅
Step 3: send-email ❌ FAILED
Compensation:
→ charge-payment.compensate() // Refund
→ reserve-inventory.compensate() // Release
`
`typescript`
const orchestrator = new SagaOrchestrator({
store: new InMemoryStore(),
onCompensationFailure: 'continue', // 'retry' | 'continue' | 'halt'
compensationRetries: 3, // For 'retry' strategy
compensationRetryDelay: 1000 // Milliseconds between retries
});
Monitor saga execution with events:
`typescriptSaga ${state.sagaName} started
orchestrator.on('saga:started', (state) => {
console.log();
});
orchestrator.on('step:executed', (state, stepName, result) => {
console.log(Step ${stepName} completed:, result);
});
orchestrator.on('step:failed', (state, stepName, error) => {
console.log(Step ${stepName} failed:, error.message);
});
orchestrator.on('compensation:started', (state) => {
console.log('Starting rollback...');
});
orchestrator.on('compensation:completed', (state) => {
console.log('Rollback complete');
});
orchestrator.on('saga:completed', (state) => {
console.log('Saga completed successfully');
});
orchestrator.on('saga:failed', (state, error) => {
console.log('Saga failed:', error.message);
});
`
`typescript
import { InMemoryStore } from '@saga-engine/core';
const store = new InMemoryStore();
`
`typescript
import { RedisStore } from '@saga-engine/redis';
const store = new RedisStore({
url: 'redis://localhost:6379',
keyPrefix: 'saga:'
});
`
`typescript
import { PostgresStore } from '@saga-engine/postgres';
const store = new PostgresStore({
connectionString: process.env.DATABASE_URL
});
`
Implement the StateStore interface:
`typescript`
interface StateStore {
create(state: SagaState): Promise
get(sagaId: string): Promise
updateStatus(sagaId: string, status: SagaStatus, completedAt?: Date): Promise
updateStep(sagaId: string, stepName: string, stepState: Partial
getPendingSagas(): Promise
delete?(sagaId: string): Promise
close?(): Promise
}
Recover interrupted sagas on startup:
`typescript
const orchestrator = new SagaOrchestrator({ store: redisStore });
// On application startup
await orchestrator.recover();
`
`typescript`
const processOrderSaga = new Saga
.step({
name: 'validate-cart',
execute: async (ctx) => {
const valid = await cartService.validate(ctx.input.cartId);
if (!valid) throw new Error('Invalid cart');
return { validated: true };
}
})
.step({
name: 'reserve-inventory',
execute: async (ctx) => {
const items = await inventoryService.reserve(ctx.input.items);
return { reservedItems: items };
},
compensate: async (ctx, result) => {
await inventoryService.release(result.reservedItems);
}
})
.step({
name: 'process-payment',
execute: async (ctx) => {
const payment = await paymentService.charge({
amount: ctx.input.total,
customerId: ctx.input.customerId
});
return { paymentId: payment.id };
},
compensate: async (ctx, result) => {
await paymentService.refund(result.paymentId);
}
})
.step({
name: 'create-shipment',
execute: async (ctx) => {
const shipment = await shippingService.create({
address: ctx.input.shippingAddress,
items: ctx.stepResults.get('reserve-inventory').reservedItems
});
return { trackingNumber: shipment.tracking };
},
compensate: async (ctx, result) => {
await shippingService.cancel(result.trackingNumber);
}
});
`typescript`
const agentTaskSaga = new Saga
.step({
name: 'acquire-resources',
execute: async (ctx) => {
const resources = await resourceManager.acquire(ctx.input.requirements);
return { resourceIds: resources.map(r => r.id) };
},
compensate: async (ctx, result) => {
await resourceManager.release(result.resourceIds);
}
})
.step({
name: 'execute-action',
execute: async (ctx) => {
const result = await actionExecutor.run(ctx.input.action);
return { actionResult: result };
},
compensate: async (ctx, result) => {
await actionExecutor.undo(result.actionResult);
}
})
.step({
name: 'commit-changes',
execute: async (ctx) => {
await changeTracker.commit(ctx.sagaId);
return { committed: true };
}
});
`typescript
import { Tool } from 'langchain/tools';
const bookingTool = new Tool({
name: 'book-travel',
description: 'Book flights and hotels with automatic rollback on failure',
func: async (input: string) => {
const params = JSON.parse(input);
const result = await orchestrator.execute(bookTripSaga, params);
return JSON.stringify(result);
}
});
`
`typescript
import { tool } from 'ai';
const processOrderTool = tool({
description: 'Process an order with inventory, payment, and shipping',
parameters: z.object({
orderId: z.string(),
items: z.array(z.object({ sku: z.string(), quantity: z.number() })),
customerId: z.string()
}),
execute: async (params) => {
return orchestrator.execute(orderSaga, params);
}
});
`
`typescript`
new Saga
.step
.addSteps(definitions: StepDefinition[]): Saga
.getStep(name: string): SagaStep | undefined
.hasSteps(): boolean
.steps: ReadonlyArray
.stepCount: number
.name: string
`typescript`
new SagaOrchestrator(options: OrchestratorOptions & { store: StateStore })
.execute
.recover(): Promise
.on
.off
`typescript
interface SagaResult
success: boolean;
sagaId: string;
result?: TResult; // On success
stepResults?: Map
error?: Error; // On failure
failedStep?: string;
compensated?: boolean;
compensationErrors?: Array<{ step: string; error: Error }>;
}
interface SagaContext
sagaId: string;
input: TInput;
stepResults: Map
}
interface StepDefinition
name: string;
execute: (ctx: SagaContext
compensate?: (ctx: SagaContext
}
`
Looking for advanced features? Check out Saga Engine Cloud:
- Hosted State Management — Durable storage with global replication
- Observability Dashboard — Real-time monitoring, tracing, and analytics
- Team Collaboration — Role-based access control and audit logs
- SLA Guarantees — 99.99% uptime with dedicated support
Contact us for Enterprise pricing →
We welcome contributions! Please see our Contributing Guide for details.
`bashClone the repository
git clone https://github.com/Chetan-svg/saga-engine.git
cd saga-engine
MIT © Verto AI LLC
---
Website •
Documentation •
Issues