personal memory agent
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

fix: prevent disk-full from permanently stalling supervisor task queue

A disk-full condition at 02:00 caused the import task queue to get
permanently stuck because cleanup failures in _run_task's finally block
prevented _process_next() from ever running. Every subsequent hourly
plaud sync queued behind the phantom "running" import for 9+ hours.

- Isolate each cleanup step in _run_task's finally block so
_process_next() always executes regardless of cleanup failures
- Add stale-queue detection: _running now tracks the task thread,
and submit() checks thread liveness before queuing — if the thread
died without clearing state, it self-heals on next submission
- Harden RunnerManagedProcess.cleanup() so one failure doesn't
block subsequent cleanup steps
- Make DailyLogWriter.write() swallow OSError so output threads
survive disk-full instead of crashing silently

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

+124 -34
+49 -4
tests/test_supervisor.py
··· 385 385 386 386 # Create task queue with pre-set state 387 387 mod._task_queue = mod.TaskQueue(on_queue_change=None) 388 - mod._task_queue._running = {"indexer": "ref123"} 388 + mod._task_queue._running = {"indexer": {"ref": "ref123", "thread": None}} 389 389 mod._task_queue._queues = { 390 390 "indexer": [ 391 391 {"refs": ["queued-ref"], "cmd": ["sol", "indexer", "--rescan-full"]} ··· 418 418 419 419 # Create task queue with pre-set state (no queued tasks) 420 420 mod._task_queue = mod.TaskQueue(on_queue_change=None) 421 - mod._task_queue._running = {"indexer": "ref123"} 421 + mod._task_queue._running = {"indexer": {"ref": "ref123", "thread": None}} 422 422 mod._task_queue._queues = {"indexer": []} 423 423 424 424 spawned = [] ··· 462 462 mod._handle_task_request(msg) 463 463 464 464 # Should use the provided ref 465 - assert mod._task_queue._running["indexer"] == "my-custom-ref-123" 465 + assert mod._task_queue._running["indexer"]["ref"] == "my-custom-ref-123" 466 466 assert spawned[0][0] == ["my-custom-ref-123"] # refs is a list 467 467 468 468 ··· 564 564 565 565 # Create task queue with pre-set state (queued task with multiple refs) 566 566 mod._task_queue = mod.TaskQueue(on_queue_change=None) 567 - mod._task_queue._running = {"indexer": "running-ref"} 567 + mod._task_queue._running = {"indexer": {"ref": "running-ref", "thread": None}} 568 568 mod._task_queue._queues = { 569 569 "indexer": [ 570 570 { ··· 588 588 assert len(spawned) == 1 589 589 assert spawned[0][0] == ["ref-A", "ref-B", "ref-C"] # all refs passed 590 590 assert spawned[0][1] == ["sol", "indexer", "--rescan"] 591 + 592 + 593 + def test_stale_queue_detected_on_submit(monkeypatch): 594 + """Test that a dead task thread is detected and cleared on next submit.""" 595 + import threading 596 + 597 + mod = importlib.import_module("think.supervisor") 598 + 599 + mod._task_queue = mod.TaskQueue(on_queue_change=None) 600 + 601 + # Create a dead thread BEFORE monkeypatching Thread.start 602 + dead_thread = threading.Thread(target=lambda: None) 603 + dead_thread.start() 604 + dead_thread.join() 605 + assert not dead_thread.is_alive() 606 + 607 + spawned = [] 608 + 609 + def fake_thread_start(self): 610 + spawned.append(self._target.__name__) 611 + 612 + monkeypatch.setattr(mod.threading.Thread, "start", fake_thread_start) 613 + 614 + mod._task_queue._running = {"indexer": {"ref": "stale-ref", "thread": dead_thread}} 615 + mod._task_queue._queues = { 616 + "indexer": [ 617 + {"refs": ["queued-ref"], "cmd": ["sol", "indexer", "--rescan-full"]} 618 + ] 619 + } 620 + 621 + # Submit a new indexer task — should detect stale state and start immediately 622 + msg = { 623 + "tract": "supervisor", 624 + "event": "request", 625 + "cmd": ["sol", "indexer", "--rescan-new"], 626 + "ref": "new-ref", 627 + } 628 + mod._handle_task_request(msg) 629 + 630 + # Stale entry should have been cleared, new task started 631 + assert mod._task_queue._running["indexer"]["ref"] == "new-ref" 632 + assert len(spawned) == 1 633 + 634 + # Old queued entries should still be in queue (stale clear only removes _running) 635 + assert len(mod._task_queue._queues["indexer"]) == 1 591 636 592 637 593 638 def test_supervisor_singleton_lock_acquired(tmp_path, monkeypatch):
+42 -23
think/runner.py
··· 141 141 # Close old log 142 142 if not self._fh.closed: 143 143 self._fh.close() 144 - # Open new log for new day 145 - self._current_day = day_now 146 - self._fh = self._open_log() 147 - # Update symlinks to point to new day's file 148 - self._update_symlinks() 144 + # Open new log for new day — keep old handle on failure 145 + try: 146 + self._fh = self._open_log() 147 + self._current_day = day_now 148 + self._update_symlinks() 149 + except OSError: 150 + pass 149 151 150 - # Write and flush 151 - self._fh.write(message) 152 - self._fh.flush() 152 + # Write and flush — swallow disk-full so output threads survive 153 + try: 154 + self._fh.write(message) 155 + self._fh.flush() 156 + except OSError: 157 + pass 153 158 154 159 def close(self) -> None: 155 160 """Close log file.""" ··· 396 401 """Wait for output threads to finish and close log file. 397 402 398 403 Call this after process exits to clean up resources. 404 + Each step is isolated so one failure doesn't block the rest. 399 405 """ 400 406 for thread in self._threads: 401 - thread.join(timeout=1) 402 - self.log_writer.close() 407 + try: 408 + thread.join(timeout=1) 409 + except Exception: 410 + pass 411 + 412 + try: 413 + self.log_writer.close() 414 + except Exception: 415 + pass 403 416 404 417 # Emit exit event 405 418 if self._callosum: 406 - duration_ms = int((time.time() - self._start_time) * 1000) 407 - self._callosum.emit( 408 - "logs", 409 - "exit", 410 - ref=self.ref, 411 - name=self.name, 412 - pid=self.pid, 413 - exit_code=self.returncode, 414 - duration_ms=duration_ms, 415 - cmd=self.cmd, 416 - log_path=str(self.log_writer.path), 417 - ) 419 + try: 420 + duration_ms = int((time.time() - self._start_time) * 1000) 421 + self._callosum.emit( 422 + "logs", 423 + "exit", 424 + ref=self.ref, 425 + name=self.name, 426 + pid=self.pid, 427 + exit_code=self.returncode, 428 + duration_ms=duration_ms, 429 + cmd=self.cmd, 430 + log_path=str(self.log_writer.path), 431 + ) 432 + except Exception: 433 + pass 418 434 # Only stop callosum if we created it (not shared) 419 435 if self._owns_callosum: 420 - self._callosum.stop() 436 + try: 437 + self._callosum.stop() 438 + except Exception: 439 + pass 421 440 422 441 @property 423 442 def pid(self) -> int:
+33 -7
think/supervisor.py
··· 143 143 on_queue_change: Optional callback(cmd_name, running_ref, queue_entries) 144 144 called after queue state changes. Called outside lock. 145 145 """ 146 - self._running: dict[str, str] = {} # command_name -> ref of running task 146 + self._running: dict[str, dict] = {} # command_name -> {"ref": str, "thread": Thread} 147 147 self._queues: dict[str, list] = {} # command_name -> list of {refs, cmd} dicts 148 148 self._active: dict[str, RunnerManagedProcess] = {} # ref -> process 149 149 self._lock = threading.Lock() ··· 166 166 167 167 with self._lock: 168 168 queue = list(self._queues.get(cmd_name, [])) 169 - running_ref = self._running.get(cmd_name) 169 + entry = self._running.get(cmd_name) 170 + running_ref = entry["ref"] if entry else None 170 171 171 172 self._on_queue_change(cmd_name, running_ref, queue) 172 173 ··· 196 197 should_start = False 197 198 198 199 with self._lock: 200 + # Detect stale running state (task thread died without clearing queue) 201 + if cmd_name in self._running: 202 + stale = self._running[cmd_name] 203 + if stale["thread"] is not None and not stale["thread"].is_alive(): 204 + logging.warning( 205 + f"Clearing stale {cmd_name} queue " 206 + f"(thread dead, ref={stale['ref']})" 207 + ) 208 + self._running.pop(cmd_name) 209 + 199 210 if cmd_name in self._running: 200 211 # Command already running - queue or coalesce 201 212 queue = self._queues.setdefault(cmd_name, []) ··· 220 231 should_notify = True 221 232 else: 222 233 # Not running - mark as running and start 223 - self._running[cmd_name] = ref 234 + # Thread is set to None here; _run_task registers it on entry 235 + self._running[cmd_name] = {"ref": ref, "thread": None} 224 236 should_start = True 225 237 226 238 # Notify outside lock ··· 254 266 cmd_name: Command name for queue management 255 267 day: Optional day override (YYYYMMDD) for log placement 256 268 """ 269 + # Register this thread for stale-queue detection 270 + with self._lock: 271 + if cmd_name in self._running and self._running[cmd_name]["ref"] == refs[0]: 272 + self._running[cmd_name]["thread"] = threading.current_thread() 273 + 257 274 callosum = CallosumConnection() 258 275 managed = None 259 276 primary_ref = refs[0] ··· 309 326 exit_code=-1, 310 327 ) 311 328 finally: 312 - if managed: 313 - managed.cleanup() 329 + # Each cleanup step is isolated so _process_next always runs. 330 + # A disk-full or other OS error in cleanup must never stall the queue. 331 + try: 332 + if managed: 333 + managed.cleanup() 334 + except Exception: 335 + logging.exception(f"Task {cmd_name} ({primary_ref}): cleanup failed") 314 336 self._active.pop(primary_ref, None) 315 - callosum.stop() 337 + try: 338 + callosum.stop() 339 + except Exception: 340 + logging.exception(f"Task {cmd_name} ({primary_ref}): callosum stop failed") 316 341 self._process_next(cmd_name) 317 342 318 343 def _process_next(self, cmd_name: str) -> None: ··· 328 353 refs = entry["refs"] 329 354 next_cmd = entry["cmd"] 330 355 day = entry.get("day") 331 - self._running[cmd_name] = refs[0] 356 + # Thread is set to None here; _run_task registers it on entry 357 + self._running[cmd_name] = {"ref": refs[0], "thread": None} 332 358 logging.info( 333 359 f"Dequeued task {cmd_name}: {' '.join(next_cmd)} refs={refs} " 334 360 f"(remaining: {len(queue)})"