import { afterEach, beforeEach, describe, expect, it, vi } from "vitest" ;
import * as sessions from "../config/sessions.js" ;
import * as gateway from "../gateway/call.js" ;
import * as sessionUtils from "../gateway/session-utils.fs.js" ;
import * as announceDelivery from "./subagent-announce-delivery.js" ;
import {
recoverOrphanedSubagentSessions,
scheduleOrphanRecovery,
} from "./subagent-orphan-recovery.js" ;
import * as subagentRegistrySteerRuntime from "./subagent-registry-steer-runtime.js" ;
import type { SubagentRunRecord } from "./subagent-registry.types.js" ;
// Module mocks for every collaborator of the module under test.
// NOTE: although these calls sit textually below the static imports, vitest hoists
// `vi.mock` to the top of the file, so the mocks are registered before
// "./subagent-orphan-recovery.js" is evaluated.

// Config: `loadConfig` returns a config whose `session.store` is undefined.
vi.mock("../config/config.js" , () => ({
loadConfig: vi.fn(() => ({
session: { store: undefined },
})),
}));
// Session store: empty by default; individual tests override `loadSessionStore`.
vi.mock("../config/sessions.js" , () => ({
loadSessionStore: vi.fn(() => ({})),
resolveAgentIdFromSessionKey: vi.fn(() => "main" ),
resolveStorePath: vi.fn(() => "/tmp/test-sessions.json" ),
updateSessionStore: vi.fn(async () => {}),
}));
// Gateway: by default `callGateway` resolves with the run id "test-run-id".
vi.mock("../gateway/call.js" , () => ({
callGateway: vi.fn(async () => ({ runId: "test-run-id" })),
}));
// Session transcript reader: no messages by default.
vi.mock("../gateway/session-utils.fs.js" , () => ({
readSessionMessages: vi.fn(() => []),
}));
// Announcement delivery: always reports a successful "direct" delivery.
vi.mock("./subagent-announce-delivery.js" , () => ({
deliverSubagentAnnouncement: vi.fn(async () => ({ delivered: true , path: "direct" })),
isInternalAnnounceRequesterSession: vi.fn(() => false ),
loadRequesterSessionEntry: vi.fn(() => ({ entry: {} })),
}));
// Announce origin: passes the requester origin through unchanged (the entry is ignored).
vi.mock("./subagent-announce-origin.js" , () => ({
resolveAnnounceOrigin: vi.fn((entry, requesterOrigin) => requesterOrigin),
}));
// Registry steer runtime: run replacement returns true, finalization resolves with 1.
vi.mock("./subagent-registry-steer-runtime.js" , () => ({
replaceSubagentRunAfterSteer: vi.fn(() => true ),
finalizeInterruptedSubagentRun: vi.fn(async () => 1 ),
}));
/**
 * Builds a `SubagentRunRecord` fixture for run "run-1": created 60 s ago and
 * started 55 s ago, relative to the current clock.
 *
 * @param overrides - Fields that replace the defaults; spread last, so they always win.
 * @returns A fresh record object on every call.
 */
function createTestRunRecord(overrides: Partial<SubagentRunRecord> = {}): SubagentRunRecord {
  // Read the clock once so createdAt and startedAt are always exactly 5 s apart.
  const now = Date.now();
  return {
    runId: "run-1",
    childSessionKey: "agent:main:subagent:test-session-1",
    requesterSessionKey: "agent:main:quietchat:direct:+1234567890",
    requesterDisplayKey: "main",
    task: "Test task: implement feature X",
    cleanup: "delete",
    createdAt: now - 60_000,
    startedAt: now - 55_000,
    ...overrides,
  };
}
/**
 * Indexes the given run records by `runId`, producing the map the tests hand
 * back from `getActiveRuns`. A later record with the same `runId` replaces an
 * earlier one.
 */
