Primitives & operators for Communicating Sequential Processes based on async/await and async iterables
npm install @thi.ng/csp
!npm downloads

> [!NOTE]
> This is one of 214 standalone projects, maintained as part
> of the @thi.ng/umbrella monorepo
> and anti-framework.
>
> 🚀 Please help me to work full-time on these projects by sponsoring me on
> GitHub. Thank you! ❤️
- About
- What is CSP?
- Buffering behaviors
- Channels
- Other channel types
- Channel operators
- Status
- Related packages
- Installation
- Dependencies
- Usage examples
- API
- Ping pong
- PubSub
- Authors
- License
Primitives & operators for Communicating Sequential Processes based on async/await and async iterables.
> [!IMPORTANT]
> This package was temporarily deprecated (throughout most of 2023), but
> meanwhile has been reanimated in the form of a complete rewrite, using a
> new, more simple and more modern approach afforded by contemporary ES language
> features (and widespread support for them).
>
> **This new/current implementation is in most cases NOT compatible with earlier
> versions**.
References:
- Wikipedia
- Communicating Sequential Processes, C.A.R.
Hoare
The key construct of this package is a read/write channel primitive which can be
customized with different buffer implementations to control blocking behaviors
and backpressure handling (aka attempting to write faster to a channel than
values are being read, essentially a memory management issue). Unbuffered CSP
channels are blocking on both the reader and writer side.
The following channel buffer types/behaviors are included (from the
thi.ng/buffers
package), all accepting a max. capacity and all implementing the
IReadWriteBuffer
interface required by the channel:
- fifo: First in,
first out ring buffer. Writes to the channel will start blocking once the
buffer's capacity is reached, otherwise complete immediately. Likewise,
channel reads are non-blocking whilst there're more buffered values available.
Reads will only block if the buffer is empty.
- lifo: Last in,
first out. Write behavior is the same as with fifo, reads are in reverse
order (as the name indicates), i.e. the last value written will be the first
value read (i.e. stack behavior).
- sliding:
Sliding window ring buffer. Writes to the channel are never blocking!
Whilst the buffer is at full capacity, new writes will first expunge the
oldest buffered value (similar to LRU
cache
behavior). Read behavior is the same as for fifo.
- dropping:
Dropping value ring buffer. Writes to the channel are never blocking!
Whilst the buffer is at full capacity, new writes will be silently ignored.
Read behavior is the same as for fifo.
As mentioned previously,
channels and their
read,
write and
close operations
are the key building blocks for CSP.
- Mult for channel
multiplexing (aka one-to-many splitting) and dynamic add/removal of
subscribers
- PubSub for
topic-based subscriptions, each topic implemented as a Mult
- broadcast()
- channel()
- concat()
- consume()
- consumeWith()
- drain()
- fromAsyncIterable()
- into()
- merge()
- mult()
- pipe()
- pubsub()
- select()
- timeout()
BETA - possibly breaking changes forthcoming
Search or submit any issues for this package
- @thi.ng/fibers - Process hierarchies & operators for cooperative multitasking
- @thi.ng/rstream - Reactive streams & subscription primitives for constructing dataflow graphs / pipelines
- @thi.ng/transducers-async - Async versions of various highly composable transducers, reducers and iterators
``bash`
yarn add @thi.ng/csp
ESM import:
`ts`
import * as csp from "@thi.ng/csp";
Browser ESM import:
`html`
For Node.js REPL:
`js`
const csp = await import("@thi.ng/csp");
Package sizes (brotli'd, pre-treeshake): ESM: 1.80 KB
- @thi.ng/api
- @thi.ng/arrays
- @thi.ng/buffers
- @thi.ng/checks
- @thi.ng/errors
Note: @thi.ng/api is in _most_ cases a type-only import (not used at runtime)
One project in this repo's
/examples
directory is using this package:
| Screenshot | Description | Live demo | Source |
|:---------------------------------------------------------------------------------------------------------------|:-----------------------------------------------------------------------------|:----------------------------------------------|:---------------------------------------------------------------------------|
|
| CSP channel-based event handling, async transducers & reactive UI components | Demo | Source |
`ts tangle:export/readme-pingpong.ts
import { channel } from "@thi.ng/csp";
// create CSP channel for bi-directional communication
const chan = channel
// create first async process (ping)
(async () => {
while (true) {
// this op will block until a value becomes available in the channel
const x = await chan.read();
// if the channel was closed meanwhile, read() will deliver undefined
if (x === undefined || x > 5) {
console.log("stopping...");
// calling close() is idempotent
// any in-flight writes will still be readable
chan.close();
break;
}
console.log("ping", x);
// this op will also block until the other side is reading the value
await chan.write(x + 1);
}
console.log("ping done");
})();
// create second async process (pong, almost identical to ping)
(async () => {
while (true) {
// wait until value can be read (or channel closed)
const x = await chan.read();
// exit loop if channel closed
if (x === undefined) break;
console.log("pong", x);
// write next value & wait until other side read it
await chan.write(x + 1);
}
console.log("pong done");
})();
// kickoff
chan.write(0);
// ping 0
// pong 1
// ping 2
// pong 3
// ping 4
// pong 5
// stopping...
// ping done
// pong done
`
`ts tangle:export/readme-pubsub.ts
import { channel, consumeWith, into, pubsub } from "@thi.ng/csp";
// input channel (optional)
const src = channel
// publisher with a topic function
// (topic here is the first character of each received string)
const pub = pubsub
// create topic subscriptions (channel & debug consumer)
// under the hood each topic is a Mult (multiplexed channel)
// subscription channels are automatically named:
// (see below)
for (let i of "abc") {
consumeWith(pub.subscribeTopic(i), (x, ch) => console.log(ch.id, x));
}
// start processing by feeding an iterable of names
await into(src, ["alice", "bert", "bella", "charlie", "arthur"]);
// users-a-tap0 alice
// users-b-tap1 bert
// users-b-tap1 bella
// users-c-tap2 charlie
// users-a-tap0 arthur
// pubsubs & mults are closed recursively once we close the input channel
src.close();
`
If this project contributes to an academic publication, please cite it as:
`bibtex``
@misc{thing-csp,
title = "@thi.ng/csp",
author = "Karsten Schmidt",
note = "https://thi.ng/csp",
year = 2016
}
© 2016 - 2026 Karsten Schmidt // Apache License 2.0