personal memory agent
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

fix: ensure callosum drains queue before shutdown and improve task cleanup

Fix race condition in CallosumConnection where exit events could be lost when
stop() was called immediately after emit(). Refactored _run_loop to only exit
when both stop_event is set AND queue is empty, guaranteeing all messages are
sent before shutdown. Reduced join timeout from 2s to 0.5s since proper queue
draining makes long waits unnecessary.

Replace timeout-based task cleanup with PID validity checking in service
manager. Remove arbitrary 5-minute timeout that could kill slow processes.
Instead, detect dead tasks by checking if PID exists via psutil, cleaning up
tasks that crashed, were killed, or became zombies. Check every 5s for faster
cleanup.

Add resilience to missed exec events by detecting new tasks from logs/line
events. If manager starts after processes are running, it will pick them up
from first log output using ref, name, and pid fields.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>

+80 -4
+7 -4
think/callosum.py
··· 201 201 buffer = "" 202 202 last_connect_attempt = 0.0 203 203 204 - while not self.stop_event.is_set(): 204 + while True: 205 205 # Try to connect if not connected (rate limited to 1/sec) 206 206 if not sock and time.time() - last_connect_attempt > 1.0: 207 207 try: ··· 241 241 f"{msg.get('tract')}/{msg.get('event')}" 242 242 ) 243 243 except queue.Empty: 244 - pass # Normal, continue to receive 244 + # Queue is empty - check if we should exit 245 + if self.stop_event.is_set(): 246 + break 247 + # Otherwise continue to receive 245 248 246 249 # Receive incoming messages (only if connected) 247 250 if sock: ··· 313 316 return False 314 317 315 318 def stop(self) -> None: 316 - """Stop background thread gracefully.""" 319 + """Stop background thread gracefully, draining queue first.""" 317 320 if not self.thread: 318 321 return 319 322 320 323 self.stop_event.set() 321 - self.thread.join(timeout=2) 324 + self.thread.join(timeout=0.5) 322 325 323 326 if self.thread.is_alive(): 324 327 logger.warning("Background thread did not stop cleanly")
+73
think/manage.py
··· 6 6 7 7 import argparse 8 8 import asyncio 9 + import signal 9 10 from datetime import datetime, timedelta 10 11 11 12 import psutil ··· 110 111 111 112 elif event == "line": 112 113 ref = message.get("ref") 114 + name = message.get("name") 115 + pid = message.get("pid") 113 116 line = message.get("line", "") 114 117 stream = message.get("stream", "stdout") 118 + 115 119 if ref: 120 + # Create task entry if we haven't seen it yet (missed exec event) 121 + if ref not in self.running_tasks and name and pid: 122 + self.running_tasks[ref] = { 123 + "ref": ref, 124 + "name": name, 125 + "pid": pid, 126 + "cmd": [], # Unknown since we missed exec 127 + "start_time": datetime.now(), 128 + } 129 + # Initialize CPU tracking 130 + try: 131 + self.cpu_procs[pid] = psutil.Process(pid) 132 + self.cpu_procs[pid].cpu_percent(interval=None) 133 + except (psutil.NoSuchProcess, psutil.AccessDenied): 134 + pass 135 + 136 + # Store log line 116 137 self.last_log_lines[ref] = (datetime.now(), stream, line) 117 138 118 139 elif event == "exit": ··· 231 252 return f"{self.cpu_cache[pid]:.0f}" 232 253 return "-" 233 254 255 + def cleanup_dead_tasks(self) -> None: 256 + """Remove tasks whose PIDs are no longer valid. 257 + 258 + This handles cases where: 259 + - Process died without sending exit event (crash, kill -9, etc.) 260 + - Exit event was lost or not received 261 + - Process became a zombie 262 + """ 263 + refs_to_remove = [] 264 + 265 + for ref, task in self.running_tasks.items(): 266 + pid = task["pid"] 267 + try: 268 + # Try to get process info - this will fail if PID is invalid 269 + process = psutil.Process(pid) 270 + # Check if process is a zombie 271 + if process.status() == psutil.STATUS_ZOMBIE: 272 + refs_to_remove.append(ref) 273 + except psutil.NoSuchProcess: 274 + # Process no longer exists 275 + refs_to_remove.append(ref) 276 + except psutil.AccessDenied: 277 + # We don't have permission - assume it's still running 278 + pass 279 + 280 + # Clean up dead tasks 281 + for ref in refs_to_remove: 282 + task = self.running_tasks[ref] 283 + pid = task["pid"] 284 + name = task["name"] 285 + 286 + # Clean up CPU tracking 287 + if pid in self.cpu_procs: 288 + del self.cpu_procs[pid] 289 + if pid in self.cpu_cache: 290 + del self.cpu_cache[pid] 291 + 292 + # Remove task 293 + del self.running_tasks[ref] 294 + 295 + # Clean up log lines 296 + if ref in self.last_log_lines: 297 + del self.last_log_lines[ref] 298 + 234 299 def render_tasks_table(self) -> list[str]: 235 300 """Render the running tasks table. 236 301 ··· 406 471 """Main event loop for the TUI.""" 407 472 self.callosum.start(callback=self.handle_event) 408 473 474 + # Track iteration count for periodic timeout checks 475 + iteration = 0 476 + 409 477 with self.term.cbreak(), self.term.hidden_cursor(): 410 478 # Initial render 411 479 print(self.render(), flush=True) ··· 430 498 self.running = False 431 499 elif key.code == 4: # Ctrl-D 432 500 self.running = False 501 + 502 + # Check for dead tasks every 5 seconds (50 iterations * 0.1s = 5s) 503 + iteration += 1 504 + if iteration % 50 == 0: 505 + self.cleanup_dead_tasks() 433 506 434 507 # Render on every iteration (includes callosum updates) 435 508 print(self.render(), flush=True)