# personal memory agent
1# SPDX-License-Identifier: AGPL-3.0-only
2# Copyright (c) 2026 sol pbc
3
4"""Transcript viewer app - browse and playback daily transcripts."""
5
6from __future__ import annotations
7
8import json
9import os
10import re
11import shutil
12from datetime import date
13from glob import glob
14from pathlib import Path
15from typing import Any
16
17from flask import (
18 Blueprint,
19 jsonify,
20 redirect,
21 render_template,
22 send_file,
23 url_for,
24)
25
26from apps.utils import log_app_action
27from convey import emit, state
28from convey.utils import DATE_RE, error_response, format_date, success_response
29from observe.hear import format_audio
30from observe.screen import format_screen
31from observe.utils import AUDIO_EXTENSIONS, VIDEO_EXTENSIONS
32from think.cluster import cluster_scan, cluster_segments
33from think.entities.journal import get_journal_principal, load_journal_entity
34from think.models import get_usage_cost
35from think.utils import day_dirs, day_path, segment_path
36from think.utils import segment_key as validate_segment_key
37
# Regex for HHMMSS time format validation.
# NOTE: this is a generic 6-digit pattern, so it is also reused elsewhere as
# a simple YYYYMM month-format check (see api_stats); it does not validate
# semantic ranges (e.g. month <= 12).
TIME_RE = re.compile(r"\d{6}")

# Blueprint for the transcripts app; all routes are served under /app/transcripts.
transcripts_bp = Blueprint(
    "app:transcripts",
    __name__,
    url_prefix="/app/transcripts",
)
46
47
@transcripts_bp.route("/")
def index() -> Any:
    """Redirect to the most recent day that has segments, defaulting to today."""

    def _to_day(target: str) -> Any:
        # Every destination is the same endpoint; only the day differs.
        return redirect(url_for("app:transcripts.transcripts_day", day=target))

    today = date.today().strftime("%Y%m%d")
    if cluster_segments(today):
        return _to_day(today)

    # Walk known day directories newest-first and stop at the first one
    # that actually contains segments.
    for candidate in sorted(day_dirs(), reverse=True):
        if cluster_segments(candidate):
            return _to_day(candidate)

    # Nothing recorded anywhere: fall back to today's (empty) view.
    return _to_day(today)
58
59
@transcripts_bp.route("/<day>")
def transcripts_day(day: str) -> Any:
    """Render the transcript viewer page for a specific day.

    Args:
        day: Day in YYYYMMDD format.

    Returns:
        The rendered app template, or an empty 404 response for a
        malformed day string.
    """
    # Reject anything that is not an 8-digit YYYYMMDD string up front.
    # (Return annotation fixed from `str` to `Any`: this branch returns a
    # (body, status) tuple, matching the other routes in this module.)
    if not DATE_RE.fullmatch(day):
        return "", 404

    # Page title is the human-readable form of the date.
    title = format_date(day)

    return render_template("app.html", title=title)
69
70
@transcripts_bp.route("/api/ranges/<day>")
def transcript_ranges(day: str) -> Any:
    """Return the available audio and screen transcript ranges for *day*."""
    if DATE_RE.fullmatch(day) is None:
        return "", 404

    audio, screen = cluster_scan(day)
    payload = {"audio": audio, "screen": screen}
    return jsonify(payload)
79
80
@transcripts_bp.route("/api/segments/<day>")
def transcript_segments(day: str) -> Any:
    """Return the individual recording segments for *day*.

    Produces the list consumed by the segment selector UI; each segment
    entry carries its content types.
    """
    if DATE_RE.fullmatch(day) is None:
        return "", 404

    return jsonify({"segments": cluster_segments(day)})
