Anforderungen  |   Konzepte  |   Entwurf  |   Entwicklung  |   Qualitätssicherung  |   Lebenszyklus  |   Steuerung
 
 
 
 


Quelle  server.cron.test.ts

  Sprache: JAVA
 

Spracherkennung für: .ts vermutete Sprache: Unknown {[0] [0] [0]} [Methode: Schwerpunktbildung, einfache Gewichte, sechs Dimensionen]

import fs from "node:fs/promises";
import os from "node:os";
import path from "node:path";
import { setImmediate as setImmediatePromise } from "node:timers/promises";
import { afterAll, beforeEach, describe, expect, test, vi } from "vitest";
import type WebSocket from "ws";
import type { GuardedFetchOptions } from "../infra/net/fetch-guard.js";
import {
  connectOk,
  cronIsolatedRun,
  installGatewayTestHooks,
  onceMessage,
  rpcReq,
  startServerWithClient,
  testState,
  waitForSystemEvent,
} from "./test-helpers.js";

const fetchWithSsrFGuardMock = vi.hoisted(() =>
  vi.fn(async (params: GuardedFetchOptions) => ({
    response: new Response("ok", { status: 200 }),
    finalUrl: params.url,
    release: async () => {},
  })),
);

const sendFailureNotificationAnnounceMock = vi.hoisted(() => vi.fn(async () => undefined));
const closeTrackedBrowserTabsForSessionsMock = vi.hoisted(() => vi.fn(async () => 0));

vi.mock("../infra/net/fetch-guard.js", () => ({
  fetchWithSsrFGuard: (...args: unknown[]) =>
    (
      fetchWithSsrFGuardMock as unknown as (...innerArgs: unknown[]) => Promise<{
        response: Response;
        finalUrl: string;
        release: () => Promise<void>;
      }>
    )(...args),
}));

vi.mock("../cron/delivery.js", async () => {
  const actual = await vi.importActual<typeof import("../cron/delivery.js")>("../cron/delivery.js");
  return {
    ...actual,
    sendFailureNotificationAnnounce: (...args: unknown[]) =>
      (
        sendFailureNotificationAnnounceMock as unknown as (...innerArgs: unknown[]) => Promise<void>
      )(...args),
  };
});

vi.mock("../plugin-sdk/browser-maintenance.js", () => ({
  closeTrackedBrowserTabsForSessions: closeTrackedBrowserTabsForSessionsMock,
}));

installGatewayTestHooks({ scope: "suite" });
const CRON_WAIT_TIMEOUT_MS = 3_000;
const EMPTY_CRON_STORE_CONTENT = JSON.stringify({ version: 1, jobs: [] });
let cronSuiteTempRootPromise: Promise<string> | null = null;
let cronSuiteCaseId = 0;

async function getCronSuiteTempRoot(): Promise<string> {
  if (!cronSuiteTempRootPromise) {
    cronSuiteTempRootPromise = fs.mkdtemp(path.join(os.tmpdir(), "openclaw-gw-cron-suite-"));
  }
  return await cronSuiteTempRootPromise;
}

async function yieldToEventLoop() {
  await setImmediatePromise();
}

async function rmTempDir(dir: string) {
  for (let i = 0; i < 100; i += 1) {
    try {
      await fs.rm(dir, { recursive: true, force: true });
      return;
    } catch (err) {
      const code = (err as { code?: unknown } | null)?.code;
      if (code === "ENOTEMPTY" || code === "EBUSY" || code === "EPERM" || code === "EACCES") {
        await yieldToEventLoop();
        continue;
      }
      throw err;
    }
  }
  await fs.rm(dir, { recursive: true, force: true });
}

async function waitForCronEvent(
  ws: WebSocket,
  check: (payload: Record<string, unknown> | null) => boolean,
  timeoutMs = CRON_WAIT_TIMEOUT_MS,
) {
  const message = await onceMessage(
    ws,
    (obj) => {
      const payload = obj.payload ?? null;
      return obj.type === "event" && obj.event === "cron" && check(payload);
    },
    timeoutMs,
  );
  return message.payload ?? null;
}

async function createCronCasePaths(tempPrefix: string): Promise<{
  dir: string;
  storePath: string;
}> {
  const suiteRoot = await getCronSuiteTempRoot();
  const dir = path.join(suiteRoot, `${tempPrefix}${cronSuiteCaseId++}`);
  const storePath = path.join(dir, "cron", "jobs.json");
  await fs.mkdir(path.dirname(storePath), { recursive: true });
  return { dir, storePath };
}

async function cleanupCronTestRun(params: {
  ws?: { close: () => void };
  server?: { close: () => Promise<void> };
  cronState?: DirectCronState;
  prevSkipCron: string | undefined;
  clearSessionConfig?: boolean;
}) {
  params.ws?.close();
  await params.server?.close();
  params.cronState?.cron.stop();
  testState.cronStorePath = undefined;
  if (params.clearSessionConfig) {
    testState.sessionConfig = undefined;
  }
  testState.cronEnabled = undefined;
  if (params.prevSkipCron === undefined) {
    delete process.env.OPENCLAW_SKIP_CRON;
    return;
  }
  process.env.OPENCLAW_SKIP_CRON = params.prevSkipCron;
}

async function setupCronTestRun(params: {
  tempPrefix: string;
  cronEnabled?: boolean;
  sessionConfig?: { mainKey: string };
  jobs?: unknown[];
}): Promise<{ prevSkipCron: string | undefined; dir: string }> {
  const prevSkipCron = process.env.OPENCLAW_SKIP_CRON;
  process.env.OPENCLAW_SKIP_CRON = "0";
  const { dir, storePath } = await createCronCasePaths(params.tempPrefix);
  testState.cronStorePath = storePath;
  testState.sessionConfig = params.sessionConfig;
  testState.cronEnabled = params.cronEnabled;
  await fs.writeFile(
    testState.cronStorePath,
    params.jobs ? JSON.stringify({ version: 1, jobs: params.jobs }) : EMPTY_CRON_STORE_CONTENT,
  );
  return { prevSkipCron, dir };
}

type DirectCronState = {
  cron: { stop: () => void };
  storePath: string;
};

async function createDirectCronState(): Promise<DirectCronState> {
  const [{ loadConfig }, { buildGatewayCronService }] = await Promise.all([
    import("../config/config.js"),
    import("./server-cron.js"),
  ]);
  return buildGatewayCronService({
    cfg: loadConfig(),
    deps: {} as never,
    broadcast: vi.fn(),
  });
}

