linux observer
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

at main 572 lines 20 kB view raw
1# SPDX-License-Identifier: AGPL-3.0-only 2# Copyright (c) 2026 sol pbc 3 4""" 5Portal-based multi-monitor screencast recording. 6 7Uses xdg-desktop-portal ScreenCast API with PipeWire + GStreamer to record 8each monitor as a separate file. This replaces the old GNOME Shell D-Bus approach. 9 10Extracted from solstone's observe/linux/screencast.py. 11 12Changes from monorepo version: 13- Replaces `from think.utils import get_journal` with config-based restore token path 14- Replaces `from observe.gnome.activity import get_monitor_geometries` with local activity module 15 16Runtime deps: 17 - xdg-desktop-portal with org.freedesktop.portal.ScreenCast 18 - Portal backend: xdg-desktop-portal-gnome (or -kde, -wlr, etc.) 19 - PipeWire running 20 - GStreamer with PipeWire plugin: gst-launch-1.0 pipewiresrc 21""" 22 23import asyncio 24import logging 25import os 26import signal 27import subprocess 28import uuid 29from dataclasses import dataclass 30from pathlib import Path 31 32from dbus_next import Variant, introspection 33from dbus_next.aio import MessageBus 34from dbus_next.constants import BusType 35from dbus_next.errors import ( 36 DBusError, 37 InvalidIntrospectionError, 38 InvalidMemberNameError, 39) 40 41# Workaround for dbus-next issue #122: portal has properties with hyphens 42# (e.g., "power-saver-enabled") which violate strict D-Bus naming validation. 
# Workaround for dbus-next issue #122: the portal exposes properties with
# hyphens (e.g. "power-saver-enabled") that fail dbus-next's strict
# member-name validation, so validation is disabled entirely.
introspection.assert_member_name_valid = lambda name: None

logger = logging.getLogger(__name__)

# Portal D-Bus constants
PORTAL_BUS = "org.freedesktop.portal.Desktop"
PORTAL_PATH = "/org/freedesktop/portal/desktop"
SC_IFACE = "org.freedesktop.portal.ScreenCast"
REQ_IFACE = "org.freedesktop.portal.Request"
SESSION_IFACE = "org.freedesktop.portal.Session"


@dataclass
class StreamInfo:
    """Information about a single monitor's recording stream."""

    node_id: int  # PipeWire node id returned by the portal
    position: str  # human-readable position label (e.g. "left", "unknown")
    connector: str  # monitor connector id, or "monitor-<idx>" fallback
    x: int
    y: int
    width: int
    height: int
    file_path: str  # Final path in segment directory

    @property
    def filename(self) -> str:
        """Return just the filename for event payloads."""
        return os.path.basename(self.file_path)


def _load_restore_token(token_path: Path) -> str | None:
    """Load the portal restore token from disk, or None if absent/empty.

    Args:
        token_path: File the token was previously saved to.
    """
    try:
        # FileNotFoundError is a subclass of OSError, so one clause suffices.
        data = token_path.read_text(encoding="utf-8").strip()
        return data or None
    except OSError:
        return None


def _save_restore_token(token: str, token_path: Path) -> None:
    """Persist the portal restore token to disk (best-effort).

    Failure to save is logged but never raised: recording works without a
    token, the user just gets re-prompted by the portal next time.
    """
    try:
        token_path.parent.mkdir(parents=True, exist_ok=True)
        token_path.write_text(token.strip() + "\n", encoding="utf-8")
        logger.debug(f"Saved restore token to {token_path}")
    except OSError as e:
        logger.warning(f"Failed to save restore token: {e}")


def _make_request_handle(bus: MessageBus, token: str) -> str:
    """Compute expected Request object path for a handle_token.

    Mirrors the portal's documented path scheme: the caller's unique bus
    name with ':' stripped and '.' replaced by '_'.
    """
    sender = bus.unique_name.lstrip(":").replace(".", "_")
    return f"/org/freedesktop/portal/desktop/request/{sender}/{token}"


