# personal memory agent
1#!/usr/bin/env python3
2# SPDX-License-Identifier: AGPL-3.0-only
3# Copyright (c) 2026 sol pbc
4
5"""Unified process spawning and lifecycle management utilities.
6
7All subprocess output is automatically logged to:
8 journal/chronicle/{YYYYMMDD}/health/{ref}_{process_name}.log
9
10Where process_name is derived from cmd[0] basename, and ref is a unique correlation ID.
11
12Symlinks provide stable access paths:
13 journal/chronicle/{YYYYMMDD}/health/{process_name}.log (day-level symlink)
14 journal/health/{process_name}.log (journal-level symlink)
15
16Logs automatically roll over at midnight for long-running processes.
17"""
18
19from __future__ import annotations
20
21import logging
22import os
23import subprocess
24import threading
25import time
26from dataclasses import dataclass
27from datetime import datetime
28from pathlib import Path
29
30from think.callosum import CallosumConnection
31from think.utils import CHRONICLE_DIR, get_journal, now_ms
32
33logger = logging.getLogger(__name__)
34
35
def _get_journal_path() -> Path:
    """Resolve the journal root directory (auto-creates if needed)."""
    journal_root = get_journal()
    return Path(journal_root)
39
40
41def _current_day() -> str:
42 """Get current day in YYYYMMDD format."""
43 return datetime.now().strftime("%Y%m%d")
44
45
def _day_health_log_path(day: str, ref: str, name: str) -> Path:
    """Compose the per-process health log path for a given day.

    Returns: journal/chronicle/{day}/health/{ref}_{name}.log
    """
    health_dir = _get_journal_path() / CHRONICLE_DIR / day / "health"
    return health_dir / f"{ref}_{name}.log"
52
53
54def _atomic_symlink(link_path: Path, target: str) -> None:
55 """Create or update symlink atomically.
56
57 Args:
58 link_path: Path where symlink should be created
59 target: Target path (can be relative or absolute)
60 """
61 link_path.parent.mkdir(parents=True, exist_ok=True)
62 tmp_link = link_path.with_suffix(f".tmp{os.getpid()}_{threading.get_ident()}")
63 try:
64 tmp_link.symlink_to(target)
65 tmp_link.replace(link_path)
66 finally:
67 # Clean up temp file if it still exists
68 if tmp_link.exists() or tmp_link.is_symlink():
69 tmp_link.unlink(missing_ok=True)
70
71
72def _format_log_line(prefix: str, stream: str, line: str) -> str:
73 """Format log line with ISO timestamp and labels.
74
75 Args:
76 prefix: Process identifier (e.g., "observer" or "describe:file.webm")
77 stream: "stdout" or "stderr"
78 line: Output line from process
79
80 Returns:
81 Formatted line: "2024-11-01T10:30:45 [prefix:stream] line\\n"
82 """
83 timestamp = datetime.now().isoformat(timespec="seconds")
84 clean_line = line.rstrip("\n")
85 return f"{timestamp} [{prefix}:{stream}] {clean_line}\n"
86
87
class DailyLogWriter:
    """Thread-safe log writer that automatically rolls over at midnight.

    When ``day`` is provided, the writer is pinned to that day directory
    and midnight rollover is disabled (batch processing of historical days).

    Writes to: journal/chronicle/{YYYYMMDD}/health/{ref}_{name}.log

    Creates and maintains symlinks:
      - journal/chronicle/{YYYYMMDD}/health/{name}.log -> {ref}_{name}.log (day-level)
      - journal/health/{name}.log -> chronicle/{YYYYMMDD}/health/{ref}_{name}.log (journal-level)

    When the day changes, the new file is opened FIRST and only then is the
    old handle closed and swapped out.  (Previously the old handle was closed
    before the open attempt; if the open raised OSError, ``self._fh`` was left
    closed and the next ``write()`` raised ValueError — which the OSError-only
    guard did not catch — killing the output-streaming thread.)
    """

    def __init__(self, ref: str, name: str, day: str | None = None):
        self._ref = ref  # correlation ID, part of the log filename
        self._name = name  # process name, used for symlink names
        self._pinned = day is not None  # pinned writers never roll over
        self._lock = threading.Lock()  # guards _fh/_current_day across threads
        self._current_day = day or _current_day()
        self._fh = self._open_log()
        self._update_symlinks()

    def _open_log(self, day: str | None = None):
        """Open (append mode) the log file for *day* (default: current day)."""
        log_path = _day_health_log_path(
            day or self._current_day, self._ref, self._name
        )
        log_path.parent.mkdir(parents=True, exist_ok=True)
        return log_path.open("a", encoding="utf-8")

    def _update_symlinks(self) -> None:
        """Update day-level and journal-level symlinks to point to current log."""
        journal = _get_journal_path()
        day_health = journal / CHRONICLE_DIR / self._current_day / "health"
        log_filename = f"{self._ref}_{self._name}.log"

        # Day-level symlink: chronicle/{YYYYMMDD}/health/{name}.log -> {ref}_{name}.log
        day_symlink = day_health / f"{self._name}.log"
        _atomic_symlink(day_symlink, log_filename)

        # Journal-level symlink: health/{name}.log -> ../chronicle/{YYYYMMDD}/health/{ref}_{name}.log
        # Relative from journal/health/ to journal/chronicle/{YYYYMMDD}/health/
        journal_symlink = journal / "health" / f"{self._name}.log"
        relative_target = (
            f"../{CHRONICLE_DIR}/{self._current_day}/health/{log_filename}"
        )
        _atomic_symlink(journal_symlink, relative_target)

    def write(self, message: str) -> None:
        """Write message to log, handling day rollover."""
        with self._lock:
            if not self._pinned:
                # Check for day change
                day_now = _current_day()
                if day_now != self._current_day:
                    # Open the new day's log BEFORE touching the old handle so
                    # a failed open leaves us with a valid file to write to.
                    try:
                        new_fh = self._open_log(day_now)
                    except OSError:
                        new_fh = None  # keep old handle; retry on next write
                    if new_fh is not None:
                        if not self._fh.closed:
                            self._fh.close()
                        self._fh = new_fh
                        self._current_day = day_now
                        try:
                            self._update_symlinks()
                        except OSError:
                            pass  # symlinks are best-effort conveniences

            # Write and flush — swallow disk-full (OSError) and writes to an
            # already-closed handle (ValueError) so output threads survive.
            try:
                self._fh.write(message)
                self._fh.flush()
            except (OSError, ValueError):
                pass

    def close(self) -> None:
        """Close log file (idempotent)."""
        with self._lock:
            if not self._fh.closed:
                self._fh.close()

    @property
    def path(self) -> Path:
        """Get current log file path."""
        return _day_health_log_path(self._current_day, self._ref, self._name)
171
172
@dataclass
class ManagedProcess:
    """Subprocess wrapper with automatic output logging and lifecycle management.

    All output is automatically logged to:
        journal/chronicle/{YYYYMMDD}/health/{ref}_{name}.log

    Where name is derived from cmd[0] basename, and ref is a unique correlation ID.

    Symlinks are automatically created and maintained:
        journal/chronicle/{YYYYMMDD}/health/{name}.log -> {ref}_{name}.log (day-level)
        journal/health/{name}.log -> chronicle/{YYYYMMDD}/health/{ref}_{name}.log (journal-level)

    Logs roll over automatically at midnight for long-running processes.

    Process lifecycle events are broadcast via Callosum logs tract.
    """

    # Underlying Popen handle for the spawned child.
    process: subprocess.Popen
    # Display/log name derived from the command (see spawn()).
    name: str
    # Writer that receives all stdout/stderr lines (handles midnight rollover).
    log_writer: DailyLogWriter
    # Copy of the argv the process was launched with.
    cmd: list[str]
    # Daemon threads pumping stdout/stderr into log_writer.
    _threads: list[threading.Thread]
    # Correlation ID used in log filenames and Callosum events.
    ref: str
    # Epoch seconds at spawn time (used for duration_ms in the exit event).
    _start_time: float
    # Callosum connection for lifecycle events; may be shared with the caller.
    _callosum: CallosumConnection | None
    # True when spawn() created the connection itself and must stop() it.
    _owns_callosum: bool = True

    @property
    def start_time(self) -> float:
        """Epoch timestamp when this process was spawned."""
        return self._start_time

    @classmethod
    def spawn(
        cls,
        cmd: list[str],
        *,
        env: dict | None = None,
        ref: str | None = None,
        callosum: CallosumConnection | None = None,
        day: str | None = None,
    ) -> "ManagedProcess":
        """Spawn process with automatic output logging to daily health directory.

        Args:
            cmd: Command and arguments
            env: Optional environment variables (inherits parent env if not provided)
            ref: Optional correlation ID (auto-generated if not provided)
            callosum: Optional shared CallosumConnection (creates new one if not provided)
            day: Optional day override (YYYYMMDD). When provided, logs are placed
                in that day's health directory instead of today's.

        Returns:
            ManagedProcess instance

        Raises:
            RuntimeError: If process fails to spawn

        Example:
            managed = ManagedProcess.spawn(["observer", "-v"])
            # Logs to: {JOURNAL}/{YYYYMMDD}/health/{ref}_observer.log
            # Symlinks: {YYYYMMDD}/health/observer.log (day-level)
            #           health/observer.log (journal-level)

            # With explicit correlation ID:
            managed = ManagedProcess.spawn(
                ["sol", "indexer", "--rescan"],
                ref="1730476800000",
            )
            # Logs to: {JOURNAL}/{YYYYMMDD}/health/1730476800000_indexer.log
        """
        # Derive name from command - use subcommand if invoked via sol
        if cmd[0] == "sol" and len(cmd) > 1:
            name = cmd[1]
            if name == "think":
                # Map "sol think" mode flags to a more specific name.
                # First match in this list wins ("--segments" is checked
                # before "--segment"; membership is exact argv comparison).
                for flag, mode in [
                    ("--activity", "activity"),
                    ("--flush", "flush"),
                    ("--segments", "segment"),
                    ("--weekly", "weekly"),
                    ("--segment", "segment"),
                ]:
                    if flag in cmd:
                        name = mode
                        break
                else:
                    # No mode flag present: default "sol think" is the daily run.
                    name = "daily"
        else:
            name = Path(cmd[0]).name

        # Generate correlation ID (use provided ref, else timestamp)
        ref = ref if ref else str(now_ms())
        start_time = time.time()

        # Use provided callosum or create new one
        owns_callosum = callosum is None
        if owns_callosum:
            callosum = CallosumConnection()
            callosum.start()

        log_writer = DailyLogWriter(ref, name, day=day)

        logger.info(f"Starting {name}: {' '.join(cmd)}")

        try:
            proc = subprocess.Popen(
                cmd,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True,
                # bufsize=1 with text=True enables line buffering so the
                # streaming threads see output promptly, line by line.
                bufsize=1,
                env=env,
            )
        except Exception as exc:
            # Spawn failed: release resources we created before re-raising.
            log_writer.close()
            if owns_callosum and callosum:
                callosum.stop()
            raise RuntimeError(f"Failed to spawn {name}: {exc}") from exc

        logger.info(f"Started {name} with PID {proc.pid}")

        # Emit exec event
        if callosum:
            callosum.emit(
                "logs",
                "exec",
                ref=ref,
                name=name,
                pid=proc.pid,
                cmd=list(cmd),
                log_path=str(log_writer.path),
            )

        # Start output streaming threads
        def stream_output(pipe, stream_label: str):
            # Pump one pipe (stdout or stderr) into the log writer until EOF.
            if pipe is None:
                return
            with pipe:
                for line in pipe:
                    formatted = _format_log_line(name, stream_label, line)
                    log_writer.write(formatted)

                    # Emit line event
                    if callosum:
                        callosum.emit(
                            "logs",
                            "line",
                            ref=ref,
                            name=name,
                            pid=proc.pid,
                            stream=stream_label,
                            line=line.rstrip("\n"),
                        )

        # Daemon threads so a wedged pipe never blocks interpreter exit.
        threads = [
            threading.Thread(
                target=stream_output,
                args=(proc.stdout, "stdout"),
                daemon=True,
            ),
            threading.Thread(
                target=stream_output,
                args=(proc.stderr, "stderr"),
                daemon=True,
            ),
        ]
        for thread in threads:
            thread.start()

        return cls(
            process=proc,
            name=name,
            log_writer=log_writer,
            cmd=list(cmd),
            _threads=threads,
            ref=ref,
            _start_time=start_time,
            _callosum=callosum,
            _owns_callosum=owns_callosum,
        )

    def wait(self, timeout: float | None = None) -> int:
        """Wait for process completion, return exit code.

        Args:
            timeout: Optional timeout in seconds

        Returns:
            Exit code

        Raises:
            subprocess.TimeoutExpired: If timeout exceeded
        """
        return self.process.wait(timeout=timeout)

    def poll(self) -> int | None:
        """Check if process has terminated.

        Returns:
            Exit code if terminated, None if still running
        """
        return self.process.poll()

    def is_running(self) -> bool:
        """Check if process is still running."""
        return self.process.poll() is None

    def terminate(self, timeout: float = 15) -> int:
        """Gracefully terminate process with automatic escalation.

        This method handles the full termination sequence in ONE CALL:
        1. Send SIGTERM (graceful shutdown request)
        2. Wait up to `timeout` seconds for process to exit
        3. If still alive, send SIGKILL (force kill)
        4. Wait for final cleanup (max 1 second)
        5. Return exit code

        Args:
            timeout: Seconds to wait after SIGTERM before SIGKILL (default: 15)

        Returns:
            Exit code (may be negative for signals, e.g., -15 for SIGTERM)

        Example:
            exit_code = managed.terminate(timeout=10)  # One call, blocks until dead
        """
        logger.debug(f"Terminating {self.name} (PID {self.pid})...")
        try:
            self.process.terminate()
            exit_code = self.process.wait(timeout=timeout)
            logger.debug(f"{self.name} terminated gracefully with code {exit_code}")
            return exit_code
        except subprocess.TimeoutExpired:
            logger.warning(
                f"{self.name} did not terminate after {timeout}s, force killing..."
            )
            self.process.kill()
            # NOTE(review): this wait can itself raise TimeoutExpired if the
            # kernel is slow to reap after SIGKILL — confirm callers tolerate it.
            exit_code = self.process.wait(timeout=1)
            logger.debug(f"{self.name} killed with code {exit_code}")
            return exit_code

    def cleanup(self) -> None:
        """Wait for output threads to finish and close log file.

        Call this after process exits to clean up resources.
        Each step is isolated so one failure doesn't block the rest.
        """
        # Give the streaming threads a moment to drain remaining output.
        for thread in self._threads:
            try:
                thread.join(timeout=1)
            except Exception:
                pass

        try:
            self.log_writer.close()
        except Exception:
            pass

        # Emit exit event
        if self._callosum:
            try:
                duration_ms = int((time.time() - self._start_time) * 1000)
                self._callosum.emit(
                    "logs",
                    "exit",
                    ref=self.ref,
                    name=self.name,
                    pid=self.pid,
                    exit_code=self.returncode,
                    duration_ms=duration_ms,
                    cmd=self.cmd,
                    log_path=str(self.log_writer.path),
                )
            except Exception:
                pass
            # Only stop callosum if we created it (not shared)
            if self._owns_callosum:
                try:
                    self._callosum.stop()
                except Exception:
                    pass

    @property
    def pid(self) -> int:
        """Process ID."""
        return self.process.pid

    @property
    def returncode(self) -> int | None:
        """Return code if process has exited, None otherwise."""
        return self.process.returncode
