A production-grade stable & flexible execution engine for resilient workflows, API integrations, and batch processing.
`npm install @emmvish/stable-infra`

A stability-first, production-grade TypeScript framework for resilient API integrations, batch processing, and orchestrating complex workflows with deterministic error handling, type safety, and comprehensive observability.
- Overview
- Core Concepts
- Core Modules
  - stableRequest
  - stableFunction
  - stableApiGateway
  - stableWorkflow
  - stableWorkflowGraph
  - StableScheduler
  - StableBuffer
- Distributed Infrastructure
  - Distributed Buffer
  - Distributed Resilience
  - Distributed Scheduler
  - Multi-Node Integration Example
- Stable Runner
- Resilience Mechanisms
  - Retry Strategies
  - Circuit Breaker
  - Caching
  - Rate Limiting
  - Concurrency Limiting
- Workflow Patterns
  - Sequential & Concurrent Phases
  - Non-Linear Workflows
  - Branched Workflows
- Graph-based Workflow Patterns
  - Graph-Based Workflows with Mixed Items
  - Parallel Phase Execution
  - Merge Points
  - Linear Helper
  - Branch Racing in Graphs
- Configuration & State
  - Config Cascading
  - Shared & State Buffers
- Hooks & Observability
  - Pre-Execution Hooks
  - Analysis Hooks
  - Handler Hooks
  - Decision Hooks
  - Metrics & Logging
- Advanced Features
  - Trial Mode
  - State Persistence
  - Mixed Request & Function Phases
- Best Practices
---

## Overview

@emmvish/stable-infra evolved from a focused library for resilient API calls into a comprehensive execution framework. Originally addressing API integration challenges via stableRequest, it expanded to include:
1. Batch orchestration via stableApiGateway for processing groups of mixed requests/functions
2. Phased workflows via stableWorkflow for array-based multi-phase execution with dynamic control flow
3. Graph-based workflows via stableWorkflowGraph for DAG execution with higher parallelism
4. Generic function execution via stableFunction, inheriting all resilience guards
5. Queue-based scheduling via StableScheduler, with the option to persist scheduler state and recover from it
6. Transactional shared state via StableBuffer, a concurrency-safe buffer you can pass as commonBuffer or sharedBuffer
7. Distributed coordination via DistributedCoordinator for multi-node locking, state, leader election, pub/sub, and 2PC transactions
All core modules support the same resilience stack: retries, jitter, circuit breaking, caching, rate/concurrency limits, config cascading, shared buffers, trial mode, comprehensive hooks, and metrics. This uniformity makes it trivial to compose requests and functions in any topology. Finally, Stable Runner executes jobs from config.
---

## Core Concepts

Every execution—whether a single request, a pure function, or an entire workflow—inherits built-in resilience:
- Retries with configurable backoff strategies (FIXED, LINEAR, EXPONENTIAL)
- Jitter to prevent thundering herd
- Circuit breaker to fail fast and protect downstream systems
- Caching for idempotent read operations
- Rate & concurrency limits to respect external constraints
- Metrics guardrails to validate execution against thresholds with automatic anomaly detection
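The retry delays produced by the three backoff strategies, together with jitter and a maximum-wait cap, can be sketched as a pure function. This is illustrative arithmetic only, not the library's internals; names and parameters are assumptions:

```typescript
// Illustrative delay calculation for FIXED, LINEAR and EXPONENTIAL backoff.
// Not the library's implementation; shapes and names are assumptions.
type RetryStrategy = 'FIXED' | 'LINEAR' | 'EXPONENTIAL';

function retryDelay(
  strategy: RetryStrategy,
  baseWaitMs: number,
  attempt: number,            // 1-based index of the retry being scheduled
  jitterMs = 0,               // extra randomness in [0, jitterMs)
  maxAllowedWaitMs = Infinity // cap for runaway exponential delays
): number {
  const delay =
    strategy === 'FIXED' ? baseWaitMs :
    strategy === 'LINEAR' ? baseWaitMs * attempt :
    baseWaitMs * 2 ** (attempt - 1); // EXPONENTIAL
  return Math.min(delay, maxAllowedWaitMs) + Math.random() * jitterMs;
}

retryDelay('FIXED', 500, 3);       // 500
retryDelay('LINEAR', 100, 3);      // 300
retryDelay('EXPONENTIAL', 100, 3); // 400
```

Jitter adds a random offset on top of the computed delay so that many clients retrying after the same failure do not hit the server in lockstep.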
All examples in this guide use TypeScript generics for type-safe request/response data and function arguments/returns. Analyzers validate shapes at runtime; TypeScript ensures compile-time safety.
Global defaults → group overrides → phase overrides → branch overrides → item overrides. Lower levels always win, preventing repetition while maintaining expressiveness.
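A sketch of how such a cascade can be resolved — the field names and fallback values here are illustrative, not the framework's actual merge logic:

```typescript
// Illustrative config cascade: later (more specific) levels override earlier ones.
interface LevelConfig { attempts?: number; wait?: number; }

function resolveConfig(...levels: LevelConfig[]): Required<LevelConfig> {
  // levels are ordered global → group → phase → branch → item
  return levels.reduce<Required<LevelConfig>>(
    (acc, level) => ({
      attempts: level.attempts ?? acc.attempts,
      wait: level.wait ?? acc.wait
    }),
    { attempts: 1, wait: 0 } // assumed fallbacks for the sketch
  );
}

const effective = resolveConfig(
  { attempts: 3, wait: 500 }, // global defaults
  { wait: 200 },              // phase override
  { attempts: 5 }             // item override
);
// effective: { attempts: 5, wait: 200 } — item wins on attempts, phase on wait
```

Each field is resolved independently, so an item can override one setting while still inheriting the rest from outer levels.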
Workflows and gateways support sharedBuffer for passing computed state across phases/branches/items without global state.
---

## Core Modules

### stableRequest

Single API call with resilience and type-safe request/response types.
```typescript
import { stableRequest, REQUEST_METHODS, VALID_REQUEST_PROTOCOLS } from '@emmvish/stable-infra';

interface GetUserRequest {
  // Empty for GET requests with no body
}

interface User {
  id: number;
  name: string;
}

const result = await stableRequest<GetUserRequest, User>({
  reqData: {
    method: REQUEST_METHODS.GET,
    protocol: VALID_REQUEST_PROTOCOLS.HTTPS,
    hostname: 'api.example.com',
    path: '/users/1'
  },
  resReq: true,
  attempts: 3,
  wait: 500,
  jitter: 100,
  cache: { enabled: true, ttl: 5000 },
  rateLimit: { maxRequests: 10, windowMs: 1000 },
  maxConcurrentRequests: 5,
  responseAnalyzer: ({ data }) => {
    return typeof data === 'object' && data !== null && 'id' in data;
  },
  handleSuccessfulAttemptData: ({ successfulAttemptData }) => {
    console.log(`User loaded: ${successfulAttemptData.data.name}`);
  }
});

if (result.success) {
  console.log(result.data.name, result.metrics.totalAttempts);
} else {
  console.error(result.error);
}
```
Key responsibilities:
- Execute a single HTTP request with automatic retry and backoff
- Validate response shape via analyzer; retry if invalid
- Cache successful responses with TTL
- Apply rate and concurrency limits
- Throw or gracefully suppress errors via finalErrorAnalyzer
- Collect attempt metrics and infrastructure dashboards (circuit breaker, cache, and rate limiter state)
### stableFunction

Generic async/sync function execution with identical resilience.

```typescript
import { stableFunction, RETRY_STRATEGIES } from '@emmvish/stable-infra';

type ComputeArgs = [number, number];
type ComputeResult = number;

const multiply = (a: number, b: number) => a * b;

const result = await stableFunction<ComputeArgs, ComputeResult>({
  fn: multiply,
  args: [5, 3],
  returnResult: true,
  attempts: 2,
  wait: 100,
  retryStrategy: RETRY_STRATEGIES.EXPONENTIAL,
  responseAnalyzer: ({ data }) => data > 0,
  cache: { enabled: true, ttl: 10000 }
});

if (result.success) {
  console.log('Result:', result.data); // 15
}
```
Key responsibilities:
- Execute any async or sync function with typed arguments and return
- Support argument-based cache key generation
- Retry on error or analyzer rejection
- Enforce success criteria via analyzer
- Optionally suppress exceptions
### stableApiGateway

Batch orchestration of mixed requests and functions.

```typescript
import {
stableApiGateway,
REQUEST_METHODS,
VALID_REQUEST_PROTOCOLS,
RequestOrFunction
} from '@emmvish/stable-infra';
import type { API_GATEWAY_ITEM } from '@emmvish/stable-infra';
// Define request types
interface ApiRequestData {
  filters?: Record<string, unknown>;
}
interface ApiResponse {
id: number;
value: string;
}
// Define function types
type TransformArgs = [ApiResponse[], number];
type TransformResult = {
transformed: ApiResponse[];
count: number;
};
type ValidateArgs = [TransformResult];
type ValidateResult = boolean;
const items: API_GATEWAY_ITEM[] = [
{
type: RequestOrFunction.REQUEST,
request: {
id: 'fetch-data',
requestOptions: {
reqData: {
method: REQUEST_METHODS.GET,
protocol: VALID_REQUEST_PROTOCOLS.HTTPS,
hostname: 'api.example.com',
path: '/data'
},
resReq: true,
attempts: 3
}
}
},
{
type: RequestOrFunction.FUNCTION,
function: {
id: 'transform-data',
functionOptions: {
fn: (data: ApiResponse[], threshold: number): TransformResult => ({
transformed: data.filter(item => item.id > threshold),
count: data.length
}),
args: [[], 10] as TransformArgs,
returnResult: true,
attempts: 2,
cache: { enabled: true, ttl: 5000 }
}
}
},
{
type: RequestOrFunction.FUNCTION,
function: {
id: 'validate-result',
functionOptions: {
fn: (result: TransformResult): ValidateResult => result.count > 0,
args: [{ transformed: [], count: 0 }] as ValidateArgs,
returnResult: true
}
}
}
];
const responses = await stableApiGateway(items, {
concurrentExecution: true,
stopOnFirstError: false,
sharedBuffer: {},
commonAttempts: 2,
commonWait: 300,
maxConcurrentRequests: 3
});
// Access individual responses
responses.forEach((resp, i) => {
  console.log(`Item ${i}: success=${resp.success}`);
});
// Access aggregate metrics
console.log(`Success rate: ${responses.metrics.successRate.toFixed(2)}%`);
console.log(`Execution time: ${responses.metrics.executionTime}ms`);
console.log(`Throughput: ${responses.metrics.throughput.toFixed(2)} req/s`);
console.log(`Average duration: ${responses.metrics.averageRequestDuration.toFixed(2)}ms`);
```

#### Request/Function Racing

Enable racing to accept the first successful request or function and cancel others, useful for redundant API calls or failover scenarios.

```typescript
const responses = await stableApiGateway(items, {
concurrentExecution: true,
enableRacing: true, // First successful item wins, others cancelled
maxConcurrentRequests: 10
});
// responses contains only the winning result
// Losing items marked as cancelled with appropriate error
```
Key responsibilities:
- Execute a batch of requests and functions concurrently or sequentially
- Apply global, group-level, and item-level config overrides
- Maintain shared buffer across items for state passing
- Stop on first error or continue despite failures
- Collect per-item and aggregate metrics (success rates, execution time, throughput)
- Support request grouping with group-specific config
- Track infrastructure metrics (circuit breaker, cache, rate limiter, concurrency)
### stableWorkflow

Phased array-based workflows with sequential/concurrent phases, mixed items, and non-linear control flow.

You can start a workflow from a specific phase using startPhaseIndex (0-based). When starting inside a concurrent group (markConcurrentPhase), execution aligns to the group’s first phase.

```typescript
import { stableWorkflow, PHASE_DECISION_ACTIONS, RequestOrFunction, REQUEST_METHODS } from '@emmvish/stable-infra';
import type { STABLE_WORKFLOW_PHASE, API_GATEWAY_ITEM } from '@emmvish/stable-infra';
// Define types for requests
interface FetchRequestData {}
interface FetchResponse {
users: Array<{ id: number; name: string }>;
posts: Array<{ id: number; title: string }>;
}
// Define types for functions
type ProcessArgs = [FetchResponse];
type ProcessResult = {
merged: Array<{ userId: number; userName: string; postTitle: string }>;
};
type AuditArgs = [ProcessResult, string];
type AuditResult = { logged: boolean; timestamp: string };
const phases: STABLE_WORKFLOW_PHASE[] = [
{
id: 'fetch-data',
requests: [
{
id: 'get-users-posts',
requestOptions: {
reqData: {
hostname: 'api.example.com',
path: '/users-and-posts'
},
resReq: true,
attempts: 3
}
}
]
},
{
id: 'process-and-audit',
markConcurrentPhase: true,
items: [
{
type: RequestOrFunction.FUNCTION,
function: {
id: 'process-data',
functionOptions: {
fn: (data: FetchResponse): ProcessResult => ({
merged: data.users.map((user, idx) => ({
userId: user.id,
userName: user.name,
postTitle: data.posts[idx]?.title || 'No post'
}))
}),
args: [{ users: [], posts: [] }] as ProcessArgs,
returnResult: true
}
}
},
{
type: RequestOrFunction.FUNCTION,
function: {
id: 'audit-processing',
functionOptions: {
        fn: async (result: ProcessResult, auditId: string): Promise<AuditResult> => {
          console.log(`Audit ${auditId}:`, result);
return { logged: true, timestamp: new Date().toISOString() };
},
args: [{ merged: [] }, 'audit-123'] as AuditArgs,
returnResult: true
}
}
}
],
phaseDecisionHook: async ({ phaseResult, sharedBuffer }) => {
if (!phaseResult.success) {
return { action: PHASE_DECISION_ACTIONS.TERMINATE };
}
return { action: PHASE_DECISION_ACTIONS.CONTINUE };
}
},
{
id: 'finalize',
requests: [
{
id: 'store-result',
requestOptions: {
reqData: {
hostname: 'api.example.com',
path: '/store',
method: REQUEST_METHODS.POST
},
resReq: false
}
}
]
}
];
const result = await stableWorkflow(phases, {
workflowId: 'data-pipeline',
concurrentPhaseExecution: false, // Phases sequential
enableNonLinearExecution: true,
sharedBuffer: { userId: '123' },
commonAttempts: 2,
commonWait: 200,
handlePhaseCompletion: ({ phaseResult, workflowId }) => {
    console.log(`Phase ${phaseResult.phaseId} complete in workflow ${workflowId}`);
}
});
console.log(`Workflow succeeded: ${result.success}, phases: ${result.totalPhases}`);
```
Key responsibilities:
- Execute phases sequentially or concurrently
- Support mixed requests and functions per phase
- Enable non-linear flow (CONTINUE, SKIP, REPLAY, JUMP, TERMINATE)
- Maintain shared buffer across all phases
- Apply phase-level and request-level config cascading
- Support branching with parallel/sequential branches
- Collect per-phase metrics and workflow aggregates
### stableWorkflowGraph

DAG-based execution for higher parallelism and explicit phase dependencies.

```typescript
import { stableWorkflowGraph, WorkflowGraphBuilder } from '@emmvish/stable-infra';
const graph = new WorkflowGraphBuilder()
.addPhase('fetch-posts', {
requests: [{
id: 'get-posts',
requestOptions: {
reqData: { hostname: 'api.example.com', path: '/posts' },
resReq: true
}
}]
})
.addPhase('fetch-users', {
requests: [{
id: 'get-users',
requestOptions: {
reqData: { hostname: 'api.example.com', path: '/users' },
resReq: true
}
}]
})
.addParallelGroup('fetch-all', ['fetch-posts', 'fetch-users'])
.addPhase('aggregate', {
functions: [{
id: 'combine',
functionOptions: {
fn: () => ({ posts: [], users: [] }),
args: [],
returnResult: true
}
}]
})
.addMergePoint('sync', ['fetch-all'])
.connectSequence('fetch-all', 'sync', 'aggregate')
.setEntryPoint('fetch-all')
.build();
const result = await stableWorkflowGraph(graph, {
workflowId: 'data-aggregation'
});
console.log(`Graph workflow success: ${result.success}`);
```
Key responsibilities:
- Define phases as DAG nodes with explicit dependency edges
- Execute independent phases in parallel automatically
- Support parallel groups, merge points, and conditional routing
- Validate graph structure (cycle detection, reachability, orphan detection)
- Provide deterministic execution order
- Offer higher parallelism than phased workflows for complex topologies
### StableScheduler

Queue-based scheduler for cron/interval/timestamp execution with concurrency limits and recoverable state via custom persistence handlers.
Key responsibilities:
- Enforce max-parallel job execution
- Schedule jobs with cron, interval, or timestamp(s)
- Persist and restore scheduler state via user-provided handlers
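As an illustration of the scheduling model (interval and timestamp triggers; cron omitted for brevity), computing the next run times for a job might look like this. This is a sketch of the idea, not StableScheduler's API:

```typescript
// Illustrative schedule model: interval-based or explicit-timestamp jobs.
type Schedule =
  | { kind: 'interval'; everyMs: number }
  | { kind: 'timestamps'; at: number[] };

// Returns up to `count` upcoming run times (epoch ms) strictly after `nowMs`.
function nextRuns(schedule: Schedule, nowMs: number, count: number): number[] {
  if (schedule.kind === 'interval') {
    return Array.from({ length: count }, (_, i) => nowMs + schedule.everyMs * (i + 1));
  }
  return schedule.at.filter(t => t > nowMs).sort((a, b) => a - b).slice(0, count);
}

nextRuns({ kind: 'interval', everyMs: 1000 }, 0, 3);       // [1000, 2000, 3000]
nextRuns({ kind: 'timestamps', at: [50, 10, 99] }, 20, 2); // [50, 99]
```

A real scheduler layers a max-parallel job queue and persistence hooks on top of this trigger model.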
### StableBuffer

Transactional, concurrency-safe shared state. It’s opt-in: pass a StableBuffer instance as commonBuffer or sharedBuffer to serialize updates across concurrent executions.
Key features:
- Serialized transactions via FIFO queue
- Snapshot reads with read()
- Optional transaction timeouts
- Optional transaction logging with logTransaction

```ts
import { StableBuffer } from '@emmvish/stable-infra';
const buffer = new StableBuffer({
initialState: { counter: 0 },
transactionTimeoutMs: 500,
logTransaction: (log) => {
// persist log.transactionId, log.activity, log.hookName, log.stateBefore, log.stateAfter
}
});
await buffer.run(
(state) => { state.counter += 1; },
{ activity: 'workflow-phase', hookName: 'phase-1', workflowId: 'wf-1' }
);
```

Replay utility (transaction logs → deterministic state replay):

```ts
import { replayStableBufferTransactions } from '@emmvish/stable-infra';
const replay = await replayStableBufferTransactions({
logs, // StableBufferTransactionLog[]
handlers: {
'phase-1': (state) => { state.counter += 1; }
},
initialState: { counter: 0 }
});
console.log(replay.buffer.getState());
```

## Distributed Infrastructure

Multi-node coordination via a shared backend (Redis, PostgreSQL, etcd, etc.). Nodes connect independently to the backend — no peer-to-peer discovery or IP exchange required. All coordination goes through the DistributedCoordinator, backed by a pluggable DistributedAdapter.

Key capabilities: distributed locking with fencing tokens, compare-and-swap (CAS), quorum-based leader election, pub/sub with delivery guarantees (at-most-once / at-least-once / exactly-once), two-phase commit transactions, distributed buffers, and distributed schedulers.

```typescript
import { DistributedCoordinator, InMemoryDistributedAdapter } from '@emmvish/stable-infra';
const coordinator = new DistributedCoordinator({
adapter: new InMemoryDistributedAdapter('node-1'), // Use Redis/Postgres adapter in production
namespace: 'my-app',
enableLeaderElection: true,
leaderHeartbeatMs: 5000,
});
await coordinator.connect();
// Distributed lock with fencing token
const lock = await coordinator.acquireLock({ resource: 'order:123', ttlMs: 30000 });
// Leader election — nodes register via shared backend, no IP discovery needed
await coordinator.registerForElection('scheduler-leader');
await coordinator.campaignForLeader({ electionKey: 'scheduler-leader' });
// Shared state & pub/sub across nodes
await coordinator.setState('user:123', { balance: 100 });
await coordinator.publish('events', { type: 'ORDER_CREATED', orderId: 123 });
```
An InMemoryDistributedAdapter is included for testing and single-instance use. For production multi-node deployments, implement the DistributedAdapter interface with your backend of choice.
#### Distributed Buffer — Cross-Node StableBuffer
Wrap a StableBuffer with distributed sync so transactions on one node propagate to all others via pub/sub. Conflict resolution is configurable.
```typescript
import {
createDistributedStableBuffer,
DistributedConflictResolution,
withDistributedBufferLock
} from '@emmvish/stable-infra';
const { buffer, sync, refresh, disconnect } = await createDistributedStableBuffer({
distributed: { adapter, namespace: 'my-app' },
initialState: { counter: 0, items: [] },
conflictResolution: DistributedConflictResolution.LAST_WRITE_WINS, // or MERGE, CUSTOM
syncOnTransaction: true // auto-push state after each buffer.run()
});
// Use exactly like a local StableBuffer — sync happens automatically
await buffer.run(state => { state.counter += 1; });
// Acquire a distributed lock for critical sections
await withDistributedBufferLock({ buffer, coordinator }, async () => {
await buffer.run(state => { state.counter -= 100; }); // exclusive cross-node access
}, { ttlMs: 10000 });
await refresh(); // Force pull latest state from remote
await disconnect();
```
#### Distributed Resilience — Shared Circuit Breaker, Rate Limiter, Cache
Every resilience component (CircuitBreaker, RateLimiter, ConcurrencyLimiter, CacheManager) supports a persistence interface. The distributed layer provides an implementation backed by coordinator.getState/setState, so component state (failure counts, open/closed status, rate windows) is synchronized across nodes.
Create them individually or as a bundle:
```typescript
import { createDistributedInfrastructureBundle, StableScheduler } from '@emmvish/stable-infra';
// One coordinator, all components share the same backend
const infra = await createDistributedInfrastructureBundle({
distributed: { adapter, namespace: 'my-service' },
circuitBreaker: {
failureThresholdPercentage: 50,
minimumRequests: 10,
recoveryTimeoutMs: 30000
},
rateLimiter: { maxRequests: 1000, windowMs: 60000 },
concurrencyLimiter: { limit: 50 },
cacheManager: { enabled: true, ttl: 300000 }
});
// Plug directly into any core module via sharedInfrastructure
const scheduler = new StableScheduler({
maxParallel: 10,
sharedInfrastructure: {
circuitBreaker: infra.circuitBreaker, // shared state across nodes
rateLimiter: infra.rateLimiter,
concurrencyLimiter: infra.concurrencyLimiter,
cacheManager: infra.cacheManager
}
}, handler);
await infra.disconnect(); // cleanup
```
Standalone factories are also available: createDistributedCircuitBreaker(), createDistributedRateLimiter(), createDistributedConcurrencyLimiter(), createDistributedCacheManager().
#### Distributed Scheduler — Leader-Based Execution
Wrap StableScheduler so only the elected leader node processes jobs, with distributed state persistence and shared resilience infrastructure.
```typescript
import { runAsDistributedScheduler, StableScheduler } from '@emmvish/stable-infra';
const runner = await runAsDistributedScheduler({
distributed: { adapter, namespace: 'workers' },
scheduler: { maxParallel: 5 },
circuitBreaker: { failureThresholdPercentage: 50, minimumRequests: 10, recoveryTimeoutMs: 30000 },
rateLimiter: { maxRequests: 100, windowMs: 60000 },
createScheduler: (config) => new StableScheduler(config, async (job) => {
await processJob(job);
})
});
await runner.start(); // Campaigns for leadership, starts scheduler when elected
runner.isLeader(); // Check current status
await runner.stop(); // Graceful shutdown, resigns leadership
```
Internally, scheduler state (job queue, execution history) is persisted via the coordinator. When the leader crashes or resigns, a new leader is elected (after the previous leader’s lease expires). The new leader automatically restores state from the coordinator and runs remaining jobs, so work resumes from where the previous leader left off.
#### Multi-Node Integration Example
Combining distributed buffer, shared circuit breaker, leader election, pub/sub, locking, and workflows:
```typescript
import {
createDistributedStableBuffer, createDistributedSchedulerConfig,
DistributedCoordinator, DistributedConflictResolution,
DistributedTransactionOperationType, withDistributedBufferLock,
StableScheduler
} from '@emmvish/stable-infra';
// 1. Distributed buffer with custom conflict resolution
const { buffer: sharedBuffer } = await createDistributedStableBuffer({
distributed: { adapter, namespace: 'orders' },
initialState: { orderCount: 0, totalRevenue: 0, failedOrders: [] },
conflictResolution: DistributedConflictResolution.CUSTOM,
mergeStrategy: (local, remote) => ({
orderCount: Math.max(local.orderCount, remote.orderCount),
totalRevenue: Math.max(local.totalRevenue, remote.totalRevenue),
failedOrders: [...new Set([...local.failedOrders, ...remote.failedOrders])]
}),
syncOnTransaction: true
});
// 2. Distributed scheduler with leader election + circuit breaker
const setup = await createDistributedSchedulerConfig({
distributed: { adapter, namespace: 'orders' },
scheduler: { maxParallel: 10 },
enableLeaderElection: true,
circuitBreaker: { failureThresholdPercentage: 40, minimumRequests: 5, recoveryTimeoutMs: 30000 }
});
// 3. Job handler — lock, process, update shared state atomically
const orderHandler = async (job) => {
const lock = await coordinator.acquireLock({
    resource: `order:${job.orderId}:processing`, ttlMs: 60000
});
try {
await processOrderWorkflow(job); // stableWorkflow under the hood
// Update shared buffer with distributed lock
await withDistributedBufferLock({ buffer: sharedBuffer, coordinator }, async () => {
await sharedBuffer.run(state => {
state.orderCount += 1;
state.totalRevenue += job.total;
});
});
// Publish completion for other services
await coordinator.publish('order-events', { type: 'COMPLETED', orderId: job.orderId });
} catch (error) {
// Atomic error handling via 2PC transaction
await coordinator.executeTransaction([
      { type: DistributedTransactionOperationType.SET, key: `order:${job.orderId}:status`, value: { status: 'FAILED' } },
{ type: DistributedTransactionOperationType.INCREMENT, key: 'metrics:failed-orders', delta: 1 }
]);
} finally {
await coordinator.releaseLock(lock.handle);
}
};
// 4. Start — only the leader processes jobs
const scheduler = new StableScheduler({ ...setup.config, sharedBuffer }, orderHandler);
const isLeader = await setup.waitForLeadership(30000);
if (isLeader) scheduler.start();
```
---

## Stable Runner

Config-driven runner that executes core module jobs from JSON/ESM configs and can use StableScheduler for scheduled jobs.
---

## Resilience Mechanisms

### Execution Timeouts

Set a maximum execution time for functions to prevent indefinite hangs. Timeouts are enforced at multiple levels with proper inheritance.
You can also set a workflow/gateway-level maxTimeout to cap total execution time (applies to stableWorkflow, stableWorkflowGraph, and stableApiGateway).
#### Function-Level Timeout
Set timeout directly on a function:
```typescript
import { stableFunction } from '@emmvish/stable-infra';
const result = await stableFunction({
fn: async () => {
// Long-running operation
await processLargeDataset();
return 'success';
},
args: [],
returnResult: true,
executionTimeout: 5000, // 5 seconds max
attempts: 3,
});
if (!result.success && result.error?.includes('timeout')) {
console.log('Function timed out');
}
```
#### Gateway-Level Timeout
Apply timeout to all functions in a gateway:
```typescript
import { stableApiGateway, RequestOrFunction } from '@emmvish/stable-infra';
const results = await stableApiGateway(
[
{
type: RequestOrFunction.FUNCTION,
function: {
id: 'task1',
functionOptions: {
fn: async () => await task1(),
args: [],
// No timeout specified - inherits from gateway
},
},
},
{
type: RequestOrFunction.FUNCTION,
function: {
id: 'task2',
functionOptions: {
fn: async () => await task2(),
args: [],
executionTimeout: 10000, // Override gateway timeout
},
},
},
],
{
commonExecutionTimeout: 3000, // Default 3s for all functions
}
);
```
#### Request Group Timeout
Different timeouts for different groups:
```typescript
const results = await stableApiGateway(
[
{
type: RequestOrFunction.FUNCTION,
function: {
id: 'critical',
groupId: 'criticalOps',
functionOptions: { fn: criticalOp, args: [] },
},
},
{
type: RequestOrFunction.FUNCTION,
function: {
id: 'background',
groupId: 'backgroundOps',
functionOptions: { fn: backgroundOp, args: [] },
},
},
],
{
requestGroups: [
{
id: 'criticalOps',
commonConfig: {
commonExecutionTimeout: 1000, // Strict 1s timeout
},
},
{
id: 'backgroundOps',
commonConfig: {
commonExecutionTimeout: 30000, // Lenient 30s timeout
},
},
],
}
);
```
#### Workflow Phase Timeout
Apply timeout at phase level in workflows:
```typescript
import { stableWorkflow } from '@emmvish/stable-infra';
const result = await stableWorkflow(
[
{
id: 'initialization',
functions: [
{
id: 'init',
functionOptions: {
fn: initializeSystem,
args: [],
},
},
],
commonConfig: {
commonExecutionTimeout: 5000, // 5s for initialization
},
},
{
id: 'processing',
functions: [
{
id: 'process',
functionOptions: {
fn: processData,
args: [],
},
},
],
commonConfig: {
commonExecutionTimeout: 30000, // 30s for processing
},
},
],
{
commonExecutionTimeout: 10000, // Default for phases without specific timeout
}
);
```
#### Timeout Precedence
Timeouts follow the configuration cascade pattern:
Function > Group > Phase/Branch > Gateway
- Function-level executionTimeout always wins
- If not set, inherits from the request group's commonExecutionTimeout
- If not set, inherits from the phase/branch's commonExecutionTimeout
- If not set, inherits from the gateway's commonExecutionTimeout
- If not set, no timeout is applied
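The precedence above amounts to taking the first defined value down the cascade. A minimal sketch, with illustrative parameter names rather than the framework's internals:

```typescript
// Illustrative resolution of the effective executionTimeout.
// The most specific defined level wins; undefined means "no timeout".
function resolveTimeout(
  functionTimeout?: number, // executionTimeout on the function itself
  groupTimeout?: number,    // request group's commonExecutionTimeout
  phaseTimeout?: number,    // phase/branch commonExecutionTimeout
  gatewayTimeout?: number   // gateway/workflow commonExecutionTimeout
): number | undefined {
  return functionTimeout ?? groupTimeout ?? phaseTimeout ?? gatewayTimeout;
}

resolveTimeout(undefined, 1000, 5000, 3000); // 1000 — group overrides phase and gateway
resolveTimeout();                            // undefined — no timeout applied
```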
#### Timeout Behavior
- Timeout applies to entire function execution including all retry attempts
- When timeout is exceeded, function returns failed result with timeout error
- Timeout does NOT stop execution mid-flight (no AbortController)
- Metrics are still collected even when timeout occurs
- Use with retries: timeout encompasses all attempts, not per-attempt
```typescript
const result = await stableFunction({
fn: slowFunction,
args: [],
attempts: 5,
wait: 1000,
executionTimeout: 3000, // Total time for all 5 attempts
});
// If each attempt takes 800ms:
// - Attempt 1: 800ms
// - Attempt 2: starts at 1800ms (after 1s wait)
// - Attempt 3: would start at 3600ms → TIMEOUT at 3000ms
```
### Retry Strategies

When a request or function fails and is retryable, retry with configurable backoff.
#### FIXED Strategy
Constant wait between retries.
```typescript
import { stableRequest, RETRY_STRATEGIES } from '@emmvish/stable-infra';
interface DataRequest {}
interface DataResponse { data: any; }
const result = await stableRequest<DataRequest, DataResponse>({
reqData: { hostname: 'api.example.com', path: '/data' },
resReq: true,
attempts: 4,
wait: 500,
retryStrategy: RETRY_STRATEGIES.FIXED
// Retries at: 500ms, 1000ms, 1500ms
});
```
#### LINEAR Strategy
Wait increases linearly with attempt number.
```typescript
const result = await stableRequest({
reqData: { hostname: 'api.example.com', path: '/data' },
resReq: true,
attempts: 4,
wait: 100,
retryStrategy: RETRY_STRATEGIES.LINEAR
// Retries at: 100ms, 200ms, 300ms (wait * attempt)
});
```
#### EXPONENTIAL Strategy
Wait increases exponentially; useful for heavily loaded services.
```typescript
const result = await stableRequest({
reqData: { hostname: 'api.example.com', path: '/data' },
resReq: true,
attempts: 4,
wait: 100,
maxAllowedWait: 10000,
retryStrategy: RETRY_STRATEGIES.EXPONENTIAL
// Retries at: 100ms, 200ms, 400ms (wait * 2^(attempt-1))
// Capped at maxAllowedWait
});
```
#### Jitter
Add random milliseconds to prevent synchronization.
```typescript
const result = await stableRequest({
reqData: { hostname: 'api.example.com', path: '/data' },
resReq: true,
attempts: 3,
wait: 500,
jitter: 200, // Add 0-200ms randomness
retryStrategy: RETRY_STRATEGIES.EXPONENTIAL
});
```
#### Perform All Attempts
Collect all outcomes instead of failing on first error.
```typescript
const result = await stableRequest({
reqData: { hostname: 'api.example.com', path: '/data' },
resReq: true,
attempts: 3,
performAllAttempts: true
// All 3 attempts execute; check result.successfulAttempts
});
```
### Circuit Breaker

Prevent cascading failures by failing fast when a dependency becomes unhealthy.

```typescript
import { stableApiGateway, CircuitBreaker } from '@emmvish/stable-infra';
interface FlakyRequest {}
interface FlakyResponse { status: string; }
const breaker = new CircuitBreaker({
failureThresholdPercentage: 50,
minimumRequests: 10,
recoveryTimeoutMs: 30000,
successThresholdPercentage: 80,
halfOpenMaxRequests: 5
});
const requests = [
{ id: 'req-1', requestOptions: { reqData: { path: '/flaky' }, resReq: true } },
{ id: 'req-2', requestOptions: { reqData: { path: '/flaky' }, resReq: true } }
];
const responses = await stableApiGateway(requests, {
circuitBreaker: breaker
});
// Circuit breaker states:
// CLOSED: Normal operation (accept all requests)
// OPEN: Too many failures; reject immediately
// HALF_OPEN: Testing recovery; allow limited requests
```
State Transitions:
- CLOSED → OPEN: Failure rate exceeds threshold after minimum requests
- OPEN → HALF_OPEN: Recovery timeout elapsed; attempt recovery
- HALF_OPEN → CLOSED: Success rate exceeds recovery threshold
- HALF_OPEN → OPEN: Success rate below recovery threshold; reopen
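The transition rules above can be sketched as a small state function. This is illustrative only; the library's CircuitBreaker tracks rolling windows and more state than shown here:

```typescript
// Illustrative CLOSED/OPEN/HALF_OPEN transition rules (not the library's internals).
type BreakerState = 'CLOSED' | 'OPEN' | 'HALF_OPEN';

interface Stats {
  requests: number;         // requests observed in the current window
  failureRatePct: number;   // percentage of those that failed
  recoveryElapsed: boolean; // has recoveryTimeoutMs passed since opening?
}

function nextState(
  state: BreakerState,
  stats: Stats,
  opts: { minimumRequests: number; failureThresholdPct: number; successThresholdPct: number }
): BreakerState {
  if (state === 'CLOSED') {
    // Trip only once enough traffic has been observed.
    const trip = stats.requests >= opts.minimumRequests &&
                 stats.failureRatePct >= opts.failureThresholdPct;
    return trip ? 'OPEN' : 'CLOSED';
  }
  if (state === 'OPEN') {
    return stats.recoveryElapsed ? 'HALF_OPEN' : 'OPEN';
  }
  // HALF_OPEN: close if the trial success rate clears the threshold, else reopen.
  return 100 - stats.failureRatePct >= opts.successThresholdPct ? 'CLOSED' : 'OPEN';
}
```

Note how the minimumRequests guard prevents a single early failure from tripping the breaker on thin traffic.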
### Caching

Cache responses to avoid redundant calls.

```typescript
import { stableRequest, CacheManager } from '@emmvish/stable-infra';
interface UserRequest {}
interface UserResponse {
id: number;
name: string;
email: string;
}
const cache = new CacheManager({
enabled: true,
ttl: 5000 // 5 seconds
});
// First call: cache miss, hits API
const result1 = await stableRequest<UserRequest, UserResponse>({
reqData: { hostname: 'api.example.com', path: '/user/1' },
resReq: true,
cache
});
// Second call within 5s: cache hit, returns cached response
const result2 = await stableRequest<UserRequest, UserResponse>({
reqData: { hostname: 'api.example.com', path: '/user/1' },
resReq: true,
cache
});
// Respects Cache-Control headers if enabled
const cache2 = new CacheManager({
enabled: true,
ttl: 60000,
respectCacheControl: true // Uses max-age, no-cache, no-store
});
```
Function Caching:
Arguments become cache key; identical args hit cache.
```typescript
import { stableFunction } from '@emmvish/stable-infra';
const expensive = (x: number) => x * x * x; // Cubic calculation
const result1 = await stableFunction({
fn: expensive,
args: [5],
returnResult: true,
cache: { enabled: true, ttl: 10000 }
});
const result2 = await stableFunction({
fn: expensive,
args: [5], // Same args → cache hit
returnResult: true,
cache: { enabled: true, ttl: 10000 }
});
```
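One way such argument-based keys can be derived is by serializing the arguments alongside a function identifier. This is a sketch of the idea, not the library's actual key format:

```typescript
// Illustrative cache key: function identity + JSON-serialized arguments.
// Identical args produce identical keys and therefore cache hits.
function cacheKey(fnName: string, args: unknown[]): string {
  return `${fnName}:${JSON.stringify(args)}`;
}

cacheKey('expensive', [5]); // "expensive:[5]"
cacheKey('expensive', [6]); // different args → different key → cache miss
```

A caveat of this approach: JSON.stringify is sensitive to object property order, so structurally equal objects with different key order would miss the cache.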
### Rate Limiting

Enforce a maximum number of requests per time window.

```typescript
import { stableApiGateway } from '@emmvish/stable-infra';
interface ItemRequest {}
interface ItemResponse {
id: number;
data: any;
}
const requests = Array.from({ length: 20 }, (_, i) => ({
  id: `req-${i}`,
  requestOptions: {
    reqData: { path: `/item/${i}` },
    resReq: true
  }
}));
const responses = await stableApiGateway(requests, {
concurrentExecution: true,
rateLimit: {
maxRequests: 5,
windowMs: 1000 // 5 requests per second
}
// Requests queued until window allows; prevents overwhelming API
});
```
### Concurrency Limiting

Limit the number of concurrent in-flight requests.

```typescript
import { stableApiGateway } from '@emmvish/stable-infra';
interface ItemRequest {}
interface ItemResponse {
id: number;
data: any;
}
const requests = Array.from({ length: 50 }, (_, i) => ({
  id: `req-${i}`,
  requestOptions: {
    reqData: { path: `/item/${i}` },
    resReq: true,
    attempts: 1
  }
}));
const responses = await stableApiGateway(requests, {
concurrentExecution: true,
maxConcurrentRequests: 5 // Only 5 requests in-flight at a time
// Others queued and executed as slots free
});
```
---

## Workflow Patterns

### Sequential & Concurrent Phases

#### Sequential (Default)
Each phase waits for the previous to complete.
```typescript
import { stableWorkflow } from '@emmvish/stable-infra';
import type { STABLE_WORKFLOW_PHASE } from '@emmvish/stable-infra';
const phases: STABLE_WORKFLOW_PHASE[] = [
{
id: 'phase-1',
requests: [{ id: 'r1', requestOptions: { reqData: { path: '/p1' }, resReq: true } }]
},
{
id: 'phase-2',
requests: [{ id: 'r2', requestOptions: { reqData: { path: '/p2' }, resReq: true } }]
},
{
id: 'phase-3',
requests: [{ id: 'r3', requestOptions: { reqData: { path: '/p3' }, resReq: true } }]
}
];
const result = await stableWorkflow(phases, {
workflowId: 'sequential-phases',
concurrentPhaseExecution: false // Phase-1 → Phase-2 → Phase-3
});
`
#### Concurrent Phases
Multiple phases run in parallel.
`typescript
const phases: STABLE_WORKFLOW_PHASE[] = [
{
id: 'fetch-users',
requests: [{ id: 'get-users', requestOptions: { reqData: { path: '/users' }, resReq: true } }]
},
{
id: 'fetch-posts',
requests: [{ id: 'get-posts', requestOptions: { reqData: { path: '/posts' }, resReq: true } }]
},
{
id: 'fetch-comments',
requests: [{ id: 'get-comments', requestOptions: { reqData: { path: '/comments' }, resReq: true } }]
}
];
const result = await stableWorkflow(phases, {
workflowId: 'parallel-phases',
concurrentPhaseExecution: true // All 3 phases in parallel
});
`
#### Mixed Phases
Combine sequential and concurrent phases in one workflow.
`typescript
const phases: STABLE_WORKFLOW_PHASE[] = [
{
id: 'init', // Sequential
requests: [{ id: 'setup', requestOptions: { reqData: { path: '/init' }, resReq: true } }]
},
{
id: 'fetch-a',
markConcurrentPhase: true, // Concurrent with next
requests: [{ id: 'data-a', requestOptions: { reqData: { path: '/a' }, resReq: true } }]
},
{
id: 'fetch-b',
markConcurrentPhase: true, // Concurrent with fetch-a
requests: [{ id: 'data-b', requestOptions: { reqData: { path: '/b' }, resReq: true } }]
},
{
id: 'finalize', // Sequential after fetch-a/b complete
requests: [{ id: 'done', requestOptions: { reqData: { path: '/finalize' }, resReq: true } }]
}
];
const result = await stableWorkflow(phases, {
concurrentPhaseExecution: false // Respects markConcurrentPhase per phase
});
`
Use decision hooks to dynamically control phase flow.
#### CONTINUE
Standard flow to next sequential phase.
`typescript
const phases: STABLE_WORKFLOW_PHASE[] = [
{
id: 'check-status',
requests: [{ id: 'api', requestOptions: { reqData: { path: '/status' }, resReq: true } }],
phaseDecisionHook: async ({ phaseResult, sharedBuffer }) => {
return { action: PHASE_DECISION_ACTIONS.CONTINUE };
}
},
{
id: 'process', // Executes after check-status
requests: [{ id: 'process-data', requestOptions: { reqData: { path: '/process' }, resReq: true } }]
}
];
const result = await stableWorkflow(phases, {
enableNonLinearExecution: true
});
`
#### SKIP
Skip the next phase; execute the one after.
`typescript
const phases: STABLE_WORKFLOW_PHASE[] = [
{
id: 'phase-1',
requests: [{ id: 'r1', requestOptions: { reqData: { path: '/p1' }, resReq: true } }],
phaseDecisionHook: async () => ({
action: PHASE_DECISION_ACTIONS.SKIP
})
},
{
id: 'phase-2', // Skipped
requests: [{ id: 'r2', requestOptions: { reqData: { path: '/p2' }, resReq: true } }]
},
{
id: 'phase-3', // Executes
requests: [{ id: 'r3', requestOptions: { reqData: { path: '/p3' }, resReq: true } }]
}
];
const result = await stableWorkflow(phases, {
enableNonLinearExecution: true
});
// Execution: phase-1 → phase-3
`
#### JUMP
Jump to a specific phase by ID.
`typescript
const phases: STABLE_WORKFLOW_PHASE[] = [
{
id: 'phase-1',
requests: [{ id: 'r1', requestOptions: { reqData: { path: '/p1' }, resReq: true } }],
phaseDecisionHook: async () => ({
action: PHASE_DECISION_ACTIONS.JUMP,
targetPhaseId: 'recovery'
})
},
{
id: 'phase-2', // Skipped
requests: [{ id: 'r2', requestOptions: { reqData: { path: '/p2' }, resReq: true } }]
},
{
id: 'recovery',
requests: [{ id: 'recover', requestOptions: { reqData: { path: '/recovery' }, resReq: true } }]
}
];
const result = await stableWorkflow(phases, {
enableNonLinearExecution: true
});
// Execution: phase-1 → recovery
`
#### REPLAY
Re-execute current phase; useful for polling.
`typescript
const phases: STABLE_WORKFLOW_PHASE[] = [
{
id: 'wait-for-job',
allowReplay: true,
maxReplayCount: 5,
requests: [
{
id: 'check-job',
requestOptions: { reqData: { path: '/job/status' }, resReq: true, attempts: 1 }
}
],
phaseDecisionHook: async ({ phaseResult, executionHistory }) => {
const lastResponse = phaseResult.responses?.[0];
if ((lastResponse as any)?.data?.status === 'pending' && executionHistory.length < 5) {
return { action: PHASE_DECISION_ACTIONS.REPLAY };
}
return { action: PHASE_DECISION_ACTIONS.CONTINUE };
}
},
{
id: 'process-result',
requests: [{ id: 'process', requestOptions: { reqData: { path: '/process' }, resReq: true } }]
}
];
const result = await stableWorkflow(phases, {
enableNonLinearExecution: true,
maxWorkflowIterations: 100
});
// Polls up to 5 times before continuing
`
#### TERMINATE
Stop workflow early.
`typescript
const phases: STABLE_WORKFLOW_PHASE[] = [
{
id: 'validate',
requests: [{ id: 'validate-input', requestOptions: { reqData: { path: '/validate' }, resReq: true } }],
phaseDecisionHook: async ({ phaseResult, sharedBuffer }) => {
if (!phaseResult.success) {
return { action: PHASE_DECISION_ACTIONS.TERMINATE };
}
return { action: PHASE_DECISION_ACTIONS.CONTINUE };
}
},
{
id: 'phase-2', // Won't execute if validation fails
requests: [{ id: 'r2', requestOptions: { reqData: { path: '/p2' }, resReq: true } }]
}
];
const result = await stableWorkflow(phases, {
enableNonLinearExecution: true
});
console.log(result.terminatedEarly); // true if TERMINATE triggered
`
Execute multiple independent branches with shared state.
`typescript
import { stableWorkflow } from '@emmvish/stable-infra';
import type { STABLE_WORKFLOW_BRANCH } from '@emmvish/stable-infra';
const branches: STABLE_WORKFLOW_BRANCH[] = [
{
id: 'branch-payment',
phases: [
{
id: 'process-payment',
requests: [
{
id: 'charge-card',
requestOptions: {
reqData: { path: '/payment/charge' },
resReq: true
}
}
]
}
]
},
{
id: 'branch-notification',
phases: [
{
id: 'send-email',
requests: [
{
id: 'send',
requestOptions: {
reqData: { path: '/notify/email' },
resReq: false
}
}
]
}
]
}
];
const result = await stableWorkflow([], {
workflowId: 'checkout',
enableBranchExecution: true,
branches,
sharedBuffer: { orderId: '12345' },
markConcurrentBranch: true // Branches run in parallel
});
// Both branches access/modify sharedBuffer
`
#### Branch Racing
When multiple branches execute concurrently, enable racing to accept the first successful branch and cancel others.
`typescript
const result = await stableWorkflow([], {
workflowId: 'payment-racing',
enableBranchExecution: true,
enableBranchRacing: true, // First successful branch wins
branches: [
{
id: 'payment-provider-a',
phases: [/* ... */]
},
{
id: 'payment-provider-b',
phases: [/* ... */]
}
],
markConcurrentBranch: true
});
// Only winning branch's execution history recorded
// Losing branches marked as cancelled
`
Key responsibilities:
- Define phases as DAG nodes with explicit dependency edges
- Execute independent phases in parallel automatically
- Support parallel groups, merge points, and conditional routing
- Validate graph structure (cycle detection, reachability, orphan detection)
- Provide deterministic execution order
- Offer higher parallelism than phased workflows for complex topologies
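The cycle-detection and deterministic-ordering step can be sketched with Kahn's algorithm (a conceptual illustration, not the library's validator): a topological order exists exactly when the graph has no cycle, and sorting the ready set makes the order deterministic.

```typescript
// Conceptual sketch of DAG validation via Kahn's algorithm:
// a topological order exists iff the graph has no cycle.
function topologicalOrder(edges: Array<[string, string]>, nodes: string[]): string[] {
  const indegree = new Map(nodes.map(n => [n, 0]));
  const adjacency = new Map<string, string[]>(nodes.map(n => [n, []]));
  for (const [from, to] of edges) {
    adjacency.get(from)!.push(to);
    indegree.set(to, (indegree.get(to) ?? 0) + 1);
  }
  // Seed with roots; sort keeps execution order deterministic among ready nodes
  const ready = nodes.filter(n => indegree.get(n) === 0).sort();
  const order: string[] = [];
  while (ready.length > 0) {
    const node = ready.shift()!;
    order.push(node);
    for (const next of adjacency.get(node)!) {
      indegree.set(next, indegree.get(next)! - 1);
      if (indegree.get(next) === 0) ready.push(next);
    }
    ready.sort();
  }
  if (order.length !== nodes.length) throw new Error('Cycle detected in workflow graph');
  return order;
}

const order = topologicalOrder(
  [['fetch-posts', 'aggregate'], ['fetch-users', 'aggregate'], ['aggregate', 'analyze']],
  ['fetch-posts', 'fetch-users', 'aggregate', 'analyze']
);
console.log(order); // ['fetch-posts', 'fetch-users', 'aggregate', 'analyze']
```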
For complex topologies with explicit dependencies, use DAG execution mixing requests and functions.
`typescript
import { stableWorkflowGraph, WorkflowGraphBuilder, RequestOrFunction } from '@emmvish/stable-infra';
import type { API_GATEWAY_ITEM } from '@emmvish/stable-infra';
// Request types
interface PostsRequest {}
interface PostsResponse { posts: Array<{ id: number; title: string }> };
interface UsersRequest {}
interface UsersResponse { users: Array<{ id: number; name: string }> };
// Function types
type AggregateArgs = [PostsResponse, UsersResponse];
type AggregateResult = {
combined: Array<{ userId: number; userName: string; postCount: number }>;
};
type AnalyzeArgs = [AggregateResult];
type AnalyzeResult = { totalPosts: number; activeUsers: number };
const graph = new WorkflowGraphBuilder<
PostsRequest | UsersRequest,
PostsResponse | UsersResponse,
AggregateArgs | AnalyzeArgs,
AggregateResult | AnalyzeResult
>()
.addPhase('fetch-posts', {
requests: [{
id: 'get-posts',
requestOptions: {
reqData: { path: '/posts' },
resReq: true
}
}]
})
.addPhase('fetch-users', {
requests: [{
id: 'get-users',
requestOptions: {
reqData: { path: '/users' },
resReq: true
}
}]
})
.addParallelGroup('fetch-all', ['fetch-posts', 'fetch-users'])
.addPhase('aggregate', {
functions: [{
id: 'combine-data',
functionOptions: {
fn: (posts: PostsResponse, users: UsersResponse): AggregateResult => ({
combined: users.users.map(user => ({
userId: user.id,
userName: user.name,
postCount: posts.posts.filter(p => p.id === user.id).length
}))
}),
args: [{ posts: [] }, { users: [] }] as AggregateArgs,
returnResult: true
}
}]
})
.addPhase('analyze', {
functions: [{
id: 'analyze-data',
functionOptions: {
fn: (aggregated: AggregateResult): AnalyzeResult => ({
totalPosts: aggregated.combined.reduce((sum, u) => sum + u.postCount, 0),
activeUsers: aggregated.combined.filter(u => u.postCount > 0).length
}),
args: [{ combined: [] }] as AnalyzeArgs,
returnResult: true
}
}]
})
.addMergePoint('sync', ['fetch-all'])
.connectSequence('fetch-all', 'sync', 'aggregate', 'analyze')
.setEntryPoint('fetch-all')
.build();
const result = await stableWorkflowGraph(graph, {
workflowId: 'data-aggregation'
});
console.log(`Graph workflow success: ${result.success}`);
`
Execute multiple phases concurrently within a group.
`typescript
import { stableWorkflowGraph, WorkflowGraphBuilder } from '@emmvish/stable-infra';
const graph = new WorkflowGraphBuilder()
.addPhase('fetch-users', {
requests: [{
id: 'users',
requestOptions: { reqData: { path: '/users' }, resReq: true }
}]
})
.addPhase('fetch-posts', {
requests: [{
id: 'posts',
requestOptions: { reqData: { path: '/posts' }, resReq: true }
}]
})
.addPhase('fetch-comments', {
requests: [{
id: 'comments',
requestOptions: { reqData: { path: '/comments' }, resReq: true }
}]
})
.addParallelGroup('data-fetch', ['fetch-users', 'fetch-posts', 'fetch-comments'])
.setEntryPoint('data-fetch')
.build();
const result = await stableWorkflowGraph(graph, {
workflowId: 'data-aggregation'
});
// All 3 phases run concurrently
`
Synchronize multiple predecessor phases.
`typescript
const graph = new WorkflowGraphBuilder()
.addPhase('fetch-a', {
requests: [{ id: 'a', requestOptions: { reqData: { path: '/a' }, resReq: true } }]
})
.addPhase('fetch-b', {
requests: [{ id: 'b', requestOptions: { reqData: { path: '/b' }, resReq: true } }]
})
.addMergePoint('sync', ['fetch-a', 'fetch-b'])
.addPhase('aggregate', {
functions: [{
id: 'combine',
functionOptions: {
fn: () => 'combined',
args: [],
returnResult: true
}
}]
})
.connectSequence('fetch-a', 'sync')
.connectSequence('fetch-b', 'sync')
.connectSequence('sync', 'aggregate')
.setEntryPoint('fetch-a')
.build();
const result = await stableWorkflowGraph(graph, {
workflowId: 'parallel-sync'
});
// fetch-a and fetch-b run in parallel
// aggregate waits for both to complete
`
Convenience function for sequential phase chains.
`typescript
import { createLinearWorkflowGraph } from '@emmvish/stable-infra';
const phases = [
{
id: 'init',
requests: [{ id: 'setup', requestOptions: { reqData: { path: '/init' }, resReq: true } }]
},
{
id: 'process',
requests: [{ id: 'do-work', requestOptions: { reqData: { path: '/work' }, resReq: true } }]
},
{
id: 'finalize',
requests: [{ id: 'cleanup', requestOptions: { reqData: { path: '/cleanup' }, resReq: true } }]
}
];
const graph = createLinearWorkflowGraph(phases);
const result = await stableWorkflowGraph(graph, {
workflowId: 'linear-workflow'
});
`
Enable branch racing in workflow graphs to accept the first successful branch node when multiple branches are executed in parallel.
`typescript
import { stableWorkflowGraph, WorkflowGraphBuilder } from '@emmvish/stable-infra';
const branch1 = {
id: 'provider-a',
phases: [{ /* ... */ }]
};
const branch2 = {
id: 'provider-b',
phases: [{ /* ... */ }]
};
const graph = new WorkflowGraphBuilder()
.addBranch('provider-a', branch1)
.addBranch('provider-b', branch2)
.addParallelGroup('race', ['provider-a', 'provider-b'])
.setEntryPoint('race')
.build();
const result = await stableWorkflowGraph(graph, {
workflowId: 'provider-racing',
enableBranchRacing: true // First successful branch wins
});
// Only winning branch's results recorded
// Losing branch marked as cancelled
`
---
Define defaults globally; override at group, phase, branch, or item level.
`typescript
import { stableWorkflow } from '@emmvish/stable-infra';
import type { STABLE_WORKFLOW_PHASE } from '@emmvish/stable-infra';
const phases: STABLE_WORKFLOW_PHASE[] = [
{
id: 'phase-1',
attempts: 5, // Override global attempts for this phase
wait: 1000,
requests: [
{
id: 'req-1',
requestOptions: {
reqData: { path: '/data' },
resReq: true,
attempts: 2 // Override phase attempts for this item
}
}
]
}
];
const result = await stableWorkflow(phases, {
workflowId: 'cascade-demo',
commonAttempts: 1, // Global default
commonWait: 500,
retryStrategy: 'LINEAR' // Global default
// Final config per item: merge common → phase → request
});
`
Hierarchy: global → group → phase → branch → item. Lower levels override.
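The override rule amounts to a shallow merge from the highest level down to the lowest; a minimal sketch (the field names mirror the example above; the exact merge depth is an assumption):

```typescript
// Conceptual sketch of config cascading: shallow-merge from global down to item,
// so keys defined at a lower level override higher levels.
interface ResilienceConfig { attempts?: number; wait?: number; retryStrategy?: string; }

function resolveConfig(...levels: Array<ResilienceConfig | undefined>): ResilienceConfig {
  // later arguments (lower levels) win on key collisions
  return Object.assign({}, ...levels.filter(Boolean));
}

const effective = resolveConfig(
  { attempts: 1, wait: 500, retryStrategy: 'LINEAR' }, // global
  { attempts: 5, wait: 1000 },                          // phase
  { attempts: 2 }                                       // item
);
console.log(effective); // { attempts: 2, wait: 1000, retryStrategy: 'LINEAR' }
```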
Pass mutable state across phases, branches, and items. For concurrency-safe shared state, pass a StableBuffer instance instead of a plain object.
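A plain object is not concurrency-safe because concurrent phases can interleave read-modify-write sequences. The idea behind a concurrency-safe buffer can be sketched as a promise-chain-serialized wrapper (illustration only; the real StableBuffer API may differ):

```typescript
// Conceptual sketch of a concurrency-safe buffer: updates are serialized
// through a promise chain, so read-modify-write cycles never interleave.
// (Illustration only — the actual StableBuffer API may differ.)
class SerializedBuffer<T extends Record<string, unknown>> {
  private chain: Promise<void> = Promise.resolve();
  constructor(private state: T) {}

  update(mutator: (state: T) => void | Promise<void>): Promise<void> {
    this.chain = this.chain.then(() => mutator(this.state));
    return this.chain;
  }

  async snapshot(): Promise<T> {
    await this.chain; // wait for pending updates to settle
    return { ...this.state };
  }
}

async function demo(): Promise<number> {
  const buffer = new SerializedBuffer({ counter: 0 });
  await Promise.all(
    Array.from({ length: 100 }, () =>
      buffer.update(async s => {
        const current = s.counter;                 // read
        await new Promise(r => setTimeout(r, 0));  // yield mid-update
        s.counter = current + 1;                   // write — still safe, updates are serialized
      })
    )
  );
  return (await buffer.snapshot()).counter;
}
demo().then(n => console.log(n)); // 100 — no lost updates
```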
#### Shared Buffer (Workflow/Gateway)
`typescript
import { stableWorkflow } from '@emmvish/stable-infra';
import type { STABLE_WORKFLOW_PHASE } from '@emmvish/stable-infra';
const phases: STABLE_WORKFLOW_PHASE[] = [
{
id: 'fetch',
requests: [
{
id: 'user-data',
requestOptions: {
reqData: { path: '/users/1' },
resReq: true,
handleSuccessfulAttemptData: ({ successfulAttemptData, stableRequestOptions }) => {
// Mutate shared buffer
const sharedBuffer = (stableRequestOptions as any).sharedBuffer;
sharedBuffer.userId = (successfulAttemptData.data as any).id;
}
}
}
]
},
{
id: 'use-shared-data',
requests: [
{
id: 'dependent-call',
requestOptions: {
reqData: { path: '/user-posts' },
resReq: true,
preExecution: {
preExecutionHook: async ({ stableRequestOptions, commonBuffer }) => {
const sharedBuffer = (stableRequestOptions as any).sharedBuffer;
console.log(`Using userId: ${sharedBuffer.userId}`);
}
}
}
}
]
}
];
const result = await stableWorkflow(phases, {
workflowId: 'shared-state-demo',
sharedBuffer: {} // Mutable across phases
});
`
#### Common Buffer (Request Level)
`typescript
import { stableRequest, PersistenceStage } from '@emmvish/stable-infra';
const commonBuffer = { transactionId: null };
const result = await stableRequest({
reqData: { path: '/transaction/start' },
resReq: true,
commonBuffer,
preExecution: {
preExecutionHook: async ({ commonBuffer, stableRequestOptions }) => {
// commonBuffer writable here
commonBuffer.userId = '123';
}
},
handleSuccessfulAttemptData: ({ successfulAttemptData }) => {
// commonBuffer readable in handlers
console.log(`Transaction for user ${commonBuffer.userId} done`);
}
});
`
---
Modify config or state before execution.
`typescript
import { stableRequest } from '@emmvish/stable-infra';
interface SecureRequest {}
interface SecureResponse {
data: any;
token?: string;
}
const result = await stableRequest<SecureRequest, SecureResponse>({
reqData: { path: '/secure-data' },
resReq: true,
preExecution: {
preExecutionHook: async ({ inputParams, commonBuffer, stableRequestOptions }) => {
// Dynamically fetch auth token
const token = await getAuthToken();
// Return partial config override
return {
reqData: {
headers: { Authorization: `Bearer ${token}` }
}
};
},
preExecutionHookParams: { context: 'auth-fetch' },
applyPreExecutionConfigOverride: true,
continueOnPreExecutionHookFailure: false
}
});
`
Validate responses and errors.
#### Response Analyzer
`typescript
import { stableRequest } from '@emmvish/stable-infra';
interface ResourceRequest {}
interface ApiResponse {
id: number;
status: 'active' | 'inactive';
}
const result = await stableRequest<ResourceRequest, ApiResponse>({
reqData: { path: '/resource' },
resReq: true,
responseAnalyzer: ({ data, reqData, trialMode }) => {
// Return true to accept, false to retry
if (!data || typeof data !== 'object') return false;
if (!('id' in data)) return false;
if ((data as any).status !== 'active') return false;
return true;
}
});
`
#### Error Analyzer
Decide whether to suppress error gracefully.
`typescript
import { stableRequest } from '@emmvish/stable-infra';
interface FeatureRequest {}
interface FeatureResponse {
enabled: boolean;
data?: any;
}
const result = await stableRequest<FeatureRequest, FeatureResponse>({
reqData: { path: '/optional-feature' },
resReq: true,
finalErrorAnalyzer: ({ error, reqData, trialMode }) => {
// Return true to suppress error and return failure result
// Return false to throw error
if (error.code === 'ECONNREFUSED') {
console.warn('Service unavailable, continuing with fallback');
return true; // Suppress, don't throw
}
return false; // Throw
}
});
if (result.success) {
console.log('Got data:', result.data);
} else {
console.log('Service offline, but we continue');
}
`
Custom logging and processing.
#### Success Handler
`typescript
import { stableRequest } from '@emmvish/stable-infra';
interface DataRequest {}
interface DataResponse {
id: number;
value: string;
}
const result = await stableRequest<DataRequest, DataResponse>({
reqData: { path: '/data' },
resReq: true,
logAllSuccessfulAttempts: true,
handleSuccessfulAttemptData: ({
successfulAttemptData,
reqData,
maxSerializableChars,
executionContext
}) => {
// Custom logging, metrics, state updates
console.log(
`Success in context ${executionContext.workflowId}, data:`,
successfulAttemptData.data
);
}
});
`
#### Error Handler
`typescript
import { stableRequest } from '@emmvish/stable-infra';
const result = await stableRequest({
reqData: { path: '/data' },
resReq: true,
logAllErrors: true,
handleErrors: ({ errorLog, reqData, executionContext }) => {
// Custom error logging, alerting, retry logic
console.error(
`Error in ${executionContext.workflowId}:`,
errorLog.errorMessage,
`Retryable: ${errorLog.isRetryable}`
);
}
});
`
#### Phase Handlers (Workflow)
`typescript
import { stableWorkflow } from '@emmvish/stable-infra';
import type { STABLE_WORKFLOW_PHASE } from '@emmvish/stable-infra';
const phases: STABLE_WORKFLOW_PHASE[] = [
{
id: 'phase-1',
requests: [{ id: 'r1', requestOptions: { reqData: { path: '/data' }, resReq: true } }]
}
];
const result = await stableWorkflow(phases, {
workflowId: 'wf-handlers',
handlePhaseCompletion: ({ phaseResult, workflowId }) => {
console.log(`Phase ${phaseResult.phaseId} complete in ${workflowId}`);
},
handlePhaseError: ({ phaseResult, error, workflowId }) => {
console.error(`Phase ${phaseResult.phaseId} failed:`, error);
},
handlePhaseDecision: ({ decision, phaseResult }) => {
console.log(`Phase decision: ${decision.action}`);
}
});
`
Dynamically determine workflow flow.
`typescript
import { stableWorkflow, PHASE_DECISION_ACTIONS } from '@emmvish/stable-infra';
import type { STABLE_WORKFLOW_PHASE } from '@emmvish/stable-infra';
const phases: STABLE_WORKFLOW_PHASE[] = [
{
id: 'fetch-data',
requests: [{ id: 'api', requestOptions: { reqData: { path: '/data' }, resReq: true } }],
phaseDecisionHook: async ({ phaseResult, sharedBuffer, executionHistory }) => {
if (!phaseResult.success) {
return { action: PHASE_DECISION_ACTIONS.TERMINATE };
}
if (phaseResult.responses[0].data?.needsRetry) {
return { action: PHASE_DECISION_ACTIONS.REPLAY };
}
return { action: PHASE_DECISION_ACTIONS.CONTINUE };
}
}
];
const result = await stableWorkflow(phases, {
enableNonLinearExecution: true
});
`
Automatic metrics collection across all execution modes.
#### Request Metrics
`typescript
import { stableRequest } from '@emmvish/stable-infra';
interface DataRequest {}
interface DataResponse { data: any; }
const result = await stableRequest<DataRequest, DataResponse>({
reqData: { path: '/data' },
resReq: true,
attempts: 3
});
console.log(result.metrics); // {
// totalAttempts: 2,
// successfulAttempts: 1,
// failedAttempts: 1,
// totalExecutionTime: 450,
// averageAttemptTime: 225,
// infrastructureMetrics: {
// circuitBreaker: { /* state, stats, config */ },
// cache: { /* hits, misses, size */ },
// rateLimiter: { /* limit, current rate */ },
// concurrencyLimiter: { /* limit, in-flight */ }
// },
// validation: {
// isValid: true,
// anomalies: [],
// validatedAt: '2026-01-20T...'
// }
// }
`
#### API Gateway Metrics
`typescript
import { stableApiGateway } from '@emmvish/stable-infra';
import type { API_GATEWAY_REQUEST } from '@emmvish/stable-infra';
interface ApiRequest {}
interface ApiResponse { data: any; }
const requests: API_GATEWAY_REQUEST<ApiRequest, ApiResponse>[] = [
{ id: 'req-1', requestOptions: { reqData: { path: '/data/1' }, resReq: true } },
{ id: 'req-2', requestOptions: { reqData: { path: '/data/2' }, resReq: true } },
{ id: 'req-3', requestOptions: { reqData: { path: '/data/3' }, resReq: true } }
];
const result = await stableApiGateway<ApiRequest, ApiResponse>(requests, {
concurrentExecution: true,
maxConcurrentRequests: 5
});
console.log(result.metrics); // {
// totalRequests: 3,
// successfulRequests: 3,
// failedRequests: 0,
// successRate: 100,
// failureRate: 0,
// executionTime: 450, // Total execution time in ms
// timestamp: '2026-01-20T...', // ISO 8601 completion timestamp
// throughput: 6.67, // Requests per second
// averageRequestDuration: 150, // Average time per request in ms
// requestGroups: [/* per-group stats */],
// infrastructureMetrics: {
// circuitBreaker: { /* state, stats, config */ },
// cache: { /* hit rate, size, utilization */ },
// rateLimiter: { /* throttle rate, queue length */ },
// concurrencyLimiter: { /* utilization, queue */ }
// },
// validation: {
// isValid: true,
// anomalies: [],
// validatedAt: '2026-01-20T...'
// }
// }
`
#### Workflow Metrics
`typescript
import { stableWorkflow } from '@emmvish/stable-infra';
import type { STABLE_WORKFLOW_PHASE } from '@emmvish/stable-infra';
const phases: STABLE_WORKFLOW_PHASE[] = [
{ id: 'p1', requests: [{ id: 'r1', requestOptions: { reqData: { path: '/a' }, resReq: true } }] },
{ id: 'p2', requests: [{ id: 'r2', requestOptions: { reqData: { path: '/b' }, resReq: true } }] }
];
const result = await stableWorkflow(phases, {
workflowId: 'wf-metrics'
});
console.log(result); // {
// workflowId: 'wf-metrics',
// success: true,
// totalPhases: 2,
// completedPhases: 2,
// totalRequests: 2,
// successfulRequests: 2,
// failedRequests: 0,
// workflowExecutionTime: 1200,
// phases: [
// { phaseId: 'p1', success: true, responses: [...], validation: {...}, ... },
// { phaseId: 'p2', success: true, responses: [...], validation: {...}, ... }
// ],
// validation: {
// isValid: true,
// anomalies: [],
// validatedAt: '2026-01-20T...'
// }
// }
`
#### Structured Error Logs
`typescript
const result = await stableRequest({
reqData: { path: '/flaky' },
resReq: true,
attempts: 3,
logAllErrors: true,
handleErrors: ({ errorLog }) => {
console.log(errorLog); // {
// attempt: '1/3',
// type: 'NetworkError',
// error: 'ECONNREFUSED',
// isRetryable: true,
// timestamp: 1234567890
// }
}
});
if (result.errorLogs) {
console.log(`${result.errorLogs.length} errors logged`);
}
`
---
Dry-run workflows without side effects; simulate failures.
`typescript
import { stableWorkflow } from '@emmvish/stable-infra';
import type { STABLE_WORKFLOW_PHASE } from '@emmvish/stable-infra';
const phases: STABLE_WORKFLOW_PHASE[] = [
{
id: 'process',
requests: [
{
id: 'api-call',
requestOptions: {
reqData: { path: '/payment/charge' },
resReq: true,
trialMode: {
enabled: true,
requestFailureProbability: 0.3 // 30% simulated failure rate
}
}
}
]
}
];
const result = await stableWorkflow(phases, {
workflowId: 'payment-trial',
trialMode: {
enabled: true,
functionFailureProbability: 0.2
}
});
// Requests/functions are simulated — no real API calls or side effects occur
// Failures are injected at the configured probabilities
// Useful for testing retry logic, decision hooks, workflow topology
`
Persist state across retry attempts for distributed tracing.
The persistenceFunction receives a persistenceStage parameter (PersistenceStage.BEFORE_HOOK or PersistenceStage.AFTER_HOOK) to indicate when it is called.
`typescript
import { stableRequest, PersistenceStage } from '@emmvish/stable-infra';
interface DataRequest {}
interface DataResponse { data: any; }
const result = await stableRequest<DataRequest, DataResponse>({
reqData: { path: '/data' },
resReq: true,
attempts: 3,
statePersistence: {
persistenceFunction: async ({ executionContext, buffer, params, persistenceStage }) => {
const key = `${executionContext.workflowId}:${executionContext.requestId}`;
if (persistenceStage === PersistenceStage.BEFORE_HOOK || params?.operation === 'load') {
// Load state for recovery
return await loadFromDatabase(key);
}
// Save state to database or distributed cache
await saveToDatabase({ key, state: buffer });
return buffer;
},
persist