def _prepare_request_handler(bus: MessageBus, handle: str) -> asyncio.Future:
    """Set up signal handler for Request::Response before calling portal method.

    Must be called *before* invoking the portal method so the Response
    signal cannot race past us.

    Returns:
        Future resolving to (response_code, results_dict).
    """
    loop = asyncio.get_running_loop()
    fut: asyncio.Future = loop.create_future()

    def _message_handler(msg):
        if (
            msg.message_type.name == "SIGNAL"
            and msg.path == handle
            and msg.interface == REQ_IFACE
            and msg.member == "Response"
        ):
            response = msg.body[0]
            results = msg.body[1] if len(msg.body) > 1 else {}
            if not fut.done():
                fut.set_result((int(response), results))
            # One-shot: a Request emits Response exactly once, then vanishes.
            bus.remove_message_handler(_message_handler)

    bus.add_message_handler(_message_handler)
    return fut


def _variant_or_value(val):
    """Extract value from Variant if needed."""
    if isinstance(val, Variant):
        return val.value
    return val


def _match_streams_to_monitors(streams: list[dict], monitors: list[dict]) -> list[dict]:
    """
    Match portal stream geometries to monitor info.

    Portal streams have position (x, y) and size (width, height).
    Monitors (from GDK or KScreen) have connector IDs and box coordinates.

    Returns streams augmented with connector and position labels.
    """
    matched = []
    used_position_connectors = set()

    # Detect if all streams lack meaningful position data (KDE portal reports (0,0) for all)
    all_zero_position = True
    for stream in streams:
        props = stream.get("props", {})
        pos = _variant_or_value(props.get("position", (0, 0)))
        if isinstance(pos, (tuple, list)) and len(pos) >= 2:
            if int(pos[0]) != 0 or int(pos[1]) != 0:
                all_zero_position = False
                break

    # Pass 1: match by position (with overlap as tie-breaker).
    for stream in streams:
        props = stream.get("props", {})

        # Extract stream geometry from portal properties
        stream_pos = _variant_or_value(props.get("position", (0, 0)))
        stream_size = _variant_or_value(props.get("size", (0, 0)))

        if isinstance(stream_pos, (tuple, list)) and len(stream_pos) >= 2:
            sx, sy = int(stream_pos[0]), int(stream_pos[1])
        else:
            sx, sy = 0, 0

        if isinstance(stream_size, (tuple, list)) and len(stream_size) >= 2:
            sw, sh = int(stream_size[0]), int(stream_size[1])
        else:
            sw, sh = 0, 0

        # Find matching monitor by geometry
        best_match = None
        best_overlap = 0

        if not all_zero_position:
            for monitor in monitors:
                if monitor["id"] in used_position_connectors:
                    continue

                mx1, my1, mx2, my2 = monitor["box"]
                mw, mh = mx2 - mx1, my2 - my1

                # Check if geometries match (within tolerance for scaling)
                if abs(sx - mx1) < 10 and abs(sy - my1) < 10:
                    overlap = min(sw, mw) * min(sh, mh)
                    if overlap > best_overlap:
                        best_overlap = overlap
                        best_match = monitor

        if best_match:
            used_position_connectors.add(best_match["id"])
            stream["connector"] = best_match["id"]
            stream["position_label"] = best_match.get("position", "unknown")
            stream["x"] = best_match["box"][0]
            stream["y"] = best_match["box"][1]
            stream["width"] = best_match["box"][2] - best_match["box"][0]
            stream["height"] = best_match["box"][3] - best_match["box"][1]
        else:
            # Fallback: use stream index as identifier
            stream["connector"] = f"monitor-{stream['idx']}"
            stream["position_label"] = "unknown"
            stream["x"] = sx
            stream["y"] = sy
            stream["width"] = sw
            stream["height"] = sh

        matched.append(stream)

    # Pass 2: for streams still carrying the "monitor-<idx>" fallback id,
    # try to pair them with leftover monitors by size alone (helps the
    # all-zero-position case on KDE).
    unmatched_streams = [
        stream
        for stream in matched
        if str(stream.get("connector", "")).startswith("monitor-")
    ]
    matched_connectors = {
        stream["connector"]
        for stream in matched
        if not str(stream.get("connector", "")).startswith("monitor-")
    }
    unmatched_monitors = [
        monitor for monitor in monitors if monitor["id"] not in matched_connectors
    ]

    for stream in unmatched_streams:
        if not unmatched_monitors:
            break

        best_match = None
        sw, sh = stream["width"], stream["height"]
        for monitor in unmatched_monitors:
            mx1, my1, mx2, my2 = monitor["box"]
            mw, mh = mx2 - mx1, my2 - my1
            if abs(sw - mw) <= 2 and abs(sh - mh) <= 2:
                best_match = monitor
                break

        if best_match:
            stream["connector"] = best_match["id"]
            stream["position_label"] = best_match.get("position", "unknown")
            stream["x"] = best_match["box"][0]
            stream["y"] = best_match["box"][1]
            stream["width"] = best_match["box"][2] - best_match["box"][0]
            stream["height"] = best_match["box"][3] - best_match["box"][1]
            unmatched_monitors.remove(best_match)

    return matched


