personal memory agent

journal: archive importer (L2) — extract→validate→merge→rescan with lockfile + var-tmp cleanup

- Add JournalArchiveImporter to FileImporter registry (auto-detected for .zip uploads).
- Harden validate_journal_archive against zip-slip: rejects absolute paths, ".." traversal, and symlink entries.
- New <journal>/.merge.lock contract (O_EXCL + PID reclaim) shared between dispatcher and importer; serializes file imports with archive merges (see the sketch after this list).
- Pre-merge principal-collision detection surfaced via ImportResult.principal_collision.
- Merge-phase progress is bridged through _progress_callback into the existing 5s callosum emitter.
- Async sol indexer --rescan-full kickoff (Popen, start_new_session) on successful merge.
- Supervisor startup sweeps stale /var/tmp/solstone-merge-* directories older than 24h.
- FileImporter.process() Protocol gains dry_run: bool = False; all existing implementations updated.
- ImportResult gains optional merge_summary / principal_collision fields, propagated through the file_imported emit, imported.json, and the CLI --json output (an illustrative consumer appears below).
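
A minimal sketch of the lock contract referenced above, using only the stdlib. The real acquire_merge_lock in think/importers/journal_archive.py (in the diff below) additionally retries once, records started_at_utc, tolerates unreadable lock payloads, and raises MergeLockError rather than RuntimeError:

```python
# Sketch only: O_EXCL creation makes acquisition atomic; a JSON payload records
# the holder's PID so a dead holder's lock can be reclaimed.
import json
import os
from contextlib import contextmanager
from pathlib import Path


@contextmanager
def hold_merge_lock(journal_root: Path, kind: str, import_id: str):
    lock_path = journal_root / ".merge.lock"
    try:
        fd = os.open(lock_path, os.O_CREAT | os.O_EXCL | os.O_WRONLY, 0o600)
    except FileExistsError:
        owner = json.loads(lock_path.read_text(encoding="utf-8"))
        try:
            os.kill(int(owner["pid"]), 0)  # signal 0 probes liveness only
        except ProcessLookupError:
            lock_path.unlink()  # stale lock: recorded holder is gone, reclaim it
            fd = os.open(lock_path, os.O_CREAT | os.O_EXCL | os.O_WRONLY, 0o600)
        else:
            raise RuntimeError(f"journal merge already in progress (pid {owner['pid']})")
    with os.fdopen(fd, "w", encoding="utf-8") as handle:
        json.dump({"pid": os.getpid(), "kind": kind, "import_id": import_id}, handle)
    try:
        yield
    finally:
        lock_path.unlink(missing_ok=True)
```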

L2 ships no UI and no new HTTP endpoints. L3 will add the convey UI.
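
Illustrative consumer of the new fields on the --json surface (the archive filename is hypothetical; merge_summary keys come from think.merge and only segments_copied is shown here):

```python
# Sketch: run an archive import through the dispatcher and read the new
# merge_summary / principal_collision fields from the --json payload.
import json
import subprocess

proc = subprocess.run(
    ["sol", "import", "journal-export.zip", "--source", "journal_archive", "--json"],
    capture_output=True,
    text=True,
    check=True,  # raises if the import fails, e.g. on merge-lock contention
)
payload = json.loads(proc.stdout)
print(payload["merge_summary"])  # e.g. {"segments_copied": 1, ...}
if payload.get("principal_collision"):
    # Source and target journals have different principals; L3's convey UI
    # will surface this, for now callers decide how to resolve it.
    print("principal collision:", payload["principal_collision"])
```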

Co-Authored-By: OpenAI Codex <codex@openai.com>

+1194 -27
+5
docs/THINK.md
··· 37 37 `GOOGLE_API_KEY` can also be provided in a `.env` file which 38 38 is loaded automatically by most commands. 39 39 40 + Structured file importers are registered in `think/importers/file_importer.py` and 41 + run through `sol import`'s dispatcher. Their `process()` contract now accepts 42 + `dry_run: bool = False`, and journal-archive imports use the same dispatcher 43 + surface while serializing journal mutation with the merge lock contract. 44 + 40 45 ## Service Discovery 41 46 42 47 Agents invoke tools through `sol call` shell commands:
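
A hypothetical importer conforming to the process() contract described in the docs addition above; NotesImporter and its file handling are illustrative only. FileImporter is a runtime_checkable Protocol, so structural conformance is enough; real importers are registered by module path in the registry dict and expose a module-level importer instance (preview() is omitted here for brevity):

```python
from pathlib import Path
from typing import Callable

from think.importers.file_importer import ImportResult


class NotesImporter:
    """Illustrative importer; not part of the codebase."""

    name = "notes"
    display_name = "Plain-text Notes"
    file_patterns = ["*.txt"]

    def detect(self, path: Path) -> bool:
        return path.suffix.lower() == ".txt"

    def process(
        self,
        path: Path,
        journal_root: Path,
        *,
        facet: str | None = None,
        import_id: str | None = None,
        progress_callback: Callable | None = None,
        dry_run: bool = False,  # new in this change: preview without writing
    ) -> ImportResult:
        lines = path.read_text(encoding="utf-8").splitlines()
        if not dry_run:
            ...  # write entries under journal_root here
        if progress_callback is not None:
            progress_callback(len(lines), len(lines))
        return ImportResult(
            entries_written=0 if dry_run else len(lines),
            entities_seeded=0,
            files_created=[],
            errors=[],
            summary=f"{'Previewed' if dry_run else 'Imported'} {len(lines)} notes",
        )


importer = NotesImporter()
```
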
+58
tests/test_journal_archive.py
··· 18 18 archive.writestr(name, payload) 19 19 20 20 21 + def _write_zip_infos(path: Path, members: list[tuple[zipfile.ZipInfo, str]]) -> None: 22 + with zipfile.ZipFile(path, "w") as archive: 23 + for info, payload in members: 24 + archive.writestr(info, payload) 25 + 26 + 21 27 def test_validate_journal_archive_rejects_missing_file(tmp_path): 22 28 archive_path = tmp_path / "missing.zip" 23 29 ··· 200 206 assert [warning.code for warning in unparseable_result.warnings] == [ 201 207 "manifest-unparseable" 202 208 ] 209 + 210 + 211 + def test_validate_journal_archive_rejects_absolute_member(tmp_path): 212 + archive_path = tmp_path / "absolute.zip" 213 + _write_zip( 214 + archive_path, 215 + { 216 + "chronicle/20260101/default/090000_300/audio.jsonl": "{}\n", 217 + "/etc/passwd": "unsafe\n", 218 + }, 219 + ) 220 + 221 + result = journal_archive.validate_journal_archive(archive_path) 222 + 223 + assert result.ok is False 224 + assert result.warnings[-1].code == "archive-unsafe-path" 225 + 226 + 227 + def test_validate_journal_archive_rejects_parent_traversal_member(tmp_path): 228 + archive_path = tmp_path / "traversal.zip" 229 + _write_zip( 230 + archive_path, 231 + { 232 + "chronicle/20260101/default/090000_300/audio.jsonl": "{}\n", 233 + "../escape.txt": "unsafe\n", 234 + }, 235 + ) 236 + 237 + result = journal_archive.validate_journal_archive(archive_path) 238 + 239 + assert result.ok is False 240 + assert result.warnings[-1].code == "archive-unsafe-path" 241 + 242 + 243 + def test_validate_journal_archive_rejects_symlink_member(tmp_path): 244 + archive_path = tmp_path / "symlink.zip" 245 + symlink_info = zipfile.ZipInfo("chronicle/20260101/default/link") 246 + symlink_info.external_attr = 0xA1ED << 16 247 + safe_info = zipfile.ZipInfo("chronicle/20260101/default/090000_300/audio.jsonl") 248 + 249 + _write_zip_infos( 250 + archive_path, 251 + [ 252 + (safe_info, "{}\n"), 253 + (symlink_info, "target\n"), 254 + ], 255 + ) 256 + 257 + result = journal_archive.validate_journal_archive(archive_path) 258 + 259 + assert result.ok is False 260 + assert result.warnings[-1].code == "archive-unsafe-path"
+631
tests/test_journal_archive_importer.py
··· 1 + # SPDX-License-Identifier: AGPL-3.0-only 2 + # Copyright (c) 2026 sol pbc 3 + 4 + import datetime as dt 5 + import importlib 6 + import json 7 + import os 8 + import shutil 9 + import threading 10 + import zipfile 11 + from pathlib import Path 12 + from unittest.mock import MagicMock 13 + 14 + import pytest 15 + 16 + import think.importers.journal_archive as journal_archive 17 + from think.importers.file_importer import ImportResult 18 + 19 + 20 + def _reset_journal(monkeypatch, journal_root: Path) -> None: 21 + journal_root.mkdir(parents=True, exist_ok=True) 22 + monkeypatch.setenv("SOLSTONE_JOURNAL", str(journal_root)) 23 + think_utils = importlib.import_module("think.utils") 24 + think_utils._journal_path_cache = None 25 + 26 + 27 + def _write_json(path: Path, payload: dict) -> None: 28 + path.parent.mkdir(parents=True, exist_ok=True) 29 + path.write_text(json.dumps(payload, indent=2) + "\n", encoding="utf-8") 30 + 31 + 32 + def _build_archive_members( 33 + *, 34 + prefix: str = "", 35 + day: str = "20260101", 36 + source_entity_id: str = "source_person", 37 + source_name: str = "Source Person", 38 + source_is_principal: bool = False, 39 + ) -> dict[str, str]: 40 + entity_payload = { 41 + "id": source_entity_id, 42 + "name": source_name, 43 + "type": "person", 44 + "created_at": 1, 45 + "is_principal": source_is_principal, 46 + } 47 + return { 48 + f"{prefix}chronicle/{day}/default/090000_300/audio.jsonl": "{}\n", 49 + f"{prefix}entities/{source_entity_id}/entity.json": json.dumps(entity_payload), 50 + f"{prefix}facets/work/facet.json": json.dumps({"title": "Work"}), 51 + f"{prefix}imports/{day}_090000/manifest.json": "{}\n", 52 + f"{prefix}_export.json": json.dumps( 53 + { 54 + "solstone_version": "0.1.0", 55 + "exported_at": "2026-04-26T20:00:00Z", 56 + "source_journal": "/tmp/source", 57 + "day_count": 1, 58 + "entity_count": 1, 59 + "facet_count": 1, 60 + } 61 + ), 62 + } 63 + 64 + 65 + def _write_archive(path: Path, members: dict[str, str]) -> None: 66 + with zipfile.ZipFile(path, "w") as archive: 67 + for name, payload in members.items(): 68 + archive.writestr(name, payload) 69 + 70 + 71 + def _hold_merge_lock( 72 + journal_root: Path, 73 + kind: str, 74 + import_id: str, 75 + ready: threading.Event, 76 + release: threading.Event, 77 + ) -> None: 78 + with journal_archive.acquire_merge_lock(journal_root, kind, import_id): 79 + ready.set() 80 + release.wait(timeout=5) 81 + 82 + 83 + def test_journal_archive_importer_detect_accepts_valid_export_zip(tmp_path): 84 + archive_path = tmp_path / "journal-export.zip" 85 + _write_archive(archive_path, _build_archive_members()) 86 + 87 + assert journal_archive.JournalArchiveImporter().detect(archive_path) is True 88 + 89 + 90 + def test_journal_archive_importer_preview_uses_validator_counts(tmp_path): 91 + archive_path = tmp_path / "journal-export.zip" 92 + _write_archive(archive_path, _build_archive_members()) 93 + 94 + preview = journal_archive.JournalArchiveImporter().preview(archive_path) 95 + 96 + assert preview.date_range == ("20260101", "20260101") 97 + assert preview.item_count == 1 98 + assert preview.entity_count == 1 99 + assert "1 days" in preview.summary 100 + 101 + 102 + def test_journal_archive_importer_process_merges_wrapped_archive(tmp_path, monkeypatch): 103 + archive_path = tmp_path / "wrapped-export.zip" 104 + _write_archive(archive_path, _build_archive_members(prefix="snapshot/")) 105 + extract_root = tmp_path / "extracts" 106 + extract_root.mkdir() 107 + monkeypatch.setattr(journal_archive, "TEMP_EXTRACT_ROOT", 
extract_root) 108 + 109 + target = tmp_path / "target" 110 + _reset_journal(monkeypatch, target) 111 + 112 + popen = MagicMock() 113 + monkeypatch.setattr(journal_archive.subprocess, "Popen", popen) 114 + 115 + result = journal_archive.JournalArchiveImporter().process( 116 + archive_path, 117 + target, 118 + import_id="20260426_120000", 119 + ) 120 + 121 + assert result.errors == [] 122 + assert result.merge_summary is not None 123 + assert result.merge_summary["segments_copied"] == 1 124 + assert (target / "chronicle" / "20260101" / "default" / "090000_300").exists() 125 + assert (target / "entities" / "source_person" / "entity.json").exists() 126 + assert (target / "imports" / "20260101_090000" / "manifest.json").exists() 127 + popen.assert_called_once_with( 128 + ["sol", "indexer", "--rescan-full"], 129 + stdout=journal_archive.subprocess.DEVNULL, 130 + stderr=journal_archive.subprocess.DEVNULL, 131 + start_new_session=True, 132 + ) 133 + 134 + 135 + def test_dispatcher_blocks_file_import_when_merge_lock_held(tmp_path, monkeypatch): 136 + mod = importlib.import_module("think.importers.cli") 137 + ics_file = tmp_path / "calendar.ics" 138 + ics_file.write_text("BEGIN:VCALENDAR\nEND:VCALENDAR", encoding="utf-8") 139 + 140 + _reset_journal(monkeypatch, tmp_path) 141 + ready = threading.Event() 142 + release = threading.Event() 143 + holder = threading.Thread( 144 + target=_hold_merge_lock, 145 + args=(tmp_path, "file-import", "lock-holder", ready, release), 146 + daemon=True, 147 + ) 148 + holder.start() 149 + assert ready.wait(timeout=5) 150 + 151 + mock_imp = MagicMock() 152 + mock_imp.name = "ics" 153 + mock_imp.display_name = "ICS Calendar" 154 + callosum = MagicMock() 155 + 156 + monkeypatch.setattr( 157 + "sys.argv", 158 + [ 159 + "sol import", 160 + str(ics_file), 161 + "--source", 162 + "ics", 163 + "--timestamp", 164 + "20260303_120000", 165 + ], 166 + ) 167 + monkeypatch.setattr( 168 + "think.importers.file_importer.get_file_importer", lambda name: mock_imp 169 + ) 170 + monkeypatch.setattr(mod, "CallosumConnection", lambda **kwargs: callosum) 171 + monkeypatch.setattr(mod, "get_rev", lambda: "test-rev") 172 + monkeypatch.setattr(mod, "_status_emitter", lambda: None) 173 + 174 + with pytest.raises(journal_archive.MergeLockError, match="pid"): 175 + mod.main() 176 + 177 + assert mock_imp.process.call_count == 0 178 + assert (tmp_path / "imports" / "20260303_120000" / "imported.json").exists() 179 + 180 + release.set() 181 + holder.join(timeout=5) 182 + 183 + 184 + def test_dispatcher_treats_archive_lock_contention_as_failure(tmp_path, monkeypatch): 185 + mod = importlib.import_module("think.importers.cli") 186 + archive_path = tmp_path / "journal-export.zip" 187 + _write_archive(archive_path, _build_archive_members()) 188 + extract_root = tmp_path / "extracts" 189 + extract_root.mkdir() 190 + monkeypatch.setattr(journal_archive, "TEMP_EXTRACT_ROOT", extract_root) 191 + 192 + _reset_journal(monkeypatch, tmp_path) 193 + ready = threading.Event() 194 + release = threading.Event() 195 + holder = threading.Thread( 196 + target=_hold_merge_lock, 197 + args=(tmp_path, "journal-archive-import", "lock-holder", ready, release), 198 + daemon=True, 199 + ) 200 + holder.start() 201 + assert ready.wait(timeout=5) 202 + 203 + callosum = MagicMock() 204 + monkeypatch.setattr( 205 + "sys.argv", 206 + [ 207 + "sol import", 208 + str(archive_path), 209 + "--source", 210 + "journal_archive", 211 + "--timestamp", 212 + "20260303_120000", 213 + ], 214 + ) 215 + monkeypatch.setattr(mod, 
"CallosumConnection", lambda **kwargs: callosum) 216 + monkeypatch.setattr(mod, "get_rev", lambda: "test-rev") 217 + monkeypatch.setattr(mod, "_status_emitter", lambda: None) 218 + monkeypatch.setattr(journal_archive.subprocess, "Popen", MagicMock()) 219 + 220 + with pytest.raises(journal_archive.MergeLockError, match="pid"): 221 + mod.main() 222 + 223 + emit_kinds = [call.args[:2] for call in callosum.emit.call_args_list] 224 + assert ("importer", "file_imported") not in emit_kinds 225 + assert ("importer", "error") in emit_kinds 226 + 227 + imported_path = tmp_path / "imports" / "20260303_120000" / "imported.json" 228 + payload = json.loads(imported_path.read_text(encoding="utf-8")) 229 + assert "processing_failed" in payload 230 + assert payload["error_stage"] == "importing" 231 + 232 + release.set() 233 + holder.join(timeout=5) 234 + 235 + 236 + def test_journal_archive_importer_process_blocks_when_lock_held(tmp_path, monkeypatch): 237 + archive_path = tmp_path / "journal-export.zip" 238 + _write_archive(archive_path, _build_archive_members()) 239 + extract_root = tmp_path / "extracts" 240 + extract_root.mkdir() 241 + monkeypatch.setattr(journal_archive, "TEMP_EXTRACT_ROOT", extract_root) 242 + 243 + target = tmp_path / "target" 244 + _reset_journal(monkeypatch, target) 245 + 246 + ready = threading.Event() 247 + release = threading.Event() 248 + holder = threading.Thread( 249 + target=_hold_merge_lock, 250 + args=(target, "journal-archive-import", "lock-holder", ready, release), 251 + daemon=True, 252 + ) 253 + holder.start() 254 + assert ready.wait(timeout=5) 255 + 256 + with pytest.raises(journal_archive.MergeLockError, match="pid"): 257 + journal_archive.JournalArchiveImporter().process( 258 + archive_path, 259 + target, 260 + import_id="20260426_120000", 261 + ) 262 + 263 + assert not (target / "chronicle").exists() 264 + 265 + release.set() 266 + holder.join(timeout=5) 267 + 268 + 269 + def test_journal_archive_importer_process_raises_on_invalid_archive(tmp_path): 270 + archive_path = tmp_path / "invalid.zip" 271 + archive_path.write_text("not a zip", encoding="utf-8") 272 + 273 + with pytest.raises(ValueError, match="readable ZIP file"): 274 + journal_archive.JournalArchiveImporter().process( 275 + archive_path, 276 + tmp_path / "target", 277 + import_id="20260426_120000", 278 + ) 279 + 280 + 281 + def test_journal_archive_importer_process_bridges_merge_progress(tmp_path, monkeypatch): 282 + archive_path = tmp_path / "journal-export.zip" 283 + _write_archive(archive_path, _build_archive_members()) 284 + extract_root = tmp_path / "extracts" 285 + extract_root.mkdir() 286 + monkeypatch.setattr(journal_archive, "TEMP_EXTRACT_ROOT", extract_root) 287 + 288 + target = tmp_path / "target" 289 + _reset_journal(monkeypatch, target) 290 + monkeypatch.setattr(journal_archive.subprocess, "Popen", MagicMock()) 291 + 292 + events: list[tuple[int, int, str | None]] = [] 293 + 294 + def progress_callback(current, total, **kwargs): 295 + events.append((current, total, kwargs.get("stage"))) 296 + 297 + journal_archive.JournalArchiveImporter().process( 298 + archive_path, 299 + target, 300 + import_id="20260426_120000", 301 + progress_callback=progress_callback, 302 + ) 303 + 304 + stages = {stage for _, _, stage in events} 305 + assert {"segments", "entities", "facets", "imports"} <= stages 306 + 307 + 308 + def test_journal_archive_importer_process_reports_principal_collision( 309 + tmp_path, monkeypatch 310 + ): 311 + archive_path = tmp_path / "journal-export.zip" 312 + _write_archive( 313 + 
archive_path, 314 + _build_archive_members( 315 + source_entity_id="source_principal", 316 + source_name="Source Principal", 317 + source_is_principal=True, 318 + ), 319 + ) 320 + extract_root = tmp_path / "extracts" 321 + extract_root.mkdir() 322 + monkeypatch.setattr(journal_archive, "TEMP_EXTRACT_ROOT", extract_root) 323 + 324 + target = tmp_path / "target" 325 + _reset_journal(monkeypatch, target) 326 + _write_json( 327 + target / "entities" / "target_principal" / "entity.json", 328 + { 329 + "id": "target_principal", 330 + "name": "Target Principal", 331 + "type": "person", 332 + "created_at": 1, 333 + "is_principal": True, 334 + }, 335 + ) 336 + 337 + result = journal_archive.JournalArchiveImporter().process( 338 + archive_path, 339 + target, 340 + import_id="20260426_120000", 341 + dry_run=True, 342 + ) 343 + 344 + assert result.principal_collision == { 345 + "source_entity_id": "source_principal", 346 + "source_name": "Source Principal", 347 + "target_entity_id": "target_principal", 348 + "target_name": "Target Principal", 349 + } 350 + 351 + 352 + def test_journal_archive_importer_process_dry_run_is_read_only(tmp_path, monkeypatch): 353 + archive_path = tmp_path / "journal-export.zip" 354 + _write_archive(archive_path, _build_archive_members()) 355 + extract_root = tmp_path / "extracts" 356 + extract_root.mkdir() 357 + monkeypatch.setattr(journal_archive, "TEMP_EXTRACT_ROOT", extract_root) 358 + 359 + target = tmp_path / "target" 360 + _reset_journal(monkeypatch, target) 361 + popen = MagicMock() 362 + monkeypatch.setattr(journal_archive.subprocess, "Popen", popen) 363 + 364 + result = journal_archive.JournalArchiveImporter().process( 365 + archive_path, 366 + target, 367 + import_id="20260426_120000", 368 + dry_run=True, 369 + ) 370 + 371 + assert result.merge_summary is not None 372 + assert result.merge_summary["segments_copied"] == 1 373 + assert not (target / "chronicle").exists() 374 + popen.assert_not_called() 375 + 376 + 377 + def test_journal_archive_importer_safe_extract_rejects_escape_target( 378 + tmp_path, monkeypatch 379 + ): 380 + archive_path = tmp_path / "unsafe.zip" 381 + _write_archive( 382 + archive_path, 383 + { 384 + "chronicle/20260101/default/090000_300/audio.jsonl": "{}\n", 385 + "../escape.txt": "unsafe\n", 386 + }, 387 + ) 388 + extract_root = tmp_path / "extracts" 389 + extract_root.mkdir() 390 + monkeypatch.setattr(journal_archive, "TEMP_EXTRACT_ROOT", extract_root) 391 + 392 + importer = journal_archive.JournalArchiveImporter() 393 + validation = journal_archive.ArchiveValidation( 394 + ok=True, 395 + archive_path=archive_path, 396 + root_prefix="", 397 + manifest=None, 398 + ) 399 + 400 + with pytest.raises(ImportError, match="unsafe path"): 401 + importer._safe_extract(archive_path, validation, "20260426_120000") 402 + 403 + 404 + def test_journal_archive_importer_safe_extract_skips_metadata_entries( 405 + tmp_path, monkeypatch 406 + ): 407 + archive_path = tmp_path / "metadata.zip" 408 + _write_archive( 409 + archive_path, 410 + { 411 + "__MACOSX/ignored.txt": "ignored\n", 412 + "chronicle/20260101/default/090000_300/audio.jsonl": "{}\n", 413 + ".DS_Store": "ignored\n", 414 + "_export.json": json.dumps( 415 + { 416 + "solstone_version": "0.1.0", 417 + "exported_at": "2026-04-26T20:00:00Z", 418 + "source_journal": "/tmp/source", 419 + "day_count": 1, 420 + "entity_count": 0, 421 + "facet_count": 0, 422 + } 423 + ), 424 + }, 425 + ) 426 + extract_root = tmp_path / "extracts" 427 + extract_root.mkdir() 428 + monkeypatch.setattr(journal_archive, 
"TEMP_EXTRACT_ROOT", extract_root) 429 + 430 + validation = journal_archive.validate_journal_archive(archive_path) 431 + importer = journal_archive.JournalArchiveImporter() 432 + extracted_root, temp_dir = importer._safe_extract( 433 + archive_path, 434 + validation, 435 + "20260426_120000", 436 + ) 437 + try: 438 + assert ( 439 + extracted_root / "chronicle" / "20260101" / "default" / "090000_300" 440 + ).exists() 441 + assert not (extracted_root / "__MACOSX").exists() 442 + assert not (extracted_root / ".DS_Store").exists() 443 + finally: 444 + shutil.rmtree(temp_dir, ignore_errors=True) 445 + 446 + 447 + def test_journal_archive_importer_process_starts_async_full_rescan( 448 + tmp_path, monkeypatch 449 + ): 450 + archive_path = tmp_path / "journal-export.zip" 451 + _write_archive(archive_path, _build_archive_members()) 452 + extract_root = tmp_path / "extracts" 453 + extract_root.mkdir() 454 + monkeypatch.setattr(journal_archive, "TEMP_EXTRACT_ROOT", extract_root) 455 + 456 + target = tmp_path / "target" 457 + _reset_journal(monkeypatch, target) 458 + popen = MagicMock() 459 + monkeypatch.setattr(journal_archive.subprocess, "Popen", popen) 460 + 461 + journal_archive.JournalArchiveImporter().process( 462 + archive_path, 463 + target, 464 + import_id="20260426_120000", 465 + ) 466 + 467 + popen.assert_called_once_with( 468 + ["sol", "indexer", "--rescan-full"], 469 + stdout=journal_archive.subprocess.DEVNULL, 470 + stderr=journal_archive.subprocess.DEVNULL, 471 + start_new_session=True, 472 + ) 473 + 474 + 475 + def test_journal_archive_importer_process_warns_when_full_rescan_launch_fails( 476 + tmp_path, monkeypatch, caplog 477 + ): 478 + archive_path = tmp_path / "journal-export.zip" 479 + _write_archive(archive_path, _build_archive_members()) 480 + extract_root = tmp_path / "extracts" 481 + extract_root.mkdir() 482 + monkeypatch.setattr(journal_archive, "TEMP_EXTRACT_ROOT", extract_root) 483 + 484 + target = tmp_path / "target" 485 + _reset_journal(monkeypatch, target) 486 + monkeypatch.setattr( 487 + journal_archive.subprocess, 488 + "Popen", 489 + MagicMock(side_effect=OSError("nope")), 490 + ) 491 + 492 + with caplog.at_level("WARNING"): 493 + result = journal_archive.JournalArchiveImporter().process( 494 + archive_path, 495 + target, 496 + import_id="20260426_120000", 497 + ) 498 + 499 + assert result.errors == [] 500 + assert "Failed to start full index rescan" in caplog.text 501 + 502 + 503 + def test_journal_archive_importer_process_cleans_extract_dir_on_success_and_error( 504 + tmp_path, monkeypatch 505 + ): 506 + archive_path = tmp_path / "journal-export.zip" 507 + _write_archive(archive_path, _build_archive_members()) 508 + extract_root = tmp_path / "extracts" 509 + extract_root.mkdir() 510 + monkeypatch.setattr(journal_archive, "TEMP_EXTRACT_ROOT", extract_root) 511 + 512 + target = tmp_path / "target" 513 + _reset_journal(monkeypatch, target) 514 + monkeypatch.setattr(journal_archive.subprocess, "Popen", MagicMock()) 515 + 516 + journal_archive.JournalArchiveImporter().process( 517 + archive_path, 518 + target, 519 + import_id="20260426_120000", 520 + ) 521 + assert list(extract_root.glob("solstone-merge-*")) == [] 522 + 523 + def boom(*args, **kwargs): 524 + raise RuntimeError("boom") 525 + 526 + monkeypatch.setattr(journal_archive, "merge_journals", boom) 527 + with pytest.raises(RuntimeError, match="boom"): 528 + journal_archive.JournalArchiveImporter().process( 529 + archive_path, 530 + target, 531 + import_id="20260426_120001", 532 + ) 533 + assert 
list(extract_root.glob("solstone-merge-*")) == [] 534 + 535 + 536 + def test_sweep_stale_extract_dirs_removes_old_directories(tmp_path, monkeypatch): 537 + monkeypatch.setattr(journal_archive, "TEMP_EXTRACT_ROOT", tmp_path) 538 + stale = tmp_path / "solstone-merge-stale" 539 + stale.mkdir() 540 + fresh = tmp_path / "solstone-merge-fresh" 541 + fresh.mkdir() 542 + 543 + old_ts = (dt.datetime.now(dt.timezone.utc) - dt.timedelta(days=2)).timestamp() 544 + os.utime(stale, (old_ts, old_ts)) 545 + 546 + swept = journal_archive.sweep_stale_extract_dirs() 547 + 548 + assert swept == 1 549 + assert not stale.exists() 550 + assert fresh.exists() 551 + 552 + 553 + def test_importer_cli_emits_merge_summary_and_principal_collision( 554 + tmp_path, monkeypatch, capsys 555 + ): 556 + mod = importlib.import_module("think.importers.cli") 557 + archive_path = tmp_path / "journal-export.zip" 558 + archive_path.write_bytes(b"fake zip") 559 + 560 + _reset_journal(monkeypatch, tmp_path) 561 + callosum = MagicMock() 562 + mock_imp = MagicMock() 563 + mock_imp.name = "journal_archive" 564 + mock_imp.display_name = "Journal Archive" 565 + mock_imp.process.return_value = ImportResult( 566 + entries_written=1, 567 + entities_seeded=0, 568 + files_created=[], 569 + errors=[], 570 + summary="Merged archive", 571 + merge_summary={"segments_copied": 1}, 572 + principal_collision={"source_entity_id": "a"}, 573 + ) 574 + 575 + monkeypatch.setattr( 576 + "sys.argv", 577 + [ 578 + "sol import", 579 + str(archive_path), 580 + "--source", 581 + "journal_archive", 582 + "--timestamp", 583 + "20260303_120000", 584 + "--json", 585 + ], 586 + ) 587 + monkeypatch.setattr( 588 + "think.importers.file_importer.get_file_importer", lambda name: mock_imp 589 + ) 590 + monkeypatch.setattr(mod, "CallosumConnection", lambda **kwargs: callosum) 591 + monkeypatch.setattr(mod, "get_rev", lambda: "test-rev") 592 + monkeypatch.setattr(mod, "_status_emitter", lambda: None) 593 + 594 + mod.main() 595 + 596 + file_imported = next( 597 + call 598 + for call in callosum.emit.call_args_list 599 + if call.args[:2] == ("importer", "file_imported") 600 + ) 601 + assert file_imported.kwargs["merge_summary"] == {"segments_copied": 1} 602 + assert file_imported.kwargs["principal_collision"] == {"source_entity_id": "a"} 603 + 604 + payload = json.loads(capsys.readouterr().out) 605 + assert payload["merge_summary"] == {"segments_copied": 1} 606 + assert payload["principal_collision"] == {"source_entity_id": "a"} 607 + 608 + 609 + def test_acquire_merge_lock_reclaims_stale_pid(tmp_path, monkeypatch): 610 + lock_path = tmp_path / ".merge.lock" 611 + lock_path.write_text( 612 + json.dumps( 613 + { 614 + "pid": 999999, 615 + "started_at_utc": "2026-04-26T00:00:00+00:00", 616 + "kind": "file-import", 617 + "import_id": "stale", 618 + } 619 + ), 620 + encoding="utf-8", 621 + ) 622 + monkeypatch.setattr( 623 + journal_archive.os, "kill", MagicMock(side_effect=ProcessLookupError) 624 + ) 625 + 626 + with journal_archive.acquire_merge_lock(tmp_path, "file-import", "fresh"): 627 + payload = json.loads(lock_path.read_text(encoding="utf-8")) 628 + assert payload["import_id"] == "fresh" 629 + assert payload["pid"] == journal_archive.os.getpid() 630 + 631 + assert not lock_path.exists()
+1
think/importers/chatgpt.py
··· 201 201 facet: str | None = None, 202 202 import_id: str | None = None, 203 203 progress_callback: Callable | None = None, 204 + dry_run: bool = False, 204 205 ) -> ImportResult: 205 206 conversations = _open_conversations(path) 206 207 import_id = import_id or dt.datetime.now().strftime("%Y%m%d_%H%M%S")
+1
think/importers/claude_chat.py
··· 163 163 facet: str | None = None, 164 164 import_id: str | None = None, 165 165 progress_callback: Callable | None = None, 166 + dry_run: bool = False, 166 167 ) -> ImportResult: 167 168 conversations = _open_conversations(path) 168 169 import_id = import_id or dt.datetime.now().strftime("%Y%m%d_%H%M%S")
+47 -20
think/importers/cli.py
··· 82 82 """Callback for importers to report progress stats.""" 83 83 _progress_stats["items_processed"] = current 84 84 _progress_stats["items_total"] = total 85 + if stage := kwargs.get("stage"): 86 + # Reuse the existing stage tracker so merge-phase status rides the normal emitter. 87 + _set_stage(str(stage)) 85 88 for key in ("earliest_date", "latest_date", "entities_found"): 86 89 if key in kwargs: 87 90 _progress_stats[key] = kwargs[key] ··· 745 748 _source_hash = hash_source(Path(args.media)) 746 749 747 750 import_dir = _setup_file_import(_import_id) 748 - result = _file_importer.process( 749 - Path(args.media), 750 - journal_root, 751 - facet=args.facet, 752 - import_id=_import_id, 753 - progress_callback=_progress_callback, 754 - ) 751 + if _file_importer.name == "journal_archive": 752 + # The archive importer owns the same O_EXCL lock internally for direct callers. 753 + result = _file_importer.process( 754 + Path(args.media), 755 + journal_root, 756 + facet=args.facet, 757 + import_id=_import_id, 758 + progress_callback=_progress_callback, 759 + ) 760 + else: 761 + from think.importers.journal_archive import acquire_merge_lock 762 + 763 + with acquire_merge_lock(journal_root, "file-import", _import_id): 764 + result = _file_importer.process( 765 + Path(args.media), 766 + journal_root, 767 + facet=args.facet, 768 + import_id=_import_id, 769 + progress_callback=_progress_callback, 770 + ) 755 771 756 772 all_created_files.extend(result.files_created) 757 773 processing_results["outputs"].append( ··· 772 788 ) 773 789 processing_results["entries_written"] = result.entries_written 774 790 processing_results["entities_seeded"] = result.entities_seeded 791 + if result.merge_summary is not None: 792 + processing_results["merge_summary"] = result.merge_summary 793 + if result.principal_collision is not None: 794 + processing_results["principal_collision"] = result.principal_collision 775 795 776 796 if result.errors: 777 797 logger.warning( ··· 782 802 ) 783 803 784 804 # Emit callosum events for file imports 785 - _callosum.emit( 786 - "importer", 787 - "file_imported", 788 - import_id=_import_id, 789 - importer=_file_importer.name, 790 - entries_written=result.entries_written, 791 - entities_seeded=result.entities_seeded, 792 - files_created=len(result.files_created), 793 - errors=len(result.errors), 794 - stream=stream, 795 - source_display=_file_importer.display_name, 796 - date_range=list(result.date_range) if result.date_range else None, 797 - ) 805 + file_imported_payload = { 806 + "import_id": _import_id, 807 + "importer": _file_importer.name, 808 + "entries_written": result.entries_written, 809 + "entities_seeded": result.entities_seeded, 810 + "files_created": len(result.files_created), 811 + "errors": len(result.errors), 812 + "stream": stream, 813 + "source_display": _file_importer.display_name, 814 + "date_range": list(result.date_range) if result.date_range else None, 815 + } 816 + if result.merge_summary is not None: 817 + file_imported_payload["merge_summary"] = result.merge_summary 818 + if result.principal_collision is not None: 819 + file_imported_payload["principal_collision"] = ( 820 + result.principal_collision 821 + ) 822 + _callosum.emit("importer", "file_imported", **file_imported_payload) 798 823 799 824 if result.segments: 800 825 for seg_day, seg_key in result.segments: ··· 896 921 "files_created": result.files_created, 897 922 "errors": result.errors, 898 923 "summary": result.summary, 924 + "merge_summary": result.merge_summary, 925 + 
"principal_collision": result.principal_collision, 899 926 } 900 927 ) 901 928 )
+1
think/importers/documents.py
··· 186 186 facet: str | None = None, 187 187 import_id: str | None = None, 188 188 progress_callback: Callable | None = None, 189 + dry_run: bool = False, 189 190 ) -> ImportResult: 190 191 pdfs = _find_pdfs(path) 191 192 import_id = import_id or dt.datetime.now().strftime("%Y%m%d_%H%M%S")
+5 -1
think/importers/file_importer.py
··· 6 6 import logging 7 7 from dataclasses import dataclass 8 8 from pathlib import Path 9 - from typing import Callable, Protocol, runtime_checkable 9 + from typing import Any, Callable, Protocol, runtime_checkable 10 10 11 11 logger = logging.getLogger(__name__) 12 12 ··· 32 32 summary: str 33 33 segments: list[tuple[str, str]] | None = None 34 34 date_range: tuple[str, str] | None = None 35 + merge_summary: dict[str, Any] | None = None 36 + principal_collision: dict[str, Any] | None = None 35 37 36 38 37 39 @runtime_checkable ··· 53 55 facet: str | None = None, 54 56 import_id: str | None = None, 55 57 progress_callback: Callable | None = None, 58 + dry_run: bool = False, 56 59 ) -> ImportResult: ... 57 60 58 61 ··· 64 67 "kindle": "think.importers.kindle", 65 68 "gemini": "think.importers.gemini", 66 69 "document": "think.importers.documents", 70 + "journal_archive": "think.importers.journal_archive", 67 71 } 68 72 69 73
+1
think/importers/gemini.py
··· 227 227 facet: str | None = None, 228 228 import_id: str | None = None, 229 229 progress_callback: Callable | None = None, 230 + dry_run: bool = False, 230 231 ) -> ImportResult: 231 232 activities = _load_activities(path) 232 233 import_id = import_id or dt.datetime.now().strftime("%Y%m%d_%H%M%S")
+1
think/importers/ics.py
··· 425 425 facet: str | None = None, 426 426 import_id: str | None = None, 427 427 progress_callback: Callable | None = None, 428 + dry_run: bool = False, 428 429 ) -> ImportResult: 429 430 ics_blobs = _extract_ics_data(path) 430 431 import_id = import_id or dt.datetime.now().strftime("%Y%m%d_%H%M%S")
+432 -6
think/importers/journal_archive.py
··· 1 1 # SPDX-License-Identifier: AGPL-3.0-only 2 2 # Copyright (c) 2026 sol pbc 3 3 4 - """Read-only validation for exported journal archives.""" 5 - 6 - # L1 read-only validator. L2 will add a JournalArchiveImporter class to this 7 - # module; the validator must remain free of writes (scope §4 / AGENTS.md §7 L7). 4 + """Validator and importer for exported journal archives.""" 8 5 9 6 from __future__ import annotations 10 7 8 + import datetime as dt 11 9 import json 10 + import logging 11 + import os 12 12 import re 13 + import shutil 14 + import subprocess 13 15 import zipfile 14 - from dataclasses import dataclass, field 16 + from contextlib import contextmanager 17 + from dataclasses import asdict, dataclass, field 15 18 from pathlib import Path 16 - from typing import Any 19 + from typing import Any, Callable, Iterator 20 + 21 + from think.entities.journal import get_journal_principal 22 + from think.importers.file_importer import ImportPreview, ImportResult 23 + from think.merge import ProgressCallback, merge_journals 24 + 25 + logger = logging.getLogger(__name__) 17 26 18 27 DATE_RE = re.compile(r"^\d{8}$") 19 28 JOURNAL_ROOT_ENTRIES = {"chronicle", "entities", "facets", "imports", "_export.json"} ··· 25 34 "entity_count", 26 35 "facet_count", 27 36 ) 37 + SYMLINK_TYPE = 0xA000 38 + SYMLINK_MASK = 0xF000 39 + TEMP_EXTRACT_GLOB = "solstone-merge-*" 40 + TEMP_EXTRACT_ROOT = Path("/var/tmp") 28 41 29 42 30 43 @dataclass ··· 45 58 facet_count: int = 0 46 59 47 60 61 + class MergeLockError(RuntimeError): 62 + """Raised when the journal merge/import lock cannot be acquired.""" 63 + 64 + def __init__(self, pid: int | None, message: str): 65 + super().__init__(message) 66 + self.pid = pid 67 + 68 + 48 69 def _visible_name(name: str) -> str | None: 49 70 parts = [part for part in name.split("/") if part] 50 71 if not parts: ··· 102 123 facet_slugs.add(parts[1]) 103 124 104 125 return len(day_dirs), len(entity_slugs), len(facet_slugs) 126 + 127 + 128 + def _is_symlink_entry(info: zipfile.ZipInfo) -> bool: 129 + return ((info.external_attr >> 16) & SYMLINK_MASK) == SYMLINK_TYPE 130 + 131 + 132 + def _has_unsafe_path(name: str) -> bool: 133 + entry_path = Path(name) 134 + return entry_path.is_absolute() or ".." 
in entry_path.parts 105 135 106 136 107 137 def _build_fatal( ··· 175 205 warnings=warnings, 176 206 ) 177 207 208 + for info in infos: 209 + if _has_unsafe_path(info.filename) or _is_symlink_entry(info): 210 + return _build_fatal( 211 + archive_path, 212 + "archive-unsafe-path", 213 + f"unsafe entry: {info.filename}", 214 + warnings=warnings, 215 + ) 216 + 178 217 day_count, entity_count, facet_count = _scan_counts( 179 218 visible_names, root_prefix 180 219 ) ··· 265 304 "Archive is not a readable ZIP file.", 266 305 warnings=warnings, 267 306 ) 307 + 308 + 309 + def _validation_messages(validation: ArchiveValidation) -> list[str]: 310 + return [warning.message for warning in validation.warnings] 311 + 312 + 313 + def _archive_day_range(archive_path: Path, root_prefix: str) -> tuple[str, str] | None: 314 + try: 315 + with zipfile.ZipFile(archive_path, "r") as archive: 316 + names = [ 317 + name 318 + for info in archive.infolist() 319 + if (name := _visible_name(info.filename)) is not None 320 + ] 321 + except (OSError, RuntimeError, zipfile.BadZipFile, zipfile.LargeZipFile): 322 + return None 323 + 324 + days = sorted( 325 + { 326 + parts[1] 327 + for name in names 328 + if (relative_name := name[len(root_prefix) :] if root_prefix else name) 329 + and (parts := relative_name.split("/")) 330 + and len(parts) >= 2 331 + and parts[0] == "chronicle" 332 + and DATE_RE.match(parts[1]) 333 + } 334 + ) 335 + if not days: 336 + return None 337 + return (days[0], days[-1]) 338 + 339 + 340 + def _format_preview_summary(validation: ArchiveValidation) -> str: 341 + base = ( 342 + f"{validation.day_count} days, {validation.entity_count} entities, " 343 + f"{validation.facet_count} facets" 344 + ) 345 + if validation.warnings: 346 + return f"Journal archive: {base} ({len(validation.warnings)} warnings)" 347 + return f"Journal archive: {base}" 348 + 349 + 350 + def _merge_artifact_paths(journal_root: Path) -> tuple[Path, Path]: 351 + run_id = dt.datetime.now(dt.timezone.utc).strftime("%Y%m%dT%H%M%SZ") 352 + artifact_root = journal_root.parent / f"{journal_root.name}.merge" / run_id 353 + return artifact_root / "decisions.jsonl", artifact_root / "staging" 354 + 355 + 356 + def _collect_day_range(root: Path) -> tuple[str, str] | None: 357 + chronicle_dir = root / "chronicle" 358 + if not chronicle_dir.is_dir(): 359 + return None 360 + days = sorted( 361 + entry.name 362 + for entry in chronicle_dir.iterdir() 363 + if entry.is_dir() and DATE_RE.match(entry.name) 364 + ) 365 + if not days: 366 + return None 367 + return (days[0], days[-1]) 368 + 369 + 370 + def _format_merge_summary(summary: dict[str, Any], *, dry_run: bool) -> str: 371 + prefix = "Dry run merge" if dry_run else "Merged archive" 372 + return ( 373 + f"{prefix}: {summary['segments_copied']} segments copied, " 374 + f"{summary['segments_skipped']} skipped, " 375 + f"{summary['entities_created']} entities created, " 376 + f"{summary['entities_merged']} merged, " 377 + f"{summary['entities_staged']} staged, " 378 + f"{summary['facets_created']} facets created, " 379 + f"{summary['facets_merged']} merged, " 380 + f"{summary['imports_copied']} imports copied" 381 + ) 382 + 383 + 384 + def _lock_message(pid: int | None) -> str: 385 + if pid is None: 386 + return "another journal merge is in progress" 387 + return f"another journal merge is in progress (pid {pid})" 388 + 389 + 390 + def _read_lock_owner(lock_path: Path) -> tuple[int | None, dict[str, Any] | None]: 391 + try: 392 + payload = json.loads(lock_path.read_text(encoding="utf-8")) 393 + 
except (OSError, json.JSONDecodeError): 394 + return None, None 395 + raw_pid = payload.get("pid") 396 + if isinstance(raw_pid, int): 397 + return raw_pid, payload 398 + if isinstance(raw_pid, str) and raw_pid.isdigit(): 399 + return int(raw_pid), payload 400 + return None, payload 401 + 402 + 403 + @contextmanager 404 + def acquire_merge_lock( 405 + journal_root: Path, 406 + kind: str, 407 + import_id: str, 408 + ) -> Iterator[None]: 409 + """Acquire the journal merge/import lock using an O_EXCL lockfile.""" 410 + 411 + lock_path = journal_root / ".merge.lock" 412 + payload = { 413 + "pid": os.getpid(), 414 + "started_at_utc": dt.datetime.now(dt.timezone.utc).isoformat(), 415 + "kind": kind, 416 + "import_id": import_id, 417 + } 418 + 419 + for attempt in range(2): 420 + try: 421 + fd = os.open(lock_path, os.O_CREAT | os.O_EXCL | os.O_WRONLY, 0o600) 422 + except FileExistsError: 423 + pid, _ = _read_lock_owner(lock_path) 424 + if pid is None: 425 + raise MergeLockError(None, _lock_message(None)) 426 + try: 427 + os.kill(pid, 0) 428 + except ProcessLookupError: 429 + try: 430 + lock_path.unlink() 431 + except FileNotFoundError: 432 + pass 433 + if attempt == 0: 434 + continue 435 + raise MergeLockError(pid, _lock_message(pid)) 436 + except OSError: 437 + raise MergeLockError(pid, _lock_message(pid)) 438 + raise MergeLockError(pid, _lock_message(pid)) 439 + else: 440 + try: 441 + with os.fdopen(fd, "w", encoding="utf-8") as handle: 442 + json.dump(payload, handle) 443 + except Exception: 444 + try: 445 + lock_path.unlink() 446 + except FileNotFoundError: 447 + pass 448 + raise 449 + break 450 + else: 451 + raise MergeLockError(None, _lock_message(None)) 452 + 453 + try: 454 + yield 455 + finally: 456 + try: 457 + lock_path.unlink() 458 + except FileNotFoundError: 459 + pass 460 + 461 + 462 + def sweep_stale_extract_dirs(max_age_seconds: int = 86400) -> int: 463 + """Remove stale journal-archive extraction directories under /var/tmp.""" 464 + 465 + swept = 0 466 + now = dt.datetime.now(dt.timezone.utc).timestamp() 467 + for path in TEMP_EXTRACT_ROOT.glob(TEMP_EXTRACT_GLOB): 468 + if not path.is_dir(): 469 + continue 470 + try: 471 + age_seconds = now - path.stat().st_mtime 472 + except OSError: 473 + continue 474 + if age_seconds <= max_age_seconds: 475 + continue 476 + shutil.rmtree(path, ignore_errors=True) 477 + if not path.exists(): 478 + swept += 1 479 + return swept 480 + 481 + 482 + class JournalArchiveImporter: 483 + name = "journal_archive" 484 + display_name = "Journal Archive" 485 + file_patterns = ["*.zip"] 486 + description = "Merge an exported journal archive into the current journal" 487 + 488 + def detect(self, path: Path) -> bool: 489 + if not path.is_file(): 490 + return False 491 + if path.suffix.lower() != ".zip": 492 + return False 493 + return validate_journal_archive(path).ok 494 + 495 + def preview(self, path: Path) -> ImportPreview: 496 + validation = validate_journal_archive(path) 497 + if not validation.ok: 498 + messages = _validation_messages(validation) 499 + return ImportPreview( 500 + date_range=("", ""), 501 + item_count=0, 502 + entity_count=0, 503 + summary=messages[-1] 504 + if messages 505 + else "Journal archive validation failed", 506 + ) 507 + 508 + date_range = _archive_day_range(validation.archive_path, validation.root_prefix) 509 + return ImportPreview( 510 + date_range=date_range or ("", ""), 511 + item_count=validation.day_count, 512 + entity_count=validation.entity_count, 513 + summary=_format_preview_summary(validation), 514 + ) 515 + 516 + def 
process( 517 + self, 518 + path: Path, 519 + journal_root: Path, 520 + *, 521 + facet: str | None = None, 522 + import_id: str | None = None, 523 + progress_callback: Callable | None = None, 524 + dry_run: bool = False, 525 + ) -> ImportResult: 526 + del facet 527 + import_id = import_id or dt.datetime.now().strftime("%Y%m%d_%H%M%S") 528 + validation = validate_journal_archive(path) 529 + if not validation.ok: 530 + messages = _validation_messages(validation) 531 + summary = messages[-1] if messages else "Journal archive validation failed" 532 + raise ValueError(summary) 533 + 534 + with acquire_merge_lock(journal_root, "journal-archive-import", import_id): 535 + extracted_root, extract_dir = self._safe_extract( 536 + validation.archive_path, validation, import_id 537 + ) 538 + try: 539 + principal_collision = self._check_principal_collision(extracted_root) 540 + progress = self._bridge_progress(progress_callback) 541 + log_path, staging_path = _merge_artifact_paths(journal_root) 542 + summary = merge_journals( 543 + extracted_root, 544 + journal_root, 545 + dry_run=dry_run, 546 + log_path=log_path, 547 + staging_path=staging_path, 548 + progress=progress, 549 + ) 550 + merge_summary = asdict(summary) 551 + if not dry_run: 552 + try: 553 + subprocess.Popen( 554 + ["sol", "indexer", "--rescan-full"], 555 + stdout=subprocess.DEVNULL, 556 + stderr=subprocess.DEVNULL, 557 + start_new_session=True, 558 + ) 559 + except OSError as exc: 560 + logger.warning( 561 + "Failed to start full index rescan for journal archive import %s: %s", 562 + import_id, 563 + exc, 564 + ) 565 + 566 + return ImportResult( 567 + entries_written=summary.segments_copied, 568 + entities_seeded=summary.entities_created, 569 + files_created=[], 570 + errors=list(summary.errors), 571 + summary=_format_merge_summary( 572 + merge_summary, 573 + dry_run=dry_run, 574 + ), 575 + date_range=_collect_day_range(extracted_root), 576 + merge_summary=merge_summary, 577 + principal_collision=principal_collision, 578 + ) 579 + finally: 580 + shutil.rmtree(extract_dir, ignore_errors=True) 581 + 582 + def _safe_extract( 583 + self, 584 + archive_path: Path, 585 + validation: ArchiveValidation, 586 + import_id: str, 587 + ) -> tuple[Path, Path]: 588 + temp_name = ( 589 + f"solstone-merge-{import_id}-{os.getpid()}-" 590 + f"{int(dt.datetime.now(dt.timezone.utc).timestamp() * 1000)}" 591 + ) 592 + extract_dir = TEMP_EXTRACT_ROOT / temp_name 593 + previous_umask = os.umask(0o077) 594 + try: 595 + extract_dir.mkdir(mode=0o700) 596 + extract_root = extract_dir / "journal" 597 + extract_root.mkdir(mode=0o700) 598 + resolved_root = extract_root.resolve() 599 + 600 + with zipfile.ZipFile(archive_path, "r") as archive: 601 + for info in archive.infolist(): 602 + visible_name = _visible_name(info.filename) 603 + if visible_name is None: 604 + continue 605 + if validation.root_prefix: 606 + if not visible_name.startswith(validation.root_prefix): 607 + continue 608 + relative_name = visible_name[len(validation.root_prefix) :] 609 + else: 610 + relative_name = visible_name 611 + if not relative_name: 612 + continue 613 + 614 + target_path = (extract_root / relative_name).resolve() 615 + try: 616 + target_path.relative_to(resolved_root) 617 + except ValueError as exc: 618 + raise ImportError(f"unsafe path {visible_name}") from exc 619 + if _has_unsafe_path(relative_name) or _is_symlink_entry(info): 620 + raise ImportError(f"unsafe path {visible_name}") 621 + 622 + if info.is_dir(): 623 + target_path.mkdir(parents=True, exist_ok=True, mode=0o700) 624 + 
continue 625 + 626 + target_path.parent.mkdir(parents=True, exist_ok=True, mode=0o700) 627 + with ( 628 + archive.open(info, "r") as source, 629 + open(target_path, "wb") as dest, 630 + ): 631 + shutil.copyfileobj(source, dest) 632 + 633 + return extract_root, extract_dir 634 + finally: 635 + os.umask(previous_umask) 636 + 637 + def _check_principal_collision( 638 + self, 639 + extracted_root: Path, 640 + ) -> dict[str, str] | None: 641 + target_principal = get_journal_principal() 642 + if not target_principal: 643 + return None 644 + 645 + candidate_paths = sorted((extracted_root / "entities").glob("*/entity.json")) 646 + candidate_paths.extend( 647 + sorted((extracted_root / "facets").glob("*/entities/*/entity.json")) 648 + ) 649 + for path in candidate_paths: 650 + try: 651 + entity = json.loads(path.read_text(encoding="utf-8")) 652 + except (OSError, json.JSONDecodeError): 653 + continue 654 + if not entity.get("is_principal"): 655 + continue 656 + source_entity_id = str(entity.get("id") or path.parent.name) 657 + target_entity_id = str(target_principal.get("id") or "") 658 + if ( 659 + source_entity_id 660 + and target_entity_id 661 + and source_entity_id != target_entity_id 662 + ): 663 + return { 664 + "source_entity_id": source_entity_id, 665 + "source_name": str(entity.get("name") or source_entity_id), 666 + "target_entity_id": target_entity_id, 667 + "target_name": str( 668 + target_principal.get("name") or target_entity_id 669 + ), 670 + } 671 + return None 672 + return None 673 + 674 + def _bridge_progress( 675 + self, 676 + progress_callback: Callable | None, 677 + ) -> ProgressCallback | None: 678 + if progress_callback is None: 679 + return None 680 + 681 + def _bridge( 682 + phase: str, 683 + completed: int, 684 + total: int | None, 685 + item_name: str | None, 686 + ) -> None: 687 + del item_name 688 + progress_callback(completed, total or 0, stage=phase) 689 + 690 + return _bridge 691 + 692 + 693 + importer = JournalArchiveImporter()
+1
think/importers/kindle.py
··· 256 256 facet: str | None = None, 257 257 import_id: str | None = None, 258 258 progress_callback: Callable | None = None, 259 + dry_run: bool = False, 259 260 ) -> ImportResult: 260 261 text = path.read_text(encoding="utf-8-sig") 261 262 blocks = text.split(DELIMITER)
+1
think/importers/obsidian.py
··· 350 350 facet: str | None = None, 351 351 import_id: str | None = None, 352 352 progress_callback: Callable | None = None, 353 + dry_run: bool = False, 353 354 ) -> ImportResult: 354 355 md_files = list(self._walk_md_files(path)) 355 356 total = len(md_files)
+9
think/supervisor.py
··· 1590 1590 except Exception: 1591 1591 logging.exception("Maintenance runner raised; continuing startup") 1592 1592 1593 + try: 1594 + from think.importers.journal_archive import sweep_stale_extract_dirs 1595 + 1596 + swept = sweep_stale_extract_dirs() 1597 + if swept > 0: 1598 + logging.info("Swept %d stale journal-archive extract dir(s)", swept) 1599 + except Exception: 1600 + logging.exception("Journal archive extract sweep raised; continuing startup") 1601 + 1593 1602 # Start Callosum in-process first - it's the message bus that other services depend on 1594 1603 try: 1595 1604 start_callosum_in_process()