async function directCronReq(
  cronState: DirectCronState,
  method: string,
  params: Record<string, unknown>,
): Promise<{ ok: boolean; payload?: unknown; error?: { code?: string; message?: string } }> {
  const { cronHandlers } = await import("./server-methods/cron.js");
  let result:
    | { ok: boolean; payload?: unknown; error?: { code?: string; message?: string } }
    | undefined;
  await cronHandlers[method]({
    req: {} as never,
    params,
    respond: (ok, payload, error) => {
      result = {
        ok,
        payload,
        error,
      };
    },
    context: {
      cron: cronState.cron,
      cronStorePath: cronState.storePath,
      logGateway: {
        info: vi.fn(),
        warn: vi.fn(),
        error: vi.fn(),
      },
    } as never,
    client: null,
    isWebchatConnect: () => false,
  });
  if (!result) {
    throw new Error(`${method} did not respond`);
  }
  return result;
}

function expectCronJobIdFromResponse(response: { ok?: unknown; payload?: unknown }) {
  expect(response.ok).toBe(true);
  const value = (response.payload as { id?: unknown } | null)?.id;
  const id = typeof value === "string" ? value : "";
  expect(id.length > 0).toBe(true);
  return id;
}

async function addMainSystemEventCronJob(params: { ws: WebSocket; name: string; text?: string }) {
  const response = await rpcReq(params.ws, "cron.add", {
    name: params.name,
    enabled: true,
    schedule: { kind: "every", everyMs: 60_000 },
    sessionTarget: "main",
    wakeMode: "next-heartbeat",
    payload: { kind: "systemEvent", text: params.text ?? "hello" },
  });
  return expectCronJobIdFromResponse(response);
}

async function addWebhookCronJob(params: {
  ws: WebSocket;
  name: string;
  sessionTarget?: "main" | "isolated";
  payloadText?: string;
  delivery: Record<string, unknown>;
}) {
  const response = await rpcReq(params.ws, "cron.add", {
    name: params.name,
    enabled: true,
    schedule: { kind: "every", everyMs: 60_000 },
    sessionTarget: params.sessionTarget ?? "main",
    wakeMode: "next-heartbeat",
    payload: {
      kind: params.sessionTarget === "isolated" ? "agentTurn" : "systemEvent",
      ...(params.sessionTarget === "isolated"
        ? { message: params.payloadText ?? "test" }
        : { text: params.payloadText ?? "send webhook" }),
    },
    delivery: params.delivery,
  });
  return expectCronJobIdFromResponse(response);
}

async function writeCronConfig(config: unknown) {
  const configPath = process.env.OPENCLAW_CONFIG_PATH;
  expect(typeof configPath).toBe("string");
  await fs.mkdir(path.dirname(configPath as string), { recursive: true });
  await fs.writeFile(configPath as string, JSON.stringify(config, null, 2), "utf-8");
}

async function runCronJobForce(ws: WebSocket, id: string) {
  const response = await rpcReq(ws, "cron.run", { id, mode: "force" }, 20_000);
  expect(response.ok).toBe(true);
  expect(response.payload).toEqual({ ok: true, enqueued: true, runId: expect.any(String) });
  return response;
}

async function runCronJobAndWaitForFinished(ws: WebSocket, jobId: string) {
  const finished = waitForCronEvent(
    ws,
    (payload) => payload?.jobId === jobId && payload?.action === "finished",
  );
  await runCronJobForce(ws, jobId);
  await finished;
}

function getWebhookCall(index: number) {
  const [args] = fetchWithSsrFGuardMock.mock.calls[index] as unknown as [
    {
      url?: string;
      init?: {
        method?: string;
        headers?: Record<string, string>;
        body?: string;
      };
    },
  ];
  const url = args.url ?? "";
  const init = args.init ?? {};
  const body = JSON.parse(init.body ?? "{}") as Record<string, unknown>;
  return { url, init, body };
}

