personal memory agent
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

journal: archive importer — propagate merge log/staging paths, summary errors, and segment_errored decisions

- Adds merge_log_path and merge_staging_path to ImportResult; populated from _merge_artifact_paths() and mirrored into processing_results and the importer.file_imported Callosum payload.
- Persists merge summary_errors so the UI can surface them later without re-reading source files.
- Adds staging_path to entity_staged decision rows; emits segment_errored decisions on per-segment failures.

Co-Authored-By: OpenAI Codex <codex@openai.com>

+149 -1
+124 -1
tests/test_journal_archive_importer.py
··· 121 121 assert result.errors == [] 122 122 assert result.merge_summary is not None 123 123 assert result.merge_summary["segments_copied"] == 1 124 + assert result.merge_log_path is not None 125 + assert result.merge_staging_path is not None 124 126 assert (target / "chronicle" / "20260101" / "default" / "090000_300").exists() 125 127 assert (target / "entities" / "source_person" / "entity.json").exists() 126 128 assert (target / "imports" / "20260101_090000" / "manifest.json").exists() ··· 349 351 } 350 352 351 353 354 + def test_journal_archive_importer_logs_segment_errors(tmp_path, monkeypatch): 355 + archive_path = tmp_path / "journal-export.zip" 356 + _write_archive(archive_path, _build_archive_members()) 357 + extract_root = tmp_path / "extracts" 358 + extract_root.mkdir() 359 + monkeypatch.setattr(journal_archive, "TEMP_EXTRACT_ROOT", extract_root) 360 + 361 + target = tmp_path / "target" 362 + _reset_journal(monkeypatch, target) 363 + monkeypatch.setattr(journal_archive.subprocess, "Popen", MagicMock()) 364 + 365 + merge_mod = importlib.import_module("think.merge") 366 + original_copytree = merge_mod.shutil.copytree 367 + 368 + def failing_copytree(src, dst, *args, **kwargs): 369 + if Path(src).name == "090000_300": 370 + raise OSError("segment copy failed") 371 + return original_copytree(src, dst, *args, **kwargs) 372 + 373 + monkeypatch.setattr(merge_mod.shutil, "copytree", failing_copytree) 374 + 375 + result = journal_archive.JournalArchiveImporter().process( 376 + archive_path, 377 + target, 378 + import_id="20260426_120000", 379 + ) 380 + 381 + assert result.merge_summary is not None 382 + assert result.merge_summary["segments_errored"] == 1 383 + assert result.merge_log_path is not None 384 + decision_log = Path(result.merge_log_path) 385 + rows = [ 386 + json.loads(line) 387 + for line in decision_log.read_text(encoding="utf-8").splitlines() 388 + if line 389 + ] 390 + assert { 391 + "action": "segment_errored", 392 + "item_type": "segment", 393 + "item_id": "20260101/default/090000_300", 394 + "reason": "segment copy failed", 395 + }.items() <= next( 396 + row.items() for row in rows if row.get("action") == "segment_errored" 397 + ) 398 + 399 + 400 + def test_journal_archive_importer_logs_staged_entity_paths(tmp_path, monkeypatch): 401 + archive_path = tmp_path / "journal-export.zip" 402 + _write_archive( 403 + archive_path, 404 + _build_archive_members( 405 + source_entity_id="shared_person", 406 + source_name="Source Person", 407 + ), 408 + ) 409 + extract_root = tmp_path / "extracts" 410 + extract_root.mkdir() 411 + monkeypatch.setattr(journal_archive, "TEMP_EXTRACT_ROOT", extract_root) 412 + 413 + target = tmp_path / "target" 414 + _reset_journal(monkeypatch, target) 415 + monkeypatch.setattr(journal_archive.subprocess, "Popen", MagicMock()) 416 + _write_json( 417 + target / "entities" / "shared_person" / "entity.json", 418 + { 419 + "id": "shared_person", 420 + "name": "Target Person", 421 + "type": "person", 422 + "created_at": 1, 423 + }, 424 + ) 425 + 426 + result = journal_archive.JournalArchiveImporter().process( 427 + archive_path, 428 + target, 429 + import_id="20260426_120000", 430 + ) 431 + 432 + assert result.merge_summary is not None 433 + assert result.merge_summary["entities_staged"] == 1 434 + assert result.merge_log_path is not None 435 + assert result.merge_staging_path is not None 436 + decision_log = Path(result.merge_log_path) 437 + rows = [ 438 + json.loads(line) 439 + for line in decision_log.read_text(encoding="utf-8").splitlines() 440 + if line 441 + ] 442 + staged_row = next(row for row in rows if row.get("action") == "entity_staged") 443 + expected_path = str( 444 + Path(result.merge_staging_path) / "shared_person" / "entity.json" 445 + ) 446 + assert staged_row["staging_path"] == expected_path 447 + assert staged_row["staging_path"].startswith(result.merge_staging_path) 448 + 449 + 352 450 def test_journal_archive_importer_process_dry_run_is_read_only(tmp_path, monkeypatch): 353 451 archive_path = tmp_path / "journal-export.zip" 354 452 _write_archive(archive_path, _build_archive_members()) ··· 566 664 entries_written=1, 567 665 entities_seeded=0, 568 666 files_created=[], 569 - errors=[], 667 + errors=["segment 20260101/default/090000_300: segment copy failed"], 570 668 summary="Merged archive", 571 669 merge_summary={"segments_copied": 1}, 572 670 principal_collision={"source_entity_id": "a"}, 671 + merge_log_path="/tmp/journal.merge/run/decisions.jsonl", 672 + merge_staging_path="/tmp/journal.merge/run/staging", 573 673 ) 574 674 575 675 monkeypatch.setattr( ··· 600 700 ) 601 701 assert file_imported.kwargs["merge_summary"] == {"segments_copied": 1} 602 702 assert file_imported.kwargs["principal_collision"] == {"source_entity_id": "a"} 703 + assert ( 704 + file_imported.kwargs["merge_log_path"] 705 + == "/tmp/journal.merge/run/decisions.jsonl" 706 + ) 707 + assert ( 708 + file_imported.kwargs["merge_staging_path"] == "/tmp/journal.merge/run/staging" 709 + ) 710 + assert file_imported.kwargs["summary_errors"] == [ 711 + "segment 20260101/default/090000_300: segment copy failed" 712 + ] 603 713 604 714 payload = json.loads(capsys.readouterr().out) 605 715 assert payload["merge_summary"] == {"segments_copied": 1} 606 716 assert payload["principal_collision"] == {"source_entity_id": "a"} 717 + assert payload["merge_log_path"] == "/tmp/journal.merge/run/decisions.jsonl" 718 + assert payload["merge_staging_path"] == "/tmp/journal.merge/run/staging" 719 + assert payload["summary_errors"] == [ 720 + "segment 20260101/default/090000_300: segment copy failed" 721 + ] 722 + 723 + imported_path = tmp_path / "imports" / "20260303_120000" / "imported.json" 724 + imported = json.loads(imported_path.read_text(encoding="utf-8")) 725 + assert imported["merge_log_path"] == "/tmp/journal.merge/run/decisions.jsonl" 726 + assert imported["merge_staging_path"] == "/tmp/journal.merge/run/staging" 727 + assert imported["summary_errors"] == [ 728 + "segment 20260101/default/090000_300: segment copy failed" 729 + ] 607 730 608 731 609 732 def test_acquire_merge_lock_reclaims_stale_pid(tmp_path, monkeypatch):
+9
think/importers/cli.py
··· 790 790 processing_results["entities_seeded"] = result.entities_seeded 791 791 if result.merge_summary is not None: 792 792 processing_results["merge_summary"] = result.merge_summary 793 + processing_results["merge_log_path"] = result.merge_log_path 794 + processing_results["merge_staging_path"] = result.merge_staging_path 795 + processing_results["summary_errors"] = list(result.errors) 793 796 if result.principal_collision is not None: 794 797 processing_results["principal_collision"] = result.principal_collision 795 798 ··· 815 818 } 816 819 if result.merge_summary is not None: 817 820 file_imported_payload["merge_summary"] = result.merge_summary 821 + file_imported_payload["merge_log_path"] = result.merge_log_path 822 + file_imported_payload["merge_staging_path"] = result.merge_staging_path 823 + file_imported_payload["summary_errors"] = list(result.errors) 818 824 if result.principal_collision is not None: 819 825 file_imported_payload["principal_collision"] = ( 820 826 result.principal_collision ··· 923 929 "summary": result.summary, 924 930 "merge_summary": result.merge_summary, 925 931 "principal_collision": result.principal_collision, 932 + "merge_log_path": result.merge_log_path, 933 + "merge_staging_path": result.merge_staging_path, 934 + "summary_errors": list(result.errors), 926 935 } 927 936 ) 928 937 )
+2
think/importers/file_importer.py
··· 34 34 date_range: tuple[str, str] | None = None 35 35 merge_summary: dict[str, Any] | None = None 36 36 principal_collision: dict[str, Any] | None = None 37 + merge_log_path: str | None = None 38 + merge_staging_path: str | None = None 37 39 38 40 39 41 @runtime_checkable
+2
think/importers/journal_archive.py
··· 575 575 date_range=_collect_day_range(extracted_root), 576 576 merge_summary=merge_summary, 577 577 principal_collision=principal_collision, 578 + merge_log_path=str(log_path), 579 + merge_staging_path=str(staging_path), 578 580 ) 579 581 finally: 580 582 shutil.rmtree(extract_dir, ignore_errors=True)
+12
think/merge.py
··· 241 241 raise 242 242 except Exception as exc: 243 243 summary.segments_errored += 1 244 + _log_decision( 245 + log_path, 246 + { 247 + "action": "segment_errored", 248 + "item_type": "segment", 249 + "item_id": item_id, 250 + "reason": str(exc), 251 + }, 252 + ) 244 253 summary.errors.append(f"segment {day_name}/{stream}/{seg_key}: {exc}") 245 254 finally: 246 255 _report_progress(progress, "segments", index, total, item_id) ··· 307 316 "reason": "id_collision_no_match", 308 317 "source": source_entity, 309 318 "target": dict(target_entities[entity_id]), 319 + "staging_path": str( 320 + staging_path / entity_id / "entity.json" 321 + ), 310 322 }, 311 323 ) 312 324 else: