# SPDX-License-Identifier: AGPL-3.0-only
# Copyright (c) 2026 sol pbc

"""Personal memory agent supervisor: runs the Callosum bus in-process,
launches and restarts the managed services, and dispatches queued tasks."""

from __future__ import annotations

import argparse
import asyncio
import concurrent.futures
import json
import logging
import os
import signal
import subprocess
import sys
import tempfile
import threading
import time
from collections import deque
from datetime import datetime
from pathlib import Path
from typing import Any, Callable

import psutil

from think import routines, scheduler
from think.callosum import CallosumConnection, CallosumServer
from think.maint import run_pending_tasks
from think.runner import ManagedProcess as RunnerManagedProcess
from think.utils import (
    EXIT_TEMPFAIL,
    day_path,
    find_available_port,
    get_journal,
    get_journal_info,
    get_rev,
    is_solstone_up,
    now_ms,
    read_service_port,
    setup_cli,
    updated_days,
)

DEFAULT_THRESHOLD = 60
CHECK_INTERVAL = 30
MAX_UPDATED_CATCHUP = 4
TEMPFAIL_DELAY = 15  # seconds to wait before retrying a tempfail exit
logger = logging.getLogger(__name__)
_SERVICE_LIFECYCLE_VERBS = {
    "start",
    "stop",
    "restart",
    "status",
    "install",
    "uninstall",
    "logs",
}

# Global shutdown flag
shutdown_requested = False
# Supervisor identity (set in main() once ref is assigned)
_supervisor_ref: str | None = None
_supervisor_start: float | None = None


def _parse_self_cgroup_path(text: str) -> str | None:
    """Parse cgroup v2 line `0::/path` from /proc/self/cgroup contents."""
    for line in text.splitlines():
        if line.startswith("0::"):
            return line[3:].strip() or None
    return None


def _read_self_cgroup_path() -> str | None:
    try:
        return _parse_self_cgroup_path(Path("/proc/self/cgroup").read_text())
    except OSError:
        return None


def _is_systemd_service_cgroup(cgroup_path: str | None) -> bool:
    # Only sweep when this supervisor owns the cgroup; dev shells share scopes.
    if not cgroup_path:
        return False
    from think.service import SYSTEMD_UNIT

    return f"{SYSTEMD_UNIT}.service" in cgroup_path


def _sweep_cgroup_at_startup(grace: float = 5.0) -> int:
    """Terminate leftover processes in this supervisor's systemd cgroup.

    Returns the number of processes signaled.
    """
    if sys.platform != "linux":
        return 0
    cgroup_path = _read_self_cgroup_path()
    if not _is_systemd_service_cgroup(cgroup_path):
        return 0
    procs_file = Path("/sys/fs/cgroup") / cgroup_path.lstrip("/") / "cgroup.procs"
    try:
        pids = [int(pid) for pid in procs_file.read_text().split() if pid.strip()]
    except OSError:
        return 0

    own_pid = os.getpid()
    targets = [pid for pid in pids if pid != own_pid]
    if not targets:
        return 0

    logger.info("cgroup sweep: terminating %d residual process(es)", len(targets))
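    # Best-effort sweep: SIGTERM everything else in the cgroup, wait up to
    # `grace` seconds for exits, then SIGKILL any survivors.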
    for pid in targets:
        try:
            os.kill(pid, signal.SIGTERM)
        except ProcessLookupError:
            pass

    deadline = time.time() + grace
    while time.time() < deadline:
        if not any(psutil.pid_exists(pid) for pid in targets):
            break
        time.sleep(0.2)

    survivors = [pid for pid in targets if psutil.pid_exists(pid)]
    for pid in survivors:
        try:
            os.kill(pid, signal.SIGKILL)
        except ProcessLookupError:
            pass
    return len(targets)


class CallosumLogHandler(logging.Handler):
    """Logging handler that emits log records as callosum ``logs`` tract events.

    Silently drops events on any error — callosum mirroring is best-effort.
    """

    def __init__(self, conn: CallosumConnection, ref: str):
        super().__init__()
        self._conn = conn
        self._ref = ref
        self._pid = os.getpid()
        self._emitting = False

    def emit(self, record: logging.LogRecord) -> None:
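        # The _emitting flag breaks recursion: if conn.emit() itself logs,
        # the nested record is dropped instead of re-entering this handler.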
        if self._emitting:
            return
        self._emitting = True
        try:
            self._conn.emit(
                "logs",
                "line",
                ref=self._ref,
                name="supervisor",
                pid=self._pid,
                stream="log",
                line=self.format(record),
            )
        except Exception:
            pass
        finally:
            self._emitting = False


class SupervisorArgumentParser(argparse.ArgumentParser):
    def error(self, message: str) -> None:
        mistaken = next(
            (arg for arg in sys.argv[1:] if arg in _SERVICE_LIFECYCLE_VERBS),
            None,
        )
        if mistaken:
            self.exit(
                2,
                "sol supervisor is the server-launch command (takes a port). "
                "For lifecycle, use: sol service <verb>. "
                f"Did you mean: sol service {mistaken} ?\n",
            )
        super().error(message)


class TaskQueue:
    """Manages on-demand task execution with per-command serialization.

    Tasks are serialized by command name - only one task per command runs at a time.
    Additional requests for the same command are queued (deduped by exact cmd match).
    Multiple callers requesting the same work have their refs coalesced so all get
    notified when the task completes.

    The lock only protects state mutations; it is never held during I/O operations.
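
    A minimal usage sketch (the command shown is illustrative):

        queue = TaskQueue()
        ref = queue.submit(["sol", "think", "-v", "--day", "20250101"])
        queue.get_status(ref)  # {"status": "running", ...} while active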
186 """
187
188 def __init__(self, on_queue_change: callable = None, ready: bool = True):
189 """Initialize task queue.
190
191 Args:
192 on_queue_change: Optional callback(cmd_name, running_ref, queue_entries)
193 called after queue state changes. Called outside lock.
194 """
195 self._running: dict[
196 str, dict
197 ] = {} # command_name -> {"ref": str, "thread": Thread}
198 self._queues: dict[str, list] = {} # command_name -> list of {refs, cmd} dicts
199 self._active: dict[str, RunnerManagedProcess] = {} # ref -> process
200 self._history: deque[dict[str, Any]] = deque(maxlen=100)
201 self._cap_terminated: set[str] = set()
202 self._caps: dict[str, int] = {}
203 self._pending: list[dict] = []
204 self._ready = ready
205 self._lock = threading.Lock()
206 self._on_queue_change = on_queue_change

    @staticmethod
    def get_command_name(cmd: list[str]) -> str:
        """Extract command name from cmd array for queue serialization.

        For 'sol X' commands, returns X. Otherwise returns cmd[0] basename.
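
        For example, ["sol", "think", "--day", "20250101"] yields "think",
        while ["/usr/bin/rsync", "-a"] yields "rsync".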
213 """
214 if cmd and cmd[0] == "sol" and len(cmd) > 1:
215 return cmd[1]
216 return Path(cmd[0]).name if cmd else "unknown"
217
218 def _notify_queue_change(self, cmd_name: str) -> None:
219 """Notify listener of queue state change (called outside lock)."""
220 if not self._on_queue_change:
221 return
222
223 with self._lock:
224 if cmd_name == "pending":
225 queue = list(self._pending)
226 running_ref = None
227 else:
228 queue = list(self._queues.get(cmd_name, []))
229 entry = self._running.get(cmd_name)
230 running_ref = entry["ref"] if entry else None
231
232 self._on_queue_change(cmd_name, running_ref, queue)

    def submit(
        self,
        cmd: list[str],
        ref: str | None = None,
        day: str | None = None,
        scheduler_name: str | None = None,
    ) -> str | None:
        """Submit a task for execution.

        If no task of this command type is running, starts immediately.
        Otherwise queues (deduped by exact cmd match, refs coalesced).

        Args:
            cmd: Command to execute
            ref: Optional caller-provided ref for tracking
            day: Optional day override (YYYYMMDD) for log placement
            scheduler_name: Optional scheduler entry name, recorded for
                completion write-back

        Returns:
            ref if task was started/queued, None if already tracked (no change)
        """
        ref = ref or str(now_ms())
        cmd_name = self.get_command_name(cmd)

        with self._lock:
            if not self._ready:
                self._pending.append(
                    {
                        "refs": [ref],
                        "cmd": cmd,
                        "day": day,
                        "scheduler_name": scheduler_name,
                    }
                )
                should_notify_pending = True
            else:
                should_notify_pending = False

        if should_notify_pending:
            self._notify_queue_change("pending")
            return ref

        should_notify = False
        should_start = False

        with self._lock:
            # Detect stale running state (task thread died without clearing queue)
            if cmd_name in self._running:
                stale = self._running[cmd_name]
                if stale["thread"] is not None and not stale["thread"].is_alive():
                    logging.warning(
                        f"Clearing stale {cmd_name} queue "
                        f"(thread dead, ref={stale['ref']})"
                    )
                    self._running.pop(cmd_name)

            if cmd_name in self._running:
                # Command already running - queue or coalesce
                queue = self._queues.setdefault(cmd_name, [])
                existing = next((q for q in queue if q["cmd"] == cmd), None)
                if existing:
                    if ref not in existing["refs"]:
                        existing["refs"].append(ref)
                        logging.info(
                            f"Added ref {ref} to queued task {cmd_name} "
                            f"(refs: {len(existing['refs'])})"
                        )
                        should_notify = True
                    else:
                        logging.debug(f"Ref already tracked for queued task: {ref}")
                        return None
                else:
                    queue.append(
                        {
                            "refs": [ref],
                            "cmd": cmd,
                            "day": day,
                            "scheduler_name": scheduler_name,
                        }
                    )
                    logging.info(
                        f"Queued task {cmd_name}: {' '.join(cmd)} ref={ref} "
                        f"(queue: {len(queue)})"
                    )
                    should_notify = True
            else:
                # Not running - mark as running and start
                # Thread is set to None here; _run_task registers it on entry
                self._running[cmd_name] = {
                    "ref": ref,
                    "thread": None,
                    "scheduler_name": scheduler_name,
                }
                should_start = True

        # Notify outside lock
        if should_notify:
            self._notify_queue_change(cmd_name)
            return ref

        # Start task outside lock
        if should_start:
            threading.Thread(
                target=self._run_task,
                args=([ref], cmd, cmd_name, day, scheduler_name),
                daemon=True,
            ).start()
            return ref

        return None

    def set_cap(self, cmd_name: str, seconds: int) -> None:
        """Set a max runtime cap in seconds for a queued command name."""
        with self._lock:
            self._caps[cmd_name] = seconds

    def get_active_by_cmd_name(self, name: str) -> str | None:
        """Return the first active ref matching a command name."""
        with self._lock:
            for ref, managed in self._active.items():
                if self.get_command_name(managed.cmd) == name:
                    return ref
        return None

    def enforce_deadlines(self, now: float) -> None:
        """Enforce configured task runtime caps without blocking the supervisor tick."""
        with self._lock:
            for ref, managed in list(self._active.items()):
                cmd_name = self.get_command_name(managed.cmd)
                cap = self._caps.get(cmd_name)
                if not cap:
                    continue

                elapsed = now - managed.start_time
                if elapsed <= cap:
                    continue

                elapsed_seconds = int(elapsed)
                logging.warning(
                    "Task %s (cmd=%s, ref=%s) exceeded max_runtime of %ds "
                    "(elapsed=%ds); terminating",
                    cmd_name,
                    " ".join(managed.cmd),
                    ref,
                    cap,
                    elapsed_seconds,
                )
                self._cap_terminated.add(ref)
                _start_termination_thread(ref, managed, timeout=2.0, reason="cap")

    def set_ready(self) -> None:
        """Allow buffered tasks to start dispatching through the normal queue path."""
        with self._lock:
            if self._ready:
                return
            self._ready = True
            pending = list(self._pending)
            self._pending.clear()

        if pending:
            self._notify_queue_change("pending")
        for entry in pending:
            self.submit(
                entry["cmd"],
                ref=entry["refs"][0],
                day=entry.get("day"),
                scheduler_name=entry.get("scheduler_name"),
            )

    def _run_task(
        self,
        refs: list[str],
        cmd: list[str],
        cmd_name: str,
        day: str | None = None,
        scheduler_name: str | None = None,
    ) -> None:
        """Execute a task and handle completion.

        Args:
            refs: List of refs to notify on completion
            cmd: Command to execute
            cmd_name: Command name for queue management
            day: Optional day override (YYYYMMDD) for log placement
            scheduler_name: Optional scheduler entry name, recorded for
                completion write-back
        """
        # Register this thread for stale-queue detection
        with self._lock:
            if cmd_name in self._running and self._running[cmd_name]["ref"] == refs[0]:
                self._running[cmd_name]["thread"] = threading.current_thread()

        callosum = CallosumConnection()
        managed = None
        primary_ref = refs[0]
        service = cmd_name
        exit_status = "error"

        try:
            callosum.start()
            logging.info(f"Starting task {primary_ref}: {' '.join(cmd)}")

            managed = RunnerManagedProcess.spawn(
                cmd, ref=primary_ref, callosum=callosum, day=day
            )
            with self._lock:
                self._active[primary_ref] = managed

            callosum.emit(
                "supervisor",
                "started",
                service=service,
                pid=managed.pid,
                ref=primary_ref,
            )

            exit_code = managed.wait()
            exit_status = "ok" if exit_code == 0 else "error"

            for ref in refs:
                callosum.emit(
                    "supervisor",
                    "stopped",
                    service=service,
                    pid=managed.pid,
                    ref=ref,
                    exit_code=exit_code,
                )

            if exit_code == 0:
                logging.info(f"Task {cmd_name} ({primary_ref}) finished successfully")
            else:
                logging.warning(
                    f"Task {cmd_name} ({primary_ref}) failed with exit code {exit_code}"
                )

        except Exception as e:
            if isinstance(e, subprocess.TimeoutExpired):
                exit_status = "timeout"
            logging.exception(
                f"Task {cmd_name} ({primary_ref}) encountered exception: {e}"
            )
            for ref in refs:
                callosum.emit(
                    "supervisor",
                    "stopped",
                    service=service,
                    pid=managed.pid if managed else 0,
                    ref=ref,
                    exit_code=-1,
                )
        finally:
            try:
                if managed:
                    managed.cleanup()
            except Exception:
                logging.exception(f"Task {cmd_name} ({primary_ref}): cleanup failed")
            with self._lock:
                self._active.pop(primary_ref, None)
                if primary_ref in self._cap_terminated:
                    exit_status = "timeout"
                    self._cap_terminated.discard(primary_ref)
                ended_at = time.time()
                self._history.append(
                    {
                        "name": cmd_name,
                        "cmd": list(cmd),
                        "ref": primary_ref,
                        "ended_at": ended_at,
                        "exit_status": exit_status,
                        "scheduler_name": scheduler_name,
                    }
                )
            if scheduler_name:
                try:
                    _record_scheduler_completion(
                        scheduler_name,
                        ended_at=ended_at,
                        exit_status=exit_status,
                        ref=primary_ref,
                        cmd=cmd,
                    )
                except Exception as exc:
                    logger.warning("scheduler completion writeback failed: %s", exc)
            try:
                callosum.stop()
            except Exception:
                logging.exception(
                    f"Task {cmd_name} ({primary_ref}): callosum stop failed"
                )
            self._process_next(cmd_name)

    def _process_next(self, cmd_name: str) -> None:
        """Process next queued task after completion."""
        next_cmd = None
        refs = None
        day = None
        scheduler_name = None

        with self._lock:
            queue = self._queues.get(cmd_name, [])
            if queue:
                entry = queue.pop(0)
                refs = entry["refs"]
                next_cmd = entry["cmd"]
                day = entry.get("day")
                scheduler_name = entry.get("scheduler_name")
                # Thread is set to None here; _run_task registers it on entry
                self._running[cmd_name] = {
                    "ref": refs[0],
                    "thread": None,
                    "scheduler_name": scheduler_name,
                }
                logging.info(
                    f"Dequeued task {cmd_name}: {' '.join(next_cmd)} refs={refs} "
                    f"(remaining: {len(queue)})"
                )
            else:
                self._running.pop(cmd_name, None)

        # Notify and spawn outside lock
        self._notify_queue_change(cmd_name)
        if next_cmd:
            threading.Thread(
                target=self._run_task,
                args=(refs, next_cmd, cmd_name, day, scheduler_name),
                daemon=True,
            ).start()

    def cancel(self, ref: str) -> bool:
        """Cancel a running task.

        Returns:
            True if task was found and terminated, False otherwise
        """
        if ref not in self._active:
            logging.warning(f"Cannot cancel task {ref}: not found")
            return False

        managed = self._active[ref]
        if not managed.is_running():
            logging.debug(f"Task {ref} already finished")
            return False

        logging.info(f"Cancelling task {ref}...")
        managed.terminate()
        return True

    def shutdown(self, timeout: float = 10.0) -> int:
        """Terminate all active tasks in parallel; returns how many were signaled."""
        with self._lock:
            active = list(self._active.items())
        if not active:
            return 0

        def _terminate(item: tuple[str, RunnerManagedProcess]) -> None:
            ref, managed = item
            try:
                managed.terminate(timeout=timeout)
            except subprocess.TimeoutExpired:
                logger.warning(
                    "task %s did not exit within %ss; KILL sent", ref, timeout
                )
            except OSError as exc:
                logger.warning("task %s terminate raised: %s", ref, exc)

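        # Terminate concurrently so each task's grace period elapses in
        # parallel; total shutdown time stays near `timeout`, not timeout * n.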
        with concurrent.futures.ThreadPoolExecutor(max_workers=len(active)) as executor:
            list(executor.map(_terminate, active))
        return len(active)

    def get_status(self, ref: str) -> dict:
        """Get status of a task."""
        if ref not in self._active:
            return {"status": "not_found"}

        managed = self._active[ref]
        return {
            "status": "running" if managed.is_running() else "finished",
            "pid": managed.pid,
            "returncode": managed.returncode,
            "log_path": str(managed.log_writer.path),
            "cmd": managed.cmd,
        }

    def collect_task_status(self) -> list[dict]:
        """Collect status of all running tasks for supervisor status."""
        now = time.time()
        tasks = []
        # Snapshot under the lock; worker threads mutate _active concurrently.
        with self._lock:
            active = list(self._active.items())
        for ref, managed in active:
            if managed.is_running():
                duration = int(now - managed.start_time)
                cmd_name = TaskQueue.get_command_name(managed.cmd)
                tasks.append(
                    {
                        "ref": ref,
                        "name": cmd_name,
                        "duration_seconds": duration,
                    }
                )
        return tasks

    def collect_queue_counts(self) -> dict[str, int]:
        """Snapshot per-command queue depths for status reporting."""
        with self._lock:
            counts = {
                cmd_name: len(queue)
                for cmd_name, queue in self._queues.items()
                if queue
            }
            if self._pending:
                counts["pending"] = len(self._pending)
        return counts


# Global task queue instance (initialized in main())
_task_queue: TaskQueue | None = None

# Global supervisor callosum connection for event emissions
_supervisor_callosum: CallosumConnection | None = None

# Global reference to managed processes for restart control
_managed_procs: list[RunnerManagedProcess] = []
_SERVICE_STATE: dict[str, dict[str, Any]] = {}
_termination_threads: dict[str, threading.Thread] = {}
_termination_threads_lock = threading.Lock()
_SCHEDULER_JSON_LOCK = threading.Lock()

# Global reference to in-process Callosum server
_callosum_server: CallosumServer | None = None
_callosum_thread: threading.Thread | None = None

# Track whether running in remote mode (upload-only, no local processing)
_is_remote_mode: bool = False
_digest_submitted_this_boot = False

# State for daily processing (tracks day boundary for midnight think trigger)
_daily_state = {
    "last_day": None,  # Track which day we last processed
}

# Timeout before flushing stale segments (seconds)
FLUSH_TIMEOUT = 3600

# State for segment flush (close out dangling agent state after inactivity)
_flush_state: dict = {
    "last_segment_ts": 0.0,  # Wall-clock time of last observe.observed event
    "day": None,  # Day of last observed segment
    "segment": None,  # Last observed segment key
    "flushed": False,  # Whether flush has already run for current segment
}


def _get_journal_path() -> Path:
    return Path(get_journal())


def is_supervisor_up() -> bool:
    """Return True when supervisor.pid and supervisor.start_time identify a
    live supervisor process for the current journal."""
    health_dir = Path(get_journal()) / "health"
    pid_path = health_dir / "supervisor.pid"
    try:
        pid = int(pid_path.read_text().strip())
    except FileNotFoundError:
        return False
    except (OSError, ValueError):
        return False

    try:
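        # Signal 0 delivers nothing; it only checks existence and permission.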
        os.kill(pid, 0)
    except ProcessLookupError:
        return False
    except PermissionError:
        return False
    except OSError:
        return False

    start_time_path = health_dir / "supervisor.start_time"
    try:
        recorded_start = float(start_time_path.read_text().strip())
    except FileNotFoundError:
        return False
    except (OSError, ValueError):
        return False

    try:
        create_time = psutil.Process(pid).create_time()
    except psutil.NoSuchProcess:
        return False
    except psutil.Error:
        return False

    tolerance = 1.5  # drift between time.time() and psutil create_time()
    return abs(recorded_start - create_time) <= tolerance


class RestartPolicy:
    """Track restart attempts and compute backoff delays."""

    _SCHEDULE = (0, 1, 5)

    def __init__(self) -> None:
        self.attempts = 0
        self.last_start = 0.0

    def record_start(self) -> None:
        self.last_start = time.time()

    def reset_attempts(self) -> None:
        self.attempts = 0

    def next_delay(self) -> int:
        delay = self._SCHEDULE[min(self.attempts, len(self._SCHEDULE) - 1)]
        self.attempts += 1
        return delay


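# Backoff sketch: consecutive crashes wait 0s, then 1s, then 5s for every
# later attempt, until handle_runner_exits() resets the counter after 60s
# of healthy uptime.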
_RESTART_POLICIES: dict[str, RestartPolicy] = {}


def _get_restart_policy(name: str) -> RestartPolicy:
    return _RESTART_POLICIES.setdefault(name, RestartPolicy())


def _launch_process(
    name: str,
    cmd: list[str],
    *,
    restart: bool = False,
    shutdown_timeout: int = 15,
    ref: str | None = None,
) -> RunnerManagedProcess:
    """Launch process with automatic output logging and restart policy tracking."""
    # NOTE: All child processes should include -v for verbose logging by default.
    # This ensures their output is captured in logs for debugging.
    policy: RestartPolicy | None = None
    if restart:
        policy = _get_restart_policy(name)

    # Generate ref if not provided
    ref = ref if ref else str(now_ms())

    # Use unified runner to spawn process (share supervisor's callosum)
    try:
        managed = RunnerManagedProcess.spawn(
            cmd, ref=ref, callosum=_supervisor_callosum
        )
    except RuntimeError as exc:
        logging.error(str(exc))
        raise

    if policy:
        policy.record_start()
    _SERVICE_STATE[name] = {
        "restart": restart,
        "shutdown_timeout": shutdown_timeout,
    }

    # Emit started event
    if _supervisor_callosum:
        _supervisor_callosum.emit(
            "supervisor",
            "started",
            service=name,
            pid=managed.process.pid,
            ref=managed.ref,
        )

    return managed


def _terminate_managed(
    managed: RunnerManagedProcess, timeout: float, *, reason: str
) -> None:
    logger.info("Terminating %s for %s", managed.name, reason)
    try:
        managed.terminate(timeout=timeout)
    except subprocess.TimeoutExpired:
        logger.warning(
            "%s did not terminate within %.1fs for %s",
            managed.name,
            timeout,
            reason,
        )


def _start_termination_thread(
    key: str, managed: RunnerManagedProcess, timeout: float, reason: str
) -> None:
    def run() -> None:
        try:
            _terminate_managed(managed, timeout, reason=reason)
        finally:
            with _termination_threads_lock:
                if _termination_threads.get(key) is threading.current_thread():
                    _termination_threads.pop(key, None)

    with _termination_threads_lock:
        existing = _termination_threads.get(key)
        if existing and existing.is_alive():
            return

        thread = threading.Thread(
            target=run,
            daemon=True,
            name=f"terminate-{key}",
        )
        _termination_threads[key] = thread
        thread.start()


def _stop_process(managed: RunnerManagedProcess) -> None:
    timeout = _SERVICE_STATE.get(managed.name, {}).get("shutdown_timeout", 15)
    _terminate_managed(managed, timeout, reason="shutdown")
    managed.cleanup()


def _record_scheduler_completion(
    scheduler_name: str,
    *,
    ended_at: float,
    exit_status: str,
    ref: str,
    cmd: list[str],
) -> None:
    """Record a scheduled task's completion result in health/scheduler.json."""
    health_dir = Path(get_journal()) / "health"
    health_dir.mkdir(parents=True, exist_ok=True)
    state_path = health_dir / "scheduler.json"
    with _SCHEDULER_JSON_LOCK:
        try:
            with open(state_path, "r", encoding="utf-8") as file:
                state = json.load(file)
        except FileNotFoundError:
            state = {}
        except (json.JSONDecodeError, OSError) as exc:
            logger.warning(
                "Failed to load scheduler state for completion write: %s", exc
            )
            state = {}

        current = state.get(scheduler_name)
        if not isinstance(current, dict):
            current = {}
        current.update(
            {
                "last_run": ended_at,
                "last_status": exit_status,
                "last_ref": ref,
            }
        )
        state[scheduler_name] = current

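        # Write-then-rename keeps scheduler.json atomic: readers never see a
        # partially written file, and the temp file is removed on failure.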
        fd, tmp_path = tempfile.mkstemp(
            dir=health_dir, suffix=".tmp", prefix=".scheduler_"
        )
        tmp_file = Path(tmp_path)
        try:
            with open(fd, "w", encoding="utf-8") as file:
                json.dump(state, file, indent=2)
            tmp_file.replace(state_path)
        except BaseException:
            tmp_file.unlink(missing_ok=True)
            raise


def _emit_queue_event(cmd_name: str, running_ref: str | None, queue: list) -> None:
    """Emit supervisor.queue event with current queue state for a command.

    This is the callback passed to TaskQueue for queue change notifications.
    """
    if not _supervisor_callosum:
        return

    _supervisor_callosum.emit(
        "supervisor",
        "queue",
        command=cmd_name,
        running=running_ref,
        queued=len(queue),
        queue=queue,
    )


def _maybe_submit_startup_digest(*, no_cortex: bool) -> None:
    """Submit the startup digest once when a local cortex substrate exists."""
    global _digest_submitted_this_boot

    if (
        _digest_submitted_this_boot
        or no_cortex
        or _is_remote_mode
        or _task_queue is None
    ):
        return

    _task_queue.submit(["sol", "call", "identity", "digest"])
    _digest_submitted_this_boot = True
    logging.info("startup: submitted identity digest")


def _handle_task_request(message: dict) -> None:
    """Handle incoming task request from Callosum."""
    if message.get("tract") != "supervisor" or message.get("event") != "request":
        return

    cmd = message.get("cmd")
    if not cmd:
        logging.error(f"Invalid task request: missing cmd: {message}")
        return

    ref = message.get("ref") or str(now_ms())
    day = message.get("day")
    scheduler_name = message.get("scheduler_name")
    if _task_queue:
        cmd_name = TaskQueue.get_command_name(cmd)
        active_ref = _task_queue.get_active_by_cmd_name(cmd_name)
        if active_ref:
            with _task_queue._lock:
                managed = _task_queue._active.get(active_ref)
                cap = _task_queue._caps.get(cmd_name, 0)
            runtime = time.time() - managed.start_time if managed else 0
            reason = "wedged" if cap and runtime > 2 * cap else "still_running"
            if _supervisor_callosum:
                _supervisor_callosum.emit(
                    "supervisor",
                    "skipped",
                    reason=reason,
                    ref=ref,
                    active_ref=active_ref,
                    cmd=cmd,
                    scheduler_name=scheduler_name,
                )
            return
        _task_queue.submit(cmd, ref, day=day, scheduler_name=scheduler_name)


def _restart_service(service: str) -> bool:
    """Terminate a managed service to trigger graceful restart.

    Returns True if the service was found and running, False if not found
    or already exited.
    """
    for proc in _managed_procs:
        if proc.name == service:
            if proc.process.poll() is not None:
                logging.debug(
                    f"Ignoring restart for {service}: already exited, awaiting auto-restart"
                )
                return False

            state = _SERVICE_STATE.setdefault(service, {})
            state["restart"] = True
            timeout = state.get("shutdown_timeout", 15)

            logging.info(f"Restart requested for {service}, terminating...")

            if _supervisor_callosum:
                _supervisor_callosum.emit(
                    "supervisor",
                    "restarting",
                    service=service,
                    pid=proc.process.pid,
                    ref=proc.ref,
                )

            _start_termination_thread(service, proc, timeout=timeout, reason="restart")
            return True

    logging.warning(f"Cannot restart {service}: not found in managed processes")
    return False


def _handle_supervisor_request(message: dict) -> None:
    """Handle incoming supervisor control messages."""
    if message.get("tract") != "supervisor" or message.get("event") != "restart":
        return

    service = message.get("service")
    if not service:
        logging.error("Invalid restart request: missing service")
        return
    if service == "supervisor":
        logging.debug("Ignoring restart request for supervisor itself")
        return

    _restart_service(service)


def get_task_status(ref: str) -> dict:
    """Get status of a task.

    Args:
        ref: Task correlation ID

    Returns:
        Dict with status info, or {"status": "not_found"} if task doesn't exist
    """
    if _task_queue:
        return _task_queue.get_status(ref)
    return {"status": "not_found"}


def collect_status(procs: list[RunnerManagedProcess]) -> dict:
    """Collect current supervisor status for broadcasting."""
    now = time.time()

    # Running services
    services = []
    running_names = set()
    for proc in procs:
        if proc.process.poll() is None:  # Still running
            policy = _get_restart_policy(proc.name)
            uptime = int(now - policy.last_start) if policy.last_start else 0
            services.append(
                {
                    "name": proc.name,
                    "ref": proc.ref,
                    "pid": proc.process.pid,
                    "uptime_seconds": uptime,
                }
            )
            running_names.add(proc.name)

    # Prepend supervisor itself
    if _supervisor_ref and _supervisor_start:
        services.insert(
            0,
            {
                "name": "supervisor",
                "ref": _supervisor_ref,
                "pid": os.getpid(),
                "uptime_seconds": int(now - _supervisor_start),
            },
        )

    # Crashed services (in restart backoff)
    crashed = []
    for name, policy in _RESTART_POLICIES.items():
        if name not in running_names and policy.attempts > 0:
            crashed.append(
                {
                    "name": name,
                    "restart_attempts": policy.attempts,
                }
            )

    # Running tasks
    tasks = _task_queue.collect_task_status() if _task_queue else []
    queues = _task_queue.collect_queue_counts() if _task_queue else {}

    # Scheduled tasks
    schedules = scheduler.collect_status()
    # Connected callosum clients
    callosum_clients = _callosum_server.client_count() if _callosum_server else 0

    return {
        "services": services,
        "crashed": crashed,
        "tasks": tasks,
        "queues": queues,
        "stale_heartbeats": [],
        "schedules": schedules,
        "callosum_clients": callosum_clients,
    }


def start_sense() -> RunnerManagedProcess:
    """Launch sol sense with output logging."""
    return _launch_process("sense", ["sol", "sense", "-v"], restart=True)


def start_callosum_in_process() -> CallosumServer:
    """Start Callosum message bus server in-process.

    Runs the server in a background thread and waits for socket to be ready.

    Returns:
        CallosumServer instance
    """
    global _callosum_server, _callosum_thread

    server = CallosumServer()
    _callosum_server = server

    # Pre-delete stale socket to avoid race condition where the ready check
    # passes due to an old socket file before the server thread deletes it
    socket_path = server.socket_path
    socket_path.parent.mkdir(parents=True, exist_ok=True)
    if socket_path.exists():
        socket_path.unlink()

    # Start server in background thread (server.start() is blocking)
    thread = threading.Thread(target=server.start, daemon=False, name="callosum-server")
    thread.start()
    _callosum_thread = thread

    # Wait for socket to be ready (with timeout)
    for _ in range(50):  # Wait up to 500ms
        if socket_path.exists():
            logging.info(f"Callosum server started on {socket_path}")
            return server
        time.sleep(0.01)

    raise RuntimeError("Callosum server failed to create socket within 500ms")


def wait_for_convey_ready(
    convey_mp, *, timeout: float = 30.0, interval: float = 0.1
) -> bool:
    """Poll until Convey accepts TCP connections, or fail fast on death/timeout."""
    start = time.monotonic()
    deadline = start + timeout
    while time.monotonic() < deadline:
        rc = convey_mp.process.poll()
        if rc is not None:
            logging.error(
                "Convey process exited during startup (rc=%d); continuing into supervise loop",
                rc,
            )
            return False
        if is_solstone_up(timeout=0.1):
            logging.info("Convey ready after %.1fs", time.monotonic() - start)
            return True
        time.sleep(interval)
    alive = convey_mp.process.poll() is None
    logging.error(
        "Convey not ready after %.1fs (port=%s, pid alive=%s); continuing into supervise loop",
        time.monotonic() - start,
        read_service_port("convey"),
        alive,
    )
    return False


def stop_callosum_in_process() -> None:
    """Stop the in-process Callosum server."""
    global _callosum_server, _callosum_thread

    if _callosum_server:
        logging.info("Stopping Callosum server...")
        _callosum_server.stop()

        if _callosum_thread:
            _callosum_thread.join(timeout=5)
            if _callosum_thread.is_alive():
                logging.warning("Callosum server thread did not stop cleanly")

    _callosum_server = None
    _callosum_thread = None


def start_cortex_server() -> RunnerManagedProcess:
    """Launch the Cortex WebSocket API server."""
    cmd = ["sol", "cortex", "-v"]
    return _launch_process("cortex", cmd, restart=True)


def start_link_server() -> RunnerManagedProcess:
    """Launch the link tunnel service (spl home-side endpoint)."""
    cmd = ["sol", "link", "-v"]
    return _launch_process("link", cmd, restart=True)


def start_convey_server(
    verbose: bool, debug: bool = False, port: int = 0
) -> tuple[RunnerManagedProcess, int]:
    """Launch the Convey web application with optional verbose and debug logging.

    Returns:
        Tuple of (RunnerManagedProcess, resolved_port) where resolved_port is the
        actual port being used (auto-selected if port was 0).
    """
    # Resolve port 0 to an available port before launching
    resolved_port = port if port != 0 else find_available_port()

    cmd = ["sol", "convey", "--port", str(resolved_port)]
    if debug:
        cmd.append("-d")
    elif verbose:
        cmd.append("-v")
    return _launch_process("convey", cmd, restart=True), resolved_port


def check_runner_exits(
    procs: list[RunnerManagedProcess],
) -> list[RunnerManagedProcess]:
    """Return managed processes that have exited."""
    exited: list[RunnerManagedProcess] = []
    for managed in procs:
        if managed.process.poll() is not None:
            exited.append(managed)
    return exited


async def handle_runner_exits(procs: list[RunnerManagedProcess]) -> None:
    """Check for and handle exited processes with restart policy."""
    exited = check_runner_exits(procs)
    if not exited:
        return

    exited_names = [managed.name for managed in exited]

    # Check if all exits are tempfail (session not ready)
    all_tempfail = all(m.process.returncode == EXIT_TEMPFAIL for m in exited)

    if all_tempfail:
        logging.info("Runner waiting for session: %s", ", ".join(sorted(exited_names)))
    else:
        msg = f"Runner process exited: {', '.join(sorted(exited_names))}"
        logging.error(msg)

    for managed in exited:
        returncode = managed.process.returncode
        is_tempfail = returncode == EXIT_TEMPFAIL
        logging.info("%s exited with code %s", managed.name, returncode)

        # Emit stopped event
        if _supervisor_callosum:
            _supervisor_callosum.emit(
                "supervisor",
                "stopped",
                service=managed.name,
                pid=managed.process.pid,
                ref=managed.ref,
                exit_code=returncode,
            )

        # Remove from procs list
        try:
            procs.remove(managed)
        except ValueError:
            pass

        managed.cleanup()

        # Handle restart if needed
        restart = _SERVICE_STATE.get(managed.name, {}).get("restart", False)
        if restart and not shutdown_requested:
            # Tempfail: use fixed longer delay, don't burn through backoff
            if is_tempfail:
                delay = TEMPFAIL_DELAY
            else:
                policy = _get_restart_policy(managed.name)
                uptime = time.time() - policy.last_start if policy.last_start else 0
                if uptime >= 60:
                    policy.reset_attempts()
                delay = policy.next_delay()
            if delay:
                logging.info("Waiting %ss before restarting %s", delay, managed.name)
                for _ in range(delay):
                    if shutdown_requested:
                        break
                    await asyncio.sleep(1)
            if shutdown_requested:
                continue
            logging.info("Restarting %s...", managed.name)
            try:
                state = _SERVICE_STATE.get(managed.name, {})
                new_proc = _launch_process(
                    managed.name,
                    managed.cmd,
                    restart=True,
                    shutdown_timeout=state.get("shutdown_timeout", 15),
                )
            except Exception as exc:
                logging.exception("Failed to restart %s: %s", managed.name, exc)
                continue

            procs.append(new_proc)
            logging.info("Restarted %s after exit code %s", managed.name, returncode)
        else:
            logging.info("Not restarting %s", managed.name)


def handle_daily_tasks() -> None:
    """Check for day change and submit daily think for updated days (non-blocking).

    Triggers once when the day rolls over at midnight. Queries ``updated_days()``
    for journal days that have new stream data but haven't completed a daily
    think yet, then submits up to ``MAX_UPDATED_CATCHUP`` thinks in chronological
    order (oldest first, yesterday last) via the TaskQueue.

    Think auto-detects updated state and enables ``--refresh`` internally, so we
    don't pass it here.

    Skipped in remote mode (no local data to process).
    """
    # Remote mode: no local processing, data is on the server
    if _is_remote_mode:
        return

    today = datetime.now().date()

    # Only trigger when day actually changes (at midnight)
    if today != _daily_state["last_day"]:
        # The day that just ended is what we process
        prev_day = _daily_state["last_day"]

        # Guard against None (e.g., module reloaded without going through main())
        if prev_day is None:
            logging.warning("Daily state not initialized, skipping daily processing")
            _daily_state["last_day"] = today
            return

        prev_day_str = prev_day.strftime("%Y%m%d")

        # Update state for new day
        _daily_state["last_day"] = today

        # Flush any dangling segment state from the previous day before daily think
        if not _flush_state["flushed"] and _flush_state["day"] == prev_day_str:
            _check_segment_flush(force=True)

        today_str = today.strftime("%Y%m%d")
        all_updated = updated_days(exclude={today_str})

        if not all_updated:
            logging.info("Day changed to %s, no updated days to process", today)
            return

        # Take the newest MAX_UPDATED_CATCHUP days (already sorted ascending)
        days_to_process = all_updated[-MAX_UPDATED_CATCHUP:]
        skipped = len(all_updated) - len(days_to_process)

        if skipped:
            logging.warning(
                "Skipping %d older updated days (max catchup %d): %s",
                skipped,
                MAX_UPDATED_CATCHUP,
                all_updated[:skipped],
            )

        logging.info(
            "Day changed to %s, queuing daily think for %d updated day(s): %s",
            today,
            len(days_to_process),
            days_to_process,
        )

        # Submit oldest-first so yesterday is processed last
        for day_str in days_to_process:
            cmd = ["sol", "think", "-v", "--day", day_str]
            if _task_queue:
                _task_queue.submit(cmd, day=day_str)
                logging.debug("Submitted daily think for %s", day_str)
            else:
                logging.warning(
                    "No task queue available for daily processing: %s", day_str
                )


def _handle_segment_observed(message: dict) -> None:
    """Handle segment completion events (from live observation or imports).

    Submits sol think in segment mode via task queue, which handles both
    generators and segment agents. Also updates flush state to track
    segment recency.
    """
    if message.get("tract") != "observe" or message.get("event") != "observed":
        return

    segment = message.get("segment")  # e.g., "163045_300"
    if not segment:
        logging.warning("observed event missing segment field")
        return

    # Use day from event payload, fallback to today (for live observation)
    day = message.get("day") or datetime.now().strftime("%Y%m%d")
    stream = message.get("stream")

    # Update flush state — new segment resets the flush timer
    _flush_state["last_segment_ts"] = time.time()
    _flush_state["day"] = day
    _flush_state["segment"] = segment
    _flush_state["stream"] = stream
    _flush_state["flushed"] = False

    logging.info(f"Segment observed: {day}/{segment}, submitting processing...")

    # Submit via task queue — serializes with other think invocations
    cmd = ["sol", "think", "-v", "--day", day, "--segment", segment]
    if stream:
        cmd.extend(["--stream", stream])
    if _task_queue:
        _task_queue.submit(cmd, day=day)
    else:
        logging.warning(
            "No task queue available for segment processing: %s/%s", day, segment
        )


def _check_segment_flush(force: bool = False) -> None:
    """Check if the last observed segment needs flushing.

    If no new segments have arrived within FLUSH_TIMEOUT seconds, runs
    ``sol think --flush`` on the last segment to let flush-enabled agents
    close out dangling state (e.g., end active activities).

    Args:
        force: Skip timeout check (used at day boundary to flush
            before daily think regardless of elapsed time).

    Skipped in remote mode (no local processing).
    """
    if _is_remote_mode:
        return

    last_ts = _flush_state["last_segment_ts"]
    if not last_ts or _flush_state["flushed"]:
        return

    if not force and time.time() - last_ts < FLUSH_TIMEOUT:
        return

    day = _flush_state["day"]
    segment = _flush_state["segment"]
    if not day or not segment:
        return

    _flush_state["flushed"] = True

    stream = _flush_state.get("stream")
    cmd = ["sol", "think", "-v", "--day", day, "--segment", segment, "--flush"]
    if stream:
        cmd.extend(["--stream", stream])
    if _task_queue:
        _task_queue.submit(cmd, day=day)
        logging.info(f"Queued segment flush: {day}/{segment}")
    else:
        logging.warning(
            "No task queue available for segment flush: %s/%s", day, segment
        )


def _handle_segment_event_log(message: dict) -> None:
    """Log observe, think, and activity events with day+segment to segment/events.jsonl.

    Any observe, think, or activity tract message with both day and segment fields
    gets logged to journal/day/segment/events.jsonl if that directory exists.
    """
    if message.get("tract") not in {"observe", "think", "activity"}:
        return

    day = message.get("day")
    segment = message.get("segment")

    if not day or not segment:
        return

    stream = message.get("stream")

    try:
        if stream:
            segment_dir = day_path(day, create=False) / stream / segment
        else:
            segment_dir = day_path(day, create=False) / segment

        # Only log if segment directory exists
        if not segment_dir.is_dir():
            return

        events_file = segment_dir / "events.jsonl"

        # Append event as JSON line
        with open(events_file, "a", encoding="utf-8") as f:
            f.write(json.dumps(message, ensure_ascii=False) + "\n")

    except Exception as e:
        logging.debug(f"Failed to log segment event: {e}")


def _handle_activity_recorded(message: dict) -> None:
    """Queue a per-activity think task when an activity is recorded.

    Listens for activity.recorded events and submits a queued think task
    for per-activity agent processing (serialized via TaskQueue).
    """
    if message.get("tract") != "activity" or message.get("event") != "recorded":
        return

    record_id = message.get("id")
    facet = message.get("facet")
    day = message.get("day")

    if not record_id or not facet or not day:
        logging.warning("activity.recorded event missing required fields")
        return

    cmd = ["sol", "think", "--activity", record_id, "--facet", facet, "--day", day]

    if _task_queue:
        _task_queue.submit(cmd, day=day)
        logging.info(f"Queued activity think: {record_id} for #{facet}")
    else:
        logging.warning("No task queue available for activity think: %s", record_id)


def _handle_think_daily_complete(message: dict) -> None:
    """Submit a heartbeat task after daily think processing completes.

    Listens for think.daily_complete events. Skips if a heartbeat process
    is already running (PID file guard).
    """
    if message.get("tract") != "think" or message.get("event") != "daily_complete":
        return

    # Check if heartbeat is already running via PID file
    pid_file = Path(get_journal()) / "health" / "heartbeat.pid"
    if pid_file.exists():
        try:
            existing_pid = int(pid_file.read_text().strip())
            os.kill(existing_pid, 0)
            logging.info("Heartbeat already running (pid=%d), skipping", existing_pid)
            return
        except ProcessLookupError:
            pass  # Stale PID file, proceed
        except PermissionError:
            logging.info(
                "Heartbeat running under different user (pid file exists), skipping"
            )
            return
        except ValueError:
            pass  # Corrupt PID file, proceed

    cmd = ["sol", "heartbeat"]
    if _task_queue:
        _task_queue.submit(cmd)
        logging.info("Queued heartbeat after daily think completion")
    else:
        logging.warning("No task queue available for heartbeat submission")


def _handle_callosum_message(message: dict) -> None:
    """Dispatch incoming Callosum messages to appropriate handlers."""
    _handle_task_request(message)
    _handle_supervisor_request(message)
    _handle_segment_observed(message)
    _handle_activity_recorded(message)
    _handle_think_daily_complete(message)
    _handle_segment_event_log(message)


async def supervise(
    *,
    daily: bool = True,
    schedule: bool = True,
    procs: list[RunnerManagedProcess] | None = None,
) -> None:
    """Main supervision loop. Runs at 1-second intervals for responsiveness.

    Monitors runner health, emits status, triggers daily processing,
    and checks scheduled agents.
    """
    last_status_emit = 0.0

    try:
        while (
            not shutdown_requested
        ):  # pragma: no cover - loop checked via unit tests by patching
            if _task_queue:
                _task_queue.enforce_deadlines(time.time())

            # Check for runner exits first (immediate alert)
            if procs:
                await handle_runner_exits(procs)

            # Emit status every 5 seconds
            now = time.time()
            if now - last_status_emit >= 5:
                if _supervisor_callosum and procs:
                    try:
                        status = collect_status(procs)
                        _supervisor_callosum.emit("supervisor", "status", **status)
                    except Exception as e:
                        logging.debug(f"Status emission failed: {e}")
                last_status_emit = now

            # Check for segment flush (non-blocking, submits via task queue)
            _check_segment_flush()

            # Check for daily processing (non-blocking, submits via task queue)
            if daily:
                handle_daily_tasks()

            # Check periodic task schedules (non-blocking, submits via callosum)
            if schedule:
                scheduler.check()
                routines.check()

            # Sleep 1 second before next iteration (responsive to shutdown)
            await asyncio.sleep(1)
    finally:
        pass  # Callosum cleanup happens in main()


def parse_args() -> argparse.ArgumentParser:
    """Build the supervisor argument parser."""
    parser = SupervisorArgumentParser(description="Monitor journaling health")
    parser.add_argument(
        "port",
        nargs="?",
        type=int,
        default=0,
        help="Convey port (0 = auto-select available port)",
    )
    parser.add_argument(
        "--threshold",
        type=int,
        default=DEFAULT_THRESHOLD,
        help="Seconds before heartbeat considered stale",
    )
    parser.add_argument(
        "--interval", type=int, default=CHECK_INTERVAL, help="Polling interval seconds"
    )
    parser.add_argument(
        "--no-daily",
        action="store_true",
        help="Disable daily processing run at midnight",
    )
    parser.add_argument(
        "--no-cortex",
        action="store_true",
        help="Do not start the Cortex server (run it manually for debugging)",
    )
    parser.add_argument(
        "--no-link",
        action="store_true",
        help="Do not start the link tunnel service",
    )
    parser.add_argument(
        "--no-convey",
        action="store_true",
        help="Do not start the Convey web application",
    )
    parser.add_argument(
        "--no-schedule",
        action="store_true",
        help="Disable periodic task scheduler",
    )
    parser.add_argument(
        "--remote",
        type=str,
        help="Remote mode: URL for segment transfer (not yet implemented)",
    )
    return parser


def handle_shutdown(signum, frame):
    """Handle shutdown signals gracefully."""
    global shutdown_requested
    if not shutdown_requested:  # Only log once
        shutdown_requested = True
        logging.info("Shutdown requested, cleaning up...")
    raise KeyboardInterrupt


def main() -> None:
    parser = parse_args()

    # Capture journal info BEFORE setup_cli() loads .env and pollutes os.environ
    journal_info = get_journal_info()

    args = setup_cli(parser)

    journal_path = _get_journal_path()

    log_level = logging.DEBUG if args.debug else logging.INFO
    log_path = journal_path / "health" / "supervisor.log"
    log_path.parent.mkdir(parents=True, exist_ok=True)
    logging.getLogger().handlers = []
    logging.basicConfig(
        level=log_level,
        handlers=[logging.FileHandler(log_path, encoding="utf-8")],
        format="%(asctime)s [supervisor:log] %(levelname)s %(message)s",
        datefmt="%Y-%m-%dT%H:%M:%S",
    )

    if args.verbose or args.debug:
        console_handler = logging.StreamHandler()
        console_handler.setLevel(log_level)
        console_handler.setFormatter(
            logging.Formatter("%(asctime)s %(levelname)s %(message)s")
        )
        logging.getLogger().addHandler(console_handler)

    # Singleton guard: only one supervisor per journal
    health_dir = journal_path / "health"
    lock_path = health_dir / "supervisor.lock"
    pid_path = health_dir / "supervisor.pid"
    import fcntl

    lock_fd = open(lock_path, "w")
    try:
        fcntl.flock(lock_fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
    except OSError:
        lock_fd.close()
        pid_str = ""
        try:
            pid_str = pid_path.read_text().strip()
        except OSError:
            pass
        pid_msg = f" (PID {pid_str})" if pid_str else ""
        sock_path = health_dir / "callosum.sock"
        if sock_path.exists():
            try:
                from think.health_cli import health_check

                print(f"Supervisor already running{pid_msg}\n")
                health_check()
            except Exception:
                print(f"Supervisor already running{pid_msg}")
        else:
            print(f"Supervisor already running{pid_msg}")
        sys.exit(1)
    pid_path.write_text(str(os.getpid()))
    start_time_path = health_dir / "supervisor.start_time"
    # Written here, not at _supervisor_start, to minimize drift from psutil create_time().
    start_time_path.write_text(str(time.time()))
    logging.info("Singleton lock acquired (PID %d)", os.getpid())
    _sweep_cgroup_at_startup()

    # Set up signal handlers
    signal.signal(signal.SIGINT, handle_shutdown)
    signal.signal(signal.SIGTERM, handle_shutdown)

    # Show journal path and source on startup
    path, source = journal_info
    print(f"Journal: {path} (from {source})")
    logging.info("Supervisor starting...")

    global _managed_procs, _supervisor_callosum, _is_remote_mode
    global _digest_submitted_this_boot
    global _task_queue
    procs: list[RunnerManagedProcess] = []
    convey_port = None

    # Remote mode: run sync instead of local processing
    _is_remote_mode = bool(args.remote)
    _digest_submitted_this_boot = False

    # Run pending journal-maintenance tasks before spawning any writer children.
    # Callosum isn't up yet (emit_fn=None); migrations log through supervisor's logger only.
    try:
        ran, succeeded = run_pending_tasks(journal_path, emit_fn=None)
        if ran > 0:
            print(f" Ran {ran} maintenance task(s)", flush=True)
            if ran == succeeded:
                logging.info("Completed %d/%d maintenance task(s)", succeeded, ran)
            else:
                logging.error(
                    "Maintenance tasks completed with failures: %d/%d succeeded",
                    succeeded,
                    ran,
                )
    except Exception:
        logging.exception("Maintenance runner raised; continuing startup")

    try:
        from think.importers.journal_archive import sweep_stale_extract_dirs

        swept = sweep_stale_extract_dirs()
        if swept > 0:
            logging.info("Swept %d stale journal-archive extract dir(s)", swept)
    except Exception:
        logging.exception("Journal archive extract sweep raised; continuing startup")

    # Start Callosum in-process first - it's the message bus that other services depend on
    try:
        print(" Starting Callosum bus...", flush=True)
        start_callosum_in_process()
    except RuntimeError as e:
        logging.error(f"Failed to start Callosum server: {e}")
        parser.error(f"Failed to start Callosum server: {e}")
        return

    # Connect supervisor's Callosum client to capture startup events from other services
    try:
        _supervisor_callosum = CallosumConnection(defaults={"rev": get_rev()})
        _supervisor_callosum.start(callback=_handle_callosum_message)
        logging.info("Supervisor connected to Callosum")
    except Exception as e:
        logging.warning(f"Failed to start Callosum connection: {e}")

    # Mirror supervisor log output to callosum logs tract (best-effort)
    supervisor_ref = str(now_ms())
    global _supervisor_ref, _supervisor_start
    _supervisor_ref = supervisor_ref
    _supervisor_start = time.time()
    if _supervisor_callosum:
        try:
            handler = CallosumLogHandler(_supervisor_callosum, supervisor_ref)
            handler.setFormatter(
                logging.Formatter("%(asctime)s %(levelname)s %(message)s")
            )
            logging.getLogger().addHandler(handler)
        except Exception:
            pass

    # Initialize task queue with callosum event callback
    _task_queue = TaskQueue(on_queue_change=_emit_queue_event, ready=False)

    # Now start other services (their startup events will be captured)
    if _is_remote_mode:
        # Remote mode: transfer send will be added here
        pass
    else:
        # Local mode: convey first, then sense for file processing
        os.environ["SOL_SUPERVISOR_SPAWNED"] = "1"
        if not args.no_convey:
            print(f" Starting convey on port {args.port}...", flush=True)
            proc, convey_port = start_convey_server(
                verbose=args.verbose, debug=args.debug, port=args.port
            )
            procs.append(proc)
            wait_for_convey_ready(proc)
            print(" Convey ready", flush=True)
        # Sense handles file processing
        print(" Starting sense...", flush=True)
        procs.append(start_sense())
        # Cortex for agent execution
        if not args.no_cortex:
            print(" Starting cortex...", flush=True)
            procs.append(start_cortex_server())
        # Link tunnel service (opt-out via --no-link)
        if not args.no_link:
            print(" Starting link...", flush=True)
            procs.append(start_link_server())

    # Make procs accessible to restart handler
    _managed_procs = procs

    # Initialize daily state to today - think only triggers at midnight when day changes
    _daily_state["last_day"] = datetime.now().date()

    # Initialize periodic task scheduler
    schedule_enabled = not args.no_schedule and not _is_remote_mode
    if schedule_enabled and _supervisor_callosum:
        scheduler.init(_supervisor_callosum)
        scheduler.register_defaults()
        if _task_queue:
            for cmd, seconds in scheduler.collect_runtime_caps():
                cmd_name = TaskQueue.get_command_name(cmd)
                _task_queue.set_cap(cmd_name, seconds)
                logging.info(
                    "Registered max_runtime cap for %s: %ss",
                    cmd_name,
                    seconds,
                )
        routines.init(_supervisor_callosum)

    if _task_queue:
        _task_queue.set_ready()
    _maybe_submit_startup_digest(no_cortex=args.no_cortex)

    # Show Convey URL if running
    if convey_port:
        print(f"Convey: http://localhost:{convey_port}/")

    logging.info(f"Started {len(procs)} processes, entering supervision loop")
    daily_enabled = not args.no_daily and not _is_remote_mode
    if daily_enabled:
        logging.info("Daily processing scheduled for midnight")

    # Startup catchup: submit thinks for days with pending stream data
    if daily_enabled:
        all_updated = updated_days()
        if all_updated:
            days_to_process = all_updated[-MAX_UPDATED_CATCHUP:]
            skipped = len(all_updated) - len(days_to_process)

            if skipped:
                logging.warning(
                    "Startup catchup: skipping %d older updated days (max %d): %s",
                    skipped,
                    MAX_UPDATED_CATCHUP,
                    all_updated[:skipped],
                )

            logging.info(
                "Startup catchup: submitted %d day(s) with pending stream data: %s",
                len(days_to_process),
                days_to_process,
            )

            for day_str in days_to_process:
                cmd = ["sol", "think", "-v", "--day", day_str]
                if _task_queue:
                    _task_queue.submit(cmd, day=day_str)
                    logging.debug("Startup catchup: submitted think for %s", day_str)
                else:
                    logging.warning(
                        "No task queue available for startup catchup: %s", day_str
                    )

    try:
        print(" Supervisor ready", flush=True)
        asyncio.run(
            supervise(
                daily=daily_enabled,
                schedule=schedule_enabled,
                procs=procs if procs else None,
            )
        )
    except KeyboardInterrupt:
        logging.info("Caught KeyboardInterrupt, shutting down...")
    finally:
        logging.info("Stopping all processes...")
        print("\nShutting down gracefully (this may take a moment)...", flush=True)

        if _task_queue:
            _task_queue.shutdown(timeout=10)

        # Stop services in reverse order
        for managed in reversed(procs):
            _stop_process(managed)

        if schedule_enabled:
            try:
                routines.save_state()
            except Exception as exc:
                logging.warning("Failed to save routines state on shutdown: %s", exc)

        # Disconnect supervisor's Callosum connection
        if _supervisor_callosum:
            _supervisor_callosum.stop()
            logging.info("Supervisor disconnected from Callosum")

        # Stop in-process Callosum server last
        stop_callosum_in_process()

        logging.info("Supervisor shutdown complete.")
        print("Shutdown complete.", flush=True)


if __name__ == "__main__":
    main()