personal memory agent
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

runner+supervisor: pgid teardown and orphan recovery sweep

The 2026-04-26 cortex meltdown showed that child processes could survive a supervisor restart when only the immediate child was terminated. Move the qnmxy4dj process-group teardown pattern down into the runner/supervisor layer and consolidate supervisor shutdown through ManagedProcess.terminate.

On startup, the supervisor now performs a narrow post-lock sweep for PPID=1 sol workers from the incident class, leaving shell-parented development processes alone.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

+256 -30
+56
tests/test_runner.py
··· 4 4 """Tests for think.runner and logs tract integration.""" 5 5 6 6 import os 7 + import signal 8 + import subprocess 9 + import time 7 10 from io import StringIO 11 + from unittest.mock import Mock, call 8 12 9 13 import pytest 10 14 ··· 21 25 # Cleanup 22 26 if "SOLSTONE_JOURNAL" in os.environ: 23 27 del os.environ["SOLSTONE_JOURNAL"] 28 + 29 + 30 + def _managed_for_process(process): 31 + return ManagedProcess( 32 + process=process, 33 + name="test", 34 + log_writer=Mock(), 35 + cmd=["test"], 36 + _threads=[], 37 + ref="ref", 38 + _start_time=time.time(), 39 + _callosum=None, 40 + ) 41 + 42 + 43 + def test_terminate_uses_process_group(monkeypatch): 44 + killpg = Mock() 45 + monkeypatch.setattr("think.runner.os.getpgid", lambda pid: 456) 46 + monkeypatch.setattr("think.runner.os.killpg", killpg) 47 + 48 + graceful_process = Mock() 49 + graceful_process.pid = 123 50 + graceful_process.wait.return_value = -15 51 + graceful_process.returncode = -15 52 + graceful = _managed_for_process(graceful_process) 53 + 54 + assert graceful.terminate(timeout=2) == -15 55 + graceful_process.terminate.assert_called_once_with() 56 + graceful_process.wait.assert_called_once_with(timeout=2) 57 + graceful_process.kill.assert_not_called() 58 + killpg.assert_called_once_with(456, signal.SIGTERM) 59 + 60 + killpg.reset_mock() 61 + timeout_process = Mock() 62 + timeout_process.pid = 124 63 + timeout_process.wait.side_effect = [ 64 + subprocess.TimeoutExpired(cmd=["test"], timeout=2), 65 + -9, 66 + ] 67 + timeout_process.returncode = -9 68 + timeout = _managed_for_process(timeout_process) 69 + 70 + with pytest.raises(subprocess.TimeoutExpired): 71 + timeout.terminate(timeout=2) 72 + 73 + timeout_process.terminate.assert_called_once_with() 74 + timeout_process.kill.assert_called_once_with() 75 + assert timeout_process.wait.call_args_list == [call(timeout=2), call()] 76 + assert killpg.call_args_list == [ 77 + call(456, signal.SIGTERM), 78 + call(456, signal.SIGKILL), 79 + ] 24 80 25 81 
26 82 def test_managed_process_has_ref_and_pid(journal_path, mock_callosum):
+88
tests/test_supervisor_orphans.py
# SPDX-License-Identifier: AGPL-3.0-only
# Copyright (c) 2026 sol pbc

"""Tests for the supervisor's startup sweep of orphaned sol workers.

Covers ``supervisor._find_reparented_sol_workers`` (discovery of known
worker names whose parent PID is 1) and ``supervisor._reap_orphan_workers``
(SIGTERM, bounded wait, then SIGKILL for survivors).
"""

import signal

import psutil

from think import supervisor


class FakeProcess:
    """Stand-in for a ``psutil.Process`` that only exposes ``info``."""

    def __init__(self, pid: int, name: str, ppid: int):
        self._info = {"pid": pid, "name": name, "ppid": ppid}

    @property
    def info(self):
        return self._info


class VanishedProcess:
    """Process whose ``info`` lookup raises ``psutil.NoSuchProcess``."""

    @property
    def info(self):
        raise psutil.NoSuchProcess(pid=4)


def test_find_reparented_sol_workers_filters_name_and_ppid(monkeypatch):
    """Only known worker names with PPID 1 are reported; vanished ones are skipped."""
    monkeypatch.setattr(supervisor.sys, "platform", "linux")
    monkeypatch.setattr(
        supervisor.psutil,
        "process_iter",
        lambda attrs: [
            FakeProcess(100, "sol:cortex", 1),  # match: known name, PPID 1
            FakeProcess(101, "sol:cortex", 999),  # known name, still parented
            FakeProcess(102, "bash", 1),  # PPID 1 but not a sol worker
            VanishedProcess(),  # raises while reading info
        ],
    )

    assert supervisor._find_reparented_sol_workers() == [(100, "sol:cortex")]


def test_reap_orphan_workers_empty_is_quiet(monkeypatch):
    """An empty orphan list sends no signals and logs nothing."""
    logs = []
    monkeypatch.setattr(supervisor.os, "kill", lambda *_args: logs.append("kill"))
    monkeypatch.setattr(
        supervisor.logging, "warning", lambda *_args: logs.append("log")
    )
    monkeypatch.setattr(
        supervisor.logging, "exception", lambda *_args: logs.append("log")
    )

    assert supervisor._reap_orphan_workers([]) == 0
    assert logs == []


