Quellcodebibliothek Statistik Leitseite products/Sources/formale Sprachen/JAVA/Openclaw/src/gateway/   (KI Agentensystem Version 22©)  Datei vom 26.3.2026 mit Größe 36 kB image not shown  

Quelle  server-restart-sentinel.test.ts

  Sprache: JAVA
 

Spracherkennung für: .ts vermutete Sprache: Unknown {[0] [0] [0]} [Methode: Schwerpunktbildung, einfache Gewichte, sechs Dimensionen]

import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import type { ChannelPlugin } from "../channels/plugins/types.plugin.js";
import { mergeMockedModule } from "../test-utils/vitest-module-mocks.js";

// Return type of the real `loadSessionEntry`, used to keep the `loadSessionEntry` mock type-compatible.
type LoadedSessionEntry = ReturnType<typeof import("./session-utils.js").loadSessionEntry>;
// First parameter of the real `recordInboundSessionAndDispatchReply`, used to type the mock's argument.
type RecordInboundSessionAndDispatchReplyParams = Parameters<
  typeof import("../plugin-sdk/inbound-reply-dispatch.js").recordInboundSessionAndDispatchReply
>[0];

/**
 * Shared registry of mock functions for this suite.
 *
 * It is created through `vi.hoisted` so that the `vi.mock` factories further down
 * can reference these mocks. Each property stands in for one export of a mocked module.
 */
const mocks = vi.hoisted(() => {
  // Holds the most recent payload handed to `enqueueSessionDelivery`;
  // `drainPendingSessionDeliveries` reads it back to simulate a one-entry queue.
  const state = {
    queuedSessionDelivery: null as Record<string, unknown> | null,
  };

  return {
    resolveSessionAgentId: vi.fn(() => "agent-from-key"),
    // Accessors so tests can inspect or reset the simulated queue entry.
    get queuedSessionDelivery() {
      return state.queuedSessionDelivery;
    },
    set queuedSessionDelivery(value: Record<string, unknown> | null) {
      state.queuedSessionDelivery = value;
    },
    // Default sentinel: a main-session restart routed to a WhatsApp target.
    readRestartSentinel: vi.fn(async () => ({
      payload: {
        sessionKey: "agent:main:main",
        deliveryContext: {
          channel: "whatsapp",
          to: "+15550002",
          accountId: "acct-2",
        },
      },
    })),
    removeRestartSentinelFile: vi.fn(async () => undefined),
    resolveRestartSentinelPath: vi.fn(() => "/tmp/restart-sentinel.json"),
    formatRestartSentinelMessage: vi.fn(() => "restart message"),
    summarizeRestartSentinel: vi.fn(() => "restart summary"),
    resolveMainSessionKeyFromConfig: vi.fn(() => "agent:main:main"),
    // Default: the session key carries no base key and no thread id.
    parseSessionThreadInfo: vi.fn(
      (): { baseSessionKey: string | null | undefined; threadId: string | undefined } => ({
        baseSessionKey: null,
        threadId: undefined,
      }),
    ),
    // Default: a minimal stored entry for the main session.
    loadSessionEntry: vi.fn(
      (): LoadedSessionEntry => ({
        cfg: {},
        entry: {
          sessionId: "agent:main:main",
          updatedAt: 0,
        },
        store: {},
        storePath: "/tmp/sessions.json",
        canonicalKey: "agent:main:main",
        legacyKey: undefined,
      }),
    ),
    // Default: the session contributes no delivery context of its own.
    deliveryContextFromSession: vi.fn(
      ():
        | { channel?: string; to?: string; accountId?: string; threadId?: string | number }
        | undefined => undefined,
    ),
    // `b` is spread first and `a` second, so keys present in `a` win on conflict.
    mergeDeliveryContext: vi.fn((a?: Record<string, unknown>, b?: Record<string, unknown>) => ({
      ...b,
      ...a,
    })),
    getChannelPlugin: vi.fn((): ChannelPlugin | undefined => undefined),
    // Created without an implementation; one is attached in the channels/plugins mock factory.
    normalizeChannelId: vi.fn<(channel?: string | null) => string | null>(),
    // The cast widens the mock's signature so tests can also return the `{ ok: false, error }` variant.
    resolveOutboundTarget: vi.fn(((_params?: { to?: string }) => ({
      ok: true as const,
      to: "+15550002",
    })) as (params?: { to?: string }) => { ok: true; to: string } | { ok: false; error: Error }),
    deliverOutboundPayloads: vi.fn(async () => [{ channel: "whatsapp", messageId: "msg-1" }]),
    enqueueDelivery: vi.fn(async () => "queue-1"),
    ackDelivery: vi.fn(async () => {}),
    failDelivery: vi.fn(async () => {}),
    enqueueSystemEvent: vi.fn(),
    requestHeartbeatNow: vi.fn(),
    // Records the payload as the single simulated queue entry and returns a fixed id.
    enqueueSessionDelivery: vi.fn(async (payload: Record<string, unknown>) => {
      state.queuedSessionDelivery = payload;
      return "session-delivery-1";
    }),
    // Simulated drain over the single recorded entry: nothing happens when no payload is
    // queued or when `selectEntry` reports no match; otherwise `deliver` is invoked and any
    // error it throws is reported through `params.log.warn` rather than rethrown.
    drainPendingSessionDeliveries: vi.fn(
      async (params: {
        logLabel: string;
        log: { warn: (message: string) => void };
        selectEntry: (entry: Record<string, unknown>, now: number) => { match: boolean };
        deliver: (entry: Record<string, unknown>) => Promise<void>;
      }) => {
        if (!state.queuedSessionDelivery) {
          return;
        }
        // Queue metadata defaults come first so fields on the queued payload override them.
        const entry = {
          id: "session-delivery-1",
          enqueuedAt: 1,
          retryCount: 0,
          ...state.queuedSessionDelivery,
        };
        if (!params.selectEntry(entry, Date.now()).match) {
          return;
        }
        try {
          await params.deliver(entry);
        } catch (err) {
          params.log.warn(`${params.logLabel}: retry failed for entry ${entry.id}: ${String(err)}`);
        }
      },
    ),
    // Default: recovery finds nothing to do.
    recoverPendingSessionDeliveries: vi.fn(async () => ({
      recovered: 0,
      failed: 0,
      skippedMaxRetries: 0,
      deferredBackoff: 0,
    })),
    // Prefixes the message so tests can tell stamped text from the raw body.
    injectTimestamp: vi.fn((message: string) => `stamped:${message}`),
    timestampOptsFromConfig: vi.fn(() => ({})),
    recordInboundSessionAndDispatchReply: vi.fn(
      async (_params: RecordInboundSessionAndDispatchReplyParams) => {},
    ),
    // Shared by every logger produced by the mocked `createSubsystemLogger`.
    logInfo: vi.fn(),
    logWarn: vi.fn(),
    logError: vi.fn(),
  };
});

