personal memory agent
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

Add observer transfer ingest and manifest endpoints

Extract shared ingest pipeline (_process_ingest_files) from ingest_upload()
and add POST /ingest/<key>/transfer for receiving pre-processed segments
from other solstone instances. Add GET /ingest/<key>/manifest for day
listing and GET /ingest/<key>/manifest/<day> for per-day transfer manifests.
Add observe.transferred event handler that queues indexer rescan.

+793 -135
+48
apps/observer/events.py
@on_event("observe", "transferred")
def handle_transferred(ctx: EventContext) -> None:
    """Record a transfer-originated segment and request an indexer rescan.

    Reads ``observer``, ``segment`` and ``day`` from the event message. If
    any of them is missing, or the observer cannot be resolved to a
    non-empty key, the handler returns without recording anything (a
    warning/debug line is logged for the missing-field and unknown-observer
    cases).

    Otherwise it appends a ``"transferred"`` record to the observer's
    history for that day, increments the ``segments_transferred`` stat and
    sends a ``sol indexer --rescan`` request to the supervisor.
    """
    msg = ctx.msg

    observer_name = msg.get("observer")
    if not observer_name:
        return

    segment = msg.get("segment")
    day = msg.get("day")
    if not (segment and day):
        logger.warning(
            f"observe.transferred missing segment/day for observer {observer_name}"
        )
        return

    entry = find_observer_by_name(observer_name)
    if not entry:
        logger.debug(f"Observer not found for transferred event: {observer_name}")
        return

    # History and stats are addressed by the first 8 characters of the key.
    prefix = entry.get("key", "")[:8]
    if not prefix:
        return

    append_history_record(
        prefix,
        day,
        {
            "ts": now_ms(),
            "type": "transferred",
            "segment": segment,
        },
    )
    increment_stat(prefix, "segments_transferred")

    # Function-scope import: the name is resolved each time the handler runs.
    from think.callosum import callosum_send

    # Ask the supervisor to rescan so the transferred content gets indexed.
    callosum_send("supervisor", "request", cmd=["sol", "indexer", "--rescan"])

    logger.debug(
        f"Recorded transferred status for observer {observer_name}: {day}/{segment}"
    )
