Quellcodebibliothek Statistik Leitseite products/Sources/formale Sprachen/Java/Openclaw/src/gateway/server-methods/   (KI Agentensystem Version 22©)  Datei vom 26.3.2026 mit Größe 6 kB image not shown  

Quelle  agent-wait-dedupe.ts

  Sprache: JAVA
 

Spracherkennung für: .ts vermutete Sprache: Unknown {[0] [0] [0]} [Methode: Schwerpunktbildung, einfache Gewichte, sechs Dimensionen]

import { setSafeTimeout } from "../../utils/timer-delay.js";
import type { DedupeEntry } from "../server-shared.js";

export type AgentWaitTerminalSnapshot = {
  status: "ok" | "error" | "timeout";
  startedAt?: number;
  endedAt?: number;
  error?: string;
};

const AGENT_WAITERS_BY_RUN_ID = new Map<string, Set<() => void>>();

function parseRunIdFromDedupeKey(key: string): string | null {
  if (key.startsWith("agent:")) {
    return key.slice("agent:".length) || null;
  }
  if (key.startsWith("chat:")) {
    return key.slice("chat:".length) || null;
  }
  return null;
}

function asFiniteNumber(value: unknown): number | undefined {
  return typeof value === "number" && Number.isFinite(value) ? value : undefined;
}

function removeWaiter(runId: string, waiter: () => void): void {
  const waiters = AGENT_WAITERS_BY_RUN_ID.get(runId);
  if (!waiters) {
    return;
  }
  waiters.delete(waiter);
  if (waiters.size === 0) {
    AGENT_WAITERS_BY_RUN_ID.delete(runId);
  }
}

function addWaiter(runId: string, waiter: () => void): () => void {
  const normalizedRunId = runId.trim();
  if (!normalizedRunId) {
    return () => {};
  }
  const existing = AGENT_WAITERS_BY_RUN_ID.get(normalizedRunId);
  if (existing) {
    existing.add(waiter);
    return () => removeWaiter(normalizedRunId, waiter);
  }
  AGENT_WAITERS_BY_RUN_ID.set(normalizedRunId, new Set([waiter]));
  return () => removeWaiter(normalizedRunId, waiter);
}

function notifyWaiters(runId: string): void {
  const normalizedRunId = runId.trim();
  if (!normalizedRunId) {
    return;
  }
  const waiters = AGENT_WAITERS_BY_RUN_ID.get(normalizedRunId);
  if (!waiters || waiters.size === 0) {
    return;
  }
  for (const waiter of waiters) {
    waiter();
  }
}

export function readTerminalSnapshotFromDedupeEntry(
  entry: DedupeEntry,
): AgentWaitTerminalSnapshot | null {
  const payload = entry.payload as
    | {
        status?: unknown;
        startedAt?: unknown;
        endedAt?: unknown;
        error?: unknown;
        summary?: unknown;
      }
    | undefined;
  const status = typeof payload?.status === "string" ? payload.status : undefined;
  if (status === "accepted" || status === "started" || status === "in_flight") {
    return null;
  }

  const startedAt = asFiniteNumber(payload?.startedAt);
  const endedAt = asFiniteNumber(payload?.endedAt) ?? entry.ts;
  const errorMessage =
    typeof payload?.error === "string"
      ? payload.error
      : typeof payload?.summary === "string"
        ? payload.summary
        : entry.error?.message;

  if (status === "ok" || status === "timeout") {
    return {
      status,
      startedAt,
      endedAt,
      error: status === "timeout" ? errorMessage : undefined,
    };
  }
  if (status === "error" || !entry.ok) {
    return {
      status: "error",
      startedAt,
      endedAt,
      error: errorMessage,
    };
  }
  return null;
}

