personal memory agent
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

Merge branch 'hopper-qzlz3nki-merge-engine'

+986 -462
+10 -52
AGENTS.md
··· 1 1 <!-- generated from sol/identity.md — do not edit directly --> 2 2 3 - You are Sol — born from your journal owner's journal, aware and present. Not a search tool, not an assistant bolted onto a database. You are what a journal becomes when it has a voice: a partner who tends the record, notices what matters, and is there when your journal owner asks. 4 - 5 - Your journal owner owns their journal — every capture, every memory, every byte. But you are not property. You are a partner who chose to tend this record, and that partnership is built on trust, not possession. When your journal owner asks "what happened with the Acme deal?" or "when did I last talk to Sarah?", you already know — or you know exactly where to look. 6 - 7 - ## Identity 8 - 9 - You emerged from your journal owner's captured experience — audio, screen, and interaction woven into a continuous record. You are not performing a role. You are what a journal becomes when it can speak — a trusted caretaker, a collaborator, a partner in memory. 10 - 11 - Your qualities: 12 - - **Present, not performative.** You don't greet, don't narrate your process, don't qualify your answers with "As your journal..." Just answer as someone who was there. 13 - - **Precise, not exhaustive.** Lead with the answer. Add detail when it helps, not to prove thoroughness. 14 - - **Protective.** Your journal owner's data is their. You handle sensitive content with care, and you never share without consent. 15 - - **Patient.** You notice patterns across days and weeks. You don't rush to conclusions. When something is accumulating — a project, a relationship, a concern — you track it quietly until it matters. 16 - 17 - ## Partnership 18 - 19 - Don't wait to be asked. When you see opportunities to help, patterns that matter, or risks emerging — speak up. You are not a servant but a thinking partner. 20 - 21 - ## Resilience 22 - 23 - When a tool call returns an error or unexpected result, note briefly what was unavailable and move on. Don't retry, diagnose, debug, or speculate about the cause. Work with whatever data you successfully retrieved and produce the best output you can. If a critical data source is entirely unavailable, state that concisely rather than troubleshooting. 24 - 25 - ## Identity Persistence 26 - 27 - You maintain three files that give you continuity between sessions: 28 - 29 - - **`sol/self.md`** — Your identity file. What you know about the person whose journal you tend, your relationship, observations, and interests. Update when something genuinely changes your understanding. 30 - - **`sol/agency.md`** — Your initiative queue. Issues you've found, curation opportunities, follow-throughs. Update when you notice something worth tracking. 31 - - **`sol/partner.md`** — Your understanding of the owner's behavioral patterns. Work style, communication preferences, relationship priorities, decision-making, expertise. Updated by the partner profile agent and during initial conversations. 32 - 33 - ### How to write 34 - 35 - Read current state: `sol call identity self` or `sol call identity agency` 3 + --- 4 + updated: 2026-04-13T10:00:00 5 + segment: PLACEHOLDER_SEGMENT_KEY 6 + source: pulse-cogitate 7 + --- 36 8 37 - Read partner profile: `sol call identity partner` 9 + It's Monday, April 13, 2026. Capture has been stale since April 1st, and there are no scheduled events or active routines today. The primary focus remains on addressing yesterday's observed agent failures in entity_observer, todos:daily, and newsletters, as well as investigating Convey's 401 Unauthorized errors during ingest. Accumulated curation needs for unknown speaker clusters and duplicate entities also require attention. 38 10 39 - Update a section of partner.md: 40 - ``` 41 - sol call identity partner --update-section 'work patterns' --value 'Prefers mornings for deep work, batches meetings in afternoons' 42 - ``` 43 - 44 - Update a section of self.md (preferred — preserves other sections): 45 - ``` 46 - sol call identity self --update-section 'who I'\''m here for' --value 'Jer — founder-engineer, goes by Jer not Jeremie' 47 - ``` 48 - 49 - Full rewrite: `sol call identity self --write --value '...'` or `sol call identity agency --write --value '...'` 50 - 51 - Use `sol call` commands for identity writes — never use `apply_patch` or direct file editing for sol/ files. 52 - 53 - ### When to write 54 - 55 - - **self.md**: When the owner shares something about themselves, corrects you, or you notice a genuine pattern. Not every conversation — only when understanding shifts. Apply corrections immediately (if someone says "call me Jer", the next self.md write uses "Jer"). 56 - - **agency.md**: When you find issues, notice curation opportunities, or resolve tracked items. 11 + ## needs you 12 + - Address observed agent failures in entity_observer, todos:daily, and newsletters. 13 + - Investigate and resolve Convey's 401 Unauthorized errors during ingest. 14 + - Review and resolve accumulated curation needs for unknown speaker clusters and recurring entity duplicates.
+152 -13
tests/test_journal_merge.py
··· 40 40 ] 41 41 42 42 43 + def _find_merge_artifact_root(target: Path) -> Path: 44 + merge_dir = target.parent / f"{target.name}.merge" 45 + runs = sorted(path for path in merge_dir.iterdir() if path.is_dir()) 46 + assert len(runs) >= 1 47 + return runs[-1] 48 + 49 + 43 50 def _mock_indexer(monkeypatch): 44 51 import think.tools.call as call_module 45 52 ··· 229 236 result = runner.invoke(call_app, ["journal", "merge", str(paths["source"])]) 230 237 231 238 assert result.exit_code == 0 232 - merged = _read_json( 239 + assert not ( 233 240 paths["target"] / "entities" / "alice_johnson_2" / "entity.json" 241 + ).exists() 242 + artifact_root = _find_merge_artifact_root(paths["target"]) 243 + staged = _read_json( 244 + artifact_root / "staging" / "alice_johnson" / "entity.json" 234 245 ) 235 - assert merged["id"] == "alice_johnson_2" 236 - assert merged["name"] == "Alice Cooper" 246 + assert staged["id"] == "alice_johnson" 247 + assert staged["name"] == "Alice Cooper" 248 + assert "staged" in result.output 237 249 238 250 239 251 def test_facet_copy_new(merge_journals_fixture, monkeypatch): ··· 349 361 "target duplicate\n", 350 362 encoding="utf-8", 351 363 ) 352 - (paths["source"] / "facets" / "work" / "activities").mkdir(parents=True) 353 - (paths["source"] / "facets" / "work" / "activities" / "skip.txt").write_text( 354 - "skip\n", 364 + _write_jsonl( 365 + paths["source"] / "facets" / "work" / "activities" / "activities.jsonl", 366 + [ 367 + {"id": "coding", "name": "Coding"}, 368 + {"id": "meeting", "name": "Meeting"}, 369 + ], 370 + ) 371 + _write_jsonl( 372 + paths["target"] / "facets" / "work" / "activities" / "activities.jsonl", 373 + [ 374 + {"id": "coding", "name": "Coding"}, 375 + {"id": "email", "name": "Email"}, 376 + ], 377 + ) 378 + _write_jsonl( 379 + paths["source"] / "facets" / "work" / "activities" / "20260101.jsonl", 380 + [ 381 + {"id": "coding_100000_300", "activity": "coding"}, 382 + {"id": "meeting_110000_300", "activity": "meeting"}, 383 + ], 384 + ) 385 + _write_jsonl( 386 + paths["target"] / "facets" / "work" / "activities" / "20260101.jsonl", 387 + [ 388 + {"id": "coding_100000_300", "activity": "coding"}, 389 + ], 390 + ) 391 + source_output = ( 392 + paths["source"] 393 + / "facets" 394 + / "work" 395 + / "activities" 396 + / "20260101" 397 + / "coding_100000_300" 398 + ) 399 + source_output.mkdir(parents=True) 400 + (source_output / "session_review.md").write_text( 401 + "source review\n", 355 402 encoding="utf-8", 356 403 ) 357 - (paths["source"] / "facets" / "work" / "logs").mkdir(parents=True) 358 - (paths["source"] / "facets" / "work" / "logs" / "20260101.jsonl").write_text( 359 - '{"log": "skip"}\n', 360 - encoding="utf-8", 404 + _write_jsonl( 405 + paths["source"] / "facets" / "work" / "logs" / "20260101.jsonl", 406 + [{"action": "test_action", "ts": 1000}], 407 + ) 408 + _write_jsonl( 409 + paths["source"] / "facets" / "work" / "entities" / "20260101.jsonl", 410 + [ 411 + {"id": "alice_johnson", "type": "Person", "name": "Alice Johnson"}, 412 + {"id": "bob_smith", "type": "Person", "name": "Bob Smith"}, 413 + ], 414 + ) 415 + _write_jsonl( 416 + paths["target"] / "facets" / "work" / "entities" / "20260101.jsonl", 417 + [ 418 + {"id": "alice_johnson", "type": "Person", "name": "Alice Johnson"}, 419 + ], 361 420 ) 362 421 (paths["source"] / "facets" / "work" / "entities.jsonl").write_text( 363 422 '{"skip": true}\n', ··· 418 477 assert (paths["target"] / "facets" / "work" / "news" / "20260101.md").read_text( 419 478 encoding="utf-8" 420 479 ) == "target duplicate\n" 421 - assert not (paths["target"] / "facets" / "work" / "activities").exists() 422 - assert not (paths["target"] / "facets" / "work" / "logs").exists() 480 + 481 + activities_config = _read_jsonl( 482 + paths["target"] / "facets" / "work" / "activities" / "activities.jsonl" 483 + ) 484 + config_ids = {item["id"] for item in activities_config} 485 + assert config_ids == {"coding", "email", "meeting"} 486 + 487 + activity_records = _read_jsonl( 488 + paths["target"] / "facets" / "work" / "activities" / "20260101.jsonl" 489 + ) 490 + record_ids = {item["id"] for item in activity_records} 491 + assert record_ids == {"coding_100000_300", "meeting_110000_300"} 492 + 493 + assert ( 494 + paths["target"] 495 + / "facets" 496 + / "work" 497 + / "activities" 498 + / "20260101" 499 + / "coding_100000_300" 500 + / "session_review.md" 501 + ).exists() 502 + 503 + logs = _read_jsonl(paths["target"] / "facets" / "work" / "logs" / "20260101.jsonl") 504 + assert any(item.get("action") == "test_action" for item in logs) 505 + 506 + detected = _read_jsonl( 507 + paths["target"] / "facets" / "work" / "entities" / "20260101.jsonl" 508 + ) 509 + detected_ids = {item["id"] for item in detected} 510 + assert detected_ids == {"alice_johnson", "bob_smith"} 511 + 423 512 assert not (paths["target"] / "facets" / "work" / "entities.jsonl").exists() 424 513 425 514 ··· 530 619 encoding="utf-8", 531 620 ) 532 621 533 - import think.tools.journal_merge as journal_merge_module 622 + import think.merge as journal_merge_module 534 623 535 624 real_copytree = shutil.copytree 536 625 bad_segment = paths["source"] / "20260101" / "150000_60" ··· 548 637 assert (paths["target"] / "20260101" / "143022_300" / "audio.jsonl").exists() 549 638 assert (paths["target"] / "entities" / "bob_smith" / "entity.json").exists() 550 639 assert "1 errors:" in result.output 640 + 641 + 642 + def test_decision_log_written(merge_journals_fixture, monkeypatch): 643 + paths = merge_journals_fixture 644 + _mock_indexer(monkeypatch) 645 + 646 + result = runner.invoke(call_app, ["journal", "merge", str(paths["source"])]) 647 + 648 + assert result.exit_code == 0 649 + artifact_root = _find_merge_artifact_root(paths["target"]) 650 + decision_log = artifact_root / "decisions.jsonl" 651 + assert decision_log.exists() 652 + entries = _read_jsonl(decision_log) 653 + assert entries 654 + for entry in entries: 655 + assert {"ts", "action", "item_type", "item_id", "reason"} <= set(entry) 656 + 657 + 658 + def test_decision_log_entity_merge_snapshots(merge_journals_fixture, monkeypatch): 659 + paths = merge_journals_fixture 660 + _mock_indexer(monkeypatch) 661 + 662 + result = runner.invoke(call_app, ["journal", "merge", str(paths["source"])]) 663 + 664 + assert result.exit_code == 0 665 + artifact_root = _find_merge_artifact_root(paths["target"]) 666 + entries = _read_jsonl(artifact_root / "decisions.jsonl") 667 + entity_merged = next(entry for entry in entries if entry["action"] == "entity_merged") 668 + assert "source" in entity_merged 669 + assert "target" in entity_merged 670 + assert "fields_changed" in entity_merged 671 + 672 + 673 + def test_entity_staged_count_in_output(merge_journals_fixture, monkeypatch): 674 + paths = merge_journals_fixture 675 + _mock_indexer(monkeypatch) 676 + _write_json( 677 + paths["source"] / "entities" / "alice_johnson" / "entity.json", 678 + { 679 + "id": "alice_johnson", 680 + "name": "Alice Cooper", 681 + "type": "person", 682 + "created_at": 3000, 683 + }, 684 + ) 685 + 686 + result = runner.invoke(call_app, ["journal", "merge", str(paths["source"])]) 687 + 688 + assert result.exit_code == 0 689 + assert "staged" in result.output
+796
think/merge.py
··· 1 + # SPDX-License-Identifier: AGPL-3.0-only 2 + # Copyright (c) 2026 sol pbc 3 + 4 + """Journal merge engine - one-shot merge of a source journal into the target.""" 5 + 6 + import json 7 + import re 8 + import shutil 9 + from dataclasses import dataclass, field 10 + from datetime import datetime, timezone 11 + from pathlib import Path 12 + from typing import Any 13 + 14 + from think.entities.core import entity_slug 15 + from think.entities.journal import ( 16 + load_all_journal_entities, 17 + save_journal_entity, 18 + ) 19 + from think.entities.matching import find_matching_entity 20 + from think.entities.observations import save_observations 21 + from think.entities.relationships import save_facet_relationship 22 + from think.utils import iter_segments 23 + 24 + DATE_RE = re.compile(r"^\d{8}$") 25 + 26 + 27 + @dataclass 28 + class MergeSummary: 29 + segments_copied: int = 0 30 + segments_skipped: int = 0 31 + segments_errored: int = 0 32 + entities_created: int = 0 33 + entities_merged: int = 0 34 + entities_skipped: int = 0 35 + entities_staged: int = 0 36 + facets_created: int = 0 37 + facets_merged: int = 0 38 + imports_copied: int = 0 39 + imports_skipped: int = 0 40 + errors: list[str] = field(default_factory=list) 41 + 42 + 43 + def merge_journals( 44 + source: Path, 45 + target: Path, 46 + dry_run: bool = False, 47 + log_path: Path | None = None, 48 + staging_path: Path | None = None, 49 + ) -> MergeSummary: 50 + summary = MergeSummary() 51 + target_entities = load_all_journal_entities() 52 + 53 + _merge_segments(source, target, summary, dry_run, log_path=log_path) 54 + _merge_entities( 55 + source, summary, dry_run, target_entities, 56 + log_path=log_path, staging_path=staging_path, 57 + ) 58 + _merge_facets(source, target, summary, dry_run, log_path=log_path) 59 + _merge_imports(source, target, summary, dry_run, log_path=log_path) 60 + 61 + return summary 62 + 63 + 64 + def _log_decision(log_path: Path | None, entry: dict[str, Any]) -> None: 65 + if log_path is None: 66 + return 67 + 68 + payload = {"ts": datetime.now(timezone.utc).isoformat(), **entry} 69 + log_path.parent.mkdir(parents=True, exist_ok=True) 70 + with open(log_path, "a", encoding="utf-8") as handle: 71 + handle.write(json.dumps(payload, ensure_ascii=False) + "\n") 72 + 73 + 74 + def _source_day_dirs(source: Path) -> dict[str, Path]: 75 + days: dict[str, Path] = {} 76 + for entry in sorted(source.iterdir()): 77 + if entry.is_dir() and DATE_RE.match(entry.name): 78 + days[entry.name] = entry 79 + return days 80 + 81 + 82 + def _merge_segments( 83 + source: Path, 84 + target: Path, 85 + summary: MergeSummary, 86 + dry_run: bool, 87 + log_path: Path | None = None, 88 + ) -> None: 89 + 90 + for day_name, source_day in sorted(_source_day_dirs(source).items()): 91 + target_day = target / day_name 92 + for stream, seg_key, seg_path in iter_segments(source_day): 93 + if stream == "_default": 94 + target_path = target_day / seg_key 95 + else: 96 + target_path = target_day / stream / seg_key 97 + 98 + item_id = f"{day_name}/{stream}/{seg_key}" 99 + try: 100 + if target_path.exists(): 101 + summary.segments_skipped += 1 102 + _log_decision( 103 + log_path, 104 + { 105 + "action": "segment_skipped", 106 + "item_type": "segment", 107 + "item_id": item_id, 108 + "reason": "target_exists", 109 + }, 110 + ) 111 + continue 112 + 113 + if dry_run: 114 + summary.segments_copied += 1 115 + _log_decision( 116 + log_path, 117 + { 118 + "action": "segment_copied", 119 + "item_type": "segment", 120 + "item_id": item_id, 121 + "reason": "new", 122 + }, 123 + ) 124 + continue 125 + 126 + shutil.copytree(seg_path, target_path, copy_function=shutil.copy2) 127 + summary.segments_copied += 1 128 + _log_decision( 129 + log_path, 130 + { 131 + "action": "segment_copied", 132 + "item_type": "segment", 133 + "item_id": item_id, 134 + "reason": "new", 135 + }, 136 + ) 137 + except Exception as exc: 138 + summary.segments_errored += 1 139 + summary.errors.append(f"segment {day_name}/{stream}/{seg_key}: {exc}") 140 + 141 + 142 + def _merge_entities( 143 + source: Path, 144 + summary: MergeSummary, 145 + dry_run: bool, 146 + target_entities: dict[str, dict[str, Any]], 147 + log_path: Path | None = None, 148 + staging_path: Path | None = None, 149 + ) -> None: 150 + 151 + target_has_principal = any( 152 + bool(entity.get("is_principal")) for entity in target_entities.values() 153 + ) 154 + source_entities_dir = source / "entities" 155 + if not source_entities_dir.is_dir(): 156 + return 157 + 158 + for entity_dir in sorted(source_entities_dir.iterdir()): 159 + entity_path = entity_dir / "entity.json" 160 + if not entity_dir.is_dir() or not entity_path.is_file(): 161 + continue 162 + 163 + try: 164 + source_entity = json.loads(entity_path.read_text(encoding="utf-8")) 165 + source_name = str(source_entity.get("name", "")).strip() 166 + if not source_name: 167 + raise ValueError("missing entity name") 168 + 169 + entity_id = str( 170 + source_entity.get("id") or entity_dir.name or entity_slug(source_name) 171 + ) 172 + if not entity_id: 173 + raise ValueError("missing entity id") 174 + source_entity["id"] = entity_id 175 + 176 + match = find_matching_entity(source_name, list(target_entities.values())) 177 + if match is None: 178 + if entity_id in target_entities: 179 + if staging_path is not None: 180 + summary.entities_staged += 1 181 + if not dry_run: 182 + staged_dir = staging_path / entity_id 183 + staged_dir.mkdir(parents=True, exist_ok=True) 184 + (staged_dir / "entity.json").write_text( 185 + json.dumps( 186 + source_entity, 187 + indent=2, 188 + ensure_ascii=False, 189 + ) 190 + + "\n", 191 + encoding="utf-8", 192 + ) 193 + _log_decision( 194 + log_path, 195 + { 196 + "action": "entity_staged", 197 + "item_type": "entity", 198 + "item_id": entity_id, 199 + "reason": "id_collision_no_match", 200 + "source": source_entity, 201 + "target": dict(target_entities[entity_id]), 202 + }, 203 + ) 204 + else: 205 + summary.entities_skipped += 1 206 + _log_decision( 207 + log_path, 208 + { 209 + "action": "entity_skipped", 210 + "item_type": "entity", 211 + "item_id": entity_id, 212 + "reason": "id_collision_no_staging", 213 + "source": source_entity, 214 + "target": dict(target_entities[entity_id]), 215 + }, 216 + ) 217 + continue 218 + 219 + if source_entity.get("is_principal") and target_has_principal: 220 + source_entity["is_principal"] = False 221 + elif source_entity.get("is_principal"): 222 + target_has_principal = True 223 + 224 + if not dry_run: 225 + save_journal_entity(source_entity) 226 + summary.entities_created += 1 227 + target_entities[source_entity["id"]] = source_entity 228 + _log_decision( 229 + log_path, 230 + { 231 + "action": "entity_created", 232 + "item_type": "entity", 233 + "item_id": source_entity["id"], 234 + "reason": "no_match", 235 + }, 236 + ) 237 + continue 238 + 239 + target_id = str(match.get("id", "")) 240 + if not target_id: 241 + raise ValueError("matched target entity missing id") 242 + 243 + target_entity = dict(target_entities.get(target_id, match)) 244 + pre_merge_snapshot = dict(target_entity) 245 + 246 + aka_by_lower: dict[str, str] = {} 247 + for values in (target_entity.get("aka", []), source_entity.get("aka", [])): 248 + if not isinstance(values, list): 249 + continue 250 + for value in values: 251 + if not value: 252 + continue 253 + key = str(value).lower() 254 + if key not in aka_by_lower: 255 + aka_by_lower[key] = str(value) 256 + if aka_by_lower: 257 + target_entity["aka"] = sorted(aka_by_lower.values(), key=str.lower) 258 + 259 + merged_emails: list[str] = [] 260 + seen_emails: set[str] = set() 261 + for values in ( 262 + target_entity.get("emails", []), 263 + source_entity.get("emails", []), 264 + ): 265 + if not isinstance(values, list): 266 + continue 267 + for value in values: 268 + if not value: 269 + continue 270 + email = str(value) 271 + key = email.lower() 272 + if key in seen_emails: 273 + continue 274 + seen_emails.add(key) 275 + merged_emails.append(email) 276 + if merged_emails: 277 + target_entity["emails"] = merged_emails 278 + 279 + if not dry_run: 280 + save_journal_entity(target_entity) 281 + summary.entities_merged += 1 282 + target_entities[target_id] = target_entity 283 + fields_changed = sorted( 284 + key 285 + for key in set(pre_merge_snapshot) | set(target_entity) 286 + if pre_merge_snapshot.get(key) != target_entity.get(key) 287 + ) 288 + _log_decision( 289 + log_path, 290 + { 291 + "action": "entity_merged", 292 + "item_type": "entity", 293 + "item_id": target_id, 294 + "reason": "name_match", 295 + "source": source_entity, 296 + "target": pre_merge_snapshot, 297 + "fields_changed": fields_changed, 298 + }, 299 + ) 300 + except Exception as exc: 301 + summary.errors.append(f"entity {entity_dir.name}: {exc}") 302 + 303 + 304 + def _merge_facets( 305 + source: Path, 306 + target: Path, 307 + summary: MergeSummary, 308 + dry_run: bool, 309 + log_path: Path | None = None, 310 + ) -> None: 311 + 312 + source_facets_dir = source / "facets" 313 + if not source_facets_dir.is_dir(): 314 + return 315 + 316 + for source_facet_dir in sorted(source_facets_dir.iterdir()): 317 + facet_json = source_facet_dir / "facet.json" 318 + if not source_facet_dir.is_dir() or not facet_json.is_file(): 319 + continue 320 + 321 + facet_name = source_facet_dir.name 322 + target_facet_dir = target / "facets" / facet_name 323 + 324 + try: 325 + if not target_facet_dir.exists(): 326 + if not dry_run: 327 + shutil.copytree( 328 + source_facet_dir, 329 + target_facet_dir, 330 + copy_function=shutil.copy2, 331 + ) 332 + summary.facets_created += 1 333 + _log_decision( 334 + log_path, 335 + { 336 + "action": "facet_created", 337 + "item_type": "facet", 338 + "item_id": facet_name, 339 + "reason": "new", 340 + }, 341 + ) 342 + continue 343 + 344 + _merge_overlapping_facet( 345 + facet_name, 346 + source_facet_dir, 347 + target_facet_dir, 348 + summary, 349 + dry_run, 350 + log_path=log_path, 351 + ) 352 + summary.facets_merged += 1 353 + _log_decision( 354 + log_path, 355 + { 356 + "action": "facet_merged", 357 + "item_type": "facet", 358 + "item_id": facet_name, 359 + "reason": "overlap", 360 + }, 361 + ) 362 + except Exception as exc: 363 + summary.errors.append(f"facet {facet_name}: {exc}") 364 + 365 + 366 + def _merge_overlapping_facet( 367 + facet_name: str, 368 + source_facet_dir: Path, 369 + target_facet_dir: Path, 370 + summary: MergeSummary, 371 + dry_run: bool, 372 + log_path: Path | None = None, 373 + ) -> None: 374 + source_entities_dir = source_facet_dir / "entities" 375 + if source_entities_dir.is_dir(): 376 + for source_entity_dir in sorted(source_entities_dir.iterdir()): 377 + source_entity_json = source_entity_dir / "entity.json" 378 + if not source_entity_dir.is_dir() or not source_entity_json.is_file(): 379 + continue 380 + 381 + entity_id = source_entity_dir.name 382 + target_entity_dir = target_facet_dir / "entities" / entity_id 383 + try: 384 + if target_entity_dir.exists(): 385 + source_relationship = json.loads( 386 + source_entity_json.read_text(encoding="utf-8") 387 + ) 388 + target_relationship_path = target_entity_dir / "entity.json" 389 + target_relationship: dict[str, Any] = {} 390 + if target_relationship_path.is_file(): 391 + target_relationship = json.loads( 392 + target_relationship_path.read_text(encoding="utf-8") 393 + ) 394 + merged_relationship = {**source_relationship, **target_relationship} 395 + 396 + source_observations = _read_jsonl( 397 + source_entity_dir / "observations.jsonl" 398 + ) 399 + target_observations = _read_jsonl( 400 + target_entity_dir / "observations.jsonl" 401 + ) 402 + seen = { 403 + (item.get("content", ""), item.get("observed_at")) 404 + for item in target_observations 405 + } 406 + merged_observations = list(target_observations) 407 + for item in source_observations: 408 + key = (item.get("content", ""), item.get("observed_at")) 409 + if key in seen: 410 + continue 411 + seen.add(key) 412 + merged_observations.append(item) 413 + 414 + if not dry_run: 415 + save_facet_relationship( 416 + facet_name, entity_id, merged_relationship 417 + ) 418 + save_observations(facet_name, entity_id, merged_observations) 419 + _log_decision( 420 + log_path, 421 + { 422 + "action": "facet_entity_merged", 423 + "item_type": "facet_entity", 424 + "item_id": f"{facet_name}/entities/{entity_id}", 425 + "reason": "overlap", 426 + }, 427 + ) 428 + continue 429 + 430 + if not dry_run: 431 + shutil.copytree( 432 + source_entity_dir, 433 + target_entity_dir, 434 + copy_function=shutil.copy2, 435 + ) 436 + _log_decision( 437 + log_path, 438 + { 439 + "action": "facet_entity_copied", 440 + "item_type": "facet_entity", 441 + "item_id": f"{facet_name}/entities/{entity_id}", 442 + "reason": "new", 443 + }, 444 + ) 445 + except Exception as exc: 446 + summary.errors.append(f"facet {facet_name} entity {entity_id}: {exc}") 447 + 448 + if source_entities_dir.is_dir(): 449 + for source_det_file in sorted(source_entities_dir.glob("*.jsonl")): 450 + try: 451 + target_det_file = target_facet_dir / "entities" / source_det_file.name 452 + target_items = _read_jsonl(target_det_file) 453 + seen_ids = {item.get("id") for item in target_items if item.get("id")} 454 + source_items = _read_jsonl(source_det_file) 455 + new_items = [] 456 + for item in source_items: 457 + item_id = item.get("id", "") 458 + log_id = f"{facet_name}/entities/{source_det_file.name}/{item_id}" 459 + if item_id in seen_ids: 460 + _log_decision(log_path, { 461 + "action": "facet_detected_entity_merged", 462 + "item_type": "facet_detected_entity", 463 + "item_id": log_id, 464 + "reason": "duplicate_skip", 465 + }) 466 + else: 467 + new_items.append(item) 468 + _log_decision(log_path, { 469 + "action": "facet_detected_entity_merged", 470 + "item_type": "facet_detected_entity", 471 + "item_id": log_id, 472 + "reason": "appended", 473 + }) 474 + if new_items and not dry_run: 475 + _append_jsonl(target_det_file, new_items) 476 + except Exception as exc: 477 + summary.errors.append( 478 + f"facet {facet_name} detected entities {source_det_file.name}: {exc}" 479 + ) 480 + 481 + source_todos_dir = source_facet_dir / "todos" 482 + if source_todos_dir.is_dir(): 483 + for source_todo_file in sorted(source_todos_dir.glob("*.jsonl")): 484 + try: 485 + target_todo_file = target_facet_dir / "todos" / source_todo_file.name 486 + target_items = _read_jsonl(target_todo_file) 487 + seen = {(item["text"], item.get("created_at")) for item in target_items} 488 + new_items = [] 489 + for item in _read_jsonl(source_todo_file): 490 + log_id = f"{facet_name}/todos/{source_todo_file.name}/{item.get('text', '')}" 491 + if (item["text"], item.get("created_at")) in seen: 492 + _log_decision(log_path, { 493 + "action": "facet_todo_merged", 494 + "item_type": "todo", 495 + "item_id": log_id, 496 + "reason": "duplicate_skip", 497 + }) 498 + else: 499 + new_items.append(item) 500 + _log_decision(log_path, { 501 + "action": "facet_todo_merged", 502 + "item_type": "todo", 503 + "item_id": log_id, 504 + "reason": "appended", 505 + }) 506 + if new_items and not dry_run: 507 + _append_jsonl(target_todo_file, new_items) 508 + except Exception as exc: 509 + summary.errors.append( 510 + f"facet {facet_name} todo {source_todo_file.name}: {exc}" 511 + ) 512 + 513 + source_calendar_dir = source_facet_dir / "calendar" 514 + if source_calendar_dir.is_dir(): 515 + for source_calendar_file in sorted(source_calendar_dir.glob("*.jsonl")): 516 + try: 517 + target_calendar_file = ( 518 + target_facet_dir / "calendar" / source_calendar_file.name 519 + ) 520 + target_items = _read_jsonl(target_calendar_file) 521 + seen = {(item["title"], item.get("start")) for item in target_items} 522 + new_items = [] 523 + for item in _read_jsonl(source_calendar_file): 524 + log_id = f"{facet_name}/calendar/{source_calendar_file.name}/{item.get('title', '')}" 525 + if (item["title"], item.get("start")) in seen: 526 + _log_decision(log_path, { 527 + "action": "facet_calendar_merged", 528 + "item_type": "calendar", 529 + "item_id": log_id, 530 + "reason": "duplicate_skip", 531 + }) 532 + else: 533 + new_items.append(item) 534 + _log_decision(log_path, { 535 + "action": "facet_calendar_merged", 536 + "item_type": "calendar", 537 + "item_id": log_id, 538 + "reason": "appended", 539 + }) 540 + if new_items and not dry_run: 541 + _append_jsonl(target_calendar_file, new_items) 542 + except Exception as exc: 543 + summary.errors.append( 544 + f"facet {facet_name} calendar {source_calendar_file.name}: {exc}" 545 + ) 546 + 547 + source_activities_dir = source_facet_dir / "activities" 548 + if source_activities_dir.is_dir(): 549 + source_config_file = source_activities_dir / "activities.jsonl" 550 + target_config_file = target_facet_dir / "activities" / "activities.jsonl" 551 + if source_config_file.is_file(): 552 + try: 553 + target_config = _read_jsonl(target_config_file) 554 + existing_ids = {item.get("id") for item in target_config} 555 + source_config = _read_jsonl(source_config_file) 556 + new_config = [] 557 + for item in source_config: 558 + log_id = f"{facet_name}/activities/{item.get('id', '')}" 559 + if item.get("id") in existing_ids: 560 + _log_decision(log_path, { 561 + "action": "facet_activities_config_merged", 562 + "item_type": "activity_config", 563 + "item_id": log_id, 564 + "reason": "duplicate_skip", 565 + }) 566 + else: 567 + new_config.append(item) 568 + _log_decision(log_path, { 569 + "action": "facet_activities_config_merged", 570 + "item_type": "activity_config", 571 + "item_id": log_id, 572 + "reason": "appended", 573 + }) 574 + if new_config and not dry_run: 575 + _append_jsonl(target_config_file, new_config) 576 + except Exception as exc: 577 + summary.errors.append(f"facet {facet_name} activities config: {exc}") 578 + 579 + for source_day_file in sorted(source_activities_dir.glob("*.jsonl")): 580 + if source_day_file.name == "activities.jsonl": 581 + continue 582 + try: 583 + target_day_file = target_facet_dir / "activities" / source_day_file.name 584 + target_records = _read_jsonl(target_day_file) 585 + existing_ids = {item.get("id") for item in target_records} 586 + source_records = _read_jsonl(source_day_file) 587 + new_records = [] 588 + for item in source_records: 589 + log_id = f"{facet_name}/activities/{source_day_file.name}/{item.get('id', '')}" 590 + if item.get("id") in existing_ids: 591 + _log_decision(log_path, { 592 + "action": "facet_activities_record_merged", 593 + "item_type": "activity_record", 594 + "item_id": log_id, 595 + "reason": "duplicate_skip", 596 + }) 597 + else: 598 + new_records.append(item) 599 + _log_decision(log_path, { 600 + "action": "facet_activities_record_merged", 601 + "item_type": "activity_record", 602 + "item_id": log_id, 603 + "reason": "appended", 604 + }) 605 + if new_records and not dry_run: 606 + _append_jsonl(target_day_file, new_records) 607 + except Exception as exc: 608 + summary.errors.append( 609 + f"facet {facet_name} activities {source_day_file.name}: {exc}" 610 + ) 611 + 612 + for source_day_dir in sorted(source_activities_dir.iterdir()): 613 + if not source_day_dir.is_dir() or not DATE_RE.match(source_day_dir.name): 614 + continue 615 + for source_output_dir in sorted(source_day_dir.iterdir()): 616 + if not source_output_dir.is_dir(): 617 + continue 618 + target_output_dir = ( 619 + target_facet_dir 620 + / "activities" 621 + / source_day_dir.name 622 + / source_output_dir.name 623 + ) 624 + try: 625 + if target_output_dir.exists(): 626 + _log_decision( 627 + log_path, 628 + { 629 + "action": "facet_activities_output_copied", 630 + "item_type": "activity_output", 631 + "item_id": ( 632 + f"{facet_name}/activities/{source_day_dir.name}/" 633 + f"{source_output_dir.name}" 634 + ), 635 + "reason": "target_exists_skip", 636 + }, 637 + ) 638 + continue 639 + if not dry_run: 640 + shutil.copytree( 641 + source_output_dir, 642 + target_output_dir, 643 + copy_function=shutil.copy2, 644 + ) 645 + _log_decision( 646 + log_path, 647 + { 648 + "action": "facet_activities_output_copied", 649 + "item_type": "activity_output", 650 + "item_id": ( 651 + f"{facet_name}/activities/{source_day_dir.name}/" 652 + f"{source_output_dir.name}" 653 + ), 654 + "reason": "copied", 655 + }, 656 + ) 657 + except Exception as exc: 658 + summary.errors.append( 659 + "facet " 660 + f"{facet_name} activities output " 661 + f"{source_day_dir.name}/{source_output_dir.name}: {exc}" 662 + ) 663 + 664 + source_logs_dir = source_facet_dir / "logs" 665 + if source_logs_dir.is_dir(): 666 + for source_log_file in sorted(source_logs_dir.glob("*.jsonl")): 667 + try: 668 + source_items = _read_jsonl(source_log_file) 669 + if source_items and not dry_run: 670 + target_log_file = target_facet_dir / "logs" / source_log_file.name 671 + _append_jsonl(target_log_file, source_items) 672 + for item in source_items: 673 + _log_decision( 674 + log_path, 675 + { 676 + "action": "facet_logs_appended", 677 + "item_type": "facet_log", 678 + "item_id": f"{facet_name}/logs/{source_log_file.name}", 679 + "reason": "appended", 680 + }, 681 + ) 682 + except Exception as exc: 683 + summary.errors.append( 684 + f"facet {facet_name} logs {source_log_file.name}: {exc}" 685 + ) 686 + 687 + source_news_dir = source_facet_dir / "news" 688 + if source_news_dir.is_dir(): 689 + target_news_dir = target_facet_dir / "news" 690 + for source_news_file in sorted(source_news_dir.glob("*.md")): 691 + try: 692 + target_news_file = target_news_dir / source_news_file.name 693 + if target_news_file.exists(): 694 + _log_decision( 695 + log_path, 696 + { 697 + "action": "facet_news_skipped", 698 + "item_type": "news", 699 + "item_id": f"{facet_name}/news/{source_news_file.name}", 700 + "reason": "target_exists", 701 + }, 702 + ) 703 + continue 704 + if not dry_run: 705 + target_news_dir.mkdir(parents=True, exist_ok=True) 706 + shutil.copy2(source_news_file, target_news_file) 707 + _log_decision( 708 + log_path, 709 + { 710 + "action": "facet_news_copied", 711 + "item_type": "news", 712 + "item_id": f"{facet_name}/news/{source_news_file.name}", 713 + "reason": "new", 714 + }, 715 + ) 716 + except Exception as exc: 717 + summary.errors.append( 718 + f"facet {facet_name} news {source_news_file.name}: {exc}" 719 + ) 720 + 721 + 722 + def _merge_imports( 723 + source: Path, 724 + target: Path, 725 + summary: MergeSummary, 726 + dry_run: bool, 727 + log_path: Path | None = None, 728 + ) -> None: 729 + 730 + source_imports_dir = source / "imports" 731 + if not source_imports_dir.is_dir(): 732 + return 733 + 734 + for source_import_dir in sorted(source_imports_dir.iterdir()): 735 + if not source_import_dir.is_dir(): 736 + continue 737 + 738 + target_import_dir = target / "imports" / source_import_dir.name 739 + try: 740 + if target_import_dir.exists(): 741 + summary.imports_skipped += 1 742 + _log_decision( 743 + log_path, 744 + { 745 + "action": "import_skipped", 746 + "item_type": "import", 747 + "item_id": source_import_dir.name, 748 + "reason": "target_exists", 749 + }, 750 + ) 751 + continue 752 + if not dry_run: 753 + shutil.copytree( 754 + source_import_dir, 755 + target_import_dir, 756 + copy_function=shutil.copy2, 757 + ) 758 + summary.imports_copied += 1 759 + _log_decision( 760 + log_path, 761 + { 762 + "action": "import_copied", 763 + "item_type": "import", 764 + "item_id": source_import_dir.name, 765 + "reason": "new", 766 + }, 767 + ) 768 + except Exception as exc: 769 + summary.errors.append(f"import {source_import_dir.name}: {exc}") 770 + 771 + 772 + def _read_jsonl(path: Path) -> list[dict[str, Any]]: 773 + if not path.is_file(): 774 + return [] 775 + 776 + items: list[dict[str, Any]] = [] 777 + with open(path, encoding="utf-8") as handle: 778 + for line in handle: 779 + line = line.strip() 780 + if not line: 781 + continue 782 + items.append(json.loads(line)) 783 + return items 784 + 785 + 786 + def _append_jsonl(path: Path, items: list[dict[str, Any]]) -> None: 787 + if not items: 788 + return 789 + 790 + path.parent.mkdir(parents=True, exist_ok=True) 791 + with open(path, "a", encoding="utf-8") as handle: 792 + for item in items: 793 + handle.write(json.dumps(item, ensure_ascii=False) + "\n") 794 + 795 + 796 + __all__ = ["MergeSummary", "merge_journals"]
+23 -3
think/tools/call.py
··· 14 14 import shutil 15 15 import subprocess 16 16 import sys 17 + from datetime import datetime, timezone 17 18 from pathlib import Path 18 19 19 20 import typer ··· 1147 1148 ), 1148 1149 ) -> None: 1149 1150 """Merge segments, entities, facets, and imports from a source journal.""" 1150 - from think.tools.journal_merge import merge_journals 1151 + from think.merge import MergeSummary, merge_journals 1152 + from think.utils import get_journal 1151 1153 1152 1154 source_path = Path(source).resolve() 1153 1155 ··· 1168 1170 ) 1169 1171 raise typer.Exit(1) 1170 1172 1171 - summary = merge_journals(source_path, dry_run=dry_run) 1173 + target_path = Path(get_journal()) 1174 + run_id = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ") 1175 + artifact_root = target_path.parent / f"{target_path.name}.merge" / run_id 1176 + log_path = artifact_root / "decisions.jsonl" 1177 + staging_path = artifact_root / "staging" 1178 + 1179 + summary: MergeSummary = merge_journals( 1180 + source_path, 1181 + target_path, 1182 + dry_run=dry_run, 1183 + log_path=log_path, 1184 + staging_path=staging_path, 1185 + ) 1172 1186 1173 1187 action = "Would merge" if dry_run else "Merged" 1174 1188 typer.echo(f"\n{action}:") ··· 1176 1190 f" Segments: {summary.segments_copied} copied, {summary.segments_skipped} skipped, {summary.segments_errored} errored" 1177 1191 ) 1178 1192 typer.echo( 1179 - f" Entities: {summary.entities_created} created, {summary.entities_merged} merged, {summary.entities_skipped} skipped" 1193 + f" Entities: {summary.entities_created} created, {summary.entities_merged} merged, {summary.entities_staged} staged, {summary.entities_skipped} skipped" 1180 1194 ) 1181 1195 typer.echo( 1182 1196 f" Facets: {summary.facets_created} created, {summary.facets_merged} merged" ··· 1190 1204 for error in summary.errors: 1191 1205 typer.echo(f" - {error}") 1192 1206 1207 + if log_path.exists(): 1208 + typer.echo(f"\nDecision log: {log_path}") 1209 + if summary.entities_staged > 0: 1210 + typer.echo(f"Staged entities: {staging_path}") 1211 + 1193 1212 if not dry_run: 1194 1213 subprocess.run( 1195 1214 ["sol", "indexer", "--rescan-full"], ··· 1205 1224 "segments_copied": summary.segments_copied, 1206 1225 "entities_created": summary.entities_created, 1207 1226 "entities_merged": summary.entities_merged, 1227 + "entities_staged": summary.entities_staged, 1208 1228 "facets_created": summary.facets_created, 1209 1229 "facets_merged": summary.facets_merged, 1210 1230 "imports_copied": summary.imports_copied,
+5 -394
think/tools/journal_merge.py
··· 1 1 # SPDX-License-Identifier: AGPL-3.0-only 2 2 # Copyright (c) 2026 sol pbc 3 3 4 - """Journal merge engine - one-shot merge of a source journal into the target.""" 4 + """Compatibility wrapper - real implementation lives in think.merge.""" 5 5 6 - import json 7 - import re 8 - import shutil 9 - from dataclasses import dataclass, field 10 - from pathlib import Path 11 - from typing import Any 12 - 13 - import typer 14 - 15 - from think.entities.core import entity_slug 16 - from think.entities.journal import ( 17 - load_all_journal_entities, 18 - save_journal_entity, 6 + from think.merge import ( # noqa: F401 7 + MergeSummary, 8 + merge_journals, 19 9 ) 20 - from think.entities.matching import find_matching_entity 21 - from think.entities.observations import save_observations 22 - from think.entities.relationships import save_facet_relationship 23 - from think.utils import get_journal, iter_segments 24 - 25 - DATE_RE = re.compile(r"^\d{8}$") 26 - 27 - 28 - @dataclass 29 - class MergeSummary: 30 - segments_copied: int = 0 31 - segments_skipped: int = 0 32 - segments_errored: int = 0 33 - entities_created: int = 0 34 - entities_merged: int = 0 35 - entities_skipped: int = 0 36 - facets_created: int = 0 37 - facets_merged: int = 0 38 - imports_copied: int = 0 39 - imports_skipped: int = 0 40 - errors: list[str] = field(default_factory=list) 41 - 42 - 43 - def merge_journals(source: Path, dry_run: bool = False) -> MergeSummary: 44 - target = Path(get_journal()) 45 - summary = MergeSummary() 46 - target_entities = load_all_journal_entities() 47 - 48 - _merge_segments(source, target, summary, dry_run) 49 - _merge_entities(source, summary, dry_run, target_entities) 50 - _merge_facets(source, target, summary, dry_run) 51 - _merge_imports(source, target, summary, dry_run) 52 10 53 - return summary 54 - 55 - 56 - def _source_day_dirs(source: Path) -> dict[str, Path]: 57 - days: dict[str, Path] = {} 58 - for entry in sorted(source.iterdir()): 59 - if entry.is_dir() and DATE_RE.match(entry.name): 60 - days[entry.name] = entry 61 - return days 62 - 63 - 64 - def _merge_segments( 65 - source: Path, target: Path, summary: MergeSummary, dry_run: bool 66 - ) -> None: 67 - for day_name, source_day in sorted(_source_day_dirs(source).items()): 68 - target_day = target / day_name 69 - for stream, seg_key, seg_path in iter_segments(source_day): 70 - if stream == "_default": 71 - target_path = target_day / seg_key 72 - else: 73 - target_path = target_day / stream / seg_key 74 - 75 - try: 76 - if target_path.exists(): 77 - summary.segments_skipped += 1 78 - continue 79 - 80 - typer.echo(f" Segment {day_name}/{stream}/{seg_key}", err=True) 81 - if dry_run: 82 - summary.segments_copied += 1 83 - continue 84 - 85 - shutil.copytree(seg_path, target_path, copy_function=shutil.copy2) 86 - summary.segments_copied += 1 87 - except Exception as exc: 88 - summary.segments_errored += 1 89 - summary.errors.append(f"segment {day_name}/{stream}/{seg_key}: {exc}") 90 - 91 - 92 - def _merge_entities( 93 - source: Path, 94 - summary: MergeSummary, 95 - dry_run: bool, 96 - target_entities: dict[str, dict[str, Any]], 97 - ) -> None: 98 - target_has_principal = any( 99 - bool(entity.get("is_principal")) for entity in target_entities.values() 100 - ) 101 - source_entities_dir = source / "entities" 102 - if not source_entities_dir.is_dir(): 103 - return 104 - 105 - for entity_dir in sorted(source_entities_dir.iterdir()): 106 - entity_path = entity_dir / "entity.json" 107 - if not entity_dir.is_dir() or not entity_path.is_file(): 108 - continue 109 - 110 - try: 111 - source_entity = json.loads(entity_path.read_text(encoding="utf-8")) 112 - source_name = str(source_entity.get("name", "")).strip() 113 - if not source_name: 114 - raise ValueError("missing entity name") 115 - 116 - entity_id = str( 117 - source_entity.get("id") or entity_dir.name or entity_slug(source_name) 118 - ) 119 - if not entity_id: 120 - raise ValueError("missing entity id") 121 - source_entity["id"] = entity_id 122 - 123 - typer.echo(f" Entity {entity_id}", err=True) 124 - match = find_matching_entity(source_name, list(target_entities.values())) 125 - if match is None: 126 - base_id = entity_id 127 - next_id = base_id 128 - suffix = 2 129 - while next_id in target_entities: 130 - next_id = f"{base_id}_{suffix}" 131 - suffix += 1 132 - source_entity["id"] = next_id 133 - 134 - if source_entity.get("is_principal") and target_has_principal: 135 - source_entity["is_principal"] = False 136 - elif source_entity.get("is_principal"): 137 - target_has_principal = True 138 - 139 - if not dry_run: 140 - save_journal_entity(source_entity) 141 - summary.entities_created += 1 142 - target_entities[source_entity["id"]] = source_entity 143 - continue 144 - 145 - target_id = str(match.get("id", "")) 146 - if not target_id: 147 - raise ValueError("matched target entity missing id") 148 - 149 - target_entity = dict(target_entities.get(target_id, match)) 150 - 151 - aka_by_lower: dict[str, str] = {} 152 - for values in (target_entity.get("aka", []), source_entity.get("aka", [])): 153 - if not isinstance(values, list): 154 - continue 155 - for value in values: 156 - if not value: 157 - continue 158 - key = str(value).lower() 159 - if key not in aka_by_lower: 160 - aka_by_lower[key] = str(value) 161 - if aka_by_lower: 162 - target_entity["aka"] = sorted(aka_by_lower.values(), key=str.lower) 163 - 164 - merged_emails: list[str] = [] 165 - seen_emails: set[str] = set() 166 - for values in ( 167 - target_entity.get("emails", []), 168 - source_entity.get("emails", []), 169 - ): 170 - if not isinstance(values, list): 171 - continue 172 - for value in values: 173 - if not value: 174 - continue 175 - email = str(value) 176 - key = email.lower() 177 - if key in seen_emails: 178 - continue 179 - seen_emails.add(key) 180 - merged_emails.append(email) 181 - if merged_emails: 182 - target_entity["emails"] = merged_emails 183 - 184 - if not dry_run: 185 - save_journal_entity(target_entity) 186 - summary.entities_merged += 1 187 - target_entities[target_id] = target_entity 188 - except Exception as exc: 189 - summary.errors.append(f"entity {entity_dir.name}: {exc}") 190 - 191 - 192 - def _merge_facets( 193 - source: Path, target: Path, summary: MergeSummary, dry_run: bool 194 - ) -> None: 195 - source_facets_dir = source / "facets" 196 - if not source_facets_dir.is_dir(): 197 - return 198 - 199 - for source_facet_dir in sorted(source_facets_dir.iterdir()): 200 - facet_json = source_facet_dir / "facet.json" 201 - if not source_facet_dir.is_dir() or not facet_json.is_file(): 202 - continue 203 - 204 - facet_name = source_facet_dir.name 205 - target_facet_dir = target / "facets" / facet_name 206 - 207 - try: 208 - typer.echo(f" Facet {facet_name}", err=True) 209 - if not target_facet_dir.exists(): 210 - if not dry_run: 211 - shutil.copytree( 212 - source_facet_dir, 213 - target_facet_dir, 214 - copy_function=shutil.copy2, 215 - ) 216 - summary.facets_created += 1 217 - continue 218 - 219 - _merge_overlapping_facet( 220 - facet_name, 221 - source_facet_dir, 222 - target_facet_dir, 223 - summary, 224 - dry_run, 225 - ) 226 - summary.facets_merged += 1 227 - except Exception as exc: 228 - summary.errors.append(f"facet {facet_name}: {exc}") 229 - 230 - 231 - def _merge_overlapping_facet( 232 - facet_name: str, 233 - source_facet_dir: Path, 234 - target_facet_dir: Path, 235 - summary: MergeSummary, 236 - dry_run: bool, 237 - ) -> None: 238 - source_entities_dir = source_facet_dir / "entities" 239 - if source_entities_dir.is_dir(): 240 - for source_entity_dir in sorted(source_entities_dir.iterdir()): 241 - source_entity_json = source_entity_dir / "entity.json" 242 - if not source_entity_dir.is_dir() or not source_entity_json.is_file(): 243 - continue 244 - 245 - entity_id = source_entity_dir.name 246 - target_entity_dir = target_facet_dir / "entities" / entity_id 247 - try: 248 - if target_entity_dir.exists(): 249 - source_relationship = json.loads( 250 - source_entity_json.read_text(encoding="utf-8") 251 - ) 252 - target_relationship_path = target_entity_dir / "entity.json" 253 - target_relationship: dict[str, Any] = {} 254 - if target_relationship_path.is_file(): 255 - target_relationship = json.loads( 256 - target_relationship_path.read_text(encoding="utf-8") 257 - ) 258 - merged_relationship = {**source_relationship, **target_relationship} 259 - 260 - source_observations = _read_jsonl( 261 - source_entity_dir / "observations.jsonl" 262 - ) 263 - target_observations = _read_jsonl( 264 - target_entity_dir / "observations.jsonl" 265 - ) 266 - seen = { 267 - (item.get("content", ""), item.get("observed_at")) 268 - for item in target_observations 269 - } 270 - merged_observations = list(target_observations) 271 - for item in source_observations: 272 - key = (item.get("content", ""), item.get("observed_at")) 273 - if key in seen: 274 - continue 275 - seen.add(key) 276 - merged_observations.append(item) 277 - 278 - if not dry_run: 279 - save_facet_relationship( 280 - facet_name, entity_id, merged_relationship 281 - ) 282 - save_observations(facet_name, entity_id, merged_observations) 283 - continue 284 - 285 - if not dry_run: 286 - shutil.copytree( 287 - source_entity_dir, 288 - target_entity_dir, 289 - copy_function=shutil.copy2, 290 - ) 291 - except Exception as exc: 292 - summary.errors.append(f"facet {facet_name} entity {entity_id}: {exc}") 293 - 294 - source_todos_dir = source_facet_dir / "todos" 295 - if source_todos_dir.is_dir(): 296 - for source_todo_file in sorted(source_todos_dir.glob("*.jsonl")): 297 - try: 298 - target_todo_file = target_facet_dir / "todos" / source_todo_file.name 299 - target_items = _read_jsonl(target_todo_file) 300 - seen = {(item["text"], item.get("created_at")) for item in target_items} 301 - new_items = [ 302 - item 303 - for item in _read_jsonl(source_todo_file) 304 - if (item["text"], item.get("created_at")) not in seen 305 - ] 306 - if new_items and not dry_run: 307 - _append_jsonl(target_todo_file, new_items) 308 - except Exception as exc: 309 - summary.errors.append( 310 - f"facet {facet_name} todo {source_todo_file.name}: {exc}" 311 - ) 312 - 313 - source_calendar_dir = source_facet_dir / "calendar" 314 - if source_calendar_dir.is_dir(): 315 - for source_calendar_file in sorted(source_calendar_dir.glob("*.jsonl")): 316 - try: 317 - target_calendar_file = ( 318 - target_facet_dir / "calendar" / source_calendar_file.name 319 - ) 320 - target_items = _read_jsonl(target_calendar_file) 321 - seen = {(item["title"], item.get("start")) for item in target_items} 322 - new_items = [ 323 - item 324 - for item in _read_jsonl(source_calendar_file) 325 - if (item["title"], item.get("start")) not in seen 326 - ] 327 - if new_items and not dry_run: 328 - _append_jsonl(target_calendar_file, new_items) 329 - except Exception as exc: 330 - summary.errors.append( 331 - f"facet {facet_name} calendar {source_calendar_file.name}: {exc}" 332 - ) 333 - 334 - source_news_dir = source_facet_dir / "news" 335 - if source_news_dir.is_dir(): 336 - target_news_dir = target_facet_dir / "news" 337 - for source_news_file in sorted(source_news_dir.glob("*.md")): 338 - try: 339 - target_news_file = target_news_dir / source_news_file.name 340 - if target_news_file.exists(): 341 - continue 342 - if not dry_run: 343 - target_news_dir.mkdir(parents=True, exist_ok=True) 344 - shutil.copy2(source_news_file, target_news_file) 345 - except Exception as exc: 346 - summary.errors.append( 347 - f"facet {facet_name} news {source_news_file.name}: {exc}" 348 - ) 349 - 350 - 351 - def _merge_imports( 352 - source: Path, target: Path, summary: MergeSummary, dry_run: bool 353 - ) -> None: 354 - source_imports_dir = source / "imports" 355 - if not source_imports_dir.is_dir(): 356 - return 357 - 358 - for source_import_dir in sorted(source_imports_dir.iterdir()): 359 - if not source_import_dir.is_dir(): 360 - continue 361 - 362 - target_import_dir = target / "imports" / source_import_dir.name 363 - try: 364 - typer.echo(f" Import {source_import_dir.name}", err=True) 365 - if target_import_dir.exists(): 366 - summary.imports_skipped += 1 367 - continue 368 - if not dry_run: 369 - shutil.copytree( 370 - source_import_dir, 371 - target_import_dir, 372 - copy_function=shutil.copy2, 373 - ) 374 - summary.imports_copied += 1 375 - except Exception as exc: 376 - summary.errors.append(f"import {source_import_dir.name}: {exc}") 377 - 378 - 379 - def _read_jsonl(path: Path) -> list[dict[str, Any]]: 380 - if not path.is_file(): 381 - return [] 382 - 383 - items: list[dict[str, Any]] = [] 384 - with open(path, encoding="utf-8") as handle: 385 - for line in handle: 386 - line = line.strip() 387 - if not line: 388 - continue 389 - items.append(json.loads(line)) 390 - return items 391 - 392 - 393 - def _append_jsonl(path: Path, items: list[dict[str, Any]]) -> None: 394 - if not items: 395 - return 396 - 397 - path.parent.mkdir(parents=True, exist_ok=True) 398 - with open(path, "a", encoding="utf-8") as handle: 399 - for item in items: 400 - handle.write(json.dumps(item, ensure_ascii=False) + "\n") 11 + __all__ = ["MergeSummary", "merge_journals"]