personal memory agent
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

Replace filesystem-based health monitoring with Callosum events

Replace the .up file heartbeat mechanism with health derivation from
observe.status Callosum events. The supervisor now tracks the last
status event and derives health signals:

- hear: healthy if status received within threshold (audio always runs)
- see: healthy if user idle OR (recording AND files_growing)

Changes:
- Add files_growing field to observe.status screencast info
- Add _observe_status_state tracking in supervisor
- Rewrite check_health() to use in-memory state vs filesystem
- Remove touch_health() function and all callers
- Update tests for new message-based health checking
- Update docs (DOCTOR.md, CALLOSUM.md, doctor.txt agent prompt)

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

+158 -82
+2
docs/CALLOSUM.md
··· 58 58 **Fields:** 59 59 - `status`: Periodic state (every 5s while running) 60 60 - From `observer.py`: `screencast`, `audio`, `activity` - Live capture state 61 + - `screencast.files_growing` - Whether recording files are actively being written (used for health) 61 62 - From `sense.py`: `describe`, `transcribe` - Processing pipeline state (with `running`/`queued` sub-fields) 62 63 - `observing`: `segment`, `files` - Recording window boundary crossed with saved files 63 64 - `detected`: `file`, `handler`, `ref` - File detected and handler spawned 64 65 - `described`/`transcribed`: `input`, `output`, `duration_ms` - Processing complete 65 66 - `observed`: `segment`, `duration` - All files for segment fully processed 66 67 **Purpose:** Track observation pipeline from live capture state through processing completion 68 + **Health Derivation:** Supervisor derives `see`/`hear` health from `observe.status` event recency and content 67 69 **Path Format:** Relative to `JOURNAL_PATH` (e.g., `20251102/163045_300_center_DP-3_screen.webm` for multi-monitor recordings) 68 70 **Correlation:** `detected.ref` matches `logs.exec.ref` for the same handler process; `observed.segment` groups all files from same capture window 69 71
+13 -14
docs/DOCTOR.md
··· 18 18 # Check if supervisor services are running 19 19 pgrep -af "observe-gnome|observe-sense|think-supervisor" 20 20 21 - # Check heartbeat freshness (should be recent) 22 - ls -la $JOURNAL_PATH/health/*.up 23 - 24 21 # Check Callosum socket exists 25 22 ls -la $JOURNAL_PATH/health/callosum.sock 26 23 ··· 30 27 31 28 **Healthy state:** 32 29 - All three processes running 33 - - `.up` files modified within last 60 seconds 34 30 - `callosum.sock` exists 31 + - `supervisor.status` events show no stale heartbeats 35 32 - No `_active.jsonl` files older than a few minutes 36 33 37 34 --- ··· 75 72 76 73 ## Health Signals 77 74 78 - ### Heartbeat Files 75 + Health is derived from `observe.status` Callosum events (emitted every 5 seconds): 79 76 80 - | File | Updated by | Meaning | 81 - |------|------------|---------| 82 - | `health/see.up` | Observer | Screen capture active | 83 - | `health/hear.up` | Observer | Audio capture active | 77 + | Signal | Healthy when | Stale when | 78 + |--------|--------------|------------| 79 + | `hear` | Status received within threshold | No status for 60+ seconds | 80 + | `see` | User idle OR (recording AND files growing) | User active AND (not recording OR files not growing) | 84 81 85 - Staleness threshold: 60 seconds (configurable). Supervisor checks these and alerts if stale. 82 + Staleness threshold: 60 seconds (configurable via `--threshold`). 86 83 87 84 ### Callosum Status Events 88 85 89 86 Services emit periodic status to Callosum (every 5 seconds when active): 90 87 91 - - `observe.status` - Capture state (screencast, audio, activity) 88 + - `observe.status` - Capture state (screencast, audio, activity, files_growing) 92 89 - `cortex.status` - Running agents list 93 90 - `supervisor.status` - Service health, stale heartbeats 91 + 92 + The supervisor derives health from `observe.status` events and includes `stale_heartbeats` in its own status. 94 93 95 94 See [CALLOSUM.md](CALLOSUM.md) Tract Registry for event schemas. 96 95 ··· 132 131 ### Observer not capturing 133 132 134 133 ```bash 135 - # Check heartbeats 136 - ls -la $JOURNAL_PATH/health/*.up 137 - 138 134 # Check observer log for errors 139 135 tail -50 $JOURNAL_PATH/health/observe-gnome.log | grep -i error 136 + 137 + # Check if observer is emitting status (supervisor.status will show stale_heartbeats) 138 + # Health is derived from observe.status Callosum events 140 139 ``` 141 140 142 141 Causes: DBus issues, screencast permissions, audio device unavailable.
+5 -6
muse/agents/doctor.txt
··· 26 26 ``` 27 27 ./ 28 28 ├── health/ # Service health and logs 29 - │ ├── *.up # Heartbeat files 30 29 │ ├── *.log # Service log symlinks 31 30 │ └── callosum.sock # Message bus socket 32 31 ├── agents/ # Agent execution logs ··· 45 44 46 45 ### Quick Health Check 47 46 1. Check if supervisor services are running: `pgrep -af "observe-gnome|observe-sense|think-supervisor"` 48 - 2. Check heartbeat freshness: `ls -la health/*.up` 49 - 3. Check Callosum socket exists: `ls -la health/callosum.sock` 50 - 4. Check for stuck agents: `ls agents/*_active.jsonl 2>/dev/null` 47 + 2. Check Callosum socket exists: `ls -la health/callosum.sock` 48 + 3. Check for stuck agents: `ls agents/*_active.jsonl 2>/dev/null` 49 + 4. Check observer log for recent activity: `tail -20 health/observe-gnome.log` 51 50 52 51 **Healthy state:** 53 52 - All three processes running 54 - - `.up` files modified within last 60 seconds 55 53 - `callosum.sock` exists 54 + - Observer log shows recent status emissions (health derived from Callosum events) 56 55 - No `_active.jsonl` files older than a few minutes 57 56 58 57 ### Service Status ··· 70 69 ### Common Issues 71 70 72 71 **Observer not capturing:** 73 - - Check heartbeats: `ls -la health/*.up` 74 72 - Check log for errors: `tail -50 health/observe-gnome.log | grep -i error` 73 + - Check for recent status emissions in log (health is derived from Callosum events) 75 74 - Causes: DBus issues, screencast permissions, audio device unavailable 76 75 77 76 **Agent appears stuck:**
+12 -15
observe/gnome/observer.py
··· 29 29 from observe.gnome.screencast import Screencaster, StreamInfo 30 30 from observe.hear import AudioRecorder 31 31 from think.callosum import CallosumConnection 32 - from think.utils import day_path, setup_cli, touch_health 32 + from think.utils import day_path, setup_cli 33 33 34 34 logger = logging.getLogger(__name__) 35 35 ··· 72 72 73 73 # Mute state at segment start (determines save format) 74 74 self.segment_is_muted = False 75 + 76 + # Health tracking - whether screencast files are actively growing 77 + self.files_growing = False 75 78 76 79 async def setup(self): 77 80 """Initialize audio devices and DBus connection.""" ··· 348 351 "recording": True, 349 352 "streams": streams_info, 350 353 "window_elapsed_seconds": elapsed, 354 + "files_growing": self.files_growing, 351 355 } 352 356 else: 353 - screencast_info = {"recording": False} 357 + screencast_info = {"recording": False, "files_growing": False} 354 358 355 359 # Audio info 356 360 audio_info = { ··· 494 498 ) 495 499 await self.handle_boundary(is_active) 496 500 497 - # Touch health for audio processing (always) 498 - touch_health("hear") 499 - 500 - # Touch health for screencast based on file existence and growth 501 - if not is_active: 502 - # Healthy not to record when idle/locked 503 - touch_health("see") 504 - elif self.screencast_running and self.current_streams: 505 - # Check if ANY recording file exists and is growing 501 + # Check if screencast files are actively growing (for health reporting) 502 + if self.screencast_running and self.current_streams: 506 503 any_growing = False 507 504 for stream in self.current_streams: 508 505 if os.path.exists(stream.temp_path): ··· 511 508 if current_size > last_size: 512 509 any_growing = True 513 510 self.last_screencast_sizes[stream.temp_path] = current_size 514 - 515 - if any_growing: 516 - touch_health("see") 511 + self.files_growing = any_growing 512 + else: 513 + self.files_growing = False 517 514 518 - # Emit status event 515 + # Emit status event (supervisor derives health from this) 519 516 self.emit_status() 520 517 521 518 # Cleanup on exit
+2 -5
observe/macos/TODO.md
··· 176 176 - Calculate elapsed time since window start (monotonic) 177 177 - Check for boundary: `elapsed >= self.interval or activation_edge` 178 178 - If boundary, call `handle_boundary(is_active)` 179 - - Touch health files: 180 - - `touch_health("hear")` - audio always captured when recording 181 - - `touch_health("see")` - video captured when active 182 - - Verify video file is growing if capturing (health check) 183 - - Emit status event 179 + - Track if capture files are growing (for health reporting via status event) 180 + - Emit status event with `screencast.files_growing` field (supervisor derives health from this) 184 181 - [ ] Call `shutdown()` after loop exits 185 182 186 183 ### 3.8 Implement `shutdown()`
+1 -1
observe/macos/observer.py
··· 23 23 ) 24 24 from observe.macos.screencapture import ScreenCaptureKitManager 25 25 from think.callosum import CallosumConnection 26 - from think.utils import day_path, setup_cli, touch_health 26 + from think.utils import day_path, setup_cli 27 27 28 28 logger = logging.getLogger(__name__) 29 29
+68 -11
tests/test_supervisor.py
··· 8 8 import pytest 9 9 10 10 11 - def test_check_health(tmp_path, monkeypatch): 11 + def test_check_health(): 12 + """Test health checking based on observe.status events.""" 12 13 mod = importlib.import_module("think.supervisor") 13 - health = tmp_path / "health" 14 - health.mkdir() 15 - for name in ("see.up", "hear.up"): 16 - (health / name).write_text("x") 17 - monkeypatch.setenv("JOURNAL_PATH", str(tmp_path)) 18 - assert mod.check_health(threshold=90) == [] 14 + 15 + # Reset state for clean test 16 + mod._observe_status_state["last_ts"] = 0.0 17 + mod._observe_status_state["activity_active"] = False 18 + mod._observe_status_state["screencast_recording"] = False 19 + mod._observe_status_state["files_growing"] = False 20 + 21 + # No status received yet - both should be stale 22 + stale = mod.check_health(threshold=60) 23 + assert sorted(stale) == ["hear", "see"] 24 + 25 + # Simulate receiving a status event (user inactive) 26 + mod._observe_status_state["last_ts"] = time.time() 27 + mod._observe_status_state["activity_active"] = False 28 + stale = mod.check_health(threshold=60) 29 + assert stale == [] # Healthy - inactive means OK not to record 30 + 31 + # User active but not recording - see should be stale 32 + mod._observe_status_state["activity_active"] = True 33 + mod._observe_status_state["screencast_recording"] = False 34 + stale = mod.check_health(threshold=60) 35 + assert stale == ["see"] 36 + 37 + # User active, recording, but files not growing - see should be stale 38 + mod._observe_status_state["screencast_recording"] = True 39 + mod._observe_status_state["files_growing"] = False 40 + stale = mod.check_health(threshold=60) 41 + assert stale == ["see"] 42 + 43 + # User active, recording, files growing - all healthy 44 + mod._observe_status_state["files_growing"] = True 45 + stale = mod.check_health(threshold=60) 46 + assert stale == [] 19 47 20 - old = time.time() - 100 21 - for hb in health.iterdir(): 22 - os.utime(hb, (old, old)) 23 - stale = mod.check_health(threshold=90) 48 + # Status became stale (old timestamp) 49 + mod._observe_status_state["last_ts"] = time.time() - 100 50 + stale = mod.check_health(threshold=60) 24 51 assert sorted(stale) == ["hear", "see"] 52 + 53 + 54 + def test_handle_observe_status(): 55 + """Test that observe.status events update health state.""" 56 + mod = importlib.import_module("think.supervisor") 57 + 58 + # Reset state 59 + mod._observe_status_state["last_ts"] = 0.0 60 + mod._observe_status_state["activity_active"] = False 61 + mod._observe_status_state["screencast_recording"] = False 62 + mod._observe_status_state["files_growing"] = False 63 + 64 + # Simulate observe.status message 65 + message = { 66 + "tract": "observe", 67 + "event": "status", 68 + "activity": {"active": True}, 69 + "screencast": {"recording": True, "files_growing": True}, 70 + } 71 + mod._handle_observe_status(message) 72 + 73 + assert mod._observe_status_state["last_ts"] > 0 74 + assert mod._observe_status_state["activity_active"] is True 75 + assert mod._observe_status_state["screencast_recording"] is True 76 + assert mod._observe_status_state["files_growing"] is True 77 + 78 + # Non-observe messages should be ignored 79 + old_ts = mod._observe_status_state["last_ts"] 80 + mod._handle_observe_status({"tract": "supervisor", "event": "status"}) 81 + assert mod._observe_status_state["last_ts"] == old_ts 25 82 26 83 27 84 @pytest.mark.asyncio
+55 -12
think/supervisor.py
··· 100 100 # Restart request tracking for SIGKILL enforcement 101 101 _restart_requests: dict[str, tuple[float, subprocess.Popen]] = {} 102 102 103 + # Observe status state for health monitoring (updated from observe.status events) 104 + _observe_status_state: dict = { 105 + "last_ts": 0.0, # Timestamp of last observe.status event 106 + "activity_active": False, # Whether user is active (not idle/locked) 107 + "screencast_recording": False, # Whether screencast is running 108 + "files_growing": False, # Whether screencast files are growing 109 + } 110 + 103 111 104 112 def _get_journal_path() -> Path: 105 113 journal = os.getenv("JOURNAL_PATH") ··· 206 214 207 215 208 216 def check_health(threshold: int = DEFAULT_THRESHOLD) -> list[str]: 209 - """Return a list of stale heartbeat names.""" 217 + """Return a list of stale heartbeat names based on observe.status events. 218 + 219 + Health is derived from the last observe.status Callosum event: 220 + - hear: Stale if no status received within threshold 221 + - see: Stale if active AND (not recording OR files not growing) 222 + """ 210 223 now = time.time() 211 224 stale: list[str] = [] 212 - health_dir = _get_journal_path() / "health" 213 - for name in ("see", "hear"): 214 - path = health_dir / f"{name}.up" 215 - try: 216 - mtime = path.stat().st_mtime 217 - except FileNotFoundError: 218 - stale.append(name) 219 - continue 220 - if now - mtime > threshold: 221 - stale.append(name) 225 + 226 + last_ts = _observe_status_state["last_ts"] 227 + status_age = now - last_ts 228 + 229 + # If no recent status, both are stale 230 + if status_age > threshold: 231 + return ["hear", "see"] 232 + 233 + # hear is healthy if we're receiving status events (audio always runs) 234 + # (already passed the threshold check above) 235 + 236 + # see health depends on activity state 237 + activity_active = _observe_status_state["activity_active"] 238 + screencast_recording = _observe_status_state["screencast_recording"] 239 + files_growing = _observe_status_state["files_growing"] 240 + 241 + if activity_active: 242 + # User is active - screencast should be recording and files growing 243 + if not screencast_recording or not files_growing: 244 + stale.append("see") 245 + # If not active (idle/locked/power-save), it's healthy not to record 246 + 222 247 return stale 223 248 224 249 ··· 1005 1030 ) 1006 1031 1007 1032 1033 + def _handle_observe_status(message: dict) -> None: 1034 + """Handle observe.status events for health monitoring.""" 1035 + if message.get("tract") != "observe" or message.get("event") != "status": 1036 + return 1037 + 1038 + # Update observe status state for health checking 1039 + _observe_status_state["last_ts"] = time.time() 1040 + 1041 + activity = message.get("activity", {}) 1042 + _observe_status_state["activity_active"] = activity.get("active", False) 1043 + 1044 + screencast = message.get("screencast", {}) 1045 + _observe_status_state["screencast_recording"] = screencast.get("recording", False) 1046 + _observe_status_state["files_growing"] = screencast.get("files_growing", False) 1047 + 1048 + 1008 1049 def _handle_callosum_message(message: dict) -> None: 1009 1050 """Dispatch incoming Callosum messages to appropriate handlers.""" 1010 1051 _handle_task_request(message) 1011 1052 _handle_supervisor_request(message) 1012 1053 _handle_segment_observed(message) 1054 + _handle_observe_status(message) 1013 1055 1014 1056 1015 1057 async def supervise( ··· 1019 1061 daily: bool = True, 1020 1062 procs: list[ManagedProcess] | None = None, 1021 1063 ) -> None: 1022 - """Monitor heartbeat files and alert when they become stale. 1064 + """Monitor health via Callosum events and alert when stale. 1023 1065 1066 + Health is derived from observe.status events (see check_health()). 1024 1067 Main supervision loop runs at 1-second intervals for responsiveness. 1025 1068 Subsystems manage their own timing (health checks every interval seconds, 1026 1069 scheduled agents check continuously but only advance when ready).
-18
think/utils.py
··· 462 462 return f"{segment_count} segments, {duration_str}" 463 463 464 464 465 - def touch_health(name: str) -> None: 466 - """Update the journal's ``name`` heartbeat file. 467 - 468 - The journal path is read from ``JOURNAL_PATH`` in the environment. 469 - """ 470 - load_dotenv() 471 - journal = os.getenv("JOURNAL_PATH") 472 - if not journal: 473 - return 474 - path = Path(journal) / "health" / f"{name}.up" 475 - try: 476 - path.parent.mkdir(parents=True, exist_ok=True) 477 - path.touch() 478 - logging.getLogger(__name__).debug(f"Health touched: {name}") 479 - except Exception: 480 - pass 481 - 482 - 483 465 def setup_cli(parser: argparse.ArgumentParser, *, parse_known: bool = False): 484 466 """Parse command line arguments and configure logging. 485 467