personal memory agent
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

at main 1964 lines 66 kB view raw
# SPDX-License-Identifier: AGPL-3.0-only
# Copyright (c) 2026 sol pbc

from __future__ import annotations

import argparse
import asyncio
import concurrent.futures
import json
import logging
import os
import signal
import subprocess
import sys
import tempfile
import threading
import time
from collections import deque
from datetime import datetime
from pathlib import Path
from typing import Any

import psutil

from think import routines, scheduler
from think.callosum import CallosumConnection, CallosumServer
from think.maint import run_pending_tasks
from think.runner import ManagedProcess as RunnerManagedProcess
from think.utils import (
    EXIT_TEMPFAIL,
    day_path,
    find_available_port,
    get_journal,
    get_journal_info,
    get_rev,
    is_solstone_up,
    now_ms,
    read_service_port,
    setup_cli,
    updated_days,
)

DEFAULT_THRESHOLD = 60
CHECK_INTERVAL = 30
MAX_UPDATED_CATCHUP = 4
TEMPFAIL_DELAY = 15  # seconds to wait before retrying a tempfail exit
logger = logging.getLogger(__name__)
# Verbs that belong to `sol service`, used to detect mistaken CLI usage.
_SERVICE_LIFECYCLE_VERBS = {
    "start",
    "stop",
    "restart",
    "status",
    "install",
    "uninstall",
    "logs",
}

# Global shutdown flag
shutdown_requested = False
# Supervisor identity (set in main() once ref is assigned)
_supervisor_ref: str | None = None
_supervisor_start: float | None = None


def _parse_self_cgroup_path(text: str) -> str | None:
    """Parse cgroup v2 line `0::/path` from /proc/self/cgroup contents."""
    for line in text.splitlines():
        if line.startswith("0::"):
            return line[3:].strip() or None
    return None


def _read_self_cgroup_path() -> str | None:
    """Return this process's cgroup v2 path, or None if unreadable."""
    try:
        return _parse_self_cgroup_path(Path("/proc/self/cgroup").read_text())
    except OSError:
        return None


def _is_systemd_service_cgroup(cgroup_path: str | None) -> bool:
    """Return True when the cgroup path belongs to our systemd service unit."""
    # Only sweep when this supervisor owns the cgroup; dev shells share scopes.
    if not cgroup_path:
        return False
    from think.service import SYSTEMD_UNIT

    return f"{SYSTEMD_UNIT}.service" in cgroup_path


def _sweep_cgroup_at_startup(grace: float = 5.0) -> int:
    """SIGTERM (then SIGKILL after `grace` seconds) residual processes left in
    our systemd service cgroup, returning the number of processes targeted.

    No-op (returns 0) off Linux, outside the service cgroup, or when the
    cgroup.procs file cannot be read.
    """
    if sys.platform != "linux":
        return 0
    cgroup_path = _read_self_cgroup_path()
    if not _is_systemd_service_cgroup(cgroup_path):
        return 0
    procs_file = Path("/sys/fs/cgroup") / cgroup_path.lstrip("/") / "cgroup.procs"
    try:
        pids = [int(pid) for pid in procs_file.read_text().split() if pid.strip()]
    except OSError:
        return 0

    own_pid = os.getpid()
    targets = [pid for pid in pids if pid != own_pid]
    if not targets:
        return 0

    logger.info("cgroup sweep: terminating %d residual process(es)", len(targets))
    for pid in targets:
        try:
            os.kill(pid, signal.SIGTERM)
        except ProcessLookupError:
            pass

    # Poll until all targets exit or the grace period elapses.
    deadline = time.time() + grace
    while time.time() < deadline:
        if not any(psutil.pid_exists(pid) for pid in targets):
            break
        time.sleep(0.2)

    # Escalate to SIGKILL for anything that survived the grace period.
    survivors = [pid for pid in targets if psutil.pid_exists(pid)]
    for pid in survivors:
        try:
            os.kill(pid, signal.SIGKILL)
        except ProcessLookupError:
            pass
    return len(targets)


class CallosumLogHandler(logging.Handler):
    """Logging handler that emits log records as callosum ``logs`` tract events.

    Silently drops events on any error — callosum mirroring is best-effort.
    """

    def __init__(self, conn: CallosumConnection, ref: str):
        super().__init__()
        self._conn = conn
        self._ref = ref
        self._pid = os.getpid()
        # Re-entrancy guard: emitting may itself log, which would recurse.
        self._emitting = False

    def emit(self, record: logging.LogRecord) -> None:
        """Mirror one log record onto the callosum bus (best-effort)."""
        if self._emitting:
            return
        self._emitting = True
        try:
            self._conn.emit(
                "logs",
                "line",
                ref=self._ref,
                name="supervisor",
                pid=self._pid,
                stream="log",
                line=self.format(record),
            )
        except Exception:
            pass
        finally:
            self._emitting = False


class SupervisorArgumentParser(argparse.ArgumentParser):
    """Argument parser that redirects service-lifecycle verbs to `sol service`."""

    def error(self, message: str) -> None:
        # If the user typed a lifecycle verb (e.g. `sol supervisor restart`),
        # exit with a hint toward the correct command instead of a usage error.
        mistaken = next(
            (arg for arg in sys.argv[1:] if arg in _SERVICE_LIFECYCLE_VERBS),
            None,
        )
        if mistaken:
            self.exit(
                2,
                "sol supervisor is the server-launch command (takes a port). "
                "For lifecycle, use: sol service <verb>. "
                f"Did you mean: sol service {mistaken} ?\n",
            )
        super().error(message)


