Typesafe API for processing iterable data in TypeScript and JavaScript
npm install @szilanor/stream
Type-safe, lazy data processing for TypeScript and JavaScript.
Inspired by Java Streams, C# LINQ, and Kotlin Sequences.
``bash`
npm install @szilanor/stream
Stream API creates a pipeline where data flows through operations. It is lazy, meaning items are processed one by one, and only if needed.
`typescript
import { stream, filter, map, toArray } from "@szilanor/stream";
const result = stream([1, 2, 3, 4, 5])
.pipe(
filter((x) => x % 2 === 0),
map((x) => x * 10),
)
.collect(toArray());
console.log(result); // [20, 40]
`
- Lazy Evaluation: Avoids creating intermediate arrays. Great for large datasets or performance-critical paths.
- Type Safety: Infers types correctly through complex pipelines.
- Async Support: Native support for AsyncIterable and async transformations.
- Lightweight: Zero dependencies and tree-shakeable.
Under the hood, Stream API uses JavaScript Itarator and Generators.
When you call .pipe(), you aren't iterating the data yet. You are composing a chain of generator functions. Iteration only begins when you call .collect() (or iterate the stream manually). This is what allows for efficiency gains:
1. Short-circuiting: Operations like find or take stop the generator chain early.sort
2. Low Memory: Only one item is held in memory at a time (unless buffering is explicitly required, like in ).
Standard array methods like .map().filter() create a new array for every step. Stream API does not.
`typescript
import { stream, map, filter, take, toArray } from "@szilanor/stream";
// Standard JS: Iterates the entire array multiple times
const bad = hugeArray
.map(transform) // Allocates new array size of hugeArray
.filter(predicate) // Allocates another new array
.slice(0, 5);
// Stream API: Iterates once, stops early
const good = stream(hugeArray)
.pipe(
map(transform),
filter(predicate),
take(5), // Stops processing after finding 5 items
)
.collect(toArray());
`
Handling async data streams seamlessly.
`typescript
import { stream, mapAsync, filterAsync, toArrayAsync } from "@szilanor/stream";
const activeUsers = await stream(userIds)
.pipeAsync(
mapAsync(async (id) => {
const user = await fetchUser(id);
return user;
}),
filterAsync((user) => user.isActive),
)
.collectAsync(toArrayAsync());
`
Extending the library is as simple as writing a generator function.
`typescript
import { OperationFunction, stream, toArray } from "@szilanor/stream";
// Emit every Nth item
const everyNth =
return function* (iterable) {
let i = 0;
for (const item of iterable) {
if (i++ % n === 0) yield item;
}
};
};
stream([1, 2, 3, 4, 5, 6]).pipe(everyNth(2)).collect(toArray()); // [1, 3, 5]
`
| | Stream API | RxJS | Native Array Methods |
| ------------- | --------------------------------- | ------------------------ | -------------------- |
| Paradigm | Pull-based (Iterators) | Push-based (Observables) | Eager |
| Data Type | Data in motion / Collections | Events / Time streams | Static Arrays |
| Async | AsyncIterable (await/for-await) | Observables (subscribe) | Promise.all needed |
The main difference lies in who controls the flow:
- Stream API (Pull): _You_ are in control. You ask for the next item when you are ready. This is ideal for "Data in motion" (reading files, database cursors, large algorithms) where you want to process data at your own pace without being overwhelmed.
- RxJS (Push): _The source_ is in control. Data arrives whenever it wants (mouse clicks, WebSocket messages, timers). You must react to it immediately.
- Native Arrays: Data is static. It sits in memory, allowing random access (index [0]`), but requiring all data to be loaded before processing begins.