personal memory agent
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

Add tmux terminal observer for capturing terminal content

New observe-tmux command polls tmux sessions for activity and captures
visible pane content with ANSI escape codes. Creates 5-minute segments
with JSONL output containing window layouts and pane content.

- Captures active window and all panes for each active session
- Deduplicates by content hash (only stores changes)
- Includes window list and pane layout info for visualization
- Emits Callosum events for health monitoring

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

+483
+1
observe/tmux/__init__.py
··· 1 + """Tmux terminal capture for observe package."""
+481
observe/tmux/observer.py
··· 1 + #!/usr/bin/env python3 2 + """ 3 + Tmux terminal observer for journaling. 4 + 5 + Polls tmux for active sessions and captures terminal content from the active 6 + window's panes. Creates 5-minute segments with JSONL output. 7 + """ 8 + 9 + import argparse 10 + import hashlib 11 + import json 12 + import logging 13 + import os 14 + import signal 15 + import subprocess 16 + import time 17 + from dataclasses import dataclass 18 + 19 + from think.callosum import CallosumConnection 20 + from think.utils import day_path, setup_cli 21 + 22 + logger = logging.getLogger(__name__) 23 + 24 + # Constants 25 + DEFAULT_INTERVAL = 300 # 5 minutes 26 + DEFAULT_POLL_INTERVAL = 5 # seconds 27 + 28 + 29 + @dataclass 30 + class PaneInfo: 31 + """Information about a tmux pane.""" 32 + 33 + id: str 34 + index: int 35 + left: int 36 + top: int 37 + width: int 38 + height: int 39 + active: bool 40 + content: str = "" 41 + 42 + 43 + @dataclass 44 + class WindowInfo: 45 + """Information about a tmux window.""" 46 + 47 + id: str 48 + index: int 49 + name: str 50 + active: bool 51 + 52 + 53 + @dataclass 54 + class CaptureResult: 55 + """Result of capturing a session's active window.""" 56 + 57 + session: str 58 + window: WindowInfo 59 + windows: list[WindowInfo] 60 + panes: list[PaneInfo] 61 + 62 + 63 + def run_tmux_command(args: list[str]) -> str | None: 64 + """Run a tmux command and return stdout, or None on error.""" 65 + try: 66 + result = subprocess.run( 67 + ["tmux"] + args, 68 + capture_output=True, 69 + text=True, 70 + timeout=5, 71 + ) 72 + if result.returncode != 0: 73 + return None 74 + return result.stdout 75 + except (subprocess.TimeoutExpired, FileNotFoundError, OSError) as e: 76 + logger.debug(f"tmux command failed: {e}") 77 + return None 78 + 79 + 80 + class TmuxObserver: 81 + """Observer for tmux terminal activity.""" 82 + 83 + def __init__(self, interval: int = DEFAULT_INTERVAL, poll_interval: int = DEFAULT_POLL_INTERVAL): 84 + self.interval = interval 85 + self.poll_interval = poll_interval 86 + self.running = True 87 + 88 + # Segment state 89 + self.start_at = time.time() 90 + self.captures: list[dict] = [] 91 + self.capture_id = 0 92 + self.sessions_seen: set[str] = set() 93 + 94 + # Deduplication: session -> hash of last capture 95 + self.last_hash: dict[str, str] = {} 96 + 97 + # Callosum for status events 98 + self.callosum: CallosumConnection | None = None 99 + 100 + def get_active_sessions(self) -> list[dict]: 101 + """Get sessions with recent client activity. 102 + 103 + Returns list of dicts with 'session' and 'activity' keys. 104 + """ 105 + output = run_tmux_command([ 106 + "list-clients", 107 + "-F", "#{client_session} #{client_activity}", 108 + ]) 109 + if not output: 110 + return [] 111 + 112 + now = time.time() 113 + active = [] 114 + seen_sessions = set() 115 + 116 + for line in output.strip().split("\n"): 117 + if not line: 118 + continue 119 + parts = line.split(" ", 1) 120 + if len(parts) != 2: 121 + continue 122 + 123 + session, activity_str = parts 124 + try: 125 + activity = int(activity_str) 126 + except ValueError: 127 + continue 128 + 129 + # Check if active within poll interval 130 + if now - activity <= self.poll_interval and session not in seen_sessions: 131 + active.append({"session": session, "activity": activity}) 132 + seen_sessions.add(session) 133 + 134 + return active 135 + 136 + def get_windows(self, session: str) -> list[WindowInfo]: 137 + """Get all windows for a session.""" 138 + output = run_tmux_command([ 139 + "list-windows", 140 + "-t", session, 141 + "-F", "#{window_active} #{window_id} #{window_index} #{window_name}", 142 + ]) 143 + if not output: 144 + return [] 145 + 146 + windows = [] 147 + for line in output.strip().split("\n"): 148 + if not line: 149 + continue 150 + parts = line.split(" ", 3) 151 + if len(parts) < 4: 152 + continue 153 + 154 + active_str, window_id, index_str, name = parts 155 + try: 156 + windows.append(WindowInfo( 157 + id=window_id, 158 + index=int(index_str), 159 + name=name, 160 + active=(active_str == "1"), 161 + )) 162 + except ValueError: 163 + continue 164 + 165 + return windows 166 + 167 + def get_panes(self, window_id: str) -> list[PaneInfo]: 168 + """Get all panes for a window with layout info.""" 169 + output = run_tmux_command([ 170 + "list-panes", 171 + "-t", window_id, 172 + "-F", "#{pane_id} #{pane_index} #{pane_left} #{pane_top} #{pane_width} #{pane_height} #{pane_active}", 173 + ]) 174 + if not output: 175 + return [] 176 + 177 + panes = [] 178 + for line in output.strip().split("\n"): 179 + if not line: 180 + continue 181 + parts = line.split(" ") 182 + if len(parts) != 7: 183 + continue 184 + 185 + try: 186 + panes.append(PaneInfo( 187 + id=parts[0], 188 + index=int(parts[1]), 189 + left=int(parts[2]), 190 + top=int(parts[3]), 191 + width=int(parts[4]), 192 + height=int(parts[5]), 193 + active=(parts[6] == "1"), 194 + )) 195 + except ValueError: 196 + continue 197 + 198 + return panes 199 + 200 + def capture_pane(self, pane_id: str) -> str: 201 + """Capture visible pane content with ANSI escape codes.""" 202 + output = run_tmux_command([ 203 + "capture-pane", 204 + "-p", # Print to stdout 205 + "-e", # Include escape sequences (ANSI codes) 206 + "-t", pane_id, 207 + ]) 208 + return output if output else "" 209 + 210 + def capture_session(self, session: str) -> CaptureResult | None: 211 + """Capture the active window of a session with all its panes. 212 + 213 + Returns None if session doesn't exist or has no active window. 214 + """ 215 + windows = self.get_windows(session) 216 + if not windows: 217 + return None 218 + 219 + # Find active window 220 + active_window = next((w for w in windows if w.active), None) 221 + if not active_window: 222 + return None 223 + 224 + # Get panes for active window 225 + panes = self.get_panes(active_window.id) 226 + if not panes: 227 + return None 228 + 229 + # Capture content for each pane 230 + for pane in panes: 231 + pane.content = self.capture_pane(pane.id) 232 + 233 + return CaptureResult( 234 + session=session, 235 + window=active_window, 236 + windows=windows, 237 + panes=panes, 238 + ) 239 + 240 + def compute_hash(self, result: CaptureResult) -> str: 241 + """Compute hash of capture for deduplication.""" 242 + # Hash window id + all pane contents 243 + parts = [result.window.id] 244 + for pane in sorted(result.panes, key=lambda p: p.id): 245 + parts.append(pane.content) 246 + content = "\n".join(parts) 247 + return hashlib.md5(content.encode()).hexdigest() 248 + 249 + def result_to_dict(self, result: CaptureResult, ts: float) -> dict: 250 + """Convert CaptureResult to JSON-serializable dict.""" 251 + self.capture_id += 1 252 + return { 253 + "id": self.capture_id, 254 + "ts": ts, 255 + "session": result.session, 256 + "window": { 257 + "id": result.window.id, 258 + "index": result.window.index, 259 + "name": result.window.name, 260 + }, 261 + "windows": [ 262 + {"id": w.id, "index": w.index, "name": w.name, "active": w.active} 263 + for w in result.windows 264 + ], 265 + "panes": [ 266 + { 267 + "id": p.id, 268 + "index": p.index, 269 + "left": p.left, 270 + "top": p.top, 271 + "width": p.width, 272 + "height": p.height, 273 + "active": p.active, 274 + "content": p.content, 275 + } 276 + for p in result.panes 277 + ], 278 + } 279 + 280 + def poll_and_capture(self) -> list[dict]: 281 + """Poll for active sessions and capture changed content. 282 + 283 + Returns list of capture dicts for sessions that changed. 284 + """ 285 + active_sessions = self.get_active_sessions() 286 + if not active_sessions: 287 + return [] 288 + 289 + ts = time.time() 290 + new_captures = [] 291 + 292 + for session_info in active_sessions: 293 + session = session_info["session"] 294 + self.sessions_seen.add(session) 295 + 296 + result = self.capture_session(session) 297 + if not result: 298 + continue 299 + 300 + # Check if content changed 301 + content_hash = self.compute_hash(result) 302 + if self.last_hash.get(session) == content_hash: 303 + logger.debug(f"Session {session} unchanged, skipping") 304 + continue 305 + 306 + self.last_hash[session] = content_hash 307 + capture_dict = self.result_to_dict(result, ts) 308 + new_captures.append(capture_dict) 309 + logger.debug(f"Captured session {session}: {len(result.panes)} panes") 310 + 311 + return new_captures 312 + 313 + def handle_boundary(self): 314 + """Write accumulated captures to segment JSONL and reset.""" 315 + if not self.captures: 316 + logger.info("No captures in segment, skipping write") 317 + self.reset_segment() 318 + return 319 + 320 + # Build segment path 321 + start_dt = time.localtime(self.start_at) 322 + date_part = time.strftime("%Y%m%d", start_dt) 323 + time_part = time.strftime("%H%M%S", start_dt) 324 + duration = int(time.time() - self.start_at) 325 + segment_key = f"{time_part}_{duration}" 326 + 327 + segment_dir = day_path(date_part) / segment_key 328 + segment_dir.mkdir(parents=True, exist_ok=True) 329 + 330 + output_path = segment_dir / "tmux.jsonl" 331 + 332 + # Write JSONL: metadata header + captures 333 + with open(output_path, "w") as f: 334 + # Header with metadata 335 + header = { 336 + "captures": len(self.captures), 337 + "sessions": sorted(self.sessions_seen), 338 + } 339 + f.write(json.dumps(header) + "\n") 340 + 341 + # Write each capture 342 + for capture in self.captures: 343 + f.write(json.dumps(capture) + "\n") 344 + 345 + logger.info( 346 + f"Wrote {len(self.captures)} captures to {output_path}" 347 + ) 348 + 349 + # Emit event 350 + if self.callosum: 351 + self.callosum.emit( 352 + "observe", 353 + "tmux_captured", 354 + segment=segment_key, 355 + captures=len(self.captures), 356 + sessions=sorted(self.sessions_seen), 357 + ) 358 + 359 + self.reset_segment() 360 + 361 + def reset_segment(self): 362 + """Reset state for new segment.""" 363 + self.start_at = time.time() 364 + self.captures = [] 365 + self.capture_id = 0 366 + self.sessions_seen = set() 367 + # Keep last_hash for cross-segment deduplication 368 + 369 + def emit_status(self): 370 + """Emit observe.status event for health monitoring.""" 371 + if not self.callosum: 372 + return 373 + 374 + elapsed = int(time.time() - self.start_at) 375 + self.callosum.emit( 376 + "observe", 377 + "status", 378 + tmux={ 379 + "captures": len(self.captures), 380 + "sessions": sorted(self.sessions_seen), 381 + "window_elapsed_seconds": elapsed, 382 + }, 383 + ) 384 + 385 + def main_loop(self): 386 + """Run the main observer loop.""" 387 + logger.info( 388 + f"Starting tmux observer (interval={self.interval}s, poll={self.poll_interval}s)" 389 + ) 390 + 391 + # Start Callosum connection 392 + self.callosum = CallosumConnection() 393 + self.callosum.start() 394 + 395 + last_status_emit = 0.0 396 + 397 + while self.running: 398 + # Poll and capture 399 + new_captures = self.poll_and_capture() 400 + self.captures.extend(new_captures) 401 + 402 + # Check for segment boundary 403 + elapsed = time.time() - self.start_at 404 + if elapsed >= self.interval: 405 + logger.info(f"Segment boundary at {elapsed:.1f}s") 406 + self.handle_boundary() 407 + 408 + # Emit status every 5 seconds 409 + now = time.time() 410 + if now - last_status_emit >= 5: 411 + self.emit_status() 412 + last_status_emit = now 413 + 414 + # Sleep until next poll 415 + time.sleep(self.poll_interval) 416 + 417 + # Cleanup 418 + self.shutdown() 419 + 420 + def shutdown(self): 421 + """Clean shutdown - write any pending captures.""" 422 + logger.info("Shutting down tmux observer...") 423 + 424 + if self.captures: 425 + logger.info(f"Writing {len(self.captures)} pending captures") 426 + self.handle_boundary() 427 + 428 + if self.callosum: 429 + self.callosum.stop() 430 + 431 + 432 + def main(): 433 + """CLI entry point.""" 434 + parser = argparse.ArgumentParser( 435 + description="Tmux terminal observer for journaling." 436 + ) 437 + parser.add_argument( 438 + "--interval", 439 + type=int, 440 + default=DEFAULT_INTERVAL, 441 + help=f"Segment duration in seconds (default: {DEFAULT_INTERVAL})", 442 + ) 443 + parser.add_argument( 444 + "--poll", 445 + type=int, 446 + default=DEFAULT_POLL_INTERVAL, 447 + help=f"Poll interval in seconds (default: {DEFAULT_POLL_INTERVAL})", 448 + ) 449 + args = setup_cli(parser) 450 + 451 + # Verify journal path exists 452 + journal = os.getenv("JOURNAL_PATH") 453 + if not journal or not os.path.exists(journal): 454 + logger.error(f"JOURNAL_PATH not set or does not exist: {journal}") 455 + return 1 456 + 457 + # Check tmux is available 458 + if run_tmux_command(["list-sessions"]) is None: 459 + logger.warning("tmux not available or no sessions - will poll for server") 460 + 461 + observer = TmuxObserver(interval=args.interval, poll_interval=args.poll) 462 + 463 + # Signal handlers 464 + def signal_handler(signum, frame): 465 + logger.info("Received shutdown signal") 466 + observer.running = False 467 + 468 + signal.signal(signal.SIGINT, signal_handler) 469 + signal.signal(signal.SIGTERM, signal_handler) 470 + 471 + try: 472 + observer.main_loop() 473 + except Exception as e: 474 + logger.error(f"Observer error: {e}", exc_info=True) 475 + return 1 476 + 477 + return 0 478 + 479 + 480 + if __name__ == "__main__": 481 + exit(main())
+1
pyproject.toml
··· 118 118 observe-transcribe = "observe.transcribe:main" 119 119 observe-describe = "observe.describe:main" 120 120 observe-sense = "observe.sense:main" 121 + observe-tmux = "observe.tmux.observer:main" 121 122 think-formatter = "think.formatters:main" 122 123 123 124 [project.urls]