personal memory agent
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

Add entity export to sol export CLI

Export entities via manifest delta (SHA256 content hash comparison)
with --only entities dispatching, dry-run support, blocked entity
filtering, and inclusion in default full export after segments.

+347 -6
+118 -4
observe/export.py
··· 10 10 from __future__ import annotations 11 11 12 12 import argparse 13 + import hashlib 13 14 import json 14 15 import logging 15 16 import sys ··· 26 27 _parse_day_spec, 27 28 ) 28 29 from think.utils import get_journal, iter_segments, setup_cli 30 + from think.entities.journal import load_all_journal_entities 29 31 30 32 logger = logging.getLogger(__name__) 31 33 32 34 UPLOAD_TIMEOUT = 300 33 35 34 36 35 - def _query_manifest(session: requests.Session, base_url: str, key: str) -> dict[str, Any]: 37 + def _query_manifest( 38 + session: requests.Session, base_url: str, key: str, area: str = "segments" 39 + ) -> dict[str, Any]: 36 40 key_prefix = key[:8] 37 - url = f"{base_url}/app/import/journal/{key_prefix}/manifest/segments" 41 + url = f"{base_url}/app/import/journal/{key_prefix}/manifest/{area}" 38 42 response = session.get(url, timeout=UPLOAD_TIMEOUT) 39 43 if response.status_code == 401: 40 44 raise ValueError("Authentication failed: invalid or missing API key") ··· 257 261 session.close() 258 262 259 263 264 + def export_entities(base_url: str, key: str, dry_run: bool) -> None: 265 + session = requests.Session() 266 + session.headers["Authorization"] = f"Bearer {key}" 267 + 268 + try: 269 + try: 270 + remote_manifest = _query_manifest(session, base_url, key, area="entities") 271 + except requests.ConnectionError: 272 + print(f"Connection failed: could not reach {base_url}") 273 + return 274 + except ValueError as e: 275 + print(str(e)) 276 + return 277 + 278 + received = remote_manifest.get("received", {}) 279 + entities = load_all_journal_entities() 280 + if not entities: 281 + print("No entities found to export") 282 + return 283 + 284 + new_count = 0 285 + changed_count = 0 286 + unchanged_count = 0 287 + to_send = [] 288 + 289 + for entity_id, entity in entities.items(): 290 + if entity.get("blocked"): 291 + continue 292 + 293 + content_hash = hashlib.sha256( 294 + json.dumps(entity, sort_keys=True, ensure_ascii=False).encode() 295 + ).hexdigest() 296 + if received.get(entity_id) == content_hash: 297 + unchanged_count += 1 298 + continue 299 + 300 + if entity_id in received: 301 + changed_count += 1 302 + else: 303 + new_count += 1 304 + to_send.append(entity) 305 + 306 + if dry_run: 307 + print( 308 + f"Dry run: {new_count} new, {changed_count} changed, " 309 + f"{unchanged_count} unchanged" 310 + ) 311 + return 312 + 313 + if not to_send: 314 + print("Nothing to send - remote entities are up to date") 315 + return 316 + 317 + key_prefix = key[:8] 318 + url = f"{base_url}/app/import/journal/{key_prefix}/ingest/entities" 319 + for attempt, delay in enumerate(RETRY_BACKOFF): 320 + try: 321 + response = session.post( 322 + url, json={"entities": to_send}, timeout=UPLOAD_TIMEOUT 323 + ) 324 + if response.status_code == 200: 325 + break 326 + if response.status_code == 401: 327 + print("Authentication failed: invalid or missing API key") 328 + return 329 + if response.status_code == 403: 330 + print("Authentication failed: journal source revoked or disabled") 331 + return 332 + if 500 <= response.status_code <= 599: 333 + logger.warning( 334 + "Entity upload attempt %s failed: %s %s", 335 + attempt + 1, 336 + response.status_code, 337 + response.text, 338 + ) 339 + else: 340 + print(f"Entity upload failed: {response.status_code} {response.text}") 341 + return 342 + except (requests.RequestException, OSError) as e: 343 + logger.warning("Entity upload attempt %s failed: %s", attempt + 1, e) 344 + if attempt < len(RETRY_BACKOFF) - 1: 345 + time.sleep(delay) 346 + else: 347 + print("Entity upload failed after all retries") 348 + return 349 + 350 + result = response.json() 351 + errors = result.get("errors", []) 352 + if errors: 353 + for err in errors: 354 + print(f" Error: {err}") 355 + print( 356 + f"\nExport complete: {result.get('created', 0)} created, " 357 + f"{result.get('auto_merged', 0)} merged, " 358 + f"{result.get('staged', 0)} staged, " 359 + f"{result.get('skipped', 0)} skipped" 360 + ) 361 + if errors: 362 + print(f" {len(errors)} error(s)") 363 + finally: 364 + session.close() 365 + 366 + 260 367 def main() -> None: 261 368 parser = argparse.ArgumentParser( 262 369 description="Export journal data to a remote solstone instance" ··· 274 381 parser.add_argument( 275 382 "--only", 276 383 default=None, 277 - help="Export only specific area (segments)", 384 + help="Export only specific area (segments, entities)", 278 385 ) 279 386 parser.add_argument( 280 387 "--dry-run", ··· 288 395 ) 289 396 args = setup_cli(parser) 290 397 398 + base_url = _normalize_url(args.to) 399 + 400 + if args.only == "entities": 401 + export_entities(base_url, args.key, args.dry_run) 402 + return 403 + 291 404 if args.only is not None and args.only != "segments": 292 405 print(f"Export of '{args.only}' is not yet implemented") 293 406 sys.exit(0) 294 407 295 - base_url = _normalize_url(args.to) 296 408 days = _parse_day_spec(args.day, Path(get_journal())) 297 409 export_segments(base_url, args.key, days, args.dry_run) 410 + if args.only is None: 411 + export_entities(base_url, args.key, args.dry_run)
+229 -2
tests/test_export.py
··· 3 3 4 4 from __future__ import annotations 5 5 6 + import hashlib 6 7 import json 7 8 from pathlib import Path 8 9 from unittest.mock import MagicMock, patch ··· 63 64 mock.post.return_value = post_response 64 65 65 66 return mock 67 + 68 + 69 + def _setup_entities(tmp_path): 70 + """Create test entities in journal fixture.""" 71 + entities_dir = tmp_path / "entities" 72 + 73 + alice_dir = entities_dir / "alice_johnson" 74 + alice_dir.mkdir(parents=True) 75 + alice = { 76 + "id": "alice_johnson", 77 + "name": "Alice Johnson", 78 + "type": "Person", 79 + "created_at": 1000, 80 + } 81 + (alice_dir / "entity.json").write_text(json.dumps(alice), encoding="utf-8") 82 + 83 + bob_dir = entities_dir / "bob_smith" 84 + bob_dir.mkdir(parents=True) 85 + bob = {"id": "bob_smith", "name": "Bob Smith", "type": "Person", "created_at": 2000} 86 + (bob_dir / "entity.json").write_text(json.dumps(bob), encoding="utf-8") 87 + 88 + blocked_dir = entities_dir / "blocked_user" 89 + blocked_dir.mkdir(parents=True) 90 + blocked = { 91 + "id": "blocked_user", 92 + "name": "Blocked", 93 + "type": "Person", 94 + "blocked": True, 95 + "created_at": 3000, 96 + } 97 + (blocked_dir / "entity.json").write_text(json.dumps(blocked), encoding="utf-8") 98 + 99 + return {"alice_johnson": alice, "bob_smith": bob, "blocked_user": blocked} 100 + 101 + 102 + def _entity_hash(entity: dict) -> str: 103 + return hashlib.sha256( 104 + json.dumps(entity, sort_keys=True, ensure_ascii=False).encode() 105 + ).hexdigest() 66 106 67 107 68 108 class TestExportSegments: ··· 264 304 mock_args = MagicMock() 265 305 mock_args.to = "host" 266 306 mock_args.key = "testkey" 267 - mock_args.only = "entities" 307 + mock_args.only = "facets" 268 308 mock_args.dry_run = False 269 309 mock_args.day = None 270 310 271 311 with ( 272 - patch("sys.argv", ["sol export", "--to", "host", "--key", "testkey", "--only", "entities"]), 312 + patch("sys.argv", ["sol export", "--to", "host", "--key", "testkey", "--only", "facets"]), 273 313 patch("observe.export.setup_cli", return_value=mock_args), 274 314 ): 275 315 with pytest.raises(SystemExit) as excinfo: ··· 320 360 assert "stream.json" not in metadata["segments"][0]["files"] 321 361 uploaded_names = [entry[1][0] for entry in post_kwargs["files"]] 322 362 assert "stream.json" not in uploaded_names 363 + 364 + 365 + class TestExportEntities: 366 + def test_manifest_delta(self, tmp_path, monkeypatch): 367 + from observe.export import export_entities 368 + 369 + entities = _setup_entities(tmp_path) 370 + _set_journal_override(monkeypatch, tmp_path) 371 + 372 + manifest_data = { 373 + "received": { 374 + "alice_johnson": _entity_hash(entities["alice_johnson"]), 375 + } 376 + } 377 + post_json = { 378 + "created": 1, 379 + "auto_merged": 0, 380 + "staged": 0, 381 + "skipped": 0, 382 + "errors": [], 383 + } 384 + mock_session = _make_session(manifest_data=manifest_data, post_json=post_json) 385 + 386 + with patch("observe.export.requests.Session", return_value=mock_session): 387 + export_entities("https://example.com", "test-key", dry_run=False) 388 + 389 + assert mock_session.post.call_count == 1 390 + posted_data = mock_session.post.call_args.kwargs.get( 391 + "json" 392 + ) or mock_session.post.call_args[1].get("json") 393 + posted_entities = posted_data["entities"] 394 + posted_ids = [e["id"] for e in posted_entities] 395 + assert "bob_smith" in posted_ids 396 + assert "alice_johnson" not in posted_ids 397 + assert "blocked_user" not in posted_ids 398 + 399 + def test_dry_run(self, tmp_path, monkeypatch, capsys): 400 + from observe.export import export_entities 401 + 402 + entities = _setup_entities(tmp_path) 403 + _set_journal_override(monkeypatch, tmp_path) 404 + 405 + manifest_data = { 406 + "received": { 407 + "alice_johnson": _entity_hash(entities["alice_johnson"]), 408 + } 409 + } 410 + mock_session = _make_session(manifest_data=manifest_data) 411 + 412 + with patch("observe.export.requests.Session", return_value=mock_session): 413 + export_entities("https://example.com", "test-key", dry_run=True) 414 + 415 + assert mock_session.post.call_count == 0 416 + output = capsys.readouterr().out 417 + assert "1 new" in output 418 + assert "1 unchanged" in output 419 + 420 + def test_idempotent(self, tmp_path, monkeypatch, capsys): 421 + from observe.export import export_entities 422 + 423 + entities = _setup_entities(tmp_path) 424 + _set_journal_override(monkeypatch, tmp_path) 425 + 426 + manifest_data = { 427 + "received": { 428 + "alice_johnson": _entity_hash(entities["alice_johnson"]), 429 + "bob_smith": _entity_hash(entities["bob_smith"]), 430 + } 431 + } 432 + mock_session = _make_session(manifest_data=manifest_data) 433 + 434 + with patch("observe.export.requests.Session", return_value=mock_session): 435 + export_entities("https://example.com", "test-key", dry_run=False) 436 + 437 + assert mock_session.post.call_count == 0 438 + output = capsys.readouterr().out 439 + assert "up to date" in output 440 + 441 + def test_changed_entity(self, tmp_path, monkeypatch): 442 + from observe.export import export_entities 443 + 444 + entities = _setup_entities(tmp_path) 445 + _set_journal_override(monkeypatch, tmp_path) 446 + 447 + manifest_data = { 448 + "received": { 449 + "alice_johnson": "stale_hash_that_does_not_match", 450 + "bob_smith": _entity_hash(entities["bob_smith"]), 451 + } 452 + } 453 + post_json = { 454 + "created": 0, 455 + "auto_merged": 1, 456 + "staged": 0, 457 + "skipped": 0, 458 + "errors": [], 459 + } 460 + mock_session = _make_session(manifest_data=manifest_data, post_json=post_json) 461 + 462 + with patch("observe.export.requests.Session", return_value=mock_session): 463 + export_entities("https://example.com", "test-key", dry_run=False) 464 + 465 + assert mock_session.post.call_count == 1 466 + posted_data = mock_session.post.call_args.kwargs.get( 467 + "json" 468 + ) or mock_session.post.call_args[1].get("json") 469 + posted_ids = [e["id"] for e in posted_data["entities"]] 470 + assert "alice_johnson" in posted_ids 471 + assert "bob_smith" not in posted_ids 472 + 473 + def test_auth_error_401(self, tmp_path, monkeypatch, capsys): 474 + from observe.export import export_entities 475 + 476 + _setup_entities(tmp_path) 477 + _set_journal_override(monkeypatch, tmp_path) 478 + 479 + mock_session = _make_session(get_status=401) 480 + 481 + with patch("observe.export.requests.Session", return_value=mock_session): 482 + export_entities("https://example.com", "test-key", dry_run=False) 483 + 484 + assert mock_session.post.call_count == 0 485 + assert "Authentication failed" in capsys.readouterr().out 486 + 487 + def test_connection_error(self, tmp_path, monkeypatch, capsys): 488 + from observe.export import export_entities 489 + 490 + _setup_entities(tmp_path) 491 + _set_journal_override(monkeypatch, tmp_path) 492 + 493 + mock_session = _make_session() 494 + mock_session.get.side_effect = requests.ConnectionError 495 + 496 + with patch("observe.export.requests.Session", return_value=mock_session): 497 + export_entities("https://example.com", "test-key", dry_run=False) 498 + 499 + assert mock_session.post.call_count == 0 500 + assert "Connection failed" in capsys.readouterr().out 501 + 502 + def test_empty_manifest(self, tmp_path, monkeypatch): 503 + from observe.export import export_entities 504 + 505 + _setup_entities(tmp_path) 506 + _set_journal_override(monkeypatch, tmp_path) 507 + 508 + post_json = { 509 + "created": 2, 510 + "auto_merged": 0, 511 + "staged": 0, 512 + "skipped": 0, 513 + "errors": [], 514 + } 515 + mock_session = _make_session(manifest_data={}, post_json=post_json) 516 + 517 + with patch("observe.export.requests.Session", return_value=mock_session): 518 + export_entities("https://example.com", "test-key", dry_run=False) 519 + 520 + assert mock_session.post.call_count == 1 521 + posted_data = mock_session.post.call_args.kwargs.get( 522 + "json" 523 + ) or mock_session.post.call_args[1].get("json") 524 + posted_ids = [e["id"] for e in posted_data["entities"]] 525 + assert "alice_johnson" in posted_ids 526 + assert "bob_smith" in posted_ids 527 + assert "blocked_user" not in posted_ids 528 + 529 + def test_response_errors_reported(self, tmp_path, monkeypatch, capsys): 530 + from observe.export import export_entities 531 + 532 + _setup_entities(tmp_path) 533 + _set_journal_override(monkeypatch, tmp_path) 534 + 535 + post_json = { 536 + "created": 1, 537 + "auto_merged": 0, 538 + "staged": 0, 539 + "skipped": 0, 540 + "errors": ["Entity bob_smith: invalid type"], 541 + } 542 + mock_session = _make_session(manifest_data={}, post_json=post_json) 543 + 544 + with patch("observe.export.requests.Session", return_value=mock_session): 545 + export_entities("https://example.com", "test-key", dry_run=False) 546 + 547 + output = capsys.readouterr().out 548 + assert "Entity bob_smith: invalid type" in output 549 + assert "1 error" in output