92
93
@transcripts_bp.route("/api/serve_file/<day>/<path:encoded_path>")
def serve_file(day: str, encoded_path: str) -> Any:
    """Serve a raw media file from a day's journal directory for embedding.

    The client encodes the file's day-relative path by replacing "/" with
    "__"; this handler reverses that encoding and confines the resolved
    path to the day's directory before serving anything.

    Args:
        day: Day in YYYYMMDD format.
        encoded_path: Day-relative file path with "/" encoded as "__".

    Returns:
        The file contents via send_file, or an empty 403/404 response.
    """
    if not DATE_RE.fullmatch(day):
        return "", 404

    try:
        rel_path = encoded_path.replace("__", "/")
        # Resolve symlinks and ".." components BEFORE the containment check.
        # Comparing unnormalized paths with commonpath() would let an encoded
        # "..__..__etc__passwd" escape the day directory while still sharing
        # the day_dir prefix component-wise.
        full_path = os.path.realpath(
            os.path.join(state.journal_root, day, rel_path)
        )
        day_dir = os.path.realpath(str(day_path(day)))
        if os.path.commonpath([full_path, day_dir]) != day_dir:
            return "", 403

        if not os.path.isfile(full_path):
            return "", 404

        return send_file(full_path)

    except Exception:
        # Best-effort endpoint: any failure (malformed path, commonpath
        # ValueError, unreadable file) is reported as not-found.
        return "", 404
115
116
@transcripts_bp.route("/api/stats/<month>")
def api_stats(month: str) -> Any:
    """Return transcript range counts for each day in a specific month.

    Args:
        month: YYYYMM format month string

    Returns:
        JSON dict mapping day (YYYYMMDD) to transcript range count.
        Transcripts app is not facet-aware, so returns simple {day: count}
        mapping. Days with zero ranges are omitted.
    """
    # TIME_RE is a generic 6-digit pattern, so it doubles as the YYYYMM
    # format check here (it does not validate that the month is <= 12).
    if not TIME_RE.fullmatch(month):
        return jsonify({"error": "Invalid month format, expected YYYYMM"}), 400

    stats: dict[str, int] = {}

    for day_name in day_dirs():
        # Days are YYYYMMDD, so a month prefix match selects the month.
        if not day_name.startswith(month):
            continue

        audio_ranges, screen_ranges = cluster_scan(day_name)
        # Only include days that actually have transcript ranges.
        if total := len(audio_ranges) + len(screen_ranges):
            stats[day_name] = total

    return jsonify(stats)
143
144
145def _load_jsonl(path: str) -> list[dict]:
146 """Load JSONL file and return list of entries."""
147 import json
148
149 entries = []
150 with open(path, "r") as f:
151 for line in f:
152 line = line.strip()
153 if line:
154 entries.append(json.loads(line))
155 return entries
156
157
def _format_time_from_offset(segment_key: str, offset_sec: float) -> str:
    """Convert segment start time plus an offset to HH:MM:SS format.

    Returns an empty string when the segment key has no parseable start.
    """
    from think.utils import segment_parse

    start_time, _ = segment_parse(segment_key)
    if not start_time:
        return ""

    # Total elapsed seconds since midnight, truncating fractional offsets.
    elapsed = (
        start_time.hour * 3600
        + start_time.minute * 60
        + start_time.second
        + int(offset_sec)
    )
    hours, remainder = divmod(elapsed, 3600)
    minutes, seconds = divmod(remainder, 60)
    return f"{hours:02d}:{minutes:02d}:{seconds:02d}"
