personal memory agent
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

fix(supervisor): self-bootstrap PATH and surface stopped tasks immediately

PATH self-bootstrap: follow-up to the SIGTTIN fix in afd33f2d. main() now prepends os.path.dirname(sys.executable) to PATH before any child spawn, so the supervisor can launch `sol` children even when the operator's interactive PATH does not include the venv bin dir. It is idempotent, so repeated calls do not duplicate the entry.

Stopped-task detection: follow-up to the SIGTTIN fix in afd33f2d. TaskQueue.enforce_deadlines now polls each active process's psutil status. Tasks observed in STOPPED or TRACING_STOP state for two consecutive ticks are terminated via the existing termination thread (reason="stopped"), reusing the _cap_terminated set so the eventual exit is recorded as exit_status="timeout". A debounce tick avoids racing on transient stop/cont, and operators get a distinct "was stopped (state=...)" warning alongside the existing cap-path warning.

Co-Authored-By: Codex <codex@openai.com>

+121
+70
tests/test_supervisor.py
··· 4 4 import importlib 5 5 import io 6 6 import json 7 + import logging 7 8 import os 9 + import signal 8 10 import subprocess 9 11 import sys 10 12 import threading 13 + import time 11 14 from unittest.mock import MagicMock 12 15 13 16 import psutil ··· 594 597 self.cleanup = MagicMock() 595 598 596 599 600 + def test_ensure_venv_bin_on_path_prepends_when_missing(monkeypatch): 601 + mod = importlib.import_module("think.supervisor") 602 + monkeypatch.setenv("PATH", "/usr/bin") 603 + monkeypatch.setattr(sys, "executable", "/fake/venv/bin/python3") 604 + 605 + mod._ensure_venv_bin_on_path() 606 + 607 + parts = os.environ["PATH"].split(os.pathsep) 608 + assert parts[0] == "/fake/venv/bin" 609 + assert "/usr/bin" in parts[1:] 610 + 611 + 612 + def test_ensure_venv_bin_on_path_idempotent(monkeypatch): 613 + mod = importlib.import_module("think.supervisor") 614 + monkeypatch.setenv("PATH", "/usr/bin") 615 + monkeypatch.setattr(sys, "executable", "/fake/venv/bin/python3") 616 + 617 + mod._ensure_venv_bin_on_path() 618 + mod._ensure_venv_bin_on_path() 619 + 620 + parts = os.environ["PATH"].split(os.pathsep) 621 + assert parts.count("/fake/venv/bin") == 1 622 + 623 + 597 624 def test_taskqueue_set_cap_records_cap(): 598 625 mod = importlib.import_module("think.supervisor") 599 626 queue = mod.TaskQueue(on_queue_change=None) ··· 923 950 "Task import (cmd=sol import --sync plaud --save, ref=ref-1) exceeded " 924 951 "max_runtime of 50s (elapsed=100s); terminating" 925 952 ) in caplog.text 953 + 954 + 955 + def test_enforce_deadlines_terminates_stopped_task(caplog, monkeypatch): 956 + mod = importlib.import_module("think.supervisor") 957 + proc = subprocess.Popen(["sh", "-c", "kill -STOP $$; sleep 60"]) 958 + try: 959 + child = psutil.Process(proc.pid) 960 + for _ in range(30): 961 + if child.status() == psutil.STATUS_STOPPED: 962 + break 963 + time.sleep(0.1) 964 + else: 965 + pytest.fail("subprocess did not enter stopped state") 966 + 967 + queue = mod.TaskQueue(on_queue_change=None) 968 + managed = _TaskManagedStub(cmd=["sleep"], start_time=time.time()) 969 + managed.process.pid = proc.pid 970 + queue._caps["sleep"] = 60 971 + queue._active["ref-1"] = managed 972 + terminate = MagicMock() 973 + monkeypatch.setattr(mod, "_start_termination_thread", terminate) 974 + caplog.set_level(logging.WARNING) 975 + 976 + queue.enforce_deadlines(time.time()) 977 + terminate.assert_not_called() 978 + 979 + queue.enforce_deadlines(time.time()) 980 + 981 + terminate.assert_called_once_with( 982 + "ref-1", managed, timeout=2.0, reason="stopped" 983 + ) 984 + assert "stopped" in caplog.text 985 + finally: 986 + try: 987 + os.kill(proc.pid, signal.SIGCONT) 988 + except ProcessLookupError: 989 + pass 990 + try: 991 + proc.terminate() 992 + proc.wait(timeout=5) 993 + except subprocess.TimeoutExpired: 994 + proc.kill() 995 + proc.wait(timeout=5) 926 996 927 997 928 998 def test_terminate_managed_logs_timeout(caplog):
+51
think/supervisor.py
··· 44 44 CHECK_INTERVAL = 30 45 45 MAX_UPDATED_CATCHUP = 4 46 46 TEMPFAIL_DELAY = 15 # seconds to wait before retrying a tempfail exit 47 + STOPPED_TICKS_THRESHOLD = 2 47 48 logger = logging.getLogger(__name__) 48 49 _SERVICE_LIFECYCLE_VERBS = { 49 50 "start", ··· 199 200 self._active: dict[str, RunnerManagedProcess] = {} # ref -> process 200 201 self._history: deque[dict[str, Any]] = deque(maxlen=100) 201 202 self._cap_terminated: set[str] = set() 203 + self._stopped_ticks: dict[str, int] = {} 202 204 self._caps: dict[str, int] = {} 203 205 self._pending: list[dict] = [] 204 206 self._ready = ready ··· 380 382 self._cap_terminated.add(ref) 381 383 _start_termination_thread(ref, managed, timeout=2.0, reason="cap") 382 384 385 + for ref, managed in list(self._active.items()): 386 + if ref in self._cap_terminated: 387 + continue 388 + 389 + try: 390 + state = psutil.Process(managed.process.pid).status() 391 + except (psutil.NoSuchProcess, psutil.AccessDenied): 392 + self._stopped_ticks.pop(ref, None) 393 + continue 394 + 395 + if state in (psutil.STATUS_STOPPED, psutil.STATUS_TRACING_STOP): 396 + ticks = self._stopped_ticks.get(ref, 0) + 1 397 + self._stopped_ticks[ref] = ticks 398 + if ticks >= STOPPED_TICKS_THRESHOLD: 399 + cmd_name = self.get_command_name(managed.cmd) 400 + logging.warning( 401 + "Task %s (cmd=%s, ref=%s) was stopped (state=%s) " 402 + "for %d consecutive ticks; terminating", 403 + cmd_name, 404 + " ".join(managed.cmd), 405 + ref, 406 + state, 407 + ticks, 408 + ) 409 + self._cap_terminated.add(ref) 410 + _start_termination_thread( 411 + ref, managed, timeout=2.0, reason="stopped" 412 + ) 413 + self._stopped_ticks.pop(ref, None) 414 + else: 415 + self._stopped_ticks.pop(ref, None) 416 + 383 417 def set_ready(self) -> None: 384 418 """Allow buffered tasks to start dispatching through the normal queue path.""" 385 419 with self._lock: ··· 490 524 if primary_ref in self._cap_terminated: 491 525 exit_status = "timeout" 492 526 self._cap_terminated.discard(primary_ref) 527 + self._stopped_ticks.pop(primary_ref, None) 493 528 ended_at = time.time() 494 529 self._history.append( 495 530 { ··· 1682 1717 raise KeyboardInterrupt 1683 1718 1684 1719 1720 + def _ensure_venv_bin_on_path() -> None: 1721 + """Prepend the venv bin dir (sibling of sys.executable) to PATH if absent. 1722 + 1723 + Idempotent — safe to call repeatedly. Lets the supervisor spawn `sol` and 1724 + other venv-installed entry points even when the operator's shell PATH does 1725 + not include the venv bin dir. 1726 + """ 1727 + venv_bin = os.path.dirname(sys.executable) 1728 + parts = os.environ.get("PATH", "").split(os.pathsep) 1729 + if parts and parts[0] == venv_bin: 1730 + return 1731 + parts = [venv_bin] + [p for p in parts if p != venv_bin] 1732 + os.environ["PATH"] = os.pathsep.join(parts) 1733 + 1734 + 1685 1735 def main() -> None: 1686 1736 parser = parse_args() 1687 1737 ··· 1689 1739 journal_info = get_journal_info() 1690 1740 1691 1741 args = setup_cli(parser) 1742 + _ensure_venv_bin_on_path() 1692 1743 1693 1744 journal_path = _get_journal_path() 1694 1745