···5757 logger.debug(
5858 f"Recorded observed status for observer {observer_name}: {day}/{segment}"
5959 )
6060+6161+6262+@on_event("observe", "transferred")
6363+def handle_transferred(ctx: EventContext) -> None:
6464+ """Handle observe.transferred events for transfer-originated segments.
6565+6666+ When a transferred segment is received, append a 'transferred' record
6767+ to the observer's sync history, increment stats, and queue an indexer
6868+ rescan to pick up the new content.
6969+ """
7070+ observer_name = ctx.msg.get("observer")
7171+ if not observer_name:
7272+ return
7373+7474+ segment = ctx.msg.get("segment")
7575+ day = ctx.msg.get("day")
7676+ if not segment or not day:
7777+ logger.warning(
7878+ f"observe.transferred missing segment/day for observer {observer_name}"
7979+ )
8080+ return
8181+8282+ observer = find_observer_by_name(observer_name)
8383+ if not observer:
8484+ logger.debug(f"Observer not found for transferred event: {observer_name}")
8585+ return
8686+8787+ key_prefix = observer.get("key", "")[:8]
8888+ if not key_prefix:
8989+ return
9090+9191+ record = {
9292+ "ts": now_ms(),
9393+ "type": "transferred",
9494+ "segment": segment,
9595+ }
9696+ append_history_record(key_prefix, day, record)
9797+9898+ increment_stat(key_prefix, "segments_transferred")
9999+100100+ # Queue indexer rescan to pick up transferred content
101101+ from think.callosum import callosum_send
102102+103103+ callosum_send("supervisor", "request", cmd=["sol", "indexer", "--rescan"])
104104+105105+ logger.debug(
106106+ f"Recorded transferred status for observer {observer_name}: {day}/{segment}"
107107+ )
+329-131
apps/observer/routes.py
···66Provides endpoints for:
77- Managing observer registrations (UI)
88- Receiving file uploads from observers (ingest)
99+- Receiving transferred segments from other instances (transfer ingest)
1010+- Serving segment manifests for transfer diffing
911- Relaying events from observers to local Callosum
1012- Retrieving segment upload history for sync verification
1113"""
···1517import base64
1618import json
1719import logging
2020+import platform
1821import re
1922import secrets
2023from pathlib import Path
···2831from observe.utils import (
2932 MAX_SEGMENT_ATTEMPTS,
3033 compute_bytes_sha256,
3434+ compute_file_sha256,
3135 find_available_segment,
3236)
3337from think.streams import stream_name, update_stream, write_segment_stream
3434-from think.utils import day_path, now_ms, segment_path
3838+from think.utils import day_path, iter_segments, now_ms, segment_path
35393640from .utils import (
3741 append_history_record,
3842 find_segment_by_sha256,
4343+ get_hist_dir,
3944 get_observers_dir,
4045 list_observers,
4146 load_history,
···301306# === Ingest API (key-protected) ===
302307303308304304-@observer_bp.route("/ingest", methods=["POST"])
305305-@observer_bp.route("/ingest/<key>", methods=["POST"])
306306-def ingest_upload(key: str | None = None) -> Any:
307307- """Receive file uploads from observer.
309309+def _process_ingest_files(
310310+ observer: dict,
311311+ key_prefix: str,
312312+ segment: str,
313313+ day: str,
314314+ stream: str,
315315+ uploaded_files,
316316+ *,
317317+ source: str | None = None,
318318+) -> tuple[dict, int]:
319319+ """Shared ingest pipeline: read/hash files, dedup, deconflict, save, record history, update stats.
308320309309- Expects multipart form with:
310310- - segment: Segment key (HHMMSS_LEN)
311311- - day: Day string (YYYYMMDD)
312312- - files: One or more media files
313313- - host: (optional) Hostname of observer
314314- - platform: (optional) Platform of observer
315315- - meta: (optional) JSON-encoded metadata dict (facet, setting, etc.)
316316-317317- Writes files to journal and emits observe.observing event.
318318- Host/platform are merged into meta (meta values take precedence).
321321+ Parameters
322322+ ----------
323323+ observer : dict
324324+ Observer metadata dict (must include 'stats', 'name', 'last_seen', etc.)
325325+ key_prefix : str
326326+ First 8 chars of observer key.
327327+ segment : str
328328+ Requested segment key (HHMMSS_LEN format).
329329+ day : str
330330+ Day string (YYYYMMDD format).
331331+ stream : str
332332+ Stream name (already resolved by caller).
333333+ uploaded_files : list
334334+ List of Flask FileStorage objects from request.files.getlist("files").
335335+ source : str or None
336336+ If provided, added as "source" field to history record (e.g., "transfer").
319337320320- Returns status:
321321- - "ok": New segment accepted
322322- - "duplicate": All files already received (no processing triggered)
323323- - "collision": New segment saved with adjusted key (directory conflict)
338338+ Returns
339339+ -------
340340+ tuple of (dict, int)
341341+ Response body dict and HTTP status code.
324342 """
325325- # Extract key from Bearer header (primary) or URL path (legacy)
326326- auth_key = _get_key(key)
327327- if not auth_key:
328328- return jsonify({"error": "Authorization required"}), 401
329329-330330- # Validate key
331331- observer = load_observer(auth_key)
332332- if not observer:
333333- return jsonify({"error": "Invalid key"}), 401
334334-335335- if observer.get("revoked", False):
336336- return jsonify({"error": "Observer revoked"}), 403
337337-338338- if not observer.get("enabled", True):
339339- return jsonify({"error": "Observer disabled"}), 403
340340-341341- # Get segment, day, and host info from form
342342- segment = request.form.get("segment", "").strip()
343343- day = request.form.get("day", "").strip()
344344- host = request.form.get("host", "").strip()
345345- platform = request.form.get("platform", "").strip()
346346- meta_str = request.form.get("meta", "").strip()
347347-348348- # Parse meta JSON and merge host/platform (meta values take precedence)
349349- meta: dict = {}
350350- if meta_str:
351351- try:
352352- meta = json.loads(meta_str)
353353- except json.JSONDecodeError:
354354- logger.warning(f"Invalid meta JSON from observer: {meta_str[:100]}")
355355- if host and "host" not in meta:
356356- meta["host"] = host
357357- if platform and "platform" not in meta:
358358- meta["platform"] = platform
359359-360360- # Warn if client hostname differs from registered observer name
361361- effective_host = meta.get("host", host)
362362- observer_name = observer.get("name", "")
363363- if effective_host and effective_host != observer_name:
364364- logger.warning(
365365- f"Observer '{observer_name}' ({auth_key[:8]}) connecting from host "
366366- f"'{effective_host}' — hostname differs from registered name. "
367367- f"Use `sol observer rename` to update if the host was renamed."
368368- )
369369-370370- if not segment:
371371- return jsonify({"error": "Missing segment"}), 400
372372- if not day:
373373- return jsonify({"error": "Missing day"}), 400
374374-375375- # Validate segment format (HHMMSS_LEN)
376376- if not re.match(r"^\d{6}_\d+$", segment):
377377- return jsonify({"error": "Invalid segment format"}), 400
378378-379379- # Validate day format (YYYYMMDD)
380380- if not re.match(r"^\d{8}$", day):
381381- return jsonify({"error": "Invalid day format"}), 400
382382-383383- # Get uploaded files
384384- files = request.files.getlist("files")
385385- if not files:
386386- return jsonify({"error": "No files uploaded"}), 400
387387-388388- key_prefix = auth_key[:8]
389389-390343 # Read file contents into memory and compute SHA256 before saving
391344 # This allows duplicate detection without writing to disk
392345 file_data = [] # List of (submitted_filename, simple_filename, content, sha256)
393393- for upload in files:
346346+ for upload in uploaded_files:
394347 if not upload.filename:
395348 continue
396349···411364 file_data.append((submitted_filename, simple_filename, content, sha256))
412365413366 if not file_data:
414414- return jsonify({"error": "No valid files uploaded"}), 400
367367+ return {"error": "No valid files uploaded"}, 400
415368416369 # Check for duplicate submission by SHA256
417370 incoming_sha256s = {fd[3] for fd in file_data}
···420373 )
421374422375 if existing_segment:
423423- # Full duplicate - all files already exist in an existing segment
424376 logger.info(
425377 f"Duplicate segment rejected: {day}/{segment} from {observer.get('name')} "
426378 f"(matches existing {existing_segment})"
427379 )
428380429429- # Update last_seen and increment duplicates_rejected stat
430381 observer["last_seen"] = now_ms()
431382 observer["stats"]["duplicates_rejected"] = (
432383 observer["stats"].get("duplicates_rejected", 0) + 1
433384 )
434385 save_observer(observer)
435386436436- return jsonify(
387387+ return (
437388 {
438389 "status": "duplicate",
439390 "existing_segment": existing_segment,
440391 "message": "All files already received",
441441- }
392392+ },
393393+ 200,
442394 )
443395444444- # Log partial match context if some files already exist
445396 partial_match = bool(matched_sha256s)
446397447398 # Ensure day directory exists
448399 day_dir = day_path(day)
449400 day_dir.mkdir(parents=True, exist_ok=True)
450401451451- # Determine stream name: trust client-provided stream in meta if valid,
452452- # otherwise derive from observer registration name.
453453- # Deriving from observer name via stream_name(observer=...) calls _strip_hostname,
454454- # which strips qualifiers like ".tmux" — so "fedora.tmux" becomes "fedora",
455455- # colliding both observers into one stream.
456456- client_stream = meta.get("stream", "").strip()
457457- observer_name = observer.get("name", "unknown")
458458- if client_stream and re.match(r"^[a-z0-9][a-z0-9._-]*$", client_stream):
459459- stream = client_stream
460460- else:
461461- stream = stream_name(observer=observer_name)
462462-463402 # Find available segment key within the stream directory
464403 stream_dir = day_dir / stream
465404 stream_dir.mkdir(parents=True, exist_ok=True)
···468407 available_segment = find_available_segment(stream_dir, segment)
469408470409 if available_segment is None:
471471- # Exhausted attempts, save to failed directory
472410 logger.error(
473411 f"No available segment slot for {day}/{stream}/{segment} from "
474474- f"{observer_name} after {MAX_SEGMENT_ATTEMPTS} attempts"
412412+ f"{observer.get('name', 'unknown')} after {MAX_SEGMENT_ATTEMPTS} attempts"
475413 )
476414 failed_dir = _save_to_failed(day_dir, file_data, segment)
477415 return (
478478- jsonify(
479479- {
480480- "status": "failed",
481481- "error": f"No available segment slot after {MAX_SEGMENT_ATTEMPTS} attempts",
482482- "failed_path": str(failed_dir.relative_to(day_dir.parent)),
483483- }
484484- ),
416416+ {
417417+ "status": "failed",
418418+ "error": f"No available segment slot after {MAX_SEGMENT_ATTEMPTS} attempts",
419419+ "failed_path": str(failed_dir.relative_to(day_dir.parent)),
420420+ },
485421 507,
486486- ) # Insufficient Storage
422422+ )
487423488424 segment = available_segment
489425 if segment != original_segment:
490426 logger.info(
491427 f"Segment collision resolved: {original_segment} -> {segment} "
492492- f"for observer {observer_name}"
428428+ f"for observer {observer.get('name', 'unknown')}"
493429 )
494430495431 # Create segment directory for files (under stream)
···526462 logger.info(f"Saved {simple_filename} to {segment_dir}")
527463 except OSError as e:
528464 logger.error(f"Failed to save {simple_filename}: {e}")
529529- return jsonify({"error": f"Failed to save {simple_filename}"}), 500
465465+ return {"error": f"Failed to save {simple_filename}"}, 500
530466531467 if not saved_files:
532532- return jsonify({"error": "No valid files saved"}), 400
468468+ return {"error": "No valid files saved"}, 400
533469534534- # Write sync history record
535470 sync_record = {
536471 "ts": now_ms(),
537472 "segment": segment,
···541476 if segment != original_segment:
542477 sync_record["segment_original"] = original_segment
543478 if partial_match:
544544- # Log which SHA256s matched existing files (for debugging/audit)
545479 sync_record["partial_match_sha256s"] = list(matched_sha256s)
480480+ if source:
481481+ sync_record["source"] = source
546482 append_history_record(key_prefix, day, sync_record)
547483548548- # Update observer stats
549484 observer["last_seen"] = now_ms()
550485 observer["last_segment"] = segment
551486 observer["stats"]["segments_received"] = (
···556491 )
557492 save_observer(observer)
558493494494+ status = "collision" if segment != original_segment else "ok"
495495+ return {
496496+ "status": status,
497497+ "segment": segment,
498498+ "files": saved_files,
499499+ "bytes": total_bytes,
500500+ }, 200
501501+502502+503503+@observer_bp.route("/ingest", methods=["POST"])
504504+@observer_bp.route("/ingest/<key>", methods=["POST"])
505505+def ingest_upload(key: str | None = None) -> Any:
506506+ """Receive file uploads from observer.
507507+508508+ Expects multipart form with:
509509+ - segment: Segment key (HHMMSS_LEN)
510510+ - day: Day string (YYYYMMDD)
511511+ - files: One or more media files
512512+ - host: (optional) Hostname of observer
513513+ - platform: (optional) Platform of observer
514514+ - meta: (optional) JSON-encoded metadata dict (facet, setting, etc.)
515515+516516+ Writes files to journal and emits observe.observing event.
517517+ Host/platform are merged into meta (meta values take precedence).
518518+519519+ Returns status:
520520+ - "ok": New segment accepted
521521+ - "duplicate": All files already received (no processing triggered)
522522+ - "collision": New segment saved with adjusted key (directory conflict)
523523+ """
524524+ # Extract key from Bearer header (primary) or URL path (legacy)
525525+ auth_key = _get_key(key)
526526+ if not auth_key:
527527+ return jsonify({"error": "Authorization required"}), 401
528528+529529+ # Validate key
530530+ observer = load_observer(auth_key)
531531+ if not observer:
532532+ return jsonify({"error": "Invalid key"}), 401
533533+534534+ if observer.get("revoked", False):
535535+ return jsonify({"error": "Observer revoked"}), 403
536536+537537+ if not observer.get("enabled", True):
538538+ return jsonify({"error": "Observer disabled"}), 403
539539+540540+ # Get segment, day, and host info from form
541541+ segment = request.form.get("segment", "").strip()
542542+ day = request.form.get("day", "").strip()
543543+ host = request.form.get("host", "").strip()
544544+ platform = request.form.get("platform", "").strip()
545545+ meta_str = request.form.get("meta", "").strip()
546546+547547+ # Parse meta JSON and merge host/platform (meta values take precedence)
548548+ meta: dict = {}
549549+ if meta_str:
550550+ try:
551551+ meta = json.loads(meta_str)
552552+ except json.JSONDecodeError:
553553+ logger.warning(f"Invalid meta JSON from observer: {meta_str[:100]}")
554554+ if host and "host" not in meta:
555555+ meta["host"] = host
556556+ if platform and "platform" not in meta:
557557+ meta["platform"] = platform
558558+559559+ # Warn if client hostname differs from registered observer name
560560+ effective_host = meta.get("host", host)
561561+ observer_name = observer.get("name", "")
562562+ if effective_host and effective_host != observer_name:
563563+ logger.warning(
564564+ f"Observer '{observer_name}' ({auth_key[:8]}) connecting from host "
565565+ f"'{effective_host}' — hostname differs from registered name. "
566566+ f"Use `sol observer rename` to update if the host was renamed."
567567+ )
568568+569569+ if not segment:
570570+ return jsonify({"error": "Missing segment"}), 400
571571+ if not day:
572572+ return jsonify({"error": "Missing day"}), 400
573573+574574+ # Validate segment format (HHMMSS_LEN)
575575+ if not re.match(r"^\d{6}_\d+$", segment):
576576+ return jsonify({"error": "Invalid segment format"}), 400
577577+578578+ # Validate day format (YYYYMMDD)
579579+ if not re.match(r"^\d{8}$", day):
580580+ return jsonify({"error": "Invalid day format"}), 400
581581+582582+ # Get uploaded files
583583+ files = request.files.getlist("files")
584584+ if not files:
585585+ return jsonify({"error": "No files uploaded"}), 400
586586+587587+ key_prefix = auth_key[:8]
588588+589589+ # Determine stream name: trust client-provided stream in meta if valid,
590590+ # otherwise derive from observer registration name.
591591+ # Deriving from observer name via stream_name(observer=...) calls _strip_hostname,
592592+ # which strips qualifiers like ".tmux" — so "fedora.tmux" becomes "fedora",
593593+ # colliding both observers into one stream.
594594+ client_stream = meta.get("stream", "").strip()
595595+ observer_name = observer.get("name", "unknown")
596596+ if client_stream and re.match(r"^[a-z0-9][a-z0-9._-]*$", client_stream):
597597+ stream = client_stream
598598+ else:
599599+ stream = stream_name(observer=observer_name)
600600+601601+ body, status = _process_ingest_files(
602602+ observer, key_prefix, segment, day, stream, files
603603+ )
604604+ if status != 200 or body.get("status") == "duplicate":
605605+ return jsonify(body), status
606606+607607+ segment = body["segment"]
608608+ saved_files = body["files"]
609609+ segment_dir = segment_path(day, segment, stream)
610610+559611 # Write stream identity for this segment
560612 try:
561613 result = update_stream(stream, day, segment, type="observer")
···588640 logger.info(
589641 f"Received {len(saved_files)} files for {day}/{segment} from {observer.get('name')}"
590642 )
643643+ return jsonify(body), status
591644592592- # Determine response status
593593- if segment != original_segment:
594594- status = "collision"
595595- else:
596596- status = "ok"
645645+646646+@observer_bp.route("/ingest/<key>/transfer", methods=["POST"])
647647+def ingest_transfer(key: str) -> Any:
648648+ """Receive transferred file uploads from another solstone instance."""
649649+ auth_key = _get_key(key)
650650+ if not auth_key:
651651+ return jsonify({"error": "Authorization required"}), 401
652652+653653+ observer = load_observer(auth_key)
654654+ if not observer:
655655+ return jsonify({"error": "Invalid key"}), 401
656656+657657+ if observer.get("revoked", False):
658658+ return jsonify({"error": "Observer revoked"}), 403
659659+660660+ if not observer.get("enabled", True):
661661+ return jsonify({"error": "Observer disabled"}), 403
662662+663663+ segment = request.form.get("segment", "").strip()
664664+ day = request.form.get("day", "").strip()
665665+ stream = request.form.get("stream", "").strip()
666666+ host = request.form.get("host", "").strip()
667667+ platform_name = request.form.get("platform", "").strip()
668668+ meta_str = request.form.get("meta", "").strip()
669669+670670+ meta: dict = {}
671671+ if meta_str:
672672+ try:
673673+ meta = json.loads(meta_str)
674674+ except json.JSONDecodeError:
675675+ logger.warning(f"Invalid meta JSON from observer: {meta_str[:100]}")
676676+ if host and "host" not in meta:
677677+ meta["host"] = host
678678+ if platform_name and "platform" not in meta:
679679+ meta["platform"] = platform_name
597680598598- return jsonify(
599599- {
600600- "status": status,
601601- "segment": segment,
602602- "files": saved_files,
603603- "bytes": total_bytes,
604604- }
681681+ if not segment:
682682+ return jsonify({"error": "Missing segment"}), 400
683683+ if not day:
684684+ return jsonify({"error": "Missing day"}), 400
685685+ if not stream:
686686+ return jsonify({"error": "Missing stream"}), 400
687687+ if not re.match(r"^\d{6}_\d+$", segment):
688688+ return jsonify({"error": "Invalid segment format"}), 400
689689+ if not re.match(r"^\d{8}$", day):
690690+ return jsonify({"error": "Invalid day format"}), 400
691691+ if not re.match(r"^[a-z0-9][a-z0-9._-]*$", stream):
692692+ return jsonify({"error": "Invalid stream format"}), 400
693693+694694+ files = request.files.getlist("files")
695695+ if not files:
696696+ return jsonify({"error": "No files uploaded"}), 400
697697+698698+ key_prefix = auth_key[:8]
699699+ body, status = _process_ingest_files(
700700+ observer,
701701+ key_prefix,
702702+ segment,
703703+ day,
704704+ stream,
705705+ files,
706706+ source="transfer",
605707 )
708708+ if status != 200 or body.get("status") == "duplicate":
709709+ return jsonify(body), status
710710+711711+ observer_name = observer.get("name", "")
712712+ event_fields: dict[str, Any] = {
713713+ "segment": body["segment"],
714714+ "day": day,
715715+ "files": body["files"],
716716+ "observer": observer_name,
717717+ "stream": stream,
718718+ }
719719+ if meta:
720720+ event_fields["meta"] = meta
721721+ emit("observe", "transferred", **event_fields)
722722+723723+ return jsonify(body), status
724724+725725+726726+@observer_bp.route("/ingest/<key>/manifest", methods=["GET"])
727727+def ingest_manifest(key: str) -> Any:
728728+ """List available manifest days for an observer."""
729729+ auth_key = _get_key(key)
730730+ if not auth_key:
731731+ return jsonify({"error": "Authorization required"}), 401
732732+733733+ observer = load_observer(auth_key)
734734+ if not observer:
735735+ return jsonify({"error": "Invalid key"}), 401
736736+737737+ if observer.get("revoked", False):
738738+ return jsonify({"error": "Observer revoked"}), 403
739739+740740+ if not observer.get("enabled", True):
741741+ return jsonify({"error": "Observer disabled"}), 403
742742+743743+ key_prefix = auth_key[:8]
744744+ hist_dir = get_hist_dir(key_prefix, ensure_exists=False)
745745+ if not hist_dir.exists():
746746+ return jsonify({"days": {}})
747747+748748+ days: dict[str, dict[str, int]] = {}
749749+ for hist_path in sorted(hist_dir.glob("*.jsonl")):
750750+ records = load_history(key_prefix, hist_path.stem)
751751+ segments = {
752752+ record.get("segment", "")
753753+ for record in records
754754+ if not record.get("type") and record.get("segment")
755755+ }
756756+ days[hist_path.stem] = {"segments": len(segments)}
757757+758758+ return jsonify({"days": days})
759759+760760+761761+@observer_bp.route("/ingest/<key>/manifest/<day>", methods=["GET"])
762762+def ingest_manifest_day(key: str, day: str) -> Any:
763763+ """Return a transfer manifest for all segments on a given day."""
764764+ auth_key = _get_key(key)
765765+ if not auth_key:
766766+ return jsonify({"error": "Authorization required"}), 401
767767+768768+ observer = load_observer(auth_key)
769769+ if not observer:
770770+ return jsonify({"error": "Invalid key"}), 401
771771+772772+ if observer.get("revoked", False):
773773+ return jsonify({"error": "Observer revoked"}), 403
774774+775775+ if not observer.get("enabled", True):
776776+ return jsonify({"error": "Observer disabled"}), 403
777777+778778+ if not re.match(r"^\d{8}$", day):
779779+ return jsonify({"error": "Invalid day format"}), 400
780780+781781+ manifest = {
782782+ "version": 1,
783783+ "day": day,
784784+ "created_at": now_ms(),
785785+ "host": platform.node() or "unknown",
786786+ "segments": {},
787787+ }
788788+789789+ for stream, seg_key, seg_path in iter_segments(day):
790790+ arc_key = f"{stream}/{seg_key}"
791791+ files = []
792792+ for file_path in sorted(seg_path.iterdir()):
793793+ if file_path.is_file():
794794+ files.append(
795795+ {
796796+ "name": file_path.name,
797797+ "sha256": compute_file_sha256(file_path),
798798+ "size": file_path.stat().st_size,
799799+ }
800800+ )
801801+ manifest["segments"][arc_key] = {"files": files}
802802+803803+ return jsonify(manifest)
606804607805608806@observer_bp.route("/ingest/event", methods=["POST"])
+54-3
apps/observer/tests/test_events.py
···1010import pytest
11111212from apps.events import EventContext
1313-from apps.observer.events import handle_observed
1313+from apps.observer.events import handle_observed, handle_transferred
141415151616@pytest.fixture
···7575 handle_observed(ctx)
76767777 # Check history was written
7878- hist_path = observer_journal.observers_dir / "testkey1" / "hist" / "20250103.jsonl"
7878+ hist_path = (
7979+ observer_journal.observers_dir / "testkey1" / "hist" / "20250103.jsonl"
8080+ )
7981 assert hist_path.exists()
80828183 with open(hist_path) as f:
···108110 handle_observed(ctx)
109111110112 # Check all records written
111111- hist_path = observer_journal.observers_dir / "testkey1" / "hist" / "20250103.jsonl"
113113+ hist_path = (
114114+ observer_journal.observers_dir / "testkey1" / "hist" / "20250103.jsonl"
115115+ )
112116 with open(hist_path) as f:
113117 lines = f.readlines()
114118···204208 # No history should be created
205209 hist_dir = observer_journal.observers_dir / "testkey1" / "hist"
206210 assert not hist_dir.exists()
211211+212212+ def test_handle_transferred(self, observer_journal, monkeypatch):
213213+ """Handler records transferred status, stats, and queues rescan."""
214214+ import think.callosum as callosum_module
215215+216216+ calls = []
217217+ monkeypatch.setattr(
218218+ callosum_module,
219219+ "callosum_send",
220220+ lambda *a, **kw: calls.append((a, kw)) or True,
221221+ )
222222+223223+ ctx = EventContext(
224224+ msg={
225225+ "tract": "observe",
226226+ "event": "transferred",
227227+ "observer": "test-observer",
228228+ "segment": "120000_300",
229229+ "day": "20250103",
230230+ },
231231+ app="observer",
232232+ tract="observe",
233233+ event="transferred",
234234+ )
235235+236236+ handle_transferred(ctx)
237237+238238+ hist_path = (
239239+ observer_journal.observers_dir / "testkey1" / "hist" / "20250103.jsonl"
240240+ )
241241+ assert hist_path.exists()
242242+ with open(hist_path) as f:
243243+ record = json.loads(f.readline())
244244+245245+ assert record["type"] == "transferred"
246246+ assert record["segment"] == "120000_300"
247247+248248+ with open(observer_journal.observer_path) as f:
249249+ data = json.load(f)
250250+ assert data["stats"]["segments_transferred"] == 1
251251+252252+ assert calls == [
253253+ (
254254+ ("supervisor", "request"),
255255+ {"cmd": ["sol", "indexer", "--rescan"]},
256256+ )
257257+ ]