class TaskQueue:
    """Manages on-demand task execution with per-command serialization.

    Tasks are serialized by command name - only one task per command runs at a time.
    Additional requests for the same command are queued (deduped by exact cmd match).
    Multiple callers requesting the same work have their refs coalesced so all get
    notified when the task completes.

    The lock only protects state mutations, never held during I/O operations.
    """

    def __init__(self, on_queue_change: callable = None, ready: bool = True):
        """Initialize task queue.

        Args:
            on_queue_change: Optional callback(cmd_name, running_ref, queue_entries)
                called after queue state changes. Called outside lock.
            ready: When False, submitted tasks buffer in a pending list until
                set_ready() is called.
        """
        self._running: dict[
            str, dict
        ] = {}  # command_name -> {"ref": str, "thread": Thread}
        self._queues: dict[str, list] = {}  # command_name -> list of {refs, cmd} dicts
        self._active: dict[str, RunnerManagedProcess] = {}  # ref -> process
        self._history: deque[dict[str, Any]] = deque(maxlen=100)
        self._cap_terminated: set[str] = set()  # refs killed by runtime cap
        self._caps: dict[str, int] = {}  # command_name -> max runtime seconds
        self._pending: list[dict] = []  # tasks buffered while not ready
        self._ready = ready
        self._lock = threading.Lock()
        self._on_queue_change = on_queue_change

    @staticmethod
    def get_command_name(cmd: list[str]) -> str:
        """Extract command name from cmd array for queue serialization.

        For 'sol X' commands, returns X. Otherwise returns cmd[0] basename.
        """
        if cmd and cmd[0] == "sol" and len(cmd) > 1:
            return cmd[1]
        return Path(cmd[0]).name if cmd else "unknown"

    def _notify_queue_change(self, cmd_name: str) -> None:
        """Notify listener of queue state change (called outside lock)."""
        if not self._on_queue_change:
            return

        # Snapshot state under the lock; invoke the callback after releasing it.
        with self._lock:
            if cmd_name == "pending":
                queue = list(self._pending)
                running_ref = None
            else:
                queue = list(self._queues.get(cmd_name, []))
                entry = self._running.get(cmd_name)
                running_ref = entry["ref"] if entry else None

        self._on_queue_change(cmd_name, running_ref, queue)

    def submit(
        self,
        cmd: list[str],
        ref: str | None = None,
        day: str | None = None,
        scheduler_name: str | None = None,
    ) -> str | None:
        """Submit a task for execution.

        If no task of this command type is running, starts immediately.
        Otherwise queues (deduped by exact cmd match, refs coalesced).

        Args:
            cmd: Command to execute
            ref: Optional caller-provided ref for tracking
            day: Optional day override (YYYYMMDD) for log placement

        Returns:
            ref if task was started/queued, None if already tracked (no change)
        """
        ref = ref or str(now_ms())
        cmd_name = self.get_command_name(cmd)

        # Not-ready path: buffer the task until set_ready() flushes it.
        with self._lock:
            if not self._ready:
                self._pending.append(
                    {
                        "refs": [ref],
                        "cmd": cmd,
                        "day": day,
                        "scheduler_name": scheduler_name,
                    }
                )
                should_notify_pending = True
            else:
                should_notify_pending = False

        if should_notify_pending:
            self._notify_queue_change("pending")
            return ref

        should_notify = False
        should_start = False

        with self._lock:
            # Detect stale running state (task thread died without clearing queue)
            if cmd_name in self._running:
                stale = self._running[cmd_name]
                if stale["thread"] is not None and not stale["thread"].is_alive():
                    logging.warning(
                        f"Clearing stale {cmd_name} queue "
                        f"(thread dead, ref={stale['ref']})"
                    )
                    self._running.pop(cmd_name)

            if cmd_name in self._running:
                # Command already running - queue or coalesce
                queue = self._queues.setdefault(cmd_name, [])
                existing = next((q for q in queue if q["cmd"] == cmd), None)
                if existing:
                    if ref not in existing["refs"]:
                        existing["refs"].append(ref)
                        logging.info(
                            f"Added ref {ref} to queued task {cmd_name} "
                            f"(refs: {len(existing['refs'])})"
                        )
                        should_notify = True
                    else:
                        logging.debug(f"Ref already tracked for queued task: {ref}")
                        return None
                else:
                    queue.append(
                        {
                            "refs": [ref],
                            "cmd": cmd,
                            "day": day,
                            "scheduler_name": scheduler_name,
                        }
                    )
                    logging.info(
                        f"Queued task {cmd_name}: {' '.join(cmd)} ref={ref} "
                        f"(queue: {len(queue)})"
                    )
                    should_notify = True
            else:
                # Not running - mark as running and start
                # Thread is set to None here; _run_task registers it on entry
                self._running[cmd_name] = {
                    "ref": ref,
                    "thread": None,
                    "scheduler_name": scheduler_name,
                }
                should_start = True

        # Notify outside lock
        if should_notify:
            self._notify_queue_change(cmd_name)
            return ref

        # Start task outside lock
        if should_start:
            threading.Thread(
                target=self._run_task,
                args=([ref], cmd, cmd_name, day, scheduler_name),
                daemon=True,
            ).start()
            return ref

        return None

    def set_cap(self, cmd_name: str, seconds: int) -> None:
        """Set a max runtime cap in seconds for a queued command name."""
        with self._lock:
            self._caps[cmd_name] = seconds

    def get_active_by_cmd_name(self, name: str) -> str | None:
        """Return the first active ref matching a command name."""
        with self._lock:
            for ref, managed in self._active.items():
                if self.get_command_name(managed.cmd) == name:
                    return ref
        return None

    def enforce_deadlines(self, now: float) -> None:
        """Enforce configured task runtime caps without blocking the supervisor tick."""
        with self._lock:
            for ref, managed in list(self._active.items()):
                cmd_name = self.get_command_name(managed.cmd)
                cap = self._caps.get(cmd_name)
                if not cap:
                    continue

                elapsed = now - managed.start_time
                if elapsed <= cap:
                    continue

                elapsed_seconds = int(elapsed)
                logging.warning(
                    "Task %s (cmd=%s, ref=%s) exceeded max_runtime of %ds "
                    "(elapsed=%ds); terminating",
                    cmd_name,
                    " ".join(managed.cmd),
                    ref,
                    cap,
                    elapsed_seconds,
                )
                # Mark so _run_task records "timeout" instead of "error",
                # then terminate on a background thread (non-blocking).
                self._cap_terminated.add(ref)
                _start_termination_thread(ref, managed, timeout=2.0, reason="cap")

    def set_ready(self) -> None:
        """Allow buffered tasks to start dispatching through the normal queue path."""
        with self._lock:
            if self._ready:
                return
            self._ready = True
            pending = list(self._pending)
            self._pending.clear()

        if pending:
            self._notify_queue_change("pending")
            for entry in pending:
                self.submit(
                    entry["cmd"],
                    ref=entry["refs"][0],
                    day=entry.get("day"),
                    scheduler_name=entry.get("scheduler_name"),
                )

    def _run_task(
        self,
        refs: list[str],
        cmd: list[str],
        cmd_name: str,
        day: str | None = None,
        scheduler_name: str | None = None,
    ) -> None:
        """Execute a task and handle completion.

        Args:
            refs: List of refs to notify on completion
            cmd: Command to execute
            cmd_name: Command name for queue management
            day: Optional day override (YYYYMMDD) for log placement
        """
        # Register this thread for stale-queue detection
        with self._lock:
            if cmd_name in self._running and self._running[cmd_name]["ref"] == refs[0]:
                self._running[cmd_name]["thread"] = threading.current_thread()

        # Each task thread opens its own callosum connection for event emission.
        callosum = CallosumConnection()
        managed = None
        primary_ref = refs[0]
        service = cmd_name
        exit_status = "error"

        try:
            callosum.start()
            logging.info(f"Starting task {primary_ref}: {' '.join(cmd)}")

            managed = RunnerManagedProcess.spawn(
                cmd, ref=primary_ref, callosum=callosum, day=day
            )
            with self._lock:
                self._active[primary_ref] = managed

            callosum.emit(
                "supervisor",
                "started",
                service=service,
                pid=managed.pid,
                ref=primary_ref,
            )

            exit_code = managed.wait()
            exit_status = "ok" if exit_code == 0 else "error"

            # Emit one "stopped" event per coalesced caller ref.
            for ref in refs:
                callosum.emit(
                    "supervisor",
                    "stopped",
                    service=service,
                    pid=managed.pid,
                    ref=ref,
                    exit_code=exit_code,
                )

            if exit_code == 0:
                logging.info(f"Task {cmd_name} ({primary_ref}) finished successfully")
            else:
                logging.warning(
                    f"Task {cmd_name} ({primary_ref}) failed with exit code {exit_code}"
                )

        except Exception as e:
            if isinstance(e, subprocess.TimeoutExpired):
                exit_status = "timeout"
            logging.exception(
                f"Task {cmd_name} ({primary_ref}) encountered exception: {e}"
            )
            # Still notify all callers; exit_code -1 signals abnormal failure.
            for ref in refs:
                callosum.emit(
                    "supervisor",
                    "stopped",
                    service=service,
                    pid=managed.pid if managed else 0,
                    ref=ref,
                    exit_code=-1,
                )
        finally:
            try:
                if managed:
                    managed.cleanup()
            except Exception:
                logging.exception(f"Task {cmd_name} ({primary_ref}): cleanup failed")
            with self._lock:
                self._active.pop(primary_ref, None)
                # Cap-enforced termination is reported as a timeout.
                if primary_ref in self._cap_terminated:
                    exit_status = "timeout"
                self._cap_terminated.discard(primary_ref)
            ended_at = time.time()
            self._history.append(
                {
                    "name": cmd_name,
                    "cmd": list(cmd),
                    "ref": primary_ref,
                    "ended_at": ended_at,
                    "exit_status": exit_status,
                    "scheduler_name": scheduler_name,
                }
            )
            if scheduler_name:
                try:
                    _record_scheduler_completion(
                        scheduler_name,
                        ended_at=ended_at,
                        exit_status=exit_status,
                        ref=primary_ref,
                        cmd=cmd,
                    )
                except Exception as exc:
                    logger.warning("scheduler completion writeback failed: %s", exc)
            try:
                callosum.stop()
            except Exception:
                logging.exception(
                    f"Task {cmd_name} ({primary_ref}): callosum stop failed"
                )
            # Dispatch the next queued task (or clear the running slot).
            self._process_next(cmd_name)

    def _process_next(self, cmd_name: str) -> None:
        """Process next queued task after completion."""
        next_cmd = None
        refs = None
        day = None
        scheduler_name = None

        with self._lock:
            queue = self._queues.get(cmd_name, [])
            if queue:
                entry = queue.pop(0)
                refs = entry["refs"]
                next_cmd = entry["cmd"]
                day = entry.get("day")
                scheduler_name = entry.get("scheduler_name")
                # Thread is set to None here; _run_task registers it on entry
                self._running[cmd_name] = {
                    "ref": refs[0],
                    "thread": None,
                    "scheduler_name": scheduler_name,
                }
                logging.info(
                    f"Dequeued task {cmd_name}: {' '.join(next_cmd)} refs={refs} "
                    f"(remaining: {len(queue)})"
                )
            else:
                self._running.pop(cmd_name, None)

        # Notify and spawn outside lock
        self._notify_queue_change(cmd_name)
        if next_cmd:
            threading.Thread(
                target=self._run_task,
                args=(refs, next_cmd, cmd_name, day, scheduler_name),
                daemon=True,
            ).start()

    def cancel(self, ref: str) -> bool:
        """Cancel a running task.

        Returns:
            True if task was found and terminated, False otherwise
        """
        if ref not in self._active:
            logging.warning(f"Cannot cancel task {ref}: not found")
            return False

        managed = self._active[ref]
        if not managed.is_running():
            logging.debug(f"Task {ref} already finished")
            return False

        logging.info(f"Cancelling task {ref}...")
        managed.terminate()
        return True

    def shutdown(self, timeout: float = 10.0) -> int:
        """Terminate all active tasks in parallel; return how many were active."""
        with self._lock:
            active = list(self._active.items())
        if not active:
            return 0

        def _terminate(item: tuple[str, RunnerManagedProcess]) -> None:
            # Best-effort terminate of one task; log (don't raise) on failure.
            ref, managed = item
            try:
                managed.terminate(timeout=timeout)
            except subprocess.TimeoutExpired:
                logger.warning(
                    "task %s did not exit within %ss; KILL sent", ref, timeout
                )
            except OSError as exc:
                logger.warning("task %s terminate raised: %s", ref, exc)

        with concurrent.futures.ThreadPoolExecutor(max_workers=len(active)) as executor:
            list(executor.map(_terminate, active))
        return len(active)

    def get_status(self, ref: str) -> dict:
        """Get status of a task."""
        if ref not in self._active:
            return {"status": "not_found"}

        managed = self._active[ref]
        return {
            "status": "running" if managed.is_running() else "finished",
            "pid": managed.pid,
            "returncode": managed.returncode,
            "log_path": str(managed.log_writer.path),
            "cmd": managed.cmd,
        }

    def collect_task_status(self) -> list[dict]:
        """Collect status of all running tasks for supervisor status."""
        now = time.time()
        tasks = []
        for ref, managed in self._active.items():
            if managed.is_running():
                duration = int(now - managed.start_time)
                cmd_name = TaskQueue.get_command_name(managed.cmd)
                tasks.append(
                    {
                        "ref": ref,
                        "name": cmd_name,
                        "duration_seconds": duration,
                    }
                )
        return tasks

    def collect_queue_counts(self) -> dict[str, int]:
        """Snapshot per-command queue depths for status reporting."""
        with self._lock:
            counts = {
                cmd_name: len(queue)
                for cmd_name, queue in self._queues.items()
                if queue
            }
            if self._pending:
                counts["pending"] = len(self._pending)
            return counts


