personal memory agent
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

voice: wire observer.start_listening to action queue + /api/voice/observer-actions

Promotes the Wave-2a observer.start_listening stub to an action-emit
pattern. handle_observer_start_listening now returns the public
{status,mode,note} shape plus an internal _observer_action sentinel that
dispatch_tool_call strips and enqueues onto a per-call_id
ObserverActionQueue (TTL 60s, cap 8, FIFO drop, drain-clears). New route
GET /api/voice/observer-actions drains the queue; lenient on
missing/blank/unknown call_id (always 200 empty) -- divergent by design
from nav-hints. No changes to the observer ingest pipeline; existing
/app/observer/ingest already accepts the planned iOS multipart shape
(compatibility verdict captured in docs/design/observer-actions.md).

+681 -9
+10
convey/voice.py
··· 17 17 from think.voice import brain 18 18 from think.voice.config import get_openai_api_key, get_voice_model 19 19 from think.voice.nav_queue import get_nav_queue 20 + from think.voice.observer_queue import get_observer_queue 20 21 from think.voice.runtime import get_runtime_state 21 22 from think.voice.sideband import _run_sideband, register_voice_task 22 23 from think.voice.tools import get_tool_manifest ··· 164 165 return _error("call_id is required", 400) 165 166 hints = get_nav_queue().drain(call_id) 166 167 return jsonify({"hints": hints, "consumed": True}) 168 + 169 + 170 + @voice_bp.get("/observer-actions") 171 + def observer_actions(): 172 + call_id = request.args.get("call_id", "").strip() 173 + if not call_id: 174 + return jsonify({"actions": [], "consumed": True}) 175 + actions = get_observer_queue().drain(call_id) 176 + return jsonify({"actions": actions, "consumed": True}) 167 177 168 178 169 179 @voice_bp.get("/status")
+333
docs/design/observer-actions.md
··· 1 + # Wave 4 observer actions 2 + 3 + ## 1. Summary 4 + 5 + Wave 4 adds a second per-session voice side-effect queue beside nav hints: observer actions. The new queue is purpose-built for structured action payloads, not string hints, and it extends the established Wave 2 tool-dispatch pattern rather than refactoring it. `observer.start_listening` stops being a pure stub and starts returning an internal `_observer_action` sentinel; `dispatch_tool_call(...)` strips that sentinel from the model-facing JSON, enqueues a structured action under the voice session `call_id`, and leaves the existing sideband/runtime flow unchanged. A new root voice route, `GET /api/voice/observer-actions`, lets the iOS client poll for queued actions and drain them with intentionally lenient semantics: missing, blank, or unknown `call_id` returns HTTP 200 with `{"actions": [], "consumed": true}` so polling can stay robust on a simple cadence even if the client temporarily loses the echoed call id (`think/voice/tools.py:686-739`, `convey/voice.py:160-166`, `think/voice/sideband.py:20-37`, `think/voice/nav_queue.py:17-84`). 6 + 7 + ## 2. Module layout 8 + 9 + New files: 10 + 11 + - `think/voice/observer_queue.py` — thread-safe per-`call_id` FIFO for observer action payloads. Mirrors `think/voice/nav_queue.py` mechanically, but stores dict payloads instead of strings, owns its own TTL/capacity constants, and exports a module-level singleton accessor. 12 + - `tests/test_voice_observer_queue.py` — unit coverage for TTL expiry, FIFO capacity, drain-clears semantics, malformed enqueue rejection, and basic thread-safety, following the pattern in `tests/test_voice_nav_queue.py`. 13 + 14 + Existing files changed: 15 + 16 + - `think/voice/tools.py` — keep the `observer.start_listening` manifest entry and valid modes, but change `handle_observer_start_listening(...)` from stub-ack to sentinel-emitting return data; extend `dispatch_tool_call(...)` so it strips `_observer_action` and enqueues the structured payload keyed by the session `call_id` it already receives (`think/voice/tools.py:121-135`, `think/voice/tools.py:686-739`). 17 + - `convey/voice.py` — add `GET /api/voice/observer-actions` beside `GET /api/voice/nav-hints`, with intentionally lenient `call_id` semantics and drain-on-read behavior (`convey/voice.py:26-30`, `convey/voice.py:160-166`). 18 + - `tests/test_voice_routes.py` — add route coverage for the new endpoint. 19 + - `tests/test_voice_tools.py` — update the observer handler happy-path assertion and add dispatch-enqueue coverage parallel to the existing nav-target stripping test. 20 + - `tests/test_voice_integration.py` — add a fake-Realtime round trip that drives `observer.start_listening`, verifies the stripped public tool output, and drains the observer action queue through the new route. 21 + 22 + Deliberate non-changes: 23 + 24 + - No `think/voice/nav_queue.py` refactor into a shared generic base. The payload type, validation semantics, and current scale do not justify generification. 25 + - No `think/voice/sideband.py` or `think/voice/runtime.py` changes. The existing sideband loop already routes every tool call through `dispatch_tool_call(...)`, which remains the single place where per-session queue side effects happen (`think/voice/sideband.py:20-37`). 26 + - No `apps/observer/routes.py` changes. The existing ingest endpoint is already compatible with the planned iOS multipart upload shape; this design only documents that compatibility and the filename-stem caveat (`apps/observer/routes.py:503-643`). 27 + 28 + Public surface for the new queue module: 29 + 30 + ```python 31 + @dataclass(frozen=True) 32 + class QueuedAction: 33 + payload: dict[str, Any] 34 + created_at: float 35 + 36 + 37 + class ObserverActionQueue: 38 + def __init__( 39 + self, 40 + *, 41 + ttl_seconds: int = 60, 42 + capacity: int = 8, 43 + ) -> None: ... 44 + 45 + def push( 46 + self, 47 + call_id: str, 48 + action: dict[str, Any], 49 + *, 50 + now: float | None = None, 51 + ) -> None: ... 52 + 53 + def drain( 54 + self, 55 + call_id: str, 56 + *, 57 + now: float | None = None, 58 + ) -> list[dict[str, Any]]: ... 59 + 60 + def clear(self) -> None: ... 61 + 62 + 63 + def get_observer_queue() -> ObserverActionQueue: ... 64 + 65 + 66 + @voice_bp.get("/observer-actions") 67 + def observer_actions(): 68 + ... 69 + ``` 70 + 71 + ## 3. Flow diagram 72 + 73 + ```text 74 + OpenAI Realtime tool call 75 + -> think.voice.sideband._sideband_loop(...) 76 + -> think.voice.tools.dispatch_tool_call(name, arguments, call_id, app) 77 + -> handle_observer_start_listening(payload, app) 78 + -> handler returns: 79 + { 80 + "status": "requested", 81 + "mode": "meeting" | "voice_memo", 82 + "note": "sol will start listening shortly", 83 + "_observer_action": { 84 + "type": "start_observer", 85 + "mode": ... 86 + } 87 + } 88 + -> dispatch_tool_call(...) strips "_observer_action" 89 + -> dispatch_tool_call(...) enqueues action in ObserverActionQueue under session call_id 90 + -> stripped JSON is posted back to Realtime as function_call_output 91 + 92 + iOS client 93 + -> observes tool-call completion on the data channel 94 + -> polls GET /api/voice/observer-actions?call_id=<session-call-id> 95 + -> route drains queued actions for that call_id 96 + -> route returns {"actions": [...], "consumed": true} 97 + -> client applies {"type": "start_observer", "mode": "..."} 98 + -> client starts observer capture and uploads multipart media to /app/observer/ingest 99 + ``` 100 + 101 + ## 4. Endpoint spec 102 + 103 + ### `GET /api/voice/observer-actions` 104 + 105 + Request: 106 + 107 + - Method: `GET` 108 + - Path: `/api/voice/observer-actions` 109 + - Query params: 110 + - `call_id: string` — optional in practice. The route trims it if present, but blank, missing, or unknown values all resolve to an empty successful response. 111 + 112 + Success response: 113 + 114 + - HTTP 200 115 + - Body: `{"actions": [...], "consumed": true}` 116 + 117 + Response behavior: 118 + 119 + - Missing `call_id` returns `{"actions": [], "consumed": true}` with HTTP 200. 120 + - Blank `call_id` returns `{"actions": [], "consumed": true}` with HTTP 200. 121 + - Unknown `call_id` returns `{"actions": [], "consumed": true}` with HTTP 200. 122 + - Empty queue for a known `call_id` returns `{"actions": [], "consumed": true}` with HTTP 200. 123 + - Non-empty queue returns FIFO-ordered actions, drains the queue entry, and still reports `consumed: true`. 124 + 125 + Side effects: 126 + 127 + - Drains and clears the queue for the provided `call_id`. 128 + - Drops expired actions before returning. 129 + 130 + Intentional divergence from nav hints: 131 + 132 + - `GET /api/voice/nav-hints` currently rejects missing `call_id` with HTTP 400 (`convey/voice.py:160-166`). 133 + - `GET /api/voice/observer-actions` is intentionally lenient and always returns HTTP 200 with an array payload, because the iOS polling loop is simple cadence-based infrastructure, not in-web-session UI state. Robust empty success responses are a better failure mode than hard 400s when the client temporarily misses or delays the echoed `call_id`. 134 + 135 + ## 5. Action shape and handler return shape 136 + 137 + Internal handler return shape from `handle_observer_start_listening(...)`: 138 + 139 + ```json 140 + { 141 + "status": "requested", 142 + "mode": "meeting", 143 + "note": "sol will start listening shortly", 144 + "_observer_action": { 145 + "type": "start_observer", 146 + "mode": "meeting" 147 + } 148 + } 149 + ``` 150 + 151 + Public tool output after `dispatch_tool_call(...)` strips the sentinel: 152 + 153 + ```json 154 + { 155 + "status": "requested", 156 + "mode": "meeting", 157 + "note": "sol will start listening shortly" 158 + } 159 + ``` 160 + 161 + Queued observer action payload: 162 + 163 + ```json 164 + { 165 + "type": "start_observer", 166 + "mode": "meeting" 167 + } 168 + ``` 169 + 170 + Rules: 171 + 172 + - Valid modes remain `meeting` and `voice_memo`, matching the existing manifest contract (`think/voice/tools.py:121-135`). 173 + - `dispatch_tool_call(...)` remains the only place that turns an internal handler sentinel into a per-session side effect, matching the existing `_nav_target` pattern (`think/voice/tools.py:736-739`). 174 + - Handler signature stays uniform: `(payload, app) -> dict[str, Any]`. 175 + 176 + ## 6. Queue semantics 177 + 178 + `ObserverActionQueue` intentionally matches the proven shape of `NavHintQueue` while keeping its own module and payload typing (`think/voice/nav_queue.py:21-84`): 179 + 180 + - Per-`call_id` queue stored in a lock-protected `defaultdict(deque)`. 181 + - TTL: 60 seconds. 182 + - Capacity: 8 actions per `call_id`. 183 + - FIFO ordering. 184 + - Oldest item dropped on overflow. 185 + - Expired items dropped on both `push(...)` and `drain(...)`. 186 + - `drain(...)` returns all currently valid actions in order and clears the queue entry. 187 + - `clear()` wipes all queue state for tests and explicit cleanup. 188 + - Blank `call_id` is rejected as a no-op. 189 + - Blank action payload is rejected as a no-op. 190 + - Rejected enqueue attempts should log a warning so malformed producers are diagnosable without surfacing an HTTP failure. 191 + 192 + Validation scope: 193 + 194 + - The queue owns only generic enqueue sanity: blank `call_id` and empty payload rejection. 195 + - The producer (`handle_observer_start_listening(...)` plus `dispatch_tool_call(...)`) owns the concrete Wave 4 action contract: `{"type": "start_observer", "mode": "meeting"|"voice_memo"}`. 196 + 197 + ## 7. Polling cadence guidance 198 + 199 + Observer-actions polling should mirror nav-hints polling: 200 + 201 + - The client should begin or continue polling after each tool-call event it observes on the Realtime data channel. 202 + - Default poll interval: 500ms. 203 + - The client should stop polling when the voice session ends. 204 + - Because the route is lenient, empty 200 responses are part of the normal cadence rather than a client error condition. 205 + - The client should use the same session `call_id` it passed to `POST /api/voice/connect`, because that is the key `dispatch_tool_call(...)` uses for queueing. The route is not keyed by the per-tool `event.call_id` echoed back in the `function_call_output` envelope (`convey/voice.py:119-127`, `think/voice/sideband.py:24-35`). 206 + 207 + ## 8. Failure semantics 208 + 209 + - Tool JSON decode errors, unknown tools, and handler exceptions remain inside the existing `dispatch_tool_call(...)` wrapper and return the existing inline error payloads; no observer action is enqueued in those cases (`think/voice/tools.py:720-739`). 210 + - Invalid listen modes still return `{"error": "invalid mode"}` and do not enqueue an action (`think/voice/tools.py:690-698`). 211 + - Queue overflow drops the oldest queued action for that `call_id`, logs a warning, and keeps processing the new action. 212 + - TTL expiry drops stale queued actions without turning them into route or tool failures. 213 + - Blank enqueue inputs are no-ops with warning logs, not raised exceptions. 214 + - The route itself has no endpoint-specific 4xx failure path; all missing, blank, unknown, or empty queue states collapse to HTTP 200 with an empty `actions` array. 215 + - The queue is in-memory only, matching nav hints. Process restart or runtime teardown drops queued observer actions; that is acceptable for this adjunct path because the client polls immediately and the action is cheap to reissue on a later tool turn. 216 + 217 + ## 9. Observer-ingest compatibility note 218 + 219 + Compatibility verdict: 220 + 221 + - No ingest code changes are required for Wave 4. 222 + - The existing observer ingest surfaces already accept the planned iOS multipart upload shape: 223 + - Primary path: `POST /app/observer/ingest` with `Authorization: Bearer <key>` 224 + - Legacy fallback: `POST /app/observer/ingest/<key>` 225 + - Required fields: `segment=HHMMSS_LEN`, `day=YYYYMMDD`, and one or more `files`. 226 + - Optional fields: `host`, `platform`, and `meta` (JSON object encoded as a form field). 227 + - The ingest layer is binary-format-agnostic: it reads uploaded file bytes, sanitizes filenames, skips empty uploads, and writes the files to disk, but does not validate MIME type, codec, sample rate, or channel layout (`apps/observer/routes.py:309-367`, `apps/observer/routes.py:503-585`). 228 + 229 + Stream resolution: 230 + 231 + - If `meta["stream"]` is present and matches the stream-name regex, the server trusts it. 232 + - Otherwise the server derives the stream from the observer registration name via `stream_name(observer=observer_name)`. 233 + - `meta.stream` is therefore optional. The client only needs to send it when it wants a qualified stream name that should not be normalized from the observer registration name (`apps/observer/routes.py:589-600`). 234 + - The existing regression coverage for qualified stream preservation is in `apps/observer/tests/test_routes.py:1654-1694`. 235 + 236 + Host / platform semantics: 237 + 238 + - `host` and `platform` are metadata only. 239 + - If both form fields and `meta` contain those values, the `meta` dict wins. 240 + - Hostname mismatch against the registered observer name logs a warning but does not reject the upload (`apps/observer/routes.py:547-567`, `apps/observer/routes.py:627-643`). 241 + - The client should send a deterministic `host` so observer records remain coherent across sessions. 242 + 243 + Filename-stem constraint: 244 + 245 + - The ingest layer does not enforce a filename stem, but downstream consumers do care. 246 + - The client should upload `audio.m4a`, not an arbitrary stem like `meeting.m4a`, so transcription writes `audio.jsonl` and existing readers keep finding the transcript under their current glob conventions. 247 + - The transcribe pipeline writes the transcript as a sibling JSONL using the raw file stem, so `audio.m4a` becomes `audio.jsonl` (`observe/transcribe/main.py:165`, `observe/transcribe/main.py:594-609`). 248 + - Transcript/cluster consumers currently glob for `*audio.jsonl`, `audio.jsonl`, or `*_audio.jsonl`, not arbitrary stems (`think/cluster.py:132-136`, `think/cluster.py:397-405`, `apps/transcripts/routes.py:256-258`, `think/retention.py:73-100`). 249 + - `.m4a` itself is supported end-to-end: the media registry includes it, the sensor registers all `AUDIO_EXTENSIONS`, and transcribe accepts `.m4a` as a supported raw input (`media.py:6-25`, `observe/sense.py:606-667`, `observe/sense.py:1096-1100`, `observe/transcribe/main.py:58`, `observe/transcribe/main.py:885-889`). 250 + 251 + Operational caveats: 252 + 253 + - Duplicate submissions short-circuit with `status="duplicate"` and do not emit `observe.observing` (`apps/observer/routes.py:369-395`, `apps/observer/routes.py:604-605`, `apps/observer/tests/test_routes.py:1276-1368`). 254 + - Segment collisions can rewrite the final segment key; the server returns the adjusted `segment` in the response body and records `segment_original` in history (`apps/observer/routes.py:406-425`, `apps/observer/routes.py:470-500`, `apps/observer/tests/test_routes.py:1552-1586`). 255 + - If the client ever needs server-truth correlation after upload, it should trust the response body’s `segment`, not assume the requested key survived unchanged. 256 + 257 + ## 10. Tests 258 + 259 + New file: `tests/test_voice_observer_queue.py` 260 + 261 + - Unknown `call_id` drains to `[]`. 262 + - `drain(...)` clears the queue entry. 263 + - Expired actions are dropped on drain. 264 + - Capacity enforces FIFO oldest-drop behavior. 265 + - Blank `call_id` and blank payload enqueue attempts are ignored. 266 + - Basic thread-safety sanity for concurrent `push(...)` followed by `drain(...)`. 267 + 268 + Route updates: `tests/test_voice_routes.py` 269 + 270 + - Missing `call_id` returns HTTP 200 with `{"actions": [], "consumed": true}`. 271 + - Unknown `call_id` returns the same empty successful shape. 272 + - Pre-populated queue drains actions through `GET /api/voice/observer-actions`. 273 + - A second drain on the same `call_id` returns an empty successful shape. 274 + 275 + Tool updates: `tests/test_voice_tools.py` 276 + 277 + - `test_observer_start_listening_happy` changes from stub ack to sentinel-bearing internal handler return shape. 278 + - `test_observer_start_listening_failure` stays functionally the same unless valid modes change. 279 + - Add a parallel dispatch test, for example `test_dispatch_tool_call_strips_observer_action`, that asserts: 280 + - the returned JSON omits `_observer_action` 281 + - the public payload is `{"status": "requested", "mode": ..., "note": ...}` 282 + - the observer queue drains one structured action for the session `call_id` 283 + - Keep the existing nav-target stripping test so both queue side-effect channels stay covered. 284 + 285 + Integration updates: `tests/test_voice_integration.py` 286 + 287 + - Add a `_FakeConn` scripted-events case that emits `observer.start_listening`. 288 + - Assert the tool output returned to the fake OpenAI conversation omits `_observer_action`. 289 + - Poll `GET /api/voice/observer-actions?call_id=...` and assert the queued action is returned once and then cleared. 290 + - Keep the current journal/nav-hint round trip as-is; the observer-action case is a sibling integration scenario, not a replacement. 291 + 292 + Expected non-change: 293 + 294 + - `tests/test_voice_sideband.py` should not need structural changes because `dispatch_tool_call(...)` keeps the same signature and remains the only queueing seam the sideband loop calls (`tests/test_voice_sideband.py:60-90`). 295 + 296 + ## 11. Decision list 297 + 298 + - Separate queue module vs. generic refactor: separate module, `think/voice/observer_queue.py`. 299 + - Lenient `call_id` semantics: `GET /api/voice/observer-actions` is always HTTP 200 with an `actions` array, even for missing or unknown `call_id`. 300 + - Sentinel `_observer_action` vs. handler `call_id` parameter: sentinel. `dispatch_tool_call(...)` remains the uniform side-effect seam, matching `_nav_target`. 301 + - Filename-stem constraint surfaced to iOS client: document `audio.m4a` as the required raw filename shape for downstream transcript discoverability. 302 + - No ingest code changes: confirmed compatible. Wave 4 only documents the existing server contract and downstream caveats. 303 + 304 + ## 12. Risks / open questions 305 + 306 + - The observer-action queue is in-memory and ephemeral. Restarting the process drops queued actions, just like nav hints. That is acceptable for this adjunct path but worth keeping explicit. 307 + - Lenient HTTP 200 semantics are the right client contract here, but they can hide client-side `call_id` plumbing mistakes if logs are not watched. Warning logs on blank enqueue attempts help, but the route itself will not reveal misuse with a 4xx. 308 + - The queue is keyed by the session `call_id` passed to `POST /api/voice/connect`, not the per-tool `event.call_id` echoed in `function_call_output`. The design makes that distinction explicit because the current fake integration/tests mostly use the same string for both (`tests/test_voice_integration.py:15-57`). 309 + - Queue TTL and cap mean actions can disappear under long client stalls or bursty tool output. That matches nav-hint behavior and is acceptable, but the client should keep polling cadence tight. 310 + - `GET /api/voice/nav-hints` currently returns HTTP 400 on missing `call_id`, but `tests/test_voice_routes.py` does not cover that branch today. That test gap is out of scope for this lode but worth a small follow-up. 311 + 312 + ## 13. Sources 313 + 314 + - `docs/design/voice-server.md:71-87`, `docs/design/voice-server.md:172-191`, `docs/design/voice-server.md:215-233`, `docs/design/voice-server.md:412-446` 315 + - `docs/design/push.md:11-45`, `docs/design/push.md:117-224`, `docs/design/push.md:500-583` 316 + - `convey/__init__.py:152-169` 317 + - `convey/voice.py:26-30`, `convey/voice.py:105-127`, `convey/voice.py:160-166` 318 + - `think/voice/nav_queue.py:17-84` 319 + - `think/voice/sideband.py:20-37` 320 + - `think/voice/tools.py:121-135`, `think/voice/tools.py:686-739` 321 + - `tests/test_voice_nav_queue.py:11-59` 322 + - `tests/test_voice_tools.py:251-289` 323 + - `tests/test_voice_routes.py:51-61` 324 + - `tests/test_voice_sideband.py:60-90` 325 + - `tests/test_voice_integration.py:15-57`, `tests/test_voice_integration.py:115-159` 326 + - `apps/observer/routes.py:309-367`, `apps/observer/routes.py:503-643` 327 + - `apps/observer/tests/test_routes.py:1276-1368`, `apps/observer/tests/test_routes.py:1552-1586`, `apps/observer/tests/test_routes.py:1654-1694` 328 + - `observe/transcribe/main.py:58`, `observe/transcribe/main.py:165`, `observe/transcribe/main.py:594-609`, `observe/transcribe/main.py:885-889` 329 + - `observe/sense.py:606-667`, `observe/sense.py:1096-1100` 330 + - `media.py:6-25` 331 + - `think/cluster.py:132-136`, `think/cluster.py:397-405` 332 + - `apps/transcripts/routes.py:256-258` 333 + - `think/retention.py:73-100`
+68 -2
tests/test_voice_integration.py
··· 46 46 self._state = state 47 47 self.conversation = _FakeConversation(state) 48 48 self.response = _FakeResponse(state) 49 - self._events = iter( 49 + events = getattr( 50 + state, 51 + "events", 50 52 [ 51 53 _FakeEvent( 52 54 name="journal.get_day", 53 55 arguments='{"day":"2026-03-04"}', 54 56 call_id="call-1", 55 57 ) 56 - ] 58 + ], 57 59 ) 60 + self._events = iter(events) 58 61 59 62 def __aiter__(self): 60 63 return self ··· 157 160 "hints": ["today/journal/2026-03-04"], 158 161 "consumed": True, 159 162 } 163 + 164 + 165 + def test_voice_observer_action_round_trip(integration_client, monkeypatch): 166 + client, _ = integration_client 167 + state = SimpleNamespace( 168 + session_payloads=[], 169 + connect_calls=[], 170 + outputs=[], 171 + response_creates=0, 172 + events=[ 173 + _FakeEvent( 174 + name="observer.start_listening", 175 + arguments=json.dumps({"mode": "meeting"}), 176 + call_id="call-obs-int", 177 + ) 178 + ], 179 + ) 180 + FakeAsyncOpenAI.state = state 181 + 182 + monkeypatch.setattr("convey.voice.AsyncOpenAI", FakeAsyncOpenAI) 183 + monkeypatch.setattr("think.voice.sideband.AsyncOpenAI", FakeAsyncOpenAI) 184 + monkeypatch.setattr("convey.voice.get_openai_api_key", lambda: "sk-test") 185 + monkeypatch.setattr("think.voice.sideband.get_openai_api_key", lambda: "sk-test") 186 + monkeypatch.setattr( 187 + "convey.voice.brain.wait_until_ready", lambda app, timeout: True 188 + ) 189 + monkeypatch.setattr("convey.voice.brain.brain_is_stale", lambda app: False) 190 + 191 + session_response = client.post("/api/voice/session") 192 + assert session_response.status_code == 200 193 + assert session_response.get_json() == {"ephemeral_key": "ek-test"} 194 + 195 + connect_response = client.post( 196 + "/api/voice/connect", json={"call_id": "call-obs-int"} 197 + ) 198 + assert connect_response.status_code == 200 199 + assert connect_response.get_json() == {"status": "connected"} 200 + 201 + deadline = time.time() + 1.0 202 + while time.time() < deadline and not state.outputs: 203 + time.sleep(0.01) 204 + 205 + assert state.connect_calls == [{"call_id": "call-obs-int", "model": "gpt-realtime"}] 206 + assert state.outputs 207 + tool_output = json.loads(state.outputs[0]["output"]) 208 + assert tool_output == { 209 + "status": "requested", 210 + "mode": "meeting", 211 + "note": "sol will start listening shortly", 212 + } 213 + assert "_observer_action" not in tool_output 214 + assert state.response_creates == 1 215 + 216 + actions_response = client.get("/api/voice/observer-actions?call_id=call-obs-int") 217 + assert actions_response.status_code == 200 218 + assert actions_response.get_json() == { 219 + "actions": [{"type": "start_observer", "mode": "meeting"}], 220 + "consumed": True, 221 + } 222 + 223 + second_actions = client.get("/api/voice/observer-actions?call_id=call-obs-int") 224 + assert second_actions.status_code == 200 225 + assert second_actions.get_json() == {"actions": [], "consumed": True}
+85
tests/test_voice_observer_queue.py
··· 1 + # SPDX-License-Identifier: AGPL-3.0-only 2 + # Copyright (c) 2026 sol pbc 3 + 4 + from __future__ import annotations 5 + 6 + from concurrent.futures import ThreadPoolExecutor 7 + 8 + from think.voice.observer_queue import ObserverActionQueue 9 + 10 + 11 + def test_observer_queue_returns_empty_for_unknown_call_id(): 12 + queue = ObserverActionQueue() 13 + assert queue.drain("call-1", now=100.0) == [] 14 + 15 + 16 + def test_observer_queue_drain_clears_queue(): 17 + queue = ObserverActionQueue() 18 + queue.push("call-1", {"type": "start_observer", "mode": "meeting"}, now=100.0) 19 + 20 + assert queue.drain("call-1", now=100.0) == [ 21 + {"type": "start_observer", "mode": "meeting"} 22 + ] 23 + assert queue.drain("call-1", now=100.0) == [] 24 + 25 + 26 + def test_observer_queue_drops_expired_actions(): 27 + queue = ObserverActionQueue(ttl_seconds=10) 28 + queue.push("call-1", {"type": "start_observer", "mode": "meeting"}, now=100.0) 29 + queue.push( 30 + "call-1", 31 + {"type": "start_observer", "mode": "voice_memo"}, 32 + now=111.0, 33 + ) 34 + 35 + assert queue.drain("call-1", now=111.0) == [ 36 + {"type": "start_observer", "mode": "voice_memo"} 37 + ] 38 + 39 + 40 + def test_observer_queue_enforces_fifo_capacity(): 41 + queue = ObserverActionQueue(capacity=3) 42 + for idx in range(5): 43 + queue.push( 44 + "call-1", 45 + {"type": "start_observer", "mode": f"mode-{idx}"}, 46 + now=float(idx), 47 + ) 48 + 49 + assert queue.drain("call-1", now=10.0) == [ 50 + {"type": "start_observer", "mode": "mode-2"}, 51 + {"type": "start_observer", "mode": "mode-3"}, 52 + {"type": "start_observer", "mode": "mode-4"}, 53 + ] 54 + 55 + 56 + def test_observer_queue_rejects_blank_call_id(): 57 + queue = ObserverActionQueue() 58 + queue.push(" ", {"type": "start_observer", "mode": "meeting"}) 59 + 60 + assert queue.drain("call-1", now=100.0) == [] 61 + 62 + 63 + def test_observer_queue_rejects_blank_payload(): 64 + queue = ObserverActionQueue() 65 + queue.push("call-1", {}) 66 + 67 + assert queue.drain("call-1", now=100.0) == [] 68 + 69 + 70 + def test_observer_queue_is_thread_safe_for_push_then_drain(): 71 + queue = ObserverActionQueue(capacity=16) 72 + 73 + def push_action(index: int) -> None: 74 + queue.push( 75 + "call-1", 76 + {"type": "start_observer", "mode": f"mode-{index}"}, 77 + now=float(index), 78 + ) 79 + 80 + with ThreadPoolExecutor(max_workers=4) as pool: 81 + list(pool.map(push_action, range(8))) 82 + 83 + drained = queue.drain("call-1", now=8.0) 84 + assert len(drained) == 8 85 + assert {item["mode"] for item in drained} == {f"mode-{idx}" for idx in range(8)}
+40
tests/test_voice_routes.py
··· 9 9 import pytest 10 10 11 11 from convey import create_app 12 + from think.voice.observer_queue import get_observer_queue 12 13 13 14 14 15 @pytest.fixture ··· 59 60 response = voice_client.get("/api/voice/nav-hints?call_id=missing") 60 61 assert response.status_code == 200 61 62 assert response.get_json() == {"hints": [], "consumed": True} 63 + 64 + 65 + def test_observer_actions_missing_call_id_returns_empty(voice_client): 66 + response = voice_client.get("/api/voice/observer-actions") 67 + assert response.status_code == 200 68 + assert response.get_json() == {"actions": [], "consumed": True} 69 + 70 + 71 + def test_observer_actions_blank_call_id_returns_empty(voice_client): 72 + response = voice_client.get("/api/voice/observer-actions?call_id=%20%20") 73 + assert response.status_code == 200 74 + assert response.get_json() == {"actions": [], "consumed": True} 75 + 76 + 77 + def test_observer_actions_unknown_call_id_returns_empty(voice_client): 78 + response = voice_client.get("/api/voice/observer-actions?call_id=missing") 79 + assert response.status_code == 200 80 + assert response.get_json() == {"actions": [], "consumed": True} 81 + 82 + 83 + def test_observer_actions_drain_clears_queue(voice_client): 84 + queue = get_observer_queue() 85 + queue.clear() 86 + queue.push("call-obs-route", {"type": "start_observer", "mode": "meeting"}) 87 + queue.push("call-obs-route", {"type": "start_observer", "mode": "voice_memo"}) 88 + 89 + response = voice_client.get("/api/voice/observer-actions?call_id=call-obs-route") 90 + assert response.status_code == 200 91 + assert response.get_json() == { 92 + "actions": [ 93 + {"type": "start_observer", "mode": "meeting"}, 94 + {"type": "start_observer", "mode": "voice_memo"}, 95 + ], 96 + "consumed": True, 97 + } 98 + 99 + second = voice_client.get("/api/voice/observer-actions?call_id=call-obs-route") 100 + assert second.status_code == 200 101 + assert second.get_json() == {"actions": [], "consumed": True} 62 102 63 103 64 104 def test_status_reports_all_fields(voice_client, voice_app, monkeypatch):
+26 -3
tests/test_voice_tools.py
··· 16 16 ) 17 17 from think.indexer.journal import scan_journal 18 18 from think.voice import tools 19 + from think.voice.observer_queue import get_observer_queue 19 20 20 21 21 22 def _set_today(monkeypatch, day_value: date) -> None: ··· 250 251 251 252 def test_observer_start_listening_happy(): 252 253 assert tools.handle_observer_start_listening({"mode": "meeting"}, object()) == { 253 - "status": "ack", 254 + "status": "requested", 254 255 "mode": "meeting", 255 - "note": "wave-4 observer not yet wired", 256 + "note": "sol will start listening shortly", 257 + "_observer_action": {"type": "start_observer", "mode": "meeting"}, 256 258 } 257 259 258 260 ··· 273 275 object(), 274 276 ) 275 277 ) 276 - assert json.loads(result)["status"] == "ack" 278 + assert json.loads(result)["status"] == "requested" 277 279 assert queue.drain("call-123") == [] 278 280 279 281 stripped = asyncio.run( ··· 287 289 payload = json.loads(stripped) 288 290 assert "_nav_target" not in payload 289 291 assert queue.drain("call-123") == ["today/journal/2026-03-04"] 292 + 293 + 294 + def test_dispatch_tool_call_strips_observer_action(): 295 + queue = get_observer_queue() 296 + queue.clear() 297 + 298 + result = asyncio.run( 299 + tools.dispatch_tool_call( 300 + "observer.start_listening", 301 + json.dumps({"mode": "meeting"}), 302 + "call-obs-1", 303 + object(), 304 + ) 305 + ) 306 + 307 + assert json.loads(result) == { 308 + "status": "requested", 309 + "mode": "meeting", 310 + "note": "sol will start listening shortly", 311 + } 312 + assert queue.drain("call-obs-1") == [{"type": "start_observer", "mode": "meeting"}]
+102
think/voice/observer_queue.py
··· 1 + # SPDX-License-Identifier: AGPL-3.0-only 2 + # Copyright (c) 2026 sol pbc 3 + 4 + """In-memory observer actions for voice turns.""" 5 + 6 + from __future__ import annotations 7 + 8 + import copy 9 + import logging 10 + import threading 11 + import time 12 + from collections import defaultdict, deque 13 + from dataclasses import dataclass 14 + from typing import Any, DefaultDict, Deque 15 + 16 + logger = logging.getLogger(__name__) 17 + 18 + OBSERVER_ACTION_TTL_SECONDS = 60 19 + OBSERVER_ACTION_CAPACITY = 8 20 + 21 + 22 + @dataclass(frozen=True) 23 + class QueuedAction: 24 + payload: dict[str, Any] 25 + created_at: float 26 + 27 + 28 + class ObserverActionQueue: 29 + """Thread-safe FIFO queue for voice observer actions.""" 30 + 31 + def __init__( 32 + self, 33 + *, 34 + ttl_seconds: int = OBSERVER_ACTION_TTL_SECONDS, 35 + capacity: int = OBSERVER_ACTION_CAPACITY, 36 + ) -> None: 37 + self.ttl_seconds = ttl_seconds 38 + self.capacity = capacity 39 + self._lock = threading.Lock() 40 + self._queues: DefaultDict[str, Deque[QueuedAction]] = defaultdict(deque) 41 + 42 + def push( 43 + self, 44 + call_id: str, 45 + action: dict[str, Any], 46 + *, 47 + now: float | None = None, 48 + ) -> None: 49 + cleaned_call_id = call_id.strip() 50 + if not cleaned_call_id: 51 + logger.warning("voice observer action rejected blank call_id") 52 + return 53 + if not action: 54 + logger.warning("voice observer action rejected empty payload") 55 + return 56 + current = time.time() if now is None else now 57 + with self._lock: 58 + queue = self._queues[cleaned_call_id] 59 + self._drop_expired(queue, current) 60 + queue.append(QueuedAction(copy.deepcopy(action), current)) 61 + while len(queue) > self.capacity: 62 + queue.popleft() 63 + logger.warning("voice observer action dropped for capacity") 64 + 65 + def drain(self, call_id: str, *, now: float | None = None) -> list[dict[str, Any]]: 66 + cleaned_call_id = call_id.strip() 67 + if not cleaned_call_id: 68 + return [] 69 + current = time.time() if now is None else now 70 + with self._lock: 71 + queue = self._queues.get(cleaned_call_id) 72 + if not queue: 73 + return [] 74 + self._drop_expired(queue, current) 75 + actions = [entry.payload for entry in queue] 76 + if cleaned_call_id in self._queues: 77 + del self._queues[cleaned_call_id] 78 + return actions 79 + 80 + def clear(self) -> None: 81 + with self._lock: 82 + self._queues.clear() 83 + 84 + def _drop_expired(self, queue: Deque[QueuedAction], now: float) -> None: 85 + while queue and now - queue[0].created_at > self.ttl_seconds: 86 + queue.popleft() 87 + 88 + 89 + _OBSERVER_QUEUE = ObserverActionQueue() 90 + 91 + 92 + def get_observer_queue() -> ObserverActionQueue: 93 + return _OBSERVER_QUEUE 94 + 95 + 96 + __all__ = [ 97 + "OBSERVER_ACTION_CAPACITY", 98 + "OBSERVER_ACTION_TTL_SECONDS", 99 + "ObserverActionQueue", 100 + "QueuedAction", 101 + "get_observer_queue", 102 + ]
+17 -4
think/voice/tools.py
··· 25 25 from think.surfaces.profile import full as load_profile 26 26 from think.utils import day_path 27 27 from think.voice.nav_queue import get_nav_queue 28 + from think.voice.observer_queue import get_observer_queue 28 29 29 30 logger = logging.getLogger(__name__) 30 31 ··· 121 122 { 122 123 "type": "function", 123 124 "name": "observer.start_listening", 124 - "description": "Acknowledge the Wave 2 observer-listen stub.", 125 + "description": ( 126 + "Request sol to start listening in the given mode. " 127 + "Returns immediately after queueing the start request." 128 + ), 125 129 "parameters": { 126 130 "type": "object", 127 131 "properties": {"mode": {"type": "string"}}, ··· 690 694 mode = _clean_optional_str(payload.get("mode")) 691 695 if mode is None or mode not in VALID_LISTEN_MODES: 692 696 return {"error": "invalid mode"} 693 - logger.info("voice observer listen request acknowledged mode=%s", mode) 697 + logger.info("voice observer listen request queued mode=%s", mode) 694 698 return { 695 - "status": "ack", 699 + "status": "requested", 696 700 "mode": mode, 697 - "note": "wave-4 observer not yet wired", 701 + "note": "sol will start listening shortly", 702 + "_observer_action": {"type": "start_observer", "mode": mode}, 698 703 } 699 704 700 705 ··· 736 741 nav_target = result.pop("_nav_target", None) 737 742 if isinstance(nav_target, str) and nav_target.strip(): 738 743 get_nav_queue().push(call_id, nav_target) 744 + observer_action = result.pop("_observer_action", None) 745 + if ( 746 + isinstance(observer_action, dict) 747 + and observer_action 748 + and isinstance(call_id, str) 749 + and call_id.strip() 750 + ): 751 + get_observer_queue().push(call_id, observer_action) 739 752 return json.dumps(result) 740 753 741 754