Streaming response support for Flink Framework (SSE and NDJSON)
Streaming response support for Flink Framework, enabling Server-Sent Events (SSE) and NDJSON streaming for real-time data transmission.
- Server-Sent Events (SSE): Perfect for live updates, notifications, and dashboards
- NDJSON Streaming: Ideal for LLM chat streaming (OpenAI/Anthropic style)
- Type-Safe: Full TypeScript support with generic event types
- Simple API: Clean StreamWriter interface for writing data
- Connection Management: Automatic handling of client disconnections
- Flexible: Works alongside regular Flink handlers
```bash
npm install @flink-app/streaming-plugin
```

```typescript
import { FlinkApp, FlinkContext } from "@flink-app/flink";
import { streamingPlugin, StreamingPluginContext } from "@flink-app/streaming-plugin";

// Define your app context with streaming plugin
interface AppContext extends FlinkContext<{ streaming: StreamingPluginContext }> {
  // Your other context properties
}

const app = await new FlinkApp<AppContext>({
  name: "My App",
  plugins: [streamingPlugin({ debug: true })],
}).start();
```
LLM-Style Chat Streaming (NDJSON):
```typescript
// src/streaming/GetChatStream.ts
import { StreamHandler, StreamingRouteProps } from "@flink-app/streaming-plugin";
// AppContext is your application context type; import it from wherever your app defines it

export const Route: StreamingRouteProps = {
  path: "/chat/stream",
  format: "ndjson", // Use NDJSON for LLM streaming
  skipAutoRegister: true, // Required for streaming handlers
};

interface ChatEvent {
  delta: string;
  done?: boolean;
}

const GetChatStream: StreamHandler<AppContext, ChatEvent> = async ({ req, stream }) => {
  const prompt = req.query.prompt as string;

  // Call your LLM API (callLLMAPI is a placeholder for your own async generator)
  for await (const chunk of callLLMAPI(prompt)) {
    stream.write({
      delta: chunk.text,
    });
  }

  stream.write({ delta: "", done: true });
  stream.end();
};

export default GetChatStream;
```
Live Updates (SSE):
```typescript
// src/streaming/GetLiveUpdates.ts
import { StreamHandler, StreamingRouteProps } from "@flink-app/streaming-plugin";
// AppContext is your application context type; import it from wherever your app defines it

export const Route: StreamingRouteProps = {
  path: "/live-updates",
  format: "sse", // Use SSE for live updates
  skipAutoRegister: true,
};

interface UpdateEvent {
  type: "update" | "notification";
  message: string;
  timestamp: number;
}

const GetLiveUpdates: StreamHandler<AppContext, UpdateEvent> = async ({ stream }) => {
  // Send updates periodically
  const interval = setInterval(() => {
    // Stop the timer once the client has disconnected
    if (!stream.isOpen()) {
      clearInterval(interval);
      return;
    }

    stream.write({
      type: "update",
      message: "New data available",
      timestamp: Date.now(),
    });
  }, 1000);
};

export default GetLiveUpdates;
```
Option 1: Via context (recommended)
```typescript
// src/index.ts
import * as GetChatStream from "./streaming/GetChatStream";
import * as GetLiveUpdates from "./streaming/GetLiveUpdates";

// After app.start() - access via ctx.plugins.streaming
app.ctx.plugins.streaming.registerStreamHandler(GetChatStream);
app.ctx.plugins.streaming.registerStreamHandler(GetLiveUpdates);
```
Option 2: Via plugin instance
```typescript
// If you prefer to keep a reference to the plugin instance
const streaming = streamingPlugin({ debug: true });

const app = await new FlinkApp({
  plugins: [streaming],
}).start();

// Register after app.start()
streaming.registerStreamHandler(GetChatStream);
streaming.registerStreamHandler(GetLiveUpdates);
```
Option 3: Explicit registration
```typescript
// For inline handlers without separate files
app.ctx.plugins.streaming.registerStreamHandler(
  async ({ stream }) => {
    stream.write({ message: "Hello" });
    stream.end();
  },
  { path: "/custom", format: "sse", skipAutoRegister: true }
);
```
Consuming an NDJSON stream on the client:
```typescript
async function streamChat(prompt: string) {
  const response = await fetch(`/chat/stream?prompt=${encodeURIComponent(prompt)}`);
  const reader = response.body!.getReader();
  const decoder = new TextDecoder();
  let buffer = "";

  while (true) {
    const { done, value } = await reader.read();
    if (done) break;

    buffer += decoder.decode(value, { stream: true });
    const lines = buffer.split("\n");
    buffer = lines.pop() || ""; // Keep incomplete line

    for (const line of lines) {
      if (line.trim()) {
        const event = JSON.parse(line);
        console.log("Delta:", event.delta);

        if (event.done) {
          console.log("Stream complete");
          return;
        }
      }
    }
  }
}
```
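Consuming an SSE stream on the client:
```typescript
const eventSource = new EventSource("/live-updates");

eventSource.onmessage = (event) => {
  const data = JSON.parse(event.data);
  console.log("Update:", data);
};

eventSource.addEventListener("error", (event) => {
  // Server-sent error events carry a JSON payload; native connection errors carry no data
  const payload = (event as MessageEvent).data;
  if (payload) {
    const error = JSON.parse(payload);
    console.error("Error:", error.message);
  } else {
    console.error("Connection error");
  }
});

// Close connection when done
eventSource.close();
```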
`streamingPlugin(options)` creates a streaming plugin instance.
Options:
- `defaultFormat?: 'sse' | 'ndjson'` - Default format if not specified (default: `'sse'`)
- `debug?: boolean` - Enable debug logging (default: `false`)
`StreamHandler` is the handler function type for streaming endpoints.
Type Parameters:
- `Ctx` - Your application context type
- `T` - Event data type
Props:
- `req: FlinkRequest` - Flink request object
- `ctx: Ctx` - Application context
- `stream: StreamWriter` - Stream writer for sending data
- `origin?: string` - Route origin
`StreamWriter` is the interface for writing data to streams.
Methods:
- `write(data: T): void` - Write data to the stream
- `error(error: Error | string): void` - Send error event
- `end(): void` - Close the stream
- `isOpen(): boolean` - Check if connection is still open
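When unit-testing handler logic, it can help to have a writer that records events in memory instead of sending them over HTTP. The sketch below is one way to do that. It re-declares the four methods listed above locally rather than importing them, and `MemoryStreamWriter` is a hypothetical helper, not something the plugin provides. Dropping writes after `end()` is a choice made for this sketch; the plugin's own behavior in that case is not documented here.

```typescript
// Local re-declaration of the StreamWriter shape listed above (assumption:
// the plugin exports an interface with these same four methods).
interface StreamWriter<T> {
  write(data: T): void;
  error(error: Error | string): void;
  end(): void;
  isOpen(): boolean;
}

// Hypothetical in-memory writer for tests; not part of the plugin.
class MemoryStreamWriter<T> implements StreamWriter<T> {
  readonly events: T[] = [];
  readonly errors: string[] = [];
  private opened = true;

  write(data: T): void {
    if (!this.opened) return; // sketch choice: ignore writes after end()
    this.events.push(data);
  }

  error(error: Error | string): void {
    if (!this.opened) return;
    this.errors.push(typeof error === "string" ? error : error.message);
  }

  end(): void {
    this.opened = false;
  }

  isOpen(): boolean {
    return this.opened;
  }
}

// Usage: drive handler-like logic against the in-memory writer.
const memoryWriter = new MemoryStreamWriter<{ delta: string; done?: boolean }>();
memoryWriter.write({ delta: "Hello" });
memoryWriter.write({ delta: "", done: true });
memoryWriter.end();
memoryWriter.write({ delta: "ignored" }); // dropped: stream already closed
```

A test can then assert on `memoryWriter.events` and `memoryWriter.isOpen()` without opening a network connection.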
`StreamingRouteProps` is the route configuration for streaming endpoints.
Properties:
- `path: string` - HTTP path (required)
- `method?: HttpMethod` - HTTP method (default: GET)
- `format?: 'sse' | 'ndjson'` - Streaming format (default: plugin default)
- `skipAutoRegister: true` - Must be true (required)
- `permissions?: string | string[]` - Route permissions (uses Flink auth plugin)
- `origin?: string` - Route origin (optional)
Server-Sent Events (SSE):
When to use:
- Live dashboards and real-time updates
- Notifications and alerts
- Progress indicators
- Traditional one-way server → client streaming
Format:
```
data: {"message":"Hello"}\n\n
data: {"message":"World"}\n\n
```
Headers:
- `Content-Type: text/event-stream`
- `Cache-Control: no-cache`
- `Connection: keep-alive`
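To make the wire format above concrete, here is a minimal sketch of how a payload might be serialized into an SSE frame. `formatSseFrame` is a hypothetical helper written for illustration; it is not the plugin's internal encoder. The optional `event:` field is part of the SSE format itself, but whether the plugin uses it is an assumption.

```typescript
// Hypothetical helper: serialize one payload as an SSE frame.
function formatSseFrame(data: unknown, eventName?: string): string {
  const lines: string[] = [];
  // Optional named event (standard SSE field; plugin usage is an assumption)
  if (eventName) lines.push(`event: ${eventName}`);
  // Compact JSON.stringify output contains no raw newlines, so one data: line is enough
  lines.push(`data: ${JSON.stringify(data)}`);
  // A blank line terminates the event
  return lines.join("\n") + "\n\n";
}

const helloFrame = formatSseFrame({ message: "Hello" });
const errorFrame = formatSseFrame({ message: "boom" }, "error");
```

Here `helloFrame` matches the first line of the format shown above, with `\n\n` as real newline characters.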
NDJSON Streaming:
When to use:
- LLM chat streaming (OpenAI/Anthropic style)
- Bulk data export
- Log streaming
- Any streaming JSON data
Format:
```
{"delta":"Hello"}\n
{"delta":" world","done":true}\n
```
Headers:
- `Content-Type: application/x-ndjson`
- `Cache-Control: no-cache`
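Because network chunks do not necessarily align with line boundaries, an NDJSON consumer has to buffer partial lines. The following sketch shows one way to do that. `NdjsonDecoder` and `encodeNdjson` are hypothetical names introduced for this example and are not exported by the plugin.

```typescript
// Hypothetical helper: incremental NDJSON decoder that tolerates chunk
// boundaries falling in the middle of a line.
class NdjsonDecoder<T> {
  private buffer = "";

  // Feed a decoded text chunk; returns every event completed by this chunk.
  push(chunk: string): T[] {
    this.buffer += chunk;
    const lines = this.buffer.split("\n");
    this.buffer = lines.pop() ?? ""; // keep the trailing partial line
    return lines
      .filter((line) => line.trim() !== "")
      .map((line) => JSON.parse(line) as T);
  }
}

// One JSON document per line, terminated by a newline
const encodeNdjson = (payload: unknown): string => JSON.stringify(payload) + "\n";

const wire = encodeNdjson({ delta: "Hello" }) + encodeNdjson({ delta: " world", done: true });
const ndjsonDecoder = new NdjsonDecoder<{ delta: string; done?: boolean }>();

// Split the payload at an arbitrary offset to simulate network chunking
const firstBatch = ndjsonDecoder.push(wire.slice(0, 10));
const secondBatch = ndjsonDecoder.push(wire.slice(10));
const decodedEvents = firstBatch.concat(secondBatch);
```

The first chunk ends mid-line, so it yields no events; both events are emitted once the second chunk completes the lines.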
Streaming handlers support Flink's authentication system just like regular handlers. Simply add permissions to your route configuration:
```typescript
export const Route: StreamingRouteProps = {
  path: "/admin/stream",
  format: "sse",
  skipAutoRegister: true,
  permissions: ["admin"], // Requires admin role
};

// Event type left as `any` here for brevity; define an interface in real code
const AdminStream: StreamHandler<AppContext, any> = async ({ req, stream }) => {
  // ✅ req.user is populated by auth plugin after successful authentication
  const user = req.user; // Type depends on your auth plugin

  stream.write({
    message: `Hello ${user?.name}`,
    userId: user?.userId,
    permissions: user?.permissions,
  });

  stream.end();
};
```
How it works:
1. Plugin checks `routeProps.permissions` before starting the stream
2. Calls your configured auth plugin (JWT, BankID, OAuth, etc.)
3. Returns 401 if authentication fails
4. ✅ Authenticated user info is available in `req.user` (populated by your auth plugin)
Example with JWT:
```typescript
import { jwtAuthPlugin } from "@flink-app/jwt-auth-plugin";

// `streaming` is the plugin instance created with streamingPlugin(...)
const app = new FlinkApp({
  auth: jwtAuthPlugin({ secret: "your-secret" }),
  plugins: [streaming],
});

// Client must send Authorization header
fetch("/admin/stream", {
  headers: {
    Authorization: "Bearer your-jwt-token",
  },
});
```
Always check if the stream is still open before writing:
```typescript
if (stream.isOpen()) {
  stream.write(data);
}
```
Clean up intervals, timers, and resources when the connection closes:
```typescript
const interval = setInterval(() => {
  if (!stream.isOpen()) {
    clearInterval(interval);
    return;
  }
  stream.write(data);
}, 1000);
```
Always wrap streaming logic in try-catch:
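```typescript
// MyEvent stands for your own event type
const handler: StreamHandler<AppContext, MyEvent> = async ({ stream }) => {
  try {
    // Your streaming logic
  } catch (err) {
    // `err` is `unknown` in TypeScript; narrow it to what stream.error accepts
    stream.error(err instanceof Error ? err : String(err));
    stream.end();
  }
};
```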
Leverage TypeScript for type-safe events:
```typescript
interface ChatEvent {
  delta: string;
  done?: boolean;
  metadata?: {
    model: string;
    tokens: number;
  };
}

const handler: StreamHandler<AppContext, ChatEvent> = async ({ stream }) => {
  stream.write({
    delta: "Hello",
    metadata: { model: "gpt-4", tokens: 5 },
  }); // ✅ Type-safe

  // stream.write({ invalid: "data" }); // ❌ Type error
};
```
Unlike regular Flink handlers, streaming handlers must be registered manually after app.start():
```typescript
// ❌ Won't auto-register
export const Route: StreamingRouteProps = {
  path: "/stream",
  skipAutoRegister: true, // Required
};

// ✅ Must register manually
streaming.registerStreamHandler(handler, Route);
```
This trade-off keeps the core framework unchanged and lets the plugin work independently.
IMPORTANT: Streaming handlers MUST be placed outside the src/handlers/ directory.
```typescript
// ❌ WRONG - Will cause TypeScript compilation error
src/handlers/streaming/GetChatStream.ts

// ✅ CORRECT - Place outside handlers directory
src/streaming/GetChatStream.ts
```
Why? Flink's TypeScript compiler automatically scans src/handlers/ for auto-registration and attempts to analyze all handler types at compile-time. Since it doesn't recognize the StreamHandler type from this plugin, it will throw an error:
```
Error: Unknown handler type StreamHandler in GetChatStream.ts - should be Handler or GetHandler
```
Recommended locations:
- `src/streaming/` - Dedicated directory for streaming handlers (recommended)
- `src/streams/` - Alternative naming
- Any directory outside `src/handlers/`
Streaming responses bypass Flink's JSON schema validation since data is sent incrementally. Ensure your event types are well-defined and validated manually if needed.
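Manual validation can be as small as a runtime type guard applied before each write. The sketch below assumes the `ChatEvent` shape used earlier in this document. `isChatEvent` and `writeValidated` are hypothetical helpers, and the `write` callback stands in for `stream.write`.

```typescript
interface ChatEvent {
  delta: string;
  done?: boolean;
}

// Hypothetical runtime guard; a schema library could fill the same role.
function isChatEvent(value: unknown): value is ChatEvent {
  if (typeof value !== "object" || value === null) return false;
  const candidate = value as Record<string, unknown>;
  return (
    typeof candidate.delta === "string" &&
    (candidate.done === undefined || typeof candidate.done === "boolean")
  );
}

// Validate before writing; `write` stands in for stream.write.
function writeValidated(write: (chatEvent: ChatEvent) => void, value: unknown): boolean {
  if (!isChatEvent(value)) return false; // reject malformed payloads
  write(value);
  return true;
}

const sentEvents: ChatEvent[] = [];
const record = (chatEvent: ChatEvent): void => {
  sentEvents.push(chatEvent);
};

const accepted = writeValidated(record, { delta: "Hi" });
const rejected = writeValidated(record, { invalid: "data" });
```

Only the well-formed payload reaches the writer; the malformed one is rejected before it can be sent to the client.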
Be mindful of:
- Server connection limits (max concurrent streams)
- Client browser limits (typically 6 connections per domain)
- Load balancer timeout settings
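One way to respect a server-side cap on concurrent streams is a small counter that handlers consult before they start writing. The following is a sketch under that assumption. `StreamLimiter` is a hypothetical class, not a plugin feature, and the right cap depends on your deployment.

```typescript
// Hypothetical guard that caps concurrent streams; not part of the plugin.
class StreamLimiter {
  private active = 0;
  private readonly maxStreams: number;

  constructor(maxStreams: number) {
    this.maxStreams = maxStreams;
  }

  // Returns a release function on success, or null when the cap is reached.
  tryAcquire(): (() => void) | null {
    if (this.active >= this.maxStreams) return null;
    this.active++;
    let released = false;
    return () => {
      if (released) return; // releasing twice has no further effect
      released = true;
      this.active--;
    };
  }

  get activeCount(): number {
    return this.active;
  }
}

const limiter = new StreamLimiter(2);
const slotA = limiter.tryAcquire();
const slotB = limiter.tryAcquire();
const slotC = limiter.tryAcquire(); // null: cap of 2 reached
slotA?.(); // free one slot
const slotD = limiter.tryAcquire(); // succeeds after the release
```

In a handler, a `null` result could be answered with `stream.error(...)` followed by `stream.end()`, and the release function called once `stream.isOpen()` reports that the client has gone.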
See the demo-app for complete working examples:
- `/chat/stream` - NDJSON streaming chat
- `/live-updates` - SSE live updates
Potential future enhancements:
- [ ] Auto-registration support (requires core framework changes)
- [ ] Binary streaming support
- [ ] WebSocket integration
- [ ] Compression support (gzip)
- [ ] Rate limiting/backpressure
- [ ] Metrics and monitoring hooks
MIT