465
466
def run_task(
    cmd: list[str],
    *,
    timeout: float | None = None,
    env: dict | None = None,
    ref: str | None = None,
    callosum: CallosumConnection | None = None,
    day: str | None = None,
) -> tuple[bool, int, Path]:
    """Run a task to completion with automatic logging (blocking).

    Spawns the process, waits for completion, and cleans up resources.
    Output is automatically logged to:
        journal/chronicle/{YYYYMMDD}/health/{ref}_{name}.log
    where name is derived from cmd[0] basename. On timeout the process is
    terminated (SIGTERM, then SIGKILL) and its exit code is reported.

    Args:
        cmd: Command and arguments
        timeout: Optional timeout in seconds
        env: Optional environment variables
        ref: Optional correlation ID (auto-generated if not provided)
        callosum: Optional shared CallosumConnection (creates new one if not provided)
        day: Optional day override (YYYYMMDD). When provided, logs are placed
            in that day's health directory instead of today's.

    Returns:
        (success, exit_code, log_path) tuple where success = (exit_code == 0)
        and log_path points to the process output log file.

    Example:
        success, code, log = run_task(
            ["sol", "generate", "20241101", "-f", "flow"],
            timeout=300,
        )

        # With explicit correlation ID:
        success, code, log = run_task(
            ["sol", "indexer", "--rescan"],
            ref="1730476800000",
        )
    """
    managed = ManagedProcess.spawn(cmd, env=env, ref=ref, callosum=callosum, day=day)
    log_path = managed.log_writer.path
    try:
        try:
            exit_code = managed.wait(timeout=timeout)
        except subprocess.TimeoutExpired:
            # Deadline hit: escalate through SIGTERM/SIGKILL and report the code.
            logger.error(f"{managed.name} timed out after {timeout}s, terminating...")
            exit_code = managed.terminate()
    finally:
        managed.cleanup()

    success = exit_code == 0
    if not success:
        logger.warning(f"{managed.name} exited with code {exit_code}")

    return (success, exit_code, log_path)