def test_reap_orphan_workers_sigterms_then_sigkills_survivors(monkeypatch):
    """Every orphan gets SIGTERM; only those still alive after grace get SIGKILL."""
    calls = []

    # Fake clock advanced by the patched sleep. ``supervisor.time`` is
    # presumably the shared stdlib ``time`` module, so anything else that
    # calls ``time.time()`` while it is patched reads this clock too -- e.g.
    # ``logging`` when it builds a LogRecord on Python < 3.13. A finite
    # iterator of timestamps would be drained by those extra calls and raise
    # StopIteration; this clock can be read any number of times.
    clock = {"now": 0.0}

    def fake_sleep(seconds):
        clock["now"] += seconds

    monkeypatch.setattr(
        supervisor.os, "kill", lambda pid, sig: calls.append((pid, sig))
    )
    monkeypatch.setattr(supervisor.time, "time", lambda: clock["now"])
    monkeypatch.setattr(supervisor.time, "sleep", fake_sleep)
    # PID 100 ignores SIGTERM; PID 101 is gone by the first poll.
    monkeypatch.setattr(supervisor.psutil, "pid_exists", lambda pid: pid == 100)

    assert (
        supervisor._reap_orphan_workers(
            [(100, "sol:cortex"), (101, "sol:sense")], grace=0.05
        )
        == 2
    )
    assert calls == [
        (100, signal.SIGTERM),
        (101, signal.SIGTERM),
        (100, signal.SIGKILL),
    ]


def test_find_reparented_sol_workers_noop_on_macos(monkeypatch):
    """On a non-Linux platform the sweep returns early without listing processes."""
    monkeypatch.setattr(supervisor.sys, "platform", "darwin")
    monkeypatch.setattr(
        supervisor.psutil,
        "process_iter",
        # Fails the test if the process table is touched at all.
        lambda _attrs: (_ for _ in ()).throw(AssertionError("unexpected")),
    )

    assert supervisor._find_reparented_sol_workers() == []
