personal memory agent
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

at scratch/segment-sense-rd 480 lines 18 kB view raw
# SPDX-License-Identifier: AGPL-3.0-only
# Copyright (c) 2026 sol pbc

"""Transcript viewer app - browse and playback daily transcripts."""

from __future__ import annotations

import json
import logging
import os
import re
import shutil
from datetime import date
from glob import glob
from pathlib import Path
from typing import Any

from flask import (
    Blueprint,
    jsonify,
    redirect,
    render_template,
    send_file,
    url_for,
)

from apps.utils import log_app_action
from convey import emit, state
from convey.utils import DATE_RE, error_response, format_date, success_response
from observe.hear import format_audio
from observe.screen import format_screen
from observe.utils import AUDIO_EXTENSIONS, VIDEO_EXTENSIONS
from think.cluster import cluster_scan, cluster_segments
from think.entities.journal import get_journal_principal, load_journal_entity
from think.models import get_usage_cost
from think.utils import day_dirs, day_path, segment_path
from think.utils import segment_key as validate_segment_key

logger = logging.getLogger(__name__)

# Regex for HHMMSS time format validation
TIME_RE = re.compile(r"\d{6}")

# Regex for YYYYMM month validation (six digits; the month range is not checked)
MONTH_RE = re.compile(r"\d{6}")

transcripts_bp = Blueprint(
    "app:transcripts",
    __name__,
    url_prefix="/app/transcripts",
)


@transcripts_bp.route("/")
def index() -> Any:
    """Redirect to the most recent day with segments, falling back to today."""
    today = date.today().strftime("%Y%m%d")
    if cluster_segments(today):
        return redirect(url_for("app:transcripts.transcripts_day", day=today))
    for day in sorted(day_dirs().keys(), reverse=True):
        if cluster_segments(day):
            return redirect(url_for("app:transcripts.transcripts_day", day=day))
    return redirect(url_for("app:transcripts.transcripts_day", day=today))


@transcripts_bp.route("/<day>")
def transcripts_day(day: str) -> Any:
    """Render transcript viewer for a specific day.

    Returns an empty 404 response when ``day`` does not match ``DATE_RE``.
    """
    if not DATE_RE.fullmatch(day):
        return "", 404

    title = format_date(day)

    return render_template("app.html", title=title)


@transcripts_bp.route("/api/ranges/<day>")
def transcript_ranges(day: str) -> Any:
    """Return available transcript ranges for a day."""
    if not DATE_RE.fullmatch(day):
        return "", 404

    audio_ranges, screen_ranges = cluster_scan(day)
    return jsonify({"audio": audio_ranges, "screen": screen_ranges})


@transcripts_bp.route("/api/segments/<day>")
def transcript_segments(day: str) -> Any:
    """Return individual recording segments for a day.

    Returns list of segments with their content types for the segment selector UI.
    """
    if not DATE_RE.fullmatch(day):
        return "", 404

    segments = cluster_segments(day)
    return jsonify({"segments": segments})


@transcripts_bp.route("/api/serve_file/<day>/<path:encoded_path>")
def serve_file(day: str, encoded_path: str) -> Any:
    """Serve actual media files for embedding.

    Args:
        day: Day in YYYYMMDD format.
        encoded_path: Path relative to the day directory, with ``/`` encoded
            as ``__`` by the URL builders in this module.

    Returns:
        The file contents, an empty 403 when the decoded path escapes the day
        directory, or an empty 404 when the day is malformed, the file does
        not exist, or any error occurs while resolving it.
    """
    if not DATE_RE.fullmatch(day):
        return "", 404

    try:
        rel_path = encoded_path.replace("__", "/")

        # Normalize both paths before comparing: os.path.commonpath does not
        # collapse ".." components, so an un-normalized "day/../../x" would
        # otherwise pass the containment check.
        full_path = os.path.abspath(os.path.join(state.journal_root, day, rel_path))
        day_dir = os.path.abspath(str(day_path(day)))

        if os.path.commonpath([full_path, day_dir]) != day_dir:
            return "", 403

        if not os.path.isfile(full_path):
            return "", 404

        return send_file(full_path)

    except Exception:
        return "", 404


@transcripts_bp.route("/api/stats/<month>")
def api_stats(month: str):
    """Return transcript range counts for each day in a specific month.

    Args:
        month: YYYYMM format month string

    Returns:
        JSON dict mapping day (YYYYMMDD) to transcript range count.
        Transcripts app is not facet-aware, so returns simple {day: count} mapping.
    """
    if not MONTH_RE.fullmatch(month):
        return jsonify({"error": "Invalid month format, expected YYYYMM"}), 400

    stats: dict[str, int] = {}

    for day_name in day_dirs().keys():
        if not day_name.startswith(month):
            continue

        audio_ranges, screen_ranges = cluster_scan(day_name)
        total_ranges = len(audio_ranges) + len(screen_ranges)
        if total_ranges > 0:
            stats[day_name] = total_ranges

    return jsonify(stats)