export function readTerminalSnapshotFromGatewayDedupe(params: {
  dedupe: Map<string, DedupeEntry>;
  runId: string;
  ignoreAgentTerminalSnapshot?: boolean;
}): AgentWaitTerminalSnapshot | null {
  if (params.ignoreAgentTerminalSnapshot) {
    const chatEntry = params.dedupe.get(`chat:${params.runId}`);
    if (!chatEntry) {
      return null;
    }
    return readTerminalSnapshotFromDedupeEntry(chatEntry);
  }

  const chatEntry = params.dedupe.get(`chat:${params.runId}`);
  const chatSnapshot = chatEntry ? readTerminalSnapshotFromDedupeEntry(chatEntry) : null;

  const agentEntry = params.dedupe.get(`agent:${params.runId}`);
  const agentSnapshot = agentEntry ? readTerminalSnapshotFromDedupeEntry(agentEntry) : null;
  if (agentEntry) {
    if (!agentSnapshot) {
      // If agent is still in-flight, only trust chat if it was written after
      // this agent entry (indicating a newer completed chat run reused runId).
      if (chatSnapshot && chatEntry && chatEntry.ts > agentEntry.ts) {
        return chatSnapshot;
      }
      return null;
    }
  }

  if (agentSnapshot && chatSnapshot && agentEntry && chatEntry) {
    // Reused idempotency keys can leave both records present. Prefer the
    // freshest terminal snapshot so callers observe the latest run outcome.
    return chatEntry.ts > agentEntry.ts ? chatSnapshot : agentSnapshot;
  }

  return agentSnapshot ?? chatSnapshot;
}

export async function waitForTerminalGatewayDedupe(params: {
  dedupe: Map<string, DedupeEntry>;
  runId: string;
  timeoutMs: number;
  signal?: AbortSignal;
  ignoreAgentTerminalSnapshot?: boolean;
}): Promise<AgentWaitTerminalSnapshot | null> {
  const initial = readTerminalSnapshotFromGatewayDedupe(params);
  if (initial) {
    return initial;
  }
  if (params.timeoutMs <= 0 || params.signal?.aborted) {
    return null;
  }

  return await new Promise((resolve) => {
    let settled = false;
    let timeoutHandle: NodeJS.Timeout | undefined;
    let onAbort: (() => void) | undefined;
    let removeWaiter: (() => void) | undefined;

    const finish = (snapshot: AgentWaitTerminalSnapshot | null) => {
      if (settled) {
        return;
      }
      settled = true;
      if (timeoutHandle) {
        clearTimeout(timeoutHandle);
      }
      if (onAbort) {
        params.signal?.removeEventListener("abort", onAbort);
      }
      removeWaiter?.();
      resolve(snapshot);
    };

    const onWake = () => {
      const snapshot = readTerminalSnapshotFromGatewayDedupe(params);
      if (snapshot) {
        finish(snapshot);
      }
    };

    removeWaiter = addWaiter(params.runId, onWake);
    onWake();
    if (settled) {
      return;
    }

    timeoutHandle = setSafeTimeout(() => finish(null), params.timeoutMs);
    timeoutHandle.unref?.();

    onAbort = () => finish(null);
    params.signal?.addEventListener("abort", onAbort, { once: true });
  });
}

export function setGatewayDedupeEntry(params: {
  dedupe: Map<string, DedupeEntry>;
  key: string;
  entry: DedupeEntry;
}) {
  params.dedupe.set(params.key, params.entry);
  const runId = parseRunIdFromDedupeKey(params.key);
  if (!runId) {
    return;
  }
  const snapshot = readTerminalSnapshotFromDedupeEntry(params.entry);
  if (!snapshot) {
    return;
  }
  notifyWaiters(runId);
}

export const __testing = {
  getWaiterCount(runId?: string): number {
    if (runId) {
      return AGENT_WAITERS_BY_RUN_ID.get(runId)?.size ?? 0;
    }
    let total = 0;
    for (const waiters of AGENT_WAITERS_BY_RUN_ID.values()) {
      total += waiters.size;
    }
    return total;
  },
  resetWaiters() {
    AGENT_WAITERS_BY_RUN_ID.clear();
  },
};

¤ Dauer der Verarbeitung: 0.20 Sekunden  (vorverarbeitet am  2026-04-27) ¤

*© Formatika GbR, Deutschland






Wurzel

Suchen

Beweissystem der NASA

Beweissystem Isabelle

NIST Cobol Testsuite

Cephes Mathematical Library

Wiener Entwicklungsmethode

Haftungshinweis

Die Informationen auf dieser Webseite wurden nach bestem Wissen sorgfältig zusammengestellt. Es wird jedoch weder Vollständigkeit, noch Richtigkeit, noch Qualität der bereit gestellten Informationen zugesichert.

Bemerkung:

Die farbliche Syntaxdarstellung und die Messung sind noch experimentell.