连接适配器处理客户端和服务器之间的通信,用于流式传输聊天响应。TanStack AI 提供了内置适配器并支持自定义实现。
SSE 是大多数用例的推荐适配器。它提供可靠的流式传输和自动重连。在服务器端,使用 toServerSentEventsStream() 或 toServerSentEventsResponse() 将您的聊天流转换为 SSE 格式。
import { useChat, fetchServerSentEvents } from "@tanstack/ai-react";
const { messages } = useChat({
connection: fetchServerSentEvents("/api/chat"),
});
选项
const { messages } = useChat({
connection: fetchServerSentEvents("/api/chat", {
headers: {
Authorization: "Bearer token",
},
}),
});
动态值
您可以使用函数来获取在每次请求时进行评估的动态 URL 或选项
const { messages } = useChat({
connection: fetchServerSentEvents(
() => `/api/chat?user=${currentUserId}`,
() => ({
headers: { Authorization: `Bearer ${getToken()}` },
})
),
});
自定义 fetch
您还可以传入自定义 fetch 客户端(用于代理、重试或自定义传输)
const { messages } = useChat({
connection: fetchServerSentEvents("/api/chat", {
fetchClient: myCustomFetch,
}),
});
对于不支持 SSE 的环境,请使用 HTTP 流适配器
import { useChat, fetchHttpStream } from "@tanstack/ai-react";
const { messages } = useChat({
connection: fetchHttpStream("/api/chat"),
});
对于特殊用例,您可以创建自定义适配器以满足特定的协议或要求
import { stream, type ConnectionAdapter } from "@tanstack/ai-react";
import type { StreamChunk, ModelMessage } from "@tanstack/ai";
const customAdapter: ConnectionAdapter = stream(
async (messages: ModelMessage[], data?: Record<string, any>, signal?: AbortSignal) => {
// Custom implementation
const response = await fetch("/api/chat", {
method: "POST",
headers: {
"Content-Type": "application/json",
},
body: JSON.stringify({ messages, ...data }),
signal,
});
if (!response.ok) {
throw new Error(`HTTP error! status: ${response.status}`);
}
// Return async iterable of StreamChunk
return processStream(response);
}
);
const { messages } = useChat({
connection: customAdapter,
});
要创建基于 WebSocket 的适配器
import { stream, type ConnectionAdapter } from "@tanstack/ai-react";
import type { StreamChunk, ModelMessage } from "@tanstack/ai";
function createWebSocketAdapter(url: string): ConnectionAdapter {
return stream(async (messages: ModelMessage[], data?: Record<string, any>) => {
return new ReadableStream<StreamChunk>({
async start(controller) {
const ws = new WebSocket(url);
ws.onopen = () => {
ws.send(JSON.stringify({ messages, ...data }));
};
ws.onmessage = (event) => {
const chunk = JSON.parse(event.data);
controller.enqueue(chunk);
};
ws.onerror = (error) => {
controller.error(error);
};
ws.onclose = () => {
controller.close();
};
},
});
});
}
const { messages } = useChat({
connection: createWebSocketAdapter("ws://:8080/chat"),
});
所有适配器都实现了 ConnectionAdapter 接口
interface ConnectionAdapter {
connect(
messages: UIMessage[] | ModelMessage[],
data?: Record<string, any>,
abortSignal?: AbortSignal
): AsyncIterable<StreamChunk>;
}
适配器应优雅地处理错误
const adapter = stream(async (messages, data, signal) => {
try {
const response = await fetch("/api/chat", {
method: "POST",
body: JSON.stringify({ messages, ...data }),
signal,
});
if (!response.ok) {
throw new Error(`HTTP ${response.status}: ${response.statusText}`);
}
return processStream(response);
} catch (error) {
if (error.name === "AbortError") {
// Request was cancelled
return;
}
throw error;
}
});
将身份验证标头添加到适配器
import { useChat, fetchServerSentEvents } from "@tanstack/ai-react";
const { messages } = useChat({
connection: fetchServerSentEvents("/api/chat", {
headers: {
Authorization: `Bearer ${token}`,
},
}),
});
对于动态令牌,请使用函数
const { messages } = useChat({
connection: fetchServerSentEvents("/api/chat", () => ({
headers: {
Authorization: `Bearer ${getToken()}`,
},
})),
});