personal memory agent
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

Implement entity deep merge (spec: entity-deep-merge)

Replace shallow aka-only merge with phased deep merge that rewrites
all references, consolidates all data, and deletes the alias entity:

- Phase 0: merged_into resume marker on alias entity
- Phase 1: Merge identity data (akas, emails)
- Phase 2: Merge voiceprints with deduplication (existing)
- Phase 3: Merge facet relationships (move or consolidate per facet)
- Phase 4: Rewrite speaker_labels.json and speaker_corrections.json
across all segments (byte-level fast path skips unrelated files)
- Phase 5: Delete alias entity directory, bust discovery cache

Delete-last ordering ensures interrupt safety — the alias entity
exists and resolves correctly until all references are rewritten.

Also updates resolve_name_variants to call deep merge instead of
doing manual aka-only writes.

26 tests covering all acceptance criteria. No regressions in
existing attribution tests.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

+1093 -35
+246 -27
apps/speakers/bootstrap.py
··· 24 24 import bisect 25 25 import json 26 26 import logging 27 + import shutil 27 28 from collections import defaultdict 28 29 from pathlib import Path 29 30 from typing import Any ··· 39 40 load_journal_entity, 40 41 save_journal_entity, 41 42 ) 42 - from think.utils import day_dirs, now_ms, segment_path 43 + from think.utils import day_dirs, get_journal, iter_segments, now_ms, segment_path 43 44 44 45 logger = logging.getLogger(__name__) 45 46 ··· 296 297 297 298 298 299 def merge_names(alias_name: str, canonical_name: str) -> dict[str, Any]: 299 - """Merge a speaker name variant into a canonical entity. 300 + """Deep merge a speaker entity into a canonical entity. 300 301 301 - Finds both entities by name, adds the alias name as an aka on the 302 - canonical entity, and merges voiceprint embeddings with deduplication. 302 + Performs a phased deep merge: identity data, voiceprints, facet 303 + relationships, speaker references, then deletes the alias entity. 304 + Designed for interrupt safety — delete-last ordering ensures the 305 + system is never in an unrecoverable state. Every phase is idempotent. 303 306 304 307 Args: 305 308 alias_name: The alias/variant name to merge from ··· 311 314 _, normalize_embedding, _, load_entity_voiceprints_file = _routes_helpers() 312 315 313 316 journal_entities = load_all_journal_entities() 314 - entities_list = [e for e in journal_entities.values() if not e.get("blocked")] 317 + entities_list = list(journal_entities.values()) 315 318 319 + # --- Phase 0: Resolve and validate --- 316 320 alias_entity = find_matching_entity(alias_name, entities_list) 317 321 if not alias_entity: 318 322 return {"error": f"No entity found for alias: {alias_name}"} ··· 327 331 if alias_id == canonical_id: 328 332 return {"error": "Alias and canonical resolve to the same entity."} 329 333 330 - # Add alias name as aka on canonical entity 334 + alias = load_journal_entity(alias_id) 335 + if not alias: 336 + return {"error": f"Failed to load alias entity: {alias_id}"} 337 + 331 338 canonical = load_journal_entity(canonical_id) 332 339 if not canonical: 333 340 return {"error": f"Failed to load canonical entity: {canonical_id}"} 334 341 335 - alias_display = alias_entity.get("name", alias_name) 342 + if alias.get("is_principal") or canonical.get("is_principal"): 343 + return {"error": "Cannot merge the principal entity."} 344 + if alias.get("blocked"): 345 + return {"error": f"Cannot merge blocked entity: {alias_id}"} 346 + if canonical.get("blocked"): 347 + return {"error": f"Cannot merge blocked entity: {canonical_id}"} 348 + 349 + alias_display = alias.get("name", alias_name) 350 + 351 + # Set merged_into resume marker on alias 352 + alias["merged_into"] = canonical_id 353 + alias["updated_at"] = now_ms() 354 + save_journal_entity(alias) 355 + 356 + # --- Phase 1: Merge identity data --- 357 + akas_added: list[str] = [] 336 358 existing_aka = set(canonical.get("aka", [])) 337 - if alias_display not in existing_aka: 359 + canonical_name_val = canonical.get("name", "") 360 + 361 + # Add alias display name as aka 362 + if alias_display not in existing_aka and alias_display != canonical_name_val: 338 363 existing_aka.add(alias_display) 339 - canonical["aka"] = sorted(existing_aka) 340 - canonical["updated_at"] = now_ms() 341 - save_journal_entity(canonical) 364 + akas_added.append(alias_display) 365 + 366 + # Merge alias's akas 367 + for aka in alias.get("aka", []): 368 + if aka not in existing_aka and aka != canonical_name_val: 369 + existing_aka.add(aka) 370 + akas_added.append(aka) 371 + 372 + canonical["aka"] = sorted(existing_aka) 373 + 374 + # Merge emails 375 + canonical_emails = {e.lower() for e in canonical.get("emails", [])} 376 + for email in alias.get("emails", []): 377 + canonical_emails.add(email.lower()) 378 + if canonical_emails: 379 + canonical["emails"] = sorted(canonical_emails) 380 + 381 + canonical["updated_at"] = now_ms() 382 + save_journal_entity(canonical) 342 383 343 - # Merge voiceprint embeddings 384 + # --- Phase 2: Merge voiceprints --- 344 385 alias_vp = load_entity_voiceprints_file(alias_id) 345 - embeddings_merged = 0 386 + voiceprints_merged = 0 346 387 347 388 if alias_vp is not None: 348 389 alias_embeddings, alias_metadata = alias_vp ··· 364 405 existing_keys.add(key) 365 406 366 407 if new_items: 367 - embeddings_merged = _save_voiceprints_batch(canonical_id, new_items) 408 + voiceprints_merged = _save_voiceprints_batch(canonical_id, new_items) 368 409 369 410 canonical_vp = load_entity_voiceprints_file(canonical_id) 370 - total = len(canonical_vp[0]) if canonical_vp else 0 411 + voiceprints_total = len(canonical_vp[0]) if canonical_vp else 0 412 + 413 + # --- Phase 3: Merge facet relationships --- 414 + facets_merged: list[str] = [] 415 + facets_moved: list[str] = [] 416 + journal = get_journal() 417 + facets_dir = Path(journal) / "facets" 418 + 419 + if facets_dir.exists(): 420 + for facet_entry in sorted(facets_dir.iterdir()): 421 + if not facet_entry.is_dir(): 422 + continue 423 + facet_name = facet_entry.name 424 + alias_rel_dir = facet_entry / "entities" / alias_id 425 + alias_rel_path = alias_rel_dir / "entity.json" 426 + if not alias_rel_path.is_file(): 427 + continue 428 + 429 + canonical_rel_dir = facet_entry / "entities" / canonical_id 430 + canonical_rel_path = canonical_rel_dir / "entity.json" 431 + 432 + if not canonical_rel_path.is_file(): 433 + # Move: rename alias relationship dir to canonical 434 + if canonical_rel_dir.exists(): 435 + shutil.rmtree(canonical_rel_dir) 436 + alias_rel_dir.rename(canonical_rel_dir) 437 + # Update entity_id inside the moved entity.json 438 + moved_path = canonical_rel_dir / "entity.json" 439 + try: 440 + with open(moved_path, encoding="utf-8") as f: 441 + rel_data = json.load(f) 442 + rel_data["entity_id"] = canonical_id 443 + tmp = moved_path.with_suffix(".tmp") 444 + with open(tmp, "w", encoding="utf-8") as f: 445 + json.dump(rel_data, f, ensure_ascii=False, indent=2) 446 + f.write("\n") 447 + tmp.rename(moved_path) 448 + except (json.JSONDecodeError, OSError): 449 + pass 450 + facets_moved.append(facet_name) 451 + else: 452 + # Both have relationships: merge timestamps and data 453 + try: 454 + with open(alias_rel_path, encoding="utf-8") as f: 455 + alias_rel = json.load(f) 456 + with open(canonical_rel_path, encoding="utf-8") as f: 457 + canonical_rel = json.load(f) 458 + except (json.JSONDecodeError, OSError): 459 + continue 460 + 461 + # Merge timestamps: earliest attached_at 462 + alias_attached = alias_rel.get("attached_at") 463 + canonical_attached = canonical_rel.get("attached_at") 464 + if alias_attached and ( 465 + not canonical_attached or alias_attached < canonical_attached 466 + ): 467 + canonical_rel["attached_at"] = alias_attached 468 + 469 + # Latest updated_at and last_seen 470 + for ts_field in ("updated_at", "last_seen"): 471 + alias_ts = alias_rel.get(ts_field) 472 + canonical_ts = canonical_rel.get(ts_field) 473 + if alias_ts and (not canonical_ts or alias_ts > canonical_ts): 474 + canonical_rel[ts_field] = alias_ts 475 + 476 + # Merge description: keep canonical's if non-empty 477 + if not canonical_rel.get("description") and alias_rel.get( 478 + "description" 479 + ): 480 + canonical_rel["description"] = alias_rel["description"] 481 + 482 + # Save merged relationship (atomic write) 483 + canonical_rel["entity_id"] = canonical_id 484 + content = ( 485 + json.dumps(canonical_rel, ensure_ascii=False, indent=2) + "\n" 486 + ) 487 + tmp = canonical_rel_path.with_suffix(".tmp") 488 + with open(tmp, "w", encoding="utf-8") as f: 489 + f.write(content) 490 + tmp.rename(canonical_rel_path) 491 + 492 + # Merge observations: append alias's to canonical's 493 + alias_obs_path = alias_rel_dir / "observations.jsonl" 494 + if alias_obs_path.exists(): 495 + alias_obs = alias_obs_path.read_text(encoding="utf-8") 496 + if alias_obs.strip(): 497 + canonical_obs_path = ( 498 + canonical_rel_dir / "observations.jsonl" 499 + ) 500 + existing_obs = "" 501 + if canonical_obs_path.exists(): 502 + existing_obs = canonical_obs_path.read_text( 503 + encoding="utf-8" 504 + ) 505 + with open( 506 + canonical_obs_path, "a", encoding="utf-8" 507 + ) as f: 508 + if existing_obs and not existing_obs.endswith("\n"): 509 + f.write("\n") 510 + f.write(alias_obs) 511 + if not alias_obs.endswith("\n"): 512 + f.write("\n") 513 + 514 + # Delete alias relationship directory 515 + shutil.rmtree(alias_rel_dir) 516 + facets_merged.append(facet_name) 517 + 518 + # --- Phase 4: Rewrite speaker references --- 519 + segments_scanned = 0 520 + labels_rewritten = 0 521 + corrections_rewritten = 0 522 + errors: list[str] = [] 523 + alias_id_bytes = alias_id.encode("utf-8") 524 + 525 + for day in sorted(day_dirs().keys()): 526 + for _stream, _seg_key, seg_path in iter_segments(day): 527 + segments_scanned += 1 528 + agents_dir = seg_path / "agents" 529 + 530 + # Rewrite speaker_labels.json 531 + labels_path = agents_dir / "speaker_labels.json" 532 + if labels_path.is_file(): 533 + try: 534 + raw = labels_path.read_bytes() 535 + if alias_id_bytes in raw: 536 + data = json.loads(raw) 537 + changed = False 538 + for label in data.get("labels", []): 539 + if label.get("speaker") == alias_id: 540 + label["speaker"] = canonical_id 541 + changed = True 542 + if changed: 543 + tmp = labels_path.with_suffix(".tmp") 544 + with open(tmp, "w", encoding="utf-8") as f: 545 + json.dump(data, f, indent=2) 546 + tmp.rename(labels_path) 547 + labels_rewritten += 1 548 + except Exception as e: 549 + errors.append(f"{labels_path}: {e}") 550 + 551 + # Rewrite speaker_corrections.json 552 + corrections_path = agents_dir / "speaker_corrections.json" 553 + if corrections_path.is_file(): 554 + try: 555 + raw = corrections_path.read_bytes() 556 + if alias_id_bytes in raw: 557 + data = json.loads(raw) 558 + changed = False 559 + for correction in data.get("corrections", []): 560 + if correction.get("original_speaker") == alias_id: 561 + correction["original_speaker"] = canonical_id 562 + changed = True 563 + if correction.get("corrected_speaker") == alias_id: 564 + correction["corrected_speaker"] = canonical_id 565 + changed = True 566 + if changed: 567 + tmp = corrections_path.with_suffix(".tmp") 568 + with open(tmp, "w", encoding="utf-8") as f: 569 + json.dump(data, f, indent=2) 570 + tmp.rename(corrections_path) 571 + corrections_rewritten += 1 572 + except Exception as e: 573 + errors.append(f"{corrections_path}: {e}") 574 + 575 + # --- Phase 5: Cleanup --- 576 + alias_entity_dir = Path(journal) / "entities" / alias_id 577 + if alias_entity_dir.exists(): 578 + shutil.rmtree(alias_entity_dir) 579 + 580 + discovery_cache = Path(journal) / "awareness" / "discovery_clusters.json" 581 + if discovery_cache.exists(): 582 + discovery_cache.unlink() 371 583 372 584 return { 373 585 "merged": True, 374 586 "alias": alias_display, 375 - "canonical_entity_id": canonical_id, 587 + "alias_id": alias_id, 376 588 "canonical_name": canonical.get("name", canonical_name), 377 - "embeddings_merged": embeddings_merged, 378 - "total_embeddings": total, 589 + "canonical_id": canonical_id, 590 + "akas_added": akas_added, 591 + "voiceprints_merged": voiceprints_merged, 592 + "voiceprints_total": voiceprints_total, 593 + "facets_merged": facets_merged, 594 + "facets_moved": facets_moved, 595 + "segments_scanned": segments_scanned, 596 + "labels_rewritten": labels_rewritten, 597 + "corrections_rewritten": corrections_rewritten, 598 + "errors": errors, 379 599 } 380 600 381 601 ··· 529 749 ) 530 750 continue 531 751 532 - # Auto-merge: add alias as aka on canonical entity 752 + # Auto-merge: call deep merge 533 753 if not dry_run: 534 754 try: 535 - je = load_journal_entity(canonical_id) 536 - if je: 537 - existing_aka = set(je.get("aka", [])) 538 - if alias_name not in existing_aka: 539 - existing_aka.add(alias_name) 540 - je["aka"] = sorted(existing_aka) 541 - je["updated_at"] = now_ms() 542 - save_journal_entity(je) 755 + result = merge_names(alias_name, canonical_name) 756 + if result.get("error"): 757 + stats["errors"].append( 758 + f"Failed to merge {alias_name} -> {canonical_name}: " 759 + f"{result['error']}" 760 + ) 761 + continue 543 762 except Exception as e: 544 763 stats["errors"].append( 545 764 f"Failed to merge {alias_name} -> {canonical_name}: {e}"
+82
apps/speakers/tests/conftest.py
··· 230 230 231 231 return labels_path 232 232 233 + def create_speaker_corrections( 234 + self, 235 + day: str, 236 + segment_key: str, 237 + corrections: list[dict], 238 + *, 239 + stream: str | None = None, 240 + ) -> Path: 241 + """Create a speaker_corrections.json file in a segment directory. 242 + 243 + Args: 244 + day: Day string (YYYYMMDD) 245 + segment_key: Segment key (HHMMSS_LEN) 246 + corrections: List of correction dicts with sentence_id, 247 + original_speaker, corrected_speaker, timestamp 248 + stream: Optional stream name (defaults to STREAM) 249 + """ 250 + agents_dir = ( 251 + self.journal / day / (stream or STREAM) / segment_key / "agents" 252 + ) 253 + agents_dir.mkdir(parents=True, exist_ok=True) 254 + 255 + data = {"corrections": corrections} 256 + corrections_path = agents_dir / "speaker_corrections.json" 257 + with open(corrections_path, "w", encoding="utf-8") as f: 258 + json.dump(data, f) 259 + 260 + return corrections_path 261 + 262 + def create_facet_relationship( 263 + self, 264 + facet: str, 265 + entity_id: str, 266 + *, 267 + description: str = "", 268 + attached_at: int = 1700000000000, 269 + updated_at: int | None = None, 270 + last_seen: str | None = None, 271 + observations: list[str] | None = None, 272 + ) -> Path: 273 + """Create a facet relationship for an entity. 274 + 275 + Args: 276 + facet: Facet name (e.g., "work", "personal") 277 + entity_id: Entity ID (slug) 278 + description: Relationship description 279 + attached_at: When the relationship was created 280 + updated_at: Last update timestamp 281 + last_seen: Last seen day string (YYYYMMDD) 282 + observations: Optional list of observation strings 283 + """ 284 + rel_dir = self.journal / "facets" / facet / "entities" / entity_id 285 + rel_dir.mkdir(parents=True, exist_ok=True) 286 + 287 + relationship: dict = { 288 + "entity_id": entity_id, 289 + "attached_at": attached_at, 290 + } 291 + if description: 292 + relationship["description"] = description 293 + if updated_at is not None: 294 + relationship["updated_at"] = updated_at 295 + if last_seen is not None: 296 + relationship["last_seen"] = last_seen 297 + 298 + with open(rel_dir / "entity.json", "w", encoding="utf-8") as f: 299 + json.dump(relationship, f, indent=2) 300 + 301 + if observations: 302 + with open( 303 + rel_dir / "observations.jsonl", "w", encoding="utf-8" 304 + ) as f: 305 + for obs in observations: 306 + f.write( 307 + json.dumps( 308 + {"content": obs, "observed_at": 1700000000000} 309 + ) 310 + + "\n" 311 + ) 312 + 313 + return rel_dir 314 + 233 315 def create_import_segment( 234 316 self, 235 317 day: str,
+765 -8
apps/speakers/tests/test_merge_names.py
··· 1 1 # SPDX-License-Identifier: AGPL-3.0-only 2 2 # Copyright (c) 2026 sol pbc 3 3 4 - """Tests for merge-names CLI command.""" 4 + """Tests for entity deep merge.""" 5 5 6 6 from __future__ import annotations 7 7 ··· 10 10 import numpy as np 11 11 from typer.testing import CliRunner 12 12 13 + from apps.speakers.bootstrap import merge_names 13 14 from apps.speakers.call import app as speakers_app 15 + from think.entities.journal import load_journal_entity, scan_journal_entities 14 16 15 17 _runner = CliRunner() 16 18 19 + # Match conftest default stream 20 + STREAM = "test" 21 + 22 + 23 + # --------------------------------------------------------------------------- 24 + # Core merge behavior 25 + # --------------------------------------------------------------------------- 26 + 27 + 28 + def test_deep_merge_full(speakers_env): 29 + """Full deep merge: identity, voiceprints, facets, speaker refs, cleanup.""" 30 + env = speakers_env() 31 + 32 + # Create entities with voiceprints 33 + env.create_entity( 34 + "Alice Alias", 35 + voiceprints=[ 36 + ("20240101", "143022_300", "mic_audio", 1), 37 + ("20240101", "143022_300", "mic_audio", 2), 38 + ], 39 + ) 40 + env.create_entity( 41 + "Alice Canonical", 42 + voiceprints=[ 43 + ("20240101", "143022_300", "mic_audio", 3), 44 + ], 45 + ) 46 + 47 + # Facet relationships 48 + env.create_facet_relationship( 49 + "work", 50 + "alice_alias", 51 + description="Works at Acme", 52 + attached_at=1600000000000, 53 + ) 54 + env.create_facet_relationship( 55 + "work", 56 + "alice_canonical", 57 + description="Senior engineer", 58 + attached_at=1700000000000, 59 + ) 60 + env.create_facet_relationship("personal", "alice_alias", description="Hiker") 61 + 62 + # Speaker labels referencing alias 63 + env.create_speaker_labels( 64 + "20240101", 65 + "143022_300", 66 + [ 67 + { 68 + "sentence_id": 1, 69 + "speaker": "alice_alias", 70 + "confidence": "high", 71 + "method": "acoustic", 72 + }, 73 + { 74 + "sentence_id": 2, 75 + "speaker": "alice_canonical", 76 + "confidence": "high", 77 + "method": "acoustic", 78 + }, 79 + { 80 + "sentence_id": 3, 81 + "speaker": "alice_alias", 82 + "confidence": "medium", 83 + "method": "context", 84 + }, 85 + ], 86 + ) 87 + 88 + # Speaker corrections referencing alias 89 + env.create_speaker_corrections( 90 + "20240101", 91 + "143022_300", 92 + [ 93 + { 94 + "sentence_id": 1, 95 + "original_speaker": "alice_alias", 96 + "corrected_speaker": "alice_alias", 97 + "timestamp": 1700000000000, 98 + }, 99 + ], 100 + ) 101 + 102 + result = merge_names("Alice Alias", "Alice Canonical") 103 + 104 + assert result["merged"] is True 105 + assert result["alias"] == "Alice Alias" 106 + assert result["alias_id"] == "alice_alias" 107 + assert result["canonical_id"] == "alice_canonical" 108 + assert "Alice Alias" in result["akas_added"] 109 + assert result["voiceprints_merged"] == 2 110 + assert result["labels_rewritten"] == 1 111 + assert result["corrections_rewritten"] == 1 112 + assert "work" in result["facets_merged"] 113 + assert "personal" in result["facets_moved"] 114 + assert result["errors"] == [] 115 + 116 + # Alias entity is gone 117 + assert load_journal_entity("alice_alias") is None 118 + assert "alice_alias" not in scan_journal_entities() 119 + 120 + # Canonical has merged data 121 + canonical = load_journal_entity("alice_canonical") 122 + assert "Alice Alias" in canonical["aka"] 123 + 124 + # Speaker labels rewritten 125 + labels_path = ( 126 + env.journal 127 + / "20240101" 128 + / STREAM 129 + / "143022_300" 130 + / "agents" 131 + / "speaker_labels.json" 132 + ) 133 + with open(labels_path) as f: 134 + labels = json.load(f) 135 + speakers = [l["speaker"] for l in labels["labels"]] 136 + assert "alice_alias" not in speakers 137 + assert speakers.count("alice_canonical") == 3 138 + 139 + # Corrections rewritten 140 + corr_path = ( 141 + env.journal 142 + / "20240101" 143 + / STREAM 144 + / "143022_300" 145 + / "agents" 146 + / "speaker_corrections.json" 147 + ) 148 + with open(corr_path) as f: 149 + corr = json.load(f) 150 + for c in corr["corrections"]: 151 + assert c.get("original_speaker") != "alice_alias" 152 + assert c.get("corrected_speaker") != "alice_alias" 153 + 154 + 155 + def test_alias_entity_deleted(speakers_env): 156 + """After merge, scan_journal_entities does not return alias_id.""" 157 + env = speakers_env() 158 + env.create_entity("Gone") 159 + env.create_entity("Stays") 160 + 161 + merge_names("Gone", "Stays") 162 + 163 + assert "gone" not in scan_journal_entities() 164 + assert load_journal_entity("gone") is None 165 + 166 + 167 + def test_akas_transitive(speakers_env): 168 + """Alias's existing akas are merged into canonical.""" 169 + env = speakers_env() 170 + alias_dir = env.create_entity("Alias Trans") 171 + alias_path = alias_dir / "entity.json" 172 + with open(alias_path) as f: 173 + data = json.load(f) 174 + data["aka"] = ["AT", "A.T."] 175 + with open(alias_path, "w") as f: 176 + json.dump(data, f) 177 + 178 + env.create_entity("Canonical Trans") 179 + 180 + result = merge_names("Alias Trans", "Canonical Trans") 181 + 182 + assert result["merged"] is True 183 + assert "Alias Trans" in result["akas_added"] 184 + assert "AT" in result["akas_added"] 185 + assert "A.T." in result["akas_added"] 186 + 187 + canonical = load_journal_entity("canonical_trans") 188 + assert {"Alias Trans", "AT", "A.T."} <= set(canonical["aka"]) 189 + 190 + 191 + def test_emails_merged(speakers_env): 192 + """Emails from alias are merged into canonical (union, lowercased).""" 193 + env = speakers_env() 194 + alias_dir = env.create_entity("Email Alias") 195 + canonical_dir = env.create_entity("Email Canonical") 196 + 197 + for entity_dir, emails in [ 198 + (alias_dir, ["alias@example.com", "SHARED@Example.COM"]), 199 + (canonical_dir, ["canonical@example.com", "shared@example.com"]), 200 + ]: 201 + path = entity_dir / "entity.json" 202 + with open(path) as f: 203 + data = json.load(f) 204 + data["emails"] = emails 205 + with open(path, "w") as f: 206 + json.dump(data, f) 207 + 208 + merge_names("Email Alias", "Email Canonical") 17 209 18 - def test_merge_names_cli_error_missing_entity(speakers_env): 210 + canonical = load_journal_entity("email_canonical") 211 + assert set(canonical["emails"]) == { 212 + "alias@example.com", 213 + "canonical@example.com", 214 + "shared@example.com", 215 + } 216 + 217 + 218 + # --------------------------------------------------------------------------- 219 + # Facet relationship merging 220 + # --------------------------------------------------------------------------- 221 + 222 + 223 + def test_facet_move(speakers_env): 224 + """Alias facet relationship moves to canonical when canonical has none.""" 225 + env = speakers_env() 226 + env.create_entity("Short") 227 + env.create_entity("Full Name") 228 + env.create_facet_relationship( 229 + "work", "short", description="Consultant", attached_at=1600000000000 230 + ) 231 + 232 + result = merge_names("Short", "Full Name") 233 + 234 + assert "work" in result["facets_moved"] 235 + rel_path = ( 236 + env.journal / "facets" / "work" / "entities" / "full_name" / "entity.json" 237 + ) 238 + assert rel_path.exists() 239 + with open(rel_path) as f: 240 + rel = json.load(f) 241 + assert rel["entity_id"] == "full_name" 242 + assert rel["description"] == "Consultant" 243 + 244 + # Alias dir gone 245 + assert not (env.journal / "facets" / "work" / "entities" / "short").exists() 246 + 247 + 248 + def test_facet_merge_timestamps(speakers_env): 249 + """Facet merge: earliest attached_at, latest updated_at/last_seen.""" 250 + env = speakers_env() 251 + env.create_entity("Alias P") 252 + env.create_entity("Canonical P") 253 + env.create_facet_relationship( 254 + "work", 255 + "alias_p", 256 + attached_at=1500000000000, 257 + updated_at=1800000000000, 258 + last_seen="20260301", 259 + ) 260 + env.create_facet_relationship( 261 + "work", 262 + "canonical_p", 263 + attached_at=1700000000000, 264 + updated_at=1600000000000, 265 + last_seen="20260201", 266 + ) 267 + 268 + result = merge_names("Alias P", "Canonical P") 269 + 270 + assert "work" in result["facets_merged"] 271 + rel_path = ( 272 + env.journal / "facets" / "work" / "entities" / "canonical_p" / "entity.json" 273 + ) 274 + with open(rel_path) as f: 275 + rel = json.load(f) 276 + assert rel["attached_at"] == 1500000000000 277 + assert rel["updated_at"] == 1800000000000 278 + assert rel["last_seen"] == "20260301" 279 + 280 + 281 + def test_facet_merge_description_priority(speakers_env): 282 + """Canonical description takes priority; alias fills empty.""" 283 + env = speakers_env() 284 + env.create_entity("Alias D") 285 + env.create_entity("Canonical D") 286 + 287 + # Both have descriptions: canonical's wins 288 + env.create_facet_relationship("work", "alias_d", description="From alias") 289 + env.create_facet_relationship( 290 + "work", "canonical_d", description="From canonical" 291 + ) 292 + 293 + merge_names("Alias D", "Canonical D") 294 + 295 + rel_path = ( 296 + env.journal / "facets" / "work" / "entities" / "canonical_d" / "entity.json" 297 + ) 298 + with open(rel_path) as f: 299 + rel = json.load(f) 300 + assert rel["description"] == "From canonical" 301 + 302 + 303 + def test_facet_merge_description_fallback(speakers_env): 304 + """Alias description used when canonical's is empty.""" 305 + env = speakers_env() 306 + env.create_entity("Alias E") 307 + env.create_entity("Canonical E") 308 + 309 + env.create_facet_relationship("work", "alias_e", description="Has desc") 310 + env.create_facet_relationship("work", "canonical_e", description="") 311 + 312 + merge_names("Alias E", "Canonical E") 313 + 314 + rel_path = ( 315 + env.journal / "facets" / "work" / "entities" / "canonical_e" / "entity.json" 316 + ) 317 + with open(rel_path) as f: 318 + rel = json.load(f) 319 + assert rel["description"] == "Has desc" 320 + 321 + 322 + def test_facet_observations_merged(speakers_env): 323 + """Observations from alias are appended to canonical's.""" 324 + env = speakers_env() 325 + env.create_entity("Alias Obs") 326 + env.create_entity("Canonical Obs") 327 + env.create_facet_relationship( 328 + "work", "alias_obs", observations=["Likes coffee", "Morning person"] 329 + ) 330 + env.create_facet_relationship( 331 + "work", "canonical_obs", observations=["Senior role"] 332 + ) 333 + 334 + merge_names("Alias Obs", "Canonical Obs") 335 + 336 + obs_path = ( 337 + env.journal 338 + / "facets" 339 + / "work" 340 + / "entities" 341 + / "canonical_obs" 342 + / "observations.jsonl" 343 + ) 344 + obs = [json.loads(line) for line in obs_path.read_text().strip().split("\n")] 345 + contents = [o["content"] for o in obs] 346 + assert "Senior role" in contents 347 + assert "Likes coffee" in contents 348 + assert "Morning person" in contents 349 + 350 + 351 + def test_facet_no_alias_relationship(speakers_env): 352 + """No change when only canonical has a facet relationship.""" 353 + env = speakers_env() 354 + env.create_entity("Alias None") 355 + env.create_entity("Canonical None") 356 + env.create_facet_relationship( 357 + "work", "canonical_none", description="Only me" 358 + ) 359 + 360 + result = merge_names("Alias None", "Canonical None") 361 + 362 + assert result["facets_merged"] == [] 363 + assert result["facets_moved"] == [] 364 + rel_path = ( 365 + env.journal 366 + / "facets" 367 + / "work" 368 + / "entities" 369 + / "canonical_none" 370 + / "entity.json" 371 + ) 372 + with open(rel_path) as f: 373 + rel = json.load(f) 374 + assert rel["description"] == "Only me" 375 + 376 + 377 + def test_no_alias_facet_dirs_remain(speakers_env): 378 + """After merge, no facet has a relationship directory for alias_id.""" 379 + env = speakers_env() 380 + env.create_entity("Multi Alias") 381 + env.create_entity("Multi Canon") 382 + env.create_facet_relationship("work", "multi_alias") 383 + env.create_facet_relationship("personal", "multi_alias") 384 + env.create_facet_relationship("work", "multi_canon") 385 + 386 + merge_names("Multi Alias", "Multi Canon") 387 + 388 + facets_dir = env.journal / "facets" 389 + for facet_entry in facets_dir.iterdir(): 390 + if facet_entry.is_dir(): 391 + alias_dir = facet_entry / "entities" / "multi_alias" 392 + assert not alias_dir.exists(), f"alias dir exists in {facet_entry.name}" 393 + 394 + 395 + # --------------------------------------------------------------------------- 396 + # Speaker reference rewriting 397 + # --------------------------------------------------------------------------- 398 + 399 + 400 + def test_speaker_labels_rewritten(speakers_env): 401 + """speaker_labels.json files referencing alias_id are rewritten.""" 402 + env = speakers_env() 403 + env.create_entity("Label Alias") 404 + env.create_entity("Label Canon") 405 + env.create_speaker_labels( 406 + "20240101", 407 + "143022_300", 408 + [ 409 + { 410 + "sentence_id": 1, 411 + "speaker": "label_alias", 412 + "confidence": "high", 413 + "method": "acoustic", 414 + }, 415 + { 416 + "sentence_id": 2, 417 + "speaker": "other_entity", 418 + "confidence": "high", 419 + "method": "acoustic", 420 + }, 421 + ], 422 + ) 423 + 424 + result = merge_names("Label Alias", "Label Canon") 425 + 426 + assert result["labels_rewritten"] == 1 427 + labels_path = ( 428 + env.journal 429 + / "20240101" 430 + / STREAM 431 + / "143022_300" 432 + / "agents" 433 + / "speaker_labels.json" 434 + ) 435 + with open(labels_path) as f: 436 + data = json.load(f) 437 + assert data["labels"][0]["speaker"] == "label_canon" 438 + assert data["labels"][1]["speaker"] == "other_entity" 439 + 440 + 441 + def test_speaker_corrections_rewritten(speakers_env): 442 + """speaker_corrections.json files referencing alias_id are rewritten.""" 443 + env = speakers_env() 444 + env.create_entity("Corr Alias") 445 + env.create_entity("Corr Canon") 446 + env.create_speaker_corrections( 447 + "20240101", 448 + "143022_300", 449 + [ 450 + { 451 + "sentence_id": 1, 452 + "original_speaker": "corr_alias", 453 + "corrected_speaker": "someone_else", 454 + "timestamp": 1700000000000, 455 + }, 456 + { 457 + "sentence_id": 2, 458 + "original_speaker": "someone_else", 459 + "corrected_speaker": "corr_alias", 460 + "timestamp": 1700000000000, 461 + }, 462 + ], 463 + ) 464 + 465 + result = merge_names("Corr Alias", "Corr Canon") 466 + 467 + assert result["corrections_rewritten"] == 1 468 + corr_path = ( 469 + env.journal 470 + / "20240101" 471 + / STREAM 472 + / "143022_300" 473 + / "agents" 474 + / "speaker_corrections.json" 475 + ) 476 + with open(corr_path) as f: 477 + data = json.load(f) 478 + assert data["corrections"][0]["original_speaker"] == "corr_canon" 479 + assert data["corrections"][0]["corrected_speaker"] == "someone_else" 480 + assert data["corrections"][1]["original_speaker"] == "someone_else" 481 + assert data["corrections"][1]["corrected_speaker"] == "corr_canon" 482 + 483 + 484 + def test_fast_path_skips_unrelated_files(speakers_env): 485 + """Files without alias_id bytes are not modified.""" 486 + env = speakers_env() 487 + env.create_entity("Fast Alias") 488 + env.create_entity("Fast Canon") 489 + env.create_speaker_labels( 490 + "20240101", 491 + "143022_300", 492 + [ 493 + { 494 + "sentence_id": 1, 495 + "speaker": "other_person", 496 + "confidence": "high", 497 + "method": "acoustic", 498 + }, 499 + ], 500 + ) 501 + 502 + labels_path = ( 503 + env.journal 504 + / "20240101" 505 + / STREAM 506 + / "143022_300" 507 + / "agents" 508 + / "speaker_labels.json" 509 + ) 510 + mtime_before = labels_path.stat().st_mtime_ns 511 + 512 + result = merge_names("Fast Alias", "Fast Canon") 513 + 514 + assert result["labels_rewritten"] == 0 515 + assert labels_path.stat().st_mtime_ns == mtime_before 516 + 517 + 518 + def test_corrupted_labels_logged_not_aborted(speakers_env): 519 + """Corrupted speaker_labels.json is logged as error but merge continues.""" 520 + env = speakers_env() 521 + env.create_entity("Corrupt Alias") 522 + env.create_entity("Corrupt Canon") 523 + 524 + # Write corrupted file containing the alias_id string 525 + agents_dir = env.journal / "20240101" / STREAM / "143022_300" / "agents" 526 + agents_dir.mkdir(parents=True, exist_ok=True) 527 + (agents_dir / "speaker_labels.json").write_text( 528 + "corrupt_alias {not valid json" 529 + ) 530 + 531 + result = merge_names("Corrupt Alias", "Corrupt Canon") 532 + 533 + assert result["merged"] is True 534 + assert len(result["errors"]) == 1 535 + 536 + 537 + # --------------------------------------------------------------------------- 538 + # Interrupt safety 539 + # --------------------------------------------------------------------------- 540 + 541 + 542 + def test_idempotent_rerun(speakers_env): 543 + """Running merge twice: second run is a no-op (alias gone → error).""" 544 + env = speakers_env() 545 + env.create_entity("Idem Alias") 546 + env.create_entity("Idem Canon") 547 + 548 + result1 = merge_names("Idem Alias", "Idem Canon") 549 + assert result1["merged"] is True 550 + 551 + # Second run: alias name now resolves to canonical via aka → same entity 552 + result2 = merge_names("Idem Alias", "Idem Canon") 553 + assert "error" in result2 554 + 555 + 556 + # --------------------------------------------------------------------------- 557 + # Validation and error handling 558 + # --------------------------------------------------------------------------- 559 + 560 + 561 + def test_error_self_merge(speakers_env): 562 + """Merging entity into itself returns error.""" 563 + env = speakers_env() 564 + env.create_entity("Same Person") 565 + 566 + result = merge_names("Same Person", "Same Person") 567 + assert "error" in result 568 + 569 + 570 + def test_error_blocked_entity(speakers_env): 571 + """Merging a blocked entity returns error.""" 572 + env = speakers_env() 573 + entity_dir = env.create_entity("Blocked One") 574 + path = entity_dir / "entity.json" 575 + with open(path) as f: 576 + data = json.load(f) 577 + data["blocked"] = True 578 + with open(path, "w") as f: 579 + json.dump(data, f) 580 + 581 + env.create_entity("Target One") 582 + 583 + result = merge_names("Blocked One", "Target One") 584 + assert "error" in result 585 + assert "blocked" in result["error"].lower() 586 + 587 + 588 + def test_error_principal_as_alias(speakers_env): 589 + """Merging the principal entity (as alias) returns error.""" 590 + env = speakers_env() 591 + env.create_entity("Self Person", is_principal=True) 592 + env.create_entity("Other Person") 593 + 594 + result = merge_names("Self Person", "Other Person") 595 + assert "error" in result 596 + assert "principal" in result["error"].lower() 597 + 598 + 599 + def test_error_principal_as_canonical(speakers_env): 600 + """Merging into the principal entity (as canonical) returns error.""" 601 + env = speakers_env() 602 + env.create_entity("Other Person") 603 + env.create_entity("Self Person", is_principal=True) 604 + 605 + result = merge_names("Other Person", "Self Person") 606 + assert "error" in result 607 + assert "principal" in result["error"].lower() 608 + 609 + 610 + def test_error_nonexistent_entity(speakers_env): 611 + """Merging non-existent entity returns error.""" 612 + env = speakers_env() 613 + env.create_entity("Real Person") 614 + 615 + result = merge_names("Ghost", "Real Person") 616 + assert "error" in result 617 + 618 + 619 + # --------------------------------------------------------------------------- 620 + # Cleanup 621 + # --------------------------------------------------------------------------- 622 + 623 + 624 + def test_discovery_cache_busted(speakers_env): 625 + """Discovery cache is deleted after merge.""" 626 + env = speakers_env() 627 + env.create_entity("Cache Alias") 628 + env.create_entity("Cache Canon") 629 + 630 + awareness_dir = env.journal / "awareness" 631 + awareness_dir.mkdir(parents=True, exist_ok=True) 632 + cache_path = awareness_dir / "discovery_clusters.json" 633 + cache_path.write_text('{"clusters": []}') 634 + 635 + merge_names("Cache Alias", "Cache Canon") 636 + 637 + assert not cache_path.exists() 638 + 639 + 640 + # --------------------------------------------------------------------------- 641 + # resolve_name_variants integration 642 + # --------------------------------------------------------------------------- 643 + 644 + 645 + def test_resolve_name_variants_deep_merge(speakers_env): 646 + """resolve_name_variants(dry_run=False) calls deep merge (alias deleted).""" 647 + env = speakers_env() 648 + 649 + # Create two entities with identical voiceprints 650 + embedding = np.random.default_rng(42).standard_normal(256).astype(np.float32) 651 + embedding = embedding / np.linalg.norm(embedding) 652 + embeddings = np.tile(embedding.reshape(1, -1), (5, 1)) 653 + 654 + alias_dir = env.create_entity("Alice") 655 + canonical_dir = env.create_entity("Alice Johnson") 656 + 657 + meta_a = np.array( 658 + [ 659 + json.dumps( 660 + { 661 + "day": "20240101", 662 + "segment_key": "143022_300", 663 + "source": "mic_audio", 664 + "sentence_id": i, 665 + } 666 + ) 667 + for i in range(5) 668 + ], 669 + dtype=str, 670 + ) 671 + meta_b = np.array( 672 + [ 673 + json.dumps( 674 + { 675 + "day": "20240101", 676 + "segment_key": "143022_300", 677 + "source": "mic_audio", 678 + "sentence_id": i + 10, 679 + } 680 + ) 681 + for i in range(5) 682 + ], 683 + dtype=str, 684 + ) 685 + np.savez_compressed( 686 + alias_dir / "voiceprints.npz", embeddings=embeddings, metadata=meta_a 687 + ) 688 + np.savez_compressed( 689 + canonical_dir / "voiceprints.npz", embeddings=embeddings, metadata=meta_b 690 + ) 691 + 692 + from apps.speakers.bootstrap import resolve_name_variants 693 + 694 + stats = resolve_name_variants(dry_run=False) 695 + 696 + assert len(stats["auto_merged"]) == 1 697 + 698 + # Deep merge: alias entity should be deleted 699 + assert load_journal_entity("alice") is None 700 + assert "alice" not in scan_journal_entities() 701 + 702 + 703 + def test_resolve_name_variants_dry_run_unchanged(speakers_env): 704 + """resolve_name_variants(dry_run=True) does not delete entities.""" 705 + env = speakers_env() 706 + 707 + embedding = np.random.default_rng(42).standard_normal(256).astype(np.float32) 708 + embedding = embedding / np.linalg.norm(embedding) 709 + embeddings = np.tile(embedding.reshape(1, -1), (5, 1)) 710 + 711 + alias_dir = env.create_entity("Bob") 712 + canonical_dir = env.create_entity("Bob Smith") 713 + 714 + meta_a = np.array( 715 + [ 716 + json.dumps( 717 + { 718 + "day": "20240101", 719 + "segment_key": "143022_300", 720 + "source": "mic_audio", 721 + "sentence_id": i, 722 + } 723 + ) 724 + for i in range(5) 725 + ], 726 + dtype=str, 727 + ) 728 + meta_b = np.array( 729 + [ 730 + json.dumps( 731 + { 732 + "day": "20240101", 733 + "segment_key": "143022_300", 734 + "source": "mic_audio", 735 + "sentence_id": i + 10, 736 + } 737 + ) 738 + for i in range(5) 739 + ], 740 + dtype=str, 741 + ) 742 + np.savez_compressed( 743 + alias_dir / "voiceprints.npz", embeddings=embeddings, metadata=meta_a 744 + ) 745 + np.savez_compressed( 746 + canonical_dir / "voiceprints.npz", embeddings=embeddings, metadata=meta_b 747 + ) 748 + 749 + from apps.speakers.bootstrap import resolve_name_variants 750 + 751 + stats = resolve_name_variants(dry_run=True) 752 + 753 + assert len(stats["auto_merged"]) == 1 754 + # Dry run: both entities still exist 755 + assert load_journal_entity("bob") is not None 756 + assert load_journal_entity("bob_smith") is not None 757 + 758 + 759 + # --------------------------------------------------------------------------- 760 + # CLI 761 + # --------------------------------------------------------------------------- 762 + 763 + 764 + def test_cli_error_missing_entity(speakers_env): 19 765 """CLI merge-names outputs error JSON and exits 1 for unknown entity.""" 20 766 speakers_env() 21 767 result = _runner.invoke(speakers_app, ["merge-names", "Nobody", "Also Nobody"]) 22 768 assert result.exit_code == 1 23 769 24 770 25 - def test_merge_names_cli_success(speakers_env): 26 - """CLI merge-names outputs JSON with merged=True on success.""" 771 + def test_cli_success(speakers_env): 772 + """CLI merge-names outputs JSON with deep merge fields on success.""" 27 773 env = speakers_env() 28 774 entity_a = env.create_entity("Alice Alias") 29 775 entity_b = env.create_entity("Alice Canonical") 30 776 31 777 emb_a = np.random.default_rng(42).standard_normal((3, 256)).astype(np.float32) 32 778 emb_b = np.random.default_rng(99).standard_normal((3, 256)).astype(np.float32) 33 - meta_a = np.array([json.dumps({"key": f"a_{i}"}) for i in range(3)], dtype=str) 34 - meta_b = np.array([json.dumps({"key": f"b_{i}"}) for i in range(3)], dtype=str) 35 - np.savez_compressed(entity_a / "voiceprints.npz", embeddings=emb_a, metadata=meta_a) 36 - np.savez_compressed(entity_b / "voiceprints.npz", embeddings=emb_b, metadata=meta_b) 779 + meta_a = np.array( 780 + [json.dumps({"key": f"a_{i}"}) for i in range(3)], dtype=str 781 + ) 782 + meta_b = np.array( 783 + [json.dumps({"key": f"b_{i}"}) for i in range(3)], dtype=str 784 + ) 785 + np.savez_compressed( 786 + entity_a / "voiceprints.npz", embeddings=emb_a, metadata=meta_a 787 + ) 788 + np.savez_compressed( 789 + entity_b / "voiceprints.npz", embeddings=emb_b, metadata=meta_b 790 + ) 37 791 38 792 result = _runner.invoke( 39 793 speakers_app, ["merge-names", "Alice Alias", "Alice Canonical"] ··· 42 796 data = json.loads(result.output) 43 797 assert data["merged"] is True 44 798 assert data["canonical_name"] == "Alice Canonical" 799 + assert data["alias_id"] == "alice_alias" 800 + assert "segments_scanned" in data 801 + assert "facets_merged" in data