strict-stream - strictly / strongly typed stream
strict-stream is a tiny and lightweight library that helps manage strictly/strongly typed streams, using AsyncIterable as the core principle to enable strict data pipelines with useful behavior.
It ensures that the data flowing through a stream conforms to a specific data type or structure, which helps catch errors early on, reduce bugs, and make code more reliable and easier to maintain.
Why Iterable and AsyncIterable Matter
-----------------------------------------------
In JavaScript and TypeScript, Iterable and AsyncIterable are two important interfaces that allow you to work with sequences of values.
An Iterable is an object that can be iterated over using a for...of loop or spread syntax, while an AsyncIterable represents a sequence of values that are produced asynchronously, such as through a network request or database query, and is consumed with a for await...of loop.
Using these interfaces has several advantages:
* Type safety: By using Iterable and AsyncIterable, you can ensure that the data you're working with is strongly typed and conforms to a specific schema. This helps catch errors early in the development process and makes your code more robust and reliable.
* Composability: Because Iterable and AsyncIterable are composable, you can easily create complex data pipelines that process, transform, and filter data in a modular way. This makes it easier to reason about your code and maintain it over time.
* Performance: Iterable and AsyncIterable are highly optimized for performance, allowing you to process data streams with high throughput up to millions of records per second. This makes them an ideal choice for working with large datasets or real-time data streams.
For more information on Iterable and AsyncIterable, check out the following links:
* MDN web docs: Iterable
* MDN web docs: AsyncIterable
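
To make the distinction concrete, here is a minimal sketch in plain TypeScript with no library involved. The names `syncNumbers`, `asyncNumbers`, and `collectBoth` are illustrative only.

```typescript
// A synchronous iterable: values are available immediately.
function* syncNumbers(): Iterable<number> {
    yield 1;
    yield 2;
}

// An asynchronous iterable: each value arrives after an awaited step.
async function* asyncNumbers(): AsyncIterable<number> {
    for (const value of [3, 4]) {
        await Promise.resolve(); // stands in for a network or database call
        yield value;
    }
}

async function collectBoth(): Promise<number[]> {
    // spread syntax works on an Iterable
    const result: number[] = [...syncNumbers()];
    // for await...of works on an AsyncIterable
    for await (const value of asyncNumbers()) {
        result.push(value);
    }
    return result;
}
```

Both generators are typed as sequences of `number`, so the compiler rejects any attempt to push their values into, say, a `string[]`.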
Installation
------------
To install strict-stream, you can use your preferred package manager:
npm install strict-stream
or
yarn add strict-stream
Usage
-----
Here's a simple example that demonstrates how to use strict-stream:
```typescript
import {of} from 'strict-stream';
import {filter} from 'strict-stream/filter';

async function* generateData() {
    yield {name: 'Alice', age: 30};
    yield {name: 'Bob', age: 40};
    yield {name: 'Charlie', age: 50};
}

async function example() {
    // AsyncIterable<{name: string, age: number}>
    const stream = of(generateData())
        .pipe(
            filter(({age}) => age > 30)
        );

    for await (const data of stream) {
        console.log(`Name: ${data.name}, Age: ${data.age}`);
    }
    // Name: Bob, Age: 40
    // Name: Charlie, Age: 50
}

await example();
```
- This code demonstrates how to use the `of` and `filter` functions from the library to create a typed stream and filter the data.
- First, the `generateData` function is an async generator function that yields objects with a name and age property.
- Next, the `of` function is used to create a typed stream from the generator function `generateData`. The resulting stream is an AsyncIterable of objects with a name and age property.
- The `pipe` method is then used to apply a filter to the stream, keeping only the objects where the age property is greater than 30.
- Finally, the resulting stream is iterated over using a `for-await-of` loop. The output shows only the objects where age is greater than 30.
It gives you these IDE hints with strong types and guides you to follow types.
There is a function getUsers() that gives you AsyncIterable<{name: string, age: number}> and all pipeline operations are type-safe.
```typescript
import {sequence} from "strict-stream/sequence";
import {map} from "strict-stream/map";
import {from} from "strict-stream/from";
import {filter} from "strict-stream/filter";

async function example() {
    const usersStream =
        from(
            // gives AsyncIterable<number>
            // sequence 0,1,2,3,4;
            sequence(5)
        )
            .pipe(
                // takes only 0, 2, 4
                filter((id) => id % 2 === 0)
            )
            .pipe(
                // maps to {type: string, id: number, name: string}
                map((id) => ({
                    type: 'User',
                    id,
                    name: `User ${id}`
                }))
            )

    // inferred type
    // AsyncIterable<{type: string, id: number, name: string}>
    for await (const user of usersStream) {
        console.log(user)
    }
    // { type: 'User', id: 0, name: 'User 0' }
    // { type: 'User', id: 2, name: 'User 2' }
    // { type: 'User', id: 4, name: 'User 4' }
}
```
1. There is a `sequence` function that generates a sequence of numbers 0,1,2,3,4.
2. This sequence is filtered using the `filter` function to include only the even numbers (i.e., 0, 2, and 4).
3. The resulting sequence is mapped using the `map` function to convert each number into a user object with a type, name and id.
4. Finally, `usersStream` is a strictly typed AsyncIterable<{type: string, id: number, name: string}>.
An example of how to create a stream / AsyncIterable with a generator:
```typescript
async function* generateData() {
    yield {name: 'Alice', age: 30};
    yield {name: 'Bob', age: 40};
    yield {name: 'Charlie', age: 50};
}

async function example() {
    const stream = generateData();

    for await (const data of stream) {
        console.log(`Name: ${data.name}, Age: ${data.age}`);
    }
    // Name: Alice, Age: 30
    // Name: Bob, Age: 40
    // Name: Charlie, Age: 50
}

await example();
```
- In this example, generateData is a generator function that yields three objects with name and age properties.
- The `example` function creates a stream from the generator by simply calling it and assigns it to the stream variable.
- Then, it iterates over the stream using a `for await...of` loop and logs the name and age properties of each object.
### reader
`reader` creates an async iterable stream from a reader function.
The reader function is called every time a new value is requested from the stream and should return the value, or DONE if there are no more values.
```typescript
import { reader } from 'strict-stream/reader';

async function example() {
    const array = [1, 2, 3];

    // the reader function is called once per requested value
    const stream = reader(async () => {
        const value = array.shift();
        return value === undefined ? reader.DONE : value;
    });

    for await (const number of stream) {
        console.log(number);
    }
    // Output: 1
    // Output: 2
    // Output: 3
}

await example();
```
- In this example, reader is called with a function that removes the first value from the array on each call.
- When there are no more values, it returns reader.DONE, which signals to the stream that there are no more values to yield.
- Finally, the `for await...of` loop is used to consume the values from the stream.
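
The pull-based contract described above can be sketched with a plain async generator. This is not the library's implementation: `readerSketch` and the local `DONE` symbol are hypothetical names, and the sketch only handles a synchronous reader function.

```typescript
// Hypothetical sentinel for this sketch; the library exposes its own reader.DONE.
const DONE = Symbol("DONE");

// Calls `read` once per requested value and stops when it returns DONE.
async function* readerSketch<T>(read: () => T | typeof DONE): AsyncIterable<T> {
    while (true) {
        const value = read();
        if (value === DONE) {
            return; // the reader signalled that no values are left
        }
        yield value;
    }
}
```

Because the generator body only advances when the consumer asks for the next item, `read` is never called ahead of demand.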
```typescript
import {of} from 'strict-stream';
import {reader} from 'strict-stream/reader';
import {map} from 'strict-stream/map';

async function example() {
    const array = [1, 2, 3];

    // the reader function is called once per requested value
    const stream = reader(async () => {
        const value = array.shift();
        return value !== undefined ? value : reader.DONE;
    });

    const transformedStream = of(stream)
        .pipe(
            map((value) => value * 2)
        );

    for await (const value of transformedStream) {
        console.log(value);
    }
    // Output: 2, 4, 6
}

await example();
```
- In this example, the function passed to reader returns the next value in the array each time it is called, using array.shift().
- If there are no more values in the array, it returns the special value `reader.DONE` to indicate that the stream is complete.
- The `of` function is then used to create a composable stream from the AsyncIterable returned by reader.
- This stream has a `pipe` method that can be used to apply a series of transformations to the stream.
- The `map` operator is used to transform the stream by multiplying each value by 2.
- The `map` operator takes a function that is applied to each value in the stream, and returns a new stream with the transformed values.
- Finally, the transformed stream is iterated over using a `for await...of` loop.
- In this case, the output will be `2, 4, 6`, which are the values of the original array multiplied by 2.
### of
- `of` is a factory function that creates a new instance of a composable stream by wrapping an AsyncIterable.
- The resulting stream can be composed with other stream functions using the pipe method.

#### An example:
```typescript
import {of} from "strict-stream";
import {map} from "strict-stream/map";

async function* generateIds() {
    yield 1
    yield 2
    yield 3
}

async function example() {
    const stream = of(generateIds())
        .pipe(
            map(async (id) => ({id, name: `User ${id}`}))
        );

    for await (const data of stream) {
        console.log(`Id: ${data.id}, Name: ${data.name}`);
    }
    // Id: 1, Name: User 1
    // Id: 2, Name: User 2
    // Id: 3, Name: User 3
}

await example();
```
- The code above is a simple example of how to create and transform streams of data.
- The generateIds function is a generator that yields three numbers (1, 2, and 3) in sequence.
- The of function is used to create a stream from the generator by passing generateIds as its argument.
- The pipe method is used to apply a transformation to the stream.
- In this case, the map function is used to transform each item in the stream.
- The map function takes a callback that is called with each item in the stream, and returns a new value for that item.
- In this case, the callback takes an id value and returns an object with two properties: id and name.
- Finally, the transformed stream is consumed with a for-await-of loop, which iterates through each item in the stream and logs its id and name properties to the console.
- The output will be Id: 1, Name: User 1, Id: 2, Name: User 2, and Id: 3, Name: User 3.

#### An example (advanced, custom mapper):
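```typescript
import {of, StrictStreamMapper} from "strict-stream";

async function* generateIds() {
    yield 1
    yield 2
    yield 3
}

async function example() {
    // my first stream mapper; maps inputStream to mappedStream;
    // (body reconstructed; assumes StrictStreamMapper<Input, Output> maps
    // an AsyncIterable<Input> to an AsyncIterable<Output>)
    function myMap<Input, Output>(mapper: (input: Input) => Promise<Output>): StrictStreamMapper<Input, Output> {
        return (inputStream) => {
            return (async function* mappedStream() {
                for await (const record of inputStream) {
                    // an async generator awaits a yielded promise before emitting it
                    yield mapper(record)
                }
            })()
        }
    }

    const stream = of(generateIds())
        .pipe(
            myMap(async (id) => ({id, name: `User ${id}`}))
        );

    for await (const data of stream) {
        console.log(`Id: ${data.id}, Name: ${data.name}`);
    }
    // Id: 1, Name: User 1
    // Id: 2, Name: User 2
    // Id: 3, Name: User 3
}

await example();
```
- `of` creates a new stream instance from the generateIds async generator.
- The resulting stream is composed with the myMap function that transforms each id into an object with id and name properties.
- Finally, the transformed stream is iterated using a for await...of loop.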
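
To show how a wrapper with a chainable `pipe` method could work in principle, here is a minimal sketch. The names `Mapper`, `PipeableStream`, `wrap`, and `double` are hypothetical and the library's real types and implementation may differ.

```typescript
// Hypothetical types for illustration only.
type Mapper<In, Out> = (input: AsyncIterable<In>) => AsyncIterable<Out>;

interface PipeableStream<T> extends AsyncIterable<T> {
    pipe<Out>(mapper: Mapper<T, Out>): PipeableStream<Out>;
}

function wrap<T>(source: AsyncIterable<T>): PipeableStream<T> {
    return {
        // delegate iteration to the wrapped source
        [Symbol.asyncIterator]: () => source[Symbol.asyncIterator](),
        // wrap the mapper's output again so that pipe calls can be chained
        pipe: <Out>(mapper: Mapper<T, Out>) => wrap(mapper(source)),
    };
}

// A mapper is just a function from one stream to another.
function double(input: AsyncIterable<number>): AsyncIterable<number> {
    return (async function* () {
        for await (const value of input) {
            yield value * 2;
        }
    })();
}

async function* ids() {
    yield 1;
    yield 2;
    yield 3;
}

async function collectDoubled(): Promise<number[]> {
    const out: number[] = [];
    for await (const value of wrap(ids()).pipe(double).pipe(double)) {
        out.push(value);
    }
    return out;
}
```

Each `pipe` call returns a stream typed by the mapper's output, which is what lets the compiler check every step of a chain.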
### from
The `from` function is used to convert any iterable object, whether synchronous or asynchronous, to a StrictStream.
It takes a single argument of type StrictStreamLike, which can be either an Iterable or an AsyncIterable.
The `from` function returns a StrictStream object of type StrictStreamOf, which has a pipe method that can be used to transform the stream.

#### An example
```typescript
import {from} from "strict-stream/from";
import {map} from "strict-stream/map";

async function* generateIds() {
    yield 1
    yield 2
    yield 3
}

async function example() {
    const streamLike1: Iterable<number> = [1, 2, 3];
    const streamLike2: AsyncIterable<number> = generateIds(); // is equivalent

    // could consume streamLike1 or streamLike2
    const stream = from(streamLike1)
        .pipe(
            map(async (id) => ({id, name: `User ${id}`}))
        );

    for await (const data of stream) {
        console.log(`Id: ${data.id}, Name: ${data.name}`);
    }
    // Id: 1, Name: User 1
    // Id: 2, Name: User 2
    // Id: 3, Name: User 3
}

await example();
```
- The example demonstrates how to use the `from` function to turn an iterable into a composable stream.
- An asynchronous generator function called generateIds is defined that yields the numbers 1, 2, and 3.
- streamLike1 is defined as an array containing the numbers 1, 2, and 3.
- streamLike2 is defined as an async iterable that is equivalent to generateIds.
- The from function is then used to create a stream from streamLike1.
- This stream is then piped through a map function that maps each number to an object containing an id and a name field.
- Finally, the resulting stream is consumed using a for await loop.
### tap
`tap` is a utility function that allows you to perform side-effects on each element of a stream without modifying the stream itself. It takes a callback function as an argument, which is invoked for each element of the stream, and then returns the original element so that it is passed on to the next step in the pipeline unchanged.
#### An example:
```typescript
import {of} from "strict-stream";
import {tap} from "strict-stream/tap";

async function example() {
    async function* generateIds() {
        yield 1
        yield 2
        yield 3
    }

    const transformedStream = of(generateIds())
        .pipe(
            tap((value) => console.log(value))
        );

    for await (const value of transformedStream) {
        /* empty */
    }
    // 1
    // 2
    // 3
}

await example();
```
- In this example, we start with an asynchronous generator that yields three numbers: 1, 2, and 3.
- We then use the `of` function to wrap this generator in a composable stream, and then use the pipe method to apply the tap function to the stream.
- The tap function simply logs each element of the stream to the console.
- Finally, we iterate over the transformed stream using a for-await-of loop, which triggers the evaluation of the stream and executes the side-effects of the tap function.
- Since tap returns each element unchanged and the loop body is empty, the loop itself prints nothing.
- The output of the example is therefore simply the values 1, 2, and 3, printed to the console by the tap function.
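
The pass-through behaviour can be sketched as follows. `tapSketch` is a hypothetical name and this is only an illustration of the idea, assuming a synchronous callback, not the library's code.

```typescript
// Runs `effect` for every element, then forwards the element untouched.
function tapSketch<T>(effect: (value: T) => void): (input: AsyncIterable<T>) => AsyncIterable<T> {
    return (input) =>
        (async function* () {
            for await (const value of input) {
                effect(value); // side effect only; its return value is ignored
                yield value; // the element passes through unchanged
            }
        })();
}
```

Note that nothing runs until the returned stream is iterated, which is why the examples in this document always consume the stream at the end.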
### run
- Consumes the given AsyncIterable, iterating over its values, and returns a Promise that resolves to the last value of the stream.
- If the stream is empty, the function returns a default value, which is optional and defaults to undefined.

#### An example
```typescript
import {of, run} from "strict-stream";
import {tap} from "strict-stream/tap";

async function example() {
    async function* generateIds() {
        yield 1
        yield 2
        yield 3
    }

    const stream = of(generateIds())
        .pipe(
            tap((value) => console.log(value))
        );

    await run(stream)
    // Output
    // 1
    // 2
    // 3
}

await example();
```
- The example creates a stream by calling `of` with `generateIds()` as its argument. It then pipes this stream through a tap operation which logs each value emitted by the stream to the console.
- Finally, it calls the run function to execute the stream. The run function returns a Promise that resolves when the stream has completed.
- In this case, it logs the numbers 1, 2, and 3 to the console.
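
A consumer with the behaviour described above (resolve to the last value, or to a default for an empty stream) could look like this sketch. `runSketch` is a hypothetical name and the real function's signature may differ.

```typescript
// Drains the stream and remembers the most recent value.
async function runSketch<T>(stream: AsyncIterable<T>, defaultValue?: T): Promise<T | undefined> {
    let last: T | undefined = defaultValue;
    for await (const value of stream) {
        last = value;
    }
    return last;
}
```

For a stream that yields 1, 2, 3 this resolves to 3. For an empty stream it resolves to the default, or to `undefined` when no default is given.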
### sequence
#### An example
```typescript
import {of, run} from "strict-stream";
import {tap} from "strict-stream/tap";
import {sequence} from "strict-stream/sequence";

async function example() {
    const sequenceStream = of(sequence(3))
        .pipe(
            tap((value) => console.log(value))
        );

    await run(sequenceStream)
    // 0
    // 1
    // 2
}

await example();
```
- The example uses the `sequence` function to generate a stream of numbers with a given length.
- The sequence function generates numbers from 0 up to, but not including, the given length, so `of(sequence(3))` produces 0, 1, and 2.
- The pipe method is then used to append a tap function that logs each value in the stream to the console.
- Finally, the run function is called to consume the stream.
- The output is: 0, 1, 2
### map
The `map` function is a higher-order function that takes a function mapper as input and returns another function that applies the mapper function to every element in a stream. The mapper function transforms each element of the stream and returns a new output element.

#### An example
```typescript
import {of, run} from "strict-stream";
import {tap} from "strict-stream/tap";
import {sequence} from "strict-stream/sequence";
import {map} from "strict-stream/map";

async function example() {
    const sequenceStream = of(sequence(3))
        .pipe(
            map((id) => id * 2)
        )
        .pipe(
            tap((value) => console.log(value))
        );

    await run(sequenceStream)
    // 0
    // 2
    // 4
}

await example();
```
- In the example function, the `of` function is used to create a new stream from the sequence generator function that generates a sequence of numbers from 0 to 2.
- This stream is then piped through the map function, which multiplies each number in the stream by 2.
- The resulting stream is then piped through the tap function, which logs each element in the stream to the console.
- Finally, the run function is called to consume the stream and output its elements.
- The output of the example function will log the numbers 0, 2, and 4 to the console, which are the result of multiplying the original numbers generated by sequence by 2.
### filter
The `filter` function is a higher-order function that takes a condition function as its input and returns a function that can be used as a stream mapper. The condition function is applied to each item in the stream, and only those items for which the condition function returns a truthy value are included in the output stream.
#### An example
```typescript
import {of, run} from "strict-stream";
import {tap} from "strict-stream/tap";
import {sequence} from "strict-stream/sequence";
import {filter} from "strict-stream/filter";

async function example() {
    const stream = of(sequence(3))
        .pipe(
            filter((id) => id > 0)
        )
        .pipe(
            tap((value) => console.log(value))
        );

    await run(stream)
    // 1
    // 2
}

await example();
```
- In the example function, the `of` function is used to create a stream from the sequence generator that yields three numbers (0, 1, and 2).
- This stream is then piped to a filter mapper that only allows numbers greater than 0 to pass through.
- The resulting stream is then piped to a tap mapper that logs each item to the console.
- Finally, the run function is used to execute the stream and log the output to the console.
- The output of this code will be the numbers 1 and 2, since those are the only numbers in the original sequence that are greater than 0.
### reduce
The `reduce` function is a higher-order function that takes a reducer function and an initial value as input, and returns a new function that can be used to transform a stream of values. The reducer function takes an accumulator value and an input value, and returns a new accumulator value. The reduce function applies the reducer function to each value in the stream, accumulating the results into a final value that is emitted by the resulting stream.

#### An example
```typescript
import {of, run} from "strict-stream";
import {sequence} from "strict-stream/sequence";
import {reduce} from "strict-stream/reduce";

async function example() {
    const stream = of(sequence(5))
        .pipe(
            reduce(({counter}) => ({counter: counter + 1}), {counter: 0})
        );

    const result = await run(stream);
    console.log(result)
    // { counter: 5 }
}

await example();
```
- In the example, the reduce function is used to count the number of values in a stream.
- The stream is created using the `sequence` function, which generates a stream of numbers from 0 to 4.
- The reduce function takes an object with a counter property as the initial value, and a reducer function that increments the counter property for each input value.
- The resulting stream emits a single object with the final value of the counter property, which is 5 in this case.
- The run function is used to execute the stream and log the final result.
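
The accumulate-then-emit behaviour can be sketched like this. `reduceSketch` is a hypothetical name, and the sketch assumes that only the final accumulator is emitted once the input is exhausted.

```typescript
function reduceSketch<T, Acc>(reducer: (acc: Acc, value: T) => Acc, initial: Acc) {
    return (input: AsyncIterable<T>): AsyncIterable<Acc> =>
        (async function* () {
            let acc = initial;
            for await (const value of input) {
                acc = reducer(acc, value); // fold each value into the accumulator
            }
            yield acc; // emitted once, after the input is exhausted
        })();
}
```

With an empty input this sketch emits the initial value unchanged.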
### batch
- `batch` is a function that returns a mapper function that takes an input stream and emits arrays of inputs, grouped in batches of a given size.
- When a batch reaches the desired size, it is emitted downstream.

#### An example
```typescript
import {of, run} from "strict-stream";
import {sequence} from "strict-stream/sequence";
import {batch} from "strict-stream/batch";
import {tap} from "strict-stream/tap";

async function example() {
    const stream = of(sequence(3))
        .pipe(
            batch(2)
        )
        .pipe(
            tap((value) => console.log(value))
        );

    await run(stream)
    // Output
    // [ 0, 1 ]
    // [ 2 ]
}

await example();
```
- The example code creates a `sequence` stream of 3 numbers and pipes it through the batch function with a batch size of 2.
- The resulting stream emits two arrays: the first with the values [0, 1] and the second with the value [2].
- The tap function is used to log each emitted value to the console.
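
The grouping logic, including the final partial batch seen in the output above, can be sketched as follows. `batchSketch` is a hypothetical name, not the library's implementation.

```typescript
function batchSketch<T>(size: number) {
    return (input: AsyncIterable<T>): AsyncIterable<T[]> =>
        (async function* () {
            let current: T[] = [];
            for await (const value of input) {
                current.push(value);
                if (current.length >= size) {
                    yield current; // a full batch goes downstream
                    current = [];
                }
            }
            if (current.length > 0) {
                yield current; // flush the last, possibly smaller, batch
            }
        })();
}
```

Since the element type cannot be inferred from the size alone, the sketch is called with an explicit type argument, for example `batchSketch<number>(2)`.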
### flat
- The `flat` function is a stream transformer that flattens the first level of a stream or an array (Iterable).
- If the input stream contains arrays or nested streams, the flat function will iterate over each element in the array or nested stream and emit it as a separate item in the output stream.

#### An example
```typescript
import {run} from "strict-stream";
import {from} from "strict-stream/from";
import {flat} from "strict-stream/flat";
import {tap} from "strict-stream/tap";

async function example() {
    const stream = from(
        [
            [1, 2],
            [3, 4],
            5
        ]
    )
        .pipe(
            flat()
        )
        .pipe(
            tap((value) => console.log(value))
        );

    await run(stream)
    // 1
    // 2
    // 3
    // 4
    // 5
}

await example();
```
- In the example code, the `from` function is used to create a stream from an array that contains nested arrays and a single value.
- The flat function is then used to flatten the first level of stream so that each element in the nested arrays is emitted as a separate item in the output stream.
- Finally, the tap function is used to log each item.
- When the example function is run, the output stream contains each element in the nested arrays and the single value, emitted as separate items in the stream.
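
One-level flattening can be sketched with a nested loop. `flatSketch` and `isIterable` are hypothetical names, and the sketch uses `unknown` for element types to stay short, whereas the library infers precise types.

```typescript
// True for arrays, generators, and other sync or async iterables (strings are excluded).
function isIterable(value: unknown): value is Iterable<unknown> | AsyncIterable<unknown> {
    return typeof value === "object" && value !== null &&
        (Symbol.iterator in value || Symbol.asyncIterator in value);
}

async function* flatSketch(input: AsyncIterable<unknown>): AsyncIterable<unknown> {
    for await (const item of input) {
        if (isIterable(item)) {
            // emit each nested element on its own; deeper levels are left as they are
            for await (const inner of item) {
                yield inner;
            }
        } else {
            yield item; // plain values pass straight through
        }
    }
}
```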
### flatMap
- `flatMap` is a function that maps each element of a stream to another stream and then flattens the first level of the resulting stream of streams into a single stream.
- It takes a mapper function that maps the input element.
- The resulting stream is then flattened, so that all elements are emitted in a single stream.

#### An example
```typescript
import {run} from "strict-stream";
import {from} from "strict-stream/from";
import {flatMap} from "strict-stream/flatMap";
import {map} from "strict-stream/map";
import {tap} from "strict-stream/tap";

async function example() {
    type User = {
        id: number;
        name: string;
        orders: Order[];
    };

    type Order = {
        id: number;
        product: string;
        price: number;
    };

    const users: User[] = [
        {
            id: 1,
            name: "Alice",
            orders: [
                {id: 101, product: "Widget A", price: 10.0},
                {id: 102, product: "Widget B", price: 20.0},
            ],
        },
        {
            id: 2,
            name: "Bob",
            orders: [
                {id: 201, product: "Widget C", price: 30.0},
                {id: 202, product: "Widget D", price: 40.0},
                {id: 203, product: "Widget E", price: 50.0},
            ],
        },
    ];

    // resolves to a stream of User; the return type is left to inference
    async function fetchStreamOfUsers() {
        return from(users);
    }

    // StrictStreamOf<{userId: number, orderId: number, price: number}>
    const stream = (await fetchStreamOfUsers())
        .pipe(
            flatMap(async (user) => {
                return from(user.orders)
                    .pipe(
                        map(
                            async (order) => {
                                return {
                                    userId: user.id,
                                    orderId: order.id,
                                    price: order.price
                                }
                            })
                    )
            })
        )
        .pipe(
            tap((value) => console.log(value))
        );

    await run(stream)
    // { userId: 1, orderId: 101, price: 10 }
    // { userId: 1, orderId: 102, price: 20 }
    // { userId: 2, orderId: 201, price: 30 }
    // { userId: 2, orderId: 202, price: 40 }
    // { userId: 2, orderId: 203, price: 50 }
}

await example();
```
- In the provided example, `flatMap` is used to flatten the orders of the users.
- A stream of users is created using the from function.
- The flatMap function is then called on this stream, mapping each user to a stream of orders using the from function again.
- The resulting stream of orders is then mapped to an object with the userId, orderId, and price using the map function.
- Finally, the resulting stream of objects is logged using the tap function.
- When the stream is run using the run function, it logs each object in the stream, which contains the userId, orderId, and price for each order.
### pipe
- The `pipe` function is used to create composable behavior for StrictStreams.
- It takes a StrictStreamMapper as an input, which is a function that transforms a StrictStream of one type to a StrictStream of another type.
- pipe then returns a StrictStreamPlumber, which is a function that takes a StrictStream of the original input type and returns a StrictStream of the final output type.
- pipe also has a pipe method on the returned function, which allows for easy composition of multiple StrictStreamMappers.
#### An example
```typescript
import {run, pipe} from "strict-stream";
import {from} from "strict-stream/from";
import {map} from "strict-stream/map";
import {tap} from "strict-stream/tap";

async function example() {
    // composable behavior
    const addFive = pipe(
        map((input: number) => input + 4)
    )
        .pipe(
            map(async (input) => input + 1)
        )

    // High order function to manage / compose part of the pipe
    function multiple(x: number) {
        return pipe(
            map(async (value: number) => value * x)
        );
    }

    const stream = from([1, 2, 3])
        .pipe(
            addFive
        )
        .pipe(multiple(2))
        .pipe(tap((value) => console.log(value)))

    await run(stream)
    // 12
    // 14
    // 16
}

await example();
```
- In the `example` function, we build `addFive` by composing two map mappers with pipe: the first adds 4 and the second adds 1.
- We then use the multiple function to create another StrictStreamMapper that multiplies the input value by a given number.
- We then apply these mappers to a stream of numbers by chaining pipe calls.
- Finally, we run the stream and log each value as it is processed.
- The output will be 12, 14, 16.

#### An example of flatMap implementation
It is a composition of the map and flat functions.
```typescript
// Signature and body reconstructed from the description below;
// the exact generic types are an assumption.
export function flatMap<Input, Output>(
    mapper: (input: Input) => Promised<Output | StrictStreamLike<Output>>
): StrictStreamMapper<Input, Output> {
    return pipe(map(mapper)).pipe(flat());
}
```
- The flatMap function is implemented using the pipe function, which composes a set of StrictStreamMapper functions into a single StrictStreamMapper.
- In the implementation of flatMap, the map function is first applied to the mapper argument
- Resulting in a new StrictStreamMapper that transforms the input values using the mapper function.
- This transformation may result in an output value or a StrictStreamLike object that contains a set of output values.
- The resulting StrictStreamMapper is then piped into the flat function, which flattens any StrictStreamLike objects into a stream of individual output values.
### scaleSync
- Basically the `map` function with a desired concurrency for processing records, which keeps the ordering of the output stream unchanged.
- The scaleSync function takes two arguments: the first is a number that represents the concurrency, and the second is a mapper function that maps the input to the output.

#### An example
```typescript
import {run, of} from "strict-stream";
import {scaleSync} from "strict-stream/scaleSync";
import {sequence} from "strict-stream/sequence";
import {tap} from "strict-stream/tap";

async function fetchUserById(id: number) {
    // some logic to fetch the user
    return {
        id,
        userName: `User ${id}`
    };
}

async function getUserIds() {
    return sequence(3);
}

async function example() {
    const usersStream = of(await getUserIds())
        .pipe(
            // runs the async queries concurrently, keeps the ordering of output stream unchanged
            scaleSync(5, async (id) => fetchUserById(id))
        )
        .pipe(
            tap((value) => console.log(value))
        );

    await run(usersStream)
    // { id: 0, userName: 'User 0' }
    // { id: 1, userName: 'User 1' }
    // { id: 2, userName: 'User 2' }
}

await example();
```
- In the example, the `scaleSync` function is used to fetch user details for a given set of user ids.
- The fetchUserById function fetches the user details asynchronously for a given user id, and the getUserIds function generates a stream of user ids.
- The usersStream is created with a concurrency of 5 and executes the fetchUserById function for each id.
- The resulting user details are logged to the console using the tap function.
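
One way to combine concurrency with stable ordering is a queue of in-flight promises that is always drained from the front. The sketch below illustrates that idea only. `orderedConcurrentMap` is a hypothetical name, error handling is omitted, and the library may use a different strategy.

```typescript
async function* orderedConcurrentMap<In, Out>(
    input: AsyncIterable<In>,
    concurrency: number,
    mapper: (value: In) => Promise<Out>,
): AsyncIterable<Out> {
    const inFlight: Promise<Out>[] = [];
    for await (const value of input) {
        inFlight.push(mapper(value)); // start the work without waiting for it
        if (inFlight.length >= concurrency) {
            // an async generator awaits a yielded promise before emitting it,
            // so the oldest result is always delivered first
            yield inFlight.shift()!;
        }
    }
    // the input is exhausted: deliver the remaining results in order
    while (inFlight.length > 0) {
        yield inFlight.shift()!;
    }
}
```

Even if a later item finishes sooner than an earlier one, it waits in the queue until everything ahead of it has been emitted.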
### concatenate
- `concatenate` is a function that concatenates multiple streams into a single stream, ensuring that the records are read sequentially one by one and that the ordering of the output stream is unchanged.
- The function uses rest parameters to allow an arbitrary number of streams to be concatenated.

#### An example
```typescript
import {run} from "strict-stream";
import {concatenate} from "strict-stream/concatenate";
import {from} from "strict-stream/from";
import {tap} from "strict-stream/tap";

async function* generateIds() {
    yield 10
    yield 20
    yield 30
}

async function example() {
    const streamLike1: Iterable<number> = [1, 2, 3];
    const streamLike2: AsyncIterable<number> = generateIds();

    const stream = from(
        concatenate(
            from(streamLike1),
            from(streamLike2),
        )
    ).pipe(
        tap((value) => console.log(value))
    );

    await run(stream)
    // 1
    // 2
    // 3
    // 10
    // 20
    // 30
}

await example();
```
- In the provided example, two stream-likes, one iterable and one async iterable, are concatenated using `concatenate`.
- The resulting stream is then converted into a strict stream using the from function
- And a tap operation is performed on it to log each record.
- Finally, the stream is run using the run function, which is a utility function to consume and execute the stream.
- The output shows that the resulting stream contains all the records from both input streams in the correct order.
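
Sequential concatenation with rest parameters can be sketched in a few lines. `concatenateSketch` is a hypothetical name, and for brevity the sketch only accepts AsyncIterable inputs.

```typescript
async function* concatenateSketch<T>(...streams: AsyncIterable<T>[]): AsyncIterable<T> {
    for (const stream of streams) {
        // drain each stream completely before touching the next one
        for await (const value of stream) {
            yield value;
        }
    }
}
```

Later streams are not started until the earlier ones are exhausted, which is what preserves the ordering.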
### interval
- `interval` is a function that creates a stream that emits a sequence of integers at regular intervals.
- It takes two parameters: the duration of the interval in milliseconds, and a boolean flag indicating whether the stream should start emitting immediately or after one interval has elapsed.
- The function returns a StrictStream object with an additional method stop that can be used to stop the interval stream.

#### An example
```typescript
import {run, of} from "strict-stream";
import {tap} from "strict-stream/tap";
import {map} from "strict-stream/map";
import {interval} from "strict-stream/interval";

async function example() {
    // every 300ms
    const source = interval(300);

    let counter = 0;
    const stream = of(source)
        .pipe(
            map(() => {
                counter++
                if (counter > 3) {
                    // stops the interval stream
                    source.stop()
                }
                return counter;
            })
        )
        .pipe(
            tap((value) => console.log(value))
        )

    await run(stream)
    // 1
    // 2
    // 3
    // 4
}

await example();
```
- This example creates an interval stream that emits every 300ms
- And uses the map operator to increment a counter and stop the stream after 4 emissions.
- The tap operator is used to log the emitted values to the console.
$3
#### nodeReadable