personal memory agent
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

feat(observe): add screencast with embedded monitor layout metadata

Enables video recordings to carry monitor geometry and position data for multi-monitor layout reconstruction and analysis.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>

+262 -8
+1
observe/__init__.py
··· 1 + """Observe - System observation and recording utilities."""
+1
observe/gnome/__init__.py
··· 1 + """GNOME-specific observation utilities."""
+178
observe/gnome/screencast.py
#!/usr/bin/env python3
"""
gnome_screencast.py — minimal GNOME Shell screencast via D-Bus (no portals)

Records the screen through the org.gnome.Shell.Screencast D-Bus service and,
once the recording is stopped, writes the monitor layout captured before the
recording started into the video's title using ``mkvpropedit``.

Requirements:
  pip install dbus-next

Examples:
  # 10s sample at 30 fps with cursor, saved to ./screencast.webm
  python gnome_screencast.py --screencast 10

  # 5s sample at 15 fps without cursor, custom path
  python gnome_screencast.py --screencast 5 --fps 15 --no-cursor --out /tmp/sample.webm
"""

import asyncio
import os
import sys
import signal
import argparse
import subprocess
from typing import Tuple

from dbus_next.aio import MessageBus
from dbus_next.constants import BusType
from dbus_next import Variant

from see.screen_dbus import get_monitor_geometries

SCREENCAST_BUS = "org.gnome.Shell.Screencast"
SCREENCAST_PATH = "/org/gnome/Shell/Screencast"
SCREENCAST_IFACE = "org.gnome.Shell.Screencast"

# Mis-cased names from the first version of this module, kept as aliases so
# that any existing importer keeps working.
SCREencast_BUS = SCREENCAST_BUS
SCREencast_PATH = SCREENCAST_PATH
SCREencast_IFACE = SCREENCAST_IFACE


class Screencaster:
    """Thin async wrapper around the org.gnome.Shell.Screencast interface."""

    def __init__(self):
        # Session bus connection; created lazily by connect().
        self.bus: MessageBus | None = None
        # Proxy for the Screencast interface; None until connect() has run.
        self.iface = None
        # True between a successful start() and the next stop().
        self._started = False

    async def connect(self):
        """Connect to the session bus and build the Screencast interface proxy."""
        self.bus = await MessageBus(bus_type=BusType.SESSION).connect()
        introspection = await self.bus.introspect(SCREENCAST_BUS, SCREENCAST_PATH)
        obj = self.bus.get_proxy_object(SCREENCAST_BUS, SCREENCAST_PATH, introspection)
        self.iface = obj.get_interface(SCREENCAST_IFACE)

    async def start(self, out_path: str, framerate: int = 30, draw_cursor: bool = True) -> Tuple[bool, str]:
        """
        Call org.gnome.Shell.Screencast.Screencast(file_template, { 'framerate': u, 'draw-cursor': b })

        Args:
            out_path: Where to save the recording. Relative paths are resolved
                against the current working directory before being sent.
            framerate: Requested frames per second.
            draw_cursor: Whether the mouse cursor is included in the recording.

        Returns (ok: bool, resolved_output_path: str)
        """
        if self.iface is None:
            await self.connect()

        # GNOME Shell takes a filename template here, not a file:// URI, and it
        # resolves *relative* names against the user's Videos (or home)
        # directory rather than our working directory. Send an absolute path so
        # "./screencast.webm" ends up where the caller expects it.
        file_template = os.path.abspath(os.path.expanduser(out_path))
        options = {
            "framerate": Variant("u", int(framerate)),
            "draw-cursor": Variant("b", bool(draw_cursor)),
            # Additional options that GNOME understands exist, but these two keep it simple and robust.
        }

        ok, resolved = await self.iface.call_screencast(file_template, options)
        self._started = bool(ok)
        return bool(ok), resolved

    async def stop(self):
        """Ask GNOME Shell to stop the screencast; a no-op if never connected."""
        if self.iface is None:
            return
        try:
            await self.iface.call_stop_screencast()
        finally:
            # Clear the flag even when the D-Bus call raises.
            self._started = False

    @property
    def started(self) -> bool:
        """Whether the last start() succeeded and stop() has not been called since."""
        return self._started


def _format_layout_title(geometries) -> str:
    """
    Serialize monitor geometries into the string stored as the video title.

    Format: "connector-id:position,x1,y1,x2,y2 connector-id:position,x1,y1,x2,y2 ..."
    Each entry of *geometries* is a dict with "id", "position" and "box" keys,
    where "box" holds four values (x1, y1, x2, y2).
    """
    title_parts = []
    for geom_info in geometries:
        x1, y1, x2, y2 = geom_info["box"]
        title_parts.append(f"{geom_info['id']}:{geom_info['position']},{x1},{y1},{x2},{y2}")
    return " ".join(title_parts)


