TypeScript client for the Durable Streams protocol.

```bash
npm install @durable-streams/client
```
The Durable Streams client provides three main APIs:
1. stream() function - A fetch-like read-only API for consuming streams
2. DurableStream class - A handle for read/write operations on a stream
3. IdempotentProducer class - High-throughput producer with exactly-once write semantics (recommended for writes)
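All three are named exports, matching the imports used in the examples below:

```typescript
import {
  stream,             // fetch-like, read-only consumption
  DurableStream,      // read/write handle for a single stream
  IdempotentProducer, // exactly-once, batched writes
} from "@durable-streams/client"
```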
- Exactly-Once Writes: IdempotentProducer provides Kafka-style exactly-once semantics with automatic deduplication
- Automatic Batching: Multiple writes are automatically batched together for high throughput
- Pipelining: Up to 5 concurrent batches in flight by default for maximum throughput
- Streaming Reads: stream() and DurableStream.stream() provide rich consumption options (promises, ReadableStreams, subscribers)
- Resumable: Offset-based reads let you resume from any point
- Real-time: Long-poll and SSE modes for live tailing with catch-up from any offset
The stream() function provides a simple, fetch-like interface for reading from streams:
```typescript
import { stream } from "@durable-streams/client"
// Connect and get a StreamResponse
const res = await stream<{ message: string }>({
url: "https://streams.example.com/my-account/chat/room-1",
headers: {
Authorization: `Bearer ${process.env.DS_TOKEN!}`,
},
offset: savedOffset, // optional: resume from offset
live: true, // default: auto-select best live mode
})
// Accumulate all JSON items until up-to-date
const items = await res.json()
console.log("All items:", items)
// Or stream live with a subscriber
res.subscribeJson(async (batch) => {
for (const item of batch.items) {
console.log("item:", item)
saveOffset(batch.offset) // persist for resumption
}
})
```
The StreamResponse object returned by stream() offers multiple ways to consume data:
```typescript
// Promise helpers (accumulate until first upToDate)
const bytes = await res.body() // Uint8Array
const items = await res.json() // T[]
const text = await res.text() // string
// ReadableStreams
const byteStream = res.bodyStream() // ReadableStream<Uint8Array>
const jsonStream = res.jsonStream() // ReadableStream<T>
const textStream = res.textStream() // ReadableStream<string>
// Subscribers (with backpressure)
const unsubscribe = res.subscribeJson(async (batch) => {
await processBatch(batch.items)
})
const unsubscribe2 = res.subscribeBytes(async (chunk) => {
await processBytes(chunk.data)
})
const unsubscribe3 = res.subscribeText(async (chunk) => {
await processText(chunk.text)
})
```
For reliable, high-throughput writes with exactly-once semantics, use IdempotentProducer:
```typescript
import { DurableStream, IdempotentProducer } from "@durable-streams/client"
const stream = await DurableStream.create({
url: "https://streams.example.com/events",
contentType: "application/json",
})
const producer = new IdempotentProducer(stream, "event-processor-1", {
autoClaim: true,
onError: (err) => console.error("Batch failed:", err), // Errors reported here
})
// Fire-and-forget - don't await, errors go to onError callback
for (const event of events) {
producer.append(event) // Objects serialized automatically for JSON streams
}
// IMPORTANT: Always flush before shutdown to ensure delivery
await producer.flush()
await producer.close()
```
For high-throughput scenarios, append() is fire-and-forget (returns immediately):
```typescript
// Fire-and-forget - errors reported via onError callback
for (const event of events) {
producer.append(event) // Returns void, adds to batch
}
// Always flush before shutdown to ensure delivery
await producer.flush()
```
Why use IdempotentProducer?
- Exactly-once delivery: Server deduplicates using (producerId, epoch, seq) tuple
- Automatic batching: Multiple writes batched into single HTTP requests
- Pipelining: Multiple batches in flight concurrently
- Zombie fencing: Stale producers are rejected, preventing split-brain scenarios
- Network resilience: Safe to retry on network errors (server deduplicates)
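The fencing behavior can be sketched end to end. This is illustrative only (the URL and payloads are hypothetical), and it assumes the server rejects lower-epoch writes as described above:

```typescript
import { DurableStream, IdempotentProducer, StaleEpochError } from "@durable-streams/client"

const stream = await DurableStream.create({
  url: "https://streams.example.com/orders", // hypothetical stream
  contentType: "application/json",
})

// A first producer instance writes normally.
const original = new IdempotentProducer(stream, "order-service-1", {
  onError: (err) => {
    if (err instanceof StaleEpochError) {
      console.log("original producer was fenced; stop writing from it")
    }
  },
})
original.append({ event: "created" })
await original.flush()

// A replacement with the same producerId claims a higher epoch via autoClaim,
// fencing the original instance.
const successor = new IdempotentProducer(stream, "order-service-1", {
  autoClaim: true,
})
successor.append({ event: "resumed" })
await successor.flush()

// Late writes from the fenced producer are rejected, not duplicated;
// the rejection surfaces through its onError callback.
original.append({ event: "late-write" })
await original.flush()
```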
For simple write operations or when you need a persistent handle:
```typescript
import { DurableStream } from "@durable-streams/client"
// Create a new stream
const handle = await DurableStream.create({
url: "https://streams.example.com/my-account/chat/room-1",
headers: {
Authorization: `Bearer ${process.env.DS_TOKEN!}`,
},
contentType: "application/json",
ttlSeconds: 3600,
})
// Append data (simple API without exactly-once guarantees)
await handle.append(JSON.stringify({ type: "message", text: "Hello" }), {
seq: "writer-1-000001",
})
// Read using the new stream() API
const res = await handle.stream<{ type: string; text: string }>()
res.subscribeJson(async (batch) => {
for (const item of batch.items) {
console.log("message:", item.text)
}
})
```
Reading only new data from the current tail:

```typescript
// HEAD gives you the current tail offset if the server exposes it
const handle = await DurableStream.connect({
url,
headers: { Authorization: `Bearer ${token}` },
})
const { offset } = await handle.head()
// Read only new data from that point on
const res = await handle.stream({ offset })
res.subscribeBytes(async (chunk) => {
console.log("new data:", new TextDecoder().decode(chunk.data))
})
```
Reading existing data without live tailing:

```typescript
// Read existing data only, stop when up-to-date
const res = await stream({
url: "https://streams.example.com/my-stream",
live: false,
})
const text = await res.text()
console.log("All existing data:", text)
```
Creates a fetch-like streaming session:
```typescript
const res = await stream({
url: string | URL, // Stream URL
headers?: HeadersRecord, // Headers (static or function-based)
params?: ParamsRecord, // Query params (static or function-based)
signal?: AbortSignal, // Cancellation
fetch?: typeof fetch, // Custom fetch implementation
backoffOptions?: BackoffOptions,// Retry backoff configuration
offset?: Offset, // Starting offset (default: start of stream)
live?: LiveMode, // Live mode (default: true)
json?: boolean, // Force JSON mode
onError?: StreamErrorHandler, // Error handler
})
```
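As a usage sketch, a live session can be cancelled through the standard AbortSignal option (savedOffset is a hypothetical offset you persisted earlier):

```typescript
// Cancel a live long-poll session after 30 seconds.
const controller = new AbortController()

const res = await stream({
  url: "https://streams.example.com/my-stream",
  signal: controller.signal,
  offset: savedOffset, // hypothetical: resume from a persisted Offset
  live: "long-poll",
})

setTimeout(() => controller.abort(), 30_000)
```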
The DurableStream class provides read/write operations on a single stream:

```typescript
class DurableStream {
readonly url: string
readonly contentType?: string
constructor(opts: DurableStreamConstructorOptions)
// Static methods
static create(opts: CreateOptions): Promise<DurableStream>
static connect(opts: DurableStreamOptions): Promise<DurableStream>
static head(opts: DurableStreamOptions): Promise<HeadResult>
static delete(opts: DurableStreamOptions): Promise<void>
// Instance methods
head(opts?: { signal?: AbortSignal }): Promise<HeadResult>
create(opts?: CreateOptions): Promise<void>
delete(opts?: { signal?: AbortSignal }): Promise<void>
close(opts?: CloseOptions): Promise<CloseResult>
append(
body: BodyInit | Uint8Array | string,
opts?: AppendOptions
): Promise<void>
appendStream(
source: AsyncIterable<Uint8Array | string> | ReadableStream<Uint8Array | string>,
opts?: AppendOptions
): Promise<void>
// Fetch-like read API
stream<T = unknown>(opts?: StreamOptions): Promise<StreamResponse<T>>
}
```
The live option controls catch-up and live-tailing behavior:

```typescript
// true (default): auto-select best live mode
// - SSE for JSON streams, long-poll for binary
// - Promise helpers (body/json/text): stop after upToDate
// - Streams/subscribers: continue with live updates
// false: catch-up only, stop at first upToDate
const catchUpRes = await stream({ url, live: false })
// "long-poll": explicit long-poll mode for live updates
const res = await stream({ url, live: "long-poll" })
// "sse": explicit SSE mode for live updates
const res = await stream({ url, live: "sse" })
```
For binary content types (e.g., application/octet-stream), SSE mode requires the encoding option:
```typescript
const handle = await DurableStream.create({
url: "https://streams.example.com/my-binary-stream",
contentType: "application/octet-stream",
})
const response = await handle.stream({
live: "sse",
encoding: "base64",
})
response.subscribeBytes(async (chunk) => {
console.log(chunk.data) // Uint8Array - automatically decoded from base64
})
```
The client automatically decodes base64 data events before returning them. This is required for any content type other than text/* or application/json when using SSE mode.
Headers and params support both static values and functions (sync or async) for dynamic values like authentication tokens.
```typescript
// Static headers
{
headers: {
Authorization: "Bearer my-token",
"X-Custom-Header": "value",
}
}
// Function-based headers (sync)
{
headers: {
Authorization: () => `Bearer ${getCurrentToken()}`,
"X-Tenant-Id": () => getCurrentTenant(),
}
}
// Async function headers (for refreshing tokens)
{
headers: {
Authorization: async () => {
const token = await refreshToken()
return `Bearer ${token}`
}
}
}
// Mix static and function headers
{
headers: {
"X-Static": "always-the-same",
Authorization: async () => `Bearer ${await getToken()}`,
}
}
// Query params work the same way
{
params: {
tenant: "static-tenant",
region: () => getCurrentRegion(),
token: async () => await getSessionToken(),
}
}
```
```typescript
import { stream, FetchError, DurableStreamError } from "@durable-streams/client"
const res = await stream({
url: "https://streams.example.com/my-stream",
headers: {
Authorization: "Bearer my-token",
},
onError: async (error) => {
if (error instanceof FetchError) {
if (error.status === 401) {
const newToken = await refreshAuthToken()
return { headers: { Authorization: `Bearer ${newToken}` } }
}
}
if (error instanceof DurableStreamError) {
console.error(`Stream error: ${error.code}`)
}
return {} // Retry with same params
},
})
```
The StreamResponse object provides multiple ways to consume stream data. All methods respect the live mode setting.
These methods accumulate data until the stream is up-to-date, then resolve.
#### body(): Promise<Uint8Array>
Accumulates all bytes until up-to-date.
```typescript
const res = await stream({ url, live: false })
const bytes = await res.body()
console.log("Total bytes:", bytes.length)
// Process as needed
const text = new TextDecoder().decode(bytes)
```
#### json(): Promise<T[]>
Accumulates all JSON items until up-to-date. Only works with JSON content.
```typescript
const res = await stream<{ id: number; name: string }>({
url,
live: false,
})
const items = await res.json()
for (const item of items) {
console.log(`User ${item.id}: ${item.name}`)
}
```
#### text(): Promise<string>
Accumulates all text until up-to-date.
```typescript
const res = await stream({ url, live: false })
const text = await res.text()
console.log("Full content:", text)
Web Streams API for piping to other streams or using with streaming APIs. ReadableStreams can be consumed using either getReader() or for await...of syntax.
> Safari/iOS Compatibility: The client ensures all returned streams are async-iterable by defining [Symbol.asyncIterator] on stream instances when missing. This allows for await...of consumption without requiring a global polyfill, while preserving instanceof ReadableStream behavior.
>
> Derived streams: Streams created via .pipeThrough() or similar transformations are NOT automatically patched. Use the exported asAsyncIterableReadableStream() helper:
>
> ```typescript
> import { asAsyncIterableReadableStream } from "@durable-streams/client"
>
> const derived = res.bodyStream().pipeThrough(myTransform)
> const iterable = asAsyncIterableReadableStream(derived)
> for await (const chunk of iterable) { ... }
> ```
#### bodyStream(): ReadableStream<Uint8Array>
Raw bytes as a ReadableStream.
Using getReader():
```typescript
const res = await stream({ url, live: false })
const readable = res.bodyStream()
const reader = readable.getReader()
while (true) {
const { done, value } = await reader.read()
if (done) break
console.log("Received:", value.length, "bytes")
}
```
Using for await...of:
```typescript
const res = await stream({ url, live: false })
for await (const chunk of res.bodyStream()) {
console.log("Received:", chunk.length, "bytes")
}
```
Piping to a file (Node.js):
```typescript
import fs from "node:fs"
import { Readable } from "node:stream"
import { pipeline } from "node:stream/promises"
const res = await stream({ url, live: false })
await pipeline(
Readable.fromWeb(res.bodyStream()),
fs.createWriteStream("output.bin")
)
```
#### jsonStream(): ReadableStream<T>
Individual JSON items as a ReadableStream.
Using getReader():
```typescript
const res = await stream<{ id: number }>({ url, live: false })
const readable = res.jsonStream()
const reader = readable.getReader()
while (true) {
const { done, value } = await reader.read()
if (done) break
console.log("Item:", value)
}
```
Using for await...of:
```typescript
const res = await stream<{ id: number; name: string }>({ url, live: false })
for await (const item of res.jsonStream()) {
console.log(`User ${item.id}: ${item.name}`)
}
```
#### textStream(): ReadableStream<string>
Text chunks as a ReadableStream.
Using getReader():
```typescript
const res = await stream({ url, live: false })
const readable = res.textStream()
const reader = readable.getReader()
while (true) {
const { done, value } = await reader.read()
if (done) break
console.log("Text chunk:", value)
}
```
Using for await...of:
```typescript
const res = await stream({ url, live: false })
for await (const text of res.textStream()) {
console.log("Text chunk:", text)
}
```
Using with Response API:
```typescript
const res = await stream({ url, live: false })
const textResponse = new Response(res.textStream())
const fullText = await textResponse.text()
```
Subscribers provide callback-based consumption with backpressure. The next chunk isn't fetched until your callback's promise resolves. Returns an unsubscribe function.
#### subscribeJson(callback): () => void
Subscribe to JSON batches with metadata. Provides backpressure-aware consumption.
```typescript
const res = await stream<{ event: string }>({ url, live: true })
const unsubscribe = res.subscribeJson(async (batch) => {
// Process items - next batch waits until this resolves
for (const item of batch.items) {
await processEvent(item)
}
await saveCheckpoint(batch.offset)
})
// Later: stop receiving updates
setTimeout(() => {
unsubscribe()
}, 60000)
```
#### subscribeBytes(callback): () => void
Subscribe to byte chunks with metadata.
```typescript
const res = await stream({ url, live: true })
const unsubscribe = res.subscribeBytes(async (chunk) => {
console.log("Received bytes:", chunk.data.length)
console.log("Offset:", chunk.offset)
console.log("Up to date:", chunk.upToDate)
await writeToFile(chunk.data)
await saveCheckpoint(chunk.offset)
})
```
#### subscribeText(callback): () => void
Subscribe to text chunks with metadata.
```typescript
const res = await stream({ url, live: true })
const unsubscribe = res.subscribeText(async (chunk) => {
console.log("Text:", chunk.text)
console.log("Offset:", chunk.offset)
await appendToLog(chunk.text)
})
```
#### cancel(reason?: unknown): void
Cancel the stream session. Aborts any pending requests.
```typescript
const res = await stream({ url, live: true })
// Start consuming
res.subscribeBytes(async (chunk) => {
console.log("Chunk:", chunk)
})
// Cancel after 10 seconds
setTimeout(() => {
res.cancel("Timeout")
}, 10000)
```
#### closed: Promise<void>
Promise that resolves when the session is complete or cancelled.
```typescript
const res = await stream({ url, live: false })
// Start consuming in background
const consumer = res.text()
// Wait for completion
await res.closed
console.log("Stream fully consumed")
```
The StreamResponse also exposes session properties:

```typescript
const res = await stream({ url })
res.url // The stream URL
res.contentType // Content-Type from response headers
res.live // The live mode (true, "long-poll", "sse", or false)
res.startOffset // The starting offset passed to stream()
res.offset // Current offset (updates as data is consumed)
res.cursor // Cursor for collapsing (if provided by server)
res.upToDate // Whether we've caught up to the stream head
res.streamClosed // Whether the stream is permanently closed (EOF)
```
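A short sketch of driving checkpointing off these properties (saveOffset is a hypothetical persistence helper):

```typescript
const res = await stream({ url, live: false })

// Drain existing data, then persist the position we reached.
await res.text()
if (res.upToDate) {
  await saveOffset(res.offset) // hypothetical helper
}
console.log("permanently closed?", res.streamClosed)
```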
---
#### DurableStream.create(opts): Promise<DurableStream>
Create a new stream on the server.
```typescript
const handle = await DurableStream.create({
url: "https://streams.example.com/my-stream",
headers: {
Authorization: "Bearer my-token",
},
contentType: "application/json",
ttlSeconds: 3600, // Optional: auto-delete after 1 hour
})
await handle.append('{"hello": "world"}')
```
#### DurableStream.connect(opts): Promise<DurableStream>
Connect to an existing stream (validates it exists via HEAD).
```typescript
const handle = await DurableStream.connect({
url: "https://streams.example.com/my-stream",
headers: {
Authorization: "Bearer my-token",
},
})
console.log("Content-Type:", handle.contentType)
```
#### DurableStream.head(opts): Promise<HeadResult>
Get stream metadata without creating a handle.
```typescript
const metadata = await DurableStream.head({
url: "https://streams.example.com/my-stream",
headers: {
Authorization: "Bearer my-token",
},
})
console.log("Offset:", metadata.offset)
console.log("Content-Type:", metadata.contentType)
```
#### DurableStream.delete(opts): Promise<void>
Delete a stream without creating a handle.
```typescript
await DurableStream.delete({
url: "https://streams.example.com/my-stream",
headers: {
Authorization: "Bearer my-token",
},
})
```
#### head(opts?): Promise<HeadResult>
Get metadata for this stream.
```typescript
const handle = new DurableStream({
url,
headers: { Authorization: `Bearer ${token}` },
})
const metadata = await handle.head()
console.log("Current offset:", metadata.offset)
```
#### create(opts?): Promise<void>
Create this stream on the server.
```typescript
const handle = new DurableStream({
url,
headers: { Authorization: `Bearer ${token}` },
})
await handle.create({
contentType: "text/plain",
ttlSeconds: 7200,
})
```
#### delete(opts?): Promise<void>
Delete this stream.
```typescript
const handle = new DurableStream({
url,
headers: { Authorization: `Bearer ${token}` },
})
await handle.delete()
```
#### append(body, opts?): Promise<void>
Append data to the stream. By default, automatic batching is enabled: multiple append() calls made while a POST is in-flight will be batched together into a single request. This significantly improves throughput for high-frequency writes.
```typescript
const handle = await DurableStream.connect({
url,
headers: { Authorization: `Bearer ${token}` },
})
// Append string
await handle.append("Hello, world!")
// Append with sequence number for ordering
await handle.append("Message 1", { seq: "writer-1-001" })
await handle.append("Message 2", { seq: "writer-1-002" })
// For JSON streams, append objects directly (serialized automatically)
await handle.append({ event: "click", x: 100, y: 200 })
// Batching happens automatically - these may be sent in a single request
await Promise.all([
handle.append({ event: "msg1" }),
handle.append({ event: "msg2" }),
handle.append({ event: "msg3" }),
])
```
Batching behavior:
- JSON mode (contentType: "application/json"): Multiple values are sent as a JSON array [val1, val2, ...]
- Byte mode: Binary data is concatenated
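As an illustration (payloads hypothetical; handle is a JSON-mode DurableStream as above), appends coalesced into one batch travel as a single JSON array:

```typescript
await Promise.all([
  handle.append({ n: 1 }),
  handle.append({ n: 2 }),
  handle.append({ n: 3 }),
])
// If coalesced into one request, the POST body is: [{"n":1},{"n":2},{"n":3}]
```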
Disabling batching:
If you need to ensure each append is sent immediately (e.g., for precise timing or debugging):
```typescript
const handle = new DurableStream({
url,
batching: false, // Disable automatic batching
})
```
#### appendStream(source, opts?): Promise<void>
Append streaming data from an async iterable or ReadableStream. This method supports piping from any source.
```typescript
const handle = await DurableStream.connect({
url,
headers: { Authorization: `Bearer ${token}` },
})
// From async generator
async function* generateData() {
for (let i = 0; i < 100; i++) {
yield `Line ${i}\n`
}
}
await handle.appendStream(generateData())
// From ReadableStream
const readable = new ReadableStream({
start(controller) {
controller.enqueue("chunk 1")
controller.enqueue("chunk 2")
controller.close()
},
})
await handle.appendStream(readable)
// Pipe from a fetch response body
const response = await fetch("https://example.com/data")
await handle.appendStream(response.body!)
```
#### writable(opts?): WritableStream<Uint8Array | string>
Create a WritableStream that can receive piped data. Useful for stream composition:
```typescript
const handle = await DurableStream.connect({ url, headers })
// Pipe from any ReadableStream
await someReadableStream.pipeTo(handle.writable())
// Pipe through a transform
const readable = inputStream.pipeThrough(new TextEncoderStream())
await readable.pipeTo(handle.writable())
```
#### stream(opts?): Promise<StreamResponse<T>>
Start a read session (same as standalone stream() function).
```typescript
const handle = await DurableStream.connect({
url,
headers: { Authorization: `Bearer ${token}` },
})
const res = await handle.stream<{ message: string }>({
offset: savedOffset,
live: true,
})
res.subscribeJson(async (batch) => {
for (const item of batch.items) {
console.log(item.message)
}
})
```
---
The IdempotentProducer class provides Kafka-style exactly-once write semantics with automatic batching and pipelining.
```typescript
new IdempotentProducer(stream: DurableStream, producerId: string, opts?: IdempotentProducerOptions)
```
Parameters:
- stream - The DurableStream to write to
- producerId - Stable identifier for this producer (e.g., "order-service-1")
- opts - Optional configuration
Options:
```typescript
interface IdempotentProducerOptions {
epoch?: number // Starting epoch (default: 0)
autoClaim?: boolean // On 403, retry with epoch+1 (default: false)
maxBatchBytes?: number // Max bytes before sending batch (default: 1MB)
lingerMs?: number // Max time to wait for more messages (default: 5ms)
maxInFlight?: number // Concurrent batches in flight (default: 5)
signal?: AbortSignal // Cancellation signal
fetch?: typeof fetch // Custom fetch implementation
onError?: (error: Error) => void // Error callback for batch failures
}
```
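As a sketch, a producer tuned for bulk ingest might trade a little latency for larger batches (the values are illustrative, not recommendations; stream is a DurableStream handle as above):

```typescript
const bulkProducer = new IdempotentProducer(stream, "ingest-worker-1", {
  autoClaim: true,
  maxBatchBytes: 256 * 1024, // send once 256 KiB is buffered
  lingerMs: 20,              // or after 20 ms, whichever comes first
  maxInFlight: 8,            // allow more pipelining than the default of 5
  onError: (err) => console.error("batch failed:", err),
})
```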
#### append(body): void
Append data to the stream (fire-and-forget). For JSON streams, you can pass objects directly.
Returns immediately after adding to the internal batch. Errors are reported via onError callback.
```typescript
// For JSON streams - pass objects directly
producer.append({ event: "click", x: 100 })
// Or strings/bytes
producer.append("message data")
producer.append(new Uint8Array([1, 2, 3]))
// All appends are fire-and-forget - use flush() to wait for delivery
await producer.flush()
```
#### flush(): Promise<void>
Send any pending batch immediately and wait for all in-flight batches to complete.
```typescript
// Always call before shutdown
await producer.flush()
```
#### close(finalMessage?): Promise<CloseResult>
Flush pending messages and close the underlying stream (EOF). This is the typical way to end a producer session:
1. Flushes all pending messages
2. Optionally appends a final message atomically with close
3. Closes the stream (no further appends permitted by any producer)
Idempotent: Safe to retry on network failures - uses producer headers for deduplication.
```typescript
// Close stream (EOF)
const result = await producer.close()
console.log("Final offset:", result.finalOffset)
// Close with final message (atomic append + close)
const resultWithFinal = await producer.close('{"done": true}')
```
#### detach(): Promise<void>
Stop the producer without closing the underlying stream. Use this when:
- Handing off writing to another producer
- Keeping the stream open for future writes
- Stopping this producer but not signaling EOF to readers
```typescript
await producer.detach() // Stream remains open
```
#### restart(): Promise<void>
Increment epoch and reset sequence. Call this when restarting the producer to establish a new session.
```typescript
await producer.restart()
```
Properties:
- epoch: number - Current epoch for this producer
- nextSeq: number - Next sequence number to be assigned
- pendingCount: number - Messages in the current pending batch
- inFlightCount: number - Batches currently in flight
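These properties lend themselves to lightweight progress logging, as in this sketch (the interval and message format are arbitrary):

```typescript
const timer = setInterval(() => {
  console.log(
    `epoch=${producer.epoch} nextSeq=${producer.nextSeq}`,
    `pending=${producer.pendingCount} inFlight=${producer.inFlightCount}`
  )
}, 1_000)

// Later, on shutdown (after flush()/close()):
clearInterval(timer)
```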
Errors are delivered via the onError callback since append() is fire-and-forget:
```typescript
import {
IdempotentProducer,
StaleEpochError,
SequenceGapError,
} from "@durable-streams/client"
const producer = new IdempotentProducer(stream, "my-producer", {
onError: (error) => {
if (error instanceof StaleEpochError) {
// Another producer has a higher epoch - this producer is "fenced"
console.log(`Fenced by epoch ${error.currentEpoch}`)
} else if (error instanceof SequenceGapError) {
// Sequence gap detected (should not happen with proper usage)
console.log(`Expected seq ${error.expectedSeq}, got ${error.receivedSeq}`)
}
},
})
producer.append("data") // Fire-and-forget, errors go to onError
await producer.flush() // Wait for all batches to complete
```
---
Durable Streams supports permanently closing streams to signal EOF (End of File). Once closed, no further appends are permitted, but data remains fully readable.
#### Using DurableStream.close()
```typescript
const stream = await DurableStream.connect({ url })
// Simple close (no final message)
const result = await stream.close()
console.log("Final offset:", result.finalOffset)
// Atomic append-and-close with final message
const resultWithFinal = await stream.close({
body: '{"status": "complete"}',
})
```
Options:
```typescript
interface CloseOptions {
body?: Uint8Array | string // Optional final message
contentType?: string // Content type (must match stream)
signal?: AbortSignal // Cancellation
}
interface CloseResult {
finalOffset: Offset // The offset after the last byte
}
```
Idempotency:
- close() without body: Idempotent — safe to call multiple times (see the sketch below)
- close({ body }) with body: NOT idempotent — throws StreamClosedError if already closed. Use IdempotentProducer.close(finalMessage) for idempotent close-with-body.
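Since plain close() is idempotent, it can sit in a simple retry loop, as in this sketch (the delays are arbitrary; stream is the handle from above):

```typescript
for (const delayMs of [100, 500, 2000]) {
  try {
    await stream.close() // safe to repeat: closing an already-closed stream is not an error
    break
  } catch {
    // transient network failure: wait, then retry
    await new Promise((resolve) => setTimeout(resolve, delayMs))
  }
}
```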
#### Using IdempotentProducer.close()
For reliable close with final message (safe to retry):
```typescript
const producer = new IdempotentProducer(stream, "producer-1", {
autoClaim: true,
})
// Write some messages
producer.append('{"event": "start"}')
producer.append('{"event": "data"}')
// Close with final message (idempotent, safe to retry)
const result = await producer.close('{"event": "end"}')
```
Important: IdempotentProducer.close() closes the stream, not just the producer. Use detach() to stop the producer without closing the stream.
#### Creating Closed Streams
Create a stream that's immediately closed (useful for cached responses, errors, single-shot data):
```typescript
// Empty closed stream
const emptyStream = await DurableStream.create({
url: "https://streams.example.com/cached-response",
contentType: "application/json",
closed: true,
})
// Closed stream with initial content
const errorStream = await DurableStream.create({
url: "https://streams.example.com/error-response",
contentType: "application/json",
body: '{"error": "Service unavailable"}',
closed: true,
})
```
#### Detecting Closure
The streamClosed property indicates when a stream is permanently closed:
```typescript
// StreamResponse properties
const res = await stream({ url, live: true })
console.log(res.streamClosed) // false initially
// In subscribers - batch/chunk metadata includes streamClosed
res.subscribeJson((batch) => {
console.log("Items:", batch.items)
console.log("Stream closed:", batch.streamClosed) // true when EOF reached
})
// In HEAD requests
const metadata = await DurableStream.head({ url })
console.log("Stream closed:", metadata.streamClosed)
```
#### Live Mode Behavior
When a stream is closed:
- Long-poll: Returns immediately with streamClosed: true (no waiting)
- SSE: Sends streamClosed: true in the final control event, then closes the connection
- Subscribers: Receive a final batch with streamClosed: true, then stop
```typescript
const res = await stream({ url, live: true })
res.subscribeJson((batch) => {
for (const item of batch.items) {
process(item)
}
if (batch.streamClosed) {
console.log("Stream complete, no more data will arrive")
// Connection will close automatically
}
})
```
Attempting to append to a closed stream throws StreamClosedError:
```typescript
import { StreamClosedError } from "@durable-streams/client"
try {
await stream.append("data")
} catch (error) {
if (error instanceof StreamClosedError) {
console.log("Stream is closed at offset:", error.finalOffset)
}
}
```
---
Key types exported from the package:
- Offset - Opaque string for stream position
- StreamResponse - Response object from stream() (includes the streamClosed property)
- ByteChunk - { data: Uint8Array, offset: Offset, upToDate: boolean, streamClosed: boolean, cursor?: string }
- JsonBatch<T> - { items: T[], offset: Offset, upToDate: boolean, streamClosed: boolean, cursor?: string }
- TextChunk - { text: string, offset: Offset, upToDate: boolean, streamClosed: boolean, cursor?: string }
- HeadResult - Metadata from HEAD requests (includes the streamClosed property)
- CloseOptions - Options for closing a stream
- CloseResult - Result from closing a stream (includes finalOffset)
- IdempotentProducer - Exactly-once producer class
- StaleEpochError - Thrown when a producer epoch is stale (zombie fencing)
- SequenceGapError - Thrown when sequence numbers are out of order
- StreamClosedError - Thrown when attempting to append to a closed stream (includes finalOffset)
- DurableStreamError - Protocol-level errors with codes
- FetchError - Transport/network errors
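These exports can type your own helpers; a small sketch (the checkpoint function is hypothetical):

```typescript
import type { JsonBatch, Offset } from "@durable-streams/client"

// Hypothetical persistence helper.
declare function saveCheckpoint(offset: Offset): Promise<void>

// A typed callback suitable for subscribeJson().
async function handleBatch(batch: JsonBatch<{ id: number }>): Promise<void> {
  for (const item of batch.items) {
    console.log("item id:", item.id)
  }
  await saveCheckpoint(batch.offset)
}
```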
Apache-2.0