def _load_jsonl(path: str) -> list[dict]:
    """Load JSONL file and return list of entries.

    Blank lines are skipped. Raises ``OSError`` if the file cannot be read and
    ``ValueError`` (``json.JSONDecodeError`` / ``UnicodeDecodeError``) if a
    line is not valid UTF-8 JSON.
    """
    entries = []
    with open(path, "r", encoding="utf-8") as f:
        for line in f:
            line = line.strip()
            if line:
                entries.append(json.loads(line))
    return entries


def _format_time_from_offset(segment_key: str, offset_sec: float) -> str:
    """Convert segment start + offset to HH:MM:SS format.

    Returns an empty string when ``segment_parse`` yields no start time. The
    hour field is not wrapped, so an offset that crosses midnight produces an
    hour value of 24 or more.
    """
    from think.utils import segment_parse

    start_time, _ = segment_parse(segment_key)
    if not start_time:
        return ""

    total_sec = start_time.hour * 3600 + start_time.minute * 60 + start_time.second
    total_sec += int(offset_sec)

    h, remainder = divmod(total_sec, 3600)
    m, s = divmod(remainder, 60)
    return f"{h:02d}:{m:02d}:{s:02d}"


def _media_url(day: str, stream: str, segment_key: str, raw_name: str) -> str:
    """Build the serve_file URL for a raw media file inside a segment."""
    rel_path = f"{stream}/{segment_key}/{raw_name}"
    return f"/app/transcripts/api/serve_file/{day}/{rel_path.replace('/', '__')}"


def _load_speaker_map(segment_dir: str) -> dict[int, dict]:
    """Load ``agents/speaker_labels.json`` into a sentence_id -> label mapping.

    Labels missing a sentence id, speaker, or (truthy) confidence are skipped.
    Loading is best-effort: on a read or parse error a warning is logged and
    whatever was collected so far is returned.
    """
    speaker_labels_path = Path(segment_dir) / "agents" / "speaker_labels.json"
    speaker_map: dict[int, dict] = {}
    if not speaker_labels_path.is_file():
        return speaker_map

    try:
        with open(speaker_labels_path, encoding="utf-8") as f:
            labels_data = json.load(f)
        principal = get_journal_principal()
        principal_id = principal["id"] if principal else None
        # Each entity is looked up once even if it speaks many sentences.
        entity_cache: dict[str, dict | None] = {}
        for label in labels_data.get("labels", []):
            sid = label.get("sentence_id")
            entity_id = label.get("speaker")
            confidence = label.get("confidence")
            if sid is None or not entity_id or not confidence:
                continue
            if entity_id not in entity_cache:
                entity_cache[entity_id] = load_journal_entity(entity_id)
            entity = entity_cache[entity_id]
            name = entity["name"] if entity else entity_id
            speaker_map[sid] = {
                "name": name,
                "entity_id": entity_id,
                "confidence": confidence,
                "is_owner": entity_id == principal_id,
            }
    except (ValueError, OSError, KeyError):
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
        logger.warning(
            "Ignoring unreadable speaker labels %s",
            speaker_labels_path,
            exc_info=True,
        )
    return speaker_map


def _meeting_participants(frame_content: dict) -> list[dict]:
    """Extract participant boxes from a frame's ``meeting`` content.

    Only participants with video and a four-element ``box_2d`` are included.
    Box coordinates are divided by 10.
    NOTE(review): presumably box_2d is on a 0-1000 scale and the result is a
    percentage for CSS positioning - confirm against the producer.
    """
    participants: list[dict] = []
    meeting_data = frame_content.get("meeting")
    if not meeting_data:
        return participants

    for p in meeting_data.get("participants", []):
        box = p.get("box_2d")
        if p.get("video") and box and len(box) == 4:
            y_min, x_min, y_max, x_max = box
            participants.append(
                {
                    "name": p.get("name", "Unknown"),
                    "status": p.get("status", "unknown"),
                    "top": y_min / 10,
                    "left": x_min / 10,
                    "height": (y_max - y_min) / 10,
                    "width": (x_max - x_min) / 10,
                }
            )
    return participants


def _collect_md_files(segment_dir: str) -> dict[str, str]:
    """Read every ``*.md`` under ``<segment_dir>/agents``, keyed by relative stem.

    Files that cannot be read are skipped.
    """
    md_files: dict[str, str] = {}
    agents_dir = Path(segment_dir) / "agents"
    if agents_dir.is_dir():
        for md_path in sorted(agents_dir.rglob("*.md")):
            try:
                key = md_path.relative_to(agents_dir).with_suffix("").as_posix()
                md_files[key] = md_path.read_text()
            except Exception:
                continue
    return md_files


