personal memory agent
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

fix(importers): plaud sync emits and checkpoints without waiting on transcription

Add `wait_for_processing` kwarg (default True) to `import_one` so PlaudBackend.sync
can return after segment creation instead of blocking on per-segment transcription.
Sync now also persists imports/plaud.json after each successful per-file import,
so partial progress survives the supervisor max_runtime cap. Interactive
`sol import file.opus` keeps end-to-end behavior via the default. granola.py and
obsidian.py are unchanged — their text-import paths don't use _wait_for_segments.

Co-Authored-By: Codex <codex@openai.com>

+114 -23
+45
tests/test_importer.py
··· 6 6 import importlib 7 7 import json 8 8 import subprocess 9 + import time 9 10 import zipfile 10 11 from pathlib import Path 11 12 from unittest.mock import ANY, MagicMock, patch ··· 1215 1216 1216 1217 with pytest.raises(ValueError, match="timestamp must be in YYYYMMDD_HHMMSS format"): 1217 1218 mod.import_one(media, timestamp="not-a-timestamp") 1219 + 1220 + 1221 + def test_import_one_skips_wait_when_disabled(tmp_path, monkeypatch): 1222 + mod = importlib.import_module("think.importers.cli") 1223 + 1224 + audio_file = tmp_path / "test.mp3" 1225 + audio_file.write_bytes(b"fake audio") 1226 + callosum = MagicMock() 1227 + 1228 + def fake_prepare_audio_segments(media_path, day_dir, base_dt, import_id, stream): 1229 + seg_dir = Path(day_dir) / stream / "120000_300" 1230 + seg_dir.mkdir(parents=True, exist_ok=True) 1231 + (seg_dir / "imported_audio.mp3").write_bytes(b"sliced audio") 1232 + return [("120000_300", seg_dir, ["imported_audio.mp3"])] 1233 + 1234 + monkeypatch.setenv("SOLSTONE_JOURNAL", str(tmp_path)) 1235 + monkeypatch.setattr(mod, "CallosumConnection", lambda **kwargs: callosum) 1236 + monkeypatch.setattr(mod, "get_rev", lambda: "test-rev") 1237 + monkeypatch.setattr(mod, "_status_emitter", lambda: None) 1238 + monkeypatch.setattr(mod, "prepare_audio_segments", fake_prepare_audio_segments) 1239 + monkeypatch.setattr( 1240 + mod, 1241 + "update_stream", 1242 + lambda stream, day, seg, **kwargs: { 1243 + "prev_day": None, 1244 + "prev_segment": None, 1245 + "seq": 1, 1246 + }, 1247 + ) 1248 + monkeypatch.setattr(mod, "write_segment_stream", lambda *args, **kwargs: None) 1249 + 1250 + start = time.monotonic() 1251 + result = mod.import_one( 1252 + audio_file, 1253 + timestamp="20260303_120000", 1254 + source="audio", 1255 + wait_for_processing=False, 1256 + ) 1257 + elapsed = time.monotonic() - start 1258 + 1259 + assert result is not None 1260 + assert elapsed < 5 1261 + assert result.get("segments") 1262 + assert "failed_segments" not in result 1218 1263 1219 1264 1220 1265 def test_file_importer_indexes_created_files_in_process(tmp_path, monkeypatch):
+37
tests/test_importer_sync.py
··· 438 438 assert call.kwargs["source"] == "plaud" 439 439 assert call.kwargs["auto"] is True 440 440 assert call.kwargs["timestamp"] 441 + assert call.kwargs["wait_for_processing"] is False 441 442 442 443 state = load_sync_state(tmp_path, "plaud") 443 444 assert state["files"]["file1"]["status"] == "imported" 444 445 assert state["files"]["file2"]["status"] == "imported" 446 + 447 + 448 + def test_plaud_sync_checkpoints_catalog_per_file(tmp_path, monkeypatch): 449 + from think.importers.plaud import PlaudBackend 450 + 451 + monkeypatch.setenv("PLAUD_ACCESS_TOKEN", "test-token") 452 + saved_states = [] 453 + 454 + def fake_download(_session, _url, dest_path, progress_cb=None): 455 + dest_path.write_bytes(b"audio") 456 + if progress_cb: 457 + progress_cb() 458 + return True 459 + 460 + def fake_save_sync_state(_journal_root, _backend, state): 461 + saved_states.append(json.loads(json.dumps(state))) 462 + 463 + with ( 464 + patch("think.importers.plaud.list_files", side_effect=_mock_list_files), 465 + patch("think.importers.plaud.get_temp_url", return_value="https://temp"), 466 + patch("think.importers.plaud.download_to_file", side_effect=fake_download), 467 + patch("think.importers.plaud.import_one", return_value={"segments": ["seg"]}), 468 + patch( 469 + "think.importers.sync.save_sync_state", 470 + side_effect=fake_save_sync_state, 471 + ) as save_mock, 472 + ): 473 + result = PlaudBackend().sync(tmp_path, dry_run=False) 474 + 475 + assert result["downloaded"] == 2 476 + assert save_mock.call_count == 3 477 + per_iteration_states = saved_states[:-1] 478 + assert any( 479 + any(info.get("status") == "imported" for info in state["files"].values()) 480 + for state in per_iteration_states 481 + ) 445 482 446 483 447 484 def test_plaud_sync_save_records_import_one_errors(tmp_path, monkeypatch):
+30 -23
think/importers/cli.py
··· 334 334 dry_run: bool = False, 335 335 json_output: bool = False, 336 336 verbose: bool = False, 337 + wait_for_processing: bool = True, 337 338 ) -> dict[str, Any] | None: 339 + """When False, returns after segment creation without awaiting transcription completion; 340 + failed_segments is omitted from the result and created_segments is the durable 341 + record of what was queued. 342 + """ 338 343 args = argparse.Namespace( 339 344 media=os.path.expanduser(str(media)), 340 345 timestamp=timestamp, ··· 346 351 dry_run=dry_run, 347 352 json=json_output, 348 353 verbose=verbose, 354 + wait_for_processing=wait_for_processing, 349 355 ) 350 356 return _import_one_from_args(args) 351 357 ··· 1022 1028 ) 1023 1029 logger.info(f"Emitted observe.observing for segment: {day}/{seg_key}") 1024 1030 1025 - # Wait for transcription to complete 1026 - _set_stage("transcribing") 1027 - pending = set(created_segments) 1028 - segment_timeout = 600 # 10 minutes since last progress 1029 - transcribe_start = time.monotonic() 1030 - new_failed_segments, _completed_count = _wait_for_segments( 1031 - _message_queue, 1032 - pending, 1033 - segment_timeout, 1034 - total_segments=len(created_segments), 1035 - ) 1036 - failed_segments.extend(new_failed_segments) 1031 + if args.wait_for_processing: 1032 + # Wait for transcription to complete 1033 + _set_stage("transcribing") 1034 + pending = set(created_segments) 1035 + segment_timeout = 600 # 10 minutes since last progress 1036 + transcribe_start = time.monotonic() 1037 + new_failed_segments, _completed_count = _wait_for_segments( 1038 + _message_queue, 1039 + pending, 1040 + segment_timeout, 1041 + total_segments=len(created_segments), 1042 + ) 1043 + failed_segments.extend(new_failed_segments) 1037 1044 1038 - if failed_segments: 1039 - logger.warning( 1040 - f"{len(failed_segments)} of {len(created_segments)} " 1041 - f"segments failed: {failed_segments}" 1042 - ) 1043 - else: 1044 - total_elapsed = int(time.monotonic() - transcribe_start) 1045 - logger.info( 1046 - f"All {len(created_segments)} segments " 1047 - f"transcribed successfully ({total_elapsed}s)" 1048 - ) 1045 + if failed_segments: 1046 + logger.warning( 1047 + f"{len(failed_segments)} of {len(created_segments)} " 1048 + f"segments failed: {failed_segments}" 1049 + ) 1050 + else: 1051 + total_elapsed = int(time.monotonic() - transcribe_start) 1052 + logger.info( 1053 + f"All {len(created_segments)} segments " 1054 + f"transcribed successfully ({total_elapsed}s)" 1055 + ) 1049 1056 1050 1057 # Complete processing metadata 1051 1058 processing_results["processing_completed"] = dt.datetime.now().isoformat()
+2
think/importers/plaud.py
··· 538 538 source="plaud", 539 539 auto=True, 540 540 verbose=False, 541 + wait_for_processing=False, 541 542 ) 542 543 except Exception as exc: 543 544 import_elapsed = int(time.monotonic() - import_start) ··· 552 553 info["status"] = "imported" 553 554 info["import_timestamp"] = ts 554 555 info["imported_at"] = dt.datetime.now().isoformat() 556 + save_sync_state(journal_root, "plaud", state) 555 557 downloaded += 1 556 558 logger.info(" Imported successfully (%ss)", import_elapsed) 557 559