# Global task queue instance (initialized in main())
_task_queue: TaskQueue | None = None

# Global supervisor callosum connection for event emissions
_supervisor_callosum: CallosumConnection | None = None

# Global reference to managed processes for restart control
_managed_procs: list[RunnerManagedProcess] = []
_SERVICE_STATE: dict[str, dict[str, Any]] = {}
_termination_threads: dict[str, threading.Thread] = {}
_termination_threads_lock = threading.Lock()
_SCHEDULER_JSON_LOCK = threading.Lock()

# Global reference to in-process Callosum server
_callosum_server: CallosumServer | None = None
_callosum_thread: threading.Thread | None = None

# Track whether running in remote mode (upload-only, no local processing)
_is_remote_mode: bool = False
_digest_submitted_this_boot = False

# State for daily processing (tracks day boundary for midnight think trigger)
_daily_state = {
    "last_day": None,  # Track which day we last processed
}

# Timeout before flushing stale segments (seconds)
FLUSH_TIMEOUT = 3600

# State for segment flush (close out dangling agent state after inactivity)
_flush_state: dict = {
    "last_segment_ts": 0.0,  # Wall-clock time of last observe.observed event
    "day": None,  # Day of last observed segment
    "segment": None,  # Last observed segment key
    "flushed": False,  # Whether flush has already run for current segment
}


def _get_journal_path() -> Path:
    """Return the current journal directory as a Path."""
    return Path(get_journal())


def is_supervisor_up() -> bool:
    """Return True when supervisor.pid and supervisor.start_time identify a live supervisor process for the current journal."""
    health_dir = Path(get_journal()) / "health"
    pid_path = health_dir / "supervisor.pid"
    try:
        pid = int(pid_path.read_text().strip())
    except FileNotFoundError:
        return False
    except (OSError, ValueError):
        return False

    # Signal 0 probes process existence without sending anything.
    try:
        os.kill(pid, 0)
    except ProcessLookupError:
        return False
    except PermissionError:
        return False
    except OSError:
        return False

    start_time_path = health_dir / "supervisor.start_time"
    try:
        recorded_start = float(start_time_path.read_text().strip())
    except FileNotFoundError:
        return False
    except (OSError, ValueError):
        return False

    # Compare recorded start time against the kernel's create time so a
    # recycled PID is not mistaken for a live supervisor.
    try:
        create_time = psutil.Process(pid).create_time()
    except psutil.NoSuchProcess:
        return False
    except psutil.Error:
        return False

    tolerance = 1.5  # drift between time.time() and psutil create_time()
    return abs(recorded_start - create_time) <= tolerance


class RestartPolicy:
    """Track restart attempts and compute backoff delays."""

    # Backoff schedule in seconds; the last entry repeats for later attempts.
    _SCHEDULE = (0, 1, 5)

    def __init__(self) -> None:
        self.attempts = 0
        self.last_start = 0.0

    def record_start(self) -> None:
        """Record the wall-clock time of the latest (re)start."""
        self.last_start = time.time()

    def reset_attempts(self) -> None:
        """Reset the backoff counter (e.g. after a healthy run)."""
        self.attempts = 0

    def next_delay(self) -> int:
        """Return the delay before the next restart and bump the attempt count."""
        delay = self._SCHEDULE[min(self.attempts, len(self._SCHEDULE) - 1)]
        self.attempts += 1
        return delay


_RESTART_POLICIES: dict[str, RestartPolicy] = {}


def _get_restart_policy(name: str) -> RestartPolicy:
    """Return (creating on first use) the restart policy for a service name."""
    return _RESTART_POLICIES.setdefault(name, RestartPolicy())


