programmatic subagents
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

fix(driver-pi): fail fast on stalled pi subprocesses

+85 -20
+6 -1
packages/driver-pi/src/internal/pi.codec.ts
··· 198 198 } 199 199 200 200 if (!terminalSeen) { 201 + const lastEventType = 202 + decodedLines.length > 0 203 + ? readString(decodedLines[decodedLines.length - 1]!, "type") 204 + : undefined; 205 + 201 206 return yield* Effect.fail( 202 207 new PiCodecError({ 203 - message: "Missing terminal agent_end line from pi process output.", 208 + message: `Missing terminal agent_end line from pi process output (lines=${String(decodedLines.length)}, lastType=${lastEventType ?? "unknown"}). Process may have been interrupted or stalled.`, 204 209 }), 205 210 ); 206 211 }
+39 -18
packages/driver-pi/src/internal/process-driver.effect.ts
··· 16 16 return String(error); 17 17 }; 18 18 19 + const DEFAULT_PI_PROCESS_TIMEOUT_MS = 60 * 60 * 1000; 20 + 21 + const resolvePiProcessTimeoutMs = (timeoutMs?: number): number => { 22 + if (typeof timeoutMs !== "number" || !Number.isFinite(timeoutMs) || timeoutMs <= 0) { 23 + return DEFAULT_PI_PROCESS_TIMEOUT_MS; 24 + } 25 + 26 + return Math.floor(timeoutMs); 27 + }; 28 + 29 + const toPiProcessDriverError = (error: unknown): PiProcessDriverError => 30 + error instanceof PiProcessDriverError 31 + ? error 32 + : new PiProcessDriverError({ 33 + message: toMessage(error), 34 + }); 35 + 19 36 const normalizePath = (path: string): string => { 20 37 if (path.length <= 1) { 21 38 return path; ··· 54 71 return Command.env(command, config.env); 55 72 }; 56 73 57 - export const makePiProcessDriver = (config: DriverProcessConfig): DriverRuntime => ({ 74 + export const makePiProcessDriver = ( 75 + config: DriverProcessConfig, 76 + options?: { 77 + timeoutMs?: number; 78 + }, 79 + ): DriverRuntime => ({ 58 80 name: "pi", 59 81 spawn: (input) => 60 82 Effect.gen(function* () { 61 83 const sessionPath = sessionPathForSpawn(input); 62 84 const sessionsDirectory = sessionPath.slice(0, sessionPath.lastIndexOf("/")); 63 85 const fileSystem = yield* FileSystem.FileSystem; 86 + const timeoutMs = resolvePiProcessTimeoutMs(options?.timeoutMs); 64 87 65 88 yield* Effect.mapError( 66 89 fileSystem.makeDirectory(sessionsDirectory, { recursive: true }), ··· 79 102 model: input.model, 80 103 command: config.command, 81 104 sessionPath, 105 + timeoutMs, 82 106 }); 83 107 84 - const stdout = yield* Effect.mapError( 85 - Command.string(command), 86 - (error) => 87 - new PiProcessDriverError({ 88 - message: toMessage(error), 89 - }), 90 - ); 91 - 92 - const decoded = yield* Effect.mapError( 93 - decodePiProcessOutput(stdout, { 94 - agent: input.agent, 95 - model: input.model, 96 - spawnId: input.spawnId, 108 + const stdout = yield* Command.string(command).pipe( 109 + Effect.timeoutFail({ 110 + duration: timeoutMs, 111 + onTimeout: () => 112 + new PiProcessDriverError({ 113 + message: `pi process timed out after ${String(timeoutMs)}ms (runId=${input.runId}, spawnId=${input.spawnId}).`, 114 + }), 97 115 }), 98 - (error) => 99 - new PiProcessDriverError({ 100 - message: toMessage(error), 101 - }), 116 + Effect.mapError(toPiProcessDriverError), 102 117 ); 118 + 119 + const decoded = yield* decodePiProcessOutput(stdout, { 120 + agent: input.agent, 121 + model: input.model, 122 + spawnId: input.spawnId, 123 + }).pipe(Effect.mapError(toPiProcessDriverError)); 103 124 104 125 const raw = stdout 105 126 .split("\n")
+36
packages/driver-pi/src/public/index.api.test.ts
··· 29 29 "console.log(JSON.stringify({type:'session',id:'session-test'}));" + 30 30 "console.log(JSON.stringify({type:'agent_end',messages:[{role:'assistant',content:[],stopReason:'error',errorMessage:'context_length_exceeded'}]}));"; 31 31 32 + const HANGING_SCRIPT = "setInterval(() => {}, 1_000);"; 33 + 32 34 describe("createPiDriverRegistration", () => { 33 35 it("supports explicit model catalog overrides via codec", async () => { 34 36 const driver = createPiDriverRegistration({ ··· 218 220 expect(output.result.exitCode).toBe(1); 219 221 expect(output.result.stopReason).toBe("error"); 220 222 expect(output.result.errorMessage).toBe("context_length_exceeded"); 223 + }); 224 + 225 + it("fails fast when the pi process stalls and exceeds timeout", async () => { 226 + const driver = createPiDriverRegistration({ 227 + process: { 228 + command: "bun", 229 + args: ["-e", HANGING_SCRIPT, "--"], 230 + }, 231 + timeoutMs: 100, 232 + models: ["openai/gpt-5.3-codex"], 233 + }); 234 + 235 + expect(driver.runtime).toBeDefined(); 236 + 237 + if (driver.runtime === undefined) { 238 + return; 239 + } 240 + 241 + await expect( 242 + Runtime.runPromise(runtime)( 243 + Effect.provide( 244 + driver.runtime.spawn({ 245 + runId: "run_driver_timeout", 246 + runDirectory: "/tmp/run_driver_timeout", 247 + spawnId: "spawn_driver_timeout", 248 + agent: "scout", 249 + systemPrompt: "You are concise.", 250 + prompt: "Say hello", 251 + model: "openai/gpt-5.3-codex", 252 + }), 253 + BunContext.layer, 254 + ), 255 + ), 256 + ).rejects.toThrow("timed out"); 221 257 }); 222 258 });
+4 -1
packages/driver-pi/src/public/index.api.ts
··· 7 7 export interface CreatePiDriverRegistrationInput { 8 8 readonly process?: DriverProcessConfig; 9 9 readonly models?: ReadonlyArray<string>; 10 + readonly timeoutMs?: number; 10 11 } 11 12 12 13 const normalizeModelCatalog = (models: ReadonlyArray<string>): ReadonlyArray<string> => ··· 89 90 process, 90 91 models: input?.models, 91 92 }), 92 - runtime: makePiProcessDriver(process), 93 + runtime: makePiProcessDriver(process, { 94 + timeoutMs: input?.timeoutMs, 95 + }), 93 96 }; 94 97 };