personal memory agent
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

Remove duplicate function definitions from merge

The auto-merge of hopper-ykpvruoc-speakers-import-composition included
both old (position-based) and refined (time-based alignment) versions of
seed_from_imports and related functions. Remove the superseded versions.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

+1 -389
+1 -185
apps/speakers/bootstrap.py
··· 39 39 load_journal_entity, 40 40 save_journal_entity, 41 41 ) 42 - from think.utils import day_dirs, iter_segments, now_ms, segment_path 42 + from think.utils import day_dirs, now_ms, segment_path 43 43 44 44 logger = logging.getLogger(__name__) 45 45 ··· 276 276 stats["segments_scanned"], 277 277 sum(len(v) for v in entity_embeddings.values()), 278 278 ) 279 - 280 - # Batch save all collected embeddings 281 - if not dry_run: 282 - for entity_id, emb_list in entity_embeddings.items(): 283 - try: 284 - saved = _save_voiceprints_batch(entity_id, emb_list) 285 - stats["embeddings_saved"] += saved 286 - except Exception as e: 287 - name = entity_names.get(entity_id, entity_id) 288 - stats["errors"].append(f"Failed to save for {name}: {e}") 289 - logger.exception("Failed to save voiceprints for %s", entity_id) 290 - else: 291 - stats["embeddings_saved"] = sum(len(v) for v in entity_embeddings.values()) 292 - 293 - return stats 294 - 295 - 296 - # Generic speaker names to skip (AI conversation imports) 297 - _GENERIC_SPEAKERS = frozenset({"Human", "Assistant", "human", "assistant", ""}) 298 - 299 - 300 - def seed_from_imports(dry_run: bool = False) -> dict[str, Any]: 301 - """Seed voiceprints from speaker-attributed import transcripts. 302 - 303 - Scans import-stream segments for conversation_transcript.jsonl files 304 - with per-line speaker attribution. Maps speaker names to journal 305 - entities and saves corresponding embeddings as voiceprints. 306 - 307 - Unlike bootstrap_voiceprints(), this does NOT create new entities — 308 - unmatched speaker names are skipped. 309 - 310 - Args: 311 - dry_run: If True, report what would be saved without saving 312 - 313 - Returns: 314 - Dict with statistics about the seed run 315 - """ 316 - ( 317 - load_embeddings_file, 318 - normalize_embedding, 319 - _, 320 - _, 321 - ) = _routes_helpers() 322 - 323 - # Load owner centroid — required for owner subtraction 324 - centroid_data = load_owner_centroid() 325 - if centroid_data is None: 326 - return {"error": "No confirmed owner centroid. Run owner detection first."} 327 - 328 - owner_centroid, owner_threshold = centroid_data 329 - 330 - # Load all journal entities for speaker name matching 331 - journal_entities = load_all_journal_entities() 332 - entities_list = [e for e in journal_entities.values() if not e.get("blocked")] 333 - 334 - stats: dict[str, Any] = { 335 - "segments_scanned": 0, 336 - "segments_with_speakers": 0, 337 - "speakers_found": {}, 338 - "embeddings_saved": 0, 339 - "embeddings_skipped_owner": 0, 340 - "embeddings_skipped_duplicate": 0, 341 - "errors": [], 342 - } 343 - 344 - # Collect embeddings per entity for efficient batch saves 345 - entity_embeddings: dict[str, list[tuple[np.ndarray, dict]]] = defaultdict(list) 346 - entity_existing: dict[str, set] = {} 347 - entity_names: dict[str, str] = {} 348 - 349 - days = sorted(day_dirs().keys()) 350 - 351 - for day in days: 352 - for stream, seg_key, seg_dir in iter_segments(day): 353 - # Only process import streams 354 - if not stream.startswith("import."): 355 - continue 356 - 357 - stats["segments_scanned"] += 1 358 - 359 - # Read conversation_transcript.jsonl 360 - jsonl_path = seg_dir / "conversation_transcript.jsonl" 361 - if not jsonl_path.exists(): 362 - continue 363 - 364 - try: 365 - lines = jsonl_path.read_text(encoding="utf-8").strip().split("\n") 366 - except OSError as e: 367 - stats["errors"].append(f"Failed to read {jsonl_path}: {e}") 368 - continue 369 - 370 - if len(lines) < 2: 371 - continue 372 - 373 - # Build sentence_id -> speaker mapping 374 - # Line 0 is metadata header. Lines 1+ are entries. 375 - # sentence_id is 1-based, matching statement_ids in NPZ. 376 - sid_to_speaker: dict[int, str] = {} 377 - has_real_speakers = False 378 - for line_idx in range(1, len(lines)): 379 - try: 380 - entry = json.loads(lines[line_idx]) 381 - except (json.JSONDecodeError, IndexError): 382 - continue 383 - speaker = entry.get("speaker", "") 384 - if speaker and speaker not in _GENERIC_SPEAKERS: 385 - sid_to_speaker[line_idx] = speaker 386 - has_real_speakers = True 387 - 388 - if not has_real_speakers: 389 - continue 390 - 391 - stats["segments_with_speakers"] += 1 392 - 393 - # Find audio embedding NPZ files in this segment 394 - # Accept both "<source>_audio" pattern and plain "audio" 395 - npz_files = list(seg_dir.glob("*.npz")) 396 - sources = [ 397 - f.stem 398 - for f in npz_files 399 - if f.stem.endswith("_audio") or f.stem == "audio" 400 - ] 401 - if not sources: 402 - continue 403 - 404 - for source in sources: 405 - emb_data = load_embeddings_file(seg_dir / f"{source}.npz") 406 - if emb_data is None: 407 - continue 408 - 409 - embeddings, statement_ids = emb_data 410 - 411 - for embedding, sid in zip(embeddings, statement_ids): 412 - sentence_id = int(sid) 413 - speaker_name = sid_to_speaker.get(sentence_id) 414 - if not speaker_name: 415 - continue 416 - 417 - # Match speaker to entity — skip if no match 418 - entity = find_matching_entity(speaker_name, entities_list) 419 - if not entity: 420 - continue 421 - 422 - entity_id = entity["id"] 423 - entity_name = entity.get("name", speaker_name) 424 - entity_names[entity_id] = entity_name 425 - stats["speakers_found"].setdefault(entity_name, 0) 426 - 427 - # Load existing voiceprint keys for idempotency (once per entity) 428 - if entity_id not in entity_existing: 429 - entity_existing[entity_id] = _load_existing_voiceprint_keys( 430 - entity_id 431 - ) 432 - 433 - existing_keys = entity_existing[entity_id] 434 - vp_key = (day, seg_key, source, sentence_id) 435 - 436 - # Idempotency: skip if already saved 437 - if vp_key in existing_keys: 438 - stats["embeddings_skipped_duplicate"] += 1 439 - continue 440 - 441 - normalized = normalize_embedding(embedding) 442 - if normalized is None: 443 - continue 444 - 445 - # Contamination guard: reject embeddings too similar to owner 446 - owner_score = float(np.dot(normalized, owner_centroid)) 447 - if owner_score >= owner_threshold: 448 - stats["embeddings_skipped_owner"] += 1 449 - continue 450 - 451 - metadata = { 452 - "day": day, 453 - "segment_key": seg_key, 454 - "source": source, 455 - "stream": stream, 456 - "sentence_id": sentence_id, 457 - "added_at": now_ms(), 458 - } 459 - 460 - entity_embeddings[entity_id].append((normalized, metadata)) 461 - existing_keys.add(vp_key) 462 - stats["speakers_found"][entity_name] += 1 463 279 464 280 # Batch save all collected embeddings 465 281 if not dry_run:
-124
apps/speakers/call.py
··· 489 489 from apps.speakers.suggest import format_suggestions 490 490 491 491 typer.echo(format_suggestions(results)) 492 - 493 - 494 - @app.command("link-import") 495 - def link_import( 496 - name: str = typer.Argument(..., help="Import participant name to link."), 497 - entity_id: str = typer.Option( 498 - ..., "--entity-id", help="Journal entity ID to link to." 499 - ), 500 - ) -> None: 501 - """Link an import participant name to a journal entity as an aka.""" 502 - import json as json_mod 503 - 504 - from think.entities.journal import ( 505 - load_all_journal_entities, 506 - load_journal_entity, 507 - save_journal_entity, 508 - ) 509 - from think.entities.matching import validate_aka_uniqueness 510 - from think.utils import now_ms 511 - 512 - entity = load_journal_entity(entity_id) 513 - if entity is None: 514 - typer.echo( 515 - json_mod.dumps({"error": f"Entity not found: {entity_id}"}, indent=2), 516 - err=True, 517 - ) 518 - raise typer.Exit(1) 519 - 520 - existing_aka = set(entity.get("aka", [])) 521 - already_present = name in existing_aka 522 - if not already_present: 523 - # Check for alias collision with other entities 524 - all_entities = load_all_journal_entities() 525 - entities_list = [e for e in all_entities.values() if not e.get("blocked")] 526 - collision = validate_aka_uniqueness( 527 - name, entities_list, exclude_entity_name=entity.get("name") 528 - ) 529 - if collision: 530 - typer.echo( 531 - json_mod.dumps( 532 - { 533 - "error": f"Name '{name}' conflicts with entity: {collision}", 534 - }, 535 - indent=2, 536 - ), 537 - err=True, 538 - ) 539 - raise typer.Exit(1) 540 - 541 - existing_aka.add(name) 542 - entity["aka"] = sorted(existing_aka) 543 - entity["updated_at"] = now_ms() 544 - save_journal_entity(entity) 545 - 546 - typer.echo( 547 - json_mod.dumps( 548 - { 549 - "linked": True, 550 - "entity_id": entity_id, 551 - "name_added": name, 552 - "already_present": already_present, 553 - }, 554 - indent=2, 555 - default=str, 556 - ) 557 - ) 558 - 559 - 560 - @app.command("seed-from-imports") 561 - def seed_from_imports_cmd( 562 - dry_run: bool = typer.Option( 563 - False, "--dry-run", help="Show what would be saved without saving." 564 - ), 565 - json_output: bool = typer.Option( 566 - False, "--json", help="Output full result as JSON." 567 - ), 568 - ) -> None: 569 - """Seed voiceprints from speaker-attributed import transcripts. 570 - 571 - Scans import-stream segments for conversation_transcript.jsonl files 572 - with per-line speaker attribution. Maps speaker names to journal 573 - entities and saves corresponding embeddings as voiceprints. 574 - """ 575 - from apps.speakers.bootstrap import seed_from_imports 576 - 577 - if dry_run and not json_output: 578 - typer.echo("DRY RUN — no voiceprints will be saved\n") 579 - 580 - if not json_output: 581 - typer.echo("Seeding voiceprints from import transcripts...") 582 - stats = seed_from_imports(dry_run=dry_run) 583 - 584 - if "error" in stats: 585 - typer.echo(f"Error: {stats['error']}", err=True) 586 - raise typer.Exit(1) 587 - if json_output: 588 - import json as json_mod 589 - 590 - typer.echo(json_mod.dumps(stats, indent=2, default=str)) 591 - return 592 - 593 - typer.echo(f"\nImport segments scanned: {stats['segments_scanned']}") 594 - typer.echo(f"Segments with speakers: {stats['segments_with_speakers']}") 595 - typer.echo(f"Unique speakers: {len(stats['speakers_found'])}") 596 - typer.echo(f"Embeddings saved: {stats['embeddings_saved']}") 597 - typer.echo(f"Embeddings skipped (owner): {stats['embeddings_skipped_owner']}") 598 - typer.echo( 599 - f"Embeddings skipped (duplicate): {stats['embeddings_skipped_duplicate']}" 600 - ) 601 - 602 - if stats["speakers_found"]: 603 - typer.echo("\nTop speakers by embedding count:") 604 - sorted_speakers = sorted( 605 - stats["speakers_found"].items(), key=lambda x: x[1], reverse=True 606 - ) 607 - for name, count in sorted_speakers[:15]: 608 - typer.echo(f" {name}: {count}") 609 - if len(sorted_speakers) > 15: 610 - typer.echo(f" ... and {len(sorted_speakers) - 15} more") 611 - 612 - if stats["errors"]: 613 - typer.echo(f"\nErrors ({len(stats['errors'])}):", err=True) 614 - for err in stats["errors"]: 615 - typer.echo(f" {err}", err=True)
-80
apps/speakers/tests/conftest.py
··· 117 117 118 118 return segment_dir 119 119 120 - def create_import_segment( 121 - self, 122 - day: str, 123 - segment_key: str, 124 - speakers_text: list[tuple[str, str]], 125 - source: str = "audio", 126 - *, 127 - stream: str = "import.granola", 128 - embeddings: np.ndarray | None = None, 129 - ) -> Path: 130 - """Create an import segment with conversation_transcript.jsonl and NPZ. 131 - 132 - Args: 133 - day: Day string (YYYYMMDD) 134 - segment_key: Segment key (HHMMSS_LEN) 135 - speakers_text: List of (speaker_name, text) tuples for transcript lines 136 - source: Audio source stem for NPZ file (default "audio") 137 - stream: Import stream name (default "import.granola") 138 - embeddings: Optional custom embeddings array (N×256) 139 - """ 140 - segment_dir = self.journal / day / stream / segment_key 141 - segment_dir.mkdir(parents=True, exist_ok=True) 142 - 143 - num_sentences = len(speakers_text) 144 - 145 - # Write conversation_transcript.jsonl 146 - jsonl_path = segment_dir / "conversation_transcript.jsonl" 147 - lines = [ 148 - json.dumps( 149 - { 150 - "imported": {"id": "test-import", "source": "test"}, 151 - "topics": "test", 152 - "setting": "meeting", 153 - } 154 - ) 155 - ] 156 - 157 - time_part = segment_key.split("_")[0] 158 - base_h = int(time_part[0:2]) 159 - base_m = int(time_part[2:4]) 160 - base_s = int(time_part[4:6]) 161 - base_seconds = base_h * 3600 + base_m * 60 + base_s 162 - 163 - for i, (speaker, text) in enumerate(speakers_text): 164 - offset = i * 5 165 - abs_seconds = base_seconds + offset 166 - h = (abs_seconds // 3600) % 24 167 - m = (abs_seconds % 3600) // 60 168 - s = abs_seconds % 60 169 - lines.append( 170 - json.dumps( 171 - { 172 - "start": f"{h:02d}:{m:02d}:{s:02d}", 173 - "speaker": speaker, 174 - "text": text, 175 - "source": "import", 176 - } 177 - ) 178 - ) 179 - jsonl_path.write_text("\n".join(lines) + "\n") 180 - 181 - # Create NPZ embeddings 182 - npz_path = segment_dir / f"{source}.npz" 183 - if embeddings is None: 184 - source_embeddings = np.random.randn(num_sentences, 256).astype( 185 - np.float32 186 - ) 187 - norms = np.linalg.norm(source_embeddings, axis=1, keepdims=True) 188 - source_embeddings = source_embeddings / norms 189 - else: 190 - source_embeddings = embeddings.astype(np.float32) 191 - statement_ids = np.arange(1, num_sentences + 1, dtype=np.int32) 192 - np.savez_compressed( 193 - npz_path, 194 - embeddings=source_embeddings, 195 - statement_ids=statement_ids, 196 - ) 197 - 198 - return segment_dir 199 - 200 120 def create_embedding(self, vector: list[float] | None = None) -> np.ndarray: 201 121 """Create a normalized 256-dim embedding.""" 202 122 if vector is None: