personal memory agent
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

Refactor observe segment handoff to use draft folder pattern

Observers now own segment directories: create HHMMSS_draft/ folders during
recording, write files with simple names (audio.flac, center_DP-3_screen.webm),
then atomically rename to HHMMSS_LEN/ on completion. This eliminates complex
file movement in transcribe/describe handlers.

Key changes:
- Linux/macOS observers: draft folder creation and atomic rename on boundary
- transcribe/describe: process files in-place, skip if output exists
- sense: expect files in segment dirs, removed SEGMENT_KEY env var
- remote ingest: create segment dirs, strip legacy prefixes from filenames
- Removed unused is_draft_segment() and macOS filename properties

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

+583 -618
+33 -55
observe/describe.py
··· 28 28 import av 29 29 from PIL import Image, ImageChops, ImageStat 30 30 31 - from observe.utils import segment_and_suffix 31 + from observe.utils import get_segment_key 32 32 from think.callosum import callosum_send 33 33 from think.utils import setup_cli 34 34 ··· 340 340 contents.append(image) 341 341 return contents 342 342 343 - def _move_to_segment(self, media_path: Path) -> Path: 344 - """Move media file to its segment and return new path.""" 345 - segment, suffix = segment_and_suffix(media_path) 346 - segment_dir = media_path.parent / segment 347 - try: 348 - segment_dir.mkdir(exist_ok=True) 349 - # Preserve the original extension 350 - ext = media_path.suffix 351 - new_path = segment_dir / f"{suffix}{ext}" 352 - media_path.rename(new_path) 353 - logger.info(f"Moved {media_path} to {segment_dir}") 354 - return new_path 355 - except Exception as exc: 356 - logger.error(f"Failed to move {media_path} to segment: {exc}") 357 - return media_path 358 - 359 343 async def process_with_vision( 360 344 self, 361 345 max_concurrent: int = 10, ··· 388 372 389 373 # Write metadata header to JSONL file with actual video filename 390 374 if output_file: 391 - from observe.utils import extract_descriptive_suffix 392 - 393 - suffix = extract_descriptive_suffix(self.video_path.stem) 394 - metadata = {"raw": f"{suffix}{self.video_path.suffix}"} 375 + # Files are in segment directories, filename is simple (e.g., center_DP-3_screen.webm) 376 + metadata = {"raw": self.video_path.name} 395 377 396 378 # Add remote origin if set (from sense.py for remote observer uploads) 397 379 remote = os.getenv("REMOTE_NAME") ··· 682 664 all_failed = total_frames > 0 and failed_frames == total_frames 683 665 684 666 if all_failed: 685 - # Don't move video to segment - leave for retry 667 + # Leave video for retry (already in segment dir) 686 668 error_detail = ( 687 669 f"Error details in {output_path}" if output_path else "No output file" 688 670 ) ··· 695 677 raise RuntimeError( 696 678 f"All {total_frames} frame(s) failed vision analysis after retries" 697 679 ) 698 - else: 699 - # At least some frames succeeded - move to segment 700 - if failed_frames > 0: 701 - logger.warning( 702 - f"{failed_frames}/{total_frames} frame(s) failed processing. " 703 - f"Moving video to segment anyway." 704 - ) 705 - if output_path: 706 - self._move_to_segment(self.video_path) 680 + elif failed_frames > 0: 681 + logger.warning( 682 + f"{failed_frames}/{total_frames} frame(s) failed processing." 683 + ) 707 684 708 685 # Clear qualified_frames to free memory 709 686 self.qualified_frames.clear() ··· 737 714 parser.add_argument( 738 715 "video_path", 739 716 type=str, 740 - help="Path to video file to process", 717 + help="Path to video file in segment directory", 741 718 ) 742 719 parser.add_argument( 743 720 "-j", ··· 751 728 action="store_true", 752 729 help="Only output frame metadata without vision analysis", 753 730 ) 731 + parser.add_argument( 732 + "--redo", 733 + action="store_true", 734 + help="Reprocess file, overwriting existing outputs", 735 + ) 754 736 args = setup_cli(parser) 755 737 756 738 video_path = Path(args.video_path) 757 739 if not video_path.exists(): 758 740 parser.error(f"Video file not found: {video_path}") 759 741 760 - # Determine output path and warn if overwriting 742 + # Files must be in segment directories (YYYYMMDD/HHMMSS_LEN/) 743 + segment = get_segment_key(video_path) 744 + if segment is None: 745 + parser.error( 746 + f"Video file must be in a segment directory (HHMMSS_LEN/), " 747 + f"but parent is: {video_path.parent.name}" 748 + ) 749 + 750 + # Determine output path 761 751 output_path = None 762 - segment = None 763 - suffix = None 764 752 if not args.frames_only: 765 - # Extract segment and suffix for output naming 766 - try: 767 - segment, suffix = segment_and_suffix(video_path) 768 - except ValueError as exc: 769 - parser.error(str(exc)) 753 + # Output JSONL in same directory, same stem (e.g., center_DP-3_screen.jsonl) 754 + output_path = video_path.with_suffix(".jsonl") 770 755 771 - # Use segment from env (set by sense.py) or use derived value 772 - if not os.getenv("SEGMENT_KEY"): 773 - os.environ["SEGMENT_KEY"] = segment 756 + # Skip if already processed (unless redo mode) 757 + if not args.redo and output_path.exists(): 758 + logger.info(f"Already processed: {video_path}") 759 + return 774 760 775 - segment_dir = video_path.parent / segment 776 - segment_dir.mkdir(exist_ok=True) 777 - # Output JSONL matches input filename pattern (e.g., center_DP-3_screen.jsonl) 778 - output_path = segment_dir / f"{suffix}.jsonl" 779 761 if output_path.exists(): 780 762 logger.warning(f"Overwriting existing analysis file: {output_path}") 781 763 ··· 800 782 # Emit completion event 801 783 if output_path and output_path.exists(): 802 784 journal_path = Path(os.getenv("JOURNAL_PATH", "")) 803 - # Moved path is in segment: YYYYMMDD/HHMMSS_LEN/suffix.webm 804 - moved_path = ( 805 - video_path.parent / segment / f"{suffix}{video_path.suffix}" 806 - ) 807 785 808 786 try: 809 - rel_input = moved_path.relative_to(journal_path) 787 + rel_input = video_path.relative_to(journal_path) 810 788 rel_output = output_path.relative_to(journal_path) 811 789 except ValueError: 812 - rel_input = moved_path 790 + rel_input = video_path 813 791 rel_output = output_path 814 792 815 793 duration_ms = int((time.time() - start_time) * 1000) 816 794 817 - # Extract day from video path (video_path.parent is day dir) 818 - day = video_path.parent.name 795 + # Extract day from video path (grandparent is day dir) 796 + day = video_path.parent.parent.name 819 797 820 798 event_fields = { 821 799 "input": str(rel_input),
+175 -147
observe/linux/observer.py
··· 81 81 # Mode tracking (replaces screencast_running boolean) 82 82 self.current_mode = MODE_IDLE 83 83 84 + # Draft folder for current segment (HHMMSS_draft/) 85 + self.draft_dir: str | None = None 86 + 84 87 # Multi-file screencast tracking 85 88 self.current_streams: list[StreamInfo] = [] 86 - self.pending_finalizations: list[tuple[str, str]] | None = None 87 89 self.last_screencast_sizes: dict[str, int] = {} 88 90 89 91 # Tmux capture tracking ··· 212 214 time_part = dt.strftime("%H%M%S") 213 215 return date_part, time_part 214 216 215 - def _save_audio_segment( 216 - self, day_dir, time_part: str, duration: int, is_muted: bool 217 - ) -> list[str]: 217 + def _save_audio_segment(self, segment_dir: str, is_muted: bool) -> list[str]: 218 218 """ 219 - Save accumulated audio buffer to disk. 219 + Save accumulated audio buffer to segment directory. 220 220 221 221 Args: 222 - day_dir: Path to the day directory 223 - time_part: Timestamp string (HHMMSS) 224 - duration: Segment duration in seconds 222 + segment_dir: Path to the segment directory (YYYYMMDD/HHMMSS_LEN/) 225 223 is_muted: Whether to save as split mono files (muted) or stereo (unmuted) 226 224 227 225 Returns: 228 226 List of saved filenames (empty if nothing saved) 229 227 """ 228 + from pathlib import Path 229 + 230 230 if self.accumulated_audio_buffer.size == 0: 231 231 logger.warning("No audio buffer to save") 232 232 return [] 233 + 234 + segment_path = Path(segment_dir) 233 235 234 236 if is_muted: 235 237 # Split mode: save mic and sys as separate mono files ··· 239 241 mic_bytes = self.audio_recorder.create_mono_flac_bytes(mic_data) 240 242 sys_bytes = self.audio_recorder.create_mono_flac_bytes(sys_data) 241 243 242 - mic_name = f"{time_part}_{duration}_mic_audio.flac" 243 - sys_name = f"{time_part}_{duration}_sys_audio.flac" 244 + mic_name = "mic_audio.flac" 245 + sys_name = "sys_audio.flac" 244 246 245 - mic_path = day_dir / mic_name 246 - sys_path = day_dir / sys_name 247 + mic_path = segment_path / mic_name 248 + sys_path = segment_path / sys_name 247 249 248 250 with open(mic_path, "wb") as f: 249 251 f.write(mic_bytes) ··· 257 259 flac_bytes = self.audio_recorder.create_flac_bytes( 258 260 self.accumulated_audio_buffer 259 261 ) 260 - audio_name = f"{time_part}_{duration}_audio.flac" 261 - flac_path = day_dir / audio_name 262 + audio_name = "audio.flac" 263 + flac_path = segment_path / audio_name 262 264 263 265 with open(flac_path, "wb") as f: 264 266 f.write(flac_bytes) ··· 270 272 """ 271 273 Handle window boundary rollover. 272 274 275 + Closes the current draft folder, renames it to final segment name, 276 + and emits the observing event. 277 + 273 278 Args: 274 279 new_mode: The mode for the new segment 275 280 """ 281 + from pathlib import Path 282 + 276 283 # Get timestamp parts for this window and calculate duration 277 284 date_part, time_part = self.get_timestamp_parts(self.start_at) 278 285 duration = int(time.time() - self.start_at) 279 286 day_dir = day_path(date_part) 280 287 281 - # Save audio if we have enough threshold hits 288 + # Stop screencast first (closes file handles) 289 + stopped_streams: list[StreamInfo] = [] 290 + screen_files: list[str] = [] 291 + 292 + if self.current_mode == MODE_SCREENCAST: 293 + logger.info("Stopping previous screencast") 294 + stopped_streams = await self.screencaster.stop() 295 + self.current_streams = [] 296 + self.last_screencast_sizes = {} 297 + self.stalled_chunks = 0 298 + 299 + # Collect screen filenames (files are already in draft dir with final names) 300 + screen_files = [stream.filename for stream in stopped_streams] 301 + 302 + # Save audio if we have enough threshold hits (to draft dir) 282 303 did_save_audio = self.threshold_hits >= MIN_HITS_FOR_SAVE 283 304 audio_files: list[str] = [] 284 - if did_save_audio: 305 + if did_save_audio and self.draft_dir: 285 306 audio_files = self._save_audio_segment( 286 - day_dir, time_part, duration, self.segment_is_muted 307 + self.draft_dir, self.segment_is_muted 287 308 ) 288 309 if audio_files: 289 310 logger.info( ··· 298 319 self.accumulated_audio_buffer = np.array([], dtype=np.float32).reshape(0, 2) 299 320 self.threshold_hits = 0 300 321 301 - # Handle screencast rollover (if we were in screencast mode) 302 - stopped_streams: list[StreamInfo] = [] 303 - screen_files: list[str] = [] 304 - 305 - if self.current_mode == MODE_SCREENCAST: 306 - logger.info("Stopping previous screencast") 307 - stopped_streams = await self.screencaster.stop() 308 - self.current_streams = [] 309 - self.last_screencast_sizes = {} 310 - self.stalled_chunks = 0 311 - 312 - # Build finalization list and file names 313 - finalizations = [] 314 - for stream in stopped_streams: 315 - final_name = stream.final_name(time_part, duration) 316 - final_path = str(day_dir / final_name) 317 - finalizations.append((stream.temp_path, final_path)) 318 - screen_files.append(final_name) 319 - 320 - if finalizations: 321 - self.pending_finalizations = finalizations 322 - 323 - # Handle tmux capture save (if we were in tmux mode) 322 + # Handle tmux capture save (to draft dir) 324 323 tmux_files: list[str] = [] 325 - if self.current_mode == MODE_TMUX and self.tmux_captures: 326 - segment_key = f"{time_part}_{duration}" 327 - segment_dir = day_dir / segment_key 328 - tmux_files = write_captures_jsonl(self.tmux_captures, segment_dir) 324 + if self.current_mode == MODE_TMUX and self.tmux_captures and self.draft_dir: 325 + # write_captures_jsonl expects a Path and creates it if needed 326 + # Draft dir already exists 327 + tmux_files = write_captures_jsonl(self.tmux_captures, Path(self.draft_dir)) 329 328 330 329 # Reset tmux state 331 330 self.tmux_captures = [] ··· 333 332 self.tmux_sessions_seen = set() 334 333 self.tmux_capture.reset_hashes() 335 334 335 + # Collect all files saved in this segment 336 + files = audio_files + screen_files + tmux_files 337 + segment_key = f"{time_part}_{duration}" 338 + 339 + # Rename draft folder to final segment name (atomic handoff) 340 + if self.draft_dir and files: 341 + final_segment_dir = str(day_dir / segment_key) 342 + try: 343 + os.rename(self.draft_dir, final_segment_dir) 344 + logger.info( 345 + f"Segment finalized: {self.draft_dir} -> {final_segment_dir}" 346 + ) 347 + except OSError as e: 348 + logger.error(f"Failed to rename draft folder: {e}") 349 + # Files stay in draft folder, won't be processed 350 + files = [] 351 + elif self.draft_dir and not files: 352 + # No files to save, remove empty draft folder 353 + try: 354 + os.rmdir(self.draft_dir) 355 + logger.debug(f"Removed empty draft folder: {self.draft_dir}") 356 + except OSError: 357 + pass # May have other files, ignore 358 + 359 + self.draft_dir = None 360 + 336 361 # Reset timing for new window 337 362 self.start_at = time.time() # Wall-clock for filenames 338 363 self.start_at_mono = time.monotonic() # Monotonic for elapsed ··· 344 369 old_mode = self.current_mode 345 370 self.current_mode = new_mode 346 371 347 - # Start new capture based on mode 372 + # Start new capture based on mode (creates new draft folder) 348 373 if new_mode == MODE_SCREENCAST and not self.cached_screen_locked: 349 374 await self.initialize_screencast() 375 + elif new_mode == MODE_TMUX or new_mode == MODE_IDLE: 376 + # Create draft folder for audio/tmux even without screencast 377 + self._create_draft_folder() 350 378 # MODE_TMUX doesn't need initialization, captures happen in main loop 351 379 352 380 logger.info(f"Mode transition: {old_mode} -> {new_mode}") 353 381 354 382 # Emit observing event with what we saved this boundary 355 - files = audio_files + screen_files + tmux_files 356 - 357 383 if files: 358 - segment = f"{time_part}_{duration}" 359 - 360 384 if self.remote_client: 361 385 # Remote mode: upload files to remote server 362 - file_paths = [day_dir / f for f in files] 386 + segment_dir = day_dir / segment_key 387 + file_paths = [segment_dir / f for f in files] 363 388 if self.remote_client.upload_and_cleanup( 364 - date_part, segment, file_paths 389 + date_part, segment_key, file_paths 365 390 ): 366 - logger.info(f"Segment uploaded: {segment} ({len(files)} files)") 391 + logger.info(f"Segment uploaded: {segment_key} ({len(files)} files)") 367 392 else: 368 393 logger.error( 369 - f"Segment upload failed: {segment} - files kept locally" 394 + f"Segment upload failed: {segment_key} - files kept locally" 370 395 ) 371 396 elif self.callosum: 372 397 # Local mode: emit to local Callosum 398 + # Files are now simple names (e.g., "audio.flac" not "143022_300_audio.flac") 373 399 self.callosum.emit( 374 400 "observe", 375 401 "observing", 376 402 day=date_part, 377 - segment=segment, 403 + segment=segment_key, 378 404 files=files, 379 405 host=HOST, 380 406 platform=PLATFORM, 381 407 ) 382 - logger.info(f"Segment observing: {segment} ({len(files)} files)") 408 + logger.info(f"Segment observing: {segment_key} ({len(files)} files)") 409 + 410 + def _create_draft_folder(self) -> str: 411 + """ 412 + Create a draft folder for the current segment. 413 + 414 + Returns: 415 + Path to the draft folder (YYYYMMDD/HHMMSS_draft/) 416 + """ 417 + date_part, time_part = self.get_timestamp_parts(self.start_at) 418 + day_dir = day_path(date_part) 419 + 420 + # Create draft folder: YYYYMMDD/HHMMSS_draft/ 421 + draft_name = f"{time_part}_draft" 422 + draft_path = str(day_dir / draft_name) 423 + os.makedirs(draft_path, exist_ok=True) 424 + 425 + self.draft_dir = draft_path 426 + logger.debug(f"Created draft folder: {draft_path}") 427 + return draft_path 383 428 384 429 async def initialize_screencast(self) -> bool: 385 430 """ 386 431 Start a new screencast recording. 387 432 433 + Creates a draft folder and starts GStreamer recording to it. 434 + 388 435 Returns: 389 436 True if screencast started successfully, False otherwise. 390 437 391 438 Raises: 392 439 RuntimeError: If recording fails to start (caller should exit). 393 440 """ 394 - date_part, time_part = self.get_timestamp_parts(self.start_at) 395 - day_dir = day_path(date_part) 441 + # Create draft folder for this segment 442 + draft_path = self._create_draft_folder() 396 443 397 444 try: 398 445 streams = await self.screencaster.start( 399 - str(day_dir), time_part, framerate=1, draw_cursor=True 446 + draft_path, framerate=1, draw_cursor=True 400 447 ) 401 448 except RuntimeError as e: 402 449 logger.error(f"Failed to start screencast: {e}") ··· 407 454 raise RuntimeError("No streams available") 408 455 409 456 self.current_streams = streams 410 - self.last_screencast_sizes = {s.temp_path: 0 for s in streams} 457 + self.last_screencast_sizes = {s.file_path: 0 for s in streams} 411 458 self.stalled_chunks = 0 412 459 413 460 logger.info(f"Started screencast with {len(streams)} stream(s)") 414 461 for stream in streams: 415 - logger.info(f" {stream.position} ({stream.connector}): {stream.temp_path}") 462 + logger.info(f" {stream.position} ({stream.connector}): {stream.file_path}") 416 463 417 464 return True 418 465 ··· 451 498 for stream in self.current_streams: 452 499 try: 453 500 rel_file = ( 454 - os.path.relpath(stream.temp_path, journal_path) 501 + os.path.relpath(stream.file_path, journal_path) 455 502 if journal_path 456 - else stream.temp_path 503 + else stream.file_path 457 504 ) 458 505 except ValueError: 459 - rel_file = stream.temp_path 506 + rel_file = stream.file_path 460 507 461 508 streams_info.append( 462 509 { ··· 528 575 platform=PLATFORM, 529 576 ) 530 577 531 - def finalize_screencast(self, temp_path: str, final_path: str): 532 - """ 533 - Rename screencast from temp to final path. 534 - 535 - Args: 536 - temp_path: Temporary hidden path (.HHMMSS_position_connector.webm) 537 - final_path: Final destination path (HHMMSS_LEN_position_connector_screen.webm) 538 - """ 539 - if not os.path.exists(temp_path): 540 - logger.warning(f"Screencast file not found: {temp_path}") 541 - return 542 - 543 - try: 544 - os.replace(temp_path, final_path) 545 - logger.info(f"Finalized screencast: {final_path}") 546 - except OSError as e: 547 - logger.error(f"Failed to rename {temp_path} to {final_path}: {e}") 548 - 549 578 async def main_loop(self): 550 579 """Run the main observer loop.""" 551 580 logger.info(f"Starting observer loop (interval={self.interval}s)") ··· 555 584 self.segment_is_muted = self.cached_is_muted # Sync initial mute state 556 585 self.current_mode = new_mode 557 586 558 - # Start initial capture based on mode 587 + # Start initial capture based on mode (creates draft folder) 559 588 if new_mode == MODE_SCREENCAST and not self.cached_screen_locked: 560 589 try: 561 590 await self.initialize_screencast() ··· 563 592 # Failed to start screencast, exit 564 593 self.running = False 565 594 return 595 + else: 596 + # Create draft folder for audio/tmux even without screencast 597 + self._create_draft_folder() 566 598 567 599 logger.info(f"Initial mode: {self.current_mode}") 568 600 ··· 570 602 # Sleep for chunk duration 571 603 await asyncio.sleep(CHUNK_DURATION) 572 604 573 - # Process pending screencast finalizations 574 - if self.pending_finalizations: 575 - for temp_path, final_path in self.pending_finalizations: 576 - if os.path.exists(temp_path): 577 - self.finalize_screencast(temp_path, final_path) 578 - else: 579 - logger.warning(f"Pending screencast not found: {temp_path}") 580 - self.pending_finalizations = None 581 - 582 605 # Check activity status and determine new mode 583 606 new_mode = await self.check_activity_status() 584 607 ··· 588 611 and not self.screencaster.is_healthy() 589 612 ): 590 613 logger.warning("Screencast recording failed, stopping gracefully") 591 - stopped_streams = await self.screencaster.stop() 614 + await self.screencaster.stop() 592 615 593 - # Finalize whatever we have 594 - if stopped_streams: 595 - date_part, time_part = self.get_timestamp_parts(self.start_at) 596 - duration = int(time.time() - self.start_at) 597 - day_dir = day_path(date_part) 598 - 599 - for stream in stopped_streams: 600 - if os.path.exists(stream.temp_path): 601 - final_path = str( 602 - day_dir / stream.final_name(time_part, duration) 603 - ) 604 - self.finalize_screencast(stream.temp_path, final_path) 605 - 616 + # Files are already in draft folder, will be finalized at next boundary 606 617 self.current_streams = [] 607 618 self.last_screencast_sizes = {} 608 619 self.stalled_chunks = 0 ··· 667 678 if self.current_mode == MODE_SCREENCAST and self.current_streams: 668 679 any_growing = False 669 680 for stream in self.current_streams: 670 - if os.path.exists(stream.temp_path): 671 - current_size = os.path.getsize(stream.temp_path) 672 - last_size = self.last_screencast_sizes.get(stream.temp_path, 0) 681 + if os.path.exists(stream.file_path): 682 + current_size = os.path.getsize(stream.file_path) 683 + last_size = self.last_screencast_sizes.get(stream.file_path, 0) 673 684 if current_size > last_size: 674 685 any_growing = True 675 - self.last_screencast_sizes[stream.temp_path] = current_size 686 + self.last_screencast_sizes[stream.file_path] = current_size 676 687 self.files_growing = any_growing 677 688 678 689 # Fail-fast: exit if screencast stalled (files not growing) ··· 699 710 700 711 async def shutdown(self): 701 712 """Clean shutdown of observer.""" 713 + from pathlib import Path 714 + 702 715 # Get timestamp parts for final save 703 716 date_part, time_part = self.get_timestamp_parts(self.start_at) 704 717 duration = int(time.time() - self.start_at) 705 718 day_dir = day_path(date_part) 706 719 707 - # Save final audio if threshold met 708 - if self.threshold_hits >= MIN_HITS_FOR_SAVE: 720 + # Stop screencast first (closes file handles) 721 + stopped_streams: list[StreamInfo] = [] 722 + if self.current_mode == MODE_SCREENCAST: 723 + logger.info("Stopping screencast for shutdown") 724 + stopped_streams = await self.screencaster.stop() 725 + # Brief delay for files to be flushed 726 + await asyncio.sleep(0.5) 727 + 728 + # Save final audio if threshold met (to draft dir) 729 + audio_files: list[str] = [] 730 + if self.threshold_hits >= MIN_HITS_FOR_SAVE and self.draft_dir: 709 731 audio_files = self._save_audio_segment( 710 - day_dir, time_part, duration, self.segment_is_muted 732 + self.draft_dir, self.segment_is_muted 711 733 ) 712 734 if audio_files: 713 735 logger.info(f"Saved final audio: {len(audio_files)} file(s)") 714 736 715 - # Stop screencast if running 716 - if self.current_mode == MODE_SCREENCAST: 717 - logger.info("Stopping screencast for shutdown") 718 - stopped_streams = await self.screencaster.stop() 719 - 720 - if stopped_streams: 721 - # Brief delay for files to be written 722 - await asyncio.sleep(0.5) 723 - 724 - for stream in stopped_streams: 725 - if os.path.exists(stream.temp_path): 726 - final_path = str( 727 - day_dir / stream.final_name(time_part, duration) 728 - ) 729 - self.finalize_screencast(stream.temp_path, final_path) 730 - else: 731 - logger.warning( 732 - f"Screencast file not found after shutdown: {stream.temp_path}" 733 - ) 734 - 735 - # Save tmux captures if in tmux mode 736 - if self.current_mode == MODE_TMUX and self.tmux_captures: 737 - segment_key = f"{time_part}_{duration}" 738 - segment_dir = day_dir / segment_key 739 - tmux_files = write_captures_jsonl(self.tmux_captures, segment_dir) 737 + # Save tmux captures if in tmux mode (to draft dir) 738 + tmux_files: list[str] = [] 739 + if self.current_mode == MODE_TMUX and self.tmux_captures and self.draft_dir: 740 + tmux_files = write_captures_jsonl(self.tmux_captures, Path(self.draft_dir)) 740 741 if tmux_files: 741 742 logger.info(f"Saved final tmux captures: {len(tmux_files)} file(s)") 742 743 743 - # Process any remaining pending finalizations 744 - if self.pending_finalizations: 745 - await asyncio.sleep(0.5) 746 - for temp_path, final_path in self.pending_finalizations: 747 - if os.path.exists(temp_path): 748 - self.finalize_screencast(temp_path, final_path) 749 - else: 750 - logger.warning( 751 - f"Pending screencast not found after shutdown: {temp_path}" 744 + # Collect all files and finalize segment 745 + screen_files = [stream.filename for stream in stopped_streams] 746 + files = audio_files + screen_files + tmux_files 747 + segment_key = f"{time_part}_{duration}" 748 + 749 + if self.draft_dir and files: 750 + final_segment_dir = str(day_dir / segment_key) 751 + try: 752 + os.rename(self.draft_dir, final_segment_dir) 753 + logger.info(f"Final segment: {self.draft_dir} -> {final_segment_dir}") 754 + 755 + # Emit final observing event 756 + if self.remote_client: 757 + segment_dir = day_dir / segment_key 758 + file_paths = [segment_dir / f for f in files] 759 + self.remote_client.upload_and_cleanup( 760 + date_part, segment_key, file_paths 752 761 ) 753 - self.pending_finalizations = None 762 + elif self.callosum: 763 + self.callosum.emit( 764 + "observe", 765 + "observing", 766 + day=date_part, 767 + segment=segment_key, 768 + files=files, 769 + host=HOST, 770 + platform=PLATFORM, 771 + ) 772 + except OSError as e: 773 + logger.error(f"Failed to rename final draft folder: {e}") 774 + elif self.draft_dir: 775 + # No files, remove empty draft folder 776 + try: 777 + os.rmdir(self.draft_dir) 778 + except OSError: 779 + pass 780 + 781 + self.draft_dir = None 754 782 755 783 # Stop audio recorder 756 784 self.audio_recorder.stop_recording()
+17 -16
observe/linux/screencast.py
··· 54 54 y: int 55 55 width: int 56 56 height: int 57 - temp_path: str 57 + file_path: str # Final path in segment directory 58 58 59 - def final_name(self, time_part: str, duration: int) -> str: 60 - """Generate the final filename for this stream.""" 61 - return f"{time_part}_{duration}_{self.position}_{self.connector}_screen.webm" 59 + @property 60 + def filename(self) -> str: 61 + """Return just the filename for event payloads.""" 62 + return os.path.basename(self.file_path) 62 63 63 64 64 65 def _get_restore_token_path() -> Path: ··· 233 234 234 235 async def start( 235 236 self, 236 - base_path: str, 237 - timestamp: str, 237 + output_dir: str, 238 238 framerate: int = 1, 239 239 draw_cursor: bool = True, 240 240 ) -> list[StreamInfo]: 241 241 """ 242 242 Start screencast recording for all monitors. 243 243 244 + Files are written directly to output_dir with final names (position_connector_screen.webm). 245 + The output_dir is typically a draft segment directory that will be renamed on completion. 246 + 244 247 Args: 245 - base_path: Directory for output files 246 - timestamp: Timestamp prefix for temp files (HHMMSS format) 248 + output_dir: Directory for output files (e.g., YYYYMMDD/HHMMSS_draft/) 247 249 framerate: Frames per second (default: 1) 248 250 draw_cursor: Whether to draw mouse cursor (default: True) 249 251 ··· 373 375 position = info["position_label"] 374 376 connector = info["connector"] 375 377 376 - # Temp file: .HHMMSS_position_connector.webm 377 - temp_path = os.path.join( 378 - base_path, f".{timestamp}_{position}_{connector}.webm" 379 - ) 378 + # Final file path: position_connector_screen.webm 379 + # Written directly to output_dir (draft segment directory) 380 + file_path = os.path.join(output_dir, f"{position}_{connector}_screen.webm") 380 381 381 382 stream_obj = StreamInfo( 382 383 node_id=node_id, ··· 386 387 y=info["y"], 387 388 width=info["width"], 388 389 height=info["height"], 389 - temp_path=temp_path, 390 + file_path=file_path, 390 391 ) 391 392 self.streams.append(stream_obj) 392 393 ··· 397 398 f"videorate ! video/x-raw,framerate={framerate}/1 ! " 398 399 f"videoconvert ! vp8enc end-usage=cq cq-level=4 max-quantizer=15 " 399 400 f"keyframe-max-dist=30 static-threshold=100 ! webmmux ! " 400 - f"filesink location={temp_path}" 401 + f"filesink location={file_path}" 401 402 ) 402 403 pipeline_parts.append(branch) 403 404 404 - logger.info(f" Stream {node_id}: {position} ({connector}) -> {temp_path}") 405 + logger.info(f" Stream {node_id}: {position} ({connector}) -> {file_path}") 405 406 406 407 pipeline_str = " ".join(pipeline_parts) 407 408 cmd = ["gst-launch-1.0", "-e"] + pipeline_str.split() ··· 439 440 Stop screencast recording gracefully. 440 441 441 442 Returns: 442 - List of StreamInfo with temp_path for finalization. 443 + List of StreamInfo with file_path for the recorded files. 443 444 """ 444 445 streams = self.streams.copy() 445 446
+147 -87
observe/macos/observer.py
··· 68 68 # Multi-display tracking (similar to Linux observer) 69 69 self.current_displays: list[DisplayInfo] = [] 70 70 self.current_audio: AudioInfo | None = None 71 - self.pending_finalization: list[tuple[str, str]] | None = None 72 71 self.last_video_sizes: dict[str, int] = {} 72 + 73 + # Draft folder for current segment (HHMMSS_draft/) 74 + self.draft_dir: str | None = None 73 75 74 76 # Activity status cache (updated each loop) 75 77 self.cached_is_active = False ··· 222 224 """ 223 225 Handle window boundary rollover. 224 226 227 + Closes the current draft folder, renames files to simple names, 228 + renames folder to final segment name, and emits the observing event. 229 + 225 230 Args: 226 231 is_active: Whether system is currently active 227 232 """ 233 + from pathlib import Path 234 + 228 235 # Get timestamp parts for this window and calculate duration 229 236 date_part, time_part = self.get_timestamp_parts(self.start_at) 230 237 duration = int(time.time() - self.start_at) 231 238 day_dir = day_path(date_part) 239 + segment_key = f"{time_part}_{duration}" 232 240 233 241 saved_files: list[str] = [] 234 - finalizations: list[tuple[str, str]] = [] 235 242 236 243 if self.capture_running: 237 244 logger.info("Stopping previous capture") 238 245 self.screencapture.stop() 239 246 self.capture_running = False 240 247 241 - # Build finalization list for video files 248 + # Rename video files to simple names in draft folder 242 249 for display in self.current_displays: 243 - if os.path.exists(display.temp_path): 244 - final_name = display.final_name(time_part, duration) 245 - final_path = str(day_dir / final_name) 246 - finalizations.append((display.temp_path, final_path)) 247 - saved_files.append(final_name) 250 + if os.path.exists(display.file_path): 251 + # Simple name: position_displayID_screen.mov 252 + simple_name = f"{display.position}_{display.display_id}_screen.mov" 253 + simple_path = Path(self.draft_dir) / simple_name 254 + try: 255 + os.rename(display.file_path, simple_path) 256 + saved_files.append(simple_name) 257 + except OSError as e: 258 + logger.error(f"Failed to rename {display.file_path}: {e}") 248 259 249 - # Check audio threshold before including in finalization 250 - if self.current_audio and os.path.exists(self.current_audio.temp_path): 251 - if self._check_audio_threshold(self.current_audio.temp_path): 252 - final_name = self.current_audio.final_name(time_part, duration) 253 - final_path = str(day_dir / final_name) 254 - finalizations.append((self.current_audio.temp_path, final_path)) 255 - saved_files.append(final_name) 256 - logger.info(f"Audio passed threshold check, saving: {final_name}") 260 + # Check audio threshold and rename if passing 261 + if self.current_audio and os.path.exists(self.current_audio.file_path): 262 + if self._check_audio_threshold(self.current_audio.file_path): 263 + simple_name = "audio.m4a" 264 + simple_path = Path(self.draft_dir) / simple_name 265 + try: 266 + os.rename(self.current_audio.file_path, simple_path) 267 + saved_files.append(simple_name) 268 + logger.info( 269 + f"Audio passed threshold check, saving: {simple_name}" 270 + ) 271 + except OSError as e: 272 + logger.error(f"Failed to rename audio: {e}") 257 273 else: 258 - # Delete the temp audio file 274 + # Delete the audio file 259 275 try: 260 - os.remove(self.current_audio.temp_path) 276 + os.remove(self.current_audio.file_path) 261 277 logger.info("Audio below threshold, discarded") 262 278 except OSError as e: 263 279 logger.warning(f"Failed to remove audio file: {e}") ··· 268 284 self.last_video_sizes = {} 269 285 self.stalled_chunks = 0 270 286 271 - if finalizations: 272 - self.pending_finalization = finalizations 287 + # Rename draft folder to final segment name (atomic handoff) 288 + if self.draft_dir and saved_files: 289 + final_segment_dir = str(day_dir / segment_key) 290 + try: 291 + os.rename(self.draft_dir, final_segment_dir) 292 + logger.info( 293 + f"Segment finalized: {self.draft_dir} -> {final_segment_dir}" 294 + ) 295 + except OSError as e: 296 + logger.error(f"Failed to rename draft folder: {e}") 297 + saved_files = [] # Don't emit event if rename failed 298 + elif self.draft_dir and not saved_files: 299 + # No files to save, remove empty draft folder 300 + try: 301 + os.rmdir(self.draft_dir) 302 + logger.debug(f"Removed empty draft folder: {self.draft_dir}") 303 + except OSError: 304 + pass # May have other files, ignore 305 + 306 + self.draft_dir = None 273 307 274 308 # Reset timing for new window 275 309 self.start_at = time.time() ··· 278 312 # Update segment mute state 279 313 self.segment_is_muted = self.cached_is_muted 280 314 281 - # Start new capture if active and screen not locked 315 + # Start new capture if active and screen not locked (creates new draft folder) 282 316 if is_active and not self.cached_screen_locked: 283 317 self.initialize_capture() 284 318 285 319 # Emit observing event with saved files 286 320 if saved_files and self.callosum: 287 - segment = f"{time_part}_{duration}" 288 321 self.callosum.emit( 289 322 "observe", 290 323 "observing", 291 324 day=date_part, 292 - segment=segment, 325 + segment=segment_key, 293 326 files=saved_files, 294 327 host=HOST, 295 328 platform=PLATFORM, 296 329 ) 297 - logger.info(f"Segment observing: {segment} ({len(saved_files)} files)") 330 + logger.info(f"Segment observing: {segment_key} ({len(saved_files)} files)") 331 + 332 + def _create_draft_folder(self) -> str: 333 + """ 334 + Create a draft folder for the current segment. 335 + 336 + Returns: 337 + Path to the draft folder (YYYYMMDD/HHMMSS_draft/) 338 + """ 339 + date_part, time_part = self.get_timestamp_parts(self.start_at) 340 + day_dir = day_path(date_part) 341 + 342 + # Create draft folder: YYYYMMDD/HHMMSS_draft/ 343 + draft_name = f"{time_part}_draft" 344 + draft_path = str(day_dir / draft_name) 345 + os.makedirs(draft_path, exist_ok=True) 346 + 347 + self.draft_dir = draft_path 348 + logger.debug(f"Created draft folder: {draft_path}") 349 + return draft_path 298 350 299 351 def initialize_capture(self) -> bool: 300 352 """ 301 353 Start a new screencast and audio recording. 302 354 355 + Creates a draft folder and starts sck-cli recording. 356 + 303 357 Returns: 304 358 True if capture started successfully, False otherwise 305 359 """ 306 - date_part, time_part = self.get_timestamp_parts(self.start_at) 307 - day_dir = day_path(date_part) 360 + from pathlib import Path 308 361 309 - # Ensure day directory exists 310 - day_dir.mkdir(parents=True, exist_ok=True) 362 + # Create draft folder for this segment 363 + draft_path = self._create_draft_folder() 311 364 312 - # Build temp output base (hidden file) 313 - output_base = day_dir / f".{time_part}" 365 + # Build output base for sck-cli (inside draft folder) 366 + # sck-cli will create files like: draft/capture_1.mov, draft/capture.m4a 367 + output_base = Path(draft_path) / "capture" 314 368 315 369 try: 316 370 displays, audio = self.screencapture.start( ··· 323 377 self.current_displays = displays 324 378 self.current_audio = audio 325 379 self.capture_running = True 326 - self.last_video_sizes = {d.temp_path: 0 for d in displays} 380 + self.last_video_sizes = {d.file_path: 0 for d in displays} 327 381 self.stalled_chunks = 0 328 382 329 383 logger.info(f"Started capture with {len(displays)} display(s)") 330 384 for display in displays: 331 385 logger.info( 332 - f" Display {display.display_id}: {display.position} -> {display.temp_path}" 386 + f" Display {display.display_id}: {display.position} -> {display.file_path}" 333 387 ) 334 388 if audio: 335 - logger.info(f" Audio: {audio.temp_path}") 389 + logger.info(f" Audio: {audio.file_path}") 336 390 337 391 return True 338 392 ··· 361 415 for display in self.current_displays: 362 416 try: 363 417 rel_file = ( 364 - os.path.relpath(display.temp_path, journal_path) 418 + os.path.relpath(display.file_path, journal_path) 365 419 if journal_path 366 - else display.temp_path 420 + else display.file_path 367 421 ) 368 422 except ValueError: 369 - rel_file = display.temp_path 423 + rel_file = display.file_path 370 424 371 425 streams_info.append( 372 426 { ··· 415 469 platform=PLATFORM, 416 470 ) 417 471 418 - def finalize_screencast(self, temp_path: str, final_path: str): 419 - """ 420 - Rename capture file from temp to final path. 421 - 422 - Args: 423 - temp_path: Temporary file path 424 - final_path: Final destination path 425 - """ 426 - if not os.path.exists(temp_path): 427 - logger.warning(f"Capture file not found: {temp_path}") 428 - return 429 - 430 - try: 431 - os.replace(temp_path, final_path) 432 - logger.info(f"Finalized: {final_path}") 433 - except OSError as e: 434 - logger.error(f"Failed to rename {temp_path} to {final_path}: {e}") 435 - 436 472 async def main_loop(self): 437 473 """Run the main observer loop.""" 438 474 logger.info(f"Starting observer loop (interval={self.interval}s)") ··· 451 487 # Sleep for chunk duration 452 488 await asyncio.sleep(CHUNK_DURATION) 453 489 454 - # Process pending finalizations 455 - if self.pending_finalization: 456 - for temp_path, final_path in self.pending_finalization: 457 - if os.path.exists(temp_path): 458 - self.finalize_screencast(temp_path, final_path) 459 - else: 460 - logger.warning(f"Pending file not found: {temp_path}") 461 - self.pending_finalization = None 462 - 463 490 # Check activity status 464 491 is_active = self.check_activity_status() 465 492 ··· 499 526 if self.capture_running and self.current_displays: 500 527 any_growing = False 501 528 for display in self.current_displays: 502 - if os.path.exists(display.temp_path): 503 - current_size = os.path.getsize(display.temp_path) 504 - last_size = self.last_video_sizes.get(display.temp_path, 0) 529 + if os.path.exists(display.file_path): 530 + current_size = os.path.getsize(display.file_path) 531 + last_size = self.last_video_sizes.get(display.file_path, 0) 505 532 if current_size > last_size: 506 533 any_growing = True 507 - self.last_video_sizes[display.temp_path] = current_size 534 + self.last_video_sizes[display.file_path] = current_size 508 535 self.files_growing = any_growing 509 536 510 537 # Fail-fast: exit if capture stalled (files not growing) ··· 531 558 532 559 async def shutdown(self): 533 560 """Clean shutdown of observer.""" 561 + from pathlib import Path 562 + 534 563 # Stop capture if running 535 564 if self.capture_running: 536 565 logger.info("Stopping capture for shutdown") ··· 543 572 date_part, time_part = self.get_timestamp_parts(self.start_at) 544 573 duration = int(time.time() - self.start_at) 545 574 day_dir = day_path(date_part) 575 + segment_key = f"{time_part}_{duration}" 546 576 547 - # Finalize video files 577 + saved_files: list[str] = [] 578 + 579 + # Rename video files to simple names in draft folder 548 580 for display in self.current_displays: 549 - if os.path.exists(display.temp_path): 550 - final_name = display.final_name(time_part, duration) 551 - final_path = str(day_dir / final_name) 552 - self.finalize_screencast(display.temp_path, final_path) 581 + if os.path.exists(display.file_path): 582 + simple_name = f"{display.position}_{display.display_id}_screen.mov" 583 + simple_path = Path(self.draft_dir) / simple_name 584 + try: 585 + os.rename(display.file_path, simple_path) 586 + saved_files.append(simple_name) 587 + except OSError as e: 588 + logger.error(f"Failed to rename {display.file_path}: {e}") 553 589 554 - # Check and finalize audio if threshold met 555 - if self.current_audio and os.path.exists(self.current_audio.temp_path): 556 - if self._check_audio_threshold(self.current_audio.temp_path): 557 - final_name = self.current_audio.final_name(time_part, duration) 558 - final_path = str(day_dir / final_name) 559 - self.finalize_screencast(self.current_audio.temp_path, final_path) 590 + # Check and rename audio if threshold met 591 + if self.current_audio and os.path.exists(self.current_audio.file_path): 592 + if self._check_audio_threshold(self.current_audio.file_path): 593 + simple_name = "audio.m4a" 594 + simple_path = Path(self.draft_dir) / simple_name 595 + try: 596 + os.rename(self.current_audio.file_path, simple_path) 597 + saved_files.append(simple_name) 598 + except OSError as e: 599 + logger.error(f"Failed to rename audio: {e}") 560 600 else: 561 601 try: 562 - os.remove(self.current_audio.temp_path) 602 + os.remove(self.current_audio.file_path) 563 603 logger.info("Final audio below threshold, discarded") 564 604 except OSError: 565 605 pass 566 606 567 - self.capture_running = False 607 + # Rename draft folder to final segment name 608 + if self.draft_dir and saved_files: 609 + final_segment_dir = str(day_dir / segment_key) 610 + try: 611 + os.rename(self.draft_dir, final_segment_dir) 612 + logger.info(f"Segment finalized: {segment_key}") 568 613 569 - # Process any remaining pending finalizations 570 - if self.pending_finalization: 571 - await asyncio.sleep(0.5) 572 - for temp_path, final_path in self.pending_finalization: 573 - if os.path.exists(temp_path): 574 - self.finalize_screencast(temp_path, final_path) 575 - self.pending_finalization = None 614 + # Emit observing event for final segment 615 + if self.callosum: 616 + self.callosum.emit( 617 + "observe", 618 + "observing", 619 + day=date_part, 620 + segment=segment_key, 621 + files=saved_files, 622 + host=HOST, 623 + platform=PLATFORM, 624 + ) 625 + except OSError as e: 626 + logger.error(f"Failed to rename draft folder: {e}") 627 + elif self.draft_dir: 628 + # No files, clean up draft folder 629 + try: 630 + os.rmdir(self.draft_dir) 631 + except OSError: 632 + pass 633 + 634 + self.draft_dir = None 635 + self.capture_running = False 576 636 577 637 # Stop Callosum connection 578 638 if self.callosum:
+9 -19
observe/macos/screencapture.py
··· 16 16 import threading 17 17 import time 18 18 from dataclasses import dataclass 19 - from pathlib import Path 20 19 from typing import Optional 21 20 22 21 from observe.utils import assign_monitor_positions ··· 37 36 y: int 38 37 width: int 39 38 height: int 40 - temp_path: str 41 - 42 - def final_name(self, time_part: str, duration: int) -> str: 43 - """Generate the final filename for this display's video.""" 44 - return f"{time_part}_{duration}_{self.position}_{self.display_id}_screen.mov" 39 + file_path: str # Path where sck-cli writes the file 45 40 46 41 47 42 @dataclass 48 43 class AudioInfo: 49 44 """Information about the audio recording.""" 50 45 51 - temp_path: str 46 + file_path: str # Path where sck-cli writes the file 52 47 tracks: list[str] 53 - 54 - def final_name(self, time_part: str, duration: int) -> str: 55 - """Generate the final filename for audio.""" 56 - return f"{time_part}_{duration}_audio.m4a" 57 48 58 49 59 50 class ScreenCaptureKitManager: ··· 61 52 Manages sck-cli subprocess for synchronized video and audio capture. 62 53 63 54 Wraps the sck-cli tool to provide lifecycle management, handles process 64 - monitoring, parses JSONL output for display geometry, and manages output 65 - file finalization. 55 + monitoring, and parses JSONL output for display geometry. 66 56 """ 67 57 68 58 def __init__(self, sck_cli_path: str = "sck-cli"): ··· 105 95 106 96 Example: 107 97 >>> manager = ScreenCaptureKitManager() 108 - >>> day_dir = Path("journal/20250101") 109 - >>> output_base = day_dir / ".120000" # Hidden temp file 98 + >>> draft_dir = Path("journal/20250101/120000_draft") 99 + >>> output_base = draft_dir / "capture" 110 100 >>> displays, audio = manager.start(output_base, duration=300) 111 101 """ 112 102 # Build command ··· 218 208 y=mon["box"][1], 219 209 width=mon["box"][2] - mon["box"][0], 220 210 height=mon["box"][3] - mon["box"][1], 221 - temp_path=raw["filename"], 211 + file_path=raw["filename"], 222 212 ) 223 213 ) 224 214 ··· 226 216 if audio_info: 227 217 tracks = [t["name"] for t in audio_info.get("tracks", [])] 228 218 self.audio = AudioInfo( 229 - temp_path=audio_info["filename"], 219 + file_path=audio_info["filename"], 230 220 tracks=tracks, 231 221 ) 232 222 else: ··· 236 226 for display in self.displays: 237 227 logger.info( 238 228 f" Display {display.display_id}: {display.position} " 239 - f"({display.width}x{display.height}) -> {display.temp_path}" 229 + f"({display.width}x{display.height}) -> {display.file_path}" 240 230 ) 241 231 if self.audio: 242 - logger.info(f" Audio: {self.audio.temp_path} ({self.audio.tracks})") 232 + logger.info(f" Audio: {self.audio.file_path} ({self.audio.tracks})") 243 233 244 234 # Start background threads to log remaining stdout/stderr in real-time 245 235 self._output_threads = [
+77 -65
observe/sense.py
··· 100 100 if file_path.name.startswith("."): 101 101 return None 102 102 103 - # Ignore files in subdirectories (segments, trash/) 104 - # Expected structure: journal_dir/YYYYMMDD/file.ext (2 parts from journal_dir) 105 - # Reject: journal_dir/YYYYMMDD/HHMMSS_LEN/file.ext (3+ parts from journal_dir) 103 + # Files should be in segment directories: journal_dir/YYYYMMDD/HHMMSS_LEN/file.ext 104 + # Expected structure: 3 parts from journal_dir 106 105 try: 107 106 rel_path = file_path.relative_to(self.journal_dir) 108 - if len(rel_path.parts) != 2: 107 + if len(rel_path.parts) != 3: 109 108 return None 110 109 except ValueError: 111 110 # File not under journal directory ··· 128 127 ): 129 128 """Spawn a handler process for the file. 130 129 130 + Files are expected to be in segment directories: YYYYMMDD/HHMMSS_LEN/file.ext 131 + 131 132 Args: 132 - file_path: Path to the file to process 133 + file_path: Path to the file to process (in segment directory) 133 134 handler_name: Name of the handler (e.g., "describe", "transcribe") 134 135 command: Command template with {file} placeholder 135 136 day: Day string (YYYYMMDD), extracted from path if not provided 136 137 batch: Whether this is from batch processing mode 137 - segment: Segment key for SEGMENT_KEY env var 138 + segment: Segment key, extracted from path if not provided 138 139 remote: Remote name for REMOTE_NAME env var 139 140 """ 140 - # Extract day from path if not provided (journal_dir/YYYYMMDD/file.ext) 141 - if day is None: 142 - try: 143 - rel_path = file_path.relative_to(self.journal_dir) 144 - if len(rel_path.parts) >= 1: 141 + # Extract day and segment from path: journal_dir/YYYYMMDD/HHMMSS_LEN/file.ext 142 + try: 143 + rel_path = file_path.relative_to(self.journal_dir) 144 + if len(rel_path.parts) >= 2: 145 + if day is None: 145 146 day = rel_path.parts[0] 146 - except ValueError: 147 - pass 148 - 149 - # Extract segment from filename if not provided 150 - if segment is None: 151 - from think.utils import segment_key as get_segment_key 152 - 153 - segment = get_segment_key(file_path.name) 147 + if segment is None: 148 + segment = rel_path.parts[1] 149 + except ValueError: 150 + pass 154 151 155 152 with self.lock: 156 153 # Skip if already processing this file ··· 185 182 # Generate correlation ID for this handler run 186 183 ref = str(int(time.time() * 1000)) 187 184 188 - # Create segment directory before emitting detected event 189 - # This ensures the directory exists for event logging 190 - if day and segment: 191 - segment_dir = self.journal_dir / day / segment 192 - segment_dir.mkdir(exist_ok=True) 193 - 194 185 # Emit detected event with file and ref 195 186 if self.callosum: 196 187 try: ··· 223 214 # Use unified runner to spawn process with automatic logging 224 215 logger.info(f"Spawning {handler_name} for {file_path.name}: {' '.join(cmd)}") 225 216 226 - # Build environment with segment/remote context for handlers 217 + # Build environment with remote context for handlers 227 218 env = os.environ.copy() 228 - if segment: 229 - env["SEGMENT_KEY"] = segment 230 219 if remote: 231 220 env["REMOTE_NAME"] = remote 232 221 ··· 306 295 307 296 def _check_segment_observed(self, file_path: Path): 308 297 """Check if all files for this segment have completed processing.""" 309 - from think.utils import segment_key 298 + from observe.utils import get_segment_key 310 299 311 - segment = segment_key(file_path.name) 300 + segment = get_segment_key(file_path) 312 301 if not segment: 313 302 return 314 303 ··· 359 348 360 349 Args: 361 350 file_path: Path to the file to process 362 - segment: Optional segment key for SEGMENT_KEY env var 351 + segment: Optional segment key for tracking 363 352 remote: Optional remote name for REMOTE_NAME env var 364 353 """ 365 354 if not file_path.exists(): ··· 396 385 logger.info(f"Received observing event: {day}/{segment} ({len(files)} files)") 397 386 398 387 # Build full paths for all files in this segment 399 - day_dir = self.journal_dir / day 400 - file_paths = [day_dir / filename for filename in files] 388 + # Files are in segment directories: YYYYMMDD/HHMMSS_LEN/filename 389 + segment_dir = self.journal_dir / day / segment 390 + file_paths = [segment_dir / filename for filename in files] 401 391 402 392 # Pre-register segment tracking with complete file list 403 393 # This ensures segment completion is tracked correctly even if some files ··· 572 562 def process_day(self, day: str, max_jobs: int = 1): 573 563 """Process all matching unprocessed files from a specific day directory. 574 564 575 - Files are considered unprocessed if the source media file has not been 576 - moved to segments (HHMMSS/). This approach handles incomplete 577 - processing gracefully by re-running even if output files exist. 565 + Files are in segment directories (HHMMSS_LEN/). A file is considered 566 + unprocessed if it has no corresponding .jsonl output file. 578 567 579 568 Args: 580 569 day: Day in YYYYMMDD format 581 570 max_jobs: Maximum number of concurrent processing jobs 582 571 """ 572 + from think.utils import segment_key 573 + 583 574 day_dir = day_path(day) 584 575 if not day_dir.exists(): 585 576 logger.error(f"Day directory not found: {day_dir}") 586 577 return 587 578 588 - # Find all matching unprocessed files (not yet moved to segments) 579 + # Find all matching unprocessed files in segment directories 589 580 to_process = [] 590 - for file_path in day_dir.iterdir(): 591 - if file_path.is_file(): 581 + for segment_dir in day_dir.iterdir(): 582 + if not segment_dir.is_dir() or not segment_key(segment_dir.name): 583 + continue 584 + 585 + for file_path in segment_dir.iterdir(): 586 + if not file_path.is_file(): 587 + continue 588 + 589 + # Check if output JSONL exists (already processed) 590 + output_path = file_path.with_suffix(".jsonl") 591 + if output_path.exists(): 592 + continue 593 + 592 594 handler_info = self._match_pattern(file_path) 593 595 if handler_info: 594 596 handler_name, command = handler_info ··· 650 652 def scan_day(day_dir: Path) -> dict: 651 653 """Scan a day directory for processed and unprocessed files. 652 654 655 + Files are in segment directories (HHMMSS_LEN/). A file is considered 656 + processed if it has a corresponding .jsonl output file. 657 + 653 658 Args: 654 659 day_dir: Path to day directory (YYYYMMDD) 655 660 656 661 Returns: 657 662 Dictionary with: 658 663 - "processed": List of JSONL output files in segments (HHMMSS_LEN/audio.jsonl, etc) 659 - - "unprocessed": List of unprocessed source media files in day root 664 + - "unprocessed": List of unprocessed source media files in segments 660 665 - "pending_segments": Count of unique segments with pending files 661 666 """ 662 - # Find processed output files in segments (HHMMSS_LEN/) 663 667 from think.utils import segment_key 664 668 665 669 processed = [] 670 + unprocessed = [] 671 + pending_segment_keys = set() 672 + 666 673 if not day_dir.exists(): 667 674 return {"processed": [], "unprocessed": [], "pending_segments": 0} 668 675 669 676 for segment in day_dir.iterdir(): 670 - if segment.is_dir() and segment_key(segment.name): 671 - # Check for audio JSONL files (audio.jsonl, mic_audio.jsonl, etc.) 672 - for audio_file in segment.glob("*audio.jsonl"): 673 - processed.append(f"{segment.name}/{audio_file.name}") 674 - # Check for screen JSONL files (screen.jsonl, etc.) 675 - for screen_file in segment.glob("*screen.jsonl"): 676 - processed.append(f"{segment.name}/{screen_file.name}") 677 + if not segment.is_dir() or not segment_key(segment.name): 678 + continue 679 + 680 + # Check each file in the segment 681 + for file_path in segment.iterdir(): 682 + if not file_path.is_file(): 683 + continue 677 684 678 - processed.sort() 685 + # JSONL files are outputs 686 + if file_path.suffix == ".jsonl": 687 + processed.append(f"{segment.name}/{file_path.name}") 688 + continue 679 689 680 - # Find unprocessed source media (still in day root, not yet moved to segments) 681 - # Match by extension only - any descriptive suffix is allowed 682 - unprocessed = [] 683 - unprocessed.extend(sorted(p.name for p in day_dir.glob("*.flac"))) 684 - unprocessed.extend(sorted(p.name for p in day_dir.glob("*.m4a"))) 685 - for ext in VIDEO_EXTENSIONS: 686 - unprocessed.extend(sorted(p.name for p in day_dir.glob(f"*{ext}"))) 690 + # Check if media file has corresponding JSONL (processed) 691 + if ( 692 + file_path.suffix.lower() in VIDEO_EXTENSIONS 693 + or file_path.suffix.lower() 694 + in ( 695 + ".flac", 696 + ".m4a", 697 + ) 698 + ): 699 + output_path = file_path.with_suffix(".jsonl") 700 + if not output_path.exists(): 701 + unprocessed.append(f"{segment.name}/{file_path.name}") 702 + pending_segment_keys.add(segment.name) 687 703 688 - # Count unique segments with pending files 689 - pending_segment_keys = set() 690 - for filename in unprocessed: 691 - key = segment_key(filename) 692 - if key: 693 - pending_segment_keys.add(key) 704 + processed.sort() 705 + unprocessed.sort() 694 706 695 707 return { 696 708 "processed": processed, ··· 722 734 723 735 sensor = FileSensor(journal, verbose=args.verbose, debug=args.debug) 724 736 725 - # Register handlers - match by extension, ignore descriptive suffix 726 - # Audio files: any HHMMSS_*.flac or HHMMSS_*.m4a in day root 737 + # Register handlers - match by extension 738 + # Audio files in segment directories 727 739 sensor.register("*.flac", "transcribe", ["observe-transcribe", "{file}"]) 728 740 sensor.register("*.m4a", "transcribe", ["observe-transcribe", "{file}"]) 729 741 730 - # Video files: any HHMMSS_*.webm, HHMMSS_*.mp4, HHMMSS_*.mov in day root 742 + # Video files in segment directories 731 743 for ext in VIDEO_EXTENSIONS: 732 744 sensor.register(f"*{ext}", "describe", ["observe-describe", "{file}"]) 733 745
+33 -78
observe/transcribe.py
··· 22 22 23 23 from observe.diarize import DiarizationError, diarize, save_speaker_embeddings 24 24 from observe.hear import SAMPLE_RATE 25 - from observe.utils import get_segment_key, segment_and_suffix 25 + from observe.utils import get_segment_key 26 26 from think.callosum import callosum_send 27 27 from think.entities import load_entity_names 28 28 from think.models import GEMINI_FLASH ··· 156 156 157 157 self.prompt_text = prompt_data.text 158 158 159 - def _segment_info(self, audio_path: Path) -> tuple[Path, str, bool]: 160 - """Return segment directory, descriptive suffix, and whether already in segment.""" 161 - segment, suffix = segment_and_suffix(audio_path) 162 - in_segment = get_segment_key(audio_path.parent) is not None 163 - if in_segment: 164 - return audio_path.parent, suffix, True 165 - return audio_path.parent / segment, suffix, False 166 - 167 - def _move_to_segment(self, audio_path: Path) -> Path: 168 - """Move audio file to its segment and return new path.""" 169 - segment_dir, suffix, in_segment = self._segment_info(audio_path) 170 - if in_segment: 171 - return audio_path 172 - try: 173 - segment_dir.mkdir(exist_ok=True) 174 - new_path = segment_dir / f"{suffix}.flac" 175 - audio_path.rename(new_path) 176 - logging.info("Moved %s to %s", audio_path, segment_dir) 177 - return new_path 178 - except Exception as exc: 179 - logging.error("Failed to move %s to segment: %s", audio_path, exc) 180 - return audio_path 181 - 182 159 def _prepare_audio(self, raw_path: Path) -> Path: 183 160 """Prepare audio file for diarization, converting if needed. 184 161 ··· 302 279 data = data.mean(axis=1) 303 280 304 281 # Extract date and time based on path structure 282 + # Files are always in segment directories: YYYYMMDD/HHMMSS_LEN/audio.flac 305 283 segment = get_segment_key(raw_path) 306 - time_part = ( 307 - segment.split("_")[0] if segment else raw_path.stem.split("_")[0] 308 - ) 309 - # Day dir is parent or grandparent depending on whether file is in segment 310 - if get_segment_key(raw_path.parent) is not None: 311 - day_str = raw_path.parent.parent.name 312 - else: 313 - day_str = raw_path.parent.name 284 + time_part = segment.split("_")[0] if segment else "000000" 285 + day_str = raw_path.parent.parent.name 314 286 315 287 base_dt = datetime.datetime.strptime( 316 288 f"{day_str}_{time_part}", "%Y%m%d_%H%M%S" ··· 379 351 audio_path.unlink() 380 352 381 353 def _get_json_path(self, audio_path: Path) -> Path: 382 - """Generate the corresponding JSONL path in timestamp directory. 354 + """Generate the corresponding JSONL path in segment directory. 383 355 384 - Handles both locations: 385 - - Day root: YYYYMMDD/HHMMSS_LEN_audio.flac -> YYYYMMDD/HHMMSS_LEN/audio.jsonl 386 - - Segment dir: YYYYMMDD/HHMMSS_LEN/audio.flac -> YYYYMMDD/HHMMSS_LEN/audio.jsonl 356 + Files are always in segment directories: 357 + YYYYMMDD/HHMMSS_LEN/audio.flac -> YYYYMMDD/HHMMSS_LEN/audio.jsonl 387 358 """ 388 - segment_dir, suffix, in_segment = self._segment_info(audio_path) 389 - if not in_segment: 390 - segment_dir.mkdir(exist_ok=True) 391 - return segment_dir / f"{suffix}.jsonl" 359 + return audio_path.with_suffix(".jsonl") 392 360 393 361 def _get_embeddings_dir(self, audio_path: Path) -> Path: 394 362 """Get directory for storing speaker embeddings. 395 363 396 - Handles both locations: 397 - - Day root: YYYYMMDD/HHMMSS_LEN_audio.flac -> YYYYMMDD/HHMMSS_LEN/audio/ 398 - - Segment dir: YYYYMMDD/HHMMSS_LEN/audio.flac -> YYYYMMDD/HHMMSS_LEN/audio/ 364 + Files are always in segment directories: 365 + YYYYMMDD/HHMMSS_LEN/audio.flac -> YYYYMMDD/HHMMSS_LEN/audio/ 399 366 """ 400 - segment_dir, suffix, _ = self._segment_info(audio_path) 401 - return segment_dir / suffix 367 + return audio_path.parent / audio_path.stem 402 368 403 369 def _transcribe( 404 370 self, ··· 463 429 transcript_items = result[:-1] 464 430 465 431 # Add audio file reference to metadata 466 - # Day root: stem is HHMMSS_LEN_suffix, need to extract suffix 467 - # Segment dir: stem is already the suffix (e.g., "audio") 468 - _, suffix, _ = self._segment_info(raw_path) 469 - 470 - metadata["raw"] = f"{suffix}.flac" 432 + # Files are in segment directories, stem is the suffix (e.g., "audio") 433 + metadata["raw"] = f"{raw_path.stem}{raw_path.suffix}" 471 434 472 435 # Add remote origin if set (from sense.py for remote observer uploads) 473 436 remote = os.getenv("REMOTE_NAME") ··· 486 449 # Extract source from <source>_audio pattern 487 450 # mic_audio -> "mic", sys_audio -> "sys", phone_audio -> "phone", etc. 488 451 source = None 452 + suffix = raw_path.stem 489 453 if suffix.endswith("_audio") and suffix != "audio": 490 454 source = suffix[:-6] # Remove "_audio" suffix 491 455 ··· 507 471 def _handle_raw(self, raw_path: Path, redo: bool = False) -> None: 508 472 """Process a raw audio file. 509 473 474 + Files are expected to be in segment directories (YYYYMMDD/HHMMSS_LEN/). 475 + 510 476 Args: 511 - raw_path: Path to audio file 512 - redo: If True, skip "already processed" check and don't move file 513 - (for reprocessing files already in segment directories) 477 + raw_path: Path to audio file in segment directory 478 + redo: If True, skip "already processed" check 514 479 """ 515 480 start_time = time.time() 516 481 517 - # Use segment from env (set by sense.py) or derive from path 518 - segment = os.getenv("SEGMENT_KEY") or get_segment_key(raw_path) 519 - if segment and not os.getenv("SEGMENT_KEY"): 520 - os.environ["SEGMENT_KEY"] = segment 482 + # Derive segment from path (parent dir is segment dir) 483 + segment = get_segment_key(raw_path) 521 484 522 485 # Skip if already processed (unless redo mode) 523 486 json_path = self._get_json_path(raw_path) 524 487 if not redo and json_path.exists(): 525 - logging.info(f"Already processed, moving to timestamp dir: {raw_path}") 526 - self._move_to_segment(raw_path) 488 + logging.info(f"Already processed: {raw_path}") 527 489 return 528 490 529 491 # Process audio with diarization ··· 545 507 # Transcribe 546 508 success = self._transcribe(raw_path, turns, speakers, diarization_data) 547 509 if success: 548 - # In redo mode, file is already in segment dir - don't move 549 - if redo: 550 - final_path = raw_path 551 - else: 552 - final_path = self._move_to_segment(raw_path) 553 - 554 510 # Save speaker embeddings 555 511 if speaker_embeddings: 556 512 embeddings_dir = self._get_embeddings_dir(raw_path) ··· 561 517 duration_ms = int((time.time() - start_time) * 1000) 562 518 563 519 try: 564 - rel_input = final_path.relative_to(journal_path) 520 + rel_input = raw_path.relative_to(journal_path) 565 521 rel_output = json_path.relative_to(journal_path) 566 522 except ValueError: 567 - rel_input = final_path 523 + rel_input = raw_path 568 524 rel_output = json_path 569 525 570 - # Extract day from audio path (raw_path.parent is day dir) 571 - day = raw_path.parent.name 526 + # Extract day from audio path (grandparent is day dir) 527 + day = raw_path.parent.parent.name 572 528 573 529 event_fields = { 574 530 "input": str(rel_input), ··· 592 548 parser.add_argument( 593 549 "audio_path", 594 550 type=str, 595 - help="Path to audio file to process (.flac or .m4a)", 551 + help="Path to audio file in segment directory (.flac or .m4a)", 596 552 ) 597 553 parser.add_argument( 598 554 "--redo", 599 555 action="store_true", 600 - help="Reprocess file already in segment directory, overwriting outputs", 556 + help="Reprocess file, overwriting existing outputs", 601 557 ) 602 558 args = setup_cli(parser) 603 559 ··· 623 579 f"Supported formats: {', '.join(supported_formats)}" 624 580 ) 625 581 626 - # Validate --redo requires file to be in segment directory 627 - if args.redo: 628 - if get_segment_key(audio_path.parent) is None: 629 - parser.error( 630 - f"--redo requires audio file to be in a segment directory (HHMMSS_LEN/), " 631 - f"but parent is: {audio_path.parent.name}" 632 - ) 582 + # Files must be in segment directories (YYYYMMDD/HHMMSS_LEN/) 583 + if get_segment_key(audio_path) is None: 584 + parser.error( 585 + f"Audio file must be in a segment directory (HHMMSS_LEN/), " 586 + f"but parent is: {audio_path.parent.name}" 587 + ) 633 588 634 589 logging.info(f"Processing audio: {audio_path}") 635 590
+23 -97
observe/utils.py
··· 14 14 AUDIO_EXTENSIONS = (".flac", ".ogg", ".m4a") 15 15 16 16 17 - def extract_descriptive_suffix(filename: str) -> str: 18 - """ 19 - Extract descriptive suffix from media filename. 20 - 21 - Returns the portion after the segment (HHMMSS_LEN), preserving 22 - the descriptive information for the final filename in the segment directory. 23 - 24 - Parameters 25 - ---------- 26 - filename : str 27 - Filename stem (without extension), e.g., "143022_300_audio" 28 - 29 - Returns 30 - ------- 31 - str 32 - Descriptive suffix (e.g., "audio", "screen", "mic_sys"), or "raw" if none 33 - 34 - Examples 35 - -------- 36 - >>> extract_descriptive_suffix("143022_300_audio") 37 - "audio" 38 - >>> extract_descriptive_suffix("143022_300_screen") 39 - "screen" 40 - >>> extract_descriptive_suffix("143022_300_mic_sys") 41 - "mic_sys" 42 - >>> extract_descriptive_suffix("143022_300") 43 - "raw" 44 - """ 45 - parts = filename.split("_") 46 - 47 - # Filename format: HHMMSS_LEN[_descriptive_text...] 48 - # First part must be 6-digit timestamp 49 - if not parts or not parts[0].isdigit() or len(parts[0]) != 6: 50 - raise ValueError( 51 - f"Invalid filename format: {filename} (must start with HHMMSS)" 52 - ) 53 - 54 - # Second part must be numeric duration suffix 55 - if len(parts) < 2 or not parts[1].isdigit(): 56 - raise ValueError( 57 - f"Invalid filename format: {filename} (must have HHMMSS_LEN format)" 58 - ) 59 - 60 - # HHMMSS_LEN_suffix... - join remaining parts as descriptive suffix 61 - if len(parts) > 2: 62 - return "_".join(parts[2:]) 63 - else: 64 - return "raw" 65 - 66 - 67 17 def get_segment_key(media_path: Path) -> str | None: 68 18 """ 69 19 Extract segment key from a media file path. 70 20 71 - Checks parent directory first (for files already in segment dirs), 72 - then falls back to filename stem (for files in day root). 21 + For the new model, files are always in segment directories (HHMMSS_LEN/). 22 + The segment key is the parent directory name. 73 23 74 24 Parameters 75 25 ---------- ··· 85 35 -------- 86 36 >>> get_segment_key(Path("/journal/20250101/143022_300/audio.flac")) 87 37 "143022_300" 88 - >>> get_segment_key(Path("/journal/20250101/143022_300_audio.flac")) 89 - "143022_300" 90 38 >>> get_segment_key(Path("/journal/20250101/random.txt")) 91 39 None 92 40 """ 93 41 from think.utils import segment_key 94 42 95 - # Check if parent directory is a segment (file already moved) 96 - parent_segment = segment_key(media_path.parent.name) 97 - if parent_segment: 98 - return parent_segment 99 - 100 - # Check if filename contains segment (file in day root) 101 - return segment_key(media_path.stem) 43 + # Segment key is the parent directory name 44 + return segment_key(media_path.parent.name) 102 45 103 46 104 47 def segment_and_suffix(media_path: Path) -> tuple[str, str]: 105 48 """ 106 49 Extract segment key and descriptive suffix from a media file path. 107 50 108 - Handles both files in day root (YYYYMMDD/HHMMSS_LEN_suffix.ext) and 109 - files already in segment directories (YYYYMMDD/HHMMSS_LEN/suffix.ext). 51 + For the new model, files are always in segment directories. 52 + The segment key is the parent directory name, suffix is the file stem. 110 53 111 54 Parameters 112 55 ---------- 113 56 media_path : Path 114 - Path to media file (audio or video) 57 + Path to media file (audio or video) in a segment directory 115 58 116 59 Returns 117 60 ------- ··· 121 64 Raises 122 65 ------ 123 66 ValueError 124 - If the path doesn't contain a valid segment key 67 + If the parent directory is not a valid segment 125 68 126 69 Examples 127 70 -------- 128 - >>> segment_and_suffix(Path("/journal/20250101/143022_300_audio.flac")) 129 - ("143022_300", "audio") 130 71 >>> segment_and_suffix(Path("/journal/20250101/143022_300/audio.flac")) 131 72 ("143022_300", "audio") 73 + >>> segment_and_suffix(Path("/journal/20250101/143022_300/center_DP-3_screen.webm")) 74 + ("143022_300", "center_DP-3_screen") 132 75 """ 133 76 from think.utils import segment_key 134 77 135 - # Check if parent directory is a segment (file already moved) 136 - parent_segment = segment_key(media_path.parent.name) 137 - if parent_segment: 138 - # File is in segment dir - stem is the suffix 139 - return parent_segment, media_path.stem 140 - 141 - # File is in day root - extract segment from filename 142 - segment = segment_key(media_path.stem) 78 + # Segment key is the parent directory name 79 + segment = segment_key(media_path.parent.name) 143 80 if segment is None: 144 81 raise ValueError( 145 - f"Invalid media filename: {media_path.stem} (must contain HHMMSS_LEN)" 82 + f"File not in segment directory: {media_path} " 83 + f"(parent {media_path.parent.name} is not HHMMSS_LEN format)" 146 84 ) 147 85 148 - suffix = extract_descriptive_suffix(media_path.stem) 149 - return segment, suffix 86 + # Suffix is the file stem 87 + return segment, media_path.stem 150 88 151 89 152 90 def parse_screen_filename(filename: str) -> tuple[str, str]: 153 91 """ 154 92 Parse position and connector/displayID from a per-monitor screen filename. 155 93 156 - Handles both pre-move filenames (with segment prefix) and post-move filenames 157 - (in segment directory without prefix). Works with both GNOME connector IDs 158 - (e.g., "DP-3") and macOS displayIDs (e.g., "1"). 94 + Files are in segment directories with format: position_connector_screen.ext 95 + Works with both GNOME connector IDs (e.g., "DP-3") and macOS displayIDs (e.g., "1"). 159 96 160 97 Parameters 161 98 ---------- 162 99 filename : str 163 100 Filename stem (without extension), e.g.: 164 - - "143022_300_center_DP-3_screen" (GNOME pre-move) 165 - - "143022_300_center_1_screen" (macOS pre-move) 166 - - "center_DP-3_screen" (GNOME post-move) 167 - - "center_1_screen" (macOS post-move) 101 + - "center_DP-3_screen" (GNOME) 102 + - "center_1_screen" (macOS) 168 103 169 104 Returns 170 105 ------- ··· 174 109 175 110 Examples 176 111 -------- 177 - >>> parse_screen_filename("143022_300_center_DP-3_screen") 178 - ("center", "DP-3") 179 - >>> parse_screen_filename("143022_300_center_1_screen") 180 - ("center", "1") 181 112 >>> parse_screen_filename("center_DP-3_screen") 182 113 ("center", "DP-3") 183 114 >>> parse_screen_filename("center_1_screen") 184 115 ("center", "1") 185 - >>> parse_screen_filename("143022_300_screen") 186 - ("unknown", "unknown") 116 + >>> parse_screen_filename("left_HDMI-1_screen") 117 + ("left", "HDMI-1") 187 118 """ 188 - # Pattern 1: HHMMSS_LEN_position_connector_screen (pre-move) 119 + # Pattern: position_connector_screen 189 120 # Connector can be alphanumeric with hyphens (GNOME: DP-3) or just numeric (macOS: 1) 190 - match = re.match(r"^\d{6}_\d+_([a-z-]+)_([A-Za-z0-9-]+)_screen$", filename) 191 - if match: 192 - return match.group(1), match.group(2) 193 - 194 - # Pattern 2: position_connector_screen (post-move, in segment directory) 195 121 match = re.match(r"^([a-z-]+)_([A-Za-z0-9-]+)_screen$", filename) 196 122 if match: 197 123 return match.group(1), match.group(2)
+6 -4
tests/test_journal_stats.py
··· 11 11 day = journal / "20240101" 12 12 day.mkdir() 13 13 14 - # Create an audio jsonl file in segment directory 14 + # Create an audio jsonl file in segment directory (already processed) 15 15 ts_dir = day / "123456_300" 16 16 ts_dir.mkdir() 17 17 (ts_dir / "audio.jsonl").write_text( ··· 20 20 '{"start": "10:01:00", "text": "world"}\n' 21 21 ) 22 22 23 - # Create unprocessed media files (remain in day root, will be moved to segment on processing) 24 - (day / "123456_300_audio.flac").write_bytes(b"RIFF") 25 - (day / "123456_300_center_DP-1_screen.webm").write_bytes(b"WEBM") 23 + # Create unprocessed media files in a second segment directory (no jsonl output yet) 24 + ts_dir2 = day / "134500_300" 25 + ts_dir2.mkdir() 26 + (ts_dir2 / "audio.flac").write_bytes(b"RIFF") 27 + (ts_dir2 / "center_DP-1_screen.webm").write_bytes(b"WEBM") 26 28 27 29 (day / "entities.md").write_text("") 28 30 (day / "insights").mkdir()
+25 -21
tests/test_observe_utils.py
··· 148 148 149 149 150 150 class TestParseScreenFilename: 151 - """Test screen filename parsing for per-monitor files.""" 151 + """Test screen filename parsing for per-monitor files. 152 + 153 + Files are now always in segment directories with simple names: 154 + position_connector_screen.webm (e.g., center_DP-3_screen.webm) 155 + """ 152 156 153 157 def test_standard_format(self): 154 158 """Parse standard per-monitor filename.""" 155 - position, connector = parse_screen_filename("143022_300_center_DP-3_screen") 159 + position, connector = parse_screen_filename("center_DP-3_screen") 156 160 assert position == "center" 157 161 assert connector == "DP-3" 158 162 159 163 def test_left_position(self): 160 164 """Parse left position filename.""" 161 - position, connector = parse_screen_filename("120000_600_left_HDMI-1_screen") 165 + position, connector = parse_screen_filename("left_HDMI-1_screen") 162 166 assert position == "left" 163 167 assert connector == "HDMI-1" 164 168 165 169 def test_compound_position(self): 166 170 """Parse compound position like left-top.""" 167 - position, connector = parse_screen_filename("090000_300_left-top_DP-1_screen") 171 + position, connector = parse_screen_filename("left-top_DP-1_screen") 168 172 assert position == "left-top" 169 173 assert connector == "DP-1" 170 174 175 + def test_macos_numeric_display_id(self): 176 + """Parse macOS numeric display ID.""" 177 + position, connector = parse_screen_filename("center_1_screen") 178 + assert position == "center" 179 + assert connector == "1" 180 + 171 181 def test_simple_screen_filename(self): 172 182 """Simple screen filename without position returns unknown.""" 173 - position, connector = parse_screen_filename("143022_300_screen") 183 + position, connector = parse_screen_filename("screen") 174 184 assert position == "unknown" 175 185 assert connector == "unknown" 176 186 177 187 def test_audio_filename(self): 178 188 """Audio filename returns unknown.""" 179 - position, connector = parse_screen_filename("143022_300_audio") 189 + position, connector = parse_screen_filename("audio") 180 190 assert position == "unknown" 181 191 assert connector == "unknown" 182 192 183 - def test_post_move_format(self): 184 - """Parse post-move filename (in segment directory, no HHMMSS_LEN prefix).""" 185 - position, connector = parse_screen_filename("center_DP-3_screen") 186 - assert position == "center" 187 - assert connector == "DP-3" 188 - 189 - def test_post_move_left_top(self): 190 - """Parse post-move filename with compound position.""" 191 - position, connector = parse_screen_filename("left-top_HDMI-2_screen") 192 - assert position == "left-top" 193 + def test_right_position(self): 194 + """Parse right position filename.""" 195 + position, connector = parse_screen_filename("right_HDMI-2_screen") 196 + assert position == "right" 193 197 assert connector == "HDMI-2" 194 198 195 - def test_plain_screen(self): 196 - """Plain 'screen' filename returns unknown.""" 197 - position, connector = parse_screen_filename("screen") 198 - assert position == "unknown" 199 - assert connector == "unknown" 199 + def test_compound_left_bottom(self): 200 + """Parse compound left-bottom position.""" 201 + position, connector = parse_screen_filename("left-bottom_DP-2_screen") 202 + assert position == "left-bottom" 203 + assert connector == "DP-2"
+38 -29
tests/test_sense.py
··· 115 115 116 116 117 117 def test_file_sensor_match_pattern(): 118 - """Test pattern matching logic.""" 118 + """Test pattern matching logic. 119 + 120 + Files are expected to be in segment directories: journal/YYYYMMDD/HHMMSS_LEN/file.ext 121 + """ 119 122 with tempfile.TemporaryDirectory() as tmpdir: 120 - # Create journal/day structure 123 + # Create journal/day/segment structure 121 124 journal_dir = Path(tmpdir) 122 125 day_dir = journal_dir / "20250101" 123 - day_dir.mkdir() 126 + segment_dir = day_dir / "123456_300" 127 + segment_dir.mkdir(parents=True) 124 128 125 129 sensor = FileSensor(journal_dir) 126 130 sensor.register("*.webm", "describe", ["echo", "{file}"]) 127 - sensor.register("*_raw.flac", "transcribe", ["cat", "{file}"]) 131 + sensor.register("*.flac", "transcribe", ["cat", "{file}"]) 128 132 129 - # Should match - files in day directory 130 - webm_file = day_dir / "test.webm" 133 + # Should match - files in segment directory 134 + webm_file = segment_dir / "center_DP-3_screen.webm" 131 135 assert sensor._match_pattern(webm_file) is not None 132 136 assert sensor._match_pattern(webm_file)[0] == "describe" 133 137 134 - flac_file = day_dir / "123456_300_raw.flac" 138 + flac_file = segment_dir / "audio.flac" 135 139 assert sensor._match_pattern(flac_file) is not None 136 140 assert sensor._match_pattern(flac_file)[0] == "transcribe" 137 141 138 142 # Should not match - wrong extension 139 - txt_file = day_dir / "test.txt" 143 + txt_file = segment_dir / "test.txt" 140 144 assert sensor._match_pattern(txt_file) is None 141 145 142 - # Should not match - in segment 143 - segment_dir = day_dir / "123456_300" 144 - segment_dir.mkdir() 145 - segment_file = segment_dir / "audio.jsonl" 146 - assert sensor._match_pattern(segment_file) is None 146 + # Should not match - file in day root (not in segment dir) 147 + day_root_file = day_dir / "orphan.webm" 148 + assert sensor._match_pattern(day_root_file) is None 149 + 150 + # Should not match - jsonl output file 151 + jsonl_file = segment_dir / "audio.jsonl" 152 + assert sensor._match_pattern(jsonl_file) is None 147 153 148 154 149 155 @patch("think.runner._get_journal_path") ··· 261 267 def test_file_sensor_handle_file(tmp_path): 262 268 """Test file handling dispatches to correct handler.""" 263 269 with patch.object(FileSensor, "_spawn_handler") as mock_spawn: 264 - # Create journal/day structure 270 + # Create journal/day/segment structure 265 271 day_dir = tmp_path / "20250101" 266 - day_dir.mkdir() 272 + segment_dir = day_dir / "143022_300" 273 + segment_dir.mkdir(parents=True) 267 274 268 275 sensor = FileSensor(tmp_path) 269 276 sensor.register("*.webm", "describe", ["echo", "{file}"]) 270 277 271 - test_file = day_dir / "test.webm" 278 + test_file = segment_dir / "center_DP-3_screen.webm" 272 279 test_file.write_text("content") 273 280 274 281 sensor._handle_file(test_file) ··· 310 317 def test_file_sensor_handle_callosum_message(tmp_path): 311 318 """Test handling of observe.observing Callosum events.""" 312 319 with patch.object(FileSensor, "_handle_file") as mock_handle: 313 - # Create journal/day structure 320 + # Create journal/day/segment structure 314 321 day_dir = tmp_path / "20250101" 315 - day_dir.mkdir() 322 + segment_dir = day_dir / "143022_300" 323 + segment_dir.mkdir(parents=True) 316 324 317 325 sensor = FileSensor(tmp_path) 318 326 sensor.register("*.flac", "transcribe", ["echo", "{file}"]) 319 327 sensor.register("*.webm", "describe", ["echo", "{file}"]) 320 328 321 - # Create test files 322 - audio_file = day_dir / "143022_300_audio.flac" 329 + # Create test files with simple names in segment directory 330 + audio_file = segment_dir / "audio.flac" 323 331 audio_file.write_text("audio content") 324 - video_file = day_dir / "143022_300_screen.webm" 332 + video_file = segment_dir / "center_DP-3_screen.webm" 325 333 video_file.write_text("video content") 326 334 327 - # Simulate observing event 335 + # Simulate observing event with simple filenames 328 336 message = { 329 337 "tract": "observe", 330 338 "event": "observing", 331 339 "day": "20250101", 332 340 "segment": "143022_300", 333 - "files": ["143022_300_audio.flac", "143022_300_screen.webm"], 341 + "files": ["audio.flac", "center_DP-3_screen.webm"], 334 342 } 335 343 336 344 sensor._handle_callosum_message(message) ··· 390 398 """Test that observe.observed event includes day field.""" 391 399 from think.callosum import CallosumConnection 392 400 393 - # Create journal/day structure 401 + # Create journal/day/segment structure 394 402 day_dir = tmp_path / "20250101" 395 - day_dir.mkdir() 403 + segment_dir = day_dir / "143022_300" 404 + segment_dir.mkdir(parents=True) 396 405 397 406 sensor = FileSensor(tmp_path) 398 407 sensor.register("*.flac", "transcribe", ["echo", "{file}"]) ··· 402 411 sensor.callosum = CallosumConnection() 403 412 sensor.callosum.start(callback=lambda msg: emitted_events.append(msg)) 404 413 405 - # Create test file 406 - audio_file = day_dir / "143022_300_audio.flac" 414 + # Create test file with simple name in segment directory 415 + audio_file = segment_dir / "audio.flac" 407 416 audio_file.write_text("audio content") 408 417 409 - # Simulate observing event to set up segment tracking 418 + # Simulate observing event to set up segment tracking (simple filenames) 410 419 message = { 411 420 "tract": "observe", 412 421 "event": "observing", 413 422 "day": "20250101", 414 423 "segment": "143022_300", 415 - "files": ["143022_300_audio.flac"], 424 + "files": ["audio.flac"], 416 425 } 417 426 sensor._handle_callosum_message(message) 418 427