personal memory agent
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

Add --reprocess option to observe-sense for batch re-analysis

New CLI options for observe-sense:
- --reprocess {screen,audio,all}: Delete existing outputs and reprocess
- --segment HHMMSS_LEN: Filter to specific segment
- --dry-run: Preview what would be deleted without making changes

The reprocess flow deletes matching .jsonl outputs, then runs normal
batch processing. It emits observe.observed events per segment, which
triggers the supervisor's segment dream pipeline automatically.

Also fixes scan_day() to use the AUDIO_EXTENSIONS constant instead of a
hardcoded tuple, and adds segment format validation in the CLI.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

+230 -9
+137 -9
observe/sense.py
··· 20 20 from pathlib import Path 21 21 from typing import Any, Dict, List, Optional 22 22 23 - from observe.utils import VIDEO_EXTENSIONS 23 + from observe.utils import AUDIO_EXTENSIONS, VIDEO_EXTENSIONS 24 24 from think.callosum import CallosumConnection 25 25 from think.runner import ManagedProcess as RunnerManagedProcess 26 26 from think.utils import day_path, setup_cli ··· 559 559 with self.lock: 560 560 self.running.clear() 561 561 562 - def process_day(self, day: str, max_jobs: int = 1): 562 + def process_day( 563 + self, day: str, max_jobs: int = 1, segment_filter: Optional[str] = None 564 + ): 563 565 """Process all matching unprocessed files from a specific day directory. 564 566 565 567 Files are in segment directories (HHMMSS_LEN/). A file is considered ··· 568 570 Args: 569 571 day: Day in YYYYMMDD format 570 572 max_jobs: Maximum number of concurrent processing jobs 573 + segment_filter: Optional segment key to filter (HHMMSS_LEN format) 571 574 """ 572 575 from think.utils import segment_key 573 576 ··· 580 583 to_process = [] 581 584 for segment_dir in day_dir.iterdir(): 582 585 if not segment_dir.is_dir() or not segment_key(segment_dir.name): 586 + continue 587 + 588 + # Apply segment filter if specified 589 + if segment_filter and segment_dir.name != segment_filter: 583 590 continue 584 591 585 592 for file_path in segment_dir.iterdir(): ··· 649 656 logger.info("Batch processing complete") 650 657 651 658 659 + def delete_outputs( 660 + day_dir: Path, 661 + reprocess_type: str, 662 + segment_filter: Optional[str] = None, 663 + dry_run: bool = False, 664 + ) -> list[Path]: 665 + """Delete existing output files to force reprocessing. 
666 + 667 + Args: 668 + day_dir: Path to day directory (YYYYMMDD) 669 + reprocess_type: Type of outputs to delete ("screen", "audio", or "all") 670 + segment_filter: Optional segment key to filter (HHMMSS_LEN format) 671 + dry_run: If True, don't delete, just return what would be deleted 672 + 673 + Returns: 674 + List of paths that were (or would be) deleted 675 + """ 676 + from think.utils import segment_key 677 + 678 + deleted = [] 679 + 680 + if not day_dir.exists(): 681 + return deleted 682 + 683 + for segment in day_dir.iterdir(): 684 + if not segment.is_dir() or not segment_key(segment.name): 685 + continue 686 + 687 + # Apply segment filter if specified 688 + if segment_filter and segment.name != segment_filter: 689 + continue 690 + 691 + for file_path in segment.iterdir(): 692 + if not file_path.is_file() or file_path.suffix != ".jsonl": 693 + continue 694 + 695 + stem = file_path.stem.lower() 696 + 697 + # Determine if this output matches the reprocess type 698 + should_delete = False 699 + if reprocess_type == "all": 700 + # Delete all outputs that have a corresponding source file 701 + # Check for video source 702 + for ext in VIDEO_EXTENSIONS: 703 + if (segment / f"{file_path.stem}{ext}").exists(): 704 + should_delete = True 705 + break 706 + # Check for audio source 707 + for ext in AUDIO_EXTENSIONS: 708 + if (segment / f"{file_path.stem}{ext}").exists(): 709 + should_delete = True 710 + break 711 + elif reprocess_type == "screen": 712 + # Screen outputs end with _screen or are just screen 713 + if stem.endswith("_screen") or stem == "screen": 714 + should_delete = True 715 + elif reprocess_type == "audio": 716 + # Audio outputs end with _audio or are just audio 717 + if stem.endswith("_audio") or stem == "audio": 718 + should_delete = True 719 + 720 + if should_delete: 721 + deleted.append(file_path) 722 + if not dry_run: 723 + file_path.unlink() 724 + logger.info(f"Deleted: {file_path.relative_to(day_dir.parent)}") 725 + 726 + return deleted 727 + 
728 + 652 729 def scan_day(day_dir: Path) -> dict: 653 730 """Scan a day directory for processed and unprocessed files. 654 731 ··· 690 767 # Check if media file has corresponding JSONL (processed) 691 768 if ( 692 769 file_path.suffix.lower() in VIDEO_EXTENSIONS 693 - or file_path.suffix.lower() 694 - in ( 695 - ".flac", 696 - ".m4a", 697 - ) 770 + or file_path.suffix.lower() in AUDIO_EXTENSIONS 698 771 ): 699 772 output_path = file_path.with_suffix(".jsonl") 700 773 if not output_path.exists(): ··· 726 799 default=1, 727 800 help="Max concurrent processing jobs when using --day (default: 1)", 728 801 ) 802 + parser.add_argument( 803 + "--reprocess", 804 + type=str, 805 + choices=["screen", "audio", "all"], 806 + help="Delete existing outputs and reprocess (requires --day)", 807 + ) 808 + parser.add_argument( 809 + "--segment", 810 + type=str, 811 + help="Filter to specific segment (HHMMSS_LEN format, requires --day)", 812 + ) 813 + parser.add_argument( 814 + "--dry-run", 815 + action="store_true", 816 + help="Show what would be deleted/processed without making changes", 817 + ) 729 818 args = setup_cli(parser) 730 819 731 820 journal = Path(os.getenv("JOURNAL_PATH", "")) 732 821 if not journal.is_dir(): 733 822 parser.error("JOURNAL_PATH not set or invalid") 734 823 824 + # Validate argument combinations 825 + if args.reprocess and not args.day: 826 + parser.error("--reprocess requires --day") 827 + if args.segment and not args.day: 828 + parser.error("--segment requires --day") 829 + if args.dry_run and not args.reprocess: 830 + parser.error("--dry-run requires --reprocess") 831 + 832 + # Validate segment format if provided 833 + if args.segment: 834 + from think.utils import segment_key 835 + 836 + if not segment_key(args.segment): 837 + parser.error(f"--segment must be HHMMSS_LEN format, got: {args.segment}") 838 + 735 839 sensor = FileSensor(journal, verbose=args.verbose, debug=args.debug) 736 840 737 841 # Register handlers - match by extension ··· 744 848 
sensor.register(f"*{ext}", "describe", ["observe-describe", "{file}"]) 745 849 746 850 if args.day: 851 + day_dir = day_path(args.day) 852 + 853 + # Handle reprocess mode 854 + if args.reprocess: 855 + deleted = delete_outputs( 856 + day_dir, 857 + args.reprocess, 858 + segment_filter=args.segment, 859 + dry_run=args.dry_run, 860 + ) 861 + 862 + if args.dry_run: 863 + if deleted: 864 + logger.info(f"Would delete {len(deleted)} output file(s):") 865 + for path in deleted: 866 + logger.info(f" {path.relative_to(journal)}") 867 + else: 868 + logger.info("No files to delete") 869 + return 870 + else: 871 + logger.info(f"Deleted {len(deleted)} output file(s)") 872 + 747 873 # Batch mode: process specific day 874 + segment_msg = f" (segment: {args.segment})" if args.segment else "" 748 875 logger.info( 749 - f"Processing files from day {args.day} with {args.jobs} concurrent jobs" 876 + f"Processing files from day {args.day}{segment_msg} " 877 + f"with {args.jobs} concurrent jobs" 750 878 ) 751 - sensor.process_day(args.day, max_jobs=args.jobs) 879 + sensor.process_day(args.day, max_jobs=args.jobs, segment_filter=args.segment) 752 880 else: 753 881 # Event mode: listen for Callosum events 754 882 logger.info("Starting observe sensor in event mode...")
+93
tests/test_sense.py
··· 440 440 assert len(observed_events) == 1 441 441 assert observed_events[0].get("day") == "20250101" 442 442 assert observed_events[0].get("segment") == "143022_300" 443 + 444 + 445 + def test_delete_outputs_screen(tmp_path): 446 + """Test delete_outputs with screen type.""" 447 + from observe.sense import delete_outputs 448 + 449 + # Create journal/day/segment structure 450 + day_dir = tmp_path / "20250101" 451 + segment_dir = day_dir / "143022_300" 452 + segment_dir.mkdir(parents=True) 453 + 454 + # Create source files and outputs 455 + (segment_dir / "center_DP-3_screen.webm").write_text("video") 456 + (segment_dir / "center_DP-3_screen.jsonl").write_text('{"raw": "test"}') 457 + (segment_dir / "audio.flac").write_text("audio") 458 + (segment_dir / "audio.jsonl").write_text('{"raw": "test"}') 459 + 460 + # Delete screen outputs 461 + deleted = delete_outputs(day_dir, "screen") 462 + 463 + assert len(deleted) == 1 464 + assert deleted[0].name == "center_DP-3_screen.jsonl" 465 + assert not (segment_dir / "center_DP-3_screen.jsonl").exists() 466 + assert (segment_dir / "audio.jsonl").exists() # Audio untouched 467 + 468 + 469 + def test_delete_outputs_audio(tmp_path): 470 + """Test delete_outputs with audio type.""" 471 + from observe.sense import delete_outputs 472 + 473 + # Create journal/day/segment structure 474 + day_dir = tmp_path / "20250101" 475 + segment_dir = day_dir / "143022_300" 476 + segment_dir.mkdir(parents=True) 477 + 478 + # Create source files and outputs 479 + (segment_dir / "center_DP-3_screen.webm").write_text("video") 480 + (segment_dir / "center_DP-3_screen.jsonl").write_text('{"raw": "test"}') 481 + (segment_dir / "audio.flac").write_text("audio") 482 + (segment_dir / "audio.jsonl").write_text('{"raw": "test"}') 483 + 484 + # Delete audio outputs 485 + deleted = delete_outputs(day_dir, "audio") 486 + 487 + assert len(deleted) == 1 488 + assert deleted[0].name == "audio.jsonl" 489 + assert not (segment_dir / "audio.jsonl").exists() 490 + 
assert (segment_dir / "center_DP-3_screen.jsonl").exists() # Screen untouched 491 + 492 + 493 + def test_delete_outputs_dry_run(tmp_path): 494 + """Test delete_outputs with dry_run=True.""" 495 + from observe.sense import delete_outputs 496 + 497 + # Create journal/day/segment structure 498 + day_dir = tmp_path / "20250101" 499 + segment_dir = day_dir / "143022_300" 500 + segment_dir.mkdir(parents=True) 501 + 502 + # Create source files and outputs 503 + (segment_dir / "screen.webm").write_text("video") 504 + (segment_dir / "screen.jsonl").write_text('{"raw": "test"}') 505 + 506 + # Dry run should return files but not delete 507 + deleted = delete_outputs(day_dir, "screen", dry_run=True) 508 + 509 + assert len(deleted) == 1 510 + assert (segment_dir / "screen.jsonl").exists() # Still exists 511 + 512 + 513 + def test_delete_outputs_segment_filter(tmp_path): 514 + """Test delete_outputs with segment filter.""" 515 + from observe.sense import delete_outputs 516 + 517 + # Create journal/day/segments structure 518 + day_dir = tmp_path / "20250101" 519 + segment1 = day_dir / "143022_300" 520 + segment2 = day_dir / "150022_300" 521 + segment1.mkdir(parents=True) 522 + segment2.mkdir(parents=True) 523 + 524 + # Create outputs in both segments 525 + (segment1 / "screen.webm").write_text("video") 526 + (segment1 / "screen.jsonl").write_text('{"raw": "test"}') 527 + (segment2 / "screen.webm").write_text("video") 528 + (segment2 / "screen.jsonl").write_text('{"raw": "test"}') 529 + 530 + # Delete only from segment1 531 + deleted = delete_outputs(day_dir, "screen", segment_filter="143022_300") 532 + 533 + assert len(deleted) == 1 534 + assert not (segment1 / "screen.jsonl").exists() 535 + assert (segment2 / "screen.jsonl").exists() # Other segment untouched