personal memory agent
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

observe/transcribe: schema-constrain + drop shape tolerance

Add a Draft 2020-12 JSON schema for Gemini's transcribe response and
pass it via json_schema on generate(). Tighten _extract_segments to
accept only the documented {"segments": [...]} wrapper; bare-list,
array-wrapped-dict, and {"transcript": ...} fallbacks now raise.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

+195 -72
+17 -50
observe/transcribe/gemini.py
··· 36 36 37 37 logger = logging.getLogger(__name__) 38 38 39 + _SCHEMA = json.loads( 40 + (Path(__file__).parent / "gemini.schema.json").read_text(encoding="utf-8") 41 + ) 42 + 39 43 # Regex for parsing speaker strings like "Speaker 1", "Speaker 2" 40 44 SPEAKER_PATTERN = re.compile(r"(?:speaker\s*)?(\d+)", re.IGNORECASE) 41 45 ··· 123 127 return None 124 128 125 129 126 - def _extract_segments(result: list | dict) -> list: 127 - """Extract a segments list from Gemini's JSON response. 130 + def _extract_segments(result: dict) -> list: 131 + """Extract the segments list from Gemini's schema-constrained response. 128 132 129 - Gemini may return any of these shapes: 130 - - ``{"segments": [...]}`` — expected format per prompt 131 - - ``[{"start": ..., "text": ...}, ...]`` — bare list of segment dicts 132 - - ``[{"segments": [...]}]`` — array-wrapped dict 133 - - ``{"transcript": [...]}`` or other single-key wrapper 134 - 135 - Args: 136 - result: Parsed JSON (list or dict). 137 - 138 - Returns: 139 - List of segment dicts, empty list on unexpected input. 133 + Raises RuntimeError if the result does not match the documented 134 + {"segments": [...]} wrapper shape. 140 135 """ 141 - # Unwrap: [{"segments": [...]}] — array-wrapped dict with a segments key. 142 - # Only unwrap if the inner dict has "segments", not if it's a segment itself. 143 - if ( 144 - isinstance(result, list) 145 - and len(result) == 1 146 - and isinstance(result[0], dict) 147 - and "segments" in result[0] 148 - ): 149 - result = result[0] 150 - 151 - # Bare list of segment dicts 152 - if isinstance(result, list): 153 - return result 154 - 155 - if isinstance(result, dict): 156 - # Preferred key 157 - if "segments" in result: 158 - val = result["segments"] 159 - if isinstance(val, list): 160 - return val 161 - 162 - # Fallback: single-key dict whose value is a list (e.g. {"transcript": [...]}) 163 - if len(result) == 1: 164 - val = next(iter(result.values())) 165 - if isinstance(val, list): 166 - logger.warning( 167 - f"Gemini used unexpected key {next(iter(result.keys()))!r} " 168 - f"instead of 'segments'" 169 - ) 170 - return val 171 - 172 - logger.warning(f"Gemini returned unexpected result shape: {type(result)}") 173 - return [] 136 + if isinstance(result, dict) and isinstance(result.get("segments"), list): 137 + return result["segments"] 138 + logger.warning( 139 + "Gemini returned unexpected shape: type=%s keys=%s", 140 + type(result).__name__, 141 + list(result.keys()) if isinstance(result, dict) else None, 142 + ) 143 + raise RuntimeError(f"Gemini returned unexpected shape: {type(result).__name__}") 174 144 175 145 176 146 def _build_chunk_contents( ··· 380 350 max_output_tokens=16384, 381 351 json_output=True, 382 352 thinking_budget=0, 353 + json_schema=_SCHEMA, 383 354 ) 384 355 385 356 transcribe_time = time.perf_counter() - t0 ··· 395 366 logger.debug(f"Response text: {response_text[:500]}") 396 367 raise RuntimeError(f"Gemini returned invalid JSON: {e}") from e 397 368 398 - # Extract segments — Gemini may return different shapes: 399 - # [...] bare list of segments 400 - # {"segments": [...]} expected wrapper 401 - # {"transcript": [...]} alternate key name 402 369 segments = _extract_segments(result) 403 370 404 371 # Normalize to standard statement format
+30
observe/transcribe/gemini.schema.json
··· 1 + { 2 + "$schema": "https://json-schema.org/draft/2020-12/schema", 3 + "type": "object", 4 + "additionalProperties": false, 5 + "required": ["segments"], 6 + "properties": { 7 + "segments": { 8 + "type": "array", 9 + "items": { 10 + "type": "object", 11 + "additionalProperties": false, 12 + "required": ["start", "speaker", "text"], 13 + "properties": { 14 + "start": { 15 + "type": "string", 16 + "pattern": "^\\d{2}:\\d{2}$" 17 + }, 18 + "speaker": { 19 + "type": "string", 20 + "minLength": 1 21 + }, 22 + "text": { 23 + "type": "string", 24 + "minLength": 1 25 + } 26 + } 27 + } 28 + } 29 + } 30 + }
+30 -22
tests/test_transcribe_gemini.py
··· 4 4 """Tests for the Gemini STT backend.""" 5 5 6 6 import numpy as np 7 + import pytest 7 8 8 9 from observe.transcribe.gemini import ( 9 10 _build_chunk_contents, ··· 259 260 260 261 261 262 class TestExtractSegments: 262 - """Tests for _extract_segments — robust response parsing.""" 263 + """Tests for _extract_segments strict wrapper parsing.""" 263 264 264 265 def test_expected_dict_wrapper(self): 265 266 """Standard {"segments": [...]} response.""" 266 267 segs = [{"start": "00:00", "speaker": "Speaker 1", "text": "Hi"}] 267 268 assert _extract_segments({"segments": segs}) == segs 268 269 269 - def test_bare_list(self): 270 - """Gemini returns bare list of segment dicts.""" 270 + def test_bare_list_raises(self): 271 + """Bare list is rejected.""" 271 272 segs = [{"start": "00:00", "speaker": "Speaker 1", "text": "Hi"}] 272 - assert _extract_segments(segs) == segs 273 + with pytest.raises(RuntimeError): 274 + _extract_segments(segs) 273 275 274 - def test_alternate_key(self): 275 - """Single-key dict with alternate key name.""" 276 + def test_alternate_key_raises(self): 277 + """Alternate wrapper key is rejected.""" 276 278 segs = [{"start": "00:00", "text": "Hi"}] 277 - assert _extract_segments({"transcript": segs}) == segs 279 + with pytest.raises(RuntimeError): 280 + _extract_segments({"transcript": segs}) 278 281 279 - def test_array_wrapped_dict(self): 280 - """Gemini wraps response in array: [{"segments": [...]}].""" 282 + def test_array_wrapped_dict_raises(self): 283 + """Array-wrapped dict is rejected.""" 281 284 segs = [{"start": "00:00", "speaker": "Speaker 1", "text": "Hi"}] 282 - assert _extract_segments([{"segments": segs}]) == segs 285 + with pytest.raises(RuntimeError): 286 + _extract_segments([{"segments": segs}]) 283 287 284 288 def test_empty_segments(self): 285 289 """Empty segments list in dict.""" 286 290 assert _extract_segments({"segments": []}) == [] 287 291 288 - def test_empty_bare_list(self): 289 - """Empty bare list.""" 290 - assert _extract_segments([]) == [] 292 + def test_empty_bare_list_raises(self): 293 + """Empty bare list is rejected.""" 294 + with pytest.raises(RuntimeError): 295 + _extract_segments([]) 291 296 292 - def test_non_list_segments_value(self): 293 - """segments key has non-list value.""" 294 - assert _extract_segments({"segments": "not a list"}) == [] 297 + def test_non_list_segments_value_raises(self): 298 + """Non-list segments value is rejected.""" 299 + with pytest.raises(RuntimeError): 300 + _extract_segments({"segments": "not a list"}) 295 301 296 - def test_unexpected_type(self): 297 - """Completely unexpected type returns empty.""" 298 - assert _extract_segments("unexpected") == [] 302 + def test_unexpected_type_raises(self): 303 + """Unexpected type is rejected.""" 304 + with pytest.raises(RuntimeError): 305 + _extract_segments("unexpected") 299 306 300 - def test_dict_with_no_segments_key(self): 301 - """Dict without segments or single-list key returns empty.""" 302 - assert _extract_segments({"other": "value", "more": "stuff"}) == [] 307 + def test_dict_with_no_segments_key_raises(self): 308 + """Dict without segments key is rejected.""" 309 + with pytest.raises(RuntimeError): 310 + _extract_segments({"other": 1}) 303 311 304 312 305 313 class TestGetModelInfo:
+118
tests/test_transcribe_gemini_schema.py
··· 1 + # SPDX-License-Identifier: AGPL-3.0-only 2 + # Copyright (c) 2026 sol pbc 3 + 4 + import json 5 + from pathlib import Path 6 + from types import SimpleNamespace 7 + 8 + import numpy as np 9 + from jsonschema import Draft202012Validator 10 + 11 + import observe.transcribe.gemini as gemini_mod 12 + 13 + 14 + def _load_schema() -> dict: 15 + with ( 16 + Path(__file__).resolve().parents[1] 17 + / "observe" 18 + / "transcribe" 19 + / "gemini.schema.json" 20 + ).open(encoding="utf-8") as f: 21 + return json.load(f) 22 + 23 + 24 + def test_gemini_schema_file_is_valid_draft_2020_12(): 25 + Draft202012Validator.check_schema(_load_schema()) 26 + 27 + 28 + def test_gemini_schema_accepts_and_rejects_expected_values(): 29 + validator = Draft202012Validator(_load_schema()) 30 + 31 + assert validator.is_valid({"segments": []}) 32 + assert validator.is_valid( 33 + {"segments": [{"start": "01:23", "speaker": "Speaker 1", "text": "hi"}]} 34 + ) 35 + assert validator.is_valid( 36 + { 37 + "segments": [ 38 + {"start": "00:00", "speaker": "Speaker 1", "text": "hello"}, 39 + {"start": "00:05", "speaker": "Speaker 2", "text": "hi back"}, 40 + ] 41 + } 42 + ) 43 + assert not validator.is_valid( 44 + [{"start": "01:23", "speaker": "Speaker 1", "text": "hi"}] 45 + ) 46 + assert not validator.is_valid( 47 + {"transcript": [{"start": "01:23", "speaker": "Speaker 1", "text": "hi"}]} 48 + ) 49 + assert not validator.is_valid({"segments": [], "extra": 1}) 50 + assert not validator.is_valid( 51 + {"segments": [{"speaker": "Speaker 1", "text": "hi"}]} 52 + ) 53 + assert not validator.is_valid({"segments": [{"start": "01:23", "text": "hi"}]}) 54 + assert not validator.is_valid( 55 + {"segments": [{"start": "01:23", "speaker": "Speaker 1"}]} 56 + ) 57 + assert not validator.is_valid( 58 + { 59 + "segments": [ 60 + { 61 + "start": "01:23", 62 + "speaker": "s", 63 + "text": "t", 64 + "confidence": 0.9, 65 + } 66 + ] 67 + } 68 + ) 69 + assert not validator.is_valid( 70 + {"segments": [{"start": "01:23", "speaker": "Speaker 1", "text": ""}]} 71 + ) 72 + assert not validator.is_valid( 73 + {"segments": [{"start": "01:23", "speaker": "", "text": "hi"}]} 74 + ) 75 + assert not validator.is_valid( 76 + {"segments": [{"start": "1:23", "speaker": "Speaker 1", "text": "hi"}]} 77 + ) 78 + assert not validator.is_valid( 79 + {"segments": [{"start": "01:23:45", "speaker": "Speaker 1", "text": "hi"}]} 80 + ) 81 + assert not validator.is_valid( 82 + {"segments": [{"start": "01-23", "speaker": "Speaker 1", "text": "hi"}]} 83 + ) 84 + assert not validator.is_valid( 85 + {"segments": [{"start": 83, "speaker": "Speaker 1", "text": "hi"}]} 86 + ) 87 + 88 + 89 + def test_transcribe_passes_schema_to_generate(monkeypatch): 90 + captured = {} 91 + 92 + def fake_generate(**kwargs): 93 + captured.update(kwargs) 94 + return json.dumps( 95 + {"segments": [{"start": "00:00", "speaker": "Speaker 1", "text": "hello"}]} 96 + ) 97 + 98 + monkeypatch.setattr(gemini_mod, "generate", fake_generate) 99 + monkeypatch.setattr(gemini_mod, "audio_to_flac_bytes", lambda *_args: b"flac") 100 + monkeypatch.setattr( 101 + gemini_mod.types.Part, 102 + "from_bytes", 103 + staticmethod(lambda data, mime_type: {"data": data, "mime_type": mime_type}), 104 + ) 105 + monkeypatch.setattr( 106 + gemini_mod, 107 + "load_prompt", 108 + lambda *_args, **_kwargs: SimpleNamespace(text="Prompt"), 109 + ) 110 + 111 + gemini_mod.transcribe( 112 + np.zeros(16000, dtype=np.float32), 113 + 16000, 114 + {}, 115 + [(0.0, 1.0)], 116 + ) 117 + 118 + assert captured["json_schema"] is gemini_mod._SCHEMA