def _launch_process(
    name: str,
    cmd: list[str],
    *,
    restart: bool = False,
    shutdown_timeout: int = 15,
    ref: str | None = None,
) -> RunnerManagedProcess:
    # NOTE: All child processes should include -v for verbose logging by default.
    # This ensures their output is captured in logs for debugging.
    """Launch process with automatic output logging and restart policy tracking."""
    policy: RestartPolicy | None = None
    if restart:
        policy = _get_restart_policy(name)

    # Generate ref if not provided
    ref = ref if ref else str(now_ms())

    # Use unified runner to spawn process (share supervisor's callosum)
    try:
        managed = RunnerManagedProcess.spawn(
            cmd, ref=ref, callosum=_supervisor_callosum
        )
    except RuntimeError as exc:
        logging.error(str(exc))
        raise

    if policy:
        policy.record_start()
    _SERVICE_STATE[name] = {
        "restart": restart,
        "shutdown_timeout": shutdown_timeout,
    }

    # Emit started event
    if _supervisor_callosum:
        _supervisor_callosum.emit(
            "supervisor",
            "started",
            service=name,
            pid=managed.process.pid,
            ref=managed.ref,
        )

    return managed


def _terminate_managed(
    managed: RunnerManagedProcess, timeout: float, *, reason: str
) -> None:
    """Terminate a managed process, logging (not raising) on timeout."""
    logger.info("Terminating %s for %s", managed.name, reason)
    try:
        managed.terminate(timeout=timeout)
    except subprocess.TimeoutExpired:
        logger.warning(
            "%s did not terminate within %.1fs for %s",
            managed.name,
            timeout,
            reason,
        )


def _start_termination_thread(
    key: str, managed: RunnerManagedProcess, timeout: float, reason: str
) -> None:
    """Run _terminate_managed on a daemon thread, deduped per key."""

    def run() -> None:
        try:
            _terminate_managed(managed, timeout, reason=reason)
        finally:
            # Deregister only if we are still the registered thread for key.
            with _termination_threads_lock:
                if _termination_threads.get(key) is threading.current_thread():
                    _termination_threads.pop(key, None)

    with _termination_threads_lock:
        existing = _termination_threads.get(key)
        if existing and existing.is_alive():
            return

        thread = threading.Thread(
            target=run,
            daemon=True,
            name=f"terminate-{key}",
        )
        _termination_threads[key] = thread
        thread.start()


def _stop_process(managed: RunnerManagedProcess) -> None:
    """Synchronously terminate a managed service and clean up its resources."""
    timeout = _SERVICE_STATE.get(managed.name, {}).get("shutdown_timeout", 15)
    _terminate_managed(managed, timeout, reason="shutdown")
    managed.cleanup()


def _record_scheduler_completion(
    scheduler_name: str,
    *,
    ended_at: float,
    exit_status: str,
    ref: str,
    cmd: list[str],
) -> None:
    """Persist last-run info for a scheduled task into health/scheduler.json.

    The write is atomic (temp file + replace) and serialized by a module lock.
    """
    health_dir = Path(get_journal()) / "health"
    health_dir.mkdir(parents=True, exist_ok=True)
    state_path = health_dir / "scheduler.json"
    with _SCHEDULER_JSON_LOCK:
        try:
            with open(state_path, "r", encoding="utf-8") as file:
                state = json.load(file)
        except FileNotFoundError:
            state = {}
        except (json.JSONDecodeError, OSError) as exc:
            logger.warning(
                "Failed to load scheduler state for completion write: %s", exc
            )
            state = {}

        current = state.get(scheduler_name)
        if not isinstance(current, dict):
            current = {}
        current.update(
            {
                "last_run": ended_at,
                "last_status": exit_status,
                "last_ref": ref,
            }
        )
        state[scheduler_name] = current

        # Write to a temp file in the same directory, then atomically replace.
        fd, tmp_path = tempfile.mkstemp(
            dir=health_dir, suffix=".tmp", prefix=".scheduler_"
        )
        tmp_file = Path(tmp_path)
        try:
            with open(fd, "w", encoding="utf-8") as file:
                json.dump(state, file, indent=2)
            tmp_file.replace(state_path)
        except BaseException:
            tmp_file.unlink(missing_ok=True)
            raise


def _emit_queue_event(cmd_name: str, running_ref: str | None, queue: list) -> None:
    """Emit supervisor.queue event with current queue state for a command.

    This is the callback passed to TaskQueue for queue change notifications.
    """
    if not _supervisor_callosum:
        return

    _supervisor_callosum.emit(
        "supervisor",
        "queue",
        command=cmd_name,
        running=running_ref,
        queued=len(queue),
        queue=queue,
    )


def _maybe_submit_startup_digest(*, no_cortex: bool) -> None:
    """Submit the startup digest once when a local cortex substrate exists."""
    global _digest_submitted_this_boot

    if (
        _digest_submitted_this_boot
        or no_cortex
        or _is_remote_mode
        or _task_queue is None
    ):
        return

    _task_queue.submit(["sol", "call", "identity", "digest"])
    _digest_submitted_this_boot = True
    logging.info("startup: submitted identity digest")


def _handle_task_request(message: dict) -> None:
    """Handle incoming task request from Callosum."""
    if message.get("tract") != "supervisor" or message.get("event") != "request":
        return

    cmd = message.get("cmd")
    if not cmd:
        logging.error(f"Invalid task request: missing cmd: {message}")
        return

    ref = message.get("ref") or str(now_ms())
    day = message.get("day")
    scheduler_name = message.get("scheduler_name")
    if _task_queue:
        cmd_name = TaskQueue.get_command_name(cmd)
        active_ref = _task_queue.get_active_by_cmd_name(cmd_name)
        if active_ref:
            # Same command already running: emit a "skipped" event instead of
            # queueing. A task past 2x its cap is flagged as "wedged".
            with _task_queue._lock:
                managed = _task_queue._active.get(active_ref)
                cap = _task_queue._caps.get(cmd_name, 0)
            runtime = time.time() - managed.start_time if managed else 0
            reason = "wedged" if cap and runtime > 2 * cap else "still_running"
            if _supervisor_callosum:
                _supervisor_callosum.emit(
                    "supervisor",
                    "skipped",
                    reason=reason,
                    ref=ref,
                    active_ref=active_ref,
                    cmd=cmd,
                    scheduler_name=scheduler_name,
                )
            return
        _task_queue.submit(cmd, ref, day=day, scheduler_name=scheduler_name)


def _restart_service(service: str) -> bool:
    """Terminate a managed service to trigger graceful restart.

    Returns True if the service was found and running, False if not found
    or already exited.
    """
    for proc in _managed_procs:
        if proc.name == service:
            if proc.process.poll() is not None:
                logging.debug(
                    f"Ignoring restart for {service}: already exited, awaiting auto-restart"
                )
                return False

            # Ensure the restart flag is set so the supervise loop relaunches it.
            state = _SERVICE_STATE.setdefault(service, {})
            state["restart"] = True
            timeout = state.get("shutdown_timeout", 15)

            logging.info(f"Restart requested for {service}, terminating...")

            if _supervisor_callosum:
                _supervisor_callosum.emit(
                    "supervisor",
                    "restarting",
                    service=service,
                    pid=proc.process.pid,
                    ref=proc.ref,
                )

            _start_termination_thread(service, proc, timeout=timeout, reason="restart")
            return True

    logging.warning(f"Cannot restart {service}: not found in managed processes")
    return False


def _handle_supervisor_request(message: dict) -> None:
    """Handle incoming supervisor control messages."""
    if message.get("tract") != "supervisor" or message.get("event") != "restart":
        return

    service = message.get("service")
    if not service:
        logging.error("Invalid restart request: missing service")
        return
    if service == "supervisor":
        logging.debug("Ignoring restart request for supervisor itself")
        return

    _restart_service(service)


