Server-Sent Events (SSE) support for Fetcher HTTP client with native LLM streaming API support. Enables real-time data streaming and token-by-token LLM response handling.
npm install @ahoo-wang/fetcher-eventstream







为 Fetcher 提供 text/event-stream 支持,实现服务器发送事件(SSE)功能,用于实时数据流。
- 📡 事件流转换:将 text/event-stream 响应转换为 ServerSentEvent 对象的异步生成器
- 🔌 自动扩展:模块导入时自动扩展 Response 原型,添加事件流方法
- 📋 SSE 解析:根据规范解析服务器发送事件,包括数据、事件、ID 和重试字段
- 🔄 流支持:正确处理分块数据和多行事件
- 💬 注释处理:正确忽略注释行(以 : 开头的行)
- 🛡️ TypeScript 支持:完整的 TypeScript 类型定义
- ⚡ 性能优化:高效的解析和流处理,适用于高性能应用
- 🤖 LLM 流准备就绪: 原生支持来自流行 LLM API(如 OpenAI GPT、Claude 等)的流式响应
- 🔚 流终止检测:自动流终止检测,实现干净的资源管理和完成处理
``bash使用 npm
npm install @ahoo-wang/fetcher-eventstream
$3
要使用事件流功能,您需要导入模块以执行其副作用:
`typescript
import '@ahoo-wang/fetcher-eventstream';
`此导入会自动扩展
Response 接口以处理服务器发送事件流:-
eventStream() - 将带有 text/event-stream 内容类型的响应转换为 ServerSentEventStream
- jsonEventStream() - 将带有 text/event-stream 内容类型的响应转换为 JsonServerSentEventStream
- isEventStream getter - 检查响应是否具有 text/event-stream 内容类型
- requiredEventStream() - 获取 ServerSentEventStream,如果不可用则抛出错误
- requiredJsonEventStream() - 获取 JsonServerSentEventStream,如果不可用则抛出错误这是 JavaScript/TypeScript 中常见的模式,用于在不修改原始类型定义的情况下扩展现有类型的功能。
$3
以下示例展示了如何创建带事件流支持的 LLM 客户端,类似于 Fetcher
项目中的集成测试。您可以在 integration-test/src/eventstream/llmClient.ts
中找到完整实现。
这个示例演示了如何使用 Fetcher 的流式传输功能与流行的 LLM API(如 OpenAI 的 GPT 模型)进行交互。
`typescript
import {
BaseURLCapable,
ContentTypeValues,
FetchExchange,
NamedFetcher,
REQUEST_BODY_INTERCEPTOR_ORDER,
RequestInterceptor,
} from '@ahoo-wang/fetcher';
import {
api,
autoGeneratedError,
body,
post,
ResultExtractors,
} from '@ahoo-wang/fetcher-decorator';
import '@ahoo-wang/fetcher-eventstream';
import { JsonServerSentEventStream } from '@ahoo-wang/fetcher-eventstream';
import { ChatRequest, ChatResponse } from './types';export const llmFetcherName = 'llm';
export interface LlmOptions extends BaseURLCapable {
apiKey: string;
model?: string;
}
export class LlmRequestInterceptor implements RequestInterceptor {
readonly name: string = 'LlmRequestInterceptor';
readonly order: number = REQUEST_BODY_INTERCEPTOR_ORDER - 1;
constructor(private llmOptions: LlmOptions) {}
intercept(exchange: FetchExchange): void {
const chatRequest = exchange.request.body as ChatRequest;
if (!chatRequest.model) {
chatRequest.model = this.llmOptions.model;
}
}
}
export function createLlmFetcher(options: LlmOptions): NamedFetcher {
const llmFetcher = new NamedFetcher(llmFetcherName, {
baseURL: options.baseURL,
headers: {
Authorization:
Bearer ${options.apiKey},
'Content-Type': ContentTypeValues.APPLICATION_JSON,
},
});
llmFetcher.interceptors.request.use(new LlmRequestInterceptor(options));
return llmFetcher;
}@api('/chat', {
fetcher: llmFetcherName,
resultExtractor: ResultExtractors.JsonEventStream,
})
export class LlmClient {
@post('/completions')
streamChat(
@body() body: ChatRequest,
): Promise> {
throw autoGeneratedError(body);
}
@post('/completions', { resultExtractor: ResultExtractors.Json })
chat(@body() body: ChatRequest): Promise {
throw autoGeneratedError(body);
}
}
`#### 使用 streamChat 进行实时响应
以下是使用
streamChat 方法从 LLM API 获取实时响应的示例:`typescript
import { createLlmFetcher, LlmClient } from './llmClient';// 使用您的 API 配置初始化 LLM 客户端
const llmFetcher = createLlmFetcher({
baseURL: 'https://api.openai.com/v1', // OpenAI 示例
apiKey: process.env.OPENAI_API_KEY || 'your-api-key',
model: 'gpt-3.5-turbo', // 默认模型
});
// 创建客户端实例
const llmClient = new LlmClient();
// 示例:实时流式传输聊天完成响应
async function streamChatExample() {
try {
// 流式传输响应,逐个令牌接收
const stream = await llmClient.streamChat({
messages: [
{ role: 'system', content: 'You are a helpful assistant.' },
{ role: 'user', content: 'Explain quantum computing in simple terms.' },
],
model: 'gpt-3.5-turbo', // 如需要可覆盖默认模型
stream: true, // 启用流式传输
});
// 处理流式响应
let fullResponse = '';
for await (const event of stream) {
// 每个事件包含部分响应
if (event.data) {
const chunk = event.data;
const content = chunk.choices[0]?.delta?.content || '';
fullResponse += content;
console.log('新令牌:', content);
// 在令牌到达时实时更新 UI
updateUI(content);
}
}
console.log('完整响应:', fullResponse);
} catch (error) {
console.error('流式聊天错误:', error);
}
}
// 辅助函数模拟 UI 更新
function updateUI(content: string) {
// 在实际应用中,这将更新您的 UI
process.stdout.write(content);
}
`$3
`typescript
import { Fetcher } from '@ahoo-wang/fetcher';
import '@ahoo-wang/fetcher-eventstream';const fetcher = new Fetcher({
baseURL: 'https://api.example.com',
});
// 在响应中使用 eventStream 方法处理 text/event-stream 内容类型
// Response 对象将自动具有 eventStream() 和 jsonEventStream() 方法
const response = await fetcher.get('/events');
for await (const event of response.requiredEventStream()) {
console.log('收到事件:', event);
}
// 使用 jsonEventStream 方法处理 JSON 数据
const jsonResponse = await fetcher.get('/json-events');
for await (const event of response.requiredJsonEventStream()) {
console.log('收到 JSON 事件:', event.data);
}
`$3
`typescript
import { Fetcher } from '@ahoo-wang/fetcher';
import {
toJsonServerSentEventStream,
type TerminateDetector,
} from '@ahoo-wang/fetcher-eventstream';const fetcher = new Fetcher({
baseURL: 'https://api.openai.com/v1',
});
// 定义 OpenAI 风格完成的终止检测器
const terminateOnDone: TerminateDetector = event => event.data === '[DONE]';
// 获取原始事件流
const response = await fetcher.post('/chat/completions', {
body: {
model: 'gpt-3.5-turbo',
messages: [{ role: 'user', content: '你好!' }],
stream: true,
},
});
// 转换为带自动终止的类型化 JSON 流
const jsonStream = toJsonServerSentEventStream(
response.requiredEventStream(),
terminateOnDone,
);
// 处理带自动终止的流式响应
for await (const event of jsonStream) {
const content = event.data.choices[0]?.delta?.content;
if (content) {
console.log('令牌:', content);
// 当收到 '[DONE]' 时流会自动终止
}
}
`$3
`typescript
import { toServerSentEventStream } from '@ahoo-wang/fetcher-eventstream';// 手动转换 Response 对象
const response = await fetch('/events');
const eventStream = toServerSentEventStream(response);
// 从流中读取事件
const reader = eventStream.getReader();
try {
while (true) {
const { done, value } = await reader.read();
if (done) break;
console.log('收到事件:', value);
}
} finally {
reader.releaseLock();
}
`📚 API 参考
$3
要使用事件流功能,您需要导入模块以执行其副作用:
`typescript
import '@ahoo-wang/fetcher-eventstream';
`此导入会自动扩展全局
Response 接口以处理服务器发送事件流:-
eventStream() - 将带有 text/event-stream 内容类型的响应转换为 ServerSentEventStream
- jsonEventStream() - 将带有 text/event-stream 内容类型的响应转换为 JsonServerSentEventStream
- isEventStream getter - 检查响应是否具有 text/event-stream 内容类型
- requiredEventStream() - 获取 ServerSentEventStream,如果不可用则抛出错误
- requiredJsonEventStream() - 获取 JsonServerSentEventStream,如果不可用则抛出错误这是 JavaScript/TypeScript 中常见的模式,用于在不修改原始类型定义的情况下扩展现有类型的功能。
在集成测试和实际应用中,此导入对于处理事件流至关重要。例如:
`typescript
import { Fetcher } from '@ahoo-wang/fetcher';
import '@ahoo-wang/fetcher-eventstream';const fetcher = new Fetcher({
baseURL: 'https://api.example.com',
});
// Response 对象将自动具有 eventStream() 和 jsonEventStream() 方法
const response = await fetcher.get('/events');
// 处理事件流
for await (const event of response.requiredEventStream()) {
console.log('收到事件:', event);
}
`$3
将
ServerSentEventStream 转换为 JsonServerSentEventStream,用于处理带有 JSON 数据的服务器发送事件。可选支持流终止检测以实现自动流关闭。#### 签名
`typescript
function toJsonServerSentEventStream(
serverSentEventStream: ServerSentEventStream,
terminateDetector?: TerminateDetector,
): JsonServerSentEventStream;
`#### 参数
-
serverSentEventStream:要转换的 ServerSentEventStream
- terminateDetector:可选的函数,用于检测何时应该终止流。当提供时,当检测器对某个事件返回 true 时,流将自动关闭。#### 返回
-
JsonServerSentEventStream:带有 JSON 数据的 ServerSentEvent 对象的可读流#### 示例
`typescript
// 基本用法,不使用终止检测
const jsonStream = toJsonServerSentEventStream(serverSentEventStream);// 使用终止检测处理 OpenAI 风格的完成
const terminateOnDone: TerminateDetector = event => event.data === '[DONE]';
const terminatingStream = toJsonServerSentEventStream(
serverSentEventStream,
terminateOnDone,
);
// 自定义终止逻辑
const terminateOnError: TerminateDetector = event => {
return event.event === 'error' || event.data.includes('ERROR');
};
const errorHandlingStream = toJsonServerSentEventStream(
serverSentEventStream,
terminateOnError,
);
`$3
定义带有 JSON 数据的服务器发送事件结构的接口。
`typescript
interface JsonServerSentEvent extends Omit {
data: DATA; // 作为解析 JSON 的事件数据
}
`$3
带有 JSON 数据的 ServerSentEvent 对象的可读流的类型别名。
`typescript
type JsonServerSentEventStream = ReadableStream<
JsonServerSentEvent
>;
`$3
用于检测服务器发送事件流何时应该终止的函数类型。这通常用于 LLM API 发送特殊终止事件来表示响应流结束的情况。
#### 签名
`typescript
type TerminateDetector = (event: ServerSentEvent) => boolean;
`#### 参数
-
event:当前正在处理的 ServerSentEvent#### 返回
-
boolean:如果应该终止流则返回 true,否则返回 false#### 示例
`typescript
// OpenAI 风格的终止(常见模式)
const terminateOnDone: TerminateDetector = event => event.data === '[DONE]';// 基于事件的终止
const terminateOnComplete: TerminateDetector = event => event.event === 'done';
// 具有多个条件的自定义终止
const terminateOnFinish: TerminateDetector = event => {
return (
event.event === 'done' ||
event.event === 'error' ||
event.data === '[DONE]' ||
event.data.includes('TERMINATE')
);
};
// 与 toJsonServerSentEventStream 一起使用
const stream = toJsonServerSentEventStream(
serverSentEventStream,
terminateOnDone,
);
`#### 常见用例
- LLM 流式传输:检测来自 OpenAI、Claude 或其他 LLM API 的完成标记,如
[DONE]
- 错误处理:在收到错误事件时终止流
- 自定义协议:实现应用程序特定的终止逻辑
- 资源管理:在满足特定条件时自动关闭流$3
将带有
text/event-stream 主体的 Response 对象转换为 ServerSentEvent 对象的可读流。#### 签名
`typescript
function toServerSentEventStream(response: Response): ServerSentEventStream;
`#### 参数
-
response:带有 text/event-stream 内容类型的 HTTP 响应#### 返回
-
ServerSentEventStream:ServerSentEvent 对象的可读流$3
定义服务器发送事件结构的接口。
`typescript
interface ServerSentEvent {
data: string; // 事件数据(必需)
event?: string; // 事件类型(可选,默认为 'message')
id?: string; // 事件 ID(可选)
retry?: number; // 以毫秒为单位的重试超时(可选)
}
`$3
ServerSentEvent 对象的可读流的类型别名。
`typescript
type ServerSentEventStream = ReadableStream;
`🛠️ 示例
$3
`typescript
import { Fetcher } from '@ahoo-wang/fetcher';
import '@ahoo-wang/fetcher-eventstream';const fetcher = new Fetcher({
baseURL: 'https://api.example.com',
});
// 监听实时通知
const response = await fetcher.get('/notifications');
for await (const event of response.requiredEventStream()) {
switch (event.event) {
case 'message':
showNotification('消息', event.data);
break;
case 'alert':
showAlert('警报', event.data);
break;
case 'update':
handleUpdate(JSON.parse(event.data));
break;
default:
console.log('未知事件:', event);
}
}
`$3
`typescript
import { Fetcher } from '@ahoo-wang/fetcher';const fetcher = new Fetcher({
baseURL: 'https://api.example.com',
});
// 跟踪长时间运行的任务进度
const response = await fetcher.get('/tasks/123/progress');
for await (const event of response.requiredEventStream()) {
if (event.event === 'progress') {
const progress = JSON.parse(event.data);
updateProgressBar(progress.percentage);
} else if (event.event === 'complete') {
showCompletionMessage(event.data);
break;
}
}
`$3
`typescript
import { Fetcher } from '@ahoo-wang/fetcher';const fetcher = new Fetcher({
baseURL: 'https://chat-api.example.com',
});
// 实时聊天消息
const response = await fetcher.get('/rooms/123/messages');
for await (const event of response.requiredEventStream()) {
if (event.event === 'message') {
const message = JSON.parse(event.data);
displayMessage(message);
} else if (event.event === 'user-joined') {
showUserJoined(event.data);
} else if (event.event === 'user-left') {
showUserLeft(event.data);
}
}
`🧪 测试
`bash
运行测试
pnpm test运行带覆盖率的测试
pnpm test -- --coverage
`测试套件包括:
- 事件流转换测试
- 边缘情况处理(格式错误的事件、分块数据等)
- 大事件流的性能测试
📋 服务器发送事件规范兼容性
此包完全实现了 服务器发送事件规范:
- 数据字段:支持多行数据字段
- 事件字段:自定义事件类型
- ID 字段:最后事件 ID 跟踪
- 重试字段:自动重连超时
- 注释行:忽略以
:` 开头的行欢迎贡献!请查看 贡献指南 了解更多详情。
本项目采用 Apache-2.0 许可证。
---
Fetcher 生态系统的一部分