personal memory agent
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

Merge branch 'hopper-hjqopyvy-journal-merge-scope'

+1030
+550
tests/test_journal_merge.py
··· 1 + # SPDX-License-Identifier: AGPL-3.0-only 2 + # Copyright (c) 2026 sol pbc 3 + 4 + """Tests for journal merge command.""" 5 + 6 + import json 7 + import shutil 8 + from pathlib import Path 9 + 10 + import pytest 11 + from typer.testing import CliRunner 12 + 13 + from think.call import call_app 14 + 15 + runner = CliRunner() 16 + 17 + 18 + def _write_json(path: Path, payload: dict) -> None: 19 + path.parent.mkdir(parents=True, exist_ok=True) 20 + path.write_text(json.dumps(payload, indent=2) + "\n", encoding="utf-8") 21 + 22 + 23 + def _write_jsonl(path: Path, items: list[dict]) -> None: 24 + path.parent.mkdir(parents=True, exist_ok=True) 25 + path.write_text( 26 + "".join(json.dumps(item) + "\n" for item in items), 27 + encoding="utf-8", 28 + ) 29 + 30 + 31 + def _read_json(path: Path) -> dict: 32 + return json.loads(path.read_text(encoding="utf-8")) 33 + 34 + 35 + def _read_jsonl(path: Path) -> list[dict]: 36 + return [ 37 + json.loads(line) 38 + for line in path.read_text(encoding="utf-8").splitlines() 39 + if line.strip() 40 + ] 41 + 42 + 43 + def _mock_indexer(monkeypatch): 44 + import think.tools.call as call_module 45 + 46 + calls = [] 47 + 48 + def _run(*args, **kwargs): 49 + calls.append((args, kwargs)) 50 + return None 51 + 52 + monkeypatch.setattr(call_module.subprocess, "run", _run) 53 + return calls 54 + 55 + 56 + @pytest.fixture 57 + def merge_journals_fixture(tmp_path, monkeypatch): 58 + target = tmp_path / "target" 59 + source = tmp_path / "source" 60 + 61 + (source / "20260101" / "143022_300").mkdir(parents=True) 62 + (source / "20260101" / "143022_300" / "audio.jsonl").write_text( 63 + '{"audio": "source-segment"}\n', 64 + encoding="utf-8", 65 + ) 66 + (source / "20260101" / "120000_60").mkdir(parents=True) 67 + (source / "20260101" / "120000_60" / "audio.jsonl").write_text( 68 + '{"audio": "source-existing-segment"}\n', 69 + encoding="utf-8", 70 + ) 71 + 72 + (target / "20260101" / "120000_60").mkdir(parents=True) 73 + (target / "20260101" / "120000_60" / "audio.jsonl").write_text( 74 + '{"audio": "target-existing-segment"}\n', 75 + encoding="utf-8", 76 + ) 77 + 78 + _write_json( 79 + source / "entities" / "alice_johnson" / "entity.json", 80 + { 81 + "id": "alice_johnson", 82 + "name": "Alice Johnson", 83 + "type": "person", 84 + "aka": ["Ali"], 85 + "emails": ["alice@example.com"], 86 + "is_principal": False, 87 + "created_at": 1000, 88 + }, 89 + ) 90 + _write_json( 91 + target / "entities" / "alice_johnson" / "entity.json", 92 + { 93 + "id": "alice_johnson", 94 + "name": "Alice Johnson", 95 + "type": "person", 96 + "aka": ["AJ"], 97 + "emails": ["aj@work.com"], 98 + "is_principal": False, 99 + "created_at": 500, 100 + }, 101 + ) 102 + 103 + _write_json( 104 + source / "facets" / "work" / "facet.json", 105 + {"title": "Work"}, 106 + ) 107 + 108 + (source / "sol").mkdir(parents=True) 109 + (source / "sol" / "self.md").write_text("source sol\n", encoding="utf-8") 110 + (source / "config").mkdir(parents=True) 111 + (source / "config" / "source-only.json").write_text("{}", encoding="utf-8") 112 + 113 + (source / "imports" / "20260101_120000").mkdir(parents=True) 114 + (source / "imports" / "20260101_120000" / "manifest.json").write_text( 115 + '{"manifest": "source"}\n', 116 + encoding="utf-8", 117 + ) 118 + 119 + monkeypatch.setenv("_SOLSTONE_JOURNAL_OVERRIDE", str(target)) 120 + import think.utils 121 + 122 + think.utils._journal_path_cache = None 123 + yield {"target": target, "source": source} 124 + think.utils._journal_path_cache = None 125 + 126 + 127 + def test_segment_copy(merge_journals_fixture, monkeypatch): 128 + paths = merge_journals_fixture 129 + _mock_indexer(monkeypatch) 130 + 131 + result = runner.invoke(call_app, ["journal", "merge", str(paths["source"])]) 132 + 133 + assert result.exit_code == 0 134 + assert (paths["target"] / "20260101" / "143022_300" / "audio.jsonl").exists() 135 + 136 + 137 + def test_segment_skip(merge_journals_fixture, monkeypatch): 138 + paths = merge_journals_fixture 139 + _mock_indexer(monkeypatch) 140 + 141 + result = runner.invoke(call_app, ["journal", "merge", str(paths["source"])]) 142 + 143 + assert result.exit_code == 0 144 + assert (paths["target"] / "20260101" / "120000_60" / "audio.jsonl").read_text( 145 + encoding="utf-8" 146 + ) == '{"audio": "target-existing-segment"}\n' 147 + 148 + 149 + def test_entity_create(merge_journals_fixture, monkeypatch): 150 + paths = merge_journals_fixture 151 + _mock_indexer(monkeypatch) 152 + _write_json( 153 + paths["source"] / "entities" / "bob_smith" / "entity.json", 154 + { 155 + "id": "bob_smith", 156 + "name": "Bob Smith", 157 + "type": "person", 158 + "created_at": 2000, 159 + }, 160 + ) 161 + 162 + result = runner.invoke(call_app, ["journal", "merge", str(paths["source"])]) 163 + 164 + assert result.exit_code == 0 165 + merged = _read_json(paths["target"] / "entities" / "bob_smith" / "entity.json") 166 + assert merged["name"] == "Bob Smith" 167 + 168 + 169 + def test_entity_merge_aka_emails(merge_journals_fixture, monkeypatch): 170 + paths = merge_journals_fixture 171 + _mock_indexer(monkeypatch) 172 + 173 + result = runner.invoke(call_app, ["journal", "merge", str(paths["source"])]) 174 + 175 + assert result.exit_code == 0 176 + merged = _read_json(paths["target"] / "entities" / "alice_johnson" / "entity.json") 177 + assert merged["name"] == "Alice Johnson" 178 + assert merged["type"] == "person" 179 + assert merged["aka"] == ["AJ", "Ali"] 180 + assert merged["emails"] == ["aj@work.com", "alice@example.com"] 181 + 182 + 183 + def test_entity_principal_dedup(merge_journals_fixture, monkeypatch): 184 + paths = merge_journals_fixture 185 + _mock_indexer(monkeypatch) 186 + _write_json( 187 + paths["target"] / "entities" / "jer" / "entity.json", 188 + { 189 + "id": "jer", 190 + "name": "Jer", 191 + "type": "person", 192 + "is_principal": True, 193 + "created_at": 1, 194 + }, 195 + ) 196 + _write_json( 197 + paths["source"] / "entities" / "principal_person" / "entity.json", 198 + { 199 + "id": "principal_person", 200 + "name": "Principal Person", 201 + "type": "person", 202 + "is_principal": True, 203 + "created_at": 2, 204 + }, 205 + ) 206 + 207 + result = runner.invoke(call_app, ["journal", "merge", str(paths["source"])]) 208 + 209 + assert result.exit_code == 0 210 + merged = _read_json( 211 + paths["target"] / "entities" / "principal_person" / "entity.json" 212 + ) 213 + assert merged["is_principal"] is False 214 + 215 + 216 + def test_entity_id_collision(merge_journals_fixture, monkeypatch): 217 + paths = merge_journals_fixture 218 + _mock_indexer(monkeypatch) 219 + _write_json( 220 + paths["source"] / "entities" / "alice_johnson" / "entity.json", 221 + { 222 + "id": "alice_johnson", 223 + "name": "Alice Cooper", 224 + "type": "person", 225 + "created_at": 3000, 226 + }, 227 + ) 228 + 229 + result = runner.invoke(call_app, ["journal", "merge", str(paths["source"])]) 230 + 231 + assert result.exit_code == 0 232 + merged = _read_json( 233 + paths["target"] / "entities" / "alice_johnson_2" / "entity.json" 234 + ) 235 + assert merged["id"] == "alice_johnson_2" 236 + assert merged["name"] == "Alice Cooper" 237 + 238 + 239 + def test_facet_copy_new(merge_journals_fixture, monkeypatch): 240 + paths = merge_journals_fixture 241 + _mock_indexer(monkeypatch) 242 + (paths["source"] / "facets" / "work" / "logs").mkdir(parents=True) 243 + (paths["source"] / "facets" / "work" / "logs" / "20260101.jsonl").write_text( 244 + '{"log": "copied"}\n', 245 + encoding="utf-8", 246 + ) 247 + 248 + result = runner.invoke(call_app, ["journal", "merge", str(paths["source"])]) 249 + 250 + assert result.exit_code == 0 251 + assert (paths["target"] / "facets" / "work" / "facet.json").exists() 252 + assert (paths["target"] / "facets" / "work" / "logs" / "20260101.jsonl").exists() 253 + 254 + 255 + def test_facet_merge_overlapping(merge_journals_fixture, monkeypatch): 256 + paths = merge_journals_fixture 257 + _mock_indexer(monkeypatch) 258 + 259 + _write_json(paths["target"] / "facets" / "work" / "facet.json", {"title": "Work"}) 260 + _write_json( 261 + paths["source"] 262 + / "facets" 263 + / "work" 264 + / "entities" 265 + / "alice_johnson" 266 + / "entity.json", 267 + { 268 + "entity_id": "alice_johnson", 269 + "description": "Source description", 270 + "source_only": "keep-me", 271 + }, 272 + ) 273 + _write_json( 274 + paths["target"] 275 + / "facets" 276 + / "work" 277 + / "entities" 278 + / "alice_johnson" 279 + / "entity.json", 280 + { 281 + "entity_id": "alice_johnson", 282 + "description": "Target description", 283 + "target_only": "wins", 284 + }, 285 + ) 286 + _write_jsonl( 287 + paths["source"] 288 + / "facets" 289 + / "work" 290 + / "entities" 291 + / "alice_johnson" 292 + / "observations.jsonl", 293 + [ 294 + {"content": "Shared fact", "observed_at": 100}, 295 + {"content": "Source fact", "observed_at": 200}, 296 + ], 297 + ) 298 + _write_jsonl( 299 + paths["target"] 300 + / "facets" 301 + / "work" 302 + / "entities" 303 + / "alice_johnson" 304 + / "observations.jsonl", 305 + [ 306 + {"content": "Shared fact", "observed_at": 100}, 307 + {"content": "Target fact", "observed_at": 300}, 308 + ], 309 + ) 310 + _write_jsonl( 311 + paths["source"] / "facets" / "work" / "todos" / "20260101.jsonl", 312 + [ 313 + {"text": "Duplicate todo", "created_at": 10}, 314 + {"text": "Source todo", "created_at": 11}, 315 + ], 316 + ) 317 + _write_jsonl( 318 + paths["target"] / "facets" / "work" / "todos" / "20260101.jsonl", 319 + [ 320 + {"text": "Duplicate todo", "created_at": 10}, 321 + {"text": "Target todo", "created_at": 12}, 322 + ], 323 + ) 324 + _write_jsonl( 325 + paths["source"] / "facets" / "work" / "calendar" / "20260101.jsonl", 326 + [ 327 + {"title": "Duplicate event", "start": "09:00"}, 328 + {"title": "Source event", "start": "10:00"}, 329 + ], 330 + ) 331 + _write_jsonl( 332 + paths["target"] / "facets" / "work" / "calendar" / "20260101.jsonl", 333 + [ 334 + {"title": "Duplicate event", "start": "09:00"}, 335 + {"title": "Target event", "start": "11:00"}, 336 + ], 337 + ) 338 + (paths["source"] / "facets" / "work" / "news").mkdir(parents=True) 339 + (paths["target"] / "facets" / "work" / "news").mkdir(parents=True) 340 + (paths["source"] / "facets" / "work" / "news" / "20260101.md").write_text( 341 + "source duplicate\n", 342 + encoding="utf-8", 343 + ) 344 + (paths["source"] / "facets" / "work" / "news" / "20260102.md").write_text( 345 + "source new\n", 346 + encoding="utf-8", 347 + ) 348 + (paths["target"] / "facets" / "work" / "news" / "20260101.md").write_text( 349 + "target duplicate\n", 350 + encoding="utf-8", 351 + ) 352 + (paths["source"] / "facets" / "work" / "activities").mkdir(parents=True) 353 + (paths["source"] / "facets" / "work" / "activities" / "skip.txt").write_text( 354 + "skip\n", 355 + encoding="utf-8", 356 + ) 357 + (paths["source"] / "facets" / "work" / "logs").mkdir(parents=True) 358 + (paths["source"] / "facets" / "work" / "logs" / "20260101.jsonl").write_text( 359 + '{"log": "skip"}\n', 360 + encoding="utf-8", 361 + ) 362 + (paths["source"] / "facets" / "work" / "entities.jsonl").write_text( 363 + '{"skip": true}\n', 364 + encoding="utf-8", 365 + ) 366 + 367 + result = runner.invoke(call_app, ["journal", "merge", str(paths["source"])]) 368 + 369 + assert result.exit_code == 0 370 + relationship = _read_json( 371 + paths["target"] 372 + / "facets" 373 + / "work" 374 + / "entities" 375 + / "alice_johnson" 376 + / "entity.json" 377 + ) 378 + assert relationship["description"] == "Target description" 379 + assert relationship["target_only"] == "wins" 380 + assert relationship["source_only"] == "keep-me" 381 + 382 + observations = _read_jsonl( 383 + paths["target"] 384 + / "facets" 385 + / "work" 386 + / "entities" 387 + / "alice_johnson" 388 + / "observations.jsonl" 389 + ) 390 + assert len(observations) == 3 391 + assert {item["content"] for item in observations} == { 392 + "Shared fact", 393 + "Source fact", 394 + "Target fact", 395 + } 396 + 397 + todos = _read_jsonl( 398 + paths["target"] / "facets" / "work" / "todos" / "20260101.jsonl" 399 + ) 400 + assert {item["text"] for item in todos} == { 401 + "Duplicate todo", 402 + "Source todo", 403 + "Target todo", 404 + } 405 + 406 + events = _read_jsonl( 407 + paths["target"] / "facets" / "work" / "calendar" / "20260101.jsonl" 408 + ) 409 + assert {(item["title"], item["start"]) for item in events} == { 410 + ("Duplicate event", "09:00"), 411 + ("Source event", "10:00"), 412 + ("Target event", "11:00"), 413 + } 414 + 415 + assert (paths["target"] / "facets" / "work" / "news" / "20260102.md").read_text( 416 + encoding="utf-8" 417 + ) == "source new\n" 418 + assert (paths["target"] / "facets" / "work" / "news" / "20260101.md").read_text( 419 + encoding="utf-8" 420 + ) == "target duplicate\n" 421 + assert not (paths["target"] / "facets" / "work" / "activities").exists() 422 + assert not (paths["target"] / "facets" / "work" / "logs").exists() 423 + assert not (paths["target"] / "facets" / "work" / "entities.jsonl").exists() 424 + 425 + 426 + def test_import_copy(merge_journals_fixture, monkeypatch): 427 + paths = merge_journals_fixture 428 + _mock_indexer(monkeypatch) 429 + 430 + result = runner.invoke(call_app, ["journal", "merge", str(paths["source"])]) 431 + 432 + assert result.exit_code == 0 433 + assert (paths["target"] / "imports" / "20260101_120000" / "manifest.json").exists() 434 + 435 + 436 + def test_import_skip(merge_journals_fixture, monkeypatch): 437 + paths = merge_journals_fixture 438 + _mock_indexer(monkeypatch) 439 + (paths["target"] / "imports" / "20260101_120000").mkdir(parents=True) 440 + (paths["target"] / "imports" / "20260101_120000" / "manifest.json").write_text( 441 + '{"manifest": "target"}\n', 442 + encoding="utf-8", 443 + ) 444 + 445 + result = runner.invoke(call_app, ["journal", "merge", str(paths["source"])]) 446 + 447 + assert result.exit_code == 0 448 + assert ( 449 + paths["target"] / "imports" / "20260101_120000" / "manifest.json" 450 + ).read_text(encoding="utf-8") == '{"manifest": "target"}\n' 451 + 452 + 453 + def test_source_sol_skipped(merge_journals_fixture, monkeypatch): 454 + paths = merge_journals_fixture 455 + _mock_indexer(monkeypatch) 456 + 457 + result = runner.invoke(call_app, ["journal", "merge", str(paths["source"])]) 458 + 459 + assert result.exit_code == 0 460 + assert not (paths["target"] / "sol" / "self.md").exists() 461 + 462 + 463 + def test_source_config_skipped(merge_journals_fixture, monkeypatch): 464 + paths = merge_journals_fixture 465 + _mock_indexer(monkeypatch) 466 + 467 + result = runner.invoke(call_app, ["journal", "merge", str(paths["source"])]) 468 + 469 + assert result.exit_code == 0 470 + assert not (paths["target"] / "config" / "source-only.json").exists() 471 + 472 + 473 + def test_dry_run(merge_journals_fixture): 474 + paths = merge_journals_fixture 475 + _write_json( 476 + paths["source"] / "entities" / "bob_smith" / "entity.json", 477 + { 478 + "id": "bob_smith", 479 + "name": "Bob Smith", 480 + "type": "person", 481 + "created_at": 2000, 482 + }, 483 + ) 484 + 485 + result = runner.invoke( 486 + call_app, 487 + ["journal", "merge", str(paths["source"]), "--dry-run"], 488 + ) 489 + 490 + assert result.exit_code == 0 491 + assert "Would merge:" in result.output 492 + assert not (paths["target"] / "20260101" / "143022_300").exists() 493 + assert not (paths["target"] / "entities" / "bob_smith").exists() 494 + assert not (paths["target"] / "facets" / "work").exists() 495 + assert not (paths["target"] / "imports" / "20260101_120000").exists() 496 + 497 + 498 + def test_invalid_source(tmp_path): 499 + result = runner.invoke(call_app, ["journal", "merge", str(tmp_path / "missing")]) 500 + 501 + assert result.exit_code == 1 502 + assert "is not a directory" in result.output 503 + 504 + 505 + def test_invalid_source_no_days(tmp_path): 506 + source = tmp_path / "source" 507 + source.mkdir() 508 + 509 + result = runner.invoke(call_app, ["journal", "merge", str(source)]) 510 + 511 + assert result.exit_code == 1 512 + assert "does not appear to be a journal" in result.output 513 + 514 + 515 + def test_error_resilience(merge_journals_fixture, monkeypatch): 516 + paths = merge_journals_fixture 517 + _mock_indexer(monkeypatch) 518 + _write_json( 519 + paths["source"] / "entities" / "bob_smith" / "entity.json", 520 + { 521 + "id": "bob_smith", 522 + "name": "Bob Smith", 523 + "type": "person", 524 + "created_at": 2000, 525 + }, 526 + ) 527 + (paths["source"] / "20260101" / "150000_60").mkdir(parents=True) 528 + (paths["source"] / "20260101" / "150000_60" / "audio.jsonl").write_text( 529 + '{"audio": "bad"}\n', 530 + encoding="utf-8", 531 + ) 532 + 533 + import think.tools.journal_merge as journal_merge_module 534 + 535 + real_copytree = shutil.copytree 536 + bad_segment = paths["source"] / "20260101" / "150000_60" 537 + 538 + def failing_copytree(src, dst, *args, **kwargs): 539 + if Path(src) == bad_segment: 540 + raise OSError("boom") 541 + return real_copytree(src, dst, *args, **kwargs) 542 + 543 + monkeypatch.setattr(journal_merge_module.shutil, "copytree", failing_copytree) 544 + 545 + result = runner.invoke(call_app, ["journal", "merge", str(paths["source"])]) 546 + 547 + assert result.exit_code == 0 548 + assert (paths["target"] / "20260101" / "143022_300" / "audio.jsonl").exists() 549 + assert (paths["target"] / "entities" / "bob_smith" / "entity.json").exists() 550 + assert "1 errors:" in result.output
+80
think/tools/call.py
··· 1021 1021 f"{summary.segments_with_raw} with raw media, " 1022 1022 f"{summary.segments_purged} purged" 1023 1023 ) 1024 + 1025 + 1026 + @app.command("merge") 1027 + def journal_merge( 1028 + source: str = typer.Argument( 1029 + help="Path to source journal directory to merge from." 1030 + ), 1031 + dry_run: bool = typer.Option( 1032 + False, 1033 + "--dry-run", 1034 + help="Show what would be merged without making changes.", 1035 + ), 1036 + ) -> None: 1037 + """Merge segments, entities, facets, and imports from a source journal.""" 1038 + from think.tools.journal_merge import merge_journals 1039 + 1040 + source_path = Path(source).resolve() 1041 + 1042 + if not source_path.is_dir(): 1043 + typer.echo(f"Error: '{source}' is not a directory.", err=True) 1044 + raise typer.Exit(1) 1045 + 1046 + import re 1047 + 1048 + has_day = any( 1049 + entry.is_dir() and re.match(r"^\d{8}$", entry.name) 1050 + for entry in source_path.iterdir() 1051 + ) 1052 + if not has_day: 1053 + typer.echo( 1054 + f"Error: '{source}' does not appear to be a journal (no YYYYMMDD directories found).", 1055 + err=True, 1056 + ) 1057 + raise typer.Exit(1) 1058 + 1059 + summary = merge_journals(source_path, dry_run=dry_run) 1060 + 1061 + action = "Would merge" if dry_run else "Merged" 1062 + typer.echo(f"\n{action}:") 1063 + typer.echo( 1064 + f" Segments: {summary.segments_copied} copied, {summary.segments_skipped} skipped, {summary.segments_errored} errored" 1065 + ) 1066 + typer.echo( 1067 + f" Entities: {summary.entities_created} created, {summary.entities_merged} merged, {summary.entities_skipped} skipped" 1068 + ) 1069 + typer.echo( 1070 + f" Facets: {summary.facets_created} created, {summary.facets_merged} merged" 1071 + ) 1072 + typer.echo( 1073 + f" Imports: {summary.imports_copied} copied, {summary.imports_skipped} skipped" 1074 + ) 1075 + 1076 + if summary.errors: 1077 + typer.echo(f"\n{len(summary.errors)} errors:") 1078 + for error in summary.errors: 1079 + typer.echo(f" - {error}") 1080 + 1081 + if not dry_run: 1082 + subprocess.run( 1083 + ["sol", "indexer", "--rescan-full"], 1084 + check=False, 1085 + capture_output=True, 1086 + ) 1087 + 1088 + log_call_action( 1089 + facet=None, 1090 + action="journal_merge", 1091 + params={ 1092 + "source": str(source_path), 1093 + "segments_copied": summary.segments_copied, 1094 + "entities_created": summary.entities_created, 1095 + "entities_merged": summary.entities_merged, 1096 + "facets_created": summary.facets_created, 1097 + "facets_merged": summary.facets_merged, 1098 + "imports_copied": summary.imports_copied, 1099 + "errors": len(summary.errors), 1100 + }, 1101 + ) 1102 + 1103 + typer.echo("Index rebuild started.")
+400
think/tools/journal_merge.py
··· 1 + # SPDX-License-Identifier: AGPL-3.0-only 2 + # Copyright (c) 2026 sol pbc 3 + 4 + """Journal merge engine - one-shot merge of a source journal into the target.""" 5 + 6 + import json 7 + import re 8 + import shutil 9 + from dataclasses import dataclass, field 10 + from pathlib import Path 11 + from typing import Any 12 + 13 + import typer 14 + 15 + from think.entities.core import entity_slug 16 + from think.entities.journal import ( 17 + load_all_journal_entities, 18 + save_journal_entity, 19 + ) 20 + from think.entities.matching import find_matching_entity 21 + from think.entities.observations import save_observations 22 + from think.entities.relationships import save_facet_relationship 23 + from think.utils import get_journal, iter_segments 24 + 25 + DATE_RE = re.compile(r"^\d{8}$") 26 + 27 + 28 + @dataclass 29 + class MergeSummary: 30 + segments_copied: int = 0 31 + segments_skipped: int = 0 32 + segments_errored: int = 0 33 + entities_created: int = 0 34 + entities_merged: int = 0 35 + entities_skipped: int = 0 36 + facets_created: int = 0 37 + facets_merged: int = 0 38 + imports_copied: int = 0 39 + imports_skipped: int = 0 40 + errors: list[str] = field(default_factory=list) 41 + 42 + 43 + def merge_journals(source: Path, dry_run: bool = False) -> MergeSummary: 44 + target = Path(get_journal()) 45 + summary = MergeSummary() 46 + target_entities = load_all_journal_entities() 47 + 48 + _merge_segments(source, target, summary, dry_run) 49 + _merge_entities(source, summary, dry_run, target_entities) 50 + _merge_facets(source, target, summary, dry_run) 51 + _merge_imports(source, target, summary, dry_run) 52 + 53 + return summary 54 + 55 + 56 + def _source_day_dirs(source: Path) -> dict[str, Path]: 57 + days: dict[str, Path] = {} 58 + for entry in sorted(source.iterdir()): 59 + if entry.is_dir() and DATE_RE.match(entry.name): 60 + days[entry.name] = entry 61 + return days 62 + 63 + 64 + def _merge_segments( 65 + source: Path, target: Path, summary: MergeSummary, dry_run: bool 66 + ) -> None: 67 + for day_name, source_day in sorted(_source_day_dirs(source).items()): 68 + target_day = target / day_name 69 + for stream, seg_key, seg_path in iter_segments(source_day): 70 + if stream == "_default": 71 + target_path = target_day / seg_key 72 + else: 73 + target_path = target_day / stream / seg_key 74 + 75 + try: 76 + if target_path.exists(): 77 + summary.segments_skipped += 1 78 + continue 79 + 80 + typer.echo(f" Segment {day_name}/{stream}/{seg_key}", err=True) 81 + if dry_run: 82 + summary.segments_copied += 1 83 + continue 84 + 85 + shutil.copytree(seg_path, target_path, copy_function=shutil.copy2) 86 + summary.segments_copied += 1 87 + except Exception as exc: 88 + summary.segments_errored += 1 89 + summary.errors.append(f"segment {day_name}/{stream}/{seg_key}: {exc}") 90 + 91 + 92 + def _merge_entities( 93 + source: Path, 94 + summary: MergeSummary, 95 + dry_run: bool, 96 + target_entities: dict[str, dict[str, Any]], 97 + ) -> None: 98 + target_has_principal = any( 99 + bool(entity.get("is_principal")) for entity in target_entities.values() 100 + ) 101 + source_entities_dir = source / "entities" 102 + if not source_entities_dir.is_dir(): 103 + return 104 + 105 + for entity_dir in sorted(source_entities_dir.iterdir()): 106 + entity_path = entity_dir / "entity.json" 107 + if not entity_dir.is_dir() or not entity_path.is_file(): 108 + continue 109 + 110 + try: 111 + source_entity = json.loads(entity_path.read_text(encoding="utf-8")) 112 + source_name = str(source_entity.get("name", "")).strip() 113 + if not source_name: 114 + raise ValueError("missing entity name") 115 + 116 + entity_id = str( 117 + source_entity.get("id") or entity_dir.name or entity_slug(source_name) 118 + ) 119 + if not entity_id: 120 + raise ValueError("missing entity id") 121 + source_entity["id"] = entity_id 122 + 123 + typer.echo(f" Entity {entity_id}", err=True) 124 + match = find_matching_entity(source_name, list(target_entities.values())) 125 + if match is None: 126 + base_id = entity_id 127 + next_id = base_id 128 + suffix = 2 129 + while next_id in target_entities: 130 + next_id = f"{base_id}_{suffix}" 131 + suffix += 1 132 + source_entity["id"] = next_id 133 + 134 + if source_entity.get("is_principal") and target_has_principal: 135 + source_entity["is_principal"] = False 136 + elif source_entity.get("is_principal"): 137 + target_has_principal = True 138 + 139 + if not dry_run: 140 + save_journal_entity(source_entity) 141 + summary.entities_created += 1 142 + target_entities[source_entity["id"]] = source_entity 143 + continue 144 + 145 + target_id = str(match.get("id", "")) 146 + if not target_id: 147 + raise ValueError("matched target entity missing id") 148 + 149 + target_entity = dict(target_entities.get(target_id, match)) 150 + 151 + aka_by_lower: dict[str, str] = {} 152 + for values in (target_entity.get("aka", []), source_entity.get("aka", [])): 153 + if not isinstance(values, list): 154 + continue 155 + for value in values: 156 + if not value: 157 + continue 158 + key = str(value).lower() 159 + if key not in aka_by_lower: 160 + aka_by_lower[key] = str(value) 161 + if aka_by_lower: 162 + target_entity["aka"] = sorted(aka_by_lower.values(), key=str.lower) 163 + 164 + merged_emails: list[str] = [] 165 + seen_emails: set[str] = set() 166 + for values in ( 167 + target_entity.get("emails", []), 168 + source_entity.get("emails", []), 169 + ): 170 + if not isinstance(values, list): 171 + continue 172 + for value in values: 173 + if not value: 174 + continue 175 + email = str(value) 176 + key = email.lower() 177 + if key in seen_emails: 178 + continue 179 + seen_emails.add(key) 180 + merged_emails.append(email) 181 + if merged_emails: 182 + target_entity["emails"] = merged_emails 183 + 184 + if not dry_run: 185 + save_journal_entity(target_entity) 186 + summary.entities_merged += 1 187 + target_entities[target_id] = target_entity 188 + except Exception as exc: 189 + summary.errors.append(f"entity {entity_dir.name}: {exc}") 190 + 191 + 192 + def _merge_facets( 193 + source: Path, target: Path, summary: MergeSummary, dry_run: bool 194 + ) -> None: 195 + source_facets_dir = source / "facets" 196 + if not source_facets_dir.is_dir(): 197 + return 198 + 199 + for source_facet_dir in sorted(source_facets_dir.iterdir()): 200 + facet_json = source_facet_dir / "facet.json" 201 + if not source_facet_dir.is_dir() or not facet_json.is_file(): 202 + continue 203 + 204 + facet_name = source_facet_dir.name 205 + target_facet_dir = target / "facets" / facet_name 206 + 207 + try: 208 + typer.echo(f" Facet {facet_name}", err=True) 209 + if not target_facet_dir.exists(): 210 + if not dry_run: 211 + shutil.copytree( 212 + source_facet_dir, 213 + target_facet_dir, 214 + copy_function=shutil.copy2, 215 + ) 216 + summary.facets_created += 1 217 + continue 218 + 219 + _merge_overlapping_facet( 220 + facet_name, 221 + source_facet_dir, 222 + target_facet_dir, 223 + summary, 224 + dry_run, 225 + ) 226 + summary.facets_merged += 1 227 + except Exception as exc: 228 + summary.errors.append(f"facet {facet_name}: {exc}") 229 + 230 + 231 + def _merge_overlapping_facet( 232 + facet_name: str, 233 + source_facet_dir: Path, 234 + target_facet_dir: Path, 235 + summary: MergeSummary, 236 + dry_run: bool, 237 + ) -> None: 238 + source_entities_dir = source_facet_dir / "entities" 239 + if source_entities_dir.is_dir(): 240 + for source_entity_dir in sorted(source_entities_dir.iterdir()): 241 + source_entity_json = source_entity_dir / "entity.json" 242 + if not source_entity_dir.is_dir() or not source_entity_json.is_file(): 243 + continue 244 + 245 + entity_id = source_entity_dir.name 246 + target_entity_dir = target_facet_dir / "entities" / entity_id 247 + try: 248 + if target_entity_dir.exists(): 249 + source_relationship = json.loads( 250 + source_entity_json.read_text(encoding="utf-8") 251 + ) 252 + target_relationship_path = target_entity_dir / "entity.json" 253 + target_relationship: dict[str, Any] = {} 254 + if target_relationship_path.is_file(): 255 + target_relationship = json.loads( 256 + target_relationship_path.read_text(encoding="utf-8") 257 + ) 258 + merged_relationship = {**source_relationship, **target_relationship} 259 + 260 + source_observations = _read_jsonl( 261 + source_entity_dir / "observations.jsonl" 262 + ) 263 + target_observations = _read_jsonl( 264 + target_entity_dir / "observations.jsonl" 265 + ) 266 + seen = { 267 + (item.get("content", ""), item.get("observed_at")) 268 + for item in target_observations 269 + } 270 + merged_observations = list(target_observations) 271 + for item in source_observations: 272 + key = (item.get("content", ""), item.get("observed_at")) 273 + if key in seen: 274 + continue 275 + seen.add(key) 276 + merged_observations.append(item) 277 + 278 + if not dry_run: 279 + save_facet_relationship( 280 + facet_name, entity_id, merged_relationship 281 + ) 282 + save_observations(facet_name, entity_id, merged_observations) 283 + continue 284 + 285 + if not dry_run: 286 + shutil.copytree( 287 + source_entity_dir, 288 + target_entity_dir, 289 + copy_function=shutil.copy2, 290 + ) 291 + except Exception as exc: 292 + summary.errors.append(f"facet {facet_name} entity {entity_id}: {exc}") 293 + 294 + source_todos_dir = source_facet_dir / "todos" 295 + if source_todos_dir.is_dir(): 296 + for source_todo_file in sorted(source_todos_dir.glob("*.jsonl")): 297 + try: 298 + target_todo_file = target_facet_dir / "todos" / source_todo_file.name 299 + target_items = _read_jsonl(target_todo_file) 300 + seen = {(item["text"], item.get("created_at")) for item in target_items} 301 + new_items = [ 302 + item 303 + for item in _read_jsonl(source_todo_file) 304 + if (item["text"], item.get("created_at")) not in seen 305 + ] 306 + if new_items and not dry_run: 307 + _append_jsonl(target_todo_file, new_items) 308 + except Exception as exc: 309 + summary.errors.append( 310 + f"facet {facet_name} todo {source_todo_file.name}: {exc}" 311 + ) 312 + 313 + source_calendar_dir = source_facet_dir / "calendar" 314 + if source_calendar_dir.is_dir(): 315 + for source_calendar_file in sorted(source_calendar_dir.glob("*.jsonl")): 316 + try: 317 + target_calendar_file = ( 318 + target_facet_dir / "calendar" / source_calendar_file.name 319 + ) 320 + target_items = _read_jsonl(target_calendar_file) 321 + seen = {(item["title"], item.get("start")) for item in target_items} 322 + new_items = [ 323 + item 324 + for item in _read_jsonl(source_calendar_file) 325 + if (item["title"], item.get("start")) not in seen 326 + ] 327 + if new_items and not dry_run: 328 + _append_jsonl(target_calendar_file, new_items) 329 + except Exception as exc: 330 + summary.errors.append( 331 + f"facet {facet_name} calendar {source_calendar_file.name}: {exc}" 332 + ) 333 + 334 + source_news_dir = source_facet_dir / "news" 335 + if source_news_dir.is_dir(): 336 + target_news_dir = target_facet_dir / "news" 337 + for source_news_file in sorted(source_news_dir.glob("*.md")): 338 + try: 339 + target_news_file = target_news_dir / source_news_file.name 340 + if target_news_file.exists(): 341 + continue 342 + if not dry_run: 343 + target_news_dir.mkdir(parents=True, exist_ok=True) 344 + shutil.copy2(source_news_file, target_news_file) 345 + except Exception as exc: 346 + summary.errors.append( 347 + f"facet {facet_name} news {source_news_file.name}: {exc}" 348 + ) 349 + 350 + 351 + def _merge_imports( 352 + source: Path, target: Path, summary: MergeSummary, dry_run: bool 353 + ) -> None: 354 + source_imports_dir = source / "imports" 355 + if not source_imports_dir.is_dir(): 356 + return 357 + 358 + for source_import_dir in sorted(source_imports_dir.iterdir()): 359 + if not source_import_dir.is_dir(): 360 + continue 361 + 362 + target_import_dir = target / "imports" / source_import_dir.name 363 + try: 364 + typer.echo(f" Import {source_import_dir.name}", err=True) 365 + if target_import_dir.exists(): 366 + summary.imports_skipped += 1 367 + continue 368 + if not dry_run: 369 + shutil.copytree( 370 + source_import_dir, 371 + target_import_dir, 372 + copy_function=shutil.copy2, 373 + ) 374 + summary.imports_copied += 1 375 + except Exception as exc: 376 + summary.errors.append(f"import {source_import_dir.name}: {exc}") 377 + 378 + 379 + def _read_jsonl(path: Path) -> list[dict[str, Any]]: 380 + if not path.is_file(): 381 + return [] 382 + 383 + items: list[dict[str, Any]] = [] 384 + with open(path, encoding="utf-8") as handle: 385 + for line in handle: 386 + line = line.strip() 387 + if not line: 388 + continue 389 + items.append(json.loads(line)) 390 + return items 391 + 392 + 393 + def _append_jsonl(path: Path, items: list[dict[str, Any]]) -> None: 394 + if not items: 395 + return 396 + 397 + path.parent.mkdir(parents=True, exist_ok=True) 398 + with open(path, "a", encoding="utf-8") as handle: 399 + for item in items: 400 + handle.write(json.dumps(item, ensure_ascii=False) + "\n")