def get_task_status(ref: str) -> dict:
    """Get status of a task.

    Args:
        ref: Task correlation ID

    Returns:
        Dict with status info, or {"status": "not_found"} if task doesn't exist
    """
    if _task_queue:
        return _task_queue.get_status(ref)
    return {"status": "not_found"}


def collect_status(procs: list[RunnerManagedProcess]) -> dict:
    """Collect current supervisor status for broadcasting."""
    now = time.time()

    # Running services
    services = []
    running_names = set()
    for proc in procs:
        if proc.process.poll() is None:  # Still running
            policy = _get_restart_policy(proc.name)
            uptime = int(now - policy.last_start) if policy.last_start else 0
            services.append(
                {
                    "name": proc.name,
                    "ref": proc.ref,
                    "pid": proc.process.pid,
                    "uptime_seconds": uptime,
                }
            )
            running_names.add(proc.name)

    # Prepend supervisor itself
    if _supervisor_ref and _supervisor_start:
        services.insert(
            0,
            {
                "name": "supervisor",
                "ref": _supervisor_ref,
                "pid": os.getpid(),
                "uptime_seconds": int(now - _supervisor_start),
            },
        )

    # Crashed services (in restart backoff)
    crashed = []
    for name, policy in _RESTART_POLICIES.items():
        if name not in running_names and policy.attempts > 0:
            crashed.append(
                {
                    "name": name,
                    "restart_attempts": policy.attempts,
                }
            )

    # Running tasks
    tasks = _task_queue.collect_task_status() if _task_queue else []
    queues = _task_queue.collect_queue_counts() if _task_queue else {}

    # Scheduled tasks
    schedules = scheduler.collect_status()
    # Connected callosum clients
    callosum_clients = _callosum_server.client_count() if _callosum_server else 0

    return {
        "services": services,
        "crashed": crashed,
        "tasks": tasks,
        "queues": queues,
        "stale_heartbeats": [],
        "schedules": schedules,
        "callosum_clients": callosum_clients,
    }


def start_sense() -> RunnerManagedProcess:
    """Launch sol sense with output logging."""
    return _launch_process("sense", ["sol", "sense", "-v"], restart=True)


def start_callosum_in_process() -> CallosumServer:
    """Start Callosum message bus server in-process.

    Runs the server in a background thread and waits for socket to be ready.

    Returns:
        CallosumServer instance
    """
    global _callosum_server, _callosum_thread

    server = CallosumServer()
    _callosum_server = server

    # Pre-delete stale socket to avoid race condition where the ready check
    # passes due to an old socket file before the server thread deletes it
    socket_path = server.socket_path
    socket_path.parent.mkdir(parents=True, exist_ok=True)
    if socket_path.exists():
        socket_path.unlink()

    # Start server in background thread (server.start() is blocking).
    # daemon=False: the thread is stopped explicitly via
    # stop_callosum_in_process() during shutdown rather than being killed.
    thread = threading.Thread(target=server.start, daemon=False, name="callosum-server")
    thread.start()
    _callosum_thread = thread

    # Wait for socket to be ready (with timeout)
    for _ in range(50):  # Wait up to 500ms
        if socket_path.exists():
            logging.info(f"Callosum server started on {socket_path}")
            return server
        time.sleep(0.01)

    raise RuntimeError("Callosum server failed to create socket within 500ms")


def wait_for_convey_ready(
    convey_mp: RunnerManagedProcess, *, timeout: float = 30.0, interval: float = 0.1
) -> bool:
    """Poll until Convey accepts TCP connections, or fail fast on death/timeout.

    Args:
        convey_mp: The managed Convey process to watch.
        timeout: Max seconds to wait for readiness.
        interval: Seconds to sleep between probes.

    Returns:
        True if Convey became reachable, False if it exited or timed out.
        In both failure cases startup continues into the supervise loop.
    """
    start = time.monotonic()
    deadline = start + timeout
    while time.monotonic() < deadline:
        rc = convey_mp.process.poll()
        if rc is not None:
            logging.error(
                "Convey process exited during startup (rc=%d); continuing into supervise loop",
                rc,
            )
            return False
        if is_solstone_up(timeout=0.1):
            logging.info("Convey ready after %.1fs", time.monotonic() - start)
            return True
        time.sleep(interval)
    alive = convey_mp.process.poll() is None
    logging.error(
        "Convey not ready after %.1fs (port=%s, pid alive=%s); continuing into supervise loop",
        time.monotonic() - start,
        read_service_port("convey"),
        alive,
    )
    return False


def stop_callosum_in_process() -> None:
    """Stop the in-process Callosum server and join its thread."""
    global _callosum_server, _callosum_thread

    if _callosum_server:
        logging.info("Stopping Callosum server...")
        _callosum_server.stop()

    if _callosum_thread:
        _callosum_thread.join(timeout=5)
        if _callosum_thread.is_alive():
            logging.warning("Callosum server thread did not stop cleanly")

    _callosum_server = None
    _callosum_thread = None


def start_cortex_server() -> RunnerManagedProcess:
    """Launch the Cortex WebSocket API server."""
    cmd = ["sol", "cortex", "-v"]
    return _launch_process("cortex", cmd, restart=True)


def start_link_server() -> RunnerManagedProcess:
    """Launch the link tunnel service (spl home-side endpoint)."""
    cmd = ["sol", "link", "-v"]
    return _launch_process("link", cmd, restart=True)


def start_convey_server(
    verbose: bool, debug: bool = False, port: int = 0
) -> tuple[RunnerManagedProcess, int]:
    """Launch the Convey web application with optional verbose and debug logging.

    Returns:
        Tuple of (RunnerManagedProcess, resolved_port) where resolved_port is the
        actual port being used (auto-selected if port was 0).
    """
    # Resolve port 0 to an available port before launching
    resolved_port = port if port != 0 else find_available_port()

    cmd = ["sol", "convey", "--port", str(resolved_port)]
    # Debug takes precedence over verbose; only one flag is passed through.
    if debug:
        cmd.append("-d")
    elif verbose:
        cmd.append("-v")
    return _launch_process("convey", cmd, restart=True), resolved_port


def check_runner_exits(
    procs: list[RunnerManagedProcess],
) -> list[RunnerManagedProcess]:
    """Return managed processes that have exited."""

    exited: list[RunnerManagedProcess] = []
    for managed in procs:
        if managed.process.poll() is not None:
            exited.append(managed)
    return exited


