personal memory agent
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

Switch observe-sense from watchdog to Callosum events

Replace file-based detection with event-driven architecture. The sensor
now listens for observe.observing Callosum events from observers instead
of using watchdog to monitor directories.

- Add day field to observe.observing event in both observers
- Add _handle_callosum_message callback for processing events
- Pre-register segment tracking from event for robust completion detection
- Remove watchdog dependency and test mocks
- Batch mode (--day) unchanged, still uses file scanning

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

+221 -129
+3 -1
docs/CALLOSUM.md
··· 60 60 - From `observer.py`: `screencast`, `audio`, `activity` - Live capture state 61 61 - `screencast.files_growing` - Whether recording files are actively being written (used for health) 62 62 - From `sense.py`: `describe`, `transcribe` - Processing pipeline state (with `running`/`queued` sub-fields) 63 - - `observing`: `segment`, `files` - Recording window boundary crossed with saved files 63 + - `observing`: `day`, `segment`, `files` - Recording window boundary crossed with saved files 64 64 - `detected`: `file`, `handler`, `ref` - File detected and handler spawned 65 65 - `described`/`transcribed`: `input`, `output`, `duration_ms` - Processing complete 66 66 - `observed`: `segment`, `duration` - All files for segment fully processed ··· 99 99 100 100 **Client Library:** `think/callosum.py` `CallosumConnection` class 101 101 **Server:** `think/callosum.py` `CallosumServer` class 102 + 103 + **Convey Integration:** `convey.emit()` for non-blocking event emission from route handlers (uses shared bridge connection). See [APPS.md](APPS.md) for usage. 102 104 103 105 See code documentation for usage patterns and examples.
+80 -28
observe/linux/observer.py
··· 38 38 from observe.hear import AudioRecorder 39 39 from observe.linux.audio import is_sink_muted 40 40 from observe.linux.screencast import Screencaster, StreamInfo 41 + from observe.remote import RemoteClient 41 42 from observe.tmux.capture import TmuxCapture, write_captures_jsonl 42 43 from think.callosum import CallosumConnection 43 44 from think.utils import day_path, setup_cli ··· 63 64 class Observer: 64 65 """Unified audio and screencast/tmux observer.""" 65 66 66 - def __init__(self, interval: int = 300): 67 + def __init__(self, interval: int = 300, remote_url: str | None = None): 67 68 self.interval = interval 69 + self.remote_url = remote_url 68 70 self.audio_recorder = AudioRecorder() 69 71 self.screencaster = Screencaster() 70 72 self.tmux_capture = TmuxCapture() 71 73 self.bus: MessageBus | None = None 72 74 self.running = True 73 75 self.callosum: CallosumConnection | None = None 76 + self.remote_client: RemoteClient | None = None 74 77 75 78 # State tracking 76 79 self.start_at = time.time() # Wall-clock for filenames ··· 131 134 else: 132 135 logger.info("Tmux not available (will only use screencast)") 133 136 134 - # Start Callosum connection for status events 135 - self.callosum = CallosumConnection() 136 - self.callosum.start() 137 - logger.info("Callosum connection started") 137 + # Start Callosum connection for status events (or remote client) 138 + if self.remote_url: 139 + self.remote_client = RemoteClient(self.remote_url) 140 + self.remote_client.start() 141 + logger.info(f"Remote client started: {self.remote_url[:50]}...") 142 + else: 143 + self.callosum = CallosumConnection() 144 + self.callosum.start() 145 + logger.info("Callosum connection started") 138 146 139 147 return True 140 148 ··· 347 355 348 356 if files: 349 357 segment = f"{time_part}_{duration}" 350 - self.callosum.emit( 351 - "observe", 352 - "observing", 353 - segment=segment, 354 - files=files, 355 - host=_HOST, 356 - platform=_PLATFORM, 357 - ) 358 - logger.info(f"Segment observing: {segment} ({len(files)} files)") 358 + 359 + if self.remote_client: 360 + # Remote mode: upload files to remote server 361 + file_paths = [day_dir / f for f in files] 362 + if self.remote_client.upload_and_cleanup( 363 + date_part, segment, file_paths 364 + ): 365 + logger.info(f"Segment uploaded: {segment} ({len(files)} files)") 366 + else: 367 + logger.error( 368 + f"Segment upload failed: {segment} - files kept locally" 369 + ) 370 + elif self.callosum: 371 + # Local mode: emit to local Callosum 372 + self.callosum.emit( 373 + "observe", 374 + "observing", 375 + day=date_part, 376 + segment=segment, 377 + files=files, 378 + host=_HOST, 379 + platform=_PLATFORM, 380 + ) 381 + logger.info(f"Segment observing: {segment} ({len(files)} files)") 359 382 360 383 async def initialize_screencast(self) -> bool: 361 384 """ ··· 477 500 "tmux_active": self.cached_tmux_active, 478 501 } 479 502 480 - self.callosum.emit( 481 - "observe", 482 - "status", 483 - mode=self.current_mode, 484 - screencast=screencast_info, 485 - tmux=tmux_info, 486 - audio=audio_info, 487 - activity=activity_info, 488 - host=_HOST, 489 - platform=_PLATFORM, 490 - ) 503 + # Emit to remote or local Callosum 504 + if self.remote_client: 505 + self.remote_client.emit( 506 + "observe", 507 + "status", 508 + mode=self.current_mode, 509 + screencast=screencast_info, 510 + tmux=tmux_info, 511 + audio=audio_info, 512 + activity=activity_info, 513 + host=_HOST, 514 + platform=_PLATFORM, 515 + ) 516 + elif self.callosum: 517 + self.callosum.emit( 518 + "observe", 519 + "status", 520 + mode=self.current_mode, 521 + screencast=screencast_info, 522 + tmux=tmux_info, 523 + audio=audio_info, 524 + activity=activity_info, 525 + host=_HOST, 526 + platform=_PLATFORM, 527 + ) 491 528 492 529 def finalize_screencast(self, temp_path: str, final_path: str): 493 530 """ ··· 703 740 self.audio_recorder.stop_recording() 704 741 logger.info("Audio recording stopped") 705 742 706 - # Stop Callosum connection 707 - if self.callosum: 743 + # Stop Callosum or remote client 744 + if self.remote_client: 745 + self.remote_client.stop() 746 + logger.info("Remote client stopped") 747 + elif self.callosum: 708 748 self.callosum.stop() 709 749 logger.info("Callosum connection stopped") 710 750 711 751 712 752 async def async_main(args): 713 753 """Async entry point.""" 714 - observer = Observer(interval=args.interval) 754 + observer = Observer( 755 + interval=args.interval, 756 + remote_url=getattr(args, "remote", None), 757 + ) 715 758 716 759 # Setup signal handlers 717 760 loop = asyncio.get_running_loop() ··· 752 795 default=300, 753 796 help="Duration per screencast window in seconds (default: 300 = 5 minutes).", 754 797 ) 798 + parser.add_argument( 799 + "--remote", 800 + type=str, 801 + help="Remote server URL for uploading segments (e.g., https://server:5000/app/remote/ingest/KEY)", 802 + ) 755 803 args = setup_cli(parser) 756 804 757 805 # Verify journal path exists ··· 759 807 if not journal or not os.path.exists(journal): 760 808 logger.error(f"JOURNAL_PATH not set or does not exist: {journal}") 761 809 sys.exit(1) 810 + 811 + # Log remote mode if enabled 812 + if args.remote: 813 + logger.info(f"Remote mode enabled: {args.remote[:50]}...") 762 814 763 815 # Run async main 764 816 try:
+1
observe/macos/observer.py
··· 287 287 self.callosum.emit( 288 288 "observe", 289 289 "observing", 290 + day=date_part, 290 291 segment=segment, 291 292 files=saved_files, 292 293 host=_HOST,
+56 -58
observe/sense.py
··· 2 2 # SPDX-License-Identifier: AGPL-3.0-only 3 3 # Copyright (c) 2026 sol pbc 4 4 5 - """File-based processor dispatcher for observe subsystem. 5 + """Event-based processor dispatcher for observe subsystem. 6 6 7 - Watches day directories for new files and spawns appropriate handler processes, 8 - capturing their stdout/stderr to log files like supervisor.py does for runners. 7 + Listens for observe.observing Callosum events and spawns appropriate handler 8 + processes, capturing their stdout/stderr to log files like supervisor.py does 9 + for runners. Batch mode (--day) uses file-based scanning for historical days. 9 10 """ 10 11 11 12 from __future__ import annotations ··· 16 17 import subprocess 17 18 import threading 18 19 import time 19 - from datetime import datetime 20 20 from pathlib import Path 21 - from typing import Dict, List, Optional 22 - 23 - from watchdog.events import FileSystemEventHandler 24 - from watchdog.observers import Observer 21 + from typing import Any, Dict, List, Optional 25 22 26 23 from observe.utils import VIDEO_EXTENSIONS 27 24 from think.callosum import CallosumConnection ··· 47 44 48 45 49 46 class FileSensor: 50 - """Pattern-based file watcher that spawns handler processes.""" 47 + """Event-driven sensor that spawns handler processes for media files.""" 51 48 52 49 def __init__(self, journal_dir: Path, verbose: bool = False, debug: bool = False): 53 50 self.journal_dir = journal_dir ··· 66 63 self.describe_queue: List[tuple[Path, float]] = [] 67 64 self.current_describe_process: Optional[HandlerProcess] = None 68 65 69 - self.observer: Optional[Observer] = None 70 - self.current_day: Optional[str] = None 71 66 self.running_flag = True 72 67 73 - # Callosum connection for emitting detected events 68 + # Callosum connection for receiving events and emitting status 74 69 self.callosum: Optional[CallosumConnection] = None 75 70 76 71 # Track last status emission time ··· 279 274 280 275 def _handle_file(self, file_path: Path): 281 276 """Route file to appropriate handler.""" 282 - # Small delay to ensure file is fully written 283 - time.sleep(0.1) 284 - 285 277 if not file_path.exists(): 278 + logger.warning(f"File not found, skipping: {file_path}") 286 279 return 287 280 288 281 handler_info = self._match_pattern(file_path) ··· 290 283 handler_name, command = handler_info 291 284 self._spawn_handler(file_path, handler_name, command) 292 285 286 + def _handle_callosum_message(self, message: Dict[str, Any]): 287 + """Handle incoming Callosum messages, filtering for observe.observing events.""" 288 + tract = message.get("tract") 289 + event = message.get("event") 290 + 291 + if tract != "observe" or event != "observing": 292 + return 293 + 294 + # Extract event fields 295 + day = message.get("day") 296 + segment = message.get("segment") 297 + files = message.get("files", []) 298 + 299 + if not day or not segment or not files: 300 + logger.warning( 301 + f"Invalid observing event: missing day/segment/files: {message}" 302 + ) 303 + return 304 + 305 + logger.info(f"Received observing event: {day}/{segment} ({len(files)} files)") 306 + 307 + # Build full paths for all files in this segment 308 + day_dir = self.journal_dir / day 309 + file_paths = [day_dir / filename for filename in files] 310 + 311 + # Pre-register segment tracking with complete file list 312 + # This ensures segment completion is tracked correctly even if some files 313 + # don't match patterns or fail to process 314 + with self.lock: 315 + if segment not in self.segment_files: 316 + self.segment_files[segment] = set() 317 + self.segment_start_time[segment] = time.time() 318 + for file_path in file_paths: 319 + # Only track files that will be processed (match a pattern) 320 + if self._match_pattern(file_path): 321 + self.segment_files[segment].add(file_path) 322 + 323 + # Process each file 324 + for file_path in file_paths: 325 + self._handle_file(file_path) 326 + 293 327 def _emit_status(self): 294 328 """Emit observe.status event with current processing state (only when active).""" 295 329 if not self.callosum: ··· 376 410 self.callosum.emit("observe", "status", **status) 377 411 378 412 def start(self): 379 - """Start watching for new files with day rollover.""" 413 + """Start listening for observe.observing Callosum events.""" 380 414 381 - # Start Callosum connection for emitting detected events 415 + # Start Callosum connection with callback for receiving events 382 416 self.callosum = CallosumConnection() 383 - self.callosum.start() 384 - 385 - class SensorEventHandler(FileSystemEventHandler): 386 - def __init__(self, sensor): 387 - self.sensor = sensor 388 - 389 - def on_created(self, event): 390 - if not event.is_directory: 391 - path = Path(event.src_path) 392 - # Ignore hidden files (temp recordings) 393 - if not path.name.startswith("."): 394 - self.sensor._handle_file(path) 395 - 396 - def on_moved(self, event): 397 - if not event.is_directory: 398 - path = Path(event.dest_path) 399 - # Ignore hidden files (temp recordings) 400 - if not path.name.startswith("."): 401 - self.sensor._handle_file(path) 402 - 403 - event_handler = SensorEventHandler(self) 417 + self.callosum.start(callback=self._handle_callosum_message) 418 + logger.info("Listening for observe.observing events via Callosum") 404 419 405 420 while self.running_flag: 406 - today_str = datetime.now().strftime("%Y%m%d") 407 - day_dir = day_path() 408 - 409 - if day_dir.exists() and (self.current_day != today_str): 410 - if self.observer: 411 - logger.info("Day rollover, stopping old observer") 412 - self.observer.stop() 413 - self.observer.join() 414 - 415 - self.observer = Observer() 416 - self.observer.schedule(event_handler, str(day_dir), recursive=False) 417 - self.observer.start() 418 - self.current_day = today_str 419 - logger.info(f"Watching {day_dir}") 420 421 421 422 # Emit status every 5 seconds if there's activity 422 423 now = time.time() ··· 427 428 time.sleep(1) 428 429 429 430 def stop(self): 430 - """Stop watching and cleanup running processes.""" 431 + """Stop listening and cleanup running processes.""" 431 432 self.running_flag = False 432 - if self.observer: 433 - self.observer.stop() 434 - self.observer.join() 435 433 436 434 # Stop Callosum connection 437 435 if self.callosum: ··· 644 642 ) 645 643 sensor.process_day(args.day, max_jobs=args.jobs) 646 644 else: 647 - # Watch mode: monitor for new files 648 - logger.info("Starting observe sensor in watch mode...") 645 + # Event mode: listen for Callosum events 646 + logger.info("Starting observe sensor in event mode...") 649 647 try: 650 648 sensor.start() 651 649 except KeyboardInterrupt:
-1
pyproject.toml
··· 73 73 74 74 # Media processing 75 75 "opencv-python", 76 - "watchdog", 77 76 78 77 # Additional AI SDKs 79 78 "claude-agent-sdk>=0.1.0",
-37
tests/conftest.py
··· 199 199 sys.modules["soundfile"] = sf_mod 200 200 for name in [ 201 201 "noisereduce", 202 - "watchdog.events", 203 - "watchdog.observers", 204 202 ]: 205 203 if name not in sys.modules: 206 204 sys.modules[name] = types.ModuleType(name) 207 - if "watchdog.events" in sys.modules and not hasattr( 208 - sys.modules["watchdog.events"], "PatternMatchingEventHandler" 209 - ): 210 - 211 - class FileSystemEventHandler: 212 - pass 213 - 214 - class PatternMatchingEventHandler: 215 - pass 216 - 217 - sys.modules["watchdog.events"].FileSystemEventHandler = FileSystemEventHandler 218 - sys.modules["watchdog.events"].PatternMatchingEventHandler = ( 219 - PatternMatchingEventHandler 220 - ) 221 - if "watchdog.observers" in sys.modules and not hasattr( 222 - sys.modules["watchdog.observers"], "Observer" 223 - ): 224 - 225 - class Observer: 226 - def schedule(self, *a, **k): 227 - pass 228 - 229 - def start(self): 230 - pass 231 - 232 - def stop(self): 233 - pass 234 - 235 - def join(self, *a, **k): 236 - pass 237 - 238 - def is_alive(self): 239 - return False 240 - 241 - sys.modules["watchdog.observers"].Observer = Observer 242 205 243 206 244 207 @pytest.fixture
+81 -4
tests/test_sense.py
··· 298 298 with tempfile.TemporaryDirectory() as tmpdir: 299 299 sensor = FileSensor(Path(tmpdir)) 300 300 301 - # Mock observer 302 - sensor.observer = MagicMock() 301 + # Mock callosum 302 + sensor.callosum = MagicMock() 303 303 304 304 sensor.stop() 305 305 306 306 assert sensor.running_flag is False 307 - sensor.observer.stop.assert_called_once() 308 - sensor.observer.join.assert_called_once() 307 + sensor.callosum.stop.assert_called_once() 308 + 309 + 310 + def test_file_sensor_handle_callosum_message(tmp_path): 311 + """Test handling of observe.observing Callosum events.""" 312 + with patch.object(FileSensor, "_handle_file") as mock_handle: 313 + # Create journal/day structure 314 + day_dir = tmp_path / "20250101" 315 + day_dir.mkdir() 316 + 317 + sensor = FileSensor(tmp_path) 318 + sensor.register("*.flac", "transcribe", ["echo", "{file}"]) 319 + sensor.register("*.webm", "describe", ["echo", "{file}"]) 320 + 321 + # Create test files 322 + audio_file = day_dir / "143022_300_audio.flac" 323 + audio_file.write_text("audio content") 324 + video_file = day_dir / "143022_300_screen.webm" 325 + video_file.write_text("video content") 326 + 327 + # Simulate observing event 328 + message = { 329 + "tract": "observe", 330 + "event": "observing", 331 + "day": "20250101", 332 + "segment": "143022_300", 333 + "files": ["143022_300_audio.flac", "143022_300_screen.webm"], 334 + } 335 + 336 + sensor._handle_callosum_message(message) 337 + 338 + # Should have called _handle_file for each file 339 + assert mock_handle.call_count == 2 340 + called_paths = [call[0][0] for call in mock_handle.call_args_list] 341 + assert audio_file in called_paths 342 + assert video_file in called_paths 343 + 344 + # Should have pre-registered segment tracking 345 + assert "143022_300" in sensor.segment_files 346 + assert audio_file in sensor.segment_files["143022_300"] 347 + assert video_file in sensor.segment_files["143022_300"] 348 + assert "143022_300" in sensor.segment_start_time 349 + 350 + 351 + def test_file_sensor_handle_callosum_message_ignores_other_events(tmp_path): 352 + """Test that non-observing events are ignored.""" 353 + with patch.object(FileSensor, "_handle_file") as mock_handle: 354 + sensor = FileSensor(tmp_path) 355 + 356 + # Simulate a different event type 357 + message = { 358 + "tract": "observe", 359 + "event": "status", 360 + "some_data": "value", 361 + } 362 + 363 + sensor._handle_callosum_message(message) 364 + 365 + # Should not call _handle_file 366 + mock_handle.assert_not_called() 367 + 368 + 369 + def test_file_sensor_handle_callosum_message_invalid_event(tmp_path): 370 + """Test that invalid observing events are handled gracefully.""" 371 + with patch.object(FileSensor, "_handle_file") as mock_handle: 372 + sensor = FileSensor(tmp_path) 373 + 374 + # Simulate event missing required fields 375 + message = { 376 + "tract": "observe", 377 + "event": "observing", 378 + "segment": "143022_300", 379 + # missing 'day' and 'files' 380 + } 381 + 382 + sensor._handle_callosum_message(message) 383 + 384 + # Should not call _handle_file 385 + mock_handle.assert_not_called()