import OpenAI from "openai"
import { logMessage } from "../db.js"
import { buildCompletionMessages, rememberRecalledMemoryChunks } from "../memory.js"
import { recordMetric } from "../metrics.js"
import { emit } from "../stream.js"
import type { LoopState } from "./types.js"
import {
API_BASE,
ENABLE_THINKING,
FALLBACK_BASE,
FALLBACK_MODEL,
FALLBACK_TOOL_CHOICE,
MODEL,
PRIMARY_TOOL_CHOICE,
SUMMARY_MODEL,
TOOLS,
USE_FALLBACK,
apiErrorDetails,
client,
errorSummary,
estimatePromptTokens,
fallbackClient,
fallbackContextWindow,
findSummaryMessageIndex,
isPromptTooLargeError,
retryDelayMs,
sanitizeMessages,
shouldFallback,
summaryClient,
summarizeConversationViaLLM,
} from "./util.js"
import { assistantContentText } from "./loop-content.js"
import type { CompletionRequest, CompletionTurnResult, ToolCallAssembly } from "./loop-shared.js"
/**
* Resolves the configured summary client/model pair.
*
* @returns Active summary provider config.
*/
export function configuredSummaryProvider(): { client: OpenAI | null; model: string } {
if (summaryClient && SUMMARY_MODEL) return { client: summaryClient, model: SUMMARY_MODEL }
return {
client: USE_FALLBACK ? fallbackClient : client,
model: USE_FALLBACK ? FALLBACK_MODEL : MODEL,
}
}
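/**
 * Logs an API error's status, message, and provider-supplied detail lines to the console.
 *
 * @param err - Error to inspect; values that are not OpenAI.APIError are ignored.
 * @param context - Short label describing the request that failed.
 */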
function logApiError(err: unknown, context: string): void {
if (!(err instanceof OpenAI.APIError)) return
console.error(`[api] ${err.status} ${err.message} - ${context}`)
for (const line of apiErrorDetails(err)) console.error(line)
}
/**
* Appends an assistant message to state and persists it to conversation logs.
*
* @param convId - Active conversation id.
* @param state - Mutable loop state.
* @param msg - Assistant message to append.
*/
export function addAssistantMessage(convId: number, state: LoopState, msg: OpenAI.Chat.ChatCompletionMessage): void {
state.conversation.push(msg)
logMessage(convId, msg.role, msg.content ?? "", msg.tool_calls ?? undefined)
}
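/**
 * Records a completed prompt/response pair in the metrics store, linked to the originating prompt metric when available.
 */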
function recordPromptResponse(request: CompletionRequest, result: CompletionTurnResult, promptMetricId: number | null): void {
recordMetric({
type: "prompt_response",
promptMetricId: promptMetricId ?? undefined,
model: request.model,
toolChoice: request.tool_choice,
messages: request.messages,
response: result.message,
usage: result.usage,
})
}
/**
* Applies token usage from a completion response to loop state counters.
*
* @param state - Mutable loop state.
* @param usage - Completion usage payload (if provided by the API).
*/
export function applyUsage(state: LoopState, usage: OpenAI.Completions.CompletionUsage | undefined): void {
if (!usage) return
state.tokenCount += usage.total_tokens
if (usage.prompt_tokens) state.contextSize = usage.prompt_tokens
console.log(`[tokens] +${usage.total_tokens} total=${state.tokenCount}`)
recordMetric({ type: "usage", usage })
}
/**
* Emits model reasoning text when exposed by the provider.
*
* Supports both `reasoning_content` and `<think>...</think>` wrappers.
*
* @param msg - Assistant message to inspect for reasoning traces.
*/
export function emitThinking(msg: OpenAI.Chat.ChatCompletionMessage): void {
const rawMsg = msg as unknown as Record<string, unknown>
let thinkingText: string | null = null
if (typeof rawMsg.reasoning_content === "string" && rawMsg.reasoning_content.trim()) {
thinkingText = rawMsg.reasoning_content.trim()
} else if (typeof msg.content === "string") {
const match = msg.content.match(/^<think>([\s\S]*?)<\/think>\s*/i)
if (match) {
thinkingText = match[1]!.trim()
;(msg as unknown as Record<string, unknown>).content = msg.content.slice(match[0].length)
}
}
if (thinkingText) emit({ type: "thinking", text: thinkingText })
}
function sleep(ms: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, ms))
}
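/**
 * Formats the wall-clock time a retry will fire, in both local time and ISO form, for log messages.
 */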
function formatRetryAt(retryAfterMs: number): string {
const retryAt = new Date(Date.now() + retryAfterMs)
const local = retryAt.toLocaleString(undefined, {
hour12: false,
timeZoneName: "short",
})
return `${local} (${retryAt.toISOString()})`
}
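/**
 * Builds a searchable text blob from an API error's message and serialized error payload.
 */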
function apiErrorSearchText(err: { message: string; error?: unknown }): string {
const parts = [err.message]
if (err.error !== undefined) {
try {
parts.push(JSON.stringify(err.error))
} catch {
parts.push(String(err.error))
}
}
return parts.join("\n")
}
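/**
 * Returns true when the provider rejected the configured tool_choice value and the request should be retried with tool_choice=auto.
 */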
function shouldRetryWithAutoToolChoice(err: unknown): boolean {
if (!(err instanceof OpenAI.APIError)) return false
const text = apiErrorSearchText(err)
return /no endpoints found that support the provided 'tool_choice' value|does not support this tool_choice/i.test(text)
}
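/**
 * Returns true when the provider refuses function calling while reasoning/prefix mode is active.
 */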
function shouldRetryWithoutReasoningForTools(err: unknown): boolean {
if (!(err instanceof OpenAI.APIError)) return false
return /function call should not be used with prefix/i.test(apiErrorSearchText(err))
}
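/**
 * Request extras that disable reasoning output so providers that cannot combine reasoning with tool calls accept the request.
 */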
function toolCompatibleReasoningExtras(
request?: Pick<CompletionRequest, "provider">,
): Partial<CompletionRequest> {
return {
include_reasoning: false,
reasoning: { enabled: false, exclude: true, effort: "none" },
provider: { ...request?.provider, require_parameters: true },
}
}
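/**
 * Extends the tool-compatible reasoning extras with thinking disabled at the chat-template level.
 */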
function prefixModeToolCallExtras(request?: Pick<CompletionRequest, "provider" | "chat_template_kwargs">): Partial<CompletionRequest> {
return {
...toolCompatibleReasoningExtras(request),
enable_thinking: false,
chat_template_kwargs: {
...request?.chat_template_kwargs,
enable_thinking: false,
},
}
}
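/**
 * Returns a copy of the request with reasoning/thinking disabled so tool calls can proceed.
 */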
function disableReasoningForToolCalls(request: CompletionRequest): CompletionRequest {
return {
...request,
...prefixModeToolCallExtras(request),
}
}
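/**
 * Request extras that disable thinking when ENABLE_THINKING is off; returns no extras when thinking is enabled.
 */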
function configuredThinkingRequestExtras(
request?: Pick<CompletionRequest, "chat_template_kwargs">,
): Partial<CompletionRequest> {
if (ENABLE_THINKING) return {}
return {
include_reasoning: false,
reasoning: { enabled: false, exclude: true, effort: "none" },
enable_thinking: false,
chat_template_kwargs: {
...request?.chat_template_kwargs,
enable_thinking: false,
},
}
}
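/**
 * Adds tool-compatible reasoning extras when the base URL points at OpenRouter.
 */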
function openRouterToolRequestExtras(baseUrl: string): Partial<CompletionRequest> {
if (!baseUrl.includes("openrouter.ai")) return {}
return toolCompatibleReasoningExtras()
}
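/**
 * Returns true when a 400 response indicates the provider does not support stream_options.include_usage.
 */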
function shouldRetryWithoutStreamUsage(err: unknown): boolean {
if (!(err instanceof OpenAI.APIError)) return false
if (err.status !== 400) return false
return /stream_options|include_usage/i.test(err.message)
}
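/**
 * Coerces a raw parameter string from a reasoning tool-call block into a boolean, number, null, parsed JSON value, or plain string.
 */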
function coerceReasoningToolArgument(rawValue: string): unknown {
const value = rawValue.trim()
if (!value) return ""
if (/^true$/i.test(value)) return true
if (/^false$/i.test(value)) return false
if (/^null$/i.test(value)) return null
if (/^-?\d+(?:\.\d+)?$/.test(value)) {
const parsed = Number(value)
if (Number.isFinite(parsed)) return parsed
}
if (
(value.startsWith("{") && value.endsWith("}")) ||
(value.startsWith("[") && value.endsWith("]")) ||
(value.startsWith('"') && value.endsWith('"'))
) {
try {
return JSON.parse(value)
} catch {
// keep raw string fallback
}
}
return value
}
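/**
 * Parses a tool-call block surfaced in reasoning text into a ToolCallAssembly.
 *
 * Assumes a `<function=name>` wrapper containing `<parameter=key>value</parameter>` entries, e.g.
 * (illustrative only; the function name and parameter here are hypothetical):
 *
 *   <function=read_file>
 *   <parameter=path>notes.txt</parameter>
 *   </function>
 *
 * Returns null when no function tag is found.
 */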
function parseReasoningToolCallBlock(rawBlock: string): ToolCallAssembly | null {
const functionMatch = rawBlock.match(/<function\s*=\s*["']?([^>"'\s/]+)["']?\s*>/i)
if (!functionMatch || functionMatch.index === undefined) return null
const functionName = functionMatch[1]?.trim()
if (!functionName) return null
const functionBodyStart = functionMatch.index + functionMatch[0].length
const functionBodyEnd = rawBlock.indexOf("</function>", functionBodyStart)
if (functionBodyEnd < 0) return null
const functionBody = rawBlock.slice(functionBodyStart, functionBodyEnd)
const args: Record<string, unknown> = {}
const parameterRegex = /<parameter\s*=\s*["']?([^>"'\s/]+)["']?\s*>([\s\S]*?)<\/parameter>/gi
for (const match of functionBody.matchAll(parameterRegex)) {
const key = match[1]?.trim()
if (!key) continue
args[key] = coerceReasoningToolArgument(match[2] ?? "")
}
return {
id: "",
type: "function",
function: {
name: functionName,
arguments: JSON.stringify(args),
},
}
}
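/**
 * Extracts complete tool-call blocks (assumed `<tool_call>...</tool_call>` wrappers) from buffered reasoning text,
 * returning the inner block bodies plus any trailing partial block as the remainder to keep for the next chunk.
 */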
function drainReasoningToolCallBlocks(buffer: string): { blocks: string[]; remainder: string } {
const blocks: string[] = []
let remaining = buffer
while (true) {
const openMatch = remaining.match(/<tool_call(\s[^>]*)?>/i)
if (!openMatch || openMatch.index === undefined) {
const partialStart = remaining.lastIndexOf("<tool_call")
return {
blocks,
remainder: partialStart >= 0 ? remaining.slice(partialStart) : "",
}
}
const openStart = openMatch.index
const openEnd = openStart + openMatch[0].length
const closeStart = remaining.indexOf("</tool_call>", openEnd)
if (closeStart < 0) {
return {
blocks,
remainder: remaining.slice(openStart),
}
}
blocks.push(remaining.slice(openEnd, closeStart))
remaining = remaining.slice(closeStart + "</tool_call>".length)
}
}
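/**
 * Consumes a streamed chat completion: forwards text and thinking deltas to the event stream, assembles streamed
 * tool calls (or tool calls embedded in reasoning text), and captures usage from the final chunks.
 */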
async function consumeCompletionStream(
stream: AsyncIterable<OpenAI.Chat.ChatCompletionChunk>,
): Promise<CompletionTurnResult> {
const contentParts: string[] = []
const streamedToolCalls = new Map<number, ToolCallAssembly>()
const reasoningToolCalls: ToolCallAssembly[] = []
let reasoningToolBuffer = ""
let usage: OpenAI.Completions.CompletionUsage | undefined
let emittedText = false
let emittedThinking = false
const reasoningParts: string[] = []
for await (const chunk of stream) {
if (chunk.usage) usage = chunk.usage
const choice = chunk.choices[0]
if (!choice) continue
const delta = choice.delta as OpenAI.Chat.ChatCompletionChunk.Choice.Delta & {
reasoning_content?: string
}
if (typeof delta.reasoning_content === "string" && delta.reasoning_content.length > 0) {
if (ENABLE_THINKING) reasoningParts.push(delta.reasoning_content)
reasoningToolBuffer += delta.reasoning_content
const { blocks, remainder } = drainReasoningToolCallBlocks(reasoningToolBuffer)
reasoningToolBuffer = remainder
for (const block of blocks) {
const parsedCall = parseReasoningToolCallBlock(block)
if (!parsedCall) continue
parsedCall.id = `call_reasoning_${reasoningToolCalls.length}`
reasoningToolCalls.push(parsedCall)
}
}
if (typeof delta.content === "string" && delta.content.length > 0) {
if (ENABLE_THINKING && !emittedThinking && reasoningParts.length > 0) {
emit({ type: "thinking", text: reasoningParts.join("") })
emittedThinking = true
}
contentParts.push(delta.content)
emit({ type: "text", text: delta.content })
emittedText = true
}
if (!Array.isArray(delta.tool_calls)) continue
for (const partial of delta.tool_calls) {
const index = partial.index ?? 0
const existing = streamedToolCalls.get(index) ?? {
id: partial.id ?? `call_${index}`,
type: "function" as const,
function: { name: "", arguments: "" },
}
if (partial.id) existing.id = partial.id
if (partial.type === "function") existing.type = "function"
if (partial.function?.name) existing.function.name += partial.function.name
if (partial.function?.arguments) existing.function.arguments += partial.function.arguments
streamedToolCalls.set(index, existing)
}
}
if (streamedToolCalls.size === 0) {
const trailingReasoningCall = parseReasoningToolCallBlock(reasoningToolBuffer)
if (trailingReasoningCall) {
trailingReasoningCall.id = `call_reasoning_${reasoningToolCalls.length}`
reasoningToolCalls.push(trailingReasoningCall)
}
}
const finalToolCalls =
streamedToolCalls.size > 0
? [...streamedToolCalls.entries()]
.sort((a, b) => a[0] - b[0])
.map(([, toolCall]) => toolCall)
: reasoningToolCalls
const message: OpenAI.Chat.ChatCompletionMessage = {
role: "assistant",
content: contentParts.length > 0 ? contentParts.join("") : null,
refusal: null,
...(finalToolCalls.length > 0
? {
tool_calls: finalToolCalls,
}
: {}),
}
if (reasoningParts.length > 0) {
;(message as OpenAI.Chat.ChatCompletionMessage & { reasoning_content?: string }).reasoning_content =
reasoningParts.join("")
}
return {
message,
usage,
emittedText,
emittedThinking,
bufferedThinking: reasoningParts.join(""),
}
}
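/**
 * Issues a streaming chat completion request, recording prompt/response metrics and retrying once without stream
 * usage options when the provider rejects them.
 */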
async function createStreamedCompletion(
apiClient: OpenAI,
request: CompletionRequest,
): Promise<CompletionTurnResult> {
const streamedRequest = {
...request,
stream: true,
stream_options: { include_usage: true },
} as const
const promptMetricId = recordMetric({ type: "prompt", messages: request.messages })
try {
const stream = await apiClient.chat.completions.create(streamedRequest)
const result = await consumeCompletionStream(stream as AsyncIterable<OpenAI.Chat.ChatCompletionChunk>)
recordPromptResponse(request, result, promptMetricId)
return result
} catch (err) {
if (shouldRetryWithoutStreamUsage(err)) {
const stream = await apiClient.chat.completions.create({
...request,
stream: true,
} as const)
const result = await consumeCompletionStream(stream as AsyncIterable<OpenAI.Chat.ChatCompletionChunk>)
recordPromptResponse(request, result, promptMetricId)
return result
}
throw err
}
}
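/**
 * Requests a completion from the fallback provider, retrying with tool_choice=auto or with reasoning disabled when
 * the provider rejects the initial request shape.
 */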
async function createFallbackCompletion(messages: OpenAI.Chat.ChatCompletionMessageParam[]): Promise<CompletionTurnResult> {
const request: CompletionRequest = {
model: FALLBACK_MODEL,
messages,
tools: TOOLS,
tool_choice: FALLBACK_TOOL_CHOICE,
...openRouterToolRequestExtras(FALLBACK_BASE),
...configuredThinkingRequestExtras(),
}
let currentRequest = request
let retriedAutoToolChoice = false
let retriedWithoutReasoning = false
while (true) {
try {
return await createStreamedCompletion(fallbackClient, currentRequest)
} catch (err) {
if (currentRequest.tool_choice !== "auto" && !retriedAutoToolChoice && shouldRetryWithAutoToolChoice(err)) {
retriedAutoToolChoice = true
console.warn(
`[fallback] provider rejected tool_choice=${currentRequest.tool_choice}; retrying with tool_choice=auto`,
)
currentRequest = {
...currentRequest,
tool_choice: "auto",
}
continue
}
if (!retriedWithoutReasoning && shouldRetryWithoutReasoningForTools(err)) {
retriedWithoutReasoning = true
console.warn("[fallback] provider rejected function calling in reasoning/prefix mode; retrying fallback with tool-compatible reasoning disabled")
currentRequest = disableReasoningForToolCalls(currentRequest)
continue
}
throw err
}
}
}
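/**
 * Requests a completion from the primary provider, retrying with tool_choice=auto or with reasoning disabled when
 * the provider rejects the initial request shape.
 */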
async function createPrimaryCompletion(messages: OpenAI.Chat.ChatCompletionMessageParam[]): Promise<CompletionTurnResult> {
const request: CompletionRequest = {
model: MODEL,
messages,
tools: TOOLS,
tool_choice: PRIMARY_TOOL_CHOICE,
...openRouterToolRequestExtras(API_BASE),
...configuredThinkingRequestExtras(),
}
let currentRequest = request
let retriedAutoToolChoice = false
let retriedWithoutReasoning = false
while (true) {
try {
return await createStreamedCompletion(client!, currentRequest)
} catch (err) {
if (currentRequest.tool_choice !== "auto" && !retriedAutoToolChoice && shouldRetryWithAutoToolChoice(err)) {
retriedAutoToolChoice = true
console.warn(`[api] provider rejected tool_choice=${currentRequest.tool_choice}; retrying primary with tool_choice=auto`)
currentRequest = {
...currentRequest,
tool_choice: "auto",
}
continue
}
if (!retriedWithoutReasoning && shouldRetryWithoutReasoningForTools(err)) {
retriedWithoutReasoning = true
console.warn("[api] provider rejected function calling in reasoning/prefix mode; retrying primary with tool-compatible reasoning disabled")
currentRequest = disableReasoningForToolCalls(currentRequest)
continue
}
throw err
}
}
}
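/**
 * Logs prompt-size diagnostics (message count, role breakdown, token estimate, character length) when a provider rejects the prompt.
 */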
function logPromptSizeDebug(state: LoopState, err: unknown, label: string): void {
const messageCount = state.conversation.length
const roleCounts = state.conversation.reduce<Record<string, number>>((acc, m) => {
const role = (m as { role?: string }).role ?? "unknown"
acc[role] = (acc[role] ?? 0) + 1
return acc
}, {})
const estimate = estimatePromptTokens(state.conversation)
const charLength = JSON.stringify(state.conversation).length
const summary = err instanceof OpenAI.APIError ? `${err.status} ${err.message}` : errorSummary(err)
console.warn(
`[api] ${label}: ${summary} - messages=${messageCount} est_tokens=${estimate} chars=${charLength} roles=${JSON.stringify(roleCounts)} observedPromptTokens=${state.contextSize}`,
)
}
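/**
 * Attempts to shrink an over-long conversation by summarizing it via the configured summary model.
 *
 * @returns True when the conversation was replaced with a smaller summarized version.
 */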
async function recoverFromPromptTooLarge(state: LoopState, attempt: number): Promise<boolean> {
const beforeCount = state.conversation.length
const beforeEstimate = estimatePromptTokens(state.conversation)
const summaryProvider = configuredSummaryProvider()
if (!summaryProvider.client || !summaryProvider.model) {
console.warn(`[context] recovery: no summary client available; cannot llm-summarize`)
return false
}
console.warn(`[context] recovery: attempting llm summarization via ${summaryProvider.model} (attempt=${attempt + 1})`)
const summarized = await summarizeConversationViaLLM(state.conversation, summaryProvider.client, summaryProvider.model)
if (!summarized) {
console.warn(`[context] recovery: llm summarization returned no changes`)
return false
}
const afterEstimate = estimatePromptTokens(summarized)
if (afterEstimate >= beforeEstimate) {
console.warn(`[context] recovery: llm summary not smaller (${beforeEstimate} -> ${afterEstimate}); keeping original`)
return false
}
state.conversation = summarized
state.contextSize = afterEstimate
const summaryIdx = findSummaryMessageIndex(state.conversation)
const summary = summaryIdx >= 0 ? (state.conversation[summaryIdx]?.content as string) : undefined
console.warn(
`[context] recovery: llm-summarized conversation (${beforeCount} -> ${summarized.length} msgs, ${beforeEstimate} -> ${afterEstimate} tokens)`,
)
recordMetric({
type: "compaction",
before: beforeEstimate,
after: afterEstimate,
method: "force-llm",
summary,
})
return true
}
/**
* Fetches the next assistant completion, including fallback and backoff behavior.
*
* @param state - Mutable loop state containing current conversation/context.
* @param baseConversation - Optional alternate base conversation for retries.
* @returns The next completion turn result (assistant message plus usage).
* @throws If the primary request fails with a non-fallback error condition.
*/
export async function fetchCompletion(
state: LoopState,
baseConversation: OpenAI.Chat.ChatCompletionMessageParam[] = state.conversation,
): Promise<CompletionTurnResult> {
let promptTooLargeAttempts = 0
while (true) {
if (baseConversation === state.conversation) {
state.conversation = sanitizeMessages(state.conversation)
baseConversation = state.conversation
} else {
baseConversation = sanitizeMessages(baseConversation)
}
const requestContext = await buildCompletionMessages(
baseConversation,
state.memoryRecallCooldowns,
state.memoryRecallTurn,
)
const requestMessages = requestContext.messages
if (USE_FALLBACK) {
const fallbackWindow = fallbackContextWindow(requestMessages)
if (fallbackWindow.nearLimit) {
console.warn(
`[fallback] prompt estimate ${fallbackWindow.estimate} nearing fallback limit ${fallbackWindow.softLimit} (${FALLBACK_MODEL})`,
)
}
try {
const completion = await createFallbackCompletion(requestMessages)
state.memoryRecallCooldowns = rememberRecalledMemoryChunks(
state.memoryRecallCooldowns,
requestContext.recalledChunkIds,
state.memoryRecallTurn,
)
return completion
} catch (fallbackErr) {
if (isPromptTooLargeError(fallbackErr) && promptTooLargeAttempts < 2) {
logPromptSizeDebug(state, fallbackErr, `fallback rejected prompt (attempt ${promptTooLargeAttempts + 1}/2)`)
const recovered = await recoverFromPromptTooLarge(state, promptTooLargeAttempts)
promptTooLargeAttempts++
if (recovered) continue
}
if (shouldFallback(fallbackErr)) {
const retryAfter = retryDelayMs(fallbackErr)
console.warn(
`[fallback] transient failure (${errorSummary(fallbackErr)}); retrying after ${Math.ceil(retryAfter / 1000)}s`,
)
console.log(
`[runner] backing off ${Math.ceil(retryAfter / 1000)}s (until ${formatRetryAt(retryAfter)}) before retrying fallback...`,
)
await sleep(retryAfter)
continue
}
logApiError(fallbackErr, `model=${FALLBACK_MODEL} api=${FALLBACK_BASE}`)
throw fallbackErr
}
}
try {
const completion = await createPrimaryCompletion(requestMessages)
state.memoryRecallCooldowns = rememberRecalledMemoryChunks(
state.memoryRecallCooldowns,
requestContext.recalledChunkIds,
state.memoryRecallTurn,
)
return completion
} catch (primaryErr) {
if (isPromptTooLargeError(primaryErr) && promptTooLargeAttempts < 2) {
logPromptSizeDebug(state, primaryErr, `primary rejected prompt (attempt ${promptTooLargeAttempts + 1}/2)`)
const recovered = await recoverFromPromptTooLarge(state, promptTooLargeAttempts)
promptTooLargeAttempts++
if (recovered) continue
logApiError(primaryErr, `model=${MODEL} api=${API_BASE}`)
throw primaryErr
}
if (!shouldFallback(primaryErr)) {
logApiError(primaryErr, `model=${MODEL} api=${API_BASE}`)
throw primaryErr
}
const fallbackWindow = fallbackContextWindow(requestMessages)
if (fallbackWindow.skip) {
console.warn(
`[api] primary down (${errorSummary(primaryErr)}) and fallback context estimate ${fallbackWindow.estimate} exceeds hard limit ${fallbackWindow.hardLimit}; retrying primary after backoff`,
)
const retryAfter = retryDelayMs(primaryErr)
console.log(
`[runner] backing off ${Math.ceil(retryAfter / 1000)}s (until ${formatRetryAt(retryAfter)}) before retrying primary...`,
)
await sleep(retryAfter)
continue
}
if (fallbackWindow.nearLimit) {
console.warn(
`[fallback] prompt estimate ${fallbackWindow.estimate} nearing fallback limit ${fallbackWindow.softLimit} (${FALLBACK_MODEL})`,
)
}
console.warn(`[api] primary down (${errorSummary(primaryErr)}) - switching to fallback`)
try {
const completion = await createFallbackCompletion(requestMessages)
state.memoryRecallCooldowns = rememberRecalledMemoryChunks(
state.memoryRecallCooldowns,
requestContext.recalledChunkIds,
state.memoryRecallTurn,
)
return completion
} catch (fallbackErr) {
if (isPromptTooLargeError(fallbackErr) && promptTooLargeAttempts < 2) {
logPromptSizeDebug(state, fallbackErr, `fallback rejected prompt during failover (attempt ${promptTooLargeAttempts + 1}/2)`)
const recovered = await recoverFromPromptTooLarge(state, promptTooLargeAttempts)
promptTooLargeAttempts++
if (recovered) continue
}
console.warn(
`[api] fallback failed (${errorSummary(fallbackErr)}) after primary failure (${errorSummary(primaryErr)}); retrying primary after backoff`,
)
const retryAfter = retryDelayMs(primaryErr)
console.log(
`[runner] backing off ${Math.ceil(retryAfter / 1000)}s (until ${formatRetryAt(retryAfter)}) before retrying primary...`,
)
await sleep(retryAfter)
}
}
}
}
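/** Internal helpers exposed for unit tests. */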
export const __completionTest = {
consumeCompletionStream,
}