personal memory agent
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

Fix importer stream paths and wait loop timeout

Imported segments were created at day/segment/ but sense expected
day/stream/segment/, so transcription handlers never found the files.
The wait loop also never timed out because the importer's own status
emitter kept the message queue fed, preventing the timeout check from
running.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

+30 -15
+9 -2
tests/test_importer.py
··· 96 96 day_dir = day_path("20240101") 97 97 # Duration: seg1 starts at 12:00:00, seg2 at 12:05:00 = 300s duration 98 98 # Last segment (seg2) defaults to 5s since no audio duration 99 - f1 = day_dir / "120000_300" / "imported_audio.jsonl" 100 - f2 = day_dir / "120500_5" / "imported_audio.jsonl" 99 + # Segments are under stream directory (import.text for .txt files) 100 + f1 = day_dir / "import.text" / "120000_300" / "imported_audio.jsonl" 101 + f2 = day_dir / "import.text" / "120500_5" / "imported_audio.jsonl" 101 102 102 103 # Read JSONL format: first line is metadata, subsequent lines are entries 103 104 lines1 = f1.read_text().strip().split("\n") ··· 190 191 str(day_dir), 191 192 base_dt, 192 193 "20240101_120000", 194 + "import.audio", 193 195 ) 194 196 195 197 # Should create 2 segments (0-5 min, 5-7 min) ··· 199 201 assert seg1_key == "120000_300" 200 202 assert seg1_files == ["imported_audio.mp3"] 201 203 assert (seg1_dir / "imported_audio.mp3").exists() 204 + # Segment should be under stream directory 205 + assert seg1_dir == day_dir / "import.audio" / "120000_300" 202 206 203 207 seg2_key, seg2_dir, seg2_files = segments[1] 204 208 assert seg2_key == "120500_300" 205 209 assert seg2_files == ["imported_audio.mp3"] 206 210 assert (seg2_dir / "imported_audio.mp3").exists() 211 + assert seg2_dir == day_dir / "import.audio" / "120500_300" 207 212 208 213 209 214 def test_prepare_audio_segments_with_collision(tmp_path, monkeypatch): ··· 242 247 str(day_dir), 243 248 base_dt, 244 249 "20240101_120000", 250 + "import.audio", 245 251 ) 246 252 247 253 assert len(segments) == 1 248 254 seg_key, seg_dir, seg_files = segments[0] 249 255 assert seg_key == "120001_300" # Deconflicted key 256 + assert seg_dir == day_dir / "import.audio" / "120001_300" 250 257 251 258 252 259 def test_run_import_summary(tmp_path, monkeypatch):
+21 -13
think/importer.py
··· 240 240 day_dir: str, 241 241 base_dt: dt.datetime, 242 242 import_id: str, 243 + stream: str, 243 244 ) -> list[tuple[str, Path, list[str]]]: 244 245 """Slice audio into 5-minute segments for observe pipeline. 245 246 ··· 251 252 day_dir: Day directory path (YYYYMMDD) 252 253 base_dt: Base datetime for timestamp calculation 253 254 import_id: Import identifier 255 + stream: Stream name for directory layout (day/stream/segment/) 254 256 255 257 Returns: 256 258 List of (segment_key, segment_dir, files_list) tuples ··· 258 260 """ 259 261 media = Path(media_path) 260 262 source_ext = media.suffix.lower() 261 - day_path_obj = Path(day_dir) 263 + stream_dir = Path(day_dir) / stream 262 264 263 265 # Get audio duration to calculate number of segments 264 266 duration = _get_audio_duration(media_path) ··· 282 284 segment_key_candidate = f"{time_part}_{segment_duration}" 283 285 284 286 # Check for collision and deconflict if needed 285 - available_key = find_available_segment(day_path_obj, segment_key_candidate) 287 + available_key = find_available_segment(stream_dir, segment_key_candidate) 286 288 if available_key is None: 287 289 logger.warning( 288 290 f"Could not find available segment key near {segment_key_candidate}" ··· 294 296 f"Segment collision: {segment_key_candidate} -> {available_key}" 295 297 ) 296 298 297 - # Create segment directory 298 - segment_dir = day_path_obj / available_key 299 + # Create segment directory under stream 300 + segment_dir = stream_dir / available_key 299 301 segment_dir.mkdir(parents=True, exist_ok=True) 300 302 301 303 # Slice audio for this segment ··· 357 359 base_dt: dt.datetime, 358 360 *, 359 361 import_id: str, 362 + stream: str, 360 363 facet: str | None = None, 361 364 setting: str | None = None, 362 365 audio_duration: int | None = None, ··· 368 371 day_dir: Journal day directory 369 372 base_dt: Base datetime for the import 370 373 import_id: Import identifier 374 + stream: Stream name for directory layout (day/stream/segment/) 371 375 facet: Optional facet name 372 376 setting: Optional setting description 373 377 audio_duration: Optional total audio duration in seconds (for last segment) ··· 377 381 """ 378 382 created_files = [] 379 383 text = _read_transcript(path) 384 + stream_dir = os.path.join(day_dir, stream) 380 385 381 386 # Get start time from base_dt for segmentation 382 387 start_time = base_dt.strftime("%H:%M:%S") ··· 421 426 duration = max(1, duration) 422 427 423 428 segment_name = f"{time_part}_{duration}" 424 - ts_dir = os.path.join(day_dir, segment_name) 429 + ts_dir = os.path.join(stream_dir, segment_name) 425 430 os.makedirs(ts_dir, exist_ok=True) 426 431 json_path = os.path.join(ts_dir, "imported_audio.jsonl") 427 432 ··· 778 783 779 784 try: 780 785 if ext in {".txt", ".md", ".pdf"}: 781 - # Text transcript processing (unchanged - no observe pipeline) 786 + # Text transcript processing — no observe pipeline 782 787 _set_stage("segmenting") 783 788 784 789 created_files = process_transcript( ··· 786 791 day_dir, 787 792 base_dt, 788 793 import_id=args.timestamp, 794 + stream=stream, 789 795 facet=args.facet, 790 796 setting=args.setting, 791 797 ) ··· 809 815 # Write stream markers for text import segments 810 816 for seg in created_segments: 811 817 try: 812 - seg_dir = day_path(day) / seg 818 + seg_dir = day_path(day) / stream / seg 813 819 result = update_stream(stream, day, seg, type="import", host=None) 814 820 write_segment_stream( 815 821 seg_dir, ··· 838 844 day_dir, 839 845 base_dt, 840 846 args.timestamp, 847 + stream, 841 848 ) 842 849 843 850 if not segments: ··· 904 911 logger.info(f"Waiting for {len(pending)} segments to complete") 905 912 906 913 while pending: 914 + # Check for timeout since last progress 915 + if time.monotonic() - last_progress > segment_timeout: 916 + timed_out = sorted(pending) 917 + logger.error(f"Timed out waiting for segments: {timed_out}") 918 + failed_segments.extend(timed_out) 919 + break 920 + 907 921 # Poll for observe.observed events from message queue 908 922 try: 909 923 msg = _message_queue.get(timeout=5.0) 910 924 except queue.Empty: 911 - # Check for timeout since last progress 912 - if time.monotonic() - last_progress > segment_timeout: 913 - timed_out = sorted(pending) 914 - logger.error(f"Timed out waiting for segments: {timed_out}") 915 - failed_segments.extend(timed_out) 916 - pending.clear() 917 925 continue 918 926 919 927 tract = msg.get("tract")