// Module mocks: each collaborator of the module under test is replaced with the
// corresponding entries from the shared `mocks` registry.
vi.mock("../agents/agent-scope.js", () => ({
  resolveSessionAgentId: mocks.resolveSessionAgentId,
}));

vi.mock("../infra/restart-sentinel.js", () => ({
  readRestartSentinel: mocks.readRestartSentinel,
  removeRestartSentinelFile: mocks.removeRestartSentinelFile,
  resolveRestartSentinelPath: mocks.resolveRestartSentinelPath,
  formatRestartSentinelMessage: mocks.formatRestartSentinelMessage,
  summarizeRestartSentinel: mocks.summarizeRestartSentinel,
}));

vi.mock("../infra/session-delivery-queue.js", () => ({
  enqueueSessionDelivery: mocks.enqueueSessionDelivery,
  drainPendingSessionDeliveries: mocks.drainPendingSessionDeliveries,
  recoverPendingSessionDeliveries: mocks.recoverPendingSessionDeliveries,
}));

vi.mock("../config/sessions.js", () => ({
  resolveMainSessionKeyFromConfig: mocks.resolveMainSessionKeyFromConfig,
}));

vi.mock("../config/sessions/thread-info.js", () => ({
  parseSessionThreadInfo: mocks.parseSessionThreadInfo,
}));

vi.mock("./session-utils.js", () => ({
  loadSessionEntry: mocks.loadSessionEntry,
}));

vi.mock("../utils/delivery-context.shared.js", () => ({
  deliveryContextFromSession: mocks.deliveryContextFromSession,
  mergeDeliveryContext: mocks.mergeDeliveryContext,
}));

// Partial mock built on the actual module.
// NOTE(review): `mergeMockedModule` presumably overlays the returned overrides on the
// actual exports — confirm in test-utils/vitest-module-mocks.
vi.mock("../channels/plugins/index.js", async () => {
  return await mergeMockedModule(
    await vi.importActual<typeof import("../channels/plugins/index.js")>(
      "../channels/plugins/index.js",
    ),
    (actual) => ({
      getChannelPlugin: mocks.getChannelPlugin,
      // Defer to the real normalizer; when it yields null/undefined, fall back to the
      // trimmed, lower-cased input, or null for empty or non-string input.
      normalizeChannelId: mocks.normalizeChannelId.mockImplementation(
        (channel?: string | null) =>
          actual.normalizeChannelId(channel) ??
          (typeof channel === "string" && channel.trim().length > 0
            ? channel.trim().toLowerCase()
            : null),
      ),
    }),
  );
});

vi.mock("../infra/outbound/targets.js", () => ({
  resolveOutboundTarget: mocks.resolveOutboundTarget,
}));

vi.mock("../infra/outbound/deliver.js", () => ({
  deliverOutboundPayloads: mocks.deliverOutboundPayloads,
}));

vi.mock("../infra/outbound/delivery-queue.js", () => ({
  enqueueDelivery: mocks.enqueueDelivery,
  ackDelivery: mocks.ackDelivery,
  failDelivery: mocks.failDelivery,
}));

vi.mock("../infra/system-events.js", () => ({
  enqueueSystemEvent: mocks.enqueueSystemEvent,
}));

vi.mock("../plugin-sdk/inbound-reply-dispatch.js", () => ({
  recordInboundSessionAndDispatchReply: mocks.recordInboundSessionAndDispatchReply,
}));

// Partial mock: only `requestHeartbeatNow` is overridden on top of the actual module.
vi.mock("../infra/heartbeat-wake.js", async () => {
  return await mergeMockedModule(
    await vi.importActual<typeof import("../infra/heartbeat-wake.js")>(
      "../infra/heartbeat-wake.js",
    ),
    () => ({
      requestHeartbeatNow: mocks.requestHeartbeatNow,
    }),
  );
});