173
174
@transcripts_bp.route("/api/segment/<day>/<stream>/<segment_key>")
def segment_content(day: str, stream: str, segment_key: str) -> Any:
    """Return unified timeline of audio and screen entries for a segment.

    Uses format_audio() and format_screen() to get chunks with source data,
    then merges chronologically for unified display.

    Args:
        day: Day in YYYYMMDD format.
        stream: Stream name (used to locate the segment directory).
        segment_key: Segment directory name (HHMMSS_LEN format).

    Returns JSON with:
    - chunks: List of entries sorted by timestamp, each with:
        - type: "audio" or "screen"
        - time: formatted wall-clock time (HH:MM:SS)
        - timestamp: unix ms for ordering
        - markdown: formatted content
        - source_ref: key fields from source for media lookup
    - audio_file: URL to segment audio file (if exists)
    - video_files: dict mapping jsonl filename to video URL for client-side decoding
    - md_files: dict mapping agent markdown path (without .md) to file contents
    - segment_key: segment directory name
    - cost: processing cost in USD (float, 0.0 if no data)
    - media_sizes: dict with audio/screen byte counts for raw media files
    - media_purged: True when raw media is referenced but no file remains on disk
    """
    if not DATE_RE.fullmatch(day):
        return "", 404

    if not validate_segment_key(segment_key):
        return "", 404

    segment_dir = str(segment_path(day, segment_key, stream))
    if not os.path.isdir(segment_dir):
        return "", 404

    chunks: list[dict] = []
    audio_file_url = None
    video_files: dict[str, str] = {}  # jsonl filename -> video URL
    media_sizes: dict[str, int] = {"audio": 0, "screen": 0}
    # Track raw-media references vs. files present so media_purged can be
    # reported at the end (referenced but missing => purged).
    has_raw_reference = False
    has_raw_file = False

    # Load speaker labels if available. Best-effort: any parse/IO failure
    # simply leaves speaker_map empty.
    speaker_labels_path = Path(segment_dir) / "agents" / "speaker_labels.json"
    speaker_map: dict[int, dict] = {}
    if speaker_labels_path.is_file():
        try:
            with open(speaker_labels_path) as f:
                labels_data = json.load(f)
            principal = get_journal_principal()
            principal_id = principal["id"] if principal else None
            # Cache entity lookups so each speaker is loaded at most once.
            entity_cache: dict[str, dict | None] = {}
            for label in labels_data.get("labels", []):
                sid = label.get("sentence_id")
                entity_id = label.get("speaker")
                confidence = label.get("confidence")
                # Skip incomplete labels (note: falsy confidence, e.g. 0,
                # is also skipped).
                if sid is None or not entity_id or not confidence:
                    continue
                if entity_id not in entity_cache:
                    entity_cache[entity_id] = load_journal_entity(entity_id)
                entity = entity_cache[entity_id]
                # Fall back to the raw entity id when the entity is unknown.
                name = entity["name"] if entity else entity_id
                is_owner = entity_id == principal_id
                speaker_map[sid] = {
                    "name": name,
                    "entity_id": entity_id,
                    "confidence": confidence,
                    "is_owner": is_owner,
                }
        except (json.JSONDecodeError, OSError, KeyError):
            pass

    # Process audio files
    audio_files = glob(os.path.join(segment_dir, "*audio.jsonl"))
    for audio_path in sorted(audio_files):
        try:
            entries = _load_jsonl(audio_path)
            formatted_chunks, meta = format_audio(entries, {"file_path": audio_path})

            # Build sentence_id mapping (1-based over transcript entries only).
            # NOTE(review): keyed by id(entry) — this assumes format_audio()
            # passes each entry dict through by identity as chunk["source"];
            # confirm against observe.hear.format_audio.
            entry_to_sid: dict[int, int] = {}
            sid = 0
            for entry in entries:
                # Entries with a "start" field are transcript sentences;
                # others are metadata headers.
                if "start" in entry:
                    sid += 1
                    entry_to_sid[id(entry)] = sid

            # Find the raw audio file from metadata (first entry without "start")
            raw_audio = None
            for entry in entries:
                if "start" not in entry and "raw" in entry:
                    raw_audio = entry["raw"]
                    break

            # Validate raw points to an audio file (skip if not)
            if raw_audio and raw_audio.endswith(AUDIO_EXTENSIONS):
                has_raw_reference = True
                audio_full = os.path.join(segment_dir, raw_audio)
                if os.path.isfile(audio_full):
                    has_raw_file = True
                    # serve_file expects "/" encoded as "__" in the path.
                    rel_path = f"{stream}/{segment_key}/{raw_audio}"
                    audio_file_url = f"/app/transcripts/api/serve_file/{day}/{rel_path.replace('/', '__')}"
                    media_sizes["audio"] += os.path.getsize(audio_full)

            for chunk in formatted_chunks:
                source = chunk.get("source", {})
                # Audio has start time in HH:MM:SS format
                time_str = source.get("start", "")
                markdown = chunk.get("markdown", "")

                chunk_sid = entry_to_sid.get(id(source))
                speaker_label = speaker_map.get(chunk_sid) if chunk_sid else None
                if speaker_label:
                    # Strip the generic "Speaker N:" prefix; the resolved
                    # speaker_label replaces it client-side.
                    markdown = re.sub(r"Speaker \d+:\s*", "", markdown)

                chunk_data: dict[str, Any] = {
                    "type": "audio",
                    "time": time_str,
                    "timestamp": chunk.get("timestamp", 0),
                    "markdown": markdown,
                    "source_ref": {
                        "start": time_str,
                        "source": source.get("source"),
                        "speaker": source.get("speaker"),
                    },
                }
                if speaker_label:
                    chunk_data["speaker_label"] = speaker_label
                chunks.append(chunk_data)
        except Exception:
            # Best-effort: skip audio files that fail to parse or format.
            continue

    # Process screen files and collect video URLs for client-side decoding
    screen_files = glob(os.path.join(segment_dir, "*screen.jsonl"))
    for screen_path in sorted(screen_files):
        try:
            entries = _load_jsonl(screen_path)
            formatted_chunks, meta = format_screen(entries, {"file_path": screen_path})

            # Monitor name is the filename prefix, e.g. "DP1_screen.jsonl"
            # -> "DP1"; the bare "screen.jsonl" has no monitor name.
            filename = os.path.basename(screen_path)
            monitor = (
                filename.replace("_screen.jsonl", "")
                if filename != "screen.jsonl"
                else ""
            )

            # Extract video URL from header (first entry without frame_id)
            raw_video = None
            for entry in entries:
                if "frame_id" not in entry and "raw" in entry:
                    raw_video = entry["raw"]
                    break

            # Validate raw points to a video file (skip if not, e.g. tmux)
            if raw_video and raw_video.endswith(VIDEO_EXTENSIONS):
                has_raw_reference = True
                video_full = os.path.join(segment_dir, raw_video)
                if os.path.isfile(video_full):
                    has_raw_file = True
                    rel_path = f"{stream}/{segment_key}/{raw_video}"
                    video_files[filename] = (
                        f"/app/transcripts/api/serve_file/{day}/{rel_path.replace('/', '__')}"
                    )
                    media_sizes["screen"] += os.path.getsize(video_full)

            for chunk in formatted_chunks:
                source = chunk.get("source", {})
                frame_id = source.get("frame_id")
                # "timestamp" on a screen source is the offset (seconds)
                # into the segment, not a unix timestamp.
                offset = source.get("timestamp", 0)

                # Calculate wall-clock time from segment start + offset
                time_str = _format_time_from_offset(segment_key, offset)

                # Basic frames have no enriched content
                frame_content = source.get("content", {})
                is_basic = not frame_content

                # Extract participant boxes for meeting frames
                participants = []
                meeting_data = frame_content.get("meeting")
                if meeting_data:
                    for p in meeting_data.get("participants", []):
                        box = p.get("box_2d")
                        # Only include participants with video and valid box_2d
                        if p.get("video") and box and len(box) == 4:
                            y_min, x_min, y_max, x_max = box
                            # box_2d coordinates appear to be on a 0-1000
                            # scale; /10 converts to CSS percentages —
                            # TODO confirm against the frame analyzer.
                            participants.append(
                                {
                                    "name": p.get("name", "Unknown"),
                                    "status": p.get("status", "unknown"),
                                    "top": y_min / 10,
                                    "left": x_min / 10,
                                    "height": (y_max - y_min) / 10,
                                    "width": (x_max - x_min) / 10,
                                }
                            )

                # Include box_2d for client-side bounding box drawing
                box_2d = source.get("box_2d")

                chunks.append(
                    {
                        "type": "screen",
                        "time": time_str,
                        "timestamp": chunk.get("timestamp", 0),
                        "markdown": chunk.get("markdown", ""),
                        "source_ref": {
                            "frame_id": frame_id,
                            "filename": filename,
                            "monitor": monitor,
                            "offset": offset,
                            "box_2d": box_2d,
                            "analysis": source.get("analysis"),
                            "participants": participants if participants else None,
                            "aruco": source.get("aruco"),
                        },
                        "basic": is_basic,
                    }
                )
        except Exception:
            # Best-effort: skip screen files that fail to parse or format.
            continue

    # Sort all chunks by timestamp
    chunks.sort(key=lambda c: c["timestamp"])
    # Raw media was referenced by metadata but no file exists => purged.
    media_purged = has_raw_reference and not has_raw_file

    # Get cost data for this segment
    cost_data = get_usage_cost(day, segment=segment_key)

    # Collect agent .md files, keyed by their agents/-relative path
    # without the .md suffix.
    md_files = {}
    agents_dir = Path(segment_dir) / "agents"
    if agents_dir.is_dir():
        for md_path in sorted(agents_dir.rglob("*.md")):
            try:
                key = md_path.relative_to(agents_dir).with_suffix("").as_posix()
                md_files[key] = md_path.read_text()
            except Exception:
                continue

    return jsonify(
        {
            "chunks": chunks,
            "audio_file": audio_file_url,
            "video_files": video_files,
            "md_files": md_files,
            "segment_key": segment_key,
            "cost": cost_data["cost"],
            "media_sizes": media_sizes,
            "media_purged": media_purged,
        }
    )
