personal memory agent
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

feat(observe): add screencast viewer web utility

Created gnome-screencast-viewer tool to serve frames from screencast
files as PNG images over HTTP (port 9999, binds to 0.0.0.0).

Features:
- In-process frame extraction using PyAV (no ffmpeg subprocess)
- Timestamp-based navigation (e.g., /1.5 for frame at 1.5s)
- Monitor-based cropping using embedded metadata (e.g., /?id=HDMI-1)
- Comprehensive error logging with traceback output
- Parses monitor layout from video title metadata

Added av dependency for video container handling.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>

+196
+194
observe/gnome/screencast_viewer.py
··· 1 + #!/usr/bin/env python3 2 + """ 3 + screencast_viewer.py — minimal web server to view screencast frames 4 + 5 + Serves PNG frames from a webm screencast on port 9999: 6 + / - First full frame 7 + /?id=XX - First frame cropped to monitor XX 8 + /1 - Full frame at 1 second 9 + /1?id=XX - Frame at 1 second cropped to monitor XX 10 + 11 + Usage: 12 + python screencast_viewer.py screencast.webm 13 + gnome-screencast-viewer screencast.webm 14 + """ 15 + 16 + import argparse 17 + import io 18 + import sys 19 + from fractions import Fraction 20 + from http.server import BaseHTTPRequestHandler, HTTPServer 21 + from pathlib import Path 22 + from urllib.parse import parse_qs, urlparse 23 + 24 + import av 25 + from PIL import Image 26 + 27 + 28 + class ScreencastViewer: 29 + def __init__(self, video_path: str): 30 + self.video_path = Path(video_path) 31 + if not self.video_path.exists(): 32 + raise FileNotFoundError(f"Video file not found: {video_path}") 33 + 34 + # Parse monitor geometries from title metadata 35 + self.monitors = self._parse_title() 36 + 37 + def _parse_title(self) -> dict: 38 + """Extract and parse monitor geometries from video title metadata.""" 39 + try: 40 + with av.open(str(self.video_path)) as container: 41 + title = container.metadata.get("title", "") 42 + except Exception as e: 43 + print(f"WARNING: Failed to read video metadata: {e}", file=sys.stderr) 44 + title = "" 45 + 46 + # Parse title format: "connector-id:position,x1,y1,x2,y2 ..." 47 + monitors = {} 48 + for part in title.split(): 49 + if ":" not in part or "," not in part: 50 + continue 51 + connector_id, rest = part.split(":", 1) 52 + coord_parts = rest.split(",") 53 + if len(coord_parts) >= 5: 54 + # Format: position,x1,y1,x2,y2 55 + position, x1, y1, x2, y2 = coord_parts[:5] 56 + monitors[connector_id] = { 57 + "box": [int(x1), int(y1), int(x2), int(y2)], 58 + "position": position 59 + } 60 + 61 + return monitors 62 + 63 + def get_frame(self, timestamp: float = 0.0, monitor_id: str = None) -> bytes: 64 + """Extract frame at timestamp, optionally cropped to monitor.""" 65 + try: 66 + with av.open(str(self.video_path)) as container: 67 + stream = container.streams.video[0] 68 + tb: Fraction = stream.time_base # seconds per tick 69 + target_pts = int(timestamp / tb) # convert seconds -> PTS units 70 + 71 + # Seek near target (previous keyframe), then decode forward 72 + container.seek(target_pts, stream=stream, any_frame=False, backward=True) 73 + 74 + img = None 75 + for packet in container.demux(stream): 76 + for frame in packet.decode(): 77 + if frame.pts is None: 78 + continue 79 + # Use frame.time (float seconds) or calculate from pts 80 + frame_time = frame.time if frame.time is not None else (frame.pts * tb) 81 + if frame_time + 1e-9 >= timestamp: 82 + # Convert to PIL Image 83 + img = frame.to_image() 84 + break 85 + if img: 86 + break 87 + 88 + if img is None: 89 + print(f"WARNING: No frame found at/after timestamp {timestamp}", file=sys.stderr) 90 + raise RuntimeError("No frame at/after the requested timestamp") 91 + 92 + except FileNotFoundError: 93 + print(f"ERROR: Video file not found: {self.video_path}", file=sys.stderr) 94 + img = Image.new("RGB", (1, 1), color="red") 95 + buf = io.BytesIO() 96 + img.save(buf, format="PNG") 97 + return buf.getvalue() 98 + except Exception as e: 99 + print(f"ERROR: Failed to extract frame at timestamp {timestamp}: {e}", file=sys.stderr) 100 + import traceback 101 + traceback.print_exc(file=sys.stderr) 102 + img = Image.new("RGB", (1, 1), color="red") 103 + buf = io.BytesIO() 104 + img.save(buf, format="PNG") 105 + return buf.getvalue() 106 + 107 + # Crop if monitor_id specified 108 + if monitor_id: 109 + if monitor_id not in self.monitors: 110 + print(f"WARNING: Monitor ID '{monitor_id}' not found. Available: {list(self.monitors.keys())}", file=sys.stderr) 111 + else: 112 + try: 113 + box = self.monitors[monitor_id]["box"] 114 + print(f"Cropping to monitor {monitor_id} box: {box}", file=sys.stderr) 115 + img = img.crop(tuple(box)) 116 + except Exception as e: 117 + print(f"ERROR: Failed to crop frame for monitor {monitor_id}: {e}", file=sys.stderr) 118 + print(f" Box: {self.monitors[monitor_id]['box']}", file=sys.stderr) 119 + print(f" Image size: {img.size}", file=sys.stderr) 120 + 121 + # Convert to PNG bytes 122 + buf = io.BytesIO() 123 + img.save(buf, format="PNG") 124 + return buf.getvalue() 125 + 126 + 127 + def make_handler(viewer: ScreencastViewer): 128 + """Create request handler with access to viewer instance.""" 129 + 130 + class RequestHandler(BaseHTTPRequestHandler): 131 + def log_message(self, format, *args): 132 + """Suppress default request logging.""" 133 + sys.stderr.write(f"{self.address_string()} - {format % args}\n") 134 + 135 + def do_GET(self): 136 + parsed = urlparse(self.path) 137 + path = parsed.path.strip("/") 138 + query = parse_qs(parsed.query) 139 + 140 + # Parse timestamp from path (e.g., /1.5 -> 1.5 seconds) 141 + timestamp = 0.0 142 + if path: 143 + try: 144 + timestamp = float(path) 145 + except ValueError: 146 + timestamp = 0.0 147 + 148 + # Parse monitor ID from query (e.g., /?id=HDMI-1) 149 + monitor_id = query.get("id", [None])[0] 150 + 151 + # Get frame 152 + frame_data = viewer.get_frame(timestamp, monitor_id) 153 + 154 + # Send response 155 + self.send_response(200) 156 + self.send_header("Content-Type", "image/png") 157 + self.send_header("Content-Length", str(len(frame_data))) 158 + self.end_headers() 159 + self.wfile.write(frame_data) 160 + 161 + return RequestHandler 162 + 163 + 164 + def main(): 165 + parser = argparse.ArgumentParser( 166 + description="Serve screencast frames as PNG images" 167 + ) 168 + parser.add_argument("video", help="Path to screencast webm file") 169 + parser.add_argument("--port", type=int, default=9999, help="Server port (default: 9999)") 170 + args = parser.parse_args() 171 + 172 + viewer = ScreencastViewer(args.video) 173 + 174 + print(f"Serving {args.video}") 175 + print(f"Monitors: {list(viewer.monitors.keys())}") 176 + print(f"\nServer running at http://0.0.0.0:{args.port}/") 177 + print("Examples:") 178 + print(f" http://0.0.0.0:{args.port}/ - First full frame") 179 + print(f" http://0.0.0.0:{args.port}/1 - Frame at 1 second") 180 + if viewer.monitors: 181 + first_id = list(viewer.monitors.keys())[0] 182 + print(f" http://0.0.0.0:{args.port}/?id={first_id} - First frame, monitor {first_id}") 183 + print("\nPress Ctrl+C to stop") 184 + 185 + server = HTTPServer(("0.0.0.0", args.port), make_handler(viewer)) 186 + try: 187 + server.serve_forever() 188 + except KeyboardInterrupt: 189 + print("\nShutting down...") 190 + server.shutdown() 191 + 192 + 193 + if __name__ == "__main__": 194 + main()
+2
pyproject.toml
··· 43 43 "Pillow", 44 44 "numpy", 45 45 "dbus-next", 46 + "av", 46 47 47 48 "sqlite-utils", 48 49 "openai>=1.2.0", ··· 105 106 think-cortex = "think.cortex:main" 106 107 think-messages = "think.messages:main" 107 108 gnome-screencast = "observe.gnome.screencast:main" 109 + gnome-screencast-viewer = "observe.gnome.screencast_viewer:main" 108 110 109 111 [project.urls] 110 112 Homepage = "https://github.com/yourusername/sunstone"