personal memory agent
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

feat(entities): add sol call entities merge primitive

Lifts the deep-merge algorithm from apps/speakers/bootstrap.py:merge_names
into think/entities/merge.py as a slug-keyed primitive merge_entity(...),
exposed via sol call entities merge SOURCE_SLUG TARGET_SLUG. Default-safe
(--no-commit); --commit required to mutate. Reduces sol call speakers
merge-names to a name-resolving shim that preserves its existing contract
including the any-principal rejection.

Policy deltas vs. the original merge_names:
- source-principal + target-not: transfers principal to target
- both-principal: error, no mutation
- --no-keep-source-as-aka: skips adding source display name to target.aka
while still merging source's own aka list
- rejects the merge if any other entity's aka list references the source
slug or display name, listing every offending entity id

Lifts _dedupe_akas / _dedupe_emails / _dedupe_observations from
think/merge.py:_merge_entities into think/entities/merge.py so merge_journals
and merge_entity share one implementation.

Extracts voiceprint I/O (load_existing_voiceprint_keys,
save_voiceprints_batch, save_voiceprints_safely, normalize_embedding,
load_entity_voiceprints_file) into think/entities/voiceprints.py so the
entity merge primitive does not reach up into apps/speakers.

Audit-log line appended to journal/logs/entity-merges.jsonl on successful
commit only; dry-run prints a plan JSON with would_* counts and zero disk
mutation.

Updates the L2 write-owner table in docs/coding-standards.md.

