personal memory agent
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

supervisor: add convey-readiness barrier + gate TaskQueue

Eliminates the startup race where sense/cortex called require_solstone()
before convey's TCP listener was bound, producing false ERROR /
Restarting noise and dropping startup-catchup tasks with exit code 1.

- think/supervisor.py: new wait_for_convey_ready() called between
start_convey_server() and start_sense/cortex. TaskQueue gains ready
flag + _pending buffer + set_ready() drain so startup catchup tasks
submit before convey is up but dispatch after. main() sets
SOL_SUPERVISOR_SPAWNED=1 before any _launch_process so children
inherit it.
- think/utils.py: EXIT_TEMPFAIL hoisted here as the single source of
truth; require_solstone() exits 75 (absorbed quietly by
handle_runner_exits) when SOL_SUPERVISOR_SPAWNED=1, else keeps the
existing exit-1 + friendly-stderr path for external CLIs.
- tests/test_supervisor_startup.py: 10 new deterministic tests.
- AGENTS.md: tiny fix for an unrelated pre-existing CI blocker
surfaced during `make ci`.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

+280 -9
+4 -2
AGENTS.md
··· 1 - # solstone coder guide 1 + # solstone Developer Guide 2 2 3 - This file is the **coder guide** for the solstone repository. Read it before writing code. 3 + This file is the **developer guide** for the solstone repository. Read it before writing code. 4 4 5 5 Audience: 6 6 7 7 - **Coders** (cwd = repo root, editing `observe/`, `think/`, `convey/`, `apps/`, `talent/`, `tests/`) — you're in the right place. 8 8 - **Cogitate talents** (cwd = `journal/`, running inside the live system) — your entry is `talent/journal/SKILL.md`, installed into `journal/.claude/skills/journal/` and `journal/.agents/skills/journal/`. 9 9 - **Operators** debugging a running system — see `docs/DOCTOR.md`. 10 + 11 + For the journal-side runtime entry point, see `journal/AGENTS.md`. 10 12 11 13 `CLAUDE.md` and `GEMINI.md` at the repo root are symlinks to this file. 12 14
+197
tests/test_supervisor_startup.py
··· 1 + # SPDX-License-Identifier: AGPL-3.0-only 2 + # Copyright (c) 2026 sol pbc 3 + 4 + import importlib 5 + from types import SimpleNamespace 6 + from unittest import mock 7 + 8 + import pytest 9 + 10 + import think.utils as utils 11 + 12 + 13 + def test_task_queue_defers_submit_when_not_ready(monkeypatch): 14 + mod = importlib.import_module("think.supervisor") 15 + queue = mod.TaskQueue(on_queue_change=None, ready=False) 16 + 17 + started = [] 18 + 19 + def fake_thread_start(self): 20 + started.append(self._args) 21 + 22 + monkeypatch.setattr(mod.threading.Thread, "start", fake_thread_start) 23 + 24 + ref = queue.submit( 25 + ["sol", "indexer", "--rescan"], ref="pending-ref", day="20260418" 26 + ) 27 + 28 + assert ref == "pending-ref" 29 + assert started == [] 30 + assert queue._pending == [ 31 + { 32 + "refs": ["pending-ref"], 33 + "cmd": ["sol", "indexer", "--rescan"], 34 + "day": "20260418", 35 + } 36 + ] 37 + assert queue.collect_queue_counts() == {"pending": 1} 38 + 39 + 40 + def test_task_queue_set_ready_drains_in_submission_order(monkeypatch): 41 + mod = importlib.import_module("think.supervisor") 42 + queue = mod.TaskQueue(on_queue_change=None, ready=False) 43 + 44 + started = [] 45 + 46 + def fake_thread_start(self): 47 + started.append(self._args) 48 + 49 + monkeypatch.setattr(mod.threading.Thread, "start", fake_thread_start) 50 + 51 + queue.submit(["sol", "indexer", "--rescan"], ref="ref-1") 52 + queue.submit(["sol", "insight", "20260418"], ref="ref-2") 53 + queue.submit(["sol", "heartbeat"], ref="ref-3") 54 + 55 + queue.set_ready() 56 + 57 + assert [args[0] for args in started] == [["ref-1"], ["ref-2"], ["ref-3"]] 58 + assert [args[1] for args in started] == [ 59 + ["sol", "indexer", "--rescan"], 60 + ["sol", "insight", "20260418"], 61 + ["sol", "heartbeat"], 62 + ] 63 + assert queue._pending == [] 64 + 65 + 66 + def test_task_queue_set_ready_dedupes_same_cmd_in_pending(monkeypatch): 67 + mod = importlib.import_module("think.supervisor") 68 + queue = mod.TaskQueue(on_queue_change=None, ready=False) 69 + 70 + started = [] 71 + 72 + def fake_thread_start(self): 73 + started.append(self._args) 74 + 75 + monkeypatch.setattr(mod.threading.Thread, "start", fake_thread_start) 76 + 77 + queue.submit(["sol", "indexer", "--rescan"], ref="ref-1") 78 + queue.submit(["sol", "indexer", "--rescan"], ref="ref-2") 79 + 80 + queue.set_ready() 81 + 82 + assert len(started) == 1 83 + assert started[0][0] == ["ref-1"] 84 + assert queue._queues["indexer"] == [ 85 + {"refs": ["ref-2"], "cmd": ["sol", "indexer", "--rescan"], "day": None} 86 + ] 87 + 88 + 89 + def test_task_queue_ready_true_default_dispatches_immediately(monkeypatch): 90 + mod = importlib.import_module("think.supervisor") 91 + queue = mod.TaskQueue(on_queue_change=None) 92 + 93 + started = [] 94 + 95 + def fake_thread_start(self): 96 + started.append(self._args) 97 + 98 + monkeypatch.setattr(mod.threading.Thread, "start", fake_thread_start) 99 + 100 + ref = queue.submit(["sol", "indexer", "--rescan"], ref="ready-ref") 101 + 102 + assert ref == "ready-ref" 103 + assert len(started) == 1 104 + assert started[0][0] == ["ready-ref"] 105 + assert queue._pending == [] 106 + 107 + 108 + def test_wait_for_convey_ready_success(caplog): 109 + mod = importlib.import_module("think.supervisor") 110 + caplog.set_level("INFO") 111 + convey_mp = SimpleNamespace(process=SimpleNamespace(poll=lambda: None)) 112 + 113 + with mock.patch( 114 + "think.supervisor.is_solstone_up", 115 + side_effect=[False, False, True], 116 + ) as probe: 117 + assert mod.wait_for_convey_ready(convey_mp, timeout=1.0, interval=0.001) is True 118 + 119 + assert probe.call_count == 3 120 + assert "Convey ready after" in caplog.text 121 + 122 + 123 + def test_wait_for_convey_ready_timeout(caplog): 124 + mod = importlib.import_module("think.supervisor") 125 + caplog.set_level("ERROR") 126 + convey_mp = SimpleNamespace(process=SimpleNamespace(poll=lambda: None)) 127 + ticks = iter([0.0, 0.0, 0.1, 0.2, 0.3, 0.35]) 128 + 129 + with mock.patch("think.supervisor.is_solstone_up", return_value=False): 130 + with mock.patch("think.supervisor.read_service_port", return_value=5015): 131 + with mock.patch("think.supervisor.time.sleep", return_value=None): 132 + with mock.patch( 133 + "think.supervisor.time.monotonic", 134 + side_effect=lambda: next(ticks), 135 + ): 136 + assert ( 137 + mod.wait_for_convey_ready( 138 + convey_mp, 139 + timeout=0.3, 140 + interval=0.05, 141 + ) 142 + is False 143 + ) 144 + 145 + assert "Convey not ready after" in caplog.text 146 + 147 + 148 + def test_wait_for_convey_ready_convey_died(caplog): 149 + mod = importlib.import_module("think.supervisor") 150 + caplog.set_level("ERROR") 151 + convey_mp = SimpleNamespace(process=SimpleNamespace(poll=lambda: -11)) 152 + 153 + with mock.patch("think.supervisor.is_solstone_up") as probe: 154 + assert ( 155 + mod.wait_for_convey_ready(convey_mp, timeout=1.0, interval=0.001) is False 156 + ) 157 + 158 + probe.assert_not_called() 159 + assert "Convey process exited during startup" in caplog.text 160 + 161 + 162 + def test_require_solstone_tempfail_when_supervisor_spawned(monkeypatch, capsys): 163 + monkeypatch.delenv("SOL_SKIP_SUPERVISOR_CHECK", raising=False) 164 + monkeypatch.setenv("SOL_SUPERVISOR_SPAWNED", "1") 165 + 166 + with mock.patch("think.utils.is_solstone_up", return_value=False): 167 + with pytest.raises(SystemExit) as exc_info: 168 + utils.require_solstone() 169 + 170 + assert exc_info.value.code == utils.EXIT_TEMPFAIL 171 + assert capsys.readouterr().err == "" 172 + 173 + 174 + def test_require_solstone_exit1_when_not_supervisor_spawned(monkeypatch, capsys): 175 + monkeypatch.delenv("SOL_SKIP_SUPERVISOR_CHECK", raising=False) 176 + monkeypatch.delenv("SOL_SUPERVISOR_SPAWNED", raising=False) 177 + 178 + with mock.patch("think.utils.is_solstone_up", return_value=False): 179 + with pytest.raises(SystemExit) as exc_info: 180 + utils.require_solstone() 181 + 182 + assert exc_info.value.code == 1 183 + assert ( 184 + capsys.readouterr().err 185 + == "sol: solstone isn't running. Start it with 'sol up' and retry.\n" 186 + ) 187 + 188 + 189 + def test_require_solstone_skip_env_still_honored(monkeypatch): 190 + monkeypatch.setenv("SOL_SKIP_SUPERVISOR_CHECK", "1") 191 + monkeypatch.delenv("SOL_SUPERVISOR_SPAWNED", raising=False) 192 + 193 + with mock.patch( 194 + "think.utils.is_solstone_up", 195 + side_effect=AssertionError("should not run"), 196 + ): 197 + assert utils.require_solstone() is None
+76 -7
think/supervisor.py
··· 25 25 from think.runner import DailyLogWriter 26 26 from think.runner import ManagedProcess as RunnerManagedProcess 27 27 from think.utils import ( 28 + EXIT_TEMPFAIL, 28 29 day_path, 29 30 find_available_port, 30 31 get_journal, 31 32 get_journal_info, 32 33 get_rev, 34 + is_solstone_up, 33 35 now_ms, 36 + read_service_port, 34 37 setup_cli, 35 38 updated_days, 36 39 ) ··· 38 41 DEFAULT_THRESHOLD = 60 39 42 CHECK_INTERVAL = 30 40 43 MAX_UPDATED_CATCHUP = 4 41 - EXIT_TEMPFAIL = 75 # EX_TEMPFAIL: service prerequisites not ready 42 44 TEMPFAIL_DELAY = 15 # seconds to wait before retrying a tempfail exit 43 45 44 46 # Global shutdown flag ··· 137 139 The lock only protects state mutations, never held during I/O operations. 138 140 """ 139 141 140 - def __init__(self, on_queue_change: callable = None): 142 + def __init__(self, on_queue_change: callable = None, ready: bool = True): 141 143 """Initialize task queue. 142 144 143 145 Args: ··· 149 151 ] = {} # command_name -> {"ref": str, "thread": Thread} 150 152 self._queues: dict[str, list] = {} # command_name -> list of {refs, cmd} dicts 151 153 self._active: dict[str, RunnerManagedProcess] = {} # ref -> process 154 + self._pending: list[dict] = [] 155 + self._ready = ready 152 156 self._lock = threading.Lock() 153 157 self._on_queue_change = on_queue_change 154 158 ··· 168 172 return 169 173 170 174 with self._lock: 171 - queue = list(self._queues.get(cmd_name, [])) 172 - entry = self._running.get(cmd_name) 173 - running_ref = entry["ref"] if entry else None 175 + if cmd_name == "pending": 176 + queue = list(self._pending) 177 + running_ref = None 178 + else: 179 + queue = list(self._queues.get(cmd_name, [])) 180 + entry = self._running.get(cmd_name) 181 + running_ref = entry["ref"] if entry else None 174 182 175 183 self._on_queue_change(cmd_name, running_ref, queue) 176 184 ··· 196 204 ref = ref or str(now_ms()) 197 205 cmd_name = self.get_command_name(cmd) 198 206 207 + with self._lock: 208 + if not self._ready: 209 + self._pending.append({"refs": [ref], "cmd": cmd, "day": day}) 210 + should_notify_pending = True 211 + else: 212 + should_notify_pending = False 213 + 214 + if should_notify_pending: 215 + self._notify_queue_change("pending") 216 + return ref 217 + 199 218 should_notify = False 200 219 should_start = False 201 220 ··· 254 273 255 274 return None 256 275 276 + def set_ready(self) -> None: 277 + """Allow buffered tasks to start dispatching through the normal queue path.""" 278 + with self._lock: 279 + if self._ready: 280 + return 281 + self._ready = True 282 + pending = list(self._pending) 283 + self._pending.clear() 284 + 285 + if pending: 286 + self._notify_queue_change("pending") 287 + for entry in pending: 288 + self.submit(entry["cmd"], ref=entry["refs"][0], day=entry.get("day")) 289 + 257 290 def _run_task( 258 291 self, 259 292 refs: list[str], ··· 429 462 def collect_queue_counts(self) -> dict[str, int]: 430 463 """Snapshot per-command queue depths for status reporting.""" 431 464 with self._lock: 432 - return { 465 + counts = { 433 466 cmd_name: len(queue) 434 467 for cmd_name, queue in self._queues.items() 435 468 if queue 436 469 } 470 + if self._pending: 471 + counts["pending"] = len(self._pending) 472 + return counts 437 473 438 474 439 475 # Global task queue instance (initialized in main()) ··· 828 864 time.sleep(0.01) 829 865 830 866 raise RuntimeError("Callosum server failed to create socket within 500ms") 867 + 868 + 869 + def wait_for_convey_ready( 870 + convey_mp, *, timeout: float = 30.0, interval: float = 0.1 871 + ) -> bool: 872 + """Poll until Convey accepts TCP connections, or fail fast on death/timeout.""" 873 + start = time.monotonic() 874 + deadline = start + timeout 875 + while time.monotonic() < deadline: 876 + rc = convey_mp.process.poll() 877 + if rc is not None: 878 + logging.error( 879 + "Convey process exited during startup (rc=%d); continuing into supervise loop", 880 + rc, 881 + ) 882 + return False 883 + if is_solstone_up(timeout=0.1): 884 + logging.info("Convey ready after %.1fs", time.monotonic() - start) 885 + return True 886 + time.sleep(interval) 887 + alive = convey_mp.process.poll() is None 888 + logging.error( 889 + "Convey not ready after %.1fs (port=%s, pid alive=%s); continuing into supervise loop", 890 + time.monotonic() - start, 891 + read_service_port("convey"), 892 + alive, 893 + ) 894 + return False 831 895 832 896 833 897 def stop_callosum_in_process() -> None: ··· 1484 1548 pass 1485 1549 1486 1550 # Initialize task queue with callosum event callback 1487 - _task_queue = TaskQueue(on_queue_change=_emit_queue_event) 1551 + _task_queue = TaskQueue(on_queue_change=_emit_queue_event, ready=False) 1488 1552 1489 1553 # Now start other services (their startup events will be captured) 1490 1554 if _is_remote_mode: ··· 1492 1556 pass 1493 1557 else: 1494 1558 # Local mode: convey first, then sense for file processing 1559 + os.environ["SOL_SUPERVISOR_SPAWNED"] = "1" 1495 1560 if not args.no_convey: 1496 1561 proc, convey_port = start_convey_server( 1497 1562 verbose=args.verbose, debug=args.debug, port=args.port 1498 1563 ) 1499 1564 procs.append(proc) 1565 + wait_for_convey_ready(proc) 1500 1566 # Sense handles file processing 1501 1567 procs.append(start_sense()) 1502 1568 # Cortex for agent execution ··· 1515 1581 scheduler.init(_supervisor_callosum) 1516 1582 scheduler.register_defaults() 1517 1583 routines.init(_supervisor_callosum) 1584 + 1585 + if _task_queue: 1586 + _task_queue.set_ready() 1518 1587 1519 1588 # Show Convey URL if running 1520 1589 if convey_port:
+3
think/utils.py
··· 30 30 DATE_RE = re.compile(r"\d{8}") 31 31 CHRONICLE_DIR = "chronicle" 32 32 DEFAULT_STREAM = "_default" 33 + EXIT_TEMPFAIL = 75 33 34 34 35 35 36 def now_ms() -> int: ··· 952 953 return 953 954 if is_solstone_up(): 954 955 return 956 + if os.environ.get("SOL_SUPERVISOR_SPAWNED") == "1": 957 + sys.exit(EXIT_TEMPFAIL) 955 958 print( 956 959 "sol: solstone isn't running. Start it with 'sol up' and retry.", 957 960 file=sys.stderr,