/**
 * End-to-end integration tests for OpenAI WebSocket streaming.
 *
 * These tests hit the real OpenAI Responses API over WebSocket and verify
 * the full request/response lifecycle, including:
 * - Connection establishment and session reuse
 * - Context options forwarding (temperature)
 * - Graceful fallback to HTTP on connection failure
 * - Connection lifecycle cleanup via releaseWsSession
 *
 * Run manually with a valid OPENAI_API_KEY:
 *   OPENCLAW_LIVE_TEST=1 pnpm test:e2e -- src/agents/openai-ws-stream.e2e.test.ts
 *
 * In CI, these tests run only in the keyed live/release lanes.
 */
import type {
AssistantMessage,
AssistantMessageEvent,
AssistantMessageEventStream,
Context,
} from "@mariozechner/pi-ai"; import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; import { isLiveTestEnabled } from "./live-test-helpers.js"; import type { OutputItem, ResponseObject } from "./openai-ws-connection.js";
/** OpenAI key for the live run; when it is missing, every `testFn` test is skipped. */
const API_KEY = process.env.OPENAI_API_KEY;
// `OPENAI_LIVE_TEST` is passed to isLiveTestEnabled as an extra opt-in variable —
// presumably alongside the shared live-test switch; confirm in live-test-helpers.
const LIVE = isLiveTestEnabled(["OPENAI_LIVE_TEST"]) && !!API_KEY;
/** `it` when live, otherwise `it.skip`, so keyless runs report these tests as skipped. */
const testFn = LIVE ? it : it.skip;

// Type-only module references. `typeof import("…")` needs the space: the
// previous `typeofimport(...)` spelling is a syntax error.
type OpenAIWsStreamModule = typeof import("./openai-ws-stream.js");
type OpenAIWsConnectionModule = typeof import("./openai-ws-connection.js");
type StreamFactory = OpenAIWsStreamModule["createOpenAIWebSocketStreamFn"];
type StreamReturn = ReturnType<ReturnType<StreamFactory>>;

// Bound to the loaded modules elsewhere in this file (not visible here) —
// presumably in a beforeEach that (re)imports them; confirm.
let openAIWsStreamModule: OpenAIWsStreamModule;
let openAIWsConnectionModule: OpenAIWsConnectionModule;

/**
 * Hand-written model descriptor passed as the first argument of every stream
 * call. It is cast through `unknown` to the stream function's model parameter
 * type; all cost fields are zero.
 */
const model = {
  api: "openai-responses" as const,
  provider: "openai",
  id: "gpt-5.4",
  name: "gpt-5.4",
  contextWindow: 128_000,
  maxTokens: 4_096,
  reasoning: true,
  input: ["text"],
  cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0 },
} as unknown as Parameters<ReturnType<StreamFactory>>[0];
/** Argument tuple of the stream function produced by `createOpenAIWebSocketStreamFn`. */
type StreamFnParams = Parameters<ReturnType<StreamFactory>>;

/**
 * Builds a single-turn, tool-free context: a fixed "reply in one sentence"
 * system prompt followed by one user message.
 *
 * @param userMessage - text of the lone user turn
 * @returns a fresh context object, cast to the stream function's context type
 */
function makeContext(userMessage: string): StreamFnParams[1] {
  const userTurn = { role: "user" as const, content: userMessage };
  const context = {
    systemPrompt: "You are a helpful assistant. Reply in one sentence.",
    messages: [userTurn],
    tools: [],
  };
  return context as unknown as StreamFnParams[1];
}
/**
 * Builds a single-turn context that exposes exactly one tool, `noop`, whose
 * JSON schema accepts only an empty object (no properties, no extras).
 *
 * @param userMessage - text of the lone user turn (typically instructing the model to call `noop`)
 * @returns a fresh context object containing the system prompt, user turn and tool list
 */
function makeToolContext(userMessage: string): StreamFnParams[1] {
  const noopTool = {
    name: "noop",
    description: "Return the supplied tool result to the user.",
    parameters: {
      type: "object",
      additionalProperties: false,
      properties: {},
    },
  };
  const userTurn = { role: "user" as const, content: userMessage };
  const context = {
    systemPrompt: "You are a precise assistant. Follow tool instructions exactly.",
    messages: [userTurn],
    tools: [noopTool],
  };
  return context as unknown as Context;
}
/**
 * Builds a successful (`isError: false`) `toolResult` message for the `noop`
 * tool, answering tool call `callId` with `output` as a single text part.
 *
 * @param callId - id of the tool call this result answers
 * @param output - text returned to the model as the tool's result
 * @returns a message stamped with the current time, cast to the context's message element type
 */
