personal memory agent
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

at d18a7c02359cd827d0ff15058861de5c2600a96f 522 lines 18 kB view raw
1#!/usr/bin/env python3 2# SPDX-License-Identifier: AGPL-3.0-only 3# Copyright (c) 2026 sol pbc 4 5"""Unified process spawning and lifecycle management utilities. 6 7All subprocess output is automatically logged to: 8 journal/chronicle/{YYYYMMDD}/health/{ref}_{process_name}.log 9 10Where process_name is derived from cmd[0] basename, and ref is a unique correlation ID. 11 12Symlinks provide stable access paths: 13 journal/chronicle/{YYYYMMDD}/health/{process_name}.log (day-level symlink) 14 journal/health/{process_name}.log (journal-level symlink) 15 16Logs automatically roll over at midnight for long-running processes. 17""" 18 19from __future__ import annotations 20 21import logging 22import os 23import subprocess 24import threading 25import time 26from dataclasses import dataclass 27from datetime import datetime 28from pathlib import Path 29 30from think.callosum import CallosumConnection 31from think.utils import CHRONICLE_DIR, get_journal, now_ms 32 33logger = logging.getLogger(__name__) 34 35 36def _get_journal_path() -> Path: 37 """Return the journal path (auto-creates if needed).""" 38 return Path(get_journal()) 39 40 41def _current_day() -> str: 42 """Get current day in YYYYMMDD format.""" 43 return datetime.now().strftime("%Y%m%d") 44 45 46def _day_health_log_path(day: str, ref: str, name: str) -> Path: 47 """Build path to day health log. 48 49 Returns: journal/chronicle/{day}/health/{ref}_{name}.log 50 """ 51 return _get_journal_path() / CHRONICLE_DIR / day / "health" / f"{ref}_{name}.log" 52 53 54def _atomic_symlink(link_path: Path, target: str) -> None: 55 """Create or update symlink atomically. 56 57 Args: 58 link_path: Path where symlink should be created 59 target: Target path (can be relative or absolute) 60 """ 61 link_path.parent.mkdir(parents=True, exist_ok=True) 62 tmp_link = link_path.with_suffix(f".tmp{os.getpid()}_{threading.get_ident()}") 63 try: 64 tmp_link.symlink_to(target) 65 tmp_link.replace(link_path) 66 finally: 67 # Clean up temp file if it still exists 68 if tmp_link.exists() or tmp_link.is_symlink(): 69 tmp_link.unlink(missing_ok=True) 70 71 72def _format_log_line(prefix: str, stream: str, line: str) -> str: 73 """Format log line with ISO timestamp and labels. 74 75 Args: 76 prefix: Process identifier (e.g., "observer" or "describe:file.webm") 77 stream: "stdout" or "stderr" 78 line: Output line from process 79 80 Returns: 81 Formatted line: "2024-11-01T10:30:45 [prefix:stream] line\\n" 82 """ 83 timestamp = datetime.now().isoformat(timespec="seconds") 84 clean_line = line.rstrip("\n") 85 return f"{timestamp} [{prefix}:{stream}] {clean_line}\n" 86 87 88class DailyLogWriter: 89 """Thread-safe log writer that automatically rolls over at midnight. 90 91 When ``day`` is provided, the writer is pinned to that day directory 92 and midnight rollover is disabled (batch processing of historical days). 93 94 Writes to: journal/chronicle/{YYYYMMDD}/health/{ref}_{name}.log 95 96 Creates and maintains symlinks: 97 - journal/chronicle/{YYYYMMDD}/health/{name}.log -> {ref}_{name}.log (day-level) 98 - journal/health/{name}.log -> chronicle/{YYYYMMDD}/health/{ref}_{name}.log (journal-level) 99 100 When the day changes, automatically closes old file, opens new file, and updates symlinks. 101 """ 102 103 def __init__(self, ref: str, name: str, day: str | None = None): 104 self._ref = ref 105 self._name = name 106 self._pinned = day is not None 107 self._lock = threading.Lock() 108 self._current_day = day or _current_day() 109 self._fh = self._open_log() 110 self._update_symlinks() 111 112 def _open_log(self): 113 """Open log file for current day.""" 114 log_path = _day_health_log_path(self._current_day, self._ref, self._name) 115 log_path.parent.mkdir(parents=True, exist_ok=True) 116 return log_path.open("a", encoding="utf-8") 117 118 def _update_symlinks(self) -> None: 119 """Update day-level and journal-level symlinks to point to current log.""" 120 journal = _get_journal_path() 121 day_health = journal / CHRONICLE_DIR / self._current_day / "health" 122 log_filename = f"{self._ref}_{self._name}.log" 123 124 # Day-level symlink: chronicle/{YYYYMMDD}/health/{name}.log -> {ref}_{name}.log 125 day_symlink = day_health / f"{self._name}.log" 126 _atomic_symlink(day_symlink, log_filename) 127 128 # Journal-level symlink: health/{name}.log -> ../chronicle/{YYYYMMDD}/health/{ref}_{name}.log 129 # Relative from journal/health/ to journal/chronicle/{YYYYMMDD}/health/ 130 journal_symlink = journal / "health" / f"{self._name}.log" 131 relative_target = ( 132 f"../{CHRONICLE_DIR}/{self._current_day}/health/{log_filename}" 133 ) 134 _atomic_symlink(journal_symlink, relative_target) 135 136 def write(self, message: str) -> None: 137 """Write message to log, handling day rollover.""" 138 with self._lock: 139 if not self._pinned: 140 # Check for day change 141 day_now = _current_day() 142 if day_now != self._current_day: 143 # Close old log 144 if not self._fh.closed: 145 self._fh.close() 146 # Open new log for new day — keep old handle on failure 147 try: 148 self._fh = self._open_log() 149 self._current_day = day_now 150 self._update_symlinks() 151 except OSError: 152 pass 153 154 # Write and flush — swallow disk-full so output threads survive 155 try: 156 self._fh.write(message) 157 self._fh.flush() 158 except OSError: 159 pass 160 161 def close(self) -> None: 162 """Close log file.""" 163 with self._lock: 164 if not self._fh.closed: 165 self._fh.close() 166 167 @property 168 def path(self) -> Path: 169 """Get current log file path.""" 170 return _day_health_log_path(self._current_day, self._ref, self._name) 171 172 173@dataclass 174class ManagedProcess: 175 """Subprocess wrapper with automatic output logging and lifecycle management. 176 177 All output is automatically logged to: 178 journal/chronicle/{YYYYMMDD}/health/{ref}_{name}.log 179 180 Where name is derived from cmd[0] basename, and ref is a unique correlation ID. 181 182 Symlinks are automatically created and maintained: 183 journal/chronicle/{YYYYMMDD}/health/{name}.log -> {ref}_{name}.log (day-level) 184 journal/health/{name}.log -> chronicle/{YYYYMMDD}/health/{ref}_{name}.log (journal-level) 185 186 Logs roll over automatically at midnight for long-running processes. 187 188 Process lifecycle events are broadcast via Callosum logs tract. 189 """ 190 191 process: subprocess.Popen 192 name: str 193 log_writer: DailyLogWriter 194 cmd: list[str] 195 _threads: list[threading.Thread] 196 ref: str 197 _start_time: float 198 _callosum: CallosumConnection | None 199 _owns_callosum: bool = True 200 201 @property 202 def start_time(self) -> float: 203 """Epoch timestamp when this process was spawned.""" 204 return self._start_time 205 206 @classmethod 207 def spawn( 208 cls, 209 cmd: list[str], 210 *, 211 env: dict | None = None, 212 ref: str | None = None, 213 callosum: CallosumConnection | None = None, 214 day: str | None = None, 215 ) -> "ManagedProcess": 216 """Spawn process with automatic output logging to daily health directory. 217 218 Args: 219 cmd: Command and arguments 220 env: Optional environment variables (inherits parent env if not provided) 221 ref: Optional correlation ID (auto-generated if not provided) 222 callosum: Optional shared CallosumConnection (creates new one if not provided) 223 day: Optional day override (YYYYMMDD). When provided, logs are placed 224 in that day's health directory instead of today's. 225 226 Returns: 227 ManagedProcess instance 228 229 Raises: 230 RuntimeError: If process fails to spawn 231 232 Example: 233 managed = ManagedProcess.spawn(["observer", "-v"]) 234 # Logs to: {JOURNAL}/{YYYYMMDD}/health/{ref}_observer.log 235 # Symlinks: {YYYYMMDD}/health/observer.log (day-level) 236 # health/observer.log (journal-level) 237 238 # With explicit correlation ID: 239 managed = ManagedProcess.spawn( 240 ["sol", "indexer", "--rescan"], 241 ref="1730476800000", 242 ) 243 # Logs to: {JOURNAL}/{YYYYMMDD}/health/1730476800000_indexer.log 244 """ 245 # Derive name from command - use subcommand if invoked via sol 246 if cmd[0] == "sol" and len(cmd) > 1: 247 name = cmd[1] 248 if name == "think": 249 for flag, mode in [ 250 ("--activity", "activity"), 251 ("--flush", "flush"), 252 ("--segments", "segment"), 253 ("--weekly", "weekly"), 254 ("--segment", "segment"), 255 ]: 256 if flag in cmd: 257 name = mode 258 break 259 else: 260 name = "daily" 261 else: 262 name = Path(cmd[0]).name 263 264 # Generate correlation ID (use provided ref, else timestamp) 265 ref = ref if ref else str(now_ms()) 266 start_time = time.time() 267 268 # Use provided callosum or create new one 269 owns_callosum = callosum is None 270 if owns_callosum: 271 callosum = CallosumConnection() 272 callosum.start() 273 274 log_writer = DailyLogWriter(ref, name, day=day) 275 276 logger.info(f"Starting {name}: {' '.join(cmd)}") 277 278 try: 279 proc = subprocess.Popen( 280 cmd, 281 stdout=subprocess.PIPE, 282 stderr=subprocess.PIPE, 283 text=True, 284 bufsize=1, 285 env=env, 286 ) 287 except Exception as exc: 288 log_writer.close() 289 if owns_callosum and callosum: 290 callosum.stop() 291 raise RuntimeError(f"Failed to spawn {name}: {exc}") from exc 292 293 logger.info(f"Started {name} with PID {proc.pid}") 294 295 # Emit exec event 296 if callosum: 297 callosum.emit( 298 "logs", 299 "exec", 300 ref=ref, 301 name=name, 302 pid=proc.pid, 303 cmd=list(cmd), 304 log_path=str(log_writer.path), 305 ) 306 307 # Start output streaming threads 308 def stream_output(pipe, stream_label: str): 309 if pipe is None: 310 return 311 with pipe: 312 for line in pipe: 313 formatted = _format_log_line(name, stream_label, line) 314 log_writer.write(formatted) 315 316 # Emit line event 317 if callosum: 318 callosum.emit( 319 "logs", 320 "line", 321 ref=ref, 322 name=name, 323 pid=proc.pid, 324 stream=stream_label, 325 line=line.rstrip("\n"), 326 ) 327 328 threads = [ 329 threading.Thread( 330 target=stream_output, 331 args=(proc.stdout, "stdout"), 332 daemon=True, 333 ), 334 threading.Thread( 335 target=stream_output, 336 args=(proc.stderr, "stderr"), 337 daemon=True, 338 ), 339 ] 340 for thread in threads: 341 thread.start() 342 343 return cls( 344 process=proc, 345 name=name, 346 log_writer=log_writer, 347 cmd=list(cmd), 348 _threads=threads, 349 ref=ref, 350 _start_time=start_time, 351 _callosum=callosum, 352 _owns_callosum=owns_callosum, 353 ) 354 355 def wait(self, timeout: float | None = None) -> int: 356 """Wait for process completion, return exit code. 357 358 Args: 359 timeout: Optional timeout in seconds 360 361 Returns: 362 Exit code 363 364 Raises: 365 subprocess.TimeoutExpired: If timeout exceeded 366 """ 367 return self.process.wait(timeout=timeout) 368 369 def poll(self) -> int | None: 370 """Check if process has terminated. 371 372 Returns: 373 Exit code if terminated, None if still running 374 """ 375 return self.process.poll() 376 377 def is_running(self) -> bool: 378 """Check if process is still running.""" 379 return self.process.poll() is None 380 381 def terminate(self, timeout: float = 15) -> int: 382 """Gracefully terminate process with automatic escalation. 383 384 This method handles the full termination sequence in ONE CALL: 385 1. Send SIGTERM (graceful shutdown request) 386 2. Wait up to `timeout` seconds for process to exit 387 3. If still alive, send SIGKILL (force kill) 388 4. Wait for final cleanup (max 1 second) 389 5. Return exit code 390 391 Args: 392 timeout: Seconds to wait after SIGTERM before SIGKILL (default: 15) 393 394 Returns: 395 Exit code (may be negative for signals, e.g., -15 for SIGTERM) 396 397 Example: 398 exit_code = managed.terminate(timeout=10) # One call, blocks until dead 399 """ 400 logger.debug(f"Terminating {self.name} (PID {self.pid})...") 401 try: 402 self.process.terminate() 403 exit_code = self.process.wait(timeout=timeout) 404 logger.debug(f"{self.name} terminated gracefully with code {exit_code}") 405 return exit_code 406 except subprocess.TimeoutExpired: 407 logger.warning( 408 f"{self.name} did not terminate after {timeout}s, force killing..." 409 ) 410 self.process.kill() 411 exit_code = self.process.wait(timeout=1) 412 logger.debug(f"{self.name} killed with code {exit_code}") 413 return exit_code 414 415 def cleanup(self) -> None: 416 """Wait for output threads to finish and close log file. 417 418 Call this after process exits to clean up resources. 419 Each step is isolated so one failure doesn't block the rest. 420 """ 421 for thread in self._threads: 422 try: 423 thread.join(timeout=1) 424 except Exception: 425 pass 426 427 try: 428 self.log_writer.close() 429 except Exception: 430 pass 431 432 # Emit exit event 433 if self._callosum: 434 try: 435 duration_ms = int((time.time() - self._start_time) * 1000) 436 self._callosum.emit( 437 "logs", 438 "exit", 439 ref=self.ref, 440 name=self.name, 441 pid=self.pid, 442 exit_code=self.returncode, 443 duration_ms=duration_ms, 444 cmd=self.cmd, 445 log_path=str(self.log_writer.path), 446 ) 447 except Exception: 448 pass 449 # Only stop callosum if we created it (not shared) 450 if self._owns_callosum: 451 try: 452 self._callosum.stop() 453 except Exception: 454 pass 455 456 @property 457 def pid(self) -> int: 458 """Process ID.""" 459 return self.process.pid 460 461 @property 462 def returncode(self) -> int | None: 463 """Return code if process has exited, None otherwise.""" 464 return self.process.returncode 465 466 467def run_task( 468 cmd: list[str], 469 *, 470 timeout: float | None = None, 471 env: dict | None = None, 472 ref: str | None = None, 473 callosum: CallosumConnection | None = None, 474 day: str | None = None, 475) -> tuple[bool, int, Path]: 476 """Run a task to completion with automatic logging (blocking). 477 478 Spawns process, waits for completion, cleans up resources. 479 Output is automatically logged to: journal/{YYYYMMDD}/health/{ref}_{name}.log 480 where name is derived from cmd[0] basename. 481 482 Args: 483 cmd: Command and arguments 484 timeout: Optional timeout in seconds 485 env: Optional environment variables 486 ref: Optional correlation ID (auto-generated if not provided) 487 callosum: Optional shared CallosumConnection (creates new one if not provided) 488 day: Optional day override (YYYYMMDD). When provided, logs are placed 489 in that day's health directory instead of today's. 490 491 Returns: 492 (success, exit_code, log_path) tuple where success = (exit_code == 0) 493 and log_path points to the process output log file. 494 495 Example: 496 success, code, log = run_task( 497 ["sol", "generate", "20241101", "-f", "flow"], 498 timeout=300, 499 ) 500 # Logs to: {JOURNAL}/{YYYYMMDD}/health/{ref}_generate.log 501 502 # With explicit correlation ID: 503 success, code, log = run_task( 504 ["sol", "indexer", "--rescan"], 505 ref="1730476800000", 506 ) 507 # Logs to: {JOURNAL}/{YYYYMMDD}/health/1730476800000_indexer.log 508 """ 509 managed = ManagedProcess.spawn(cmd, env=env, ref=ref, callosum=callosum, day=day) 510 log_path = managed.log_writer.path 511 try: 512 exit_code = managed.wait(timeout=timeout) 513 except subprocess.TimeoutExpired: 514 logger.error(f"{managed.name} timed out after {timeout}s, terminating...") 515 exit_code = managed.terminate() 516 finally: 517 managed.cleanup() 518 519 if exit_code != 0: 520 logger.warning(f"{managed.name} exited with code {exit_code}") 521 522 return (exit_code == 0, exit_code, log_path)