personal memory agent
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

Process dirty days at midnight instead of just the previous day

Replace the single prev_day dream submission in handle_daily_tasks()
with a dirty_days()-based catch-up. At midnight, up to the 4 newest
dirty days (MAX_DIRTY_CATCHUP) are queued oldest-first, so yesterday,
when dirty, is processed last; any older dirty days are skipped with a
warning. Dream auto-detects dirty state and enables --refresh
internally, so we no longer pass it explicitly. Days with no new stream
data are not reported as dirty and are therefore skipped.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

+174 -35
+131 -22
tests/test_supervisor_schedule.py
··· 12 12 import think.supervisor as mod 13 13 from think.supervisor import _daily_state, handle_daily_tasks 14 14 15 - # Reset state to a previous day 16 15 _daily_state["last_day"] = date(2025, 1, 1) 17 16 18 - # Track submitted commands 19 17 submitted = [] 20 18 original_submit = mod._task_queue.submit 21 19 ··· 25 23 26 24 mod._task_queue.submit = capture_submit 27 25 28 - with patch("think.supervisor.datetime") as mock_datetime: 26 + with ( 27 + patch("think.supervisor.datetime") as mock_datetime, 28 + patch("think.supervisor.dirty_days", return_value=["20250101"]), 29 + ): 29 30 mock_datetime.now.return_value.date.return_value = date(2025, 1, 2) 30 31 handle_daily_tasks() 31 32 32 - # Verify a dream task was submitted 33 33 assert len(submitted) == 1 34 34 assert submitted[0][1] == "dream" 35 35 36 - # Verify day state was updated 37 36 assert _daily_state["last_day"] == date(2025, 1, 2) 38 37 39 38 ··· 43 42 from think.supervisor import _daily_state, handle_daily_tasks 44 43 45 44 today = date(2025, 1, 2) 46 - 47 - # Set state to today 48 45 _daily_state["last_day"] = today 49 46 50 - # Track submitted commands 51 47 submitted = [] 52 48 original_submit = mod._task_queue.submit 53 49 ··· 61 57 mock_datetime.now.return_value.date.return_value = today 62 58 handle_daily_tasks() 63 59 64 - # Verify no task was submitted 65 60 assert len(submitted) == 0 66 61 67 62 68 63 def test_handle_daily_tasks_submits_correct_command(mock_callosum): 69 - """Test that handle_daily_tasks submits sol dream with --refresh for the previous day.""" 64 + """Test that handle_daily_tasks submits sol dream without --refresh (dream auto-detects).""" 70 65 import think.supervisor as mod 71 66 from think.supervisor import _daily_state, handle_daily_tasks 72 67 73 68 _daily_state["last_day"] = date(2025, 1, 1) 74 69 75 - # Track submitted commands 76 70 submitted = [] 77 71 original_submit = mod._task_queue.submit 78 72 ··· 82 76 83 77 mod._task_queue.submit = capture_submit 84 78 85 - 
with patch("think.supervisor.datetime") as mock_datetime: 79 + with ( 80 + patch("think.supervisor.datetime") as mock_datetime, 81 + patch("think.supervisor.dirty_days", return_value=["20250101"]), 82 + ): 86 83 mock_datetime.now.return_value.date.return_value = date(2025, 1, 2) 87 84 handle_daily_tasks() 88 85 89 86 assert len(submitted) == 1 90 87 cmd = submitted[0] 91 - assert cmd[0] == "sol" 92 - assert cmd[1] == "dream" 93 - assert "--day" in cmd 94 - assert "20250101" in cmd 95 - assert "--refresh" in cmd 88 + assert cmd == ["sol", "dream", "-v", "--day", "20250101"] 96 89 97 90 98 91 def test_handle_daily_tasks_skipped_in_remote_mode(mock_callosum): ··· 100 93 import think.supervisor as mod 101 94 from think.supervisor import _daily_state, handle_daily_tasks 102 95 103 - # Reset state to a previous day (would normally trigger dream) 104 96 _daily_state["last_day"] = date(2025, 1, 1) 105 97 106 - # Track submitted commands 107 98 submitted = [] 108 99 original_submit = mod._task_queue.submit 109 100 ··· 113 104 114 105 mod._task_queue.submit = capture_submit 115 106 116 - # Enable remote mode (fixture resets after test) 117 107 mod._is_remote_mode = True 118 108 119 109 with patch("think.supervisor.datetime") as mock_datetime: 120 110 mock_datetime.now.return_value.date.return_value = date(2025, 1, 2) 121 111 handle_daily_tasks() 122 112 123 - # Verify no task was submitted (remote mode skips daily processing) 124 113 assert len(submitted) == 0 125 - # State should be unchanged (early return before any state updates) 126 114 assert _daily_state["last_day"] == date(2025, 1, 1) 115 + 116 + 117 + def test_handle_daily_tasks_multiple_dirty_days_chronological(mock_callosum): 118 + """Dirty days are submitted oldest-first so yesterday is processed last.""" 119 + import think.supervisor as mod 120 + from think.supervisor import _daily_state, handle_daily_tasks 121 + 122 + _daily_state["last_day"] = date(2025, 1, 5) 123 + 124 + submitted = [] 125 + original_submit = 
mod._task_queue.submit 126 + 127 + def capture_submit(cmd, *args, **kwargs): 128 + submitted.append(cmd) 129 + return original_submit(cmd, *args, **kwargs) 130 + 131 + mod._task_queue.submit = capture_submit 132 + 133 + with ( 134 + patch("think.supervisor.datetime") as mock_datetime, 135 + patch( 136 + "think.supervisor.dirty_days", 137 + return_value=["20250103", "20250104", "20250105"], 138 + ), 139 + ): 140 + mock_datetime.now.return_value.date.return_value = date(2025, 1, 6) 141 + handle_daily_tasks() 142 + 143 + assert len(submitted) == 3 144 + days = [cmd[cmd.index("--day") + 1] for cmd in submitted] 145 + assert days == ["20250103", "20250104", "20250105"] 146 + 147 + 148 + def test_handle_daily_tasks_caps_at_max_dirty_catchup(mock_callosum): 149 + """Only the newest MAX_DIRTY_CATCHUP days are processed.""" 150 + import think.supervisor as mod 151 + from think.supervisor import _daily_state, handle_daily_tasks 152 + 153 + _daily_state["last_day"] = date(2025, 1, 10) 154 + 155 + submitted = [] 156 + original_submit = mod._task_queue.submit 157 + 158 + def capture_submit(cmd, *args, **kwargs): 159 + submitted.append(cmd) 160 + return original_submit(cmd, *args, **kwargs) 161 + 162 + mod._task_queue.submit = capture_submit 163 + 164 + all_dirty = [ 165 + "20250104", 166 + "20250105", 167 + "20250106", 168 + "20250107", 169 + "20250108", 170 + "20250109", 171 + "20250110", 172 + ] 173 + 174 + with ( 175 + patch("think.supervisor.datetime") as mock_datetime, 176 + patch("think.supervisor.dirty_days", return_value=all_dirty), 177 + ): 178 + mock_datetime.now.return_value.date.return_value = date(2025, 1, 11) 179 + handle_daily_tasks() 180 + 181 + # Only newest 4 182 + assert len(submitted) == 4 183 + days = [cmd[cmd.index("--day") + 1] for cmd in submitted] 184 + assert days == ["20250107", "20250108", "20250109", "20250110"] 185 + 186 + 187 + def test_handle_daily_tasks_no_dirty_days(mock_callosum): 188 + """No submissions when there are no dirty days.""" 189 + 
import think.supervisor as mod 190 + from think.supervisor import _daily_state, handle_daily_tasks 191 + 192 + _daily_state["last_day"] = date(2025, 1, 1) 193 + 194 + submitted = [] 195 + original_submit = mod._task_queue.submit 196 + 197 + def capture_submit(cmd, *args, **kwargs): 198 + submitted.append(cmd) 199 + return original_submit(cmd, *args, **kwargs) 200 + 201 + mod._task_queue.submit = capture_submit 202 + 203 + with ( 204 + patch("think.supervisor.datetime") as mock_datetime, 205 + patch("think.supervisor.dirty_days", return_value=[]), 206 + ): 207 + mock_datetime.now.return_value.date.return_value = date(2025, 1, 2) 208 + handle_daily_tasks() 209 + 210 + assert len(submitted) == 0 211 + # State still advances even with no dirty days 212 + assert _daily_state["last_day"] == date(2025, 1, 2) 213 + 214 + 215 + def test_handle_daily_tasks_excludes_today(mock_callosum): 216 + """Today is excluded from dirty_days query.""" 217 + import think.supervisor as mod 218 + from think.supervisor import _daily_state, handle_daily_tasks 219 + 220 + _daily_state["last_day"] = date(2025, 1, 1) 221 + 222 + captured_exclude = {} 223 + 224 + def fake_dirty_days(exclude=None): 225 + captured_exclude["value"] = exclude 226 + return ["20250101"] 227 + 228 + with ( 229 + patch("think.supervisor.datetime") as mock_datetime, 230 + patch("think.supervisor.dirty_days", side_effect=fake_dirty_days), 231 + ): 232 + mock_datetime.now.return_value.date.return_value = date(2025, 1, 2) 233 + handle_daily_tasks() 234 + 235 + assert captured_exclude["value"] == {"20250102"}
+43 -13
think/supervisor.py
··· 23 23 from think.runner import DailyLogWriter 24 24 from think.runner import ManagedProcess as RunnerManagedProcess 25 25 from think.utils import ( 26 + dirty_days, 26 27 find_available_port, 27 28 get_journal, 28 29 get_journal_info, ··· 33 34 34 35 DEFAULT_THRESHOLD = 60 35 36 CHECK_INTERVAL = 30 37 + MAX_DIRTY_CATCHUP = 4 36 38 37 39 # Global shutdown flag 38 40 shutdown_requested = False ··· 1017 1019 1018 1020 1019 1021 def handle_daily_tasks() -> None: 1020 - """Check for day change and submit daily dream via task queue (non-blocking). 1022 + """Check for day change and submit daily dream for dirty days (non-blocking). 1021 1023 1022 - Dream only triggers when the day actually changes during runtime (at midnight). 1023 - The supervisor initializes last_day on startup, so restarts don't trigger dream. 1024 - Submitted via TaskQueue so it serializes with segment/activity/flush dreams. 1024 + Triggers once when the day rolls over at midnight. Queries ``dirty_days()`` 1025 + for journal days that have new stream data but haven't completed a daily 1026 + dream yet, then submits up to ``MAX_DIRTY_CATCHUP`` dreams in chronological 1027 + order (oldest first, yesterday last) via the TaskQueue. 1028 + 1029 + Dream auto-detects dirty state and enables ``--refresh`` internally, so we 1030 + don't pass it here. 1025 1031 1026 1032 Skipped in remote mode (no local data to process). 
1027 1033 """ ··· 1051 1057 if not _flush_state["flushed"] and _flush_state["day"] == prev_day_str: 1052 1058 _check_segment_flush(force=True) 1053 1059 1054 - logging.info( 1055 - f"Day changed to {today}, starting daily processing for {prev_day_str}" 1056 - ) 1060 + today_str = today.strftime("%Y%m%d") 1061 + all_dirty = dirty_days(exclude={today_str}) 1062 + 1063 + if not all_dirty: 1064 + logging.info("Day changed to %s, no dirty days to process", today) 1065 + return 1057 1066 1058 - # Submit via task queue — serializes with other dream invocations 1059 - cmd = ["sol", "dream", "-v", "--day", prev_day_str, "--refresh"] 1060 - if _task_queue: 1061 - _task_queue.submit(cmd, day=prev_day_str) 1062 - else: 1067 + # Take the newest MAX_DIRTY_CATCHUP days (already sorted ascending) 1068 + days_to_process = all_dirty[-MAX_DIRTY_CATCHUP:] 1069 + skipped = len(all_dirty) - len(days_to_process) 1070 + 1071 + if skipped: 1063 1072 logging.warning( 1064 - "No task queue available for daily processing: %s", prev_day_str 1073 + "Skipping %d older dirty days (max catchup %d): %s", 1074 + skipped, 1075 + MAX_DIRTY_CATCHUP, 1076 + all_dirty[:skipped], 1065 1077 ) 1078 + 1079 + logging.info( 1080 + "Day changed to %s, queuing daily dream for %d dirty day(s): %s", 1081 + today, 1082 + len(days_to_process), 1083 + days_to_process, 1084 + ) 1085 + 1086 + # Submit oldest-first so yesterday is processed last 1087 + for day_str in days_to_process: 1088 + cmd = ["sol", "dream", "-v", "--day", day_str] 1089 + if _task_queue: 1090 + _task_queue.submit(cmd, day=day_str) 1091 + logging.debug("Submitted daily dream for %s", day_str) 1092 + else: 1093 + logging.warning( 1094 + "No task queue available for daily processing: %s", day_str 1095 + ) 1066 1096 1067 1097 1068 1098 def _handle_segment_observed(message: dict) -> None: