personal memory agent
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

think/detect_transcript: schema-constrain segment + json calls; restructure json wrapper

Add Draft 2020-12 schemas for transcript segmentation and transcript JSON normalization, with `$comment` pointers back to `think/detect_transcript_segment.md` and `think/detect_transcript_json.md`. Load `_SEGMENT_SCHEMA` and `_JSON_SCHEMA` at module import time in `think/detect_transcript.py`, and pass `json_schema=` through both `generate()` call sites so advisory validation in `think.models.generate` engages on the next real provider run.

Rewrite the `think/detect_transcript_json.md` JSON contract from a heterogeneous array of transcript rows plus a trailing metadata dict to a wrapper object `{entries, topics, setting}`. Update `detect_transcript_json()` to return `Optional[dict]` for that wrapper, and migrate `think/importers/text.py:process_transcript` to read `entries`/`topics`/`setting` directly while deleting the old trailing-dict scan/pop branch entirely; empty-string topics and setting still coerce to `None` before `write_segment()`.

Tests now cover the new contract and wiring behavior:
- add `tests/test_detect_transcript_schema.py` mirroring the `detect_created` schema pattern with schema validity, accept/reject matrices, and `json_schema` wiring assertions for both call sites
- update `tests/test_importer.py::test_importer_pdf` and `tests/test_importer.py::test_importer_text` mocks to return the wrapper shape

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

