linux observer
1# SPDX-License-Identifier: AGPL-3.0-only
2# Copyright (c) 2026 sol pbc
3
4"""
5Portal-based multi-monitor screencast recording.
6
7Uses xdg-desktop-portal ScreenCast API with PipeWire + GStreamer to record
8each monitor as a separate file. This replaces the old GNOME Shell D-Bus approach.
9
10Extracted from solstone's observe/linux/screencast.py.
11
12Changes from monorepo version:
13- Replaces `from think.utils import get_journal` with config-based restore token path
14- Replaces `from observe.gnome.activity import get_monitor_geometries` with local activity module
15
16Runtime deps:
17 - xdg-desktop-portal with org.freedesktop.portal.ScreenCast
18 - Portal backend: xdg-desktop-portal-gnome (or -kde, -wlr, etc.)
19 - PipeWire running
20 - GStreamer with PipeWire plugin: gst-launch-1.0 pipewiresrc
21"""
22
23import asyncio
24import logging
25import os
26import signal
27import subprocess
28import uuid
29from dataclasses import dataclass
30from pathlib import Path
31
32from dbus_next import Variant, introspection
33from dbus_next.aio import MessageBus
34from dbus_next.constants import BusType
35from dbus_next.errors import (
36 DBusError,
37 InvalidIntrospectionError,
38 InvalidMemberNameError,
39)
40
# Workaround for dbus-next issue #122: portal has properties with hyphens
# (e.g., "power-saver-enabled") which violate strict D-Bus naming validation.
# The validator is replaced with a no-op on the shared `introspection` module,
# so member-name validation through this attribute is disabled for the whole
# process, not only for portal objects.
introspection.assert_member_name_valid = lambda name: None

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Portal D-Bus constants
# Bus name and object path used to introspect and reach the portal.
PORTAL_BUS = "org.freedesktop.portal.Desktop"
PORTAL_PATH = "/org/freedesktop/portal/desktop"
# Interface names: ScreenCast (session/stream methods), Request (its
# "Response" signal is matched by the request handler), Session (Close).
SC_IFACE = "org.freedesktop.portal.ScreenCast"
REQ_IFACE = "org.freedesktop.portal.Request"
SESSION_IFACE = "org.freedesktop.portal.Session"
53
54
@dataclass
class StreamInfo:
    """Description of one monitor stream being recorded.

    Attributes:
        node_id: Node id the portal reported for this stream.
        position: Position label assigned to the monitor.
        connector: Connector identifier assigned to the monitor.
        x, y: Top-left corner of the recorded area.
        width, height: Size of the recorded area.
        file_path: Full path of the output file inside the segment directory.
    """

    node_id: int
    position: str
    connector: str
    x: int
    y: int
    width: int
    height: int
    file_path: str  # Final path in segment directory

    @property
    def filename(self) -> str:
        """Last path component of ``file_path``, for use in event payloads."""
        _directory, leaf = os.path.split(self.file_path)
        return leaf
72
73
74def _load_restore_token(token_path: Path) -> str | None:
75 """Load restore token from disk."""
76 try:
77 data = token_path.read_text(encoding="utf-8").strip()
78 return data or None
79 except (FileNotFoundError, OSError):
80 return None
81
82
def _save_restore_token(token: str, token_path: Path) -> None:
    """Persist the restore token, creating parent directories as needed.

    The token is written stripped and followed by a single newline. An
    ``OSError`` while creating the directory or writing the file is logged
    as a warning and otherwise ignored.
    """
    try:
        directory = token_path.parent
        directory.mkdir(parents=True, exist_ok=True)
        payload = token.strip() + "\n"
        token_path.write_text(payload, encoding="utf-8")
        logger.debug(f"Saved restore token to {token_path}")
    except OSError as e:
        logger.warning(f"Failed to save restore token: {e}")