@transcripts_bp.route("/api/segment/<day>/<stream>/<segment_key>")
def segment_content(day: str, stream: str, segment_key: str) -> Any:
    """Return unified timeline of audio and screen entries for a segment.

    Uses format_audio() and format_screen() to get chunks with source data,
    then merges chronologically for unified display.

    Returns JSON with:
    - chunks: List of entries sorted by timestamp, each with:
        - type: "audio" or "screen"
        - time: formatted wall-clock time (HH:MM:SS)
        - timestamp: unix ms for ordering
        - markdown: formatted content
        - source_ref: key fields from source for media lookup
    - audio_file: URL to segment audio file (if exists)
    - video_files: dict mapping jsonl filename to video URL for client-side decoding
    - md_files: dict mapping agent markdown path (relative, no suffix) to its text
    - segment_key: segment directory name
    - cost: processing cost in USD (float, 0.0 if no data)
    - media_sizes: dict with audio/screen byte counts for raw media files
    - media_purged: True when raw media is referenced but no raw file exists

    An empty 404 is returned for a malformed day or segment key, or when the
    segment directory does not exist.
    """
    if not DATE_RE.fullmatch(day):
        return "", 404

    if not validate_segment_key(segment_key):
        return "", 404

    segment_dir = str(segment_path(day, segment_key, stream))
    if not os.path.isdir(segment_dir):
        return "", 404

    chunks: list[dict] = []
    audio_file_url = None
    video_files: dict[str, str] = {}  # jsonl filename -> video URL
    media_sizes: dict[str, int] = {"audio": 0, "screen": 0}
    has_raw_reference = False
    has_raw_file = False

    # Load speaker labels if available.
    speaker_map = _load_speaker_map(segment_dir)

    # Process audio files
    audio_files = glob(os.path.join(segment_dir, "*audio.jsonl"))
    for audio_path in sorted(audio_files):
        try:
            entries = _load_jsonl(audio_path)
            formatted_chunks, _meta = format_audio(entries, {"file_path": audio_path})

            # Build sentence_id mapping (1-based over transcript entries only).
            # Keyed by object identity: this relies on each chunk's "source"
            # being the same dict object as the entry it was built from.
            entry_to_sid: dict[int, int] = {}
            sid = 0
            for entry in entries:
                if "start" in entry:
                    sid += 1
                    entry_to_sid[id(entry)] = sid

            # Find the raw audio file from metadata (first entry without "start")
            raw_audio = next(
                (e["raw"] for e in entries if "start" not in e and "raw" in e),
                None,
            )

            # Validate raw points to an audio file (skip if not)
            if raw_audio and raw_audio.endswith(AUDIO_EXTENSIONS):
                has_raw_reference = True
                audio_full = os.path.join(segment_dir, raw_audio)
                if os.path.isfile(audio_full):
                    has_raw_file = True
                    audio_file_url = _media_url(day, stream, segment_key, raw_audio)
                    media_sizes["audio"] += os.path.getsize(audio_full)

            for chunk in formatted_chunks:
                source = chunk.get("source", {})
                # Audio has start time in HH:MM:SS format
                time_str = source.get("start", "")
                markdown = chunk.get("markdown", "")

                chunk_sid = entry_to_sid.get(id(source))
                speaker_label = speaker_map.get(chunk_sid) if chunk_sid else None
                if speaker_label:
                    # The resolved label replaces the generic "Speaker N:" prefix.
                    markdown = re.sub(r"Speaker \d+:\s*", "", markdown)

                chunk_data: dict[str, Any] = {
                    "type": "audio",
                    "time": time_str,
                    "timestamp": chunk.get("timestamp", 0),
                    "markdown": markdown,
                    "source_ref": {
                        "start": time_str,
                        "source": source.get("source"),
                        "speaker": source.get("speaker"),
                    },
                }
                if speaker_label:
                    chunk_data["speaker_label"] = speaker_label
                chunks.append(chunk_data)
        except Exception:
            # Best-effort: one bad file must not hide the rest of the segment.
            logger.warning(
                "Skipping audio transcript %s", audio_path, exc_info=True
            )
            continue

    # Process screen files and collect video URLs for client-side decoding
    screen_files = glob(os.path.join(segment_dir, "*screen.jsonl"))
    for screen_path in sorted(screen_files):
        try:
            entries = _load_jsonl(screen_path)
            formatted_chunks, _meta = format_screen(
                entries, {"file_path": screen_path}
            )

            filename = os.path.basename(screen_path)
            monitor = (
                filename.replace("_screen.jsonl", "")
                if filename != "screen.jsonl"
                else ""
            )

            # Extract video URL from header (first entry without frame_id)
            raw_video = next(
                (e["raw"] for e in entries if "frame_id" not in e and "raw" in e),
                None,
            )

            # Validate raw points to a video file (skip if not, e.g. tmux)
            if raw_video and raw_video.endswith(VIDEO_EXTENSIONS):
                has_raw_reference = True
                video_full = os.path.join(segment_dir, raw_video)
                if os.path.isfile(video_full):
                    has_raw_file = True
                    video_files[filename] = _media_url(
                        day, stream, segment_key, raw_video
                    )
                    media_sizes["screen"] += os.path.getsize(video_full)

            for chunk in formatted_chunks:
                source = chunk.get("source", {})
                frame_id = source.get("frame_id")
                offset = source.get("timestamp", 0)

                # Calculate wall-clock time from segment start + offset
                time_str = _format_time_from_offset(segment_key, offset)

                # Basic frames have no enriched content
                frame_content = source.get("content", {})
                is_basic = not frame_content

                # Extract participant boxes for meeting frames
                participants = _meeting_participants(frame_content)

                # Include box_2d for client-side bounding box drawing
                box_2d = source.get("box_2d")

                chunks.append(
                    {
                        "type": "screen",
                        "time": time_str,
                        "timestamp": chunk.get("timestamp", 0),
                        "markdown": chunk.get("markdown", ""),
                        "source_ref": {
                            "frame_id": frame_id,
                            "filename": filename,
                            "monitor": monitor,
                            "offset": offset,
                            "box_2d": box_2d,
                            "analysis": source.get("analysis"),
                            "participants": participants if participants else None,
                            "aruco": source.get("aruco"),
                        },
                        "basic": is_basic,
                    }
                )
        except Exception:
            # Best-effort: one bad file must not hide the rest of the segment.
            logger.warning(
                "Skipping screen transcript %s", screen_path, exc_info=True
            )
            continue

    # Sort all chunks by timestamp
    chunks.sort(key=lambda c: c["timestamp"])
    media_purged = has_raw_reference and not has_raw_file

    # Get cost data for this segment
    cost_data = get_usage_cost(day, segment=segment_key)

    # Collect agent .md files
    md_files = _collect_md_files(segment_dir)

    return jsonify(
        {
            "chunks": chunks,
            "audio_file": audio_file_url,
            "video_files": video_files,
            "md_files": md_files,
            "segment_key": segment_key,
            "cost": cost_data["cost"],
            "media_sizes": media_sizes,
            "media_purged": media_purged,
        }
    )


@transcripts_bp.route("/api/segment/<day>/<stream>/<segment_key>", methods=["DELETE"])
def delete_segment(day: str, stream: str, segment_key: str) -> Any:
    """Delete a segment directory and all its contents.

    This permanently removes all audio files, screen recordings, transcripts,
    and insights for the specified segment. This action cannot be undone.

    Args:
        day: Day in YYYYMMDD format
        stream: Stream name
        segment_key: Segment directory name (HHMMSS_LEN format)

    Returns:
        JSON success response or error response
    """
    if not DATE_RE.fullmatch(day):
        return error_response("Invalid day format", 400)

    if not validate_segment_key(segment_key):
        return error_response("Invalid segment key format", 400)

    # Normalize before comparing: os.path.commonpath does not collapse "..",
    # and ``stream`` is not otherwise validated, so an un-normalized path such
    # as "<day>/../<other>" would pass the containment check.
    day_dir = os.path.abspath(str(day_path(day)))
    segment_dir = os.path.abspath(str(segment_path(day, segment_key, stream)))

    # Security check: ensure segment_dir is strictly within day_dir. Done
    # before the existence check so nothing outside the day is probed.
    if (
        segment_dir == day_dir
        or os.path.commonpath([segment_dir, day_dir]) != day_dir
    ):
        return error_response("Invalid segment path", 403)

    # Verify segment exists
    if not os.path.isdir(segment_dir):
        return error_response("Segment not found", 404)

    try:
        # Remove the entire segment directory
        shutil.rmtree(segment_dir)

        # Log the deletion for audit trail
        log_app_action(
            app="transcripts",
            facet=None,  # Transcripts are not facet-scoped
            action="segment_delete",
            params={"day": day, "segment_key": segment_key},
            day=day,
        )

        # Trigger indexer rescan to remove deleted segment from search index
        # Supervisor queues by command name, serializing concurrent indexer requests
        emit(
            "supervisor",
            "request",
            cmd=["sol", "indexer", "--rescan-full"],
        )

        return success_response({"deleted": segment_key})

    except OSError as e:
        return error_response(f"Failed to delete segment: {e}", 500)