personal memory agent
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

Merge branch 'hopper-pyzctoyf-export-cli-segments'

+621
+297
observe/export.py
··· 1 + # SPDX-License-Identifier: AGPL-3.0-only 2 + # Copyright (c) 2026 sol pbc 3 + 4 + """Export journal data to a remote solstone instance. 5 + 6 + Usage: 7 + sol export --to HOST --key KEY [--only segments] [--dry-run] [--day YYYYMMDD] 8 + """ 9 + 10 + from __future__ import annotations 11 + 12 + import argparse 13 + import json 14 + import logging 15 + import sys 16 + import time 17 + from pathlib import Path 18 + from typing import Any 19 + 20 + import requests 21 + 22 + from observe.transfer import ( 23 + RETRY_BACKOFF, 24 + _build_segment_manifest, 25 + _normalize_url, 26 + _parse_day_spec, 27 + ) 28 + from think.utils import get_journal, iter_segments, setup_cli 29 + 30 + logger = logging.getLogger(__name__) 31 + 32 + UPLOAD_TIMEOUT = 300 33 + 34 + 35 + def _query_manifest(session: requests.Session, base_url: str, key: str) -> dict[str, Any]: 36 + key_prefix = key[:8] 37 + url = f"{base_url}/app/import/journal/{key_prefix}/manifest/segments" 38 + response = session.get(url, timeout=UPLOAD_TIMEOUT) 39 + if response.status_code == 401: 40 + raise ValueError("Authentication failed: invalid or missing API key") 41 + if response.status_code == 403: 42 + raise ValueError("Authentication failed: journal source revoked or disabled") 43 + if response.status_code != 200: 44 + raise ValueError(f"Manifest query failed: {response.status_code} {response.text}") 45 + return response.json() 46 + 47 + 48 + def _upload_segment( 49 + session: requests.Session, 50 + base_url: str, 51 + key: str, 52 + day: str, 53 + stream_name: str, 54 + segment_key: str, 55 + segment_path: Path, 56 + ) -> tuple[str, int]: 57 + files = [ 58 + file_path 59 + for file_path in sorted(segment_path.iterdir()) 60 + if file_path.is_file() and file_path.name != "stream.json" 61 + ] 62 + if not files: 63 + return ("skip", 0) 64 + 65 + bytes_sent = sum(file_path.stat().st_size for file_path in files) 66 + metadata = { 67 + "segments": [ 68 + { 69 + "day": day, 70 + "stream": stream_name, 71 + "segment_key": segment_key, 72 + "files": [file_path.name for file_path in files], 73 + } 74 + ] 75 + } 76 + key_prefix = key[:8] 77 + url = f"{base_url}/app/import/journal/{key_prefix}/ingest/segments" 78 + 79 + for attempt, delay in enumerate(RETRY_BACKOFF): 80 + file_handles = [] 81 + files_data = [] 82 + try: 83 + for file_path in files: 84 + fh = open(file_path, "rb") 85 + file_handles.append(fh) 86 + files_data.append( 87 + ("files_0", (file_path.name, fh, "application/octet-stream")) 88 + ) 89 + 90 + response = session.post( 91 + url, 92 + data={"metadata": json.dumps(metadata)}, 93 + files=files_data, 94 + timeout=UPLOAD_TIMEOUT, 95 + ) 96 + if response.status_code == 200: 97 + return ("sent", bytes_sent) 98 + if response.status_code == 401: 99 + return ("auth_invalid", 0) 100 + if response.status_code == 403: 101 + return ("auth_revoked", 0) 102 + if 500 <= response.status_code <= 599: 103 + logger.warning( 104 + "Upload attempt %s failed for %s/%s/%s: %s %s", 105 + attempt + 1, 106 + day, 107 + stream_name, 108 + segment_key, 109 + response.status_code, 110 + response.text, 111 + ) 112 + else: 113 + logger.warning( 114 + "Upload rejected for %s/%s/%s: %s %s", 115 + day, 116 + stream_name, 117 + segment_key, 118 + response.status_code, 119 + response.text, 120 + ) 121 + return ("error", 0) 122 + except (requests.RequestException, OSError) as e: 123 + logger.warning( 124 + "Upload attempt %s failed for %s/%s/%s: %s", 125 + attempt + 1, 126 + day, 127 + stream_name, 128 + segment_key, 129 + e, 130 + ) 131 + finally: 132 + for fh in file_handles: 133 + try: 134 + fh.close() 135 + except Exception: 136 + pass 137 + 138 + if attempt < len(RETRY_BACKOFF) - 1: 139 + time.sleep(delay) 140 + 141 + return ("error", 0) 142 + 143 + 144 + def export_segments(base_url: str, key: str, days: list[str], dry_run: bool) -> None: 145 + session = requests.Session() 146 + session.headers["Authorization"] = f"Bearer {key}" 147 + 148 + sent = 0 149 + skipped = 0 150 + failed = 0 151 + bytes_total = 0 152 + 153 + try: 154 + try: 155 + remote_manifest = _query_manifest(session, base_url, key) 156 + except requests.ConnectionError: 157 + print(f"Connection failed: could not reach {base_url}") 158 + return 159 + except ValueError as e: 160 + print(str(e)) 161 + return 162 + 163 + journal = get_journal() 164 + for day in days: 165 + day_dir = Path(journal) / day 166 + if not day_dir.exists(): 167 + continue 168 + 169 + segment_entries = iter_segments(day_dir) 170 + if not segment_entries: 171 + continue 172 + 173 + day_sent = 0 174 + day_bytes = 0 175 + 176 + for stream_name, seg_key, seg_path in segment_entries: 177 + manifest = _build_segment_manifest(seg_path) 178 + local_files = { 179 + file_info["name"]: file_info["sha256"] 180 + for file_info in manifest["files"] 181 + if file_info["name"] != "stream.json" 182 + } 183 + if not local_files: 184 + skipped += 1 185 + continue 186 + 187 + remote_entry = remote_manifest.get(day, {}).get( 188 + f"{stream_name}/{seg_key}", {} 189 + ) 190 + remote_files = { 191 + file_info["name"]: file_info["sha256"] 192 + for file_info in remote_entry.get("files", []) 193 + } 194 + if local_files == remote_files: 195 + skipped += 1 196 + logger.info(f" [skip] {day}/{stream_name}/{seg_key}") 197 + continue 198 + 199 + if dry_run: 200 + seg_bytes = sum( 201 + file_info["size"] 202 + for file_info in manifest["files"] 203 + if file_info["name"] != "stream.json" 204 + ) 205 + logger.info(f" [would send] {day}/{stream_name}/{seg_key}") 206 + day_sent += 1 207 + day_bytes += seg_bytes 208 + continue 209 + 210 + status, segment_bytes = _upload_segment( 211 + session, 212 + base_url, 213 + key, 214 + day, 215 + stream_name, 216 + seg_key, 217 + seg_path, 218 + ) 219 + if status == "sent": 220 + logger.info( 221 + f" [sent] {day}/{stream_name}/{seg_key} ({segment_bytes} bytes)" 222 + ) 223 + sent += 1 224 + bytes_total += segment_bytes 225 + elif status == "skip": 226 + skipped += 1 227 + elif status == "auth_invalid": 228 + print("Authentication failed: invalid or missing API key") 229 + return 230 + elif status == "auth_revoked": 231 + print("Authentication failed: journal source revoked or disabled") 232 + return 233 + else: 234 + logger.info(f" [FAILED] {day}/{stream_name}/{seg_key}") 235 + failed += 1 236 + 237 + if dry_run: 238 + sent += day_sent 239 + if day_sent > 0: 240 + print(f" {day}: {day_sent} segment(s), {day_bytes} bytes") 241 + 242 + total = sent + skipped + failed 243 + if total == 0: 244 + print("No segments found to export") 245 + return 246 + if dry_run: 247 + print(f"\nDry run: would send {sent}, skip {skipped}") 248 + return 249 + 250 + print( 251 + f"\nExport complete: {sent} sent, {skipped} skipped, " 252 + f"{failed} failed, {bytes_total} bytes transferred" 253 + ) 254 + if sent == 0 and skipped > 0 and failed == 0: 255 + print("Nothing to send - remote is up to date") 256 + finally: 257 + session.close() 258 + 259 + 260 + def main() -> None: 261 + parser = argparse.ArgumentParser( 262 + description="Export journal data to a remote solstone instance" 263 + ) 264 + parser.add_argument( 265 + "--to", 266 + required=True, 267 + help="Remote instance URL (e.g., host:port or https://host)", 268 + ) 269 + parser.add_argument( 270 + "--key", 271 + required=True, 272 + help="API key for the remote journal source", 273 + ) 274 + parser.add_argument( 275 + "--only", 276 + default=None, 277 + help="Export only specific area (segments)", 278 + ) 279 + parser.add_argument( 280 + "--dry-run", 281 + action="store_true", 282 + help="Show what would be exported without sending", 283 + ) 284 + parser.add_argument( 285 + "--day", 286 + default=None, 287 + help="Day or range (YYYYMMDD or YYYYMMDD-YYYYMMDD)", 288 + ) 289 + args = setup_cli(parser) 290 + 291 + if args.only is not None and args.only != "segments": 292 + print(f"Export of '{args.only}' is not yet implemented") 293 + sys.exit(0) 294 + 295 + base_url = _normalize_url(args.to) 296 + days = _parse_day_spec(args.day, Path(get_journal())) 297 + export_segments(base_url, args.key, days, args.dry_run)
+2
sol.py
··· 55 55 "describe": "observe.describe", 56 56 "sense": "observe.sense", 57 57 "transfer": "observe.transfer", 58 + "export": "observe.export", 58 59 "observer": "observe.observer_cli", 59 60 # AI agents (talent package) 60 61 "agents": "think.agents", ··· 107 108 "describe", 108 109 "sense", 109 110 "transfer", 111 + "export", 110 112 "observer", 111 113 ], 112 114 "Talent (AI agents)": [
+322
tests/test_export.py
··· 1 + # SPDX-License-Identifier: AGPL-3.0-only 2 + # Copyright (c) 2026 sol pbc 3 + 4 + from __future__ import annotations 5 + 6 + import json 7 + from pathlib import Path 8 + from unittest.mock import MagicMock, patch 9 + 10 + import pytest 11 + import requests 12 + 13 + import think.utils 14 + from observe.utils import compute_bytes_sha256 15 + 16 + 17 + def _setup_journal(tmp_path, *, include_stream_json: bool = False): 18 + first_dir = tmp_path / "20260413" / "laptop" / "143022_300" 19 + second_dir = tmp_path / "20260413" / "laptop" / "150000_600" 20 + first_dir.mkdir(parents=True) 21 + second_dir.mkdir(parents=True) 22 + 23 + (first_dir / "audio.flac").write_bytes(b"audio-data-one") 24 + (first_dir / "transcript.jsonl").write_bytes(b"transcript-one") 25 + (second_dir / "audio.flac").write_bytes(b"audio-data-two") 26 + (second_dir / "transcript.jsonl").write_bytes(b"transcript-two") 27 + 28 + if include_stream_json: 29 + (first_dir / "stream.json").write_bytes(b'{"name": "laptop"}') 30 + (second_dir / "stream.json").write_bytes(b'{"name": "laptop"}') 31 + 32 + return tmp_path 33 + 34 + 35 + def _set_journal_override(monkeypatch, journal_path): 36 + monkeypatch.setenv("_SOLSTONE_JOURNAL_OVERRIDE", str(journal_path)) 37 + think.utils._journal_path_cache = None 38 + 39 + 40 + def _make_session(*, manifest_data=None, get_status=200, post_status=200, post_json=None): 41 + mock = MagicMock(spec=requests.Session) 42 + mock.headers = {} 43 + 44 + get_response = MagicMock() 45 + get_response.status_code = get_status 46 + get_response.json.return_value = manifest_data if manifest_data is not None else {} 47 + get_response.text = "GET error" 48 + mock.get.return_value = get_response 49 + 50 + post_response = MagicMock() 51 + post_response.status_code = post_status 52 + post_response.json.return_value = ( 53 + post_json 54 + if post_json is not None 55 + else { 56 + "segments_received": 1, 57 + "segments_skipped": 0, 58 + "segments_deconflicted": 0, 59 + "errors": [], 60 + } 61 + ) 62 + post_response.text = "POST error" 63 + mock.post.return_value = post_response 64 + 65 + return mock 66 + 67 + 68 + class TestExportSegments: 69 + def test_manifest_query_and_delta(self, tmp_path, monkeypatch): 70 + from observe.export import export_segments 71 + 72 + journal = _setup_journal(tmp_path) 73 + _set_journal_override(monkeypatch, journal) 74 + 75 + manifest_data = { 76 + "20260413": { 77 + "laptop/143022_300": { 78 + "files": [ 79 + { 80 + "name": "audio.flac", 81 + "sha256": compute_bytes_sha256(b"audio-data-one"), 82 + "size": len(b"audio-data-one"), 83 + }, 84 + { 85 + "name": "transcript.jsonl", 86 + "sha256": compute_bytes_sha256(b"transcript-one"), 87 + "size": len(b"transcript-one"), 88 + }, 89 + ] 90 + } 91 + } 92 + } 93 + mock_session = _make_session(manifest_data=manifest_data) 94 + 95 + with patch("observe.export.requests.Session", return_value=mock_session): 96 + export_segments("https://example.com", "test-key", ["20260413"], dry_run=False) 97 + 98 + assert mock_session.post.call_count == 1 99 + metadata = json.loads(mock_session.post.call_args.kwargs["data"]["metadata"]) 100 + assert metadata["segments"][0]["segment_key"] == "150000_600" 101 + 102 + def test_dry_run_output(self, tmp_path, monkeypatch, capsys): 103 + from observe.export import export_segments 104 + 105 + journal = _setup_journal(tmp_path) 106 + _set_journal_override(monkeypatch, journal) 107 + 108 + mock_session = _make_session(manifest_data={}) 109 + 110 + with patch("observe.export.requests.Session", return_value=mock_session): 111 + export_segments("https://example.com", "test-key", ["20260413"], dry_run=True) 112 + 113 + assert mock_session.post.call_count == 0 114 + output = capsys.readouterr().out 115 + assert "20260413: 2 segment(s)" in output 116 + assert "Dry run: would send 2, skip 0" in output 117 + 118 + def test_dry_run_skipped_not_double_counted(self, tmp_path, monkeypatch, capsys): 119 + from observe.export import export_segments 120 + 121 + journal = _setup_journal(tmp_path) 122 + _set_journal_override(monkeypatch, journal) 123 + 124 + manifest_data = { 125 + "20260413": { 126 + "laptop/143022_300": { 127 + "files": [ 128 + { 129 + "name": "audio.flac", 130 + "sha256": compute_bytes_sha256(b"audio-data-one"), 131 + "size": len(b"audio-data-one"), 132 + }, 133 + { 134 + "name": "transcript.jsonl", 135 + "sha256": compute_bytes_sha256(b"transcript-one"), 136 + "size": len(b"transcript-one"), 137 + }, 138 + ] 139 + } 140 + } 141 + } 142 + mock_session = _make_session(manifest_data=manifest_data) 143 + 144 + with patch("observe.export.requests.Session", return_value=mock_session): 145 + export_segments("https://example.com", "test-key", ["20260413"], dry_run=True) 146 + 147 + assert mock_session.post.call_count == 0 148 + output = capsys.readouterr().out 149 + assert "20260413: 1 segment(s)" in output 150 + assert "Dry run: would send 1, skip 1" in output 151 + 152 + def test_retry_on_5xx(self, tmp_path, monkeypatch): 153 + from observe.export import export_segments 154 + 155 + journal = _setup_journal(tmp_path) 156 + _set_journal_override(monkeypatch, journal) 157 + 158 + second_dir = journal / "20260413" / "laptop" / "150000_600" 159 + for file_path in second_dir.iterdir(): 160 + file_path.unlink() 161 + second_dir.rmdir() 162 + 163 + mock_session = _make_session(manifest_data={}) 164 + first = MagicMock(status_code=500, text="server error") 165 + second = MagicMock(status_code=500, text="server error") 166 + success = MagicMock(status_code=200, text="ok") 167 + success.json.return_value = { 168 + "segments_received": 1, 169 + "segments_skipped": 0, 170 + "segments_deconflicted": 0, 171 + "errors": [], 172 + } 173 + mock_session.post.side_effect = [first, second, success] 174 + 175 + with ( 176 + patch("observe.export.requests.Session", return_value=mock_session), 177 + patch("observe.export.time.sleep") as mock_sleep, 178 + ): 179 + export_segments("https://example.com", "test-key", ["20260413"], dry_run=False) 180 + 181 + assert mock_session.post.call_count == 3 182 + assert mock_sleep.called 183 + 184 + def test_auth_error_401(self): 185 + from observe.export import _query_manifest 186 + 187 + mock_session = _make_session(get_status=401) 188 + 189 + with pytest.raises(ValueError, match="Authentication failed"): 190 + _query_manifest(mock_session, "https://example.com", "test-key") 191 + 192 + def test_auth_error_403(self): 193 + from observe.export import _query_manifest 194 + 195 + mock_session = _make_session(get_status=403) 196 + 197 + with pytest.raises(ValueError, match="journal source revoked"): 198 + _query_manifest(mock_session, "https://example.com", "test-key") 199 + 200 + def test_connection_error(self, tmp_path, monkeypatch, capsys): 201 + from observe.export import export_segments 202 + 203 + journal = _setup_journal(tmp_path) 204 + _set_journal_override(monkeypatch, journal) 205 + 206 + mock_session = _make_session(manifest_data={}) 207 + mock_session.get.side_effect = requests.ConnectionError 208 + 209 + with patch("observe.export.requests.Session", return_value=mock_session): 210 + export_segments("https://example.com", "test-key", ["20260413"], dry_run=False) 211 + 212 + assert mock_session.post.call_count == 0 213 + assert "Connection failed" in capsys.readouterr().out 214 + 215 + def test_idempotent(self, tmp_path, monkeypatch, capsys): 216 + from observe.export import export_segments 217 + 218 + journal = _setup_journal(tmp_path) 219 + _set_journal_override(monkeypatch, journal) 220 + 221 + manifest_data = { 222 + "20260413": { 223 + "laptop/143022_300": { 224 + "files": [ 225 + { 226 + "name": "audio.flac", 227 + "sha256": compute_bytes_sha256(b"audio-data-one"), 228 + "size": len(b"audio-data-one"), 229 + }, 230 + { 231 + "name": "transcript.jsonl", 232 + "sha256": compute_bytes_sha256(b"transcript-one"), 233 + "size": len(b"transcript-one"), 234 + }, 235 + ] 236 + }, 237 + "laptop/150000_600": { 238 + "files": [ 239 + { 240 + "name": "audio.flac", 241 + "sha256": compute_bytes_sha256(b"audio-data-two"), 242 + "size": len(b"audio-data-two"), 243 + }, 244 + { 245 + "name": "transcript.jsonl", 246 + "sha256": compute_bytes_sha256(b"transcript-two"), 247 + "size": len(b"transcript-two"), 248 + }, 249 + ] 250 + }, 251 + } 252 + } 253 + mock_session = _make_session(manifest_data=manifest_data) 254 + 255 + with patch("observe.export.requests.Session", return_value=mock_session): 256 + export_segments("https://example.com", "test-key", ["20260413"], dry_run=False) 257 + 258 + assert mock_session.post.call_count == 0 259 + assert "up to date" in capsys.readouterr().out 260 + 261 + def test_only_not_implemented(self, capsys): 262 + from observe.export import main 263 + 264 + mock_args = MagicMock() 265 + mock_args.to = "host" 266 + mock_args.key = "testkey" 267 + mock_args.only = "entities" 268 + mock_args.dry_run = False 269 + mock_args.day = None 270 + 271 + with ( 272 + patch("sys.argv", ["sol export", "--to", "host", "--key", "testkey", "--only", "entities"]), 273 + patch("observe.export.setup_cli", return_value=mock_args), 274 + ): 275 + with pytest.raises(SystemExit) as excinfo: 276 + main() 277 + 278 + assert excinfo.value.code == 0 279 + assert "not yet implemented" in capsys.readouterr().out 280 + 281 + def test_upload_error_isolation(self, tmp_path, monkeypatch, capsys): 282 + from observe.export import export_segments 283 + 284 + journal = _setup_journal(tmp_path) 285 + _set_journal_override(monkeypatch, journal) 286 + 287 + mock_session = _make_session(manifest_data={}) 288 + first = MagicMock(status_code=400, text="bad request") 289 + second = MagicMock(status_code=200, text="ok") 290 + second.json.return_value = { 291 + "segments_received": 1, 292 + "segments_skipped": 0, 293 + "segments_deconflicted": 0, 294 + "errors": [], 295 + } 296 + mock_session.post.side_effect = [first, second] 297 + 298 + with patch("observe.export.requests.Session", return_value=mock_session): 299 + export_segments("https://example.com", "test-key", ["20260413"], dry_run=False) 300 + 301 + assert mock_session.post.call_count == 2 302 + output = capsys.readouterr().out 303 + assert "1 sent" in output 304 + assert "1 failed" in output 305 + 306 + def test_stream_json_excluded(self, tmp_path, monkeypatch): 307 + from observe.export import export_segments 308 + 309 + journal = _setup_journal(tmp_path, include_stream_json=True) 310 + _set_journal_override(monkeypatch, journal) 311 + 312 + mock_session = _make_session(manifest_data={}) 313 + 314 + with patch("observe.export.requests.Session", return_value=mock_session): 315 + export_segments("https://example.com", "test-key", ["20260413"], dry_run=False) 316 + 317 + for call in mock_session.post.call_args_list: 318 + post_kwargs = call.kwargs 319 + metadata = json.loads(post_kwargs["data"]["metadata"]) 320 + assert "stream.json" not in metadata["segments"][0]["files"] 321 + uploaded_names = [entry[1][0] for entry in post_kwargs["files"]] 322 + assert "stream.json" not in uploaded_names