// If we're not streaming block replies, ensure the final payload includes // the final text even when interim streaming was enabled. if (state.includeReasoning && text && !params.onBlockReply) { if (assistantTexts.length > state.assistantTextBaseline) {
assistantTexts.splice(
state.assistantTextBaseline,
assistantTexts.length - state.assistantTextBaseline,
text,
);
rememberAssistantText(text);
} else {
pushAssistantText(text);
}
state.suppressBlockChunks = true;
} elseif (!addedDuringMessage && !chunkerHasBuffered && text) { // Non-streaming models (no text_delta): ensure assistantTexts gets the final // text when the chunker has nothing buffered to drain.
pushAssistantText(text);
}
// ── Messaging tool duplicate detection ────────────────────────────────────── // Track texts sent via messaging tools to suppress duplicate block replies. // Only committed (successful) texts are checked - pending texts are tracked // to support commit logic but not used for suppression (avoiding lost messages on tool failure). // These tools can send messages via sendMessage/threadReply actions (or sessions_send with message). const MAX_MESSAGING_SENT_TEXTS = 200; const MAX_MESSAGING_SENT_TARGETS = 200; const MAX_MESSAGING_SENT_MEDIA_URLS = 200; const trimMessagingToolSent = () => { if (messagingToolSentTexts.length > MAX_MESSAGING_SENT_TEXTS) { const overflow = messagingToolSentTexts.length - MAX_MESSAGING_SENT_TEXTS;
messagingToolSentTexts.splice(0, overflow);
messagingToolSentTextsNormalized.splice(0, overflow);
} if (messagingToolSentTargets.length > MAX_MESSAGING_SENT_TARGETS) { const overflow = messagingToolSentTargets.length - MAX_MESSAGING_SENT_TARGETS;
messagingToolSentTargets.splice(0, overflow);
} if (messagingToolSentMediaUrls.length > MAX_MESSAGING_SENT_MEDIA_URLS) { const overflow = messagingToolSentMediaUrls.length - MAX_MESSAGING_SENT_MEDIA_URLS;
messagingToolSentMediaUrls.splice(0, overflow);
}
};
const ensureCompactionPromise = () => { if (!state.compactionRetryPromise) { // Create a single promise that resolves when ALL pending compactions complete // (tracked by pendingCompactionRetry counter, decremented in resolveCompactionRetry)
state.compactionRetryPromise = new Promise((resolve, reject) => {
state.compactionRetryResolve = resolve;
state.compactionRetryReject = reject;
}); // Prevent unhandled rejection if rejected after all consumers have resolved
state.compactionRetryPromise.catch((err) => {
log.debug(`compaction promise rejected (no waiter): ${String(err)}`);
});
}
};
// 1. Handle <think> blocks (stateful, strip content inside)
let processed = "";
THINKING_TAG_SCAN_RE.lastIndex = 0;
let lastIndex = 0;
let inThinking = state.thinking; for (const match of text.matchAll(THINKING_TAG_SCAN_RE)) { const idx = match.index ?? 0; if (codeSpans.isInside(idx)) { continue;
} if (!inThinking) {
processed += text.slice(lastIndex, idx);
} const isClose = match[1] === "/";
inThinking = !isClose;
lastIndex = idx + match[0].length;
} if (!inThinking) {
processed += text.slice(lastIndex);
}
state.thinking = inThinking;
// 2. Handle <final> blocks (stateful, strip content OUTSIDE) // If enforcement is disabled, we still strip the tags themselves to prevent // hallucinations (e.g. Minimax copying the style) from leaking, but we // do not enforce buffering/extraction logic. const finalCodeSpans = buildCodeSpanIndex(processed, inlineStateStart); if (!params.enforceFinalTag) {
state.inlineCode = finalCodeSpans.inlineState;
FINAL_TAG_SCAN_RE.lastIndex = 0; return stripTagsOutsideCodeSpans(processed, FINAL_TAG_SCAN_RE, finalCodeSpans.isInside);
}
// If enforcement is enabled, only return text that appeared inside a <final> block.
let result = "";
FINAL_TAG_SCAN_RE.lastIndex = 0;
let lastFinalIndex = 0;
let inFinal = state.final;
let everInFinal = state.final;
for (const match of processed.matchAll(FINAL_TAG_SCAN_RE)) { const idx = match.index ?? 0; if (finalCodeSpans.isInside(idx)) { continue;
} const isClose = match[1] === "/";
if (!inFinal && !isClose) { // Found <final> start tag.
inFinal = true;
everInFinal = true;
lastFinalIndex = idx + match[0].length;
} elseif (inFinal && isClose) { // Found </final> end tag.
result += processed.slice(lastFinalIndex, idx);
inFinal = false;
lastFinalIndex = idx + match[0].length;
}
}
if (inFinal) {
result += processed.slice(lastFinalIndex);
}
state.final = inFinal;
// Strict Mode: If enforcing final tags, we MUST NOT return content unless // we have seen a <final> tag. Otherwise, we leak "thinking out loud" text // (e.g. "**Locating Manulife**...") that the model emitted without <think> tags. if (!everInFinal) { return"";
}
// Hardened Cleanup: Remove any remaining <final> tags that might have been // missed (e.g. nested tags or hallucinations) to prevent leakage. const resultCodeSpans = buildCodeSpanIndex(result, inlineStateStart);
state.inlineCode = resultCodeSpans.inlineState; return stripTagsOutsideCodeSpans(result, FINAL_TAG_SCAN_RE, resultCodeSpans.isInside);
};
const stripTagsOutsideCodeSpans = (
text: string,
pattern: RegExp,
isInside: (index: number) => boolean,
) => {
let output = "";
let lastIndex = 0;
pattern.lastIndex = 0; for (const match of text.matchAll(pattern)) { const idx = match.index ?? 0; if (isInside(idx)) { continue;
}
output += text.slice(lastIndex, idx);
lastIndex = idx + match[0].length;
}
output += text.slice(lastIndex); return output;
};
const emitBlockChunk = (text: string, options?: { assistantMessageIndex?: number }) => { if (state.suppressBlockChunks || params.silentExpected) { return;
} // Strip <think> and <final> blocks across chunk boundaries to avoid leaking reasoning. // Also strip downgraded tool call text ([Tool Call: ...], [Historical context: ...], etc.). const chunk = stripDowngradedToolCallText(stripBlockTags(text, state.blockState)).trimEnd(); if (!chunk) { return;
} if (chunk === state.lastBlockReplyText) { return;
}
// Only check committed (successful) messaging tool texts - checking pending texts // is risky because if the tool fails after suppression, the user gets no response const normalizedChunk = normalizeTextForComparison(chunk); if (isMessagingToolDuplicateNormalized(normalizedChunk, messagingToolSentTextsNormalized)) {
log.debug(`Skipping block reply - already sent via messaging tool: ${chunk.slice(0, 50)}...`); return;
}
const unsubscribe = () => { if (state.unsubscribed) { return;
} // Mark as unsubscribed FIRST to prevent waitForCompactionRetry from creating // new un-resolvable promises during teardown.
state.unsubscribed = true; // Reject pending compaction wait to unblock awaiting code. // Don't resolve, as that would incorrectly signal "compaction complete" when it's still in-flight. if (state.compactionRetryPromise) {
log.debug(`unsubscribe: rejecting compaction wait runId=${params.runId}`); const reject = state.compactionRetryReject;
state.compactionRetryResolve = undefined;
state.compactionRetryReject = undefined;
state.compactionRetryPromise = null; // Reject with AbortError so it's caught by isAbortError() check in cleanup paths const abortErr = new Error("Unsubscribed during compaction");
abortErr.name = "AbortError";
reject?.(abortErr);
} // Cancel any in-flight compaction to prevent resource leaks when unsubscribing. // Only abort if compaction is actually running to avoid unnecessary work. if (params.session.isCompacting) {
log.debug(`unsubscribe: aborting in-flight compaction runId=${params.runId}`); try {
params.session.abortCompaction();
} catch (err) {
log.warn(`unsubscribe: compaction abort failed runId=${params.runId} err=${String(err)}`);
}
}
sessionUnsubscribe();
};
return {
assistantTexts,
toolMetas,
unsubscribe,
setTerminalLifecycleMeta: (meta: {
replayInvalid?: boolean;
livenessState?: EmbeddedRunLivenessState;
}) => { if (typeof meta.replayInvalid === "boolean") {
state.replayState = { ...state.replayState, replayInvalid: meta.replayInvalid };
} if (meta.livenessState) {
state.livenessState = meta.livenessState;
}
},
isCompacting: () => state.compactionInFlight || state.pendingCompactionRetry > 0,
isCompactionInFlight: () => state.compactionInFlight,
getMessagingToolSentTexts: () => messagingToolSentTexts.slice(),
getMessagingToolSentMediaUrls: () => messagingToolSentMediaUrls.slice(),
getMessagingToolSentTargets: () => messagingToolSentTargets.slice(),
getSuccessfulCronAdds: () => state.successfulCronAdds,
getReplayState: () => ({ ...state.replayState }), // Returns true if any messaging tool successfully sent a message. // Used to suppress agent's confirmation text (e.g., "Respondi no Telegram!") // which is generated AFTER the tool sends the actual answer.
didSendViaMessagingTool: () => messagingToolSentTexts.length > 0,
didSendDeterministicApprovalPrompt: () => state.deterministicApprovalPromptSent,
getLastToolError: () => (state.lastToolError ? { ...state.lastToolError } : undefined),
getUsageTotals,
getCompactionCount: () => compactionCount,
getItemLifecycle: () => ({
startedCount: state.itemStartedCount,
completedCount: state.itemCompletedCount,
activeCount: state.itemActiveIds.size,
}),
waitForCompactionRetry: () => { // Reject after unsubscribe so callers treat it as cancellation, not success if (state.unsubscribed) { const err = new Error("Unsubscribed during compaction wait");
err.name = "AbortError"; return Promise.reject(err);
} if (state.compactionInFlight || state.pendingCompactionRetry > 0) {
ensureCompactionPromise(); return state.compactionRetryPromise ?? Promise.resolve();
} returnnew Promise<void>((resolve, reject) => {
queueMicrotask(() => { if (state.unsubscribed) { const err = new Error("Unsubscribed during compaction wait");
err.name = "AbortError";
reject(err); return;
} if (state.compactionInFlight || state.pendingCompactionRetry > 0) {
ensureCompactionPromise(); void (state.compactionRetryPromise ?? Promise.resolve()).then(resolve, reject);
} else {
resolve();
}
});
});
},
};
}
Die Informationen auf dieser Webseite wurden
nach bestem Wissen sorgfältig zusammengestellt. Es wird jedoch weder Vollständigkeit, noch Richtigkeit,
noch Qualität der bereit gestellten Informationen zugesichert.
Bemerkung:
Die farbliche Syntaxdarstellung und die Messung sind noch experimentell.