describe("gateway server cron", () => {
  afterAll(async () => {
    if (!cronSuiteTempRootPromise) {
      return;
    }
    await rmTempDir(await cronSuiteTempRootPromise);
    cronSuiteTempRootPromise = null;
    cronSuiteCaseId = 0;
  });

  beforeEach(() => {
    // Keep polling helpers deterministic even if other tests left fake timers enabled.
    vi.useRealTimers();
    sendFailureNotificationAnnounceMock.mockClear();
    closeTrackedBrowserTabsForSessionsMock.mockClear();
  });

  test("handles cron CRUD, normalization, and patch semantics", { timeout: 45_000 }, async () => {
    const { prevSkipCron } = await setupCronTestRun({
      tempPrefix: "openclaw-gw-cron-",
      sessionConfig: { mainKey: "primary" },
      cronEnabled: false,
    });

    const { server, ws } = await startServerWithClient();
    await connectOk(ws);

    try {
      const addRes = await rpcReq(ws, "cron.add", {
        name: "daily",
        enabled: true,
        schedule: { kind: "every", everyMs: 60_000 },
        sessionTarget: "main",
        wakeMode: "next-heartbeat",
        payload: { kind: "systemEvent", text: "hello" },
        delivery: { mode: "webhook", to: "https://example.invalid/cron-finished" },
      });
      expect(addRes.ok).toBe(true);
      const dailyJobId = (addRes.payload as { id?: unknown } | null)?.id;
      expect(typeof dailyJobId).toBe("string");

      const listRes = await rpcReq(ws, "cron.list", {
        includeDisabled: true,
      });
      expect(listRes.ok).toBe(true);
      const jobs = (listRes.payload as { jobs?: unknown } | null)?.jobs;
      expect(Array.isArray(jobs)).toBe(true);
      expect((jobs as unknown[]).length).toBe(1);
      expect(((jobs as Array<{ name?: unknown }>)[0]?.name as string) ?? "").toBe("daily");
      expect(
        ((jobs as Array<{ delivery?: { mode?: unknown } }>)[0]?.delivery?.mode as string) ?? "",
      ).toBe("webhook");
      expect(
        (
          listRes.payload as {
            deliveryPreviews?: Record<string, { label?: unknown; detail?: unknown }>;
          } | null
        )?.deliveryPreviews?.[String(dailyJobId)],
      ).toEqual({
        label: "webhook:https://example.invalid/cron-finished",
        detail: "webhook",
      });

      const routeAtMs = Date.now() - 1;
      const routeRes = await rpcReq(ws, "cron.add", {
        name: "route test",
        enabled: true,
        schedule: { kind: "at", at: new Date(routeAtMs).toISOString() },
        sessionTarget: "main",
        wakeMode: "next-heartbeat",
        payload: { kind: "systemEvent", text: "cron route check" },
      });
      expect(routeRes.ok).toBe(true);
      const routeJobIdValue = (routeRes.payload as { id?: unknown } | null)?.id;
      const routeJobId = typeof routeJobIdValue === "string" ? routeJobIdValue : "";
      expect(routeJobId.length > 0).toBe(true);

      const runRes = await rpcReq(ws, "cron.run", { id: routeJobId, mode: "force" }, 20_000);
      expect(runRes.ok).toBe(true);
      expect(runRes.payload).toEqual({ ok: true, enqueued: true, runId: expect.any(String) });
      const events = await waitForSystemEvent();
      expect(events.some((event) => event.includes("cron route check"))).toBe(true);

      const wrappedAtMs = Date.now() + 1000;
      const wrappedRes = await rpcReq(ws, "cron.add", {
        data: {
          name: "wrapped",
          schedule: { at: new Date(wrappedAtMs).toISOString() },
          payload: { kind: "systemEvent", text: "hello" },
        },
      });
      expect(wrappedRes.ok).toBe(true);
      const wrappedPayload = wrappedRes.payload as
        | { schedule?: unknown; sessionTarget?: unknown; wakeMode?: unknown }
        | undefined;
      expect(wrappedPayload?.sessionTarget).toBe("main");
      expect(wrappedPayload?.wakeMode).toBe("now");
      expect((wrappedPayload?.schedule as { kind?: unknown } | undefined)?.kind).toBe("at");

      const patchJobId = await addMainSystemEventCronJob({ ws, name: "patch test" });

      const atMs = Date.now() + 1_000;
      const updateRes = await rpcReq(ws, "cron.update", {
        id: patchJobId,
        patch: {
          schedule: { at: new Date(atMs).toISOString() },
          payload: { kind: "systemEvent", text: "updated" },
        },
      });
      expect(updateRes.ok).toBe(true);
      const updated = updateRes.payload as
        | { schedule?: { kind?: unknown }; payload?: { kind?: unknown } }
        | undefined;
      expect(updated?.schedule?.kind).toBe("at");
      expect(updated?.payload?.kind).toBe("systemEvent");

      const mergeRes = await rpcReq(ws, "cron.add", {
        name: "patch merge",
        enabled: true,
        schedule: { kind: "every", everyMs: 60_000 },
        sessionTarget: "isolated",
        wakeMode: "next-heartbeat",
        payload: { kind: "agentTurn", message: "hello", model: "opus" },
      });
      expect(mergeRes.ok).toBe(true);
      const mergeJobIdValue = (mergeRes.payload as { id?: unknown } | null)?.id;
      const mergeJobId = typeof mergeJobIdValue === "string" ? mergeJobIdValue : "";
      expect(mergeJobId.length > 0).toBe(true);

      const noTimeoutRes = await rpcReq(ws, "cron.add", {
        name: "no-timeout payload",
        enabled: true,
        schedule: { kind: "every", everyMs: 60_000 },
        sessionTarget: "isolated",
        wakeMode: "next-heartbeat",
        payload: { kind: "agentTurn", message: "hello", timeoutSeconds: 0 },
      });
      expect(noTimeoutRes.ok).toBe(true);
      const noTimeoutPayload = noTimeoutRes.payload as
        | {
            payload?: {
              kind?: unknown;
              timeoutSeconds?: unknown;
            };
          }
        | undefined;
      expect(noTimeoutPayload?.payload?.kind).toBe("agentTurn");
      expect(noTimeoutPayload?.payload?.timeoutSeconds).toBe(0);

      const mergeUpdateRes = await rpcReq(ws, "cron.update", {
        id: mergeJobId,
        patch: {
          delivery: { mode: "announce", channel: "telegram", to: "19098680" },
        },
      });
      expect(mergeUpdateRes.ok).toBe(true);
      const merged = mergeUpdateRes.payload as
        | {
            payload?: { kind?: unknown; message?: unknown; model?: unknown };
            delivery?: { mode?: unknown; channel?: unknown; to?: unknown };
          }
        | undefined;
      expect(merged?.payload?.kind).toBe("agentTurn");
      expect(merged?.payload?.message).toBe("hello");
      expect(merged?.payload?.model).toBe("opus");
      expect(merged?.delivery?.mode).toBe("announce");
      expect(merged?.delivery?.channel).toBe("telegram");
      expect(merged?.delivery?.to).toBe("19098680");

      const modelOnlyPatchRes = await rpcReq(ws, "cron.update", {
        id: mergeJobId,
        patch: {
          payload: {
            model: "anthropic/claude-sonnet-4-6",
          },
        },
      });
      expect(modelOnlyPatchRes.ok).toBe(true);
      const modelOnlyPatched = modelOnlyPatchRes.payload as
        | {
            payload?: {
              kind?: unknown;
              message?: unknown;
              model?: unknown;
            };
          }
        | undefined;
      expect(modelOnlyPatched?.payload?.kind).toBe("agentTurn");
      expect(modelOnlyPatched?.payload?.message).toBe("hello");
      expect(modelOnlyPatched?.payload?.model).toBe("anthropic/claude-sonnet-4-6");

      const deliveryPatchRes = await rpcReq(ws, "cron.update", {
        id: mergeJobId,
        patch: {
          delivery: {
            mode: "announce",
            channel: "signal",
            to: "+15550001111",
            bestEffort: true,
          },
        },
      });
      expect(deliveryPatchRes.ok).toBe(true);
      const deliveryPatched = deliveryPatchRes.payload as
        | {
            payload?: { kind?: unknown; message?: unknown };
            delivery?: { mode?: unknown; channel?: unknown; to?: unknown; bestEffort?: unknown };
          }
        | undefined;
      expect(deliveryPatched?.payload?.kind).toBe("agentTurn");
      expect(deliveryPatched?.payload?.message).toBe("hello");
      expect(deliveryPatched?.delivery?.mode).toBe("announce");
      expect(deliveryPatched?.delivery?.channel).toBe("signal");
      expect(deliveryPatched?.delivery?.to).toBe("+15550001111");
      expect(deliveryPatched?.delivery?.bestEffort).toBe(true);

      const rejectJobId = await addMainSystemEventCronJob({ ws, name: "patch reject" });

      const rejectUpdateRes = await rpcReq(ws, "cron.update", {
        id: rejectJobId,
        patch: {
          payload: { kind: "agentTurn", message: "nope" },
        },
      });
      expect(rejectUpdateRes.ok).toBe(false);

      const jobId = await addMainSystemEventCronJob({ ws, name: "jobId test" });

      const jobIdUpdateRes = await rpcReq(ws, "cron.update", {
        jobId,
        patch: {
          schedule: { at: new Date(Date.now() + 2_000).toISOString() },
          payload: { kind: "systemEvent", text: "updated" },
        },
      });
      expect(jobIdUpdateRes.ok).toBe(true);

      const disableJobId = await addMainSystemEventCronJob({ ws, name: "disable test" });

      const disableUpdateRes = await rpcReq(ws, "cron.update", {
        id: disableJobId,
        patch: { enabled: false },
      });
      expect(disableUpdateRes.ok).toBe(true);
      const disabled = disableUpdateRes.payload as { enabled?: unknown } | undefined;
      expect(disabled?.enabled).toBe(false);
    } finally {
      await cleanupCronTestRun({
        ws,
        server,
        prevSkipCron,
        clearSessionConfig: true,
      });
    }
  });

  test("rejects unsafe custom session ids on add and update", async () => {
    const { prevSkipCron } = await setupCronTestRun({
      tempPrefix: "openclaw-gw-cron-bad-session-target-",
      cronEnabled: false,
    });

    const cronState = await createDirectCronState();

    try {
      const addRes = await directCronReq(cronState, "cron.add", {
        name: "bad custom session",
        enabled: true,
        schedule: { kind: "every", everyMs: 60_000 },
        sessionTarget: "session:../../outside",
        wakeMode: "now",
        payload: { kind: "agentTurn", message: "hello" },
      });
      expect(addRes.ok).toBe(false);
      expect(addRes.error?.message).toContain("invalid cron sessionTarget session id");

      const validRes = await directCronReq(cronState, "cron.add", {
        name: "good custom session",
        enabled: true,
        schedule: { kind: "every", everyMs: 60_000 },
        sessionTarget: "session:project-alpha:ops",
        wakeMode: "now",
        payload: { kind: "agentTurn", message: "hello" },
      });
      expect(validRes.ok).toBe(true);
      const jobId = (validRes.payload as { id?: unknown } | null)?.id;
      expect(typeof jobId).toBe("string");

      const updateRes = await directCronReq(cronState, "cron.update", {
        id: jobId,
        patch: {
          sessionTarget: "session:..\\outside",
        },
      });
      expect(updateRes.ok).toBe(false);
      expect(updateRes.error?.message).toContain("invalid cron sessionTarget session id");
    } finally {
      await cleanupCronTestRun({ cronState, prevSkipCron });
    }
  });

  test("keeps delivery updates valid for main jobs owned by an explicit default agent", async () => {
    const { prevSkipCron } = await setupCronTestRun({
      tempPrefix: "openclaw-gw-cron-main-default-agent-delivery-",
      cronEnabled: false,
    });

    await writeCronConfig({
      session: {
        mainKey: "main",
      },
      agents: {
        list: [{ id: "ops", default: true }],
      },
      channels: {
        telegram: {
          botToken: "telegram-token",
        },
      },
    });

    const { server, ws } = await startServerWithClient();
    await connectOk(ws);

    try {
      const addRes = await rpcReq(ws, "cron.add", {
        name: "main default agent",
        enabled: true,
        schedule: { kind: "every", everyMs: 60_000 },
        sessionTarget: "main",
        agentId: "ops",
        wakeMode: "next-heartbeat",
        payload: { kind: "systemEvent", text: "hello" },
      });
      expect(addRes.ok).toBe(true);
      const jobIdValue = (addRes.payload as { id?: unknown } | null)?.id;
      const jobId = typeof jobIdValue === "string" ? jobIdValue : "";
      expect(jobId.length > 0).toBe(true);

      const updateRes = await rpcReq(ws, "cron.update", {
        id: jobId,
        patch: {
          delivery: { mode: "announce", channel: "telegram", to: "19098680" },
        },
      });

      expect(updateRes.ok).toBe(true);
      const updated = updateRes.payload as { delivery?: unknown } | undefined;
      expect(updated?.delivery).toBeUndefined();
    } finally {
      await cleanupCronTestRun({ ws, server, prevSkipCron });
    }
  });

  test("accepts implicit announce delivery when extra configured channels are disabled", async () => {
    const { prevSkipCron } = await setupCronTestRun({
      tempPrefix: "openclaw-gw-cron-disabled-channel-ambiguity-",
      cronEnabled: false,
    });

    await writeCronConfig({
      session: {
        mainKey: "main",
      },
      channels: {
        telegram: {
          botToken: "telegram-token",
        },
        slack: {
          enabled: false,
          botToken: "xoxb-slack-token",
          appToken: "xapp-slack-token",
        },
      },
    });

    const cronState = await createDirectCronState();

    try {
      const addRes = await directCronReq(cronState, "cron.add", {
        name: "disabled extra channel",
        enabled: true,
        schedule: { kind: "every", everyMs: 60_000 },
        sessionTarget: "isolated",
        wakeMode: "next-heartbeat",
        payload: { kind: "agentTurn", message: "hello" },
        delivery: { mode: "announce" },
      });

      if (!addRes.ok) {
        throw new Error(addRes.error?.message ?? "cron.add failed");
      }
      expect(addRes.ok).toBe(true);
    } finally {
      await cleanupCronTestRun({ cronState, prevSkipCron });
    }
  });

  test("keeps delivery updates valid after gateway config changes the default agent", async () => {
    const { prevSkipCron } = await setupCronTestRun({
      tempPrefix: "openclaw-gw-cron-main-default-agent-drift-",
      cronEnabled: false,
    });

    await writeCronConfig({
      session: {
        mainKey: "main",
      },
      agents: {
        list: [{ id: "ops", default: true }],
      },
      channels: {
        telegram: {
          botToken: "telegram-token",
        },
      },
    });

    const { server, ws } = await startServerWithClient();
    await connectOk(ws);

    try {
      const addRes = await rpcReq(ws, "cron.add", {
        name: "main default agent drift",
        enabled: true,
        schedule: { kind: "every", everyMs: 60_000 },
        sessionTarget: "main",
        agentId: "ops",
        wakeMode: "next-heartbeat",
        payload: { kind: "systemEvent", text: "hello" },
      });
      expect(addRes.ok).toBe(true);
      const jobIdValue = (addRes.payload as { id?: unknown } | null)?.id;
      const jobId = typeof jobIdValue === "string" ? jobIdValue : "";
      expect(jobId.length > 0).toBe(true);

      const { writeConfigFile } = await import("../config/config.js");
      await writeConfigFile({
        session: {
          mainKey: "main",
        },
        agents: {
          list: [{ id: "main", default: true }, { id: "ops" }],
        },
        channels: {
          telegram: {
            botToken: "telegram-token",
          },
        },
      });

      let agentIds: string[] = [];
      for (let i = 0; i < 20; i += 1) {
        const agentsRes = await rpcReq<{ agents?: Array<{ id?: string }> }>(ws, "agents.list", {});
        expect(agentsRes.ok).toBe(true);
        agentIds = (agentsRes.payload?.agents ?? [])
          .map((agent) => agent.id)
          .filter((id): id is string => typeof id === "string");
        if (agentIds.includes("main") && agentIds.includes("ops")) {
          break;
        }
        await yieldToEventLoop();
      }
      expect(agentIds).toContain("main");
      expect(agentIds).toContain("ops");

      const updateRes = await rpcReq(ws, "cron.update", {
        id: jobId,
        patch: {
          delivery: { mode: "announce", channel: "telegram", to: "19098680" },
        },
      });

      if (!updateRes.ok) {
        throw new Error(updateRes.error?.message ?? "cron.update failed");
      }
      expect(updateRes.ok).toBe(true);
    } finally {
      await cleanupCronTestRun({ ws, server, prevSkipCron });
    }
  });

  test("ignores ambient disabled channel env when validating announce delivery", async () => {
    vi.stubEnv("SLACK_BOT_TOKEN", "xoxb-ambient");
    vi.stubEnv("TELEGRAM_BOT_TOKEN", "ambient-telegram");
    const { prevSkipCron } = await setupCronTestRun({
      tempPrefix: "openclaw-gw-cron-ambient-disabled-delivery-",
      cronEnabled: false,
    });

    await writeCronConfig({
      session: {
        mainKey: "main",
      },
      plugins: {
        allow: ["memory-core"],
      },
    });

    const cronState = await createDirectCronState();

    try {
      const addRes = await directCronReq(cronState, "cron.add", {
        name: "ambient disabled announce",
        enabled: true,
        schedule: { kind: "every", everyMs: 60_000 },
        sessionTarget: "isolated",
        wakeMode: "next-heartbeat",
        payload: { kind: "agentTurn", message: "hello" },
        delivery: { mode: "announce" },
      });

      expect(addRes.ok).toBe(true);
    } finally {
      await cleanupCronTestRun({ cronState, prevSkipCron });
    }
  });

  test("writes cron run history and auto-runs due jobs", async () => {
    const { prevSkipCron } = await setupCronTestRun({
      tempPrefix: "openclaw-gw-cron-log-",
    });

    const { server, ws } = await startServerWithClient();
    await connectOk(ws);

    try {
      const atMs = Date.now() - 1;
      const addRes = await rpcReq(ws, "cron.add", {
        name: "log test",
        enabled: true,
        schedule: { kind: "at", at: new Date(atMs).toISOString() },
        sessionTarget: "main",
        wakeMode: "next-heartbeat",
        payload: { kind: "systemEvent", text: "hello" },
      });
      expect(addRes.ok).toBe(true);
      const jobIdValue = (addRes.payload as { id?: unknown } | null)?.id;
      const jobId = typeof jobIdValue === "string" ? jobIdValue : "";
      expect(jobId.length > 0).toBe(true);

      const finishedRun = waitForCronEvent(
        ws,
        (payload) => payload?.jobId === jobId && payload?.action === "finished",
      );
      const runRes = await rpcReq(ws, "cron.run", { id: jobId, mode: "force" }, 20_000);
      expect(runRes.ok).toBe(true);
      expect(runRes.payload).toEqual({ ok: true, enqueued: true, runId: expect.any(String) });
      const finishedPayload = await finishedRun;
      expect(finishedPayload).toMatchObject({
        jobId,
        action: "finished",
        status: "ok",
        summary: "hello",
        deliveryStatus: "not-requested",
      });

      const runsRes = await rpcReq(ws, "cron.runs", { id: jobId, limit: 50 });
      expect(runsRes.ok).toBe(true);
      const entries = (runsRes.payload as { entries?: unknown } | null)?.entries;
      expect(Array.isArray(entries)).toBe(true);
      expect((entries as Array<{ jobId?: unknown }>).at(-1)?.jobId).toBe(jobId);
      expect((entries as Array<{ summary?: unknown }>).at(-1)?.summary).toBe("hello");
      expect((entries as Array<{ deliveryStatus?: unknown }>).at(-1)?.deliveryStatus).toBe(
        "not-requested",
      );
      const allRunsRes = await rpcReq(ws, "cron.runs", {
        scope: "all",
        limit: 50,
        statuses: ["ok"],
      });
      expect(allRunsRes.ok).toBe(true);
      const allEntries = (allRunsRes.payload as { entries?: unknown } | null)?.entries;
      expect(Array.isArray(allEntries)).toBe(true);
      expect(
        (allEntries as Array<{ jobId?: unknown }>).some((entry) => entry.jobId === jobId),
      ).toBe(true);

      const statusRes = await rpcReq(ws, "cron.status", {});
      expect(statusRes.ok).toBe(true);
      const statusPayload = statusRes.payload as
        | { enabled?: unknown; storePath?: unknown }
        | undefined;
      expect(statusPayload?.enabled).toBe(true);
      const storePath = typeof statusPayload?.storePath === "string" ? statusPayload.storePath : "";
      expect(storePath).toContain("jobs.json");

      const autoRes = await rpcReq(ws, "cron.add", {
        name: "auto run test",
        enabled: true,
        schedule: { kind: "at", at: new Date(Date.now() + 200).toISOString() },
        sessionTarget: "main",
        wakeMode: "next-heartbeat",
        payload: { kind: "systemEvent", text: "auto" },
      });
      expect(autoRes.ok).toBe(true);
      const autoJobIdValue = (autoRes.payload as { id?: unknown } | null)?.id;
      const autoJobId = typeof autoJobIdValue === "string" ? autoJobIdValue : "";
      expect(autoJobId.length > 0).toBe(true);

      await waitForCronEvent(
        ws,
        (payload) => payload?.jobId === autoJobId && payload?.action === "finished",
      );
      const autoEntries = (await rpcReq(ws, "cron.runs", { id: autoJobId, limit: 10 })).payload as
        | { entries?: Array<{ jobId?: unknown }> }
        | undefined;
      expect(Array.isArray(autoEntries?.entries)).toBe(true);
      const runs = autoEntries?.entries ?? [];
      expect(runs.at(-1)?.jobId).toBe(autoJobId);
    } finally {
      await cleanupCronTestRun({ ws, server, prevSkipCron });
    }
  }, 45_000);

  test("fails closed for persisted unsafe custom session ids", async () => {
    const now = Date.now();
    const { prevSkipCron } = await setupCronTestRun({
      tempPrefix: "openclaw-gw-cron-persisted-bad-session-target-",
      cronEnabled: false,
      jobs: [
        {
          id: "bad-custom-session-job",
          name: "bad custom session job",
          enabled: true,
          createdAtMs: now,
          updatedAtMs: now,
          schedule: { kind: "every", everyMs: 60_000 },
          sessionTarget: "session:../../outside",
          wakeMode: "now",
          payload: { kind: "agentTurn", message: "hello" },
          state: {},
        },
      ],
    });

    cronIsolatedRun.mockClear();
    const { server, ws } = await startServerWithClient();
    await connectOk(ws);

    try {
      const runRes = await rpcReq(ws, "cron.run", {
        id: "bad-custom-session-job",
        mode: "force",
      });
      expect(runRes.ok).toBe(true);
      expect(runRes.payload).toEqual({ ok: true, ran: false, reason: "invalid-spec" });
      expect(cronIsolatedRun).not.toHaveBeenCalled();
    } finally {
      await cleanupCronTestRun({ ws, server, prevSkipCron });
    }
  });

  test("returns from cron.run immediately while isolated work continues in background", async () => {
    const { prevSkipCron } = await setupCronTestRun({
      tempPrefix: "openclaw-gw-cron-run-detached-",
      cronEnabled: false,
    });

    const { server, ws } = await startServerWithClient();
    await connectOk(ws);

    let resolveRun: ((value: { status: "ok"; summary: string }) => void) | undefined;
    cronIsolatedRun.mockImplementationOnce(
      () =>
        new Promise((resolve) => {
          resolveRun = resolve as (value: { status: "ok"; summary: string }) => void;
        }),
    );

    try {
      const addRes = await rpcReq(ws, "cron.add", {
        name: "detached run test",
        enabled: true,
        schedule: { kind: "every", everyMs: 60_000 },
        sessionTarget: "isolated",
        wakeMode: "next-heartbeat",
        payload: { kind: "agentTurn", message: "do work" },
        delivery: { mode: "none" },
      });
      expect(addRes.ok).toBe(true);
      const jobIdValue = (addRes.payload as { id?: unknown } | null)?.id;
      const jobId = typeof jobIdValue === "string" ? jobIdValue : "";
      expect(jobId.length > 0).toBe(true);

      const startedRun = waitForCronEvent(
        ws,
        (payload) => payload?.jobId === jobId && payload?.action === "started",
      );
      const runRes = await rpcReq(ws, "cron.run", { id: jobId, mode: "force" }, 1_000);
      expect(runRes.ok).toBe(true);
      expect(runRes.payload).toEqual({ ok: true, enqueued: true, runId: expect.any(String) });
      await startedRun;
      expect(cronIsolatedRun).toHaveBeenCalledTimes(1);

      const finishedRun = waitForCronEvent(
        ws,
        (payload) => payload?.jobId === jobId && payload?.action === "finished",
      );
      resolveRun?.({ status: "ok", summary: "background finished" });
      const finishedPayload = await finishedRun;
      expect(finishedPayload).toMatchObject({
        jobId,
        action: "finished",
        status: "ok",
        summary: "background finished",
      });
    } finally {
      await cleanupCronTestRun({ ws, server, prevSkipCron });
    }
  });

  test("returns already-running without starting background work", async () => {
    const now = Date.now();
    let resolveRun: ((result: { status: "ok"; summary: string }) => void) | undefined;
    cronIsolatedRun.mockImplementationOnce(
      () =>
        new Promise((resolve) => {
          resolveRun = resolve;
        }),
    );

    const { prevSkipCron } = await setupCronTestRun({
      tempPrefix: "openclaw-gw-cron-run-busy-",
      cronEnabled: false,
      jobs: [
        {
          id: "busy-job",
          name: "busy job",
          enabled: true,
          createdAtMs: now - 60_000,
          updatedAtMs: now - 60_000,
          schedule: { kind: "at", at: new Date(now + 60_000).toISOString() },
          sessionTarget: "isolated",
          wakeMode: "next-heartbeat",
          payload: { kind: "agentTurn", message: "still busy" },
          delivery: { mode: "none" },
          state: {
            nextRunAtMs: now + 60_000,
          },
        },
      ],
    });

    const { server, ws } = await startServerWithClient();
    await connectOk(ws);

    try {
      const startedRun = waitForCronEvent(
        ws,
        (payload) => payload?.jobId === "busy-job" && payload?.action === "started",
      );
      const firstRunRes = await rpcReq(ws, "cron.run", { id: "busy-job", mode: "force" }, 1_000);
      expect(firstRunRes.ok).toBe(true);
      expect(firstRunRes.payload).toEqual({ ok: true, enqueued: true, runId: expect.any(String) });
      await startedRun;
      expect(cronIsolatedRun).toHaveBeenCalledTimes(1);

      const secondRunRes = await rpcReq(ws, "cron.run", { id: "busy-job", mode: "force" }, 1_000);
      expect(secondRunRes.ok).toBe(true);
      expect(secondRunRes.payload).toEqual({ ok: true, ran: false, reason: "already-running" });
      expect(cronIsolatedRun).toHaveBeenCalledTimes(1);

      const finishedRun = waitForCronEvent(
        ws,
        (payload) => payload?.jobId === "busy-job" && payload?.action === "finished",
      );
      resolveRun?.({ status: "ok", summary: "busy done" });
      await finishedRun;
    } finally {
      await cleanupCronTestRun({ ws, server, prevSkipCron });
    }
  });

  test("returns not-due without starting background work", async () => {
    const now = Date.now();
    const { prevSkipCron } = await setupCronTestRun({
      tempPrefix: "openclaw-gw-cron-run-not-due-",
      cronEnabled: false,
      jobs: [
        {
          id: "future-job",
          name: "future job",
          enabled: true,
          createdAtMs: now - 60_000,
          updatedAtMs: now - 60_000,
          schedule: { kind: "at", at: new Date(now + 60_000).toISOString() },
          sessionTarget: "isolated",
          wakeMode: "next-heartbeat",
          payload: { kind: "agentTurn", message: "not yet" },
          delivery: { mode: "none" },
          state: {
            nextRunAtMs: now + 60_000,
          },
        },
      ],
    });

    const { server, ws } = await startServerWithClient();
    await connectOk(ws);
    cronIsolatedRun.mockClear();

    try {
      const runRes = await rpcReq(ws, "cron.run", { id: "future-job", mode: "due" }, 1_000);
      expect(runRes.ok).toBe(true);
      expect(runRes.payload).toEqual({ ok: true, ran: false, reason: "not-due" });
      expect(cronIsolatedRun).not.toHaveBeenCalled();
    } finally {
      await cleanupCronTestRun({ ws, server, prevSkipCron });
    }
  });

  test("posts webhooks for delivery mode and legacy notify fallback only when summary exists", async () => {
    const legacyNotifyJob = {
      id: "legacy-notify-job",
      name: "legacy notify job",
      enabled: true,
      notify: true,
      createdAtMs: Date.now(),
      updatedAtMs: Date.now(),
      schedule: { kind: "every", everyMs: 60_000 },
      sessionTarget: "main",
      wakeMode: "next-heartbeat",
      payload: { kind: "systemEvent", text: "legacy webhook" },
      state: {},
    };
    const { prevSkipCron } = await setupCronTestRun({
      tempPrefix: "openclaw-gw-cron-webhook-",
      cronEnabled: false,
      jobs: [legacyNotifyJob],
    });

    await writeCronConfig({
      cron: {
        webhook: "https://legacy.example.invalid/cron-finished",
        webhookToken: "cron-webhook-token",
      },
    });

    fetchWithSsrFGuardMock.mockClear();

    const { server, ws } = await startServerWithClient();
    await connectOk(ws);

    try {
      const invalidWebhookRes = await rpcReq(ws, "cron.add", {
        name: "invalid webhook",
        enabled: true,
        schedule: { kind: "every", everyMs: 60_000 },
        sessionTarget: "main",
        wakeMode: "next-heartbeat",
        payload: { kind: "systemEvent", text: "invalid" },
        delivery: { mode: "webhook", to: "ftp://example.invalid/cron-finished" },
      });
      expect(invalidWebhookRes.ok).toBe(false);

      const notifyJobId = await addWebhookCronJob({
        ws,
        name: "webhook enabled",
        delivery: { mode: "webhook", to: "https://example.invalid/cron-finished" },
      });
      await runCronJobAndWaitForFinished(ws, notifyJobId);
      const notifyCall = getWebhookCall(0);
      expect(notifyCall.url).toBe("https://example.invalid/cron-finished");
      expect(notifyCall.init.method).toBe("POST");
      expect(notifyCall.init.headers?.Authorization).toBe("Bearer cron-webhook-token");
      expect(notifyCall.init.headers?.["Content-Type"]).toBe("application/json");
      const notifyBody = notifyCall.body;
      expect(notifyBody.action).toBe("finished");
      expect(notifyBody.jobId).toBe(notifyJobId);

      const legacyFinished = waitForCronEvent(
        ws,
        (payload) => payload?.jobId === "legacy-notify-job" && payload?.action === "finished",
      );
      const legacyRunRes = await rpcReq(
        ws,
        "cron.run",
        { id: "legacy-notify-job", mode: "force" },
        20_000,
      );
      expect(legacyRunRes.ok).toBe(true);
      expect(legacyRunRes.payload).toEqual({ ok: true, enqueued: true, runId: expect.any(String) });
      await legacyFinished;
      const legacyCall = getWebhookCall(1);
      expect(legacyCall.url).toBe("https://legacy.example.invalid/cron-finished");
      expect(legacyCall.init.method).toBe("POST");
      expect(legacyCall.init.headers?.Authorization).toBe("Bearer cron-webhook-token");
      const legacyBody = legacyCall.body;
      expect(legacyBody.action).toBe("finished");
      expect(legacyBody.jobId).toBe("legacy-notify-job");

      const silentRes = await rpcReq(ws, "cron.add", {
        name: "webhook disabled",
        enabled: true,
        schedule: { kind: "every", everyMs: 60_000 },
        sessionTarget: "main",
        wakeMode: "next-heartbeat",
        payload: { kind: "systemEvent", text: "do not send" },
      });
      expect(silentRes.ok).toBe(true);
      const silentJobIdValue = (silentRes.payload as { id?: unknown } | null)?.id;
      const silentJobId = typeof silentJobIdValue === "string" ? silentJobIdValue : "";
      expect(silentJobId.length > 0).toBe(true);

      const silentFinished = waitForCronEvent(
        ws,
        (payload) => payload?.jobId === silentJobId && payload?.action === "finished",
      );
      const silentRunRes = await rpcReq(ws, "cron.run", { id: silentJobId, mode: "force" }, 20_000);
      expect(silentRunRes.ok).toBe(true);
      expect(silentRunRes.payload).toEqual({ ok: true, enqueued: true, runId: expect.any(String) });
      await silentFinished;
      expect(fetchWithSsrFGuardMock).toHaveBeenCalledTimes(2);

      fetchWithSsrFGuardMock.mockClear();
      cronIsolatedRun.mockResolvedValueOnce({ status: "error", summary: "delivery failed" });
      const failureDestJobId = await addWebhookCronJob({
        ws,
        name: "failure destination webhook",
        sessionTarget: "isolated",
        delivery: {
          mode: "announce",
          channel: "telegram",
          to: "19098680",
          failureDestination: {
            mode: "webhook",
            to: "https://example.invalid/failure-destination",
          },
        },
      });
      const failureDestFinished = waitForCronEvent(
        ws,
        (payload) => payload?.jobId === failureDestJobId && payload?.action === "finished",
      );
      await runCronJobForce(ws, failureDestJobId);
      await failureDestFinished;
      const failureDestCall = getWebhookCall(0);
      expect(failureDestCall.url).toBe("https://example.invalid/failure-destination");
      const failureDestBody = failureDestCall.body;
      expect(failureDestBody.message).toBe(
        'Cron job "failure destination webhook" failed: unknown error',
      );

      fetchWithSsrFGuardMock.mockClear();
      cronIsolatedRun.mockResolvedValueOnce({ status: "error", summary: "best-effort failed" });
      const bestEffortFailureDestJobId = await addWebhookCronJob({
        ws,
        name: "best effort failure destination webhook",
        sessionTarget: "isolated",
        delivery: {
          mode: "announce",
          channel: "telegram",
          to: "19098680",
          bestEffort: true,
          failureDestination: {
            mode: "webhook",
            to: "https://example.invalid/failure-destination",
          },
        },
      });
      const bestEffortFailureDestFinished = waitForCronEvent(
        ws,
        (payload) =>
          payload?.jobId === bestEffortFailureDestJobId && payload?.action === "finished",
      );
      await runCronJobForce(ws, bestEffortFailureDestJobId);
      await bestEffortFailureDestFinished;
      expect(fetchWithSsrFGuardMock).not.toHaveBeenCalled();

      cronIsolatedRun.mockResolvedValueOnce({ status: "ok", summary: "" });
      const noSummaryJobId = await addWebhookCronJob({
        ws,
        name: "webhook no summary",
        sessionTarget: "isolated",
        delivery: { mode: "webhook", to: "https://example.invalid/cron-finished" },
      });
      const noSummaryFinished = waitForCronEvent(
        ws,
        (payload) => payload?.jobId === noSummaryJobId && payload?.action === "finished",
      );
      await runCronJobForce(ws, noSummaryJobId);
      await noSummaryFinished;
      expect(fetchWithSsrFGuardMock).not.toHaveBeenCalled();
    } finally {
      await cleanupCronTestRun({ ws, server, prevSkipCron });
    }
  }, 60_000);

  test("falls back to the primary delivery channel on job failure and preserves sessionKey", async () => {
    const { prevSkipCron } = await setupCronTestRun({
      tempPrefix: "openclaw-gw-cron-failure-primary-fallback-",
      cronEnabled: false,
    });

    const { server, ws } = await startServerWithClient();
    await connectOk(ws);

    try {
      cronIsolatedRun.mockResolvedValueOnce({ status: "error", summary: "delivery failed" });
      const jobId = await addWebhookCronJob({
        ws,
        name: "primary delivery fallback",
        sessionTarget: "isolated",
        delivery: {
          mode: "announce",
          channel: "last",
        },
      });

      const updateRes = await rpcReq(ws, "cron.update", {
        id: jobId,
        patch: {
          sessionKey: "agent:main:telegram:direct:123:thread:99",
        },
      });
      expect(updateRes.ok).toBe(true);

      const finished = waitForCronEvent(
        ws,
        (payload) => payload?.jobId === jobId && payload?.action === "finished",
      );
      await runCronJobForce(ws, jobId);
      await finished;

      expect(sendFailureNotificationAnnounceMock).toHaveBeenCalledTimes(1);
      expect(sendFailureNotificationAnnounceMock).toHaveBeenCalledWith(
        expect.anything(),
        expect.anything(),
        expect.any(String),
        jobId,
        {
          channel: "last",
          to: undefined,
          accountId: undefined,
          sessionKey: "agent:main:telegram:direct:123:thread:99",
        },
        '⚠️ Cron job "primary delivery fallback" failed: unknown error',
      );
    } finally {
      await cleanupCronTestRun({ ws, server, prevSkipCron });
    }
  }, 45_000);

  test("ignores non-string cron.webhookToken values without crashing webhook delivery", async () => {
    const { prevSkipCron } = await setupCronTestRun({
      tempPrefix: "openclaw-gw-cron-webhook-secretinput-",
      cronEnabled: false,
    });

    await writeCronConfig({
      cron: {
        webhookToken: {
          opaque: true,
        },
      },
    });

    fetchWithSsrFGuardMock.mockClear();

    const { server, ws } = await startServerWithClient();
    await connectOk(ws);

    try {
      const notifyJobId = await addWebhookCronJob({
        ws,
        name: "webhook secretinput object",
        delivery: { mode: "webhook", to: "https://example.invalid/cron-finished" },
      });
      await runCronJobAndWaitForFinished(ws, notifyJobId);
      const [notifyArgs] = fetchWithSsrFGuardMock.mock.calls[0] as unknown as [
        {
          url?: string;
          init?: {
            method?: string;
            headers?: Record<string, string>;
          };
        },
      ];
      expect(notifyArgs.url).toBe("https://example.invalid/cron-finished");
      expect(notifyArgs.init?.method).toBe("POST");
      expect(notifyArgs.init?.headers?.Authorization).toBeUndefined();
      expect(notifyArgs.init?.headers?.["Content-Type"]).toBe("application/json");
    } finally {
      await cleanupCronTestRun({ ws, server, prevSkipCron });
    }
  }, 45_000);
});

¤ Dauer der Verarbeitung: 0.42 Sekunden  (vorverarbeitet am  2026-04-27) ¤

*© Formatika GbR, Deutschland






Wurzel

Suchen

Beweissystem der NASA

Beweissystem Isabelle

NIST Cobol Testsuite

Cephes Mathematical Library

Wiener Entwicklungsmethode

Haftungshinweis

Die Informationen auf dieser Webseite wurden nach bestem Wissen sorgfältig zusammengestellt. Es wird jedoch weder Vollständigkeit, noch Richtigkeit, noch Qualität der bereit gestellten Informationen zugesichert.

Bemerkung:

Die farbliche Syntaxdarstellung und die Messung sind noch experimentell.






                                                                                                                                                                                                                                                                                                                                                                                                     


Neuigkeiten

     Aktuelles
     Motto des Tages

Software

     Produkte
     Quellcodebibliothek

Aktivitäten

     Artikel über Sicherheit
     Anleitung zur Aktivierung von SSL

Muße

     Gedichte
     Musik
     Bilder

Jenseits des Üblichen ....

Besucherstatistik

Besucherstatistik

Monitoring

Montastic status badge