function createActiveRuns(...runs: SubagentRunRecord[]) {
  const byRunId = new Map<string, SubagentRunRecord>();
  for (const record of runs) {
    byRunId.set(record.runId, record);
  }
  return byRunId;
}
/**
 * Points the mocked `loadSessionStore` at a store that holds exactly one
 * session, "agent:main:subagent:test-session-1", flagged `abortedLastRun: true`.
 *
 * @param overrides - Extra or replacement fields merged over the default entry.
 */
function mockSingleAbortedSession(
  overrides: Partial<NonNullable<ReturnType<typeof sessions.loadSessionStore>[string]>> = {},
) {
  const abortedEntry = {
    sessionId: "session-abc",
    updatedAt: Date.now(),
    abortedLastRun: true,
    ...overrides,
  };
  const loadSessionStoreMock = vi.mocked(sessions.loadSessionStore);
  loadSessionStoreMock.mockReturnValue({
    "agent:main:subagent:test-session-1": abortedEntry,
  });
}
/**
 * Runs recovery for the single default test run against the given session
 * store and asserts that the run was skipped: nothing recovered, one skipped,
 * and the gateway never called.
 *
 * @param store - Value the mocked `loadSessionStore` should return.
 */
async function expectSkippedRecovery(store: ReturnType<typeof sessions.loadSessionStore>) {
  vi.mocked(sessions.loadSessionStore).mockReturnValue(store);

  const { recovered, skipped } = await recoverOrphanedSubagentSessions({
    // Build the map lazily so every lookup sees a fresh default run record.
    getActiveRuns: () => createActiveRuns(createTestRunRecord()),
  });

  expect(recovered).toBe(0);
  expect(skipped).toBe(1);
  expect(gateway.callGateway).not.toHaveBeenCalled();
}
/**
 * Returns the `message` param of the first recorded `callGateway` invocation,
 * failing the test if the gateway was never called.
 */
function getResumeMessage() {
  const [firstCall] = vi.mocked(gateway.callGateway).mock.calls;
  expect(firstCall).toBeDefined();
  const [request] = firstCall;
  const { message } = request.params as Record<string, unknown>;
  return message as string;
}
/**
 * Tests for orphaned-subagent recovery. Every collaborator is replaced by the
 * module mocks declared at the top of the file, so each case only arranges the
 * session store and the active-run map, then asserts on the returned counters
 * and on the recorded mock calls.
 */