function makeToolResultMessage(
  callId: string,
  output: string,
): StreamFnParams[1]["messages"][number] {
  type ContextMessage = StreamFnParams[1]["messages"][number];
  const textPart = { type: "text" as const, text: output };
  const message = {
    role: "toolResult" as const,
    toolCallId: callId,
    toolName: "noop",
    content: [textPart],
    isError: false,
    timestamp: Date.now(),
  };
  return message as unknown as ContextMessage;
}
/** Every session id handed out by `freshSession`, in creation order. */
const sessions: string[] = [];

/**
 * Mints a per-test session id of the form `e2e-<name>-<epoch ms>` so tests
 * do not share sessions, and records it in `sessions`.
 *
 * @param name - short test label embedded in the id
 * @returns the newly minted session id
 */
function freshSession(name: string): string {
  const stamp = Date.now();
  const sessionId = `e2e-${name}-${stamp}`;
  sessions.push(sessionId);
  return sessionId;
}
expect(assistantText(secondDone)).toMatch(/TOOL_OK/);
}, // Live CI can spend more than a minute waiting for a stable follow-up turn // when websocket reuse and tool callbacks contend with other provider lanes. 120_000,
);
// NOTE(review): this call appears to splice two different tests together.
// The opening (this test name, the tool/reasoning stream options, and the
// `completedResponses` capture) does not match the closing lines. The closing
// lines read `events`, which is not declared in this block, and assert on a
// "warmed" reply. `firstDone` and `completedResponses` are also never read
// here. Unless an outer `events` exists, this will not type-check; restore the
// missing middle of the test from version control.
testFn( "surfaces replay-safe reasoning metadata on websocket tool turns",
async () => { const sid = freshSession("tool-reasoning"); const completedResponses: ResponseObject[] = [];
// Inject a manager factory that wraps the real OpenAIWebSocketManager and
// records the `response` payload of every `response.completed` event.
openAIWsStreamModule.__testing.setDepsForTest({
createManager: (options) => { const manager = new openAIWsConnectionModule.OpenAIWebSocketManager(options);
manager.onMessage((event) => { if (event.type === "response.completed") {
completedResponses.push(event.response);
}
}); return manager;
},
}); const streamFn = openAIWsStreamModule.createOpenAIWebSocketStreamFn(API_KEY!, sid); const firstContext = makeToolContext( "Think carefully, call the tool `noop` with {} first, then after the tool result reply with exactly TOOL_OK.",
); const firstDone = expectDone(
await collectEvents(
// First turn over the websocket transport: the tool call is forced
// (`toolChoice: "required"`) with high-effort, detailed-summary reasoning.
streamFn(model, firstContext, {
transport: "websocket",
toolChoice: "required",
reasoningEffort: "high",
reasoningSummary: "detailed",
maxTokens: 256,
} as unknown as StreamFnParams[2]),
),
);
// NOTE(review): `events` is not declared in this block (see note above).
const done = events.find((event) => event.type === "done")?.message; if (done) {
expect(assistantText(done).toLowerCase()).toContain("warmed");
}
// NOTE(review): the next line was collapsed, so `720_000,` sits inside the
// `//` comment and testFn receives no timeout argument; the configured default
// timeout applies instead. The comment text also describes a transport
// check, not this reasoning test. Put the comment on its own lines to restore
// an explicit timeout.
}, // This transport check does not need expensive reasoning. Keep the timeout // generous for CI jitter, but force a minimal response shape so the first // websocket request stays bounded. 720_000,
);
testFn( "session is tracked in registry during request",
async () => { const sid = freshSession("registry"); const streamFn = openAIWsStreamModule.createOpenAIWebSocketStreamFn(API_KEY!, sid);
Die Informationen auf dieser Webseite wurden
nach bestem Wissen sorgfältig zusammengestellt. Es wird jedoch weder Vollständigkeit noch Richtigkeit
noch Qualität der bereitgestellten Informationen zugesichert.
Bemerkung:
Die farbliche Syntaxdarstellung und die Messung sind noch experimentell.