programmatic subagents
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

feat(cli): unify watch streams and remove inspect command

+462 -455
+13 -11
README.md
··· 27 27 ``` 28 28 29 29 ```bash 30 - mill run review.ts # returns runId, executes in background 31 - mill watch abc123 # stream events live 32 - mill run review.ts --sync # or block until done 30 + mill run review.ts # returns runId, executes in background 31 + mill watch --run abc123 # stream events live 32 + mill watch --run abc123 --channel io 33 + mill run review.ts --sync # or block until done 33 34 ``` 34 35 35 36 ## CLI 36 37 37 38 ``` 38 39 mill run <program.ts> [--sync] [--json] [--driver <name>] 39 - mill status <runId> show run state 40 - mill wait <runId> --timeout block until complete/failed/cancelled 41 - mill watch [--run <runId>] stream tier-1 events (global if --run omitted) 42 - mill inspect <id>[.<spawnId>] inspect run or spawn detail 43 - mill inspect <id> --session resolve full agent session via driver 44 - mill cancel <runId> mark cancelled + kill worker process tree 45 - mill ls [--status <filter>] list runs 46 - mill init [--global] generate starter config (local or ~/.mill/config.ts) 40 + mill status <runId> show run state 41 + mill wait <runId> --timeout block until complete/failed/cancelled 42 + mill watch [--run <runId>] watch streams (default: events) 43 + --channel events|io|all choose stream channel 44 + --source driver|program io source filter (io/all only) 45 + --spawn <spawnId> io spawn filter (io/all only) 46 + mill cancel <runId> mark cancelled + kill worker process tree 47 + mill ls [--status <filter>] list runs 48 + mill init [--global] generate starter config (local or ~/.mill/config.ts) 47 49 ``` 48 50 49 51 All commands accept `--json` for machine-readable output on stdout (diagnostics go to stderr).
+61 -138
packages/cli/src/public/index.api.test.ts
··· 64 64 }), 65 65 ); 66 66 67 - const InspectRunEnvelope = Schema.parseJson( 68 - Schema.Struct({ 69 - kind: Schema.Literal("run"), 70 - run: Schema.Struct({ 71 - id: Schema.String, 72 - status: Schema.String, 73 - }), 74 - events: Schema.Array( 75 - Schema.Struct({ 67 + const WatchOutputEnvelope = Schema.parseJson( 68 + Schema.Union( 69 + Schema.Struct({ 70 + kind: Schema.Literal("event"), 71 + runId: Schema.String, 72 + event: Schema.Struct({ 76 73 type: Schema.String, 77 - sequence: Schema.Number, 78 - }), 79 - ), 80 - }), 81 - ); 82 - 83 - const InspectSpawnEnvelope = Schema.parseJson( 84 - Schema.Struct({ 85 - kind: Schema.Literal("spawn"), 86 - runId: Schema.String, 87 - spawnId: Schema.String, 88 - result: Schema.optional( 89 - Schema.Struct({ 90 - sessionRef: Schema.String, 74 + runId: Schema.String, 91 75 }), 92 - ), 93 - }), 94 - ); 95 - 96 - const SessionEnvelope = Schema.parseJson( 97 - Schema.Struct({ 98 - runId: Schema.String, 99 - spawnId: Schema.String, 100 - sessionRef: Schema.String, 101 - pointer: Schema.String, 102 - driver: Schema.String, 103 - }), 76 + }), 77 + Schema.Struct({ 78 + kind: Schema.Literal("io"), 79 + runId: Schema.String, 80 + source: Schema.Union(Schema.Literal("driver"), Schema.Literal("program")), 81 + stream: Schema.Union(Schema.Literal("stdout"), Schema.Literal("stderr")), 82 + line: Schema.String, 83 + timestamp: Schema.String, 84 + spawnId: Schema.optional(Schema.String), 85 + }), 86 + ), 104 87 ); 105 88 106 89 const CancelEnvelope = Schema.parseJson( ··· 129 112 timeoutSeconds: Schema.Number, 130 113 message: Schema.String, 131 114 }), 132 - }), 133 - ); 134 - 135 - const WatchEventEnvelope = Schema.parseJson( 136 - Schema.Struct({ 137 - type: Schema.String, 138 - runId: Schema.String, 139 115 }), 140 116 ); 141 117 ··· 498 474 } 499 475 }); 500 476 501 - it("supports watch/inspect/session/ls JSON contracts for completed runs", async () => { 502 - const tempDirectory = await mkdtemp(join(tmpdir(), "mill-cli-inspect-watch-")); 477 + it("supports watch/ls JSON contracts for completed runs", async () => { 478 + const tempDirectory = await mkdtemp(join(tmpdir(), "mill-cli-watch-")); 503 479 const homeDirectory = join(tempDirectory, "home"); 504 480 const programPath = join(tempDirectory, "program.ts"); 505 481 ··· 534 510 const runPayload = Schema.decodeUnknownSync(RunSyncEnvelope)(runStdout[0]); 535 511 536 512 const watchStdout: Array<string> = []; 537 - const watchCode = await runCli(["watch", "--run", runPayload.run.id, "--json"], { 538 - cwd: tempDirectory, 539 - homeDirectory, 540 - pathExists: async () => false, 541 - io: { 542 - stdout: (line) => { 543 - watchStdout.push(line); 544 - }, 545 - stderr: () => undefined, 546 - }, 547 - }); 548 - 549 - expect(watchCode).toBe(0); 550 - expect(watchStdout.length).toBeGreaterThan(0); 551 - 552 - for (const line of watchStdout) { 553 - const parsed = Schema.decodeUnknownSync(WatchEventEnvelope)(line); 554 - expect(parsed.runId).toBe(runPayload.run.id); 555 - expect(typeof parsed.type).toBe("string"); 556 - } 557 - 558 - const inspectRunStdout: Array<string> = []; 559 - const inspectRunCode = await runCli(["inspect", runPayload.run.id, "--json"], { 560 - cwd: tempDirectory, 561 - homeDirectory, 562 - pathExists: async () => false, 563 - io: { 564 - stdout: (line) => { 565 - inspectRunStdout.push(line); 566 - }, 567 - stderr: () => undefined, 568 - }, 569 - }); 570 - 571 - expect(inspectRunCode).toBe(0); 572 - const inspectedRun = Schema.decodeUnknownSync(InspectRunEnvelope)(inspectRunStdout[0]); 573 - expect(inspectedRun.run.id).toBe(runPayload.run.id); 574 - expect(inspectedRun.events.length).toBeGreaterThan(0); 575 - 576 - const inspectSpawnStdout: Array<string> = []; 577 - const inspectSpawnCode = await runCli(["inspect", `${runPayload.run.id}.spawn_1`, "--json"], { 578 - cwd: tempDirectory, 579 - homeDirectory, 580 - pathExists: async () => false, 581 - io: { 582 - stdout: (line) => { 583 - inspectSpawnStdout.push(line); 584 - }, 585 - stderr: () => undefined, 586 - }, 587 - }); 588 - 589 - expect(inspectSpawnCode).toBe(0); 590 - const inspectedSpawn = Schema.decodeUnknownSync(InspectSpawnEnvelope)(inspectSpawnStdout[0]); 591 - expect(inspectedSpawn.spawnId).toBe("spawn_1"); 592 - expect(inspectedSpawn.result?.sessionRef.length).toBeGreaterThan(0); 593 - 594 - const inspectSessionStdout: Array<string> = []; 595 - const inspectSessionCode = await runCli( 596 - ["inspect", `${runPayload.run.id}.spawn_1`, "--session", "--json"], 513 + const watchCode = await runCli( 514 + ["watch", "--run", runPayload.run.id, "--channel", "all", "--json"], 597 515 { 598 516 cwd: tempDirectory, 599 517 homeDirectory, 600 518 pathExists: async () => false, 601 519 io: { 602 520 stdout: (line) => { 603 - inspectSessionStdout.push(line); 521 + watchStdout.push(line); 604 522 }, 605 523 stderr: () => undefined, 606 524 }, 607 525 }, 608 526 ); 609 527 610 - expect(inspectSessionCode).toBe(0); 611 - const sessionPayload = Schema.decodeUnknownSync(SessionEnvelope)(inspectSessionStdout[0]); 612 - expect(sessionPayload.runId).toBe(runPayload.run.id); 613 - expect(sessionPayload.spawnId).toBe("spawn_1"); 614 - expect(sessionPayload.pointer.length).toBeGreaterThan(0); 528 + expect(watchCode).toBe(0); 529 + expect(watchStdout.length).toBeGreaterThan(0); 530 + 531 + const parsedWatch = watchStdout.map((line) => 532 + Schema.decodeUnknownSync(WatchOutputEnvelope)(line), 533 + ); 534 + expect(parsedWatch.every((entry) => entry.runId === runPayload.run.id)).toBe(true); 535 + expect( 536 + parsedWatch.some((entry) => entry.kind === "event" && entry.event.type === "run:complete"), 537 + ).toBe(true); 615 538 616 539 const listStdout: Array<string> = []; 617 540 const listCode = await runCli(["ls", "--json"], { ··· 697 620 const secondCancel = Schema.decodeUnknownSync(CancelEnvelope)(cancelStdout2[0]); 698 621 expect(secondCancel.runId).toBe(submittedRun.runId); 699 622 700 - const inspectStdout: Array<string> = []; 701 - const inspectCode = await runCli(["inspect", submittedRun.runId, "--json"], { 702 - cwd: tempDirectory, 703 - homeDirectory, 704 - pathExists: async () => false, 705 - io: { 706 - stdout: (line) => { 707 - inspectStdout.push(line); 708 - }, 709 - stderr: () => undefined, 710 - }, 711 - }); 623 + const eventsContent = await readFile(submittedRun.paths.eventsFile, "utf-8"); 624 + const persistedEvents = eventsContent 625 + .split("\n") 626 + .map((line) => line.trim()) 627 + .filter((line) => line.length > 0) 628 + .map((line) => JSON.parse(line) as { type?: string }); 712 629 713 - expect(inspectCode).toBe(0); 714 - const inspectedRun = Schema.decodeUnknownSync(InspectRunEnvelope)(inspectStdout[0]); 715 - const terminalEvents = inspectedRun.events.filter( 630 + const terminalEvents = persistedEvents.filter( 716 631 (event) => 717 632 event.type === "run:complete" || 718 633 event.type === "run:failed" || ··· 743 658 } 744 659 }); 745 660 746 - it("watch --raw streams tier-2 passthrough without persistence", async () => { 747 - const tempDirectory = await mkdtemp(join(tmpdir(), "mill-cli-watch-raw-")); 661 + it("watch --channel io streams io envelopes without persisted event payloads", async () => { 662 + const tempDirectory = await mkdtemp(join(tmpdir(), "mill-cli-watch-io-")); 748 663 const homeDirectory = join(tempDirectory, "home"); 749 664 const programPath = join(tempDirectory, "program.ts"); 750 665 ··· 806 721 const submittedRun = Schema.decodeUnknownSync(RunSubmitEnvelope)(runStdout[0]); 807 722 808 723 const watchStdout: Array<string> = []; 809 - const watchCode = await runCli(["watch", "--run", submittedRun.runId, "--raw"], { 810 - cwd: tempDirectory, 811 - homeDirectory, 812 - pathExists: async () => false, 813 - io: { 814 - stdout: (line) => { 815 - watchStdout.push(line); 724 + const watchCode = await runCli( 725 + ["watch", "--run", submittedRun.runId, "--channel", "io", "--json"], 726 + { 727 + cwd: tempDirectory, 728 + homeDirectory, 729 + pathExists: async () => false, 730 + io: { 731 + stdout: (line) => { 732 + watchStdout.push(line); 733 + }, 734 + stderr: () => undefined, 816 735 }, 817 - stderr: () => undefined, 818 736 }, 819 - }); 737 + ); 820 738 821 739 expect(watchCode).toBe(0); 822 740 expect(watchStdout.length).toBeGreaterThan(0); 823 741 742 + const parsedWatch = watchStdout.map((line) => 743 + Schema.decodeUnknownSync(WatchOutputEnvelope)(line), 744 + ); 745 + expect(parsedWatch.every((entry) => entry.kind === "io")).toBe(true); 746 + 824 747 const eventsContent = await readFile(submittedRun.paths.eventsFile, "utf-8"); 825 - for (const rawLine of watchStdout) { 826 - expect(eventsContent.includes(rawLine.trim())).toBe(false); 748 + for (const line of watchStdout) { 749 + expect(eventsContent.includes(line.trim())).toBe(false); 827 750 } 828 751 } finally { 829 752 await rm(tempDirectory, { recursive: true, force: true });
+18 -55
packages/cli/src/public/index.api.ts
··· 8 8 cancelRun, 9 9 defineConfig, 10 10 getRunStatus, 11 - inspectRun, 12 11 listRuns, 13 12 processDriver, 14 13 resolveConfig, ··· 481 480 return 1; 482 481 }; 483 482 483 + const WATCH_CHANNELS = ["events", "io", "all"] as const; 484 + const WATCH_SOURCES = ["driver", "program"] as const; 485 + 486 + type WatchChannel = (typeof WATCH_CHANNELS)[number]; 487 + type WatchSource = (typeof WATCH_SOURCES)[number]; 488 + 484 489 interface WatchCommandInput { 485 490 readonly run: Option.Option<string>; 486 491 readonly sinceTime: Option.Option<string>; 492 + readonly channel: Option.Option<WatchChannel>; 493 + readonly source: Option.Option<WatchSource>; 494 + readonly spawn: Option.Option<string>; 487 495 readonly json: boolean; 488 - readonly raw: boolean; 489 496 readonly runsDir: Option.Option<string>; 490 497 readonly driver: Option.Option<string>; 491 498 } ··· 498 505 await watchRun({ 499 506 defaults: defaultConfig, 500 507 runId: fromOption(command.run), 508 + channel: fromOption(command.channel), 509 + source: fromOption(command.source), 510 + spawnId: fromOption(command.spawn), 501 511 sinceTimeIso: fromOption(command.sinceTime), 502 - raw: command.raw, 503 512 cwd: options.cwd, 504 513 homeDirectory: options.homeDirectory, 505 514 runsDirectory: fromOption(command.runsDir) ?? options.runsDirectory, ··· 514 523 return 0; 515 524 }; 516 525 517 - interface InspectCommandInput { 518 - readonly ref: string; 519 - readonly json: boolean; 520 - readonly session: boolean; 521 - readonly runsDir: Option.Option<string>; 522 - readonly driver: Option.Option<string>; 523 - } 524 - 525 - const inspectCommand = async ( 526 - command: InspectCommandInput, 527 - options: RunCliOptions, 528 - io: CliIo, 529 - ): Promise<number> => { 530 - const inspected = await inspectRun({ 531 - defaults: defaultConfig, 532 - ref: command.ref, 533 - session: command.session, 534 - cwd: options.cwd, 535 - homeDirectory: options.homeDirectory, 536 - runsDirectory: fromOption(command.runsDir) ?? options.runsDirectory, 537 - driverName: fromOption(command.driver), 538 - pathExists: options.pathExists, 539 - loadConfigModule: options.loadConfigModule, 540 - }); 541 - 542 - if (command.json) { 543 - io.stdout(JSON.stringify(inspected)); 544 - return 0; 545 - } 546 - 547 - io.stdout(JSON.stringify(inspected, null, 2)); 548 - return 0; 549 - }; 550 - 551 526 interface CancelCommandInput { 552 527 readonly runId: string; 553 528 readonly json: boolean; ··· 676 651 { 677 652 run: optionalTextOption("run"), 678 653 sinceTime: optionalTextOption("since-time"), 654 + channel: Options.choice("channel", WATCH_CHANNELS).pipe(Options.optional), 655 + source: Options.choice("source", WATCH_SOURCES).pipe(Options.optional), 656 + spawn: optionalTextOption("spawn"), 679 657 json: Options.boolean("json"), 680 - raw: Options.boolean("raw"), 681 658 runsDir: optionalTextOption("runs-dir"), 682 659 driver: optionalTextOption("driver"), 683 660 }, 684 661 (command) => toCliEffect(watchCommand(command, options, io)), 685 662 ).pipe( 686 663 CliCommand.withDescription( 687 - "Stream run events. Use --run <runId> for scoped watch, omit for global watch.", 664 + "Watch run streams. --channel events|io|all (default: events). --channel io|all requires --run.", 688 665 ), 689 666 ); 690 667 691 - const inspect = CliCommand.make( 692 - "inspect", 693 - { 694 - ref: Args.text({ name: "runId[.spawnId]" }), 695 - json: Options.boolean("json"), 696 - session: Options.boolean("session"), 697 - runsDir: optionalTextOption("runs-dir"), 698 - driver: optionalTextOption("driver"), 699 - }, 700 - (command) => toCliEffect(inspectCommand(command, options, io)), 701 - ).pipe(CliCommand.withDescription("Inspect run, spawn, or session output.")); 702 - 703 668 const cancel = CliCommand.make( 704 669 "cancel", 705 670 { ··· 736 701 737 702 return CliCommand.make("mill").pipe( 738 703 CliCommand.withDescription("Mill orchestration runtime."), 739 - CliCommand.withSubcommands([run, status, wait, watch, inspect, cancel, ls, init, worker]), 704 + CliCommand.withSubcommands([run, status, wait, watch, cancel, ls, init, worker]), 740 705 ); 741 706 }; 742 707 ··· 763 728 run <program.ts> Run a mill program 764 729 status <runId> Show run state 765 730 wait <runId> --timeout <s> Wait for terminal state 766 - watch [--run <runId>] Stream run events (global if --run omitted) 767 - inspect <ref> Inspect run or spawn detail 731 + watch [--run <runId>] Watch events/io streams (use --channel events|io|all) 768 732 cancel <runId> Cancel a running execution 769 733 ls List runs 770 734 init [--global] Create starter config (local or ~/.mill/config.ts) ··· 802 766 "status", 803 767 "wait", 804 768 "watch", 805 - "inspect", 806 769 "cancel", 807 770 "ls", 808 771 "init",
+45 -56
packages/cli/src/public/index.e2e.test.ts
··· 68 68 }), 69 69 ); 70 70 71 - const InspectRunEnvelope = Schema.parseJson( 72 - Schema.Struct({ 73 - kind: Schema.Literal("run"), 74 - run: Schema.Struct({ 75 - id: Schema.String, 76 - status: Schema.String, 77 - }), 78 - events: Schema.Array( 79 - Schema.Struct({ 71 + const WatchOutputEnvelope = Schema.parseJson( 72 + Schema.Union( 73 + Schema.Struct({ 74 + kind: Schema.Literal("event"), 75 + runId: Schema.String, 76 + event: Schema.Struct({ 80 77 type: Schema.String, 81 - sequence: Schema.Number, 78 + runId: Schema.String, 82 79 }), 83 - ), 84 - }), 85 - ); 86 - 87 - const SessionEnvelope = Schema.parseJson( 88 - Schema.Struct({ 89 - runId: Schema.String, 90 - spawnId: Schema.String, 91 - sessionRef: Schema.String, 92 - pointer: Schema.String, 93 - driver: Schema.String, 94 - }), 80 + }), 81 + Schema.Struct({ 82 + kind: Schema.Literal("io"), 83 + runId: Schema.String, 84 + source: Schema.Union(Schema.Literal("driver"), Schema.Literal("program")), 85 + stream: Schema.Union(Schema.Literal("stdout"), Schema.Literal("stderr")), 86 + line: Schema.String, 87 + timestamp: Schema.String, 88 + spawnId: Schema.optional(Schema.String), 89 + }), 90 + ), 95 91 ); 96 92 97 93 const ListEnvelope = Schema.parseJson( ··· 132 128 expect(output).toContain("Usage: mill <command>"); 133 129 expect(output).toContain("Commands:"); 134 130 expect(output).toContain("run <program.ts>"); 131 + expect(output).not.toContain("inspect <ref>"); 135 132 expect(output).not.toContain("discovery"); 136 133 expect(output).not.toContain("Effect-first"); 137 134 }); ··· 403 400 } 404 401 }, 20_000); 405 402 406 - it("runs inspect/session/cancel/watch matrix across concurrent runs", async () => { 403 + it("runs cancel/watch/list matrix across concurrent runs", async () => { 407 404 const tempDirectory = await mkdtemp(join(tmpdir(), "mill-cli-matrix-e2e-")); 408 405 const runsDirectory = join(tempDirectory, "runs"); 409 406 const completeProgramPath = join(tempDirectory, "complete.ts"); ··· 522 519 "watch", 523 520 "--run", 524 521 completeRun.runId, 522 + "--channel", 523 + "all", 525 524 "--json", 526 525 "--runs-dir", 527 526 runsDirectory, ··· 531 530 const watchLines = watchOutput 532 531 .split("\n") 533 532 .map((line) => line.trim()) 534 - .filter((line) => line.length > 0); 533 + .filter((line) => line.length > 0) 534 + .map((line) => Schema.decodeUnknownSync(WatchOutputEnvelope)(line)); 535 535 536 536 expect(watchLines.length).toBeGreaterThan(0); 537 - const watchTerminalCount = watchLines 538 - .map((line) => Schema.decodeUnknownSync(EventTypeEnvelope)(line)) 539 - .filter( 540 - (event) => 541 - event.type === "run:complete" || 542 - event.type === "run:failed" || 543 - event.type === "run:cancelled", 544 - ).length; 537 + const watchTerminalCount = watchLines.filter( 538 + (entry) => 539 + entry.kind === "event" && 540 + (entry.event.type === "run:complete" || 541 + entry.event.type === "run:failed" || 542 + entry.event.type === "run:cancelled"), 543 + ).length; 545 544 expect(watchTerminalCount).toBe(1); 546 545 547 - const inspectCancelledOutput = await commandOutput( 546 + const watchCancelledOutput = await commandOutput( 548 547 Command.make( 549 548 "bun", 550 549 "run", 551 550 "packages/cli/src/bin/mill.ts", 552 - "inspect", 551 + "watch", 552 + "--run", 553 553 cancelRun.runId, 554 + "--channel", 555 + "events", 554 556 "--json", 555 557 "--runs-dir", 556 558 runsDirectory, 557 559 ), 558 560 ); 559 561 560 - const inspectedCancelled = 561 - Schema.decodeUnknownSync(InspectRunEnvelope)(inspectCancelledOutput); 562 - expect(inspectedCancelled.run.status).toBe("cancelled"); 562 + const cancelledLines = watchCancelledOutput 563 + .split("\n") 564 + .map((line) => line.trim()) 565 + .filter((line) => line.length > 0) 566 + .map((line) => Schema.decodeUnknownSync(WatchOutputEnvelope)(line)); 567 + 563 568 expect( 564 - inspectedCancelled.events.filter((event) => event.type === "run:cancelled"), 565 - ).toHaveLength(1); 566 - 567 - const inspectSessionOutput = await commandOutput( 568 - Command.make( 569 - "bun", 570 - "run", 571 - "packages/cli/src/bin/mill.ts", 572 - "inspect", 573 - `${completeRun.runId}.spawn_1`, 574 - "--session", 575 - "--json", 576 - "--runs-dir", 577 - runsDirectory, 569 + cancelledLines.some( 570 + (entry) => entry.kind === "event" && entry.event.type === "run:cancelled", 578 571 ), 579 - ); 580 - 581 - const sessionPayload = Schema.decodeUnknownSync(SessionEnvelope)(inspectSessionOutput); 582 - expect(sessionPayload.runId).toBe(completeRun.runId); 583 - expect(sessionPayload.spawnId).toBe("spawn_1"); 572 + ).toBe(true); 584 573 585 574 const listOutput = await commandOutput( 586 575 Command.make(
+10 -10
packages/core/src/internal/engine.effect.test.ts
··· 370 370 } 371 371 }); 372 372 373 - it("watch and watchRaw surface tier-1 persisted events and tier-2 raw passthrough", async () => { 373 + it("watch and watchIo surface tier-1 persisted events and io passthrough", async () => { 374 374 const runsDirectory = await mkdtemp(join(tmpdir(), "mill-engine-watch-")); 375 375 const runId = decodeRunIdSync(`run_${crypto.randomUUID()}`); 376 376 ··· 428 428 ), 429 429 ); 430 430 431 - const watchRawEffect = Effect.scoped( 432 - Stream.runCollect(Stream.take(engine.watchRaw(runId), 2)), 433 - ); 431 + const watchIoEffect = Effect.scoped(Stream.runCollect(Stream.take(engine.watchIo(runId), 2))); 434 432 435 433 const executionEffect = engine.runSync({ 436 434 runId, ··· 446 444 ), 447 445 }); 448 446 449 - const [tier1EventsChunk, rawEventsChunk] = await runWithBunContext( 447 + const [tier1EventsChunk, ioEventsChunk] = await runWithBunContext( 450 448 Effect.map( 451 - Effect.all([watchTier1Effect, watchRawEffect, executionEffect], { 449 + Effect.all([watchTier1Effect, watchIoEffect, executionEffect], { 452 450 concurrency: "unbounded", 453 451 }), 454 - ([tier1Events, rawEvents]) => [tier1Events, rawEvents] as const, 452 + ([tier1Events, ioEvents]) => [tier1Events, ioEvents] as const, 455 453 ), 456 454 ); 457 455 458 456 const tier1Events = [...tier1EventsChunk]; 459 - const rawEvents = [...rawEventsChunk]; 457 + const ioEvents = [...ioEventsChunk]; 460 458 461 459 expect(tier1Events.some((event) => event.type === "run:start")).toBe(true); 462 460 expect(tier1Events.some((event) => event.type === "run:complete")).toBe(true); 463 - expect(rawEvents).toHaveLength(2); 464 - expect(rawEvents[0]).toContain("raw:scout"); 461 + expect(ioEvents).toHaveLength(2); 462 + expect(ioEvents[0]?.source).toBe("driver"); 463 + expect(ioEvents[0]?.stream).toBe("stdout"); 464 + expect(ioEvents[0]?.line).toContain("raw:scout"); 465 465 466 466 const eventsFile = await readFile(join(runsDirectory, runId, "events.ndjson"), "utf-8"); 467 467 expect(eventsFile.includes('"type":"final"')).toBe(false);
+18 -6
packages/core/src/internal/engine.effect.ts
··· 30 30 type RunStore, 31 31 } from "./run-store.effect"; 32 32 import { 33 - publishRawEvent, 33 + publishIoEvent, 34 34 publishTier1Event, 35 - watchRawLive, 35 + watchIoLive, 36 36 watchTier1GlobalLive, 37 37 watchTier1Live, 38 + type IoStreamEvent, 38 39 } from "./observer-hub.effect"; 39 40 40 41 export class ConfigError extends Data.TaggedError("ConfigError")<{ message: string }> {} ··· 118 119 ) => Effect.Effect<ReadonlyArray<RunSyncOutput["run"]>, PersistenceError>; 119 120 readonly watch: (runId: RunId) => Stream.Stream<MillEvent, RunNotFoundError | PersistenceError>; 120 121 readonly watchAll: (sinceTimeIso?: string) => Stream.Stream<MillEvent, PersistenceError>; 121 - readonly watchRaw: (runId: RunId) => Stream.Stream<string, RunNotFoundError | PersistenceError>; 122 + readonly watchIo: ( 123 + runId: RunId, 124 + ) => Stream.Stream<IoStreamEvent, RunNotFoundError | PersistenceError>; 122 125 readonly inspect: ( 123 126 ref: InspectRef, 124 127 ) => Effect.Effect<InspectResult, RunNotFoundError | PersistenceError>; ··· 800 803 }); 801 804 802 805 for (const rawLine of driverOutputExit.value.raw ?? []) { 803 - yield* publishRawEvent(runInput.runId, rawLine); 806 + const timestamp = yield* toIsoTimestamp; 807 + 808 + yield* publishIoEvent({ 809 + runId: runInput.runId, 810 + source: "driver", 811 + stream: "stdout", 812 + line: rawLine, 813 + timestamp, 814 + spawnId, 815 + }); 804 816 } 805 817 806 818 for (const driverEvent of driverOutputExit.value.events) { ··· 1069 1081 }), 1070 1082 ), 1071 1083 1072 - watchRaw: (runId) => 1084 + watchIo: (runId) => 1073 1085 Stream.unwrapScoped( 1074 - Effect.zipRight(runStore.getRun(runId), Effect.succeed(watchRawLive(runId))), 1086 + Effect.zipRight(runStore.getRun(runId), Effect.succeed(watchIoLive(runId))), 1075 1087 ), 1076 1088 1077 1089 inspect: (ref) =>
+20 -9
packages/core/src/internal/observer-hub.effect.ts
··· 1 1 import { Effect, PubSub, Stream } from "effect"; 2 2 import type { MillEvent } from "../domain/event.schema"; 3 3 4 + export interface IoStreamEvent { 5 + readonly runId: string; 6 + readonly source: "driver" | "program"; 7 + readonly stream: "stdout" | "stderr"; 8 + readonly line: string; 9 + readonly timestamp: string; 10 + readonly spawnId?: string; 11 + } 12 + 4 13 const tier1PubSubByRun = new Map<string, PubSub.PubSub<MillEvent>>(); 5 - const rawPubSubByRun = new Map<string, PubSub.PubSub<string>>(); 14 + const ioPubSubByRun = new Map<string, PubSub.PubSub<IoStreamEvent>>(); 6 15 let tier1GlobalPubSub: PubSub.PubSub<MillEvent> | undefined; 7 16 8 17 const ensureTier1PubSub = (runId: string): Effect.Effect<PubSub.PubSub<MillEvent>> => ··· 20 29 ); 21 30 }); 22 31 23 - const ensureRawPubSub = (runId: string): Effect.Effect<PubSub.PubSub<string>> => 32 + const ensureIoPubSub = (runId: string): Effect.Effect<PubSub.PubSub<IoStreamEvent>> => 24 33 Effect.suspend(() => { 25 - const existing = rawPubSubByRun.get(runId); 34 + const existing = ioPubSubByRun.get(runId); 26 35 27 36 if (existing !== undefined) { 28 37 return Effect.succeed(existing); 29 38 } 30 39 31 - return Effect.tap(PubSub.unbounded<string>(), (pubSub) => 40 + return Effect.tap(PubSub.unbounded<IoStreamEvent>(), (pubSub) => 32 41 Effect.sync(() => { 33 - rawPubSubByRun.set(runId, pubSub); 42 + ioPubSubByRun.set(runId, pubSub); 34 43 }), 35 44 ); 36 45 }); ··· 58 67 ), 59 68 ); 60 69 61 - export const publishRawEvent = (runId: string, raw: string): Effect.Effect<void> => 62 - Effect.asVoid(Effect.flatMap(ensureRawPubSub(runId), (pubSub) => PubSub.publish(pubSub, raw))); 70 + export const publishIoEvent = (event: IoStreamEvent): Effect.Effect<void> => 71 + Effect.asVoid( 72 + Effect.flatMap(ensureIoPubSub(event.runId), (pubSub) => PubSub.publish(pubSub, event)), 73 + ); 63 74 64 75 export const watchTier1Live = (runId: string): Stream.Stream<MillEvent, never> => 65 76 Stream.unwrapScoped(Effect.map(ensureTier1PubSub(runId), (pubSub) => Stream.fromPubSub(pubSub))); 66 77 67 - export const watchRawLive = (runId: string): Stream.Stream<string, never> => 68 - Stream.unwrapScoped(Effect.map(ensureRawPubSub(runId), (pubSub) => Stream.fromPubSub(pubSub))); 78 + export const watchIoLive = (runId: string): Stream.Stream<IoStreamEvent, never> => 79 + Stream.unwrapScoped(Effect.map(ensureIoPubSub(runId), (pubSub) => Stream.fromPubSub(pubSub))); 69 80 70 81 export const watchTier1GlobalLive = (): Stream.Stream<MillEvent, never> => 71 82 Stream.unwrapScoped(Effect.map(ensureTier1GlobalPubSub(), (pubSub) => Stream.fromPubSub(pubSub)));
+1
packages/core/src/observer-hub.effect.ts
··· 1 + export * from "./internal/observer-hub.effect";
+155 -128
packages/core/src/public/run.api.ts
··· 2 2 import * as fs from "node:fs"; 3 3 import * as FileSystem from "@effect/platform/FileSystem"; 4 4 import * as BunContext from "@effect/platform-bun/BunContext"; 5 - import { Effect, Runtime, Stream } from "effect"; 6 - import { makeMillEngine, ProgramExecutionError, type InspectResult } from "../engine.effect"; 5 + import { Effect, Fiber, Runtime, Stream } from "effect"; 6 + import { makeMillEngine, ProgramExecutionError } from "../engine.effect"; 7 7 import { makeDriverRegistry } from "../driver-registry.effect"; 8 8 import { makeExecutorRegistry } from "../executor-registry.effect"; 9 - import { 10 - decodeRunIdSync, 11 - decodeSpawnIdSync, 12 - type RunRecord, 13 - type RunSyncOutput, 14 - } from "../run.schema"; 9 + import { type MillEvent } from "../event.schema"; 10 + import { decodeRunIdSync, type RunRecord, type RunSyncOutput } from "../run.schema"; 15 11 import { runDetachedWorker } from "../worker.effect"; 16 12 import { executeProgramInProcessHost } from "../program-host.effect"; 13 + import { publishIoEvent, type IoStreamEvent } from "../observer-hub.effect"; 17 14 import { resolveConfig } from "./config-loader.api"; 18 - import type { 19 - DriverSessionPointer, 20 - ExecutorRuntime, 21 - ExtensionRegistration, 22 - ResolveConfigOptions, 23 - } from "./types"; 15 + import type { ExecutorRuntime, ExtensionRegistration, ResolveConfigOptions } from "./types"; 24 16 25 17 const runtime = Runtime.defaultRuntime; 26 18 ··· 53 45 readonly timeoutSeconds: number; 54 46 } 55 47 48 + export type WatchChannel = "events" | "io" | "all"; 49 + export type WatchSource = "driver" | "program"; 50 + 51 + export type WatchOutput = 52 + | { 53 + readonly kind: "event"; 54 + readonly runId: string; 55 + readonly event: MillEvent; 56 + } 57 + | { 58 + readonly kind: "io"; 59 + readonly runId: string; 60 + readonly source: WatchSource; 61 + readonly stream: "stdout" | "stderr"; 62 + readonly line: string; 63 + readonly timestamp: string; 64 + readonly spawnId?: string; 65 + }; 66 + 56 67 export interface WatchRunInput extends Omit< 57 68 GetRunStatusInput, 58 69 "runId" | "pathExists" | "loadConfigModule" 59 70 > { 60 71 readonly runId?: string; 61 - readonly raw?: boolean; 72 + readonly channel?: WatchChannel; 73 + readonly source?: WatchSource; 74 + readonly spawnId?: string; 62 75 readonly sinceTimeIso?: string; 63 76 readonly pathExists?: (path: string) => Promise<boolean>; 64 77 readonly loadConfigModule?: (path: string) => Promise<unknown>; 65 78 readonly onEvent: (line: string) => void; 66 - } 67 - 68 - export interface InspectRunInput extends BaseRunInput { 69 - readonly ref: string; 70 - readonly session?: boolean; 71 79 } 72 80 73 81 export interface CancelRunInput extends GetRunStatusInput { ··· 404 412 }; 405 413 }; 406 414 407 - const parseInspectRef = ( 408 - ref: string, 409 - ): 410 - | { 411 - runId: string; 412 - spawnId?: string; 413 - } 414 - | undefined => { 415 - const [runIdPart, spawnIdPart] = ref.split("."); 416 - 417 - if (runIdPart === undefined || runIdPart.length === 0) { 418 - return undefined; 419 - } 420 - 421 - if (spawnIdPart === undefined || spawnIdPart.length === 0) { 422 - return { 423 - runId: runIdPart, 424 - }; 425 - } 426 - 427 - return { 428 - runId: runIdPart, 429 - spawnId: spawnIdPart, 430 - }; 431 - }; 432 - 433 415 const isRunTerminalEvent = (eventType: string): boolean => 434 416 eventType === "run:complete" || eventType === "run:failed" || eventType === "run:cancelled"; 435 417 ··· 443 425 return new Date(parsed).toISOString() === value; 444 426 }; 445 427 446 - export interface InspectSessionOutput extends DriverSessionPointer { 447 - readonly runId: string; 448 - readonly spawnId: string; 449 - } 428 + const toWatchEventOutput = (event: MillEvent): WatchOutput => ({ 429 + kind: "event", 430 + runId: event.runId, 431 + event, 432 + }); 433 + 434 + const toWatchIoOutput = (event: IoStreamEvent): WatchOutput => ({ 435 + kind: "io", 436 + runId: event.runId, 437 + source: event.source, 438 + stream: event.stream, 439 + line: event.line, 440 + timestamp: event.timestamp, 441 + spawnId: event.spawnId, 442 + }); 443 + 444 + const emitWatchOutput = ( 445 + onEvent: (line: string) => void, 446 + output: WatchOutput, 447 + ): Effect.Effect<void> => 448 + Effect.sync(() => { 449 + onEvent(JSON.stringify(output)); 450 + }); 451 + 452 + const filterIoEvent = ( 453 + event: IoStreamEvent, 454 + source: WatchSource | undefined, 455 + spawnId: string | undefined, 456 + ): boolean => { 457 + if (source !== undefined && event.source !== source) { 458 + return false; 459 + } 460 + 461 + if (spawnId !== undefined && event.spawnId !== spawnId) { 462 + return false; 463 + } 464 + 465 + return true; 466 + }; 450 467 451 468 export const submitRun = async (input: SubmitRunInput): Promise<RunRecord> => { 452 469 const cwd = input.cwd ?? process.cwd(); ··· 543 560 executorName: engineContext.selectedExecutorName, 544 561 extensions: engineContext.selectedExtensions, 545 562 spawn, 563 + onIo: ({ stream, line }) => 564 + Effect.flatMap( 565 + Effect.sync(() => new Date().toISOString()), 566 + (timestamp) => 567 + publishIoEvent({ 568 + runId: input.runId, 569 + source: "program", 570 + stream, 571 + line, 572 + timestamp, 573 + }), 574 + ), 546 575 }), 547 576 }), 548 577 (error) => ··· 589 618 ); 590 619 } 591 620 621 + const channel = input.channel ?? "events"; 622 + 623 + if (input.runId === undefined && channel !== "events") { 624 + return Promise.reject(new Error("watch --channel io|all requires --run <runId>.")); 625 + } 626 + 627 + if (input.runId === undefined && (input.source !== undefined || input.spawnId !== undefined)) { 628 + return Promise.reject(new Error("watch --source/--spawn requires --run <runId>.")); 629 + } 630 + 631 + if (channel === "io" && input.sinceTimeIso !== undefined) { 632 + return Promise.reject(new Error("watch --channel io does not support --since-time.")); 633 + } 634 + 635 + if (channel === "events" && (input.source !== undefined || input.spawnId !== undefined)) { 636 + return Promise.reject( 637 + new Error("watch --source/--spawn require --channel io or --channel all."), 638 + ); 639 + } 640 + 592 641 const engineContext = await makeEngineForConfig(input); 593 642 594 643 if (input.runId === undefined) { 595 - if (input.raw === true) { 596 - return Promise.reject(new Error("watch --raw requires a runId.")); 597 - } 598 - 599 644 await runWithBunContext( 600 645 Effect.scoped( 601 646 Stream.runForEach(engineContext.engine.watchAll(input.sinceTimeIso), (event) => 602 - Effect.sync(() => { 603 - input.onEvent(JSON.stringify(event)); 604 - }), 647 + emitWatchOutput(input.onEvent, toWatchEventOutput(event)), 605 648 ), 606 649 ), 607 650 ); ··· 611 654 612 655 const runId = decodeRunIdSync(input.runId); 613 656 614 - if (input.raw === true) { 657 + const eventStream = Stream.filter(engineContext.engine.watch(runId), (event) => 658 + input.sinceTimeIso === undefined ? true : event.timestamp >= input.sinceTimeIso, 659 + ); 660 + 661 + const ioStream = Stream.filter(engineContext.engine.watchIo(runId), (event) => 662 + filterIoEvent(event, input.source, input.spawnId), 663 + ); 664 + 665 + if (channel === "events") { 615 666 await runWithBunContext( 616 - Effect.raceFirst( 617 - Effect.scoped( 618 - Stream.runForEach(engineContext.engine.watchRaw(runId), (line) => 619 - Effect.sync(() => { 620 - input.onEvent(line); 621 - }), 622 - ), 667 + Effect.scoped( 668 + Stream.runForEach( 669 + Stream.takeUntil(eventStream, (event) => isRunTerminalEvent(event.type)), 670 + (event) => emitWatchOutput(input.onEvent, toWatchEventOutput(event)), 623 671 ), 624 - engineContext.engine.wait(runId, DEFAULT_SYNC_WAIT_TIMEOUT_SECONDS * 1000), 625 672 ), 626 673 ); 627 674 628 675 return; 629 676 } 630 677 631 - const watchStream = Stream.filter(engineContext.engine.watch(runId), (event) => 632 - input.sinceTimeIso === undefined ? true : event.timestamp >= input.sinceTimeIso, 633 - ); 678 + const currentRun = await runWithBunContext(engineContext.engine.status(runId)); 634 679 635 - await runWithBunContext( 636 - Effect.scoped( 637 - Stream.runForEach( 638 - Stream.takeUntil(watchStream, (event) => isRunTerminalEvent(event.type)), 639 - (event) => 640 - Effect.sync(() => { 641 - input.onEvent(JSON.stringify(event)); 642 - }), 643 - ), 644 - ), 645 - ); 646 - }; 680 + if ( 681 + currentRun.status === "complete" || 682 + currentRun.status === "failed" || 683 + currentRun.status === "cancelled" 684 + ) { 685 + if (channel === "all") { 686 + await runWithBunContext( 687 + Effect.scoped( 688 + Stream.runForEach( 689 + Stream.takeUntil(eventStream, (event) => isRunTerminalEvent(event.type)), 690 + (event) => emitWatchOutput(input.onEvent, toWatchEventOutput(event)), 691 + ), 692 + ), 693 + ); 694 + } 647 695 648 - export const inspectRun = async ( 649 - input: InspectRunInput, 650 - ): Promise<InspectResult | InspectSessionOutput> => { 651 - const parsedRef = parseInspectRef(input.ref); 652 - 653 - if (parsedRef === undefined) { 654 - return Promise.reject(new Error("inspect reference requires a runId")); 696 + return; 655 697 } 656 698 657 - const engineContext = await makeEngineForConfig(input); 658 - const inspected = await runWithBunContext( 659 - engineContext.engine.inspect({ 660 - runId: decodeRunIdSync(parsedRef.runId), 661 - spawnId: parsedRef.spawnId === undefined ? undefined : decodeSpawnIdSync(parsedRef.spawnId), 662 - }), 663 - ); 699 + if (channel === "io") { 700 + await runWithBunContext( 701 + Effect.raceFirst( 702 + Effect.scoped( 703 + Stream.runForEach(ioStream, (event) => 704 + emitWatchOutput(input.onEvent, toWatchIoOutput(event)), 705 + ), 706 + ), 707 + engineContext.engine.wait(runId, DEFAULT_SYNC_WAIT_TIMEOUT_SECONDS * 1000), 708 + ), 709 + ); 664 710 665 - if (input.session !== true) { 666 - return inspected; 711 + return; 667 712 } 668 713 669 - if (inspected.kind !== "spawn" || inspected.result === undefined) { 670 - return Promise.reject( 671 - new Error("inspect --session requires a runId.spawnId reference with completed spawn result"), 672 - ); 673 - } 674 - 675 - const resolvedConfig = await resolveConfig(input); 676 - const driverRegistry = makeDriverRegistry({ 677 - defaultDriver: resolvedConfig.config.defaultDriver, 678 - drivers: resolvedConfig.config.drivers, 679 - }); 680 - const run = await runWithBunContext( 681 - engineContext.engine.status(decodeRunIdSync(parsedRef.runId)), 682 - ); 683 - const resolvedDriver = await Runtime.runPromise(runtime)(driverRegistry.resolve(run.driver)); 714 + await runWithBunContext( 715 + Effect.scoped( 716 + Effect.gen(function* () { 717 + const ioFiber = yield* Effect.forkScoped( 718 + Stream.runForEach(ioStream, (event) => 719 + emitWatchOutput(input.onEvent, toWatchIoOutput(event)), 720 + ), 721 + ); 684 722 685 - if (resolvedDriver.runtime.resolveSession === undefined) { 686 - return Promise.reject( 687 - new Error(`Driver ${resolvedDriver.name} does not support session inspection`), 688 - ); 689 - } 723 + yield* Stream.runForEach( 724 + Stream.takeUntil(eventStream, (event) => isRunTerminalEvent(event.type)), 725 + (event) => emitWatchOutput(input.onEvent, toWatchEventOutput(event)), 726 + ); 690 727 691 - const sessionPointer = await Runtime.runPromise(runtime)( 692 - Effect.provide( 693 - resolvedDriver.runtime.resolveSession({ 694 - sessionRef: inspected.result.sessionRef, 728 + yield* Fiber.interrupt(ioFiber); 695 729 }), 696 - BunContext.layer, 697 730 ), 698 731 ); 699 - 700 - return { 701 - runId: parsedRef.runId, 702 - spawnId: inspected.spawnId, 703 - ...sessionPointer, 704 - } satisfies InspectSessionOutput; 705 732 }; 706 733 707 734 export const cancelRun = async (
+20 -1
packages/core/src/runtime/program-host.effect.ts
··· 24 24 readonly executorName: string; 25 25 readonly extensions: ReadonlyArray<ExtensionRegistration>; 26 26 readonly spawn: (input: SpawnOptions) => Effect.Effect<SpawnResult, unknown>; 27 + readonly onIo?: (input: { 28 + readonly stream: "stdout" | "stderr"; 29 + readonly line: string; 30 + }) => Effect.Effect<void>; 27 31 } 28 32 29 33 type ProgramHostResultMessage = Extract<ProgramHostInboundMessage, { readonly kind: "result" }>; ··· 322 326 Stream.runForEach(Stream.splitLines(Stream.decodeText(processHandle.stdout)), (line) => 323 327 Effect.gen(function* () { 324 328 if (!line.startsWith(ProgramHostProtocolPrefix)) { 329 + if (line.length > 0 && input.onIo !== undefined) { 330 + yield* input.onIo({ 331 + stream: "stdout", 332 + line, 333 + }); 334 + } 325 335 return; 326 336 } 327 337 ··· 408 418 409 419 const stderrFiber = yield* Effect.forkScoped( 410 420 Stream.runForEach(Stream.splitLines(Stream.decodeText(processHandle.stderr)), (line) => 411 - Ref.update(stderrLinesRef, (lines) => [...lines, line]), 421 + Effect.gen(function* () { 422 + yield* Ref.update(stderrLinesRef, (lines) => [...lines, line]); 423 + 424 + if (line.length > 0 && input.onIo !== undefined) { 425 + yield* input.onIo({ 426 + stream: "stderr", 427 + line, 428 + }); 429 + } 430 + }), 412 431 ), 413 432 ); 414 433
+1 -1
packages/pi-mill/README.md
··· 62 62 ]); 63 63 ``` 64 64 65 - Each `mill.spawn()` submits an async mill run (`mill run --json`) and then follows completion via mill APIs (`wait` + `inspect`). Model selection, driver routing, and execution behavior all come from your mill configuration. 65 + Each `mill.spawn()` submits an async mill run (`mill run --json`) and then follows completion via mill APIs (`wait` + `watch --channel events`). Model selection, driver routing, and execution behavior all come from your mill configuration. 66 66 67 67 By default, mill run storage uses mill's global default (`~/.mill/runs`) unless you explicitly pass `--runs-dir` (or set `millRunsDir`). 68 68
+1 -1
packages/pi-mill/index.ts
··· 360 360 label: "Subagent", 361 361 description: [ 362 362 "Spawn subagents for delegated or orchestrated work.", 363 - "Execution backend: mill async APIs (submit + watch + inspect). Configure drivers/executors/models via mill.config.ts.", 363 + "Execution backend: mill async APIs (submit + watch + wait). Configure drivers/executors/models via mill.config.ts.", 364 364 `Enabled models: ${modelsText}`, 365 365 "Write a TypeScript script. `mill` is a global (like `process` or `console`). Use mill.spawn() to orchestrate agents.", 366 366 "mill.spawn() returns a Promise<ExecutionResult>. Use `await` for sequential, `Promise.all` for parallel.",
+99 -39
packages/pi-mill/runtime.ts
··· 136 136 runId?: string; 137 137 } 138 138 139 - interface MillRunInspectPayload { 140 - run?: { 141 - status?: string; 142 - }; 143 - result?: { 144 - status?: string; 145 - errorMessage?: string; 146 - spawns?: ReadonlyArray<MillSpawnResult>; 139 + interface MillWatchOutputEnvelope { 140 + kind?: string; 141 + runId?: string; 142 + event?: { 143 + type?: string; 144 + payload?: Record<string, unknown>; 147 145 }; 148 146 } 149 147 ··· 186 184 } 187 185 188 186 return undefined; 187 + }; 188 + 189 + const parseJsonObjectsFromText = (text: string): Array<Record<string, unknown>> => 190 + text 191 + .split("\n") 192 + .map((line) => line.trim()) 193 + .filter((line) => line.length > 0) 194 + .flatMap((line) => { 195 + try { 196 + const parsed = JSON.parse(line) as unknown; 197 + return typeof parsed === "object" && parsed !== null 198 + ? [parsed as Record<string, unknown>] 199 + : []; 200 + } catch { 201 + return []; 202 + } 203 + }); 204 + 205 + const isRecord = (value: unknown): value is Record<string, unknown> => 206 + typeof value === "object" && value !== null; 207 + 208 + const readStringField = (record: Record<string, unknown>, key: string): string | undefined => { 209 + const value = record[key]; 210 + return typeof value === "string" ? value : undefined; 189 211 }; 190 212 191 213 const formatCommand = (command: string, args: ReadonlyArray<string>): string => ··· 328 350 } 329 351 330 352 const decodeMillResult = ( 331 - payload: MillRunInspectPayload, 353 + payloads: ReadonlyArray<Record<string, unknown>>, 332 354 fallback: { agent: string; modelId: string; prompt: string }, 333 355 ): ExecutionResult => { 334 - const spawns = payload.result?.spawns; 335 - if (!Array.isArray(spawns) || spawns.length === 0) { 336 - const failedStatus = payload.result?.status ?? payload.run?.status; 337 - const message = payload.result?.errorMessage; 356 + const spawnResults: Array<MillSpawnResult> = []; 357 + let runFailedMessage: string | undefined; 358 + 359 + for (const payload of payloads) { 360 + const envelope = payload as MillWatchOutputEnvelope; 361 + if (envelope.kind !== "event") { 362 + continue; 363 + } 364 + 365 + const event = envelope.event; 366 + if (!isRecord(event)) { 367 + continue; 368 + } 369 + 370 + const eventType = readStringField(event, "type"); 371 + const eventPayload = event.payload; 372 + 373 + if (eventType === "spawn:complete" && isRecord(eventPayload) && isRecord(eventPayload.result)) { 374 + const result = eventPayload.result; 375 + spawnResults.push({ 376 + text: readStringField(result, "text"), 377 + sessionRef: readStringField(result, "sessionRef"), 378 + agent: readStringField(result, "agent"), 379 + model: readStringField(result, "model"), 380 + driver: readStringField(result, "driver"), 381 + exitCode: typeof result.exitCode === "number" ? result.exitCode : undefined, 382 + stopReason: readStringField(result, "stopReason"), 383 + errorMessage: readStringField(result, "errorMessage"), 384 + }); 385 + continue; 386 + } 387 + 388 + if (eventType === "run:failed" && isRecord(eventPayload)) { 389 + runFailedMessage = readStringField(eventPayload, "message"); 390 + } 391 + } 392 + 393 + if (spawnResults.length === 0) { 338 394 throw new MillError({ 339 395 code: "RUNTIME", 340 396 message: 341 - message && message.length > 0 342 - ? `mill run ${failedStatus ?? "unknown"}: ${message}` 397 + runFailedMessage && runFailedMessage.length > 0 398 + ? `mill run failed: ${runFailedMessage}` 343 399 : "mill run completed without spawn results.", 344 400 recoverable: false, 345 401 }); 346 402 } 347 403 348 404 const selectedSpawn = 349 - spawns.find((spawn) => spawn.agent === fallback.agent) ?? spawns[0] ?? ({} as MillSpawnResult); 405 + spawnResults.find((spawn) => spawn.agent === fallback.agent) ?? 406 + spawnResults[0] ?? 407 + ({} as MillSpawnResult); 350 408 351 - const runStatus = payload.result?.status ?? payload.run?.status; 352 - const derivedExitCode = 353 - typeof selectedSpawn.exitCode === "number" 354 - ? selectedSpawn.exitCode 355 - : runStatus === "complete" 356 - ? 0 357 - : 1; 409 + const derivedExitCode = typeof selectedSpawn.exitCode === "number" ? selectedSpawn.exitCode : 0; 358 410 359 - const errorMessage = selectedSpawn.errorMessage ?? payload.result?.errorMessage; 411 + const errorMessage = selectedSpawn.errorMessage; 360 412 const stopReason = selectedSpawn.stopReason; 361 413 362 414 if (derivedExitCode !== 0 || stopReason === "error" || (errorMessage?.length ?? 0) > 0) { ··· 621 673 if (terminalStatus === "failed") { 622 674 throw new MillError({ 623 675 code: "RUNTIME", 624 - message: "Subagent run failed before inspect.", 676 + message: "Subagent run failed before watch replay.", 625 677 recoverable: false, 626 678 }); 627 679 } 628 680 629 - const inspectArgs = [...input.millArgs, "inspect", submittedRunId, "--json"]; 681 + const watchArgs = [ 682 + ...input.millArgs, 683 + "watch", 684 + "--run", 685 + submittedRunId, 686 + "--channel", 687 + "events", 688 + "--json", 689 + ]; 630 690 if (input.millRunsDir && input.millRunsDir.trim().length > 0) { 631 - inspectArgs.push("--runs-dir", input.millRunsDir); 691 + watchArgs.push("--runs-dir", input.millRunsDir); 632 692 } 633 693 634 - const inspected = await runCommandCapture({ 694 + const watched = await runCommandCapture({ 635 695 command: input.millCommand, 636 - args: inspectArgs, 696 + args: watchArgs, 637 697 cwd: input.cwd, 638 698 env: process.env, 639 699 signal: input.signal, 640 700 }); 641 - appendCommandLog(stdoutPath, input.millCommand, inspectArgs, inspected); 701 + appendCommandLog(stdoutPath, input.millCommand, watchArgs, watched); 642 702 643 - if (inspected.aborted || aborted) { 703 + if (watched.aborted || aborted) { 644 704 await requestRunCancel(); 645 705 throw new MillError({ 646 706 code: "CANCELLED", ··· 649 709 }); 650 710 } 651 711 652 - if (inspected.code !== 0) { 712 + if (watched.code !== 0) { 653 713 throw new MillError({ 654 714 code: "RUNTIME", 655 715 message: 656 - inspected.combined.trim().length > 0 657 - ? `mill inspect failed:\n${inspected.combined.trim()}` 658 - : "mill inspect failed.", 716 + watched.combined.trim().length > 0 717 + ? `mill watch failed:\n${watched.combined.trim()}` 718 + : "mill watch failed.", 659 719 recoverable: false, 660 720 }); 661 721 } 662 722 663 - const parsed = parseJsonObjectFromText([inspected.stdout, inspected.stderr].join("\n")); 664 - if (!parsed) { 723 + const parsedEvents = parseJsonObjectsFromText([watched.stdout, watched.stderr].join("\n")); 724 + if (parsedEvents.length === 0) { 665 725 throw new MillError({ 666 726 code: "RUNTIME", 667 - message: "mill inspect output was not valid JSON.", 727 + message: "mill watch output did not contain JSON events.", 668 728 recoverable: false, 669 729 }); 670 730 } 671 731 672 - const decoded = decodeMillResult(parsed as MillRunInspectPayload, { 732 + const decoded = decodeMillResult(parsedEvents, { 673 733 agent: input.agent, 674 734 modelId: input.modelId, 675 735 prompt: input.prompt,