personal memory agent
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

fix(importers): stop resetting transcribe stall timer on unrelated observe.status

Per-segment transcription wait was resetting last_progress on every observe.status event, but sense emits one every ~5s while any handler is busy and the payload has no segment field — the stall timeout could never trip. Drop the branch; observe.observed is the real per-segment progress signal. Today this produced 33 wedged sol:import orphans on the box.

Extract the wait loop into _wait_for_segments for direct unit testing; add two regression tests (status-only stream stalls; mixed observed+status stalls remainder).

+164 -48
+92
tests/test_importer_stall_timeout.py
··· 1 + # SPDX-License-Identifier: AGPL-3.0-only 2 + # Copyright (c) 2026 sol pbc 3 + 4 + import logging 5 + import queue 6 + import time 7 + 8 + from think.importers.cli import _wait_for_segments 9 + 10 + 11 + def _status_event(index: int) -> dict: 12 + return { 13 + "tract": "observe", 14 + "event": "status", 15 + "screen": { 16 + "running": { 17 + "file": f"chronicle/20260502/default/seg-{index}/screen.png", 18 + "ref": f"screen-{index}", 19 + }, 20 + "max_age_seconds": 0, 21 + }, 22 + "audio": { 23 + "queued": [ 24 + { 25 + "file": f"chronicle/20260502/default/seg-{index}/audio.wav", 26 + "age_seconds": index, 27 + } 28 + ], 29 + "max_age_seconds": index, 30 + }, 31 + } 32 + 33 + 34 + def _observed_event(segment: str) -> dict: 35 + return { 36 + "tract": "observe", 37 + "event": "observed", 38 + "segment": segment, 39 + "day": "20260502", 40 + "stream": "default", 41 + } 42 + 43 + 44 + def test_status_only_stream_stalls(caplog): 45 + message_queue: queue.Queue[dict] = queue.Queue() 46 + for index in range(5): 47 + message_queue.put(_status_event(index)) 48 + 49 + pending = {"seg-a", "seg-b"} 50 + caplog.set_level(logging.ERROR, logger="think.importers.cli") 51 + 52 + started_at = time.monotonic() 53 + failed_segments, completed_count = _wait_for_segments( 54 + message_queue, 55 + pending, 56 + segment_timeout=1.0, 57 + poll_timeout=0.05, 58 + ) 59 + elapsed = time.monotonic() - started_at 60 + 61 + assert elapsed < 2.0 62 + assert sorted(failed_segments) == ["seg-a", "seg-b"] 63 + assert completed_count == 0 64 + assert "Transcription stalled: no progress for " in caplog.text 65 + assert "0/2 segments completed, 2 still pending: ['seg-a', 'seg-b']" in caplog.text 66 + 67 + 68 + def test_mixed_observed_and_status_stalls_remaining_segments(caplog): 69 + message_queue: queue.Queue[dict] = queue.Queue() 70 + message_queue.put(_observed_event("seg-a")) 71 + message_queue.put(_observed_event("seg-b")) 72 + for index in range(5): 73 + message_queue.put(_status_event(index)) 74 + 75 + pending = {"seg-a", "seg-b", "seg-c", "seg-d"} 76 + caplog.set_level(logging.ERROR, logger="think.importers.cli") 77 + 78 + started_at = time.monotonic() 79 + failed_segments, completed_count = _wait_for_segments( 80 + message_queue, 81 + pending, 82 + segment_timeout=1.0, 83 + poll_timeout=0.05, 84 + ) 85 + elapsed = time.monotonic() - started_at 86 + 87 + assert elapsed < 2.0 88 + assert completed_count == 2 89 + assert pending == {"seg-c", "seg-d"} 90 + assert sorted(failed_segments) == ["seg-c", "seg-d"] 91 + assert "Transcription stalled: no progress for " in caplog.text 92 + assert "2/4 segments completed, 2 still pending: ['seg-c', 'seg-d']" in caplog.text
+72 -48
think/importers/cli.py
··· 108 108 time.sleep(5) 109 109 110 110 111 + def _wait_for_segments( 112 + message_queue: "queue.Queue[dict[str, Any]]", 113 + pending: set[str], 114 + segment_timeout: float, 115 + *, 116 + completed_count_start: int = 0, 117 + total_segments: int | None = None, 118 + poll_timeout: float = 5.0, 119 + ) -> tuple[list[str], int]: 120 + """Drain message_queue until pending is empty or stall timeout trips. 121 + Returns (failed_segments, completed_count).""" 122 + failed_segments: list[str] = [] 123 + completed_count = completed_count_start 124 + if total_segments is None: 125 + total_segments = completed_count_start + len(pending) 126 + last_progress = time.monotonic() 127 + transcribe_start = time.monotonic() 128 + 129 + logger.info(f"Waiting for {len(pending)} segments to complete") 130 + 131 + while pending: 132 + # Check for timeout since last progress 133 + stall_duration = time.monotonic() - last_progress 134 + if stall_duration > segment_timeout: 135 + total_elapsed = int(time.monotonic() - transcribe_start) 136 + timed_out = sorted(pending) 137 + logger.error( 138 + f"Transcription stalled: no progress for " 139 + f"{int(stall_duration)}s ({total_elapsed}s total). " 140 + f"{completed_count}/{total_segments} segments " 141 + f"completed, {len(timed_out)} still pending: {timed_out}" 142 + ) 143 + failed_segments.extend(timed_out) 144 + break 145 + 146 + # Poll for observe.observed events from message queue 147 + try: 148 + msg = message_queue.get(timeout=poll_timeout) 149 + except queue.Empty: 150 + continue 151 + 152 + tract = msg.get("tract") 153 + event = msg.get("event") 154 + seg = msg.get("segment") 155 + 156 + if tract == "observe" and event == "observed" and seg in pending: 157 + pending.discard(seg) 158 + completed_count += 1 159 + last_progress = time.monotonic() 160 + if msg.get("error"): 161 + errors = msg.get("errors", []) 162 + logger.warning( 163 + f"Segment {seg} failed: {errors} ({len(pending)} remaining)" 164 + ) 165 + failed_segments.append(seg) 166 + else: 167 + logger.info( 168 + f"Segment {seg} transcribed " 169 + f"({completed_count}/{total_segments} done, " 170 + f"{len(pending)} remaining)" 171 + ) 172 + 173 + return failed_segments, completed_count 174 + 175 + 111 176 def _format_timestamp_display(timestamp: str) -> str: 112 177 """Format timestamp for human-readable display.""" 113 178 try: ··· 1061 1126 # Wait for transcription to complete 1062 1127 _set_stage("transcribing") 1063 1128 pending = set(created_segments) 1064 - completed_count = 0 1065 1129 segment_timeout = 600 # 10 minutes since last progress 1066 - last_progress = time.monotonic() 1067 1130 transcribe_start = time.monotonic() 1068 - 1069 - logger.info(f"Waiting for {len(pending)} segments to complete") 1070 - 1071 - while pending: 1072 - # Check for timeout since last progress 1073 - stall_duration = time.monotonic() - last_progress 1074 - if stall_duration > segment_timeout: 1075 - total_elapsed = int(time.monotonic() - transcribe_start) 1076 - timed_out = sorted(pending) 1077 - logger.error( 1078 - f"Transcription stalled: no progress for " 1079 - f"{int(stall_duration)}s ({total_elapsed}s total). " 1080 - f"{completed_count}/{len(created_segments)} segments " 1081 - f"completed, {len(timed_out)} still pending: {timed_out}" 1082 - ) 1083 - failed_segments.extend(timed_out) 1084 - break 1085 - 1086 - # Poll for observe.observed events from message queue 1087 - try: 1088 - msg = _message_queue.get(timeout=5.0) 1089 - except queue.Empty: 1090 - continue 1091 - 1092 - tract = msg.get("tract") 1093 - event = msg.get("event") 1094 - seg = msg.get("segment") 1095 - 1096 - if tract == "observe" and event == "observed" and seg in pending: 1097 - pending.discard(seg) 1098 - completed_count += 1 1099 - last_progress = time.monotonic() 1100 - if msg.get("error"): 1101 - errors = msg.get("errors", []) 1102 - logger.warning( 1103 - f"Segment {seg} failed: {errors} ({len(pending)} remaining)" 1104 - ) 1105 - failed_segments.append(seg) 1106 - else: 1107 - logger.info( 1108 - f"Segment {seg} transcribed " 1109 - f"({completed_count}/{len(created_segments)} done, " 1110 - f"{len(pending)} remaining)" 1111 - ) 1112 - elif tract == "observe" and event == "status": 1113 - last_progress = time.monotonic() 1131 + new_failed_segments, _completed_count = _wait_for_segments( 1132 + _message_queue, 1133 + pending, 1134 + segment_timeout, 1135 + total_segments=len(created_segments), 1136 + ) 1137 + failed_segments.extend(new_failed_segments) 1114 1138 1115 1139 if failed_segments: 1116 1140 logger.warning(