personal memory agent
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

Fix Callosum feedback loop causing 200KB+ log lines

Root cause: buffer not cleared on reconnect, causing stale partial data
to corrupt messages after socket disconnect/reconnect.

Fixes:
- Clear buffer when connection drops (root cause)
- Silent JSON parse failures to prevent warning→stderr→emit loop
- Skip parsing entirely when no callback registered
- Truncate any logged content to 100 chars

Also modernized type hints (Python 3.10+) and fixed duplicate logging
setup in main() that ignored the -d/--debug flag.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

+23 -29
+23 -29
think/callosum.py
··· 14 14 import threading 15 15 import time 16 16 from pathlib import Path 17 - from typing import Any, Callable, Dict, List, Optional 17 + from typing import Any, Callable 18 18 19 19 from think.utils import get_journal 20 20 ··· 29 29 concurrently. 30 30 """ 31 31 32 - def __init__(self, socket_path: Optional[Path] = None): 32 + def __init__(self, socket_path: Path | None = None): 33 33 if socket_path is None: 34 34 socket_path = Path(get_journal()) / "health" / "callosum.sock" 35 35 36 36 self.socket_path = Path(socket_path) 37 - self.clients: List[socket.socket] = [] 37 + self.clients: list[socket.socket] = [] 38 38 self.lock = threading.RLock() 39 39 self.stop_event = threading.Event() 40 - self.server_socket: Optional[socket.socket] = None 40 + self.server_socket: socket.socket | None = None 41 41 42 42 # Broadcast queue and writer thread for serialized sends 43 43 self.broadcast_queue: queue.Queue = queue.Queue(maxsize=10000) 44 - self.writer_thread: Optional[threading.Thread] = None 44 + self.writer_thread: threading.Thread | None = None 45 45 46 46 def start(self) -> None: 47 47 """Start the broadcast server.""" ··· 110 110 message = json.loads(line) 111 111 self.broadcast(message) 112 112 except json.JSONDecodeError: 113 - logger.warning(f"Invalid JSON: {line}") 113 + pass # Silent failure - avoid feedback loops 114 114 except socket.timeout: 115 115 continue 116 116 except Exception as e: ··· 140 140 141 141 self._send_to_clients(message) 142 142 143 - def _send_to_clients(self, message: Dict[str, Any]) -> None: 143 + def _send_to_clients(self, message: dict[str, Any]) -> None: 144 144 """Send a message to all connected clients, removing dead ones. 145 145 146 146 This method handles the actual socket I/O and dead client cleanup. ··· 178 178 except Exception: 179 179 pass 180 180 181 - def broadcast(self, message: Dict[str, Any]) -> bool: 181 + def broadcast(self, message: dict[str, Any]) -> bool: 182 182 """Queue message for broadcast to all connected clients. 183 183 184 184 Returns immediately after queueing. The writer thread handles 185 185 actual transmission to ensure serialized, non-interleaved sends. 186 186 187 187 Args: 188 - message: Dict with required 'tract' and 'event' fields 188 + message: dict with required 'tract' and 'event' fields 189 189 190 190 Returns: 191 191 True if queued successfully, False if validation failed or queue full ··· 228 228 dropped (with debug logging) when disconnected. 229 229 """ 230 230 231 - def __init__(self, socket_path: Optional[Path] = None): 231 + def __init__(self, socket_path: Path | None = None): 232 232 """Initialize connection (does not connect immediately). 233 233 234 234 Args: ··· 239 239 240 240 self.socket_path = Path(socket_path) 241 241 self.send_queue: queue.Queue = queue.Queue(maxsize=1000) 242 - self.callback: Optional[Callable[[Dict[str, Any]], Any]] = None 243 - self.thread: Optional[threading.Thread] = None 242 + self.callback: Callable[[dict[str, Any]], Any] | None = None 243 + self.thread: threading.Thread | None = None 244 244 self.stop_event = threading.Event() 245 245 246 - def start(self, callback: Optional[Callable[[Dict[str, Any]], Any]] = None) -> None: 246 + def start(self, callback: Callable[[dict[str, Any]], Any] | None = None) -> None: 247 247 """Start background thread for sending and receiving. 248 248 249 249 Thread will auto-connect with retry and drain the send queue even when ··· 262 262 263 263 def _run_loop(self) -> None: 264 264 """Main loop: drain queue, connect/reconnect, receive when connected.""" 265 - sock: Optional[socket.socket] = None 265 + sock: socket.socket | None = None 266 266 buffer = "" 267 267 last_connect_attempt = 0.0 268 268 ··· 326 326 except Exception: 327 327 pass 328 328 sock = None 329 + buffer = "" # Clear partial data from old connection 329 330 continue 330 331 331 332 buffer += data.decode("utf-8") 332 333 while "\n" in buffer: 333 334 line, buffer = buffer.split("\n", 1) 334 - if line.strip(): 335 + if line.strip() and self.callback: 335 336 try: 336 337 message = json.loads(line) 337 - if self.callback: 338 - try: 339 - self.callback(message) 340 - except Exception as e: 341 - logger.error(f"Callback error: {e}") 338 + self.callback(message) 342 339 except json.JSONDecodeError: 343 - logger.warning(f"Invalid JSON: {line}") 340 + pass # Silent failure - avoid feedback loops 341 + except Exception as e: 342 + logger.error(f"Callback error: {e}") 344 343 except socket.timeout: 345 344 continue # Normal, just loop back to drain queue 346 345 except Exception as e: ··· 350 349 except Exception: 351 350 pass 352 351 sock = None 352 + buffer = "" # Clear partial data from old connection 353 353 354 354 # Cleanup on stop 355 355 if sock: ··· 398 398 def callosum_send( 399 399 tract: str, 400 400 event: str, 401 - socket_path: Optional[Path] = None, 401 + socket_path: Path | None = None, 402 402 timeout: float = 2.0, 403 403 **fields, 404 404 ) -> bool: ··· 442 442 from think.utils import setup_cli 443 443 444 444 parser = argparse.ArgumentParser(description="Callosum message bus") 445 - args = setup_cli(parser) 446 - 447 - # Set up logging 448 - logging.basicConfig( 449 - level=logging.INFO if not args.verbose else logging.DEBUG, 450 - format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", 451 - ) 445 + setup_cli(parser) # Handles logging setup based on -v/-d flags 452 446 453 447 server = CallosumServer() 454 448