personal memory agent
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

Speaker ID agent-CLI foundation: status command, gate removal, JSON output

VPE-1: Add `sol call speakers status` command — aggregates all speaker ID
state (embeddings, owner, speakers, clusters, imports, attribution) into
a single JSON dashboard. Supports `--section` for targeted reads.

VPE-4: Remove MIN_SEGMENTS hard gate from detect_owner_candidate(). The
function now always attempts detection and returns quality metrics (stream
diversity, cluster quality, recommendation) so the calling agent decides
whether data is sufficient. Return type changed from Optional[dict] to dict.

VPE-5: Add `--json` flag to bootstrap, resolve-names, backfill, and
discover commands for structured agent-consumable output.

Also includes VPE-3 (identify + merge-names commands) from concurrent session.

All 97 speaker tests + 2249 core tests pass.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

+1047 -45
+84
apps/speakers/bootstrap.py
··· 287 287 return stats 288 288 289 289 290 + def merge_names(alias_name: str, canonical_name: str) -> dict[str, Any]: 291 + """Merge a speaker name variant into a canonical entity. 292 + 293 + Finds both entities by name, adds the alias name as an aka on the 294 + canonical entity, and merges voiceprint embeddings with deduplication. 295 + 296 + Args: 297 + alias_name: The alias/variant name to merge from 298 + canonical_name: The canonical/full name to merge into 299 + 300 + Returns: 301 + Dict with merge statistics or error 302 + """ 303 + _, normalize_embedding, _, load_entity_voiceprints_file = _routes_helpers() 304 + 305 + journal_entities = load_all_journal_entities() 306 + entities_list = [e for e in journal_entities.values() if not e.get("blocked")] 307 + 308 + alias_entity = find_matching_entity(alias_name, entities_list) 309 + if not alias_entity: 310 + return {"error": f"No entity found for alias: {alias_name}"} 311 + 312 + canonical_entity = find_matching_entity(canonical_name, entities_list) 313 + if not canonical_entity: 314 + return {"error": f"No entity found for canonical: {canonical_name}"} 315 + 316 + alias_id = alias_entity["id"] 317 + canonical_id = canonical_entity["id"] 318 + 319 + if alias_id == canonical_id: 320 + return {"error": "Alias and canonical resolve to the same entity."} 321 + 322 + # Add alias name as aka on canonical entity 323 + canonical = load_journal_entity(canonical_id) 324 + if not canonical: 325 + return {"error": f"Failed to load canonical entity: {canonical_id}"} 326 + 327 + alias_display = alias_entity.get("name", alias_name) 328 + existing_aka = set(canonical.get("aka", [])) 329 + if alias_display not in existing_aka: 330 + existing_aka.add(alias_display) 331 + canonical["aka"] = sorted(existing_aka) 332 + canonical["updated_at"] = now_ms() 333 + save_journal_entity(canonical) 334 + 335 + # Merge voiceprint embeddings 336 + alias_vp = load_entity_voiceprints_file(alias_id) 337 + embeddings_merged = 0 338 + 339 + if alias_vp is not None: 340 + alias_embeddings, alias_metadata = alias_vp 341 + existing_keys = _load_existing_voiceprint_keys(canonical_id) 342 + 343 + new_items: list[tuple[np.ndarray, dict]] = [] 344 + for emb, meta in zip(alias_embeddings, alias_metadata): 345 + key = ( 346 + meta.get("day"), 347 + meta.get("segment_key"), 348 + meta.get("source"), 349 + meta.get("sentence_id"), 350 + ) 351 + if key in existing_keys: 352 + continue 353 + normalized = normalize_embedding(emb) 354 + if normalized is not None: 355 + new_items.append((normalized, meta)) 356 + existing_keys.add(key) 357 + 358 + if new_items: 359 + embeddings_merged = _save_voiceprints_batch(canonical_id, new_items) 360 + 361 + canonical_vp = load_entity_voiceprints_file(canonical_id) 362 + total = len(canonical_vp[0]) if canonical_vp else 0 363 + 364 + return { 365 + "merged": True, 366 + "alias": alias_display, 367 + "canonical_entity_id": canonical_id, 368 + "canonical_name": canonical.get("name", canonical_name), 369 + "embeddings_merged": embeddings_merged, 370 + "total_embeddings": total, 371 + } 372 + 373 + 290 374 def resolve_name_variants(dry_run: bool = False) -> dict[str, Any]: 291 375 """Find and merge speaker name variants using voiceprint similarity. 292 376
+137 -11
apps/speakers/call.py
··· 4 4 """CLI interface for speaker voiceprint management. 5 5 6 6 Provides: 7 - sol call speakers bootstrap [--dry-run] 8 - sol call speakers resolve-names [--dry-run] 7 + sol call speakers status [--section SECTION] 8 + sol call speakers bootstrap [--dry-run] [--json] 9 + sol call speakers resolve-names [--dry-run] [--json] 9 10 sol call speakers attribute-segment <day> <stream> <segment> 10 - sol call speakers discover 11 + sol call speakers backfill [--dry-run] [--json] 12 + sol call speakers discover [--json] 13 + sol call speakers identify <cluster_id> <name> [--entity-id ID] 14 + sol call speakers merge-names <alias> <canonical> 11 15 """ 12 16 13 17 from __future__ import annotations ··· 19 23 help="Speaker voiceprint management.", 20 24 no_args_is_help=True, 21 25 ) 26 + 27 + 28 + @app.command("status") 29 + def status( 30 + section: str = typer.Option( 31 + None, 32 + "--section", 33 + help="Return only one section: embeddings, owner, speakers, clusters, imports, attribution.", 34 + ), 35 + ) -> None: 36 + """Return the full speaker ID state model as JSON. 37 + 38 + Aggregates embedding coverage, owner centroid status, known speakers, 39 + candidate clusters, import signals, and attribution coverage into a 40 + single dashboard view. All data is read from disk — no new computations. 41 + """ 42 + import json 43 + 44 + from apps.speakers.status import get_status 45 + 46 + result = get_status(section=section) 47 + 48 + if "error" in result: 49 + typer.echo(json.dumps(result, indent=2), err=True) 50 + raise typer.Exit(1) 51 + 52 + typer.echo(json.dumps(result, indent=2)) 22 53 23 54 24 55 @app.command("bootstrap") ··· 26 57 dry_run: bool = typer.Option( 27 58 False, "--dry-run", help="Show what would be saved without saving." 28 59 ), 60 + json_output: bool = typer.Option( 61 + False, "--json", help="Output results as JSON." 62 + ), 29 63 ) -> None: 30 64 """Bootstrap voiceprints from single-speaker segments. 31 65 ··· 34 68 speaker. Saves them as voiceprints using the owner centroid for 35 69 owner subtraction. 36 70 """ 71 + import json as json_mod 72 + 37 73 from apps.speakers.bootstrap import bootstrap_voiceprints 38 74 39 - if dry_run: 75 + if dry_run and not json_output: 40 76 typer.echo("DRY RUN — no voiceprints will be saved\n") 41 77 42 - typer.echo("Bootstrapping voiceprints from single-speaker segments...") 78 + if not json_output: 79 + typer.echo("Bootstrapping voiceprints from single-speaker segments...") 80 + 43 81 stats = bootstrap_voiceprints(dry_run=dry_run) 44 82 83 + if json_output: 84 + typer.echo(json_mod.dumps(stats, indent=2)) 85 + if "error" in stats: 86 + raise typer.Exit(1) 87 + return 88 + 45 89 if "error" in stats: 46 90 typer.echo(f"Error: {stats['error']}", err=True) 47 91 raise typer.Exit(1) ··· 77 121 dry_run: bool = typer.Option( 78 122 False, "--dry-run", help="Show merges without applying them." 79 123 ), 124 + json_output: bool = typer.Option( 125 + False, "--json", help="Output results as JSON." 126 + ), 80 127 ) -> None: 81 128 """Resolve speaker name variants using voiceprint similarity. 82 129 ··· 85 132 (short name is first word of full name) are auto-merged by adding the 86 133 short name as an aka on the canonical entity. 87 134 """ 135 + import json as json_mod 136 + 88 137 from apps.speakers.bootstrap import resolve_name_variants 89 138 90 - if dry_run: 139 + if dry_run and not json_output: 91 140 typer.echo("DRY RUN — no merges will be applied\n") 92 141 93 - typer.echo("Resolving speaker name variants...") 142 + if not json_output: 143 + typer.echo("Resolving speaker name variants...") 144 + 94 145 stats = resolve_name_variants(dry_run=dry_run) 146 + 147 + if json_output: 148 + typer.echo(json_mod.dumps(stats, indent=2)) 149 + return 95 150 96 151 typer.echo(f"\nEntities with voiceprints: {stats['entities_with_voiceprints']}") 97 152 typer.echo(f"Pairs compared: {stats['pairs_compared']}") ··· 196 251 dry_run: bool = typer.Option( 197 252 False, "--dry-run", help="Enumerate segments without processing." 198 253 ), 254 + json_output: bool = typer.Option( 255 + False, "--json", help="Output results as JSON." 256 + ), 199 257 ) -> None: 200 258 """Run speaker attribution across all segments with embeddings. 201 259 202 260 Processes segments oldest-first for progressive voiceprint building. 203 261 Skips segments that already have speaker_labels.json (safe to re-run). 204 262 """ 263 + import json as json_mod 205 264 import time 206 265 207 266 from apps.speakers.attribution import backfill_segments 208 267 209 - if dry_run: 268 + if dry_run and not json_output: 210 269 typer.echo("DRY RUN — no labels will be written\n") 211 270 212 - typer.echo("Scanning journal for segments with embeddings...") 271 + if not json_output: 272 + typer.echo("Scanning journal for segments with embeddings...") 213 273 214 274 start = time.monotonic() 215 275 last_day = "" ··· 227 287 228 288 stats = backfill_segments( 229 289 dry_run=dry_run, 230 - progress_callback=None if dry_run else on_progress, 290 + progress_callback=None if (dry_run or json_output) else on_progress, 231 291 ) 232 292 233 293 elapsed = time.monotonic() - start 234 294 295 + if json_output: 296 + stats["elapsed_seconds"] = round(elapsed, 1) 297 + typer.echo(json_mod.dumps(stats, indent=2)) 298 + return 299 + 235 300 typer.echo("\n") 236 301 typer.echo(f"Total segments scanned: {stats['total_segments']}") 237 302 typer.echo(f"With embeddings: {stats['total_eligible']}") ··· 258 323 259 324 260 325 @app.command() 261 - def discover() -> None: 326 + def discover( 327 + json_output: bool = typer.Option( 328 + False, "--json", help="Output results as JSON." 329 + ), 330 + ) -> None: 262 331 """Discover recurring unknown speakers across segments.""" 332 + import json as json_mod 333 + 263 334 from apps.speakers.discovery import discover_unknown_speakers 264 335 265 336 result = discover_unknown_speakers() 337 + 338 + if json_output: 339 + typer.echo(json_mod.dumps(result, indent=2)) 340 + return 341 + 266 342 clusters = result.get("clusters", []) 267 343 268 344 if not clusters: ··· 282 358 f"sid={sample['sentence_id']}: {text_preview}" 283 359 ) 284 360 typer.echo() 361 + 362 + 363 + @app.command() 364 + def identify( 365 + cluster_id: int = typer.Argument(..., help="Cluster ID from discovery output."), 366 + name: str = typer.Argument(..., help="Speaker name to assign."), 367 + entity_id: str | None = typer.Option( 368 + None, "--entity-id", help="Link to existing entity instead of name matching." 369 + ), 370 + ) -> None: 371 + """Name an unknown speaker cluster from discovery. 372 + 373 + Creates or matches a speaker entity and saves the cluster's embeddings 374 + as voiceprints. Updates speaker labels in all affected segments. 375 + Returns JSON. 376 + """ 377 + import json as json_mod 378 + 379 + from apps.speakers.discovery import identify_cluster 380 + 381 + result = identify_cluster(cluster_id, name, entity_id=entity_id) 382 + 383 + if "error" in result: 384 + typer.echo(json_mod.dumps({"error": result["error"]}, indent=2), err=True) 385 + raise typer.Exit(1) 386 + 387 + typer.echo(json_mod.dumps(result, indent=2)) 388 + 389 + 390 + @app.command("merge-names") 391 + def merge_names_cmd( 392 + alias: str = typer.Argument(..., help="Alias/variant name to merge from."), 393 + canonical: str = typer.Argument(..., help="Canonical/full name to merge into."), 394 + ) -> None: 395 + """Merge a speaker name variant into a canonical entity. 396 + 397 + Adds the alias as an aka on the canonical entity and merges voiceprint 398 + embeddings with deduplication. Returns JSON. 399 + """ 400 + import json as json_mod 401 + 402 + from apps.speakers.bootstrap import merge_names 403 + 404 + result = merge_names(alias, canonical) 405 + 406 + if "error" in result: 407 + typer.echo(json_mod.dumps({"error": result["error"]}, indent=2), err=True) 408 + raise typer.Exit(1) 409 + 410 + typer.echo(json_mod.dumps(result, indent=2))
+30 -15
apps/speakers/discovery.py
··· 311 311 return {"clusters": result_clusters} 312 312 313 313 314 - def identify_cluster(cluster_id: int, name: str) -> dict[str, Any]: 314 + def identify_cluster( 315 + cluster_id: int, name: str, entity_id: str | None = None 316 + ) -> dict[str, Any]: 315 317 """Identify a discovered unknown speaker cluster.""" 316 318 ( 317 319 load_embeddings_file, ··· 343 345 from think.entities.journal import ( 344 346 get_or_create_journal_entity, 345 347 load_all_journal_entities, 348 + load_journal_entity, 346 349 ) 347 350 348 - journal_entities = load_all_journal_entities() 349 - entities_list = [ 350 - entity for entity in journal_entities.values() if not entity.get("blocked") 351 - ] 351 + entity_created = False 352 352 353 - entity = find_matching_entity(name, entities_list) 354 - if entity: 355 - entity_id = entity["id"] 353 + if entity_id: 354 + # Direct entity ID — load it 355 + entity = load_journal_entity(entity_id) 356 + if not entity: 357 + return {"error": f"Entity '{entity_id}' not found."} 356 358 entity_name = entity.get("name", name) 357 359 else: 358 - entity_id = entity_slug(name) 359 - entity = get_or_create_journal_entity( 360 - entity_id=entity_id, 361 - name=name, 362 - entity_type="Person", 363 - ) 364 - entity_name = entity.get("name", name) 360 + journal_entities = load_all_journal_entities() 361 + entities_list = [ 362 + entity for entity in journal_entities.values() if not entity.get("blocked") 363 + ] 364 + 365 + entity = find_matching_entity(name, entities_list) 366 + if entity: 367 + entity_id = entity["id"] 368 + entity_name = entity.get("name", name) 369 + else: 370 + entity_id = entity_slug(name) 371 + existing = load_journal_entity(entity_id) 372 + entity_created = existing is None 373 + entity = get_or_create_journal_entity( 374 + entity_id=entity_id, 375 + name=name, 376 + entity_type="Person", 377 + ) 378 + entity_name = entity.get("name", name) 365 379 366 380 existing_keys = load_existing_voiceprint_keys(entity_id) 367 381 vp_batch: list[tuple[np.ndarray, dict[str, Any]]] = [] ··· 489 503 "status": "identified", 490 504 "entity_id": entity_id, 491 505 "entity_name": entity_name, 506 + "entity_created": entity_created, 492 507 "voiceprints_saved": voiceprints_saved, 493 508 "segments_updated": segments_updated, 494 509 "sentences_attributed": sentences_attributed,
+84 -12
apps/speakers/owner.py
··· 126 126 return sampled_embeddings, sampled_provenance 127 127 128 128 129 - def detect_owner_candidate() -> dict[str, Any] | None: 130 - """Detect a likely owner voice centroid from journal embeddings.""" 129 + def detect_owner_candidate() -> dict[str, Any]: 130 + """Detect a likely owner voice centroid from journal embeddings. 131 + 132 + Always attempts detection regardless of data volume. Returns quality 133 + metrics so the calling agent can decide whether the data is sufficient. 134 + """ 131 135 load_embeddings_file, normalize_embedding, scan_segment_embeddings = ( 132 136 _routes_helpers() 133 137 ) 134 138 135 139 segment_count = count_segments_with_embeddings() 136 - if segment_count < MIN_SEGMENTS: 137 - return None 138 140 139 141 embedding_chunks: list[np.ndarray] = [] 140 142 provenance: list[dict[str, Any]] = [] 143 + streams_seen: set[str] = set() 141 144 142 145 for day in day_dirs().keys(): 143 146 for segment in scan_segment_embeddings(day): 144 147 stream = segment["stream"] 145 148 segment_key = segment["key"] 146 149 segment_dir = segment_path(day, segment_key, stream) 150 + streams_seen.add(stream) 147 151 148 152 for source in segment["sources"]: 149 153 emb_data = load_embeddings_file(segment_dir / f"{source}.npz") ··· 166 170 for sid in statement_ids 167 171 ) 168 172 173 + total_embeddings = sum(len(c) for c in embedding_chunks) if embedding_chunks else 0 174 + stream_diversity = len(streams_seen) 175 + 169 176 if not embedding_chunks: 170 177 _mark_no_cluster(segment_count) 171 - return None 178 + return { 179 + "status": "insufficient_data", 180 + "reason": f"{segment_count} segments with embeddings, 0 embeddings found", 181 + "segments_available": segment_count, 182 + "embeddings_available": 0, 183 + "stream_diversity": stream_diversity, 184 + "recommendation": "need_more_data", 185 + } 172 186 173 187 embeddings_matrix = np.vstack(embedding_chunks) 174 188 embeddings_matrix, provenance = _subsample_embeddings(embeddings_matrix, provenance) 175 189 176 190 if len(embeddings_matrix) < 50: 177 191 _mark_no_cluster(segment_count) 178 - return None 192 + return { 193 + "status": "insufficient_data", 194 + "reason": ( 195 + f"{len(embeddings_matrix)} embeddings available, " 196 + f"recommend {MIN_SEGMENTS}+ segments for reliable detection" 197 + ), 198 + "segments_available": segment_count, 199 + "embeddings_available": total_embeddings, 200 + "stream_diversity": stream_diversity, 201 + "recommendation": ( 202 + "ready" if segment_count >= MIN_SEGMENTS and stream_diversity >= 3 203 + else "need_more_data" 204 + ), 205 + } 179 206 180 207 clusterer = HDBSCAN( 181 208 min_cluster_size=50, ··· 188 215 valid_labels = labels[labels != -1] 189 216 if len(valid_labels) == 0: 190 217 _mark_no_cluster(segment_count) 191 - return None 218 + return { 219 + "status": "no_cluster", 220 + "reason": "HDBSCAN found no clusters in the embedding space", 221 + "segments_available": segment_count, 222 + "embeddings_available": total_embeddings, 223 + "stream_diversity": stream_diversity, 224 + "recommendation": "need_more_data", 225 + } 192 226 193 227 largest_label = int(np.bincount(valid_labels).argmax()) 194 228 cluster_indices = np.flatnonzero(labels == largest_label) 195 229 if len(cluster_indices) == 0: 196 230 _mark_no_cluster(segment_count) 197 - return None 231 + return { 232 + "status": "no_cluster", 233 + "reason": "Largest cluster was empty after filtering", 234 + "segments_available": segment_count, 235 + "embeddings_available": total_embeddings, 236 + "stream_diversity": stream_diversity, 237 + "recommendation": "need_more_data", 238 + } 198 239 199 240 cluster_embeddings = embeddings_matrix[cluster_indices] 200 241 centroid = normalize_embedding(np.mean(cluster_embeddings, axis=0)) 201 242 if centroid is None: 202 243 _mark_no_cluster(segment_count) 203 - return None 244 + return { 245 + "status": "no_cluster", 246 + "reason": "Could not compute centroid for largest cluster", 247 + "segments_available": segment_count, 248 + "embeddings_available": total_embeddings, 249 + "stream_diversity": stream_diversity, 250 + "recommendation": "need_more_data", 251 + } 204 252 205 253 cluster_size = int(len(cluster_indices)) 206 254 similarities = np.dot(cluster_embeddings, centroid) 255 + mean_similarity = float(np.mean(similarities)) 207 256 sorted_cluster_positions = np.argsort(similarities)[::-1] 257 + 258 + # Compute stream diversity within the cluster 259 + cluster_streams = { 260 + provenance[int(cluster_indices[i])]["stream"] for i in range(len(cluster_indices)) 261 + } 208 262 209 263 samples: list[dict[str, Any]] = [] 210 264 seen_segments: set[tuple[str, str, str]] = set() ··· 218 272 samples.append( 219 273 { 220 274 **record, 275 + "similarity": round(float(similarities[position]), 4), 221 276 "audio_url": _audio_url( 222 277 record["day"], 223 278 record["stream"], ··· 234 289 record = provenance[int(cluster_indices[position])] 235 290 sample = { 236 291 **record, 292 + "similarity": round(float(similarities[position]), 4), 237 293 "audio_url": _audio_url( 238 294 record["day"], 239 295 record["stream"], ··· 266 322 }, 267 323 ) 268 324 325 + # Determine recommendation based on quality metrics 326 + if cluster_size >= 100 and len(cluster_streams) >= 3: 327 + recommendation = "strong_candidate" 328 + elif cluster_size >= 50 and len(cluster_streams) >= 2: 329 + recommendation = "good_candidate" 330 + else: 331 + recommendation = "weak_candidate" 332 + 269 333 return { 270 - "status": "candidate", 271 - "cluster_size": cluster_size, 272 - "samples": samples, 334 + "status": "candidate_found", 335 + "candidate": { 336 + "cluster_size": cluster_size, 337 + "mean_similarity": round(mean_similarity, 4), 338 + "threshold": OWNER_THRESHOLD, 339 + "streams": sorted(cluster_streams), 340 + "stream_diversity": len(cluster_streams), 341 + "sample_count": segment_count, 342 + "samples": samples, 343 + }, 344 + "recommendation": recommendation, 273 345 } 274 346 275 347
+10 -2
apps/speakers/routes.py
··· 1139 1139 def api_owner_detect() -> Any: 1140 1140 """Run owner voice candidate detection.""" 1141 1141 result = detect_owner_candidate() 1142 - if result is None: 1143 - return jsonify({"status": "none", "reason": "No valid cluster found"}) 1142 + # Map new structured statuses back to the web UI expectations 1143 + status = result.get("status") 1144 + if status == "candidate_found": 1145 + # Flatten for web UI compatibility 1146 + candidate = result.get("candidate", {}) 1147 + return jsonify({ 1148 + "status": "candidate", 1149 + "cluster_size": candidate.get("cluster_size"), 1150 + "samples": candidate.get("samples", []), 1151 + }) 1144 1152 return jsonify(result) 1145 1153 1146 1154
+426
apps/speakers/status.py
··· 1 + # SPDX-License-Identifier: AGPL-3.0-only 2 + # Copyright (c) 2026 sol pbc 3 + 4 + """Speaker ID status aggregation — read-only state inspection. 5 + 6 + Aggregates speaker identification state from disk into a structured 7 + JSON dashboard. No new computations — just reads existing files. 8 + """ 9 + 10 + from __future__ import annotations 11 + 12 + import json 13 + import logging 14 + from pathlib import Path 15 + from typing import Any 16 + 17 + import numpy as np 18 + 19 + from think.utils import day_dirs, day_path, get_journal, iter_segments, segment_path 20 + 21 + logger = logging.getLogger(__name__) 22 + 23 + VALID_SECTIONS = {"embeddings", "owner", "speakers", "clusters", "imports", "attribution"} 24 + 25 + 26 + def _routes_helpers(): 27 + """Load speakers route helpers lazily to avoid import cycles.""" 28 + from apps.speakers.routes import ( 29 + _load_embeddings_file, 30 + _load_entity_voiceprints_file, 31 + _load_speaker_labels, 32 + _normalize_embedding, 33 + _scan_segment_embeddings, 34 + ) 35 + 36 + return ( 37 + _load_embeddings_file, 38 + _load_entity_voiceprints_file, 39 + _load_speaker_labels, 40 + _normalize_embedding, 41 + _scan_segment_embeddings, 42 + ) 43 + 44 + 45 + def _has_audio_embeddings(seg_path: Path) -> bool: 46 + """Return True if the segment has audio embedding NPZ files.""" 47 + for p in seg_path.glob("*.npz"): 48 + if p.stem.endswith("_audio") or p.stem == "audio": 49 + return True 50 + return False 51 + 52 + 53 + def get_embeddings_status() -> dict[str, Any]: 54 + """Aggregate embedding coverage statistics.""" 55 + ( 56 + load_embeddings_file, 57 + _, 58 + _, 59 + _, 60 + scan_segment_embeddings, 61 + ) = _routes_helpers() 62 + 63 + total_segments = 0 64 + segments_with_embeddings = 0 65 + total_embeddings = 0 66 + streams: dict[str, dict[str, int]] = {} 67 + days_with: set[str] = set() 68 + 69 + for day_name in sorted(day_dirs().keys()): 70 + segments = list(iter_segments(day_name)) 71 + total_segments += len(segments) 72 + 73 + for segment in scan_segment_embeddings(day_name): 74 + segments_with_embeddings += 1 75 + stream = segment["stream"] 76 + seg_key = segment["key"] 77 + seg_dir = segment_path(day_name, seg_key, stream) 78 + 79 + if stream not in streams: 80 + streams[stream] = {"segments": 0, "embeddings": 0} 81 + streams[stream]["segments"] += 1 82 + days_with.add(day_name) 83 + 84 + for source in segment["sources"]: 85 + emb_data = load_embeddings_file(seg_dir / f"{source}.npz") 86 + if emb_data is not None: 87 + count = len(emb_data[0]) 88 + total_embeddings += count 89 + streams[stream]["embeddings"] += count 90 + 91 + sorted_days = sorted(days_with) if days_with else [] 92 + 93 + return { 94 + "total_segments": total_segments, 95 + "segments_with_embeddings": segments_with_embeddings, 96 + "total_embeddings": total_embeddings, 97 + "coverage_pct": round( 98 + 100.0 * segments_with_embeddings / total_segments, 1 99 + ) 100 + if total_segments > 0 101 + else 0.0, 102 + "date_range": [sorted_days[0], sorted_days[-1]] if sorted_days else [], 103 + "days_with_embeddings": len(days_with), 104 + "streams": dict(sorted(streams.items(), key=lambda x: x[1]["embeddings"], reverse=True)), 105 + } 106 + 107 + 108 + def get_owner_status() -> dict[str, Any]: 109 + """Aggregate owner centroid status.""" 110 + from apps.speakers.owner import load_owner_centroid 111 + from think.entities.journal import get_journal_principal, journal_entity_memory_path 112 + 113 + principal = get_journal_principal() 114 + if not principal: 115 + return {"exists": False} 116 + 117 + centroid_data = load_owner_centroid() 118 + if centroid_data is None: 119 + return {"exists": False} 120 + 121 + # Load full centroid metadata 122 + centroid_path = journal_entity_memory_path(principal["id"]) / "owner_centroid.npz" 123 + result: dict[str, Any] = {"exists": True} 124 + 125 + try: 126 + data = np.load(centroid_path, allow_pickle=False) 127 + cluster_size = data.get("cluster_size") 128 + threshold = data.get("threshold") 129 + version = data.get("version") 130 + 131 + if cluster_size is not None: 132 + result["cluster_size"] = int(np.asarray(cluster_size).item()) 133 + if threshold is not None: 134 + result["threshold"] = round(float(np.asarray(threshold).item()), 2) 135 + if version is not None: 136 + result["version"] = str(np.asarray(version).item()) 137 + except Exception: 138 + pass 139 + 140 + # Estimate coverage: count how many embeddings match the owner centroid 141 + _, centroid_threshold = centroid_data 142 + owner_centroid = centroid_data[0] 143 + 144 + ( 145 + load_embeddings_file, 146 + _, 147 + _, 148 + normalize_embedding, 149 + scan_segment_embeddings, 150 + ) = _routes_helpers() 151 + 152 + total_embeddings = 0 153 + owner_matches = 0 154 + streams_represented: set[str] = set() 155 + 156 + for day_name in day_dirs().keys(): 157 + for segment in scan_segment_embeddings(day_name): 158 + stream = segment["stream"] 159 + seg_dir = segment_path(day_name, segment["key"], stream) 160 + for source in segment["sources"]: 161 + emb_data = load_embeddings_file(seg_dir / f"{source}.npz") 162 + if emb_data is None: 163 + continue 164 + embeddings, _ = emb_data 165 + total_embeddings += len(embeddings) 166 + for emb in embeddings: 167 + normalized = normalize_embedding(emb) 168 + if normalized is not None: 169 + score = float(np.dot(normalized, owner_centroid)) 170 + if score >= centroid_threshold: 171 + owner_matches += 1 172 + streams_represented.add(stream) 173 + 174 + result["streams_represented"] = sorted(streams_represented) 175 + result["coverage_estimate_pct"] = ( 176 + round(100.0 * owner_matches / total_embeddings, 1) if total_embeddings > 0 else 0.0 177 + ) 178 + 179 + return result 180 + 181 + 182 + def get_speakers_status() -> dict[str, Any]: 183 + """Aggregate known speaker statistics.""" 184 + from think.entities.journal import load_all_journal_entities 185 + 186 + _, load_entity_voiceprints_file, _, _, _ = _routes_helpers() 187 + 188 + journal_entities = load_all_journal_entities() 189 + speakers: list[dict[str, Any]] = [] 190 + total_voiceprint_embeddings = 0 191 + 192 + for entity_id, entity in journal_entities.items(): 193 + if entity.get("blocked") or entity.get("is_principal"): 194 + continue 195 + 196 + result = load_entity_voiceprints_file(entity_id) 197 + if result is None: 198 + continue 199 + 200 + embeddings, metadata_list = result 201 + embedding_count = len(embeddings) 202 + if embedding_count == 0: 203 + continue 204 + 205 + total_voiceprint_embeddings += embedding_count 206 + 207 + # Count unique segments and streams 208 + segments: set[tuple[str, str]] = set() 209 + streams: set[str] = set() 210 + for m in metadata_list: 211 + day = m.get("day", "") 212 + seg_key = m.get("segment_key", "") 213 + stream = m.get("stream", "") 214 + if day and seg_key: 215 + segments.add((day, seg_key)) 216 + if stream: 217 + streams.add(stream) 218 + 219 + # Derive confidence rating 220 + stream_count = len(streams) 221 + if embedding_count >= 100 and stream_count >= 3: 222 + confidence = "strong" 223 + elif embedding_count >= 20 or stream_count >= 2: 224 + confidence = "moderate" 225 + else: 226 + confidence = "developing" 227 + 228 + speakers.append({ 229 + "entity_id": entity_id, 230 + "name": entity.get("name", entity_id), 231 + "embeddings": embedding_count, 232 + "segments": len(segments), 233 + "streams": stream_count, 234 + "confidence": confidence, 235 + }) 236 + 237 + # Sort by embedding count descending 238 + speakers.sort(key=lambda s: s["embeddings"], reverse=True) 239 + 240 + return { 241 + "total": len(speakers), 242 + "total_voiceprint_embeddings": total_voiceprint_embeddings, 243 + "top": speakers[:10], 244 + } 245 + 246 + 247 + def get_clusters_status() -> dict[str, Any]: 248 + """Aggregate discovery cluster statistics from cache.""" 249 + cache_path = Path(get_journal()) / "awareness" / "discovery_clusters.json" 250 + 251 + if not cache_path.exists(): 252 + return { 253 + "total_unmatched": 0, 254 + "candidate_count": 0, 255 + "candidates": [], 256 + } 257 + 258 + try: 259 + with open(cache_path, encoding="utf-8") as f: 260 + cache_data = json.load(f) 261 + except (json.JSONDecodeError, OSError): 262 + return { 263 + "total_unmatched": 0, 264 + "candidate_count": 0, 265 + "candidates": [], 266 + } 267 + 268 + clusters = cache_data.get("clusters", {}) 269 + total_unmatched = sum(len(members) for members in clusters.values()) 270 + 271 + candidates: list[dict[str, Any]] = [] 272 + for cluster_id, members in clusters.items(): 273 + segment_set = { 274 + (m["day"], m["stream"], m["segment_key"]) for m in members 275 + } 276 + # Get a preview from the first member's text if possible 277 + preview = "" 278 + if members: 279 + first = members[0] 280 + seg_dir = segment_path(first["day"], first["segment_key"], first["stream"]) 281 + jsonl_path = seg_dir / f"{first['source']}.jsonl" 282 + if jsonl_path.exists(): 283 + try: 284 + lines = jsonl_path.read_text(encoding="utf-8").splitlines() 285 + sid = int(first.get("sentence_id", 0)) 286 + if 0 < sid < len(lines): 287 + entry = json.loads(lines[sid]) 288 + preview = (entry.get("text") or "")[:80] 289 + except Exception: 290 + pass 291 + 292 + candidates.append({ 293 + "cluster_id": int(cluster_id), 294 + "size": len(members), 295 + "segment_count": len(segment_set), 296 + "preview": preview, 297 + }) 298 + 299 + candidates.sort(key=lambda c: c["size"], reverse=True) 300 + 301 + return { 302 + "total_unmatched": total_unmatched, 303 + "candidate_count": len(candidates), 304 + "candidates": candidates, 305 + } 306 + 307 + 308 + def get_imports_status() -> dict[str, Any]: 309 + """Aggregate import signal statistics.""" 310 + from apps.speakers.attribution import ( 311 + _extract_meeting_participants, 312 + _extract_screen_participants, 313 + _load_setting_field, 314 + _parse_setting_names, 315 + ) 316 + 317 + settings_with_participants = 0 318 + meetings_with_attendees = 0 319 + screen_with_participants = 0 320 + 321 + seen_meeting_days: set[str] = set() 322 + 323 + for day_name in day_dirs().keys(): 324 + for stream, seg_key, seg_path in iter_segments(day_name): 325 + # Check setting field 326 + setting = _load_setting_field(seg_path) 327 + if setting: 328 + names = _parse_setting_names(setting) 329 + if names: 330 + settings_with_participants += 1 331 + 332 + # Check screen.md 333 + screen_names = _extract_screen_participants(seg_path) 334 + if screen_names: 335 + screen_with_participants += 1 336 + 337 + # Check meetings.md (once per day) 338 + if day_name not in seen_meeting_days: 339 + meeting_names = _extract_meeting_participants(day_name, seg_key) 340 + if meeting_names: 341 + meetings_with_attendees += 1 342 + seen_meeting_days.add(day_name) 343 + 344 + return { 345 + "settings_with_participants": settings_with_participants, 346 + "meetings_with_attendees": meetings_with_attendees, 347 + "screen_with_participants": screen_with_participants, 348 + } 349 + 350 + 351 + def get_attribution_status() -> dict[str, Any]: 352 + """Aggregate attribution coverage statistics.""" 353 + _, _, load_speaker_labels, _, scan_segment_embeddings = _routes_helpers() 354 + 355 + segments_with_embeddings = 0 356 + segments_with_labels = 0 357 + total_sentences = 0 358 + high_count = 0 359 + medium_count = 0 360 + null_count = 0 361 + method_breakdown: dict[str, int] = {} 362 + 363 + for day_name in day_dirs().keys(): 364 + for segment in scan_segment_embeddings(day_name): 365 + segments_with_embeddings += 1 366 + seg_dir = segment_path(day_name, segment["key"], segment["stream"]) 367 + labels_data = load_speaker_labels(seg_dir) 368 + if labels_data is None: 369 + continue 370 + 371 + segments_with_labels += 1 372 + for label in labels_data.get("labels", []): 373 + total_sentences += 1 374 + confidence = label.get("confidence") 375 + method = label.get("method") or "unmatched" 376 + 377 + if confidence == "high": 378 + high_count += 1 379 + elif confidence == "medium": 380 + medium_count += 1 381 + else: 382 + null_count += 1 383 + 384 + method_breakdown[method] = method_breakdown.get(method, 0) + 1 385 + 386 + return { 387 + "segments_with_labels": segments_with_labels, 388 + "coverage_pct": round( 389 + 100.0 * segments_with_labels / segments_with_embeddings, 1 390 + ) 391 + if segments_with_embeddings > 0 392 + else 0.0, 393 + "total_sentences": total_sentences, 394 + "high": high_count, 395 + "medium": medium_count, 396 + "null": null_count, 397 + "needs_review": medium_count + null_count, 398 + "method_breakdown": dict(sorted(method_breakdown.items(), key=lambda x: x[1], reverse=True)), 399 + } 400 + 401 + 402 + def get_status(section: str | None = None) -> dict[str, Any]: 403 + """Return full speaker ID status or a single section. 404 + 405 + Args: 406 + section: Optional section name to return. If None, returns all sections. 407 + 408 + Returns: 409 + Dict with all six sections, or just the requested section. 410 + """ 411 + if section and section not in VALID_SECTIONS: 412 + return {"error": f"Unknown section: {section}. Valid: {', '.join(sorted(VALID_SECTIONS))}"} 413 + 414 + builders = { 415 + "embeddings": get_embeddings_status, 416 + "owner": get_owner_status, 417 + "speakers": get_speakers_status, 418 + "clusters": get_clusters_status, 419 + "imports": get_imports_status, 420 + "attribution": get_attribution_status, 421 + } 422 + 423 + if section: 424 + return {section: builders[section]()} 425 + 426 + return {name: builder() for name, builder in builders.items()}
+12 -5
apps/speakers/tests/test_owner.py
··· 109 109 _owner_embeddings(1, rng), 110 110 ) 111 111 112 - assert detect_owner_candidate() is None 112 + result = detect_owner_candidate() 113 + assert result["status"] == "insufficient_data" 114 + assert result["segments_available"] == 10 115 + assert "recommendation" in result 113 116 114 117 115 118 def test_detect_owner_no_cluster(speakers_env): ··· 128 131 embedding, 129 132 ) 130 133 131 - assert detect_owner_candidate() is None 134 + result = detect_owner_candidate() 135 + assert result["status"] == "no_cluster" 136 + assert "recommendation" in result 132 137 assert get_current()["voiceprint"]["status"] == "no_cluster" 133 138 134 139 ··· 167 172 result = detect_owner_candidate() 168 173 169 174 assert result is not None 170 - assert result["status"] == "candidate" 171 - assert result["cluster_size"] >= 50 172 - assert len(result["samples"]) == 3 175 + assert result["status"] == "candidate_found" 176 + assert result["candidate"]["cluster_size"] >= 50 177 + assert len(result["candidate"]["samples"]) == 3 178 + assert "recommendation" in result 179 + assert result["recommendation"] in ("strong_candidate", "good_candidate", "weak_candidate") 173 180 assert _candidate_path(env.journal).exists() 174 181 assert get_current()["voiceprint"]["status"] == "candidate" 175 182
+264
apps/speakers/tests/test_status.py
··· 1 + # SPDX-License-Identifier: AGPL-3.0-only 2 + # Copyright (c) 2026 sol pbc 3 + 4 + """Tests for the speakers status command.""" 5 + 6 + from __future__ import annotations 7 + 8 + import json 9 + from pathlib import Path 10 + 11 + import numpy as np 12 + 13 + from think.entities import entity_slug 14 + 15 + 16 + def _normalized(vector: np.ndarray) -> np.ndarray: 17 + return vector / np.linalg.norm(vector) 18 + 19 + 20 + def test_status_empty_journal(speakers_env): 21 + from apps.speakers.status import get_status 22 + 23 + speakers_env() 24 + result = get_status() 25 + 26 + assert "embeddings" in result 27 + assert "owner" in result 28 + assert "speakers" in result 29 + assert "clusters" in result 30 + assert "imports" in result 31 + assert "attribution" in result 32 + 33 + assert result["embeddings"]["total_segments"] == 0 34 + assert result["embeddings"]["segments_with_embeddings"] == 0 35 + assert result["owner"]["exists"] is False 36 + assert result["speakers"]["total"] == 0 37 + assert result["clusters"]["candidate_count"] == 0 38 + assert result["attribution"]["segments_with_labels"] == 0 39 + 40 + 41 + def test_status_section_filter(speakers_env): 42 + from apps.speakers.status import get_status 43 + 44 + speakers_env() 45 + result = get_status(section="embeddings") 46 + 47 + assert "embeddings" in result 48 + assert "owner" not in result 49 + 50 + 51 + def test_status_invalid_section(speakers_env): 52 + from apps.speakers.status import get_status 53 + 54 + speakers_env() 55 + result = get_status(section="nonexistent") 56 + 57 + assert "error" in result 58 + 59 + 60 + def test_status_embeddings(speakers_env): 61 + from apps.speakers.status import get_status 62 + 63 + env = speakers_env() 64 + env.create_segment("20240101", "090000_300", ["mic_audio"], num_sentences=10) 65 + env.create_segment("20240101", "100000_300", ["mic_audio"], num_sentences=5) 66 + env.create_segment("20240102", "090000_300", ["mic_audio"], num_sentences=8) 67 + 68 + result = get_status(section="embeddings") 69 + emb = result["embeddings"] 70 + 71 + assert emb["segments_with_embeddings"] == 3 72 + assert emb["total_embeddings"] == 23 73 + assert emb["days_with_embeddings"] == 2 74 + assert len(emb["date_range"]) == 2 75 + assert emb["date_range"][0] == "20240101" 76 + assert emb["date_range"][1] == "20240102" 77 + assert "test" in emb["streams"] 78 + assert emb["streams"]["test"]["segments"] == 3 79 + assert emb["streams"]["test"]["embeddings"] == 23 80 + 81 + 82 + def test_status_owner_no_centroid(speakers_env): 83 + from apps.speakers.status import get_status 84 + 85 + speakers_env() 86 + result = get_status(section="owner") 87 + assert result["owner"]["exists"] is False 88 + 89 + 90 + def test_status_owner_with_centroid(speakers_env): 91 + from apps.speakers.owner import OWNER_THRESHOLD 92 + from apps.speakers.status import get_status 93 + 94 + env = speakers_env() 95 + # Create principal entity with owner centroid 96 + principal_dir = env.create_entity("Self Person", is_principal=True) 97 + centroid = _normalized(np.array([1.0] + [0.0] * 255, dtype=np.float32)) 98 + np.savez_compressed( 99 + principal_dir / "owner_centroid.npz", 100 + centroid=centroid, 101 + cluster_size=np.array(100, dtype=np.int32), 102 + threshold=np.array(OWNER_THRESHOLD, dtype=np.float32), 103 + version=np.array("2026-03-15T10:30:00Z"), 104 + ) 105 + 106 + # Create a segment with embeddings similar to owner 107 + close_embs = np.tile( 108 + _normalized(np.array([0.95, 0.05] + [0.0] * 254, dtype=np.float32)), 109 + (5, 1), 110 + ) 111 + env.create_segment("20240101", "090000_300", ["mic_audio"], embeddings=close_embs) 112 + 113 + result = get_status(section="owner") 114 + owner = result["owner"] 115 + 116 + assert owner["exists"] is True 117 + assert owner["cluster_size"] == 100 118 + assert owner["threshold"] == OWNER_THRESHOLD 119 + assert owner["version"] == "2026-03-15T10:30:00Z" 120 + 121 + 122 + def test_status_speakers(speakers_env): 123 + from apps.speakers.status import get_status 124 + 125 + env = speakers_env() 126 + 127 + # Create entities with voiceprints 128 + env.create_entity( 129 + "Alice Test", 130 + voiceprints=[ 131 + ("20240101", "090000_300", "mic_audio", 1), 132 + ("20240101", "090000_300", "mic_audio", 2), 133 + ("20240101", "090000_300", "mic_audio", 3), 134 + ], 135 + ) 136 + env.create_entity( 137 + "Bob Test", 138 + voiceprints=[ 139 + ("20240101", "100000_300", "mic_audio", 1), 140 + ], 141 + ) 142 + 143 + result = get_status(section="speakers") 144 + spk = result["speakers"] 145 + 146 + assert spk["total"] == 2 147 + assert spk["total_voiceprint_embeddings"] == 4 148 + assert len(spk["top"]) == 2 149 + assert spk["top"][0]["name"] == "Alice Test" 150 + assert spk["top"][0]["embeddings"] == 3 151 + assert spk["top"][0]["confidence"] == "developing" 152 + 153 + 154 + def test_status_clusters_empty(speakers_env): 155 + from apps.speakers.status import get_status 156 + 157 + speakers_env() 158 + result = get_status(section="clusters") 159 + 160 + assert result["clusters"]["total_unmatched"] == 0 161 + assert result["clusters"]["candidate_count"] == 0 162 + 163 + 164 + def test_status_clusters_with_cache(speakers_env): 165 + from apps.speakers.status import get_status 166 + 167 + env = speakers_env() 168 + # Create a segment so transcript text lookup doesn't crash 169 + env.create_segment("20240101", "090000_300", ["mic_audio"], num_sentences=5) 170 + 171 + # Write a discovery cache file 172 + cache_dir = env.journal / "awareness" 173 + cache_dir.mkdir(parents=True, exist_ok=True) 174 + cache_data = { 175 + "version": "2026-03-15T10:00:00", 176 + "clusters": { 177 + "0": [ 178 + { 179 + "day": "20240101", 180 + "stream": "test", 181 + "segment_key": "090000_300", 182 + "source": "mic_audio", 183 + "sentence_id": 1, 184 + }, 185 + { 186 + "day": "20240101", 187 + "stream": "test", 188 + "segment_key": "090000_300", 189 + "source": "mic_audio", 190 + "sentence_id": 2, 191 + }, 192 + ], 193 + }, 194 + } 195 + with open(cache_dir / "discovery_clusters.json", "w") as f: 196 + json.dump(cache_data, f) 197 + 198 + result = get_status(section="clusters") 199 + clusters = result["clusters"] 200 + 201 + assert clusters["total_unmatched"] == 2 202 + assert clusters["candidate_count"] == 1 203 + assert clusters["candidates"][0]["cluster_id"] == 0 204 + assert clusters["candidates"][0]["size"] == 2 205 + 206 + 207 + def test_status_attribution(speakers_env): 208 + from apps.speakers.status import get_status 209 + 210 + env = speakers_env() 211 + env.create_segment("20240101", "090000_300", ["mic_audio"], num_sentences=5) 212 + env.create_segment("20240101", "100000_300", ["mic_audio"], num_sentences=3) 213 + 214 + # Create labels for one segment 215 + env.create_speaker_labels( 216 + "20240101", 217 + "090000_300", 218 + [ 219 + {"sentence_id": 1, "speaker": "alice", "confidence": "high", "method": "owner_centroid"}, 220 + {"sentence_id": 2, "speaker": "alice", "confidence": "high", "method": "owner_centroid"}, 221 + {"sentence_id": 3, "speaker": "bob", "confidence": "medium", "method": "acoustic"}, 222 + {"sentence_id": 4, "speaker": None, "confidence": None, "method": None}, 223 + {"sentence_id": 5, "speaker": "alice", "confidence": "high", "method": "structural_single_speaker"}, 224 + ], 225 + ) 226 + 227 + result = get_status(section="attribution") 228 + attr = result["attribution"] 229 + 230 + assert attr["segments_with_labels"] == 1 231 + assert attr["total_sentences"] == 5 232 + assert attr["high"] == 3 233 + assert attr["medium"] == 1 234 + assert attr["null"] == 1 235 + assert attr["needs_review"] == 2 236 + assert "owner_centroid" in attr["method_breakdown"] 237 + 238 + 239 + def test_status_full_returns_all_sections(speakers_env): 240 + from apps.speakers.status import get_status 241 + 242 + env = speakers_env() 243 + env.create_segment("20240101", "090000_300", ["mic_audio"]) 244 + 245 + result = get_status() 246 + 247 + assert len(result) == 6 248 + for section in ("embeddings", "owner", "speakers", "clusters", "imports", "attribution"): 249 + assert section in result 250 + 251 + 252 + def test_status_json_serializable(speakers_env): 253 + """Ensure the full status output can be serialized to JSON.""" 254 + from apps.speakers.status import get_status 255 + 256 + env = speakers_env() 257 + env.create_segment("20240101", "090000_300", ["mic_audio"]) 258 + 259 + result = get_status() 260 + # This will raise if any value is not JSON-serializable 261 + serialized = json.dumps(result) 262 + assert isinstance(serialized, str) 263 + parsed = json.loads(serialized) 264 + assert "embeddings" in parsed