personal memory agent
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

Fix importer hang when transcription fails

sense.py now emits observe.observed with error info (`error`, `errors`) when
handlers fail, instead of silently abandoning the segment. The importer polling
loop treats such events as partial success, and as a safety net gives up on any
still-pending segments after 10 minutes without progress. Failed and timed-out
segments are persisted to imported.json under `failed_segments`.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

+76 -10
+5 -1
docs/CALLOSUM.md
··· 81 81 | `detected` | sense | File detected, handler spawned | 82 82 | `described` | describe | Vision analysis complete | 83 83 | `transcribed` | transcribe | Audio transcription complete (includes VAD metadata) | 84 - | `observed` | sense | All files for segment fully processed | 84 + | `observed` | sense | All files for segment fully processed (may include errors) | 85 85 86 86 **Common fields:** `day`, `segment`, `remote` (for remote uploads) 87 87 **`observing` event fields:** 88 88 - `meta` (dict, optional): Metadata dict from remote observer. Contains `host`, `platform`, and any client-provided fields (e.g., `facet`, `setting`). Passed to handlers via `SEGMENT_META` env var and unrolled into JSONL metadata headers. 89 + 90 + **`observed` event fields:** 91 + - `error` (bool, optional): `true` if any handler failed during segment processing 92 + - `errors` (list[str], optional): Error descriptions for failed handlers (e.g., `["transcribe exit 1"]`) 89 93 90 94 **Correlation:** `detected.ref` matches `logs.exec.ref`; `segment` groups files from same capture window 91 95 **Event Log:** Events with `day` + `segment` are logged to `<day>/<segment>/events.jsonl` by supervisor
+37 -4
observe/sense.py
··· 156 156 self.segment_batch: Dict[str, bool] = {} 157 157 # Track remote origin: {segment_key: remote_name} for remote observer segments 158 158 self.segment_remote: Dict[str, str] = {} 159 + # Track handler errors per segment: {segment_key: [error_strings]} 160 + self.segment_errors: Dict[str, list[str]] = {} 159 161 160 162 def register(self, pattern: str, handler_name: str, command: List[str]): 161 163 """ ··· 392 394 f"with exit code {exit_code} ({elapsed:.1f}s) - see log {log_rel}" 393 395 ) 394 396 397 + # Mark file as done so segment can still complete 398 + self._check_segment_observed( 399 + handler_proc.file_path, 400 + error=f"{handler_proc.handler_name} exit {exit_code}", 401 + ) 402 + 395 403 handler_proc.cleanup() 396 404 397 405 with self.lock: ··· 443 451 day = self.segment_day.get(segment) 444 452 batch = self.segment_batch.get(segment, False) 445 453 remote = self.segment_remote.get(segment) 454 + errors = self.segment_errors.get(segment) 446 455 447 456 if self.callosum: 448 457 event_fields = { ··· 454 463 event_fields["batch"] = True 455 464 if remote: 456 465 event_fields["remote"] = remote 466 + if errors: 467 + event_fields["error"] = True 468 + event_fields["errors"] = errors 457 469 self.callosum.emit("observe", "observed", **event_fields) 458 470 459 - note_str = f" ({note})" if note else "" 460 - logger.info(f"Segment fully observed{note_str}: {day}/{segment} ({duration}s)") 471 + if errors: 472 + logger.warning( 473 + f"Segment observed with errors: {day}/{segment} ({duration}s) - {errors}" 474 + ) 475 + else: 476 + note_str = f" ({note})" if note else "" 477 + logger.info( 478 + f"Segment fully observed{note_str}: {day}/{segment} ({duration}s)" 479 + ) 461 480 462 481 # Cleanup segment tracking 463 482 del self.segment_files[segment] ··· 468 487 del self.segment_batch[segment] 469 488 if segment in self.segment_remote: 470 489 del self.segment_remote[segment] 490 + if segment in self.segment_errors: 491 + del 
self.segment_errors[segment] 471 492 472 - def _check_segment_observed(self, file_path: Path): 473 - """Check if all files for this segment have completed processing.""" 493 + def _check_segment_observed( 494 + self, file_path: Path, error: str | None = None 495 + ): 496 + """Check if all files for this segment have completed processing. 497 + 498 + Args: 499 + file_path: Path to the file that finished processing 500 + error: Optional error string if the handler failed 501 + """ 474 502 from observe.utils import get_segment_key 475 503 476 504 segment = get_segment_key(file_path) ··· 479 507 480 508 with self.lock: 481 509 if segment in self.segment_files: 510 + if error: 511 + if segment not in self.segment_errors: 512 + self.segment_errors[segment] = [] 513 + self.segment_errors[segment].append(error) 514 + 482 515 self.segment_files[segment].discard(file_path) 483 516 484 517 # If no more pending files, emit observed event
+34 -5
think/importer.py
··· 747 747 # Get parent directory for saving metadata 748 748 media_path = Path(args.media) 749 749 import_dir = media_path.parent 750 + failed_segments: list[str] = [] 750 751 751 752 try: 752 753 if ext in {".txt", ".md", ".pdf"}: ··· 839 840 if not args.skip_summary: 840 841 _set_stage("transcribing") 841 842 pending = set(created_segments) 843 + segment_timeout = 600 # 10 minutes since last progress 844 + last_progress = time.monotonic() 842 845 843 846 logger.info(f"Waiting for {len(pending)} segments to complete") 844 847 ··· 847 850 try: 848 851 msg = _message_queue.get(timeout=5.0) 849 852 except queue.Empty: 853 + # Check for timeout since last progress 854 + if time.monotonic() - last_progress > segment_timeout: 855 + timed_out = sorted(pending) 856 + logger.error( 857 + f"Timed out waiting for segments: {timed_out}" 858 + ) 859 + failed_segments.extend(timed_out) 860 + pending.clear() 850 861 continue 851 862 852 863 tract = msg.get("tract") ··· 854 865 seg = msg.get("segment") 855 866 856 867 if tract == "observe" and event == "observed" and seg in pending: 857 - pending.remove(seg) 858 - logger.info( 859 - f"Segment {seg} transcribed ({len(pending)} remaining)" 860 - ) 868 + pending.discard(seg) 869 + last_progress = time.monotonic() 870 + if msg.get("error"): 871 + errors = msg.get("errors", []) 872 + logger.warning( 873 + f"Segment {seg} failed: {errors} " 874 + f"({len(pending)} remaining)" 875 + ) 876 + failed_segments.append(seg) 877 + else: 878 + logger.info( 879 + f"Segment {seg} transcribed " 880 + f"({len(pending)} remaining)" 881 + ) 861 882 862 - logger.info("All segments transcribed successfully") 883 + if failed_segments: 884 + logger.warning( 885 + f"{len(failed_segments)} of {len(created_segments)} " 886 + f"segments failed: {failed_segments}" 887 + ) 888 + else: 889 + logger.info("All segments transcribed successfully") 863 890 864 891 # Complete processing metadata 865 892 processing_results["processing_completed"] = 
dt.datetime.now().isoformat() 866 893 processing_results["total_files_created"] = len(all_created_files) 867 894 processing_results["all_created_files"] = all_created_files 868 895 processing_results["segments"] = created_segments 896 + if failed_segments: 897 + processing_results["failed_segments"] = failed_segments 869 898 870 899 # Write imported.json with all processing metadata 871 900 imported_path = import_dir / "imported.json"