class Screencaster:
    """Portal-based multi-monitor screencast manager."""

    def __init__(self, restore_token_path: Path):
        self.bus: MessageBus | None = None
        self.session_handle: str | None = None
        self.pw_fd: int | None = None
        self.gst_process: subprocess.Popen | None = None
        self.streams: list[StreamInfo] = []
        self._started = False
        self._restore_token_path = restore_token_path

    async def connect(self) -> bool:
        """
        Establish D-Bus connection and verify portal availability.

        Returns:
            True if portal is available, False otherwise.
        """
        if self.bus is not None:
            return True

        try:
            # negotiate_unix_fd is required for OpenPipeWireRemote's fd return.
            self.bus = await MessageBus(
                bus_type=BusType.SESSION,
                negotiate_unix_fd=True,
            ).connect()

            # Verify portal interface exists
            root_intro = await self.bus.introspect(PORTAL_BUS, PORTAL_PATH)
            root_obj = self.bus.get_proxy_object(PORTAL_BUS, PORTAL_PATH, root_intro)
            root_obj.get_interface(SC_IFACE)
            return True

        except Exception as e:
            logger.error(f"Portal not available: {e}")
            self.bus = None
            return False

    async def _abort_start(self) -> None:
        """Release the PipeWire fd (if open) and the portal session after a failed start."""
        if self.pw_fd is not None:
            try:
                os.close(self.pw_fd)
            except OSError:
                pass
            self.pw_fd = None
        await self._close_session()

    async def start(
        self,
        output_dir: str,
        framerate: int = 1,
        draw_cursor: bool = True,
    ) -> list[StreamInfo]:
        """
        Start screencast recording for all monitors.

        Files are written directly to output_dir with final names (position_connector_screen.webm).
        The output_dir is typically a segment directory that will be renamed on completion.

        Args:
            output_dir: Directory for output files (e.g., YYYYMMDD/stream/HHMMSS.incomplete/)
            framerate: Frames per second (default: 1)
            draw_cursor: Whether to draw mouse cursor (default: True)

        Returns:
            List of StreamInfo for each monitor being recorded.

        Raises:
            RuntimeError: If recording fails to start.
        """
        if not await self.connect():
            raise RuntimeError("Portal not available")

        # Get monitor info from GDK for connector IDs
        from .activity import get_monitor_geometries

        try:
            monitors = get_monitor_geometries()
        except Exception as e:
            logger.warning(f"Failed to get monitor geometries: {e}")
            monitors = []

        # Fall back to KScreen on KDE when GDK is unavailable
        if not monitors and self.bus:
            from .activity import get_monitor_geometries_kscreen

            try:
                monitors = await get_monitor_geometries_kscreen(self.bus)
            except Exception as e:
                logger.warning(f"KScreen monitor fallback failed: {e}")
                monitors = []

        # Get portal interface
        root_intro = await self.bus.introspect(PORTAL_BUS, PORTAL_PATH)
        root_obj = self.bus.get_proxy_object(PORTAL_BUS, PORTAL_PATH, root_intro)
        screencast = root_obj.get_interface(SC_IFACE)

        # 1) CreateSession
        create_token = "h_" + uuid.uuid4().hex
        create_handle = _make_request_handle(self.bus, create_token)
        create_fut = _prepare_request_handler(self.bus, create_handle)

        create_opts = {
            "handle_token": Variant("s", create_token),
            "session_handle_token": Variant("s", "s_" + uuid.uuid4().hex),
        }

        await screencast.call_create_session(create_opts)
        resp, results = await create_fut
        if resp != 0:
            raise RuntimeError(f"CreateSession failed with code {resp}")

        self.session_handle = str(_variant_or_value(results.get("session_handle")))
        if not self.session_handle:
            raise RuntimeError("CreateSession returned no session_handle")

        logger.debug(f"Portal session: {self.session_handle}")

        # 2) SelectSources
        restore_token = _load_restore_token(self._restore_token_path)
        if restore_token:
            logger.debug("Using saved restore token")

        cursor_mode = 1 if draw_cursor else 0

        select_token = "h_" + uuid.uuid4().hex
        select_handle = _make_request_handle(self.bus, select_token)
        select_fut = _prepare_request_handler(self.bus, select_handle)

        select_opts = {
            "handle_token": Variant("s", select_token),
            "types": Variant("u", 1),  # 1 = MONITOR
            "multiple": Variant("b", True),
            "cursor_mode": Variant("u", cursor_mode),
            "persist_mode": Variant("u", 2),  # Persist until revoked
        }
        if restore_token:
            select_opts["restore_token"] = Variant("s", restore_token)

        await screencast.call_select_sources(self.session_handle, select_opts)
        resp, _ = await select_fut
        if resp != 0:
            await self._close_session()
            raise RuntimeError(f"SelectSources failed with code {resp}")

        # 3) Start
        start_token = "h_" + uuid.uuid4().hex
        start_handle = _make_request_handle(self.bus, start_token)
        start_fut = _prepare_request_handler(self.bus, start_handle)

        start_opts = {"handle_token": Variant("s", start_token)}
        await screencast.call_start(self.session_handle, "", start_opts)
        resp, results = await start_fut
        if resp != 0:
            await self._close_session()
            raise RuntimeError(f"Start failed with code {resp}")

        portal_streams = _variant_or_value(results.get("streams")) or []
        if not portal_streams:
            await self._close_session()
            raise RuntimeError("Start returned no streams")

        # Save new restore token if provided
        new_token = _variant_or_value(results.get("restore_token"))
        if isinstance(new_token, str) and new_token.strip():
            _save_restore_token(new_token, self._restore_token_path)

        # Parse streams
        stream_info = []
        for idx, stream in enumerate(portal_streams):
            try:
                node_id = int(stream[0])
                props = stream[1] if len(stream) > 1 else {}
                stream_info.append({"idx": idx, "node_id": node_id, "props": props})
            except Exception as e:
                logger.warning(f"Could not parse stream {idx}: {e}")

        if not stream_info:
            await self._close_session()
            raise RuntimeError("No valid streams found")

        # Match streams to monitors
        stream_info = _match_streams_to_monitors(stream_info, monitors)

        logger.info(f"Portal returned {len(stream_info)} stream(s)")

        # 4) OpenPipeWireRemote
        fd_obj = await screencast.call_open_pipe_wire_remote(self.session_handle, {})
        if hasattr(fd_obj, "take"):
            self.pw_fd = fd_obj.take()
        else:
            self.pw_fd = int(fd_obj)

        # 5) Build GStreamer pipeline
        self.streams = []
        pipeline_parts = []

        for info in stream_info:
            node_id = info["node_id"]
            position = info["position_label"]
            connector = info["connector"]

            # Final file path: position_connector_screen.webm
            file_path = os.path.join(output_dir, f"{position}_{connector}_screen.webm")

            stream_obj = StreamInfo(
                node_id=node_id,
                position=position,
                connector=connector,
                x=info["x"],
                y=info["y"],
                width=info["width"],
                height=info["height"],
                file_path=file_path,
            )
            self.streams.append(stream_obj)

            # GStreamer branch for this stream
            branch = (
                f"pipewiresrc fd={self.pw_fd} path={node_id} ! "
                f"videorate ! video/x-raw,framerate={framerate}/1 ! "
                f"videoconvert ! vp8enc end-usage=cq cq-level=4 max-quantizer=15 "
                f"keyframe-max-dist=30 static-threshold=100 ! webmmux ! "
                f"filesink location={file_path}"
            )
            pipeline_parts.append(branch)

            logger.info(f"  Stream {node_id}: {position} ({connector}) -> {file_path}")

        pipeline_str = " ".join(pipeline_parts)
        # NOTE: whitespace split means output paths containing spaces would
        # break the pipeline; segment dirs are date/time-named so this holds.
        cmd = ["gst-launch-1.0", "-e"] + pipeline_str.split()

        try:
            self.gst_process = subprocess.Popen(
                cmd,
                pass_fds=(self.pw_fd,),
                stdout=subprocess.DEVNULL,
                stderr=subprocess.PIPE,
            )
        except FileNotFoundError as e:
            await self._abort_start()
            raise RuntimeError("gst-launch-1.0 not found") from e
        except Exception as e:
            await self._abort_start()
            raise RuntimeError(f"Failed to start GStreamer: {e}") from e

        # Brief delay to check for immediate failure
        await asyncio.sleep(0.2)
        if self.gst_process.poll() is not None:
            stderr = (
                self.gst_process.stderr.read().decode()
                if self.gst_process.stderr
                else ""
            )
            await self._abort_start()
            raise RuntimeError(f"GStreamer exited immediately: {stderr[:200]}")

        self._started = True
        return self.streams

    async def stop(self) -> list[StreamInfo]:
        """
        Stop screencast recording gracefully.

        Returns:
            List of StreamInfo with file_path for the recorded files.
        """
        streams = self.streams.copy()

        # Stop GStreamer with SIGINT for clean EOS
        if self.gst_process and self.gst_process.poll() is None:
            try:
                self.gst_process.send_signal(signal.SIGINT)
                try:
                    await asyncio.wait_for(
                        asyncio.to_thread(self.gst_process.wait),
                        timeout=5.0,
                    )
                except asyncio.TimeoutError:
                    logger.warning("GStreamer did not exit cleanly, killing")
                    self.gst_process.kill()
                    self.gst_process.wait()
            except Exception as e:
                logger.warning(f"Error stopping GStreamer: {e}")

        self.gst_process = None

        # Close PipeWire fd
        if self.pw_fd is not None:
            try:
                os.close(self.pw_fd)
            except OSError:
                pass
            self.pw_fd = None

        # Close portal session
        await self._close_session()

        self.streams = []
        self._started = False

        return streams

    async def _close_session(self):
        """Close the portal session."""
        if self.session_handle and self.bus:
            try:
                session_intro = await self.bus.introspect(
                    PORTAL_BUS, self.session_handle
                )
                session_obj = self.bus.get_proxy_object(
                    PORTAL_BUS, self.session_handle, session_intro
                )
                session_iface = session_obj.get_interface(SESSION_IFACE)
                await session_iface.call_close()
            except (
                DBusError,
                InvalidMemberNameError,
                InvalidIntrospectionError,
                OSError,
            ) as exc:
                logger.warning(
                    "_close_session failed: service=%s path=%s: %s: %s",
                    PORTAL_BUS,
                    self.session_handle,
                    type(exc).__name__,
                    exc,
                )
        self.session_handle = None

    def is_healthy(self) -> bool:
        """Check if recording is still running."""
        if not self._started:
            return False
        if self.gst_process is None:
            return False
        return self.gst_process.poll() is None