91
92
def _make_request_handle(bus: MessageBus, token: str) -> str:
    """Build the Request object path expected for ``token`` on this connection.

    The connection's unique name has its leading ``:`` removed and every
    ``.`` replaced with ``_`` before being embedded in the path.
    """
    unique = bus.unique_name.lstrip(":")
    escaped_sender = "_".join(unique.split("."))
    return f"/org/freedesktop/portal/desktop/request/{escaped_sender}/{token}"
97
98
def _prepare_request_handler(bus: MessageBus, handle: str) -> asyncio.Future:
    """Register a listener for the ``Response`` signal of one portal request.

    Must be called before invoking the portal method so the signal cannot be
    missed. The returned future resolves to ``(response_code, results)``; the
    listener unregisters itself once a matching signal has been seen.
    """
    pending: asyncio.Future = asyncio.get_running_loop().create_future()

    def _on_message(msg):
        is_response = (
            msg.message_type.name == "SIGNAL"
            and msg.path == handle
            and msg.interface == REQ_IFACE
            and msg.member == "Response"
        )
        if not is_response:
            return

        body = msg.body
        code = body[0]
        payload = body[1] if len(body) > 1 else {}
        if not pending.done():
            pending.set_result((int(code), payload))
        bus.remove_message_handler(_on_message)

    bus.add_message_handler(_on_message)
    return pending
119
120
def _variant_or_value(val):
    """Return ``val.value`` when ``val`` is a ``Variant``; otherwise ``val`` itself."""
    return val.value if isinstance(val, Variant) else val
126
127
def _match_streams_to_monitors(streams: list[dict], monitors: list[dict]) -> list[dict]:
    """
    Label portal streams with the monitor each one corresponds to.

    Each stream dict carries ``idx`` and ``props``; ``props`` may hold a
    ``position`` (x, y) and a ``size`` (width, height). Each monitor dict
    carries ``id``, a ``box`` of (x1, y1, x2, y2) and optionally ``position``.

    Matching runs in two passes:

    1. By origin: a stream claims the not-yet-claimed monitor whose top-left
       corner lies within 10 px of the stream position, preferring the largest
       overlapping area. This pass is skipped when no stream reports a
       non-zero position (KDE portal reports (0,0) for all).
    2. By size: streams still carrying the ``monitor-<idx>`` fallback name
       take the first remaining monitor whose width and height are within
       2 px of their own.

    The stream dicts are updated in place with ``connector``,
    ``position_label``, ``x``, ``y``, ``width`` and ``height`` and returned
    in their original order.
    """

    def _int_pair(raw):
        # First two items of a (possibly Variant-wrapped) sequence as ints.
        value = _variant_or_value(raw)
        if isinstance(value, (tuple, list)) and len(value) >= 2:
            return int(value[0]), int(value[1])
        return 0, 0

    def _has_offset(entry):
        # True when the stream reports a position other than the origin.
        raw = _variant_or_value(entry.get("props", {}).get("position", (0, 0)))
        if not isinstance(raw, (tuple, list)) or len(raw) < 2:
            return False
        return int(raw[0]) != 0 or int(raw[1]) != 0

    def _adopt(entry, monitor):
        # Copy the monitor's identity and geometry onto the stream.
        box = monitor["box"]
        entry["connector"] = monitor["id"]
        entry["position_label"] = monitor.get("position", "unknown")
        entry["x"] = box[0]
        entry["y"] = box[1]
        entry["width"] = box[2] - box[0]
        entry["height"] = box[3] - box[1]

    def _is_fallback(entry):
        return str(entry.get("connector", "")).startswith("monitor-")

    def _same_size(monitor, width, height):
        left, top, right, bottom = monitor["box"]
        return abs(width - (right - left)) <= 2 and abs(height - (bottom - top)) <= 2

    positions_reported = any(_has_offset(entry) for entry in streams)

    labelled = []
    claimed_ids = set()

    # Pass 1: match on reported origin.
    for entry in streams:
        props = entry.get("props", {})
        sx, sy = _int_pair(props.get("position", (0, 0)))
        sw, sh = _int_pair(props.get("size", (0, 0)))

        chosen = None
        if positions_reported:
            largest = 0
            for monitor in monitors:
                if monitor["id"] in claimed_ids:
                    continue

                left, top, right, bottom = monitor["box"]
                # Origins must agree within tolerance (allows for scaling).
                if abs(sx - left) >= 10 or abs(sy - top) >= 10:
                    continue

                covered = min(sw, right - left) * min(sh, bottom - top)
                if covered > largest:
                    largest = covered
                    chosen = monitor

        if chosen:
            claimed_ids.add(chosen["id"])
            _adopt(entry, chosen)
        else:
            # No monitor found: identify the stream by its index instead.
            entry["connector"] = f"monitor-{entry['idx']}"
            entry["position_label"] = "unknown"
            entry["x"] = sx
            entry["y"] = sy
            entry["width"] = sw
            entry["height"] = sh

        labelled.append(entry)

    # Pass 2: pair leftover streams with leftover monitors by size.
    pending = [entry for entry in labelled if _is_fallback(entry)]
    taken = {entry["connector"] for entry in labelled if not _is_fallback(entry)}
    spare = [monitor for monitor in monitors if monitor["id"] not in taken]

    for entry in pending:
        if not spare:
            break

        width, height = entry["width"], entry["height"]
        candidate = next(
            (monitor for monitor in spare if _same_size(monitor, width, height)),
            None,
        )
        if candidate:
            _adopt(entry, candidate)
            spare.remove(candidate)

    return labelled