+39 -16
think/runner.py
··· 20 20 21 21 import logging 22 22 import os 23 + import signal 23 24 import subprocess 24 25 import threading 25 26 import time ··· 290 291 text=True, 291 292 bufsize=1, 292 293 env=env, 294 + start_new_session=True, 293 295 ) 294 296 except Exception as exc: 295 297 log_writer.close() ··· 386 388 return self.process.poll() is None 387 389 388 390 def terminate(self, timeout: float = 15) -> int: 389 - """Gracefully terminate process with automatic escalation. 391 + """Terminate the managed process and its session group. 390 392 391 - This method handles the full termination sequence in ONE CALL: 392 - 1. Send SIGTERM (graceful shutdown request) 393 - 2. Wait up to `timeout` seconds for process to exit 394 - 3. If still alive, send SIGKILL (force kill) 395 - 4. Wait for final cleanup (max 1 second) 396 - 5. Return exit code 393 + Sends SIGTERM to the immediate child and the process group. Waits up to 394 + `timeout` seconds for graceful exit. If the process is still alive after 395 + `timeout`, escalates to SIGKILL on the group and child, then re-raises 396 + `subprocess.TimeoutExpired` after the kill completes. 397 397 398 398 Args: 399 - timeout: Seconds to wait after SIGTERM before SIGKILL (default: 15) 399 + timeout: Seconds to wait after SIGTERM before SIGKILL (default: 15). 400 400 401 401 Returns: 402 - Exit code (may be negative for signals, e.g., -15 for SIGTERM) 402 + Exit code on graceful termination (may be negative for signals, 403 + e.g., -15 for SIGTERM). 403 404 404 - Example: 405 - exit_code = managed.terminate(timeout=10) # One call, blocks until dead 405 + Raises: 406 + subprocess.TimeoutExpired: Re-raised after SIGKILL when graceful 407 + shutdown did not complete within `timeout`. 
406 408 """ 407 409 logger.debug(f"Terminating {self.name} (PID {self.pid})...") 408 410 try: 409 - self.process.terminate() 411 + pgid = os.getpgid(self.process.pid) 412 + except (ProcessLookupError, OSError): 413 + pgid = None 414 + 415 + try: 416 + try: 417 + self.process.terminate() 418 + except (ProcessLookupError, OSError): 419 + pass 420 + if pgid is not None: 421 + try: 422 + os.killpg(pgid, signal.SIGTERM) 423 + except (ProcessLookupError, OSError): 424 + pass 410 425 exit_code = self.process.wait(timeout=timeout) 411 426 logger.debug(f"{self.name} terminated gracefully with code {exit_code}") 412 427 return exit_code ··· 414 429 logger.warning( 415 430 f"{self.name} did not terminate after {timeout}s, force killing..." 416 431 ) 417 - self.process.kill() 418 - exit_code = self.process.wait(timeout=1) 419 - logger.debug(f"{self.name} killed with code {exit_code}") 420 - return exit_code 432 + if pgid is not None: 433 + try: 434 + os.killpg(pgid, signal.SIGKILL) 435 + except (ProcessLookupError, OSError): 436 + pass 437 + try: 438 + self.process.kill() 439 + except (ProcessLookupError, OSError): 440 + pass 441 + self.process.wait() 442 + logger.debug(f"{self.name} killed with code {self.process.returncode}") 443 + raise 421 444 422 445 def cleanup(self) -> None: 423 446 """Wait for output threads to finish and close log file.
+73 -14
think/supervisor.py
··· 43 43 CHECK_INTERVAL = 30 44 44 MAX_UPDATED_CATCHUP = 4 45 45 TEMPFAIL_DELAY = 15 # seconds to wait before retrying a tempfail exit 46 + _ORPHAN_WORKER_NAMES = { 47 + "sol:convey", 48 + "sol:sense", 49 + "sol:cortex", 50 + "sol:link", 51 + "sol:think", 52 + "sol:heartbeat", 53 + } 46 54 47 55 # Global shutdown flag 48 56 shutdown_requested = False 49 57 # Supervisor identity (set in main() once ref is assigned) 50 58 _supervisor_ref: str | None = None 51 59 _supervisor_start: float | None = None 60 + 61 + 62 + def _find_reparented_sol_workers() -> list[tuple[int, str]]: 63 + if sys.platform != "linux": 64 + return [] 65 + 66 + orphans: list[tuple[int, str]] = [] 67 + for proc in psutil.process_iter(["pid", "name", "ppid"]): 68 + try: 69 + info = proc.info 70 + pid = int(info["pid"]) 71 + name = str(info.get("name") or "") 72 + ppid = int(info.get("ppid") or 0) 73 + except (KeyError, TypeError, ValueError): 74 + continue 75 + except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess): 76 + continue 77 + if name in _ORPHAN_WORKER_NAMES and ppid == 1: 78 + orphans.append((pid, name)) 79 + return orphans 80 + 81 + 82 + def _reap_orphan_workers(orphans: list[tuple[int, str]], grace: float = 5.0) -> int: 83 + if not orphans: 84 + return 0 85 + 86 + reaped = 0 87 + for pid, name in orphans: 88 + try: 89 + logging.warning("Terminating orphaned sol worker %s (PID %d)", name, pid) 90 + os.kill(pid, signal.SIGTERM) 91 + reaped += 1 92 + except ProcessLookupError: 93 + reaped += 1 94 + except (PermissionError, OSError): 95 + logging.exception( 96 + "Failed to terminate orphaned sol worker %s (PID %d)", name, pid 97 + ) 98 + 99 + deadline = time.time() + grace 100 + while time.time() < deadline: 101 + if all(not psutil.pid_exists(pid) for pid, _name in orphans): 102 + return reaped 103 + time.sleep(0.1) 104 + 105 + for pid, name in orphans: 106 + if not psutil.pid_exists(pid): 107 + continue 108 + try: 109 + logging.warning("Killing orphaned sol worker %s (PID 
%d)", name, pid) 110 + os.kill(pid, signal.SIGKILL) 111 + except ProcessLookupError: 112 + continue 113 + except (PermissionError, OSError): 114 + logging.exception( 115 + "Failed to kill orphaned sol worker %s (PID %d)", name, pid 116 + ) 117 + return reaped 52 118 53 119 54 120 class CallosumLogHandler(logging.Handler): ··· 1554 1620 # Written here, not at _supervisor_start, to minimize drift from psutil create_time(). 1555 1621 start_time_path.write_text(str(time.time())) 1556 1622 logging.info("Singleton lock acquired (PID %d)", os.getpid()) 1623 + orphans = _find_reparented_sol_workers() 1624 + _reap_orphan_workers(orphans) 1557 1625 1558 1626 # Set up signal handlers 1559 1627 signal.signal(signal.SIGINT, handle_shutdown) ··· 1728 1796 1729 1797 def _stop_process(managed: ManagedProcess) -> None: 1730 1798 name = managed.name 1731 - proc = managed.process 1732 - logging.info(f"Stopping {name}...") 1799 + logging.info("Stopping %s...", name) 1733 1800 print(f" Stopping {name}...", end="", flush=True) 1734 1801 try: 1735 - proc.terminate() 1736 - except Exception: 1737 - pass 1738 - try: 1739 1802 timeout = getattr(managed, "shutdown_timeout", 15) 1740 - proc.wait(timeout=timeout) 1803 + managed.terminate(timeout=timeout) 1741 1804 print(" done", flush=True) 1742 1805 except subprocess.TimeoutExpired: 1743 - logging.warning(f"{name} did not terminate gracefully, killing...") 1806 + logging.warning("%s did not terminate gracefully, killing...", name) 1744 1807 print(" timeout, forcing kill...", flush=True) 1745 - try: 1746 - proc.kill() 1747 - proc.wait(timeout=1) 1748 - except Exception: 1749 - pass 1750 - managed.cleanup() 1808 + finally: 1809 + managed.cleanup() 1751 1810 1752 1811 # Stop services in reverse order 1753 1812 for managed in reversed(procs):