+1640 -547
+28
apps/entities/call.py
··· 6 6 Auto-discovered by ``think.call`` and mounted as ``sol call entities ...``. 7 7 """ 8 8 9 + import json 9 10 import re 10 11 import shutil 11 12 from pathlib import Path ··· 436 437 """Consolidate segment-detected entities into journal identities.""" 437 438 n = consolidate_detected_entities(get_journal(), full=full) 438 439 typer.echo(f"Wrote {n} new entities.") 440 + 441 + 442 + @app.command("merge") 443 + def merge( 444 + source_slug: str = typer.Argument(help="Source entity slug to merge from."), 445 + target_slug: str = typer.Argument(help="Target entity slug to merge into."), 446 + commit: bool = typer.Option(False, "--commit/--no-commit"), 447 + keep_source_as_aka: bool = typer.Option( 448 + True, 449 + "--keep-source-as-aka/--no-keep-source-as-aka", 450 + ), 451 + ) -> None: 452 + """Plan or commit a journal-entity merge.""" 453 + from think.entities import merge_entity 454 + 455 + result = merge_entity( 456 + source_slug, 457 + target_slug, 458 + keep_source_as_aka=keep_source_as_aka, 459 + commit=commit, 460 + caller="entities.merge", 461 + ) 462 + output = json.dumps(result, indent=2, default=str) 463 + if "error" in result: 464 + typer.echo(output, err=True) 465 + raise typer.Exit(1) 466 + typer.echo(output) 439 467 440 468 441 469 @app.command("observations")
+50 -3
apps/entities/tests/conftest.py
··· 6 6 from __future__ import annotations 7 7 8 8 import json 9 + import sys 10 + from pathlib import Path 9 11 10 12 import pytest 11 13 12 - from think.entities.observations import add_observation, save_observations 14 + ROOT = Path(__file__).resolve().parents[3] 15 + if str(ROOT) not in sys.path: 16 + sys.path.insert(0, str(ROOT)) 17 + 18 + from apps.speakers.tests.conftest import speakers_env as _speakers_env 19 + from think.entities.journal import clear_journal_entity_cache 20 + from think.entities.loading import clear_entity_loading_cache 21 + from think.entities.observations import ( 22 + add_observation, 23 + clear_observation_cache, 24 + save_observations, 25 + ) 26 + from think.entities.relationships import clear_relationship_caches 13 27 from think.entities.saving import save_entities 28 + 29 + 30 + @pytest.fixture 31 + def speakers_env(tmp_path, monkeypatch): 32 + yield from _speakers_env.__wrapped__(tmp_path, monkeypatch) 14 33 15 34 16 35 @pytest.fixture ··· 25 44 # _SOLSTONE_JOURNAL_OVERRIDE is set, entity files exist 26 45 """ 27 46 monkeypatch.setenv("_SOLSTONE_JOURNAL_OVERRIDE", str(tmp_path)) 47 + clear_journal_entity_cache() 48 + clear_entity_loading_cache() 49 + clear_relationship_caches() 50 + clear_observation_cache() 51 + import think.utils 52 + 53 + think.utils._journal_path_cache = None 28 54 29 55 def _create( 30 56 attached: list[dict] | None = None, ··· 43 69 add_observation(facet, observation_entity, content, i) 44 70 return tmp_path 45 71 46 - return _create 72 + yield _create 73 + clear_journal_entity_cache() 74 + clear_entity_loading_cache() 75 + clear_relationship_caches() 76 + clear_observation_cache() 77 + import think.utils 78 + 79 + think.utils._journal_path_cache = None 47 80 48 81 49 82 @pytest.fixture 50 83 def entity_move_env(tmp_path, monkeypatch): 51 84 """Create a two-facet environment for entity move tests.""" 52 85 monkeypatch.setenv("_SOLSTONE_JOURNAL_OVERRIDE", str(tmp_path)) 86 + clear_journal_entity_cache() 87 + clear_entity_loading_cache() 88 + clear_relationship_caches() 89 + clear_observation_cache() 90 + import think.utils 91 + 92 + think.utils._journal_path_cache = None 53 93 54 94 def _create( 55 95 entity_name: str = "Alice Johnson", ··· 87 127 88 128 return tmp_path, src_facet, dst_facet, entity_name 89 129 90 - return _create 130 + yield _create 131 + clear_journal_entity_cache() 132 + clear_entity_loading_cache() 133 + clear_relationship_caches() 134 + clear_observation_cache() 135 + import think.utils 136 + 137 + think.utils._journal_path_cache = None
+435
apps/entities/tests/test_merge.py
··· 1 + # SPDX-License-Identifier: AGPL-3.0-only 2 + # Copyright (c) 2026 sol pbc 3 + 4 + """Tests for ``sol call entities merge``.""" 5 + 6 + from __future__ import annotations 7 + 8 + import json 9 + 10 + import numpy as np 11 + from typer.testing import CliRunner 12 + 13 + from apps.entities.call import app as entities_app 14 + from think.entities.journal import load_journal_entity 15 + 16 + runner = CliRunner() 17 + STREAM = "test" 18 + 19 + 20 + def _read_json(path): 21 + return json.loads(path.read_text(encoding="utf-8")) 22 + 23 + 24 + def _write_json(path, payload) -> None: 25 + path.write_text(json.dumps(payload, indent=2) + "\n", encoding="utf-8") 26 + 27 + 28 + def _entity_path(env, entity_id: str): 29 + return env.journal / "entities" / entity_id / "entity.json" 30 + 31 + 32 + def _update_entity(env, entity_id: str, **fields) -> None: 33 + path = _entity_path(env, entity_id) 34 + payload = _read_json(path) 35 + payload.update(fields) 36 + _write_json(path, payload) 37 + 38 + 39 + def _labels_path(env, day: str, segment_key: str): 40 + return env.journal / day / STREAM / segment_key / "talents" / "speaker_labels.json" 41 + 42 + 43 + def _corrections_path(env, day: str, segment_key: str): 44 + return ( 45 + env.journal 46 + / day 47 + / STREAM 48 + / segment_key 49 + / "talents" 50 + / "speaker_corrections.json" 51 + ) 52 + 53 + 54 + def _audit_log_path(env): 55 + return env.journal / "logs" / "entity-merges.jsonl" 56 + 57 + 58 + def _voiceprint_count(env, entity_id: str) -> int: 59 + path = env.journal / "entities" / entity_id / "voiceprints.npz" 60 + with np.load(path, allow_pickle=False) as data: 61 + return len(data["embeddings"]) 62 + 63 + 64 + def test_merge_dry_run_plans_without_writing(speakers_env): 65 + env = speakers_env() 66 + env.create_segment("20240101", "143022_300", ["mic_audio"]) 67 + env.create_entity( 68 + "Dry Alias", 69 + voiceprints=[ 70 + ("20240101", "143022_300", "mic_audio", 1), 71 + ("20240101", "143022_300", "mic_audio", 2), 72 + ], 73 + ) 74 + env.create_entity( 75 + "Dry Canon", 76 + voiceprints=[("20240101", "143022_300", "mic_audio", 3)], 77 + ) 78 + env.create_facet_relationship( 79 + "work", 80 + "dry_alias", 81 + observations=["Likes coffee"], 82 + ) 83 + env.create_facet_relationship( 84 + "work", 85 + "dry_canon", 86 + observations=["Senior role"], 87 + ) 88 + env.create_facet_relationship("personal", "dry_alias", description="Runner") 89 + env.create_speaker_labels( 90 + "20240101", 91 + "143022_300", 92 + [ 93 + { 94 + "sentence_id": 1, 95 + "speaker": "dry_alias", 96 + "confidence": "high", 97 + "method": "acoustic", 98 + } 99 + ], 100 + ) 101 + env.create_speaker_corrections( 102 + "20240101", 103 + "143022_300", 104 + [ 105 + { 106 + "sentence_id": 1, 107 + "original_speaker": "dry_alias", 108 + "corrected_speaker": "dry_alias", 109 + "timestamp": 1700000000000, 110 + } 111 + ], 112 + ) 113 + cache_path = env.journal / "awareness" / "discovery_clusters.json" 114 + cache_path.parent.mkdir(parents=True, exist_ok=True) 115 + cache_path.write_text('{"clusters": []}', encoding="utf-8") 116 + 117 + source_before = _entity_path(env, "dry_alias").read_text(encoding="utf-8") 118 + target_before = _entity_path(env, "dry_canon").read_text(encoding="utf-8") 119 + labels_before = _labels_path(env, "20240101", "143022_300").read_text( 120 + encoding="utf-8" 121 + ) 122 + corrections_before = _corrections_path(env, "20240101", "143022_300").read_text( 123 + encoding="utf-8" 124 + ) 125 + 126 + result = runner.invoke(entities_app, ["merge", "dry_alias", "dry_canon"]) 127 + 128 + assert result.exit_code == 0, f"{result.output}\n{result.exception!r}" 129 + data = json.loads(result.output) 130 + assert data["merged"] is False 131 + assert data["identity"]["akas_added"] == [] 132 + assert data["voiceprints"]["added"] == 0 133 + assert data["facets"]["moved"] == [] 134 + assert data["segments"]["files_scanned"] == 0 135 + assert "Dry Alias" in data["would_identity"]["akas_added"] 136 + assert data["would_voiceprints"]["added"] == 2 137 + assert data["would_facets"]["merged"] == ["work"] 138 + assert data["would_facets"]["moved"] == ["personal"] 139 + assert data["would_segments"]["labels_rewritten"] == 1 140 + assert data["would_segments"]["corrections_rewritten"] == 1 141 + assert data["audit_log_path"] is None 142 + assert data["caches_cleared"] == [] 143 + 144 + assert _entity_path(env, "dry_alias").read_text(encoding="utf-8") == source_before 145 + assert _entity_path(env, "dry_canon").read_text(encoding="utf-8") == target_before 146 + assert ( 147 + _labels_path(env, "20240101", "143022_300").read_text(encoding="utf-8") 148 + == labels_before 149 + ) 150 + assert ( 151 + _corrections_path(env, "20240101", "143022_300").read_text(encoding="utf-8") 152 + == corrections_before 153 + ) 154 + assert cache_path.exists() 155 + assert load_journal_entity("dry_alias") is not None 156 + assert not _audit_log_path(env).exists() 157 + 158 + 159 + def test_merge_commit_deep_merges_and_logs(speakers_env): 160 + env = speakers_env() 161 + env.create_segment("20240101", "143022_300", ["mic_audio"]) 162 + env.create_entity( 163 + "Alice Alias", 164 + voiceprints=[ 165 + ("20240101", "143022_300", "mic_audio", 1), 166 + ("20240101", "143022_300", "mic_audio", 2), 167 + ], 168 + ) 169 + env.create_entity( 170 + "Alice Canonical", 171 + voiceprints=[("20240101", "143022_300", "mic_audio", 3)], 172 + ) 173 + env.create_facet_relationship( 174 + "work", 175 + "alice_alias", 176 + description="Works at Acme", 177 + attached_at=1600000000000, 178 + observations=["Likes coffee", "Morning person"], 179 + ) 180 + env.create_facet_relationship( 181 + "work", 182 + "alice_canonical", 183 + description="Senior engineer", 184 + attached_at=1700000000000, 185 + observations=["Staff role"], 186 + ) 187 + env.create_facet_relationship("personal", "alice_alias", description="Hiker") 188 + env.create_speaker_labels( 189 + "20240101", 190 + "143022_300", 191 + [ 192 + { 193 + "sentence_id": 1, 194 + "speaker": "alice_alias", 195 + "confidence": "high", 196 + "method": "acoustic", 197 + }, 198 + { 199 + "sentence_id": 2, 200 + "speaker": "alice_canonical", 201 + "confidence": "high", 202 + "method": "acoustic", 203 + }, 204 + { 205 + "sentence_id": 3, 206 + "speaker": "alice_alias", 207 + "confidence": "medium", 208 + "method": "context", 209 + }, 210 + ], 211 + ) 212 + env.create_speaker_corrections( 213 + "20240101", 214 + "143022_300", 215 + [ 216 + { 217 + "sentence_id": 1, 218 + "original_speaker": "alice_alias", 219 + "corrected_speaker": "alice_alias", 220 + "timestamp": 1700000000000, 221 + }, 222 + ], 223 + ) 224 + cache_path = env.journal / "awareness" / "discovery_clusters.json" 225 + cache_path.parent.mkdir(parents=True, exist_ok=True) 226 + cache_path.write_text('{"clusters": []}', encoding="utf-8") 227 + 228 + result = runner.invoke( 229 + entities_app, 230 + ["merge", "alice_alias", "alice_canonical", "--commit"], 231 + ) 232 + 233 + assert result.exit_code == 0, f"{result.output}\n{result.exception!r}" 234 + data = json.loads(result.output) 235 + assert data["merged"] is True 236 + assert "Alice Alias" in data["identity"]["akas_added"] 237 + assert data["voiceprints"]["added"] == 2 238 + assert data["voiceprints"]["target_total"] == 3 239 + assert "work" in data["facets"]["merged"] 240 + assert "personal" in data["facets"]["moved"] 241 + assert data["facets"]["observations_appended"] == 2 242 + assert data["segments"]["labels_rewritten"] == 1 243 + assert data["segments"]["corrections_rewritten"] == 1 244 + assert data["segments"]["errors"] == [] 245 + assert data["audit_log_path"] == str(_audit_log_path(env)) 246 + assert set(data["caches_cleared"]) >= { 247 + "journal_entity_cache", 248 + "relationship_caches", 249 + "observation_cache", 250 + "entity_loading_cache", 251 + "discovery_clusters", 252 + } 253 + 254 + assert load_journal_entity("alice_alias") is None 255 + canonical = load_journal_entity("alice_canonical") 256 + assert canonical is not None 257 + assert "Alice Alias" in canonical["aka"] 258 + 259 + assert _voiceprint_count(env, "alice_canonical") == 3 260 + 261 + labels = _read_json(_labels_path(env, "20240101", "143022_300")) 262 + speakers = [label["speaker"] for label in labels["labels"]] 263 + assert "alice_alias" not in speakers 264 + assert speakers.count("alice_canonical") == 3 265 + 266 + corrections = _read_json(_corrections_path(env, "20240101", "143022_300")) 267 + for correction in corrections["corrections"]: 268 + assert correction.get("original_speaker") != "alice_alias" 269 + assert correction.get("corrected_speaker") != "alice_alias" 270 + 271 + observations_path = ( 272 + env.journal 273 + / "facets" 274 + / "work" 275 + / "entities" 276 + / "alice_canonical" 277 + / "observations.jsonl" 278 + ) 279 + contents = [ 280 + json.loads(line)["content"] 281 + for line in observations_path.read_text(encoding="utf-8").splitlines() 282 + if line.strip() 283 + ] 284 + assert set(contents) == {"Staff role", "Likes coffee", "Morning person"} 285 + assert not cache_path.exists() 286 + 287 + audit_entries = [ 288 + json.loads(line) 289 + for line in _audit_log_path(env).read_text(encoding="utf-8").splitlines() 290 + if line.strip() 291 + ] 292 + assert len(audit_entries) == 1 293 + assert isinstance(audit_entries[0]["ts"], int) 294 + assert audit_entries[0]["caller"] == "entities.merge" 295 + assert audit_entries[0]["source_id"] == "alice_alias" 296 + assert audit_entries[0]["source_display_name"] == "Alice Alias" 297 + assert audit_entries[0]["target_id"] == "alice_canonical" 298 + assert audit_entries[0]["target_display_name"] == "Alice Canonical" 299 + assert audit_entries[0]["principal_transferred"] is False 300 + assert set(audit_entries[0]["counts"]) == { 301 + "identity", 302 + "voiceprints", 303 + "facets", 304 + "segments", 305 + } 306 + 307 + 308 + def test_merge_default_keeps_source_as_aka(speakers_env): 309 + env = speakers_env() 310 + env.create_entity("Keep Alias") 311 + env.create_entity("Keep Canon") 312 + 313 + result = runner.invoke( 314 + entities_app, 315 + ["merge", "keep_alias", "keep_canon", "--commit"], 316 + ) 317 + 318 + assert result.exit_code == 0, f"{result.output}\n{result.exception!r}" 319 + canonical = load_journal_entity("keep_canon") 320 + assert canonical is not None 321 + assert "Keep Alias" in canonical["aka"] 322 + 323 + 324 + def test_merge_no_keep_source_as_aka_keeps_only_existing_aliases(speakers_env): 325 + env = speakers_env() 326 + env.create_entity("Skip Alias") 327 + env.create_entity("Skip Canon") 328 + _update_entity(env, "skip_alias", aka=["SA", "S.A."]) 329 + 330 + result = runner.invoke( 331 + entities_app, 332 + [ 333 + "merge", 334 + "skip_alias", 335 + "skip_canon", 336 + "--commit", 337 + "--no-keep-source-as-aka", 338 + ], 339 + ) 340 + 341 + assert result.exit_code == 0, f"{result.output}\n{result.exception!r}" 342 + canonical = load_journal_entity("skip_canon") 343 + assert canonical is not None 344 + assert "Skip Alias" not in canonical.get("aka", []) 345 + assert {"SA", "S.A."} <= set(canonical["aka"]) 346 + 347 + 348 + def test_merge_transfers_principal_from_source_to_target(speakers_env): 349 + env = speakers_env() 350 + env.create_entity("Principal Source", is_principal=True) 351 + env.create_entity("Principal Target") 352 + 353 + result = runner.invoke( 354 + entities_app, 355 + ["merge", "principal_source", "principal_target", "--commit"], 356 + ) 357 + 358 + assert result.exit_code == 0, f"{result.output}\n{result.exception!r}" 359 + data = json.loads(result.output) 360 + assert data["identity"]["principal_transferred"] is True 361 + assert load_journal_entity("principal_source") is None 362 + target = load_journal_entity("principal_target") 363 + assert target is not None 364 + assert target["is_principal"] is True 365 + 366 + 367 + def test_merge_errors_when_both_entities_are_principal(speakers_env): 368 + env = speakers_env() 369 + env.create_entity("First Principal", is_principal=True) 370 + env.create_entity("Second Principal", is_principal=True) 371 + 372 + result = runner.invoke( 373 + entities_app, 374 + ["merge", "first_principal", "second_principal", "--commit"], 375 + ) 376 + 377 + assert result.exit_code == 1, f"{result.output}\n{result.exception!r}" 378 + data = json.loads(result.output) 379 + assert data["error"] == "Cannot merge two principal entities." 380 + assert load_journal_entity("first_principal") is not None 381 + assert load_journal_entity("second_principal") is not None 382 + 383 + 384 + def test_merge_errors_on_aka_cross_reference(speakers_env): 385 + env = speakers_env() 386 + env.create_entity("Cross Source") 387 + env.create_entity("Cross Target") 388 + env.create_entity("Cross Watcher") 389 + _update_entity(env, "cross_watcher", aka=["cross_source", "Watcher Alias"]) 390 + 391 + result = runner.invoke( 392 + entities_app, 393 + ["merge", "cross_source", "cross_target", "--commit"], 394 + ) 395 + 396 + assert result.exit_code == 1, f"{result.output}\n{result.exception!r}" 397 + data = json.loads(result.output) 398 + assert ( 399 + data["error"] 400 + == "Cannot merge 'cross_source': referenced in aka lists of entity ids: cross_watcher" 401 + ) 402 + assert load_journal_entity("cross_source") is not None 403 + assert load_journal_entity("cross_target") is not None 404 + 405 + 406 + def test_merge_validation_errors(speakers_env): 407 + env = speakers_env() 408 + env.create_entity("Blocked Source") 409 + env.create_entity("Validation Target") 410 + _update_entity(env, "blocked_source", blocked=True) 411 + 412 + cases = [ 413 + ( 414 + ["merge", "validation_target", "validation_target", "--commit"], 415 + "Source and target must be different entities.", 416 + ), 417 + ( 418 + ["merge", "missing_source", "validation_target", "--commit"], 419 + "Source entity not found: missing_source", 420 + ), 421 + ( 422 + ["merge", "validation_target", "missing_target", "--commit"], 423 + "Target entity not found: missing_target", 424 + ), 425 + ( 426 + ["merge", "blocked_source", "validation_target", "--commit"], 427 + "Cannot merge blocked entity: blocked_source", 428 + ), 429 + ] 430 + 431 + for argv, expected_error in cases: 432 + result = runner.invoke(entities_app, argv) 433 + assert result.exit_code == 1, f"{result.output}\n{result.exception!r}" 434 + data = json.loads(result.output) 435 + assert data["error"] == expected_error
+5 -5
apps/speakers/attribution.py
··· 547 547 548 548 Returns dict mapping entity_id -> number of new embeddings saved. 549 549 """ 550 - from apps.speakers.bootstrap import ( 551 - _load_existing_voiceprint_keys, 552 - _save_voiceprints_batch, 550 + from think.entities import ( 551 + load_existing_voiceprint_keys, 552 + save_voiceprints_batch, 553 553 ) 554 554 555 555 ( ··· 612 612 613 613 # Idempotency check 614 614 if speaker not in entity_existing: 615 - entity_existing[speaker] = _load_existing_voiceprint_keys(speaker) 615 + entity_existing[speaker] = load_existing_voiceprint_keys(speaker) 616 616 vp_key = (day, segment_key, source, sid) 617 617 if vp_key in entity_existing[speaker]: 618 618 continue ··· 630 630 631 631 for eid, items in entity_new.items(): 632 632 try: 633 - count = _save_voiceprints_batch(eid, items) 633 + count = save_voiceprints_batch(eid, items) 634 634 saved_counts[eid] = count 635 635 except Exception as exc: 636 636 logger.warning("Failed to accumulate voiceprints for %s: %s", eid, exc)
+53 -393
apps/speakers/bootstrap.py
··· 24 24 import bisect 25 25 import json 26 26 import logging 27 - import shutil 28 27 from collections import defaultdict 29 28 from pathlib import Path 30 29 from typing import Any ··· 35 34 from think.entities import entity_slug, find_matching_entity, is_name_variant_match 36 35 from think.entities.journal import ( 37 36 create_journal_entity, 38 - ensure_journal_entity_memory, 39 37 load_all_journal_entities, 40 38 load_journal_entity, 41 39 save_journal_entity, 42 40 ) 43 - from think.utils import day_dirs, get_journal, iter_segments, now_ms, segment_path 41 + from think.utils import day_dirs, now_ms, segment_path 44 42 45 43 logger = logging.getLogger(__name__) 46 44 ··· 48 46 NAME_MERGE_THRESHOLD = 0.90 49 47 50 48 51 - def _routes_helpers(): 52 - """Load speakers route helpers lazily to avoid import cycles.""" 53 - from apps.speakers.routes import ( 54 - _load_embeddings_file, 55 - _load_entity_voiceprints_file, 56 - _normalize_embedding, 57 - _scan_segment_embeddings, 58 - ) 59 - 60 - return ( 61 - _load_embeddings_file, 62 - _normalize_embedding, 63 - _scan_segment_embeddings, 64 - _load_entity_voiceprints_file, 65 - ) 66 - 67 - 68 - def _load_existing_voiceprint_keys(entity_id: str) -> set[tuple]: 69 - """Load already-saved voiceprint keys for idempotency. 70 - 71 - Returns set of (day, segment_key, source, sentence_id) tuples. 72 - """ 73 - _, _, _, load_entity_voiceprints_file = _routes_helpers() 74 - 75 - result = load_entity_voiceprints_file(entity_id) 76 - if result is None: 77 - return set() 78 - 79 - _, metadata_list = result 80 - return { 81 - (m.get("day"), m.get("segment_key"), m.get("source"), m.get("sentence_id")) 82 - for m in metadata_list 83 - } 84 - 85 - 86 - def _save_voiceprints_batch( 87 - entity_id: str, 88 - new_items: list[tuple[np.ndarray, dict]], 89 - ) -> int: 90 - """Save multiple voiceprints to an entity's voiceprints.npz in one write. 91 - 92 - Args: 93 - entity_id: Entity ID (slug) 94 - new_items: List of (normalized_embedding, metadata_dict) tuples 95 - 96 - Returns: 97 - Number of embeddings saved 98 - """ 99 - if not new_items: 100 - return 0 101 - 102 - folder = ensure_journal_entity_memory(entity_id) 103 - npz_path = folder / "voiceprints.npz" 104 - 105 - # Load existing voiceprints 106 - if npz_path.exists(): 107 - try: 108 - # Use np.load with allow_pickle=False for safety, adjust if metadata requires it. 109 - with np.load(npz_path, allow_pickle=False) as data: 110 - existing_emb = data["embeddings"] 111 - # Existing metadata was likely saved as JSON strings. Deserialize them. 112 - # Assuming np.load returns an array of strings if saved as dtype=str. 113 - existing_meta_strings = data["metadata"] 114 - existing_meta_dicts = [json.loads(m) for m in existing_meta_strings] 115 - except (FileNotFoundError, ValueError, np.lib.npyio.NpzFile) as e: 116 - logger.warning( 117 - f"Failed to load existing voiceprints for {entity_id} from {npz_path}: {e}. Starting fresh." 118 - ) 119 - existing_emb = np.empty((0, 256), dtype=np.float32) 120 - existing_meta_dicts = [] 121 - except Exception as e: # Catch other potential errors during loading 122 - logger.error( 123 - f"Unexpected error loading existing voiceprints for {entity_id} from {npz_path}: {e}" 124 - ) 125 - raise 126 - else: 127 - existing_emb = np.empty((0, 256), dtype=np.float32) 128 - existing_meta_dicts = [] 129 - 130 - # Prepare new embeddings and metadata dicts 131 - new_emb_list = [] 132 - new_meta_dicts = [] 133 - for emb, meta_dict in new_items: 134 - new_emb_list.append(emb.reshape(1, -1).astype(np.float32)) 135 - new_meta_dicts.append(meta_dict) 136 - 137 - # Combine existing and new data 138 - if new_emb_list: 139 - new_emb_np = np.vstack(new_emb_list) 140 - combined_emb = ( 141 - np.vstack([existing_emb, new_emb_np]) 142 - if len(existing_emb) > 0 143 - else new_emb_np 144 - ) 145 - # Combine the metadata dictionaries 146 - combined_meta_dicts = existing_meta_dicts + new_meta_dicts 147 - else: # Should not happen if new_items is not empty, but for safety 148 - combined_emb = existing_emb 149 - combined_meta_dicts = existing_meta_dicts 150 - 151 - # Use the new safe saving utility 152 - try: 153 - # Import the utility function 154 - from apps.speakers.voiceprint_io import save_voiceprints_safely 155 - 156 - save_voiceprints_safely( 157 - npz_path=npz_path, 158 - embeddings=combined_emb, 159 - metadata=combined_meta_dicts, # Pass metadata as a list of dicts 160 - ) 161 - return len(new_items) 162 - except Exception as e: 163 - logger.error(f"Failed to safely save voiceprints for {entity_id}: {e}") 164 - # The save_voiceprints_safely function already logs critical errors and re-raises. 165 - # We re-raise here to propagate the failure. 166 - raise 167 - 168 - 169 49 def bootstrap_voiceprints(dry_run: bool = False) -> dict[str, Any]: 170 50 """Bootstrap voiceprints from 1-listed-speaker segments across the full journal. 171 51 ··· 186 66 Returns: 187 67 Dict with statistics about the bootstrap run 188 68 """ 189 - ( 190 - load_embeddings_file, 69 + from apps.speakers.routes import _load_embeddings_file, _scan_segment_embeddings 70 + from think.entities import ( 71 + load_existing_voiceprint_keys, 191 72 normalize_embedding, 192 - scan_segment_embeddings, 193 - _, 194 - ) = _routes_helpers() 73 + save_voiceprints_batch, 74 + ) 75 + 76 + load_embeddings_file = _load_embeddings_file 77 + scan_segment_embeddings = _scan_segment_embeddings 195 78 196 79 # Load owner centroid — required for owner subtraction 197 80 centroid_data = load_owner_centroid() ··· 262 145 263 146 # Load existing voiceprint keys for idempotency (once per entity) 264 147 if entity_id not in entity_existing: 265 - entity_existing[entity_id] = _load_existing_voiceprint_keys(entity_id) 148 + entity_existing[entity_id] = load_existing_voiceprint_keys(entity_id) 266 149 267 150 existing_keys = entity_existing[entity_id] 268 151 seg_dir = segment_path(day, seg_key, stream) ··· 320 203 if not dry_run: 321 204 for entity_id, emb_list in entity_embeddings.items(): 322 205 try: 323 - saved = _save_voiceprints_batch(entity_id, emb_list) 206 + saved = save_voiceprints_batch(entity_id, emb_list) 324 207 stats["embeddings_saved"] += saved 325 208 except Exception as e: 326 209 name = entity_names.get(entity_id, entity_id) ··· 333 216 334 217 335 218 def merge_names(alias_name: str, canonical_name: str) -> dict[str, Any]: 336 - """Deep merge a speaker entity into a canonical entity. 337 - 338 - Performs a phased deep merge: identity data, voiceprints, facet 339 - relationships, speaker references, then deletes the alias entity. 340 - Designed for interrupt safety — delete-last ordering ensures the 341 - system is never in an unrecoverable state. Every phase is idempotent. 342 - 343 - Args: 344 - alias_name: The alias/variant name to merge from 345 - canonical_name: The canonical/full name to merge into 346 - 347 - Returns: 348 - Dict with merge statistics or error 349 - """ 350 - _, normalize_embedding, _, load_entity_voiceprints_file = _routes_helpers() 219 + """Deep merge a speaker entity into a canonical entity.""" 220 + from think.entities import merge_entity 351 221 352 222 journal_entities = load_all_journal_entities() 353 223 entities_list = list(journal_entities.values()) 354 224 355 - # --- Phase 0: Resolve and validate --- 356 225 alias_entity = find_matching_entity(alias_name, entities_list) 357 226 if not alias_entity: 358 227 return {"error": f"No entity found for alias: {alias_name}"} ··· 377 246 378 247 if alias.get("is_principal") or canonical.get("is_principal"): 379 248 return {"error": "Cannot merge the principal entity."} 380 - if alias.get("blocked"): 381 - return {"error": f"Cannot merge blocked entity: {alias_id}"} 382 - if canonical.get("blocked"): 383 - return {"error": f"Cannot merge blocked entity: {canonical_id}"} 384 - 385 - alias_display = alias.get("name", alias_name) 386 - 387 - # Set merged_into resume marker on alias 388 - alias["merged_into"] = canonical_id 389 - alias["updated_at"] = now_ms() 390 - save_journal_entity(alias) 391 - 392 - # --- Phase 1: Merge identity data --- 393 - akas_added: list[str] = [] 394 - existing_aka = set(canonical.get("aka", [])) 395 - canonical_name_val = canonical.get("name", "") 396 - 397 - # Add alias display name as aka 398 - if alias_display not in existing_aka and alias_display != canonical_name_val: 399 - existing_aka.add(alias_display) 400 - akas_added.append(alias_display) 401 - 402 - # Merge alias's akas 403 - for aka in alias.get("aka", []): 404 - if aka not in existing_aka and aka != canonical_name_val: 405 - existing_aka.add(aka) 406 - akas_added.append(aka) 407 - 408 - canonical["aka"] = sorted(existing_aka) 409 - 410 - # Merge emails 411 - canonical_emails = {e.lower() for e in canonical.get("emails", [])} 412 - for email in alias.get("emails", []): 413 - canonical_emails.add(email.lower()) 414 - if canonical_emails: 415 - canonical["emails"] = sorted(canonical_emails) 416 - 417 - canonical["updated_at"] = now_ms() 418 - save_journal_entity(canonical) 419 - 420 - # --- Phase 2: Merge voiceprints --- 421 - alias_vp = load_entity_voiceprints_file(alias_id) 422 - voiceprints_merged = 0 423 - 424 - if alias_vp is not None: 425 - alias_embeddings, alias_metadata = alias_vp 426 - existing_keys = _load_existing_voiceprint_keys(canonical_id) 427 - 428 - new_items: list[tuple[np.ndarray, dict]] = [] 429 - for emb, meta in zip(alias_embeddings, alias_metadata): 430 - key = ( 431 - meta.get("day"), 432 - meta.get("segment_key"), 433 - meta.get("source"), 434 - meta.get("sentence_id"), 435 - ) 436 - if key in existing_keys: 437 - continue 438 - normalized = normalize_embedding(emb) 439 - if normalized is not None: 440 - new_items.append((normalized, meta)) 441 - existing_keys.add(key) 442 - 443 - if new_items: 444 - voiceprints_merged = _save_voiceprints_batch(canonical_id, new_items) 445 - 446 - canonical_vp = load_entity_voiceprints_file(canonical_id) 447 - voiceprints_total = len(canonical_vp[0]) if canonical_vp else 0 448 - 449 - # --- Phase 3: Merge facet relationships --- 450 - facets_merged: list[str] = [] 451 - facets_moved: list[str] = [] 452 - journal = get_journal() 453 - facets_dir = Path(journal) / "facets" 454 - 455 - if facets_dir.exists(): 456 - for facet_entry in sorted(facets_dir.iterdir()): 457 - if not facet_entry.is_dir(): 458 - continue 459 - facet_name = facet_entry.name 460 - alias_rel_dir = facet_entry / "entities" / alias_id 461 - alias_rel_path = alias_rel_dir / "entity.json" 462 - if not alias_rel_path.is_file(): 463 - continue 464 - 465 - canonical_rel_dir = facet_entry / "entities" / canonical_id 466 - canonical_rel_path = canonical_rel_dir / "entity.json" 467 - 468 - if not canonical_rel_path.is_file(): 469 - # Move: rename alias relationship dir to canonical 470 - if canonical_rel_dir.exists(): 471 - shutil.rmtree(canonical_rel_dir) 472 - alias_rel_dir.rename(canonical_rel_dir) 473 - # Update entity_id inside the moved entity.json 474 - moved_path = canonical_rel_dir / "entity.json" 475 - try: 476 - with open(moved_path, encoding="utf-8") as f: 477 - rel_data = json.load(f) 478 - rel_data["entity_id"] = canonical_id 479 - tmp = moved_path.with_suffix(".tmp") 480 - with open(tmp, "w", encoding="utf-8") as f: 481 - json.dump(rel_data, f, ensure_ascii=False, indent=2) 482 - f.write("\n") 483 - tmp.rename(moved_path) 484 - except (json.JSONDecodeError, OSError): 485 - pass 486 - facets_moved.append(facet_name) 487 - else: 488 - # Both have relationships: merge timestamps and data 489 - try: 490 - with open(alias_rel_path, encoding="utf-8") as f: 491 - alias_rel = json.load(f) 492 - with open(canonical_rel_path, encoding="utf-8") as f: 493 - canonical_rel = json.load(f) 494 - except (json.JSONDecodeError, OSError): 495 - continue 496 - 497 - # Merge timestamps: earliest attached_at 498 - alias_attached = alias_rel.get("attached_at") 499 - canonical_attached = canonical_rel.get("attached_at") 500 - if alias_attached and ( 501 - not canonical_attached or alias_attached < canonical_attached 502 - ): 503 - canonical_rel["attached_at"] = alias_attached 504 - 505 - # Latest updated_at and last_seen 506 - for ts_field in ("updated_at", "last_seen"): 507 - alias_ts = alias_rel.get(ts_field) 508 - canonical_ts = canonical_rel.get(ts_field) 509 - if alias_ts and (not canonical_ts or alias_ts > canonical_ts): 510 - canonical_rel[ts_field] = alias_ts 511 - 512 - # Merge description: keep canonical's if non-empty 513 - if not canonical_rel.get("description") and alias_rel.get( 514 - "description" 515 - ): 516 - canonical_rel["description"] = alias_rel["description"] 249 + result = merge_entity( 250 + alias_id, 251 + canonical_id, 252 + keep_source_as_aka=True, 253 + commit=True, 254 + caller="speakers.merge_names", 255 + ) 256 + if "error" in result: 257 + return result 517 258 518 - # Save merged relationship (atomic write) 519 - canonical_rel["entity_id"] = canonical_id 520 - content = json.dumps(canonical_rel, ensure_ascii=False, indent=2) + "\n" 521 - tmp = canonical_rel_path.with_suffix(".tmp") 522 - with open(tmp, "w", encoding="utf-8") as f: 523 - f.write(content) 524 - tmp.rename(canonical_rel_path) 525 - 526 - # Merge observations: append alias's to canonical's 527 - alias_obs_path = alias_rel_dir / "observations.jsonl" 528 - if alias_obs_path.exists(): 529 - alias_obs = alias_obs_path.read_text(encoding="utf-8") 530 - if alias_obs.strip(): 531 - canonical_obs_path = canonical_rel_dir / "observations.jsonl" 532 - existing_obs = "" 533 - if canonical_obs_path.exists(): 534 - existing_obs = canonical_obs_path.read_text( 535 - encoding="utf-8" 536 - ) 537 - with open(canonical_obs_path, "a", encoding="utf-8") as f: 538 - if existing_obs and not existing_obs.endswith("\n"): 539 - f.write("\n") 540 - f.write(alias_obs) 541 - if not alias_obs.endswith("\n"): 542 - f.write("\n") 543 - 544 - # Delete alias relationship directory 545 - shutil.rmtree(alias_rel_dir) 546 - facets_merged.append(facet_name) 547 - 548 - # --- Phase 4: Rewrite speaker references --- 549 - segments_scanned = 0 550 - labels_rewritten = 0 551 - corrections_rewritten = 0 552 - errors: list[str] = [] 553 - alias_id_bytes = alias_id.encode("utf-8") 554 - 555 - for day in sorted(day_dirs().keys()): 556 - for _stream, _seg_key, seg_path in iter_segments(day): 557 - segments_scanned += 1 558 - agents_dir = seg_path / "talents" 559 - 560 - # Rewrite speaker_labels.json 561 - labels_path = agents_dir / "speaker_labels.json" 562 - if labels_path.is_file(): 563 - try: 564 - raw = labels_path.read_bytes() 565 - if alias_id_bytes in raw: 566 - data = json.loads(raw) 567 - changed = False 568 - for label in data.get("labels", []): 569 - if label.get("speaker") == alias_id: 570 - label["speaker"] = canonical_id 571 - changed = True 572 - if changed: 573 - tmp = labels_path.with_suffix(".tmp") 574 - with open(tmp, "w", encoding="utf-8") as f: 575 - json.dump(data, f, indent=2) 576 - tmp.rename(labels_path) 577 - labels_rewritten += 1 578 - except Exception as e: 579 - errors.append(f"{labels_path}: {e}") 580 - 581 - # Rewrite speaker_corrections.json 582 - corrections_path = agents_dir / "speaker_corrections.json" 583 - if corrections_path.is_file(): 584 - try: 585 - raw = corrections_path.read_bytes() 586 - if alias_id_bytes in raw: 587 - data = json.loads(raw) 588 - changed = False 589 - for correction in data.get("corrections", []): 590 - if correction.get("original_speaker") == alias_id: 591 - correction["original_speaker"] = canonical_id 592 - changed = True 593 - if correction.get("corrected_speaker") == alias_id: 594 - correction["corrected_speaker"] = canonical_id 595 - changed = True 596 - if changed: 597 - tmp = corrections_path.with_suffix(".tmp") 598 - with open(tmp, "w", encoding="utf-8") as f: 599 - json.dump(data, f, indent=2) 600 - tmp.rename(corrections_path) 601 - corrections_rewritten += 1 602 - except Exception as e: 603 - errors.append(f"{corrections_path}: {e}") 604 - 605 - # --- Phase 5: Cleanup --- 606 - alias_entity_dir = Path(journal) / "entities" / alias_id 607 - if alias_entity_dir.exists(): 608 - shutil.rmtree(alias_entity_dir) 609 - 610 - discovery_cache = Path(journal) / "awareness" / "discovery_clusters.json" 611 - if discovery_cache.exists(): 612 - discovery_cache.unlink() 259 + errors = [] 260 + for error in result["segments"]["errors"]: 261 + path = error.get("path") 262 + message = error.get("message", "") 263 + if path: 264 + errors.append(f"{path}: {message}") 265 + else: 266 + errors.append(str(message)) 613 267 614 268 return { 615 269 "merged": True, 616 - "alias": alias_display, 270 + "alias": alias.get("name", alias_name), 617 271 "alias_id": alias_id, 618 272 "canonical_name": canonical.get("name", canonical_name), 619 273 "canonical_id": canonical_id, 620 - "akas_added": akas_added, 621 - "voiceprints_merged": voiceprints_merged, 622 - "voiceprints_total": voiceprints_total, 623 - "facets_merged": facets_merged, 624 - "facets_moved": facets_moved, 625 - "segments_scanned": segments_scanned, 626 - "labels_rewritten": labels_rewritten, 627 - "corrections_rewritten": corrections_rewritten, 274 + "akas_added": result["identity"]["akas_added"], 275 + "voiceprints_merged": result["voiceprints"]["added"], 276 + "voiceprints_total": result["voiceprints"]["target_total"], 277 + "facets_merged": result["facets"]["merged"], 278 + "facets_moved": result["facets"]["moved"], 279 + "segments_scanned": result["segments"]["files_scanned"], 280 + "labels_rewritten": result["segments"]["labels_rewritten"], 281 + "corrections_rewritten": result["segments"]["corrections_rewritten"], 628 282 "errors": errors, 629 283 } 630 284 ··· 649 303 Returns: 650 304 Dict with merge statistics 651 305 """ 652 - _, normalize_embedding, _, load_entity_voiceprints_file = _routes_helpers() 306 + from think.entities import ( 307 + load_entity_voiceprints_file, 308 + normalize_embedding, 309 + ) 653 310 654 311 journal_entities = load_all_journal_entities() 655 312 ··· 909 566 910 567 def seed_from_imports(dry_run: bool = False) -> dict[str, Any]: 911 568 """Seed voiceprints from import segments with speaker-attributed transcripts.""" 912 - ( 913 - load_embeddings_file, 569 + from apps.speakers.routes import _load_embeddings_file, _scan_segment_embeddings 570 + from think.entities import ( 571 + load_existing_voiceprint_keys, 914 572 normalize_embedding, 915 - scan_segment_embeddings, 916 - _, 917 - ) = _routes_helpers() 573 + save_voiceprints_batch, 574 + ) 575 + 576 + load_embeddings_file = _load_embeddings_file 577 + scan_segment_embeddings = _scan_segment_embeddings 918 578 919 579 centroid_data = load_owner_centroid() 920 580 if centroid_data is None: ··· 1015 675 stats["speakers_found"].setdefault(entity_name, 0) 1016 676 1017 677 if entity_id not in entity_existing: 1018 - entity_existing[entity_id] = _load_existing_voiceprint_keys( 678 + entity_existing[entity_id] = load_existing_voiceprint_keys( 1019 679 entity_id 1020 680 ) 1021 681 ··· 1060 720 if not dry_run: 1061 721 for entity_id, emb_list in entity_embeddings.items(): 1062 722 try: 1063 - saved = _save_voiceprints_batch(entity_id, emb_list) 723 + saved = save_voiceprints_batch(entity_id, emb_list) 1064 724 stats["embeddings_saved"] += saved 1065 725 except Exception as e: 1066 726 stats["errors"].append(f"Failed to save for {entity_id}: {e}")
+5 -11
apps/speakers/discovery.py
··· 48 48 ) 49 49 50 50 51 - def _bootstrap_helpers(): 52 - """Load bootstrap helpers lazily to avoid import cycles.""" 53 - from apps.speakers.bootstrap import ( 54 - _load_existing_voiceprint_keys, 55 - _save_voiceprints_batch, 56 - ) 57 - 58 - return _load_existing_voiceprint_keys, _save_voiceprints_batch 59 - 60 - 61 51 def _owner_helpers(): 62 52 """Load owner helpers lazily to avoid import cycles.""" 63 53 from apps.speakers.owner import load_owner_centroid ··· 315 305 cluster_id: int, name: str, entity_id: str | None = None 316 306 ) -> dict[str, Any]: 317 307 """Identify a discovered unknown speaker cluster.""" 308 + from think.entities import ( 309 + load_existing_voiceprint_keys, 310 + save_voiceprints_batch, 311 + ) 312 + 318 313 ( 319 314 load_embeddings_file, 320 315 load_speaker_labels, ··· 324 319 append_speaker_correction, 325 320 check_owner_contamination, 326 321 ) = _routes_helpers() 327 - load_existing_voiceprint_keys, save_voiceprints_batch = _bootstrap_helpers() 328 322 329 323 cache_path = _discovery_cache_path() 330 324 if not cache_path.exists():
+5 -41
apps/speakers/routes.py
··· 68 68 69 69 70 70 def _normalize_embedding(emb: np.ndarray) -> np.ndarray | None: 71 - """L2-normalize an embedding vector. Returns None if norm is zero.""" 72 - emb = emb.astype(np.float32) 73 - norm = np.linalg.norm(emb) 74 - if norm > 0: 75 - return emb / norm 76 - return None 71 + from think.entities import normalize_embedding 72 + 73 + return normalize_embedding(emb) 77 74 78 75 79 76 def _parse_time_to_seconds(time_str: str) -> int: ··· 140 137 def _load_entity_voiceprints_file( 141 138 entity_id: str, 142 139 ) -> tuple[np.ndarray, list[dict]] | None: 143 - """Load voiceprints for an entity from journal-level voiceprints.npz. 144 - 145 - Voiceprints are stored at the journal level (entities/<id>/voiceprints.npz) 146 - since a person's voice is the same across all facets. 147 - 148 - Args: 149 - entity_id: Entity ID (slug) 140 + from think.entities import load_entity_voiceprints_file 150 141 151 - Returns: 152 - Tuple of (embeddings, metadata_list) or None if not found. 153 - - embeddings: (N, 256) float32 array 154 - - metadata_list: List of dicts parsed from JSON metadata strings 155 - """ 156 - try: 157 - folder = journal_entity_memory_path(entity_id) 158 - except (RuntimeError, ValueError): 159 - return None 160 - 161 - npz_path = folder / "voiceprints.npz" 162 - if not npz_path.exists(): 163 - return None 164 - 165 - try: 166 - data = np.load(npz_path, allow_pickle=False) 167 - embeddings = data.get("embeddings") 168 - metadata_arr = data.get("metadata") 169 - 170 - if embeddings is None or metadata_arr is None: 171 - return None 172 - 173 - # Parse JSON metadata strings 174 - metadata_list = [json.loads(m) for m in metadata_arr] 175 - return embeddings, metadata_list 176 - except Exception as e: 177 - logger.warning("Failed to load voiceprints for entity %s: %s", entity_id, e) 178 - return None 142 + return load_entity_voiceprints_file(entity_id) 179 143 180 144 181 145 def _save_voiceprint(
+115 -55
apps/speakers/tests/conftest.py
··· 6 6 from __future__ import annotations 7 7 8 8 import json 9 + import sys 9 10 from pathlib import Path 10 11 11 12 import numpy as np 12 13 import pytest 14 + 15 + ROOT = Path(__file__).resolve().parents[3] 16 + if str(ROOT) not in sys.path: 17 + sys.path.insert(0, str(ROOT)) 13 18 14 19 from think.entities import entity_slug 20 + from think.entities.journal import clear_journal_entity_cache 21 + from think.entities.loading import clear_entity_loading_cache 22 + from think.entities.observations import clear_observation_cache 23 + from think.entities.relationships import clear_relationship_caches 15 24 16 25 # Default stream name for test fixtures 17 26 STREAM = "test" ··· 43 52 def __init__(self, journal_path: Path): 44 53 self.journal = journal_path 45 54 monkeypatch.setenv("_SOLSTONE_JOURNAL_OVERRIDE", str(journal_path)) 55 + monkeypatch.setenv("SOL_SKIP_SUPERVISOR_CHECK", "1") 56 + clear_journal_entity_cache() 57 + clear_entity_loading_cache() 58 + clear_relationship_caches() 59 + clear_observation_cache() 60 + import think.utils 61 + 62 + think.utils._journal_path_cache = None 63 + 64 + def _segment_dirs( 65 + self, 66 + day: str, 67 + segment_key: str, 68 + *, 69 + stream: str | None = None, 70 + ) -> tuple[Path, Path]: 71 + stream_name = stream or STREAM 72 + chronicle_day = self.journal / "chronicle" / day 73 + chronicle_day.mkdir(parents=True, exist_ok=True) 74 + flat_day = self.journal / day 75 + if not flat_day.exists(): 76 + flat_day.symlink_to(chronicle_day, target_is_directory=True) 77 + flat_dir = flat_day / stream_name / segment_key 78 + chronicle_dir = chronicle_day / stream_name / segment_key 79 + chronicle_dir.mkdir(parents=True, exist_ok=True) 80 + return flat_dir, chronicle_dir 46 81 47 82 def create_segment( 48 83 self, ··· 64 99 sources: List of audio sources (e.g., ["mic_audio", "sys_audio"]) 65 100 num_sentences: Number of sentences to create 66 101 """ 67 - segment_dir = self.journal / day / (stream or STREAM) / segment_key 68 - segment_dir.mkdir(parents=True, exist_ok=True) 102 + flat_dir, chronicle_dir = self._segment_dirs( 103 + day, 104 + segment_key, 105 + stream=stream, 106 + ) 69 107 70 108 sentence_count = ( 71 109 embeddings.shape[0] if embeddings is not None else num_sentences 72 110 ) 73 111 74 112 for source in sources: 75 - # Create JSONL transcript 76 - jsonl_path = segment_dir / f"{source}.jsonl" 77 113 lines = [json.dumps({"raw": f"{source}.flac", "model": "medium.en"})] 78 114 79 115 # Parse segment_key to get base time (e.g., "143022_300" -> 14:30:22) ··· 98 134 } 99 135 ) 100 136 ) 101 - jsonl_path.write_text("\n".join(lines) + "\n") 137 + for segment_dir in (flat_dir, chronicle_dir): 138 + (segment_dir / f"{source}.jsonl").write_text( 139 + "\n".join(lines) + "\n" 140 + ) 102 141 103 142 # Create NPZ embeddings 104 - npz_path = segment_dir / f"{source}.npz" 105 143 if embeddings is None: 106 144 source_embeddings = np.random.randn(sentence_count, 256).astype( 107 145 np.float32 ··· 111 149 else: 112 150 source_embeddings = embeddings.astype(np.float32) 113 151 statement_ids = np.arange(1, sentence_count + 1, dtype=np.int32) 114 - np.savez_compressed( 115 - npz_path, 116 - embeddings=source_embeddings, 117 - statement_ids=statement_ids, 118 - ) 119 - 120 - # Create dummy audio file 121 - audio_path = segment_dir / f"{source}.flac" 122 - audio_path.write_bytes(b"") # Empty placeholder 152 + for segment_dir in (flat_dir, chronicle_dir): 153 + np.savez_compressed( 154 + segment_dir / f"{source}.npz", 155 + embeddings=source_embeddings, 156 + statement_ids=statement_ids, 157 + ) 158 + (segment_dir / f"{source}.flac").write_bytes(b"") 123 159 124 - return segment_dir 160 + return flat_dir 125 161 126 162 def create_embedding(self, vector: list[float] | None = None) -> np.ndarray: 127 163 """Create a normalized 256-dim embedding.""" ··· 194 230 segment_key: Segment key (HHMMSS_LEN) 195 231 speakers: List of speaker names 196 232 """ 197 - agents_dir = self.journal / day / STREAM / segment_key / "talents" 198 - agents_dir.mkdir(parents=True, exist_ok=True) 233 + flat_dir, chronicle_dir = self._segment_dirs(day, segment_key) 234 + paths = [] 235 + for segment_dir in (flat_dir, chronicle_dir): 236 + agents_dir = segment_dir / "talents" 237 + agents_dir.mkdir(parents=True, exist_ok=True) 238 + speakers_path = agents_dir / "speakers.json" 239 + with open(speakers_path, "w", encoding="utf-8") as f: 240 + json.dump(speakers, f) 241 + paths.append(speakers_path) 199 242 200 - speakers_path = agents_dir / "speakers.json" 201 - with open(speakers_path, "w", encoding="utf-8") as f: 202 - json.dump(speakers, f) 203 - 204 - return speakers_path 243 + return paths[0] 205 244 206 245 def create_speaker_labels( 207 246 self, ··· 220 259 metadata: Optional extra metadata (owner_centroid_version, 221 260 voiceprint_versions) 222 261 """ 223 - agents_dir = self.journal / day / STREAM / segment_key / "talents" 224 - agents_dir.mkdir(parents=True, exist_ok=True) 225 - 226 262 data = {"labels": labels} 227 263 if metadata: 228 264 data.update(metadata) 229 265 else: 230 266 data["owner_centroid_version"] = None 231 267 data["voiceprint_versions"] = {} 268 + flat_dir, chronicle_dir = self._segment_dirs(day, segment_key) 269 + paths = [] 270 + for segment_dir in (flat_dir, chronicle_dir): 271 + agents_dir = segment_dir / "talents" 272 + agents_dir.mkdir(parents=True, exist_ok=True) 273 + labels_path = agents_dir / "speaker_labels.json" 274 + with open(labels_path, "w", encoding="utf-8") as f: 275 + json.dump(data, f) 276 + paths.append(labels_path) 232 277 233 - labels_path = agents_dir / "speaker_labels.json" 234 - with open(labels_path, "w", encoding="utf-8") as f: 235 - json.dump(data, f) 236 - 237 - return labels_path 278 + return paths[0] 238 279 239 280 def create_speaker_corrections( 240 281 self, ··· 253 294 original_speaker, corrected_speaker, timestamp 254 295 stream: Optional stream name (defaults to STREAM) 255 296 """ 256 - agents_dir = ( 257 - self.journal / day / (stream or STREAM) / segment_key / "talents" 297 + data = {"corrections": corrections} 298 + flat_dir, chronicle_dir = self._segment_dirs( 299 + day, 300 + segment_key, 301 + stream=stream, 258 302 ) 259 - agents_dir.mkdir(parents=True, exist_ok=True) 260 - 261 - data = {"corrections": corrections} 262 - corrections_path = agents_dir / "speaker_corrections.json" 263 - with open(corrections_path, "w", encoding="utf-8") as f: 264 - json.dump(data, f) 303 + paths = [] 304 + for segment_dir in (flat_dir, chronicle_dir): 305 + agents_dir = segment_dir / "talents" 306 + agents_dir.mkdir(parents=True, exist_ok=True) 307 + corrections_path = agents_dir / "speaker_corrections.json" 308 + with open(corrections_path, "w", encoding="utf-8") as f: 309 + json.dump(data, f) 310 + paths.append(corrections_path) 265 311 266 - return corrections_path 312 + return paths[0] 267 313 268 314 def create_facet_relationship( 269 315 self, ··· 336 382 stream: Import stream name (default: import.granola) 337 383 embeddings: Optional pre-built embeddings array (num_sentences x 256) 338 384 """ 339 - segment_dir = self.journal / day / stream / segment_key 340 - segment_dir.mkdir(parents=True, exist_ok=True) 385 + flat_dir, chronicle_dir = self._segment_dirs( 386 + day, 387 + segment_key, 388 + stream=stream, 389 + ) 341 390 342 391 num_sentences = len(speakers) 343 392 ··· 366 415 } 367 416 ) 368 417 ) 369 - ct_path = segment_dir / "conversation_transcript.jsonl" 370 - ct_path.write_text("\n".join(ct_lines) + "\n") 418 + for segment_dir in (flat_dir, chronicle_dir): 419 + (segment_dir / "conversation_transcript.jsonl").write_text( 420 + "\n".join(ct_lines) + "\n" 421 + ) 371 422 372 423 audio_lines = [ 373 424 json.dumps({"raw": "imported_audio.flac", "model": "medium.en"}) ··· 386 437 } 387 438 ) 388 439 ) 389 - audio_jsonl_path = segment_dir / "imported_audio.jsonl" 390 - audio_jsonl_path.write_text("\n".join(audio_lines) + "\n") 440 + for segment_dir in (flat_dir, chronicle_dir): 441 + (segment_dir / "imported_audio.jsonl").write_text( 442 + "\n".join(audio_lines) + "\n" 443 + ) 391 444 392 445 if embeddings is None: 393 446 source_embeddings = np.random.randn(num_sentences, 256).astype( ··· 398 451 else: 399 452 source_embeddings = embeddings.astype(np.float32) 400 453 statement_ids = np.arange(1, num_sentences + 1, dtype=np.int32) 401 - np.savez_compressed( 402 - segment_dir / "imported_audio.npz", 403 - embeddings=source_embeddings, 404 - statement_ids=statement_ids, 405 - ) 454 + for segment_dir in (flat_dir, chronicle_dir): 455 + np.savez_compressed( 456 + segment_dir / "imported_audio.npz", 457 + embeddings=source_embeddings, 458 + statement_ids=statement_ids, 459 + ) 460 + (segment_dir / "imported_audio.flac").write_bytes(b"") 406 461 407 - (segment_dir / "imported_audio.flac").write_bytes(b"") 408 - 409 - return segment_dir 462 + return flat_dir 410 463 411 464 def _create(): 412 465 return SpeakersEnv(tmp_path) 413 466 414 - return _create 467 + yield _create 468 + clear_journal_entity_cache() 469 + clear_entity_loading_cache() 470 + clear_relationship_caches() 471 + clear_observation_cache() 472 + import think.utils 473 + 474 + think.utils._journal_path_cache = None
+1 -1
docs/coding-standards.md
··· 67 67 68 68 | Domain | Write-owning module(s) | 69 69 |--------|------------------------| 70 - | Entities (`entities/*/entity.json`, `entities/*/*.npz`) | `think/entities/journal.py` + `think/entities/consolidation.py` + `think/entities/saving.py` + `apps/entities/call.py` | 70 + | Entities (`entities/*/entity.json`, `entities/*/*.npz`) | `think/entities/journal.py` + `think/entities/consolidation.py` + `think/entities/saving.py` + `think/entities/merge.py` + `apps/entities/call.py` | 71 71 | Facets (`facets/*/facet.json`, `facets/*/relationships/`) | `think/facets.py` + `apps/facets/*` (if/when created) | 72 72 | Observations (`observations.jsonl`) | `think/entities/observations.py` | 73 73 | Activities (`facets/*/activities/*.jsonl`) | `think/activities.py` |
+16
think/entities/__init__.py
··· 84 84 resolve_entity, 85 85 validate_aka_uniqueness, 86 86 ) 87 + from think.entities.merge import merge_entity 87 88 88 89 # Observations 89 90 from think.entities.observations import ( ··· 110 111 save_entities, 111 112 update_detected_entity, 112 113 ) 114 + from think.entities.voiceprints import ( 115 + load_entity_voiceprints_file, 116 + load_existing_voiceprint_keys, 117 + normalize_embedding, 118 + save_voiceprints_batch, 119 + save_voiceprints_safely, 120 + voiceprint_file_path, 121 + ) 113 122 114 123 __all__ = [ 115 124 # Core ··· 150 159 "load_entities", 151 160 "load_entity_names", 152 161 "load_recent_entity_names", 162 + "merge_entity", 153 163 "parse_entity_file", 154 164 # Saving 155 165 "save_detected_entity", 156 166 "save_entities", 167 + "save_voiceprints_batch", 168 + "save_voiceprints_safely", 157 169 "update_detected_entity", 170 + "voiceprint_file_path", 158 171 # Matching 159 172 "MatchResult", 160 173 "MatchTier", ··· 174 187 "load_observations", 175 188 "observations_file_path", 176 189 "save_observations", 190 + "load_entity_voiceprints_file", 191 + "load_existing_voiceprint_keys", 192 + "normalize_embedding", 177 193 # Formatting 178 194 "format_entities", 179 195 "format_observations",
+726
think/entities/merge.py
··· 1 + # SPDX-License-Identifier: AGPL-3.0-only 2 + # Copyright (c) 2026 sol pbc 3 + 4 + """Journal-entity merge primitive.""" 5 + 6 + from __future__ import annotations 7 + 8 + import json 9 + import shutil 10 + from pathlib import Path 11 + from typing import Any 12 + 13 + from think.entities.journal import ( 14 + clear_journal_entity_cache, 15 + load_journal_entity, 16 + save_journal_entity, 17 + scan_journal_entities, 18 + ) 19 + from think.entities.loading import clear_entity_loading_cache 20 + from think.entities.observations import clear_observation_cache, save_observations 21 + from think.entities.relationships import ( 22 + clear_relationship_caches, 23 + save_facet_relationship, 24 + ) 25 + from think.entities.voiceprints import ( 26 + load_entity_voiceprints_file, 27 + load_existing_voiceprint_keys, 28 + normalize_embedding, 29 + save_voiceprints_batch, 30 + ) 31 + from think.utils import day_dirs, get_journal, iter_segments, now_ms 32 + 33 + 34 + def _dedupe_akas(target_values: list[Any], source_values: list[Any]) -> list[str]: 35 + """Case-insensitive aka dedup, preserving first-seen spelling.""" 36 + aka_by_lower: dict[str, str] = {} 37 + for values in (target_values, source_values): 38 + if not isinstance(values, list): 39 + continue 40 + for value in values: 41 + if not value: 42 + continue 43 + key = str(value).lower() 44 + if key not in aka_by_lower: 45 + aka_by_lower[key] = str(value) 46 + return sorted(aka_by_lower.values(), key=str.lower) 47 + 48 + 49 + def _dedupe_emails(target_values: list[Any], source_values: list[Any]) -> list[str]: 50 + """Case-insensitive email dedup, preserving first-seen order/spelling.""" 51 + merged_emails: list[str] = [] 52 + seen_emails: set[str] = set() 53 + for values in (target_values, source_values): 54 + if not isinstance(values, list): 55 + continue 56 + for value in values: 57 + if not value: 58 + continue 59 + email = str(value) 60 + key = email.lower() 61 + if key in seen_emails: 62 + continue 63 + seen_emails.add(key) 64 + merged_emails.append(email) 65 + return merged_emails 66 + 67 + 68 + def _dedupe_observations( 69 + source_observations: list[dict[str, Any]], 70 + target_observations: list[dict[str, Any]], 71 + ) -> list[dict[str, Any]]: 72 + """Deduplicate observations on (content, observed_at).""" 73 + seen = { 74 + (item.get("content", ""), item.get("observed_at")) 75 + for item in target_observations 76 + } 77 + merged_observations = list(target_observations) 78 + for item in source_observations: 79 + key = (item.get("content", ""), item.get("observed_at")) 80 + if key in seen: 81 + continue 82 + seen.add(key) 83 + merged_observations.append(item) 84 + return merged_observations 85 + 86 + 87 + def _read_jsonl(path: Path) -> list[dict[str, Any]]: 88 + if not path.is_file(): 89 + return [] 90 + rows: list[dict[str, Any]] = [] 91 + try: 92 + with open(path, encoding="utf-8") as handle: 93 + for line in handle: 94 + line = line.strip() 95 + if not line: 96 + continue 97 + try: 98 + rows.append(json.loads(line)) 99 + except json.JSONDecodeError: 100 + continue 101 + except OSError: 102 + return [] 103 + return rows 104 + 105 + 106 + def _identity_section( 107 + akas_added: list[str], 108 + emails_added: list[str], 109 + principal_transferred: bool, 110 + ) -> dict[str, Any]: 111 + return { 112 + "akas_added": akas_added, 113 + "akas_added_count": len(akas_added), 114 + "emails_added": emails_added, 115 + "emails_added_count": len(emails_added), 116 + "principal_transferred": principal_transferred, 117 + } 118 + 119 + 120 + def _voiceprint_section( 121 + added: int, skipped_duplicate: int, target_total: int 122 + ) -> dict[str, Any]: 123 + return { 124 + "added": added, 125 + "skipped_duplicate": skipped_duplicate, 126 + "target_total": target_total, 127 + } 128 + 129 + 130 + def _facet_section( 131 + moved: list[str], 132 + merged: list[str], 133 + observations_appended: int, 134 + ) -> dict[str, Any]: 135 + return { 136 + "moved": moved, 137 + "moved_count": len(moved), 138 + "merged": merged, 139 + "merged_count": len(merged), 140 + "observations_appended": observations_appended, 141 + } 142 + 143 + 144 + def _segment_section( 145 + labels_rewritten: int, 146 + corrections_rewritten: int, 147 + files_scanned: int, 148 + errors: list[dict[str, Any]], 149 + ) -> dict[str, Any]: 150 + return { 151 + "labels_rewritten": labels_rewritten, 152 + "corrections_rewritten": corrections_rewritten, 153 + "files_scanned": files_scanned, 154 + "errors": errors, 155 + } 156 + 157 + 158 + def _empty_result_section() -> dict[str, Any]: 159 + return { 160 + "identity": _identity_section([], [], False), 161 + "voiceprints": _voiceprint_section(0, 0, 0), 162 + "facets": _facet_section([], [], 0), 163 + "segments": _segment_section(0, 0, 0, []), 164 + } 165 + 166 + 167 + def _is_missing_value(value: Any) -> bool: 168 + return value in (None, "", [], {}) 169 + 170 + 171 + def _plan_resume_marker( 172 + source_entity: dict[str, Any], 173 + target_id: str, 174 + *, 175 + principal_transferred: bool, 176 + ) -> dict[str, Any]: 177 + updated = dict(source_entity) 178 + updated["merged_into"] = target_id 179 + updated["updated_at"] = now_ms() 180 + if principal_transferred: 181 + updated.pop("is_principal", None) 182 + return updated 183 + 184 + 185 + def _plan_identity_merge( 186 + source_entity: dict[str, Any], 187 + target_entity: dict[str, Any], 188 + *, 189 + keep_source_as_aka: bool, 190 + ) -> tuple[dict[str, Any], dict[str, Any]]: 191 + target_after = dict(target_entity) 192 + source_display = str(source_entity.get("name", source_entity.get("id", ""))) 193 + target_name = str(target_entity.get("name", "")) 194 + 195 + target_akas = target_entity.get("aka", []) 196 + if not isinstance(target_akas, list): 197 + target_akas = [] 198 + 199 + source_aka_values = source_entity.get("aka", []) 200 + if not isinstance(source_aka_values, list): 201 + source_aka_values = [] 202 + 203 + aka_candidates: list[str] = [] 204 + if keep_source_as_aka and source_display and source_display != target_name: 205 + aka_candidates.append(source_display) 206 + aka_candidates.extend(str(value) for value in source_aka_values if value) 207 + 208 + target_aka_keys = {str(value).lower() for value in target_akas if value} 209 + added_akas: list[str] = [] 210 + seen_added_akas: set[str] = set() 211 + for value in aka_candidates: 212 + key = value.lower() 213 + if key in target_aka_keys or key in seen_added_akas: 214 + continue 215 + seen_added_akas.add(key) 216 + added_akas.append(value) 217 + 218 + merged_akas = _dedupe_akas(target_akas, aka_candidates) 219 + if merged_akas: 220 + target_after["aka"] = merged_akas 221 + 222 + target_emails = target_entity.get("emails", []) 223 + if not isinstance(target_emails, list): 224 + target_emails = [] 225 + source_emails = source_entity.get("emails", []) 226 + if not isinstance(source_emails, list): 227 + source_emails = [] 228 + 229 + target_email_keys = {str(value).lower() for value in target_emails if value} 230 + added_emails: list[str] = [] 231 + seen_added_emails: set[str] = set() 232 + for value in source_emails: 233 + if not value: 234 + continue 235 + email = str(value) 236 + key = email.lower() 237 + if key in target_email_keys or key in seen_added_emails: 238 + continue 239 + seen_added_emails.add(key) 240 + added_emails.append(email) 241 + 242 + merged_emails = _dedupe_emails(target_emails, source_emails) 243 + if merged_emails: 244 + target_after["emails"] = merged_emails 245 + 246 + principal_transferred = bool( 247 + source_entity.get("is_principal") and not target_entity.get("is_principal") 248 + ) 249 + if principal_transferred: 250 + target_after["is_principal"] = True 251 + 252 + for key, value in source_entity.items(): 253 + if key in { 254 + "id", 255 + "name", 256 + "aka", 257 + "emails", 258 + "created_at", 259 + "updated_at", 260 + "merged_into", 261 + "blocked", 262 + "is_principal", 263 + }: 264 + continue 265 + if _is_missing_value(target_after.get(key)) and not _is_missing_value(value): 266 + target_after[key] = value 267 + 268 + target_after["updated_at"] = now_ms() 269 + return target_after, _identity_section( 270 + added_akas, added_emails, principal_transferred 271 + ) 272 + 273 + 274 + def _plan_voiceprint_merge(source_id: str, target_id: str) -> dict[str, Any]: 275 + source_vp = load_entity_voiceprints_file(source_id) 276 + target_vp = load_entity_voiceprints_file(target_id) 277 + existing_keys = load_existing_voiceprint_keys(target_id) 278 + 279 + new_items: list[tuple[Any, dict[str, Any]]] = [] 280 + skipped_duplicate = 0 281 + if source_vp is not None: 282 + source_embeddings, source_metadata = source_vp 283 + for emb, meta in zip(source_embeddings, source_metadata): 284 + key = ( 285 + meta.get("day"), 286 + meta.get("segment_key"), 287 + meta.get("source"), 288 + meta.get("sentence_id"), 289 + ) 290 + if key in existing_keys: 291 + skipped_duplicate += 1 292 + continue 293 + normalized = normalize_embedding(emb) 294 + if normalized is None: 295 + continue 296 + new_items.append((normalized, meta)) 297 + existing_keys.add(key) 298 + 299 + target_existing_total = len(target_vp[0]) if target_vp else 0 300 + added = len(new_items) 301 + return { 302 + "items": new_items, 303 + "section": _voiceprint_section( 304 + added=added, 305 + skipped_duplicate=skipped_duplicate, 306 + target_total=target_existing_total + added, 307 + ), 308 + } 309 + 310 + 311 + def _plan_facet_merge(source_id: str, target_id: str) -> dict[str, Any]: 312 + journal = Path(get_journal()) 313 + facets_dir = journal / "facets" 314 + operations: list[dict[str, Any]] = [] 315 + moved: list[str] = [] 316 + merged: list[str] = [] 317 + observations_appended = 0 318 + 319 + if not facets_dir.exists(): 320 + return { 321 + "operations": operations, 322 + "section": _facet_section(moved, merged, observations_appended), 323 + } 324 + 325 + for facet_entry in sorted(facets_dir.iterdir()): 326 + if not facet_entry.is_dir(): 327 + continue 328 + facet_name = facet_entry.name 329 + source_rel_dir = facet_entry / "entities" / source_id 330 + source_rel_path = source_rel_dir / "entity.json" 331 + if not source_rel_path.is_file(): 332 + continue 333 + 334 + target_rel_dir = facet_entry / "entities" / target_id 335 + target_rel_path = target_rel_dir / "entity.json" 336 + 337 + if not target_rel_path.is_file(): 338 + operations.append( 339 + { 340 + "kind": "move", 341 + "facet": facet_name, 342 + "source_rel_dir": source_rel_dir, 343 + "target_rel_dir": target_rel_dir, 344 + } 345 + ) 346 + moved.append(facet_name) 347 + continue 348 + 349 + try: 350 + with open(source_rel_path, encoding="utf-8") as handle: 351 + source_rel = json.load(handle) 352 + with open(target_rel_path, encoding="utf-8") as handle: 353 + target_rel = json.load(handle) 354 + except (json.JSONDecodeError, OSError): 355 + continue 356 + 357 + merged_rel = dict(target_rel) 358 + source_attached = source_rel.get("attached_at") 359 + target_attached = merged_rel.get("attached_at") 360 + if source_attached and ( 361 + not target_attached or source_attached < target_attached 362 + ): 363 + merged_rel["attached_at"] = source_attached 364 + 365 + for field in ("updated_at", "last_seen"): 366 + source_ts = source_rel.get(field) 367 + target_ts = merged_rel.get(field) 368 + if source_ts and (not target_ts or source_ts > target_ts): 369 + merged_rel[field] = source_ts 370 + 371 + if not merged_rel.get("description") and source_rel.get("description"): 372 + merged_rel["description"] = source_rel["description"] 373 + 374 + source_obs = _read_jsonl(source_rel_dir / "observations.jsonl") 375 + target_obs = _read_jsonl(target_rel_dir / "observations.jsonl") 376 + merged_obs = _dedupe_observations(source_obs, target_obs) 377 + observations_added = len(merged_obs) - len(target_obs) 378 + observations_appended += observations_added 379 + 380 + operations.append( 381 + { 382 + "kind": "merge", 383 + "facet": facet_name, 384 + "source_rel_dir": source_rel_dir, 385 + "target_rel_dir": target_rel_dir, 386 + "relationship": merged_rel, 387 + "observations": merged_obs, 388 + "observations_added": observations_added, 389 + } 390 + ) 391 + merged.append(facet_name) 392 + 393 + return { 394 + "operations": operations, 395 + "section": _facet_section(moved, merged, observations_appended), 396 + } 397 + 398 + 399 + def _plan_segment_rewrites(source_id: str, target_id: str) -> dict[str, Any]: 400 + labels_rewritten = 0 401 + corrections_rewritten = 0 402 + files_scanned = 0 403 + errors: list[dict[str, Any]] = [] 404 + operations: list[dict[str, Any]] = [] 405 + source_id_bytes = source_id.encode("utf-8") 406 + 407 + for day_path in _segment_day_dirs(): 408 + for _stream, _seg_key, seg_path in iter_segments(day_path): 409 + files_scanned += 1 410 + talents_dir = seg_path / "talents" 411 + 412 + labels_path = talents_dir / "speaker_labels.json" 413 + if labels_path.is_file(): 414 + try: 415 + raw = labels_path.read_bytes() 416 + if source_id_bytes in raw: 417 + data = json.loads(raw) 418 + changed = False 419 + for label in data.get("labels", []): 420 + if label.get("speaker") == source_id: 421 + label["speaker"] = target_id 422 + changed = True 423 + if changed: 424 + labels_rewritten += 1 425 + operations.append( 426 + { 427 + "kind": "speaker_labels", 428 + "path": labels_path, 429 + "data": data, 430 + } 431 + ) 432 + except Exception as exc: 433 + errors.append( 434 + { 435 + "kind": "speaker_labels", 436 + "path": str(labels_path), 437 + "message": str(exc), 438 + } 439 + ) 440 + 441 + corrections_path = talents_dir / "speaker_corrections.json" 442 + if corrections_path.is_file(): 443 + try: 444 + raw = corrections_path.read_bytes() 445 + if source_id_bytes in raw: 446 + data = json.loads(raw) 447 + changed = False 448 + for correction in data.get("corrections", []): 449 + if correction.get("original_speaker") == source_id: 450 + correction["original_speaker"] = target_id 451 + changed = True 452 + if correction.get("corrected_speaker") == source_id: 453 + correction["corrected_speaker"] = target_id 454 + changed = True 455 + if changed: 456 + corrections_rewritten += 1 457 + operations.append( 458 + { 459 + "kind": "speaker_corrections", 460 + "path": corrections_path, 461 + "data": data, 462 + } 463 + ) 464 + except Exception as exc: 465 + errors.append( 466 + { 467 + "kind": "speaker_corrections", 468 + "path": str(corrections_path), 469 + "message": str(exc), 470 + } 471 + ) 472 + 473 + return { 474 + "operations": operations, 475 + "section": _segment_section( 476 + labels_rewritten=labels_rewritten, 477 + corrections_rewritten=corrections_rewritten, 478 + files_scanned=files_scanned, 479 + errors=errors, 480 + ), 481 + } 482 + 483 + 484 + def _segment_day_dirs() -> list[Path]: 485 + chronicle_days = [Path(path) for _, path in sorted(day_dirs().items())] 486 + journal = Path(get_journal()) 487 + flat_days = sorted( 488 + entry 489 + for entry in journal.iterdir() 490 + if entry.is_dir() and entry.name.isdigit() and len(entry.name) == 8 491 + ) 492 + return chronicle_days or flat_days 493 + 494 + 495 + def _check_aka_cross_references( 496 + source_id: str, source_display: str, target_id: str 497 + ) -> list[str]: 498 + offenders: list[str] = [] 499 + for entity_id in scan_journal_entities(): 500 + if entity_id in {source_id, target_id}: 501 + continue 502 + entity = load_journal_entity(entity_id) 503 + if not entity: 504 + continue 505 + aka_values = entity.get("aka", []) 506 + if not isinstance(aka_values, list): 507 + continue 508 + if source_id in aka_values or source_display in aka_values: 509 + offenders.append(entity_id) 510 + offenders.sort() 511 + return offenders 512 + 513 + 514 + def _apply_facet_plan(operations: list[dict[str, Any]], target_id: str) -> None: 515 + for operation in operations: 516 + if operation["kind"] == "move": 517 + source_rel_dir = operation["source_rel_dir"] 518 + target_rel_dir = operation["target_rel_dir"] 519 + if target_rel_dir.exists(): 520 + shutil.rmtree(target_rel_dir) 521 + source_rel_dir.rename(target_rel_dir) 522 + moved_path = target_rel_dir / "entity.json" 523 + try: 524 + with open(moved_path, encoding="utf-8") as handle: 525 + rel_data = json.load(handle) 526 + rel_data["entity_id"] = target_id 527 + save_facet_relationship(operation["facet"], target_id, rel_data) 528 + except (json.JSONDecodeError, OSError): 529 + pass 530 + continue 531 + 532 + save_facet_relationship( 533 + operation["facet"], target_id, operation["relationship"] 534 + ) 535 + save_observations(operation["facet"], target_id, operation["observations"]) 536 + shutil.rmtree(operation["source_rel_dir"]) 537 + 538 + 539 + def _apply_segment_plan(operations: list[dict[str, Any]]) -> None: 540 + for operation in operations: 541 + out_path = operation["path"] 542 + tmp_path = out_path.with_suffix(".tmp") 543 + with open(tmp_path, "w", encoding="utf-8") as handle: 544 + json.dump(operation["data"], handle, indent=2) 545 + tmp_path.rename(out_path) 546 + 547 + 548 + def _clear_merge_caches() -> list[str]: 549 + clear_journal_entity_cache() 550 + clear_relationship_caches() 551 + clear_observation_cache() 552 + clear_entity_loading_cache() 553 + return [ 554 + "journal_entity_cache", 555 + "relationship_caches", 556 + "observation_cache", 557 + "entity_loading_cache", 558 + ] 559 + 560 + 561 + def _audit_counts(result: dict[str, Any]) -> dict[str, Any]: 562 + return { 563 + "identity": { 564 + "akas_added": result["identity"]["akas_added_count"], 565 + "emails_added": result["identity"]["emails_added_count"], 566 + "principal_transferred": result["identity"]["principal_transferred"], 567 + }, 568 + "voiceprints": { 569 + "added": result["voiceprints"]["added"], 570 + "skipped_duplicate": result["voiceprints"]["skipped_duplicate"], 571 + "target_total": result["voiceprints"]["target_total"], 572 + }, 573 + "facets": { 574 + "moved": result["facets"]["moved_count"], 575 + "merged": result["facets"]["merged_count"], 576 + "observations_appended": result["facets"]["observations_appended"], 577 + }, 578 + "segments": { 579 + "labels_rewritten": result["segments"]["labels_rewritten"], 580 + "corrections_rewritten": result["segments"]["corrections_rewritten"], 581 + "files_scanned": result["segments"]["files_scanned"], 582 + "errors": len(result["segments"]["errors"]), 583 + }, 584 + } 585 + 586 + 587 + def _append_audit_log( 588 + *, 589 + source_id: str, 590 + source_display_name: str, 591 + target_id: str, 592 + target_display_name: str, 593 + result: dict[str, Any], 594 + caller: str, 595 + ) -> str: 596 + logs_dir = Path(get_journal()) / "logs" 597 + logs_dir.mkdir(parents=True, exist_ok=True) 598 + audit_path = logs_dir / "entity-merges.jsonl" 599 + payload = { 600 + "ts": now_ms(), 601 + "source_id": source_id, 602 + "source_display_name": source_display_name, 603 + "target_id": target_id, 604 + "target_display_name": target_display_name, 605 + "principal_transferred": result["identity"]["principal_transferred"], 606 + "counts": _audit_counts(result), 607 + "caller": caller, 608 + } 609 + with open(audit_path, "a", encoding="utf-8") as handle: 610 + handle.write(json.dumps(payload, ensure_ascii=False) + "\n") 611 + return str(audit_path) 612 + 613 + 614 + def merge_entity( 615 + source_id: str, 616 + target_id: str, 617 + *, 618 + keep_source_as_aka: bool = True, 619 + commit: bool = False, 620 + caller: str = "entities.merge", 621 + ) -> dict[str, Any]: 622 + if source_id == target_id: 623 + return {"error": "Source and target must be different entities."} 624 + 625 + source_entity = load_journal_entity(source_id) 626 + if not source_entity: 627 + return {"error": f"Source entity not found: {source_id}"} 628 + 629 + target_entity = load_journal_entity(target_id) 630 + if not target_entity: 631 + return {"error": f"Target entity not found: {target_id}"} 632 + 633 + if source_entity.get("blocked"): 634 + return {"error": f"Cannot merge blocked entity: {source_id}"} 635 + if target_entity.get("blocked"): 636 + return {"error": f"Cannot merge blocked entity: {target_id}"} 637 + if source_entity.get("is_principal") and target_entity.get("is_principal"): 638 + return {"error": "Cannot merge two principal entities."} 639 + 640 + source_display = str(source_entity.get("name", source_id)) 641 + target_display = str(target_entity.get("name", target_id)) 642 + 643 + offenders = _check_aka_cross_references(source_id, source_display, target_id) 644 + if offenders: 645 + offender_str = ", ".join(offenders) 646 + return { 647 + "error": f"Cannot merge '{source_id}': referenced in aka lists of entity ids: {offender_str}" 648 + } 649 + 650 + planned_target, identity_plan = _plan_identity_merge( 651 + source_entity, 652 + target_entity, 653 + keep_source_as_aka=keep_source_as_aka, 654 + ) 655 + resume_source = _plan_resume_marker( 656 + source_entity, 657 + target_id, 658 + principal_transferred=identity_plan["principal_transferred"], 659 + ) 660 + voiceprint_plan = _plan_voiceprint_merge(source_id, target_id) 661 + facet_plan = _plan_facet_merge(source_id, target_id) 662 + segment_plan = _plan_segment_rewrites(source_id, target_id) 663 + 664 + zero = _empty_result_section() 665 + result: dict[str, Any] = { 666 + "merged": commit, 667 + "source_id": source_id, 668 + "target_id": target_id, 669 + "identity": identity_plan if commit else zero["identity"], 670 + "voiceprints": voiceprint_plan["section"] if commit else zero["voiceprints"], 671 + "facets": facet_plan["section"] if commit else zero["facets"], 672 + "segments": segment_plan["section"] if commit else zero["segments"], 673 + "caches_cleared": [], 674 + "audit_log_path": None, 675 + "would_identity": None if commit else identity_plan, 676 + "would_voiceprints": None if commit else voiceprint_plan["section"], 677 + "would_facets": None if commit else facet_plan["section"], 678 + "would_segments": None if commit else segment_plan["section"], 679 + } 680 + 681 + if not commit: 682 + return result 683 + 684 + try: 685 + save_journal_entity(resume_source) 686 + save_journal_entity(planned_target) 687 + 688 + if voiceprint_plan["items"]: 689 + save_voiceprints_batch(target_id, voiceprint_plan["items"]) 690 + 691 + _apply_facet_plan(facet_plan["operations"], target_id) 692 + _apply_segment_plan(segment_plan["operations"]) 693 + 694 + discovery_cache = Path(get_journal()) / "awareness" / "discovery_clusters.json" 695 + caches_cleared = _clear_merge_caches() 696 + if discovery_cache.exists(): 697 + discovery_cache.unlink() 698 + caches_cleared.append("discovery_clusters") 699 + 700 + source_entity_dir = Path(get_journal()) / "entities" / source_id 701 + if source_entity_dir.exists(): 702 + shutil.rmtree(source_entity_dir) 703 + 704 + result["caches_cleared"] = caches_cleared 705 + 706 + try: 707 + result["audit_log_path"] = _append_audit_log( 708 + source_id=source_id, 709 + source_display_name=source_display, 710 + target_id=target_id, 711 + target_display_name=str(planned_target.get("name", target_display)), 712 + result=result, 713 + caller=caller, 714 + ) 715 + except OSError as exc: 716 + result["segments"]["errors"].append( 717 + { 718 + "kind": "audit_log", 719 + "path": str(Path(get_journal()) / "logs" / "entity-merges.jsonl"), 720 + "message": str(exc), 721 + } 722 + ) 723 + 724 + return result 725 + except Exception as exc: 726 + return {"error": str(exc)}
+188
think/entities/voiceprints.py
··· 1 + # SPDX-License-Identifier: AGPL-3.0-only 2 + # Copyright (c) 2026 sol pbc 3 + 4 + """Shared voiceprint helpers for entity-aware speaker workflows.""" 5 + 6 + from __future__ import annotations 7 + 8 + import fcntl 9 + import json 10 + import logging 11 + from pathlib import Path 12 + 13 + import numpy as np 14 + 15 + from think.entities.journal import ( 16 + ensure_journal_entity_memory, 17 + journal_entity_memory_path, 18 + ) 19 + 20 + logger = logging.getLogger(__name__) 21 + 22 + 23 + def normalize_embedding(emb: np.ndarray) -> np.ndarray | None: 24 + """L2-normalize an embedding vector. Returns None if norm is zero.""" 25 + emb = emb.astype(np.float32) 26 + norm = np.linalg.norm(emb) 27 + if norm > 0: 28 + return emb / norm 29 + return None 30 + 31 + 32 + def load_entity_voiceprints_file( 33 + entity_id: str, 34 + ) -> tuple[np.ndarray, list[dict]] | None: 35 + """Load an entity's voiceprints.npz, returning embeddings and parsed metadata.""" 36 + try: 37 + folder = journal_entity_memory_path(entity_id) 38 + except (RuntimeError, ValueError): 39 + return None 40 + 41 + npz_path = folder / "voiceprints.npz" 42 + if not npz_path.exists(): 43 + return None 44 + 45 + try: 46 + with np.load(npz_path, allow_pickle=False) as data: 47 + embeddings = data.get("embeddings") 48 + metadata_arr = data.get("metadata") 49 + if embeddings is None or metadata_arr is None: 50 + return None 51 + metadata_list = [json.loads(m) for m in metadata_arr] 52 + return embeddings, metadata_list 53 + except Exception as exc: 54 + logger.warning("Failed to load voiceprints for entity %s: %s", entity_id, exc) 55 + return None 56 + 57 + 58 + def load_existing_voiceprint_keys(entity_id: str) -> set[tuple]: 59 + """Return saved voiceprint identity keys for idempotency checks.""" 60 + result = load_entity_voiceprints_file(entity_id) 61 + if result is None: 62 + return set() 63 + 64 + _, metadata_list = result 65 + return { 66 + (m.get("day"), m.get("segment_key"), m.get("source"), m.get("sentence_id")) 67 + for m in metadata_list 68 + } 69 + 70 + 71 + def save_voiceprints_batch( 72 + entity_id: str, 73 + new_items: list[tuple[np.ndarray, dict]], 74 + ) -> int: 75 + """Append a batch of normalized voiceprints to an entity in one write.""" 76 + if not new_items: 77 + return 0 78 + 79 + folder = ensure_journal_entity_memory(entity_id) 80 + npz_path = folder / "voiceprints.npz" 81 + 82 + if npz_path.exists(): 83 + try: 84 + with np.load(npz_path, allow_pickle=False) as data: 85 + existing_emb = data["embeddings"] 86 + existing_meta_strings = data["metadata"] 87 + existing_meta_dicts = [json.loads(m) for m in existing_meta_strings] 88 + except (FileNotFoundError, ValueError, np.lib.npyio.NpzFile) as exc: 89 + logger.warning( 90 + "Failed to load existing voiceprints for %s from %s: %s. Starting fresh.", 91 + entity_id, 92 + npz_path, 93 + exc, 94 + ) 95 + existing_emb = np.empty((0, 256), dtype=np.float32) 96 + existing_meta_dicts = [] 97 + except Exception: 98 + logger.exception( 99 + "Unexpected error loading existing voiceprints for %s from %s", 100 + entity_id, 101 + npz_path, 102 + ) 103 + raise 104 + else: 105 + existing_emb = np.empty((0, 256), dtype=np.float32) 106 + existing_meta_dicts = [] 107 + 108 + new_emb_list = [emb.reshape(1, -1).astype(np.float32) for emb, _ in new_items] 109 + new_meta_dicts = [meta_dict for _, meta_dict in new_items] 110 + 111 + if new_emb_list: 112 + new_emb_np = np.vstack(new_emb_list) 113 + combined_emb = ( 114 + np.vstack([existing_emb, new_emb_np]) 115 + if len(existing_emb) > 0 116 + else new_emb_np 117 + ) 118 + combined_meta_dicts = existing_meta_dicts + new_meta_dicts 119 + else: 120 + combined_emb = existing_emb 121 + combined_meta_dicts = existing_meta_dicts 122 + 123 + save_voiceprints_safely( 124 + npz_path=npz_path, 125 + embeddings=combined_emb, 126 + metadata=combined_meta_dicts, 127 + ) 128 + return len(new_items) 129 + 130 + 131 + def voiceprint_file_path(entity_id: str) -> Path: 132 + """Return the canonical voiceprints.npz path for an entity.""" 133 + return ensure_journal_entity_memory(entity_id) / "voiceprints.npz" 134 + 135 + 136 + def save_voiceprints_safely( 137 + npz_path: Path, 138 + embeddings: np.ndarray, 139 + metadata: list[dict], 140 + ) -> None: 141 + """Safely save a voiceprint NPZ with file locking and integrity check.""" 142 + lock_path = npz_path.with_suffix(".lock") 143 + tmp_path = npz_path.with_name(npz_path.stem + ".tmp.npz") 144 + 145 + npz_path.parent.mkdir(parents=True, exist_ok=True) 146 + 147 + try: 148 + with open(lock_path, "w", encoding="utf-8") as lock_file: 149 + try: 150 + fcntl.flock(lock_file, fcntl.LOCK_EX) 151 + np.savez_compressed( 152 + tmp_path, 153 + embeddings=embeddings, 154 + metadata=metadata, 155 + ) 156 + if not tmp_path.exists(): 157 + raise FileNotFoundError( 158 + f"Temporary voiceprint file not found: {tmp_path}" 159 + ) 160 + tmp_path.rename(npz_path) 161 + 162 + with np.load(npz_path, allow_pickle=False) as data: 163 + if "embeddings" not in data or "metadata" not in data: 164 + raise ValueError( 165 + "Missing 'embeddings' or 'metadata' keys in loaded NPZ." 166 + ) 167 + logger.info( 168 + "Successfully wrote and verified voiceprint file: %s", 169 + npz_path, 170 + ) 171 + except Exception: 172 + if tmp_path.exists(): 173 + try: 174 + tmp_path.unlink() 175 + except OSError: 176 + logger.exception( 177 + "Failed to clean up temporary voiceprint file %s", 178 + tmp_path, 179 + ) 180 + raise 181 + finally: 182 + try: 183 + fcntl.flock(lock_file, fcntl.LOCK_UN) 184 + except OSError: 185 + logger.exception("Failed to release lock on %s", lock_path) 186 + except OSError: 187 + logger.exception("Failed to acquire or manage lock file %s", lock_path) 188 + raise
+13 -38
think/merge.py
··· 18 18 save_journal_entity, 19 19 ) 20 20 from think.entities.matching import find_matching_entity 21 + from think.entities.merge import _dedupe_akas, _dedupe_emails, _dedupe_observations 21 22 from think.entities.observations import save_observations 22 23 from think.entities.relationships import save_facet_relationship 23 24 from think.utils import CHRONICLE_DIR, iter_segments ··· 268 269 target_entity = dict(target_entities.get(target_id, match)) 269 270 pre_merge_snapshot = dict(target_entity) 270 271 271 - aka_by_lower: dict[str, str] = {} 272 - for values in (target_entity.get("aka", []), source_entity.get("aka", [])): 273 - if not isinstance(values, list): 274 - continue 275 - for value in values: 276 - if not value: 277 - continue 278 - key = str(value).lower() 279 - if key not in aka_by_lower: 280 - aka_by_lower[key] = str(value) 281 - if aka_by_lower: 282 - target_entity["aka"] = sorted(aka_by_lower.values(), key=str.lower) 272 + merged_akas = _dedupe_akas( 273 + target_entity.get("aka", []), 274 + source_entity.get("aka", []), 275 + ) 276 + if merged_akas: 277 + target_entity["aka"] = merged_akas 283 278 284 - merged_emails: list[str] = [] 285 - seen_emails: set[str] = set() 286 - for values in ( 279 + merged_emails = _dedupe_emails( 287 280 target_entity.get("emails", []), 288 281 source_entity.get("emails", []), 289 - ): 290 - if not isinstance(values, list): 291 - continue 292 - for value in values: 293 - if not value: 294 - continue 295 - email = str(value) 296 - key = email.lower() 297 - if key in seen_emails: 298 - continue 299 - seen_emails.add(key) 300 - merged_emails.append(email) 282 + ) 301 283 if merged_emails: 302 284 target_entity["emails"] = merged_emails 303 285 ··· 424 406 target_observations = _read_jsonl( 425 407 target_entity_dir / "observations.jsonl" 426 408 ) 427 - seen = { 428 - (item.get("content", ""), item.get("observed_at")) 429 - for item in target_observations 430 - } 431 - merged_observations = list(target_observations) 432 - for item in source_observations: 433 - key = (item.get("content", ""), item.get("observed_at")) 434 - if key in seen: 435 - continue 436 - seen.add(key) 437 - merged_observations.append(item) 409 + merged_observations = _dedupe_observations( 410 + source_observations, 411 + target_observations, 412 + ) 438 413 439 414 if not dry_run: 440 415 save_facet_relationship(