242
243
class Screencaster:
    """Portal-based multi-monitor screencast manager.

    Lifecycle: ``connect()`` (optional, ``start()`` calls it) -> ``start()``
    -> ``is_healthy()`` polling -> ``stop()``. One instance drives one
    gst-launch-1.0 process that records every selected monitor to its own
    file. A failed ``start()`` leaves the instance in the stopped state with
    the portal session and PipeWire fd released.
    """

    # org.freedesktop.portal.ScreenCast cursor modes (bit flags). Exactly one
    # must be sent in SelectSources; 0 is not a valid mode.
    _CURSOR_MODE_HIDDEN = 1
    _CURSOR_MODE_EMBEDDED = 2

    def __init__(self, restore_token_path: Path):
        """
        Args:
            restore_token_path: File used to load and store the portal
                restore token between runs.
        """
        self.bus: MessageBus | None = None
        self.session_handle: str | None = None
        self.pw_fd: int | None = None
        self.gst_process: subprocess.Popen | None = None
        self.streams: list[StreamInfo] = []
        self._started = False
        self._restore_token_path = restore_token_path

    async def connect(self) -> bool:
        """
        Establish D-Bus connection and verify portal availability.

        Returns:
            True if portal is available, False otherwise.
        """
        if self.bus is not None:
            return True

        bus = None
        try:
            bus = MessageBus(
                bus_type=BusType.SESSION,
                negotiate_unix_fd=True,
            )
            await bus.connect()

            # Verify portal interface exists
            root_intro = await bus.introspect(PORTAL_BUS, PORTAL_PATH)
            root_obj = bus.get_proxy_object(PORTAL_BUS, PORTAL_PATH, root_intro)
            root_obj.get_interface(SC_IFACE)
        except Exception as e:
            logger.error(f"Portal not available: {e}")
            # Do not leave a half-usable connection open behind our back.
            if bus is not None:
                try:
                    bus.disconnect()
                except Exception as exc:
                    logger.debug(f"Error disconnecting D-Bus connection: {exc}")
            self.bus = None
            return False

        self.bus = bus
        return True

    async def start(
        self,
        output_dir: str,
        framerate: int = 1,
        draw_cursor: bool = True,
    ) -> list[StreamInfo]:
        """
        Start screencast recording for all monitors.

        Files are written directly to output_dir with final names (position_connector_screen.webm).
        The output_dir is typically a segment directory that will be renamed on completion.

        If resources from a previous run are still held (process, PipeWire fd
        or portal session), that run is stopped first so nothing is orphaned.

        Args:
            output_dir: Directory for output files (e.g., YYYYMMDD/stream/HHMMSS.incomplete/)
            framerate: Frames per second (default: 1)
            draw_cursor: Whether to draw mouse cursor (default: True)

        Returns:
            List of StreamInfo for each monitor being recorded.

        Raises:
            RuntimeError: If recording fails to start.
        """
        if not await self.connect():
            raise RuntimeError("Portal not available")

        if (
            self.gst_process is not None
            or self.pw_fd is not None
            or self.session_handle is not None
        ):
            logger.warning("start() called while a previous run is held; stopping it")
            await self.stop()

        monitors = await self._discover_monitors()

        # Get portal interface
        root_intro = await self.bus.introspect(PORTAL_BUS, PORTAL_PATH)
        root_obj = self.bus.get_proxy_object(PORTAL_BUS, PORTAL_PATH, root_intro)
        screencast = root_obj.get_interface(SC_IFACE)

        # 1) CreateSession (nothing to clean up if this fails)
        resp, results = await self._portal_request(
            screencast.call_create_session,
            options={"session_handle_token": Variant("s", "s_" + uuid.uuid4().hex)},
        )
        if resp != 0:
            raise RuntimeError(f"CreateSession failed with code {resp}")

        # Validate before str(): str(None) would be the truthy string "None".
        session_handle = _variant_or_value(results.get("session_handle"))
        if not session_handle:
            raise RuntimeError("CreateSession returned no session_handle")
        self.session_handle = str(session_handle)

        logger.debug(f"Portal session: {self.session_handle}")

        try:
            # 2) SelectSources + 3) Start, then match streams to monitors
            stream_info = await self._negotiate_streams(
                screencast, monitors, draw_cursor
            )
            logger.info(f"Portal returned {len(stream_info)} stream(s)")

            # 4) OpenPipeWireRemote
            fd_obj = await screencast.call_open_pipe_wire_remote(
                self.session_handle, {}
            )
            if hasattr(fd_obj, "take"):
                self.pw_fd = fd_obj.take()
            else:
                self.pw_fd = int(fd_obj)

            # 5) Build and launch the GStreamer pipeline
            await self._launch_gstreamer(stream_info, output_dir, framerate)
        except (Exception, asyncio.CancelledError):
            # Any failure (or cancellation) past CreateSession must release
            # the session, the PipeWire fd and a possibly spawned process.
            await self._abort_start()
            raise

        self._started = True
        return self.streams

    async def _discover_monitors(self) -> list[dict]:
        """Return monitor geometries from GDK, falling back to KScreen."""
        # Get monitor info from GDK for connector IDs
        from .activity import get_monitor_geometries

        try:
            monitors = get_monitor_geometries()
        except Exception as e:
            logger.warning(f"Failed to get monitor geometries: {e}")
            monitors = []

        # Fall back to KScreen on KDE when GDK is unavailable
        if not monitors and self.bus:
            from .activity import get_monitor_geometries_kscreen

            try:
                monitors = await get_monitor_geometries_kscreen(self.bus)
            except Exception as e:
                logger.warning(f"KScreen monitor fallback failed: {e}")
                monitors = []

        return monitors

    async def _portal_request(self, method, *args, options: dict) -> tuple:
        """Call a portal method and wait for its Request::Response signal.

        Args:
            method: Bound proxy coroutine (e.g. ``screencast.call_start``).
            *args: Positional arguments preceding the options dict.
            options: Extra ``a{sv}`` options; ``handle_token`` is added here.

        Returns:
            ``(response_code, results)`` as delivered by the Response signal.
        """
        token = "h_" + uuid.uuid4().hex
        handle = _make_request_handle(self.bus, token)
        # Subscribe before calling so the response cannot be missed.
        response = _prepare_request_handler(self.bus, handle)

        await method(*args, {"handle_token": Variant("s", token), **options})
        return await response

    async def _negotiate_streams(
        self, screencast, monitors: list[dict], draw_cursor: bool
    ) -> list[dict]:
        """Run SelectSources and Start, returning monitor-labelled stream dicts.

        Raises:
            RuntimeError: If a portal step is rejected or yields no usable stream.
        """
        # 2) SelectSources
        restore_token = _load_restore_token(self._restore_token_path)
        if restore_token:
            logger.debug("Using saved restore token")

        # Portal cursor modes are flags (1 = Hidden, 2 = Embedded), not a
        # boolean: drawing the cursor means Embedded, not 1.
        cursor_mode = (
            self._CURSOR_MODE_EMBEDDED if draw_cursor else self._CURSOR_MODE_HIDDEN
        )

        select_opts = {
            "types": Variant("u", 1),  # 1 = MONITOR
            "multiple": Variant("b", True),
            "cursor_mode": Variant("u", cursor_mode),
            "persist_mode": Variant("u", 2),  # Persist until revoked
        }
        if restore_token:
            select_opts["restore_token"] = Variant("s", restore_token)

        resp, _ = await self._portal_request(
            screencast.call_select_sources, self.session_handle, options=select_opts
        )
        if resp != 0:
            raise RuntimeError(f"SelectSources failed with code {resp}")

        # 3) Start
        resp, results = await self._portal_request(
            screencast.call_start, self.session_handle, "", options={}
        )
        if resp != 0:
            raise RuntimeError(f"Start failed with code {resp}")

        portal_streams = _variant_or_value(results.get("streams")) or []
        if not portal_streams:
            raise RuntimeError("Start returned no streams")

        # Save new restore token if provided
        new_token = _variant_or_value(results.get("restore_token"))
        if isinstance(new_token, str) and new_token.strip():
            _save_restore_token(new_token, self._restore_token_path)

        # Parse streams: each entry is (node_id, properties)
        stream_info = []
        for idx, stream in enumerate(portal_streams):
            try:
                node_id = int(stream[0])
                props = stream[1] if len(stream) > 1 else {}
                stream_info.append({"idx": idx, "node_id": node_id, "props": props})
            except Exception as e:
                logger.warning(f"Could not parse stream {idx}: {e}")

        if not stream_info:
            raise RuntimeError("No valid streams found")

        return _match_streams_to_monitors(stream_info, monitors)

    async def _launch_gstreamer(
        self, stream_info: list[dict], output_dir: str, framerate: int
    ) -> None:
        """Populate ``self.streams`` and spawn gst-launch-1.0 for all of them.

        Raises:
            RuntimeError: If the process cannot be spawned or exits right away.
        """
        self.streams = []
        cmd = ["gst-launch-1.0", "-e"]

        for info in stream_info:
            node_id = info["node_id"]
            position = info["position_label"]
            connector = info["connector"]

            # Final file path: position_connector_screen.webm
            file_path = os.path.join(output_dir, f"{position}_{connector}_screen.webm")

            self.streams.append(
                StreamInfo(
                    node_id=node_id,
                    position=position,
                    connector=connector,
                    x=info["x"],
                    y=info["y"],
                    width=info["width"],
                    height=info["height"],
                    file_path=file_path,
                )
            )

            # One independent branch per stream. argv is built as a list (not
            # by splitting a string) so a path containing whitespace stays a
            # single "location=..." argument.
            cmd += [
                "pipewiresrc", f"fd={self.pw_fd}", f"path={node_id}", "!",
                "videorate", "!",
                f"video/x-raw,framerate={framerate}/1", "!",
                "videoconvert", "!",
                "vp8enc", "end-usage=cq", "cq-level=4", "max-quantizer=15",
                "keyframe-max-dist=30", "static-threshold=100", "!",
                "webmmux", "!",
                "filesink", f"location={file_path}",
            ]

            logger.info(f" Stream {node_id}: {position} ({connector}) -> {file_path}")

        # NOTE(review): stderr is piped but only read on immediate failure;
        # a very chatty gst-launch could eventually fill the pipe and block.
        try:
            self.gst_process = subprocess.Popen(
                cmd,
                pass_fds=(self.pw_fd,),
                stdout=subprocess.DEVNULL,
                stderr=subprocess.PIPE,
            )
        except FileNotFoundError as e:
            raise RuntimeError("gst-launch-1.0 not found") from e
        except Exception as e:
            raise RuntimeError(f"Failed to start GStreamer: {e}") from e

        # Brief delay to check for immediate failure
        await asyncio.sleep(0.2)
        if self.gst_process.poll() is not None:
            stderr = (
                self.gst_process.stderr.read().decode(errors="replace")
                if self.gst_process.stderr
                else ""
            )
            raise RuntimeError(f"GStreamer exited immediately: {stderr[:200]}")

    async def _abort_start(self) -> None:
        """Release everything a failed ``start()`` attempt acquired."""
        proc, self.gst_process = self.gst_process, None
        if proc is not None:
            try:
                if proc.poll() is None:
                    proc.kill()
                    proc.wait()
                if proc.stderr:
                    proc.stderr.close()
            except Exception as e:
                logger.warning(f"Error stopping GStreamer: {e}")

        self._release_pw_fd()
        await self._close_session()

        self.streams = []
        self._started = False

    def _release_pw_fd(self) -> None:
        """Close the PipeWire remote fd if one is held (best effort)."""
        if self.pw_fd is None:
            return
        try:
            os.close(self.pw_fd)
        except OSError as e:
            logger.debug(f"Error closing PipeWire fd: {e}")
        self.pw_fd = None

    async def stop(self) -> list[StreamInfo]:
        """
        Stop screencast recording gracefully.

        Returns:
            List of StreamInfo with file_path for the recorded files.
        """
        streams = self.streams.copy()

        # Stop GStreamer with SIGINT for clean EOS
        proc = self.gst_process
        if proc and proc.poll() is None:
            try:
                proc.send_signal(signal.SIGINT)
                try:
                    await asyncio.wait_for(
                        asyncio.to_thread(proc.wait),
                        timeout=5.0,
                    )
                except asyncio.TimeoutError:
                    logger.warning("GStreamer did not exit cleanly, killing")
                    proc.kill()
                    proc.wait()
            except Exception as e:
                logger.warning(f"Error stopping GStreamer: {e}")

        if proc is not None and proc.stderr:
            try:
                proc.stderr.close()
            except OSError as e:
                logger.debug(f"Error closing GStreamer stderr: {e}")

        self.gst_process = None

        # Close PipeWire fd
        self._release_pw_fd()

        # Close portal session
        await self._close_session()

        self.streams = []
        self._started = False

        return streams

    async def _close_session(self):
        """Close the portal session, logging (not raising) D-Bus failures."""
        if self.session_handle and self.bus:
            try:
                session_intro = await self.bus.introspect(
                    PORTAL_BUS, self.session_handle
                )
                session_obj = self.bus.get_proxy_object(
                    PORTAL_BUS, self.session_handle, session_intro
                )
                session_iface = session_obj.get_interface(SESSION_IFACE)
                await session_iface.call_close()
            except (
                DBusError,
                InvalidMemberNameError,
                InvalidIntrospectionError,
                OSError,
            ) as exc:
                logger.warning(
                    "_close_session failed: service=%s path=%s: %s: %s",
                    PORTAL_BUS,
                    self.session_handle,
                    type(exc).__name__,
                    exc,
                )
            # The handle is forgotten even if Close failed.
            self.session_handle = None

    def is_healthy(self) -> bool:
        """Check if recording is still running."""
        if not self._started:
            return False
        if self.gst_process is None:
            return False
        return self.gst_process.poll() is None