personal memory agent
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

Unify all dream invocations through TaskQueue for exclusivity

Daily and segment dreams previously spawned raw threads via run_task(),
bypassing TaskQueue. This meant they could run concurrently with each
other and with flush/activity/callosum dreams. Now all five dream paths
serialize through TaskQueue by command name, ensuring only one dream
runs at a time regardless of trigger source.

- Remove _run_daily_processing() and _run_segment_processing()
- Route handle_daily_tasks() and _handle_segment_observed() through
_task_queue.submit()
- Drop dead _daily_state keys (dream_running, dream_completed)
- Update tests to use capture_submit pattern instead of internal state

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

+88 -183
+1 -9
tests/conftest.py
··· 228 228 229 229 @pytest.fixture(autouse=True) 230 230 def reset_supervisor_state(): 231 - """Reset supervisor module state before/after tests to prevent cross-test pollution. 232 - 233 - This prevents background threads spawned by one test from affecting other tests, 234 - and ensures mode flags don't leak between tests. 235 - """ 231 + """Reset supervisor module state before/after tests to prevent cross-test pollution.""" 236 232 try: 237 233 import think.supervisor as mod 238 234 239 235 # Reset before test 240 - mod._daily_state["dream_running"] = False 241 - mod._daily_state["dream_completed"] = False 242 236 mod._daily_state["last_day"] = None 243 237 mod._is_remote_mode = False 244 238 # Create fresh task queue ··· 250 244 import think.supervisor as mod 251 245 252 246 # Reset after test 253 - mod._daily_state["dream_running"] = False 254 - mod._daily_state["dream_completed"] = False 255 247 mod._daily_state["last_day"] = None 256 248 mod._is_remote_mode = False 257 249 # Create fresh task queue
+66 -95
tests/test_supervisor_schedule.py
··· 4 4 """Test supervisor daily scheduling functionality.""" 5 5 6 6 from datetime import date 7 - from pathlib import Path 8 7 from unittest.mock import patch 9 8 10 9 11 - def test_handle_daily_tasks_spawns_dream_on_day_change(mock_callosum): 12 - """Test that handle_daily_tasks spawns dream thread when day changes.""" 13 - # Import from current module state (handles reloads from other tests) 10 + def test_handle_daily_tasks_submits_dream_on_day_change(mock_callosum): 11 + """Test that handle_daily_tasks submits dream via task queue when day changes.""" 12 + import think.supervisor as mod 14 13 from think.supervisor import _daily_state, handle_daily_tasks 15 14 16 15 # Reset state to a previous day 17 16 _daily_state["last_day"] = date(2025, 1, 1) 18 - _daily_state["dream_running"] = False 19 - _daily_state["dream_completed"] = False 17 + 18 + # Track submitted commands 19 + submitted = [] 20 + original_submit = mod._task_queue.submit 20 21 21 - # Mock threading.Thread to capture the spawn 22 - spawned_threads = [] 22 + def capture_submit(cmd, *args, **kwargs): 23 + submitted.append(cmd) 24 + return original_submit(cmd, *args, **kwargs) 23 25 24 - class MockThread: 25 - def __init__(self, target, args=None, daemon=False): 26 - spawned_threads.append((target, args)) 27 - self.target = target 28 - self.args = args 26 + mod._task_queue.submit = capture_submit 29 27 30 - def start(self): 31 - pass # Don't actually start the thread 28 + with patch("think.supervisor.datetime") as mock_datetime: 29 + mock_datetime.now.return_value.date.return_value = date(2025, 1, 2) 30 + handle_daily_tasks() 32 31 33 - with patch("think.supervisor.threading.Thread", MockThread): 34 - with patch("think.supervisor.datetime") as mock_datetime: 35 - mock_datetime.now.return_value.date.return_value = date(2025, 1, 2) 36 - handle_daily_tasks() 32 + # Verify a dream task was submitted 33 + assert len(submitted) == 1 34 + assert submitted[0][1] == "dream" 37 35 38 - # Verify a thread was spawned with the correct day argument 39 - assert len(spawned_threads) == 1 40 - target, args = spawned_threads[0] 41 - assert args == ("20250101",) # The previous day (last_day) is processed 42 - assert _daily_state["dream_running"] is True 36 + # Verify day state was updated 43 37 assert _daily_state["last_day"] == date(2025, 1, 2) 44 38 45 39 46 40 def test_handle_daily_tasks_no_spawn_same_day(mock_callosum): 47 - """Test that handle_daily_tasks does not spawn dream on same day.""" 48 - # Import from current module state (handles reloads from other tests) 41 + """Test that handle_daily_tasks does not submit dream on same day.""" 42 + import think.supervisor as mod 49 43 from think.supervisor import _daily_state, handle_daily_tasks 50 44 51 45 today = date(2025, 1, 2) 52 46 53 47 # Set state to today 54 48 _daily_state["last_day"] = today 55 - _daily_state["dream_running"] = False 56 - _daily_state["dream_completed"] = True 57 49 58 - spawned_threads = [] 50 + # Track submitted commands 51 + submitted = [] 52 + original_submit = mod._task_queue.submit 59 53 60 - class MockThread: 61 - def __init__(self, target, args=None, daemon=False): 62 - spawned_threads.append((target, args)) 54 + def capture_submit(cmd, *args, **kwargs): 55 + submitted.append(cmd) 56 + return original_submit(cmd, *args, **kwargs) 63 57 64 - def start(self): 65 - pass 58 + mod._task_queue.submit = capture_submit 66 59 67 - with patch("think.supervisor.threading.Thread", MockThread): 68 - with patch("think.supervisor.datetime") as mock_datetime: 69 - mock_datetime.now.return_value.date.return_value = today 70 - handle_daily_tasks() 60 + with patch("think.supervisor.datetime") as mock_datetime: 61 + mock_datetime.now.return_value.date.return_value = today 62 + handle_daily_tasks() 71 63 72 - # Verify no thread was spawned 73 - assert len(spawned_threads) == 0 64 + # Verify no task was submitted 65 + assert len(submitted) == 0 74 66 75 67 76 - def test_run_daily_processing_success(mock_callosum): 77 - """Test that _run_daily_processing updates state on success.""" 78 - # Import from current module state (handles reloads from other tests) 79 - from think.supervisor import _daily_state, _run_daily_processing 68 + def test_handle_daily_tasks_submits_correct_command(mock_callosum): 69 + """Test that handle_daily_tasks submits sol dream with --refresh for the previous day.""" 70 + import think.supervisor as mod 71 + from think.supervisor import _daily_state, handle_daily_tasks 80 72 81 - # Reset state 82 - _daily_state["dream_running"] = True 83 - _daily_state["dream_completed"] = False 73 + _daily_state["last_day"] = date(2025, 1, 1) 84 74 85 - # Mock run_task to return success 86 - with patch("think.runner.run_task") as mock_run_task: 87 - mock_run_task.return_value = (True, 0, Path("/tmp/test.log")) 88 - _run_daily_processing("20250101") 89 - 90 - # Verify state was updated 91 - assert _daily_state["dream_running"] is False 92 - assert _daily_state["dream_completed"] is True 93 - 94 - # Verify sol dream was called with correct args 95 - mock_run_task.assert_called_once() 96 - call_args = mock_run_task.call_args[0][0] 97 - assert call_args[0] == "sol" 98 - assert call_args[1] == "dream" 99 - assert "-v" in call_args 100 - assert "--day" in call_args 101 - assert "20250101" in call_args 102 - assert "--refresh" in call_args 103 - 75 + # Track submitted commands 76 + submitted = [] 77 + original_submit = mod._task_queue.submit 104 78 105 - def test_run_daily_processing_failure(mock_callosum): 106 - """Test that _run_daily_processing handles failure correctly.""" 107 - # Import from current module state (handles reloads from other tests) 108 - from think.supervisor import _daily_state, _run_daily_processing 79 + def capture_submit(cmd, *args, **kwargs): 80 + submitted.append(cmd) 81 + return original_submit(cmd, *args, **kwargs) 109 82 110 - # Reset state 111 - _daily_state["dream_running"] = True 112 - _daily_state["dream_completed"] = False 83 + mod._task_queue.submit = capture_submit 113 84 114 - # Mock run_task to return failure 115 - with patch("think.runner.run_task") as mock_run_task: 116 - mock_run_task.return_value = (False, 1, Path("/tmp/test.log")) 117 - _run_daily_processing("20250101") 85 + with patch("think.supervisor.datetime") as mock_datetime: 86 + mock_datetime.now.return_value.date.return_value = date(2025, 1, 2) 87 + handle_daily_tasks() 118 88 119 - # Verify state was updated 120 - assert _daily_state["dream_running"] is False 121 - assert _daily_state["dream_completed"] is False # Stays False on failure 89 + assert len(submitted) == 1 90 + cmd = submitted[0] 91 + assert cmd[0] == "sol" 92 + assert cmd[1] == "dream" 93 + assert "--day" in cmd 94 + assert "20250101" in cmd 95 + assert "--refresh" in cmd 122 96 123 97 124 98 def test_handle_daily_tasks_skipped_in_remote_mode(mock_callosum): ··· 128 102 129 103 # Reset state to a previous day (would normally trigger dream) 130 104 _daily_state["last_day"] = date(2025, 1, 1) 131 - _daily_state["dream_running"] = False 132 - _daily_state["dream_completed"] = False 133 105 134 - spawned_threads = [] 106 + # Track submitted commands 107 + submitted = [] 108 + original_submit = mod._task_queue.submit 135 109 136 - class MockThread: 137 - def __init__(self, target, args=None, daemon=False): 138 - spawned_threads.append((target, args)) 110 + def capture_submit(cmd, *args, **kwargs): 111 + submitted.append(cmd) 112 + return original_submit(cmd, *args, **kwargs) 139 113 140 - def start(self): 141 - pass 114 + mod._task_queue.submit = capture_submit 142 115 143 116 # Enable remote mode (fixture resets after test) 144 117 mod._is_remote_mode = True 145 118 146 - with patch("think.supervisor.threading.Thread", MockThread): 147 - with patch("think.supervisor.datetime") as mock_datetime: 148 - mock_datetime.now.return_value.date.return_value = date(2025, 1, 2) 149 - handle_daily_tasks() 119 + with patch("think.supervisor.datetime") as mock_datetime: 120 + mock_datetime.now.return_value.date.return_value = date(2025, 1, 2) 121 + handle_daily_tasks() 150 122 151 - # Verify no thread was spawned (remote mode skips daily processing) 152 - assert len(spawned_threads) == 0 123 + # Verify no task was submitted (remote mode skips daily processing) 124 + assert len(submitted) == 0 153 125 # State should be unchanged (early return before any state updates) 154 126 assert _daily_state["last_day"] == date(2025, 1, 1) 155 - assert _daily_state["dream_running"] is False
+21 -79
think/supervisor.py
··· 416 416 # Track whether running in remote mode (upload-only, no local processing) 417 417 _is_remote_mode: bool = False 418 418 419 - # State for daily processing (dream runs in background, agents wait for completion) 419 + # State for daily processing (tracks day boundary for midnight dream trigger) 420 420 _daily_state = { 421 - "dream_running": False, # True while dream subprocess is active 422 - "dream_completed": False, # True after dream finishes (reset each day) 423 421 "last_day": None, # Track which day we last processed 424 - "start_time": 0, # When daily processing started (for duration tracking) 425 422 } 426 423 427 424 # Timeout before flushing stale segments (seconds) ··· 1008 1005 return now, stale_set 1009 1006 1010 1007 1011 - def _run_daily_processing(day: str) -> None: 1012 - """Run complete daily processing via sol dream. 1013 - 1014 - dream now handles both generators and agent execution, so we just 1015 - invoke it with --refresh and let it manage the full pipeline. 1016 - 1017 - Args: 1018 - day: Target day in YYYYMMDD format 1019 - """ 1020 - from think.runner import run_task 1021 - 1022 - logging.info(f"Starting daily processing for {day}...") 1023 - success, exit_code, log_path = run_task( 1024 - ["sol", "dream", "-v", "--day", day, "--refresh"], 1025 - callosum=_supervisor_callosum, 1026 - day=day, 1027 - ) 1028 - 1029 - # Update state on completion 1030 - _daily_state["dream_running"] = False 1031 - 1032 - if success: 1033 - logging.info(f"Daily processing completed for {day}") 1034 - _daily_state["dream_completed"] = True 1035 - else: 1036 - logging.error( 1037 - f"Daily processing failed for {day} with exit code {exit_code}, " 1038 - f"see {log_path}" 1039 - ) 1040 - 1041 - 1042 1008 def handle_daily_tasks() -> None: 1043 - """Check for day change and spawn daily dream if needed (non-blocking). 1009 + """Check for day change and submit daily dream via task queue (non-blocking). 1044 1010 1045 1011 Dream only triggers when the day actually changes during runtime (at midnight). 1046 1012 The supervisor initializes last_day on startup, so restarts don't trigger dream. 1047 - Scheduled agents are spawned after dream completes successfully. 1013 + Submitted via TaskQueue so it serializes with segment/activity/flush dreams. 1048 1014 1049 1015 Skipped in remote mode (no local data to process). 1050 1016 """ ··· 1069 1035 1070 1036 # Update state for new day 1071 1037 _daily_state["last_day"] = today 1072 - _daily_state["dream_completed"] = False 1073 - 1074 - # Don't start new dream if one is already running (edge case) 1075 - if _daily_state["dream_running"]: 1076 - logging.warning( 1077 - f"Day changed to {today} but dream already running, skipping {prev_day_str}" 1078 - ) 1079 - return 1080 1038 1081 1039 # Flush any dangling segment state from the previous day before daily dream 1082 1040 if not _flush_state["flushed"] and _flush_state["day"] == prev_day_str: ··· 1086 1044 f"Day changed to {today}, starting daily processing for {prev_day_str}" 1087 1045 ) 1088 1046 1089 - # Spawn processing in background thread with target day 1090 - _daily_state["dream_running"] = True 1091 - _daily_state["start_time"] = time.time() 1092 - threading.Thread( 1093 - target=_run_daily_processing, args=(prev_day_str,), daemon=True 1094 - ).start() 1047 + # Submit via task queue — serializes with other dream invocations 1048 + cmd = ["sol", "dream", "-v", "--day", prev_day_str, "--refresh"] 1049 + if _task_queue: 1050 + _task_queue.submit(cmd, day=prev_day_str) 1051 + else: 1052 + logging.warning( 1053 + "No task queue available for daily processing: %s", prev_day_str 1054 + ) 1095 1055 1096 1056 1097 1057 def _handle_segment_observed(message: dict) -> None: 1098 1058 """Handle segment completion events (from live observation or imports). 1099 1059 1100 - Spawns sol dream in segment mode, which handles both generators and 1101 - segment agents. Also updates flush state to track segment recency. 1060 + Submits sol dream in segment mode via task queue, which handles both 1061 + generators and segment agents. Also updates flush state to track 1062 + segment recency. 1102 1063 """ 1103 1064 if message.get("tract") != "observe" or message.get("event") != "observed": 1104 1065 return ··· 1119 1080 _flush_state["stream"] = stream 1120 1081 _flush_state["flushed"] = False 1121 1082 1122 - logging.info(f"Segment observed: {day}/{segment}, spawning processing...") 1123 - 1124 - # Run dream in segment mode (handles both generators and agents) 1125 - threading.Thread( 1126 - target=_run_segment_processing, 1127 - args=(day, segment, stream), 1128 - daemon=True, 1129 - ).start() 1130 - 1131 - 1132 - def _run_segment_processing(day: str, segment: str, stream: str | None = None) -> None: 1133 - """Run sol dream for a specific segment.""" 1134 - from think.runner import run_task 1083 + logging.info(f"Segment observed: {day}/{segment}, submitting processing...") 1135 1084 1136 - logging.info(f"Starting segment processing: {day}/{segment}") 1085 + # Submit via task queue — serializes with other dream invocations 1137 1086 cmd = ["sol", "dream", "-v", "--day", day, "--segment", segment] 1138 1087 if stream: 1139 1088 cmd.extend(["--stream", stream]) 1140 - success, exit_code, log_path = run_task( 1141 - cmd, 1142 - callosum=_supervisor_callosum, 1143 - day=day, 1144 - ) 1145 - 1146 - if success: 1147 - logging.info(f"Segment processing completed: {day}/{segment}") 1089 + if _task_queue: 1090 + _task_queue.submit(cmd, day=day) 1148 1091 else: 1149 - logging.error( 1150 - f"Segment processing failed with exit code {exit_code}: " 1151 - f"{day}/{segment}, see {log_path}" 1092 + logging.warning( 1093 + "No task queue available for segment processing: %s/%s", day, segment 1152 1094 ) 1153 1095 1154 1096 ··· 1344 1286 # Check for segment flush (non-blocking, submits via task queue) 1345 1287 _check_segment_flush() 1346 1288 1347 - # Check for daily processing (non-blocking, spawns dream in background) 1289 + # Check for daily processing (non-blocking, submits via task queue) 1348 1290 if daily: 1349 1291 handle_daily_tasks() 1350 1292