programmatic subagents
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

fix(pi-mill): propagate subagent failures as MillError and stabilize async follow-up

+162 -55
+38
packages/driver-pi/src/internal/pi.codec.test.ts
··· 46 46 47 47 expect(decoded.result.text).toBe("done"); 48 48 }); 49 + 50 + it("marks stopReason=error payloads as failed spawn results", async () => { 51 + const output = [ 52 + JSON.stringify({ type: "session", id: "session-test" }), 53 + JSON.stringify({ 54 + type: "message_end", 55 + message: { 56 + role: "assistant", 57 + content: [], 58 + stopReason: "error", 59 + errorMessage: "context_length_exceeded", 60 + }, 61 + }), 62 + JSON.stringify({ 63 + type: "agent_end", 64 + messages: [ 65 + { 66 + role: "assistant", 67 + content: [], 68 + stopReason: "error", 69 + errorMessage: "context_length_exceeded", 70 + }, 71 + ], 72 + }), 73 + ].join("\n"); 74 + 75 + const decoded = await runWithRuntime( 76 + decodePiProcessOutput(output, { 77 + agent: "scout", 78 + model: "openai/gpt-5.3-codex", 79 + spawnId: "spawn_test", 80 + }), 81 + ); 82 + 83 + expect(decoded.result.exitCode).toBe(1); 84 + expect(decoded.result.stopReason).toBe("error"); 85 + expect(decoded.result.errorMessage).toBe("context_length_exceeded"); 86 + }); 49 87 });
+17 -2
packages/driver-pi/src/internal/pi.codec.ts
··· 47 47 return textParts.join("\n"); 48 48 }; 49 49 50 - const extractAssistantSummary = (message: unknown): { text?: string; stopReason?: string } => { 50 + const extractAssistantSummary = ( 51 + message: unknown, 52 + ): { text?: string; stopReason?: string; errorMessage?: string } => { 51 53 if (!isRecord(message)) { 52 54 return {}; 53 55 } ··· 55 57 return { 56 58 text: extractTextFromContent(message.content), 57 59 stopReason: readString(message, "stopReason"), 60 + errorMessage: readString(message, "errorMessage"), 58 61 }; 59 62 }; 60 63 ··· 93 96 let sessionRef: string | undefined; 94 97 let responseText: string | undefined; 95 98 let stopReason: string | undefined; 99 + let errorMessage: string | undefined; 96 100 let terminalSeen = false; 97 101 98 102 for (const decoded of decodedLines) { ··· 117 121 118 122 if (assistantSummary.stopReason !== undefined) { 119 123 stopReason = assistantSummary.stopReason; 124 + } 125 + 126 + if (assistantSummary.errorMessage !== undefined) { 127 + errorMessage = assistantSummary.errorMessage; 120 128 } 121 129 } 122 130 ··· 182 190 if (assistantSummary.stopReason !== undefined) { 183 191 stopReason = assistantSummary.stopReason; 184 192 } 193 + 194 + if (assistantSummary.errorMessage !== undefined) { 195 + errorMessage = assistantSummary.errorMessage; 196 + } 185 197 } 186 198 } 187 199 ··· 193 205 ); 194 206 } 195 207 208 + const exitCode = stopReason === "error" || errorMessage !== undefined ? 1 : 0; 209 + 196 210 return { 197 211 events, 198 212 result: { ··· 201 215 agent: input.agent, 202 216 model: input.model, 203 217 driver: "pi", 204 - exitCode: 0, 218 + exitCode, 205 219 stopReason, 220 + errorMessage, 206 221 }, 207 222 } satisfies DriverSpawnOutput; 208 223 });
+39
packages/driver-pi/src/public/index.api.test.ts
··· 21 21 "console.log(JSON.stringify({type:'auto_retry_start'}));" + 22 22 "console.log(JSON.stringify({type:'agent_end',messages:[{role:'assistant',content:[{type:'text',text:'second'}]}]}));"; 23 23 24 + const ERROR_TERMINAL_SCRIPT = 25 + "console.log(JSON.stringify({type:'session',id:'session-test'}));" + 26 + "console.log(JSON.stringify({type:'agent_end',messages:[{role:'assistant',content:[],stopReason:'error',errorMessage:'context_length_exceeded'}]}));"; 27 + 24 28 describe("createPiDriverRegistration", () => { 25 29 it("supports explicit model catalog overrides via codec", async () => { 26 30 const driver = createPiDriverRegistration({ ··· 132 136 ); 133 137 134 138 expect(output.result.text).toBe("second"); 139 + }); 140 + 141 + it("marks terminal stopReason=error payloads as failed", async () => { 142 + const driver = createPiDriverRegistration({ 143 + process: { 144 + command: "bun", 145 + args: ["-e", ERROR_TERMINAL_SCRIPT], 146 + }, 147 + models: ["openai/gpt-5.3-codex"], 148 + }); 149 + 150 + expect(driver.runtime).toBeDefined(); 151 + 152 + if (driver.runtime === undefined) { 153 + return; 154 + } 155 + 156 + const output = await Runtime.runPromise(runtime)( 157 + Effect.provide( 158 + driver.runtime.spawn({ 159 + runId: "run_driver_error", 160 + runDirectory: "/tmp/run_driver_error", 161 + spawnId: "spawn_driver_error", 162 + agent: "scout", 163 + systemPrompt: "You are concise.", 164 + prompt: "Say hello", 165 + model: "openai/gpt-5.3-codex", 166 + }), 167 + BunContext.layer, 168 + ), 169 + ); 170 + 171 + expect(output.result.exitCode).toBe(1); 172 + expect(output.result.stopReason).toBe("error"); 173 + expect(output.result.errorMessage).toBe("context_length_exceeded"); 135 174 }); 136 175 });
+3 -3
packages/pi-mill/contract.ts
··· 1 1 import { Type, type Static } from "@sinclair/typebox"; 2 - import { FactoryError } from "./errors.js"; 2 + import { MillError } from "./errors.js"; 3 3 4 4 export const SubagentSchema = Type.Object({ 5 5 task: Type.String({ description: "Label/description for this program run." }), ··· 13 13 14 14 export function validateParams(params: SubagentParams): SubagentParams { 15 15 if (!params.task?.trim()) { 16 - throw new FactoryError({ 16 + throw new MillError({ 17 17 code: "INVALID_INPUT", 18 18 message: "'task' is required.", 19 19 recoverable: true, 20 20 }); 21 21 } 22 22 if (!params.code?.trim()) { 23 - throw new FactoryError({ 23 + throw new MillError({ 24 24 code: "INVALID_INPUT", 25 25 message: "'code' is required and must be non-empty.", 26 26 recoverable: true,
+3 -3
packages/pi-mill/errors.ts
··· 19 19 meta?: Record<string, unknown>; 20 20 } 21 21 22 - export class FactoryError extends Error { 22 + export class MillError extends Error { 23 23 readonly details: ErrorDetails; 24 24 constructor(details: ErrorDetails) { 25 25 super(`${details.code}: ${details.message}`); 26 - this.name = "FactoryError"; 26 + this.name = "MillError"; 27 27 this.details = details; 28 28 } 29 29 } 30 30 31 31 export function toErrorDetails(error: unknown, fallback?: Partial<ErrorDetails>): ErrorDetails { 32 - if (error instanceof FactoryError) return error.details; 32 + if (error instanceof MillError) return error.details; 33 33 return { 34 34 code: fallback?.code ?? "RUNTIME", 35 35 message: error instanceof Error ? error.message : String(error),
+5 -5
packages/pi-mill/executors/program-executor.ts
··· 1 1 import { pathToFileURL } from "node:url"; 2 2 import { highlightCode, type ExtensionContext } from "@mariozechner/pi-coding-agent"; 3 3 import { matchesKey, truncateToWidth, wrapTextWithAnsi } from "@mariozechner/pi-tui"; 4 - import { FactoryError, toErrorDetails } from "../errors.js"; 4 + import { MillError, toErrorDetails } from "../errors.js"; 5 5 import type { ObservabilityStore } from "../observability.js"; 6 6 import { 7 7 createMillRuntime, ··· 206 206 // Preflight typecheck — catch type errors before showing confirmation dialog 207 207 const typeErrors = await preflightTypecheck(code); 208 208 if (typeErrors) { 209 - throw new FactoryError({ 209 + throw new MillError({ 210 210 code: "INVALID_INPUT", 211 211 message: `Type errors in program code:\n${typeErrors}`, 212 212 recoverable: true, ··· 216 216 if (!input.skipConfirmation) { 217 217 const confirmation = await confirmExecution(ctx, code); 218 218 if (!confirmation.approved) { 219 - throw new FactoryError({ 219 + throw new MillError({ 220 220 code: "CONFIRMATION_REJECTED", 221 221 message: confirmation.reason ? `Cancelled: ${confirmation.reason}` : "Cancelled by user.", 222 222 recoverable: true, ··· 270 270 restoreGlobals(); 271 271 restorePromise(); 272 272 restoreConsole(); 273 - throw new FactoryError({ 273 + throw new MillError({ 274 274 code: "CANCELLED", 275 275 message: "Cancelled before execution.", 276 276 recoverable: true, ··· 279 279 let onAbort: (() => void) | undefined; 280 280 const cancelled = new Promise<never>((_resolve, reject) => { 281 281 onAbort = () => 282 - reject(new FactoryError({ code: "CANCELLED", message: "Cancelled.", recoverable: true })); 282 + reject(new MillError({ code: "CANCELLED", message: "Cancelled.", recoverable: true })); 283 283 input.signal?.addEventListener("abort", onAbort, { once: true }); 284 284 }); 285 285 try {
+21 -23
packages/pi-mill/index.ts
··· 459 459 }); 460 460 } 461 461 462 - // Wire completion: persist state first, then UI updates + notification. 462 + const scheduleCompletionNotification = (summary: RunSummary): void => { 463 + // Defer follow-up delivery to the next macrotask so very fast failures 464 + // (e.g. immediate throw) don't race the originating tool turn. 465 + setTimeout(() => { 466 + try { 467 + notifyCompletion(pi, registry, summary); 468 + } catch (error) { 469 + observability.push(runId, "warning", "notify_failed", { error: String(error) }); 470 + } 471 + 472 + try { 473 + widget.update(registry.getVisible(), ctx); 474 + } catch { 475 + /* ui may be unavailable */ 476 + } 477 + }, 0); 478 + }; 479 + 480 + // Wire completion: persist state first, then UI updates + async notification. 463 481 promise.then( 464 482 (summary) => { 465 483 observability.setStatus( ··· 496 514 /* ui may be unavailable */ 497 515 } 498 516 499 - try { 500 - notifyCompletion(pi, registry, fullSummary); 501 - } catch (error) { 502 - observability.push(runId, "warning", "notify_failed", { error: String(error) }); 503 - } 504 - 505 - try { 506 - widget.update(registry.getVisible(), ctx); 507 - } catch { 508 - /* ui may be unavailable */ 509 - } 517 + scheduleCompletionNotification(fullSummary); 510 518 }, 511 519 (err) => { 512 520 const details = toErrorDetails(err); ··· 540 548 /* ui may be unavailable */ 541 549 } 542 550 543 - try { 544 - notifyCompletion(pi, registry, failedSummary); 545 - } catch (error) { 546 - observability.push(runId, "warning", "notify_failed", { error: String(error) }); 547 - } 548 - 549 - try { 550 - widget.update(registry.getVisible(), ctx); 551 - } catch { 552 - /* ui may be unavailable */ 553 - } 551 + scheduleCompletionNotification(failedSummary); 554 552 }, 555 553 ); 556 554
+36 -19
packages/pi-mill/runtime.ts
··· 3 3 import * as os from "node:os"; 4 4 import * as path from "node:path"; 5 5 import type { ExtensionContext } from "@mariozechner/pi-coding-agent"; 6 - import { FactoryError } from "./errors.js"; 6 + import { MillError } from "./errors.js"; 7 7 import type { ObservabilityStore } from "./observability.js"; 8 8 import type { ExecutionResult } from "./types.js"; 9 9 ··· 334 334 if (!Array.isArray(spawns) || spawns.length === 0) { 335 335 const failedStatus = payload.result?.status ?? payload.run?.status; 336 336 const message = payload.result?.errorMessage; 337 - throw new FactoryError({ 337 + throw new MillError({ 338 338 code: "RUNTIME", 339 339 message: 340 340 message && message.length > 0 ··· 355 355 ? 0 356 356 : 1; 357 357 358 + const errorMessage = selectedSpawn.errorMessage ?? payload.result?.errorMessage; 359 + const stopReason = selectedSpawn.stopReason; 360 + 361 + if (derivedExitCode !== 0 || stopReason === "error" || (errorMessage?.length ?? 0) > 0) { 362 + const reason = 363 + errorMessage ?? 364 + (stopReason !== undefined && stopReason.length > 0 365 + ? `stopReason=${stopReason}` 366 + : `exitCode=${derivedExitCode}`); 367 + 368 + throw new MillError({ 369 + code: "RUNTIME", 370 + message: `Subagent '${selectedSpawn.agent ?? fallback.agent}' failed: ${reason}`, 371 + recoverable: false, 372 + }); 373 + } 374 + 358 375 return { 359 376 taskId: "", 360 377 agent: selectedSpawn.agent ?? fallback.agent, ··· 364 381 stderr: "", 365 382 usage: newUsage(), 366 383 model: selectedSpawn.model ?? fallback.modelId, 367 - stopReason: selectedSpawn.stopReason, 368 - errorMessage: selectedSpawn.errorMessage ?? payload.result?.errorMessage, 384 + stopReason, 385 + errorMessage, 369 386 step: undefined, 370 387 text: selectedSpawn.text ?? "", 371 388 sessionPath: selectedSpawn.sessionRef, ··· 510 527 appendCommandLog(stdoutPath, input.millCommand, submitArgs, submitted); 511 528 512 529 if (submitted.aborted || aborted) { 513 - throw new FactoryError({ 530 + throw new MillError({ 514 531 code: "CANCELLED", 515 532 message: "Subagent aborted.", 516 533 recoverable: true, ··· 518 535 } 519 536 520 537 if (submitted.code !== 0) { 521 - throw new FactoryError({ 538 + throw new MillError({ 522 539 code: "RUNTIME", 523 540 message: 524 541 submitted.combined.trim().length > 0 ··· 532 549 [submitted.stdout, submitted.stderr].join("\n"), 533 550 ) as MillRunSubmitPayload | Record<string, unknown> | undefined; 534 551 if (!submitPayload) { 535 - throw new FactoryError({ 552 + throw new MillError({ 536 553 code: "RUNTIME", 537 554 message: "mill run did not return JSON submission payload.", 538 555 recoverable: false, ··· 541 558 542 559 submittedRunId = extractRunId(submitPayload as Record<string, unknown>); 543 560 if (!submittedRunId) { 544 - throw new FactoryError({ 561 + throw new MillError({ 545 562 code: "RUNTIME", 546 563 message: "mill run submission payload is missing runId.", 547 564 recoverable: false, ··· 569 586 570 587 if (waited.aborted || aborted) { 571 588 await requestRunCancel(); 572 - throw new FactoryError({ 589 + throw new MillError({ 573 590 code: "CANCELLED", 574 591 message: "Subagent aborted.", 575 592 recoverable: true, ··· 577 594 } 578 595 579 596 if (waited.code !== 0) { 580 - throw new FactoryError({ 597 + throw new MillError({ 581 598 code: "RUNTIME", 582 599 message: 583 600 waited.combined.trim().length > 0 ··· 591 608 const terminalStatus = waitPayload ? extractRunStatus(waitPayload) : undefined; 592 609 593 610 if (terminalStatus === "cancelled") { 594 - throw new FactoryError({ 611 + throw new MillError({ 595 612 code: "CANCELLED", 596 613 message: "Subagent run was cancelled.", 597 614 recoverable: true, ··· 599 616 } 600 617 601 618 if (terminalStatus === "failed") { 602 - throw new FactoryError({ 619 + throw new MillError({ 603 620 code: "RUNTIME", 604 621 message: "Subagent run failed before inspect.", 605 622 recoverable: false, ··· 622 639 623 640 if (inspected.aborted || aborted) { 624 641 await requestRunCancel(); 625 - throw new FactoryError({ 642 + throw new MillError({ 626 643 code: "CANCELLED", 627 644 message: "Subagent aborted.", 628 645 recoverable: true, ··· 630 647 } 631 648 632 649 if (inspected.code !== 0) { 633 - throw new FactoryError({ 650 + throw new MillError({ 634 651 code: "RUNTIME", 635 652 message: 636 653 inspected.combined.trim().length > 0 ··· 642 659 643 660 const parsed = parseJsonObjectFromText([inspected.stdout, inspected.stderr].join("\n")); 644 661 if (!parsed) { 645 - throw new FactoryError({ 662 + throw new MillError({ 646 663 code: "RUNTIME", 647 664 message: "mill inspect output was not valid JSON.", 648 665 recoverable: false, ··· 702 719 703 720 function validateModelSelector(model: string, agent: string): string { 704 721 if (!model?.trim()) { 705 - throw new FactoryError({ 722 + throw new MillError({ 706 723 code: "INVALID_INPUT", 707 724 message: `Spawn for '${agent}' requires a non-empty 'model'.`, 708 725 recoverable: true, ··· 741 758 742 759 spawn({ agent, systemPrompt, prompt, cwd, model, tools, step, signal }) { 743 760 if (!systemPrompt?.trim()) { 744 - throw new FactoryError({ 761 + throw new MillError({ 745 762 code: "INVALID_INPUT", 746 763 message: `Spawn for '${agent}' requires non-empty systemPrompt.`, 747 764 recoverable: true, 748 765 }); 749 766 } 750 767 if (!prompt?.trim()) { 751 - throw new FactoryError({ 768 + throw new MillError({ 752 769 code: "INVALID_INPUT", 753 770 message: `Spawn for '${agent}' requires non-empty prompt.`, 754 771 recoverable: true, ··· 899 916 900 917 export function prepareProgramModule(code: string): { modulePath: string } { 901 918 if (!code.trim()) { 902 - throw new FactoryError({ 919 + throw new MillError({ 903 920 code: "INVALID_INPUT", 904 921 message: "Program code is empty.", 905 922 recoverable: true,