personal memory agent
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

Add idempotent imports with dedup manifests and entry-level merge

Source-level dedup: hash source file before import, check existing
manifests. If same file was already imported, skip with message.
--force overrides.

Entry-level merge: when writing to a day that already has entries
from the same source, compute content keys and skip duplicates,
appending only new entries.

- hash_source() for file and directory SHA-256
- write_manifest() / find_manifest_by_hash() for manifest lifecycle
- write_structured_import() merges with existing JSONL on re-import
- CLI checks source hash before processing, writes manifest after
- 14 new tests for hashing, manifests, entry merge, and dedup
- Updated 4 existing tests for manifest expectations

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

+454 -6
+255
tests/test_import_dedup.py
··· 1 + # SPDX-License-Identifier: AGPL-3.0-only 2 + # Copyright (c) 2026 sol pbc 3 + 4 + """Tests for import deduplication — manifests, source-level dedup, entry merge.""" 5 + 6 + import json 7 + import os 8 + import tempfile 9 + from pathlib import Path 10 + 11 + from think.importers.shared import ( 12 + _entry_content_key, 13 + _load_existing_entries, 14 + find_manifest_by_hash, 15 + hash_source, 16 + write_manifest, 17 + write_structured_import, 18 + ) 19 + 20 + 21 + # --- hash_source tests --- 22 + 23 + 24 + def test_hash_source_file(): 25 + with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) as f: 26 + f.write('{"test": true}') 27 + f.flush() 28 + try: 29 + h1 = hash_source(Path(f.name)) 30 + h2 = hash_source(Path(f.name)) 31 + assert h1 == h2 32 + assert len(h1) == 64 # SHA-256 hex 33 + finally: 34 + os.unlink(f.name) 35 + 36 + 37 + def test_hash_source_file_different_content(): 38 + with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) as f1: 39 + f1.write('{"a": 1}') 40 + f1.flush() 41 + with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) as f2: 42 + f2.write('{"a": 2}') 43 + f2.flush() 44 + try: 45 + assert hash_source(Path(f1.name)) != hash_source(Path(f2.name)) 46 + finally: 47 + os.unlink(f1.name) 48 + os.unlink(f2.name) 49 + 50 + 51 + def test_hash_source_directory(): 52 + with tempfile.TemporaryDirectory() as tmpdir: 53 + (Path(tmpdir) / "a.txt").write_text("hello") 54 + (Path(tmpdir) / "b.txt").write_text("world") 55 + h1 = hash_source(Path(tmpdir)) 56 + h2 = hash_source(Path(tmpdir)) 57 + assert h1 == h2 58 + 59 + 60 + def test_hash_source_directory_changes(): 61 + with tempfile.TemporaryDirectory() as tmpdir: 62 + (Path(tmpdir) / "a.txt").write_text("hello") 63 + h1 = hash_source(Path(tmpdir)) 64 + (Path(tmpdir) / "b.txt").write_text("world") 65 + h2 = hash_source(Path(tmpdir)) 66 + assert h1 != h2 67 + 68 + 69 + # --- manifest tests --- 70 + 71 + 72 + def test_write_and_find_manifest(): 73 + with tempfile.TemporaryDirectory() as journal: 74 + manifest_path = write_manifest( 75 + Path(journal), 76 + import_id="20260115_120000", 77 + source_type="ics", 78 + source_hash="abc123", 79 + entry_count=42, 80 + files_created=[ 81 + f"{journal}/20260115/import.ics/imported.jsonl", 82 + f"{journal}/20260116/import.ics/imported.jsonl", 83 + ], 84 + ) 85 + assert manifest_path.exists() 86 + 87 + # Read it back 88 + with open(manifest_path) as f: 89 + data = json.load(f) 90 + assert data["source_type"] == "ics" 91 + assert data["source_hash"] == "abc123" 92 + assert data["entry_count"] == 42 93 + assert "20260115" in data["days_affected"] 94 + assert "20260116" in data["days_affected"] 95 + 96 + # Find by hash 97 + found = find_manifest_by_hash(Path(journal), "abc123") 98 + assert found is not None 99 + assert found["source_type"] == "ics" 100 + 101 + # Not found for different hash 102 + assert find_manifest_by_hash(Path(journal), "xyz999") is None 103 + 104 + 105 + def test_find_manifest_no_imports_dir(): 106 + with tempfile.TemporaryDirectory() as journal: 107 + assert find_manifest_by_hash(Path(journal), "abc") is None 108 + 109 + 110 + def test_find_manifest_bad_json(): 111 + with tempfile.TemporaryDirectory() as journal: 112 + manifest_dir = Path(journal) / "imports" / "bad" 113 + manifest_dir.mkdir(parents=True) 114 + (manifest_dir / "manifest.json").write_text("not json") 115 + assert find_manifest_by_hash(Path(journal), "abc") is None 116 + 117 + 118 + # --- entry content key tests --- 119 + 120 + 121 + def test_entry_content_key(): 122 + e1 = {"type": "calendar_event", "ts": "2026-01-15T10:00:00", "title": "Standup"} 123 + e2 = {"type": "calendar_event", "ts": "2026-01-15T10:00:00", "title": "Standup"} 124 + e3 = {"type": "calendar_event", "ts": "2026-01-15T11:00:00", "title": "Review"} 125 + assert _entry_content_key(e1) == _entry_content_key(e2) 126 + assert _entry_content_key(e1) != _entry_content_key(e3) 127 + 128 + 129 + def test_entry_content_key_kindle(): 130 + e = {"type": "highlight", "ts": "2026-01-10T08:00:00", "book_title": "Deep Work"} 131 + key = _entry_content_key(e) 132 + assert "Deep Work" in key 133 + 134 + 135 + # --- entry-level merge in write_structured_import --- 136 + 137 + 138 + def test_reimport_same_entries_no_duplicates(): 139 + """Re-importing identical entries should not create duplicates.""" 140 + with tempfile.TemporaryDirectory() as journal: 141 + os.environ["JOURNAL_PATH"] = journal 142 + try: 143 + entries = [ 144 + { 145 + "type": "calendar_event", 146 + "ts": "2026-01-15T10:00:00", 147 + "title": "Standup", 148 + "content": "Daily sync", 149 + }, 150 + { 151 + "type": "calendar_event", 152 + "ts": "2026-01-15T14:00:00", 153 + "title": "Review", 154 + "content": "Code review", 155 + }, 156 + ] 157 + 158 + # First import 159 + files1 = write_structured_import("ics", entries, import_id="t1") 160 + assert len(files1) == 1 161 + 162 + # Read entry count 163 + lines1 = Path(files1[0]).read_text().strip().split("\n") 164 + header1 = json.loads(lines1[0]) 165 + assert header1["entry_count"] == 2 166 + 167 + # Re-import same entries 168 + files2 = write_structured_import("ics", entries, import_id="t2") 169 + assert len(files2) == 1 170 + 171 + # Entry count should still be 2 (no duplicates) 172 + lines2 = Path(files2[0]).read_text().strip().split("\n") 173 + header2 = json.loads(lines2[0]) 174 + assert header2["entry_count"] == 2 175 + finally: 176 + os.environ.pop("JOURNAL_PATH", None) 177 + 178 + 179 + def test_reimport_with_new_entries_merges(): 180 + """Re-importing with new entries should merge (add new, keep old).""" 181 + with tempfile.TemporaryDirectory() as journal: 182 + os.environ["JOURNAL_PATH"] = journal 183 + try: 184 + original = [ 185 + { 186 + "type": "calendar_event", 187 + "ts": "2026-01-15T10:00:00", 188 + "title": "Standup", 189 + "content": "Daily sync", 190 + }, 191 + ] 192 + updated = original + [ 193 + { 194 + "type": "calendar_event", 195 + "ts": "2026-01-15T14:00:00", 196 + "title": "New meeting", 197 + "content": "Added later", 198 + }, 199 + ] 200 + 201 + # First import 202 + write_structured_import("ics", original, import_id="t1") 203 + 204 + # Import with new entries 205 + files = write_structured_import("ics", updated, import_id="t2") 206 + 207 + # Should have both entries 208 + lines = Path(files[0]).read_text().strip().split("\n") 209 + header = json.loads(lines[0]) 210 + assert header["entry_count"] == 2 211 + 212 + # Verify content 213 + entries = [json.loads(l) for l in lines[1:]] 214 + titles = [e["title"] for e in entries] 215 + assert "Standup" in titles 216 + assert "New meeting" in titles 217 + finally: 218 + os.environ.pop("JOURNAL_PATH", None) 219 + 220 + 221 + def test_first_import_no_merge_needed(): 222 + """First import should work normally with no existing file.""" 223 + with tempfile.TemporaryDirectory() as journal: 224 + os.environ["JOURNAL_PATH"] = journal 225 + try: 226 + entries = [ 227 + { 228 + "type": "note", 229 + "ts": "2026-02-01T00:00:00", 230 + "title": "Test note", 231 + "content": "Hello", 232 + }, 233 + ] 234 + files = write_structured_import("obsidian", entries, import_id="t1") 235 + assert len(files) == 1 236 + assert Path(files[0]).exists() 237 + finally: 238 + os.environ.pop("JOURNAL_PATH", None) 239 + 240 + 241 + def test_load_existing_entries(): 242 + with tempfile.TemporaryDirectory() as tmpdir: 243 + p = Path(tmpdir) / "imported.jsonl" 244 + header = {"import": {"id": "t", "source": "ics"}, "entry_count": 1} 245 + entry = {"type": "calendar_event", "ts": "2026-01-15T10:00:00", "title": "X"} 246 + p.write_text(json.dumps(header) + "\n" + json.dumps(entry) + "\n") 247 + 248 + entries = _load_existing_entries(p) 249 + assert len(entries) == 1 250 + assert entries[0]["title"] == "X" 251 + 252 + 253 + def test_load_existing_entries_missing_file(): 254 + entries = _load_existing_entries(Path("/nonexistent/path.jsonl")) 255 + assert entries == []
+15 -6
tests/test_importer.py
··· 577 577 assert mock_call.args[0] == "importer" 578 578 assert mock_call.args[1] == "started" 579 579 assert mock_call.kwargs["import_id"] == "20260303_123456" 580 - assert not (tmp_path / "imports").exists() 580 + # Manifest written for dedup tracking 581 + assert (tmp_path / "imports" / "20260303_123456" / "manifest.json").exists() 581 582 582 583 583 584 def test_file_importer_with_timestamp(tmp_path, monkeypatch): ··· 616 617 assert mock_call.args[0] == "importer" 617 618 assert mock_call.args[1] == "started" 618 619 assert mock_call.kwargs["import_id"] == "20260303_120000" 619 - assert not (tmp_path / "imports").exists() 620 + # File importers write a manifest (but not source files) in imports/ 621 + assert (tmp_path / "imports" / "20260303_120000" / "manifest.json").exists() 620 622 621 623 622 624 def test_list_importers_json(capsys, monkeypatch): ··· 711 713 assert data["files_created"] == ["/journal/20250101/import.ics/imported.jsonl"] 712 714 assert data["errors"] == [] 713 715 assert data["summary"] == "Imported 42 events" 714 - assert not (tmp_path / "imports").exists() 716 + # Manifest written for dedup tracking 717 + assert (tmp_path / "imports").exists() 715 718 716 719 717 - def test_file_importer_no_imports_dir(tmp_path, monkeypatch): 718 - """File importers should never create the legacy imports/ folder.""" 720 + def test_file_importer_writes_manifest(tmp_path, monkeypatch): 721 + """File importers write a dedup manifest but don't copy source files to imports/.""" 719 722 mod = importlib.import_module("think.importers.cli") 720 723 721 724 ics_file = tmp_path / "calendar.ics" ··· 737 740 738 741 mod.main() 739 742 740 - assert not (tmp_path / "imports").exists() 743 + # Manifest exists, but source file was not copied into imports/ 744 + imports_dir = tmp_path / "imports" 745 + assert imports_dir.exists() 746 + manifests = list(imports_dir.rglob("manifest.json")) 747 + assert len(manifests) == 1 748 + # No import.json (legacy audio import metadata) 749 + assert not list(imports_dir.rglob("import.json"))
+34
think/importers/cli.py
··· 572 572 # File importer processing — structured file/directory imports 573 573 _set_stage("importing") 574 574 575 + # Source-level dedup: check if this exact file was already imported 576 + if not args.force: 577 + from think.importers.shared import ( 578 + find_manifest_by_hash, 579 + hash_source, 580 + ) 581 + 582 + _source_hash = hash_source(Path(args.media)) 583 + existing = find_manifest_by_hash(journal_root, _source_hash) 584 + if existing: 585 + imported_at = existing.get("imported_at", "unknown date") 586 + entry_count = existing.get("entry_count", 0) 587 + print( 588 + f"This file was already imported on {imported_at} " 589 + f"({entry_count} entries). Use --force to re-import." 590 + ) 591 + return 592 + else: 593 + from think.importers.shared import hash_source 594 + 595 + _source_hash = hash_source(Path(args.media)) 596 + 575 597 result = _file_importer.process( 576 598 Path(args.media), journal_root, facet=args.facet 577 599 ) ··· 650 672 days=days_affected, 651 673 entries_written=result.entries_written, 652 674 ) 675 + 676 + # Write import manifest for dedup tracking 677 + from think.importers.shared import write_manifest 678 + 679 + write_manifest( 680 + journal_root, 681 + import_id=_import_id, 682 + source_type=_file_importer.name, 683 + source_hash=_source_hash, 684 + entry_count=result.entries_written, 685 + files_created=result.files_created, 686 + ) 653 687 654 688 if args.json: 655 689 print(
+150
think/importers/shared.py
··· 4 4 from __future__ import annotations 5 5 6 6 import datetime as dt 7 + import hashlib 7 8 import json 8 9 import logging 9 10 import os ··· 222 223 223 224 out_path = import_dir / "imported.jsonl" 224 225 226 + # Merge with existing entries if file already exists (entry-level dedup) 227 + if out_path.exists(): 228 + existing_keys: set[str] = set() 229 + try: 230 + with open(out_path, "r", encoding="utf-8") as f: 231 + for line_num, line in enumerate(f): 232 + line = line.strip() 233 + if not line: 234 + continue 235 + try: 236 + existing = json.loads(line) 237 + except json.JSONDecodeError: 238 + continue 239 + # Skip header (first line) 240 + if line_num == 0 and "import" in existing: 241 + continue 242 + existing_keys.add(_entry_content_key(existing)) 243 + except OSError: 244 + existing_keys = set() 245 + 246 + # Filter to only new entries 247 + new_entries = [ 248 + e for e in day_entries if _entry_content_key(e) not in existing_keys 249 + ] 250 + if not new_entries: 251 + logger.info("No new entries for %s — skipping", out_path) 252 + created.append(str(out_path)) 253 + continue 254 + # Combine: keep all existing plus new 255 + day_entries = _load_existing_entries(out_path) + new_entries 256 + day_entries.sort(key=lambda e: e.get("ts", "")) 257 + 225 258 # Build header 226 259 header: dict[str, object] = { 227 260 "import": {"id": import_id, "source": source}, ··· 252 285 logger.info("Wrote %d entries to %s", len(day_entries), out_path) 253 286 254 287 return created 288 + 289 + 290 + def hash_source(path: Path) -> str: 291 + """Compute SHA-256 hash of an import source file or directory. 292 + 293 + For files: hash the file contents. 294 + For directories: hash a sorted listing of relative paths + sizes. 295 + """ 296 + h = hashlib.sha256() 297 + if path.is_file(): 298 + with open(path, "rb") as f: 299 + for chunk in iter(lambda: f.read(65536), b""): 300 + h.update(chunk) 301 + elif path.is_dir(): 302 + # Hash sorted listing of relative paths + sizes (fast, catches changes) 303 + entries = [] 304 + for child in sorted(path.rglob("*")): 305 + if child.is_file(): 306 + rel = str(child.relative_to(path)) 307 + entries.append(f"{rel}:{child.stat().st_size}") 308 + h.update("\n".join(entries).encode()) 309 + return h.hexdigest() 310 + 311 + 312 + def write_manifest( 313 + journal_root: Path, 314 + import_id: str, 315 + source_type: str, 316 + source_hash: str, 317 + entry_count: int, 318 + files_created: list[str], 319 + ) -> Path: 320 + """Write an import manifest for deduplication tracking. 321 + 322 + Returns path to the manifest file. 323 + """ 324 + days_affected = sorted( 325 + { 326 + os.path.basename(os.path.dirname(os.path.dirname(f))) 327 + for f in files_created 328 + if os.path.basename(os.path.dirname(os.path.dirname(f))).isdigit() 329 + } 330 + ) 331 + manifest = { 332 + "source_type": source_type, 333 + "source_hash": source_hash, 334 + "entry_count": entry_count, 335 + "days_affected": days_affected, 336 + "files_created": files_created, 337 + "imported_at": dt.datetime.now().isoformat(), 338 + } 339 + manifest_dir = journal_root / "imports" / import_id 340 + manifest_dir.mkdir(parents=True, exist_ok=True) 341 + manifest_path = manifest_dir / "manifest.json" 342 + with open(manifest_path, "w", encoding="utf-8") as f: 343 + json.dump(manifest, f, indent=2) 344 + return manifest_path 345 + 346 + 347 + def find_manifest_by_hash( 348 + journal_root: Path, source_hash: str 349 + ) -> dict | None: 350 + """Search existing import manifests for a matching source hash. 351 + 352 + Returns the manifest dict if found, None otherwise. 353 + """ 354 + imports_dir = journal_root / "imports" 355 + if not imports_dir.is_dir(): 356 + return None 357 + for entry in imports_dir.iterdir(): 358 + if not entry.is_dir(): 359 + continue 360 + manifest_path = entry / "manifest.json" 361 + if not manifest_path.exists(): 362 + continue 363 + try: 364 + with open(manifest_path, "r", encoding="utf-8") as f: 365 + manifest = json.load(f) 366 + if manifest.get("source_hash") == source_hash: 367 + return manifest 368 + except (json.JSONDecodeError, OSError): 369 + continue 370 + return None 371 + 372 + 373 + def _load_existing_entries(out_path: Path) -> list[dict]: 374 + """Load content entries from an existing imported.jsonl, skipping header.""" 375 + entries: list[dict] = [] 376 + try: 377 + with open(out_path, "r", encoding="utf-8") as f: 378 + for line_num, line in enumerate(f): 379 + line = line.strip() 380 + if not line: 381 + continue 382 + try: 383 + entry = json.loads(line) 384 + except json.JSONDecodeError: 385 + continue 386 + if line_num == 0 and "import" in entry: 387 + continue 388 + entries.append(entry) 389 + except OSError: 390 + pass 391 + return entries 392 + 393 + 394 + def _entry_content_key(entry: dict) -> str: 395 + """Compute a content key for dedup within a source type. 396 + 397 + Uses type + ts + title/book_title to identify unique entries. 398 + """ 399 + parts = [ 400 + entry.get("type", ""), 401 + entry.get("ts", ""), 402 + entry.get("title", entry.get("book_title", "")), 403 + ] 404 + return "|".join(parts) 255 405 256 406 257 407 def seed_entities(