// Every logger created by the module under test writes to the same three log mocks.
vi.mock("../logging/subsystem.js", () => ({
  createSubsystemLogger: vi.fn(() => ({
    info: mocks.logInfo,
    warn: mocks.logWarn,
    error: mocks.logError,
  })),
}));

vi.mock("./server-methods/agent-timestamp.js", () => ({
  injectTimestamp: mocks.injectTimestamp,
  timestampOptsFromConfig: mocks.timestampOptsFromConfig,
}));

// The module under test is loaded via dynamic import, after the mock registrations above.
const { scheduleRestartSentinelWake } = await import("./server-restart-sentinel.js");

describe("scheduleRestartSentinelWake", () => {
  afterEach(() => {
    vi.useRealTimers();
  });

  beforeEach(() => {
    vi.useRealTimers();
    mocks.queuedSessionDelivery = null;
    mocks.readRestartSentinel.mockResolvedValue({
      payload: {
        sessionKey: "agent:main:main",
        deliveryContext: {
          channel: "whatsapp",
          to: "+15550002",
          accountId: "acct-2",
        },
      },
    });
    mocks.parseSessionThreadInfo.mockReset();
    mocks.parseSessionThreadInfo.mockReturnValue({ baseSessionKey: null, threadId: undefined });
    mocks.loadSessionEntry.mockReset();
    mocks.loadSessionEntry.mockReturnValue({
      cfg: {},
      entry: {
        sessionId: "agent:main:main",
        updatedAt: 0,
      },
      store: {},
      storePath: "/tmp/sessions.json",
      canonicalKey: "agent:main:main",
      legacyKey: undefined,
    });
    mocks.deliveryContextFromSession.mockReset();
    mocks.deliveryContextFromSession.mockReturnValue(undefined);
    mocks.getChannelPlugin.mockReset();
    mocks.getChannelPlugin.mockReturnValue(undefined);
    mocks.normalizeChannelId.mockClear();
    mocks.resolveOutboundTarget.mockReset();
    mocks.resolveOutboundTarget.mockReturnValue({ ok: true as const, to: "+15550002" });
    mocks.deliverOutboundPayloads.mockReset();
    mocks.deliverOutboundPayloads.mockResolvedValue([{ channel: "whatsapp", messageId: "msg-1" }]);
    mocks.enqueueDelivery.mockReset();
    mocks.enqueueDelivery.mockResolvedValue("queue-1");
    mocks.ackDelivery.mockClear();
    mocks.failDelivery.mockClear();
    mocks.enqueueSystemEvent.mockClear();
    mocks.requestHeartbeatNow.mockClear();
    mocks.enqueueSessionDelivery.mockClear();
    mocks.drainPendingSessionDeliveries.mockClear();
    mocks.recoverPendingSessionDeliveries.mockClear();
    mocks.removeRestartSentinelFile.mockClear();
    mocks.injectTimestamp.mockClear();
    mocks.timestampOptsFromConfig.mockClear();
    mocks.recordInboundSessionAndDispatchReply.mockReset();
    mocks.recordInboundSessionAndDispatchReply.mockResolvedValue(undefined);
    mocks.logInfo.mockClear();
    mocks.logWarn.mockClear();
    mocks.logError.mockClear();
  });

  it("enqueues the sentinel note and wakes the session even when outbound delivery succeeds", async () => {
    const deps = {} as never;

    await scheduleRestartSentinelWake({ deps });

    expect(mocks.deliverOutboundPayloads).toHaveBeenCalledWith(
      expect.objectContaining({
        channel: "whatsapp",
        to: "+15550002",
        session: { key: "agent:main:main", agentId: "agent-from-key" },
        deps,
        bestEffort: false,
        skipQueue: true,
      }),
    );
    expect(mocks.enqueueDelivery).toHaveBeenCalledWith(
      expect.objectContaining({
        channel: "whatsapp",
        to: "+15550002",
        payloads: [{ text: "restart message" }],
        bestEffort: false,
      }),
    );
    expect(mocks.ackDelivery).toHaveBeenCalledWith("queue-1");
    expect(mocks.failDelivery).not.toHaveBeenCalled();
    expect(mocks.enqueueSystemEvent).toHaveBeenCalledWith(
      "restart message",
      expect.objectContaining({
        sessionKey: "agent:main:main",
      }),
    );
    expect(mocks.requestHeartbeatNow).toHaveBeenCalledWith({
      reason: "wake",
      sessionKey: "agent:main:main",
    });
    expect(mocks.recordInboundSessionAndDispatchReply).not.toHaveBeenCalled();
    expect(mocks.logWarn).not.toHaveBeenCalled();
  });

  it("retries outbound delivery once and logs a warning without dropping the agent wake", async () => {
    vi.useFakeTimers();
    mocks.deliverOutboundPayloads
      .mockRejectedValueOnce(new Error("transport not ready"))
      .mockResolvedValueOnce([{ channel: "whatsapp", messageId: "msg-2" }]);

    const wakePromise = scheduleRestartSentinelWake({ deps: {} as never });
    await Promise.resolve();
    await Promise.resolve();
    await vi.advanceTimersByTimeAsync(1_000);
    await wakePromise;

    expect(mocks.enqueueDelivery).toHaveBeenCalledTimes(1);
    expect(mocks.deliverOutboundPayloads).toHaveBeenCalledTimes(2);
    expect(mocks.deliverOutboundPayloads).toHaveBeenNthCalledWith(
      1,
      expect.objectContaining({
        skipQueue: true,
      }),
    );
    expect(mocks.deliverOutboundPayloads).toHaveBeenNthCalledWith(
      2,
      expect.objectContaining({
        skipQueue: true,
      }),
    );
    expect(mocks.ackDelivery).toHaveBeenCalledWith("queue-1");
    expect(mocks.failDelivery).not.toHaveBeenCalled();
    expect(mocks.enqueueSystemEvent).toHaveBeenCalledTimes(1);
    expect(mocks.requestHeartbeatNow).toHaveBeenCalledTimes(1);
    expect(mocks.logWarn).toHaveBeenCalledWith(
      expect.stringContaining("retrying in 1000ms"),
      expect.objectContaining({
        channel: "whatsapp",
        to: "+15550002",
        sessionKey: "agent:main:main",
        attempt: 1,
        maxAttempts: 45,
      }),
    );
  });

  it("keeps one queued restart notice when outbound retries are exhausted", async () => {
    vi.useFakeTimers();
    mocks.deliverOutboundPayloads.mockRejectedValue(new Error("transport still not ready"));

    const wakePromise = scheduleRestartSentinelWake({ deps: {} as never });
    await Promise.resolve();
    await Promise.resolve();
    for (let attempt = 1; attempt < 45; attempt += 1) {
      await vi.advanceTimersByTimeAsync(1_000);
    }
    await wakePromise;

    expect(mocks.enqueueDelivery).toHaveBeenCalledTimes(1);
    expect(mocks.deliverOutboundPayloads).toHaveBeenCalledTimes(45);
    expect(mocks.ackDelivery).not.toHaveBeenCalled();
    expect(mocks.failDelivery).toHaveBeenCalledWith("queue-1", "transport still not ready");
  });

  it("still dispatches continuation after restart notice retries are exhausted", async () => {
    vi.useFakeTimers();
    mocks.deliverOutboundPayloads.mockRejectedValue(new Error("transport still not ready"));
    mocks.readRestartSentinel.mockResolvedValue({
      payload: {
        sessionKey: "agent:main:main",
        deliveryContext: {
          channel: "whatsapp",
          to: "+15550002",
          accountId: "acct-2",
        },
        ts: 123,
        continuation: {
          kind: "agentTurn",
          message: "continue",
        },
      },
    } as unknown as Awaited<ReturnType<typeof mocks.readRestartSentinel>>);

    const wakePromise = scheduleRestartSentinelWake({ deps: {} as never });
    await Promise.resolve();
    await Promise.resolve();
    for (let attempt = 1; attempt < 45; attempt += 1) {
      await vi.advanceTimersByTimeAsync(1_000);
    }
    await wakePromise;

    expect(mocks.failDelivery).toHaveBeenCalledWith("queue-1", "transport still not ready");
    expect(mocks.recordInboundSessionAndDispatchReply).toHaveBeenCalledTimes(1);
    expect(mocks.recordInboundSessionAndDispatchReply).toHaveBeenCalledWith(
      expect.objectContaining({
        routeSessionKey: "agent:main:main",
        ctxPayload: expect.objectContaining({
          Body: "continue",
        }),
      }),
    );
  });

  it("prefers top-level sentinel threadId for wake routing context", async () => {
    // Legacy or malformed sentinel JSON can still carry a nested threadId.
    mocks.readRestartSentinel.mockResolvedValue({
      payload: {
        sessionKey: "agent:main:main",
        deliveryContext: {
          channel: "whatsapp",
          to: "+15550002",
          accountId: "acct-2",
          threadId: "stale-thread",
        } as never,
        threadId: "fresh-thread",
      },
    } as unknown as Awaited<ReturnType<typeof mocks.readRestartSentinel>>);

    await scheduleRestartSentinelWake({ deps: {} as never });

    expect(mocks.enqueueSystemEvent).toHaveBeenCalledWith(
      "restart message",
      expect.objectContaining({
        sessionKey: "agent:main:main",
        deliveryContext: expect.objectContaining({
          threadId: "fresh-thread",
        }),
      }),
    );
  });

  it("dispatches agentTurn continuation after the restart notice in the same routed thread", async () => {
    mocks.readRestartSentinel.mockResolvedValue({
      payload: {
        sessionKey: "agent:main:main",
        deliveryContext: {
          channel: "whatsapp",
          to: "+15550002",
          accountId: "acct-2",
        },
        threadId: "thread-42",
        ts: 123,
        continuation: {
          kind: "agentTurn",
          message: "Reply with exactly: Yay! I did it!",
        },
      },
    } as Awaited<ReturnType<typeof mocks.readRestartSentinel>>);
    mocks.recordInboundSessionAndDispatchReply.mockImplementationOnce(async (params) => {
      await params.deliver({
        text: "done",
        replyToId: "restart-sentinel:agent:main:main:agentTurn:123",
      });
    });

    await scheduleRestartSentinelWake({ deps: {} as never });

    expect(mocks.enqueueDelivery).toHaveBeenCalledWith(
      expect.objectContaining({
        payloads: [{ text: "restart message" }],
        threadId: "thread-42",
      }),
    );
    expect(mocks.recordInboundSessionAndDispatchReply).toHaveBeenCalledTimes(1);
    expect(mocks.recordInboundSessionAndDispatchReply).toHaveBeenCalledWith(
      expect.objectContaining({
        channel: "whatsapp",
        accountId: "acct-2",
        routeSessionKey: "agent:main:main",
        ctxPayload: expect.objectContaining({
          Body: "Reply with exactly: Yay! I did it!",
          BodyForAgent: "stamped:Reply with exactly: Yay! I did it!",
          BodyForCommands: "",
          CommandBody: "",
          CommandAuthorized: false,
          SessionKey: "agent:main:main",
          Provider: "whatsapp",
          Surface: "whatsapp",
          OriginatingChannel: "whatsapp",
          OriginatingTo: "+15550002",
          MessageThreadId: "thread-42",
        }),
      }),
    );
  });

  it("preserves the session chat type for agentTurn continuations", async () => {
    mocks.readRestartSentinel.mockResolvedValue({
      payload: {
        sessionKey: "agent:main:group",
        deliveryContext: {
          channel: "telegram",
          to: "telegram:-1001",
          accountId: "default",
        },
        ts: 123,
        continuation: {
          kind: "agentTurn",
          message: "continue",
        },
      },
    } as Awaited<ReturnType<typeof mocks.readRestartSentinel>>);
    mocks.loadSessionEntry.mockReturnValue({
      cfg: {},
      entry: {
        sessionId: "agent:main:group",
        updatedAt: 0,
        origin: { provider: "telegram", chatType: "group" },
      },
      store: {},
      storePath: "/tmp/sessions.json",
      canonicalKey: "agent:main:group",
      legacyKey: undefined,
    });
    mocks.resolveOutboundTarget.mockReturnValue({ ok: true as const, to: "telegram:-1001" });

    await scheduleRestartSentinelWake({ deps: {} as never });

    expect(mocks.recordInboundSessionAndDispatchReply).toHaveBeenCalledWith(
      expect.objectContaining({
        channel: "telegram",
        routeSessionKey: "agent:main:group",
        ctxPayload: expect.objectContaining({
          ChatType: "group",
          OriginatingChannel: "telegram",
          OriginatingTo: "telegram:-1001",
        }),
      }),
    );
  });

  it("preserves derived reply transport ids in continuation context", async () => {
    mocks.getChannelPlugin.mockReturnValue({
      id: "whatsapp",
      meta: {
        id: "whatsapp",
        label: "WhatsApp",
        selectionLabel: "WhatsApp",
        docsPath: "/channels/whatsapp",
        blurb: "WhatsApp",
      },
      capabilities: { chatTypes: ["direct"] },
      config: {
        listAccountIds: () => [],
        resolveAccount: () => ({}),
      },
      threading: {
        resolveReplyTransport: ({ threadId }: { threadId?: string | number | null }) => ({
          replyToId: threadId != null ? `reply:${String(threadId)}` : undefined,
          threadId: null,
        }),
      },
    });
    mocks.readRestartSentinel.mockResolvedValue({
      payload: {
        sessionKey: "agent:main:main",
        deliveryContext: {
          channel: "whatsapp",
          to: "+15550002",
          accountId: "acct-2",
        },
        threadId: "thread-42",
        ts: 123,
        continuation: {
          kind: "agentTurn",
          message: "continue",
        },
      },
    } as Awaited<ReturnType<typeof mocks.readRestartSentinel>>);
    mocks.recordInboundSessionAndDispatchReply.mockImplementationOnce(async (params) => {
      await params.deliver({
        text: "done",
        replyToId: "restart-sentinel:agent:main:main:agentTurn:123",
      });
    });

    await scheduleRestartSentinelWake({ deps: {} as never });

    expect(mocks.recordInboundSessionAndDispatchReply).toHaveBeenCalledWith(
      expect.objectContaining({
        ctxPayload: expect.objectContaining({
          ReplyToId: "reply:thread-42",
          MessageThreadId: undefined,
        }),
      }),
    );
    expect(mocks.deliverOutboundPayloads).toHaveBeenLastCalledWith(
      expect.objectContaining({
        payloads: [
          {
            text: "done",
            replyToId: "reply:thread-42",
          },
        ],
      }),
    );
  });

  it("strips synthetic reply transport ids when no real reply target exists", async () => {
    mocks.readRestartSentinel.mockResolvedValue({
      payload: {
        sessionKey: "agent:main:main",
        deliveryContext: {
          channel: "whatsapp",
          to: "+15550002",
          accountId: "acct-2",
        },
        ts: 123,
        continuation: {
          kind: "agentTurn",
          message: "continue",
        },
      },
    } as Awaited<ReturnType<typeof mocks.readRestartSentinel>>);
    mocks.recordInboundSessionAndDispatchReply.mockImplementationOnce(async (params) => {
      await params.deliver({
        text: "done",
        replyToId: "restart-sentinel:agent:main:main:agentTurn:123",
      });
    });

    await scheduleRestartSentinelWake({ deps: {} as never });

    expect(mocks.deliverOutboundPayloads).toHaveBeenLastCalledWith(
      expect.objectContaining({
        payloads: [{ text: "done" }],
      }),
    );
  });

  it("preserves non-synthetic reply transport ids from continuation payloads", async () => {
    mocks.readRestartSentinel.mockResolvedValue({
      payload: {
        sessionKey: "agent:main:main",
        deliveryContext: {
          channel: "whatsapp",
          to: "+15550002",
          accountId: "acct-2",
        },
        ts: 123,
        continuation: {
          kind: "agentTurn",
          message: "continue",
        },
      },
    } as Awaited<ReturnType<typeof mocks.readRestartSentinel>>);
    mocks.recordInboundSessionAndDispatchReply.mockImplementationOnce(async (params) => {
      await params.deliver({
        text: "done",
        replyToId: "provider-reply-id",
      });
    });

    await scheduleRestartSentinelWake({ deps: {} as never });

    expect(mocks.deliverOutboundPayloads).toHaveBeenLastCalledWith(
      expect.objectContaining({
        payloads: [
          {
            text: "done",
            replyToId: "provider-reply-id",
          },
        ],
      }),
    );
  });

  it("dispatches agentTurn continuation from session delivery context when sentinel routing is empty", async () => {
    mocks.readRestartSentinel.mockResolvedValue({
      payload: {
        sessionKey: "agent:main:main",
        ts: 123,
        continuation: {
          kind: "agentTurn",
          message: "continue",
        },
      },
    } as unknown as Awaited<ReturnType<typeof mocks.readRestartSentinel>>);
    mocks.deliveryContextFromSession.mockReturnValue({
      channel: "telegram",
      to: "telegram:200482621",
      accountId: "default",
    });
    mocks.resolveOutboundTarget.mockReturnValue({
      ok: true as const,
      to: "telegram:200482621",
    });

    await scheduleRestartSentinelWake({ deps: {} as never });

    expect(mocks.recordInboundSessionAndDispatchReply).toHaveBeenCalledWith(
      expect.objectContaining({
        channel: "telegram",
        accountId: "default",
        ctxPayload: expect.objectContaining({
          Body: "continue",
          OriginatingChannel: "telegram",
          OriginatingTo: "telegram:200482621",
        }),
      }),
    );
  });

  it("requests another wake after enqueueing a systemEvent continuation", async () => {
    mocks.readRestartSentinel.mockResolvedValue({
      payload: {
        sessionKey: "agent:main:main",
        deliveryContext: {
          channel: "whatsapp",
          to: "+15550002",
          accountId: "acct-2",
        },
        threadId: "thread-42",
        ts: 123,
        continuation: {
          kind: "systemEvent",
          text: "continue after restart",
        },
      },
    } as Awaited<ReturnType<typeof mocks.readRestartSentinel>>);

    await scheduleRestartSentinelWake({ deps: {} as never });

    expect(mocks.enqueueSystemEvent).toHaveBeenNthCalledWith(
      2,
      "continue after restart",
      expect.objectContaining({
        sessionKey: "agent:main:main",
        deliveryContext: expect.objectContaining({
          channel: "whatsapp",
          to: "+15550002",
          accountId: "acct-2",
          threadId: "thread-42",
        }),
      }),
    );
    expect(mocks.requestHeartbeatNow).toHaveBeenNthCalledWith(1, {
      reason: "wake",
      sessionKey: "agent:main:main",
    });
    expect(mocks.requestHeartbeatNow).toHaveBeenNthCalledWith(2, {
      reason: "wake",
      sessionKey: "agent:main:main",
    });
  });

  it("enqueues systemEvent continuation without stale partial delivery context", async () => {
    mocks.readRestartSentinel.mockResolvedValue({
      payload: {
        sessionKey: "agent:main:main",
        deliveryContext: {
          channel: "whatsapp",
          to: "+15550002",
          accountId: "acct-2",
        },
        threadId: "thread-42",
        ts: 123,
        continuation: {
          kind: "systemEvent",
          text: "continue after restart",
        },
      },
    } as Awaited<ReturnType<typeof mocks.readRestartSentinel>>);
    mocks.resolveOutboundTarget.mockReturnValueOnce({
      ok: false,
      error: new Error("missing route"),
    });

    await scheduleRestartSentinelWake({ deps: {} as never });

    expect(mocks.enqueueSystemEvent).toHaveBeenNthCalledWith(
      2,
      "continue after restart",
      expect.objectContaining({
        sessionKey: "agent:main:main",
        deliveryContext: expect.objectContaining({
          channel: "whatsapp",
          to: "+15550002",
          accountId: "acct-2",
          threadId: "thread-42",
        }),
      }),
    );
  });

  it("logs and continues when continuation delivery fails", async () => {
    mocks.readRestartSentinel.mockResolvedValue({
      payload: {
        sessionKey: "agent:main:main",
        deliveryContext: {
          channel: "whatsapp",
          to: "+15550002",
          accountId: "acct-2",
        },
        ts: 123,
        continuation: {
          kind: "agentTurn",
          message: "continue",
        },
      },
    } as Awaited<ReturnType<typeof mocks.readRestartSentinel>>);
    mocks.recordInboundSessionAndDispatchReply.mockRejectedValueOnce(new Error("dispatch failed"));

    await scheduleRestartSentinelWake({ deps: {} as never });

    expect(mocks.enqueueSystemEvent).toHaveBeenCalledWith(
      "restart message",
      expect.objectContaining({
        sessionKey: "agent:main:main",
      }),
    );
    expect(mocks.logWarn).toHaveBeenCalledWith(
      expect.stringContaining("retry failed for entry session-delivery-1: Error: dispatch failed"),
    );
  });

  it("logs and continues when continuation dispatch reports a delivery error", async () => {
    mocks.readRestartSentinel.mockResolvedValue({
      payload: {
        sessionKey: "agent:main:main",
        deliveryContext: {
          channel: "whatsapp",
          to: "+15550002",
          accountId: "acct-2",
        },
        ts: 123,
        continuation: {
          kind: "agentTurn",
          message: "continue",
        },
      },
    } as Awaited<ReturnType<typeof mocks.readRestartSentinel>>);
    mocks.recordInboundSessionAndDispatchReply.mockImplementationOnce(
      async (params: { onDispatchError: (err: unknown, info: { kind: string }) => void }) => {
        params.onDispatchError(new Error("route failed"), { kind: "final" });
      },
    );

    await scheduleRestartSentinelWake({ deps: {} as never });

    expect(mocks.logWarn).toHaveBeenCalledWith(
      expect.stringContaining("retry failed for entry session-delivery-1: Error: route failed"),
    );
  });

  it("falls back to a session wake when restart routing cannot resolve a destination", async () => {
    mocks.readRestartSentinel.mockResolvedValue({
      payload: {
        sessionKey: "agent:main:main",
        deliveryContext: {
          channel: "whatsapp",
          to: "+15550002",
          accountId: "acct-2",
        },
        ts: 123,
        continuation: {
          kind: "agentTurn",
          message: "continue",
        },
      },
    } as Awaited<ReturnType<typeof mocks.readRestartSentinel>>);
    mocks.resolveOutboundTarget.mockReturnValueOnce({
      ok: false,
      error: new Error("missing route"),
    });

    await scheduleRestartSentinelWake({ deps: {} as never });

    expect(mocks.recordInboundSessionAndDispatchReply).not.toHaveBeenCalled();
    expect(mocks.enqueueSystemEvent).toHaveBeenNthCalledWith(
      2,
      "continue",
      expect.objectContaining({
        sessionKey: "agent:main:main",
      }),
    );
    expect(mocks.requestHeartbeatNow).toHaveBeenCalledTimes(2);
    expect(mocks.logWarn).not.toHaveBeenCalled();
  });

  it("keeps the sentinel file when durable continuation handoff fails", async () => {
    mocks.readRestartSentinel.mockResolvedValue({
      payload: {
        sessionKey: "agent:main:main",
        deliveryContext: {
          channel: "whatsapp",
          to: "+15550002",
          accountId: "acct-2",
        },
        ts: 123,
        continuation: {
          kind: "agentTurn",
          message: "continue",
        },
      },
    } as Awaited<ReturnType<typeof mocks.readRestartSentinel>>);
    mocks.enqueueSessionDelivery.mockRejectedValueOnce(new Error("queue write failed"));

    await scheduleRestartSentinelWake({ deps: {} as never });

    expect(mocks.removeRestartSentinelFile).not.toHaveBeenCalled();
    expect(mocks.drainPendingSessionDeliveries).not.toHaveBeenCalled();
    expect(mocks.logWarn).toHaveBeenCalledWith(
      "startup task failed",
      expect.objectContaining({
        source: "restart-sentinel",
        sessionKey: "agent:main:main",
        reason: "queue write failed",
      }),
    );
  });

  it("consumes continuation once and does not replay it on later startup cycles", async () => {
    mocks.readRestartSentinel
      .mockResolvedValueOnce({
        payload: {
          sessionKey: "agent:main:main",
          deliveryContext: {
            channel: "whatsapp",
            to: "+15550002",
            accountId: "acct-2",
          },
          ts: 123,
          continuation: {
            kind: "agentTurn",
            message: "continue",
          },
        },
      } as Awaited<ReturnType<typeof mocks.readRestartSentinel>>)
      .mockResolvedValueOnce(
        null as unknown as Awaited<ReturnType<typeof mocks.readRestartSentinel>>,
      );

    await scheduleRestartSentinelWake({ deps: {} as never });
    await scheduleRestartSentinelWake({ deps: {} as never });

    expect(mocks.recordInboundSessionAndDispatchReply).toHaveBeenCalledTimes(1);
  });

  it("does not wake the main session when the sentinel has no sessionKey", async () => {
    mocks.readRestartSentinel.mockResolvedValue({
      payload: {
        message: "restart message",
      },
    } as unknown as Awaited<ReturnType<typeof mocks.readRestartSentinel>>);

    await scheduleRestartSentinelWake({ deps: {} as never });

    expect(mocks.enqueueSystemEvent).toHaveBeenCalledWith("restart message", {
      sessionKey: "agent:main:main",
    });
    expect(mocks.requestHeartbeatNow).not.toHaveBeenCalled();
    expect(mocks.deliverOutboundPayloads).not.toHaveBeenCalled();
  });

  it("warns when continuation cannot run because the restart sentinel has no sessionKey", async () => {
    mocks.readRestartSentinel.mockResolvedValue({
      payload: {
        message: "restart message",
        continuation: {
          kind: "agentTurn",
          message: "continue",
        },
      },
    } as unknown as Awaited<ReturnType<typeof mocks.readRestartSentinel>>);

    await scheduleRestartSentinelWake({ deps: {} as never });

    expect(mocks.enqueueSystemEvent).toHaveBeenCalledWith("restart message", {
      sessionKey: "agent:main:main",
    });
    expect(mocks.recordInboundSessionAndDispatchReply).not.toHaveBeenCalled();
    expect(mocks.logWarn).toHaveBeenCalledWith(
      expect.stringContaining("continuation skipped"),
      expect.objectContaining({
        sessionKey: "agent:main:main",
        continuationKind: "agentTurn",
      }),
    );
  });
  it("skips outbound restart notice when no canonical delivery context survives restart", async () => {
    mocks.readRestartSentinel.mockResolvedValue({
      payload: {
        sessionKey: "agent:main:matrix:channel:!lowercased:example.org",
      },
    } as Awaited<ReturnType<typeof mocks.readRestartSentinel>>);
    mocks.parseSessionThreadInfo.mockReturnValue({
      baseSessionKey: "agent:main:matrix:channel:!lowercased:example.org",
      threadId: undefined,
    });
    mocks.deliveryContextFromSession.mockReturnValue(undefined);

    await scheduleRestartSentinelWake({ deps: {} as never });

    expect(mocks.enqueueSystemEvent).toHaveBeenCalledWith(
      "restart message",
      expect.objectContaining({
        sessionKey: "agent:main:matrix:channel:!lowercased:example.org",
      }),
    );
    expect(mocks.deliverOutboundPayloads).not.toHaveBeenCalled();
    expect(mocks.enqueueDelivery).not.toHaveBeenCalled();
    expect(mocks.resolveOutboundTarget).not.toHaveBeenCalled();
  });

  it("resolves session routing before queueing the heartbeat wake", async () => {
    mocks.readRestartSentinel.mockResolvedValue({
      payload: {
        sessionKey: "agent:main:qa-channel:channel:qa-room",
      },
    } as Awaited<ReturnType<typeof mocks.readRestartSentinel>>);
    mocks.parseSessionThreadInfo.mockReturnValue({
      baseSessionKey: "agent:main:qa-channel:channel:qa-room",
      threadId: undefined,
    });
    mocks.deliveryContextFromSession.mockReturnValue({
      channel: "qa-channel",
      to: "channel:qa-room",
    });
    mocks.requestHeartbeatNow.mockImplementation(() => {
      mocks.deliveryContextFromSession.mockReturnValue({
        channel: "qa-channel",
        to: "heartbeat",
      });
    });
    mocks.resolveOutboundTarget.mockImplementation((params?: { to?: string }) => ({
      ok: true as const,
      to: params?.to ?? "missing",
    }));

    await scheduleRestartSentinelWake({ deps: {} as never });

    expect(mocks.requestHeartbeatNow).toHaveBeenCalledTimes(1);
    expect(mocks.resolveOutboundTarget).toHaveBeenCalledWith(
      expect.objectContaining({
        channel: "qa-channel",
        to: "channel:qa-room",
      }),
    );
    expect(mocks.deliverOutboundPayloads).toHaveBeenCalledWith(
      expect.objectContaining({
        channel: "qa-channel",
        to: "channel:qa-room",
      }),
    );
  });

  it("merges base session routing into partial thread metadata", async () => {
    mocks.readRestartSentinel.mockResolvedValue({
      payload: {
        sessionKey: "agent:main:matrix:channel:!lowercased:example.org:thread:$thread-event",
      },
    } as Awaited<ReturnType<typeof mocks.readRestartSentinel>>);
    mocks.parseSessionThreadInfo.mockReturnValue({
      baseSessionKey: "agent:main:matrix:channel:!lowercased:example.org",
      threadId: "$thread-event",
    });
    mocks.loadSessionEntry
      .mockReturnValueOnce({
        cfg: {},
        entry: {
          sessionId: "agent:main:matrix:channel:!lowercased:example.org:thread:$thread-event",
          updatedAt: 0,
          origin: { provider: "matrix", accountId: "acct-thread", threadId: "$thread-event" },
        },
        store: {},
        storePath: "/tmp/sessions.json",
        canonicalKey: "agent:main:matrix:channel:!lowercased:example.org:thread:$thread-event",
        legacyKey: undefined,
      })
      .mockReturnValueOnce({
        cfg: {},
        entry: {
          sessionId: "agent:main:matrix:channel:!lowercased:example.org",
          updatedAt: 0,
          lastChannel: "matrix",
          lastTo: "room:!MixedCase:example.org",
        },
        store: {},
        storePath: "/tmp/sessions.json",
        canonicalKey: "agent:main:matrix:channel:!lowercased:example.org",
        legacyKey: undefined,
      });
    mocks.deliveryContextFromSession
      .mockReturnValueOnce({
        channel: "matrix",
        accountId: "acct-thread",
        threadId: "$thread-event",
      })
      .mockReturnValueOnce({ channel: "matrix", to: "room:!MixedCase:example.org" });
    mocks.resolveOutboundTarget.mockReturnValue({
      ok: true as const,
      to: "room:!MixedCase:example.org",
    });

    await scheduleRestartSentinelWake({ deps: {} as never });

    expect(mocks.resolveOutboundTarget).toHaveBeenCalledWith(
      expect.objectContaining({
        channel: "matrix",
        to: "room:!MixedCase:example.org",
        accountId: "acct-thread",
      }),
    );
    expect(mocks.deliverOutboundPayloads).toHaveBeenCalledWith(
      expect.objectContaining({
        channel: "matrix",
        to: "room:!MixedCase:example.org",
        accountId: "acct-thread",
        threadId: "$thread-event",
      }),
    );
  });
});

¤ Dauer der Verarbeitung: 0.26 Sekunden  (vorverarbeitet am  2026-04-27) ¤

*© Formatika GbR, Deutschland






Wurzel

Suchen

Beweissystem der NASA

Beweissystem Isabelle

NIST Cobol Testsuite

Cephes Mathematical Library

Wiener Entwicklungsmethode

Haftungshinweis

Die Informationen auf dieser Webseite wurden nach bestem Wissen sorgfältig zusammengestellt. Es wird jedoch weder Vollständigkeit noch Richtigkeit noch Qualität der bereitgestellten Informationen zugesichert.

Bemerkung:

Die farbliche Syntaxdarstellung und die Messung sind noch experimentell.