422
423
@transcripts_bp.route("/api/segment/<day>/<stream>/<segment_key>", methods=["DELETE"])
def delete_segment(day: str, stream: str, segment_key: str) -> Any:
    """Delete a segment directory and all its contents.

    This permanently removes all audio files, screen recordings, transcripts,
    and insights for the specified segment. This action cannot be undone.

    Args:
        day: Day in YYYYMMDD format
        stream: Stream name
        segment_key: Segment directory name (HHMMSS_LEN format)

    Returns:
        JSON success response or error response
    """
    if not DATE_RE.fullmatch(day):
        return error_response("Invalid day format", 400)

    if not validate_segment_key(segment_key):
        return error_response("Invalid segment key format", 400)

    # Resolve symlinks and ".." components BEFORE the containment check:
    # commonpath() on unnormalized paths can be satisfied by a path that
    # actually resolves outside day_dir. (Flask's default converters keep
    # "/" out of `stream`, but realpath makes the check robust regardless,
    # and guarantees both arguments are absolute/comparable.)
    day_dir = os.path.realpath(str(day_path(day)))
    segment_dir = os.path.realpath(str(segment_path(day, segment_key, stream)))

    # Verify segment exists
    if not os.path.isdir(segment_dir):
        return error_response("Segment not found", 404)

    # Security check: ensure segment_dir is within day_dir
    if os.path.commonpath([segment_dir, day_dir]) != day_dir:
        return error_response("Invalid segment path", 403)

    try:
        # Remove the entire segment directory
        shutil.rmtree(segment_dir)

        # Log the deletion for audit trail
        log_app_action(
            app="transcripts",
            facet=None,  # Transcripts are not facet-scoped
            action="segment_delete",
            params={"day": day, "segment_key": segment_key},
            day=day,
        )

        # Trigger indexer rescan to remove deleted segment from search index
        # Supervisor queues by command name, serializing concurrent indexer requests
        emit(
            "supervisor",
            "request",
            cmd=["sol", "indexer", "--rescan-full"],
        )

        return success_response({"deleted": segment_key})

    except OSError as e:
        return error_response(f"Failed to delete segment: {e}", 500)