+233 -38
+136
tests/test_detect_transcript_schema.py
··· 1 + # SPDX-License-Identifier: AGPL-3.0-only 2 + # Copyright (c) 2026 sol pbc 3 + 4 + import importlib 5 + import json 6 + from pathlib import Path 7 + 8 + from jsonschema import Draft202012Validator 9 + 10 + import think.models as models 11 + 12 + mod = importlib.import_module("think.detect_transcript") 13 + 14 + DETECT_TRANSCRIPT_SEGMENT_SCHEMA_PATH = ( 15 + Path(__file__).resolve().parents[1] 16 + / "think" 17 + / "detect_transcript_segment.schema.json" 18 + ) 19 + DETECT_TRANSCRIPT_JSON_SCHEMA_PATH = ( 20 + Path(__file__).resolve().parents[1] / "think" / "detect_transcript_json.schema.json" 21 + ) 22 + 23 + 24 + def _load_detect_transcript_segment_schema() -> dict: 25 + return json.loads(DETECT_TRANSCRIPT_SEGMENT_SCHEMA_PATH.read_text(encoding="utf-8")) 26 + 27 + 28 + def _load_detect_transcript_json_schema() -> dict: 29 + return json.loads(DETECT_TRANSCRIPT_JSON_SCHEMA_PATH.read_text(encoding="utf-8")) 30 + 31 + 32 + def test_detect_transcript_segment_schema_file_is_valid_draft_2020_12(): 33 + Draft202012Validator.check_schema(_load_detect_transcript_segment_schema()) 34 + 35 + 36 + def test_detect_transcript_json_schema_file_is_valid_draft_2020_12(): 37 + Draft202012Validator.check_schema(_load_detect_transcript_json_schema()) 38 + 39 + 40 + def test_detect_transcript_segment_schema_accepts_and_rejects_expected_values(): 41 + schema = _load_detect_transcript_segment_schema() 42 + validator = Draft202012Validator(schema) 43 + valid = [{"start_at": "12:34:56", "line": 1}] 44 + 45 + assert validator.is_valid(valid) 46 + assert not validator.is_valid([{"start_at": "12:34:56"}]) 47 + assert not validator.is_valid([{"start_at": "12:34", "line": 1}]) 48 + assert not validator.is_valid([{"start_at": "12:34:56", "line": "1"}]) 49 + assert not validator.is_valid([{"start_at": "12:34:56", "line": 0}]) 50 + assert not validator.is_valid([{"start_at": "12:34:56", "line": 1, "extra": "x"}]) 51 + 52 + 53 + def test_detect_transcript_json_schema_accepts_and_rejects_expected_values(): 54 + schema = _load_detect_transcript_json_schema() 55 + validator = Draft202012Validator(schema) 56 + valid = { 57 + "entries": [{"start": "12:34:56", "speaker": "Alice", "text": "Hello"}], 58 + "topics": "planning, budget", 59 + "setting": "workplace", 60 + } 61 + 62 + assert validator.is_valid(valid) 63 + assert validator.is_valid({**valid, "topics": "", "setting": ""}) 64 + assert not validator.is_valid( 65 + {"topics": "planning, budget", "setting": "workplace"} 66 + ) 67 + assert not validator.is_valid( 68 + { 69 + **valid, 70 + "entries": [{"start": "12:34", "speaker": "Alice", "text": "Hello"}], 71 + } 72 + ) 73 + assert not validator.is_valid( 74 + { 75 + **valid, 76 + "entries": [{"start": "12:34:56", "speaker": 1, "text": "Hello"}], 77 + } 78 + ) 79 + assert not validator.is_valid( 80 + { 81 + **valid, 82 + "entries": [{"start": "12:34:56", "speaker": "Alice", "text": ""}], 83 + } 84 + ) 85 + assert not validator.is_valid({**valid, "extra": "x"}) 86 + assert not validator.is_valid( 87 + { 88 + **valid, 89 + "entries": [ 90 + { 91 + "start": "12:34:56", 92 + "speaker": "Alice", 93 + "text": "Hello", 94 + "extra": "x", 95 + } 96 + ], 97 + } 98 + ) 99 + 100 + 101 + def test_detect_transcript_segment_passes_schema_to_generate(monkeypatch): 102 + captured = {} 103 + 104 + def fake_generate(**kwargs): 105 + captured.update(kwargs) 106 + return '[{"start_at": "12:00:00", "line": 1}]' 107 + 108 + monkeypatch.setattr(models, "generate", fake_generate) 109 + 110 + result = mod.detect_transcript_segment("01\n02\n", "12:00:00") 111 + 112 + assert captured["json_schema"] is mod._SEGMENT_SCHEMA 113 + assert result 114 + assert all(isinstance(item, tuple) and len(item) == 2 for item in result) 115 + 116 + 117 + def test_detect_transcript_json_passes_schema_to_generate(monkeypatch): 118 + captured = {} 119 + 120 + def fake_generate(**kwargs): 121 + captured.update(kwargs) 122 + return ( 123 + '{"entries": [{"start": "12:00:00", "speaker": "Alice", "text": "Hello"}], ' 124 + '"topics": "planning", "setting": "workplace"}' 125 + ) 126 + 127 + monkeypatch.setattr(models, "generate", fake_generate) 128 + 129 + result = mod.detect_transcript_json("some text", "12:00:00") 130 + 131 + assert captured["json_schema"] is mod._JSON_SCHEMA 132 + assert result == { 133 + "entries": [{"start": "12:00:00", "speaker": "Alice", "text": "Hello"}], 134 + "topics": "planning", 135 + "setting": "workplace", 136 + }
+17 -6
tests/test_importer.py
··· 106 106 107 107 # Mock JSON conversion: returns entries with absolute timestamps 108 108 def mock_detect_json(text, segment_start): 109 - return [{"start": segment_start, "speaker": "Unknown", "text": text}] 109 + return { 110 + "entries": [{"start": segment_start, "speaker": "Unknown", "text": text}], 111 + "topics": "", 112 + "setting": "", 113 + } 110 114 111 115 monkeypatch.setattr(text_mod, "detect_transcript_json", mock_detect_json) 112 116 ··· 193 197 194 198 # Mock JSON conversion 195 199 def mock_detect_json(text, segment_start): 196 - return [ 197 - {"start": segment_start, "speaker": "Jack", "text": "Board meeting notes"}, 198 - {"start": "16:30:30", "speaker": "Ramon", "text": "Action items"}, 199 - {"topics": "board meeting, action items", "setting": "workplace"}, 200 - ] 200 + return { 201 + "entries": [ 202 + { 203 + "start": segment_start, 204 + "speaker": "Jack", 205 + "text": "Board meeting notes", 206 + }, 207 + {"start": "16:30:30", "speaker": "Ramon", "text": "Action items"}, 208 + ], 209 + "topics": "board meeting, action items", 210 + "setting": "workplace", 211 + } 201 212 202 213 monkeypatch.setattr(text_mod, "detect_transcript_json", mock_detect_json) 203 214
+15 -2
think/detect_transcript.py
··· 12 12 13 13 from .prompts import load_prompt 14 14 15 + _SEGMENT_SCHEMA = json.loads( 16 + (Path(__file__).parent / "detect_transcript_segment.schema.json").read_text( 17 + encoding="utf-8" 18 + ) 19 + ) 20 + _JSON_SCHEMA = json.loads( 21 + (Path(__file__).parent / "detect_transcript_json.schema.json").read_text( 22 + encoding="utf-8" 23 + ) 24 + ) 25 + 15 26 16 27 def _load_json_prompt() -> str: 17 28 """Load the JSON system prompt.""" ··· 145 156 thinking_budget=8192, 146 157 system_instruction=_load_segment_prompt(), 147 158 json_output=True, 159 + json_schema=_SEGMENT_SCHEMA, 148 160 ) 149 161 150 162 logging.info(f"Received segmentation response: {response_text}") ··· 157 169 return [] 158 170 159 171 160 - def detect_transcript_json(text: str, segment_start: str) -> Optional[list]: 172 + def detect_transcript_json(text: str, segment_start: str) -> Optional[dict]: 161 173 """Return transcript ``text`` converted to JSON using LLM analysis. 162 174 163 175 Args: ··· 165 177 segment_start: Absolute start time of this segment in HH:MM:SS format 166 178 167 179 Returns: 168 - List of transcript entries with absolute timestamps 180 + Wrapper dict with ``entries``, ``topics``, and ``setting`` keys 169 181 """ 170 182 logging.info( 171 183 f"Starting transcript JSON conversion (segment_start: {segment_start})..." ··· 184 196 thinking_budget=8192, 185 197 system_instruction=_load_json_prompt(), 186 198 json_output=True, 199 + json_schema=_JSON_SCHEMA, 187 200 ) 188 201 189 202 logging.info(f"Received JSON conversion response: {response_text[:100]}")
+17 -11
think/detect_transcript_json.md
··· 16 16 3. Preserve chronological order of the conversation 17 17 4. Extract key topics and determine the conversational setting 18 18 5. Return ONLY valid JSON - no explanations or additional text 19 + 6. Return a JSON object with exactly these top-level keys: entries, topics, and setting. 19 20 20 21 ## JSON Format Requirements: 21 22 ```json 22 - [ 23 - {"start": "HH:MM:SS", "speaker": "<speaker_name>", "text": "<complete_statement>"}, 24 - {"start": "HH:MM:SS", "speaker": "<speaker_name>", "text": "<next_statement>"}, 25 - ..., 26 - {"topics": "<topic1>, <topic2>, <topic3>", "setting": "<context_type>"} 27 - ] 23 + { 24 + "entries": [ 25 + {"start": "HH:MM:SS", "speaker": "<speaker_name>", "text": "<complete_statement>"}, 26 + {"start": "HH:MM:SS", "speaker": "<speaker_name>", "text": "<next_statement>"} 27 + ], 28 + "topics": "<topic1>, <topic2>, <topic3>", 29 + "setting": "<context_type>" 30 + } 28 31 ``` 29 32 30 33 ## Timestamp Rules: ··· 45 48 46 49 ## Example (SEGMENT_START: 14:30:00): 47 50 ```json 48 - [ 49 - {"start": "14:30:00", "speaker": "Alice", "text": "Welcome everyone to today's meeting."}, 50 - {"start": "14:30:15", "speaker": "Bob", "text": "Thanks Alice. Let's review our sales."}, 51 - {"topics": "quarterly results, sales performance", "setting": "workplace"} 52 - ] 51 + { 52 + "entries": [ 53 + {"start": "14:30:00", "speaker": "Alice", "text": "Welcome everyone to today's meeting."}, 54 + {"start": "14:30:15", "speaker": "Bob", "text": "Thanks Alice. Let's review our sales."} 55 + ], 56 + "topics": "quarterly results, sales performance", 57 + "setting": "workplace" 58 + } 53 59 ```
+24
think/detect_transcript_json.schema.json
··· 1 + { 2 + "$schema": "https://json-schema.org/draft/2020-12/schema", 3 + "$comment": "Output contract for detect_transcript_json(). Source of truth is think/detect_transcript_json.md.", 4 + "type": "object", 5 + "additionalProperties": false, 6 + "required": ["entries", "topics", "setting"], 7 + "properties": { 8 + "entries": { 9 + "type": "array", 10 + "items": { 11 + "type": "object", 12 + "additionalProperties": false, 13 + "required": ["start", "speaker", "text"], 14 + "properties": { 15 + "start": {"type": "string", "pattern": "^\\d{2}:\\d{2}:\\d{2}$"}, 16 + "speaker": {"type": "string", "minLength": 1}, 17 + "text": {"type": "string", "minLength": 1} 18 + } 19 + } 20 + }, 21 + "topics": {"type": "string"}, 22 + "setting": {"type": "string"} 23 + } 24 + }
+14
think/detect_transcript_segment.schema.json
··· 1 + { 2 + "$schema": "https://json-schema.org/draft/2020-12/schema", 3 + "$comment": "Output contract for detect_transcript_segment(). Source of truth is think/detect_transcript_segment.md.", 4 + "type": "array", 5 + "items": { 6 + "type": "object", 7 + "additionalProperties": false, 8 + "required": ["start_at", "line"], 9 + "properties": { 10 + "start_at": {"type": "string", "pattern": "^\\d{2}:\\d{2}:\\d{2}$"}, 11 + "line": {"type": "integer", "minimum": 1} 12 + } 13 + } 14 + }
+1 -1
think/importers/shared.py
··· 79 79 # Write JSONL: metadata first, then entries with source field 80 80 jsonl_lines = [json.dumps(metadata)] 81 81 for entry in entries: 82 - # Add source field if not already present (skip metadata entries like topics/setting) 82 + # Add source to transcript rows when it is not already present. 83 83 if "text" in entry and "source" not in entry: 84 84 entry = {**entry, "source": "import"} 85 85 jsonl_lines.append(json.dumps(entry))
+9 -18
think/importers/text.py
··· 77 77 78 78 for idx, (start_at, seg_text) in enumerate(segments): 79 79 # Convert segment text to structured JSON with absolute timestamps 80 - json_data = detect_transcript_json(seg_text, start_at) 81 - if not json_data: 80 + wrapper = detect_transcript_json(seg_text, start_at) 81 + if not wrapper: 82 82 continue 83 83 84 - # Extract topics/setting from last entry (LLM appends it without a start field) 85 - topics = None 86 - detected_setting = None 87 - if ( 88 - json_data 89 - and isinstance(json_data[-1], dict) 90 - and "start" not in json_data[-1] 91 - and ("topics" in json_data[-1] or "setting" in json_data[-1]) 92 - ): 93 - meta_entry = json_data.pop() 94 - topics = meta_entry.get("topics") 95 - detected_setting = meta_entry.get("setting") 84 + entries = wrapper["entries"] 85 + topics_str = wrapper.get("topics") or None 86 + setting_str = wrapper.get("setting") or None 96 87 97 88 # Convert absolute timestamps to relative offsets from segment start 98 89 # (format_audio treats start as offset from the segment base time) 99 90 seg_start_seconds = _time_to_seconds(start_at) 100 - for entry in json_data: 91 + for entry in entries: 101 92 if "start" in entry: 102 93 try: 103 94 entry_seconds = _time_to_seconds(entry["start"]) ··· 143 134 day_dir, 144 135 stream, 145 136 segment_name, 146 - json_data, 137 + entries, 147 138 import_id=import_id, 148 139 raw_filename=os.path.basename(path), 149 140 facet=facet, 150 141 setting=setting, 151 - topics=topics, 152 - detected_setting=detected_setting, 142 + topics=topics_str, 143 + detected_setting=setting_str, 153 144 ) 154 145 logger.info(f"Added transcript segment to journal: {json_path}") 155 146 created_files.append(json_path)