A reactive dataflow system that makes coders happy
npm install lulzA reactive dataflow system that makes coders happy. š
Inspired by FFmpeg filtergraph notation, Node-RED, and RxJS.
``javascript
import { flow, inject, debug } from 'lulz';
const app = flow([
[inject({ payload: 'Hello!' }), debug({ name: 'out' })],
]);
app.start();
`
- EventEmitter API - Natural emit/on for packet injection and interception['in', a, b, c, 'out']
- Series by Default - processes sequentially['in', [a, b, c], 'out']
- Explicit Parallel - fans out to allmap
- Auto-compose - Embed flows within flows seamlessly
- Worker Pool - CPU-bound tasks with Worker Threads/Web Workers
- RxJS Operators - , filter, scan, combineLatest, and moreinject
- Node-RED Style - , debug, func, switch, template
`bash`
npm install lulz
`javascript
import { flow, inject, debug, func } from 'lulz';
// Create a flow
const app = flow([
[inject({ payload: 'Hello World!' }), 'input'],
['input', func({ func: msg => ({ ...msg, payload: msg.payload.toUpperCase() }) }), 'output'],
['output', debug({ name: 'result' })],
]);
// Start producers
app.start();
// Or inject manually via EventEmitter API
app.emit('input', { payload: 'Manual injection!' });
// Listen to any pipe
app.on('output', (packet) => {
console.log('Received:', packet.payload);
});
`
Functions process sequentially. Output of one becomes input of next.
`javascript`
['input', validate, transform, save, 'output']
// ā ā ā
// packet ā packet ā packet ā output
Use [] or parallel() to fan out. All receive the same packet.
`javascript`
['input', [processA, processB, processC], 'output']
// ā ā ā
// packet packet packet
// ā ā ā
// āāāāāāāāāāā“āāāāāāāāāā
// ā
// output (3 packets)
`javascript
import { series, parallel } from 'lulz';
// Explicit series (same as default, but clearer)
['input', series(a, b, c), 'output']
// Explicit parallel
['input', parallel(a, b, c), 'output']
`
Every flow is an EventEmitter. Pipes are events.
`javascript
const app = flow([
['data', transform, 'result'],
]);
// Inject packets
app.emit('data', { payload: 42 });
// Listen to pipes
app.on('result', (packet) => {
console.log(packet.payload);
});
// Also works
app.inject('data', { payload: 42 });
`
Regular functions that return inner functions. Called with options.
`javascript
function myTransform(options) {
const { multiplier = 1 } = options;
return (send, packet) => {
send({ ...packet, payload: packet.payload * multiplier });
};
}
// Usage
['input', myTransform({ multiplier: 2 }), 'output'] // Pre-configured
['input', myTransform, 'output'] // Auto-configured with {}
`
Arrow functions that process packets. Used directly.
`javascript
const double = (send, packet) => {
send({ ...packet, payload: packet.payload * 2 });
};
['input', double, 'output']
`
Create reusable flow components.
`javascript
import { subflow, compose } from 'lulz';
// Create reusable component
const sanitizer = subflow([
['in', func({ func: msg => ({
...msg,
payload: String(msg.payload).trim().toLowerCase()
})}), 'out'],
]);
// Use via compose
const pipeline = compose(sanitizer, validator, enricher);
pipeline.emit('in', { payload: ' HELLO ' });
// Or embed in flow (auto-compose)
const app = flow([
['input', sanitizer, 'output'],
]);
`
Produce packets on schedule.
`javascript`
inject({
payload: 'hello', // or () => Date.now()
topic: 'greeting',
once: true, // Emit once on start
onceDelay: 100, // Delay before first
interval: 1000, // Repeat interval (ms)
})
Log packets.
`javascript`
debug({
name: 'my-debug',
active: true,
complete: false, // true = full packet
logger: console.log,
})
Execute custom code.
`javascript`
func({
func: (msg, context) => {
return { ...msg, payload: msg.payload.toUpperCase() };
},
})
Modify properties.
`javascript`
change({
rules: [
{ type: 'set', prop: 'payload', to: 'new value' },
{ type: 'change', prop: 'payload', from: /old/, to: 'new' },
{ type: 'delete', prop: 'temp' },
{ type: 'move', prop: 'data', to: 'payload' },
]
})
Route by conditions.
`javascript`
switchNode({
property: 'payload',
rules: [
{ type: 'gt', value: 100 },
{ type: 'regex', value: /^test/ },
{ type: 'else' },
],
})
Render templates.
`javascript`
template({
template: 'Hello {{name}}!',
field: 'payload',
})
`javascript
import { map, scan, pluck, pairwise, buffer } from 'lulz';
map({ fn: x => x * 2 })
scan({ reducer: (acc, x) => acc + x, initial: 0 })
pluck({ path: 'data.value' })
pairwise()
buffer({ count: 5 })
`
`javascript
import { filter, take, skip, distinct, distinctUntilChanged } from 'lulz';
filter({ predicate: x => x > 0 })
take({ count: 5 })
skip({ count: 2 })
distinct()
distinctUntilChanged()
`
`javascript
import { debounce, throttle, delay, timeout } from 'lulz';
debounce({ time: 300 })
throttle({ time: 1000 })
delay({ time: 500 })
timeout({ time: 5000 })
`
`javascript
import { combineLatest, merge, zip, withLatestFrom } from 'lulz';
combineLatest({ pipes: ['temp', 'humidity'] })
merge()
zip({ sources: 2 })
`
Process CPU-bound tasks in parallel using Worker Threads.
`javascript
import { taskQueue, worker, parallelMap } from 'lulz';
// Standalone task queue
const queue = taskQueue({
workers: 4,
handler: (data) => heavyComputation(data),
});
queue.on('result', ({ id, result }) => console.log(result));
queue.submit({ data: 42 });
await queue.drain();
// In a flow
const app = flow([
['input', worker({
workers: 4,
handler: (data) => data * data,
}), 'output'],
]);
`
Create a new flow.
`javascript
const app = flow([...], { username: 'alice' });
app.start(); // Start producers
app.stop(); // Stop producers
app.emit(pipe, packet); // Inject packet
app.on(pipe, handler); // Listen to pipe
app.pipe(name); // Get pipe node
`
Create a reusable flow with in/out pipes.
Connect flows in sequence.
Explicit processing mode markers.
lulz includes a worker pool system for CPU-intensive tasks. It uses Worker Threads in Node.js (and Web Workers in browsers) so heavy computation doesn't block your main thread.
JavaScript is single-threaded. If you compute Fibonacci(45), your entire app freezes:
`javascript`
// ā Bad: Blocks everything
['input', (send, packet) => {
const result = fibonacci(packet.payload); // š§ Frozen for 5 seconds
send({ ...packet, payload: result });
}, 'output']
Workers run in separate threads:
`javascript`
// ā
Good: Non-blocking
['input', worker({ handler: fibonacci }), 'output']
// Main thread stays responsive while workers compute
---
The foundation. An EventEmitter that manages a pool of workers.
`javascript
import { taskQueue } from 'lulz';
// Create a queue with 4 workers
const queue = taskQueue({
workers: 4, // Number of worker threads (default: CPU cores)
handler: (data) => data * data // Function that runs in worker
});
// Listen for completed tasks
queue.on('result', ({ id, result }) => {
console.log(Task ${id} finished:, result);
});
// Listen for errors (handler threw an exception)
queue.on('error', ({ id, error }) => {
console.error(Task ${id} failed:, error);
});
// Listen for all tasks complete
queue.on('drain', () => {
console.log('All tasks done!');
});
// Submit a single task
queue.submit({ id: 'task-1', data: 42 });
// ā Task task-1 finished: 1764
// Submit multiple tasks
queue.submitAll([
{ id: 'a', data: 10 },
{ id: 'b', data: 20 },
{ id: 'c', data: 30 },
]);
`
#### Async Handlers
Handlers can be async:
`javascript
const queue = taskQueue({
workers: 2,
handler: async (url) => {
const response = await fetch(url);
return response.json();
}
});
queue.submit({ data: 'https://api.example.com/data' });
`
#### Queue Control
`javascript
// Check queue status
console.log(queue.stats());
// ā { pending: 5, running: 4, available: 0, totalSubmitted: 9, totalCompleted: 4 }
// Wait for all tasks to complete
await queue.drain();
// Shut down all workers
await queue.terminate();
`
---
Use workers directly in your flows. Packets go in, get processed in worker threads, come out.
`javascript
import { flow, worker } from 'lulz';
const app = flow([
['numbers',
// This runs in a worker thread, not the main thread
worker({
workers: 4,
handler: (n) => {
// Heavy computation here
let sum = 0;
for (let i = 0; i < n * 1000000; i++) {
sum += Math.sqrt(i);
}
return sum;
}
}),
'results'
],
['results', debug({ name: 'computed' })],
]);
// Send numbers to process
app.emit('numbers', { payload: 100 });
app.emit('numbers', { payload: 200 });
app.emit('numbers', { payload: 300 });
// Results arrive as workers complete (may be out of order)
`
#### Preserves Packet Metadata
The worker node keeps your packet's other properties intact:
`javascript
app.emit('numbers', {
payload: 100,
userId: 'alice', // ā preserved
requestId: 'req-123' // ā preserved
});
// Output packet:
// { payload: 12345.67, userId: 'alice', requestId: 'req-123' }
`
---
When you have an array and want each item processed in parallel:
`javascript
import { flow, parallelMap } from 'lulz';
const app = flow([
['images',
parallelMap({
workers: 4,
fn: (image) => {
// Each image processed in its own worker
return {
...image,
thumbnail: generateThumbnail(image),
compressed: compress(image)
};
}
}),
'processed'
],
]);
// Send an array
app.emit('images', {
payload: [image1, image2, image3, image4, image5]
});
// Receive complete array (order preserved!)
// { payload: [processed1, processed2, processed3, processed4, processed5] }
`
Key difference from worker:worker
- : Each packet = one task, results stream outparallelMap
- : One packet with array = many tasks, waits for all, emits single array
---
Shorthand when you just want to run a function in a worker:
`javascript
import { flow, cpuTask } from 'lulz';
// Instead of this:
worker({ handler: (n) => fibonacci(n) })
// Write this:
cpuTask((n) => fibonacci(n))
`
Example:
`javascript`
const app = flow([
['input', cpuTask(expensiveCalculation), 'output'],
]);
It's just sugar for worker({ handler: fn }) with default worker count.
---
#### Pattern 1: Fan-Out Computation
Process the same data multiple ways in parallel:
`javascript`
const app = flow([
['data', [
worker({ handler: analyzeWithMethodA }),
worker({ handler: analyzeWithMethodB }),
worker({ handler: analyzeWithMethodC }),
], 'analyzed'],
]);
// All three analyses run simultaneously in different workers
#### Pattern 2: Pipeline with Mixed Threading
Some steps in main thread, heavy steps in workers:
`javascript`
const app = flow([
['request',
validate, // Fast: main thread
parseInput, // Fast: main thread
worker({ handler: heavyTransform }), // Slow: worker
formatOutput, // Fast: main thread
'response'
],
]);
#### Pattern 3: Batch Processing
Split ā process in workers ā join:
`javascript
const app = flow([
// Split array into individual items
['batch', split(), 'item'],
// Process each in workers
['item', worker({ handler: processOne }), 'processed'],
// Collect results (need custom collector)
['processed', join({ count: expectedCount }), 'complete'],
]);
`
Or just use parallelMap which does this for you:
`javascript`
const app = flow([
['batch', parallelMap({ fn: processOne }), 'complete'],
]);
---
Worker errors don't crash your app. They emit on the 'error' event:
`javascript
const queue = taskQueue({
handler: (data) => {
if (data < 0) throw new Error('Negative not allowed');
return Math.sqrt(data);
}
});
queue.on('result', ({ id, result }) => {
console.log(${id} = ${result});
});
queue.on('error', ({ id, error }) => {
console.log(${id} failed: ${error});
});
queue.submit({ id: 'good', data: 16 }); // ā good = 4
queue.submit({ id: 'bad', data: -1 }); // ā bad failed: Negative not allowed
`
In flows, errors become packet properties:
`javascript
['input', worker({ handler: riskyFunction }), 'output']
// If handler throws, packet becomes:
// { payload: ..., error: 'Error message' }
`
---
`javascript
import { cpus } from 'os';
taskQueue({
workers: cpus().length, // Default: number of CPU cores
handler: fn, // Required: function to run in worker
})
worker({
workers: 4, // Default: number of CPU cores
handler: fn, // Required: function to run in worker
})
parallelMap({
workers: 4, // Default: number of CPU cores
fn: fn, // Required: function to run in worker
})
`
---
ā
Use workers for:
- Mathematical computations (crypto, statistics, ML inference)
- Image/video processing
- Data parsing (large JSON, CSV)
- Compression/decompression
- Any task taking >50ms
ā Don't use workers for:
- Simple transformations (x * 2)
- I/O-bound tasks (use async/await instead)
- Tasks needing DOM access (workers can't touch DOM)
- Very small tasks (worker overhead > computation)
The overhead of sending data to a worker and back is ~1-5ms. If your task takes less than that, just run it in the main thread.
``
lulz/
āāā index.js # Main exports
āāā src/
ā āāā flow.js # Core engine
ā āāā red-lib.js # Node-RED style nodes
ā āāā rx-lib.js # RxJS operators
ā āāā workers.js # Worker pool
ā āāā utils.js # Utilities
āāā test.js
āāā examples.js
āāā TODO.md # Future operators
āāā README.md
`bash``
npm test # Run tests
npm run examples # Run examples
MIT