personal memory agent
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

Add backpressure to transcribe handler and fix queued context loss

Refactor sense.py handler queue system to serialize both describe and
transcribe execution (one at a time each). Introduces QueuedItem class
to preserve remote context for deferred processing, fixing a bug where
REMOTE_NAME env var was lost for queued files.

- Add HandlerQueue class for reusable serialized execution
- Add QueuedItem to store file_path, queued_at, and remote context
- Both describe and transcribe now run one-at-a-time
- Status events report queue depth for both handler types
- Add 11 unit tests for QueuedItem and HandlerQueue

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

+250 -79
+122 -78
observe/sense.py
··· 28 28 logger = logging.getLogger(__name__) 29 29 30 30 31 + class QueuedItem: 32 + """Item in a handler queue with context for deferred processing.""" 33 + 34 + __slots__ = ("file_path", "queued_at", "remote") 35 + 36 + def __init__(self, file_path: Path, remote: Optional[str] = None): 37 + self.file_path = file_path 38 + self.queued_at = time.time() 39 + self.remote = remote 40 + 41 + 42 + class HandlerQueue: 43 + """Queue for serializing handler execution (one at a time). 44 + 45 + Ensures only one handler process runs at a time for resource-intensive 46 + operations like describe (GPU) or transcribe (memory/API constraints). 47 + """ 48 + 49 + def __init__(self, name: str): 50 + self.name = name 51 + self.queue: List[QueuedItem] = [] 52 + self.current_process: Optional["HandlerProcess"] = None 53 + 54 + def can_start(self) -> bool: 55 + """Returns True if no handler is currently running.""" 56 + return self.current_process is None 57 + 58 + def enqueue(self, file_path: Path, remote: Optional[str] = None) -> bool: 59 + """Add file to queue if not already present. Returns True if queued.""" 60 + queued_paths = [item.file_path for item in self.queue] 61 + if file_path not in queued_paths: 62 + self.queue.append(QueuedItem(file_path, remote)) 63 + return True 64 + return False 65 + 66 + def set_current(self, proc: "HandlerProcess") -> None: 67 + """Set the currently running handler process.""" 68 + self.current_process = proc 69 + 70 + def clear_current(self) -> None: 71 + """Clear the current process reference.""" 72 + self.current_process = None 73 + 74 + def pop_next(self) -> Optional[QueuedItem]: 75 + """Pop and return next queued item, or None if empty.""" 76 + if self.queue: 77 + return self.queue.pop(0) 78 + return None 79 + 80 + def queue_size(self) -> int: 81 + """Return number of items in queue.""" 82 + return len(self.queue) 83 + 84 + 31 85 class HandlerProcess: 32 86 """Manages a running handler subprocess with RunnerManagedProcess.""" 33 87 ··· 58 112 self.running: Dict[Path, HandlerProcess] = {} 59 113 self.lock = threading.RLock() 60 114 61 - # Queue for describe requests (only one describe runs at a time) 62 - # Each entry is (file_path, queued_at_timestamp) 63 - self.describe_queue: List[tuple[Path, float]] = [] 64 - self.current_describe_process: Optional[HandlerProcess] = None 115 + # Serialized handler queues (only one process at a time per handler type) 116 + self.handler_queues: Dict[str, HandlerQueue] = { 117 + "describe": HandlerQueue("describe"), 118 + "transcribe": HandlerQueue("transcribe"), 119 + } 65 120 66 121 self.running_flag = True 67 122 ··· 167 222 self.segment_batch[segment] = True 168 223 self.segment_files[segment].add(file_path) 169 224 170 - # Queue describe requests to ensure only one runs at a time 171 - if handler_name == "describe": 172 - if self.current_describe_process is not None: 173 - # Check if file already queued (compare just paths) 174 - queued_paths = [p for p, _ in self.describe_queue] 175 - if file_path not in queued_paths: 176 - self.describe_queue.append((file_path, time.time())) 177 - logger.info( 178 - f"Queueing {file_path.name} for describe (queue size: {len(self.describe_queue)})" 179 - ) 180 - return 225 + # Check if this handler uses serialized execution 226 + handler_queue = self.handler_queues.get(handler_name) 227 + if handler_queue and not handler_queue.can_start(): 228 + if handler_queue.enqueue(file_path, remote=remote): 229 + logger.info( 230 + f"Queueing {file_path.name} for {handler_name} " 231 + f"(queue size: {handler_queue.queue_size()})" 232 + ) 233 + return 181 234 182 235 # Generate correlation ID for this handler run 183 236 ref = str(int(time.time() * 1000)) ··· 225 278 ) 226 279 except RuntimeError as exc: 227 280 logger.error(str(exc)) 228 - # Release describe lock if this was a describe handler 229 - if handler_name == "describe": 281 + # Release handler queue lock if this handler uses serialized execution 282 + handler_queue = self.handler_queues.get(handler_name) 283 + if handler_queue: 230 284 with self.lock: 231 - self.current_describe_process = None 285 + handler_queue.clear_current() 232 286 return 233 287 234 288 handler_proc = HandlerProcess(file_path, managed, handler_name) 235 289 236 290 with self.lock: 237 291 self.running[file_path] = handler_proc 238 - if handler_name == "describe": 239 - self.current_describe_process = handler_proc 292 + # Track as current process if using serialized execution 293 + handler_queue = self.handler_queues.get(handler_name) 294 + if handler_queue: 295 + handler_queue.set_current(handler_proc) 240 296 241 297 # Monitor process completion in background 242 298 threading.Thread( ··· 275 331 exc_info=True, 276 332 ) 277 333 finally: 278 - # Always process next queued describe if this was a describe handler 279 - if handler_proc is self.current_describe_process: 280 - self._process_next_describe() 334 + # Process next queued item if this handler uses serialized execution 335 + handler_queue = self.handler_queues.get(handler_proc.handler_name) 336 + if handler_queue and handler_proc is handler_queue.current_process: 337 + self._process_next_queued(handler_queue) 281 338 282 - def _process_next_describe(self): 283 - """Process next queued describe task.""" 339 + def _process_next_queued(self, handler_queue: HandlerQueue): 340 + """Process next queued task for a serialized handler.""" 284 341 with self.lock: 285 - self.current_describe_process = None 286 - if self.describe_queue: 287 - next_file, queued_at = self.describe_queue.pop(0) 342 + handler_queue.clear_current() 343 + item = handler_queue.pop_next() 344 + if item: 288 345 logger.info( 289 - f"Starting queued describe for {next_file.name} ({len(self.describe_queue)} remaining)" 346 + f"Starting queued {handler_queue.name} for {item.file_path.name} " 347 + f"({handler_queue.queue_size()} remaining)" 290 348 ) 291 - handler_info = self._match_pattern(next_file) 349 + handler_info = self._match_pattern(item.file_path) 292 350 if handler_info: 293 351 handler_name, command = handler_info 294 - self._spawn_handler(next_file, handler_name, command) 352 + self._spawn_handler( 353 + item.file_path, handler_name, command, remote=item.remote 354 + ) 295 355 296 356 def _check_segment_observed(self, file_path: Path): 297 357 """Check if all files for this segment have completed processing.""" ··· 415 475 416 476 with self.lock: 417 477 # Check if there's any activity to report 418 - if not self.running and not self.describe_queue: 478 + has_queued = any(q.queue_size() > 0 for q in self.handler_queues.values()) 479 + if not self.running and not has_queued: 419 480 return # Nothing active, don't emit 420 481 421 482 # Build status object ··· 423 484 424 485 # Get journal path for relative paths 425 486 journal_path = get_journal() 426 - 427 - # Collect describe info 428 - describe_running = None 429 - describe_queued = [] 430 - 431 - if self.current_describe_process is not None: 432 - handler_proc = self.current_describe_process 433 - try: 434 - rel_file = str(handler_proc.file_path.relative_to(journal_path)) 435 - except ValueError: 436 - rel_file = str(handler_proc.file_path) 437 - 438 - describe_running = { 439 - "file": rel_file, 440 - "ref": handler_proc.managed.ref, 441 - } 442 - 443 - # Get queued describes with age 444 487 now = time.time() 445 - for file_path, queued_at in self.describe_queue: 446 - try: 447 - rel_file = str(file_path.relative_to(journal_path)) 448 - except ValueError: 449 - rel_file = str(file_path) 450 488 451 - describe_queued.append( 452 - {"file": rel_file, "age_seconds": int(now - queued_at)} 453 - ) 454 - 455 - # Add describe section if any activity 456 - if describe_running or describe_queued: 457 - status["describe"] = {} 458 - if describe_running: 459 - status["describe"]["running"] = describe_running 460 - if describe_queued: 461 - status["describe"]["queued"] = describe_queued 489 + # Build status for each serialized handler queue 490 + for handler_name, handler_queue in self.handler_queues.items(): 491 + handler_status = {} 462 492 463 - # Collect transcribe info (any running handler that's not describe) 464 - transcribe_running = [] 465 - for file_path, handler_proc in self.running.items(): 466 - if handler_proc is not self.current_describe_process: 493 + # Current running process 494 + if handler_queue.current_process is not None: 495 + handler_proc = handler_queue.current_process 467 496 try: 468 - rel_file = str(file_path.relative_to(journal_path)) 497 + rel_file = str(handler_proc.file_path.relative_to(journal_path)) 469 498 except ValueError: 470 - rel_file = str(file_path) 499 + rel_file = str(handler_proc.file_path) 500 + 501 + handler_status["running"] = { 502 + "file": rel_file, 503 + "ref": handler_proc.managed.ref, 504 + } 505 + 506 + # Queued items with age 507 + if handler_queue.queue_size() > 0: 508 + queued_list = [] 509 + for item in handler_queue.queue: 510 + try: 511 + rel_file = str(item.file_path.relative_to(journal_path)) 512 + except ValueError: 513 + rel_file = str(item.file_path) 471 514 472 - transcribe_running.append( 473 - {"file": rel_file, "ref": handler_proc.managed.ref} 474 - ) 515 + queued_list.append( 516 + {"file": rel_file, "age_seconds": int(now - item.queued_at)} 517 + ) 518 + handler_status["queued"] = queued_list 475 519 476 - # Add transcribe section if any activity 477 - if transcribe_running: 478 - status["transcribe"] = {"running": transcribe_running} 520 + # Add section if any activity for this handler 521 + if handler_status: 522 + status[handler_name] = handler_status 479 523 480 524 # Only emit if we have something to report 481 525 if status:
+128 -1
tests/test_sense.py
··· 12 12 13 13 import pytest 14 14 15 - from observe.sense import FileSensor, HandlerProcess 15 + from observe.sense import FileSensor, HandlerProcess, HandlerQueue, QueuedItem 16 16 from think.runner import DailyLogWriter as ProcessLogWriter 17 17 from think.runner import _format_log_line 18 + 19 + 20 + # --- QueuedItem Tests --- 21 + 22 + 23 + def test_queued_item_basic(): 24 + """Test QueuedItem stores file_path and queued_at.""" 25 + path = Path("/tmp/test.flac") 26 + item = QueuedItem(path) 27 + 28 + assert item.file_path == path 29 + assert item.queued_at > 0 30 + assert item.remote is None 31 + 32 + 33 + def test_queued_item_with_remote(): 34 + """Test QueuedItem stores remote context.""" 35 + path = Path("/tmp/test.flac") 36 + item = QueuedItem(path, remote="my-remote") 37 + 38 + assert item.file_path == path 39 + assert item.remote == "my-remote" 40 + 41 + 42 + # --- HandlerQueue Tests --- 43 + 44 + 45 + def test_handler_queue_can_start_empty(): 46 + """Test can_start returns True when no process running.""" 47 + queue = HandlerQueue("test") 48 + assert queue.can_start() is True 49 + 50 + 51 + def test_handler_queue_can_start_with_current(): 52 + """Test can_start returns False when process is running.""" 53 + queue = HandlerQueue("test") 54 + queue.current_process = MagicMock() # Simulate running process 55 + assert queue.can_start() is False 56 + 57 + 58 + def test_handler_queue_enqueue(): 59 + """Test enqueue adds items to queue.""" 60 + queue = HandlerQueue("test") 61 + path1 = Path("/tmp/test1.flac") 62 + path2 = Path("/tmp/test2.flac") 63 + 64 + assert queue.enqueue(path1) is True 65 + assert queue.enqueue(path2) is True 66 + assert queue.queue_size() == 2 67 + 68 + 69 + def test_handler_queue_enqueue_duplicate(): 70 + """Test enqueue rejects duplicate paths.""" 71 + queue = HandlerQueue("test") 72 + path = Path("/tmp/test.flac") 73 + 74 + assert queue.enqueue(path) is True 75 + assert queue.enqueue(path) is False # Duplicate 76 + assert queue.queue_size() == 1 77 + 78 + 79 + def test_handler_queue_enqueue_with_remote(): 80 + """Test enqueue preserves remote context.""" 81 + queue = HandlerQueue("test") 82 + path = Path("/tmp/test.flac") 83 + 84 + queue.enqueue(path, remote="my-remote") 85 + 86 + assert queue.queue_size() == 1 87 + item = queue.pop_next() 88 + assert item.remote == "my-remote" 89 + 90 + 91 + def test_handler_queue_pop_next(): 92 + """Test pop_next returns items in FIFO order.""" 93 + queue = HandlerQueue("test") 94 + path1 = Path("/tmp/test1.flac") 95 + path2 = Path("/tmp/test2.flac") 96 + 97 + queue.enqueue(path1) 98 + queue.enqueue(path2) 99 + 100 + item1 = queue.pop_next() 101 + assert item1.file_path == path1 102 + 103 + item2 = queue.pop_next() 104 + assert item2.file_path == path2 105 + 106 + assert queue.pop_next() is None # Empty 107 + 108 + 109 + def test_handler_queue_pop_next_empty(): 110 + """Test pop_next returns None on empty queue.""" 111 + queue = HandlerQueue("test") 112 + assert queue.pop_next() is None 113 + 114 + 115 + def test_handler_queue_set_clear_current(): 116 + """Test set_current and clear_current.""" 117 + queue = HandlerQueue("test") 118 + mock_proc = MagicMock() 119 + 120 + queue.set_current(mock_proc) 121 + assert queue.current_process is mock_proc 122 + assert queue.can_start() is False 123 + 124 + queue.clear_current() 125 + assert queue.current_process is None 126 + assert queue.can_start() is True 127 + 128 + 129 + def test_handler_queue_queue_size(): 130 + """Test queue_size reports correct count.""" 131 + queue = HandlerQueue("test") 132 + assert queue.queue_size() == 0 133 + 134 + queue.enqueue(Path("/tmp/test1.flac")) 135 + assert queue.queue_size() == 1 136 + 137 + queue.enqueue(Path("/tmp/test2.flac")) 138 + assert queue.queue_size() == 2 139 + 140 + queue.pop_next() 141 + assert queue.queue_size() == 1 142 + 143 + 144 + # --- Existing Tests --- 18 145 19 146 20 147 def test_format_log_line():