personal memory agent
1# SPDX-License-Identifier: AGPL-3.0-only
2# Copyright (c) 2026 sol pbc
3
4"""CLI for service health status and logs.
5
6Usage:
7 sol health Show current service health status
8 sol health logs View service health logs
9"""
10
11from __future__ import annotations
12
13import argparse
14import sys
15import threading
16from datetime import timedelta
17from pathlib import Path
18from typing import Any
19
20from think.callosum import CallosumConnection
21from think.utils import get_journal, setup_cli
22
# Seconds health_check() waits for a supervisor status event before giving up.
STATUS_TIMEOUT = 10
24
25
def format_uptime(seconds: int) -> str:
    """Render an uptime given in seconds as a compact string.

    Values under one minute are shown as ``"<n>s"``. Anything longer is
    broken into days, hours and minutes (``"1d 2h 3m"``); zero-valued
    components are omitted and leftover seconds are not shown.
    """
    if seconds < 60:
        return f"{seconds}s"

    span = timedelta(seconds=seconds)
    # timedelta.seconds is the sub-day remainder, so split it into h/m here.
    whole_hours, leftover = divmod(span.seconds, 3600)
    components = (
        (span.days, "d"),
        (whole_hours, "h"),
        (leftover // 60, "m"),
    )
    return " ".join(f"{amount}{suffix}" for amount, suffix in components if amount)
45
46
def print_status(status: dict[str, Any]) -> None:
    """Print supervisor status in a human-readable format.

    Reads the ``services``, ``crashed``, ``tasks``, ``queues``,
    ``stale_heartbeats`` and ``callosum_clients`` keys of *status* and writes
    a multi-section report to stdout. The list/dict-valued keys are all
    optional: a missing key and a key whose value is ``None`` are both
    treated as empty.

    Args:
        status: Status payload; each service/task/crash entry is expected to
            be a mapping supporting ``.get``.
    """
    print("Services:")
    # Use ``or []`` (not a .get default) so an explicit null value does not
    # break iteration -- same guard the other sections use.
    for service in status.get("services") or []:
        name = service.get("name", "?")
        pid = service.get("pid", "?")
        # ``or 0`` also covers an uptime that is present but None.
        uptime_seconds = int(service.get("uptime_seconds", 0) or 0)
        print(f" {name:16} pid {pid} uptime {format_uptime(uptime_seconds)}")

    crashed = status.get("crashed") or []
    if crashed:
        print()
        print("Crashed:")
        for service in crashed:
            name = service.get("name", "?")
            attempts = service.get("restart_attempts", 0)
            print(f" {name:16} {attempts} restart attempts")

    print()
    tasks = status.get("tasks") or []
    queues = status.get("queues") or {}
    # Only queues with a non-zero count are shown, in name order.
    non_zero_queues = [(name, count) for name, count in sorted(queues.items()) if count]

    if tasks or non_zero_queues:
        print("Tasks:")
        for task in tasks:
            name = task.get("name", "?")
            duration = task.get("duration_seconds", 0)
            print(f" {name:16} {duration}s")
        for name, count in non_zero_queues:
            print(f" queued {name:9} {count}")
    else:
        print("Tasks: none")

    stale = status.get("stale_heartbeats") or []
    if stale:
        # Stale heartbeats get a blank line above them; the ok case does not.
        print()
        print(f"Heartbeat: STALE ({', '.join(stale)})")
    else:
        print("Heartbeat: ok")
    callosum_clients = status.get("callosum_clients", 0)
    print(f"Callosum: {callosum_clients} clients")
93
94
def health_check() -> int:
    """Wait for one supervisor status message and print it.

    Connects to the callosum socket under the journal's ``health`` directory,
    waits up to ``STATUS_TIMEOUT`` seconds for a message whose ``tract`` is
    ``"supervisor"`` and ``event`` is ``"status"``, then prints it with
    ``print_status``.

    Returns:
        0 when a status was received and printed; 1 when the socket file is
        missing or no status arrived in time (an error goes to stderr).
    """
    socket_file = Path(get_journal()) / "health" / "callosum.sock"
    if not socket_file.exists():
        print(
            f"Cannot connect: callosum socket not found at {socket_file}",
            file=sys.stderr,
        )
        return 1

    arrived = threading.Event()
    snapshot: dict[str, Any] | None = None

    def on_message(message: dict[str, Any]) -> None:
        nonlocal snapshot
        # Ignore everything except supervisor status events.
        if message.get("tract") != "supervisor" or message.get("event") != "status":
            return
        # Store the payload before signalling so the waiter always sees it.
        snapshot = message
        arrived.set()

    connection = CallosumConnection(socket_path=socket_file)
    connection.start(callback=on_message)
    try:
        received = arrived.wait(timeout=STATUS_TIMEOUT)
    finally:
        # Always shut the connection down, even if the wait is interrupted.
        connection.stop()

    if received:
        print_status(snapshot)
        return 0

    print(
        f"Timed out waiting for supervisor status ({STATUS_TIMEOUT:g}s)",
        file=sys.stderr,
    )
    return 1
129
130
def main() -> None:
    """Entry point for ``sol health``.

    With ``logs`` as the first argument, control is handed to the logs CLI
    (imported lazily) after rewriting ``sys.argv`` so that CLI sees
    ``"sol health logs"`` as its program name. Otherwise the status check
    runs and the process exits with its return code.
    """
    if sys.argv[1:2] == ["logs"]:
        # Drop the "logs" token and fold it into argv[0] for the delegate.
        sys.argv = ["sol health logs", *sys.argv[2:]]
        from think.logs_cli import main as logs_main

        logs_main()
        return

    description = "\n".join(
        (
            "Show service health status.",
            "",
            "Subcommands:",
            " logs View service health logs (sol health logs -h for details)",
        )
    )
    parser = argparse.ArgumentParser(
        prog="sol health",
        description=description,
        # Raw formatter keeps the hand-laid-out description line breaks.
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    # NOTE(review): setup_cli presumably parses argv and applies shared CLI
    # setup -- confirm against think.utils.
    setup_cli(parser)
    exit_code = health_check()
    sys.exit(exit_code)
152
153
# Run the CLI only when this module is executed as a script, not on import.
if __name__ == "__main__":
    main()