Framework-agnostic pipeline orchestration primitives for WPKernel
> A type-safe, dependency-aware workflow engine for orchestrating complex generation tasks.
@wpkernel/pipeline is a generic orchestration engine that turns sets of decoupled "helpers" into deterministic, topologically sorted execution plans.
While it powers WPKernel's code generation (assembling fragments into artifacts), the core is completely agnostic. You can use it to build:
- ETL Pipelines: Extract, Transform, and Load stages with shared state.
- Build Systems: Compile, Bundle, and Minify steps with precise ordering.
- Code Generators: The standard "Fragment → Builder" pattern.
It guarantees:
- Deterministic Ordering: Topologically sorts helpers based on dependsOn.
- Cycle Detection: Fails fast (halts execution) if dependencies form a loop.
- Robust Rollbacks: Extensions and helpers provide best-effort rollback hooks that run LIFO, attempting every cleanup step and reporting any rollback failures.
- Type Safety: Full TypeScript support for custom contexts, options, and artifacts.
The package's primary entry point, @wpkernel/pipeline, provides the "Standard Pipeline" (Fragments & Builders). This is the recommended API for most consumers.
Under the hood, the package is split into:
1. Standard Pipeline (src/standard-pipeline): The opinionated implementation used by WPKernel.
2. Core Runner (src/core/runner): A purely agnostic DAG execution engine.
Subpath imports (e.g., @wpkernel/pipeline/core) are available if you need to build a completely custom pipeline architecture using the runner primitives directly.
```bash
pnpm add @wpkernel/pipeline
```
The package ships pure TypeScript and has no runtime dependencies.
#### Standard Pipeline (Recommended)
Use createPipeline for the standard Fragment → Builder workflow used by WPKernel.
```ts
import { createPipeline } from '@wpkernel/pipeline';

const pipeline = createPipeline({
	// Configuration
	createContext: (ops) => ({ db: ops.db }),
	createBuildOptions: () => ({}),
	createFragmentState: () => ({}),

	// Argument resolvers
	createFragmentArgs: ({ context }) => ({ db: context.db }),
	createBuilderArgs: ({ artifact }) => ({ artifact }),
});
```
#### Custom Pipeline (Advanced)
For completely custom architectures (ETL, migrations, etc.), use makePipeline to define your own stages.
```ts
import { makePipeline } from '@wpkernel/pipeline';

const pipeline = makePipeline({
	// Define the "Stages" of your pipeline
	helperKinds: ['extract', 'transform', 'load'] as const,
	createStages: (deps) => [
		deps.makeLifecycleStage('extract'),
		deps.makeLifecycleStage('transform'),
		deps.makeLifecycleStage('load'),
		deps.commitStage,
		deps.finalizeResult,
	],
	createContext: (ops) => ({ db: ops.db }),
	// ... logic for resolving args for your helpers ...
});
```
Helpers are the atomic units of work. They can be anything - functions, objects, or complex services.
```ts
// "Extract" helper
pipeline.use({
	kind: 'extract',
	key: 'users',
	apply: async ({ context }) => {
		return context.db.query('SELECT * FROM users');
	},
});

// "Transform" helper (depends on the 'users' extract helper)
pipeline.use({
	kind: 'transform',
	key: 'clean-users',
	dependsOn: ['users'],
	apply: ({ input }) => {
		return input.map((u) => ({ ...u, name: u.name.trim() }));
	},
});
```
The pipeline resolves the dependency graph, executes the helpers, and manages the lifecycle.
```ts
const result = await pipeline.run({ db: myDatabase });
```
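A cycle in the helper graph or a failing stage halts the run. A minimal sketch of handling that, assuming such failures surface as a rejected promise:

```ts
try {
	const result = await pipeline.run({ db: myDatabase });
	// The result shape is determined by the pipeline's finalize stage.
	console.log('pipeline completed', result);
} catch (error) {
	// Cycle detection and stage failures halt execution; rollbacks are
	// attempted before the error reaches the caller.
	console.error('pipeline failed', error);
}
```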
You are not limited to fixed roles. Define whatever helper kinds you need (e.g. 'validator', 'compiler', 'notifier') and map them to execution stages.
The pipeline creates a dependency graph for _each_ kind of helper. If Helper B depends on Helper A, the runner ensures A executes before B (and passes A's output to B if configured), as sketched below.
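For instance, a hypothetical 'validator' kind (declared through makePipeline's helperKinds and createStages) is registered the same way as the extract/transform helpers above; only the kind name and its stage mapping change:

```ts
// Hypothetical 'validator' kind; it must be listed in helperKinds and mapped
// to a stage via createStages when the pipeline is created.
pipeline.use({
	kind: 'validator',
	key: 'validate-users',
	// Runs after 'clean-users'; its output is passed in as `input` when configured.
	dependsOn: ['clean-users'],
	apply: ({ input }) => {
		if (!Array.isArray(input)) {
			throw new Error('Expected an array of users');
		}
		return input;
	},
});
```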
Extensions wrap execution with hooks at specific lifecycle stages.
Standard Pipeline Lifecycles:
prepare → before-fragments → after-fragments → before-builders → after-builders → finalize
> Note: Custom pipelines (using makePipeline) can define arbitrary lifecycle stages. Extensions can hook into any stage, standard or custom, as long as it exists in the pipeline's execution plan.
Validation: The pipeline validates extension registrations. If an extension attempts to hook into an unscheduled lifecycle, the pipeline will log a warning instead of silently ignoring it.
Extension Registration (Sync & Async): extensions.use() returns MaybePromise. It returns a Promise only if the extension's register method is asynchronous.
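The extensions used in the snippet below can be sketched as plain objects exposing that register method; a minimal sketch (the shape beyond register is an assumption here, not a prescribed interface):

```ts
// Sync extension: register returns nothing, so extensions.use() resolves synchronously.
const mySyncExtension = {
	register() {
		// e.g. register a bundle of related helpers via pipeline.use(...)
	},
};

// Async extension: register returns a Promise, so extensions.use() should be awaited.
const myAsyncExtension = {
	async register() {
		// e.g. open a database connection before the pipeline runs (illustrative only)
		await Promise.resolve();
	},
};
```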
```ts
// Sync registration (e.g. simple helper bundles)
extensions.use(mySyncExtension);

// Async registration (e.g. database connections)
await extensions.use(myAsyncExtension);
```
> Recommendation: Await registration when possible for consistency, but you may omit the await if you are certain the extension initializes synchronously. pipeline.run() will automatically wait for any pending async registrations.
The pipeline supports robust rollback for both helper application and extension lifecycle commit phases:
- Extensions: Can provide transactional behavior via the commit phase. If a failure occurs, rollback hooks are triggered.
- Helpers: Can return a rollback function in their result (see the sketch below). These are executed LIFO if a later failure occurs.
- Robustness: The rollback stack continues execution even if individual rollback actions fail (errors are collected and reported).
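A hedged sketch of a helper that registers cleanup, assuming the helper result can carry a rollback callback alongside its value (the exact result shape depends on how your pipeline is configured):

```ts
pipeline.use({
	kind: 'load',
	key: 'write-users',
	dependsOn: ['clean-users'],
	apply: async ({ context, input }) => {
		await context.db.query('INSERT INTO users_clean ...');
		return {
			value: input,
			// Best-effort cleanup, executed LIFO if a later step fails.
			rollback: async () => {
				await context.db.query('DELETE FROM users_clean');
			},
		};
	},
});
```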
Diagnostics are per-run. Calling `pipeline.run()` automatically clears any previous runtime diagnostics to ensure a fresh state. Static diagnostics (e.g., registration conflicts) are preserved and re-emitted for each run.
- Architecture Guide: Deep dive into the runner's internals and DAG resolution.
- API Reference: Generated TSDoc for all interfaces.
EUPL-1.2 © The Geekist