personal memory agent
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

feat(sense): parallelize HandlerQueue per-handler concurrency

HandlerQueue now supports N concurrent slots per handler, configurable per
handler in journal.json via {handler}.max_concurrent. Default stays N=1
(byte-identical to prior behavior); operators raise it where API quota and
hardware allow. observe.status payload changes: handler_status["running"]
is now a list (length 0..N); top.py and apps/health/workspace.html updated
to render the count.

Co-Authored-By: Codex <codex@openai.com>

+273 -71
+10 -7
apps/health/workspace.html
··· 1490 1490 } 1491 1491 1492 1492 const processingDetails = []; 1493 - if (observers.some(observer => observer.describe?.running)) processingDetails.push('scene analysis'); 1494 - if (observers.some(observer => observer.transcribe?.running)) processingDetails.push('audio transcription'); 1493 + if (observers.some(observer => observer.describe?.running?.length)) processingDetails.push('scene analysis'); 1494 + if (observers.some(observer => observer.transcribe?.running?.length)) processingDetails.push('audio transcription'); 1495 1495 if (processingDetails.length > 0) { 1496 1496 parts.push(`Processing (${processingDetails.join(', ')})`); 1497 1497 } ··· 2188 2188 } else if (result.processor) { 2189 2189 // Describe/transcribe processor logic 2190 2190 const p = result.processor; 2191 - const isRunning = !!p.running; 2191 + const running = p.running || []; 2192 + const isRunning = running.length > 0; 2192 2193 const queued = p.queued?.length || 0; 2193 2194 if (isRunning && queued > 0) { 2194 - ch.statusEl.textContent = `running (+${queued} queued)`; 2195 + ch.statusEl.textContent = running.length > 1 2196 + ? `running (${running.length}, +${queued} queued)` 2197 + : `running (+${queued} queued)`; 2195 2198 } else if (isRunning) { 2196 - ch.statusEl.textContent = 'running'; 2199 + ch.statusEl.textContent = running.length > 1 ? `running (${running.length})` : 'running'; 2197 2200 } else if (queued > 0) { 2198 2201 ch.statusEl.textContent = `queued: ${queued}`; 2199 2202 } else { 2200 2203 ch.statusEl.textContent = 'idle'; 2201 2204 } 2202 - if (isRunning && p.running.file) { 2203 - ch.detailEl.textContent = truncate(p.running.file.split('/').pop(), 30); 2205 + if (isRunning && running[0]?.file) { 2206 + ch.detailEl.textContent = truncate(running[0].file.split('/').pop(), 30); 2204 2207 } else { 2205 2208 ch.detailEl.textContent = ''; 2206 2209 }
+73 -51
observe/sense.py
··· 29 29 CHRONICLE_DIR, 30 30 DATE_RE, 31 31 day_path, 32 + get_config, 32 33 get_journal, 33 34 get_rev, 34 35 iter_segments, ··· 40 41 ) 41 42 42 43 logger = logging.getLogger(__name__) 44 + 45 + # Handlers with serialized HandlerQueues. Add a new entry here when registering one in main(). 46 + HANDLER_NAMES = ("describe", "transcribe") 43 47 44 48 45 49 class QueuedItem: ··· 60 64 61 65 62 66 class HandlerQueue: 63 - """Queue for serializing handler execution (one at a time). 67 + """Queue for serializing handler execution. 64 68 65 - Ensures only one handler process runs at a time for resource-intensive 66 - operations like describe (GPU) or transcribe (memory/API constraints). 69 + Up to `max_concurrent` handler processes can run simultaneously per handler 70 + type. Default is 1 (serial); operators raise via {handler}.max_concurrent in 71 + journal.json where API quota / hardware allows it. 67 72 """ 68 73 69 - def __init__(self, name: str): 74 + def __init__(self, name: str, max_concurrent: int = 1): 70 75 self.name = name 76 + self.max_concurrent = max_concurrent 71 77 self.queue: List[QueuedItem] = [] 72 - self.current_process: Optional["HandlerProcess"] = None 78 + self.running: List["HandlerProcess"] = [] 73 79 74 80 def can_start(self) -> bool: 75 - """Returns True if no handler is currently running.""" 76 - return self.current_process is None 81 + """Returns True if a handler slot is available.""" 82 + return len(self.running) < self.max_concurrent 83 + 84 + def available_slots(self) -> int: 85 + """Return number of available handler slots.""" 86 + return self.max_concurrent - len(self.running) 87 + 88 + def add_running(self, proc: "HandlerProcess") -> None: 89 + """Track a running handler process.""" 90 + self.running.append(proc) 91 + 92 + def remove_running(self, proc: "HandlerProcess") -> None: 93 + """Stop tracking a running handler process.""" 94 + self.running.remove(proc) 77 95 78 96 def enqueue( 79 97 self, ··· 87 105 self.queue.append(QueuedItem(file_path, observer, meta)) 88 106 return True 89 107 return False 90 - 91 - def set_current(self, proc: "HandlerProcess") -> None: 92 - """Set the currently running handler process.""" 93 - self.current_process = proc 94 - 95 - def clear_current(self) -> None: 96 - """Clear the current process reference.""" 97 - self.current_process = None 98 108 99 109 def pop_next(self) -> Optional[QueuedItem]: 100 110 """Pop and return next queued item, or None if empty.""" ··· 145 155 self.running: Dict[Path, HandlerProcess] = {} 146 156 self.lock = threading.RLock() 147 157 148 - # Serialized handler queues (only one process at a time per handler type) 158 + # Serialized handler queues (bounded concurrent processes per handler type) 149 159 self.handler_queues: Dict[str, HandlerQueue] = { 150 - "describe": HandlerQueue("describe"), 151 - "transcribe": HandlerQueue("transcribe"), 160 + name: HandlerQueue(name, max_concurrent=self._resolve_concurrency(name)) 161 + for name in HANDLER_NAMES 152 162 } 153 163 154 164 self.running_flag = True ··· 173 183 self.segment_errors: Dict[str, list[str]] = {} 174 184 # Track stream identity per segment: {segment_key: stream_name} 175 185 self.segment_stream: Dict[str, str] = {} 186 + 187 + def _resolve_concurrency(self, handler_name: str) -> int: 188 + cfg = get_config() 189 + raw = cfg.get(handler_name, {}).get("max_concurrent", 1) 190 + if not isinstance(raw, int) or isinstance(raw, bool) or raw < 1: 191 + logger.warning( 192 + "Invalid %s.max_concurrent in journal config: %r — defaulting to 1", 193 + handler_name, 194 + raw, 195 + ) 196 + return 1 197 + return raw 176 198 177 199 def register(self, pattern: str, handler_name: str, command: List[str]): 178 200 """ ··· 342 364 ) 343 365 except RuntimeError as exc: 344 366 logger.error(str(exc)) 345 - # Release handler queue lock if this handler uses serialized execution 346 - handler_queue = self.handler_queues.get(handler_name) 347 - if handler_queue: 348 - with self.lock: 349 - handler_queue.clear_current() 350 367 return 351 368 352 369 handler_proc = HandlerProcess( ··· 355 372 356 373 with self.lock: 357 374 self.running[file_path] = handler_proc 358 - # Track as current process if using serialized execution 375 + # Track running processes if using serialized execution 359 376 handler_queue = self.handler_queues.get(handler_name) 360 377 if handler_queue: 361 - handler_queue.set_current(handler_proc) 378 + handler_queue.add_running(handler_proc) 362 379 363 380 # Monitor process completion in background 364 381 threading.Thread( ··· 455 472 exc_info=True, 456 473 ) 457 474 finally: 458 - # Process next queued item if this handler uses serialized execution 475 + # Process queued items if this handler uses serialized execution 459 476 handler_queue = self.handler_queues.get(handler_proc.handler_name) 460 - if handler_queue and handler_proc is handler_queue.current_process: 461 - self._process_next_queued(handler_queue) 477 + if handler_queue is not None: 478 + self._process_next_queued(handler_queue, handler_proc) 462 479 463 - def _process_next_queued(self, handler_queue: HandlerQueue): 464 - """Process next queued task for a serialized handler.""" 480 + def _process_next_queued( 481 + self, handler_queue: HandlerQueue, completed_proc: HandlerProcess 482 + ): 483 + """Free a slot and dispatch as many queued items as capacity allows.""" 465 484 with self.lock: 466 - handler_queue.clear_current() 467 - item = handler_queue.pop_next() 468 - if item: 485 + handler_queue.remove_running(completed_proc) 486 + while handler_queue.queue and handler_queue.can_start(): 487 + item = handler_queue.pop_next() 488 + if item is None: 489 + break 469 490 logger.info( 470 - f"Starting queued {handler_queue.name} for {item.file_path.name} " 471 - f"({handler_queue.queue_size()} remaining)" 491 + "Starting queued %s for %s", handler_queue.name, item.file_path 472 492 ) 473 493 handler_info = self._match_pattern(item.file_path) 474 494 if handler_info: ··· 688 708 for handler_name, handler_queue in self.handler_queues.items(): 689 709 handler_status = {} 690 710 691 - # Current running process 692 - if handler_queue.current_process is not None: 693 - handler_proc = handler_queue.current_process 694 - try: 695 - rel_file = journal_relative_path( 696 - journal_path, handler_proc.file_path 697 - ) 698 - except ValueError: 699 - rel_file = str(handler_proc.file_path) 711 + # Current running processes 712 + if handler_queue.running: 713 + running_list = [] 714 + for handler_proc in handler_queue.running: 715 + try: 716 + rel_file = journal_relative_path( 717 + journal_path, handler_proc.file_path 718 + ) 719 + except ValueError: 720 + rel_file = str(handler_proc.file_path) 700 721 701 - handler_status["running"] = { 702 - "file": rel_file, 703 - "ref": handler_proc.managed.ref, 704 - } 705 - handler_status["running"]["duration_seconds"] = int( 706 - now - handler_proc.started_at 707 - ) 722 + running_list.append( 723 + { 724 + "file": rel_file, 725 + "ref": handler_proc.managed.ref, 726 + "duration_seconds": int(now - handler_proc.started_at), 727 + } 728 + ) 729 + handler_status["running"] = running_list 708 730 709 731 # Queued items with age 710 732 if handler_queue.queue_size() > 0:
+180 -8
tests/test_sense.py
··· 54 54 def test_handler_queue_can_start_empty(): 55 55 """Test can_start returns True when no process running.""" 56 56 queue = HandlerQueue("test") 57 + assert queue.running == [] 57 58 assert queue.can_start() is True 58 59 59 60 60 - def test_handler_queue_can_start_with_current(): 61 + def test_handler_queue_can_start_when_full(): 61 62 """Test can_start returns False when process is running.""" 62 63 queue = HandlerQueue("test") 63 - queue.current_process = MagicMock() # Simulate running process 64 + queue.add_running(MagicMock()) 64 65 assert queue.can_start() is False 65 66 66 67 ··· 121 122 assert queue.pop_next() is None 122 123 123 124 124 - def test_handler_queue_set_clear_current(): 125 - """Test set_current and clear_current.""" 125 + def test_handler_queue_add_remove_running(): 126 + """Test add_running and remove_running.""" 126 127 queue = HandlerQueue("test") 127 128 mock_proc = MagicMock() 128 129 129 - queue.set_current(mock_proc) 130 - assert queue.current_process is mock_proc 130 + queue.add_running(mock_proc) 131 + assert queue.running == [mock_proc] 131 132 assert queue.can_start() is False 132 133 133 - queue.clear_current() 134 - assert queue.current_process is None 134 + queue.remove_running(mock_proc) 135 + assert queue.running == [] 135 136 assert queue.can_start() is True 137 + 138 + 139 + def test_handler_queue_backwards_compat_n1(): 140 + """Test default single-slot queue behavior stays FIFO and capacity-limited.""" 141 + queue = HandlerQueue("test") 142 + running = MagicMock() 143 + path1 = Path("/tmp/test1.flac") 144 + path2 = Path("/tmp/test2.flac") 145 + 146 + assert queue.can_start() is True 147 + queue.add_running(running) 148 + assert queue.can_start() is False 149 + 150 + assert queue.enqueue(path1) is True 151 + assert queue.enqueue(path2) is True 152 + assert queue.pop_next().file_path == path1 153 + assert queue.pop_next().file_path == path2 154 + assert queue.pop_next() is None 155 + 156 + 157 + def test_handler_queue_multi_slot_dispatch(): 158 + """Test max_concurrent allows multiple running handler processes.""" 159 + queue = HandlerQueue("test", max_concurrent=3) 160 + procs = [MagicMock(), MagicMock(), MagicMock()] 161 + 162 + for proc in procs: 163 + assert queue.can_start() is True 164 + queue.add_running(proc) 165 + 166 + assert queue.can_start() is False 167 + assert queue.running == procs 168 + assert queue.available_slots() == 0 169 + 170 + 171 + def test_handler_queue_slot_release_on_completion(): 172 + """Test removing a running process frees one slot.""" 173 + queue = HandlerQueue("test", max_concurrent=2) 174 + proc1 = MagicMock() 175 + proc2 = MagicMock() 176 + 177 + queue.add_running(proc1) 178 + queue.add_running(proc2) 179 + assert queue.can_start() is False 180 + 181 + queue.remove_running(proc1) 182 + assert queue.running == [proc2] 183 + assert queue.available_slots() == 1 184 + assert queue.can_start() is True 185 + 186 + 187 + def test_handler_queue_concurrent_enqueue_completion(): 188 + """Test serialized concurrent queue mutations stay consistent.""" 189 + queue = HandlerQueue("test", max_concurrent=5) 190 + lock = threading.Lock() 191 + 192 + def enqueue_items(): 193 + for idx in range(10): 194 + with lock: 195 + queue.enqueue(Path(f"/tmp/test{idx}.flac")) 196 + 197 + def complete_items(): 198 + for _ in range(5): 199 + proc = MagicMock() 200 + with lock: 201 + queue.add_running(proc) 202 + queue.remove_running(proc) 203 + 204 + enqueue_thread = threading.Thread(target=enqueue_items) 205 + completion_thread = threading.Thread(target=complete_items) 206 + enqueue_thread.start() 207 + completion_thread.start() 208 + enqueue_thread.join() 209 + completion_thread.join() 210 + 211 + queued_paths = [item.file_path for item in queue.queue] 212 + assert len(queue.running) == 0 213 + assert len(queued_paths) == 10 214 + assert len(set(queued_paths)) == 10 215 + 216 + 217 + def test_resolve_concurrency_per_handler_uniform(tmp_path, monkeypatch, caplog): 218 + """Test handler concurrency config is applied uniformly.""" 219 + import observe.sense as sense_module 220 + 221 + monkeypatch.setattr( 222 + sense_module, 223 + "get_config", 224 + lambda: { 225 + "describe": {"max_concurrent": 4}, 226 + "transcribe": {"max_concurrent": 2}, 227 + }, 228 + ) 229 + 230 + sensor = FileSensor(tmp_path) 231 + 232 + assert sensor.handler_queues["describe"].max_concurrent == 4 233 + assert sensor.handler_queues["transcribe"].max_concurrent == 2 234 + 235 + monkeypatch.setattr( 236 + sense_module, 237 + "get_config", 238 + lambda: { 239 + "describe": {"max_concurrent": "bad"}, 240 + "transcribe": {"max_concurrent": -1}, 241 + }, 242 + ) 243 + 244 + caplog.clear() 245 + invalid_sensor = FileSensor(tmp_path) 246 + 247 + assert invalid_sensor.handler_queues["describe"].max_concurrent == 1 248 + assert invalid_sensor.handler_queues["transcribe"].max_concurrent == 1 249 + assert "Invalid describe.max_concurrent" in caplog.text 250 + assert "Invalid transcribe.max_concurrent" in caplog.text 251 + 252 + 253 + def test_process_next_queued_dispatches_on_slot_release(tmp_path, monkeypatch): 254 + """Test queued work dispatches one item per freed handler slot.""" 255 + import observe.sense as sense_module 256 + 257 + monkeypatch.setattr(sense_module, "HANDLER_NAMES", ("test",)) 258 + monkeypatch.setattr( 259 + sense_module, "get_config", lambda: {"test": {"max_concurrent": 2}} 260 + ) 261 + 262 + segment_dir = tmp_path / "chronicle" / "20250101" / "default" / "143022_300" 263 + segment_dir.mkdir(parents=True) 264 + path1 = segment_dir / "first.fake" 265 + path2 = segment_dir / "second.fake" 266 + path3 = segment_dir / "third.fake" 267 + for path in (path1, path2, path3): 268 + path.write_text("content") 269 + 270 + sensor = FileSensor(tmp_path) 271 + sensor.register("*.fake", "test", ["echo", "{file}"]) 272 + handler_queue = sensor.handler_queues["test"] 273 + mock_proc_a = MagicMock() 274 + mock_proc_b = MagicMock() 275 + dispatched = [] 276 + 277 + def mock_spawn(file_path, handler_name, command, **_kwargs): 278 + dispatched_proc = MagicMock() 279 + dispatched_proc.file_path = file_path 280 + dispatched.append((file_path, handler_name, command, dispatched_proc)) 281 + handler_queue.add_running(dispatched_proc) 282 + sensor.running[file_path] = dispatched_proc 283 + 284 + monkeypatch.setattr(sensor, "_spawn_handler", mock_spawn) 285 + 286 + handler_queue.add_running(mock_proc_a) 287 + handler_queue.add_running(mock_proc_b) 288 + handler_queue.enqueue(path1) 289 + handler_queue.enqueue(path2) 290 + handler_queue.enqueue(path3) 291 + 292 + sensor._process_next_queued(handler_queue, mock_proc_a) 293 + 294 + assert mock_proc_a not in handler_queue.running 295 + assert len(dispatched) == 1 296 + assert len(handler_queue.running) == 2 297 + assert handler_queue.running[0] is mock_proc_b 298 + assert dispatched[0][0] == path1 299 + assert [item.file_path for item in handler_queue.queue] == [path2, path3] 300 + 301 + sensor._process_next_queued(handler_queue, mock_proc_b) 302 + 303 + assert mock_proc_b not in handler_queue.running 304 + assert len(dispatched) == 2 305 + assert len(handler_queue.running) == 2 306 + assert dispatched[1][0] == path2 307 + assert [item.file_path for item in handler_queue.queue] == [path3] 136 308 137 309 138 310 def test_handler_queue_queue_size():
+4
think/journal_default.json
··· 20 20 "portal_url": "https://support.solpbc.org" 21 21 }, 22 22 "describe": { 23 + "max_concurrent": 1, 23 24 "redact": [ 24 25 "use *** instead of any visible passwords, credentials, keys, tokens, and secrets", 25 26 "completely omit and ignore any NSFW or adult content, do not mention or note it", 26 27 "use *** instead of any visible credit card numbers, bank account numbers, and government ID numbers" 27 28 ] 29 + }, 30 + "transcribe": { 31 + "max_concurrent": 1 28 32 }, 29 33 "agent": { 30 34 "name": "sol",
+6 -5
think/top.py
··· 605 605 return "─".ljust(width) 606 606 607 607 parts = [] 608 - if handler_dict.get("running"): 609 - parts.append("▸1") 610 - queued = handler_dict.get("queued", []) 611 - if queued: 612 - parts.append(f"+{len(queued)}") 608 + running_count = len(handler_dict.get("running") or []) 609 + if running_count > 0: 610 + parts.append(f"▸{running_count}") 611 + queued_count = len(handler_dict.get("queued") or []) 612 + if queued_count > 0: 613 + parts.append(f"+{queued_count}") 613 614 614 615 result = " ".join(parts) if parts else "─" 615 616 return result.ljust(width)