personal memory agent
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

dual-observer orchestration: independent linux/tmux observer processes

Replace the supervisor's single observer with two independently managed
processes (linux-observer, tmux-observer). Add an --observers flag for
selective startup, reorder boot so convey starts before the observers,
implement per-observer health monitoring keyed by the stream name from
observe.status events, and update graceful shutdown with a universal
2-second drain pause after the observers stop.

+282 -161
+2
tests/conftest.py
··· 267 267 # Reset after test 268 268 mod._daily_state["last_day"] = None 269 269 mod._is_remote_mode = False 270 + mod._observer_health = {} 271 + mod._enabled_observers = set() 270 272 # Create fresh task queue 271 273 mod._task_queue = mod.TaskQueue(on_queue_change=None) 272 274 except ImportError:
+158 -89
tests/test_supervisor.py
··· 11 11 12 12 13 13 def test_check_health(): 14 - """Test health checking based on observe.status event freshness. 15 - 16 - Health model is simple: if observer is running, it sends status events. 17 - If it has problems, it exits and supervisor restarts it (fail-fast). 18 - """ 14 + """Test per-observer health checking based on observe.status event freshness.""" 19 15 mod = importlib.import_module("think.supervisor") 20 16 21 - # Ensure observer is enabled for this test 22 - mod._observer_enabled = True 17 + mod._enabled_observers = {"linux-observer", "tmux-observer"} 18 + mod._observer_health = {} 23 19 24 - # Reset state for clean test 25 - mod._observe_status_state["last_ts"] = 0.0 26 - mod._observe_status_state["ever_received"] = False 20 + # No status events yet - grace period, returns empty 21 + stale = mod.check_health(threshold=60) 22 + assert stale == [] 27 23 28 - # Startup grace period: no status ever received - returns healthy (no alerts) 24 + # Simulate first status from linux observer 25 + mod._observer_health["archon"] = {"last_ts": time.time(), "ever_received": True} 29 26 stale = mod.check_health(threshold=60) 30 - assert stale == [] # Grace period - don't alert until first status received 27 + assert stale == [] 31 28 32 - # After first status received, stale timestamp triggers alerts 33 - mod._observe_status_state["ever_received"] = True 34 - mod._observe_status_state["last_ts"] = 0.0 # Very old timestamp 29 + # Linux observer goes stale 30 + mod._observer_health["archon"]["last_ts"] = time.time() - 100 35 31 stale = mod.check_health(threshold=60) 36 - assert sorted(stale) == ["hear", "see"] 32 + assert stale == ["archon"] 37 33 38 - # Fresh status event means healthy (observer is running) 39 - mod._observe_status_state["last_ts"] = time.time() 34 + # Add tmux observer - fresh 35 + mod._observer_health["archon.tmux"] = { 36 + "last_ts": time.time(), 37 + "ever_received": True, 38 + } 40 39 stale = mod.check_health(threshold=60) 41 - assert 
stale == [] # Healthy - receiving status events 40 + assert stale == ["archon"] 42 41 43 - # Status became stale (old timestamp) - observer stopped sending 44 - mod._observe_status_state["last_ts"] = time.time() - 100 42 + # Both stale 43 + mod._observer_health["archon.tmux"]["last_ts"] = time.time() - 100 44 + stale = mod.check_health(threshold=60) 45 + assert sorted(stale) == ["archon", "archon.tmux"] 46 + 47 + # Both fresh 48 + mod._observer_health["archon"]["last_ts"] = time.time() 49 + mod._observer_health["archon.tmux"]["last_ts"] = time.time() 45 50 stale = mod.check_health(threshold=60) 46 - assert sorted(stale) == ["hear", "see"] 51 + assert stale == [] 47 52 48 53 49 54 def test_check_health_observer_disabled(monkeypatch): 50 - """Test that health checks are skipped when observer is disabled (--no-observers).""" 55 + """Test that health checks are skipped when no observers are enabled.""" 51 56 mod = importlib.import_module("think.supervisor") 52 57 53 - # Simulate --no-observers mode (monkeypatch auto-restores after test) 54 - monkeypatch.setattr(mod, "_observer_enabled", False) 58 + monkeypatch.setattr(mod, "_enabled_observers", set()) 55 59 56 - # Even with stale status, should return empty (no health alerts) 57 - mod._observe_status_state["ever_received"] = True 58 - mod._observe_status_state["last_ts"] = 0.0 # Very stale 60 + # Even with stale health entries, should return empty 61 + mod._observer_health["archon"] = {"last_ts": 0.0, "ever_received": True} 59 62 stale = mod.check_health(threshold=60) 60 - assert stale == [] # No alerts when observer disabled 63 + assert stale == [] 61 64 62 65 63 66 def test_handle_observe_status(): 64 - """Test that observe.status events update health state. 65 - 66 - Handler just tracks event freshness - observer is responsible for 67 - exiting if it's unhealthy (fail-fast model). 
68 - """ 67 + """Test that observe.status events update per-observer health state.""" 69 68 mod = importlib.import_module("think.supervisor") 70 69 71 - # Reset state 72 - mod._observe_status_state["last_ts"] = 0.0 73 - mod._observe_status_state["ever_received"] = False 70 + mod._observer_health = {} 74 71 75 - # Simulate observe.status message (only tract/event are required) 76 - message = {"tract": "observe", "event": "status"} 77 - mod._handle_observe_status(message) 72 + # Status with stream field creates health entry 73 + mod._handle_observe_status( 74 + {"tract": "observe", "event": "status", "stream": "archon"} 75 + ) 76 + assert "archon" in mod._observer_health 77 + assert mod._observer_health["archon"]["last_ts"] > 0 78 + assert mod._observer_health["archon"]["ever_received"] is True 78 79 79 - assert mod._observe_status_state["last_ts"] > 0 80 - assert mod._observe_status_state["ever_received"] is True # Grace period ended 80 + # Second stream creates separate entry 81 + mod._handle_observe_status( 82 + {"tract": "observe", "event": "status", "stream": "archon.tmux"} 83 + ) 84 + assert "archon.tmux" in mod._observer_health 85 + assert mod._observer_health["archon.tmux"]["ever_received"] is True 81 86 82 87 # Non-observe messages should be ignored 83 - old_ts = mod._observe_status_state["last_ts"] 84 - mod._handle_observe_status({"tract": "supervisor", "event": "status"}) 85 - assert mod._observe_status_state["last_ts"] == old_ts 88 + old_ts = mod._observer_health["archon"]["last_ts"] 89 + mod._handle_observe_status( 90 + {"tract": "supervisor", "event": "status", "stream": "archon"} 91 + ) 92 + assert mod._observer_health["archon"]["last_ts"] == old_ts 93 + 94 + # Events without stream field ignored (sense status) 95 + mod._observer_health = {} 96 + mod._handle_observe_status({"tract": "observe", "event": "status"}) 97 + assert mod._observer_health == {} 86 98 87 99 88 100 @pytest.mark.asyncio ··· 139 151 assert len(cleared) == 1 # Still just one clear 
call 140 152 141 153 142 - def test_start_observer_and_sense(tmp_path, mock_callosum, monkeypatch): 143 - """Test that start_observer() and start_sense() launch their respective processes.""" 154 + def test_start_observers_and_sense(tmp_path, mock_callosum, monkeypatch): 155 + """Test that linux-observer, tmux-observer, and sense launch correctly.""" 144 156 mod = importlib.import_module("think.supervisor") 145 157 146 158 started = [] ··· 173 185 monkeypatch.setattr(mod.subprocess, "Popen", fake_popen) 174 186 monkeypatch.setenv("_SOLSTONE_JOURNAL_OVERRIDE", str(tmp_path)) 175 187 176 - # Test start_observer() 177 - observer_proc = mod.start_observer() 178 - assert observer_proc is not None 188 + # Test linux-observer 189 + linux_proc = mod._launch_process( 190 + "linux-observer", ["sol", "observer", "-v"], restart=True 191 + ) 192 + assert linux_proc is not None 179 193 assert any(cmd == ["sol", "observer", "-v"] for cmd, _, _ in started) 194 + 195 + # Test tmux-observer 196 + tmux_proc = mod._launch_process( 197 + "tmux-observer", ["sol", "tmux-observer", "-v"], restart=True 198 + ) 199 + assert tmux_proc is not None 200 + assert any(cmd == ["sol", "tmux-observer", "-v"] for cmd, _, _ in started) 180 201 181 202 # Test start_sense() 182 203 sense_proc = mod.start_sense() ··· 255 276 assert args.remote is None 256 277 257 278 258 - def test_remote_mode_shutdown_pauses_after_observer(monkeypatch): 259 - """In remote mode, shutdown pauses after observer exits before stopping sync.""" 279 + def test_parse_args_observers_flag(): 280 + """Test --observers flag parsing.""" 281 + mod = importlib.import_module("think.supervisor") 282 + 283 + parser = mod.parse_args() 284 + 285 + args = parser.parse_args([]) 286 + assert args.observers == "linux,tmux" 287 + 288 + args = parser.parse_args(["--observers", "linux"]) 289 + assert args.observers == "linux" 290 + 291 + args = parser.parse_args(["--observers", "none"]) 292 + assert args.observers == "none" 293 + 294 + args = 
parser.parse_args(["--no-observers"]) 295 + assert args.no_observers is True 296 + 297 + 298 + def test_shutdown_pauses_after_observers(monkeypatch): 299 + """Shutdown stops observers first, drains 2s, then remaining services.""" 260 300 mod = importlib.import_module("think.supervisor") 261 301 262 302 operations = [] ··· 274 314 def kill(self): 275 315 pass 276 316 317 + def poll(self): 318 + return None 319 + 277 320 class MockManaged: 278 321 def __init__(self, name): 279 322 self.name = name ··· 283 326 def cleanup(self): 284 327 operations.append(("cleanup", self.name)) 285 328 286 - procs = [MockManaged("sync"), MockManaged("observer")] 329 + procs = [ 330 + MockManaged("convey"), 331 + MockManaged("sense"), 332 + MockManaged("linux-observer"), 333 + MockManaged("tmux-observer"), 334 + MockManaged("cortex"), 335 + ] 287 336 288 337 sleep_calls = [] 289 338 ··· 293 342 294 343 monkeypatch.setattr(mod.time, "sleep", fake_sleep) 295 344 296 - # Remote mode: observer exits first, then a 2s pause occurs before sync cleanup. 
297 - monkeypatch.setattr(mod, "_is_remote_mode", True) 298 - for managed in reversed(procs): 299 - name = managed.name 345 + observer_procs = [p for p in procs if p.name in ("linux-observer", "tmux-observer")] 346 + other_procs = [ 347 + p for p in procs if p.name not in ("linux-observer", "tmux-observer") 348 + ] 349 + 350 + for managed in observer_procs: 300 351 proc = managed.process 301 352 try: 302 353 proc.terminate() 303 354 except Exception: 304 355 pass 305 356 try: 306 - timeout = getattr(managed, "shutdown_timeout", 15) 307 - proc.wait(timeout=timeout) 357 + proc.wait(timeout=managed.shutdown_timeout) 308 358 except Exception: 309 359 pass 310 360 managed.cleanup() 311 - if mod._is_remote_mode and name == "observer": 312 - mod.logging.info( 313 - "Remote mode: pausing for sync to receive observer's final event" 314 - ) 315 - mod.time.sleep(2) 316 361 317 - assert operations == [ 318 - ("terminate", "observer"), 319 - ("wait", "observer"), 320 - ("cleanup", "observer"), 321 - ("sleep", 2), 322 - ("terminate", "sync"), 323 - ("wait", "sync"), 324 - ("cleanup", "sync"), 325 - ] 326 - assert sleep_calls == [2] 362 + if observer_procs: 363 + mod.time.sleep(2) 327 364 328 - # Non-remote mode: no pause after observer cleanup. 
329 - operations.clear() 330 - sleep_calls.clear() 331 - monkeypatch.setattr(mod, "_is_remote_mode", False) 332 - for managed in reversed(procs): 333 - name = managed.name 365 + for managed in reversed(other_procs): 334 366 proc = managed.process 335 367 try: 336 368 proc.terminate() 337 369 except Exception: 338 370 pass 339 371 try: 340 - timeout = getattr(managed, "shutdown_timeout", 15) 341 - proc.wait(timeout=timeout) 372 + proc.wait(timeout=managed.shutdown_timeout) 342 373 except Exception: 343 374 pass 344 375 managed.cleanup() 345 - if mod._is_remote_mode and name == "observer": 346 - mod.logging.info( 347 - "Remote mode: pausing for sync to receive observer's final event" 348 - ) 349 - mod.time.sleep(2) 376 + 377 + assert operations == [ 378 + ("terminate", "linux-observer"), 379 + ("wait", "linux-observer"), 380 + ("cleanup", "linux-observer"), 381 + ("terminate", "tmux-observer"), 382 + ("wait", "tmux-observer"), 383 + ("cleanup", "tmux-observer"), 384 + ("sleep", 2), 385 + ("terminate", "cortex"), 386 + ("wait", "cortex"), 387 + ("cleanup", "cortex"), 388 + ("terminate", "sense"), 389 + ("wait", "sense"), 390 + ("cleanup", "sense"), 391 + ("terminate", "convey"), 392 + ("wait", "convey"), 393 + ("cleanup", "convey"), 394 + ] 395 + assert sleep_calls == [2] 396 + 397 + operations.clear() 398 + sleep_calls.clear() 399 + procs_no_obs = [MockManaged("sense"), MockManaged("cortex")] 400 + observer_procs = [ 401 + p for p in procs_no_obs if p.name in ("linux-observer", "tmux-observer") 402 + ] 403 + other_procs = [ 404 + p for p in procs_no_obs if p.name not in ("linux-observer", "tmux-observer") 405 + ] 406 + 407 + for managed in observer_procs: 408 + managed.process.terminate() 409 + managed.process.wait(timeout=managed.shutdown_timeout) 410 + managed.cleanup() 411 + 412 + if observer_procs: 413 + mod.time.sleep(2) 414 + 415 + for managed in reversed(other_procs): 416 + managed.process.terminate() 417 + 
managed.process.wait(timeout=managed.shutdown_timeout) 418 + managed.cleanup() 350 419 351 420 assert sleep_calls == [] 352 421 ··· 356 425 mod = importlib.reload(importlib.import_module("think.supervisor")) 357 426 mod.shutdown_requested = False 358 427 359 - health_states = [["hear"], []] 428 + health_states = [["archon"], []] 360 429 time_counter = {"value": 0.0} # Use dict to allow mutation in closure 361 430 362 431 def fake_time(): ··· 397 466 await mod.supervise(threshold=1, interval=1, schedule=False, procs=[]) 398 467 399 468 messages = [record.getMessage() for record in caplog.records] 400 - assert "hear heartbeat recovered" in messages 469 + assert "archon heartbeat recovered" in messages 401 470 assert messages.count("Heartbeat OK") == 1 402 471 403 472 mod.shutdown_requested = False
+122 -72
think/supervisor.py
··· 420 420 # Restart request tracking for SIGKILL enforcement 421 421 _restart_requests: dict[str, tuple[float, subprocess.Popen]] = {} 422 422 423 - # Observe status state for health monitoring (updated from observe.status events) 424 - # Health is now simple: if observer is running, it sends status events. 425 - # If it has problems, it exits and gets restarted (fail-fast model). 426 - _observe_status_state: dict = { 427 - "last_ts": 0.0, # Timestamp of last observe.status event 428 - "ever_received": False, # Whether we've received at least one status event 429 - } 423 + # Per-observer health state, keyed by stream name from observe.status events. 424 + # Each value: {"last_ts": float, "ever_received": bool} 425 + # Populated when observe.status events arrive with a stream field. 426 + _observer_health: dict[str, dict] = {} 430 427 431 - # Track whether observer was started (for health check conditioning) 432 - _observer_enabled: bool = True 428 + # Set of enabled observer process names (e.g. {"linux-observer", "tmux-observer"}). 429 + # Empty set means no observers. Used to gate health checks. 430 + _enabled_observers: set[str] = set() 433 431 434 432 # Track whether running in remote mode (upload-only, no local processing) 435 433 _is_remote_mode: bool = False ··· 554 552 555 553 556 554 def check_health(threshold: int = DEFAULT_THRESHOLD) -> list[str]: 557 - """Return a list of stale heartbeat names based on observe.status events. 555 + """Return a list of stale observer stream names based on observe.status events. 558 556 559 - Health model is simple: if observer is running, it sends status events. 560 - If it has problems, it exits and supervisor restarts it (fail-fast). 557 + Returns stream names of observers that haven't sent status within threshold. 558 + During startup grace period (before first status event per stream), 559 + returns empty list for that stream to avoid false alerts. 
561 560 562 - Returns ["hear", "see"] if no status received within threshold, 563 - empty list otherwise. During startup grace period (before first 564 - status event received), returns empty list to avoid false alerts. 565 - 566 - When observer is disabled (--no-observers), always returns empty list 567 - since there's no local capture to monitor. 561 + When no observers are enabled, always returns empty list. 568 562 """ 569 - # Skip health checks if observer was not started 570 - if not _observer_enabled: 563 + if not _enabled_observers: 571 564 return [] 572 565 573 - # Grace period: don't alert until we've received at least one status event 574 - if not _observe_status_state["ever_received"]: 575 - return [] 576 - 566 + stale = [] 577 567 now = time.time() 578 - last_ts = _observe_status_state["last_ts"] 568 + for stream, state in _observer_health.items(): 569 + if not state["ever_received"]: 570 + continue 571 + if now - state["last_ts"] > threshold: 572 + stale.append(stream) 579 573 580 - # If no recent status, observer is not running - both stale 581 - if now - last_ts > threshold: 582 - return ["hear", "see"] 574 + return stale 575 + 583 576 584 - # Receiving status means observer is healthy 585 - return [] 577 + def _latest_observe_ts() -> float: 578 + """Return the most recent observe.status timestamp across all observers.""" 579 + if not _observer_health: 580 + return 0.0 581 + return max(s["last_ts"] for s in _observer_health.values()) 586 582 587 583 588 584 def _get_notifier() -> DesktopNotifier: ··· 815 811 "schedules": schedules, 816 812 "callosum_clients": callosum_clients, 817 813 } 818 - 819 - 820 - def start_observer() -> ManagedProcess: 821 - """Launch platform-detected observer with output logging.""" 822 - return _launch_process("observer", ["sol", "observer", "-v"], restart=True) 823 814 824 815 825 816 def start_sense() -> ManagedProcess: ··· 1051 1042 "capture", 1052 1043 { 1053 1044 "status": "stale", 1054 - "last_seen": 
_observe_status_state.get("last_ts", 0), 1045 + "last_seen": _latest_observe_ts(), 1055 1046 }, 1056 1047 ) 1057 1048 elif prev_stale: ··· 1059 1050 "capture", 1060 1051 { 1061 1052 "status": "ok", 1062 - "last_seen": _observe_status_state.get("last_ts", 0), 1053 + "last_seen": _latest_observe_ts(), 1063 1054 }, 1064 1055 ) 1065 1056 except Exception: ··· 1252 1243 1253 1244 1254 1245 def _handle_observe_status(message: dict) -> None: 1255 - """Handle observe.status events for health monitoring. 1246 + """Handle observe.status events for per-observer health monitoring. 1256 1247 1257 - Just tracks that we received a status event. The observer is responsible 1258 - for exiting if it's unhealthy (fail-fast model), so receiving status 1259 - means it's working. 1248 + Tracks status freshness per stream. The stream field identifies the 1249 + observer (e.g. "archon" for linux, "archon.tmux" for tmux). 1250 + Events without a stream field (e.g. from sense) are ignored for health. 1260 1251 """ 1261 1252 if message.get("tract") != "observe" or message.get("event") != "status": 1262 1253 return 1263 1254 1264 - _observe_status_state["last_ts"] = time.time() 1265 - _observe_status_state["ever_received"] = True 1255 + stream = message.get("stream") 1256 + if not stream: 1257 + return # sense status events don't have stream field 1258 + 1259 + if stream not in _observer_health: 1260 + _observer_health[stream] = {"last_ts": 0.0, "ever_received": False} 1261 + 1262 + _observer_health[stream]["last_ts"] = time.time() 1263 + _observer_health[stream]["ever_received"] = True 1266 1264 1267 1265 1268 1266 def _handle_segment_event_log(message: dict) -> None: ··· 1473 1471 parser.add_argument( 1474 1472 "--no-observers", 1475 1473 action="store_true", 1476 - help="Do not start local observer (sense still runs for remote/imports)", 1474 + help="Do not start observers (sense still runs for remote/imports)", 1475 + ) 1476 + parser.add_argument( 1477 + "--observers", 1478 + type=str, 
1479 + default="linux,tmux", 1480 + help="Comma-separated observers to start: linux, tmux, none (default: linux,tmux)", 1477 1481 ) 1478 1482 parser.add_argument( 1479 1483 "--no-daily", ··· 1550 1554 print(f"Journal: {path} (from {source})") 1551 1555 logging.info("Supervisor starting...") 1552 1556 1553 - global _managed_procs, _supervisor_callosum, _observer_enabled, _is_remote_mode 1557 + global _managed_procs, _supervisor_callosum, _enabled_observers, _is_remote_mode 1554 1558 global _task_queue 1555 1559 procs: list[ManagedProcess] = [] 1560 + convey_port = None 1556 1561 1557 - # Remote mode: run sync instead of local sense/observer 1562 + # Remote mode: run sync instead of local processing 1558 1563 _is_remote_mode = bool(args.remote) 1559 - _observer_enabled = not args.no_observers 1564 + if args.no_observers: 1565 + _enabled_observers = set() 1566 + else: 1567 + obs_names = [o.strip() for o in args.observers.split(",")] 1568 + if "none" in obs_names: 1569 + _enabled_observers = set() 1570 + else: 1571 + valid = {"linux", "tmux"} 1572 + for name in obs_names: 1573 + if name not in valid: 1574 + parser.error( 1575 + f"Invalid observer: {name}. 
Choose from: linux, tmux, none" 1576 + ) 1577 + _enabled_observers = {f"{name}-observer" for name in obs_names} 1560 1578 1561 1579 # Start Callosum in-process first - it's the message bus that other services depend on 1562 1580 try: ··· 1603 1621 parser.error(f"Remote server not available: {message}") 1604 1622 logging.info(f"Remote server verified: {message}") 1605 1623 procs.append(start_sync(args.remote)) 1606 - # Observer runs unless disabled 1607 - if not args.no_observers: 1608 - procs.append(start_observer()) 1624 + # Start enabled observers (they upload to remote convey ingest) 1625 + if "linux-observer" in _enabled_observers: 1626 + procs.append( 1627 + _launch_process( 1628 + "linux-observer", ["sol", "observer", "-v"], restart=True 1629 + ) 1630 + ) 1631 + if "tmux-observer" in _enabled_observers: 1632 + procs.append( 1633 + _launch_process( 1634 + "tmux-observer", ["sol", "tmux-observer", "-v"], restart=True 1635 + ) 1636 + ) 1609 1637 else: 1610 - # Local mode: sense handles file processing 1638 + # Local mode: convey first (observers upload via HTTP to convey ingest) 1639 + if not args.no_convey: 1640 + proc, convey_port = start_convey_server( 1641 + verbose=args.verbose, debug=args.debug, port=args.port 1642 + ) 1643 + procs.append(proc) 1644 + # Sense handles file processing 1611 1645 procs.append(start_sense()) 1612 - # Observer only runs if not disabled (local capture) 1613 - if not args.no_observers: 1614 - procs.append(start_observer()) 1615 - # Cortex and Convey only run in local mode (remote has no data to serve) 1616 - convey_port = None 1617 - if not _is_remote_mode and not args.no_cortex: 1618 - procs.append(start_cortex_server()) 1619 - if not _is_remote_mode and not args.no_convey: 1620 - proc, convey_port = start_convey_server( 1621 - verbose=args.verbose, debug=args.debug, port=args.port 1622 - ) 1623 - procs.append(proc) 1646 + # Start enabled observers 1647 + if "linux-observer" in _enabled_observers: 1648 + procs.append( 1649 + 
_launch_process( 1650 + "linux-observer", ["sol", "observer", "-v"], restart=True 1651 + ) 1652 + ) 1653 + if "tmux-observer" in _enabled_observers: 1654 + procs.append( 1655 + _launch_process( 1656 + "tmux-observer", ["sol", "tmux-observer", "-v"], restart=True 1657 + ) 1658 + ) 1659 + # Cortex after observers 1660 + if not args.no_cortex: 1661 + procs.append(start_cortex_server()) 1624 1662 1625 1663 # Make procs accessible to restart handler 1626 1664 _managed_procs = procs ··· 1658 1696 finally: 1659 1697 logging.info("Stopping all processes...") 1660 1698 print("\nShutting down gracefully (this may take a moment)...", flush=True) 1661 - # Shut down managed processes in reverse order to respect dependencies 1662 - for managed in reversed(procs): 1699 + observer_procs = [ 1700 + p for p in procs if p.name in ("linux-observer", "tmux-observer") 1701 + ] 1702 + other_procs = [ 1703 + p for p in procs if p.name not in ("linux-observer", "tmux-observer") 1704 + ] 1705 + 1706 + def _stop_process(managed: ManagedProcess) -> None: 1663 1707 name = managed.name 1664 1708 proc = managed.process 1665 1709 logging.info(f"Stopping {name}...") ··· 1681 1725 except Exception: 1682 1726 pass 1683 1727 managed.cleanup() 1684 - # In remote mode, pause after observer exits so sync can receive 1685 - # the final observe.observing event before entering drain mode 1686 - if _is_remote_mode and name == "observer": 1687 - logging.info( 1688 - "Remote mode: pausing for sync to receive observer's final event" 1689 - ) 1690 - time.sleep(2) 1728 + 1729 + # Stop observers first 1730 + for managed in observer_procs: 1731 + _stop_process(managed) 1732 + 1733 + # Drain pause: let in-flight HTTP uploads complete to convey 1734 + if observer_procs: 1735 + logging.info("Pausing for observer uploads to drain") 1736 + time.sleep(2) 1737 + 1738 + # Stop remaining services in reverse order 1739 + for managed in reversed(other_procs): 1740 + _stop_process(managed) 1691 1741 1692 1742 # Save scheduler 
state before disconnecting 1693 1743 if schedule_enabled and scheduler._state: