import OpenAI from "openai"

import { logMessage } from "../db.js"
import { buildCompletionMessages, rememberRecalledMemoryChunks } from "../memory.js"
import { recordMetric } from "../metrics.js"
import { emit } from "../stream.js"
import type { LoopState } from "./types.js"
import {
  API_BASE,
  ENABLE_THINKING,
  FALLBACK_BASE,
  FALLBACK_MODEL,
  FALLBACK_TOOL_CHOICE,
  MODEL,
  PRIMARY_TOOL_CHOICE,
  SUMMARY_MODEL,
  TOOLS,
  USE_FALLBACK,
  apiErrorDetails,
  client,
  errorSummary,
  estimatePromptTokens,
  fallbackClient,
  fallbackContextWindow,
  findSummaryMessageIndex,
  isPromptTooLargeError,
  retryDelayMs,
  sanitizeMessages,
  shouldFallback,
  summaryClient,
  summarizeConversationViaLLM,
} from "./util.js"
import { assistantContentText } from "./loop-content.js"
import type { CompletionRequest, CompletionTurnResult, ToolCallAssembly } from "./loop-shared.js"

/**
 * Resolves the configured summary client/model pair.
 *
 * @returns Active summary provider config.
 */
export function configuredSummaryProvider(): { client: OpenAI | null; model: string } {
  if (summaryClient && SUMMARY_MODEL) return { client: summaryClient, model: SUMMARY_MODEL }
  return {
    client: USE_FALLBACK ? fallbackClient : client,
    model: USE_FALLBACK ? FALLBACK_MODEL : MODEL,
  }
}

function logApiError(err: unknown, context: string): void {
  if (!(err instanceof OpenAI.APIError)) return
  console.error(`[api] ${err.status} ${err.message} - ${context}`)
  for (const line of apiErrorDetails(err)) console.error(line)
}

/**
 * Appends an assistant message to state and persists it to conversation logs.
 *
 * @param convId - Active conversation id.
 * @param state - Mutable loop state.
 * @param msg - Assistant message to append.
 */
export function addAssistantMessage(convId: number, state: LoopState, msg: OpenAI.Chat.ChatCompletionMessage): void {
  state.conversation.push(msg)
  logMessage(convId, msg.role, msg.content ?? "", msg.tool_calls ?? undefined)
}

function recordPromptResponse(request: CompletionRequest, result: CompletionTurnResult, promptMetricId: number | null): void {
  recordMetric({
    type: "prompt_response",
    promptMetricId: promptMetricId ?? undefined,
    model: request.model,
    toolChoice: request.tool_choice,
    messages: request.messages,
    response: result.message,
    usage: result.usage,
  })
}

/**
 * Applies token usage from a completion response to loop state counters.
 *
 * @param state - Mutable loop state.
 * @param usage - Completion usage payload (if provided by the API).
 */
export function applyUsage(state: LoopState, usage: OpenAI.Completions.CompletionUsage | undefined): void {
  if (!usage) return
  state.tokenCount += usage.total_tokens
  if (usage.prompt_tokens) state.contextSize = usage.prompt_tokens
  console.log(`[tokens] +${usage.total_tokens} total=${state.tokenCount}`)
  recordMetric({ type: "usage", usage })
}
/**
 * Emits model reasoning text when exposed by the provider.
 *
 * Supports both `reasoning_content` fields and inline `<think>...</think>` wrappers.
 *
 * @param msg - Assistant message to inspect for reasoning traces.
 */
export function emitThinking(msg: OpenAI.Chat.ChatCompletionMessage): void {
  const rawMsg = msg as unknown as Record<string, unknown>
  let thinkingText: string | null = null
  if (typeof rawMsg.reasoning_content === "string" && rawMsg.reasoning_content.trim()) {
    thinkingText = rawMsg.reasoning_content.trim()
  } else if (typeof msg.content === "string") {
    const match = msg.content.match(/^<think>([\s\S]*?)<\/think>\s*/i)
    if (match) {
      thinkingText = match[1]!.trim()
      ;(msg as unknown as Record<string, unknown>).content = msg.content.slice(match[0].length)
    }
  }
  if (thinkingText) emit({ type: "thinking", text: thinkingText })
}

function sleep(ms: number): Promise<void> {
  return new Promise((resolve) => setTimeout(resolve, ms))
}

function formatRetryAt(retryAfterMs: number): string {
  const retryAt = new Date(Date.now() + retryAfterMs)
  const local = retryAt.toLocaleString(undefined, {
    hour12: false,
    timeZoneName: "short",
  })
  return `${local} (${retryAt.toISOString()})`
}

function apiErrorSearchText(err: { message: string; error?: unknown }): string {
  const parts = [err.message]
  if (err.error !== undefined) {
    try {
      parts.push(JSON.stringify(err.error))
    } catch {
      parts.push(String(err.error))
    }
  }
  return parts.join("\n")
}

function shouldRetryWithAutoToolChoice(err: unknown): boolean {
  if (!(err instanceof OpenAI.APIError)) return false
  const text = apiErrorSearchText(err)
  return /no endpoints found that support the provided 'tool_choice' value|does not support this tool_choice/i.test(text)
}

function shouldRetryWithoutReasoningForTools(err: unknown): boolean {
  if (!(err instanceof OpenAI.APIError)) return false
  return /function call should not be used with prefix/i.test(apiErrorSearchText(err))
}

function toolCompatibleReasoningExtras(
  request?: Pick<CompletionRequest, "provider">,
): Partial<CompletionRequest> {
  return {
    include_reasoning: false,
    reasoning: { enabled: false, exclude: true, effort: "none" },
    provider: { ...request?.provider, require_parameters: true },
  }
}

function prefixModeToolCallExtras(
  request?: Pick<CompletionRequest, "provider" | "chat_template_kwargs">,
): Partial<CompletionRequest> {
  return {
    ...toolCompatibleReasoningExtras(request),
    enable_thinking: false,
    chat_template_kwargs: {
      ...request?.chat_template_kwargs,
      enable_thinking: false,
    },
  }
}

function disableReasoningForToolCalls(request: CompletionRequest): CompletionRequest {
  return {
    ...request,
    ...prefixModeToolCallExtras(request),
  }
}

function configuredThinkingRequestExtras(
  request?: Pick<CompletionRequest, "chat_template_kwargs">,
): Partial<CompletionRequest> {
  if (ENABLE_THINKING) return {}
  return {
    include_reasoning: false,
    reasoning: { enabled: false, exclude: true, effort: "none" },
    enable_thinking: false,
    chat_template_kwargs: {
      ...request?.chat_template_kwargs,
      enable_thinking: false,
    },
  }
}

function openRouterToolRequestExtras(baseUrl: string): Partial<CompletionRequest> {
  if (!baseUrl.includes("openrouter.ai")) return {}
  return toolCompatibleReasoningExtras()
}
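/*
 * Illustrative only: with ENABLE_THINKING disabled and an openrouter.ai base URL,
 * the helpers above decorate a completion request roughly like this (field names
 * mirror the helpers; concrete values depend on runtime configuration):
 *
 *   const request: CompletionRequest = {
 *     model: MODEL,
 *     messages,
 *     tools: TOOLS,
 *     tool_choice: PRIMARY_TOOL_CHOICE,
 *     // from toolCompatibleReasoningExtras / configuredThinkingRequestExtras
 *     include_reasoning: false,
 *     reasoning: { enabled: false, exclude: true, effort: "none" },
 *     provider: { require_parameters: true },
 *     enable_thinking: false,
 *     chat_template_kwargs: { enable_thinking: false },
 *   }
 */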
function shouldRetryWithoutStreamUsage(err: unknown): boolean {
  if (!(err instanceof OpenAI.APIError)) return false
  if (err.status !== 400) return false
  return /stream_options|include_usage/i.test(err.message)
}

function coerceReasoningToolArgument(rawValue: string): unknown {
  const value = rawValue.trim()
  if (!value) return ""
  if (/^true$/i.test(value)) return true
  if (/^false$/i.test(value)) return false
  if (/^null$/i.test(value)) return null
  if (/^-?\d+(?:\.\d+)?$/.test(value)) {
    const parsed = Number(value)
    if (Number.isFinite(parsed)) return parsed
  }
  if (
    (value.startsWith("{") && value.endsWith("}")) ||
    (value.startsWith("[") && value.endsWith("]")) ||
    (value.startsWith('"') && value.endsWith('"'))
  ) {
    try {
      return JSON.parse(value)
    } catch {
      // keep raw string fallback
    }
  }
  return value
}

// Parses one inline tool-call block into an OpenAI-style tool call assembly.
function parseReasoningToolCallBlock(rawBlock: string): ToolCallAssembly | null {
  const functionMatch = rawBlock.match(/<function\s*=\s*["']?([^"'\s/]+)["']?\s*>/i)
  if (!functionMatch || functionMatch.index === undefined) return null
  const functionName = functionMatch[1]?.trim()
  if (!functionName) return null
  const functionBodyStart = functionMatch.index + functionMatch[0].length
  const functionBodyEnd = rawBlock.indexOf("</function>", functionBodyStart)
  if (functionBodyEnd < 0) return null
  const functionBody = rawBlock.slice(functionBodyStart, functionBodyEnd)
  const args: Record<string, unknown> = {}
  const parameterRegex = /<parameter\s*=\s*["']?([^"'\s/]+)["']?\s*>([\s\S]*?)<\/parameter>/gi
  for (const match of functionBody.matchAll(parameterRegex)) {
    const key = match[1]?.trim()
    if (!key) continue
    args[key] = coerceReasoningToolArgument(match[2] ?? "")
  }
  return {
    id: "",
    type: "function",
    function: {
      name: functionName,
      arguments: JSON.stringify(args),
    },
  }
}

// Extracts complete inline tool-call blocks from the streaming buffer, keeping any
// trailing partial block as the remainder to carry into the next chunk.
function drainReasoningToolCallBlocks(buffer: string): { blocks: string[]; remainder: string } {
  const blocks: string[] = []
  let remaining = buffer
  while (true) {
    const openMatch = remaining.match(/<tool_call(\s[^>]*)?>/i)
    if (!openMatch || openMatch.index === undefined) {
      const partialStart = remaining.lastIndexOf("<tool_call")
      return {
        blocks,
        remainder: partialStart >= 0 ? remaining.slice(partialStart) : "",
      }
    }
    const openStart = openMatch.index
    const openEnd = openStart + openMatch[0].length
    const closeStart = remaining.indexOf("</tool_call>", openEnd)
    if (closeStart < 0) {
      return {
        blocks,
        remainder: remaining.slice(openStart),
      }
    }
    blocks.push(remaining.slice(openEnd, closeStart))
    remaining = remaining.slice(closeStart + "</tool_call>".length)
  }
}
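/*
 * For reference, parseReasoningToolCallBlock / drainReasoningToolCallBlocks above
 * handle tool calls that some providers emit inline in the reasoning channel. The
 * expected shape (inferred from the regexes; the function and parameter names here
 * are made up) looks roughly like:
 *
 *   <tool_call>
 *   <function=read_file>
 *   <parameter=path>notes.md</parameter>
 *   <parameter=limit>20</parameter>
 *   </function>
 *   </tool_call>
 *
 * which is converted into an OpenAI-style tool call:
 *   { type: "function", function: { name: "read_file", arguments: '{"path":"notes.md","limit":20}' } }
 */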
// Drains a chat completion stream into a single assistant message, accumulating text
// deltas, streamed tool calls, and any tool calls embedded in reasoning output.
async function consumeCompletionStream(
  stream: AsyncIterable<OpenAI.Chat.ChatCompletionChunk>,
): Promise<CompletionTurnResult> {
  const contentParts: string[] = []
  const streamedToolCalls = new Map<number, ToolCallAssembly>()
  const reasoningToolCalls: ToolCallAssembly[] = []
  let reasoningToolBuffer = ""
  let usage: OpenAI.Completions.CompletionUsage | undefined
  let emittedText = false
  let emittedThinking = false
  const reasoningParts: string[] = []
  for await (const chunk of stream) {
    if (chunk.usage) usage = chunk.usage
    const choice = chunk.choices[0]
    if (!choice) continue
    const delta = choice.delta as OpenAI.Chat.ChatCompletionChunk.Choice.Delta & { reasoning_content?: string }
    if (typeof delta.reasoning_content === "string" && delta.reasoning_content.length > 0) {
      if (ENABLE_THINKING) reasoningParts.push(delta.reasoning_content)
      reasoningToolBuffer += delta.reasoning_content
      const { blocks, remainder } = drainReasoningToolCallBlocks(reasoningToolBuffer)
      reasoningToolBuffer = remainder
      for (const block of blocks) {
        const parsedCall = parseReasoningToolCallBlock(block)
        if (!parsedCall) continue
        parsedCall.id = `call_reasoning_${reasoningToolCalls.length}`
        reasoningToolCalls.push(parsedCall)
      }
    }
    if (typeof delta.content === "string" && delta.content.length > 0) {
      if (ENABLE_THINKING && !emittedThinking && reasoningParts.length > 0) {
        emit({ type: "thinking", text: reasoningParts.join("") })
        emittedThinking = true
      }
      contentParts.push(delta.content)
      emit({ type: "text", text: delta.content })
      emittedText = true
    }
    if (!Array.isArray(delta.tool_calls)) continue
    for (const partial of delta.tool_calls) {
      const index = partial.index ?? 0
      const existing = streamedToolCalls.get(index) ?? {
        id: partial.id ?? `call_${index}`,
        type: "function" as const,
        function: { name: "", arguments: "" },
      }
      if (partial.id) existing.id = partial.id
      if (partial.type === "function") existing.type = "function"
      if (partial.function?.name) existing.function.name += partial.function.name
      if (partial.function?.arguments) existing.function.arguments += partial.function.arguments
      streamedToolCalls.set(index, existing)
    }
  }
  if (streamedToolCalls.size === 0) {
    const trailingReasoningCall = parseReasoningToolCallBlock(reasoningToolBuffer)
    if (trailingReasoningCall) {
      trailingReasoningCall.id = `call_reasoning_${reasoningToolCalls.length}`
      reasoningToolCalls.push(trailingReasoningCall)
    }
  }
  const finalToolCalls =
    streamedToolCalls.size > 0
      ? [...streamedToolCalls.entries()]
          .sort((a, b) => a[0] - b[0])
          .map(([, toolCall]) => toolCall)
      : reasoningToolCalls
  const message: OpenAI.Chat.ChatCompletionMessage = {
    role: "assistant",
    content: contentParts.length > 0 ? contentParts.join("") : null,
    refusal: null,
    ...(finalToolCalls.length > 0
      ? {
          tool_calls: finalToolCalls,
        }
      : {}),
  }
  if (reasoningParts.length > 0) {
    ;(message as OpenAI.Chat.ChatCompletionMessage & { reasoning_content?: string }).reasoning_content = reasoningParts.join("")
  }
  return {
    message,
    usage,
    emittedText,
    emittedThinking,
    bufferedThinking: reasoningParts.join(""),
  }
}

// Issues a streaming completion request, retrying once without stream usage options
// when the provider rejects stream_options/include_usage.
async function createStreamedCompletion(
  apiClient: OpenAI,
  request: CompletionRequest,
): Promise<CompletionTurnResult> {
  const streamedRequest = {
    ...request,
    stream: true,
    stream_options: { include_usage: true },
  } as const
  const promptMetricId = recordMetric({ type: "prompt", messages: request.messages })
  try {
    const stream = await apiClient.chat.completions.create(streamedRequest)
    const result = await consumeCompletionStream(stream as AsyncIterable<OpenAI.Chat.ChatCompletionChunk>)
    recordPromptResponse(request, result, promptMetricId)
    return result
  } catch (err) {
    if (shouldRetryWithoutStreamUsage(err)) {
      const stream = await apiClient.chat.completions.create({
        ...request,
        stream: true,
      } as const)
      const result = await consumeCompletionStream(stream as AsyncIterable<OpenAI.Chat.ChatCompletionChunk>)
      recordPromptResponse(request, result, promptMetricId)
      return result
    }
    throw err
  }
}

// Sends a fallback-model request, retrying once with tool_choice=auto and once with
// tool-compatible reasoning disabled when the provider rejects the request shape.
async function createFallbackCompletion(messages: OpenAI.Chat.ChatCompletionMessageParam[]): Promise<CompletionTurnResult> {
  const request: CompletionRequest = {
    model: FALLBACK_MODEL,
    messages,
    tools: TOOLS,
    tool_choice: FALLBACK_TOOL_CHOICE,
    ...openRouterToolRequestExtras(FALLBACK_BASE),
    ...configuredThinkingRequestExtras(),
  }
  let currentRequest = request
  let retriedAutoToolChoice = false
  let retriedWithoutReasoning = false
  while (true) {
    try {
      return await createStreamedCompletion(fallbackClient, currentRequest)
    } catch (err) {
      if (currentRequest.tool_choice !== "auto" && !retriedAutoToolChoice && shouldRetryWithAutoToolChoice(err)) {
        retriedAutoToolChoice = true
        console.warn(
          `[fallback] provider rejected tool_choice=${currentRequest.tool_choice}; retrying with tool_choice=auto`,
        )
        currentRequest = {
          ...currentRequest,
          tool_choice: "auto",
        }
        continue
      }
      if (!retriedWithoutReasoning && shouldRetryWithoutReasoningForTools(err)) {
        retriedWithoutReasoning = true
        console.warn("[fallback] provider rejected function calling in reasoning/prefix mode; retrying fallback with tool-compatible reasoning disabled")
        currentRequest = disableReasoningForToolCalls(currentRequest)
        continue
      }
      throw err
    }
  }
}
// Sends a primary-model request, applying the same retry ladder as the fallback path.
async function createPrimaryCompletion(messages: OpenAI.Chat.ChatCompletionMessageParam[]): Promise<CompletionTurnResult> {
  const request: CompletionRequest = {
    model: MODEL,
    messages,
    tools: TOOLS,
    tool_choice: PRIMARY_TOOL_CHOICE,
    ...openRouterToolRequestExtras(API_BASE),
    ...configuredThinkingRequestExtras(),
  }
  let currentRequest = request
  let retriedAutoToolChoice = false
  let retriedWithoutReasoning = false
  while (true) {
    try {
      return await createStreamedCompletion(client!, currentRequest)
    } catch (err) {
      if (currentRequest.tool_choice !== "auto" && !retriedAutoToolChoice && shouldRetryWithAutoToolChoice(err)) {
        retriedAutoToolChoice = true
        console.warn(`[api] provider rejected tool_choice=${currentRequest.tool_choice}; retrying primary with tool_choice=auto`)
        currentRequest = {
          ...currentRequest,
          tool_choice: "auto",
        }
        continue
      }
      if (!retriedWithoutReasoning && shouldRetryWithoutReasoningForTools(err)) {
        retriedWithoutReasoning = true
        console.warn("[api] provider rejected function calling in reasoning/prefix mode; retrying primary with tool-compatible reasoning disabled")
        currentRequest = disableReasoningForToolCalls(currentRequest)
        continue
      }
      throw err
    }
  }
}

function logPromptSizeDebug(state: LoopState, err: unknown, label: string): void {
  const messageCount = state.conversation.length
  const roleCounts = state.conversation.reduce<Record<string, number>>((acc, m) => {
    const role = (m as { role?: string }).role ?? "unknown"
    acc[role] = (acc[role] ?? 0) + 1
    return acc
  }, {})
  const estimate = estimatePromptTokens(state.conversation)
  const charLength = JSON.stringify(state.conversation).length
  const summary = err instanceof OpenAI.APIError ? `${err.status} ${err.message}` : errorSummary(err)
  console.warn(
    `[api] ${label}: ${summary} - messages=${messageCount} est_tokens=${estimate} chars=${charLength} roles=${JSON.stringify(roleCounts)} observedPromptTokens=${state.contextSize}`,
  )
}

// Attempts to shrink an over-long conversation via LLM summarization; returns true only
// when a smaller summarized conversation has replaced state.conversation.
async function recoverFromPromptTooLarge(state: LoopState, attempt: number): Promise<boolean> {
  const beforeCount = state.conversation.length
  const beforeEstimate = estimatePromptTokens(state.conversation)
  const summaryProvider = configuredSummaryProvider()
  if (!summaryProvider.client || !summaryProvider.model) {
    console.warn(`[context] recovery: no summary client available; cannot llm-summarize`)
    return false
  }
  console.warn(`[context] recovery: attempting llm summarization via ${summaryProvider.model} (attempt=${attempt + 1})`)
  const summarized = await summarizeConversationViaLLM(state.conversation, summaryProvider.client, summaryProvider.model)
  if (!summarized) {
    console.warn(`[context] recovery: llm summarization returned no changes`)
    return false
  }
  const afterEstimate = estimatePromptTokens(summarized)
  if (afterEstimate >= beforeEstimate) {
    console.warn(`[context] recovery: llm summary not smaller (${beforeEstimate} -> ${afterEstimate}); keeping original`)
    return false
  }
  state.conversation = summarized
  state.contextSize = afterEstimate
  const summaryIdx = findSummaryMessageIndex(state.conversation)
  const summary = summaryIdx >= 0 ? (state.conversation[summaryIdx]?.content as string) : undefined
  console.warn(
    `[context] recovery: llm-summarized conversation (${beforeCount} -> ${summarized.length} msgs, ${beforeEstimate} -> ${afterEstimate} tokens)`,
  )
  recordMetric({
    type: "compaction",
    before: beforeEstimate,
    after: afterEstimate,
    method: "force-llm",
    summary,
  })
  return true
}
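/*
 * Illustrative recovery sequence (numbers are made up): a prompt the provider rejects
 * at ~180k estimated tokens is summarized via the configured summary model; if the
 * summarized conversation estimates at ~40k tokens it replaces state.conversation, a
 * compaction metric { before: 180000, after: 40000, method: "force-llm" } is recorded,
 * and fetchCompletion retries. If the summary is not smaller, the original conversation
 * is kept and the error continues through the normal fallback/backoff handling below.
 */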
/**
 * Fetches the next assistant completion, including fallback and backoff behavior.
 *
 * @param state - Mutable loop state containing the current conversation/context.
 * @param baseConversation - Optional alternate base conversation for retries.
 * @returns The next chat completion response.
 * @throws If the primary request fails with a non-fallback error condition.
 */
export async function fetchCompletion(
  state: LoopState,
  baseConversation: OpenAI.Chat.ChatCompletionMessageParam[] = state.conversation,
): Promise<CompletionTurnResult> {
  let promptTooLargeAttempts = 0
  while (true) {
    if (baseConversation === state.conversation) {
      state.conversation = sanitizeMessages(state.conversation)
      baseConversation = state.conversation
    } else {
      baseConversation = sanitizeMessages(baseConversation)
    }
    const requestContext = await buildCompletionMessages(
      baseConversation,
      state.memoryRecallCooldowns,
      state.memoryRecallTurn,
    )
    const requestMessages = requestContext.messages
    if (USE_FALLBACK) {
      const fallbackWindow = fallbackContextWindow(requestMessages)
      if (fallbackWindow.nearLimit) {
        console.warn(
          `[fallback] prompt estimate ${fallbackWindow.estimate} nearing fallback limit ${fallbackWindow.softLimit} (${FALLBACK_MODEL})`,
        )
      }
      try {
        const completion = await createFallbackCompletion(requestMessages)
        state.memoryRecallCooldowns = rememberRecalledMemoryChunks(
          state.memoryRecallCooldowns,
          requestContext.recalledChunkIds,
          state.memoryRecallTurn,
        )
        return completion
      } catch (fallbackErr) {
        if (isPromptTooLargeError(fallbackErr) && promptTooLargeAttempts < 2) {
          logPromptSizeDebug(state, fallbackErr, `fallback rejected prompt (attempt ${promptTooLargeAttempts + 1}/2)`)
          const recovered = await recoverFromPromptTooLarge(state, promptTooLargeAttempts)
          promptTooLargeAttempts++
          if (recovered) continue
        }
        if (shouldFallback(fallbackErr)) {
          const retryAfter = retryDelayMs(fallbackErr)
          console.warn(
            `[fallback] transient failure (${errorSummary(fallbackErr)}); retrying after ${Math.ceil(retryAfter / 1000)}s`,
          )
          console.log(
            `[runner] backing off ${Math.ceil(retryAfter / 1000)}s (until ${formatRetryAt(retryAfter)}) before retrying fallback...`,
          )
          await sleep(retryAfter)
          continue
        }
        logApiError(fallbackErr, `model=${FALLBACK_MODEL} api=${FALLBACK_BASE}`)
        throw fallbackErr
      }
    }
    try {
      const completion = await createPrimaryCompletion(requestMessages)
      state.memoryRecallCooldowns = rememberRecalledMemoryChunks(
        state.memoryRecallCooldowns,
        requestContext.recalledChunkIds,
        state.memoryRecallTurn,
      )
      return completion
    } catch (primaryErr) {
      if (isPromptTooLargeError(primaryErr) && promptTooLargeAttempts < 2) {
        logPromptSizeDebug(state, primaryErr, `primary rejected prompt (attempt ${promptTooLargeAttempts + 1}/2)`)
        const recovered = await recoverFromPromptTooLarge(state, promptTooLargeAttempts)
        promptTooLargeAttempts++
        if (recovered) continue
        logApiError(primaryErr, `model=${MODEL} api=${API_BASE}`)
        throw primaryErr
      }
      if (!shouldFallback(primaryErr)) {
        logApiError(primaryErr, `model=${MODEL} api=${API_BASE}`)
        throw primaryErr
      }
      const fallbackWindow = fallbackContextWindow(requestMessages)
      if (fallbackWindow.skip) {
        console.warn(
          `[api] primary down (${errorSummary(primaryErr)}) and fallback context estimate ${fallbackWindow.estimate} exceeds hard limit ${fallbackWindow.hardLimit}; retrying primary after backoff`,
        )
        const retryAfter = retryDelayMs(primaryErr)
        console.log(
          `[runner] backing off ${Math.ceil(retryAfter / 1000)}s (until ${formatRetryAt(retryAfter)}) before retrying primary...`,
        )
        await sleep(retryAfter)
        continue
      }
      if (fallbackWindow.nearLimit) {
        console.warn(
          `[fallback] prompt estimate ${fallbackWindow.estimate} nearing fallback limit ${fallbackWindow.softLimit} (${FALLBACK_MODEL})`,
        )
      }
      console.warn(`[api] primary down (${errorSummary(primaryErr)}) - switching to fallback`)
      try {
        const completion = await createFallbackCompletion(requestMessages)
        state.memoryRecallCooldowns = rememberRecalledMemoryChunks(
          state.memoryRecallCooldowns,
          requestContext.recalledChunkIds,
          state.memoryRecallTurn,
        )
        return completion
      } catch (fallbackErr) {
        if (isPromptTooLargeError(fallbackErr) && promptTooLargeAttempts < 2) {
          logPromptSizeDebug(state, fallbackErr, `fallback rejected prompt during failover (attempt ${promptTooLargeAttempts + 1}/2)`)
          const recovered = await recoverFromPromptTooLarge(state, promptTooLargeAttempts)
          promptTooLargeAttempts++
          if (recovered) continue
        }
        console.warn(
          `[api] fallback failed (${errorSummary(fallbackErr)}) after primary failure (${errorSummary(primaryErr)}); retrying primary after backoff`,
        )
        const retryAfter = retryDelayMs(primaryErr)
        console.log(
          `[runner] backing off ${Math.ceil(retryAfter / 1000)}s (until ${formatRetryAt(retryAfter)}) before retrying primary...`,
        )
        await sleep(retryAfter)
      }
    }
  }
}
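/*
 * Minimal usage sketch (illustrative; assumes a LoopState and conversation id supplied
 * by the surrounding agent loop):
 *
 *   const result = await fetchCompletion(state)
 *   applyUsage(state, result.usage)
 *   if (!result.emittedThinking) emitThinking(result.message)
 *   addAssistantMessage(convId, state, result.message)
 *   // result.message.tool_calls, if present, would then be dispatched by the tool runner.
 */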
export const __completionTest = {
  consumeCompletionStream,
}
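/*
 * Test sketch for the exported hook (illustrative; chunk objects are trimmed to the
 * fields consumeCompletionStream actually reads, hence the casts):
 *
 *   async function* fakeStream(): AsyncIterable<OpenAI.Chat.ChatCompletionChunk> {
 *     yield { choices: [{ delta: { content: "Hel" } }] } as OpenAI.Chat.ChatCompletionChunk
 *     yield { choices: [{ delta: { content: "lo" } }] } as OpenAI.Chat.ChatCompletionChunk
 *     yield {
 *       choices: [{ delta: { tool_calls: [{ index: 0, id: "call_1", type: "function", function: { name: "read_file", arguments: '{"path":"notes.md"}' } }] } }],
 *       usage: { prompt_tokens: 10, completion_tokens: 5, total_tokens: 15 },
 *     } as OpenAI.Chat.ChatCompletionChunk
 *   }
 *
 *   const result = await __completionTest.consumeCompletionStream(fakeStream())
 *   // result.message.content === "Hello"
 *   // result.message.tool_calls?.[0]?.function.name === "read_file"
 *   // result.usage?.total_tokens === 15
 */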