personal memory agent
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

Merge branch 'hopper-ffzagoau-transfer-send-scope'

+562 -4
+296 -2
observe/transfer.py
··· 3 3 4 4 """Transfer observed segments between solstone instances. 5 5 6 - Provides export and import commands for creating and unpacking day archives 7 - containing fully-processed observation segments. 6 + Provides export, import, and send commands for transferring fully-processed 7 + observation segments between solstone instances. 8 8 9 9 Usage: 10 10 sol transfer export --day YYYYMMDD [--output PATH] 11 11 sol transfer import --archive PATH [--dry-run] 12 + sol transfer send --to HOST --key KEY [--day YYYYMMDD] [--dry-run] 12 13 """ 13 14 14 15 from __future__ import annotations ··· 18 19 import logging 19 20 import os 20 21 import platform 22 + import re 21 23 import tarfile 22 24 import time 25 + from datetime import datetime, timedelta 23 26 from pathlib import Path 24 27 from typing import Any 28 + 29 + import requests 25 30 26 31 from think.callosum import callosum_send 27 32 from think.utils import get_journal, iter_segments, now_ms, setup_cli ··· 32 37 33 38 # Archive manifest version 34 39 MANIFEST_VERSION = 1 40 + RETRY_BACKOFF = [1, 5, 15] 41 + UPLOAD_TIMEOUT = 300 35 42 36 43 37 44 def _get_hostname() -> str: ··· 361 368 } 362 369 363 370 371 + def _normalize_url(to: str) -> str: 372 + """Normalize remote URL for observer endpoints.""" 373 + to = to.rstrip("/") 374 + if to.startswith(("http://", "https://")): 375 + return to 376 + return f"https://{to}" 377 + 378 + 379 + def _parse_day_spec(day_spec: str | None, journal_root: Path) -> list[str]: 380 + """Parse a single day, day range, or default to all journal days.""" 381 + if day_spec is None: 382 + return sorted( 383 + [ 384 + day_dir.name 385 + for day_dir in journal_root.iterdir() 386 + if day_dir.is_dir() and re.match(r"^\d{8}$", day_dir.name) 387 + ] 388 + ) 389 + 390 + if re.match(r"^\d{8}$", day_spec): 391 + return [day_spec] 392 + 393 + if re.match(r"^\d{8}-\d{8}$", day_spec): 394 + start_str, end_str = day_spec.split("-", 1) 395 + start = datetime.strptime(start_str, "%Y%m%d") 396 + end = datetime.strptime(end_str, "%Y%m%d") 397 + if start > end: 398 + raise ValueError("Invalid day format: start day must be on or before end day") 399 + 400 + days = [] 401 + current = start 402 + while current <= end: 403 + days.append(current.strftime("%Y%m%d")) 404 + current += timedelta(days=1) 405 + return days 406 + 407 + raise ValueError("Invalid day format: use YYYYMMDD or YYYYMMDD-YYYYMMDD") 408 + 409 + 410 + def _query_remote_segments( 411 + session: requests.Session, 412 + base_url: str, 413 + day: str, 414 + ) -> dict[str, dict[str, str]]: 415 + """Query remote observer for existing segments on a day.""" 416 + url = f"{base_url}/app/observer/ingest/segments/{day}" 417 + try: 418 + response = session.get(url, timeout=UPLOAD_TIMEOUT) 419 + if response.status_code == 200: 420 + data = response.json() 421 + return { 422 + entry["key"]: { 423 + file_info["name"]: file_info["sha256"] 424 + for file_info in entry.get("files", []) 425 + } 426 + for entry in data 427 + if entry.get("key") 428 + } 429 + if response.status_code == 401: 430 + raise ValueError("Authentication failed: invalid or missing API key") 431 + if response.status_code == 403: 432 + raise ValueError("Authentication failed: observer revoked or disabled") 433 + logger.warning( 434 + f"Remote segment query failed for {day}: " 435 + f"{response.status_code} {response.text}" 436 + ) 437 + except requests.RequestException as e: 438 + logger.warning(f"Remote segment query failed for {day}: {e}") 439 + 440 + return {} 441 + 442 + 443 + def _upload_segment( 444 + session: requests.Session, 445 + base_url: str, 446 + day: str, 447 + segment_key: str, 448 + stream_name: str, 449 + segment_path: Path, 450 + ) -> tuple[str, int]: 451 + """Upload a single segment to the remote observer.""" 452 + files = [ 453 + file_path 454 + for file_path in sorted(segment_path.iterdir()) 455 + if file_path.is_file() and file_path.name != "stream.json" 456 + ] 457 + if not files: 458 + return ("skip", 0) 459 + 460 + url = f"{base_url}/app/observer/ingest" 461 + data = { 462 + "day": day, 463 + "segment": segment_key, 464 + "meta": json.dumps({"stream": stream_name}), 465 + } 466 + 467 + for attempt, delay in enumerate(RETRY_BACKOFF): 468 + file_handles = [] 469 + files_data = [] 470 + try: 471 + for file_path in files: 472 + fh = open(file_path, "rb") 473 + file_handles.append(fh) 474 + files_data.append( 475 + ("files", (file_path.name, fh, "application/octet-stream")) 476 + ) 477 + 478 + response = session.post( 479 + url, 480 + data=data, 481 + files=files_data, 482 + timeout=UPLOAD_TIMEOUT, 483 + ) 484 + if response.status_code == 200: 485 + status = response.json().get("status") 486 + if status == "duplicate": 487 + return ("duplicate", 0) 488 + return ("sent", response.json().get("bytes", 0)) 489 + if response.status_code == 401: 490 + return ("auth_invalid", 0) 491 + if response.status_code == 403: 492 + return ("auth_revoked", 0) 493 + if 500 <= response.status_code <= 599: 494 + logger.warning( 495 + f"Upload attempt {attempt + 1} failed for " 496 + f"{day}/{stream_name}/{segment_key}: " 497 + f"{response.status_code} {response.text}" 498 + ) 499 + else: 500 + logger.warning( 501 + f"Upload rejected for {day}/{stream_name}/{segment_key}: " 502 + f"{response.status_code} {response.text}" 503 + ) 504 + return ("error", 0) 505 + except (requests.RequestException, OSError) as e: 506 + logger.warning( 507 + f"Upload attempt {attempt + 1} failed for " 508 + f"{day}/{stream_name}/{segment_key}: {e}" 509 + ) 510 + finally: 511 + for fh in file_handles: 512 + try: 513 + fh.close() 514 + except Exception: 515 + pass 516 + 517 + if attempt < len(RETRY_BACKOFF) - 1: 518 + time.sleep(delay) 519 + 520 + return ("error", 0) 521 + 522 + 523 + def send_segments(base_url: str, key: str, days: list[str], dry_run: bool) -> None: 524 + """Send local journal segments to a remote observer.""" 525 + session = requests.Session() 526 + session.headers["Authorization"] = f"Bearer {key}" 527 + 528 + sent = 0 529 + skipped = 0 530 + failed = 0 531 + bytes_total = 0 532 + duplicates = 0 533 + 534 + try: 535 + journal = get_journal() 536 + for day in days: 537 + day_dir = Path(journal) / day 538 + if not day_dir.exists(): 539 + logger.debug(f"Day directory not found: {day}") 540 + continue 541 + 542 + segment_entries = iter_segments(day_dir) 543 + if not segment_entries: 544 + continue 545 + 546 + try: 547 + remote_manifest = _query_remote_segments(session, base_url, day) 548 + except ValueError as e: 549 + print(str(e)) 550 + return 551 + 552 + for stream_name, seg_key, seg_path in segment_entries: 553 + manifest = _build_segment_manifest(seg_path) 554 + local_files = { 555 + file_info["name"]: file_info["sha256"] 556 + for file_info in manifest["files"] 557 + if file_info["name"] != "stream.json" 558 + } 559 + remote_files = remote_manifest.get(seg_key, {}) 560 + if all( 561 + remote_files.get(name) == sha256 562 + for name, sha256 in local_files.items() 563 + ): 564 + logger.info(f" [skip] {day}/{stream_name}/{seg_key}") 565 + skipped += 1 566 + continue 567 + 568 + if dry_run: 569 + logger.info(f" [would send] {day}/{stream_name}/{seg_key}") 570 + sent += 1 571 + continue 572 + 573 + status, bytes_sent = _upload_segment( 574 + session, 575 + base_url, 576 + day, 577 + seg_key, 578 + stream_name, 579 + seg_path, 580 + ) 581 + if status == "sent": 582 + logger.info( 583 + f" [sent] {day}/{stream_name}/{seg_key} ({bytes_sent} bytes)" 584 + ) 585 + sent += 1 586 + bytes_total += bytes_sent 587 + elif status == "duplicate": 588 + logger.info(f" [skip] {day}/{stream_name}/{seg_key}") 589 + skipped += 1 590 + duplicates += 1 591 + elif status == "skip": 592 + logger.info(f" [skip] {day}/{stream_name}/{seg_key}") 593 + skipped += 1 594 + elif status == "auth_invalid": 595 + print("Authentication failed: invalid or missing API key") 596 + return 597 + elif status == "auth_revoked": 598 + print("Authentication failed: observer revoked or disabled") 599 + return 600 + else: 601 + logger.info(f" [FAILED] {day}/{stream_name}/{seg_key}") 602 + failed += 1 603 + finally: 604 + session.close() 605 + 606 + total = sent + skipped + failed 607 + if total == 0: 608 + print("No segments found to transfer") 609 + return 610 + 611 + if dry_run: 612 + print(f"\nDry run: would send {sent}, skip {skipped}") 613 + return 614 + 615 + print( 616 + f"\nTransfer complete: {sent} sent, {skipped} skipped, " 617 + f"{failed} failed, {bytes_total} bytes transferred" 618 + ) 619 + if duplicates > 0: 620 + print(f" ({duplicates} duplicate segments already on remote)") 621 + if sent == 0 and skipped > 0 and failed == 0: 622 + print("Nothing to send - remote is up to date") 623 + 624 + 364 625 def main() -> None: 365 626 """CLI entry point.""" 366 627 parser = argparse.ArgumentParser( ··· 399 660 help="Validate archive without extracting", 400 661 ) 401 662 663 + # Send subcommand 664 + send_parser = subparsers.add_parser( 665 + "send", help="Send segments to remote observer" 666 + ) 667 + send_parser.add_argument( 668 + "--to", 669 + required=True, 670 + help="Remote observer URL (host:port or https://...)", 671 + ) 672 + send_parser.add_argument( 673 + "--key", 674 + required=True, 675 + help="Observer API key for authentication", 676 + ) 677 + send_parser.add_argument( 678 + "--day", 679 + help="Day or range (YYYYMMDD or YYYYMMDD-YYYYMMDD, default: all days)", 680 + ) 681 + send_parser.add_argument( 682 + "--dry-run", 683 + action="store_true", 684 + help="Show what would be sent without uploading", 685 + ) 686 + 402 687 args = setup_cli(parser) 403 688 404 689 if args.command == "export": ··· 431 716 print(f" Would deconflict: {len(v['deconflicted'])} segments") 432 717 except ValueError as e: 433 718 parser.error(str(e)) 719 + 720 + elif args.command == "send": 721 + base_url = _normalize_url(args.to) 722 + journal = get_journal() 723 + try: 724 + days = _parse_day_spec(args.day, Path(journal)) 725 + except ValueError as e: 726 + parser.error(str(e)) 727 + send_segments(base_url, args.key, days, args.dry_run)
+266 -2
tests/test_transfer.py
··· 1 1 # SPDX-License-Identifier: AGPL-3.0-only 2 2 # Copyright (c) 2026 sol pbc 3 3 4 - """Tests for observe/transfer.py - day archive export and import.""" 4 + """Tests for observe/transfer.py - day archive export, import, and send.""" 5 5 6 6 import json 7 7 import tarfile 8 8 from pathlib import Path 9 - from unittest.mock import patch 9 + from unittest.mock import MagicMock, patch 10 10 11 11 import pytest 12 + import requests 12 13 13 14 14 15 class TestSegmentDeconfliction: ··· 432 433 433 434 with pytest.raises(ValueError, match="missing required fields"): 434 435 _read_manifest(archive_path) 436 + 437 + 438 + class TestTransferSend: 439 + """Tests for transfer send functionality.""" 440 + 441 + def _setup_journal(self, tmp_path, *, include_stream_json: bool = False) -> Path: 442 + journal = tmp_path / "journal" 443 + day_dir = journal / "20250103" / "default" / "120000_300" 444 + day_dir.mkdir(parents=True) 445 + (day_dir / "audio.flac").write_bytes(b"audio data") 446 + (day_dir / "transcript.jsonl").write_text('{"text": "hello"}\n') 447 + if include_stream_json: 448 + (day_dir / "stream.json").write_text('{"stream": "default"}\n') 449 + return journal 450 + 451 + def _set_journal_override(self, monkeypatch, journal: Path) -> None: 452 + monkeypatch.setenv("_SOLSTONE_JOURNAL_OVERRIDE", str(journal)) 453 + 454 + import think.utils 455 + 456 + think.utils._journal_path_cache = None 457 + 458 + def _make_session( 459 + self, 460 + *, 461 + get_status: int = 200, 462 + get_json: list | None = None, 463 + post_status: int = 200, 464 + post_json: dict | None = None, 465 + ) -> MagicMock: 466 + mock_session = MagicMock(spec=requests.Session) 467 + mock_session.headers = {} 468 + 469 + get_response = MagicMock() 470 + get_response.status_code = get_status 471 + get_response.json.return_value = get_json if get_json is not None else [] 472 + get_response.text = "GET error" 473 + mock_session.get.return_value = get_response 474 + 475 + post_response = MagicMock() 476 + post_response.status_code = post_status 477 + post_response.json.return_value = ( 478 + post_json if post_json is not None else {"status": "ok", "bytes": 100} 479 + ) 480 + post_response.text = "POST error" 481 + mock_session.post.return_value = post_response 482 + 483 + return mock_session 484 + 485 + def test_parse_day_spec_single(self, tmp_path): 486 + from observe.transfer import _parse_day_spec 487 + 488 + journal_root = tmp_path / "journal" 489 + journal_root.mkdir() 490 + 491 + assert _parse_day_spec("20250103", journal_root) == ["20250103"] 492 + 493 + def test_parse_day_spec_range(self, tmp_path): 494 + from observe.transfer import _parse_day_spec 495 + 496 + journal_root = tmp_path / "journal" 497 + journal_root.mkdir() 498 + 499 + assert _parse_day_spec("20250101-20250103", journal_root) == [ 500 + "20250101", 501 + "20250102", 502 + "20250103", 503 + ] 504 + 505 + def test_parse_day_spec_all_days(self, tmp_path): 506 + from observe.transfer import _parse_day_spec 507 + 508 + journal_root = tmp_path / "journal" 509 + journal_root.mkdir() 510 + (journal_root / "20250101").mkdir() 511 + (journal_root / "20250103").mkdir() 512 + (journal_root / "config").mkdir() 513 + (journal_root / "streams").mkdir() 514 + 515 + assert _parse_day_spec(None, journal_root) == ["20250101", "20250103"] 516 + 517 + def test_parse_day_spec_invalid(self, tmp_path): 518 + from observe.transfer import _parse_day_spec 519 + 520 + journal_root = tmp_path / "journal" 521 + journal_root.mkdir() 522 + 523 + with pytest.raises(ValueError, match="Invalid day format"): 524 + _parse_day_spec("invalid", journal_root) 525 + 526 + def test_normalize_url(self): 527 + from observe.transfer import _normalize_url 528 + 529 + assert _normalize_url("example.com") == "https://example.com" 530 + assert _normalize_url("example.com/") == "https://example.com" 531 + assert _normalize_url("https://example.com/") == "https://example.com" 532 + assert _normalize_url("http://example.com/api/") == "http://example.com/api" 533 + 534 + def test_send_dry_run(self, tmp_path, monkeypatch, capsys): 535 + from observe.transfer import send_segments 536 + 537 + journal = self._setup_journal(tmp_path) 538 + self._set_journal_override(monkeypatch, journal) 539 + 540 + mock_session = self._make_session(get_json=[]) 541 + 542 + with patch("observe.transfer.requests.Session", return_value=mock_session): 543 + send_segments("https://example.com", "test-key", ["20250103"], dry_run=True) 544 + 545 + assert mock_session.get.call_count == 1 546 + assert mock_session.get.call_args.args[0].endswith( 547 + "/app/observer/ingest/segments/20250103" 548 + ) 549 + assert mock_session.post.call_count == 0 550 + assert "Dry run: would send 1, skip 0" in capsys.readouterr().out 551 + 552 + def test_send_skips_matching(self, tmp_path, monkeypatch, capsys): 553 + from observe.transfer import send_segments 554 + from observe.utils import compute_file_sha256 555 + 556 + journal = self._setup_journal(tmp_path) 557 + self._set_journal_override(monkeypatch, journal) 558 + 559 + segment_dir = journal / "20250103" / "default" / "120000_300" 560 + remote_files = [ 561 + { 562 + "name": "audio.flac", 563 + "sha256": compute_file_sha256(segment_dir / "audio.flac"), 564 + }, 565 + { 566 + "name": "transcript.jsonl", 567 + "sha256": compute_file_sha256(segment_dir / "transcript.jsonl"), 568 + }, 569 + ] 570 + mock_session = self._make_session( 571 + get_json=[{"key": "120000_300", "observed": False, "files": remote_files}] 572 + ) 573 + 574 + with patch("observe.transfer.requests.Session", return_value=mock_session): 575 + send_segments("https://example.com", "test-key", ["20250103"], dry_run=False) 576 + 577 + assert mock_session.post.call_count == 0 578 + output = capsys.readouterr().out 579 + assert "Transfer complete: 0 sent, 1 skipped, 0 failed" in output 580 + assert "Nothing to send - remote is up to date" in output 581 + 582 + def test_send_uploads_new(self, tmp_path, monkeypatch, capsys): 583 + from observe.transfer import send_segments 584 + 585 + journal = self._setup_journal(tmp_path) 586 + self._set_journal_override(monkeypatch, journal) 587 + 588 + mock_session = self._make_session( 589 + get_json=[], 590 + post_json={"status": "ok", "bytes": 100}, 591 + ) 592 + 593 + with patch("observe.transfer.requests.Session", return_value=mock_session): 594 + send_segments("https://example.com", "test-key", ["20250103"], dry_run=False) 595 + 596 + assert mock_session.post.call_count == 1 597 + post_kwargs = mock_session.post.call_args.kwargs 598 + assert post_kwargs["data"]["day"] == "20250103" 599 + assert post_kwargs["data"]["segment"] == "120000_300" 600 + assert json.loads(post_kwargs["data"]["meta"]) == {"stream": "default"} 601 + # Auth is set on the session, not per-request 602 + assert mock_session.headers["Authorization"] == "Bearer test-key" 603 + assert "Transfer complete: 1 sent, 0 skipped, 0 failed, 100 bytes transferred" in ( 604 + capsys.readouterr().out 605 + ) 606 + 607 + def test_send_retry_on_5xx(self, tmp_path, monkeypatch, capsys): 608 + from observe.transfer import send_segments 609 + 610 + journal = self._setup_journal(tmp_path) 611 + self._set_journal_override(monkeypatch, journal) 612 + 613 + mock_session = self._make_session(get_json=[]) 614 + first = MagicMock(status_code=500, text="server error") 615 + second = MagicMock(status_code=500, text="server error") 616 + success = MagicMock(status_code=200) 617 + success.json.return_value = {"status": "ok", "bytes": 100} 618 + mock_session.post.side_effect = [first, second, success] 619 + 620 + with ( 621 + patch("observe.transfer.requests.Session", return_value=mock_session), 622 + patch("observe.transfer.time.sleep"), 623 + ): 624 + send_segments("https://example.com", "test-key", ["20250103"], dry_run=False) 625 + 626 + assert mock_session.post.call_count == 3 627 + assert "Transfer complete: 1 sent, 0 skipped, 0 failed, 100 bytes transferred" in ( 628 + capsys.readouterr().out 629 + ) 630 + 631 + def test_send_auth_error(self): 632 + from observe.transfer import _query_remote_segments 633 + 634 + mock_session = self._make_session(get_status=401) 635 + 636 + with pytest.raises(ValueError, match="Authentication failed"): 637 + _query_remote_segments( 638 + mock_session, 639 + "https://example.com", 640 + "20250103", 641 + ) 642 + 643 + def test_send_idempotent(self, tmp_path, monkeypatch, capsys): 644 + from observe.transfer import send_segments 645 + from observe.utils import compute_file_sha256 646 + 647 + journal = self._setup_journal(tmp_path) 648 + self._set_journal_override(monkeypatch, journal) 649 + 650 + segment_dir = journal / "20250103" / "default" / "120000_300" 651 + remote_files = [ 652 + { 653 + "name": "audio.flac", 654 + "sha256": compute_file_sha256(segment_dir / "audio.flac"), 655 + }, 656 + { 657 + "name": "transcript.jsonl", 658 + "sha256": compute_file_sha256(segment_dir / "transcript.jsonl"), 659 + }, 660 + ] 661 + first_get = MagicMock(status_code=200) 662 + first_get.json.return_value = [] 663 + first_get.text = "GET error" 664 + second_get = MagicMock(status_code=200) 665 + second_get.json.return_value = [ 666 + {"key": "120000_300", "observed": False, "files": remote_files} 667 + ] 668 + second_get.text = "GET error" 669 + 670 + mock_session = self._make_session() 671 + mock_session.get.side_effect = [first_get, second_get] 672 + 673 + with patch("observe.transfer.requests.Session", return_value=mock_session): 674 + send_segments("https://example.com", "test-key", ["20250103"], dry_run=False) 675 + send_segments("https://example.com", "test-key", ["20250103"], dry_run=False) 676 + 677 + assert mock_session.post.call_count == 1 678 + output = capsys.readouterr().out 679 + assert "Transfer complete: 1 sent, 0 skipped, 0 failed, 100 bytes transferred" in ( 680 + output 681 + ) 682 + assert "Transfer complete: 0 sent, 1 skipped, 0 failed, 0 bytes transferred" in output 683 + 684 + def test_send_excludes_stream_json(self, tmp_path, monkeypatch): 685 + from observe.transfer import send_segments 686 + 687 + journal = self._setup_journal(tmp_path, include_stream_json=True) 688 + self._set_journal_override(monkeypatch, journal) 689 + 690 + mock_session = self._make_session(get_json=[]) 691 + 692 + with patch("observe.transfer.requests.Session", return_value=mock_session): 693 + send_segments("https://example.com", "test-key", ["20250103"], dry_run=False) 694 + 695 + files_arg = mock_session.post.call_args.kwargs["files"] 696 + uploaded_names = [entry[1][0] for entry in files_arg] 697 + assert "stream.json" not in uploaded_names 698 + assert uploaded_names == ["audio.flac", "transcript.jsonl"]