BlueSky firehose/WebSocket and CAR in plain JavaScript
npm install bskiParsing raw binary content of the realtime firehose from WebSocket, and CAR account repository snapshot data.
Self-contained, zero dependencies.
```
npm install bski
`JavaScript
import { firehose } from 'bski'; // import from npm
const chunk = [];
for await(const msg of firehose.each()) {
chunk.push(msg);
if(chunk.length === 1000) break;
}
`
)+%7B/++chunk.push(msg);/++if(chunk.length+===+1000)+break;/%7D//chunk/%60%60%60//)
`JavaScript
import { readCAR } from 'bski'; // import from npm
const did = 'did:plc:z72i7hdynmk6r22z27h6tvur';
const car = await fetch('https://puffball.us-east.host.bsky.network/xrpc/com.atproto.sync.getRepo?did=' + did)
.then(x => x.arrayBuffer());
const records = readCAR(did, car);
`
/++.then(x+=%3E+x.arrayBuffer());//const+records+=+readCAR(did,+car);//%60%60%60//)
`TypeScript`
firehose(address = 'wss://bsky.network/xrpc/com.atproto.sync.subscribeRepos'):
AsyncIterable
Connects to a firehose via WebSocket (defaults to the central server https://bsky.network/
or potentially a local server i.e. PDS if the address parameter is provided).
Yields records in batches FirehoseRecord[].
Batching lets you consume records at your own speed. If you're iterating
and immediatelly processing — they will come in batches of one.
If your code stalls in process, they will queue up and next iteration will come
with whole pile at once.
Exiting the iterator loop disconnects from the WebSockets and discards any unprocessed records.
`TypeScript`
firehose.each(address?): AsyncIterable
Same as the firehose() above, but always reporting records one by one.
The queueing still happens behind the scene, but if your code stalls it will still
receive each record separately. That comes with a small performance penalty.
`TypeScript`
readCAR(messageBuf: ArrayBuffer | Uint8Array, did: string): FirehoseRepositoryRecord[]
Parses binary CAR/DAG/CBOR format that is the archive/database format for BlueSky account history.
The parser is pretty fast: 50Mb repository takes 1-2 seconds. However, for a web app that delay could be jarring. Enter sequenceReadCAR:
`TypeScript`
sequenceReadCAR(messageBuf: ArrayBuffer | Uint8Array, did: string):
Iterable
Parsing that binary, yielding the parsed records in implementation-defined batches.
This lets your code parse CAR even on the main
thread incrementally, without freezing the app.
Apart from capturing the built-in BlueSky fields,
both firehose and readCAR collect a couple extras:
* repo the DID of the account making the record (post/like etc.)
* uri standard at:// way of referring to events in ATProtocreate
* cid another standard identifier, a bit longer and less useful
* action mostly just but can also be delete or update` (think updating user profile)
* time BlueSky-observed time (different from self-reported time that can be spoofed by a poster)
* receiveTimestamp Unix-style time the message is received from WebSocket
* parseTime useful for tracking performance, averages to 0.3 millisecond
The firehose functionality existed in
colds.ky codebase for a while,
using some of the packages referenced by the official
@atproto/api:
* @ipld/car - Apache 2.0 and MIT
* cbor-x - MIT
* multiformats - Apache 2.0 and MIT
* cborg - Apache 2.0
* @ipld/dag-cbor - Apache 2.0 and MIT
* varint - MIT
But those are complex and broader-purpose libraries.
Later @mary.my.id created leaner,
more focused set of libraries to transcode some of the same formats, @atcute/* - MIT license.
And now this library here is taking in only few necessary bits,
focusing on singular use case: parsing realtime firehose,
and account repository CAR.
MIT Oleg Mihailik