personal memory agent
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

Refactor export orchestration

Add ExportResult for structured export outcomes and refactor all five export helpers to accept an optional shared session and return an ExportResult.

Rewrite sol export main() to orchestrate full exports in dependency order (segments, imports, entities, facets, config), isolate per-area failures, and print a unified summary.

Add end-to-end integration coverage for the export-to-ingest pipeline, including idempotence, per-area exports, staging behavior, dry-run handling, and orchestrator error resilience.

+948 -135
+357 -135
observe/export.py
··· 16 16 import re 17 17 import sys 18 18 import time 19 + from dataclasses import dataclass, field 19 20 from pathlib import Path, PurePosixPath 20 21 from typing import Any 21 22 ··· 51 52 ) 52 53 53 54 55 + @dataclass 56 + class ExportResult: 57 + """Result of a single area export.""" 58 + 59 + area: str = "" 60 + sent: int = 0 61 + skipped: int = 0 62 + staged: int = 0 63 + failed: int = 0 64 + errors: list[str] = field(default_factory=list) 65 + error: str | None = None 66 + 67 + 54 68 def _query_manifest( 55 69 session: requests.Session, base_url: str, key: str, area: str = "segments" 56 70 ) -> dict[str, Any]: ··· 62 76 if response.status_code == 403: 63 77 raise ValueError("Authentication failed: journal source revoked or disabled") 64 78 if response.status_code != 200: 65 - raise ValueError(f"Manifest query failed: {response.status_code} {response.text}") 79 + raise ValueError( 80 + f"Manifest query failed: {response.status_code} {response.text}" 81 + ) 66 82 return response.json() 67 83 68 84 ··· 222 238 return result 223 239 224 240 225 - def export_segments(base_url: str, key: str, days: list[str], dry_run: bool) -> None: 226 - session = requests.Session() 227 - session.headers["Authorization"] = f"Bearer {key}" 241 + def export_segments( 242 + base_url: str, 243 + key: str, 244 + days: list[str], 245 + dry_run: bool, 246 + session: requests.Session | None = None, 247 + ) -> ExportResult: 248 + own_session = session is None 249 + if own_session: 250 + session = requests.Session() 251 + session.headers["Authorization"] = f"Bearer {key}" 228 252 229 - sent = 0 230 - skipped = 0 231 - failed = 0 253 + result = ExportResult(area="segments") 232 254 bytes_total = 0 233 255 234 256 try: 235 257 try: 236 258 remote_manifest = _query_manifest(session, base_url, key) 237 259 except requests.ConnectionError: 238 - print(f"Connection failed: could not reach {base_url}") 239 - return 260 + result.error = f"Connection failed: could not reach {base_url}" 261 + 
print(result.error) 262 + return result 240 263 except ValueError as e: 241 - print(str(e)) 242 - return 264 + result.error = str(e) 265 + print(result.error) 266 + return result 243 267 244 268 journal = get_journal() 245 269 for day in days: ··· 262 286 if file_info["name"] != "stream.json" 263 287 } 264 288 if not local_files: 265 - skipped += 1 289 + result.skipped += 1 266 290 continue 267 291 268 292 remote_entry = remote_manifest.get(day, {}).get( ··· 273 297 for file_info in remote_entry.get("files", []) 274 298 } 275 299 if local_files == remote_files: 276 - skipped += 1 300 + result.skipped += 1 277 301 logger.info(f" [skip] {day}/{stream_name}/{seg_key}") 278 302 continue 279 303 ··· 301 325 logger.info( 302 326 f" [sent] {day}/{stream_name}/{seg_key} ({segment_bytes} bytes)" 303 327 ) 304 - sent += 1 328 + result.sent += 1 305 329 bytes_total += segment_bytes 306 330 elif status == "skip": 307 - skipped += 1 331 + result.skipped += 1 308 332 elif status == "auth_invalid": 309 - print("Authentication failed: invalid or missing API key") 310 - return 333 + result.error = "Authentication failed: invalid or missing API key" 334 + print(result.error) 335 + return result 311 336 elif status == "auth_revoked": 312 - print("Authentication failed: journal source revoked or disabled") 313 - return 337 + result.error = ( 338 + "Authentication failed: journal source revoked or disabled" 339 + ) 340 + print(result.error) 341 + return result 314 342 else: 315 343 logger.info(f" [FAILED] {day}/{stream_name}/{seg_key}") 316 - failed += 1 344 + result.failed += 1 317 345 318 346 if dry_run: 319 - sent += day_sent 347 + result.sent += day_sent 320 348 if day_sent > 0: 321 349 print(f" {day}: {day_sent} segment(s), {day_bytes} bytes") 322 350 323 - total = sent + skipped + failed 351 + total = result.sent + result.skipped + result.failed 324 352 if total == 0: 325 353 print("No segments found to export") 326 - return 354 + return result 327 355 if dry_run: 328 - 
print(f"\nDry run: would send {sent}, skip {skipped}") 329 - return 356 + print(f"\nDry run: would send {result.sent}, skip {result.skipped}") 357 + return result 330 358 331 359 print( 332 - f"\nExport complete: {sent} sent, {skipped} skipped, " 333 - f"{failed} failed, {bytes_total} bytes transferred" 360 + f"\nExport complete: {result.sent} sent, {result.skipped} skipped, " 361 + f"{result.failed} failed, {bytes_total} bytes transferred" 334 362 ) 335 - if sent == 0 and skipped > 0 and failed == 0: 363 + if result.sent == 0 and result.skipped > 0 and result.failed == 0: 336 364 print("Nothing to send - remote is up to date") 365 + return result 337 366 finally: 338 - session.close() 367 + if own_session: 368 + session.close() 339 369 340 370 341 - def export_entities(base_url: str, key: str, dry_run: bool) -> None: 342 - session = requests.Session() 343 - session.headers["Authorization"] = f"Bearer {key}" 371 + def export_entities( 372 + base_url: str, 373 + key: str, 374 + dry_run: bool, 375 + session: requests.Session | None = None, 376 + ) -> ExportResult: 377 + own_session = session is None 378 + if own_session: 379 + session = requests.Session() 380 + session.headers["Authorization"] = f"Bearer {key}" 381 + 382 + result = ExportResult(area="entities") 344 383 345 384 try: 346 385 try: 347 386 remote_manifest = _query_manifest(session, base_url, key, area="entities") 348 387 except requests.ConnectionError: 349 - print(f"Connection failed: could not reach {base_url}") 350 - return 388 + result.error = f"Connection failed: could not reach {base_url}" 389 + print(result.error) 390 + return result 351 391 except ValueError as e: 352 - print(str(e)) 353 - return 392 + result.error = str(e) 393 + print(result.error) 394 + return result 354 395 355 396 received = remote_manifest.get("received", {}) 356 397 entities = load_all_journal_entities() 357 398 if not entities: 358 399 print("No entities found to export") 359 - return 400 + return result 360 401 361 402 
new_count = 0 362 403 changed_count = 0 ··· 381 422 to_send.append(entity) 382 423 383 424 if dry_run: 425 + result.sent = len(to_send) 426 + result.skipped = unchanged_count 384 427 print( 385 428 f"Dry run: {new_count} new, {changed_count} changed, " 386 429 f"{unchanged_count} unchanged" 387 430 ) 388 - return 431 + return result 389 432 390 433 if not to_send: 434 + result.skipped = unchanged_count 391 435 print("Nothing to send - remote entities are up to date") 392 - return 436 + return result 393 437 394 438 key_prefix = key[:8] 395 439 url = f"{base_url}/app/import/journal/{key_prefix}/ingest/entities" ··· 401 445 if response.status_code == 200: 402 446 break 403 447 if response.status_code == 401: 404 - print("Authentication failed: invalid or missing API key") 405 - return 448 + result.error = "Authentication failed: invalid or missing API key" 449 + print(result.error) 450 + return result 406 451 if response.status_code == 403: 407 - print("Authentication failed: journal source revoked or disabled") 408 - return 452 + result.error = ( 453 + "Authentication failed: journal source revoked or disabled" 454 + ) 455 + print(result.error) 456 + return result 409 457 if 500 <= response.status_code <= 599: 410 458 logger.warning( 411 459 "Entity upload attempt %s failed: %s %s", ··· 414 462 response.text, 415 463 ) 416 464 else: 417 - print(f"Entity upload failed: {response.status_code} {response.text}") 418 - return 465 + result.error = ( 466 + f"Entity upload failed: {response.status_code} {response.text}" 467 + ) 468 + print(result.error) 469 + return result 419 470 except (requests.RequestException, OSError) as e: 420 471 logger.warning("Entity upload attempt %s failed: %s", attempt + 1, e) 421 472 if attempt < len(RETRY_BACKOFF) - 1: 422 473 time.sleep(delay) 423 474 else: 424 - print("Entity upload failed after all retries") 425 - return 475 + result.error = "Entity upload failed after all retries" 476 + print(result.error) 477 + return result 426 478 427 
- result = response.json() 428 - errors = result.get("errors", []) 479 + response_data = response.json() 480 + errors = [str(error) for error in response_data.get("errors", [])] 481 + result.sent = response_data.get("created", 0) + response_data.get( 482 + "auto_merged", 0 483 + ) 484 + result.staged = response_data.get("staged", 0) 485 + result.skipped = response_data.get("skipped", 0) 486 + result.errors = errors 429 487 if errors: 430 488 for err in errors: 431 489 print(f" Error: {err}") 432 490 print( 433 - f"\nExport complete: {result.get('created', 0)} created, " 434 - f"{result.get('auto_merged', 0)} merged, " 435 - f"{result.get('staged', 0)} staged, " 436 - f"{result.get('skipped', 0)} skipped" 491 + f"\nExport complete: {response_data.get('created', 0)} created, " 492 + f"{response_data.get('auto_merged', 0)} merged, " 493 + f"{response_data.get('staged', 0)} staged, " 494 + f"{response_data.get('skipped', 0)} skipped" 437 495 ) 438 496 if errors: 439 497 print(f" {len(errors)} error(s)") 498 + return result 440 499 finally: 441 - session.close() 500 + if own_session: 501 + session.close() 442 502 443 503 444 - def export_facets(base_url: str, key: str, dry_run: bool) -> None: 445 - session = requests.Session() 446 - session.headers["Authorization"] = f"Bearer {key}" 504 + def export_facets( 505 + base_url: str, 506 + key: str, 507 + dry_run: bool, 508 + session: requests.Session | None = None, 509 + ) -> ExportResult: 510 + own_session = session is None 511 + if own_session: 512 + session = requests.Session() 513 + session.headers["Authorization"] = f"Bearer {key}" 514 + 515 + result = ExportResult(area="facets") 447 516 448 517 try: 449 518 try: 450 519 remote_manifest = _query_manifest(session, base_url, key, area="facets") 451 520 except requests.ConnectionError: 452 - print(f"Connection failed: could not reach {base_url}") 453 - return 521 + result.error = f"Connection failed: could not reach {base_url}" 522 + print(result.error) 523 + return result 
454 524 except ValueError as e: 455 - print(str(e)) 456 - return 525 + result.error = str(e) 526 + print(result.error) 527 + return result 457 528 458 529 received = remote_manifest.get("received", {}) 459 530 460 531 facets_dir = Path(get_journal()) / "facets" 461 532 if not facets_dir.is_dir(): 462 533 print("No facets found to export") 463 - return 534 + return result 464 535 465 536 facet_names = sorted( 466 537 d.name 467 538 for d in facets_dir.iterdir() 468 - if d.is_dir() and _FACET_NAME_RE.match(d.name) and (d / "facet.json").is_file() 539 + if d.is_dir() 540 + and _FACET_NAME_RE.match(d.name) 541 + and (d / "facet.json").is_file() 469 542 ) 470 543 if not facet_names: 471 544 print("No facets found to export") 472 - return 545 + return result 473 546 474 547 total_new = 0 475 548 total_changed = 0 ··· 486 559 facet_path = facets_dir / facet_name 487 560 488 561 classified_files = [] 489 - for abs_path in sorted(facet_path.rglob("*"), key=lambda path: path.as_posix()): 562 + for abs_path in sorted( 563 + facet_path.rglob("*"), key=lambda path: path.as_posix() 564 + ): 490 565 if not abs_path.is_file(): 491 566 continue 492 567 relative = PurePosixPath(abs_path.relative_to(facet_path)) ··· 563 638 timeout=UPLOAD_TIMEOUT, 564 639 ) 565 640 if response.status_code == 200: 566 - result = response.json() 567 - errors = result.get("errors", []) 641 + response_data = response.json() 642 + errors = [ 643 + str(error) for error in response_data.get("errors", []) 644 + ] 645 + result.errors.extend(errors) 568 646 total_errors += len(errors) 569 647 if errors: 570 648 for err in errors: ··· 572 650 logger.info( 573 651 " [sent] %s: %s created, %s merged, %s staged", 574 652 facet_name, 575 - result.get("created", 0), 576 - result.get("merged", 0), 577 - result.get("staged", 0), 653 + response_data.get("created", 0), 654 + response_data.get("merged", 0), 655 + response_data.get("staged", 0), 578 656 ) 579 657 total_facets_sent += 1 580 658 break 581 659 if 
response.status_code == 401: 582 - print("Authentication failed: invalid or missing API key") 583 - return 660 + result.error = ( 661 + "Authentication failed: invalid or missing API key" 662 + ) 663 + print(result.error) 664 + return result 584 665 if response.status_code == 403: 585 - print("Authentication failed: journal source revoked or disabled") 586 - return 666 + result.error = ( 667 + "Authentication failed: journal source revoked or disabled" 668 + ) 669 + print(result.error) 670 + return result 587 671 if 500 <= response.status_code <= 599: 588 672 logger.warning( 589 673 "Facet upload attempt %s failed for %s: %s %s", ··· 618 702 if attempt < len(RETRY_BACKOFF) - 1: 619 703 time.sleep(delay) 620 704 else: 621 - logger.warning("Facet upload failed after all retries for %s", facet_name) 705 + logger.warning( 706 + "Facet upload failed after all retries for %s", facet_name 707 + ) 622 708 total_facets_failed += 1 623 709 624 710 if dry_run: ··· 629 715 f"\nDry run: {total_new} new files, {total_changed} changed, " 630 716 f"{total_unchanged} unchanged across {total_facets_sent} facet(s)" 631 717 ) 632 - return 718 + result.sent = total_facets_sent 719 + result.skipped = total_facets_skipped 720 + result.failed = total_facets_failed 721 + return result 633 722 634 723 if total_facets_sent == 0 and total_facets_failed == 0: 724 + result.skipped = total_facets_skipped 635 725 print("Nothing to send - remote facets are up to date") 636 - return 726 + return result 637 727 638 728 print( 639 729 f"\nFacet export complete: {total_facets_sent} sent, " ··· 641 731 ) 642 732 if total_errors: 643 733 print(f" {total_errors} error(s)") 734 + result.sent = total_facets_sent 735 + result.skipped = total_facets_skipped 736 + result.failed = total_facets_failed 737 + return result 644 738 finally: 645 - session.close() 739 + if own_session: 740 + session.close() 646 741 647 742 648 - def export_imports(base_url: str, key: str, dry_run: bool) -> None: 743 + def 
export_imports( 744 + base_url: str, 745 + key: str, 746 + dry_run: bool, 747 + session: requests.Session | None = None, 748 + ) -> ExportResult: 649 749 """Export import metadata to a remote solstone instance.""" 650 - session = requests.Session() 651 - session.headers["Authorization"] = f"Bearer {key}" 750 + own_session = session is None 751 + if own_session: 752 + session = requests.Session() 753 + session.headers["Authorization"] = f"Bearer {key}" 754 + 755 + result = ExportResult(area="imports") 652 756 653 757 try: 654 758 try: 655 759 remote_manifest = _query_manifest(session, base_url, key, area="imports") 656 760 except requests.ConnectionError: 657 - print(f"Connection failed: could not reach {base_url}") 658 - return 761 + result.error = f"Connection failed: could not reach {base_url}" 762 + print(result.error) 763 + return result 659 764 except ValueError as e: 660 - print(str(e)) 661 - return 765 + result.error = str(e) 766 + print(result.error) 767 + return result 662 768 663 769 received = remote_manifest.get("received", {}) 664 770 ··· 666 772 imports_dir = journal_root / "imports" 667 773 if not imports_dir.is_dir(): 668 774 print("No imports directory found") 669 - return 775 + return result 670 776 671 777 sync_state_names = {f"{name}.json" for name in SYNCABLE_REGISTRY} 672 778 ··· 736 842 ) 737 843 738 844 if dry_run: 845 + result.sent = len(to_send) 846 + result.skipped = unchanged_count 739 847 print( 740 848 f"Dry run: {new_count} new, {changed_count} changed, " 741 849 f"{unchanged_count} unchanged" 742 850 ) 743 - return 851 + return result 744 852 745 853 if not to_send: 854 + result.skipped = unchanged_count 746 855 print("Nothing to send - remote imports are up to date") 747 - return 856 + return result 748 857 749 858 key_prefix = key[:8] 750 859 url = f"{base_url}/app/import/journal/{key_prefix}/ingest/imports" ··· 756 865 if response.status_code == 200: 757 866 break 758 867 if response.status_code == 401: 759 - print("Authentication 
failed: invalid or missing API key") 760 - return 868 + result.error = "Authentication failed: invalid or missing API key" 869 + print(result.error) 870 + return result 761 871 if response.status_code == 403: 762 - print("Authentication failed: journal source revoked or disabled") 763 - return 872 + result.error = ( 873 + "Authentication failed: journal source revoked or disabled" 874 + ) 875 + print(result.error) 876 + return result 764 877 if 500 <= response.status_code <= 599: 765 878 logger.warning( 766 879 "Import upload attempt %s failed: %s %s", ··· 769 882 response.text, 770 883 ) 771 884 else: 772 - print( 885 + result.error = ( 773 886 f"Import upload failed: {response.status_code} {response.text}" 774 887 ) 775 - return 888 + print(result.error) 889 + return result 776 890 except (requests.RequestException, OSError) as e: 777 891 logger.warning("Import upload attempt %s failed: %s", attempt + 1, e) 778 892 if attempt < len(RETRY_BACKOFF) - 1: 779 893 time.sleep(delay) 780 894 else: 781 - print("Import upload failed after all retries") 782 - return 895 + result.error = "Import upload failed after all retries" 896 + print(result.error) 897 + return result 783 898 784 - result = response.json() 785 - errors = result.get("errors", []) 899 + response_data = response.json() 900 + errors = [str(error) for error in response_data.get("errors", [])] 901 + result.sent = response_data.get("copied", 0) 902 + result.staged = response_data.get("staged", 0) 903 + result.skipped = response_data.get("skipped", 0) 904 + result.errors = errors 786 905 if errors: 787 906 for err in errors: 788 907 print(f" Error: {err}") 789 908 print( 790 - f"\nExport complete: {result.get('copied', 0)} copied, " 791 - f"{result.get('staged', 0)} staged, " 792 - f"{result.get('skipped', 0)} skipped" 909 + f"\nExport complete: {response_data.get('copied', 0)} copied, " 910 + f"{response_data.get('staged', 0)} staged, " 911 + f"{response_data.get('skipped', 0)} skipped" 793 912 ) 794 913 if 
errors: 795 914 print(f" {len(errors)} error(s)") 915 + return result 796 916 finally: 797 - session.close() 917 + if own_session: 918 + session.close() 798 919 799 920 800 - def export_config(base_url: str, key: str, dry_run: bool) -> None: 921 + def export_config( 922 + base_url: str, 923 + key: str, 924 + dry_run: bool, 925 + session: requests.Session | None = None, 926 + ) -> ExportResult: 801 927 """Export config snapshot to a remote solstone instance.""" 802 - session = requests.Session() 803 - session.headers["Authorization"] = f"Bearer {key}" 928 + own_session = session is None 929 + if own_session: 930 + session = requests.Session() 931 + session.headers["Authorization"] = f"Bearer {key}" 932 + 933 + result = ExportResult(area="config") 804 934 805 935 try: 806 936 try: 807 937 remote_manifest = _query_manifest(session, base_url, key, area="config") 808 938 except requests.ConnectionError: 809 - print(f"Connection failed: could not reach {base_url}") 810 - return 939 + result.error = f"Connection failed: could not reach {base_url}" 940 + print(result.error) 941 + return result 811 942 except ValueError as e: 812 - print(str(e)) 813 - return 943 + result.error = str(e) 944 + print(result.error) 945 + return result 814 946 815 947 config = _strip_never_transfer(get_config()) 816 948 content_hash = hashlib.sha256( ··· 818 950 ).hexdigest() 819 951 820 952 if remote_manifest.get("last_hash") == content_hash: 953 + result.skipped = 1 821 954 print("Nothing to send - remote config is up to date") 822 - return 955 + return result 823 956 824 957 if dry_run: 958 + result.staged = 1 825 959 print("Dry run: config has changed, would send snapshot") 826 - return 960 + return result 827 961 828 962 key_prefix = key[:8] 829 963 url = f"{base_url}/app/import/journal/{key_prefix}/ingest/config" ··· 835 969 if response.status_code == 200: 836 970 break 837 971 if response.status_code == 401: 838 - print("Authentication failed: invalid or missing API key") 839 - return 972 
+ result.error = "Authentication failed: invalid or missing API key" 973 + print(result.error) 974 + return result 840 975 if response.status_code == 403: 841 - print("Authentication failed: journal source revoked or disabled") 842 - return 976 + result.error = ( 977 + "Authentication failed: journal source revoked or disabled" 978 + ) 979 + print(result.error) 980 + return result 843 981 if 500 <= response.status_code <= 599: 844 982 logger.warning( 845 983 "Config upload attempt %s failed: %s %s", ··· 848 986 response.text, 849 987 ) 850 988 else: 851 - print( 989 + result.error = ( 852 990 f"Config upload failed: {response.status_code} {response.text}" 853 991 ) 854 - return 992 + print(result.error) 993 + return result 855 994 except (requests.RequestException, OSError) as e: 856 995 logger.warning("Config upload attempt %s failed: %s", attempt + 1, e) 857 996 if attempt < len(RETRY_BACKOFF) - 1: 858 997 time.sleep(delay) 859 998 else: 860 - print("Config upload failed after all retries") 861 - return 999 + result.error = "Config upload failed after all retries" 1000 + print(result.error) 1001 + return result 862 1002 863 - result = response.json() 864 - if result.get("staged"): 1003 + result_data = response.json() 1004 + if result_data.get("staged"): 1005 + result.staged = 1 865 1006 print( 866 - f"\nExport complete: config staged ({result.get('diff_fields', 0)} fields differ)" 1007 + f"\nExport complete: config staged ({result_data.get('diff_fields', 0)} fields differ)" 867 1008 ) 868 - elif result.get("skipped"): 1009 + elif result_data.get("skipped"): 1010 + result.skipped = 1 869 1011 print("Nothing to send - remote config is up to date") 1012 + return result 870 1013 finally: 871 - session.close() 1014 + if own_session: 1015 + session.close() 872 1016 873 1017 874 1018 def main() -> None: ··· 907 1051 if args.only == "entities": 908 1052 export_entities(base_url, args.key, args.dry_run) 909 1053 return 910 - 911 1054 if args.only == "facets": 912 1055 
export_facets(base_url, args.key, args.dry_run) 913 1056 return 914 - 915 1057 if args.only == "imports": 916 1058 export_imports(base_url, args.key, args.dry_run) 917 1059 return 918 - 919 1060 if args.only == "config": 920 1061 export_config(base_url, args.key, args.dry_run) 921 1062 return 922 - 923 - if args.only is not None and args.only != "segments": 1063 + if args.only == "segments": 1064 + days = _parse_day_spec(args.day, Path(get_journal())) 1065 + export_segments(base_url, args.key, days, args.dry_run) 1066 + return 1067 + if args.only is not None: 924 1068 print(f"Export of '{args.only}' is not yet implemented") 925 1069 sys.exit(0) 926 1070 1071 + # Full export: all areas in dependency order 927 1072 days = _parse_day_spec(args.day, Path(get_journal())) 928 - export_segments(base_url, args.key, days, args.dry_run) 929 - if args.only is None: 930 - export_entities(base_url, args.key, args.dry_run) 931 - export_facets(base_url, args.key, args.dry_run) 932 - export_imports(base_url, args.key, args.dry_run) 933 - export_config(base_url, args.key, args.dry_run) 1073 + session = requests.Session() 1074 + session.headers["Authorization"] = f"Bearer {args.key}" 1075 + 1076 + try: 1077 + _query_manifest(session, base_url, args.key) 1078 + except requests.ConnectionError: 1079 + print(f"Connection failed: could not reach {base_url}") 1080 + session.close() 1081 + sys.exit(1) 1082 + except ValueError as e: 1083 + print(str(e)) 1084 + session.close() 1085 + sys.exit(1) 1086 + 1087 + areas = [ 1088 + ( 1089 + "segments", 1090 + lambda: export_segments( 1091 + base_url, args.key, days, args.dry_run, session=session 1092 + ), 1093 + ), 1094 + ( 1095 + "imports", 1096 + lambda: export_imports(base_url, args.key, args.dry_run, session=session), 1097 + ), 1098 + ( 1099 + "entities", 1100 + lambda: export_entities(base_url, args.key, args.dry_run, session=session), 1101 + ), 1102 + ( 1103 + "facets", 1104 + lambda: export_facets(base_url, args.key, args.dry_run, 
session=session), 1105 + ), 1106 + ( 1107 + "config", 1108 + lambda: export_config(base_url, args.key, args.dry_run, session=session), 1109 + ), 1110 + ] 1111 + 1112 + results: list[ExportResult] = [] 1113 + for area_name, export_fn in areas: 1114 + try: 1115 + area_result = export_fn() 1116 + results.append(area_result) 1117 + except Exception: 1118 + logger.exception("Export failed for %s", area_name) 1119 + error_result = ExportResult( 1120 + area=area_name, error=f"Exception during {area_name} export" 1121 + ) 1122 + results.append(error_result) 1123 + if area_name == "entities": 1124 + print( 1125 + " Warning: entity export failed — facet entity mapping may be incomplete" 1126 + ) 1127 + 1128 + session.close() 1129 + 1130 + print("\n--- Export Summary ---") 1131 + any_failed = False 1132 + for area_result in results: 1133 + if area_result.error: 1134 + print(f" {area_result.area}: FAILED ({area_result.error})") 1135 + any_failed = True 1136 + continue 1137 + 1138 + parts = [] 1139 + if area_result.sent: 1140 + parts.append(f"{area_result.sent} sent") 1141 + if area_result.skipped: 1142 + parts.append(f"{area_result.skipped} skipped") 1143 + if area_result.staged: 1144 + parts.append(f"{area_result.staged} staged") 1145 + if area_result.failed: 1146 + parts.append(f"{area_result.failed} failed") 1147 + any_failed = True 1148 + if area_result.errors: 1149 + parts.append(f"{len(area_result.errors)} error(s)") 1150 + if not parts: 1151 + parts.append("nothing to send") 1152 + print(f" {area_result.area}: {', '.join(parts)}") 1153 + 1154 + if any_failed: 1155 + sys.exit(1)
+591
tests/test_export_integration.py
··· 1 + # SPDX-License-Identifier: AGPL-3.0-only 2 + # Copyright (c) 2026 sol pbc 3 + 4 + from __future__ import annotations 5 + 6 + import json 7 + import os 8 + from argparse import Namespace 9 + from importlib import import_module 10 + from pathlib import Path 11 + from urllib.parse import urlparse 12 + 13 + import pytest 14 + from flask import Flask 15 + 16 + import convey.state 17 + import think.utils 18 + from observe.export import ( 19 + ExportResult, 20 + export_config, 21 + export_entities, 22 + export_facets, 23 + export_imports, 24 + export_segments, 25 + main, 26 + ) 27 + from think.entities.journal import clear_journal_entity_cache, save_journal_entity 28 + 29 + journal_sources = import_module("apps.import.journal_sources") 30 + import_routes = import_module("apps.import.routes") 31 + 32 + create_state_directory = journal_sources.create_state_directory 33 + generate_key = journal_sources.generate_key 34 + get_state_directory = journal_sources.get_state_directory 35 + save_journal_source = journal_sources.save_journal_source 36 + import_bp = import_routes.import_bp 37 + 38 + 39 + def _set_active_journal(journal: Path) -> None: 40 + os.environ["_SOLSTONE_JOURNAL_OVERRIDE"] = str(journal) 41 + think.utils._journal_path_cache = None 42 + clear_journal_entity_cache() 43 + 44 + 45 + def _extract_path(url: str) -> str: 46 + """Extract path from full URL.""" 47 + return urlparse(url).path 48 + 49 + 50 + class _FlaskResponse: 51 + """Wrap Flask test responses to match the requests.Response interface we use.""" 52 + 53 + def __init__(self, flask_response): 54 + self._resp = flask_response 55 + self.status_code = flask_response.status_code 56 + self.text = flask_response.get_data(as_text=True) 57 + 58 + def json(self): 59 + return self._resp.get_json() 60 + 61 + 62 + class _FlaskSessionAdapter: 63 + """Wrap a Flask test client to behave like requests.Session for export functions.""" 64 + 65 + def __init__(self, client, *, source_journal: Path, target_journal: 
Path): 66 + self.client = client 67 + self.source_journal = source_journal 68 + self.target_journal = target_journal 69 + self.headers: dict[str, str] = {} 70 + 71 + def get(self, url, **kwargs): 72 + del kwargs 73 + _set_active_journal(self.target_journal) 74 + try: 75 + resp = self.client.get(_extract_path(url), headers=self.headers) 76 + finally: 77 + _set_active_journal(self.source_journal) 78 + return _FlaskResponse(resp) 79 + 80 + def post(self, url, **kwargs): 81 + path = _extract_path(url) 82 + headers = {**self.headers} 83 + _set_active_journal(self.target_journal) 84 + try: 85 + if "files" in kwargs: 86 + data = {} 87 + if "data" in kwargs: 88 + data.update(kwargs["data"]) 89 + for field_name, file_tuple in kwargs["files"]: 90 + if isinstance(file_tuple, tuple): 91 + filename, file_obj = file_tuple[0], file_tuple[1] 92 + value = (file_obj, filename) 93 + else: 94 + value = file_tuple 95 + if field_name in data: 96 + existing = data[field_name] 97 + if isinstance(existing, list): 98 + existing.append(value) 99 + else: 100 + data[field_name] = [existing, value] 101 + else: 102 + data[field_name] = value 103 + resp = self.client.post( 104 + path, 105 + data=data, 106 + content_type="multipart/form-data", 107 + headers=headers, 108 + ) 109 + elif "json" in kwargs: 110 + resp = self.client.post(path, json=kwargs["json"], headers=headers) 111 + else: 112 + resp = self.client.post(path, headers=headers) 113 + finally: 114 + _set_active_journal(self.source_journal) 115 + return _FlaskResponse(resp) 116 + 117 + def close(self): 118 + pass 119 + 120 + 121 + @pytest.fixture 122 + def export_integration_env(tmp_path, monkeypatch): 123 + """Set up source journal + target Flask app for integration testing.""" 124 + previous_override = os.environ.get("_SOLSTONE_JOURNAL_OVERRIDE") 125 + source_journal = tmp_path / "source" 126 + source_journal.mkdir() 127 + _set_active_journal(source_journal) 128 + 129 + target_journal = tmp_path / "target" 130 + target_journal.mkdir() 
131 + monkeypatch.setattr( 132 + convey.state, "journal_root", str(target_journal), raising=False 133 + ) 134 + (target_journal / "apps" / "import" / "journal_sources").mkdir(parents=True) 135 + 136 + key = generate_key() 137 + source = { 138 + "name": "integration-test", 139 + "key": key, 140 + "created_at": 1000, 141 + "enabled": True, 142 + "revoked": False, 143 + "revoked_at": None, 144 + "stats": { 145 + "segments_received": 0, 146 + "entities_received": 0, 147 + "facets_received": 0, 148 + "imports_received": 0, 149 + "config_received": 0, 150 + }, 151 + } 152 + save_journal_source(source) 153 + key_prefix = key[:8] 154 + create_state_directory(target_journal, key_prefix) 155 + 156 + app = Flask(__name__) 157 + app.config["TESTING"] = True 158 + app.register_blueprint(import_bp) 159 + 160 + client = app.test_client() 161 + adapter = _FlaskSessionAdapter( 162 + client, source_journal=source_journal, target_journal=target_journal 163 + ) 164 + adapter.headers["Authorization"] = f"Bearer {key}" 165 + 166 + yield { 167 + "source": source_journal, 168 + "target": target_journal, 169 + "key": key, 170 + "key_prefix": key_prefix, 171 + "client": client, 172 + "adapter": adapter, 173 + "base_url": "http://localhost:5000", 174 + } 175 + 176 + if previous_override is None: 177 + os.environ.pop("_SOLSTONE_JOURNAL_OVERRIDE", None) 178 + else: 179 + os.environ["_SOLSTONE_JOURNAL_OVERRIDE"] = previous_override 180 + think.utils._journal_path_cache = None 181 + clear_journal_entity_cache() 182 + 183 + 184 + def _write_bytes(path: Path, content: bytes) -> None: 185 + path.parent.mkdir(parents=True, exist_ok=True) 186 + path.write_bytes(content) 187 + 188 + 189 + def _write_json(path: Path, data: dict) -> None: 190 + path.parent.mkdir(parents=True, exist_ok=True) 191 + path.write_text( 192 + json.dumps(data, ensure_ascii=False, indent=2) + "\n", encoding="utf-8" 193 + ) 194 + 195 + 196 + def _write_jsonl(path: Path, items: list[dict]) -> None: 197 + 
path.parent.mkdir(parents=True, exist_ok=True) 198 + path.write_text( 199 + "".join(json.dumps(item, ensure_ascii=False) + "\n" for item in items), 200 + encoding="utf-8", 201 + ) 202 + 203 + 204 + def _load_json(path: Path) -> dict: 205 + return json.loads(path.read_text(encoding="utf-8")) 206 + 207 + 208 + def _setup_segments(journal_root: Path, *, day: str = "20260413") -> list[str]: 209 + segment_dir = journal_root / day / "laptop" / "143022_300" 210 + _write_bytes(segment_dir / "audio.flac", b"audio-data") 211 + _write_bytes(segment_dir / "transcript.jsonl", b'{"text":"hello"}\n') 212 + return [day] 213 + 214 + 215 + def _setup_entities(journal_root: Path) -> None: 216 + _write_json( 217 + journal_root / "entities" / "source_entity" / "entity.json", 218 + { 219 + "id": "source_entity", 220 + "name": "Source Entity", 221 + "type": "Person", 222 + "created_at": 1000, 223 + }, 224 + ) 225 + 226 + 227 + def _setup_facet_with_entity( 228 + journal_root: Path, *, entity_id: str = "source_entity" 229 + ) -> None: 230 + facet_root = journal_root / "facets" / "work" 231 + _write_json(facet_root / "facet.json", {"title": "Work"}) 232 + _write_json( 233 + facet_root / "entities" / entity_id / "entity.json", 234 + {"description": "Relationship"}, 235 + ) 236 + _write_jsonl( 237 + facet_root / "entities" / entity_id / "observations.jsonl", 238 + [{"content": "Observed", "observed_at": 1}], 239 + ) 240 + _write_jsonl( 241 + facet_root / "entities" / "20260413.jsonl", 242 + [{"id": entity_id, "name": "Source Entity", "type": "Person"}], 243 + ) 244 + _write_jsonl( 245 + facet_root / "todos" / "20260413.jsonl", 246 + [{"text": "Follow up", "created_at": 1}], 247 + ) 248 + 249 + 250 + def _setup_simple_facet(journal_root: Path) -> None: 251 + facet_root = journal_root / "facets" / "personal" 252 + _write_json(facet_root / "facet.json", {"title": "Personal"}) 253 + (facet_root / "news").mkdir(parents=True, exist_ok=True) 254 + (facet_root / "news" / "20260413.md").write_text("# 
News\n", encoding="utf-8") 255 + 256 + 257 + def _setup_imports(journal_root: Path) -> None: 258 + import_root = journal_root / "imports" / "20260101_090000" 259 + _write_json( 260 + import_root / "import.json", {"original_filename": "cal.zip", "file_size": 100} 261 + ) 262 + _write_json( 263 + import_root / "imported.json", 264 + {"processed_timestamp": "20260101_090000", "total_files_created": 1}, 265 + ) 266 + _write_jsonl( 267 + import_root / "content_manifest.jsonl", 268 + [{"id": "event-0", "title": "Test Event"}], 269 + ) 270 + 271 + 272 + def _setup_config(journal_root: Path) -> None: 273 + _write_json( 274 + journal_root / "config" / "journal.json", 275 + { 276 + "identity": {"name": "Remote User"}, 277 + "retention": {"days": 30}, 278 + "convey": {"trust_localhost": True}, 279 + }, 280 + ) 281 + 282 + 283 + def test_full_export_cycle(export_integration_env): 284 + env = export_integration_env 285 + _setup_segments(env["source"]) 286 + _setup_imports(env["source"]) 287 + _setup_entities(env["source"]) 288 + _setup_facet_with_entity(env["source"]) 289 + _setup_config(env["source"]) 290 + 291 + segment_result = export_segments( 292 + env["base_url"], env["key"], ["20260413"], False, session=env["adapter"] 293 + ) 294 + import_result = export_imports( 295 + env["base_url"], env["key"], False, session=env["adapter"] 296 + ) 297 + entity_result = export_entities( 298 + env["base_url"], env["key"], False, session=env["adapter"] 299 + ) 300 + facet_result = export_facets( 301 + env["base_url"], env["key"], False, session=env["adapter"] 302 + ) 303 + config_result = export_config( 304 + env["base_url"], env["key"], False, session=env["adapter"] 305 + ) 306 + 307 + assert segment_result == ExportResult(area="segments", sent=1) 308 + assert import_result == ExportResult(area="imports", sent=1) 309 + assert entity_result == ExportResult(area="entities", sent=1) 310 + assert facet_result == ExportResult(area="facets", sent=1) 311 + assert config_result == 
ExportResult(area="config", staged=1) 312 + 313 + assert ( 314 + env["target"] / "20260413" / "laptop" / "143022_300" / "audio.flac" 315 + ).exists() 316 + assert (env["target"] / "entities" / "source_entity" / "entity.json").exists() 317 + assert (env["target"] / "imports" / "20260101_090000" / "import.json").exists() 318 + assert (env["target"] / "facets" / "work" / "facet.json").exists() 319 + assert ( 320 + get_state_directory(env["key_prefix"]) / "config" / "source_config.json" 321 + ).exists() 322 + 323 + 324 + def test_idempotent_reexport(export_integration_env): 325 + env = export_integration_env 326 + _setup_segments(env["source"]) 327 + _setup_imports(env["source"]) 328 + _setup_entities(env["source"]) 329 + _setup_facet_with_entity(env["source"]) 330 + _setup_config(env["source"]) 331 + 332 + export_segments( 333 + env["base_url"], env["key"], ["20260413"], False, session=env["adapter"] 334 + ) 335 + export_imports(env["base_url"], env["key"], False, session=env["adapter"]) 336 + export_entities(env["base_url"], env["key"], False, session=env["adapter"]) 337 + export_facets(env["base_url"], env["key"], False, session=env["adapter"]) 338 + export_config(env["base_url"], env["key"], False, session=env["adapter"]) 339 + 340 + second_segments = export_segments( 341 + env["base_url"], env["key"], ["20260413"], False, session=env["adapter"] 342 + ) 343 + second_imports = export_imports( 344 + env["base_url"], env["key"], False, session=env["adapter"] 345 + ) 346 + second_entities = export_entities( 347 + env["base_url"], env["key"], False, session=env["adapter"] 348 + ) 349 + second_facets = export_facets( 350 + env["base_url"], env["key"], False, session=env["adapter"] 351 + ) 352 + second_config = export_config( 353 + env["base_url"], env["key"], False, session=env["adapter"] 354 + ) 355 + 356 + assert second_segments.sent == 0 and second_segments.skipped == 1 357 + assert second_imports.sent == 0 and second_imports.skipped == 1 358 + assert 
second_entities.sent == 0 and second_entities.skipped == 1 359 + assert second_facets.sent == 0 and second_facets.skipped == 1 360 + assert second_config.sent == 0 and second_config.skipped == 1 361 + 362 + 363 + def test_partial_only_segments(export_integration_env): 364 + env = export_integration_env 365 + _setup_segments(env["source"]) 366 + 367 + result = export_segments( 368 + env["base_url"], env["key"], ["20260413"], False, session=env["adapter"] 369 + ) 370 + 371 + assert result.sent == 1 372 + assert ( 373 + env["target"] / "20260413" / "laptop" / "143022_300" / "transcript.jsonl" 374 + ).exists() 375 + 376 + 377 + def test_partial_only_entities(export_integration_env): 378 + env = export_integration_env 379 + _setup_entities(env["source"]) 380 + 381 + result = export_entities(env["base_url"], env["key"], False, session=env["adapter"]) 382 + 383 + assert result.sent == 1 384 + assert (env["target"] / "entities" / "source_entity" / "entity.json").exists() 385 + 386 + 387 + def test_partial_only_facets(export_integration_env): 388 + env = export_integration_env 389 + _setup_simple_facet(env["source"]) 390 + 391 + result = export_facets(env["base_url"], env["key"], False, session=env["adapter"]) 392 + 393 + assert result.sent == 1 394 + assert (env["target"] / "facets" / "personal" / "news" / "20260413.md").exists() 395 + 396 + 397 + def test_partial_only_imports(export_integration_env): 398 + env = export_integration_env 399 + _setup_imports(env["source"]) 400 + 401 + result = export_imports(env["base_url"], env["key"], False, session=env["adapter"]) 402 + 403 + assert result.sent == 1 404 + assert (env["target"] / "imports" / "20260101_090000" / "imported.json").exists() 405 + 406 + 407 + def test_partial_only_config(export_integration_env): 408 + env = export_integration_env 409 + _setup_config(env["source"]) 410 + 411 + result = export_config(env["base_url"], env["key"], False, session=env["adapter"]) 412 + 413 + assert result.staged == 1 414 + assert 
(get_state_directory(env["key_prefix"]) / "config" / "diff.json").exists() 415 + 416 + 417 + def test_staged_items_entity_collision(export_integration_env): 418 + env = export_integration_env 419 + _write_json( 420 + env["source"] / "entities" / "test" / "entity.json", 421 + {"id": "test", "name": "Completely Different Name", "type": "Person"}, 422 + ) 423 + 424 + _set_active_journal(env["target"]) 425 + save_journal_entity({"id": "test", "name": "Test Entity", "type": "Tool"}) 426 + _set_active_journal(env["source"]) 427 + 428 + result = export_entities(env["base_url"], env["key"], False, session=env["adapter"]) 429 + 430 + staged_path = ( 431 + get_state_directory(env["key_prefix"]) / "entities" / "staged" / "test.json" 432 + ) 433 + assert result.sent == 0 434 + assert result.staged == 1 435 + assert staged_path.exists() 436 + assert _load_json(staged_path)["reason"] == "id_collision" 437 + 438 + 439 + def test_config_always_staged(export_integration_env): 440 + env = export_integration_env 441 + _setup_config(env["source"]) 442 + _write_json( 443 + env["target"] / "config" / "journal.json", {"identity": {"name": "Local User"}} 444 + ) 445 + 446 + result = export_config(env["base_url"], env["key"], False, session=env["adapter"]) 447 + 448 + assert result.staged == 1 449 + assert result.skipped == 0 450 + 451 + 452 + def test_processing_order_entities_before_facets(export_integration_env): 453 + env = export_integration_env 454 + _write_json( 455 + env["source"] / "entities" / "source_entity" / "entity.json", 456 + {"id": "source_entity", "name": "Alice Johnson", "type": "Person"}, 457 + ) 458 + _setup_facet_with_entity(env["source"], entity_id="source_entity") 459 + 460 + _set_active_journal(env["target"]) 461 + save_journal_entity( 462 + {"id": "target_entity", "name": "Alice Johnson", "type": "Person"} 463 + ) 464 + _set_active_journal(env["source"]) 465 + 466 + entity_result = export_entities( 467 + env["base_url"], env["key"], False, session=env["adapter"] 
468 + ) 469 + facet_result = export_facets( 470 + env["base_url"], env["key"], False, session=env["adapter"] 471 + ) 472 + 473 + entity_state = _load_json( 474 + get_state_directory(env["key_prefix"]) / "entities" / "state.json" 475 + ) 476 + detected_entities = ( 477 + env["target"] / "facets" / "work" / "entities" / "20260413.jsonl" 478 + ).read_text(encoding="utf-8") 479 + 480 + assert entity_result.sent == 1 481 + assert facet_result.sent == 1 482 + assert entity_state["id_map"]["source_entity"] == "target_entity" 483 + assert ( 484 + env["target"] / "facets" / "work" / "entities" / "target_entity" / "entity.json" 485 + ).exists() 486 + assert not ( 487 + env["target"] / "facets" / "work" / "entities" / "source_entity" 488 + ).exists() 489 + assert '"id": "target_entity"' in detected_entities 490 + 491 + 492 + def test_error_resilience(monkeypatch, capsys): 493 + class _DummySession: 494 + def __init__(self): 495 + self.headers = {} 496 + 497 + def close(self): 498 + pass 499 + 500 + calls: list[str] = [] 501 + 502 + monkeypatch.setattr( 503 + "observe.export.setup_cli", 504 + lambda parser: Namespace( 505 + to="localhost:5000", 506 + key="test-key-123456", 507 + only=None, 508 + dry_run=False, 509 + day=None, 510 + ), 511 + ) 512 + monkeypatch.setattr( 513 + "observe.export._parse_day_spec", lambda day, root: ["20260413"] 514 + ) 515 + monkeypatch.setattr( 516 + "observe.export._query_manifest", lambda session, base_url, key: {} 517 + ) 518 + monkeypatch.setattr("observe.export.requests.Session", _DummySession) 519 + monkeypatch.setattr( 520 + "observe.export.export_segments", 521 + lambda base_url, key, days, dry_run, session=None: ( 522 + calls.append("segments") or ExportResult(area="segments", sent=1) 523 + ), 524 + ) 525 + monkeypatch.setattr( 526 + "observe.export.export_imports", 527 + lambda base_url, key, dry_run, session=None: ( 528 + calls.append("imports") or ExportResult(area="imports", sent=1) 529 + ), 530 + ) 531 + 532 + def _explode(*args, 
**kwargs): 533 + calls.append("entities") 534 + raise RuntimeError("boom") 535 + 536 + monkeypatch.setattr("observe.export.export_entities", _explode) 537 + monkeypatch.setattr( 538 + "observe.export.export_facets", 539 + lambda base_url, key, dry_run, session=None: ( 540 + calls.append("facets") or ExportResult(area="facets", sent=1) 541 + ), 542 + ) 543 + monkeypatch.setattr( 544 + "observe.export.export_config", 545 + lambda base_url, key, dry_run, session=None: ( 546 + calls.append("config") or ExportResult(area="config", staged=1) 547 + ), 548 + ) 549 + 550 + with pytest.raises(SystemExit, match="1"): 551 + main() 552 + 553 + output = capsys.readouterr().out 554 + assert calls == ["segments", "imports", "entities", "facets", "config"] 555 + assert "Warning: entity export failed" in output 556 + assert "entities: FAILED" in output 557 + assert "segments: 1 sent" in output 558 + assert "facets: 1 sent" in output 559 + 560 + 561 + def test_dry_run_full(export_integration_env): 562 + env = export_integration_env 563 + _setup_segments(env["source"]) 564 + _setup_imports(env["source"]) 565 + _setup_entities(env["source"]) 566 + _setup_simple_facet(env["source"]) 567 + _setup_config(env["source"]) 568 + 569 + segment_result = export_segments( 570 + env["base_url"], env["key"], ["20260413"], True, session=env["adapter"] 571 + ) 572 + import_result = export_imports( 573 + env["base_url"], env["key"], True, session=env["adapter"] 574 + ) 575 + entity_result = export_entities( 576 + env["base_url"], env["key"], True, session=env["adapter"] 577 + ) 578 + facet_result = export_facets( 579 + env["base_url"], env["key"], True, session=env["adapter"] 580 + ) 581 + config_result = export_config( 582 + env["base_url"], env["key"], True, session=env["adapter"] 583 + ) 584 + 585 + assert segment_result.sent == 1 586 + assert import_result.sent == 1 587 + assert entity_result.sent == 1 588 + assert facet_result.sent == 1 589 + assert config_result.staged == 1 590 + assert not 
(env["target"] / "20260413").exists() 591 + assert not (env["target"] / "entities" / "source_entity" / "entity.json").exists()