personal memory agent
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

Add import metadata and config ingest/export endpoints

Complete the five-area journal export/import system by adding support for
import metadata (hash-based dedup) and config (always staged with diff).

- POST /ingest/imports: receives import metadata, deduplicates by content
hash, copies new imports, stages ID collisions
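- POST /ingest/config: receives config snapshot, computes a field-by-field
diff with each differing field categorized (identity fields as
"transferable", all others as "preference"; never-transfer fields are
omitted), stages source_config.json + diff.json unless the snapshot hash
matches the last one received (then skipped as idempotent), never auto-merges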
- sol export --only imports: scans journal/imports/ for metadata dirs,
excludes sync state files (via SYNCABLE_REGISTRY) and source state dirs,
sends delta based on manifest
- sol export --only config: strips never-transfer fields, sends snapshot
- Both areas added to default export sequence (after facets)
- Dry-run, state tracking, and decision logs for both areas

+1459 -24
+315 -1
apps/import/ingest.py
··· 5 5 6 6 from __future__ import annotations 7 7 8 - import json 9 8 import hashlib 9 + import json 10 10 import logging 11 11 import os 12 12 import re 13 13 import tempfile 14 14 from datetime import datetime, timezone 15 15 from pathlib import Path 16 + from typing import Any 16 17 17 18 from flask import abort, g, jsonify, request 18 19 from werkzeug.utils import secure_filename ··· 43 44 _SEGMENT_RE = re.compile(r"^\d{6}_\d+$") 44 45 _STREAM_RE = re.compile(r"^[a-z0-9][a-z0-9._-]*$") 45 46 _FACET_NAME_RE = re.compile(r"^[a-z0-9][a-z0-9_-]*$") 47 + _IMPORT_ID_RE = re.compile(r"^\d{8}_\d{6}$") 48 + 49 + _NEVER_TRANSFER_PATHS = frozenset( 50 + { 51 + "convey.password_hash", 52 + "convey.secret", 53 + "setup.completed_at", 54 + "providers.auth", 55 + "providers.key_validation", 56 + "transcribe.whisper.device", 57 + } 58 + ) 59 + _NEVER_TRANSFER_PREFIXES = ("env.",) 60 + _IDENTITY_PATHS = frozenset( 61 + { 62 + "identity.name", 63 + "identity.preferred", 64 + "identity.bio", 65 + "identity.pronouns", 66 + "identity.aliases", 67 + "identity.email_addresses", 68 + "identity.timezone", 69 + } 70 + ) 46 71 47 72 48 73 def _append_decision(log_path: Path, entry: dict) -> None: ··· 61 86 except Exception: 62 87 Path(tmp_path).unlink(missing_ok=True) 63 88 raise 89 + 90 + 91 + def _flatten_config(cfg: dict, prefix: str = "") -> dict[str, Any]: 92 + """Flatten a nested config dict to dot-separated paths.""" 93 + result: dict[str, Any] = {} 94 + for key, value in cfg.items(): 95 + path = f"{prefix}{key}" if prefix else key 96 + if isinstance(value, dict): 97 + result.update(_flatten_config(value, f"{path}.")) 98 + else: 99 + result[path] = value 100 + return result 101 + 102 + 103 + def _is_never_transfer(path: str) -> bool: 104 + if path in _NEVER_TRANSFER_PATHS: 105 + return True 106 + return any(path.startswith(prefix) for prefix in _NEVER_TRANSFER_PREFIXES) 107 + 108 + 109 + def _categorize_field(path: str) -> str: 110 + """Return category for a config field path.""" 
111 + if path in _IDENTITY_PATHS: 112 + return "transferable" 113 + return "preference" 64 114 65 115 66 116 from .facet_ingest import process_facet ··· 701 751 "errors": errors, 702 752 } 703 753 ) 754 + 755 + @bp.route("/journal/<key_prefix>/ingest/imports", methods=["POST"]) 756 + @require_journal_source 757 + def ingest_imports(key_prefix: str): 758 + if g.journal_source["key"][:8] != key_prefix: 759 + abort(403, description="Key prefix mismatch") 760 + 761 + payload = request.get_json(silent=True) 762 + if not isinstance(payload, dict): 763 + return jsonify({"error": "Invalid JSON body"}), 400 764 + 765 + imports = payload.get("imports") 766 + if not isinstance(imports, list): 767 + return jsonify({"error": "Missing imports array"}), 400 768 + 769 + state_dir = get_state_directory(key_prefix) 770 + log_path = state_dir / "imports" / "log.jsonl" 771 + state_path = state_dir / "imports" / "state.json" 772 + staged_dir = state_dir / "imports" / "staged" 773 + 774 + try: 775 + imports_state = json.loads(state_path.read_text(encoding="utf-8")) 776 + except (OSError, json.JSONDecodeError): 777 + imports_state = {} 778 + if not isinstance(imports_state, dict): 779 + imports_state = {} 780 + received = imports_state.get("received") 781 + if not isinstance(received, dict): 782 + received = {} 783 + imports_state = {"received": dict(received)} 784 + 785 + journal_root = Path(state.journal_root) 786 + 787 + copied = 0 788 + skipped = 0 789 + staged = 0 790 + errors: list[dict[str, str]] = [] 791 + 792 + for item in imports: 793 + try: 794 + if not isinstance(item, dict): 795 + raise ValueError("Import item must be an object") 796 + 797 + import_id = str(item.get("id", "")).strip() 798 + if not import_id or not _IMPORT_ID_RE.match(import_id): 799 + raise ValueError(f"Invalid import id: {import_id!r}") 800 + 801 + import_json = item.get("import_json") 802 + imported_json = item.get("imported_json") 803 + content_manifest = item.get("content_manifest") 804 + 805 + if not 
isinstance(import_json, dict): 806 + raise ValueError("import_json must be an object") 807 + if not isinstance(imported_json, dict): 808 + raise ValueError("imported_json must be an object") 809 + if not isinstance(content_manifest, list): 810 + raise ValueError("content_manifest must be an array") 811 + 812 + hash_input = json.dumps( 813 + { 814 + "import_json": import_json, 815 + "imported_json": imported_json, 816 + "content_manifest": content_manifest, 817 + }, 818 + sort_keys=True, 819 + ensure_ascii=False, 820 + ).encode() 821 + content_hash = hashlib.sha256(hash_input).hexdigest() 822 + 823 + if imports_state["received"].get(import_id) == content_hash: 824 + skipped += 1 825 + _append_decision( 826 + log_path, 827 + { 828 + "ts": datetime.now(timezone.utc).isoformat(), 829 + "action": "skipped", 830 + "item_type": "import", 831 + "item_id": import_id, 832 + "reason": "idempotent", 833 + }, 834 + ) 835 + continue 836 + 837 + target_dir = journal_root / "imports" / import_id 838 + if target_dir.is_dir(): 839 + staged_dir.mkdir(parents=True, exist_ok=True) 840 + staged_payload = { 841 + "import_id": import_id, 842 + "import_json": import_json, 843 + "imported_json": imported_json, 844 + "content_manifest": content_manifest, 845 + "reason": "id_collision", 846 + "staged_at": datetime.now(timezone.utc).isoformat(), 847 + } 848 + (staged_dir / f"{import_id}.json").write_text( 849 + json.dumps(staged_payload, indent=2, ensure_ascii=False) + "\n", 850 + encoding="utf-8", 851 + ) 852 + staged += 1 853 + _append_decision( 854 + log_path, 855 + { 856 + "ts": datetime.now(timezone.utc).isoformat(), 857 + "action": "staged", 858 + "item_type": "import", 859 + "item_id": import_id, 860 + "reason": "id_collision", 861 + }, 862 + ) 863 + else: 864 + target_dir.mkdir(parents=True, exist_ok=True) 865 + (target_dir / "import.json").write_text( 866 + json.dumps(import_json, indent=2, ensure_ascii=False) + "\n", 867 + encoding="utf-8", 868 + ) 869 + (target_dir / 
"imported.json").write_text( 870 + json.dumps(imported_json, indent=2, ensure_ascii=False) + "\n", 871 + encoding="utf-8", 872 + ) 873 + lines = [ 874 + json.dumps(entry, ensure_ascii=False) 875 + for entry in content_manifest 876 + ] 877 + (target_dir / "content_manifest.jsonl").write_text( 878 + "\n".join(lines) + "\n" if lines else "", 879 + encoding="utf-8", 880 + ) 881 + copied += 1 882 + _append_decision( 883 + log_path, 884 + { 885 + "ts": datetime.now(timezone.utc).isoformat(), 886 + "action": "copied", 887 + "item_type": "import", 888 + "item_id": import_id, 889 + "reason": "new", 890 + }, 891 + ) 892 + 893 + imports_state["received"][import_id] = content_hash 894 + except Exception as exc: 895 + import_id_str = item.get("id", "") if isinstance(item, dict) else "" 896 + errors.append({"import_id": str(import_id_str), "error": str(exc)}) 897 + 898 + _write_state_atomic(state_path, imports_state) 899 + 900 + if copied > 0: 901 + source = g.journal_source 902 + source.setdefault("stats", {}) 903 + source["stats"]["imports_received"] = ( 904 + source["stats"].get("imports_received", 0) + copied 905 + ) 906 + save_journal_source(source) 907 + 908 + return jsonify( 909 + { 910 + "copied": copied, 911 + "skipped": skipped, 912 + "staged": staged, 913 + "errors": errors, 914 + } 915 + ) 916 + 917 + @bp.route("/journal/<key_prefix>/ingest/config", methods=["POST"]) 918 + @require_journal_source 919 + def ingest_config(key_prefix: str): 920 + if g.journal_source["key"][:8] != key_prefix: 921 + abort(403, description="Key prefix mismatch") 922 + 923 + payload = request.get_json(silent=True) 924 + if not isinstance(payload, dict): 925 + return jsonify({"error": "Invalid JSON body"}), 400 926 + 927 + source_config = payload.get("config") 928 + if not isinstance(source_config, dict): 929 + return jsonify({"error": "Missing config object"}), 400 930 + 931 + state_dir = get_state_directory(key_prefix) 932 + log_path = state_dir / "config" / "log.jsonl" 933 + state_path = 
state_dir / "config" / "state.json" 934 + config_dir = state_dir / "config" 935 + 936 + try: 937 + config_state = json.loads(state_path.read_text(encoding="utf-8")) 938 + except (OSError, json.JSONDecodeError): 939 + config_state = {} 940 + if not isinstance(config_state, dict): 941 + config_state = {} 942 + 943 + content_hash = hashlib.sha256( 944 + json.dumps(source_config, sort_keys=True, ensure_ascii=False).encode() 945 + ).hexdigest() 946 + 947 + if config_state.get("last_hash") == content_hash: 948 + _append_decision( 949 + log_path, 950 + { 951 + "ts": datetime.now(timezone.utc).isoformat(), 952 + "action": "skipped", 953 + "item_type": "config", 954 + "item_id": "journal.json", 955 + "reason": "idempotent", 956 + }, 957 + ) 958 + return jsonify({"staged": False, "skipped": True, "reason": "idempotent"}) 959 + 960 + from think.utils import get_config 961 + 962 + target_config = get_config() 963 + source_flat = _flatten_config(source_config) 964 + target_flat = _flatten_config(target_config) 965 + 966 + all_keys = sorted(set(source_flat) | set(target_flat)) 967 + diff = {} 968 + for key in all_keys: 969 + if _is_never_transfer(key): 970 + continue 971 + source_val = source_flat.get(key) 972 + target_val = target_flat.get(key) 973 + if source_val != target_val: 974 + diff[key] = { 975 + "source": source_val, 976 + "target": target_val, 977 + "category": _categorize_field(key), 978 + } 979 + 980 + config_dir.mkdir(parents=True, exist_ok=True) 981 + (config_dir / "source_config.json").write_text( 982 + json.dumps(source_config, indent=2, ensure_ascii=False) + "\n", 983 + encoding="utf-8", 984 + ) 985 + (config_dir / "diff.json").write_text( 986 + json.dumps(diff, indent=2, ensure_ascii=False) + "\n", 987 + encoding="utf-8", 988 + ) 989 + 990 + config_state["last_hash"] = content_hash 991 + _write_state_atomic(state_path, config_state) 992 + 993 + _append_decision( 994 + log_path, 995 + { 996 + "ts": datetime.now(timezone.utc).isoformat(), 997 + "action": 
"staged", 998 + "item_type": "config", 999 + "item_id": "journal.json", 1000 + "reason": "config_received", 1001 + }, 1002 + ) 1003 + 1004 + source = g.journal_source 1005 + source.setdefault("stats", {}) 1006 + source["stats"]["config_received"] = ( 1007 + source["stats"].get("config_received", 0) + 1 1008 + ) 1009 + save_journal_source(source) 1010 + 1011 + return jsonify( 1012 + { 1013 + "staged": True, 1014 + "skipped": False, 1015 + "diff_fields": len(diff), 1016 + } 1017 + )
+270 -2
observe/export.py
··· 27 27 _normalize_url, 28 28 _parse_day_spec, 29 29 ) 30 - from think.utils import get_journal, iter_segments, setup_cli 31 30 from think.entities.journal import load_all_journal_entities 31 + from think.importers.sync import SYNCABLE_REGISTRY 32 + from think.utils import get_config, get_journal, iter_segments, setup_cli 32 33 33 34 logger = logging.getLogger(__name__) 34 35 ··· 37 38 _DAY_RE = re.compile(r"^\d{8}$") 38 39 _DAY_JSONL_RE = re.compile(r"^\d{8}\.jsonl$") 39 40 _DAY_MD_RE = re.compile(r"^\d{8}\.md$") 41 + _IMPORT_ID_RE = re.compile(r"^\d{8}_\d{6}$") 42 + _NEVER_TRANSFER_PATHS = frozenset( 43 + { 44 + "convey.password_hash", 45 + "convey.secret", 46 + "setup.completed_at", 47 + "providers.auth", 48 + "providers.key_validation", 49 + "transcribe.whisper.device", 50 + } 51 + ) 40 52 41 53 42 54 def _query_manifest( ··· 188 200 return "logs" 189 201 190 202 return None 203 + 204 + 205 + def _strip_never_transfer(config: dict) -> dict: 206 + """Return a deep copy of config with never-transfer fields removed.""" 207 + import copy as _copy 208 + 209 + result = _copy.deepcopy(config) 210 + for path in _NEVER_TRANSFER_PATHS: 211 + parts = path.split(".") 212 + obj = result 213 + for part in parts[:-1]: 214 + if isinstance(obj, dict) and part in obj: 215 + obj = obj[part] 216 + else: 217 + break 218 + else: 219 + if isinstance(obj, dict): 220 + obj.pop(parts[-1], None) 221 + result.pop("env", None) 222 + return result 191 223 192 224 193 225 def export_segments(base_url: str, key: str, days: list[str], dry_run: bool) -> None: ··· 613 645 session.close() 614 646 615 647 648 + def export_imports(base_url: str, key: str, dry_run: bool) -> None: 649 + """Export import metadata to a remote solstone instance.""" 650 + session = requests.Session() 651 + session.headers["Authorization"] = f"Bearer {key}" 652 + 653 + try: 654 + try: 655 + remote_manifest = _query_manifest(session, base_url, key, area="imports") 656 + except requests.ConnectionError: 657 + 
print(f"Connection failed: could not reach {base_url}") 658 + return 659 + except ValueError as e: 660 + print(str(e)) 661 + return 662 + 663 + received = remote_manifest.get("received", {}) 664 + 665 + journal_root = Path(get_journal()) 666 + imports_dir = journal_root / "imports" 667 + if not imports_dir.is_dir(): 668 + print("No imports directory found") 669 + return 670 + 671 + sync_state_names = {f"{name}.json" for name in SYNCABLE_REGISTRY} 672 + 673 + to_send = [] 674 + new_count = 0 675 + changed_count = 0 676 + unchanged_count = 0 677 + 678 + for entry in sorted(imports_dir.iterdir()): 679 + if entry.is_file() and entry.name in sync_state_names: 680 + continue 681 + if not entry.is_dir(): 682 + continue 683 + if not _IMPORT_ID_RE.match(entry.name): 684 + continue 685 + 686 + import_id = entry.name 687 + import_json_path = entry / "import.json" 688 + imported_json_path = entry / "imported.json" 689 + manifest_path = entry / "content_manifest.jsonl" 690 + 691 + if not import_json_path.exists() or not imported_json_path.exists(): 692 + continue 693 + 694 + try: 695 + import_json = json.loads(import_json_path.read_text(encoding="utf-8")) 696 + imported_json = json.loads( 697 + imported_json_path.read_text(encoding="utf-8") 698 + ) 699 + content_manifest = [] 700 + if manifest_path.exists(): 701 + for line in manifest_path.read_text(encoding="utf-8").splitlines(): 702 + line = line.strip() 703 + if line: 704 + content_manifest.append(json.loads(line)) 705 + except (json.JSONDecodeError, OSError) as exc: 706 + logger.warning("Failed to read import %s: %s", import_id, exc) 707 + continue 708 + 709 + hash_input = json.dumps( 710 + { 711 + "import_json": import_json, 712 + "imported_json": imported_json, 713 + "content_manifest": content_manifest, 714 + }, 715 + sort_keys=True, 716 + ensure_ascii=False, 717 + ).encode() 718 + content_hash = hashlib.sha256(hash_input).hexdigest() 719 + 720 + if received.get(import_id) == content_hash: 721 + unchanged_count += 1 722 
+ continue 723 + 724 + if import_id in received: 725 + changed_count += 1 726 + else: 727 + new_count += 1 728 + 729 + to_send.append( 730 + { 731 + "id": import_id, 732 + "import_json": import_json, 733 + "imported_json": imported_json, 734 + "content_manifest": content_manifest, 735 + } 736 + ) 737 + 738 + if dry_run: 739 + print( 740 + f"Dry run: {new_count} new, {changed_count} changed, " 741 + f"{unchanged_count} unchanged" 742 + ) 743 + return 744 + 745 + if not to_send: 746 + print("Nothing to send - remote imports are up to date") 747 + return 748 + 749 + key_prefix = key[:8] 750 + url = f"{base_url}/app/import/journal/{key_prefix}/ingest/imports" 751 + for attempt, delay in enumerate(RETRY_BACKOFF): 752 + try: 753 + response = session.post( 754 + url, json={"imports": to_send}, timeout=UPLOAD_TIMEOUT 755 + ) 756 + if response.status_code == 200: 757 + break 758 + if response.status_code == 401: 759 + print("Authentication failed: invalid or missing API key") 760 + return 761 + if response.status_code == 403: 762 + print("Authentication failed: journal source revoked or disabled") 763 + return 764 + if 500 <= response.status_code <= 599: 765 + logger.warning( 766 + "Import upload attempt %s failed: %s %s", 767 + attempt + 1, 768 + response.status_code, 769 + response.text, 770 + ) 771 + else: 772 + print( 773 + f"Import upload failed: {response.status_code} {response.text}" 774 + ) 775 + return 776 + except (requests.RequestException, OSError) as e: 777 + logger.warning("Import upload attempt %s failed: %s", attempt + 1, e) 778 + if attempt < len(RETRY_BACKOFF) - 1: 779 + time.sleep(delay) 780 + else: 781 + print("Import upload failed after all retries") 782 + return 783 + 784 + result = response.json() 785 + errors = result.get("errors", []) 786 + if errors: 787 + for err in errors: 788 + print(f" Error: {err}") 789 + print( 790 + f"\nExport complete: {result.get('copied', 0)} copied, " 791 + f"{result.get('staged', 0)} staged, " 792 + 
f"{result.get('skipped', 0)} skipped" 793 + ) 794 + if errors: 795 + print(f" {len(errors)} error(s)") 796 + finally: 797 + session.close() 798 + 799 + 800 + def export_config(base_url: str, key: str, dry_run: bool) -> None: 801 + """Export config snapshot to a remote solstone instance.""" 802 + session = requests.Session() 803 + session.headers["Authorization"] = f"Bearer {key}" 804 + 805 + try: 806 + try: 807 + remote_manifest = _query_manifest(session, base_url, key, area="config") 808 + except requests.ConnectionError: 809 + print(f"Connection failed: could not reach {base_url}") 810 + return 811 + except ValueError as e: 812 + print(str(e)) 813 + return 814 + 815 + config = _strip_never_transfer(get_config()) 816 + content_hash = hashlib.sha256( 817 + json.dumps(config, sort_keys=True, ensure_ascii=False).encode() 818 + ).hexdigest() 819 + 820 + if remote_manifest.get("last_hash") == content_hash: 821 + print("Nothing to send - remote config is up to date") 822 + return 823 + 824 + if dry_run: 825 + print("Dry run: config has changed, would send snapshot") 826 + return 827 + 828 + key_prefix = key[:8] 829 + url = f"{base_url}/app/import/journal/{key_prefix}/ingest/config" 830 + for attempt, delay in enumerate(RETRY_BACKOFF): 831 + try: 832 + response = session.post( 833 + url, json={"config": config}, timeout=UPLOAD_TIMEOUT 834 + ) 835 + if response.status_code == 200: 836 + break 837 + if response.status_code == 401: 838 + print("Authentication failed: invalid or missing API key") 839 + return 840 + if response.status_code == 403: 841 + print("Authentication failed: journal source revoked or disabled") 842 + return 843 + if 500 <= response.status_code <= 599: 844 + logger.warning( 845 + "Config upload attempt %s failed: %s %s", 846 + attempt + 1, 847 + response.status_code, 848 + response.text, 849 + ) 850 + else: 851 + print( 852 + f"Config upload failed: {response.status_code} {response.text}" 853 + ) 854 + return 855 + except (requests.RequestException, 
OSError) as e: 856 + logger.warning("Config upload attempt %s failed: %s", attempt + 1, e) 857 + if attempt < len(RETRY_BACKOFF) - 1: 858 + time.sleep(delay) 859 + else: 860 + print("Config upload failed after all retries") 861 + return 862 + 863 + result = response.json() 864 + if result.get("staged"): 865 + print( 866 + f"\nExport complete: config staged ({result.get('diff_fields', 0)} fields differ)" 867 + ) 868 + elif result.get("skipped"): 869 + print("Nothing to send - remote config is up to date") 870 + finally: 871 + session.close() 872 + 873 + 616 874 def main() -> None: 617 875 parser = argparse.ArgumentParser( 618 876 description="Export journal data to a remote solstone instance" ··· 630 888 parser.add_argument( 631 889 "--only", 632 890 default=None, 633 - help="Export only specific area (segments, entities, facets)", 891 + help="Export only specific area (segments, entities, facets, imports, config)", 634 892 ) 635 893 parser.add_argument( 636 894 "--dry-run", ··· 654 912 export_facets(base_url, args.key, args.dry_run) 655 913 return 656 914 915 + if args.only == "imports": 916 + export_imports(base_url, args.key, args.dry_run) 917 + return 918 + 919 + if args.only == "config": 920 + export_config(base_url, args.key, args.dry_run) 921 + return 922 + 657 923 if args.only is not None and args.only != "segments": 658 924 print(f"Export of '{args.only}' is not yet implemented") 659 925 sys.exit(0) ··· 663 929 if args.only is None: 664 930 export_entities(base_url, args.key, args.dry_run) 665 931 export_facets(base_url, args.key, args.dry_run) 932 + export_imports(base_url, args.key, args.dry_run) 933 + export_config(base_url, args.key, args.dry_run)
+243
tests/test_config_ingest.py
··· 1 + # SPDX-License-Identifier: AGPL-3.0-only 2 + # Copyright (c) 2026 sol pbc 3 + 4 + from __future__ import annotations 5 + 6 + import json 7 + from importlib import import_module 8 + 9 + import pytest 10 + from flask import Blueprint, Flask 11 + 12 + import convey.state 13 + import think.utils 14 + 15 + journal_sources = import_module("apps.import.journal_sources") 16 + ingest = import_module("apps.import.ingest") 17 + 18 + create_state_directory = journal_sources.create_state_directory 19 + generate_key = journal_sources.generate_key 20 + get_state_directory = journal_sources.get_state_directory 21 + load_journal_source = journal_sources.load_journal_source 22 + save_journal_source = journal_sources.save_journal_source 23 + register_ingest_routes = ingest.register_ingest_routes 24 + 25 + 26 + @pytest.fixture 27 + def journal_env(tmp_path, monkeypatch): 28 + monkeypatch.setattr(convey.state, "journal_root", str(tmp_path), raising=False) 29 + monkeypatch.setenv("_SOLSTONE_JOURNAL_OVERRIDE", str(tmp_path)) 30 + think.utils._journal_path_cache = None 31 + (tmp_path / "apps" / "import" / "journal_sources").mkdir( 32 + parents=True, exist_ok=True 33 + ) 34 + return tmp_path 35 + 36 + 37 + def _source(name="test-source", key=None, **overrides): 38 + if key is None: 39 + key = generate_key() 40 + source = { 41 + "name": name, 42 + "key": key, 43 + "created_at": 1000, 44 + "enabled": True, 45 + "revoked": False, 46 + "revoked_at": None, 47 + "stats": { 48 + "segments_received": 0, 49 + "entities_received": 0, 50 + "facets_received": 0, 51 + "imports_received": 0, 52 + "config_received": 0, 53 + }, 54 + } 55 + source.update(overrides) 56 + return source 57 + 58 + 59 + @pytest.fixture 60 + def ingest_env(journal_env): 61 + key = generate_key() 62 + source = _source(key=key) 63 + save_journal_source(source) 64 + key_prefix = key[:8] 65 + create_state_directory(journal_env, key_prefix) 66 + 67 + app = Flask(__name__) 68 + app.config["TESTING"] = True 69 + bp = 
Blueprint("import-test", __name__, url_prefix="/app/import") 70 + register_ingest_routes(bp) 71 + app.register_blueprint(bp) 72 + 73 + return { 74 + "root": journal_env, 75 + "key": key, 76 + "key_prefix": key_prefix, 77 + "source": source, 78 + "client": app.test_client(), 79 + } 80 + 81 + 82 + def _post_config(client, key, key_prefix, config): 83 + return client.post( 84 + f"/app/import/journal/{key_prefix}/ingest/config", 85 + headers={"Authorization": f"Bearer {key}"}, 86 + json={"config": config}, 87 + ) 88 + 89 + 90 + def _sample_config(): 91 + return { 92 + "identity": {"name": "Remote User", "preferred": "Remote", "timezone": "UTC"}, 93 + "convey": { 94 + "password_hash": "secret_hash", 95 + "secret": "secret_value", 96 + "trust_localhost": True, 97 + }, 98 + "setup": {"completed_at": 12345}, 99 + "env": {"API_KEY": "xyz"}, 100 + "retention": {"days": 30}, 101 + } 102 + 103 + 104 + def _write_target_config(root, config): 105 + config_dir = root / "config" 106 + config_dir.mkdir(parents=True, exist_ok=True) 107 + (config_dir / "journal.json").write_text( 108 + json.dumps(config, ensure_ascii=False), encoding="utf-8" 109 + ) 110 + think.utils._journal_path_cache = None 111 + 112 + 113 + def _read_json(path): 114 + return json.loads(path.read_text(encoding="utf-8")) 115 + 116 + 117 + def _read_log(key_prefix): 118 + log_path = get_state_directory(key_prefix) / "config" / "log.jsonl" 119 + if not log_path.exists(): 120 + return [] 121 + return [ 122 + json.loads(line) 123 + for line in log_path.read_text(encoding="utf-8").splitlines() 124 + if line.strip() 125 + ] 126 + 127 + 128 + def test_auth_required(ingest_env): 129 + env = ingest_env 130 + response = env["client"].post( 131 + f"/app/import/journal/{env['key_prefix']}/ingest/config", 132 + json={"config": {}}, 133 + ) 134 + assert response.status_code == 401 135 + 136 + 137 + def test_key_prefix_mismatch(ingest_env): 138 + env = ingest_env 139 + response = env["client"].post( 140 + 
"/app/import/journal/deadbeef/ingest/config", 141 + headers={"Authorization": f"Bearer {env['key']}"}, 142 + json={"config": {}}, 143 + ) 144 + assert response.status_code == 403 145 + 146 + 147 + def test_invalid_json(ingest_env): 148 + env = ingest_env 149 + response = env["client"].post( 150 + f"/app/import/journal/{env['key_prefix']}/ingest/config", 151 + headers={"Authorization": f"Bearer {env['key']}"}, 152 + data="not-json", 153 + content_type="application/json", 154 + ) 155 + assert response.status_code == 400 156 + assert response.get_json() == {"error": "Invalid JSON body"} 157 + 158 + 159 + def test_missing_config(ingest_env): 160 + env = ingest_env 161 + response = env["client"].post( 162 + f"/app/import/journal/{env['key_prefix']}/ingest/config", 163 + headers={"Authorization": f"Bearer {env['key']}"}, 164 + json={}, 165 + ) 166 + assert response.status_code == 400 167 + assert response.get_json() == {"error": "Missing config object"} 168 + 169 + 170 + def test_config_staged(ingest_env): 171 + env = ingest_env 172 + target_config = {"identity": {"name": "Local User"}, "retention": {"days": 90}} 173 + _write_target_config(env["root"], target_config) 174 + config = _sample_config() 175 + 176 + response = _post_config(env["client"], env["key"], env["key_prefix"], config) 177 + body = response.get_json() 178 + state_dir = get_state_directory(env["key_prefix"]) / "config" 179 + source = load_journal_source(env["key"]) 180 + 181 + assert response.status_code == 200 182 + assert body == {"staged": True, "skipped": False, "diff_fields": 5} 183 + assert (state_dir / "source_config.json").exists() 184 + assert (state_dir / "diff.json").exists() 185 + assert "last_hash" in _read_json(state_dir / "state.json") 186 + assert _read_log(env["key_prefix"])[0]["action"] == "staged" 187 + assert source["stats"]["config_received"] == 1 188 + 189 + 190 + def test_diff_categorization(ingest_env): 191 + env = ingest_env 192 + target_config = {"identity": {"name": "Local 
User"}, "retention": {"days": 90}} 193 + _write_target_config(env["root"], target_config) 194 + config = {"identity": {"name": "Remote User"}, "retention": {"days": 30}} 195 + 196 + response = _post_config(env["client"], env["key"], env["key_prefix"], config) 197 + diff = _read_json(get_state_directory(env["key_prefix"]) / "config" / "diff.json") 198 + 199 + assert response.status_code == 200 200 + assert diff["identity.name"]["category"] == "transferable" 201 + assert diff["retention.days"]["category"] == "preference" 202 + 203 + 204 + def test_never_transfer_excluded(ingest_env): 205 + env = ingest_env 206 + _write_target_config(env["root"], {"identity": {"name": "Local User"}}) 207 + config = _sample_config() 208 + 209 + response = _post_config(env["client"], env["key"], env["key_prefix"], config) 210 + diff = _read_json(get_state_directory(env["key_prefix"]) / "config" / "diff.json") 211 + 212 + assert response.status_code == 200 213 + assert "convey.password_hash" not in diff 214 + assert "convey.secret" not in diff 215 + assert not any(key.startswith("env.") for key in diff) 216 + 217 + 218 + def test_idempotent(ingest_env): 219 + env = ingest_env 220 + _write_target_config(env["root"], {"identity": {"name": "Local User"}}) 221 + config = _sample_config() 222 + 223 + first = _post_config(env["client"], env["key"], env["key_prefix"], config) 224 + second = _post_config(env["client"], env["key"], env["key_prefix"], config) 225 + 226 + assert first.status_code == 200 227 + assert second.status_code == 200 228 + assert second.get_json() == { 229 + "staged": False, 230 + "skipped": True, 231 + "reason": "idempotent", 232 + } 233 + 234 + 235 + def test_config_always_staged(ingest_env): 236 + env = ingest_env 237 + _write_target_config(env["root"], {"identity": {"name": "Local User"}}) 238 + config = {"identity": {"name": "Remote User"}} 239 + 240 + response = _post_config(env["client"], env["key"], env["key_prefix"], config) 241 + 242 + assert response.status_code 
== 200 243 + assert response.get_json()["staged"] is True
+325 -21
tests/test_export.py
··· 38 38 think.utils._journal_path_cache = None 39 39 40 40 41 - def _make_session(*, manifest_data=None, get_status=200, post_status=200, post_json=None): 41 + def _make_session( 42 + *, manifest_data=None, get_status=200, post_status=200, post_json=None 43 + ): 42 44 mock = MagicMock(spec=requests.Session) 43 45 mock.headers = {} 44 46 ··· 140 142 return hashlib.sha256(path.read_bytes()).hexdigest() 141 143 142 144 145 + def _setup_imports(tmp_path): 146 + """Create test import metadata dirs in journal fixture.""" 147 + imports_dir = tmp_path / "imports" 148 + imports_dir.mkdir(parents=True, exist_ok=True) 149 + 150 + dir1 = imports_dir / "20260101_090000" 151 + dir1.mkdir() 152 + import_json_1 = {"original_filename": "cal.zip", "file_size": 100} 153 + imported_json_1 = { 154 + "processed_timestamp": "20260101_090000", 155 + "total_files_created": 1, 156 + } 157 + manifest_1 = [{"id": "event-0", "title": "Test Event"}] 158 + (dir1 / "import.json").write_text(json.dumps(import_json_1), encoding="utf-8") 159 + (dir1 / "imported.json").write_text(json.dumps(imported_json_1), encoding="utf-8") 160 + (dir1 / "content_manifest.jsonl").write_text( 161 + json.dumps(manifest_1[0]) + "\n", encoding="utf-8" 162 + ) 163 + 164 + dir2 = imports_dir / "20260102_100000" 165 + dir2.mkdir() 166 + import_json_2 = {"original_filename": "chat.zip", "file_size": 200} 167 + imported_json_2 = { 168 + "processed_timestamp": "20260102_100000", 169 + "total_files_created": 2, 170 + } 171 + manifest_2 = [{"id": "conv-0", "title": "Test Convo"}] 172 + (dir2 / "import.json").write_text(json.dumps(import_json_2), encoding="utf-8") 173 + (dir2 / "imported.json").write_text(json.dumps(imported_json_2), encoding="utf-8") 174 + (dir2 / "content_manifest.jsonl").write_text( 175 + json.dumps(manifest_2[0]) + "\n", encoding="utf-8" 176 + ) 177 + 178 + (imports_dir / "plaud.json").write_text('{"last_sync": 123}', encoding="utf-8") 179 + 180 + source_dir = imports_dir / "abcd1234" 181 + 
source_dir.mkdir() 182 + (source_dir / "segments").mkdir() 183 + 184 + return { 185 + "20260101_090000": { 186 + "import_json": import_json_1, 187 + "imported_json": imported_json_1, 188 + "content_manifest": manifest_1, 189 + }, 190 + "20260102_100000": { 191 + "import_json": import_json_2, 192 + "imported_json": imported_json_2, 193 + "content_manifest": manifest_2, 194 + }, 195 + } 196 + 197 + 198 + def _import_hash(import_data): 199 + """Compute hash matching export_imports algorithm.""" 200 + hash_input = json.dumps( 201 + { 202 + "import_json": import_data["import_json"], 203 + "imported_json": import_data["imported_json"], 204 + "content_manifest": import_data["content_manifest"], 205 + }, 206 + sort_keys=True, 207 + ensure_ascii=False, 208 + ).encode() 209 + return hashlib.sha256(hash_input).hexdigest() 210 + 211 + 212 + def _setup_config(tmp_path): 213 + """Create test config in journal fixture.""" 214 + config_dir = tmp_path / "config" 215 + config_dir.mkdir(parents=True, exist_ok=True) 216 + config = { 217 + "identity": {"name": "Test", "preferred": "Tester", "timezone": "UTC"}, 218 + "convey": { 219 + "password_hash": "secret_hash", 220 + "secret": "secret_val", 221 + "trust_localhost": True, 222 + }, 223 + "setup": {"completed_at": 12345}, 224 + "env": {"KEY": "val"}, 225 + "retention": {"days": 30}, 226 + } 227 + (config_dir / "journal.json").write_text(json.dumps(config), encoding="utf-8") 228 + return config 229 + 230 + 143 231 class TestExportSegments: 144 232 def test_manifest_query_and_delta(self, tmp_path, monkeypatch): 145 233 from observe.export import export_segments ··· 168 256 mock_session = _make_session(manifest_data=manifest_data) 169 257 170 258 with patch("observe.export.requests.Session", return_value=mock_session): 171 - export_segments("https://example.com", "test-key", ["20260413"], dry_run=False) 259 + export_segments( 260 + "https://example.com", "test-key", ["20260413"], dry_run=False 261 + ) 172 262 173 263 assert 
mock_session.post.call_count == 1 174 264 metadata = json.loads(mock_session.post.call_args.kwargs["data"]["metadata"]) ··· 183 273 mock_session = _make_session(manifest_data={}) 184 274 185 275 with patch("observe.export.requests.Session", return_value=mock_session): 186 - export_segments("https://example.com", "test-key", ["20260413"], dry_run=True) 276 + export_segments( 277 + "https://example.com", "test-key", ["20260413"], dry_run=True 278 + ) 187 279 188 280 assert mock_session.post.call_count == 0 189 281 output = capsys.readouterr().out ··· 217 309 mock_session = _make_session(manifest_data=manifest_data) 218 310 219 311 with patch("observe.export.requests.Session", return_value=mock_session): 220 - export_segments("https://example.com", "test-key", ["20260413"], dry_run=True) 312 + export_segments( 313 + "https://example.com", "test-key", ["20260413"], dry_run=True 314 + ) 221 315 222 316 assert mock_session.post.call_count == 0 223 317 output = capsys.readouterr().out ··· 251 345 patch("observe.export.requests.Session", return_value=mock_session), 252 346 patch("observe.export.time.sleep") as mock_sleep, 253 347 ): 254 - export_segments("https://example.com", "test-key", ["20260413"], dry_run=False) 348 + export_segments( 349 + "https://example.com", "test-key", ["20260413"], dry_run=False 350 + ) 255 351 256 352 assert mock_session.post.call_count == 3 257 353 assert mock_sleep.called ··· 282 378 mock_session.get.side_effect = requests.ConnectionError 283 379 284 380 with patch("observe.export.requests.Session", return_value=mock_session): 285 - export_segments("https://example.com", "test-key", ["20260413"], dry_run=False) 381 + export_segments( 382 + "https://example.com", "test-key", ["20260413"], dry_run=False 383 + ) 286 384 287 385 assert mock_session.post.call_count == 0 288 386 assert "Connection failed" in capsys.readouterr().out ··· 328 426 mock_session = _make_session(manifest_data=manifest_data) 329 427 330 428 with 
patch("observe.export.requests.Session", return_value=mock_session): 331 - export_segments("https://example.com", "test-key", ["20260413"], dry_run=False) 429 + export_segments( 430 + "https://example.com", "test-key", ["20260413"], dry_run=False 431 + ) 332 432 333 433 assert mock_session.post.call_count == 0 334 434 assert "up to date" in capsys.readouterr().out ··· 351 451 mock_session.post.side_effect = [first, second] 352 452 353 453 with patch("observe.export.requests.Session", return_value=mock_session): 354 - export_segments("https://example.com", "test-key", ["20260413"], dry_run=False) 454 + export_segments( 455 + "https://example.com", "test-key", ["20260413"], dry_run=False 456 + ) 355 457 356 458 assert mock_session.post.call_count == 2 357 459 output = capsys.readouterr().out ··· 367 469 mock_session = _make_session(manifest_data={}) 368 470 369 471 with patch("observe.export.requests.Session", return_value=mock_session): 370 - export_segments("https://example.com", "test-key", ["20260413"], dry_run=False) 472 + export_segments( 473 + "https://example.com", "test-key", ["20260413"], dry_run=False 474 + ) 371 475 372 476 for call in mock_session.post.call_args_list: 373 477 post_kwargs = call.kwargs ··· 572 676 _set_journal_override(monkeypatch, tmp_path) 573 677 574 678 post_json = {"created": 1, "merged": 0, "skipped": 0, "staged": 0, "errors": []} 575 - mock_session = _make_session(manifest_data={"received": {}}, post_json=post_json) 679 + mock_session = _make_session( 680 + manifest_data={"received": {}}, post_json=post_json 681 + ) 576 682 577 683 with patch("observe.export.requests.Session", return_value=mock_session): 578 684 export_facets("https://example.com", "test-key", dry_run=False) ··· 616 722 { 617 723 "name": "work", 618 724 "files": [ 619 - {"path": "entities/20260413.jsonl", "type": "detected_entities"}, 620 - {"path": "entities/alice/entity.json", "type": "entity_relationship"}, 725 + { 726 + "path": "entities/20260413.jsonl", 727 + 
"type": "detected_entities", 728 + }, 729 + { 730 + "path": "entities/alice/entity.json", 731 + "type": "entity_relationship", 732 + }, 621 733 { 622 734 "path": "entities/alice/observations.jsonl", 623 735 "type": "entity_observations", ··· 662 774 assert metadata["facets"][0]["name"] == "work" 663 775 assert metadata["facets"][0]["files"] == [ 664 776 {"path": "entities/20260413.jsonl", "type": "detected_entities"}, 665 - {"path": "entities/alice/observations.jsonl", "type": "entity_observations"}, 777 + { 778 + "path": "entities/alice/observations.jsonl", 779 + "type": "entity_observations", 780 + }, 666 781 {"path": "todos/20260413.jsonl", "type": "todos"}, 667 782 ] 668 783 ··· 681 796 output = capsys.readouterr().out 682 797 assert "personal: 2 new, 0 changed, 0 unchanged" in output 683 798 assert "work: 5 new, 0 changed, 0 unchanged" in output 684 - assert "Dry run: 7 new files, 0 changed, 0 unchanged across 2 facet(s)" in output 799 + assert ( 800 + "Dry run: 7 new files, 0 changed, 0 unchanged across 2 facet(s)" in output 801 + ) 685 802 686 803 def test_idempotent(self, tmp_path, monkeypatch, capsys): 687 804 from observe.export import export_facets ··· 696 813 for file_path in sorted(facet_dir.rglob("*")): 697 814 if file_path.is_file(): 698 815 rel_path = file_path.relative_to(facet_dir).as_posix() 699 - manifest_received[f"{facet_dir.name}/{rel_path}"] = _facet_file_hash(file_path) 816 + manifest_received[f"{facet_dir.name}/{rel_path}"] = ( 817 + _facet_file_hash(file_path) 818 + ) 700 819 mock_session = _make_session(manifest_data={"received": manifest_received}) 701 820 702 821 with patch("observe.export.requests.Session", return_value=mock_session): ··· 765 884 _set_journal_override(monkeypatch, tmp_path) 766 885 767 886 post_json = {"created": 1, "merged": 0, "skipped": 0, "staged": 0, "errors": []} 768 - mock_session = _make_session(manifest_data={"received": {}}, post_json=post_json) 887 + mock_session = _make_session( 888 + 
manifest_data={"received": {}}, post_json=post_json 889 + ) 769 890 770 891 with patch("observe.export.requests.Session", return_value=mock_session): 771 892 export_facets("https://example.com", "test-key", dry_run=False) ··· 783 904 _setup_facets(tmp_path) 784 905 events_dir = tmp_path / "facets" / "work" / "events" 785 906 events_dir.mkdir(parents=True) 786 - (events_dir / "20260413.jsonl").write_text('{"event": "ignored"}\n', encoding="utf-8") 907 + (events_dir / "20260413.jsonl").write_text( 908 + '{"event": "ignored"}\n', encoding="utf-8" 909 + ) 787 910 _set_journal_override(monkeypatch, tmp_path) 788 911 789 912 post_json = {"created": 1, "merged": 0, "skipped": 0, "staged": 0, "errors": []} 790 - mock_session = _make_session(manifest_data={"received": {}}, post_json=post_json) 913 + mock_session = _make_session( 914 + manifest_data={"received": {}}, post_json=post_json 915 + ) 791 916 792 917 with patch("observe.export.requests.Session", return_value=mock_session): 793 918 export_facets("https://example.com", "test-key", dry_run=False) 794 919 795 920 calls_by_facet = { 796 - json.loads(call.kwargs["data"]["metadata"])["facets"][0]["name"]: call.kwargs 921 + json.loads(call.kwargs["data"]["metadata"])["facets"][0][ 922 + "name" 923 + ]: call.kwargs 797 924 for call in mock_session.post.call_args_list 798 925 } 799 926 work_metadata = json.loads(calls_by_facet["work"]["data"]["metadata"]) 800 - uploaded_paths = [entry["path"] for entry in work_metadata["facets"][0]["files"]] 927 + uploaded_paths = [ 928 + entry["path"] for entry in work_metadata["facets"][0]["files"] 929 + ] 801 930 assert "events/20260413.jsonl" not in uploaded_paths 802 931 803 932 def test_response_errors_reported(self, tmp_path, monkeypatch, capsys): ··· 813 942 "staged": 0, 814 943 "errors": [{"facet": "work", "error": "entity merge conflict"}], 815 944 } 816 - mock_session = _make_session(manifest_data={"received": {}}, post_json=post_json) 945 + mock_session = _make_session( 946 + 
manifest_data={"received": {}}, post_json=post_json 947 + ) 817 948 818 949 with patch("observe.export.requests.Session", return_value=mock_session): 819 950 export_facets("https://example.com", "test-key", dry_run=False) ··· 821 952 output = capsys.readouterr().out 822 953 assert "entity merge conflict" in output 823 954 assert "error" in output.lower() 955 + 956 + 957 + class TestExportImports: 958 + def test_manifest_delta(self, tmp_path, monkeypatch): 959 + from observe.export import export_imports 960 + 961 + imports = _setup_imports(tmp_path) 962 + _set_journal_override(monkeypatch, tmp_path) 963 + 964 + manifest_data = { 965 + "received": {"20260101_090000": _import_hash(imports["20260101_090000"])} 966 + } 967 + post_json = {"copied": 1, "staged": 0, "skipped": 0, "errors": []} 968 + mock_session = _make_session(manifest_data=manifest_data, post_json=post_json) 969 + 970 + with patch("observe.export.requests.Session", return_value=mock_session): 971 + export_imports("https://example.com", "test-key", dry_run=False) 972 + 973 + assert mock_session.post.call_count == 1 974 + posted_data = mock_session.post.call_args.kwargs.get( 975 + "json" 976 + ) or mock_session.post.call_args[1].get("json") 977 + posted_ids = [entry["id"] for entry in posted_data["imports"]] 978 + assert posted_ids == ["20260102_100000"] 979 + 980 + def test_dry_run(self, tmp_path, monkeypatch, capsys): 981 + from observe.export import export_imports 982 + 983 + _setup_imports(tmp_path) 984 + _set_journal_override(monkeypatch, tmp_path) 985 + 986 + mock_session = _make_session(manifest_data={"received": {}}) 987 + 988 + with patch("observe.export.requests.Session", return_value=mock_session): 989 + export_imports("https://example.com", "test-key", dry_run=True) 990 + 991 + assert mock_session.post.call_count == 0 992 + output = capsys.readouterr().out 993 + assert "2 new" in output 994 + assert "0 changed" in output 995 + 996 + def test_idempotent(self, tmp_path, monkeypatch, capsys): 997 
+ from observe.export import export_imports 998 + 999 + imports = _setup_imports(tmp_path) 1000 + _set_journal_override(monkeypatch, tmp_path) 1001 + 1002 + manifest_data = { 1003 + "received": { 1004 + import_id: _import_hash(import_data) 1005 + for import_id, import_data in imports.items() 1006 + } 1007 + } 1008 + mock_session = _make_session(manifest_data=manifest_data) 1009 + 1010 + with patch("observe.export.requests.Session", return_value=mock_session): 1011 + export_imports("https://example.com", "test-key", dry_run=False) 1012 + 1013 + assert mock_session.post.call_count == 0 1014 + assert "up to date" in capsys.readouterr().out 1015 + 1016 + def test_sync_state_excluded(self, tmp_path, monkeypatch): 1017 + from observe.export import export_imports 1018 + 1019 + _setup_imports(tmp_path) 1020 + _set_journal_override(monkeypatch, tmp_path) 1021 + 1022 + mock_session = _make_session( 1023 + manifest_data={"received": {}}, 1024 + post_json={"copied": 2, "staged": 0, "skipped": 0, "errors": []}, 1025 + ) 1026 + 1027 + with patch("observe.export.requests.Session", return_value=mock_session): 1028 + export_imports("https://example.com", "test-key", dry_run=False) 1029 + 1030 + posted_data = mock_session.post.call_args.kwargs.get( 1031 + "json" 1032 + ) or mock_session.post.call_args[1].get("json") 1033 + posted_ids = {entry["id"] for entry in posted_data["imports"]} 1034 + assert "plaud.json" not in posted_ids 1035 + 1036 + def test_source_dir_excluded(self, tmp_path, monkeypatch): 1037 + from observe.export import export_imports 1038 + 1039 + _setup_imports(tmp_path) 1040 + _set_journal_override(monkeypatch, tmp_path) 1041 + 1042 + mock_session = _make_session( 1043 + manifest_data={"received": {}}, 1044 + post_json={"copied": 2, "staged": 0, "skipped": 0, "errors": []}, 1045 + ) 1046 + 1047 + with patch("observe.export.requests.Session", return_value=mock_session): 1048 + export_imports("https://example.com", "test-key", dry_run=False) 1049 + 1050 + posted_data 
= mock_session.post.call_args.kwargs.get( 1051 + "json" 1052 + ) or mock_session.post.call_args[1].get("json") 1053 + posted_ids = {entry["id"] for entry in posted_data["imports"]} 1054 + assert "abcd1234" not in posted_ids 1055 + 1056 + 1057 + class TestExportConfig: 1058 + def test_config_export(self, tmp_path, monkeypatch): 1059 + from observe.export import export_config 1060 + 1061 + _setup_config(tmp_path) 1062 + _set_journal_override(monkeypatch, tmp_path) 1063 + 1064 + mock_session = _make_session( 1065 + manifest_data={}, 1066 + post_json={"staged": True, "skipped": False, "diff_fields": 3}, 1067 + ) 1068 + 1069 + with patch("observe.export.requests.Session", return_value=mock_session): 1070 + export_config("https://example.com", "test-key", dry_run=False) 1071 + 1072 + assert mock_session.post.call_count == 1 1073 + 1074 + def test_dry_run(self, tmp_path, monkeypatch, capsys): 1075 + from observe.export import export_config 1076 + 1077 + _setup_config(tmp_path) 1078 + _set_journal_override(monkeypatch, tmp_path) 1079 + 1080 + mock_session = _make_session(manifest_data={}) 1081 + 1082 + with patch("observe.export.requests.Session", return_value=mock_session): 1083 + export_config("https://example.com", "test-key", dry_run=True) 1084 + 1085 + assert mock_session.post.call_count == 0 1086 + assert "would send snapshot" in capsys.readouterr().out 1087 + 1088 + def test_idempotent(self, tmp_path, monkeypatch, capsys): 1089 + from observe.export import _strip_never_transfer, export_config 1090 + 1091 + config = _setup_config(tmp_path) 1092 + _set_journal_override(monkeypatch, tmp_path) 1093 + 1094 + stripped = _strip_never_transfer(config) 1095 + content_hash = hashlib.sha256( 1096 + json.dumps(stripped, sort_keys=True, ensure_ascii=False).encode() 1097 + ).hexdigest() 1098 + mock_session = _make_session(manifest_data={"last_hash": content_hash}) 1099 + 1100 + with patch("observe.export.requests.Session", return_value=mock_session): 1101 + 
export_config("https://example.com", "test-key", dry_run=False) 1102 + 1103 + assert mock_session.post.call_count == 0 1104 + assert "up to date" in capsys.readouterr().out 1105 + 1106 + def test_never_transfer_stripped(self, tmp_path, monkeypatch): 1107 + from observe.export import export_config 1108 + 1109 + _setup_config(tmp_path) 1110 + _set_journal_override(monkeypatch, tmp_path) 1111 + 1112 + mock_session = _make_session( 1113 + manifest_data={}, 1114 + post_json={"staged": True, "skipped": False, "diff_fields": 3}, 1115 + ) 1116 + 1117 + with patch("observe.export.requests.Session", return_value=mock_session): 1118 + export_config("https://example.com", "test-key", dry_run=False) 1119 + 1120 + posted_data = mock_session.post.call_args.kwargs.get( 1121 + "json" 1122 + ) or mock_session.post.call_args[1].get("json") 1123 + posted_config = posted_data["config"] 1124 + assert posted_config["convey"] == {"trust_localhost": True} 1125 + assert "setup" in posted_config 1126 + assert posted_config["setup"] == {} 1127 + assert "env" not in posted_config
+306
tests/test_imports_ingest.py
··· 1 + # SPDX-License-Identifier: AGPL-3.0-only 2 + # Copyright (c) 2026 sol pbc 3 + 4 + from __future__ import annotations 5 + 6 + import hashlib 7 + import json 8 + from importlib import import_module 9 + 10 + import pytest 11 + from flask import Blueprint, Flask 12 + 13 + import convey.state 14 + 15 + journal_sources = import_module("apps.import.journal_sources") 16 + ingest = import_module("apps.import.ingest") 17 + 18 + create_state_directory = journal_sources.create_state_directory 19 + generate_key = journal_sources.generate_key 20 + get_state_directory = journal_sources.get_state_directory 21 + load_journal_source = journal_sources.load_journal_source 22 + save_journal_source = journal_sources.save_journal_source 23 + register_ingest_routes = ingest.register_ingest_routes 24 + 25 + 26 + @pytest.fixture 27 + def journal_env(tmp_path, monkeypatch): 28 + monkeypatch.setattr(convey.state, "journal_root", str(tmp_path), raising=False) 29 + monkeypatch.setenv("_SOLSTONE_JOURNAL_OVERRIDE", str(tmp_path)) 30 + (tmp_path / "apps" / "import" / "journal_sources").mkdir( 31 + parents=True, exist_ok=True 32 + ) 33 + return tmp_path 34 + 35 + 36 + def _source(name="test-source", key=None, **overrides): 37 + if key is None: 38 + key = generate_key() 39 + source = { 40 + "name": name, 41 + "key": key, 42 + "created_at": 1000, 43 + "enabled": True, 44 + "revoked": False, 45 + "revoked_at": None, 46 + "stats": { 47 + "segments_received": 0, 48 + "entities_received": 0, 49 + "facets_received": 0, 50 + "imports_received": 0, 51 + "config_received": 0, 52 + }, 53 + } 54 + source.update(overrides) 55 + return source 56 + 57 + 58 + @pytest.fixture 59 + def ingest_env(journal_env): 60 + key = generate_key() 61 + source = _source(key=key) 62 + save_journal_source(source) 63 + key_prefix = key[:8] 64 + create_state_directory(journal_env, key_prefix) 65 + 66 + app = Flask(__name__) 67 + app.config["TESTING"] = True 68 + bp = Blueprint("import-test", __name__, 
url_prefix="/app/import") 69 + register_ingest_routes(bp) 70 + app.register_blueprint(bp) 71 + 72 + return { 73 + "root": journal_env, 74 + "key": key, 75 + "key_prefix": key_prefix, 76 + "source": source, 77 + "client": app.test_client(), 78 + } 79 + 80 + 81 + def _sample_import(import_id="20260101_090000"): 82 + return { 83 + "id": import_id, 84 + "import_json": { 85 + "original_filename": "test.zip", 86 + "upload_timestamp": 1767258000000, 87 + "upload_datetime": "2026-01-01T09:00:00", 88 + "user_timestamp": import_id, 89 + "file_size": 1234, 90 + "mime_type": "application/zip", 91 + "facet": "work", 92 + "setting": "calendar", 93 + "file_path": f"imports/{import_id}/test.zip", 94 + }, 95 + "imported_json": { 96 + "processed_timestamp": import_id, 97 + "processing_completed": "2026-01-01T09:10:00", 98 + "total_files_created": 1, 99 + "all_created_files": ["20260101/import.ics/090000_300/event.md"], 100 + "segments": ["090000_300"], 101 + "source_type": "ics", 102 + "source_display": "Calendar", 103 + "entries_written": 1, 104 + "entities_seeded": 0, 105 + "date_range": ["20260101", "20260101"], 106 + "target_day": "20260101", 107 + }, 108 + "content_manifest": [ 109 + { 110 + "id": "event-0", 111 + "title": "Test Event", 112 + "date": "20260101", 113 + "type": "event", 114 + } 115 + ], 116 + } 117 + 118 + 119 + def _import_hash(item: dict) -> str: 120 + hash_input = json.dumps( 121 + { 122 + "import_json": item["import_json"], 123 + "imported_json": item["imported_json"], 124 + "content_manifest": item["content_manifest"], 125 + }, 126 + sort_keys=True, 127 + ensure_ascii=False, 128 + ).encode() 129 + return hashlib.sha256(hash_input).hexdigest() 130 + 131 + 132 + def _post_imports(client, key, key_prefix, imports_list): 133 + return client.post( 134 + f"/app/import/journal/{key_prefix}/ingest/imports", 135 + headers={"Authorization": f"Bearer {key}"}, 136 + json={"imports": imports_list}, 137 + ) 138 + 139 + 140 + def _read_state(key_prefix: str) -> dict: 141 + 
state_path = get_state_directory(key_prefix) / "imports" / "state.json" 142 + return json.loads(state_path.read_text(encoding="utf-8")) 143 + 144 + 145 + def _read_log(key_prefix: str) -> list[dict]: 146 + log_path = get_state_directory(key_prefix) / "imports" / "log.jsonl" 147 + if not log_path.exists(): 148 + return [] 149 + return [ 150 + json.loads(line) 151 + for line in log_path.read_text(encoding="utf-8").splitlines() 152 + if line.strip() 153 + ] 154 + 155 + 156 + def test_auth_required(ingest_env): 157 + env = ingest_env 158 + response = env["client"].post( 159 + f"/app/import/journal/{env['key_prefix']}/ingest/imports", 160 + json={"imports": []}, 161 + ) 162 + assert response.status_code == 401 163 + 164 + 165 + def test_key_prefix_mismatch(ingest_env): 166 + env = ingest_env 167 + response = env["client"].post( 168 + "/app/import/journal/deadbeef/ingest/imports", 169 + headers={"Authorization": f"Bearer {env['key']}"}, 170 + json={"imports": []}, 171 + ) 172 + assert response.status_code == 403 173 + 174 + 175 + def test_invalid_json(ingest_env): 176 + env = ingest_env 177 + response = env["client"].post( 178 + f"/app/import/journal/{env['key_prefix']}/ingest/imports", 179 + headers={"Authorization": f"Bearer {env['key']}"}, 180 + data="not-json", 181 + content_type="application/json", 182 + ) 183 + assert response.status_code == 400 184 + assert response.get_json() == {"error": "Invalid JSON body"} 185 + 186 + 187 + def test_missing_imports_array(ingest_env): 188 + env = ingest_env 189 + response = env["client"].post( 190 + f"/app/import/journal/{env['key_prefix']}/ingest/imports", 191 + headers={"Authorization": f"Bearer {env['key']}"}, 192 + json={}, 193 + ) 194 + assert response.status_code == 400 195 + assert response.get_json() == {"error": "Missing imports array"} 196 + 197 + 198 + def test_copy_new_import(ingest_env): 199 + env = ingest_env 200 + item = _sample_import() 201 + response = _post_imports(env["client"], env["key"], env["key_prefix"], 
[item]) 202 + body = response.get_json() 203 + import_dir = env["root"] / "imports" / item["id"] 204 + 205 + assert response.status_code == 200 206 + assert body == {"copied": 1, "skipped": 0, "staged": 0, "errors": []} 207 + assert ( 208 + json.loads((import_dir / "import.json").read_text(encoding="utf-8")) 209 + == item["import_json"] 210 + ) 211 + assert ( 212 + json.loads((import_dir / "imported.json").read_text(encoding="utf-8")) 213 + == item["imported_json"] 214 + ) 215 + assert [ 216 + json.loads(line) 217 + for line in (import_dir / "content_manifest.jsonl") 218 + .read_text(encoding="utf-8") 219 + .splitlines() 220 + if line.strip() 221 + ] == item["content_manifest"] 222 + assert _read_state(env["key_prefix"]) == { 223 + "received": {item["id"]: _import_hash(item)} 224 + } 225 + assert _read_log(env["key_prefix"])[0]["action"] == "copied" 226 + source = load_journal_source(env["key"]) 227 + assert source["stats"]["imports_received"] == 1 228 + 229 + 230 + def test_dedup_identical(ingest_env): 231 + env = ingest_env 232 + item = _sample_import() 233 + 234 + first = _post_imports(env["client"], env["key"], env["key_prefix"], [item]) 235 + second = _post_imports(env["client"], env["key"], env["key_prefix"], [item]) 236 + 237 + assert first.status_code == 200 238 + assert second.status_code == 200 239 + assert second.get_json() == {"copied": 0, "skipped": 1, "staged": 0, "errors": []} 240 + assert _read_log(env["key_prefix"])[-1]["reason"] == "idempotent" 241 + 242 + 243 + def test_id_collision(ingest_env): 244 + env = ingest_env 245 + item = _sample_import() 246 + (env["root"] / "imports" / item["id"]).mkdir(parents=True, exist_ok=True) 247 + 248 + response = _post_imports(env["client"], env["key"], env["key_prefix"], [item]) 249 + body = response.get_json() 250 + staged_path = ( 251 + get_state_directory(env["key_prefix"]) 252 + / "imports" 253 + / "staged" 254 + / f"{item['id']}.json" 255 + ) 256 + 257 + assert response.status_code == 200 258 + assert 
body == {"copied": 0, "skipped": 0, "staged": 1, "errors": []} 259 + assert staged_path.exists() 260 + assert ( 261 + json.loads(staged_path.read_text(encoding="utf-8"))["reason"] == "id_collision" 262 + ) 263 + 264 + 265 + def test_multiple_imports(ingest_env): 266 + env = ingest_env 267 + first = _sample_import("20260101_090000") 268 + second = _sample_import("20260101_100000") 269 + third = _sample_import("20260101_110000") 270 + _post_imports(env["client"], env["key"], env["key_prefix"], [first]) 271 + (env["root"] / "imports" / third["id"]).mkdir(parents=True, exist_ok=True) 272 + 273 + response = _post_imports( 274 + env["client"], env["key"], env["key_prefix"], [first, second, third] 275 + ) 276 + 277 + assert response.status_code == 200 278 + assert response.get_json() == {"copied": 1, "skipped": 1, "staged": 1, "errors": []} 279 + 280 + 281 + def test_state_manifest(ingest_env): 282 + env = ingest_env 283 + item = _sample_import() 284 + 285 + response = _post_imports(env["client"], env["key"], env["key_prefix"], [item]) 286 + 287 + assert response.status_code == 200 288 + assert _read_state(env["key_prefix"]) == { 289 + "received": {item["id"]: _import_hash(item)} 290 + } 291 + 292 + 293 + def test_stats_update(ingest_env): 294 + env = ingest_env 295 + first = _sample_import("20260101_090000") 296 + second = _sample_import("20260101_100000") 297 + collision = _sample_import("20260101_110000") 298 + (env["root"] / "imports" / collision["id"]).mkdir(parents=True, exist_ok=True) 299 + 300 + response = _post_imports( 301 + env["client"], env["key"], env["key_prefix"], [first, second, collision] 302 + ) 303 + source = load_journal_source(env["key"]) 304 + 305 + assert response.status_code == 200 306 + assert source["stats"]["imports_received"] == 2