personal memory agent
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

Propagate remote observer context through processing pipeline

When segments are uploaded from remote observers, the remote name is now:
- Tracked in sense.py segment state
- Passed to handlers via REMOTE_NAME env var
- Included in JSONL metadata headers (transcribe/describe output)
- Included in observe.observed events for downstream consumers

Also consolidates HOST/PLATFORM constants in observe/remote.py (used by
all platform observers), adds batch flag to observed events, and passes
SEGMENT_KEY via env instead of handlers deriving it redundantly.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

+223 -61
+16 -10
apps/remote/routes.py
··· 24 24 from werkzeug.utils import secure_filename 25 25 26 26 from apps.utils import get_app_storage_path 27 - from convey import emit, state 27 + from convey import emit 28 28 from think.utils import day_path 29 29 30 30 logger = logging.getLogger(__name__) ··· 220 220 if not remote.get("enabled", True): 221 221 return jsonify({"error": "Remote disabled"}), 403 222 222 223 - # Get segment and day from form 223 + # Get segment, day, and host info from form 224 224 segment = request.form.get("segment", "").strip() 225 225 day = request.form.get("day", "").strip() 226 + host = request.form.get("host", "").strip() 227 + platform = request.form.get("platform", "").strip() 226 228 227 229 if not segment: 228 230 return jsonify({"error": "Missing segment"}), 400 ··· 286 288 _save_remote(remote) 287 289 288 290 # Emit observe.observing event to local Callosum 289 - emit( 290 - "observe", 291 - "observing", 292 - segment=segment, 293 - day=day, 294 - files=saved_files, 295 - remote=remote.get("name", "unknown"), 296 - ) 291 + # Include host/platform from remote observer if provided 292 + event_fields = { 293 + "segment": segment, 294 + "day": day, 295 + "files": saved_files, 296 + "remote": remote.get("name", "unknown"), 297 + } 298 + if host: 299 + event_fields["host"] = host 300 + if platform: 301 + event_fields["platform"] = platform 302 + emit("observe", "observing", **event_fields) 297 303 298 304 logger.info( 299 305 f"Received {len(saved_files)} files for {day}/{segment} from {remote.get('name')}"
+6 -1
apps/remote/tests/test_client.py
··· 89 89 # Check the call arguments 90 90 call_args = mock_session.post.call_args 91 91 assert call_args[0][0] == "https://server/ingest/key" 92 - assert call_args[1]["data"] == {"day": "20250103", "segment": "120000_300"} 92 + # Verify required fields (host/platform are also sent but vary by machine) 93 + data = call_args[1]["data"] 94 + assert data["day"] == "20250103" 95 + assert data["segment"] == "120000_300" 96 + assert "host" in data 97 + assert "platform" in data 93 98 94 99 95 100 def test_upload_segment_retry_on_failure(mock_session, tmp_path):
+3 -1
docs/CALLOSUM.md
··· 61 61 - `screencast.files_growing` - Whether recording files are actively being written 62 62 - From `sense.py`: `describe`, `transcribe` - Processing pipeline state (with `running`/`queued` sub-fields) 63 63 - `observing`: `day`, `segment`, `files` - Recording window boundary crossed with saved files 64 + - Remote events include `remote` (remote name) from `apps/remote/routes.py` 64 65 - `detected`: `file`, `handler`, `ref` - File detected and handler spawned 65 66 - `described`/`transcribed`: `input`, `output`, `duration_ms` - Processing complete 66 - - `observed`: `segment`, `duration` - All files for segment fully processed 67 + - `observed`: `day`, `segment`, `duration` - All files for segment fully processed 68 + - Batch mode (--day) events include `batch=true` to indicate non-live origin 67 69 - Observer events (`status`, `observing`) include `host` (hostname) and `platform` ("linux"/"darwin") for multi-host support 68 70 **Purpose:** Track observation pipeline from live capture state through processing completion 69 71 **Health Model:** Fail-fast - observers exit if capture stalls (e.g., files not growing). Supervisor checks event freshness only.
+9 -2
observe/describe.py
··· 392 392 393 393 suffix = extract_descriptive_suffix(self.video_path.stem) 394 394 metadata = {"raw": f"{suffix}{self.video_path.suffix}"} 395 + 396 + # Add remote origin if set (from sense.py for remote observer uploads) 397 + remote = os.getenv("REMOTE_NAME") 398 + if remote: 399 + metadata["remote"] = remote 400 + 395 401 output_file.write(json.dumps(metadata) + "\n") 396 402 output_file.flush() 397 403 ··· 760 766 except ValueError as exc: 761 767 parser.error(str(exc)) 762 768 763 - # Set segment key for token usage logging 764 - os.environ["SEGMENT_KEY"] = segment 769 + # Use segment from env (set by sense.py) or use derived value 770 + if not os.getenv("SEGMENT_KEY"): 771 + os.environ["SEGMENT_KEY"] = segment 765 772 766 773 segment_dir = video_path.parent / segment 767 774 segment_dir.mkdir(exist_ok=True)
+7 -11
observe/linux/observer.py
··· 22 22 import logging 23 23 import os 24 24 import signal 25 - import socket 26 25 import sys 27 26 import time 28 27 ··· 38 37 from observe.hear import AudioRecorder 39 38 from observe.linux.audio import is_sink_muted 40 39 from observe.linux.screencast import Screencaster, StreamInfo 41 - from observe.remote import RemoteClient 40 + from observe.remote import HOST, PLATFORM, RemoteClient 42 41 from observe.tmux.capture import TmuxCapture, write_captures_jsonl 43 42 from think.callosum import CallosumConnection 44 43 from think.utils import day_path, setup_cli ··· 52 51 CHUNK_DURATION = 5 # seconds 53 52 STALL_THRESHOLD_CHUNKS = 3 # Exit after this many chunks with no file growth 54 53 55 - # Host identification for multi-host scenarios 56 - _HOST = socket.gethostname() 57 - _PLATFORM = "linux" 58 54 59 55 # Capture modes 60 56 MODE_IDLE = "idle" ··· 380 376 day=date_part, 381 377 segment=segment, 382 378 files=files, 383 - host=_HOST, 384 - platform=_PLATFORM, 379 + host=HOST, 380 + platform=PLATFORM, 385 381 ) 386 382 logger.info(f"Segment observing: {segment} ({len(files)} files)") 387 383 ··· 516 512 tmux=tmux_info, 517 513 audio=audio_info, 518 514 activity=activity_info, 519 - host=_HOST, 520 - platform=_PLATFORM, 515 + host=HOST, 516 + platform=PLATFORM, 521 517 ) 522 518 elif self.callosum: 523 519 self.callosum.emit( ··· 528 524 tmux=tmux_info, 529 525 audio=audio_info, 530 526 activity=activity_info, 531 - host=_HOST, 532 - platform=_PLATFORM, 527 + host=HOST, 528 + platform=PLATFORM, 533 529 ) 534 530 535 531 def finalize_screencast(self, temp_path: str, final_path: str):
+5 -9
observe/macos/observer.py
··· 16 16 import os 17 17 import shutil 18 18 import signal 19 - import socket 20 19 import sys 21 20 import time 22 21 ··· 30 29 is_screen_locked, 31 30 ) 32 31 from observe.macos.screencapture import AudioInfo, DisplayInfo, ScreenCaptureKitManager 32 + from observe.remote import HOST, PLATFORM 33 33 from think.callosum import CallosumConnection 34 34 from think.utils import day_path, setup_cli 35 35 ··· 42 42 MIN_HITS_FOR_SAVE = 3 43 43 SAMPLE_RATE = 48000 # Standard audio sample rate 44 44 STALL_THRESHOLD_CHUNKS = 3 # Exit after this many chunks with no file growth 45 - 46 - # Host identification for multi-host scenarios 47 - _HOST = socket.gethostname() 48 - _PLATFORM = "darwin" 49 45 50 46 51 47 class MacOSObserver: ··· 295 291 day=date_part, 296 292 segment=segment, 297 293 files=saved_files, 298 - host=_HOST, 299 - platform=_PLATFORM, 294 + host=HOST, 295 + platform=PLATFORM, 300 296 ) 301 297 logger.info(f"Segment observing: {segment} ({len(saved_files)} files)") 302 298 ··· 415 411 tmux=tmux_info, 416 412 audio=audio_info, 417 413 activity=activity_info, 418 - host=_HOST, 419 - platform=_PLATFORM, 414 + host=HOST, 415 + platform=PLATFORM, 420 416 ) 421 417 422 418 def finalize_screencast(self, temp_path: str, final_path: str):
+14 -2
observe/remote.py
··· 12 12 from __future__ import annotations 13 13 14 14 import logging 15 + import platform 15 16 import queue 17 + import socket 16 18 import threading 17 19 import time 18 20 from pathlib import Path ··· 20 22 import requests 21 23 22 24 logger = logging.getLogger(__name__) 25 + 26 + # Host identification (captured once at module load) 27 + # Exported for use by platform observers 28 + HOST = socket.gethostname() 29 + PLATFORM = platform.system().lower() # "linux", "darwin", "windows" 23 30 24 31 # Retry configuration 25 32 MAX_RETRIES = 3 ··· 155 162 logger.error("No valid files to upload") 156 163 return False 157 164 158 - # Send request 165 + # Send request with host/platform for event emission 159 166 response = self.session.post( 160 167 self.remote_url, 161 - data={"day": day, "segment": segment}, 168 + data={ 169 + "day": day, 170 + "segment": segment, 171 + "host": HOST, 172 + "platform": PLATFORM, 173 + }, 162 174 files=files_data, 163 175 timeout=UPLOAD_TIMEOUT, 164 176 )
+107 -22
observe/sense.py
··· 75 75 self.segment_files: Dict[str, set[Path]] = {} 76 76 # Track segment start times: {segment_key: start_timestamp} 77 77 self.segment_start_time: Dict[str, float] = {} 78 + # Track segment day: {segment_key: day_string} 79 + self.segment_day: Dict[str, str] = {} 80 + # Track batch origin: {segment_key: True} for segments from batch mode 81 + self.segment_batch: Dict[str, bool] = {} 82 + # Track remote origin: {segment_key: remote_name} for remote observer segments 83 + self.segment_remote: Dict[str, str] = {} 78 84 79 85 def register(self, pattern: str, handler_name: str, command: List[str]): 80 86 """ ··· 110 116 return handler_info 111 117 return None 112 118 113 - def _spawn_handler(self, file_path: Path, handler_name: str, command: List[str]): 114 - """Spawn a handler process for the file.""" 119 + def _spawn_handler( 120 + self, 121 + file_path: Path, 122 + handler_name: str, 123 + command: List[str], 124 + day: Optional[str] = None, 125 + batch: bool = False, 126 + segment: Optional[str] = None, 127 + remote: Optional[str] = None, 128 + ): 129 + """Spawn a handler process for the file. 130 + 131 + Args: 132 + file_path: Path to the file to process 133 + handler_name: Name of the handler (e.g., "describe", "transcribe") 134 + command: Command template with {file} placeholder 135 + day: Day string (YYYYMMDD), extracted from path if not provided 136 + batch: Whether this is from batch processing mode 137 + segment: Segment key for SEGMENT_KEY env var 138 + remote: Remote name for REMOTE_NAME env var 139 + """ 140 + # Extract day from path if not provided (journal_dir/YYYYMMDD/file.ext) 141 + if day is None: 142 + try: 143 + rel_path = file_path.relative_to(self.journal_dir) 144 + if len(rel_path.parts) >= 1: 145 + day = rel_path.parts[0] 146 + except ValueError: 147 + pass 148 + 149 + # Extract segment from filename if not provided 150 + if segment is None: 151 + from think.utils import segment_key as get_segment_key 152 + 153 + segment = get_segment_key(file_path.name) 154 + 115 155 with self.lock: 116 156 # Skip if already processing this file 117 157 if file_path in self.running: 118 158 logger.debug(f"File {file_path.name} already being processed") 119 159 return 120 160 121 - # Register file for segment tracking 122 - from think.utils import segment_key 123 - 124 - segment = segment_key(file_path.name) 161 + # Register file for segment tracking (segment already extracted above) 125 162 if segment: 126 163 if segment not in self.segment_files: 127 164 self.segment_files[segment] = set() 128 165 self.segment_start_time[segment] = time.time() 166 + if day: 167 + self.segment_day[segment] = day 168 + # Track batch origin for observed event 169 + if batch: 170 + self.segment_batch[segment] = True 129 171 self.segment_files[segment].add(file_path) 130 172 131 173 # Queue describe requests to ensure only one runs at a time ··· 170 212 # Use unified runner to spawn process with automatic logging 171 213 logger.info(f"Spawning {handler_name} for {file_path.name}: {' '.join(cmd)}") 172 214 215 + # Build environment with segment/remote context for handlers 216 + env = os.environ.copy() 217 + if segment: 218 + env["SEGMENT_KEY"] = segment 219 + if remote: 220 + env["REMOTE_NAME"] = remote 221 + 173 222 try: 174 - managed = RunnerManagedProcess.spawn(cmd, ref=ref, callosum=self.callosum) 223 + managed = RunnerManagedProcess.spawn( 224 + cmd, ref=ref, callosum=self.callosum, env=env 225 + ) 175 226 except RuntimeError as exc: 176 227 logger.error(str(exc)) 177 228 # Release describe lock if this was a describe handler ··· 258 309 if not self.segment_files[segment]: 259 310 # Calculate processing duration 260 311 duration = int(time.time() - self.segment_start_time[segment]) 312 + day = self.segment_day.get(segment) 313 + batch = self.segment_batch.get(segment, False) 314 + remote = self.segment_remote.get(segment) 261 315 262 316 if self.callosum: 263 - self.callosum.emit( 264 - "observe", 265 - "observed", 266 - segment=segment, 267 - duration=duration, 268 - ) 269 - logger.info(f"Segment fully observed: {segment} ({duration}s)") 317 + event_fields = { 318 + "segment": segment, 319 + "day": day, 320 + "duration": duration, 321 + } 322 + if batch: 323 + event_fields["batch"] = True 324 + if remote: 325 + event_fields["remote"] = remote 326 + self.callosum.emit("observe", "observed", **event_fields) 327 + logger.info( 328 + f"Segment fully observed: {day}/{segment} ({duration}s)" 329 + ) 270 330 271 331 # Cleanup 272 332 del self.segment_files[segment] 273 333 del self.segment_start_time[segment] 334 + if segment in self.segment_day: 335 + del self.segment_day[segment] 336 + if segment in self.segment_batch: 337 + del self.segment_batch[segment] 338 + if segment in self.segment_remote: 339 + del self.segment_remote[segment] 274 340 275 - def _handle_file(self, file_path: Path): 276 - """Route file to appropriate handler.""" 341 + def _handle_file( 342 + self, 343 + file_path: Path, 344 + segment: Optional[str] = None, 345 + remote: Optional[str] = None, 346 + ): 347 + """Route file to appropriate handler. 348 + 349 + Args: 350 + file_path: Path to the file to process 351 + segment: Optional segment key for SEGMENT_KEY env var 352 + remote: Optional remote name for REMOTE_NAME env var 353 + """ 277 354 if not file_path.exists(): 278 355 logger.warning(f"File not found, skipping: {file_path}") 279 356 return ··· 281 358 handler_info = self._match_pattern(file_path) 282 359 if handler_info: 283 360 handler_name, command = handler_info 284 - self._spawn_handler(file_path, handler_name, command) 361 + self._spawn_handler( 362 + file_path, handler_name, command, segment=segment, remote=remote 363 + ) 285 364 286 365 def _handle_callosum_message(self, message: Dict[str, Any]): 287 366 """Handle incoming Callosum messages, filtering for observe.observing events.""" ··· 295 374 day = message.get("day") 296 375 segment = message.get("segment") 297 376 files = message.get("files", []) 377 + remote = message.get("remote") # Optional: set for remote observer uploads 298 378 299 379 if not day or not segment or not files: 300 380 logger.warning( ··· 315 395 if segment not in self.segment_files: 316 396 self.segment_files[segment] = set() 317 397 self.segment_start_time[segment] = time.time() 398 + self.segment_day[segment] = day 399 + if remote: 400 + self.segment_remote[segment] = remote 318 401 for file_path in file_paths: 319 402 # Only track files that will be processed (match a pattern) 320 403 if self._match_pattern(file_path): 321 404 self.segment_files[segment].add(file_path) 322 405 323 - # Process each file 406 + # Process each file (pass segment context for env vars) 324 407 for file_path in file_paths: 325 - self._handle_file(file_path) 408 + self._handle_file(file_path, segment=segment, remote=remote) 326 409 327 410 def _emit_status(self): 328 411 """Emit observe.status event with current processing state (only when active).""" ··· 522 605 semaphore = threading.Semaphore(max_jobs) 523 606 completion_events = {} 524 607 525 - def process_with_limit(file_path, handler_name, command): 608 + def process_with_limit(file_path, handler_name, command, day): 526 609 """Process a single file with semaphore-controlled concurrency.""" 527 610 with semaphore: 528 - self._spawn_handler(file_path, handler_name, command) 611 + self._spawn_handler( 612 + file_path, handler_name, command, day=day, batch=True 613 + ) 529 614 # Wait for this specific file to complete 530 615 while file_path in self.running: 531 616 time.sleep(0.5) ··· 538 623 completion_events[file_path] = threading.Event() 539 624 thread = threading.Thread( 540 625 target=process_with_limit, 541 - args=(file_path, handler_name, command), 626 + args=(file_path, handler_name, command, day), 542 627 daemon=False, 543 628 ) 544 629 thread.start()
+8 -3
observe/transcribe.py
··· 469 469 470 470 metadata["raw"] = f"{suffix}.flac" 471 471 472 + # Add remote origin if set (from sense.py for remote observer uploads) 473 + remote = os.getenv("REMOTE_NAME") 474 + if remote: 475 + metadata["remote"] = remote 476 + 472 477 # Add diarization data if provided 473 478 if diarization_data: 474 479 metadata["diarization"] = { ··· 509 514 """ 510 515 start_time = time.time() 511 516 512 - # Set segment key for token usage logging 513 - segment = get_segment_key(raw_path) 514 - if segment: 517 + # Use segment from env (set by sense.py) or derive from path 518 + segment = os.getenv("SEGMENT_KEY") or get_segment_key(raw_path) 519 + if segment and not os.getenv("SEGMENT_KEY"): 515 520 os.environ["SEGMENT_KEY"] = segment 516 521 517 522 # Skip if already processed (unless redo mode)
+48
tests/test_sense.py
··· 346 346 assert audio_file in sensor.segment_files["143022_300"] 347 347 assert video_file in sensor.segment_files["143022_300"] 348 348 assert "143022_300" in sensor.segment_start_time 349 + assert sensor.segment_day["143022_300"] == "20250101" 349 350 350 351 351 352 def test_file_sensor_handle_callosum_message_ignores_other_events(tmp_path): ··· 383 384 384 385 # Should not call _handle_file 385 386 mock_handle.assert_not_called() 387 + 388 + 389 + def test_file_sensor_segment_observed_includes_day(tmp_path, mock_callosum): 390 + """Test that observe.observed event includes day field.""" 391 + from think.callosum import CallosumConnection 392 + 393 + # Create journal/day structure 394 + day_dir = tmp_path / "20250101" 395 + day_dir.mkdir() 396 + 397 + sensor = FileSensor(tmp_path) 398 + sensor.register("*.flac", "transcribe", ["echo", "{file}"]) 399 + 400 + # Set up callosum on sensor to capture emitted events 401 + emitted_events = [] 402 + sensor.callosum = CallosumConnection() 403 + sensor.callosum.start(callback=lambda msg: emitted_events.append(msg)) 404 + 405 + # Create test file 406 + audio_file = day_dir / "143022_300_audio.flac" 407 + audio_file.write_text("audio content") 408 + 409 + # Simulate observing event to set up segment tracking 410 + message = { 411 + "tract": "observe", 412 + "event": "observing", 413 + "day": "20250101", 414 + "segment": "143022_300", 415 + "files": ["143022_300_audio.flac"], 416 + } 417 + sensor._handle_callosum_message(message) 418 + 419 + # Wait for handler to complete 420 + time.sleep(0.5) 421 + 422 + # Check that segment_day was cleaned up (handler completed) 423 + assert "143022_300" not in sensor.segment_day 424 + 425 + # Check observe.observed event was emitted with day field 426 + observed_events = [ 427 + e 428 + for e in emitted_events 429 + if e.get("tract") == "observe" and e.get("event") == "observed" 430 + ] 431 + assert len(observed_events) == 1 432 + assert observed_events[0].get("day") == "20250101" 433 + assert observed_events[0].get("segment") == "143022_300"