async def run_screencast(duration_s: int, out_path: str, fps: int, draw_cursor: bool) -> int:
    """
    Record the screen for *duration_s* seconds (or until SIGINT/SIGTERM), then
    tag the resulting file with the monitor layout.

    Returns a process exit code: 0 on success, 1 if the recording could not be
    started. A failure to write the title only produces a warning.
    """
    # Capture monitor geometries before starting recording
    geometries = get_monitor_geometries()

    sc = Screencaster()
    ok, resolved_path = await sc.start(out_path, fps, draw_cursor)
    if not ok:
        print("ERROR: Failed to start screencast.", file=sys.stderr)
        return 1

    print(f"Recording… ({duration_s}s) -> {resolved_path}")

    # Graceful Ctrl-C handling (stop and exit)
    stop_event = asyncio.Event()
    loop = asyncio.get_running_loop()
    installed_signals = []
    for sig in (signal.SIGINT, signal.SIGTERM):
        try:
            loop.add_signal_handler(sig, stop_event.set)
        except NotImplementedError:
            # Windows / restricted environments
            continue
        installed_signals.append(sig)

    # Wait for either duration elapsed or a signal
    timer = asyncio.create_task(asyncio.sleep(duration_s))
    interrupted = asyncio.create_task(stop_event.wait())
    try:
        await asyncio.wait({timer, interrupted}, return_when=asyncio.FIRST_COMPLETED)
    finally:
        # Whichever waiter lost the race is still pending; cancel both so no
        # task outlives this function.
        timer.cancel()
        interrupted.cancel()
        try:
            await sc.stop()
        finally:
            # Only restore default signal handling once the recording has been
            # stopped, so a second Ctrl-C cannot interrupt the stop call.
            for sig in installed_signals:
                loop.remove_signal_handler(sig)
        print("Stopped.")

    # Update video title with monitor dimensions
    title = _format_layout_title(geometries)

    try:
        subprocess.run(
            ["mkvpropedit", resolved_path, "--edit", "info", "--set", f"title={title}"],
            check=True,
            capture_output=True,
            text=True,
        )
        print(f"Updated video title with monitor dimensions: {title}")
    except subprocess.CalledProcessError as e:
        print(f"Warning: Failed to update video title: {e.stderr}", file=sys.stderr)
    except FileNotFoundError:
        # subprocess raises this when the mkvpropedit executable is missing.
        print("Warning: mkvpropedit not found, skipping title update", file=sys.stderr)

    return 0


def parse_args(argv: list[str]) -> argparse.Namespace:
    """Parse command-line arguments (*argv* excludes the program name)."""
    p = argparse.ArgumentParser(description="Minimal GNOME Shell screencast via D-Bus.")
    p.add_argument("--screencast", type=int, metavar="SECONDS",
                   help="Record a screencast for the given number of seconds, then stop.")
    p.add_argument("--out", default="./screencast.webm",
                   help="Output file path for the screencast (default: ./screencast.webm).")
    p.add_argument("--fps", type=int, default=30,
                   help="Framerate for the screencast (default: 30).")
    p.add_argument("--no-cursor", action="store_true",
                   help="Do not draw the mouse cursor.")
    return p.parse_args(argv)


def main():
    """Console entry point: parse arguments, record, and exit with a status code."""
    args = parse_args(sys.argv[1:])

    if args.screencast is None:
        print("Nothing to do. Example:\n  python gnome_screencast.py --screencast 10\n", file=sys.stderr)
        sys.exit(2)

    # Basic sanity on FPS
    fps = max(1, int(args.fps))

    try:
        rc = asyncio.run(run_screencast(
            duration_s=int(args.screencast),
            out_path=args.out,
            fps=fps,
            draw_cursor=not args.no_cursor,
        ))
    except Exception as e:
        # Common issues: not running GNOME Shell; calling from a non-GNOME compositor session.
        print(f"ERROR: {e}", file=sys.stderr)
        sys.exit(1)
    sys.exit(rc)


if __name__ == "__main__":
    main()