describe("subagent-orphan-recovery", () => {
  beforeEach(() => {
    // Fake timers freeze the clock and let the scheduling case drive retries by hand.
    vi.useFakeTimers();
    vi.clearAllMocks();
  });

  afterEach(() => {
    vi.useRealTimers();
    vi.restoreAllMocks();
  });

  it("recovers orphaned sessions with abortedLastRun=true", async () => {
    mockSingleAbortedSession();
    const run = createTestRunRecord();
    const activeRuns = createActiveRuns(run);

    const result = await recoverOrphanedSubagentSessions({
      getActiveRuns: () => activeRuns,
    });

    expect(result.recovered).toBe(1);
    expect(result.failed).toBe(0);
    expect(result.skipped).toBe(0);

    // Should have called callGateway to resume the session
    expect(gateway.callGateway).toHaveBeenCalledOnce();
    const callArgs = vi.mocked(gateway.callGateway).mock.calls[0];
    const opts = callArgs[0];
    expect(opts.method).toBe("agent");
    const params = opts.params as Record<string, unknown>;
    expect(params.sessionKey).toBe("agent:main:subagent:test-session-1");
    expect(params.message).toContain("gateway reload");
    expect(params.message).toContain("Test task: implement feature X");

    // The registry must be told to swap the old run id for the one the gateway returned.
    expect(subagentRegistrySteerRuntime.replaceSubagentRunAfterSteer).toHaveBeenCalledWith(
      expect.objectContaining({
        previousRunId: "run-1",
        nextRunId: "test-run-id",
        fallback: run,
      }),
    );
  });

  it("skips sessions that are not aborted", async () => {
    await expectSkippedRecovery({
      "agent:main:subagent:test-session-1": {
        sessionId: "session-abc",
        updatedAt: Date.now(),
        abortedLastRun: false,
      },
    });
  });

  it("skips runs that have already ended", async () => {
    const activeRuns = createActiveRuns(
      createTestRunRecord({
        endedAt: Date.now() - 1_000,
      }),
    );

    const result = await recoverOrphanedSubagentSessions({
      getActiveRuns: () => activeRuns,
    });

    expect(result.recovered).toBe(0);
    expect(result.skipped).toBe(1);
    expect(gateway.callGateway).not.toHaveBeenCalled();
  });

  it("recovers restart-aborted timeout runs even when the registry marked them ended", async () => {
    mockSingleAbortedSession();
    // Ended run with a "timeout" outcome, while the session is still flagged as aborted.
    const activeRuns = createActiveRuns(
      createTestRunRecord({
        endedAt: Date.now() - 1_000,
        outcome: {
          status: "timeout",
        },
      }),
    );

    const result = await recoverOrphanedSubagentSessions({
      getActiveRuns: () => activeRuns,
    });

    expect(result.recovered).toBe(1);
    expect(result.failed).toBe(0);
    expect(result.skipped).toBe(0);
    expect(gateway.callGateway).toHaveBeenCalledOnce();
  });

  it("handles multiple orphaned sessions", async () => {
    // Sessions a and b are aborted; session c is not and must be skipped.
    vi.mocked(sessions.loadSessionStore).mockReturnValue({
      "agent:main:subagent:session-a": {
        sessionId: "id-a",
        updatedAt: Date.now(),
        abortedLastRun: true,
      },
      "agent:main:subagent:session-b": {
        sessionId: "id-b",
        updatedAt: Date.now(),
        abortedLastRun: true,
      },
      "agent:main:subagent:session-c": {
        sessionId: "id-c",
        updatedAt: Date.now(),
        abortedLastRun: false,
      },
    });
    const activeRuns = createActiveRuns(
      createTestRunRecord({
        runId: "run-a",
        childSessionKey: "agent:main:subagent:session-a",
        task: "Task A",
      }),
      createTestRunRecord({
        runId: "run-b",
        childSessionKey: "agent:main:subagent:session-b",
        task: "Task B",
      }),
      createTestRunRecord({
        runId: "run-c",
        childSessionKey: "agent:main:subagent:session-c",
        task: "Task C",
      }),
    );

    const result = await recoverOrphanedSubagentSessions({
      getActiveRuns: () => activeRuns,
    });

    expect(result.recovered).toBe(2);
    expect(result.skipped).toBe(1);
    expect(gateway.callGateway).toHaveBeenCalledTimes(2);
  });

  it("handles callGateway failure gracefully and preserves abortedLastRun flag", async () => {
    mockSingleAbortedSession();
    vi.mocked(gateway.callGateway).mockRejectedValue(new Error("gateway unavailable"));
    const activeRuns = createActiveRuns(createTestRunRecord());

    const result = await recoverOrphanedSubagentSessions({
      getActiveRuns: () => activeRuns,
    });

    expect(result.recovered).toBe(0);
    expect(result.failed).toBe(1);
    expect(result.failedRuns).toEqual([
      expect.objectContaining({
        runId: "run-1",
        childSessionKey: "agent:main:subagent:test-session-1",
        error: "gateway unavailable",
      }),
    ]);
    // abortedLastRun flag should NOT be cleared on failure,
    // so the next restart can retry the recovery
    expect(sessions.updateSessionStore).not.toHaveBeenCalled();
  });

  it("returns empty results when no active runs exist", async () => {
    const result = await recoverOrphanedSubagentSessions({
      getActiveRuns: () => new Map(),
    });

    expect(result.recovered).toBe(0);
    expect(result.failed).toBe(0);
    expect(result.skipped).toBe(0);
  });

  it("skips sessions with missing session entry in store", async () => {
    await expectSkippedRecovery({});
  });

  it("clears abortedLastRun flag after successful resume", async () => {
    // Ensure callGateway succeeds for this test
    vi.mocked(gateway.callGateway).mockResolvedValue({ runId: "resumed-run" } as never);
    mockSingleAbortedSession();
    const activeRuns = createActiveRuns(createTestRunRecord());

    await recoverOrphanedSubagentSessions({
      getActiveRuns: () => activeRuns,
    });

    // updateSessionStore should have been called AFTER successful resume to clear the flag
    expect(sessions.updateSessionStore).toHaveBeenCalledOnce();
    const calls = vi.mocked(sessions.updateSessionStore).mock.calls;
    const [storePath, updater] = calls[0];
    expect(storePath).toBe("/tmp/test-sessions.json");

    // Simulate the updater to verify it clears abortedLastRun
    const mockStore: Record<string, { abortedLastRun?: boolean; updatedAt?: number }> = {
      "agent:main:subagent:test-session-1": {
        abortedLastRun: true,
        updatedAt: 0,
      },
    };
    (updater as (store: Record<string, unknown>) => void)(mockStore);
    expect(mockStore["agent:main:subagent:test-session-1"]?.abortedLastRun).toBe(false);
  });

  it("truncates long task descriptions in resume message", async () => {
    mockSingleAbortedSession();
    const longTask = "x".repeat(5000);
    const activeRuns = createActiveRuns(createTestRunRecord({ task: longTask }));

    await recoverOrphanedSubagentSessions({
      getActiveRuns: () => activeRuns,
    });

    const message = getResumeMessage();
    // Message should contain truncated task (2000 chars + "...")
    expect(message.length).toBeLessThan(5000);
    expect(message).toContain("...");
  });

  it("includes last human message in resume when available", async () => {
    mockSingleAbortedSession({ sessionFile: "session-abc.jsonl" });
    vi.mocked(sessionUtils.readSessionMessages).mockReturnValue([
      { role: "user", content: [{ type: "text", text: "Please build feature Y" }] },
      { role: "assistant", content: [{ type: "text", text: "Working on it..." }] },
      { role: "user", content: [{ type: "text", text: "Also add tests for it" }] },
      { role: "assistant", content: [{ type: "text", text: "Sure, adding tests now." }] },
    ]);
    const activeRuns = createActiveRuns(createTestRunRecord());

    await recoverOrphanedSubagentSessions({ getActiveRuns: () => activeRuns });

    // Only the most recent user message is expected to be quoted.
    const message = getResumeMessage();
    expect(message).toContain("Also add tests for it");
    expect(message).toContain("last message from the user");
  });

  it("adds config change hint when assistant messages reference config modifications", async () => {
    mockSingleAbortedSession();
    vi.mocked(sessionUtils.readSessionMessages).mockReturnValue([
      { role: "user", content: "Update the config" },
      { role: "assistant", content: "I've modified openclaw.json to add the new setting." },
    ]);
    const activeRuns = createActiveRuns(createTestRunRecord());

    await recoverOrphanedSubagentSessions({ getActiveRuns: () => activeRuns });

    const message = getResumeMessage();
    expect(message).toContain("config changes from your previous run were already applied");
  });

  it("announces recovery-in-progress once when a later retry is attempting resume", async () => {
    mockSingleAbortedSession();
    const activeRuns = createActiveRuns(createTestRunRecord());
    // Shared across both attempts so the second one can see the session was already announced.
    const notifiedRecoverySessionKeys = new Set<string>();

    await recoverOrphanedSubagentSessions({
      getActiveRuns: () => activeRuns,
      attemptNumber: 2,
      maxAttempts: 4,
      notifiedRecoverySessionKeys,
    });

    expect(announceDelivery.deliverSubagentAnnouncement).toHaveBeenCalledOnce();
    expect(announceDelivery.deliverSubagentAnnouncement).toHaveBeenCalledWith(
      expect.objectContaining({
        requesterSessionKey: "agent:main:quietchat:direct:+1234567890",
        triggerMessage: expect.stringContaining("Automatic recovery is already in progress"),
      }),
    );
    expect(notifiedRecoverySessionKeys).toEqual(new Set(["agent:main:subagent:test-session-1"]));

    await recoverOrphanedSubagentSessions({
      getActiveRuns: () => activeRuns,
      attemptNumber: 3,
      maxAttempts: 4,
      notifiedRecoverySessionKeys,
    });

    // Still exactly one announcement after the follow-up attempt.
    expect(announceDelivery.deliverSubagentAnnouncement).toHaveBeenCalledOnce();
  });

  it("prevents duplicate resume when updateSessionStore fails", async () => {
    vi.mocked(gateway.callGateway).mockResolvedValue({ runId: "new-run" } as never);
    vi.mocked(sessions.updateSessionStore).mockRejectedValue(new Error("write failed"));
    mockSingleAbortedSession();
    // Both runs point at the same child session, so only one of them may resume it.
    const activeRuns = createActiveRuns(
      createTestRunRecord(),
      createTestRunRecord({
        runId: "run-2",
      }),
    );

    const result = await recoverOrphanedSubagentSessions({ getActiveRuns: () => activeRuns });

    expect(result.recovered).toBe(1);
    expect(result.skipped).toBe(1);
    expect(gateway.callGateway).toHaveBeenCalledOnce();
  });

  it("does not retry a session after the gateway accepted resume but run remap failed", async () => {
    vi.mocked(gateway.callGateway).mockResolvedValue({ runId: "new-run" } as never);
    vi.mocked(subagentRegistrySteerRuntime.replaceSubagentRunAfterSteer).mockReturnValue(false);
    mockSingleAbortedSession();
    const activeRuns = createActiveRuns(createTestRunRecord());
    // Shared across both passes so the second one knows the session was already resumed.
    const resumedSessionKeys = new Set<string>();

    const first = await recoverOrphanedSubagentSessions({
      getActiveRuns: () => activeRuns,
      resumedSessionKeys,
    });
    const second = await recoverOrphanedSubagentSessions({
      getActiveRuns: () => activeRuns,
      resumedSessionKeys,
    });

    expect(first.recovered).toBe(1);
    expect(first.failed).toBe(0);
    expect(second.recovered).toBe(0);
    expect(second.skipped).toBe(1);
    expect(gateway.callGateway).toHaveBeenCalledOnce();
    expect(sessions.updateSessionStore).toHaveBeenCalledOnce();
  });

  it("finalizes interrupted runs with a readable failure after recovery retries are exhausted", async () => {
    mockSingleAbortedSession();
    vi.mocked(gateway.callGateway).mockRejectedValue(new Error("service restart"));
    const activeRuns = createActiveRuns(createTestRunRecord());

    scheduleOrphanRecovery({
      getActiveRuns: () => activeRuns,
      delayMs: 1,
      maxRetries: 1,
    });

    // Advance the fake clock in two steps, flushing microtasks after each one.
    // Presumably this covers the initial attempt and the single retry, matching
    // the "2 attempts" in the asserted message (retry delay not visible here).
    await vi.advanceTimersByTimeAsync(1);
    await Promise.resolve();
    await vi.advanceTimersByTimeAsync(2);
    await Promise.resolve();

    expect(subagentRegistrySteerRuntime.finalizeInterruptedSubagentRun).toHaveBeenCalledWith(
      expect.objectContaining({
        runId: "run-1",
        childSessionKey: "agent:main:subagent:test-session-1",
        error: expect.stringContaining("Automatic recovery failed after 2 attempts"),
      }),
    );
    // The underlying gateway error must be surfaced in the final failure text.
    expect(subagentRegistrySteerRuntime.finalizeInterruptedSubagentRun).toHaveBeenCalledWith(
      expect.objectContaining({
        error: expect.stringContaining("service restart"),
      }),
    );
  });
});
// Measurement V0.5 in percent: C=98 H=97 G=97
// Processing time: 0.12 seconds (preprocessed on 2026-05-26)
// © Formatika GbR, Germany