personal memory agent
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

Fix supervisor blocking during daily dream processing

The supervision loop was blocked for 20+ minutes during midnight dream
processing, preventing process restarts and health monitoring. Changed
daily dream to run in a background thread with state tracking so the
loop continues running.

Key changes:
- Add _daily_state dict to track dream execution (running/completed)
- New _run_daily_dream() spawns dream in background thread
- handle_daily_tasks() now non-blocking, spawns thread on day change
- Scheduled agents only run after dream completes successfully
- Restart handler ignores already-exited processes (the supervision loop's auto-restart handles them)
- Remove dead run_dream()/run_subprocess_task() functions

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

+193 -174
+27
tests/conftest.py
# Pristine values for think.supervisor._daily_state (mirrors its module-level defaults).
_DAILY_STATE_DEFAULTS = {
    "dream_running": False,
    "dream_completed": False,
    "last_day": None,
}


def _reset_supervisor_daily_state() -> None:
    """Restore ``think.supervisor._daily_state`` to its defaults, if present.

    The module is looked up in ``sys.modules`` instead of being imported, so
    tests that never touch the supervisor do not pay for (or get side effects
    from) importing it. A module that is not loaded yet will start out with
    the default state anyway, so there is nothing to reset in that case.
    The lookup happens on every call, so a module object replaced or reloaded
    by a test is picked up with its current ``_daily_state`` dict.
    """
    supervisor = sys.modules.get("think.supervisor")
    # getattr guards against a stub module that lacks the attribute.
    state = getattr(supervisor, "_daily_state", None)
    if state is not None:
        # Mutate in place: the supervisor functions hold a reference to this dict.
        state.update(_DAILY_STATE_DEFAULTS)


@pytest.fixture(autouse=True)
def reset_supervisor_daily_state():
    """Reset supervisor ``_daily_state`` before and after every test.

    Prevents cross-test pollution: state left behind by one test (or by a
    background thread it spawned) must not leak into the next test.
    """
    _reset_supervisor_daily_state()
    yield
    _reset_supervisor_daily_state()
+4 -62
tests/test_supervisor.py
··· 191 191 192 192 193 193 @pytest.mark.asyncio 194 - async def test_run_dream(tmp_path, monkeypatch): 195 - mod = importlib.import_module("think.supervisor") 196 - runner_mod = importlib.import_module("think.runner") 197 - 198 - spawn_calls = {} 199 - 200 - class DummyProcess: 201 - def __init__(self): 202 - self.pid = 12345 203 - self.returncode = 0 204 - 205 - def wait(self, timeout=None): 206 - return 0 207 - 208 - class DummyManagedProcess: 209 - def __init__(self, cmd): 210 - from pathlib import Path 211 - 212 - self.process = DummyProcess() 213 - self.name = Path(cmd[0]).name # Derive from cmd[0] 214 - self.cmd = cmd 215 - self.log_writer = DummyLogger() 216 - self._threads = [] 217 - spawn_calls["name"] = self.name 218 - spawn_calls["cmd"] = cmd 219 - 220 - def wait(self, timeout=None): 221 - return 0 222 - 223 - def cleanup(self): 224 - self.log_writer.close() 225 - 226 - class DummyLogger: 227 - def __init__(self): 228 - self.closed = False 229 - 230 - def close(self): 231 - self.closed = True 232 - 233 - def fake_spawn(cmd, *, env=None, ref=None, callosum=None): 234 - return DummyManagedProcess(cmd) 235 - 236 - monkeypatch.setattr(runner_mod.ManagedProcess, "spawn", fake_spawn) 237 - monkeypatch.setenv("JOURNAL_PATH", str(tmp_path)) 238 - 239 - times = iter([0, 1]) 240 - monkeypatch.setattr(mod.time, "time", lambda: next(times)) 241 - 242 - messages = [] 243 - monkeypatch.setattr( 244 - mod.logging, "info", lambda msg, *a: messages.append(msg % a if a else msg) 245 - ) 246 - 247 - assert await mod.run_dream() is True 248 - 249 - assert spawn_calls["name"] == "think-dream" # Derived from cmd[0] 250 - assert spawn_calls["cmd"] == ["think-dream", "-v"] 251 - assert os.environ["JOURNAL_PATH"] == str(tmp_path) 252 - assert any("seconds" in m for m in messages) 253 - 254 - 255 - @pytest.mark.asyncio 256 194 async def test_supervise_logs_recovery(mock_callosum, monkeypatch, caplog): 257 195 mod = importlib.reload(importlib.import_module("think.supervisor")) 
258 196 mod.shutdown_requested = False ··· 284 222 async def fake_check_scheduled_agents(): 285 223 pass 286 224 225 + def fake_handle_daily_tasks(): 226 + pass 227 + 287 228 monkeypatch.setattr(mod, "check_runner_exits", lambda procs: []) 288 229 monkeypatch.setattr(mod, "check_health", fake_check_health) 289 230 monkeypatch.setattr(mod, "send_notification", fake_send_notification) 290 231 monkeypatch.setattr(mod, "clear_notification", fake_clear_notification) 291 232 monkeypatch.setattr(mod, "check_scheduled_agents", fake_check_scheduled_agents) 233 + monkeypatch.setattr(mod, "handle_daily_tasks", fake_handle_daily_tasks) 292 234 monkeypatch.setattr(mod.time, "time", fake_time) 293 235 monkeypatch.setattr(mod.asyncio, "sleep", fake_sleep) 294 236
+99 -66
tests/test_supervisor_schedule.py
··· 3 3 4 4 """Test supervisor scheduling functionality.""" 5 5 6 - import asyncio 7 6 import os 8 - from unittest.mock import Mock, patch 7 + from unittest.mock import patch 9 8 10 9 import pytest 11 10 ··· 17 16 @patch("think.supervisor.get_agents") 18 17 @pytest.mark.asyncio 19 18 async def test_spawn_scheduled_agents( 20 - mock_get_agents, mock_cortex_request, mock_input_summary 19 + mock_get_agents, mock_cortex_request, mock_input_summary, tmp_path 21 20 ): 22 21 """Test that scheduled agents are spawned correctly via Cortex.""" 23 22 from think.supervisor import check_scheduled_agents ··· 47 46 mock_input_summary.return_value = "No recordings" 48 47 49 48 # Call the functions (prepare then execute) 50 - with patch.dict(os.environ, {"JOURNAL_PATH": "/test/journal"}, clear=True): 49 + with patch.dict(os.environ, {"JOURNAL_PATH": str(tmp_path)}, clear=True): 51 50 spawn_scheduled_agents() 52 51 await check_scheduled_agents() 53 52 ··· 67 66 assert "No recordings" in second_call[1]["prompt"] 68 67 69 68 70 - @patch("think.supervisor.check_scheduled_agents") 69 + @patch("think.runner.run_task") 71 70 @patch("think.supervisor.spawn_scheduled_agents") 72 - @patch("think.supervisor.run_dream") 73 - def test_supervisor_runs_scheduled_after_dream( 74 - mock_run_dream, mock_spawn_scheduled, mock_check_scheduled, tmp_path, mock_callosum 71 + def test_run_daily_dream_spawns_agents_on_success( 72 + mock_spawn_scheduled, mock_run_task, mock_callosum 75 73 ): 76 - """Test that scheduled agents run only after successful dream.""" 77 - from think.supervisor import supervise 74 + """Test that _run_daily_dream spawns scheduled agents after successful dream.""" 75 + from think.supervisor import _daily_state, _run_daily_dream 78 76 79 - # Test successful dream 80 - mock_run_dream.return_value = True 77 + # Reset state 78 + _daily_state["dream_running"] = True 79 + _daily_state["dream_completed"] = False 81 80 82 - with patch("think.supervisor.datetime") as mock_datetime: 83 - with 
patch("think.supervisor.asyncio.sleep") as mock_sleep: 84 - with patch("think.supervisor.check_health") as mock_check_health: 85 - # Mock dates to trigger daily processing 86 - mock_now = Mock() 87 - mock_now.date.side_effect = [ 88 - Mock(name="day1"), 89 - Mock(name="day2"), # Different day triggers dream 90 - Mock(name="day2"), # Same day after processing 91 - ] 92 - mock_datetime.now.return_value = mock_now 93 - mock_check_health.return_value = [] # No stale processes 81 + # Mock run_task to return success 82 + mock_run_task.return_value = (True, 0) 94 83 95 - # Use side effect to break loop after first iteration 96 - mock_sleep.side_effect = KeyboardInterrupt 84 + _run_daily_dream() 97 85 98 - with patch.dict( 99 - os.environ, {"JOURNAL_PATH": str(tmp_path)}, clear=True 100 - ): 101 - try: 102 - asyncio.run(supervise(daily=True)) 103 - except KeyboardInterrupt: 104 - pass 86 + # Verify state was updated 87 + assert _daily_state["dream_running"] is False 88 + assert _daily_state["dream_completed"] is True 105 89 106 - mock_run_dream.assert_called_once() 107 - mock_spawn_scheduled.assert_called_once_with() 90 + # Verify spawn_scheduled_agents was called 91 + mock_spawn_scheduled.assert_called_once() 108 92 109 93 110 - @patch("think.supervisor.check_scheduled_agents") 94 + @patch("think.runner.run_task") 111 95 @patch("think.supervisor.spawn_scheduled_agents") 112 - @patch("think.supervisor.run_dream") 113 - def test_supervisor_skips_scheduled_on_dream_failure( 114 - mock_run_dream, mock_spawn_scheduled, mock_check_scheduled, tmp_path, mock_callosum 96 + def test_run_daily_dream_skips_agents_on_failure( 97 + mock_spawn_scheduled, mock_run_task, mock_callosum 115 98 ): 116 - """Test that scheduled agents don't run if dream fails.""" 117 - from think.supervisor import supervise 99 + """Test that _run_daily_dream does not spawn agents when dream fails.""" 100 + from think.supervisor import _daily_state, _run_daily_dream 101 + 102 + # Reset state 103 + 
_daily_state["dream_running"] = True 104 + _daily_state["dream_completed"] = False 105 + 106 + # Mock run_task to return failure 107 + mock_run_task.return_value = (False, 1) 108 + 109 + _run_daily_dream() 110 + 111 + # Verify state was updated 112 + assert _daily_state["dream_running"] is False 113 + assert _daily_state["dream_completed"] is False # Stays False on failure 114 + 115 + # Verify spawn_scheduled_agents was NOT called 116 + mock_spawn_scheduled.assert_not_called() 117 + 118 + 119 + def test_handle_daily_tasks_spawns_dream_on_day_change(mock_callosum): 120 + """Test that handle_daily_tasks spawns dream thread when day changes.""" 121 + from datetime import date 118 122 119 - # Test failed dream 120 - mock_run_dream.return_value = False 123 + from think.supervisor import _daily_state, handle_daily_tasks 121 124 122 - with patch("think.supervisor.datetime") as mock_datetime: 123 - with patch("think.supervisor.asyncio.sleep") as mock_sleep: 124 - with patch("think.supervisor.check_health") as mock_check_health: 125 - # Mock dates to trigger daily processing 126 - mock_now = Mock() 127 - mock_now.date.side_effect = [ 128 - Mock(name="day1"), 129 - Mock(name="day2"), # Different day triggers dream 130 - Mock(name="day2"), # Same day after processing 131 - ] 132 - mock_datetime.now.return_value = mock_now 133 - mock_check_health.return_value = [] # No stale processes 125 + # Reset state to a previous day 126 + _daily_state["last_day"] = date(2025, 1, 1) 127 + _daily_state["dream_running"] = False 128 + _daily_state["dream_completed"] = False 134 129 135 - # Use side effect to break loop after first iteration 136 - mock_sleep.side_effect = KeyboardInterrupt 130 + # Mock threading.Thread to capture the spawn 131 + spawned_threads = [] 137 132 138 - with patch.dict( 139 - os.environ, {"JOURNAL_PATH": str(tmp_path)}, clear=True 140 - ): 141 - try: 142 - asyncio.run(supervise(daily=True)) 143 - except KeyboardInterrupt: 144 - pass 133 + class MockThread: 134 + def 
__init__(self, target, daemon=False): 135 + spawned_threads.append(target) 136 + self.target = target 145 137 146 - mock_run_dream.assert_called_once() 147 - mock_spawn_scheduled.assert_not_called() 138 + def start(self): 139 + pass # Don't actually start the thread 140 + 141 + with patch("think.supervisor.threading.Thread", MockThread): 142 + with patch("think.supervisor.datetime") as mock_datetime: 143 + mock_datetime.now.return_value.date.return_value = date(2025, 1, 2) 144 + handle_daily_tasks() 145 + 146 + # Verify a thread was spawned 147 + assert len(spawned_threads) == 1 148 + assert _daily_state["dream_running"] is True 149 + assert _daily_state["last_day"] == date(2025, 1, 2) 150 + 151 + 152 + def test_handle_daily_tasks_no_spawn_same_day(mock_callosum): 153 + """Test that handle_daily_tasks does not spawn dream on same day.""" 154 + from datetime import date 155 + 156 + from think.supervisor import _daily_state, handle_daily_tasks 157 + 158 + today = date(2025, 1, 2) 159 + 160 + # Set state to today 161 + _daily_state["last_day"] = today 162 + _daily_state["dream_running"] = False 163 + _daily_state["dream_completed"] = True 164 + 165 + spawned_threads = [] 166 + 167 + class MockThread: 168 + def __init__(self, target, daemon=False): 169 + spawned_threads.append(target) 170 + 171 + def start(self): 172 + pass 173 + 174 + with patch("think.supervisor.threading.Thread", MockThread): 175 + with patch("think.supervisor.datetime") as mock_datetime: 176 + mock_datetime.now.return_value.date.return_value = today 177 + handle_daily_tasks() 178 + 179 + # Verify no thread was spawned 180 + assert len(spawned_threads) == 0
+63 -46
think/supervisor.py
··· 115 115 # Track whether observer was started (for health check conditioning) 116 116 _observer_enabled: bool = True 117 117 118 + # State for daily processing (dream runs in background, agents wait for completion) 119 + _daily_state = { 120 + "dream_running": False, # True while dream subprocess is active 121 + "dream_completed": False, # True after dream finishes (reset each day) 122 + "last_day": None, # Track which day we last processed 123 + } 124 + 118 125 119 126 def _get_journal_path() -> Path: 120 127 journal = os.getenv("JOURNAL_PATH") ··· 304 311 logging.error("Failed to clear notification: %s", exc) 305 312 306 313 307 - async def run_subprocess_task(name: str, cmd: list[str]) -> bool: 308 - """Run a subprocess task while mirroring output to a dedicated log. 309 - 310 - Runs the subprocess in a thread to avoid blocking the async event loop. 311 - 312 - Args: 313 - name: Display name for the task 314 - cmd: Command and arguments to execute 315 - 316 - Returns: 317 - True when the subprocess exits successfully. 
318 - """ 319 - 320 - def _blocking_run(): 321 - start = time.time() 322 - try: 323 - managed = RunnerManagedProcess.spawn(cmd, callosum=_supervisor_callosum) 324 - return_code = managed.wait() 325 - finally: 326 - managed.cleanup() 327 - 328 - duration = int(time.time() - start) 329 - logging.info(f"{name} finished in {duration} seconds") 330 - return return_code == 0 331 - 332 - return await asyncio.to_thread(_blocking_run) 333 - 334 - 335 - async def run_dream() -> bool: 336 - """Run ``think.dream`` while mirroring output to a dedicated log.""" 337 - return await run_subprocess_task("dream", ["think-dream", "-v"]) 338 - 339 - 340 314 def spawn_scheduled_agents() -> None: 341 315 """Prepare scheduled agents grouped by priority for sequential execution.""" 342 316 try: ··· 614 588 logging.error("Invalid restart request: missing service") 615 589 return 616 590 617 - # Find and signal the process 591 + # Find the process 618 592 for proc in _managed_procs: 619 - if proc.name == service and proc.process.poll() is None: 593 + if proc.name == service: 594 + # Check if process is still running 595 + if proc.process.poll() is not None: 596 + # Already exited - ignore, supervision loop will auto-restart 597 + logging.debug( 598 + f"Ignoring restart for {service}: already exited, awaiting auto-restart" 599 + ) 600 + return 601 + 620 602 logging.info(f"Restart requested for {service}, sending SIGINT...") 621 603 622 604 # Emit restarting event ··· 638 620 logging.error(f"Failed to send SIGINT to {service}: {e}") 639 621 return 640 622 641 - logging.warning(f"Cannot restart {service}: not found or not running") 623 + logging.warning(f"Cannot restart {service}: not found in managed processes") 642 624 643 625 644 626 def cancel_task(ref: str) -> bool: ··· 975 957 return now, stale_set 976 958 977 959 978 - async def handle_daily_tasks(last_day: datetime.date) -> datetime.date: 979 - """Run daily processing (dream + scheduled agents). 
def _run_daily_dream() -> None:
    """Run the daily ``think-dream`` task and record the outcome in ``_daily_state``.

    This is the target of the background thread started by
    ``handle_daily_tasks()``.

    State transitions:
    - ``dream_running`` is cleared on every exit path, including when the
      task cannot be launched or raises. Without that guarantee a single
      crash would leave the flag stuck at True and ``handle_daily_tasks()``
      would skip every later day with "dream already running".
    - ``dream_completed`` is set to True only when ``run_task`` reports
      success; ``spawn_scheduled_agents()`` is called only in that case.
    """
    logging.info("Starting daily dream processing...")
    try:
        # Imported at call time (as in the original), but inside the try so an
        # import failure also releases the dream_running flag.
        from think.runner import run_task

        success, exit_code = run_task(
            ["think-dream", "-v"],
            callosum=_supervisor_callosum,
        )
    except Exception:
        # Treated like a failed dream: dream_completed stays False, so
        # scheduled agents are not spawned for this day.
        logging.exception("Daily dream crashed before completing")
        return
    finally:
        # Runs on success, failure, and exception alike.
        _daily_state["dream_running"] = False

    if success:
        logging.info("Daily dream completed successfully")
        _daily_state["dream_completed"] = True
        spawn_scheduled_agents()
    else:
        logging.error(f"Daily dream failed with exit code {exit_code}")
        # dream_completed stays False, so scheduled agents won't run


def handle_daily_tasks() -> None:
    """Check for day change and spawn daily dream if needed (non-blocking).

    Compares today's date with ``_daily_state["last_day"]``. On a change it
    records the new day, clears ``dream_completed``, and starts
    ``_run_daily_dream`` in a daemon thread so the caller is never blocked.
    Scheduled agents are spawned by that thread after the dream succeeds.

    If a dream from the previous day is still running when the day changes,
    no second dream is started; the new day is still recorded, so that day's
    dream is skipped rather than retried.
    """
    today = datetime.now().date()

    # NOTE(review): last_day starts as None, so the very first call counts as
    # a day change and launches the dream right away. If the dream should only
    # run on a real date rollover, seed last_day with today's date at startup
    # — confirm intended behavior.
    if today == _daily_state["last_day"]:
        return

    # Reset state for new day
    _daily_state["last_day"] = today
    _daily_state["dream_completed"] = False

    # Don't start new dream if one is already running (edge case)
    if _daily_state["dream_running"]:
        logging.warning("Day changed but dream already running, skipping")
        return

    # Set the flag before starting the thread so the next call cannot start a
    # second dream while this one is getting going.
    _daily_state["dream_running"] = True
    threading.Thread(target=_run_daily_dream, daemon=True).start()
1112 1130 """ 1113 1131 alert_mgr = AlertManager() 1114 - last_day = datetime.now().date() 1115 1132 last_health_check = 0.0 1116 1133 last_status_emit = 0.0 1117 1134 prev_stale: set[str] = set() ··· 1154 1171 logging.debug(f"Status emission failed: {e}") 1155 1172 last_status_emit = now 1156 1173 1157 - # Check for daily processing 1174 + # Check for daily processing (non-blocking, spawns dream in background) 1158 1175 if daily: 1159 - last_day = await handle_daily_tasks(last_day) 1176 + handle_daily_tasks() 1160 1177 1161 1178 # Advance scheduled agent execution (non-blocking) 1162 1179 await check_scheduled_agents()