async def handle_runner_exits(procs: list[RunnerManagedProcess]) -> None:
    """Check for and handle exited processes with restart policy.

    Mutates ``procs`` in place: exited entries are removed and, when the
    service's restart policy allows, replaced by a freshly launched process.
    """
    exited = check_runner_exits(procs)
    if not exited:
        return

    exited_names = [managed.name for managed in exited]

    # Check if all exits are tempfail (session not ready)
    all_tempfail = all(m.process.returncode == EXIT_TEMPFAIL for m in exited)

    if all_tempfail:
        logging.info("Runner waiting for session: %s", ", ".join(sorted(exited_names)))
    else:
        msg = f"Runner process exited: {', '.join(sorted(exited_names))}"
        logging.error(msg)

    for managed in exited:
        returncode = managed.process.returncode
        is_tempfail = returncode == EXIT_TEMPFAIL
        logging.info("%s exited with code %s", managed.name, returncode)

        # Emit stopped event
        if _supervisor_callosum:
            _supervisor_callosum.emit(
                "supervisor",
                "stopped",
                service=managed.name,
                pid=managed.process.pid,
                ref=managed.ref,
                exit_code=returncode,
            )

        # Remove from procs list
        try:
            procs.remove(managed)
        except ValueError:
            pass

        managed.cleanup()

        # Handle restart if needed
        restart = _SERVICE_STATE.get(managed.name, {}).get("restart", False)
        if restart and not shutdown_requested:
            # Tempfail: use fixed longer delay, don't burn through backoff
            if is_tempfail:
                delay = TEMPFAIL_DELAY
            else:
                policy = _get_restart_policy(managed.name)
                # A full minute of uptime counts as a healthy run and
                # resets the backoff counter.
                uptime = time.time() - policy.last_start if policy.last_start else 0
                if uptime >= 60:
                    policy.reset_attempts()
                delay = policy.next_delay()
            if delay:
                logging.info("Waiting %ss before restarting %s", delay, managed.name)
                # Sleep in 1-second slices so a shutdown request can
                # interrupt the wait promptly.
                # NOTE(review): assumes delay is an integer second count —
                # range() would reject a float; confirm next_delay()'s type.
                for _ in range(delay):
                    if shutdown_requested:
                        break
                    await asyncio.sleep(1)
            if shutdown_requested:
                continue
            logging.info("Restarting %s...", managed.name)
            try:
                state = _SERVICE_STATE.get(managed.name, {})
                new_proc = _launch_process(
                    managed.name,
                    managed.cmd,
                    restart=True,
                    shutdown_timeout=state.get("shutdown_timeout", 15),
                )
            except Exception as exc:
                logging.exception("Failed to restart %s: %s", managed.name, exc)
                continue

            procs.append(new_proc)
            logging.info("Restarted %s after exit code %s", managed.name, returncode)
        else:
            logging.info("Not restarting %s", managed.name)


def handle_daily_tasks() -> None:
    """Check for day change and submit daily think for updated days (non-blocking).

    Triggers once when the day rolls over at midnight. Queries ``updated_days()``
    for journal days that have new stream data but haven't completed a daily
    think yet, then submits up to ``MAX_UPDATED_CATCHUP`` thinks in chronological
    order (oldest first, yesterday last) via the TaskQueue.

    Think auto-detects updated state and enables ``--refresh`` internally, so we
    don't pass it here.

    Skipped in remote mode (no local data to process).
    """
    # Remote mode: no local processing, data is on the server
    if _is_remote_mode:
        return

    today = datetime.now().date()

    # Only trigger when day actually changes (at midnight)
    if today != _daily_state["last_day"]:
        # The day that just ended is what we process
        prev_day = _daily_state["last_day"]

        # Guard against None (e.g., module reloaded without going through main())
        if prev_day is None:
            logging.warning("Daily state not initialized, skipping daily processing")
            _daily_state["last_day"] = today
            return

        prev_day_str = prev_day.strftime("%Y%m%d")

        # Update state for new day
        _daily_state["last_day"] = today

        # Flush any dangling segment state from the previous day before daily think
        if not _flush_state["flushed"] and _flush_state["day"] == prev_day_str:
            _check_segment_flush(force=True)

        today_str = today.strftime("%Y%m%d")
        all_updated = updated_days(exclude={today_str})

        if not all_updated:
            logging.info("Day changed to %s, no updated days to process", today)
            return

        # Take the newest MAX_UPDATED_CATCHUP days (already sorted ascending)
        days_to_process = all_updated[-MAX_UPDATED_CATCHUP:]
        skipped = len(all_updated) - len(days_to_process)

        if skipped:
            logging.warning(
                "Skipping %d older updated days (max catchup %d): %s",
                skipped,
                MAX_UPDATED_CATCHUP,
                all_updated[:skipped],
            )

        logging.info(
            "Day changed to %s, queuing daily think for %d updated day(s): %s",
            today,
            len(days_to_process),
            days_to_process,
        )

        # Submit oldest-first so yesterday is processed last
        for day_str in days_to_process:
            cmd = ["sol", "think", "-v", "--day", day_str]
            if _task_queue:
                _task_queue.submit(cmd, day=day_str)
                logging.debug("Submitted daily think for %s", day_str)
            else:
                logging.warning(
                    "No task queue available for daily processing: %s", day_str
                )


def _handle_segment_observed(message: dict) -> None:
    """Handle segment completion events (from live observation or imports).

    Submits sol think in segment mode via task queue, which handles both
    generators and segment agents. Also updates flush state to track
    segment recency.
    """
    if message.get("tract") != "observe" or message.get("event") != "observed":
        return

    segment = message.get("segment")  # e.g., "163045_300"
    if not segment:
        logging.warning("observed event missing segment field")
        return

    # Use day from event payload, fallback to today (for live observation)
    day = message.get("day") or datetime.now().strftime("%Y%m%d")
    stream = message.get("stream")

    # Update flush state — new segment resets the flush timer
    _flush_state["last_segment_ts"] = time.time()
    _flush_state["day"] = day
    _flush_state["segment"] = segment
    _flush_state["stream"] = stream
    _flush_state["flushed"] = False

    logging.info(f"Segment observed: {day}/{segment}, submitting processing...")

    # Submit via task queue — serializes with other think invocations
    cmd = ["sol", "think", "-v", "--day", day, "--segment", segment]
    if stream:
        cmd.extend(["--stream", stream])
    if _task_queue:
        _task_queue.submit(cmd, day=day)
    else:
        logging.warning(
            "No task queue available for segment processing: %s/%s", day, segment
        )


def _check_segment_flush(force: bool = False) -> None:
    """Check if the last observed segment needs flushing.

    If no new segments have arrived within FLUSH_TIMEOUT seconds, runs
    ``sol think --flush`` on the last segment to let flush-enabled agents
    close out dangling state (e.g., end active activities).

    Args:
        force: Skip timeout check (used at day boundary to flush
            before daily think regardless of elapsed time).

    Skipped in remote mode (no local processing).
    """
    if _is_remote_mode:
        return

    last_ts = _flush_state["last_segment_ts"]
    if not last_ts or _flush_state["flushed"]:
        return

    if not force and time.time() - last_ts < FLUSH_TIMEOUT:
        return

    day = _flush_state["day"]
    segment = _flush_state["segment"]
    if not day or not segment:
        return

    # Mark flushed before submitting so the flush is only queued once
    # per segment, even if submission below is skipped.
    _flush_state["flushed"] = True

    stream = _flush_state.get("stream")
    cmd = ["sol", "think", "-v", "--day", day, "--segment", segment, "--flush"]
    if stream:
        cmd.extend(["--stream", stream])
    if _task_queue:
        _task_queue.submit(cmd, day=day)
        logging.info(f"Queued segment flush: {day}/{segment}")
    else:
        logging.warning(
            "No task queue available for segment flush: %s/%s", day, segment
        )


def _handle_segment_event_log(message: dict) -> None:
    """Log observe, think, and activity events with day+segment to segment/events.jsonl.

    Any observe, think, or activity tract message with both day and segment fields
    gets logged to journal/day/segment/events.jsonl if that directory exists.
    """
    if message.get("tract") not in {"observe", "think", "activity"}:
        return

    day = message.get("day")
    segment = message.get("segment")

    if not day or not segment:
        return

    stream = message.get("stream")

    try:
        if stream:
            segment_dir = day_path(day, create=False) / stream / segment
        else:
            segment_dir = day_path(day, create=False) / segment

        # Only log if segment directory exists
        if not segment_dir.is_dir():
            return

        events_file = segment_dir / "events.jsonl"

        # Append event as JSON line
        with open(events_file, "a", encoding="utf-8") as f:
            f.write(json.dumps(message, ensure_ascii=False) + "\n")

    except Exception as e:
        # Best-effort logging: failures here must never disturb supervision.
        logging.debug(f"Failed to log segment event: {e}")


def _handle_activity_recorded(message: dict) -> None:
    """Queue a per-activity think task when an activity is recorded.

    Listens for activity.recorded events and submits a queued think task
    for per-activity agent processing (serialized via TaskQueue).
    """
    if message.get("tract") != "activity" or message.get("event") != "recorded":
        return

    record_id = message.get("id")
    facet = message.get("facet")
    day = message.get("day")

    if not record_id or not facet or not day:
        logging.warning("activity.recorded event missing required fields")
        return

    cmd = ["sol", "think", "--activity", record_id, "--facet", facet, "--day", day]

    if _task_queue:
        _task_queue.submit(cmd, day=day)
        logging.info(f"Queued activity think: {record_id} for #{facet}")
    else:
        logging.warning("No task queue available for activity think: %s", record_id)


def _handle_think_daily_complete(message: dict) -> None:
    """Submit a heartbeat task after daily think processing completes.

    Listens for think.daily_complete events. Skips if a heartbeat process
    is already running (PID file guard).
    """
    if message.get("tract") != "think" or message.get("event") != "daily_complete":
        return

    # Check if heartbeat is already running via PID file
    pid_file = Path(get_journal()) / "health" / "heartbeat.pid"
    if pid_file.exists():
        try:
            existing_pid = int(pid_file.read_text().strip())
            # Signal 0 probes process existence without sending a signal.
            os.kill(existing_pid, 0)
            logging.info("Heartbeat already running (pid=%d), skipping", existing_pid)
            return
        except ProcessLookupError:
            pass  # Stale PID file, proceed
        except PermissionError:
            logging.info(
                "Heartbeat running under different user (pid file exists), skipping"
            )
            return
        except ValueError:
            pass  # Corrupt PID file, proceed

    cmd = ["sol", "heartbeat"]
    if _task_queue:
        _task_queue.submit(cmd)
        logging.info("Queued heartbeat after daily think completion")
    else:
        logging.warning("No task queue available for heartbeat submission")


def _handle_callosum_message(message: dict) -> None:
    """Dispatch incoming Callosum messages to appropriate handlers.

    Every handler filters on tract/event itself, so all messages are
    passed to all handlers unconditionally.
    """
    _handle_task_request(message)
    _handle_supervisor_request(message)
    _handle_segment_observed(message)
    _handle_activity_recorded(message)
    _handle_think_daily_complete(message)
    _handle_segment_event_log(message)


async def supervise(
    *,
    daily: bool = True,
    schedule: bool = True,
    procs: list[RunnerManagedProcess] | None = None,
) -> None:
    """Main supervision loop. Runs at 1-second intervals for responsiveness.

    Monitors runner health, emits status, triggers daily processing,
    and checks scheduled agents.

    Args:
        daily: Enable midnight daily-think processing.
        schedule: Enable periodic scheduler/routines checks.
        procs: Managed child processes to watch (None disables exit
            handling and status emission).
    """
    last_status_emit = 0.0

    try:
        while (
            not shutdown_requested
        ):  # pragma: no cover - loop checked via unit tests by patching
            if _task_queue:
                _task_queue.enforce_deadlines(time.time())

            # Check for runner exits first (immediate alert)
            if procs:
                await handle_runner_exits(procs)

            # Emit status every 5 seconds
            now = time.time()
            if now - last_status_emit >= 5:
                if _supervisor_callosum and procs:
                    try:
                        status = collect_status(procs)
                        _supervisor_callosum.emit("supervisor", "status", **status)
                    except Exception as e:
                        logging.debug(f"Status emission failed: {e}")
                last_status_emit = now

            # Check for segment flush (non-blocking, submits via task queue)
            _check_segment_flush()

            # Check for daily processing (non-blocking, submits via task queue)
            if daily:
                handle_daily_tasks()

            # Check periodic task schedules (non-blocking, submits via callosum)
            if schedule:
                scheduler.check()
                routines.check()

            # Sleep 1 second before next iteration (responsive to shutdown)
            await asyncio.sleep(1)
    finally:
        pass  # Callosum cleanup happens in main()


def parse_args() -> argparse.ArgumentParser:
    """Build the supervisor's argument parser (not yet parsed)."""
    parser = SupervisorArgumentParser(description="Monitor journaling health")
    parser.add_argument(
        "port",
        nargs="?",
        type=int,
        default=0,
        help="Convey port (0 = auto-select available port)",
    )
    parser.add_argument(
        "--threshold",
        type=int,
        default=DEFAULT_THRESHOLD,
        help="Seconds before heartbeat considered stale",
    )
    parser.add_argument(
        "--interval", type=int, default=CHECK_INTERVAL, help="Polling interval seconds"
    )
    parser.add_argument(
        "--no-daily",
        action="store_true",
        help="Disable daily processing run at midnight",
    )
    parser.add_argument(
        "--no-cortex",
        action="store_true",
        help="Do not start the Cortex server (run it manually for debugging)",
    )
    parser.add_argument(
        "--no-link",
        action="store_true",
        help="Do not start the link tunnel service",
    )
    parser.add_argument(
        "--no-convey",
        action="store_true",
        help="Do not start the Convey web application",
    )
    parser.add_argument(
        "--no-schedule",
        action="store_true",
        help="Disable periodic task scheduler",
    )
    parser.add_argument(
        "--remote",
        type=str,
        help="Remote mode: URL for segment transfer (not yet implemented)",
    )
    return parser


def handle_shutdown(signum, frame):
    """Handle shutdown signals gracefully.

    Sets the global flag (logged once) and raises KeyboardInterrupt to
    break out of asyncio.run() in main().
    """
    global shutdown_requested
    if not shutdown_requested:  # Only log once
        shutdown_requested = True
        logging.info("Shutdown requested, cleaning up...")
    raise KeyboardInterrupt


def main() -> None:
    """Supervisor entry point: acquire singleton lock, start services, supervise."""
    parser = parse_args()

    # Capture journal info BEFORE setup_cli() loads .env and pollutes os.environ
    journal_info = get_journal_info()

    args = setup_cli(parser)

    journal_path = _get_journal_path()

    log_level = logging.DEBUG if args.debug else logging.INFO
    log_path = journal_path / "health" / "supervisor.log"
    log_path.parent.mkdir(parents=True, exist_ok=True)
    logging.getLogger().handlers = []
    logging.basicConfig(
        level=log_level,
        handlers=[logging.FileHandler(log_path, encoding="utf-8")],
        format="%(asctime)s [supervisor:log] %(levelname)s %(message)s",
        datefmt="%Y-%m-%dT%H:%M:%S",
    )

    if args.verbose or args.debug:
        console_handler = logging.StreamHandler()
        console_handler.setLevel(log_level)
        console_handler.setFormatter(
            logging.Formatter("%(asctime)s %(levelname)s %(message)s")
        )
        logging.getLogger().addHandler(console_handler)

    # Singleton guard: only one supervisor per journal
    health_dir = journal_path / "health"
    lock_path = health_dir / "supervisor.lock"
    pid_path = health_dir / "supervisor.pid"
    import fcntl

    # lock_fd is deliberately kept open for the process lifetime:
    # closing it would release the flock.
    lock_fd = open(lock_path, "w")
    try:
        fcntl.flock(lock_fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
    except OSError:
        # Another supervisor holds the lock: report it (with health info
        # if the Callosum socket is present) and exit.
        lock_fd.close()
        pid_str = ""
        try:
            pid_str = pid_path.read_text().strip()
        except OSError:
            pass
        pid_msg = f" (PID {pid_str})" if pid_str else ""
        sock_path = health_dir / "callosum.sock"
        if sock_path.exists():
            try:
                from think.health_cli import health_check

                print(f"Supervisor already running{pid_msg}\n")
                health_check()
            except Exception:
                print(f"Supervisor already running{pid_msg}")
        else:
            print(f"Supervisor already running{pid_msg}")
        sys.exit(1)
    pid_path.write_text(str(os.getpid()))
    start_time_path = health_dir / "supervisor.start_time"
    # Written here, not at _supervisor_start, to minimize drift from psutil create_time().
    start_time_path.write_text(str(time.time()))
    logging.info("Singleton lock acquired (PID %d)", os.getpid())
    _sweep_cgroup_at_startup()

    # Set up signal handlers
    signal.signal(signal.SIGINT, handle_shutdown)
    signal.signal(signal.SIGTERM, handle_shutdown)

    # Show journal path and source on startup
    path, source = journal_info
    print(f"Journal: {path} (from {source})")
    logging.info("Supervisor starting...")

    global _managed_procs, _supervisor_callosum, _is_remote_mode
    global _digest_submitted_this_boot
    global _task_queue
    procs: list[RunnerManagedProcess] = []
    convey_port = None

    # Remote mode: run sync instead of local processing
    _is_remote_mode = bool(args.remote)
    _digest_submitted_this_boot = False

    # Run pending journal-maintenance tasks before spawning any writer children.
    # Callosum isn't up yet (emit_fn=None); migrations log through supervisor's logger only.
    try:
        ran, succeeded = run_pending_tasks(journal_path, emit_fn=None)
        if ran > 0:
            print(f"  Ran {ran} maintenance task(s)", flush=True)
            if ran == succeeded:
                logging.info("Completed %d/%d maintenance task(s)", succeeded, ran)
            else:
                logging.error(
                    "Maintenance tasks completed with failures: %d/%d succeeded",
                    succeeded,
                    ran,
                )
    except Exception:
        logging.exception("Maintenance runner raised; continuing startup")

    try:
        from think.importers.journal_archive import sweep_stale_extract_dirs

        swept = sweep_stale_extract_dirs()
        if swept > 0:
            logging.info("Swept %d stale journal-archive extract dir(s)", swept)
    except Exception:
        logging.exception("Journal archive extract sweep raised; continuing startup")

    # Start Callosum in-process first - it's the message bus that other services depend on
    try:
        print("  Starting Callosum bus...", flush=True)
        start_callosum_in_process()
    except RuntimeError as e:
        logging.error(f"Failed to start Callosum server: {e}")
        parser.error(f"Failed to start Callosum server: {e}")
        return

    # Connect supervisor's Callosum client to capture startup events from other services
    try:
        _supervisor_callosum = CallosumConnection(defaults={"rev": get_rev()})
        _supervisor_callosum.start(callback=_handle_callosum_message)
        logging.info("Supervisor connected to Callosum")
    except Exception as e:
        logging.warning(f"Failed to start Callosum connection: {e}")

    # Mirror supervisor log output to callosum logs tract (best-effort)
    supervisor_ref = str(now_ms())
    global _supervisor_ref, _supervisor_start
    _supervisor_ref = supervisor_ref
    _supervisor_start = time.time()
    if _supervisor_callosum:
        try:
            handler = CallosumLogHandler(_supervisor_callosum, supervisor_ref)
            handler.setFormatter(
                logging.Formatter("%(asctime)s %(levelname)s %(message)s")
            )
            logging.getLogger().addHandler(handler)
        except Exception:
            pass

    # Initialize task queue with callosum event callback.
    # ready=False: submissions queue up until set_ready() below.
    _task_queue = TaskQueue(on_queue_change=_emit_queue_event, ready=False)

    # Now start other services (their startup events will be captured)
    if _is_remote_mode:
        # Remote mode: transfer send will be added here
        pass
    else:
        # Local mode: convey first, then sense for file processing
        os.environ["SOL_SUPERVISOR_SPAWNED"] = "1"
        if not args.no_convey:
            print(f"  Starting convey on port {args.port}...", flush=True)
            proc, convey_port = start_convey_server(
                verbose=args.verbose, debug=args.debug, port=args.port
            )
            procs.append(proc)
            wait_for_convey_ready(proc)
            print("  Convey ready", flush=True)
        # Sense handles file processing
        print("  Starting sense...", flush=True)
        procs.append(start_sense())
        # Cortex for agent execution
        if not args.no_cortex:
            print("  Starting cortex...", flush=True)
            procs.append(start_cortex_server())
        # Link tunnel service (opt-out via --no-link)
        if not args.no_link:
            print("  Starting link...", flush=True)
            procs.append(start_link_server())

    # Make procs accessible to restart handler
    _managed_procs = procs

    # Initialize daily state to today - think only triggers at midnight when day changes
    _daily_state["last_day"] = datetime.now().date()

    # Initialize periodic task scheduler
    schedule_enabled = not args.no_schedule and not _is_remote_mode
    if schedule_enabled and _supervisor_callosum:
        scheduler.init(_supervisor_callosum)
        scheduler.register_defaults()
        if _task_queue:
            for cmd, seconds in scheduler.collect_runtime_caps():
                cmd_name = TaskQueue.get_command_name(cmd)
                _task_queue.set_cap(cmd_name, seconds)
                logging.info(
                    "Registered max_runtime cap for %s: %ss",
                    cmd_name,
                    seconds,
                )
        routines.init(_supervisor_callosum)

    if _task_queue:
        _task_queue.set_ready()
    _maybe_submit_startup_digest(no_cortex=args.no_cortex)

    # Show Convey URL if running
    if convey_port:
        print(f"Convey: http://localhost:{convey_port}/")

    logging.info(f"Started {len(procs)} processes, entering supervision loop")
    daily_enabled = not args.no_daily and not _is_remote_mode
    if daily_enabled:
        logging.info("Daily processing scheduled for midnight")

    # Startup catchup: submit thinks for days with pending stream data
    if daily_enabled:
        all_updated = updated_days()
        if all_updated:
            days_to_process = all_updated[-MAX_UPDATED_CATCHUP:]
            skipped = len(all_updated) - len(days_to_process)

            if skipped:
                logging.warning(
                    "Startup catchup: skipping %d older updated days (max %d): %s",
                    skipped,
                    MAX_UPDATED_CATCHUP,
                    all_updated[:skipped],
                )

            logging.info(
                "Startup catchup: submitted %d day(s) with pending stream data: %s",
                len(days_to_process),
                days_to_process,
            )

            for day_str in days_to_process:
                cmd = ["sol", "think", "-v", "--day", day_str]
                if _task_queue:
                    _task_queue.submit(cmd, day=day_str)
                    logging.debug("Startup catchup: submitted think for %s", day_str)
                else:
                    logging.warning(
                        "No task queue available for startup catchup: %s", day_str
                    )

    try:
        print("  Supervisor ready", flush=True)
        asyncio.run(
            supervise(
                daily=daily_enabled,
                schedule=schedule_enabled,
                procs=procs if procs else None,
            )
        )
    except KeyboardInterrupt:
        # Raised by handle_shutdown() on SIGINT/SIGTERM.
        logging.info("Caught KeyboardInterrupt, shutting down...")
    finally:
        logging.info("Stopping all processes...")
        print("\nShutting down gracefully (this may take a moment)...", flush=True)

        if _task_queue:
            _task_queue.shutdown(timeout=10)

        # Stop services in reverse order
        for managed in reversed(procs):
            _stop_process(managed)

        if schedule_enabled:
            try:
                routines.save_state()
            except Exception as exc:
                logging.warning("Failed to save routines state on shutdown: %s", exc)

        # Disconnect supervisor's Callosum connection
        if _supervisor_callosum:
            _supervisor_callosum.stop()
            logging.info("Supervisor disconnected from Callosum")

        # Stop in-process Callosum server last
        stop_callosum_in_process()

        logging.info("Supervisor shutdown complete.")
        print("Shutdown complete.", flush=True)


if __name__ == "__main__":
    main()