AI SDK: Filter and transform UI messages while streaming to the client
This library provides composable stream transformation and filter utilities for UI message streams created by streamText() in the AI SDK.
The AI SDK UI message stream created by toUIMessageStream() streams all parts (text, tools, reasoning, etc.) to the client by default. However, you may want to:
- Filter: Tool calls like database queries often contain large amounts of data or sensitive information that should not be visible on the client
- Transform: Modify text or tool outputs while they are streamed to the client
This library provides type-safe, composable utilities for all these use cases.
This library only supports AI SDK v5.
```bash
npm install ai-stream-utils
```
| Function | Input | Returns | Use Case |
|----------|-------------|---------|----------|
| mapUIMessageStream | UIMessageChunk | chunk \| chunk[] \| null | Transform or filter chunks in real-time (e.g., smooth streaming) |
| flatMapUIMessageStream | UIMessagePart | part \| part[] \| null | Buffer until complete, then transform (e.g., redact tool output) |
| filterUIMessageStream | UIMessageChunk | boolean | Include/exclude parts by type (e.g., hide reasoning) |
The mapUIMessageStream function operates on chunks: it transforms or filters individual chunks as they stream through. The callback receives the current chunk and the partial part assembled from the chunks processed so far.
```typescript
import { mapUIMessageStream } from 'ai-stream-utils';

const stream = mapUIMessageStream(
  result.toUIMessageStream<MyUIMessage>(),
  ({ chunk, part }) => {
    // Transform: modify the chunk
    if (chunk.type === 'text-delta') {
      return { ...chunk, delta: chunk.delta.toUpperCase() };
    }
    // Filter: return null to exclude chunks
    if (part.type === 'tool-weather') {
      return null;
    }
    return chunk;
  }
);
```
The flatMapUIMessageStream function operates on parts. It buffers all chunks of a particular type (e.g. text parts) until the part is complete and then transforms or filters the complete part. The optional predicate partTypeIs() can be used to selectively buffer only specific parts while streaming others through immediately.
```typescript
import { flatMapUIMessageStream, partTypeIs } from 'ai-stream-utils';

const stream = flatMapUIMessageStream(
  result.toUIMessageStream<MyUIMessage>(),
  // Predicate to only buffer tool-weather parts and pass through other parts
  partTypeIs('tool-weather'),
  ({ part }) => {
    // Transform: modify the complete part
    if (part.state === 'output-available') {
      return { ...part, output: { ...part.output, temperature: toFahrenheit(part.output.temperature) } };
    }
    // Pass other states through unchanged (return null here instead to exclude the part)
    return part;
  }
);
```
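To make "buffer until the part is complete" concrete, here is a minimal sketch of the idea for text parts only. It is not the library's implementation: `TextChunk`, `TextPart`, and `bufferTextParts` are hypothetical names with simplified shapes, and the sketch works on an array rather than a stream.

```typescript
// Simplified, hypothetical chunk and part shapes for illustration only.
type TextChunk =
  | { type: 'text-start'; id: string }
  | { type: 'text-delta'; id: string; delta: string }
  | { type: 'text-end'; id: string };

type TextPart = { type: 'text'; text: string };

// Collects text chunks and returns one complete part per text-end seen.
function bufferTextParts(chunks: TextChunk[]): TextPart[] {
  const open = new Map<string, string>(); // part id -> text accumulated so far
  const completed: TextPart[] = [];
  for (const chunk of chunks) {
    if (chunk.type === 'text-start') {
      open.set(chunk.id, '');
    } else if (chunk.type === 'text-delta') {
      open.set(chunk.id, (open.get(chunk.id) ?? '') + chunk.delta);
    } else {
      // text-end: the part is complete and could now be handed to a transform
      completed.push({ type: 'text', text: open.get(chunk.id) ?? '' });
      open.delete(chunk.id);
    }
  }
  return completed;
}

const bufferedParts = bufferTextParts([
  { type: 'text-start', id: '1' },
  { type: 'text-delta', id: '1', delta: 'Hello, ' },
  { type: 'text-delta', id: '1', delta: 'world' },
  { type: 'text-end', id: '1' },
]);
// bufferedParts: [{ type: 'text', text: 'Hello, world' }]
```

The trade-off is latency: nothing for a buffered part reaches the client until its end chunk arrives, which is why restricting buffering with a predicate is useful.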
The filterUIMessageStream function is a convenience function around mapUIMessageStream with a simpler API to filter chunks by part type. It provides the includeParts() and excludeParts() predicates for common patterns.
```typescript
import { filterUIMessageStream, includeParts, excludeParts } from 'ai-stream-utils';

// Include only specific parts
const includedStream = filterUIMessageStream(
  result.toUIMessageStream<MyUIMessage>(),
  includeParts(['text', 'tool-weather'])
);

// Exclude specific parts
const excludedStream = filterUIMessageStream(
  result.toUIMessageStream<MyUIMessage>(),
  excludeParts(['reasoning', 'tool-database'])
);

// Custom filter function
const customStream = filterUIMessageStream(
  result.toUIMessageStream<MyUIMessage>(),
  ({ part, chunk }) => {
    if (part.type === 'text') return true;
    if (chunk.type === 'tool-input-available') return true;
    return false;
  }
);
```
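Conceptually, the include and exclude predicates are just membership checks on the part type. The following is a rough sketch under that assumption, not the library's code: `includePartsSketch`, `excludePartsSketch`, and the simplified `PartLike` shape are hypothetical.

```typescript
// Illustrative only: simplified shapes, not the library's actual types.
type PartLike = { type: string };
type PartPredicate = (input: { part: PartLike }) => boolean;

// Keep a chunk only if its part type is in the allow list.
function includePartsSketch(partTypes: string[]): PartPredicate {
  return ({ part }) => partTypes.includes(part.type);
}

// Drop a chunk if its part type is in the deny list.
function excludePartsSketch(partTypes: string[]): PartPredicate {
  return ({ part }) => !partTypes.includes(part.type);
}

const onlyText = includePartsSketch(['text']);
const noReasoning = excludePartsSketch(['reasoning']);

const keepsText = onlyText({ part: { type: 'text' } }); // true
const keepsTool = onlyText({ part: { type: 'tool-weather' } }); // false
const keepsReasoning = noReasoning({ part: { type: 'reasoning' } }); // false
```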
Buffers multiple text chunks into a string, splits at word boundaries and re-emits each word as a separate chunk for smoother UI rendering. See examples/smooth-streaming.ts for the full implementation.
```typescript
import { mapUIMessageStream } from 'ai-stream-utils';

const WORD_REGEX = /\S+\s+/m;
let buffer = '';

const smoothedStream = mapUIMessageStream(
  result.toUIMessageStream(),
  ({ chunk }) => {
    if (chunk.type !== 'text-delta') {
      // Flush buffer on non-text chunks
      if (buffer.length > 0) {
        const flushed = { type: 'text-delta' as const, id: chunk.id, delta: buffer };
        buffer = '';
        return [flushed, chunk];
      }
      return chunk;
    }

    // Append the text delta to the buffer
    buffer += chunk.delta;

    const chunks = [];
    let match;
    while ((match = WORD_REGEX.exec(buffer)) !== null) {
      chunks.push({ type: 'text-delta', id: chunk.id, delta: buffer.slice(0, match.index + match[0].length) });
      buffer = buffer.slice(match.index + match[0].length);
    }
    // Emit the word-by-word chunks
    return chunks;
  }
);

// Output: word-by-word streaming
// { type: 'text-delta', delta: 'Why ' }
// { type: 'text-delta', delta: "don't " }
// { type: 'text-delta', delta: 'scientists ' }
```
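The word-splitting step can be looked at in isolation. This is a small sketch rather than code from the library: `splitWords` is a hypothetical helper that applies the same `\S+\s+` pattern to a buffer and returns the complete words together with the remainder that should stay buffered.

```typescript
// Hypothetical helper for illustration; not part of ai-stream-utils.
const WORD_PATTERN = /\S+\s+/;

function splitWords(buffer: string): { words: string[]; rest: string } {
  const words: string[] = [];
  let rest = buffer;
  let match: RegExpExecArray | null;
  // Each match is one word plus its trailing whitespace.
  while ((match = WORD_PATTERN.exec(rest)) !== null) {
    const end = match.index + match[0].length;
    words.push(rest.slice(0, end));
    rest = rest.slice(end);
  }
  return { words, rest };
}

const split = splitWords("Why don't scientists tru");
// split.words: ['Why ', "don't ", 'scientists ']
// split.rest:  'tru' (held back until a later delta supplies trailing whitespace)
```

Because a word is only emitted once whitespace follows it, the final word of a text part stays in the buffer until something flushes it, which is what the non-text branch in the example above does.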
Buffer tool calls until complete, then redact sensitive fields before streaming to the client. See examples/order-lookup.ts for the full example.
```typescript
import { openai } from '@ai-sdk/openai';
import { streamText, tool } from 'ai';
import { flatMapUIMessageStream, partTypeIs } from 'ai-stream-utils';
import { z } from 'zod';

const tools = {
  lookupOrder: tool({
    description: 'Look up order details by order ID',
    inputSchema: z.object({
      orderId: z.string().describe('The order ID to look up'),
    }),
    execute: ({ orderId }) => ({
      orderId,
      status: 'shipped',
      items: ['iPhone 15'],
      total: 1299.99,
      email: 'customer@example.com', // Sensitive
      address: '123 Main St, SF, CA 94102', // Sensitive
    }),
  }),
};

const result = streamText({
  model: openai('gpt-4o'),
  prompt: 'Where is my order #12345?',
  tools,
});

// Buffer tool-lookupOrder parts, stream text parts immediately
const redactedStream = flatMapUIMessageStream(
  result.toUIMessageStream<MyUIMessage>(),
  partTypeIs('tool-lookupOrder'),
  ({ part }) => {
    if (part.state === 'output-available') {
      return {
        ...part,
        output: {
          ...part.output,
          email: '[REDACTED]',
          address: '[REDACTED]',
        },
      };
    }
    return part;
  },
);

// Text streams immediately, tool output is redacted:
// { type: 'text-delta', delta: 'Let me look that up...' }
// { type: 'tool-output-available', output: { orderId: '12345', email: '[REDACTED]', address: '[REDACTED]' } }
```
Inspect previously streamed parts to conditionally inject new parts. This example creates a text part from a tool call message if the model didn't generate one. See examples/ask-permission.ts for the full example.
```typescript
import { openai } from '@ai-sdk/openai';
import { streamText, tool } from 'ai';
import { flatMapUIMessageStream, partTypeIs } from 'ai-stream-utils';
import { z } from 'zod';

const tools = {
  askForPermission: tool({
    description: 'Ask for permission to access current location',
    inputSchema: z.object({
      message: z.string().describe('The message to ask for permission'),
    }),
  }),
};

const result = streamText({
  model: openai('gpt-4o'),
  prompt: 'Is it sunny today?',
  tools,
});

// Buffer askForPermission tool calls, check if text was already generated
const stream = flatMapUIMessageStream(
  result.toUIMessageStream<MyUIMessage>(),
  partTypeIs('tool-askForPermission'),
  (current, context) => {
    if (current.part.state === 'input-available') {
      // Check if a text part was already streamed
      const hasTextPart = context.parts.some((p) => p.type === 'text');
      if (!hasTextPart) {
        // Inject a text part from the tool call message
        return [
          { type: 'text', text: current.part.input.message },
          current.part,
        ];
      }
    }
    return current.part;
  },
);

// If model only generated tool call, we inject the text:
// { type: 'text', text: 'May I access your location?' }
// { type: 'tool-askForPermission', input: { message: 'May I access your location?' } }
```
Transform tool outputs on-the-fly, such as converting temperature units. See examples/weather.ts for the full example.
```typescript
import { openai } from '@ai-sdk/openai';
import { streamText, tool } from 'ai';
import { flatMapUIMessageStream, partTypeIs } from 'ai-stream-utils';
import { z } from 'zod';

const toFahrenheit = (celsius: number) => (celsius * 9) / 5 + 32;

const tools = {
  weather: tool({
    description: 'Get the weather in a location',
    inputSchema: z.object({ location: z.string() }),
    execute: ({ location }) => ({
      location,
      temperature: 22, // Celsius from API
      unit: 'C',
    }),
  }),
};

const result = streamText({
  model: openai('gpt-4o'),
  prompt: 'What is the weather in Tokyo?',
  tools,
});

// Convert Celsius to Fahrenheit before streaming to client
const stream = flatMapUIMessageStream(
  result.toUIMessageStream<MyUIMessage>(),
  partTypeIs('tool-weather'),
  ({ part }) => {
    if (part.state === 'output-available') {
      return {
        ...part,
        output: {
          ...part.output,
          temperature: toFahrenheit(part.output.temperature),
          unit: 'F',
        },
      };
    }
    return part;
  },
);

// Output is converted:
// { type: 'tool-output-available', output: { location: 'Tokyo', temperature: 71.6, unit: 'F' } }
```
Helper functions for converting between streams, arrays, and async iterables.
| Function | Converts | To |
|----------|----------|-----|
| createAsyncIterableStream | ReadableStream | AsyncIterableStream |
| convertArrayToStream | Array | ReadableStream |
| convertAsyncIterableToStream | AsyncIterable | ReadableStream |
| convertAsyncIterableToArray | AsyncIterable | Promise (resolves to an array) |
| convertStreamToArray | ReadableStream | Promise (resolves to an array) |
| convertUIMessageToSSEStream | ReadableStream (UI message chunks) | ReadableStream (SSE text) |
| convertSSEToUIMessageStream | ReadableStream (SSE text) | ReadableStream (UI message chunks) |
Adds async iterator protocol to a ReadableStream, enabling for await...of loops.
```typescript
import { createAsyncIterableStream } from 'ai-stream-utils/utils';

const asyncStream = createAsyncIterableStream(readableStream);
for await (const chunk of asyncStream) {
  console.log(chunk);
}
```
Converts an array to a ReadableStream that emits each element.
```typescript
import { convertArrayToStream } from 'ai-stream-utils/utils';

const stream = convertArrayToStream([1, 2, 3]);
```
Converts an async iterable (e.g., async generator) to a ReadableStream.
```typescript
import { convertAsyncIterableToStream } from 'ai-stream-utils/utils';

async function* generator() {
  yield 1;
  yield 2;
}

const stream = convertAsyncIterableToStream(generator());
```
Collects all values from an async iterable into an array.
```typescript
import { convertAsyncIterableToArray } from 'ai-stream-utils/utils';

const array = await convertAsyncIterableToArray(asyncIterable);
```
Consumes a ReadableStream and collects all chunks into an array.
```typescript
import { convertStreamToArray } from 'ai-stream-utils/utils';

const array = await convertStreamToArray(readableStream);
```
Converts a UI message stream to an SSE (Server-Sent Events) stream. Useful for sending UI message chunks over HTTP as SSE-formatted text.
```typescript
import { convertUIMessageToSSEStream } from 'ai-stream-utils/utils';

const uiStream = result.toUIMessageStream();
const sseStream = convertUIMessageToSSEStream(uiStream);
// Output format: "data: {...}\n\n" for each chunk
```
Converts an SSE stream back to a UI message stream. Useful for parsing SSE-formatted responses on the client.
```typescript
import { convertSSEToUIMessageStream } from 'ai-stream-utils/utils';

const response = await fetch('/api/chat');
const sseStream = response.body.pipeThrough(new TextDecoderStream());
const uiStream = convertSSEToUIMessageStream(sseStream);
```
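The `data: {...}\n\n` framing used by the two SSE converters can be illustrated with plain strings. This is only a sketch of the wire format: `encodeSSE` and `decodeSSE` are hypothetical helpers, not library exports, and skipping a `[DONE]` sentinel is an assumption about what a server might send at the end of the stream.

```typescript
// Hypothetical helpers illustrating the "data: {...}\n\n" framing.
function encodeSSE(chunk: unknown): string {
  return `data: ${JSON.stringify(chunk)}\n\n`;
}

function decodeSSE(text: string): unknown[] {
  return text
    .split('\n\n') // events are separated by a blank line
    .map((event) => event.trim())
    .filter((event) => event.startsWith('data: '))
    .map((event) => event.slice('data: '.length))
    .filter((payload) => payload !== '[DONE]') // assumption: optional end sentinel
    .map((payload) => JSON.parse(payload));
}

const wire =
  encodeSSE({ type: 'text-delta', id: '1', delta: 'Hi' }) +
  encodeSSE({ type: 'finish' });
const decoded = decodeSSE(wire);
// decoded: [{ type: 'text-delta', id: '1', delta: 'Hi' }, { type: 'finish' }]
```

A real stream delivers text in arbitrary slices, so a streaming parser also has to carry an incomplete event over to the next slice; the sketch sidesteps that by decoding a complete string.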
The toUIMessageStream() from streamText() returns a generic ReadableStream, which means the part types cannot be inferred automatically.
To enable autocomplete and type-safety, pass your UIMessage type as a generic parameter:
```typescript
import type { UIMessage, InferUITools } from 'ai';
import { filterUIMessageStream, includeParts, mapUIMessageStream } from 'ai-stream-utils';

type MyUIMessageMetadata = {};
type MyDataPart = {};
type MyTools = InferUITools<typeof tools>;

type MyUIMessage = UIMessage<
  MyUIMessageMetadata,
  MyDataPart,
  MyTools
>;

// Use MyUIMessage type when creating the UI message stream
const uiStream = result.toUIMessageStream<MyUIMessage>();

// Type-safe filtering with autocomplete
const filteredStream = filterUIMessageStream(
  uiStream,
  includeParts(['text', 'tool-weather']) // Autocomplete works!
);

// Type-safe chunk mapping
const mappedStream = mapUIMessageStream(
  uiStream,
  ({ chunk, part }) => {
    // part.type is typed based on MyUIMessage
    return chunk;
  }
);
```
The transformed stream has the same type as the original UI message stream. You can consume it with useChat() or readUIMessageStream().
Since message parts may be different on the client vs. the server, you may need to reconcile message parts when the client sends messages back to the server.
If you save messages to a database and configure useChat() to only send the last message, you can read existing messages from the database. This means the model will have access to all message parts, including filtered parts not available on the client.
The transformations operate on UIMessagePart types, which are derived from UIMessageChunk types:
| Part Type | Chunk Types |
| ----------------- | ------------------------------------- |
| text | text-start, text-delta, text-end |
| reasoning | reasoning-start, reasoning-delta, reasoning-end |
| tool-{name} | tool-input-start, tool-input-delta, tool-input-available, tool-input-error, tool-output-available, tool-output-error |
| data-{name} | data-{name} |
| step-start | start-step |
| file | file |
| source-url | source-url |
| source-document | source-document |
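The mapping in this table can be written down as a small lookup. The sketch below is for illustration only: `partTypeForChunk` is a hypothetical function, and passing the tool name as a separate argument is an assumption made to keep the example self-contained.

```typescript
// Hypothetical helper derived from the table above; not exported by the library.
function partTypeForChunk(chunkType: string, toolName?: string): string | undefined {
  if (chunkType.startsWith('text-')) return 'text';
  if (chunkType.startsWith('reasoning-')) return 'reasoning';
  if (chunkType.startsWith('tool-input-') || chunkType.startsWith('tool-output-')) {
    // Assumption: the caller knows which tool the chunk belongs to.
    return toolName ? `tool-${toolName}` : undefined;
  }
  if (chunkType === 'start-step') return 'step-start';
  if (chunkType.startsWith('data-')) return chunkType;
  if (['file', 'source-url', 'source-document'].includes(chunkType)) return chunkType;
  return undefined; // anything not listed in the table, e.g. start or finish
}

const textPartType = partTypeForChunk('text-delta'); // 'text'
const toolPartType = partTypeForChunk('tool-input-available', 'weather'); // 'tool-weather'
const stepPartType = partTypeForChunk('start-step'); // 'step-start'
const unlistedType = partTypeForChunk('finish'); // undefined
```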
Control chunks always pass through regardless of filter/transform settings:
- start: Stream start marker
- finish: Stream finish marker
- abort: Stream abort marker
- message-metadata: Message metadata updates
- error: Error messages
Step boundaries are handled automatically:
1. start-step is buffered until the first content chunk is encountered
2. If the first content chunk passes through, start-step is included
3. If the first content chunk is filtered out, start-step is also filtered out
4. finish-step is only included if the corresponding start-step was included
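The four rules can be sketched as a small state machine. This is one literal reading of the rules, applied to an array rather than a stream, and not the library's implementation: `filterWithStepBoundaries`, `ChunkLike`, and `CONTROL_TYPES` are hypothetical names.

```typescript
type ChunkLike = { type: string };

// Control chunks that always pass through.
const CONTROL_TYPES = new Set(['start', 'finish', 'abort', 'message-metadata', 'error']);

function filterWithStepBoundaries(
  chunks: ChunkLike[],
  keep: (chunk: ChunkLike) => boolean,
): ChunkLike[] {
  const out: ChunkLike[] = [];
  let pendingStart: ChunkLike | null = null; // buffered start-step (rule 1)
  let stepEmitted = false; // whether the current step's start-step was emitted

  for (const chunk of chunks) {
    if (CONTROL_TYPES.has(chunk.type)) {
      out.push(chunk);
    } else if (chunk.type === 'start-step') {
      pendingStart = chunk;
      stepEmitted = false;
    } else if (chunk.type === 'finish-step') {
      if (stepEmitted) out.push(chunk); // rule 4
      pendingStart = null;
      stepEmitted = false;
    } else {
      const kept = keep(chunk);
      if (pendingStart) {
        // The first content chunk decides the fate of the buffered start-step.
        if (kept) {
          out.push(pendingStart); // rule 2
          stepEmitted = true;
        }
        pendingStart = null; // rule 3: dropped together with the filtered chunk
      }
      if (kept) out.push(chunk);
    }
  }
  return out;
}

const filteredTypes = filterWithStepBoundaries(
  [
    { type: 'start' },
    { type: 'start-step' },
    { type: 'reasoning-delta' },
    { type: 'finish-step' },
    { type: 'start-step' },
    { type: 'text-delta' },
    { type: 'finish-step' },
    { type: 'finish' },
  ],
  (chunk) => chunk.type !== 'reasoning-delta',
).map((chunk) => chunk.type);
// filteredTypes: ['start', 'start-step', 'text-delta', 'finish-step', 'finish']
```

The first step contains only filtered content, so both of its boundary chunks disappear and the client never sees an empty step.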
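```typescript
function mapUIMessageStream<UI_MESSAGE extends UIMessage>(
  stream: ReadableStream<InferUIMessageChunk<UI_MESSAGE>>,
  mapFn: MapUIMessageStreamFn<UI_MESSAGE>,
): AsyncIterableStream<InferUIMessageChunk<UI_MESSAGE>>;

type MapUIMessageStreamFn<UI_MESSAGE extends UIMessage> = (
  input: MapInput<UI_MESSAGE>,
) => InferUIMessageChunk<UI_MESSAGE> | InferUIMessageChunk<UI_MESSAGE>[] | null;

type MapInput<UI_MESSAGE extends UIMessage> = {
  chunk: InferUIMessageChunk<UI_MESSAGE>;
  part: InferUIMessagePart<UI_MESSAGE>;
};
```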
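```typescript
// Without predicate - buffer all parts
function flatMapUIMessageStream<UI_MESSAGE extends UIMessage>(
  stream: ReadableStream<InferUIMessageChunk<UI_MESSAGE>>,
  flatMapFn: FlatMapUIMessageStreamFn<UI_MESSAGE>,
): AsyncIterableStream<InferUIMessageChunk<UI_MESSAGE>>;

// With predicate - buffer only matching parts, pass through others
function flatMapUIMessageStream<UI_MESSAGE extends UIMessage, PART extends InferUIMessagePart<UI_MESSAGE>>(
  stream: ReadableStream<InferUIMessageChunk<UI_MESSAGE>>,
  predicate: FlatMapUIMessageStreamPredicate<UI_MESSAGE, PART>,
  flatMapFn: FlatMapUIMessageStreamFn<UI_MESSAGE, PART>,
): AsyncIterableStream<InferUIMessageChunk<UI_MESSAGE>>;

type FlatMapUIMessageStreamFn<UI_MESSAGE extends UIMessage, PART = InferUIMessagePart<UI_MESSAGE>> = (
  input: FlatMapInput<UI_MESSAGE, PART>,
  context: FlatMapContext<UI_MESSAGE>,
) => InferUIMessagePart<UI_MESSAGE> | InferUIMessagePart<UI_MESSAGE>[] | null;

type FlatMapInput<UI_MESSAGE extends UIMessage, PART = InferUIMessagePart<UI_MESSAGE>> = {
  part: PART;
};

type FlatMapContext<UI_MESSAGE extends UIMessage> = {
  index: number;
  parts: InferUIMessagePart<UI_MESSAGE>[];
};
```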
#### partTypeIs
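```typescript
function partTypeIs<UI_MESSAGE extends UIMessage, T extends InferUIMessagePart<UI_MESSAGE>['type']>(
  type: T | T[],
): FlatMapUIMessageStreamPredicate<UI_MESSAGE, Extract<InferUIMessagePart<UI_MESSAGE>, { type: T }>>;

type FlatMapUIMessageStreamPredicate<UI_MESSAGE extends UIMessage, PART extends InferUIMessagePart<UI_MESSAGE>> =
  (part: InferUIMessagePart<UI_MESSAGE>) => boolean;
```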
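```typescript
function filterUIMessageStream<UI_MESSAGE extends UIMessage>(
  stream: ReadableStream<InferUIMessageChunk<UI_MESSAGE>>,
  filterFn: FilterUIMessageStreamPredicate<UI_MESSAGE>,
): AsyncIterableStream<InferUIMessageChunk<UI_MESSAGE>>;

type FilterUIMessageStreamPredicate<UI_MESSAGE extends UIMessage> = (
  input: MapInput<UI_MESSAGE>,
  context: MapContext<UI_MESSAGE>,
) => boolean;
```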
#### includeParts
```typescript
function includeParts<UI_MESSAGE extends UIMessage>(
  partTypes: Array<InferUIMessagePart<UI_MESSAGE>['type']>,
): FilterUIMessageStreamPredicate<UI_MESSAGE>;
```
#### excludeParts
```typescript
function excludeParts<UI_MESSAGE extends UIMessage>(
  partTypes: Array<InferUIMessagePart<UI_MESSAGE>['type']>,
): FilterUIMessageStreamPredicate<UI_MESSAGE>;
```