+2 -1
pyproject.toml
··· 104 104 think-detect-created = "think.detect_created:main" 105 105 think-cortex = "think.cortex:main" 106 106 think-messages = "think.messages:main" 107 + gnome-screencast = "observe.gnome.screencast:main" 107 108 108 109 [project.urls] 109 110 Homepage = "https://github.com/yourusername/sunstone" ··· 112 113 "Bug Tracker" = "https://github.com/yourusername/sunstone/issues" 113 114 114 115 [tool.setuptools.packages.find] 115 - include = ["hear*", "see*", "think*", "dream*"] 116 + include = ["hear*", "see*", "think*", "dream*", "observe*"] 116 117 117 118 [tool.setuptools.package-data] 118 119 hear = ["*.txt"]
+77 -7
see/screen_dbus.py
··· 109 109 110 110 111 111 def get_monitor_geometries(): 112 + """ 113 + Get structured monitor information. 114 + 115 + Returns: 116 + List of dicts with format: 117 + [{"id": "connector-id", "box": [x1, y1, x2, y2], "position": "center|left|right|top|bottom|left-top|..."}, ...] 118 + where box contains [left, top, right, bottom] coordinates 119 + """ 112 120 # Initialize GTK before using GDK functions 113 121 Gtk.init() 114 122 ··· 122 130 raise RuntimeError("No display available") 123 131 # In GTK 4, get_monitors() returns a list of Gdk.Monitor objects. 124 132 monitors = display.get_monitors() 125 - geometries = [] 133 + 134 + # First pass: collect all geometries and compute union bounding box 135 + monitor_data = [] 126 136 for monitor in monitors: 127 - geom = ( 128 - monitor.get_geometry() 129 - ) # geom is a Gdk.Rectangle with attributes: x, y, width, height 130 - geometries.append(geom) 137 + geom = monitor.get_geometry() 138 + connector = monitor.get_connector() or f"monitor-{len(monitor_data)}" 139 + monitor_data.append({ 140 + "monitor": monitor, 141 + "connector": connector, 142 + "x": geom.x, 143 + "y": geom.y, 144 + "width": geom.width, 145 + "height": geom.height 146 + }) 147 + 148 + # Compute union bounding box 149 + min_x = min(m["x"] for m in monitor_data) 150 + min_y = min(m["y"] for m in monitor_data) 151 + max_x = max(m["x"] + m["width"] for m in monitor_data) 152 + max_y = max(m["y"] + m["height"] for m in monitor_data) 153 + 154 + # Compute midlines 155 + union_mid_x = (min_x + max_x) / 2 156 + union_mid_y = (min_y + max_y) / 2 157 + 158 + # Epsilon for intersection detection (1 pixel tolerance) 159 + epsilon = 1 160 + 161 + # Second pass: assign positions based on midline intersections 162 + geometries = [] 163 + for m in monitor_data: 164 + x_left = m["x"] 165 + x_right = m["x"] + m["width"] 166 + y_top = m["y"] 167 + y_bottom = m["y"] + m["height"] 168 + 169 + # Horizontal position 170 + if x_left <= union_mid_x + epsilon and x_right >= 
union_mid_x - epsilon: 171 + h_pos = "center" 172 + elif x_right < union_mid_x - epsilon: 173 + h_pos = "left" 174 + else: 175 + h_pos = "right" 176 + 177 + # Vertical position 178 + if y_top <= union_mid_y + epsilon and y_bottom >= union_mid_y - epsilon: 179 + v_pos = "center" 180 + elif y_bottom < union_mid_y - epsilon: 181 + v_pos = "top" 182 + else: 183 + v_pos = "bottom" 184 + 185 + # Combine positions 186 + if h_pos == "center" and v_pos == "center": 187 + position = "center" 188 + elif h_pos == "center": 189 + position = v_pos 190 + elif v_pos == "center": 191 + position = h_pos 192 + else: 193 + position = f"{h_pos}-{v_pos}" 194 + 195 + geometries.append({ 196 + "id": m["connector"], 197 + "box": [m["x"], m["y"], m["x"] + m["width"], m["y"] + m["height"]], 198 + "position": position 199 + }) 200 + 131 201 return geometries 132 202 133 203 ··· 156 226 im = Image.open(io.BytesIO(screenshot_data)) 157 227 geometries = get_monitor_geometries() 158 228 monitor_images = [] 159 - for geom in geometries: 160 - box = (geom.x, geom.y, geom.x + geom.width, geom.y + geom.height) 229 + for geom_info in geometries: 230 + box = tuple(geom_info["box"]) # [x, y, x+w, y+h] 161 231 monitor_img = im.crop(box) 162 232 monitor_images.append(monitor_img) 163 233 last_screenshot_timestamp = now
+3
tests/conftest.py
··· 115 115 screen_dbus.screen_snap = lambda: [] 116 116 screen_dbus.idle_time_ms = lambda: 0 117 117 screen_dbus.check_screen_state = lambda: {"locked": False, "power_save": False} 118 + screen_dbus.get_monitor_geometries = lambda: [ 119 + {"id": "HDMI-1", "box": [0, 0, 1920, 1080], "position": "center"} 120 + ] 118 121 sys.modules["see.screen_dbus"] = screen_dbus 119 122 sys.modules["screen_dbus"] = screen_dbus 120 123 google_mod = sys.modules.get("google", types.ModuleType("google"))