programmatic subagents
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

feat(s7): Final Hardening: inspect/session/cancel/watch Semantics

+1545 -14
+11
docs/exec-plans/active/vertical-slices.md
··· 291 291 - `bun test packages/cli/src` 292 292 - `bun test packages/driver-pi/src` 293 293 - `bun test` 294 + 295 + **Status (2026-02-23)** 296 + 297 + - ✅ Added core observer fanout hub (`packages/core/src/internal/observer-hub.effect.ts`) and wired engine tier-1/tier-2 publishing to persisted append + in-memory live subscribers. 298 + - ✅ Extended `MillEngine` with `watch`, `watchRaw`, `inspect`, `cancel`, and `list` semantics. 299 + - ✅ Hardened append path synchronization against concurrent terminal transitions by rehydrating lifecycle guard state from persisted events before each append. 300 + - ✅ Implemented `inspect` run/spawn decoded views and session bridge plumbing in `packages/core/src/public/run.api.ts`. 301 + - ✅ Added driver session bridge contract (`resolveSession`) and implemented pointer resolution for `@mill/driver-pi`. 302 + - ✅ Added CLI handlers for `watch`, `inspect`, `cancel`, and `ls` with JSON stdout contracts. 303 + - ✅ Fixed async submit detachment stdio to `ignore` so `run --json` remains non-blocking even under captured stdout in e2e contexts. 304 + - ✅ Added integration/e2e coverage for command matrix behavior across watch/inspect/session/cancel/ls, including concurrent run cancellation invariants.
+375
packages/cli/src/public/index.api.test.ts
··· 91 91 }), 92 92 ); 93 93 94 + const InspectRunEnvelope = Schema.parseJson( 95 + Schema.Struct({ 96 + kind: Schema.Literal("run"), 97 + run: Schema.Struct({ 98 + id: Schema.String, 99 + status: Schema.String, 100 + }), 101 + events: Schema.Array( 102 + Schema.Struct({ 103 + type: Schema.String, 104 + sequence: Schema.Number, 105 + }), 106 + ), 107 + }), 108 + ); 109 + 110 + const InspectSpawnEnvelope = Schema.parseJson( 111 + Schema.Struct({ 112 + kind: Schema.Literal("spawn"), 113 + runId: Schema.String, 114 + spawnId: Schema.String, 115 + result: Schema.optional( 116 + Schema.Struct({ 117 + sessionRef: Schema.String, 118 + }), 119 + ), 120 + }), 121 + ); 122 + 123 + const SessionEnvelope = Schema.parseJson( 124 + Schema.Struct({ 125 + runId: Schema.String, 126 + spawnId: Schema.String, 127 + sessionRef: Schema.String, 128 + pointer: Schema.String, 129 + driver: Schema.String, 130 + }), 131 + ); 132 + 133 + const CancelEnvelope = Schema.parseJson( 134 + Schema.Struct({ 135 + runId: Schema.String, 136 + status: Schema.String, 137 + alreadyTerminal: Schema.Boolean, 138 + }), 139 + ); 140 + 141 + const ListEnvelope = Schema.parseJson( 142 + Schema.Array( 143 + Schema.Struct({ 144 + id: Schema.String, 145 + status: Schema.String, 146 + }), 147 + ), 148 + ); 149 + 94 150 const WaitTimeoutEnvelope = Schema.parseJson( 95 151 Schema.Struct({ 96 152 ok: Schema.Literal(false), ··· 481 537 expect(waitHumanStderr).toHaveLength(0); 482 538 expect(waitHumanStdout[0]).toContain(`run ${runPayload.run.id}`); 483 539 expect(waitHumanStdout[0]).toContain("status=complete"); 540 + } finally { 541 + await rm(tempDirectory, { recursive: true, force: true }); 542 + } 543 + }); 544 + 545 + it("supports watch/inspect/session/ls JSON contracts for completed runs", async () => { 546 + const tempDirectory = await mkdtemp(join(tmpdir(), "mill-cli-inspect-watch-")); 547 + const homeDirectory = join(tempDirectory, "home"); 548 + const programPath = join(tempDirectory, "program.ts"); 549 + 550 + await writeFile( 551 + programPath, 552 + [ 553 + "const scan = await mill.spawn({", 554 + ' agent: "scout",', 555 + ' systemPrompt: "You are concise.",', 556 + ' prompt: "Say hello",', 557 + "});", 558 + "return scan.text;", 559 + ].join("\n"), 560 + "utf-8", 561 + ); 562 + 563 + try { 564 + const runStdout: Array<string> = []; 565 + const runCode = await runCli(["run", programPath, "--sync", "--json"], { 566 + cwd: tempDirectory, 567 + homeDirectory, 568 + pathExists: async () => false, 569 + io: { 570 + stdout: (line) => { 571 + runStdout.push(line); 572 + }, 573 + stderr: () => undefined, 574 + }, 575 + }); 576 + 577 + expect(runCode).toBe(0); 578 + const runPayload = Schema.decodeUnknownSync(RunSyncEnvelope)(runStdout[0]); 579 + 580 + const watchStdout: Array<string> = []; 581 + const watchCode = await runCli(["watch", runPayload.run.id, "--json"], { 582 + cwd: tempDirectory, 583 + homeDirectory, 584 + pathExists: async () => false, 585 + io: { 586 + stdout: (line) => { 587 + watchStdout.push(line); 588 + }, 589 + stderr: () => undefined, 590 + }, 591 + }); 592 + 593 + expect(watchCode).toBe(0); 594 + expect(watchStdout.length).toBeGreaterThan(0); 595 + 596 + for (const line of watchStdout) { 597 + const parsed = JSON.parse(line) as { readonly type?: string; readonly runId?: string }; 598 + expect(parsed.runId).toBe(runPayload.run.id); 599 + expect(typeof parsed.type).toBe("string"); 600 + } 601 + 602 + const inspectRunStdout: Array<string> = []; 603 + const inspectRunCode = await runCli(["inspect", runPayload.run.id, "--json"], { 604 + cwd: tempDirectory, 605 + homeDirectory, 606 + pathExists: async () => false, 607 + io: { 608 + stdout: (line) => { 609 + inspectRunStdout.push(line); 610 + }, 611 + stderr: () => undefined, 612 + }, 613 + }); 614 + 615 + expect(inspectRunCode).toBe(0); 616 + const inspectedRun = Schema.decodeUnknownSync(InspectRunEnvelope)(inspectRunStdout[0]); 617 + expect(inspectedRun.run.id).toBe(runPayload.run.id); 618 + expect(inspectedRun.events.length).toBeGreaterThan(0); 619 + 620 + const inspectSpawnStdout: Array<string> = []; 621 + const inspectSpawnCode = await runCli(["inspect", `${runPayload.run.id}.spawn_1`, "--json"], { 622 + cwd: tempDirectory, 623 + homeDirectory, 624 + pathExists: async () => false, 625 + io: { 626 + stdout: (line) => { 627 + inspectSpawnStdout.push(line); 628 + }, 629 + stderr: () => undefined, 630 + }, 631 + }); 632 + 633 + expect(inspectSpawnCode).toBe(0); 634 + const inspectedSpawn = Schema.decodeUnknownSync(InspectSpawnEnvelope)(inspectSpawnStdout[0]); 635 + expect(inspectedSpawn.spawnId).toBe("spawn_1"); 636 + expect(inspectedSpawn.result?.sessionRef.length).toBeGreaterThan(0); 637 + 638 + const inspectSessionStdout: Array<string> = []; 639 + const inspectSessionCode = await runCli( 640 + ["inspect", `${runPayload.run.id}.spawn_1`, "--session", "--json"], 641 + { 642 + cwd: tempDirectory, 643 + homeDirectory, 644 + pathExists: async () => false, 645 + io: { 646 + stdout: (line) => { 647 + inspectSessionStdout.push(line); 648 + }, 649 + stderr: () => undefined, 650 + }, 651 + }, 652 + ); 653 + 654 + expect(inspectSessionCode).toBe(0); 655 + const sessionPayload = Schema.decodeUnknownSync(SessionEnvelope)(inspectSessionStdout[0]); 656 + expect(sessionPayload.runId).toBe(runPayload.run.id); 657 + expect(sessionPayload.spawnId).toBe("spawn_1"); 658 + expect(sessionPayload.pointer.length).toBeGreaterThan(0); 659 + 660 + const listStdout: Array<string> = []; 661 + const listCode = await runCli(["ls", "--json"], { 662 + cwd: tempDirectory, 663 + homeDirectory, 664 + pathExists: async () => false, 665 + io: { 666 + stdout: (line) => { 667 + listStdout.push(line); 668 + }, 669 + stderr: () => undefined, 670 + }, 671 + }); 672 + 673 + expect(listCode).toBe(0); 674 + const listPayload = Schema.decodeUnknownSync(ListEnvelope)(listStdout[0]); 675 + expect(listPayload.some((run) => run.id === runPayload.run.id)).toBe(true); 676 + } finally { 677 + await rm(tempDirectory, { recursive: true, force: true }); 678 + } 679 + }); 680 + 681 + it("cancel is idempotent and preserves terminal invariants", async () => { 682 + const tempDirectory = await mkdtemp(join(tmpdir(), "mill-cli-cancel-")); 683 + const homeDirectory = join(tempDirectory, "home"); 684 + const programPath = join(tempDirectory, "program.ts"); 685 + 686 + await writeFile( 687 + programPath, 688 + [ 689 + "await new Promise((resolve) => setTimeout(resolve, 400));", 690 + "return 'done';", 691 + ].join("\n"), 692 + "utf-8", 693 + ); 694 + 695 + try { 696 + const runStdout: Array<string> = []; 697 + const runCode = await runCli(["run", programPath, "--json"], { 698 + cwd: tempDirectory, 699 + homeDirectory, 700 + pathExists: async () => false, 701 + io: { 702 + stdout: (line) => { 703 + runStdout.push(line); 704 + }, 705 + stderr: () => undefined, 706 + }, 707 + }); 708 + 709 + expect(runCode).toBe(0); 710 + const submittedRun = Schema.decodeUnknownSync(RunSubmitEnvelope)(runStdout[0]); 711 + 712 + const cancelStdout1: Array<string> = []; 713 + const cancelCode1 = await runCli(["cancel", submittedRun.runId, "--json"], { 714 + cwd: tempDirectory, 715 + homeDirectory, 716 + pathExists: async () => false, 717 + io: { 718 + stdout: (line) => { 719 + cancelStdout1.push(line); 720 + }, 721 + stderr: () => undefined, 722 + }, 723 + }); 724 + 725 + expect(cancelCode1).toBe(0); 726 + const firstCancel = Schema.decodeUnknownSync(CancelEnvelope)(cancelStdout1[0]); 727 + expect(firstCancel.runId).toBe(submittedRun.runId); 728 + expect(firstCancel.status).toBe("cancelled"); 729 + 730 + const cancelStdout2: Array<string> = []; 731 + const cancelCode2 = await runCli(["cancel", submittedRun.runId, "--json"], { 732 + cwd: tempDirectory, 733 + homeDirectory, 734 + pathExists: async () => false, 735 + io: { 736 + stdout: (line) => { 737 + cancelStdout2.push(line); 738 + }, 739 + stderr: () => undefined, 740 + }, 741 + }); 742 + 743 + expect(cancelCode2).toBe(0); 744 + const secondCancel = Schema.decodeUnknownSync(CancelEnvelope)(cancelStdout2[0]); 745 + expect(secondCancel.runId).toBe(submittedRun.runId); 746 + 747 + const inspectStdout: Array<string> = []; 748 + const inspectCode = await runCli(["inspect", submittedRun.runId, "--json"], { 749 + cwd: tempDirectory, 750 + homeDirectory, 751 + pathExists: async () => false, 752 + io: { 753 + stdout: (line) => { 754 + inspectStdout.push(line); 755 + }, 756 + stderr: () => undefined, 757 + }, 758 + }); 759 + 760 + expect(inspectCode).toBe(0); 761 + const inspectedRun = Schema.decodeUnknownSync(InspectRunEnvelope)(inspectStdout[0]); 762 + const terminalEvents = inspectedRun.events.filter( 763 + (event) => 764 + event.type === "run:complete" || 765 + event.type === "run:failed" || 766 + event.type === "run:cancelled", 767 + ); 768 + 769 + expect(terminalEvents).toHaveLength(1); 770 + expect(terminalEvents[0]?.type).toBe("run:cancelled"); 771 + 772 + await new Promise((resolve) => setTimeout(resolve, 450)); 773 + } finally { 774 + await rm(tempDirectory, { recursive: true, force: true }); 775 + } 776 + }); 777 + 778 + it("watch --raw streams tier-2 passthrough without persistence", async () => { 779 + const tempDirectory = await mkdtemp(join(tmpdir(), "mill-cli-watch-raw-")); 780 + const homeDirectory = join(tempDirectory, "home"); 781 + const programPath = join(tempDirectory, "program.ts"); 782 + 783 + await writeFile( 784 + programPath, 785 + [ 786 + "const scan = await mill.spawn({", 787 + ' agent: "scout",', 788 + ' systemPrompt: "You are concise.",', 789 + ' prompt: "Say hello",', 790 + "});", 791 + "return scan.text;", 792 + ].join("\n"), 793 + "utf-8", 794 + ); 795 + 796 + try { 797 + const runStdout: Array<string> = []; 798 + const runCode = await runCli(["run", programPath, "--json"], { 799 + cwd: tempDirectory, 800 + homeDirectory, 801 + pathExists: async () => false, 802 + launchWorker: async (input) => { 803 + const workerArgs = [ 804 + "_worker", 805 + "--run-id", 806 + input.runId, 807 + "--program", 808 + input.programPath, 809 + "--runs-dir", 810 + input.runsDirectory, 811 + "--driver", 812 + input.driverName, 813 + "--executor", 814 + input.executorName, 815 + ]; 816 + 817 + setTimeout(() => { 818 + void runCli(workerArgs, { 819 + cwd: input.cwd, 820 + homeDirectory, 821 + pathExists: async () => false, 822 + io: { 823 + stdout: () => undefined, 824 + stderr: () => undefined, 825 + }, 826 + }); 827 + }, 25); 828 + }, 829 + io: { 830 + stdout: (line) => { 831 + runStdout.push(line); 832 + }, 833 + stderr: () => undefined, 834 + }, 835 + }); 836 + 837 + expect(runCode).toBe(0); 838 + const submittedRun = Schema.decodeUnknownSync(RunSubmitEnvelope)(runStdout[0]); 839 + 840 + const watchStdout: Array<string> = []; 841 + const watchCode = await runCli(["watch", submittedRun.runId, "--raw"], { 842 + cwd: tempDirectory, 843 + homeDirectory, 844 + pathExists: async () => false, 845 + io: { 846 + stdout: (line) => { 847 + watchStdout.push(line); 848 + }, 849 + stderr: () => undefined, 850 + }, 851 + }); 852 + 853 + expect(watchCode).toBe(0); 854 + expect(watchStdout.length).toBeGreaterThan(0); 855 + expect(watchStdout.some((line) => line.includes('"type":"final"'))).toBe(true); 856 + 857 + const eventsContent = await readFile(submittedRun.paths.eventsFile, "utf-8"); 858 + expect(eventsContent.includes('"type":"final"')).toBe(false); 484 859 } finally { 485 860 await rm(tempDirectory, { recursive: true, force: true }); 486 861 }
+157 -4
packages/cli/src/public/index.api.ts
··· 3 3 import * as BunContext from "@effect/platform-bun/BunContext"; 4 4 import { Effect, Runtime, Scope } from "effect"; 5 5 import { 6 + cancelRun, 6 7 createDiscoveryPayload, 7 8 defineConfig, 8 9 getRunStatus, 10 + inspectRun, 11 + listRuns, 9 12 processDriver, 10 13 runProgramSync, 11 14 runWorker, 12 15 submitRun, 13 16 waitForRun, 17 + watchRun, 14 18 type ConfigOverrides, 15 19 type LaunchWorkerInput, 16 20 } from "@mill/core"; ··· 29 33 readonly runsDirectory?: string; 30 34 readonly pathExists?: (path: string) => Promise<boolean>; 31 35 readonly loadConfigOverrides?: (path: string) => Promise<ConfigOverrides>; 36 + readonly launchWorker?: (input: LaunchWorkerInput) => Promise<void>; 32 37 readonly io?: CliIo; 33 38 } 34 39 ··· 128 133 input.executorName, 129 134 ).pipe( 130 135 Command.workingDirectory(input.cwd), 131 - Command.stdin("inherit"), 132 - Command.stdout("inherit"), 133 - Command.stderr("inherit"), 136 + Command.stdin("ignore"), 137 + Command.stdout("ignore"), 138 + Command.stderr("ignore"), 134 139 ); 135 140 136 141 await runWithBunContext( ··· 164 169 executorName: readFlagValue(argv, "--executor"), 165 170 pathExists: options.pathExists, 166 171 loadConfigOverrides: options.loadConfigOverrides, 167 - launchWorker: launchDetachedWorker, 172 + launchWorker: options.launchWorker ?? launchDetachedWorker, 168 173 } as const; 169 174 170 175 if (argv.includes("--sync")) { ··· 401 406 return 1; 402 407 }; 403 408 409 + const watchCommand = async ( 410 + argv: ReadonlyArray<string>, 411 + options: RunCliOptions, 412 + io: CliIo, 413 + ): Promise<number> => { 414 + const runId = argv[0]; 415 + 416 + if (runId === undefined) { 417 + io.stderr("Usage: mill watch <runId> [--json] [--raw]"); 418 + return 1; 419 + } 420 + 421 + await watchRun({ 422 + defaults: defaultConfig, 423 + runId, 424 + raw: argv.includes("--raw"), 425 + cwd: options.cwd, 426 + homeDirectory: options.homeDirectory, 427 + runsDirectory: readFlagValue(argv, "--runs-dir") ?? options.runsDirectory, 428 + driverName: readFlagValue(argv, "--driver"), 429 + pathExists: options.pathExists, 430 + loadConfigOverrides: options.loadConfigOverrides, 431 + onEvent: (line) => { 432 + io.stdout(line); 433 + }, 434 + }); 435 + 436 + return 0; 437 + }; 438 + 439 + const inspectCommand = async ( 440 + argv: ReadonlyArray<string>, 441 + options: RunCliOptions, 442 + io: CliIo, 443 + ): Promise<number> => { 444 + const ref = argv[0]; 445 + 446 + if (ref === undefined) { 447 + io.stderr("Usage: mill inspect <runId>[.<spawnId>] [--json] [--session]"); 448 + return 1; 449 + } 450 + 451 + const inspected = await inspectRun({ 452 + defaults: defaultConfig, 453 + ref, 454 + session: argv.includes("--session"), 455 + cwd: options.cwd, 456 + homeDirectory: options.homeDirectory, 457 + runsDirectory: readFlagValue(argv, "--runs-dir") ?? options.runsDirectory, 458 + driverName: readFlagValue(argv, "--driver"), 459 + pathExists: options.pathExists, 460 + loadConfigOverrides: options.loadConfigOverrides, 461 + }); 462 + 463 + if (argv.includes("--json")) { 464 + io.stdout(JSON.stringify(inspected)); 465 + return 0; 466 + } 467 + 468 + io.stdout(JSON.stringify(inspected, null, 2)); 469 + return 0; 470 + }; 471 + 472 + const cancelCommand = async ( 473 + argv: ReadonlyArray<string>, 474 + options: RunCliOptions, 475 + io: CliIo, 476 + ): Promise<number> => { 477 + const runId = argv[0]; 478 + 479 + if (runId === undefined) { 480 + io.stderr("Usage: mill cancel <runId> [--json]"); 481 + return 1; 482 + } 483 + 484 + const cancelled = await cancelRun({ 485 + defaults: defaultConfig, 486 + runId, 487 + cwd: options.cwd, 488 + homeDirectory: options.homeDirectory, 489 + runsDirectory: readFlagValue(argv, "--runs-dir") ?? options.runsDirectory, 490 + driverName: readFlagValue(argv, "--driver"), 491 + pathExists: options.pathExists, 492 + loadConfigOverrides: options.loadConfigOverrides, 493 + }); 494 + 495 + if (argv.includes("--json")) { 496 + io.stdout(JSON.stringify(cancelled)); 497 + return 0; 498 + } 499 + 500 + io.stdout(`run ${cancelled.runId} status=${cancelled.status}`); 501 + return 0; 502 + }; 503 + 504 + const lsCommand = async ( 505 + argv: ReadonlyArray<string>, 506 + options: RunCliOptions, 507 + io: CliIo, 508 + ): Promise<number> => { 509 + const statusFilter = readFlagValue(argv, "--status") as 510 + | "pending" 511 + | "running" 512 + | "complete" 513 + | "failed" 514 + | "cancelled" 515 + | undefined; 516 + const runs = await listRuns({ 517 + defaults: defaultConfig, 518 + status: statusFilter, 519 + cwd: options.cwd, 520 + homeDirectory: options.homeDirectory, 521 + runsDirectory: readFlagValue(argv, "--runs-dir") ?? options.runsDirectory, 522 + driverName: readFlagValue(argv, "--driver"), 523 + pathExists: options.pathExists, 524 + loadConfigOverrides: options.loadConfigOverrides, 525 + }); 526 + 527 + if (argv.includes("--json")) { 528 + io.stdout(JSON.stringify(runs)); 529 + return 0; 530 + } 531 + 532 + if (runs.length === 0) { 533 + io.stdout("No runs found."); 534 + return 0; 535 + } 536 + 537 + io.stdout(runs.map((run) => `${run.id}\t${run.status}\t${run.updatedAt}`).join("\n")); 538 + return 0; 539 + }; 540 + 404 541 export const runCli = async ( 405 542 argv: ReadonlyArray<string>, 406 543 options?: RunCliOptions, ··· 450 587 451 588 if (argv[0] === "wait") { 452 589 return waitCommand(argv.slice(1), options ?? {}, io); 590 + } 591 + 592 + if (argv[0] === "watch") { 593 + return watchCommand(argv.slice(1), options ?? {}, io); 594 + } 595 + 596 + if (argv[0] === "inspect") { 597 + return inspectCommand(argv.slice(1), options ?? {}, io); 598 + } 599 + 600 + if (argv[0] === "cancel") { 601 + return cancelCommand(argv.slice(1), options ?? {}, io); 602 + } 603 + 604 + if (argv[0] === "ls") { 605 + return lsCommand(argv.slice(1), options ?? {}, io); 453 606 } 454 607 455 608 if (argv[0] === "init") {
+225
packages/cli/src/public/index.e2e.test.ts
··· 87 87 }), 88 88 ); 89 89 90 + const CancelEnvelope = Schema.parseJson( 91 + Schema.Struct({ 92 + runId: Schema.String, 93 + status: Schema.String, 94 + alreadyTerminal: Schema.Boolean, 95 + }), 96 + ); 97 + 98 + const InspectRunEnvelope = Schema.parseJson( 99 + Schema.Struct({ 100 + kind: Schema.Literal("run"), 101 + run: Schema.Struct({ 102 + id: Schema.String, 103 + status: Schema.String, 104 + }), 105 + events: Schema.Array( 106 + Schema.Struct({ 107 + type: Schema.String, 108 + sequence: Schema.Number, 109 + }), 110 + ), 111 + }), 112 + ); 113 + 114 + const SessionEnvelope = Schema.parseJson( 115 + Schema.Struct({ 116 + runId: Schema.String, 117 + spawnId: Schema.String, 118 + sessionRef: Schema.String, 119 + pointer: Schema.String, 120 + driver: Schema.String, 121 + }), 122 + ); 123 + 124 + const ListEnvelope = Schema.parseJson( 125 + Schema.Array( 126 + Schema.Struct({ 127 + id: Schema.String, 128 + status: Schema.String, 129 + }), 130 + ), 131 + ); 132 + 90 133 const commandOutput = (command: Command.Command): Promise<string> => 91 134 Runtime.runPromise(runtime)(Effect.provide(Command.string(command), BunContext.layer)); 92 135 ··· 363 406 expect(runFile.length).toBeGreaterThan(0); 364 407 expect(eventsFile.length).toBeGreaterThan(0); 365 408 expect(resultFile.length).toBeGreaterThan(0); 409 + } finally { 410 + await rm(tempDirectory, { recursive: true, force: true }); 411 + } 412 + }); 413 + 414 + it("runs inspect/session/cancel/watch matrix across concurrent runs", async () => { 415 + const tempDirectory = await mkdtemp(join(tmpdir(), "mill-cli-matrix-e2e-")); 416 + const runsDirectory = join(tempDirectory, "runs"); 417 + const completeProgramPath = join(tempDirectory, "complete.ts"); 418 + const cancelProgramPath = join(tempDirectory, "cancel.ts"); 419 + 420 + await writeFile( 421 + completeProgramPath, 422 + [ 423 + "const scan = await mill.spawn({", 424 + ' agent: "scout",', 425 + ' systemPrompt: "You are concise.",', 426 + ' prompt: "Inspect repository layout.",', 427 + "});", 428 + "return scan.text;", 429 + ].join("\n"), 430 + "utf-8", 431 + ); 432 + 433 + await writeFile( 434 + cancelProgramPath, 435 + [ 436 + "await new Promise((resolve) => setTimeout(resolve, 3000));", 437 + "return 'late-complete';", 438 + ].join("\n"), 439 + "utf-8", 440 + ); 441 + 442 + try { 443 + const submitCompleteOutput = await commandOutput( 444 + Command.make( 445 + "bun", 446 + "run", 447 + "packages/cli/src/bin/mill.ts", 448 + "run", 449 + completeProgramPath, 450 + "--json", 451 + "--runs-dir", 452 + runsDirectory, 453 + ), 454 + ); 455 + 456 + const submitCancelOutput = await commandOutput( 457 + Command.make( 458 + "bun", 459 + "run", 460 + "packages/cli/src/bin/mill.ts", 461 + "run", 462 + cancelProgramPath, 463 + "--json", 464 + "--runs-dir", 465 + runsDirectory, 466 + ), 467 + ); 468 + 469 + const completeRun = Schema.decodeUnknownSync(RunSubmitEnvelope)(submitCompleteOutput); 470 + const cancelRun = Schema.decodeUnknownSync(RunSubmitEnvelope)(submitCancelOutput); 471 + 472 + const cancelOutput = await commandOutput( 473 + Command.make( 474 + "bun", 475 + "run", 476 + "packages/cli/src/bin/mill.ts", 477 + "cancel", 478 + cancelRun.runId, 479 + "--json", 480 + "--runs-dir", 481 + runsDirectory, 482 + ), 483 + ); 484 + 485 + const cancelPayload = Schema.decodeUnknownSync(CancelEnvelope)(cancelOutput); 486 + expect(cancelPayload.runId).toBe(cancelRun.runId); 487 + expect(cancelPayload.status).toBe("cancelled"); 488 + 489 + const waitCompleteOutput = await commandOutput( 490 + Command.make( 491 + "bun", 492 + "run", 493 + "packages/cli/src/bin/mill.ts", 494 + "wait", 495 + completeRun.runId, 496 + "--timeout", 497 + "8", 498 + "--json", 499 + "--runs-dir", 500 + runsDirectory, 501 + ), 502 + ); 503 + 504 + const waitComplete = Schema.decodeUnknownSync(StatusEnvelope)(waitCompleteOutput); 505 + expect(waitComplete.status).toBe("complete"); 506 + 507 + const watchOutput = await commandOutput( 508 + Command.make( 509 + "bun", 510 + "run", 511 + "packages/cli/src/bin/mill.ts", 512 + "watch", 513 + completeRun.runId, 514 + "--json", 515 + "--runs-dir", 516 + runsDirectory, 517 + ), 518 + ); 519 + 520 + const watchLines = watchOutput 521 + .split("\n") 522 + .map((line) => line.trim()) 523 + .filter((line) => line.length > 0); 524 + 525 + expect(watchLines.length).toBeGreaterThan(0); 526 + const watchTerminalCount = watchLines 527 + .map((line) => JSON.parse(line) as { readonly type: string }) 528 + .filter( 529 + (event) => 530 + event.type === "run:complete" || 531 + event.type === "run:failed" || 532 + event.type === "run:cancelled", 533 + ).length; 534 + expect(watchTerminalCount).toBe(1); 535 + 536 + const inspectCancelledOutput = await commandOutput( 537 + Command.make( 538 + "bun", 539 + "run", 540 + "packages/cli/src/bin/mill.ts", 541 + "inspect", 542 + cancelRun.runId, 543 + "--json", 544 + "--runs-dir", 545 + runsDirectory, 546 + ), 547 + ); 548 + 549 + const inspectedCancelled = Schema.decodeUnknownSync(InspectRunEnvelope)(inspectCancelledOutput); 550 + expect(inspectedCancelled.run.status).toBe("cancelled"); 551 + expect( 552 + inspectedCancelled.events.filter((event) => event.type === "run:cancelled"), 553 + ).toHaveLength(1); 554 + 555 + const inspectSessionOutput = await commandOutput( 556 + Command.make( 557 + "bun", 558 + "run", 559 + "packages/cli/src/bin/mill.ts", 560 + "inspect", 561 + `${completeRun.runId}.spawn_1`, 562 + "--session", 563 + "--json", 564 + "--runs-dir", 565 + runsDirectory, 566 + ), 567 + ); 568 + 569 + const sessionPayload = Schema.decodeUnknownSync(SessionEnvelope)(inspectSessionOutput); 570 + expect(sessionPayload.runId).toBe(completeRun.runId); 571 + expect(sessionPayload.spawnId).toBe("spawn_1"); 572 + 573 + const listOutput = await commandOutput( 574 + Command.make( 575 + "bun", 576 + "run", 577 + "packages/cli/src/bin/mill.ts", 578 + "ls", 579 + "--json", 580 + "--runs-dir", 581 + runsDirectory, 582 + ), 583 + ); 584 + 585 + const listedRuns = Schema.decodeUnknownSync(ListEnvelope)(listOutput); 586 + const cancelledListed = listedRuns.find((item) => item.id === cancelRun.runId); 587 + const completeListed = listedRuns.find((item) => item.id === completeRun.runId); 588 + 589 + expect(cancelledListed?.status).toBe("cancelled"); 590 + expect(completeListed?.status).toBe("complete"); 366 591 } finally { 367 592 await rm(tempDirectory, { recursive: true, force: true }); 368 593 }
+213 -1
packages/core/src/internal/engine.effect.test.ts
··· 3 3 import { tmpdir } from "node:os"; 4 4 import { join } from "node:path"; 5 5 import { setTimeout as delay } from "node:timers/promises"; 6 - import { Effect } from "effect"; 6 + import { Effect, Stream } from "effect"; 7 7 import { decodeMillEventJsonSync, type MillEvent } from "../domain/event.schema"; 8 8 import { decodeRunIdSync } from "../domain/run.schema"; 9 9 import { runWithBunContext } from "../public/test-runtime.api"; ··· 305 305 _tag: "WaitTimeoutError", 306 306 runId, 307 307 }); 308 + } finally { 309 + await rm(runsDirectory, { recursive: true, force: true }); 310 + } 311 + }); 312 + 313 + it("inspect returns decoded persisted run and spawn views", async () => { 314 + const runsDirectory = await mkdtemp(join(tmpdir(), "mill-engine-inspect-")); 315 + const runId = decodeRunIdSync(`run_${crypto.randomUUID()}`); 316 + 317 + const engine = makeMillEngine({ 318 + runsDirectory, 319 + defaultModel: "openai/gpt-5.3-codex", 320 + driverName: "default", 321 + executorName: "direct", 322 + driver: testDriver, 323 + extensions: [], 324 + }); 325 + 326 + try { 327 + await runWithBunContext( 328 + engine.runSync({ 329 + runId, 330 + programPath: "/tmp/program.ts", 331 + executeProgram: (spawn) => 332 + Effect.flatMap( 333 + spawn({ 334 + agent: "scout", 335 + systemPrompt: "You are concise.", 336 + prompt: "Inspect this run", 337 + }), 338 + () => Effect.void, 339 + ), 340 + }), 341 + ); 342 + 343 + const inspectedRun = await runWithBunContext(engine.inspect({ runId })); 344 + expect(inspectedRun.kind).toBe("run"); 345 + 346 + if (inspectedRun.kind === "run") { 347 + expect(inspectedRun.run.id).toBe(runId); 348 + expect(inspectedRun.events.length).toBeGreaterThan(0); 349 + } 350 + 351 + const spawnStart = inspectedRun.events.find((event) => event.type === "spawn:start"); 352 + expect(spawnStart).toBeDefined(); 353 + 354 + if (spawnStart === undefined || spawnStart.type !== "spawn:start") { 355 + return; 356 + } 357 + 358 + const inspectedSpawn = await runWithBunContext( 359 + engine.inspect({ runId, spawnId: spawnStart.payload.spawnId }), 360 + ); 361 + 362 + expect(inspectedSpawn.kind).toBe("spawn"); 363 + 364 + if (inspectedSpawn.kind === "spawn") { 365 + expect(inspectedSpawn.spawnId).toBe(spawnStart.payload.spawnId); 366 + expect(inspectedSpawn.result?.sessionRef).toBe("session/scout"); 367 + } 368 + } finally { 369 + await rm(runsDirectory, { recursive: true, force: true }); 370 + } 371 + }); 372 + 373 + it("watch and watchRaw surface tier-1 persisted events and tier-2 raw passthrough", async () => { 374 + const runsDirectory = await mkdtemp(join(tmpdir(), "mill-engine-watch-")); 375 + const runId = decodeRunIdSync(`run_${crypto.randomUUID()}`); 376 + 377 + const driverWithRaw: DriverRuntime = { 378 + name: "test-driver-raw", 379 + spawn: (input) => 380 + Effect.succeed({ 381 + raw: [ 382 + JSON.stringify({ type: "milestone", message: `raw:${input.agent}` }), 383 + JSON.stringify({ type: "final", sessionRef: `session/${input.agent}` }), 384 + ], 385 + events: [ 386 + { 387 + type: "milestone", 388 + message: `spawned:${input.agent}`, 389 + }, 390 + ], 391 + result: { 392 + text: `driver:${input.prompt}`, 393 + sessionRef: `session/${input.agent}`, 394 + agent: input.agent, 395 + model: input.model, 396 + driver: "test-driver-raw", 397 + exitCode: 0, 398 + }, 399 + }), 400 + }; 401 + 402 + const engine = makeMillEngine({ 403 + runsDirectory, 404 + defaultModel: "openai/gpt-5.3-codex", 405 + driverName: "default", 406 + executorName: "direct", 407 + driver: driverWithRaw, 408 + extensions: [], 409 + }); 410 + 411 + try { 412 + await runWithBunContext( 413 + engine.submit({ 414 + runId, 415 + programPath: "/tmp/program.ts", 416 + }), 417 + ); 418 + 419 + const watchTier1Effect = Effect.scoped( 420 + Stream.runCollect( 421 + Stream.takeUntil(engine.watch(runId), (event) => 422 + event.type === "run:complete" || 423 + event.type === "run:failed" || 424 + event.type === "run:cancelled", 425 + ), 426 + ), 427 + ); 428 + 429 + const watchRawEffect = Effect.scoped(Stream.runCollect(Stream.take(engine.watchRaw(runId), 2))); 430 + 431 + const executionEffect = engine.runSync({ 432 + runId, 433 + programPath: "/tmp/program.ts", 434 + executeProgram: (spawn) => 435 + Effect.flatMap( 436 + spawn({ 437 + agent: "scout", 438 + systemPrompt: "You are concise.", 439 + prompt: "watch this run", 440 + }), 441 + () => Effect.void, 442 + ), 443 + }); 444 + 445 + const [tier1EventsChunk, rawEventsChunk] = await runWithBunContext( 446 + Effect.map( 447 + Effect.all([watchTier1Effect, watchRawEffect, executionEffect], { 448 + concurrency: "unbounded", 449 + }), 450 + ([tier1Events, rawEvents]) => [tier1Events, rawEvents] as const, 451 + ), 452 + ); 453 + 454 + const tier1Events = [...tier1EventsChunk]; 455 + const rawEvents = [...rawEventsChunk]; 456 + 457 + expect(tier1Events.some((event) => event.type === "run:start")).toBe(true); 458 + expect(tier1Events.some((event) => event.type === "run:complete")).toBe(true); 459 + expect(rawEvents).toHaveLength(2); 460 + expect(rawEvents[0]).toContain("raw:scout"); 461 + 462 + const eventsFile = await readFile(join(runsDirectory, runId, "events.ndjson"), "utf-8"); 463 + expect(eventsFile.includes("\"type\":\"final\"")).toBe(false); 464 + } finally { 465 + await rm(runsDirectory, { recursive: true, force: true }); 466 + } 467 + }); 468 + 469 + it("cancel is idempotent and appends at most one run:cancelled terminal event", async () => { 470 + const runsDirectory = await mkdtemp(join(tmpdir(), "mill-engine-cancel-")); 471 + const runId = decodeRunIdSync(`run_${crypto.randomUUID()}`); 472 + 473 + const store = makeRunStore({ runsDirectory }); 474 + const engine = makeMillEngine({ 475 + runsDirectory, 476 + defaultModel: "openai/gpt-5.3-codex", 477 + driverName: "default", 478 + executorName: "direct", 479 + driver: testDriver, 480 + extensions: [], 481 + }); 482 + 483 + try { 484 + await runWithBunContext( 485 + store.create({ 486 + runId, 487 + programPath: "/tmp/program.ts", 488 + driver: "default", 489 + executor: "direct", 490 + status: "running", 491 + timestamp: "2026-02-23T20:00:00.000Z", 492 + }), 493 + ); 494 + 495 + await runWithBunContext( 496 + store.appendEvent(runId, { 497 + schemaVersion: 1, 498 + runId, 499 + sequence: 1, 500 + timestamp: "2026-02-23T20:00:00.000Z", 501 + type: "run:start", 502 + payload: { 503 + programPath: "/tmp/program.ts", 504 + }, 505 + }), 506 + ); 507 + 508 + await runWithBunContext(engine.cancel(runId)); 509 + await runWithBunContext(engine.cancel(runId)); 510 + 511 + const run = await runWithBunContext(engine.status(runId)); 512 + expect(run.status).toBe("cancelled"); 513 + 514 + const events = await runWithBunContext(store.readEvents(runId)); 515 + const cancelledCount = events.filter((event) => event.type === "run:cancelled").length; 516 + const terminalCount = events.filter((event) => runTerminalTypes.has(event.type)).length; 517 + 518 + expect(cancelledCount).toBe(1); 519 + expect(terminalCount).toBe(1); 308 520 } finally { 309 521 await rm(runsDirectory, { recursive: true, force: true }); 310 522 }
+250 -5
packages/core/src/internal/engine.effect.ts
··· 1 - import { Cause, Clock, Data, Effect, Exit, Ref } from "effect"; 1 + import { Cause, Clock, Data, Effect, Exit, Ref, Stream } from "effect"; 2 2 import { 3 3 makeEventEnvelope, 4 4 type MillEvent, ··· 13 13 type RunId, 14 14 type RunResult, 15 15 type RunSyncOutput, 16 + type SpawnId, 16 17 } from "../domain/run.schema"; 17 18 import { decodeSpawnResult, type SpawnOptions, type SpawnResult } from "../domain/spawn.schema"; 18 19 import type { DriverRuntime, ExtensionContext, ExtensionRegistration } from "../public/types"; ··· 29 30 makeRunStore, 30 31 type RunStore, 31 32 } from "./run-store.effect"; 33 + import { 34 + publishRawEvent, 35 + publishTier1Event, 36 + watchRawLive, 37 + watchTier1Live, 38 + } from "./observer-hub.effect"; 32 39 33 40 export class ConfigError extends Data.TaggedError("ConfigError")<{ message: string }> {} 34 41 ··· 59 66 ) => Effect.Effect<unknown, ProgramExecutionError>; 60 67 } 61 68 69 + export interface InspectRef { 70 + readonly runId: RunId; 71 + readonly spawnId?: SpawnId; 72 + } 73 + 74 + export type InspectResult = 75 + | { 76 + readonly kind: "run"; 77 + readonly run: RunSyncOutput["run"]; 78 + readonly events: ReadonlyArray<MillEvent>; 79 + readonly result: RunResult | undefined; 80 + } 81 + | { 82 + readonly kind: "spawn"; 83 + readonly runId: RunId; 84 + readonly spawnId: SpawnId; 85 + readonly events: ReadonlyArray<MillEvent>; 86 + readonly result: SpawnResult | undefined; 87 + }; 88 + 89 + export interface CancelResult { 90 + readonly run: RunSyncOutput["run"]; 91 + readonly alreadyTerminal: boolean; 92 + } 93 + 62 94 export interface MillEngine { 63 95 readonly submit: (input: RunSubmitInput) => Effect.Effect<RunSyncOutput["run"], PersistenceError>; 64 96 readonly runSync: ( ··· 80 112 RunSyncOutput["run"], 81 113 RunNotFoundError | PersistenceError | LifecycleInvariantError | WaitTimeoutError 82 114 >; 115 + readonly list: (status?: RunSyncOutput["run"]["status"]) => Effect.Effect< 116 + ReadonlyArray<RunSyncOutput["run"]>, 117 + PersistenceError 118 + >; 119 + readonly watch: (runId: RunId) => Stream.Stream<MillEvent, RunNotFoundError | PersistenceError>; 120 + readonly watchRaw: (runId: RunId) => Stream.Stream<string, RunNotFoundError | PersistenceError>; 121 + readonly inspect: ( 122 + ref: InspectRef, 123 + ) => Effect.Effect<InspectResult, RunNotFoundError | PersistenceError>; 124 + readonly cancel: ( 125 + runId: RunId, 126 + reason?: string, 127 + ) => Effect.Effect<CancelResult, RunNotFoundError | PersistenceError | LifecycleInvariantError>; 83 128 } 84 129 85 130 export interface MakeMillEngineInput { ··· 106 151 const nextSequence = (sequenceRef: Ref.Ref<number>): Effect.Effect<number> => 107 152 Ref.updateAndGet(sequenceRef, (current) => current + 1); 108 153 154 + const toPersistenceError = ( 155 + runId: RunId, 156 + error: RunNotFoundError | PersistenceError, 157 + ): PersistenceError => { 158 + if (error._tag === "PersistenceError") { 159 + return error; 160 + } 161 + 162 + return new PersistenceError({ 163 + path: runId, 164 + message: `Run ${runId} not found while appending event.`, 165 + }); 166 + }; 167 + 168 + const synchronizeAppendState = ( 169 + lifecycleStateRef: Ref.Ref<LifecycleGuardState>, 170 + sequenceRef: Ref.Ref<number>, 171 + runStore: RunStore, 172 + runId: RunId, 173 + ): Effect.Effect<LifecycleGuardState, PersistenceError | LifecycleInvariantError> => 174 + Effect.gen(function* () { 175 + const persistedEvents = yield* Effect.mapError( 176 + runStore.readEvents(runId), 177 + (error) => toPersistenceError(runId, error), 178 + ); 179 + 180 + let lifecycleState = initialLifecycleGuardState; 181 + 182 + for (const persistedEvent of persistedEvents) { 183 + lifecycleState = yield* applyLifecycleTransition(lifecycleState, persistedEvent); 184 + } 185 + 186 + const maxPersistedSequence = persistedEvents.reduce( 187 + (currentMax, event) => (event.sequence > currentMax ? event.sequence : currentMax), 188 + 0, 189 + ); 190 + 191 + yield* Ref.set(lifecycleStateRef, lifecycleState); 192 + yield* Ref.set(sequenceRef, maxPersistedSequence); 193 + 194 + return lifecycleState; 195 + }); 196 + 109 197 const appendTier1Event = ( 110 198 lifecycleStateRef: Ref.Ref<LifecycleGuardState>, 111 199 sequenceRef: Ref.Ref<number>, ··· 114 202 eventBuilder: (sequence: number, timestamp: string) => MillEvent, 115 203 ): Effect.Effect<void, PersistenceError | LifecycleInvariantError> => 116 204 Effect.gen(function* () { 205 + const synchronizedState = yield* synchronizeAppendState( 206 + lifecycleStateRef, 207 + sequenceRef, 208 + runStore, 209 + runId, 210 + ); 117 211 const sequence = yield* nextSequence(sequenceRef); 118 212 const timestamp = yield* toIsoTimestamp; 119 213 const event = eventBuilder(sequence, timestamp); 120 - const lifecycleState = yield* Ref.get(lifecycleStateRef); 121 - const nextState = yield* applyLifecycleTransition(lifecycleState, event); 214 + const nextState = yield* applyLifecycleTransition(synchronizedState, event); 122 215 123 216 yield* Ref.set(lifecycleStateRef, nextState); 124 217 yield* runStore.appendEvent(runId, event); 218 + yield* publishTier1Event(runId, event); 125 219 }); 126 220 127 221 const appendExtensionErrorEvent = ( ··· 240 334 eventBuilder: (sequence: number, timestamp: string) => MillEvent, 241 335 ): Effect.Effect<void, PersistenceError | LifecycleInvariantError> => 242 336 Effect.gen(function* () { 337 + const synchronizedState = yield* synchronizeAppendState( 338 + lifecycleStateRef, 339 + sequenceRef, 340 + runStore, 341 + runId, 342 + ); 243 343 const sequence = yield* nextSequence(sequenceRef); 244 344 const timestamp = yield* toIsoTimestamp; 245 345 const event = eventBuilder(sequence, timestamp); 246 - const lifecycleState = yield* Ref.get(lifecycleStateRef); 247 - const nextState = yield* applyLifecycleTransition(lifecycleState, event); 346 + const nextState = yield* applyLifecycleTransition(synchronizedState, event); 248 347 249 348 yield* Ref.set(lifecycleStateRef, nextState); 250 349 yield* runStore.appendEvent(runId, event); 350 + yield* publishTier1Event(runId, event); 251 351 yield* runExtensionOnEventHooks( 252 352 extensions, 253 353 extensionContext, ··· 345 445 }), 346 446 ); 347 447 448 + const terminalEventForRun = (event: MillEvent): boolean => 449 + event.type === "run:complete" || event.type === "run:failed" || event.type === "run:cancelled"; 450 + 451 + const isSpawnEventForSpawn = (event: MillEvent, spawnId: SpawnId): boolean => { 452 + if (event.type === "spawn:start") { 453 + return event.payload.spawnId === spawnId; 454 + } 455 + 456 + if (event.type === "spawn:milestone") { 457 + return event.payload.spawnId === spawnId; 458 + } 459 + 460 + if (event.type === "spawn:tool_call") { 461 + return event.payload.spawnId === spawnId; 462 + } 463 + 464 + if (event.type === "spawn:error") { 465 + return event.payload.spawnId === spawnId; 466 + } 467 + 468 + if (event.type === "spawn:complete") { 469 + return event.payload.spawnId === spawnId; 470 + } 471 + 472 + if (event.type === "spawn:cancelled") { 473 + return event.payload.spawnId === spawnId; 474 + } 475 + 476 + return false; 477 + }; 478 + 479 + const spawnResultFromEvents = ( 480 + events: ReadonlyArray<MillEvent>, 481 + spawnId: SpawnId, 482 + ): SpawnResult | undefined => { 483 + const completion = events.find( 484 + (event): event is Extract<MillEvent, { type: "spawn:complete" }> => 485 + event.type === "spawn:complete" && event.payload.spawnId === spawnId, 486 + ); 487 + 488 + if (completion === undefined) { 489 + return undefined; 490 + } 491 + 492 + return completion.payload.result; 493 + }; 494 + 348 495 export const makeMillEngine = (input: MakeMillEngineInput): MillEngine => { 349 496 const runStore = makeRunStore({ 350 497 runsDirectory: input.runsDirectory, ··· 574 721 ); 575 722 } 576 723 724 + for (const rawLine of driverOutputExit.value.raw ?? []) { 725 + yield* publishRawEvent(runInput.runId, rawLine); 726 + } 727 + 577 728 for (const driverEvent of driverOutputExit.value.events) { 578 729 if (driverEvent.type === "milestone") { 579 730 const milestoneEvent: Omit< ··· 790 941 }), 791 942 ); 792 943 }, 944 + 945 + list: (status) => runStore.listRuns(status), 946 + 947 + watch: (runId) => 948 + Stream.unwrapScoped( 949 + Effect.map(runStore.readEvents(runId), (persistedEvents) => 950 + Stream.concat(Stream.fromIterable(persistedEvents), watchTier1Live(runId)), 951 + ), 952 + ), 953 + 954 + watchRaw: (runId) => 955 + Stream.unwrapScoped(Effect.zipRight(runStore.getRun(runId), Effect.succeed(watchRawLive(runId)))), 956 + 957 + inspect: (ref) => 958 + Effect.gen(function* () { 959 + const run = yield* runStore.getRun(ref.runId); 960 + const events = yield* runStore.readEvents(ref.runId); 961 + 962 + if (ref.spawnId === undefined) { 963 + const result = yield* runStore.getResult(ref.runId); 964 + 965 + return { 966 + kind: "run", 967 + run, 968 + events, 969 + result, 970 + } satisfies InspectResult; 971 + } 972 + 973 + const spawnEvents = events.filter((event) => isSpawnEventForSpawn(event, ref.spawnId)); 974 + 975 + return { 976 + kind: "spawn", 977 + runId: ref.runId, 978 + spawnId: ref.spawnId, 979 + events: spawnEvents, 980 + result: spawnResultFromEvents(events, ref.spawnId), 981 + } satisfies InspectResult; 982 + }), 983 + 984 + cancel: (runId, reason) => 985 + Effect.gen(function* () { 986 + const run = yield* runStore.getRun(runId); 987 + 988 + if (isRunTerminalStatus(run.status)) { 989 + return { 990 + run, 991 + alreadyTerminal: true, 992 + } satisfies CancelResult; 993 + } 994 + 995 + const events = yield* runStore.readEvents(runId); 996 + const alreadyTerminalEvent = events.some(terminalEventForRun); 997 + 998 + if (!alreadyTerminalEvent) { 999 + let lifecycleState = initialLifecycleGuardState; 1000 + 1001 + for (const event of events) { 1002 + lifecycleState = yield* applyLifecycleTransition(lifecycleState, event); 1003 + } 1004 + 1005 + const maxSequence = events.reduce( 1006 + (currentMax, event) => (event.sequence > currentMax ? event.sequence : currentMax), 1007 + 0, 1008 + ); 1009 + 1010 + const lifecycleStateRef = yield* Ref.make(lifecycleState); 1011 + const sequenceRef = yield* Ref.make(maxSequence); 1012 + 1013 + yield* Effect.catchTag( 1014 + appendTier1Event(lifecycleStateRef, sequenceRef, runStore, runId, (sequence, timestamp) => ({ 1015 + ...makeEventEnvelope(runId, sequence, timestamp), 1016 + type: "run:cancelled", 1017 + payload: { 1018 + reason, 1019 + }, 1020 + })), 1021 + "LifecycleInvariantError", 1022 + () => Effect.void, 1023 + ); 1024 + } 1025 + 1026 + const cancelledAt = yield* toIsoTimestamp; 1027 + const cancelledRun = yield* Effect.catchTag( 1028 + runStore.setStatus(runId, "cancelled", cancelledAt), 1029 + "LifecycleInvariantError", 1030 + () => runStore.getRun(runId), 1031 + ); 1032 + 1033 + return { 1034 + run: cancelledRun, 1035 + alreadyTerminal: false, 1036 + } satisfies CancelResult; 1037 + }), 793 1038 }; 794 1039 };
+4
packages/core/src/internal/lifecycle-guard.effect.ts
··· 128 128 return Effect.void; 129 129 } 130 130 131 + if (current === "pending" && next === "cancelled") { 132 + return Effect.void; 133 + } 134 + 131 135 if (current === "running" && (next === "running" || isTerminalStatus(next))) { 132 136 return Effect.void; 133 137 }
+47
packages/core/src/internal/observer-hub.effect.ts
··· 1 + import { Effect, PubSub, Stream } from "effect"; 2 + import type { MillEvent } from "../domain/event.schema"; 3 + 4 + const tier1PubSubByRun = new Map<string, PubSub.PubSub<MillEvent>>(); 5 + const rawPubSubByRun = new Map<string, PubSub.PubSub<string>>(); 6 + 7 + const ensureTier1PubSub = (runId: string): Effect.Effect<PubSub.PubSub<MillEvent>> => 8 + Effect.suspend(() => { 9 + const existing = tier1PubSubByRun.get(runId); 10 + 11 + if (existing !== undefined) { 12 + return Effect.succeed(existing); 13 + } 14 + 15 + return Effect.tap(PubSub.unbounded<MillEvent>(), (pubSub) => 16 + Effect.sync(() => { 17 + tier1PubSubByRun.set(runId, pubSub); 18 + }), 19 + ); 20 + }); 21 + 22 + const ensureRawPubSub = (runId: string): Effect.Effect<PubSub.PubSub<string>> => 23 + Effect.suspend(() => { 24 + const existing = rawPubSubByRun.get(runId); 25 + 26 + if (existing !== undefined) { 27 + return Effect.succeed(existing); 28 + } 29 + 30 + return Effect.tap(PubSub.unbounded<string>(), (pubSub) => 31 + Effect.sync(() => { 32 + rawPubSubByRun.set(runId, pubSub); 33 + }), 34 + ); 35 + }); 36 + 37 + export const publishTier1Event = (runId: string, event: MillEvent): Effect.Effect<void> => 38 + Effect.asVoid(Effect.flatMap(ensureTier1PubSub(runId), (pubSub) => PubSub.publish(pubSub, event))); 39 + 40 + export const publishRawEvent = (runId: string, raw: string): Effect.Effect<void> => 41 + Effect.asVoid(Effect.flatMap(ensureRawPubSub(runId), (pubSub) => PubSub.publish(pubSub, raw))); 42 + 43 + export const watchTier1Live = (runId: string): Stream.Stream<MillEvent, never> => 44 + Stream.unwrapScoped(Effect.map(ensureTier1PubSub(runId), (pubSub) => Stream.fromPubSub(pubSub))); 45 + 46 + export const watchRawLive = (runId: string): Stream.Stream<string, never> => 47 + Stream.unwrapScoped(Effect.map(ensureRawPubSub(runId), (pubSub) => Stream.fromPubSub(pubSub)));
+48
packages/core/src/internal/run-store.effect.ts
··· 2 2 import { Data, Effect } from "effect"; 3 3 import { decodeMillEventJson, encodeMillEventJson, type MillEvent } from "../domain/event.schema"; 4 4 import { 5 + decodeRunId, 5 6 decodeRunRecordJson, 6 7 decodeRunResultJson, 7 8 type RunId, ··· 46 47 readonly getResult: ( 47 48 runId: RunId, 48 49 ) => Effect.Effect<RunResult | undefined, RunNotFoundError | PersistenceError>; 50 + readonly listRuns: (status?: RunRecord["status"]) => Effect.Effect<ReadonlyArray<RunRecord>, PersistenceError>; 49 51 } 50 52 51 53 export interface MakeRunStoreInput { ··· 212 214 message: toMessage(error), 213 215 }), 214 216 ); 217 + }), 218 + 219 + listRuns: (status) => 220 + Effect.gen(function* () { 221 + const fileSystem = yield* FileSystem.FileSystem; 222 + const runsDirectoryExists = yield* mapPersistenceError(input.runsDirectory)( 223 + fileSystem.exists(input.runsDirectory), 224 + ); 225 + 226 + if (!runsDirectoryExists) { 227 + return []; 228 + } 229 + 230 + const runDirectories = yield* mapPersistenceError(input.runsDirectory)( 231 + fileSystem.readDirectory(input.runsDirectory), 232 + ); 233 + 234 + const loadedRuns = yield* Effect.forEach( 235 + runDirectories, 236 + (runDirectory) => 237 + Effect.gen(function* () { 238 + const decodedRunId = yield* Effect.either(decodeRunId(runDirectory)); 239 + 240 + if (decodedRunId._tag === "Left") { 241 + return undefined; 242 + } 243 + 244 + const maybeRun = yield* Effect.either(storeGetRun(input.runsDirectory, decodedRunId.right)); 245 + 246 + if (maybeRun._tag === "Left") { 247 + return undefined; 248 + } 249 + 250 + return maybeRun.right; 251 + }), 252 + { 253 + concurrency: "unbounded", 254 + }, 255 + ); 256 + 257 + const filteredRuns = loadedRuns 258 + .filter((run): run is RunRecord => run !== undefined) 259 + .filter((run) => (status === undefined ? true : run.status === status)) 260 + .sort((left, right) => right.createdAt.localeCompare(left.createdAt)); 261 + 262 + return filteredRuns; 215 263 }), 216 264 }); 217 265
+164 -3
packages/core/src/public/run.api.ts
··· 1 1 import * as FileSystem from "@effect/platform/FileSystem"; 2 2 import * as BunContext from "@effect/platform-bun/BunContext"; 3 - import { Effect, Runtime } from "effect"; 4 - import { makeMillEngine, ProgramExecutionError } from "../engine.effect"; 3 + import { Effect, Runtime, Stream } from "effect"; 4 + import { 5 + makeMillEngine, 6 + ProgramExecutionError, 7 + type InspectResult, 8 + } from "../engine.effect"; 5 9 import { makeDriverRegistry } from "../driver-registry.effect"; 6 10 import { makeExecutorRegistry } from "../executor-registry.effect"; 7 - import { decodeRunIdSync, type RunRecord, type RunSyncOutput } from "../run.schema"; 11 + import { 12 + decodeRunIdSync, 13 + decodeSpawnIdSync, 14 + type RunRecord, 15 + type RunSyncOutput, 16 + } from "../run.schema"; 8 17 import { runDetachedWorker } from "../worker.effect"; 9 18 import { decodeSpawnOptions } from "../spawn.schema"; 10 19 import { resolveConfig } from "./config-loader.api"; 11 20 import type { 12 21 ConfigOverrides, 22 + DriverSessionPointer, 13 23 ExecutorRuntime, 14 24 ExtensionRegistration, 15 25 ResolveConfigOptions, ··· 62 72 63 73 export interface WaitForRunInput extends GetRunStatusInput { 64 74 readonly timeoutSeconds: number; 75 + } 76 + 77 + export interface WatchRunInput extends GetRunStatusInput { 78 + readonly raw?: boolean; 79 + readonly onEvent: (line: string) => void; 80 + } 81 + 82 + export interface InspectRunInput extends BaseRunInput { 83 + readonly ref: string; 84 + readonly session?: boolean; 85 + } 86 + 87 + export interface CancelRunInput extends GetRunStatusInput { 88 + readonly reason?: string; 89 + } 90 + 91 + export interface ListRunsInput extends BaseRunInput { 92 + readonly status?: RunRecord["status"]; 65 93 } 66 94 67 95 export interface RunWorkerInput extends BaseRunInput { ··· 240 268 }; 241 269 }; 242 270 271 + const parseInspectRef = (ref: string): { runId: string; spawnId?: string } => { 272 + const [runIdPart, spawnIdPart] = ref.split("."); 273 + 274 + if (runIdPart === undefined || runIdPart.length === 0) { 275 + throw new Error("inspect reference requires a runId"); 276 + } 277 + 278 + if (spawnIdPart === undefined || spawnIdPart.length === 0) { 279 + return { 280 + runId: runIdPart, 281 + }; 282 + } 283 + 284 + return { 285 + runId: runIdPart, 286 + spawnId: spawnIdPart, 287 + }; 288 + }; 289 + 290 + const isRunTerminalEvent = (eventType: string): boolean => 291 + eventType === "run:complete" || eventType === "run:failed" || eventType === "run:cancelled"; 292 + 293 + export interface InspectSessionOutput extends DriverSessionPointer { 294 + readonly runId: string; 295 + readonly spawnId: string; 296 + } 297 + 243 298 export const submitRun = async (input: SubmitRunInput): Promise<RunRecord> => { 244 299 const cwd = input.cwd ?? process.cwd(); 245 300 const programPath = resolveProgramPath(cwd, input.programPath); ··· 358 413 359 414 throw waitOutcome.left; 360 415 }; 416 + 417 + export const watchRun = async (input: WatchRunInput): Promise<void> => { 418 + const engineContext = await makeEngineForConfig(input); 419 + const runId = decodeRunIdSync(input.runId); 420 + 421 + if (input.raw === true) { 422 + await runWithBunContext( 423 + Effect.raceFirst( 424 + Effect.scoped( 425 + Stream.runForEach(engineContext.engine.watchRaw(runId), (line) => 426 + Effect.sync(() => { 427 + input.onEvent(line); 428 + }), 429 + ), 430 + ), 431 + engineContext.engine.wait(runId, DEFAULT_SYNC_WAIT_TIMEOUT_SECONDS * 1000), 432 + ), 433 + ); 434 + 435 + return; 436 + } 437 + 438 + await runWithBunContext( 439 + Effect.scoped( 440 + Stream.runForEach( 441 + Stream.takeUntil(engineContext.engine.watch(runId), (event) => isRunTerminalEvent(event.type)), 442 + (event) => 443 + Effect.sync(() => { 444 + input.onEvent(JSON.stringify(event)); 445 + }), 446 + ), 447 + ), 448 + ); 449 + }; 450 + 451 + export const inspectRun = async ( 452 + input: InspectRunInput, 453 + ): Promise<InspectResult | InspectSessionOutput> => { 454 + const parsedRef = parseInspectRef(input.ref); 455 + const engineContext = await makeEngineForConfig(input); 456 + const inspected = await runWithBunContext( 457 + engineContext.engine.inspect({ 458 + runId: decodeRunIdSync(parsedRef.runId), 459 + spawnId: 460 + parsedRef.spawnId === undefined ? undefined : decodeSpawnIdSync(parsedRef.spawnId), 461 + }), 462 + ); 463 + 464 + if (input.session !== true) { 465 + return inspected; 466 + } 467 + 468 + if (inspected.kind !== "spawn" || inspected.result === undefined) { 469 + throw new Error("inspect --session requires a runId.spawnId reference with completed spawn result"); 470 + } 471 + 472 + const resolvedConfig = await resolveConfig(input); 473 + const driverRegistry = makeDriverRegistry({ 474 + defaultDriver: resolvedConfig.config.defaultDriver, 475 + drivers: resolvedConfig.config.drivers, 476 + }); 477 + const run = await runWithBunContext(engineContext.engine.status(decodeRunIdSync(parsedRef.runId))); 478 + const resolvedDriver = await Runtime.runPromise(runtime)(driverRegistry.resolve(run.driver)); 479 + 480 + if (resolvedDriver.runtime.resolveSession === undefined) { 481 + throw new Error(`Driver ${resolvedDriver.name} does not support session inspection`); 482 + } 483 + 484 + const sessionPointer = await Runtime.runPromise(runtime)( 485 + Effect.provide( 486 + resolvedDriver.runtime.resolveSession({ 487 + sessionRef: inspected.result.sessionRef, 488 + }), 489 + BunContext.layer, 490 + ), 491 + ); 492 + 493 + return { 494 + runId: parsedRef.runId, 495 + spawnId: inspected.spawnId, 496 + ...sessionPointer, 497 + } satisfies InspectSessionOutput; 498 + }; 499 + 500 + export const cancelRun = async (input: CancelRunInput): Promise<{ 501 + runId: string; 502 + status: RunRecord["status"]; 503 + alreadyTerminal: boolean; 504 + }> => { 505 + const engineContext = await makeEngineForConfig(input); 506 + const cancelled = await runWithBunContext( 507 + engineContext.engine.cancel(decodeRunIdSync(input.runId), input.reason), 508 + ); 509 + 510 + return { 511 + runId: cancelled.run.id, 512 + status: cancelled.run.status, 513 + alreadyTerminal: cancelled.alreadyTerminal, 514 + }; 515 + }; 516 + 517 + export const listRuns = async (input: ListRunsInput): Promise<ReadonlyArray<RunRecord>> => { 518 + const engineContext = await makeEngineForConfig(input); 519 + 520 + return runWithBunContext(engineContext.engine.list(input.status)); 521 + };
+10
packages/core/src/public/types.ts
··· 39 39 40 40 export interface DriverSpawnOutput { 41 41 readonly events: ReadonlyArray<DriverSpawnEvent>; 42 + readonly raw?: ReadonlyArray<string>; 42 43 readonly result: SpawnOutput; 43 44 } 44 45 46 + export interface DriverSessionPointer { 47 + readonly driver: string; 48 + readonly sessionRef: string; 49 + readonly pointer: string; 50 + } 51 + 45 52 export interface DriverRuntime { 46 53 readonly name: string; 47 54 readonly spawn: (input: DriverSpawnInput) => Effect.Effect<DriverSpawnOutput, unknown>; 55 + readonly resolveSession?: (input: { 56 + readonly sessionRef: string; 57 + }) => Effect.Effect<DriverSessionPointer, unknown>; 48 58 } 49 59 50 60 export interface ExecutorRunInput {
+17 -1
packages/driver-pi/src/internal/process-driver.effect.ts
··· 41 41 }), 42 42 ); 43 43 44 - return yield* Effect.mapError( 44 + const decoded = yield* Effect.mapError( 45 45 decodePiProcessOutput(stdout), 46 46 (error) => 47 47 new PiProcessDriverError({ 48 48 message: toMessage(error), 49 49 }), 50 50 ); 51 + 52 + const raw = stdout 53 + .split("\n") 54 + .map((line) => line.trim()) 55 + .filter((line) => line.length > 0); 56 + 57 + return { 58 + ...decoded, 59 + raw, 60 + }; 61 + }), 62 + resolveSession: ({ sessionRef }) => 63 + Effect.succeed({ 64 + driver: "pi", 65 + sessionRef, 66 + pointer: `pi://session/${encodeURIComponent(sessionRef)}`, 51 67 }), 52 68 });
+24
packages/driver-pi/src/public/index.api.test.ts
··· 52 52 expect(output.result.exitCode).toBe(0); 53 53 }); 54 54 55 + it("resolves session pointers for inspect --session bridge", async () => { 56 + const driver = createPiDriverRegistration(); 57 + 58 + expect(driver.runtime).toBeDefined(); 59 + 60 + if (driver.runtime === undefined) { 61 + throw new Error("driver runtime is required"); 62 + } 63 + 64 + expect(driver.runtime.resolveSession).toBeDefined(); 65 + 66 + if (driver.runtime.resolveSession === undefined) { 67 + throw new Error("resolveSession bridge is required"); 68 + } 69 + 70 + const session = await Runtime.runPromise(runtime)( 71 + Effect.provide(driver.runtime.resolveSession({ sessionRef: "session/scout" }), BunContext.layer), 72 + ); 73 + 74 + expect(session.sessionRef).toBe("session/scout"); 75 + expect(session.driver).toBe("pi"); 76 + expect(session.pointer.length).toBeGreaterThan(0); 77 + }); 78 + 55 79 it("rejects malformed duplicate terminal output fixtures", async () => { 56 80 const driver = createPiDriverRegistration({ 57 81 process: {