> Type-safe, structured communication between worker threads and parent processes via TypeScript meta-programming.
- Motivation
- Installation
- Quick Start
  - 1. Define your channel contract
  - 2. Report events in your worker thread
  - 3. Await events in your parent process
- Usage Examples
  - Node.js Workers
  - Web Workers
  - EventEmitter
  - EventTarget
- API Reference
  - WorkerChannel.Definition
  - WorkerChannelReporter
  - WorkerChannelReceiver
- Important Caveats
  - Events are one-time only
  - Streams must be ended
  - Control flow must match between worker and parent
- LICENSE

## Motivation

Worker communication often becomes messy when different types of messages flow through the same channel. Worker channels provide type-safe, structured communication with clear contracts defined entirely in TypeScript.

| | Traditional Approach ❌ | Worker Channels ✅ |
|---|---|---|
| Worker | Reads `workerData`, then sends each update through a raw `parentPort.postMessage({...})` call | Creates a reporter with `WorkerChannelReporter.from`, then calls `report.event.started({expected: imageFiles.length})` and `report.event.finished({processed})` |
| Parent | Creates the worker, then awaits a manual `new Promise((resolve, reject) => {...})` and must remember to propagate errors | Creates a receiver with `WorkerChannelReceiver.from`, iterates `for await (const progress of receiver.stream.progress())`, awaits `receiver.event.finished()`, then calls `receiver.unsubscribe()` to clean up; errors propagate automatically |

> [!IMPORTANT]
> The channel contract exists solely in TypeScript types and is shared between worker and parent processes.
This library uses Proxies to dynamically intercept property access and route messages to the correct handlers, providing compile-time safety with minimal runtime overhead while keeping worker and parent code cleanly isolated.
```ts
import {type WorkerChannel} from '@sanity/worker-channels'

// both the child and parent can import this with a type import
export type ImageChannel = WorkerChannel.Definition<{
  started: WorkerChannel.Event<{expected: number}>
  progress: WorkerChannel.Stream<{file: ImageFile; completed: number}>
  finished: WorkerChannel.Event<{processed: number}>
}>
```
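
To make the Proxy-based routing described above more concrete, here is a minimal sketch of the general technique. It is not the library's implementation: the `Message` shape, `EventReporter`, and `createEventReporter` are hypothetical names invented for this illustration, and the real wire format may differ.

```ts
// Hypothetical message shape (an assumption, not the library's wire format).
type Message = {kind: 'event'; name: string; payload: unknown}

// Map each key of a contract to a function that reports that key's payload.
type EventReporter<T> = {[K in keyof T]: (payload: T[K]) => void}

function createEventReporter<T extends Record<string, unknown>>(
  post: (message: Message) => void,
): EventReporter<T> {
  // The target object is empty: every property is synthesized by the `get` trap,
  // so the contract never has to exist as a runtime value.
  return new Proxy({} as EventReporter<T>, {
    get: (_target, name) => (payload: unknown) => {
      post({kind: 'event', name: String(name), payload})
    },
  })
}

// Usage: collect messages in an array instead of posting to a real worker.
const sent: Message[] = []
const report = createEventReporter<{started: {expected: number}}>((m) => sent.push(m))
report.started({expected: 3})
console.log(sent)
```

The property name accessed on the proxy becomes the routing key, while the mapped type restricts which names and payloads the compiler accepts.
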
## Installation

```bash
npm install @sanity/worker-channels
```

## Quick Start

### 1. Define your channel contract

```ts
// types.ts - shared between worker and parent
import {type WorkerChannel} from '@sanity/worker-channels'

export type BuildChannel = WorkerChannel.Definition<{
  buildStart: WorkerChannel.Event<{target: string}>
  progress: WorkerChannel.Stream<{file: string; percent: number}>
  buildComplete: WorkerChannel.Event<{duration: number; files: string[]}>
}>
```

### 2. Report events in your worker thread

```ts
// worker.ts
import {parentPort} from 'node:worker_threads'
import {WorkerChannelReporter} from '@sanity/worker-channels'
import type {BuildChannel} from './types'

const report = WorkerChannelReporter.from<BuildChannel>(parentPort)

// Signal build started
report.event.buildStart({target: 'production'})

// Stream progress updates
const files = ['app.js', 'styles.css', 'index.html']
for (let i = 0; i < files.length; i++) {
  report.stream.progress.emit({
    file: files[i],
    percent: Math.round(((i + 1) / files.length) * 100),
  })
  await new Promise((resolve) => setTimeout(resolve, 1000)) // Simulate work
}
report.stream.progress.end() // Important: end the stream

// Signal completion
report.event.buildComplete({duration: 3000, files})
```

### 3. Await events in your parent process

```ts
// main.ts
import {Worker} from 'node:worker_threads'
import {WorkerChannelReceiver} from '@sanity/worker-channels'
import type {BuildChannel} from './types'

const worker = new Worker('./worker.js')
const receiver = WorkerChannelReceiver.from<BuildChannel>(worker)

// Wait for build to start
const {target} = await receiver.event.buildStart()
console.log(`Build started for ${target}`)

// Monitor progress stream
for await (const {file, percent} of receiver.stream.progress()) {
  console.log(`${file}: ${percent}%`)
}

// Wait for completion
const {duration, files} = await receiver.event.buildComplete()
console.log(`Build completed in ${duration}ms, ${files.length} files`)

receiver.unsubscribe() // Clean up
```

## Usage Examples

### Node.js Workers

```ts
// Worker thread
import {parentPort} from 'node:worker_threads'
const report = WorkerChannelReporter.from<MyChannel>(parentPort)

// Parent process
import {Worker} from 'node:worker_threads'
const worker = new Worker('./worker.js')
const receiver = WorkerChannelReceiver.from<MyChannel>(worker)
```

### Web Workers

```ts
// In web worker (`self` is the worker's global scope)
const report = WorkerChannelReporter.from<MyChannel>(self)

// In main thread
const worker = new Worker('./worker.js')
const receiver = WorkerChannelReceiver.from<MyChannel>(worker)
```

### EventEmitter

Useful for asynchronously reporting progress within the same thread in Node.js.

```ts
import {EventEmitter} from 'node:events'

const emitter = new EventEmitter()
const reporter = WorkerChannelReporter.from<MyChannel>(emitter)
const receiver = WorkerChannelReceiver.from<MyChannel>(emitter)
```

### EventTarget

EventTarget is supported in the same way, which is useful for asynchronously reporting progress within the same thread in non-Node.js environments.

```ts
const target = new EventTarget()
const reporter = WorkerChannelReporter.from<MyChannel>(target)
const receiver = WorkerChannelReceiver.from<MyChannel>(target)
```

## API Reference

### WorkerChannel.Definition

```ts
type MyChannel = WorkerChannel.Definition<{
  eventName: WorkerChannel.Event<PayloadType>
  streamName: WorkerChannel.Stream<PayloadType>
}>
```
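
As a rough illustration of how a contract that lives purely in types can drive the rest of an API, the sketch below derives event payload types from a definition with a mapped type. `ChannelEvent`, `ChannelStream`, `DemoChannel`, and `EventPayloads` are hypothetical stand-ins invented for this example; the library's actual type definitions may look quite different.

```ts
// Hypothetical marker types (assumed shapes, not the library's definitions).
type ChannelEvent<P> = {kind: 'event'; payload: P}
type ChannelStream<P> = {kind: 'stream'; payload: P}

type DemoChannel = {
  buildStart: ChannelEvent<{target: string}>
  progress: ChannelStream<{file: string; percent: number}>
}

// Keep only the keys marked as events and extract each payload type.
type EventPayloads<T> = {
  [K in keyof T as T[K] extends ChannelEvent<unknown> ? K : never]: T[K] extends ChannelEvent<infer P>
    ? P
    : never
}

// The type resolves to {buildStart: {target: string}}; `progress` is filtered out,
// so adding a `progress` key here would be a compile-time error.
const payloads: EventPayloads<DemoChannel> = {buildStart: {target: 'production'}}
console.log(payloads)
```
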
### WorkerChannelReporter

Reports events and streams from the worker to the parent process.

#### Creation

Static factory (recommended):

```ts
// Automatically detects the interface type
const report = WorkerChannelReporter.from<MyChannel>(parentPort)
```

Constructor:

```ts
// For custom message posting logic
const report = new WorkerChannelReporter<MyChannel>((message) => {
  parentPort.postMessage(message)
})
```

#### Usage
```ts
// Events (one-time only)
report.event.eventName(payload)

// Streams (multiple emissions + end)
report.stream.streamName.emit(payload)
report.stream.streamName.emit(anotherPayload)
report.stream.streamName.end()
```

### WorkerChannelReceiver

Receives events and streams from the worker in the parent process.

#### Creation

Static factory (recommended):

```ts
// Automatically detects the interface type and sets up listeners
const receiver = WorkerChannelReceiver.from<MyChannel>(worker)
```

Constructor:

```ts
// For custom subscription logic
const receiver = new WorkerChannelReceiver<MyChannel>((subscriber) => {
  worker.addListener('message', subscriber.next)
  worker.addListener('error', subscriber.error)

  // Return cleanup function
  return () => {
    worker.removeListener('message', subscriber.next)
    worker.removeListener('error', subscriber.error)
  }
})
```

#### Usage
```ts
// Events (returns Promise that resolves once)
const payload = await receiver.event.eventName()

// Streams (returns AsyncIterable for multiple values)
for await (const payload of receiver.stream.streamName()) {
  console.log('Received:', payload)
}

// Cleanup (important for static factory instances)
receiver.unsubscribe()
```

#### Error Propagation
When an error occurs in the worker thread, it automatically propagates to any awaiting event handlers or stream iterators in the parent process:
```ts
// If the worker throws an error...
// worker.ts
throw new Error('Something went wrong in worker')
```

```ts
// ...it will reject awaiting promises in the parent
// main.ts
try {
  const result = await receiver.event.completed() // ← Will reject with worker error
} catch (error) {
  console.error('Worker failed:', error.message) // "Something went wrong in worker"
}

// ...and cause stream iteration to throw
try {
  for await (const progress of receiver.stream.progress()) {
    // ← Will throw worker error
    console.log(progress)
  }
} catch (error) {
  console.error('Stream failed:', error.message) // "Something went wrong in worker"
}
```

This eliminates the need for manual error handling patterns like `worker.on('error', ...)`; errors are automatically propagated to the code that is awaiting an event or iterating a stream.
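
For comparison, the same idea can be seen in plain Node.js: `once` from `node:events` returns a promise that rejects if the emitter emits `'error'` while waiting. The sketch below uses an `EventEmitter` named `fakeWorker` as a hypothetical stand-in for a worker; it illustrates the pattern only and is not how the library is implemented.

```ts
import {EventEmitter, once} from 'node:events'

// Stand-in for a worker: any EventEmitter that may emit 'message' or 'error'.
const fakeWorker = new EventEmitter()

// `once` resolves with the event arguments, or rejects if 'error' is emitted first.
const pending = once(fakeWorker, 'message')

// Simulate the worker failing before it reports anything.
fakeWorker.emit('error', new Error('Something went wrong in worker'))

const outcome = pending.then(
  () => 'resolved',
  (error: Error) => `rejected: ${error.message}`,
)
outcome.then(console.log) // prints "rejected: Something went wrong in worker"
```
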
## Important Caveats

### Events are one-time only

```ts
// ❌ This will throw an error
reporter.event.buildComplete({duration: 1000})
reporter.event.buildComplete({duration: 2000}) // Error: already reported

// ✅ Use streams for multiple values instead
reporter.stream.status.emit('processing')
reporter.stream.status.emit('finalizing')
reporter.stream.status.end()
```

> [!NOTE]
> This design prevents bugs where the same event fires multiple times unexpectedly. It makes the contract explicit: use events for singular occurrences, streams for continuous data.
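
A one-time guard of this kind can be sketched in a few lines. The `oneShot` helper and its error message below are assumptions made for illustration; the source only states that a second report throws.

```ts
// Wrap a sender so that a second call fails loudly instead of sending twice.
function oneShot<T>(name: string, send: (payload: T) => void): (payload: T) => void {
  let reported = false
  return (payload) => {
    if (reported) throw new Error(`Event "${name}" was already reported`)
    reported = true
    send(payload)
  }
}

const delivered: number[] = []
const buildComplete = oneShot<{duration: number}>('buildComplete', (p) => delivered.push(p.duration))

buildComplete({duration: 1000}) // delivered

let secondCallError = ''
try {
  buildComplete({duration: 2000}) // rejected by the guard
} catch (error) {
  secondCallError = (error as Error).message
}
console.log(delivered, secondCallError)
```
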
### Streams must be ended

```ts
// ❌ Stream never ends - receiver will wait forever
reporter.stream.progress.emit(50)
reporter.stream.progress.emit(100)
// Missing: reporter.stream.progress.end()

// ✅ Always end streams
reporter.stream.progress.emit(50)
reporter.stream.progress.emit(100)
reporter.stream.progress.end() // Signals completion
```
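
To see why a missing `end()` leaves the consumer waiting, consider this minimal push-based stream. `createStream` is a hypothetical helper written for this illustration (single consumer assumed), not part of the library: the `for await` loop can only finish once the end signal arrives.

```ts
// A tiny push-based stream: `emit` queues values, `end` lets the consumer finish.
function createStream<T>() {
  const queue: T[] = []
  let ended = false
  let wake: (() => void) | undefined

  // Wake the consumer if it is currently waiting for more input.
  const notify = () => {
    wake?.()
    wake = undefined
  }

  return {
    emit(value: T) {
      queue.push(value)
      notify()
    },
    end() {
      ended = true
      notify()
    },
    async *values(): AsyncGenerator<T> {
      while (true) {
        while (queue.length > 0) yield queue.shift() as T
        if (ended) return
        // Nothing queued and not ended: wait here until emit() or end() is called.
        await new Promise<void>((resolve) => {
          wake = () => resolve()
        })
      }
    },
  }
}

const progress = createStream<number>()
progress.emit(50)
progress.emit(100)
progress.end() // without this call, the loop below would never finish

const done = (async () => {
  const received: number[] = []
  for await (const value of progress.values()) received.push(value)
  return received
})()
done.then(console.log)
```
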
### Control flow must match between worker and parent

The parent's control flow should mirror the worker's control flow. If a condition prevents the worker from reporting an event/stream that the parent is awaiting, the parent will hang indefinitely:
```ts
// ❌ Worker may conditionally skip events
// worker.ts
if (shouldProcess) {
  report.event.started({count: files.length})
  // ... processing
  report.event.finished({success: true})
}
// If shouldProcess is false, no events are sent

// ❌ Parent unconditionally awaits - will hang if worker skips
// main.ts
await receiver.event.started() // ← Will hang forever if shouldProcess is false
// ...
await receiver.event.finished()
```

```ts
// ✅ Match the conditional logic or use different events
// worker.ts
if (shouldProcess) {
  report.event.started({count: files.length})
  // ... processing
  report.event.finished({success: true})
} else {
  report.event.skipped({reason: 'No processing needed'})
}

// ✅ Parent handles both cases
// main.ts
// `as const` keeps `type` a literal so the check below narrows `data`
const startResult = await Promise.race([
  receiver.event.started().then((data) => ({type: 'started' as const, data})),
  receiver.event.skipped().then((data) => ({type: 'skipped' as const, data})),
])

if (startResult.type === 'started') {
  // ... handle processing flow
  await receiver.event.finished()
} else {
  console.log('Processing skipped:', startResult.data.reason)
}
```

> [!WARNING]
> Worker channels provide message routing, not flow validation. The library doesn't verify that events are sent in the expected order or that all expected events are sent; it only ensures that messages reach the right handlers when they are sent.
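
Because nothing validates the flow, one defensive option is to bound each wait with a timeout so that a mismatch surfaces as an error rather than a hang. The `withTimeout` helper below is a hypothetical utility written for this sketch, not something the library is known to provide.

```ts
// Reject if `promise` has not settled within `ms` milliseconds.
function withTimeout<T>(promise: Promise<T>, ms: number, label: string): Promise<T> {
  let timer: ReturnType<typeof setTimeout> | undefined
  const timeout = new Promise<never>((_resolve, reject) => {
    timer = setTimeout(() => reject(new Error(`Timed out waiting for ${label}`)), ms)
  })
  // Whichever settles first wins; always clear the timer so the process can exit.
  return Promise.race([promise, timeout]).finally(() => clearTimeout(timer))
}

// Stand-in for an event the worker never reports (for example, a skipped branch).
const neverReported = new Promise<{count: number}>(() => {})

const result = withTimeout(neverReported, 50, 'started').then(
  () => 'received',
  (error: Error) => error.message,
)
result.then(console.log) // prints "Timed out waiting for started"
```

With a receiver, this might be used as `await withTimeout(receiver.event.started(), 5000, 'started')`, assuming the event promise behaves as described in the API reference.
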
## LICENSE

MIT License. See the LICENSE file for details.