+329 -131
apps/observer/routes.py
··· 6 6 Provides endpoints for: 7 7 - Managing observer registrations (UI) 8 8 - Receiving file uploads from observers (ingest) 9 + - Receiving transferred segments from other instances (transfer ingest) 10 + - Serving segment manifests for transfer diffing 9 11 - Relaying events from observers to local Callosum 10 12 - Retrieving segment upload history for sync verification 11 13 """ ··· 15 17 import base64 16 18 import json 17 19 import logging 20 + import platform 18 21 import re 19 22 import secrets 20 23 from pathlib import Path ··· 28 31 from observe.utils import ( 29 32 MAX_SEGMENT_ATTEMPTS, 30 33 compute_bytes_sha256, 34 + compute_file_sha256, 31 35 find_available_segment, 32 36 ) 33 37 from think.streams import stream_name, update_stream, write_segment_stream 34 - from think.utils import day_path, now_ms, segment_path 38 + from think.utils import day_path, iter_segments, now_ms, segment_path 35 39 36 40 from .utils import ( 37 41 append_history_record, 38 42 find_segment_by_sha256, 43 + get_hist_dir, 39 44 get_observers_dir, 40 45 list_observers, 41 46 load_history, ··· 301 306 # === Ingest API (key-protected) === 302 307 303 308 304 - @observer_bp.route("/ingest", methods=["POST"]) 305 - @observer_bp.route("/ingest/<key>", methods=["POST"]) 306 - def ingest_upload(key: str | None = None) -> Any: 307 - """Receive file uploads from observer. 309 + def _process_ingest_files( 310 + observer: dict, 311 + key_prefix: str, 312 + segment: str, 313 + day: str, 314 + stream: str, 315 + uploaded_files, 316 + *, 317 + source: str | None = None, 318 + ) -> tuple[dict, int]: 319 + """Shared ingest pipeline: read/hash files, dedup, deconflict, save, record history, update stats. 
308 320 309 - Expects multipart form with: 310 - - segment: Segment key (HHMMSS_LEN) 311 - - day: Day string (YYYYMMDD) 312 - - files: One or more media files 313 - - host: (optional) Hostname of observer 314 - - platform: (optional) Platform of observer 315 - - meta: (optional) JSON-encoded metadata dict (facet, setting, etc.) 316 - 317 - Writes files to journal and emits observe.observing event. 318 - Host/platform are merged into meta (meta values take precedence). 321 + Parameters 322 + ---------- 323 + observer : dict 324 + Observer metadata dict (must include 'stats', 'name', 'last_seen', etc.) 325 + key_prefix : str 326 + First 8 chars of observer key. 327 + segment : str 328 + Requested segment key (HHMMSS_LEN format). 329 + day : str 330 + Day string (YYYYMMDD format). 331 + stream : str 332 + Stream name (already resolved by caller). 333 + uploaded_files : list 334 + List of Flask FileStorage objects from request.files.getlist("files"). 335 + source : str or None 336 + If provided, added as "source" field to history record (e.g., "transfer"). 319 337 320 - Returns status: 321 - - "ok": New segment accepted 322 - - "duplicate": All files already received (no processing triggered) 323 - - "collision": New segment saved with adjusted key (directory conflict) 338 + Returns 339 + ------- 340 + tuple of (dict, int) 341 + Response body dict and HTTP status code. 
324 342 """ 325 - # Extract key from Bearer header (primary) or URL path (legacy) 326 - auth_key = _get_key(key) 327 - if not auth_key: 328 - return jsonify({"error": "Authorization required"}), 401 329 - 330 - # Validate key 331 - observer = load_observer(auth_key) 332 - if not observer: 333 - return jsonify({"error": "Invalid key"}), 401 334 - 335 - if observer.get("revoked", False): 336 - return jsonify({"error": "Observer revoked"}), 403 337 - 338 - if not observer.get("enabled", True): 339 - return jsonify({"error": "Observer disabled"}), 403 340 - 341 - # Get segment, day, and host info from form 342 - segment = request.form.get("segment", "").strip() 343 - day = request.form.get("day", "").strip() 344 - host = request.form.get("host", "").strip() 345 - platform = request.form.get("platform", "").strip() 346 - meta_str = request.form.get("meta", "").strip() 347 - 348 - # Parse meta JSON and merge host/platform (meta values take precedence) 349 - meta: dict = {} 350 - if meta_str: 351 - try: 352 - meta = json.loads(meta_str) 353 - except json.JSONDecodeError: 354 - logger.warning(f"Invalid meta JSON from observer: {meta_str[:100]}") 355 - if host and "host" not in meta: 356 - meta["host"] = host 357 - if platform and "platform" not in meta: 358 - meta["platform"] = platform 359 - 360 - # Warn if client hostname differs from registered observer name 361 - effective_host = meta.get("host", host) 362 - observer_name = observer.get("name", "") 363 - if effective_host and effective_host != observer_name: 364 - logger.warning( 365 - f"Observer '{observer_name}' ({auth_key[:8]}) connecting from host " 366 - f"'{effective_host}' — hostname differs from registered name. " 367 - f"Use `sol observer rename` to update if the host was renamed." 
368 - ) 369 - 370 - if not segment: 371 - return jsonify({"error": "Missing segment"}), 400 372 - if not day: 373 - return jsonify({"error": "Missing day"}), 400 374 - 375 - # Validate segment format (HHMMSS_LEN) 376 - if not re.match(r"^\d{6}_\d+$", segment): 377 - return jsonify({"error": "Invalid segment format"}), 400 378 - 379 - # Validate day format (YYYYMMDD) 380 - if not re.match(r"^\d{8}$", day): 381 - return jsonify({"error": "Invalid day format"}), 400 382 - 383 - # Get uploaded files 384 - files = request.files.getlist("files") 385 - if not files: 386 - return jsonify({"error": "No files uploaded"}), 400 387 - 388 - key_prefix = auth_key[:8] 389 - 390 343 # Read file contents into memory and compute SHA256 before saving 391 344 # This allows duplicate detection without writing to disk 392 345 file_data = [] # List of (submitted_filename, simple_filename, content, sha256) 393 - for upload in files: 346 + for upload in uploaded_files: 394 347 if not upload.filename: 395 348 continue 396 349 ··· 411 364 file_data.append((submitted_filename, simple_filename, content, sha256)) 412 365 413 366 if not file_data: 414 - return jsonify({"error": "No valid files uploaded"}), 400 367 + return {"error": "No valid files uploaded"}, 400 415 368 416 369 # Check for duplicate submission by SHA256 417 370 incoming_sha256s = {fd[3] for fd in file_data} ··· 420 373 ) 421 374 422 375 if existing_segment: 423 - # Full duplicate - all files already exist in an existing segment 424 376 logger.info( 425 377 f"Duplicate segment rejected: {day}/{segment} from {observer.get('name')} " 426 378 f"(matches existing {existing_segment})" 427 379 ) 428 380 429 - # Update last_seen and increment duplicates_rejected stat 430 381 observer["last_seen"] = now_ms() 431 382 observer["stats"]["duplicates_rejected"] = ( 432 383 observer["stats"].get("duplicates_rejected", 0) + 1 433 384 ) 434 385 save_observer(observer) 435 386 436 - return jsonify( 387 + return ( 437 388 { 438 389 "status": 
"duplicate", 439 390 "existing_segment": existing_segment, 440 391 "message": "All files already received", 441 - } 392 + }, 393 + 200, 442 394 ) 443 395 444 - # Log partial match context if some files already exist 445 396 partial_match = bool(matched_sha256s) 446 397 447 398 # Ensure day directory exists 448 399 day_dir = day_path(day) 449 400 day_dir.mkdir(parents=True, exist_ok=True) 450 401 451 - # Determine stream name: trust client-provided stream in meta if valid, 452 - # otherwise derive from observer registration name. 453 - # Deriving from observer name via stream_name(observer=...) calls _strip_hostname, 454 - # which strips qualifiers like ".tmux" — so "fedora.tmux" becomes "fedora", 455 - # colliding both observers into one stream. 456 - client_stream = meta.get("stream", "").strip() 457 - observer_name = observer.get("name", "unknown") 458 - if client_stream and re.match(r"^[a-z0-9][a-z0-9._-]*$", client_stream): 459 - stream = client_stream 460 - else: 461 - stream = stream_name(observer=observer_name) 462 - 463 402 # Find available segment key within the stream directory 464 403 stream_dir = day_dir / stream 465 404 stream_dir.mkdir(parents=True, exist_ok=True) ··· 468 407 available_segment = find_available_segment(stream_dir, segment) 469 408 470 409 if available_segment is None: 471 - # Exhausted attempts, save to failed directory 472 410 logger.error( 473 411 f"No available segment slot for {day}/{stream}/{segment} from " 474 - f"{observer_name} after {MAX_SEGMENT_ATTEMPTS} attempts" 412 + f"{observer.get('name', 'unknown')} after {MAX_SEGMENT_ATTEMPTS} attempts" 475 413 ) 476 414 failed_dir = _save_to_failed(day_dir, file_data, segment) 477 415 return ( 478 - jsonify( 479 - { 480 - "status": "failed", 481 - "error": f"No available segment slot after {MAX_SEGMENT_ATTEMPTS} attempts", 482 - "failed_path": str(failed_dir.relative_to(day_dir.parent)), 483 - } 484 - ), 416 + { 417 + "status": "failed", 418 + "error": f"No available segment slot 
after {MAX_SEGMENT_ATTEMPTS} attempts", 419 + "failed_path": str(failed_dir.relative_to(day_dir.parent)), 420 + }, 485 421 507, 486 - ) # Insufficient Storage 422 + ) 487 423 488 424 segment = available_segment 489 425 if segment != original_segment: 490 426 logger.info( 491 427 f"Segment collision resolved: {original_segment} -> {segment} " 492 - f"for observer {observer_name}" 428 + f"for observer {observer.get('name', 'unknown')}" 493 429 ) 494 430 495 431 # Create segment directory for files (under stream) ··· 526 462 logger.info(f"Saved {simple_filename} to {segment_dir}") 527 463 except OSError as e: 528 464 logger.error(f"Failed to save {simple_filename}: {e}") 529 - return jsonify({"error": f"Failed to save {simple_filename}"}), 500 465 + return {"error": f"Failed to save {simple_filename}"}, 500 530 466 531 467 if not saved_files: 532 - return jsonify({"error": "No valid files saved"}), 400 468 + return {"error": "No valid files saved"}, 400 533 469 534 - # Write sync history record 535 470 sync_record = { 536 471 "ts": now_ms(), 537 472 "segment": segment, ··· 541 476 if segment != original_segment: 542 477 sync_record["segment_original"] = original_segment 543 478 if partial_match: 544 - # Log which SHA256s matched existing files (for debugging/audit) 545 479 sync_record["partial_match_sha256s"] = list(matched_sha256s) 480 + if source: 481 + sync_record["source"] = source 546 482 append_history_record(key_prefix, day, sync_record) 547 483 548 - # Update observer stats 549 484 observer["last_seen"] = now_ms() 550 485 observer["last_segment"] = segment 551 486 observer["stats"]["segments_received"] = ( ··· 556 491 ) 557 492 save_observer(observer) 558 493 494 + status = "collision" if segment != original_segment else "ok" 495 + return { 496 + "status": status, 497 + "segment": segment, 498 + "files": saved_files, 499 + "bytes": total_bytes, 500 + }, 200 501 + 502 + 503 + @observer_bp.route("/ingest", methods=["POST"]) 504 + 
@observer_bp.route("/ingest/<key>", methods=["POST"]) 505 + def ingest_upload(key: str | None = None) -> Any: 506 + """Receive file uploads from observer. 507 + 508 + Expects multipart form with: 509 + - segment: Segment key (HHMMSS_LEN) 510 + - day: Day string (YYYYMMDD) 511 + - files: One or more media files 512 + - host: (optional) Hostname of observer 513 + - platform: (optional) Platform of observer 514 + - meta: (optional) JSON-encoded metadata dict (facet, setting, etc.) 515 + 516 + Writes files to journal and emits observe.observing event. 517 + Host/platform are merged into meta (meta values take precedence). 518 + 519 + Returns status: 520 + - "ok": New segment accepted 521 + - "duplicate": All files already received (no processing triggered) 522 + - "collision": New segment saved with adjusted key (directory conflict) 523 + """ 524 + # Extract key from Bearer header (primary) or URL path (legacy) 525 + auth_key = _get_key(key) 526 + if not auth_key: 527 + return jsonify({"error": "Authorization required"}), 401 528 + 529 + # Validate key 530 + observer = load_observer(auth_key) 531 + if not observer: 532 + return jsonify({"error": "Invalid key"}), 401 533 + 534 + if observer.get("revoked", False): 535 + return jsonify({"error": "Observer revoked"}), 403 536 + 537 + if not observer.get("enabled", True): 538 + return jsonify({"error": "Observer disabled"}), 403 539 + 540 + # Get segment, day, and host info from form 541 + segment = request.form.get("segment", "").strip() 542 + day = request.form.get("day", "").strip() 543 + host = request.form.get("host", "").strip() 544 + platform = request.form.get("platform", "").strip() 545 + meta_str = request.form.get("meta", "").strip() 546 + 547 + # Parse meta JSON and merge host/platform (meta values take precedence) 548 + meta: dict = {} 549 + if meta_str: 550 + try: 551 + meta = json.loads(meta_str) 552 + except json.JSONDecodeError: 553 + logger.warning(f"Invalid meta JSON from observer: {meta_str[:100]}") 554 
+ if host and "host" not in meta: 555 + meta["host"] = host 556 + if platform and "platform" not in meta: 557 + meta["platform"] = platform 558 + 559 + # Warn if client hostname differs from registered observer name 560 + effective_host = meta.get("host", host) 561 + observer_name = observer.get("name", "") 562 + if effective_host and effective_host != observer_name: 563 + logger.warning( 564 + f"Observer '{observer_name}' ({auth_key[:8]}) connecting from host " 565 + f"'{effective_host}' — hostname differs from registered name. " 566 + f"Use `sol observer rename` to update if the host was renamed." 567 + ) 568 + 569 + if not segment: 570 + return jsonify({"error": "Missing segment"}), 400 571 + if not day: 572 + return jsonify({"error": "Missing day"}), 400 573 + 574 + # Validate segment format (HHMMSS_LEN) 575 + if not re.match(r"^\d{6}_\d+$", segment): 576 + return jsonify({"error": "Invalid segment format"}), 400 577 + 578 + # Validate day format (YYYYMMDD) 579 + if not re.match(r"^\d{8}$", day): 580 + return jsonify({"error": "Invalid day format"}), 400 581 + 582 + # Get uploaded files 583 + files = request.files.getlist("files") 584 + if not files: 585 + return jsonify({"error": "No files uploaded"}), 400 586 + 587 + key_prefix = auth_key[:8] 588 + 589 + # Determine stream name: trust client-provided stream in meta if valid, 590 + # otherwise derive from observer registration name. 591 + # Deriving from observer name via stream_name(observer=...) calls _strip_hostname, 592 + # which strips qualifiers like ".tmux" — so "fedora.tmux" becomes "fedora", 593 + # colliding both observers into one stream. 
594 + client_stream = meta.get("stream", "").strip() 595 + observer_name = observer.get("name", "unknown") 596 + if client_stream and re.match(r"^[a-z0-9][a-z0-9._-]*$", client_stream): 597 + stream = client_stream 598 + else: 599 + stream = stream_name(observer=observer_name) 600 + 601 + body, status = _process_ingest_files( 602 + observer, key_prefix, segment, day, stream, files 603 + ) 604 + if status != 200 or body.get("status") == "duplicate": 605 + return jsonify(body), status 606 + 607 + segment = body["segment"] 608 + saved_files = body["files"] 609 + segment_dir = segment_path(day, segment, stream) 610 + 559 611 # Write stream identity for this segment 560 612 try: 561 613 result = update_stream(stream, day, segment, type="observer") ··· 588 640 logger.info( 589 641 f"Received {len(saved_files)} files for {day}/{segment} from {observer.get('name')}" 590 642 ) 643 + return jsonify(body), status 591 644 592 - # Determine response status 593 - if segment != original_segment: 594 - status = "collision" 595 - else: 596 - status = "ok" 645 + 646 + @observer_bp.route("/ingest/<key>/transfer", methods=["POST"]) 647 + def ingest_transfer(key: str) -> Any: 648 + """Receive transferred file uploads from another solstone instance.""" 649 + auth_key = _get_key(key) 650 + if not auth_key: 651 + return jsonify({"error": "Authorization required"}), 401 652 + 653 + observer = load_observer(auth_key) 654 + if not observer: 655 + return jsonify({"error": "Invalid key"}), 401 656 + 657 + if observer.get("revoked", False): 658 + return jsonify({"error": "Observer revoked"}), 403 659 + 660 + if not observer.get("enabled", True): 661 + return jsonify({"error": "Observer disabled"}), 403 662 + 663 + segment = request.form.get("segment", "").strip() 664 + day = request.form.get("day", "").strip() 665 + stream = request.form.get("stream", "").strip() 666 + host = request.form.get("host", "").strip() 667 + platform_name = request.form.get("platform", "").strip() 668 + meta_str = 
request.form.get("meta", "").strip() 669 + 670 + meta: dict = {} 671 + if meta_str: 672 + try: 673 + meta = json.loads(meta_str) 674 + except json.JSONDecodeError: 675 + logger.warning(f"Invalid meta JSON from observer: {meta_str[:100]}") 676 + if host and "host" not in meta: 677 + meta["host"] = host 678 + if platform_name and "platform" not in meta: 679 + meta["platform"] = platform_name 597 680 598 - return jsonify( 599 - { 600 - "status": status, 601 - "segment": segment, 602 - "files": saved_files, 603 - "bytes": total_bytes, 604 - } 681 + if not segment: 682 + return jsonify({"error": "Missing segment"}), 400 683 + if not day: 684 + return jsonify({"error": "Missing day"}), 400 685 + if not stream: 686 + return jsonify({"error": "Missing stream"}), 400 687 + if not re.match(r"^\d{6}_\d+$", segment): 688 + return jsonify({"error": "Invalid segment format"}), 400 689 + if not re.match(r"^\d{8}$", day): 690 + return jsonify({"error": "Invalid day format"}), 400 691 + if not re.match(r"^[a-z0-9][a-z0-9._-]*$", stream): 692 + return jsonify({"error": "Invalid stream format"}), 400 693 + 694 + files = request.files.getlist("files") 695 + if not files: 696 + return jsonify({"error": "No files uploaded"}), 400 697 + 698 + key_prefix = auth_key[:8] 699 + body, status = _process_ingest_files( 700 + observer, 701 + key_prefix, 702 + segment, 703 + day, 704 + stream, 705 + files, 706 + source="transfer", 605 707 ) 708 + if status != 200 or body.get("status") == "duplicate": 709 + return jsonify(body), status 710 + 711 + observer_name = observer.get("name", "") 712 + event_fields: dict[str, Any] = { 713 + "segment": body["segment"], 714 + "day": day, 715 + "files": body["files"], 716 + "observer": observer_name, 717 + "stream": stream, 718 + } 719 + if meta: 720 + event_fields["meta"] = meta 721 + emit("observe", "transferred", **event_fields) 722 + 723 + return jsonify(body), status 724 + 725 + 726 + @observer_bp.route("/ingest/<key>/manifest", methods=["GET"]) 727 + def 
ingest_manifest(key: str) -> Any: 728 + """List available manifest days for an observer.""" 729 + auth_key = _get_key(key) 730 + if not auth_key: 731 + return jsonify({"error": "Authorization required"}), 401 732 + 733 + observer = load_observer(auth_key) 734 + if not observer: 735 + return jsonify({"error": "Invalid key"}), 401 736 + 737 + if observer.get("revoked", False): 738 + return jsonify({"error": "Observer revoked"}), 403 739 + 740 + if not observer.get("enabled", True): 741 + return jsonify({"error": "Observer disabled"}), 403 742 + 743 + key_prefix = auth_key[:8] 744 + hist_dir = get_hist_dir(key_prefix, ensure_exists=False) 745 + if not hist_dir.exists(): 746 + return jsonify({"days": {}}) 747 + 748 + days: dict[str, dict[str, int]] = {} 749 + for hist_path in sorted(hist_dir.glob("*.jsonl")): 750 + records = load_history(key_prefix, hist_path.stem) 751 + segments = { 752 + record.get("segment", "") 753 + for record in records 754 + if not record.get("type") and record.get("segment") 755 + } 756 + days[hist_path.stem] = {"segments": len(segments)} 757 + 758 + return jsonify({"days": days}) 759 + 760 + 761 + @observer_bp.route("/ingest/<key>/manifest/<day>", methods=["GET"]) 762 + def ingest_manifest_day(key: str, day: str) -> Any: 763 + """Return a transfer manifest for all segments on a given day.""" 764 + auth_key = _get_key(key) 765 + if not auth_key: 766 + return jsonify({"error": "Authorization required"}), 401 767 + 768 + observer = load_observer(auth_key) 769 + if not observer: 770 + return jsonify({"error": "Invalid key"}), 401 771 + 772 + if observer.get("revoked", False): 773 + return jsonify({"error": "Observer revoked"}), 403 774 + 775 + if not observer.get("enabled", True): 776 + return jsonify({"error": "Observer disabled"}), 403 777 + 778 + if not re.match(r"^\d{8}$", day): 779 + return jsonify({"error": "Invalid day format"}), 400 780 + 781 + manifest = { 782 + "version": 1, 783 + "day": day, 784 + "created_at": now_ms(), 785 + "host": 
platform.node() or "unknown", 786 + "segments": {}, 787 + } 788 + 789 + for stream, seg_key, seg_path in iter_segments(day): 790 + arc_key = f"{stream}/{seg_key}" 791 + files = [] 792 + for file_path in sorted(seg_path.iterdir()): 793 + if file_path.is_file(): 794 + files.append( 795 + { 796 + "name": file_path.name, 797 + "sha256": compute_file_sha256(file_path), 798 + "size": file_path.stat().st_size, 799 + } 800 + ) 801 + manifest["segments"][arc_key] = {"files": files} 802 + 803 + return jsonify(manifest) 606 804 607 805 608 806 @observer_bp.route("/ingest/event", methods=["POST"])
+54 -3
apps/observer/tests/test_events.py
··· 10 10 import pytest 11 11 12 12 from apps.events import EventContext 13 - from apps.observer.events import handle_observed 13 + from apps.observer.events import handle_observed, handle_transferred 14 14 15 15 16 16 @pytest.fixture ··· 75 75 handle_observed(ctx) 76 76 77 77 # Check history was written 78 - hist_path = observer_journal.observers_dir / "testkey1" / "hist" / "20250103.jsonl" 78 + hist_path = ( 79 + observer_journal.observers_dir / "testkey1" / "hist" / "20250103.jsonl" 80 + ) 79 81 assert hist_path.exists() 80 82 81 83 with open(hist_path) as f: ··· 108 110 handle_observed(ctx) 109 111 110 112 # Check all records written 111 - hist_path = observer_journal.observers_dir / "testkey1" / "hist" / "20250103.jsonl" 113 + hist_path = ( 114 + observer_journal.observers_dir / "testkey1" / "hist" / "20250103.jsonl" 115 + ) 112 116 with open(hist_path) as f: 113 117 lines = f.readlines() 114 118 ··· 204 208 # No history should be created 205 209 hist_dir = observer_journal.observers_dir / "testkey1" / "hist" 206 210 assert not hist_dir.exists() 211 + 212 + def test_handle_transferred(self, observer_journal, monkeypatch): 213 + """Handler records transferred status, stats, and queues rescan.""" 214 + import think.callosum as callosum_module 215 + 216 + calls = [] 217 + monkeypatch.setattr( 218 + callosum_module, 219 + "callosum_send", 220 + lambda *a, **kw: calls.append((a, kw)) or True, 221 + ) 222 + 223 + ctx = EventContext( 224 + msg={ 225 + "tract": "observe", 226 + "event": "transferred", 227 + "observer": "test-observer", 228 + "segment": "120000_300", 229 + "day": "20250103", 230 + }, 231 + app="observer", 232 + tract="observe", 233 + event="transferred", 234 + ) 235 + 236 + handle_transferred(ctx) 237 + 238 + hist_path = ( 239 + observer_journal.observers_dir / "testkey1" / "hist" / "20250103.jsonl" 240 + ) 241 + assert hist_path.exists() 242 + with open(hist_path) as f: 243 + record = json.loads(f.readline()) 244 + 245 + assert record["type"] == 
"transferred" 246 + assert record["segment"] == "120000_300" 247 + 248 + with open(observer_journal.observer_path) as f: 249 + data = json.load(f) 250 + assert data["stats"]["segments_transferred"] == 1 251 + 252 + assert calls == [ 253 + ( 254 + ("supervisor", "request"), 255 + {"cmd": ["sol", "indexer", "--rescan"]}, 256 + ) 257 + ]
+362 -1
apps/observer/tests/test_routes.py
··· 1255 1255 assert "segments_observed" not in data[0]["stats"] 1256 1256 1257 1257 # Manually add segments_observed stat 1258 - observer_path = env.journal / "apps" / "observer" / "observers" / f"{key_prefix}.json" 1258 + observer_path = ( 1259 + env.journal / "apps" / "observer" / "observers" / f"{key_prefix}.json" 1260 + ) 1259 1261 with open(observer_path) as f: 1260 1262 observer_data = json.load(f) 1261 1263 observer_data["stats"]["segments_observed"] = 5 ··· 1690 1692 assert not ( 1691 1693 env.journal / "20250103" / "fedora" / "120000_300" / "tmux.jsonl" 1692 1694 ).exists() 1695 + 1696 + 1697 + def test_transfer_success(observer_env): 1698 + """Test successful transfer upload.""" 1699 + env = observer_env() 1700 + 1701 + resp = env.client.post( 1702 + "/app/observer/api/create", 1703 + json={"name": "transfer-test"}, 1704 + content_type="application/json", 1705 + ) 1706 + key = resp.get_json()["key"] 1707 + 1708 + test_data = b"transferred audio content" 1709 + resp = env.client.post( 1710 + f"/app/observer/ingest/{key}/transfer", 1711 + data={ 1712 + "day": "20250103", 1713 + "segment": "120000_300", 1714 + "stream": "remote.host", 1715 + "files": (io.BytesIO(test_data), "audio.flac"), 1716 + }, 1717 + ) 1718 + assert resp.status_code == 200 1719 + data = resp.get_json() 1720 + assert data["status"] == "ok" 1721 + assert data["segment"] == "120000_300" 1722 + assert data["files"] == ["audio.flac"] 1723 + assert data["bytes"] == len(test_data) 1724 + 1725 + expected_file = ( 1726 + env.journal / "20250103" / "remote.host" / "120000_300" / "audio.flac" 1727 + ) 1728 + assert expected_file.exists() 1729 + assert expected_file.read_bytes() == test_data 1730 + 1731 + 1732 + def test_transfer_requires_stream(observer_env): 1733 + """Test that transfer requires stream.""" 1734 + env = observer_env() 1735 + 1736 + resp = env.client.post( 1737 + "/app/observer/api/create", 1738 + json={"name": "transfer-stream-test"}, 1739 + content_type="application/json", 1740 
+ ) 1741 + key = resp.get_json()["key"] 1742 + 1743 + resp = env.client.post( 1744 + f"/app/observer/ingest/{key}/transfer", 1745 + data={ 1746 + "day": "20250103", 1747 + "segment": "120000_300", 1748 + "files": (io.BytesIO(b"content"), "audio.flac"), 1749 + }, 1750 + ) 1751 + assert resp.status_code == 400 1752 + assert resp.get_json()["error"] == "Missing stream" 1753 + 1754 + 1755 + def test_transfer_invalid_stream(observer_env): 1756 + """Test that transfer validates stream format.""" 1757 + env = observer_env() 1758 + 1759 + resp = env.client.post( 1760 + "/app/observer/api/create", 1761 + json={"name": "transfer-invalid-stream"}, 1762 + content_type="application/json", 1763 + ) 1764 + key = resp.get_json()["key"] 1765 + 1766 + resp = env.client.post( 1767 + f"/app/observer/ingest/{key}/transfer", 1768 + data={ 1769 + "day": "20250103", 1770 + "segment": "120000_300", 1771 + "stream": "INVALID!", 1772 + "files": (io.BytesIO(b"content"), "audio.flac"), 1773 + }, 1774 + ) 1775 + assert resp.status_code == 400 1776 + assert resp.get_json()["error"] == "Invalid stream format" 1777 + 1778 + 1779 + def test_transfer_duplicate_detection(observer_env): 1780 + """Test transfer duplicate detection.""" 1781 + env = observer_env() 1782 + 1783 + resp = env.client.post( 1784 + "/app/observer/api/create", 1785 + json={"name": "transfer-duplicate-test"}, 1786 + content_type="application/json", 1787 + ) 1788 + key = resp.get_json()["key"] 1789 + 1790 + test_data = b"duplicate transfer content" 1791 + resp = env.client.post( 1792 + f"/app/observer/ingest/{key}/transfer", 1793 + data={ 1794 + "day": "20250103", 1795 + "segment": "120000_300", 1796 + "stream": "remote.host", 1797 + "files": (io.BytesIO(test_data), "audio.flac"), 1798 + }, 1799 + ) 1800 + assert resp.status_code == 200 1801 + assert resp.get_json()["status"] == "ok" 1802 + 1803 + resp = env.client.post( 1804 + f"/app/observer/ingest/{key}/transfer", 1805 + data={ 1806 + "day": "20250103", 1807 + "segment": 
"120000_300", 1808 + "stream": "remote.host", 1809 + "files": (io.BytesIO(test_data), "audio.flac"), 1810 + }, 1811 + ) 1812 + assert resp.status_code == 200 1813 + data = resp.get_json() 1814 + assert data["status"] == "duplicate" 1815 + assert data["existing_segment"] == "120000_300" 1816 + 1817 + 1818 + def test_transfer_deconfliction(observer_env): 1819 + """Test transfer deconflicts existing segment directories.""" 1820 + env = observer_env() 1821 + 1822 + resp = env.client.post( 1823 + "/app/observer/api/create", 1824 + json={"name": "transfer-collision-test"}, 1825 + content_type="application/json", 1826 + ) 1827 + key = resp.get_json()["key"] 1828 + 1829 + stream_dir = env.journal / "20250103" / "remote.host" 1830 + stream_dir.mkdir(parents=True) 1831 + (stream_dir / "120000_300").mkdir() 1832 + 1833 + resp = env.client.post( 1834 + f"/app/observer/ingest/{key}/transfer", 1835 + data={ 1836 + "day": "20250103", 1837 + "segment": "120000_300", 1838 + "stream": "remote.host", 1839 + "files": (io.BytesIO(b"collision content"), "audio.flac"), 1840 + }, 1841 + ) 1842 + assert resp.status_code == 200 1843 + data = resp.get_json() 1844 + assert data["status"] == "collision" 1845 + assert data["segment"] != "120000_300" 1846 + assert (stream_dir / data["segment"] / "audio.flac").exists() 1847 + 1848 + 1849 + def test_transfer_emits_transferred_event(observer_env, monkeypatch): 1850 + """Test transfer emits observe.transferred.""" 1851 + env = observer_env() 1852 + 1853 + import apps.observer.routes as routes_module 1854 + 1855 + calls = [] 1856 + 1857 + def mock_emit(*args, **kwargs): 1858 + calls.append((args, kwargs)) 1859 + 1860 + monkeypatch.setattr(routes_module, "emit", mock_emit) 1861 + 1862 + resp = env.client.post( 1863 + "/app/observer/api/create", 1864 + json={"name": "transfer-event-test"}, 1865 + content_type="application/json", 1866 + ) 1867 + key = resp.get_json()["key"] 1868 + 1869 + resp = env.client.post( 1870 + 
f"/app/observer/ingest/{key}/transfer", 1871 + data={ 1872 + "day": "20250103", 1873 + "segment": "120000_300", 1874 + "stream": "remote.host", 1875 + "files": (io.BytesIO(b"event content"), "audio.flac"), 1876 + }, 1877 + ) 1878 + assert resp.status_code == 200 1879 + assert len(calls) == 1 1880 + assert calls[0][0] == ("observe", "transferred") 1881 + 1882 + 1883 + def test_transfer_does_not_emit_observing(observer_env, monkeypatch): 1884 + """Test transfer does not emit observe.observing.""" 1885 + env = observer_env() 1886 + 1887 + import apps.observer.routes as routes_module 1888 + 1889 + calls = [] 1890 + 1891 + def mock_emit(*args, **kwargs): 1892 + calls.append((args, kwargs)) 1893 + 1894 + monkeypatch.setattr(routes_module, "emit", mock_emit) 1895 + 1896 + resp = env.client.post( 1897 + "/app/observer/api/create", 1898 + json={"name": "transfer-no-observing-test"}, 1899 + content_type="application/json", 1900 + ) 1901 + key = resp.get_json()["key"] 1902 + 1903 + resp = env.client.post( 1904 + f"/app/observer/ingest/{key}/transfer", 1905 + data={ 1906 + "day": "20250103", 1907 + "segment": "120000_300", 1908 + "stream": "remote.host", 1909 + "files": (io.BytesIO(b"event content"), "audio.flac"), 1910 + }, 1911 + ) 1912 + assert resp.status_code == 200 1913 + assert all(args[1] != "observing" for args, _kwargs in calls) 1914 + 1915 + 1916 + def test_transfer_history_record(observer_env): 1917 + """Test transfer upload history records source='transfer'.""" 1918 + from apps.observer.utils import load_history 1919 + 1920 + env = observer_env() 1921 + 1922 + resp = env.client.post( 1923 + "/app/observer/api/create", 1924 + json={"name": "transfer-history-test"}, 1925 + content_type="application/json", 1926 + ) 1927 + data = resp.get_json() 1928 + key = data["key"] 1929 + key_prefix = data["key_prefix"] 1930 + 1931 + resp = env.client.post( 1932 + f"/app/observer/ingest/{key}/transfer", 1933 + data={ 1934 + "day": "20250103", 1935 + "segment": "120000_300", 1936 
+ "stream": "remote.host", 1937 + "files": (io.BytesIO(b"history content"), "audio.flac"), 1938 + }, 1939 + ) 1940 + assert resp.status_code == 200 1941 + 1942 + records = load_history(key_prefix, "20250103") 1943 + upload_record = next(record for record in records if not record.get("type")) 1944 + assert upload_record["source"] == "transfer" 1945 + 1946 + 1947 + def test_transfer_auth_required(observer_env): 1948 + """Test transfer rejects invalid path key without auth header.""" 1949 + env = observer_env() 1950 + 1951 + resp = env.client.post( 1952 + "/app/observer/ingest/badkey/transfer", 1953 + data={ 1954 + "day": "20250103", 1955 + "segment": "120000_300", 1956 + "stream": "remote.host", 1957 + "files": (io.BytesIO(b"content"), "audio.flac"), 1958 + }, 1959 + ) 1960 + assert resp.status_code == 401 1961 + 1962 + 1963 + def test_transfer_invalid_key(observer_env): 1964 + """Test transfer rejects invalid key.""" 1965 + env = observer_env() 1966 + 1967 + resp = env.client.post( 1968 + "/app/observer/ingest/not-a-real-key/transfer", 1969 + data={ 1970 + "day": "20250103", 1971 + "segment": "120000_300", 1972 + "stream": "remote.host", 1973 + "files": (io.BytesIO(b"content"), "audio.flac"), 1974 + }, 1975 + ) 1976 + assert resp.status_code == 401 1977 + 1978 + 1979 + def test_manifest_day_listing(observer_env): 1980 + """Test manifest day listing from observer history.""" 1981 + env = observer_env() 1982 + 1983 + resp = env.client.post( 1984 + "/app/observer/api/create", 1985 + json={"name": "manifest-list-test"}, 1986 + content_type="application/json", 1987 + ) 1988 + key = resp.get_json()["key"] 1989 + 1990 + resp = env.client.post( 1991 + "/app/observer/ingest", 1992 + headers={"Authorization": f"Bearer {key}"}, 1993 + data={ 1994 + "day": "20250103", 1995 + "segment": "120000_300", 1996 + "files": (io.BytesIO(b"manifest content"), "audio.flac"), 1997 + }, 1998 + ) 1999 + assert resp.status_code == 200 2000 + 2001 + resp = 
env.client.get(f"/app/observer/ingest/{key}/manifest") 2002 + assert resp.status_code == 200 2003 + assert resp.get_json() == {"days": {"20250103": {"segments": 1}}} 2004 + 2005 + 2006 + def test_manifest_per_day(observer_env): 2007 + """Test per-day manifest format matches transfer manifest v1.""" 2008 + env = observer_env() 2009 + 2010 + resp = env.client.post( 2011 + "/app/observer/api/create", 2012 + json={"name": "manifest-day-test"}, 2013 + content_type="application/json", 2014 + ) 2015 + key = resp.get_json()["key"] 2016 + 2017 + resp = env.client.post( 2018 + f"/app/observer/ingest/{key}/transfer", 2019 + data={ 2020 + "day": "20250103", 2021 + "segment": "120000_300", 2022 + "stream": "remote.host", 2023 + "files": [ 2024 + (io.BytesIO(b"audio bytes"), "audio.flac"), 2025 + (io.BytesIO(b"screen bytes"), "screen.webm"), 2026 + ], 2027 + }, 2028 + ) 2029 + assert resp.status_code == 200 2030 + 2031 + resp = env.client.get(f"/app/observer/ingest/{key}/manifest/20250103") 2032 + assert resp.status_code == 200 2033 + data = resp.get_json() 2034 + 2035 + assert data["version"] == 1 2036 + assert data["day"] == "20250103" 2037 + assert isinstance(data["created_at"], int) 2038 + assert "host" in data 2039 + assert "remote.host/120000_300" in data["segments"] 2040 + 2041 + files = data["segments"]["remote.host/120000_300"]["files"] 2042 + assert len(files) == 2 2043 + for file_info in files: 2044 + assert set(file_info) == {"name", "sha256", "size"} 2045 + assert len(file_info["sha256"]) == 64 2046 + 2047 + 2048 + def test_manifest_auth_required(observer_env): 2049 + """Test manifest endpoint rejects invalid key.""" 2050 + env = observer_env() 2051 + 2052 + resp = env.client.get("/app/observer/ingest/badkey/manifest") 2053 + assert resp.status_code == 401