personal memory agent

Refactor think/importer.py into think/importers/ package

Split monolithic importer module into focused submodules:
- cli.py: CLI entry point and Callosum status tracking
- audio.py: audio segmentation (slice, duration, prepare)
- text.py: transcript processing and PDF reading
- shared.py: cross-cutting helpers (JSONL writing, import setup, summary)
- utils.py: journal-side import utilities (moved from think/importer_utils.py)
- plaud.py: Plaud API sync (relocated from observe/plaud.py)

Updated all callers: sol.py, apps/import/routes.py, tests, docs.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
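For downstream code, the change is an import-path update only. A minimal before/after sketch, based on the call sites touched in this diff:

    # Before: monolithic importer module plus sibling utils module
    from think.importer import _write_import_jsonl
    from think.importer_utils import build_import_info

    # After: focused submodules under think/importers/
    from think.importers.shared import _write_import_jsonl
    from think.importers.utils import build_import_info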

+1139 -1100
+1 -1
apps/import/routes.py
···
  from convey import emit, state
  from think.detect_created import detect_created
- from think.importer_utils import (
+ from think.importers.utils import (
      build_import_info,
      get_import_details,
      list_import_timestamps,
+1 -1
docs/CALLOSUM.md
···
  **Event Log:** Observe, dream, and activity tract events with `day` + `segment` are logged to `<day>/<segment>/events.jsonl` by supervisor

  ### `importer` - Media import processing
- **Source:** `think/importer.py`
+ **Source:** `think/importers/cli.py`
  **Events:** `started`, `status`, `completed`, `error`
  **Key fields:** `import_id` (correlates all events), `stage`, `segments` (created segment keys), `stream` (stream name, e.g., `"import.apple"`)
  **Stages:** `initialization`, `segmenting`, `transcribing`, `summarizing`
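For reference, the `status` heartbeat documented above is emitted by the status thread in think/importers/cli.py. A sketch with illustrative values (the call shape matches _status_emitter(); the field values here are made up):

    # Illustrative values only; real values come from the running import
    _callosum.emit(
        "importer",
        "status",
        import_id="20240101_120000",
        stage="transcribing",
        elapsed_ms=125000,
        stage_elapsed_ms=42000,
    )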
-1
observe/plaud.py → think/importers/plaud.py
···
- #!/usr/bin/env python3
  # SPDX-License-Identifier: AGPL-3.0-only
  # Copyright (c) 2026 sol pbc
+4 -4
sol.py
···
  Usage:
    sol                     Show status and available commands
    sol <command> [args]    Run a subcommand
-   sol <module> [args]     Run by module path (e.g., sol think.importer)
+   sol <module> [args]     Run by module path (e.g., sol think.importers.cli)

  Examples:
    sol import data.json    Import data into journal
···
  COMMANDS: dict[str, str] = {
      # think package - daily processing and analysis
-     "import": "think.importer",
+     "import": "think.importers.cli",
      "dream": "think.dream",
      "planner": "think.planner",
      "indexer": "think.indexer",
···
      print()

      print("Direct module syntax: sol <module.path> [args]")
-     print("Example: sol think.importer --help")
+     print("Example: sol think.importers.cli --help")


  def resolve_command(name: str) -> tuple[str, list[str]]:
···
      """Import and run a module's main() function.

      Args:
-         module_path: Dotted module path (e.g., "think.importer")
+         module_path: Dotted module path (e.g., "think.importers.cli")

      Returns:
          Exit code (0 for success)
+18 -10
tests/test_activities.py
···
      # Check browsing (predefined with instructions override)
      browsing = next(a for a in loaded if a["id"] == "browsing")
-     assert browsing["instructions"] == "Custom browsing instructions for this facet"
+     assert (
+         browsing["instructions"]
+         == "Custom browsing instructions for this facet"
+     )

      # Check custom activity with instructions
      custom = next(a for a in loaded if a["id"] == "custom_activity")
···
          for a in get_facet_activities("test_facet")
          if a["id"] == "3d_modeling"
      )
-     assert modeling["instructions"] == "Detect via: Blender, FreeCAD, OpenSCAD windows"
+     assert (
+         modeling["instructions"]
+         == "Detect via: Blender, FreeCAD, OpenSCAD windows"
+     )

      # Remove it
      removed = remove_activity_from_facet("test_facet", "meeting")
···
          instructions="Only detect scheduled meetings, not ad-hoc calls",
      )
      assert updated is not None
-     assert updated["instructions"] == "Only detect scheduled meetings, not ad-hoc calls"
+     assert (
+         updated["instructions"]
+         == "Only detect scheduled meetings, not ad-hoc calls"
+     )
      # Other fields should be preserved
      assert updated["priority"] == "low"

      # Verify via lookup
      activity = get_activity_by_id("test_facet", "meeting")
      assert activity["priority"] == "low"
-     assert activity["instructions"] == "Only detect scheduled meetings, not ad-hoc calls"
+     assert (
+         activity["instructions"]
+         == "Only detect scheduled meetings, not ad-hoc calls"
+     )

      # Reset instructions to default via empty string
      from think.activities import DEFAULT_ACTIVITIES
···
      default_instructions = next(
          a["instructions"] for a in DEFAULT_ACTIVITIES if a["id"] == "meeting"
      )
-     updated = update_activity_in_facet(
-         "test_facet", "meeting", instructions=""
-     )
+     updated = update_activity_in_facet("test_facet", "meeting", instructions="")
      assert updated is not None
      assert updated["instructions"] == default_instructions
···
      default_desc = next(
          a["description"] for a in DEFAULT_ACTIVITIES if a["id"] == "meeting"
      )
-     updated = update_activity_in_facet(
-         "test_facet", "meeting", description=""
-     )
+     updated = update_activity_in_facet("test_facet", "meeting", description="")
      assert updated is not None
      assert updated["description"] == default_desc
+12 -11
tests/test_importer.py
···
  def test_slice_audio_segment(tmp_path):
      """Test slice_audio_segment extracts audio with stream copy."""
-     mod = importlib.import_module("think.importer")
+     mod = importlib.import_module("think.importers.audio")

      source = tmp_path / "source.mp3"
      source.write_bytes(b"fake audio")
···
  def test_slice_audio_segment_fallback(tmp_path):
      """Test slice_audio_segment falls back to re-encode on copy failure."""
-     mod = importlib.import_module("think.importer")
+     mod = importlib.import_module("think.importers.audio")

      source = tmp_path / "source.mp3"
      source.write_bytes(b"fake audio")
···
  def test_importer_text(tmp_path, monkeypatch):
      """Test importing a text transcript file."""
-     mod = importlib.import_module("think.importer")
+     mod = importlib.import_module("think.importers.cli")
+     text_mod = importlib.import_module("think.importers.text")

      transcript = "hello\nworld"
      txt = tmp_path / "sample.txt"
···
      def mock_detect_segment(text, start_time):
          return [("12:00:00", "seg1"), ("12:05:00", "seg2")]

-     monkeypatch.setattr(mod, "detect_transcript_segment", mock_detect_segment)
+     monkeypatch.setattr(text_mod, "detect_transcript_segment", mock_detect_segment)

      # Mock JSON conversion: returns entries with absolute timestamps
      def mock_detect_json(text, segment_start):
          return [{"start": segment_start, "speaker": "Unknown", "text": text}]

-     monkeypatch.setattr(mod, "detect_transcript_json", mock_detect_json)
+     monkeypatch.setattr(text_mod, "detect_transcript_json", mock_detect_json)

      # Mock CallosumConnection and status emitter to avoid real sockets/threads
      monkeypatch.setattr(mod, "CallosumConnection", lambda **kwargs: MagicMock())
···
  def test_get_audio_duration(tmp_path):
      """Test _get_audio_duration calls ffprobe correctly."""
-     mod = importlib.import_module("think.importer")
+     mod = importlib.import_module("think.importers.audio")

      audio_file = tmp_path / "test.mp3"
      audio_file.write_bytes(b"fake audio")
···
  def test_get_audio_duration_failure(tmp_path):
      """Test _get_audio_duration returns None on error."""
-     mod = importlib.import_module("think.importer")
+     mod = importlib.import_module("think.importers.audio")

      audio_file = tmp_path / "test.mp3"
      audio_file.write_bytes(b"fake audio")
···
  def test_prepare_audio_segments(tmp_path, monkeypatch):
      """Test prepare_audio_segments creates segment directories with audio slices."""
-     mod = importlib.import_module("think.importer")
+     mod = importlib.import_module("think.importers.audio")

      monkeypatch.setenv("JOURNAL_PATH", str(tmp_path))
···
  def test_prepare_audio_segments_with_collision(tmp_path, monkeypatch):
      """Test prepare_audio_segments handles segment key collisions."""
-     mod = importlib.import_module("think.importer")
+     mod = importlib.import_module("think.importers.audio")

      monkeypatch.setenv("JOURNAL_PATH", str(tmp_path))
···
  def test_run_import_summary(tmp_path, monkeypatch):
      """Test _run_import_summary calls cortex_request correctly."""
-     mod = importlib.import_module("think.importer")
+     mod = importlib.import_module("think.importers.shared")

      import_dir = tmp_path / "imports" / "20240101_120000"
      import_dir.mkdir(parents=True)
···
  def test_run_import_summary_no_segments(tmp_path):
      """Test _run_import_summary returns False with no segments."""
-     mod = importlib.import_module("think.importer")
+     mod = importlib.import_module("think.importers.shared")

      import_dir = tmp_path / "imports" / "20240101_120000"
      import_dir.mkdir(parents=True)
+2 -2
tests/test_importer_jsonl.py
···
  # SPDX-License-Identifier: AGPL-3.0-only
  # Copyright (c) 2026 sol pbc

- """Tests for think.importer JSONL format writing."""
+ """Tests for think.importers.shared JSONL format writing."""

  import json
  import tempfile
  from pathlib import Path

- from think.importer import _write_import_jsonl
+ from think.importers.shared import _write_import_jsonl


  def test_write_import_jsonl_with_entries():
+2 -2
tests/test_importer_utils.py
···
  # SPDX-License-Identifier: AGPL-3.0-only
  # Copyright (c) 2026 sol pbc

- """Tests for think.importer_utils module."""
+ """Tests for think.importers.utils module."""

  import json
  import tempfile
···
  import pytest

- from think.importer_utils import (
+ from think.importers.utils import (
      build_import_info,
      calculate_duration_from_files,
      get_import_details,
+3 -3
tests/test_sol.py
···
  def test_resolve_known_command(self):
      """Test resolving a known command from registry."""
      module_path, preset_args = sol.resolve_command("import")
-     assert module_path == "think.importer"
+     assert module_path == "think.importers.cli"
      assert preset_args == []

  def test_resolve_direct_module_path(self):
      """Test resolving a direct module path with dot."""
-     module_path, preset_args = sol.resolve_command("think.importer")
-     assert module_path == "think.importer"
+     module_path, preset_args = sol.resolve_command("think.importers.cli")
+     assert module_path == "think.importers.cli"
      assert preset_args == []

  def test_resolve_nested_module_path(self):
+2 -2
think/activities.py
···
      entry["description"] = activity["description"]

      # Store instructions only if different from default
-     if activity.get("instructions") and activity[
+     if activity.get("instructions") and activity["instructions"] != default.get(
          "instructions"
-     ] != default.get("instructions"):
+     ):
          entry["instructions"] = activity["instructions"]

      # Store priority if set
-1063
think/importer.py
··· 1 - # SPDX-License-Identifier: AGPL-3.0-only 2 - # Copyright (c) 2026 sol pbc 3 - 4 - import argparse 5 - import datetime as dt 6 - import json 7 - import logging 8 - import os 9 - import queue 10 - import re 11 - import shutil 12 - import subprocess 13 - import threading 14 - import time 15 - from datetime import timedelta 16 - from pathlib import Path 17 - 18 - from observe.utils import find_available_segment 19 - from think.callosum import CallosumConnection 20 - from think.detect_created import detect_created 21 - from think.detect_transcript import detect_transcript_json, detect_transcript_segment 22 - from think.importer_utils import ( 23 - save_import_file, 24 - save_import_segments, 25 - write_import_metadata, 26 - ) 27 - from think.streams import stream_name, update_stream, write_segment_stream 28 - from think.utils import ( 29 - day_path, 30 - get_journal, 31 - get_rev, 32 - now_ms, 33 - segment_key, 34 - setup_cli, 35 - ) 36 - 37 - try: 38 - from pypdf import PdfReader 39 - except Exception: # pragma: no cover - optional dependency 40 - PdfReader = None 41 - 42 - logger = logging.getLogger(__name__) 43 - 44 - TIME_RE = re.compile(r"\d{8}_\d{6}") 45 - 46 - # Importer tract state 47 - _callosum: CallosumConnection | None = None 48 - _message_queue: queue.Queue | None = None 49 - _import_id: str | None = None 50 - _current_stage: str = "initialization" 51 - _start_time: float = 0.0 52 - _stage_start_time: float = 0.0 53 - _stages_run: list[str] = [] 54 - _status_thread: threading.Thread | None = None 55 - _status_running: bool = False 56 - 57 - 58 - def _get_relative_path(path: str) -> str: 59 - """Get path relative to journal, or return as-is if not under journal.""" 60 - journal_path = get_journal() 61 - try: 62 - return os.path.relpath(path, journal_path) 63 - except ValueError: 64 - return path 65 - 66 - 67 - def _set_stage(stage: str) -> None: 68 - """Update current stage and track timing.""" 69 - global _current_stage, _stage_start_time 70 - _current_stage = stage 71 - _stage_start_time = time.monotonic() 72 - if stage not in _stages_run: 73 - _stages_run.append(stage) 74 - logger.debug(f"Stage changed to: {stage}") 75 - 76 - 77 - def _status_emitter() -> None: 78 - """Background thread that emits status events every 5 seconds.""" 79 - while _status_running: 80 - if _callosum and _import_id: 81 - elapsed_ms = int((time.monotonic() - _start_time) * 1000) 82 - stage_elapsed_ms = int((time.monotonic() - _stage_start_time) * 1000) 83 - _callosum.emit( 84 - "importer", 85 - "status", 86 - import_id=_import_id, 87 - stage=_current_stage, 88 - elapsed_ms=elapsed_ms, 89 - stage_elapsed_ms=stage_elapsed_ms, 90 - ) 91 - time.sleep(5) 92 - 93 - 94 - def _write_import_jsonl( 95 - file_path: str, 96 - entries: list[dict], 97 - *, 98 - import_id: str, 99 - raw_filename: str | None = None, 100 - facet: str | None = None, 101 - setting: str | None = None, 102 - ) -> None: 103 - """Write imported transcript entries in JSONL format. 104 - 105 - First line contains imported metadata, subsequent lines contain entries. 106 - Each entry gets source="import" added to match the imported_audio.jsonl convention. 
107 - 108 - Args: 109 - file_path: Path to write JSONL file 110 - entries: List of transcript entries 111 - import_id: Import identifier 112 - raw_filename: Source file name (relative path from segment to imports/) 113 - facet: Optional facet name 114 - setting: Optional setting description 115 - """ 116 - imported_meta: dict[str, str] = {"id": import_id} 117 - if facet: 118 - imported_meta["facet"] = facet 119 - if setting: 120 - imported_meta["setting"] = setting 121 - 122 - # Build top-level metadata with imported info 123 - metadata: dict[str, object] = {"imported": imported_meta} 124 - 125 - # Add raw file reference (path relative from segment to imports directory) 126 - if raw_filename: 127 - metadata["raw"] = f"../../imports/{import_id}/{raw_filename}" 128 - 129 - # Write JSONL: metadata first, then entries with source field 130 - jsonl_lines = [json.dumps(metadata)] 131 - for entry in entries: 132 - # Add source field if not already present (skip metadata entries like topics/setting) 133 - if "text" in entry and "source" not in entry: 134 - entry = {**entry, "source": "import"} 135 - jsonl_lines.append(json.dumps(entry)) 136 - 137 - with open(file_path, "w", encoding="utf-8") as f: 138 - f.write("\n".join(jsonl_lines) + "\n") 139 - 140 - 141 - def str2bool(value: str) -> bool: 142 - if isinstance(value, bool): 143 - return value 144 - val = value.lower() 145 - if val in {"y", "yes", "true", "t", "1"}: 146 - return True 147 - if val in {"n", "no", "false", "f", "0"}: 148 - return False 149 - raise argparse.ArgumentTypeError("boolean value expected") 150 - 151 - 152 - def slice_audio_segment( 153 - source_path: str, 154 - output_path: str, 155 - start_seconds: float, 156 - duration_seconds: float, 157 - ) -> str: 158 - """Extract an audio segment from source file, preserving original format. 159 - 160 - Uses stream copy for lossless extraction when possible. 161 - 162 - Args: 163 - source_path: Path to source audio file 164 - output_path: Path for output segment file 165 - start_seconds: Start offset in seconds 166 - duration_seconds: Duration to extract in seconds 167 - 168 - Returns: 169 - Output path on success 170 - 171 - Raises: 172 - subprocess.CalledProcessError: If ffmpeg fails 173 - """ 174 - cmd = [ 175 - "ffmpeg", 176 - "-ss", 177 - str(start_seconds), 178 - "-i", 179 - source_path, 180 - "-t", 181 - str(duration_seconds), 182 - "-vn", # No video 183 - "-c:a", 184 - "copy", # Stream copy for lossless extraction 185 - "-y", # Overwrite output 186 - output_path, 187 - ] 188 - try: 189 - subprocess.run(cmd, check=True, capture_output=True, text=True) 190 - except subprocess.CalledProcessError: 191 - # Fallback: re-encode if stream copy fails (some formats don't support it) 192 - logger.debug(f"Stream copy failed, re-encoding: {output_path}") 193 - cmd_reencode = [ 194 - "ffmpeg", 195 - "-ss", 196 - str(start_seconds), 197 - "-i", 198 - source_path, 199 - "-t", 200 - str(duration_seconds), 201 - "-vn", 202 - "-y", 203 - output_path, 204 - ] 205 - subprocess.run(cmd_reencode, check=True, capture_output=True, text=True) 206 - 207 - logger.info(f"Created audio segment: {output_path}") 208 - return output_path 209 - 210 - 211 - def _get_audio_duration(audio_path: str) -> float | None: 212 - """Get audio duration in seconds using ffprobe. 
213 - 214 - Args: 215 - audio_path: Path to audio file 216 - 217 - Returns: 218 - Duration in seconds, or None if unable to determine 219 - """ 220 - try: 221 - cmd = [ 222 - "ffprobe", 223 - "-v", 224 - "error", 225 - "-show_entries", 226 - "format=duration", 227 - "-of", 228 - "default=noprint_wrappers=1:nokey=1", 229 - audio_path, 230 - ] 231 - result = subprocess.run(cmd, capture_output=True, text=True, check=True) 232 - return float(result.stdout.strip()) 233 - except (subprocess.CalledProcessError, ValueError) as e: 234 - logger.warning(f"Could not determine audio duration: {e}") 235 - return None 236 - 237 - 238 - def prepare_audio_segments( 239 - media_path: str, 240 - day_dir: str, 241 - base_dt: dt.datetime, 242 - import_id: str, 243 - stream: str, 244 - ) -> list[tuple[str, Path, list[str]]]: 245 - """Slice audio into 5-minute segments for observe pipeline. 246 - 247 - Creates segment directories with audio slices, ready for transcription 248 - via observe.observing events. 249 - 250 - Args: 251 - media_path: Path to source audio file 252 - day_dir: Day directory path (YYYYMMDD) 253 - base_dt: Base datetime for timestamp calculation 254 - import_id: Import identifier 255 - stream: Stream name for directory layout (day/stream/segment/) 256 - 257 - Returns: 258 - List of (segment_key, segment_dir, files_list) tuples 259 - where files_list contains the audio filename(s) created 260 - """ 261 - media = Path(media_path) 262 - source_ext = media.suffix.lower() 263 - stream_dir = Path(day_dir) / stream 264 - 265 - # Get audio duration to calculate number of segments 266 - duration = _get_audio_duration(media_path) 267 - if duration is None: 268 - raise RuntimeError(f"Could not determine duration of {media_path}") 269 - 270 - # Calculate number of 5-minute segments (ceiling division) 271 - segment_duration = 300 # 5 minutes 272 - num_segments = int((duration + segment_duration - 1) // segment_duration) 273 - if num_segments == 0: 274 - num_segments = 1 # At least one segment for very short audio 275 - 276 - segments: list[tuple[str, Path, list[str]]] = [] 277 - 278 - for chunk_index in range(num_segments): 279 - # Calculate timestamp for this segment 280 - ts = base_dt + timedelta(minutes=chunk_index * 5) 281 - time_part = ts.strftime("%H%M%S") 282 - 283 - # Create segment key with 5-minute duration 284 - segment_key_candidate = f"{time_part}_{segment_duration}" 285 - 286 - # Check for collision and deconflict if needed 287 - available_key = find_available_segment(stream_dir, segment_key_candidate) 288 - if available_key is None: 289 - logger.warning( 290 - f"Could not find available segment key near {segment_key_candidate}" 291 - ) 292 - continue 293 - 294 - if available_key != segment_key_candidate: 295 - logger.info( 296 - f"Segment collision: {segment_key_candidate} -> {available_key}" 297 - ) 298 - 299 - # Create segment directory under stream 300 - segment_dir = stream_dir / available_key 301 - segment_dir.mkdir(parents=True, exist_ok=True) 302 - 303 - # Slice audio for this segment 304 - audio_filename = f"imported_audio{source_ext}" 305 - audio_path = segment_dir / audio_filename 306 - start_seconds = chunk_index * segment_duration 307 - 308 - # For the last segment, use remaining duration 309 - if chunk_index == num_segments - 1: 310 - chunk_duration = duration - start_seconds 311 - else: 312 - chunk_duration = segment_duration 313 - 314 - try: 315 - slice_audio_segment( 316 - media_path, 317 - str(audio_path), 318 - start_seconds, 319 - chunk_duration, 320 - ) 321 - 
segments.append((available_key, segment_dir, [audio_filename])) 322 - logger.info(f"Created segment: {available_key} with {audio_filename}") 323 - except subprocess.CalledProcessError as e: 324 - logger.warning(f"Failed to slice segment {available_key}: {e}") 325 - # Clean up empty directory 326 - if segment_dir.exists() and not any(segment_dir.iterdir()): 327 - segment_dir.rmdir() 328 - 329 - return segments 330 - 331 - 332 - def _read_transcript(path: str) -> str: 333 - """Return transcript text from a .txt/.md/.pdf file.""" 334 - ext = os.path.splitext(path)[1].lower() 335 - if ext in {".txt", ".md"}: 336 - with open(path, "r", encoding="utf-8") as f: 337 - return f.read() 338 - if ext == ".pdf": 339 - if PdfReader is None: 340 - raise RuntimeError("pypdf required for PDF support") 341 - reader = PdfReader(path) 342 - parts = [] 343 - for page in reader.pages: 344 - text = page.extract_text() or "" 345 - parts.append(text) 346 - return "\n".join(parts) 347 - raise ValueError("unsupported transcript format") 348 - 349 - 350 - def _time_to_seconds(time_str: str) -> int: 351 - """Convert HH:MM:SS time string to seconds from midnight.""" 352 - h, m, s = map(int, time_str.split(":")) 353 - return h * 3600 + m * 60 + s 354 - 355 - 356 - def process_transcript( 357 - path: str, 358 - day_dir: str, 359 - base_dt: dt.datetime, 360 - *, 361 - import_id: str, 362 - stream: str, 363 - facet: str | None = None, 364 - setting: str | None = None, 365 - audio_duration: int | None = None, 366 - ) -> list[str]: 367 - """Process a transcript file and write imported JSONL segments. 368 - 369 - Args: 370 - path: Path to transcript file 371 - day_dir: Journal day directory 372 - base_dt: Base datetime for the import 373 - import_id: Import identifier 374 - stream: Stream name for directory layout (day/stream/segment/) 375 - facet: Optional facet name 376 - setting: Optional setting description 377 - audio_duration: Optional total audio duration in seconds (for last segment) 378 - 379 - Returns: 380 - List of created file paths. 
381 - """ 382 - created_files = [] 383 - text = _read_transcript(path) 384 - stream_dir = os.path.join(day_dir, stream) 385 - 386 - # Get start time from base_dt for segmentation 387 - start_time = base_dt.strftime("%H:%M:%S") 388 - 389 - # Get segments with their absolute start times 390 - segments = detect_transcript_segment(text, start_time) 391 - 392 - for idx, (start_at, seg_text) in enumerate(segments): 393 - # Convert segment text to structured JSON with absolute timestamps 394 - json_data = detect_transcript_json(seg_text, start_at) 395 - if not json_data: 396 - continue 397 - 398 - # Parse absolute time for segment directory name 399 - time_part = start_at.replace(":", "") # "12:05:30" -> "120530" 400 - 401 - # Compute segment duration from absolute times 402 - start_seconds = _time_to_seconds(start_at) 403 - if idx + 1 < len(segments): 404 - next_start_at, _ = segments[idx + 1] 405 - next_seconds = _time_to_seconds(next_start_at) 406 - duration = next_seconds - start_seconds 407 - else: 408 - # Last segment: use remaining audio duration or default +5s 409 - if audio_duration: 410 - # audio_duration is total length, start_seconds is time-of-day 411 - # Need to calculate offset from recording start 412 - recording_start_seconds = _time_to_seconds(start_time) 413 - segment_offset = start_seconds - recording_start_seconds 414 - duration = audio_duration - segment_offset 415 - else: 416 - duration = 5 417 - 418 - # Negative duration indicates corrupted/invalid timestamp data 419 - if duration < 0: 420 - raise ValueError( 421 - f"Invalid segment duration: {duration}s for segment at {time_part}. " 422 - "Timestamps may be out of order or audio_duration is incorrect." 423 - ) 424 - 425 - # Ensure minimum duration of 1 second 426 - duration = max(1, duration) 427 - 428 - segment_name = f"{time_part}_{duration}" 429 - ts_dir = os.path.join(stream_dir, segment_name) 430 - os.makedirs(ts_dir, exist_ok=True) 431 - json_path = os.path.join(ts_dir, "imported_audio.jsonl") 432 - 433 - _write_import_jsonl( 434 - json_path, 435 - json_data, 436 - import_id=import_id, 437 - raw_filename=os.path.basename(path), 438 - facet=facet, 439 - setting=setting, 440 - ) 441 - logger.info(f"Added transcript segment to journal: {json_path}") 442 - created_files.append(json_path) 443 - 444 - return created_files 445 - 446 - 447 - def _run_import_summary( 448 - import_dir: Path, 449 - day: str, 450 - segments: list[str], 451 - ) -> bool: 452 - """Create a summary for imported segments using cortex generator. 
453 - 454 - Args: 455 - import_dir: Directory where the summary will be saved 456 - day: Day string (YYYYMMDD format) 457 - segments: List of segment keys to summarize 458 - 459 - Returns: 460 - True if summary was created successfully, False otherwise 461 - """ 462 - from think.cortex_client import cortex_request, wait_for_agents 463 - 464 - if not segments: 465 - logger.info("No segments to summarize") 466 - return False 467 - 468 - summary_path = import_dir / "summary.md" 469 - 470 - try: 471 - logger.info(f"Creating summary for {len(segments)} segments via cortex") 472 - 473 - # Spawn generator via cortex 474 - agent_id = cortex_request( 475 - prompt="", # Generators don't use prompt 476 - name="importer", 477 - config={ 478 - "day": day, 479 - "span": segments, 480 - "output": "md", 481 - "output_path": str(summary_path), 482 - }, 483 - ) 484 - 485 - if agent_id is None: 486 - logger.error("Failed to send cortex request for import summary") 487 - return False 488 - 489 - # Wait for completion 490 - completed, timed_out = wait_for_agents([agent_id], timeout=300) 491 - 492 - if timed_out: 493 - logger.error(f"Import summary timed out (ID: {agent_id})") 494 - return False 495 - 496 - if completed: 497 - end_state = completed.get(agent_id, "unknown") 498 - if end_state == "finish" and summary_path.exists(): 499 - logger.info(f"Created import summary: {summary_path}") 500 - return True 501 - else: 502 - logger.warning( 503 - f"Generator ended with state {end_state}, " 504 - f"summary exists: {summary_path.exists()}" 505 - ) 506 - return False 507 - 508 - logger.error("Generator did not complete") 509 - return False 510 - 511 - except Exception as e: 512 - logger.error(f"Failed to create summary: {e}") 513 - return False 514 - 515 - 516 - # MIME type mapping for import metadata 517 - _MIME_TYPES = { 518 - ".m4a": "audio/mp4", 519 - ".mp3": "audio/mpeg", 520 - ".wav": "audio/wav", 521 - ".flac": "audio/flac", 522 - ".ogg": "audio/ogg", 523 - ".mp4": "video/mp4", 524 - ".webm": "video/webm", 525 - ".mov": "video/quicktime", 526 - ".txt": "text/plain", 527 - ".md": "text/markdown", 528 - ".pdf": "application/pdf", 529 - } 530 - 531 - 532 - def _is_in_imports(media_path: str) -> bool: 533 - """Check if file path is already under journal/imports/.""" 534 - imports_dir = os.path.join(get_journal(), "imports") 535 - abs_media = os.path.abspath(media_path) 536 - abs_imports = os.path.abspath(imports_dir) 537 - return abs_media.startswith(abs_imports + os.sep) 538 - 539 - 540 - def _setup_import( 541 - media_path: str, 542 - timestamp: str, 543 - facet: str | None, 544 - setting: str | None, 545 - detection_result: dict | None, 546 - force: bool = False, 547 - ) -> str: 548 - """Copy file to imports/ and write metadata. 
Returns new file path.""" 549 - journal_root = Path(get_journal()) 550 - import_dir = journal_root / "imports" / timestamp 551 - 552 - # Check for conflict 553 - if import_dir.exists(): 554 - if force: 555 - logger.info(f"Removing existing import directory: {import_dir}") 556 - shutil.rmtree(import_dir) 557 - else: 558 - raise SystemExit( 559 - f"Error: Import already exists for timestamp {timestamp}\n" 560 - f"To re-import, use --force to delete existing data and start over" 561 - ) 562 - 563 - # Copy file to imports/ 564 - filename = os.path.basename(media_path) 565 - new_path = save_import_file( 566 - journal_root=journal_root, 567 - timestamp=timestamp, 568 - source_path=Path(media_path), 569 - filename=filename, 570 - ) 571 - 572 - # Build metadata matching app structure 573 - upload_ts = now_ms() 574 - ext = os.path.splitext(filename)[1].lower() 575 - metadata = { 576 - "original_filename": filename, 577 - "upload_timestamp": upload_ts, 578 - "upload_datetime": dt.datetime.fromtimestamp(upload_ts / 1000).isoformat(), 579 - "detection_result": detection_result, 580 - "detected_timestamp": timestamp, 581 - "user_timestamp": timestamp, 582 - "file_size": new_path.stat().st_size if new_path.exists() else 0, 583 - "mime_type": _MIME_TYPES.get(ext, "application/octet-stream"), 584 - "facet": facet, 585 - "setting": setting, 586 - "file_path": str(new_path), 587 - } 588 - 589 - write_import_metadata( 590 - journal_root=journal_root, 591 - timestamp=timestamp, 592 - metadata=metadata, 593 - ) 594 - 595 - logger.info(f"Copied to journal: {new_path}") 596 - return str(new_path) 597 - 598 - 599 - def _format_timestamp_display(timestamp: str) -> str: 600 - """Format timestamp for human-readable display.""" 601 - try: 602 - dt_obj = dt.datetime.strptime(timestamp, "%Y%m%d_%H%M%S") 603 - return dt_obj.strftime("%a %b %d %Y, %-I:%M %p") 604 - except ValueError: 605 - return timestamp 606 - 607 - 608 - def main() -> None: 609 - global _callosum, _message_queue, _import_id, _current_stage, _start_time 610 - global _stage_start_time, _stages_run, _status_thread, _status_running 611 - 612 - parser = argparse.ArgumentParser(description="Chunk a media file into the journal") 613 - parser.add_argument("media", help="Path to video or audio file") 614 - parser.add_argument( 615 - "--timestamp", help="Timestamp YYYYMMDD_HHMMSS for journal entry" 616 - ) 617 - parser.add_argument( 618 - "--summarize", 619 - type=str2bool, 620 - default=True, 621 - help="Create summary.md after transcription completes", 622 - ) 623 - parser.add_argument( 624 - "--facet", 625 - type=str, 626 - default=None, 627 - help="Facet name for this import", 628 - ) 629 - parser.add_argument( 630 - "--setting", 631 - type=str, 632 - default=None, 633 - help="Contextual setting description to store with import metadata", 634 - ) 635 - parser.add_argument( 636 - "--source", 637 - type=str, 638 - default=None, 639 - help="Import source type (apple, plaud, audio, text). 
Auto-detected if omitted.", 640 - ) 641 - parser.add_argument( 642 - "--skip-summary", 643 - action="store_true", 644 - help="Skip waiting for transcription and summary generation", 645 - ) 646 - parser.add_argument( 647 - "--force", 648 - action="store_true", 649 - help="Force re-import by deleting existing import directory", 650 - ) 651 - parser.add_argument( 652 - "--auto", 653 - action="store_true", 654 - help="Auto-accept detected timestamp and proceed with import", 655 - ) 656 - args, extra = setup_cli(parser, parse_known=True) 657 - if extra and not args.timestamp: 658 - args.timestamp = extra[0] 659 - 660 - # Track detection result for metadata 661 - detection_result = None 662 - 663 - # If no timestamp provided, detect it 664 - if not args.timestamp: 665 - # Pass the original filename for better detection 666 - detection_result = detect_created( 667 - args.media, original_filename=os.path.basename(args.media) 668 - ) 669 - if ( 670 - detection_result 671 - and detection_result.get("day") 672 - and detection_result.get("time") 673 - ): 674 - detected_timestamp = f"{detection_result['day']}_{detection_result['time']}" 675 - display = _format_timestamp_display(detected_timestamp) 676 - if args.auto: 677 - print( 678 - f"Detected timestamp: {detected_timestamp} ({display}) — auto-importing" 679 - ) 680 - args.timestamp = detected_timestamp 681 - else: 682 - print(f"Detected timestamp: {detected_timestamp} ({display})") 683 - print("\nRun:") 684 - print(f" sol import {args.media} --timestamp {detected_timestamp}") 685 - return 686 - else: 687 - raise SystemExit( 688 - "Could not detect timestamp. Please provide --timestamp YYYYMMDD_HHMMSS" 689 - ) 690 - 691 - if not TIME_RE.fullmatch(args.timestamp): 692 - raise SystemExit("timestamp must be in YYYYMMDD_HHMMSS format") 693 - 694 - # Check if file needs setup (not already in imports/) 695 - needs_setup = not _is_in_imports(args.media) 696 - 697 - # Copy to imports/ if file is not already there 698 - if needs_setup: 699 - args.media = _setup_import( 700 - args.media, 701 - args.timestamp, 702 - args.facet, 703 - args.setting, 704 - detection_result, 705 - force=args.force, 706 - ) 707 - print("Starting import...") 708 - 709 - base_dt = dt.datetime.strptime(args.timestamp, "%Y%m%d_%H%M%S") 710 - day = base_dt.strftime("%Y%m%d") 711 - logger.info(f"Using provided timestamp: {args.timestamp}") 712 - day_dir = str(day_path(day)) 713 - 714 - # Derive stream identity for this import 715 - if args.source: 716 - import_source = args.source 717 - else: 718 - # Auto-detect from file extension 719 - _ext = os.path.splitext(args.media)[1].lower() 720 - if _ext == ".m4a": 721 - import_source = "apple" 722 - elif _ext in {".txt", ".md", ".pdf"}: 723 - import_source = "text" 724 - else: 725 - import_source = "audio" 726 - stream = stream_name(import_source=import_source) 727 - 728 - # Initialize importer tract state 729 - _import_id = args.timestamp 730 - _start_time = time.monotonic() 731 - _stage_start_time = _start_time 732 - _current_stage = "initialization" 733 - _stages_run = ["initialization"] 734 - 735 - # Start Callosum connection with message queue for receiving events 736 - _message_queue = queue.Queue() 737 - _callosum = CallosumConnection(defaults={"rev": get_rev()}) 738 - _callosum.start(callback=lambda msg: _message_queue.put(msg)) 739 - 740 - # Start status emitter thread 741 - _status_running = True 742 - _status_thread = threading.Thread(target=_status_emitter, daemon=True) 743 - _status_thread.start() 744 - 745 - # Emit started event 
746 - ext = os.path.splitext(args.media)[1].lower() 747 - _callosum.emit( 748 - "importer", 749 - "started", 750 - import_id=_import_id, 751 - input_file=os.path.basename(args.media), 752 - file_type=ext.lstrip("."), 753 - day=day, 754 - facet=args.facet, 755 - setting=args.setting, 756 - options={ 757 - "summarize": args.summarize, 758 - "skip_summary": args.skip_summary, 759 - }, 760 - stage=_current_stage, 761 - stream=stream, 762 - ) 763 - 764 - # Track all created files and processing metadata 765 - all_created_files: list[str] = [] 766 - created_segments: list[str] = [] 767 - journal_root = Path(get_journal()) 768 - processing_results = { 769 - "processed_timestamp": args.timestamp, 770 - "target_day": base_dt.strftime("%Y%m%d"), 771 - "target_day_path": day_dir, 772 - "input_file": args.media, 773 - "processing_started": dt.datetime.now().isoformat(), 774 - "facet": args.facet, 775 - "setting": args.setting, 776 - "outputs": [], 777 - } 778 - 779 - # Get parent directory for saving metadata 780 - media_path = Path(args.media) 781 - import_dir = media_path.parent 782 - failed_segments: list[str] = [] 783 - 784 - try: 785 - if ext in {".txt", ".md", ".pdf"}: 786 - # Text transcript processing — no observe pipeline 787 - _set_stage("segmenting") 788 - 789 - created_files = process_transcript( 790 - args.media, 791 - day_dir, 792 - base_dt, 793 - import_id=args.timestamp, 794 - stream=stream, 795 - facet=args.facet, 796 - setting=args.setting, 797 - ) 798 - all_created_files.extend(created_files) 799 - processing_results["outputs"].append( 800 - { 801 - "type": "transcript", 802 - "format": "imported_audio.jsonl", 803 - "description": "Transcript segments", 804 - "files": created_files, 805 - "count": len(created_files), 806 - } 807 - ) 808 - 809 - # Extract segment keys for text imports 810 - for file_path in created_files: 811 - seg = segment_key(file_path) 812 - if seg and seg not in created_segments: 813 - created_segments.append(seg) 814 - 815 - # Write stream markers for text import segments 816 - for seg in created_segments: 817 - try: 818 - seg_dir = day_path(day) / stream / seg 819 - result = update_stream(stream, day, seg, type="import", host=None) 820 - write_segment_stream( 821 - seg_dir, 822 - stream, 823 - result["prev_day"], 824 - result["prev_segment"], 825 - result["seq"], 826 - ) 827 - except Exception as e: 828 - logger.warning(f"Failed to write stream identity: {e}") 829 - 830 - # Emit observe.observed for text imports (already processed) 831 - for seg in created_segments: 832 - _callosum.emit( 833 - "observe", "observed", segment=seg, day=day, stream=stream 834 - ) 835 - logger.info(f"Emitted observe.observed for segment: {day}/{seg}") 836 - 837 - else: 838 - # Audio processing via observe pipeline 839 - _set_stage("segmenting") 840 - 841 - # Prepare audio segments (slice into 5-minute chunks) 842 - segments = prepare_audio_segments( 843 - args.media, 844 - day_dir, 845 - base_dt, 846 - args.timestamp, 847 - stream, 848 - ) 849 - 850 - if not segments: 851 - raise RuntimeError("No segments created from audio file") 852 - 853 - # Track created files and segment keys, write stream markers 854 - for seg_key, seg_dir, files in segments: 855 - created_segments.append(seg_key) 856 - for f in files: 857 - all_created_files.append(str(seg_dir / f)) 858 - try: 859 - result = update_stream( 860 - stream, day, seg_key, type="import", host=None 861 - ) 862 - write_segment_stream( 863 - seg_dir, 864 - stream, 865 - result["prev_day"], 866 - result["prev_segment"], 867 - 
result["seq"], 868 - ) 869 - except Exception as e: 870 - logger.warning(f"Failed to write stream identity: {e}") 871 - 872 - # Save segment list for tracking 873 - save_import_segments(journal_root, args.timestamp, created_segments, day) 874 - 875 - processing_results["outputs"].append( 876 - { 877 - "type": "audio_segments", 878 - "description": "Audio segments queued for transcription", 879 - "segments": created_segments, 880 - "count": len(created_segments), 881 - } 882 - ) 883 - 884 - # Build meta dict for observe.observing events 885 - meta: dict[str, str] = {"import_id": args.timestamp, "stream": stream} 886 - if args.facet: 887 - meta["facet"] = args.facet 888 - if args.setting: 889 - meta["setting"] = args.setting 890 - 891 - # Emit observe.observing per segment to trigger sense.py transcription 892 - for seg_key, seg_dir, files in segments: 893 - _callosum.emit( 894 - "observe", 895 - "observing", 896 - segment=seg_key, 897 - day=day, 898 - files=files, 899 - meta=meta, 900 - stream=stream, 901 - ) 902 - logger.info(f"Emitted observe.observing for segment: {day}/{seg_key}") 903 - 904 - # Wait for transcription to complete (unless --no-wait) 905 - if not args.skip_summary: 906 - _set_stage("transcribing") 907 - pending = set(created_segments) 908 - segment_timeout = 600 # 10 minutes since last progress 909 - last_progress = time.monotonic() 910 - 911 - logger.info(f"Waiting for {len(pending)} segments to complete") 912 - 913 - while pending: 914 - # Check for timeout since last progress 915 - if time.monotonic() - last_progress > segment_timeout: 916 - timed_out = sorted(pending) 917 - logger.error(f"Timed out waiting for segments: {timed_out}") 918 - failed_segments.extend(timed_out) 919 - break 920 - 921 - # Poll for observe.observed events from message queue 922 - try: 923 - msg = _message_queue.get(timeout=5.0) 924 - except queue.Empty: 925 - continue 926 - 927 - tract = msg.get("tract") 928 - event = msg.get("event") 929 - seg = msg.get("segment") 930 - 931 - if tract == "observe" and event == "observed" and seg in pending: 932 - pending.discard(seg) 933 - last_progress = time.monotonic() 934 - if msg.get("error"): 935 - errors = msg.get("errors", []) 936 - logger.warning( 937 - f"Segment {seg} failed: {errors} " 938 - f"({len(pending)} remaining)" 939 - ) 940 - failed_segments.append(seg) 941 - else: 942 - logger.info( 943 - f"Segment {seg} transcribed " 944 - f"({len(pending)} remaining)" 945 - ) 946 - 947 - if failed_segments: 948 - logger.warning( 949 - f"{len(failed_segments)} of {len(created_segments)} " 950 - f"segments failed: {failed_segments}" 951 - ) 952 - else: 953 - logger.info("All segments transcribed successfully") 954 - 955 - # Complete processing metadata 956 - processing_results["processing_completed"] = dt.datetime.now().isoformat() 957 - processing_results["total_files_created"] = len(all_created_files) 958 - processing_results["all_created_files"] = all_created_files 959 - processing_results["segments"] = created_segments 960 - if failed_segments: 961 - processing_results["failed_segments"] = failed_segments 962 - 963 - # Write imported.json with all processing metadata 964 - imported_path = import_dir / "imported.json" 965 - try: 966 - with open(imported_path, "w", encoding="utf-8") as f: 967 - json.dump(processing_results, f, indent=2) 968 - logger.info(f"Saved import processing metadata: {imported_path}") 969 - except Exception as e: 970 - logger.warning(f"Failed to save imported.json: {e}") 971 - 972 - # Update import.json with processing summary if 
it exists 973 - import_metadata_path = import_dir / "import.json" 974 - if import_metadata_path.exists(): 975 - try: 976 - with open(import_metadata_path, "r", encoding="utf-8") as f: 977 - import_meta = json.load(f) 978 - import_meta["processing_completed"] = processing_results[ 979 - "processing_completed" 980 - ] 981 - import_meta["total_files_created"] = processing_results[ 982 - "total_files_created" 983 - ] 984 - import_meta["imported_json_path"] = str(imported_path) 985 - import_meta["segments"] = created_segments 986 - with open(import_metadata_path, "w", encoding="utf-8") as f: 987 - json.dump(import_meta, f, indent=2) 988 - logger.info(f"Updated import metadata: {import_metadata_path}") 989 - except Exception as e: 990 - logger.warning(f"Failed to update import metadata: {e}") 991 - 992 - # Create summary if requested and we have segments 993 - if args.summarize and created_segments and not args.skip_summary: 994 - _set_stage("summarizing") 995 - _run_import_summary(import_dir, day, created_segments) 996 - 997 - # Emit completed event 998 - duration_ms = int((time.monotonic() - _start_time) * 1000) 999 - output_files_relative = [_get_relative_path(f) for f in all_created_files] 1000 - metadata_file_relative = _get_relative_path(str(imported_path)) 1001 - 1002 - _callosum.emit( 1003 - "importer", 1004 - "completed", 1005 - import_id=_import_id, 1006 - stage=_current_stage, 1007 - duration_ms=duration_ms, 1008 - total_files_created=len(all_created_files), 1009 - output_files=output_files_relative, 1010 - metadata_file=metadata_file_relative, 1011 - stages_run=_stages_run, 1012 - segments=created_segments, 1013 - stream=stream, 1014 - ) 1015 - 1016 - except Exception as e: 1017 - # Write error state to imported.json for persistent failure tracking 1018 - duration_ms = int((time.monotonic() - _start_time) * 1000) 1019 - partial_outputs = [_get_relative_path(f) for f in all_created_files] 1020 - imported_path = import_dir / "imported.json" 1021 - 1022 - error_results = { 1023 - **processing_results, # Include all the metadata we have 1024 - "processing_failed": dt.datetime.now().isoformat(), 1025 - "error": str(e), 1026 - "error_stage": _current_stage, 1027 - "duration_ms": duration_ms, 1028 - "total_files_created": len(all_created_files), 1029 - "all_created_files": all_created_files, 1030 - "stages_run": _stages_run, 1031 - } 1032 - 1033 - try: 1034 - with open(imported_path, "w", encoding="utf-8") as f: 1035 - json.dump(error_results, f, indent=2) 1036 - logger.info(f"Saved error state: {imported_path}") 1037 - except Exception as write_err: 1038 - logger.warning(f"Failed to write error state: {write_err}") 1039 - 1040 - # Emit error event 1041 - if _callosum: 1042 - _callosum.emit( 1043 - "importer", 1044 - "error", 1045 - import_id=_import_id, 1046 - stage=_current_stage, 1047 - error=str(e), 1048 - duration_ms=duration_ms, 1049 - partial_outputs=partial_outputs, 1050 - ) 1051 - 1052 - logger.error(f"Import failed: {e}") 1053 - raise 1054 - 1055 - finally: 1056 - # Stop status thread 1057 - _status_running = False 1058 - if _status_thread: 1059 - _status_thread.join(timeout=6) 1060 - 1061 - 1062 - if __name__ == "__main__": 1063 - main()
think/importer_utils.py → think/importers/utils.py
+2
think/importers/__init__.py
···
+ # SPDX-License-Identifier: AGPL-3.0-only
+ # Copyright (c) 2026 sol pbc
+192
think/importers/audio.py
··· 1 + # SPDX-License-Identifier: AGPL-3.0-only 2 + # Copyright (c) 2026 sol pbc 3 + 4 + import datetime as dt 5 + import logging 6 + import subprocess 7 + from datetime import timedelta 8 + from pathlib import Path 9 + 10 + from observe.utils import find_available_segment 11 + 12 + logger = logging.getLogger(__name__) 13 + 14 + 15 + def slice_audio_segment( 16 + source_path: str, 17 + output_path: str, 18 + start_seconds: float, 19 + duration_seconds: float, 20 + ) -> str: 21 + """Extract an audio segment from source file, preserving original format. 22 + 23 + Uses stream copy for lossless extraction when possible. 24 + 25 + Args: 26 + source_path: Path to source audio file 27 + output_path: Path for output segment file 28 + start_seconds: Start offset in seconds 29 + duration_seconds: Duration to extract in seconds 30 + 31 + Returns: 32 + Output path on success 33 + 34 + Raises: 35 + subprocess.CalledProcessError: If ffmpeg fails 36 + """ 37 + cmd = [ 38 + "ffmpeg", 39 + "-ss", 40 + str(start_seconds), 41 + "-i", 42 + source_path, 43 + "-t", 44 + str(duration_seconds), 45 + "-vn", # No video 46 + "-c:a", 47 + "copy", # Stream copy for lossless extraction 48 + "-y", # Overwrite output 49 + output_path, 50 + ] 51 + try: 52 + subprocess.run(cmd, check=True, capture_output=True, text=True) 53 + except subprocess.CalledProcessError: 54 + # Fallback: re-encode if stream copy fails (some formats don't support it) 55 + logger.debug(f"Stream copy failed, re-encoding: {output_path}") 56 + cmd_reencode = [ 57 + "ffmpeg", 58 + "-ss", 59 + str(start_seconds), 60 + "-i", 61 + source_path, 62 + "-t", 63 + str(duration_seconds), 64 + "-vn", 65 + "-y", 66 + output_path, 67 + ] 68 + subprocess.run(cmd_reencode, check=True, capture_output=True, text=True) 69 + 70 + logger.info(f"Created audio segment: {output_path}") 71 + return output_path 72 + 73 + 74 + def _get_audio_duration(audio_path: str) -> float | None: 75 + """Get audio duration in seconds using ffprobe. 76 + 77 + Args: 78 + audio_path: Path to audio file 79 + 80 + Returns: 81 + Duration in seconds, or None if unable to determine 82 + """ 83 + try: 84 + cmd = [ 85 + "ffprobe", 86 + "-v", 87 + "error", 88 + "-show_entries", 89 + "format=duration", 90 + "-of", 91 + "default=noprint_wrappers=1:nokey=1", 92 + audio_path, 93 + ] 94 + result = subprocess.run(cmd, capture_output=True, text=True, check=True) 95 + return float(result.stdout.strip()) 96 + except (subprocess.CalledProcessError, ValueError) as e: 97 + logger.warning(f"Could not determine audio duration: {e}") 98 + return None 99 + 100 + 101 + def prepare_audio_segments( 102 + media_path: str, 103 + day_dir: str, 104 + base_dt: dt.datetime, 105 + import_id: str, 106 + stream: str, 107 + ) -> list[tuple[str, Path, list[str]]]: 108 + """Slice audio into 5-minute segments for observe pipeline. 109 + 110 + Creates segment directories with audio slices, ready for transcription 111 + via observe.observing events. 
112 + 113 + Args: 114 + media_path: Path to source audio file 115 + day_dir: Day directory path (YYYYMMDD) 116 + base_dt: Base datetime for timestamp calculation 117 + import_id: Import identifier 118 + stream: Stream name for directory layout (day/stream/segment/) 119 + 120 + Returns: 121 + List of (segment_key, segment_dir, files_list) tuples 122 + where files_list contains the audio filename(s) created 123 + """ 124 + media = Path(media_path) 125 + source_ext = media.suffix.lower() 126 + stream_dir = Path(day_dir) / stream 127 + 128 + # Get audio duration to calculate number of segments 129 + duration = _get_audio_duration(media_path) 130 + if duration is None: 131 + raise RuntimeError(f"Could not determine duration of {media_path}") 132 + 133 + # Calculate number of 5-minute segments (ceiling division) 134 + segment_duration = 300 # 5 minutes 135 + num_segments = int((duration + segment_duration - 1) // segment_duration) 136 + if num_segments == 0: 137 + num_segments = 1 # At least one segment for very short audio 138 + 139 + segments: list[tuple[str, Path, list[str]]] = [] 140 + 141 + for chunk_index in range(num_segments): 142 + # Calculate timestamp for this segment 143 + ts = base_dt + timedelta(minutes=chunk_index * 5) 144 + time_part = ts.strftime("%H%M%S") 145 + 146 + # Create segment key with 5-minute duration 147 + segment_key_candidate = f"{time_part}_{segment_duration}" 148 + 149 + # Check for collision and deconflict if needed 150 + available_key = find_available_segment(stream_dir, segment_key_candidate) 151 + if available_key is None: 152 + logger.warning( 153 + f"Could not find available segment key near {segment_key_candidate}" 154 + ) 155 + continue 156 + 157 + if available_key != segment_key_candidate: 158 + logger.info( 159 + f"Segment collision: {segment_key_candidate} -> {available_key}" 160 + ) 161 + 162 + # Create segment directory under stream 163 + segment_dir = stream_dir / available_key 164 + segment_dir.mkdir(parents=True, exist_ok=True) 165 + 166 + # Slice audio for this segment 167 + audio_filename = f"imported_audio{source_ext}" 168 + audio_path = segment_dir / audio_filename 169 + start_seconds = chunk_index * segment_duration 170 + 171 + # For the last segment, use remaining duration 172 + if chunk_index == num_segments - 1: 173 + chunk_duration = duration - start_seconds 174 + else: 175 + chunk_duration = segment_duration 176 + 177 + try: 178 + slice_audio_segment( 179 + media_path, 180 + str(audio_path), 181 + start_seconds, 182 + chunk_duration, 183 + ) 184 + segments.append((available_key, segment_dir, [audio_filename])) 185 + logger.info(f"Created segment: {available_key} with {audio_filename}") 186 + except subprocess.CalledProcessError as e: 187 + logger.warning(f"Failed to slice segment {available_key}: {e}") 188 + # Clean up empty directory 189 + if segment_dir.exists() and not any(segment_dir.iterdir()): 190 + segment_dir.rmdir() 191 + 192 + return segments
+547
think/importers/cli.py
··· 1 + # SPDX-License-Identifier: AGPL-3.0-only 2 + # Copyright (c) 2026 sol pbc 3 + 4 + import argparse 5 + import datetime as dt 6 + import json 7 + import logging 8 + import os 9 + import queue 10 + import re 11 + import threading 12 + import time 13 + from pathlib import Path 14 + 15 + from think.callosum import CallosumConnection 16 + from think.detect_created import detect_created 17 + from think.importers.audio import prepare_audio_segments 18 + from think.importers.shared import ( 19 + _get_relative_path, 20 + _is_in_imports, 21 + _run_import_summary, 22 + _setup_import, 23 + ) 24 + from think.importers.text import process_transcript 25 + from think.importers.utils import save_import_segments 26 + from think.streams import stream_name, update_stream, write_segment_stream 27 + from think.utils import day_path, get_journal, get_rev, segment_key, setup_cli 28 + 29 + logger = logging.getLogger(__name__) 30 + 31 + TIME_RE = re.compile(r"\d{8}_\d{6}") 32 + 33 + # Importer tract state 34 + _callosum: CallosumConnection | None = None 35 + _message_queue: queue.Queue | None = None 36 + _import_id: str | None = None 37 + _current_stage: str = "initialization" 38 + _start_time: float = 0.0 39 + _stage_start_time: float = 0.0 40 + _stages_run: list[str] = [] 41 + _status_thread: threading.Thread | None = None 42 + _status_running: bool = False 43 + 44 + 45 + def _set_stage(stage: str) -> None: 46 + """Update current stage and track timing.""" 47 + global _current_stage, _stage_start_time 48 + _current_stage = stage 49 + _stage_start_time = time.monotonic() 50 + if stage not in _stages_run: 51 + _stages_run.append(stage) 52 + logger.debug(f"Stage changed to: {stage}") 53 + 54 + 55 + def _status_emitter() -> None: 56 + """Background thread that emits status events every 5 seconds.""" 57 + while _status_running: 58 + if _callosum and _import_id: 59 + elapsed_ms = int((time.monotonic() - _start_time) * 1000) 60 + stage_elapsed_ms = int((time.monotonic() - _stage_start_time) * 1000) 61 + _callosum.emit( 62 + "importer", 63 + "status", 64 + import_id=_import_id, 65 + stage=_current_stage, 66 + elapsed_ms=elapsed_ms, 67 + stage_elapsed_ms=stage_elapsed_ms, 68 + ) 69 + time.sleep(5) 70 + 71 + 72 + def str2bool(value: str) -> bool: 73 + if isinstance(value, bool): 74 + return value 75 + val = value.lower() 76 + if val in {"y", "yes", "true", "t", "1"}: 77 + return True 78 + if val in {"n", "no", "false", "f", "0"}: 79 + return False 80 + raise argparse.ArgumentTypeError("boolean value expected") 81 + 82 + 83 + def _format_timestamp_display(timestamp: str) -> str: 84 + """Format timestamp for human-readable display.""" 85 + try: 86 + dt_obj = dt.datetime.strptime(timestamp, "%Y%m%d_%H%M%S") 87 + return dt_obj.strftime("%a %b %d %Y, %-I:%M %p") 88 + except ValueError: 89 + return timestamp 90 + 91 + 92 + def main() -> None: 93 + global _callosum, _message_queue, _import_id, _current_stage, _start_time 94 + global _stage_start_time, _stages_run, _status_thread, _status_running 95 + 96 + parser = argparse.ArgumentParser(description="Chunk a media file into the journal") 97 + parser.add_argument("media", help="Path to video or audio file") 98 + parser.add_argument( 99 + "--timestamp", help="Timestamp YYYYMMDD_HHMMSS for journal entry" 100 + ) 101 + parser.add_argument( 102 + "--summarize", 103 + type=str2bool, 104 + default=True, 105 + help="Create summary.md after transcription completes", 106 + ) 107 + parser.add_argument( 108 + "--facet", 109 + type=str, 110 + default=None, 111 + help="Facet name for 
this import", 112 + ) 113 + parser.add_argument( 114 + "--setting", 115 + type=str, 116 + default=None, 117 + help="Contextual setting description to store with import metadata", 118 + ) 119 + parser.add_argument( 120 + "--source", 121 + type=str, 122 + default=None, 123 + help="Import source type (apple, plaud, audio, text). Auto-detected if omitted.", 124 + ) 125 + parser.add_argument( 126 + "--skip-summary", 127 + action="store_true", 128 + help="Skip waiting for transcription and summary generation", 129 + ) 130 + parser.add_argument( 131 + "--force", 132 + action="store_true", 133 + help="Force re-import by deleting existing import directory", 134 + ) 135 + parser.add_argument( 136 + "--auto", 137 + action="store_true", 138 + help="Auto-accept detected timestamp and proceed with import", 139 + ) 140 + args, extra = setup_cli(parser, parse_known=True) 141 + if extra and not args.timestamp: 142 + args.timestamp = extra[0] 143 + 144 + # Track detection result for metadata 145 + detection_result = None 146 + 147 + # If no timestamp provided, detect it 148 + if not args.timestamp: 149 + # Pass the original filename for better detection 150 + detection_result = detect_created( 151 + args.media, original_filename=os.path.basename(args.media) 152 + ) 153 + if ( 154 + detection_result 155 + and detection_result.get("day") 156 + and detection_result.get("time") 157 + ): 158 + detected_timestamp = f"{detection_result['day']}_{detection_result['time']}" 159 + display = _format_timestamp_display(detected_timestamp) 160 + if args.auto: 161 + print( 162 + f"Detected timestamp: {detected_timestamp} ({display}) — auto-importing" 163 + ) 164 + args.timestamp = detected_timestamp 165 + else: 166 + print(f"Detected timestamp: {detected_timestamp} ({display})") 167 + print("\nRun:") 168 + print(f" sol import {args.media} --timestamp {detected_timestamp}") 169 + return 170 + else: 171 + raise SystemExit( 172 + "Could not detect timestamp. 
Please provide --timestamp YYYYMMDD_HHMMSS" 173 + ) 174 + 175 + if not TIME_RE.fullmatch(args.timestamp): 176 + raise SystemExit("timestamp must be in YYYYMMDD_HHMMSS format") 177 + 178 + # Check if file needs setup (not already in imports/) 179 + needs_setup = not _is_in_imports(args.media) 180 + 181 + # Copy to imports/ if file is not already there 182 + if needs_setup: 183 + args.media = _setup_import( 184 + args.media, 185 + args.timestamp, 186 + args.facet, 187 + args.setting, 188 + detection_result, 189 + force=args.force, 190 + ) 191 + print("Starting import...") 192 + 193 + base_dt = dt.datetime.strptime(args.timestamp, "%Y%m%d_%H%M%S") 194 + day = base_dt.strftime("%Y%m%d") 195 + logger.info(f"Using provided timestamp: {args.timestamp}") 196 + day_dir = str(day_path(day)) 197 + 198 + # Derive stream identity for this import 199 + if args.source: 200 + import_source = args.source 201 + else: 202 + # Auto-detect from file extension 203 + _ext = os.path.splitext(args.media)[1].lower() 204 + if _ext == ".m4a": 205 + import_source = "apple" 206 + elif _ext in {".txt", ".md", ".pdf"}: 207 + import_source = "text" 208 + else: 209 + import_source = "audio" 210 + stream = stream_name(import_source=import_source) 211 + 212 + # Initialize importer tract state 213 + _import_id = args.timestamp 214 + _start_time = time.monotonic() 215 + _stage_start_time = _start_time 216 + _current_stage = "initialization" 217 + _stages_run = ["initialization"] 218 + 219 + # Start Callosum connection with message queue for receiving events 220 + _message_queue = queue.Queue() 221 + _callosum = CallosumConnection(defaults={"rev": get_rev()}) 222 + _callosum.start(callback=lambda msg: _message_queue.put(msg)) 223 + 224 + # Start status emitter thread 225 + _status_running = True 226 + _status_thread = threading.Thread(target=_status_emitter, daemon=True) 227 + _status_thread.start() 228 + 229 + # Emit started event 230 + ext = os.path.splitext(args.media)[1].lower() 231 + _callosum.emit( 232 + "importer", 233 + "started", 234 + import_id=_import_id, 235 + input_file=os.path.basename(args.media), 236 + file_type=ext.lstrip("."), 237 + day=day, 238 + facet=args.facet, 239 + setting=args.setting, 240 + options={ 241 + "summarize": args.summarize, 242 + "skip_summary": args.skip_summary, 243 + }, 244 + stage=_current_stage, 245 + stream=stream, 246 + ) 247 + 248 + # Track all created files and processing metadata 249 + all_created_files: list[str] = [] 250 + created_segments: list[str] = [] 251 + journal_root = Path(get_journal()) 252 + processing_results = { 253 + "processed_timestamp": args.timestamp, 254 + "target_day": base_dt.strftime("%Y%m%d"), 255 + "target_day_path": day_dir, 256 + "input_file": args.media, 257 + "processing_started": dt.datetime.now().isoformat(), 258 + "facet": args.facet, 259 + "setting": args.setting, 260 + "outputs": [], 261 + } 262 + 263 + # Get parent directory for saving metadata 264 + media_path = Path(args.media) 265 + import_dir = media_path.parent 266 + failed_segments: list[str] = [] 267 + 268 + try: 269 + if ext in {".txt", ".md", ".pdf"}: 270 + # Text transcript processing — no observe pipeline 271 + _set_stage("segmenting") 272 + 273 + created_files = process_transcript( 274 + args.media, 275 + day_dir, 276 + base_dt, 277 + import_id=args.timestamp, 278 + stream=stream, 279 + facet=args.facet, 280 + setting=args.setting, 281 + ) 282 + all_created_files.extend(created_files) 283 + processing_results["outputs"].append( 284 + { 285 + "type": "transcript", 286 + "format": 
"imported_audio.jsonl", 287 + "description": "Transcript segments", 288 + "files": created_files, 289 + "count": len(created_files), 290 + } 291 + ) 292 + 293 + # Extract segment keys for text imports 294 + for file_path in created_files: 295 + seg = segment_key(file_path) 296 + if seg and seg not in created_segments: 297 + created_segments.append(seg) 298 + 299 + # Write stream markers for text import segments 300 + for seg in created_segments: 301 + try: 302 + seg_dir = day_path(day) / stream / seg 303 + result = update_stream(stream, day, seg, type="import", host=None) 304 + write_segment_stream( 305 + seg_dir, 306 + stream, 307 + result["prev_day"], 308 + result["prev_segment"], 309 + result["seq"], 310 + ) 311 + except Exception as e: 312 + logger.warning(f"Failed to write stream identity: {e}") 313 + 314 + # Emit observe.observed for text imports (already processed) 315 + for seg in created_segments: 316 + _callosum.emit( 317 + "observe", "observed", segment=seg, day=day, stream=stream 318 + ) 319 + logger.info(f"Emitted observe.observed for segment: {day}/{seg}") 320 + 321 + else: 322 + # Audio processing via observe pipeline 323 + _set_stage("segmenting") 324 + 325 + # Prepare audio segments (slice into 5-minute chunks) 326 + segments = prepare_audio_segments( 327 + args.media, 328 + day_dir, 329 + base_dt, 330 + args.timestamp, 331 + stream, 332 + ) 333 + 334 + if not segments: 335 + raise RuntimeError("No segments created from audio file") 336 + 337 + # Track created files and segment keys, write stream markers 338 + for seg_key, seg_dir, files in segments: 339 + created_segments.append(seg_key) 340 + for f in files: 341 + all_created_files.append(str(seg_dir / f)) 342 + try: 343 + result = update_stream( 344 + stream, day, seg_key, type="import", host=None 345 + ) 346 + write_segment_stream( 347 + seg_dir, 348 + stream, 349 + result["prev_day"], 350 + result["prev_segment"], 351 + result["seq"], 352 + ) 353 + except Exception as e: 354 + logger.warning(f"Failed to write stream identity: {e}") 355 + 356 + # Save segment list for tracking 357 + save_import_segments(journal_root, args.timestamp, created_segments, day) 358 + 359 + processing_results["outputs"].append( 360 + { 361 + "type": "audio_segments", 362 + "description": "Audio segments queued for transcription", 363 + "segments": created_segments, 364 + "count": len(created_segments), 365 + } 366 + ) 367 + 368 + # Build meta dict for observe.observing events 369 + meta: dict[str, str] = {"import_id": args.timestamp, "stream": stream} 370 + if args.facet: 371 + meta["facet"] = args.facet 372 + if args.setting: 373 + meta["setting"] = args.setting 374 + 375 + # Emit observe.observing per segment to trigger sense.py transcription 376 + for seg_key, seg_dir, files in segments: 377 + _callosum.emit( 378 + "observe", 379 + "observing", 380 + segment=seg_key, 381 + day=day, 382 + files=files, 383 + meta=meta, 384 + stream=stream, 385 + ) 386 + logger.info(f"Emitted observe.observing for segment: {day}/{seg_key}") 387 + 388 + # Wait for transcription to complete (unless --no-wait) 389 + if not args.skip_summary: 390 + _set_stage("transcribing") 391 + pending = set(created_segments) 392 + segment_timeout = 600 # 10 minutes since last progress 393 + last_progress = time.monotonic() 394 + 395 + logger.info(f"Waiting for {len(pending)} segments to complete") 396 + 397 + while pending: 398 + # Check for timeout since last progress 399 + if time.monotonic() - last_progress > segment_timeout: 400 + timed_out = sorted(pending) 401 + 
logger.error(f"Timed out waiting for segments: {timed_out}") 402 + failed_segments.extend(timed_out) 403 + break 404 + 405 + # Poll for observe.observed events from message queue 406 + try: 407 + msg = _message_queue.get(timeout=5.0) 408 + except queue.Empty: 409 + continue 410 + 411 + tract = msg.get("tract") 412 + event = msg.get("event") 413 + seg = msg.get("segment") 414 + 415 + if tract == "observe" and event == "observed" and seg in pending: 416 + pending.discard(seg) 417 + last_progress = time.monotonic() 418 + if msg.get("error"): 419 + errors = msg.get("errors", []) 420 + logger.warning( 421 + f"Segment {seg} failed: {errors} " 422 + f"({len(pending)} remaining)" 423 + ) 424 + failed_segments.append(seg) 425 + else: 426 + logger.info( 427 + f"Segment {seg} transcribed " 428 + f"({len(pending)} remaining)" 429 + ) 430 + 431 + if failed_segments: 432 + logger.warning( 433 + f"{len(failed_segments)} of {len(created_segments)} " 434 + f"segments failed: {failed_segments}" 435 + ) 436 + else: 437 + logger.info("All segments transcribed successfully") 438 + 439 + # Complete processing metadata 440 + processing_results["processing_completed"] = dt.datetime.now().isoformat() 441 + processing_results["total_files_created"] = len(all_created_files) 442 + processing_results["all_created_files"] = all_created_files 443 + processing_results["segments"] = created_segments 444 + if failed_segments: 445 + processing_results["failed_segments"] = failed_segments 446 + 447 + # Write imported.json with all processing metadata 448 + imported_path = import_dir / "imported.json" 449 + try: 450 + with open(imported_path, "w", encoding="utf-8") as f: 451 + json.dump(processing_results, f, indent=2) 452 + logger.info(f"Saved import processing metadata: {imported_path}") 453 + except Exception as e: 454 + logger.warning(f"Failed to save imported.json: {e}") 455 + 456 + # Update import.json with processing summary if it exists 457 + import_metadata_path = import_dir / "import.json" 458 + if import_metadata_path.exists(): 459 + try: 460 + with open(import_metadata_path, "r", encoding="utf-8") as f: 461 + import_meta = json.load(f) 462 + import_meta["processing_completed"] = processing_results[ 463 + "processing_completed" 464 + ] 465 + import_meta["total_files_created"] = processing_results[ 466 + "total_files_created" 467 + ] 468 + import_meta["imported_json_path"] = str(imported_path) 469 + import_meta["segments"] = created_segments 470 + with open(import_metadata_path, "w", encoding="utf-8") as f: 471 + json.dump(import_meta, f, indent=2) 472 + logger.info(f"Updated import metadata: {import_metadata_path}") 473 + except Exception as e: 474 + logger.warning(f"Failed to update import metadata: {e}") 475 + 476 + # Create summary if requested and we have segments 477 + if args.summarize and created_segments and not args.skip_summary: 478 + _set_stage("summarizing") 479 + _run_import_summary(import_dir, day, created_segments) 480 + 481 + # Emit completed event 482 + duration_ms = int((time.monotonic() - _start_time) * 1000) 483 + output_files_relative = [_get_relative_path(f) for f in all_created_files] 484 + metadata_file_relative = _get_relative_path(str(imported_path)) 485 + 486 + _callosum.emit( 487 + "importer", 488 + "completed", 489 + import_id=_import_id, 490 + stage=_current_stage, 491 + duration_ms=duration_ms, 492 + total_files_created=len(all_created_files), 493 + output_files=output_files_relative, 494 + metadata_file=metadata_file_relative, 495 + stages_run=_stages_run, 496 + 
segments=created_segments, 497 + stream=stream, 498 + ) 499 + 500 + except Exception as e: 501 + # Write error state to imported.json for persistent failure tracking 502 + duration_ms = int((time.monotonic() - _start_time) * 1000) 503 + partial_outputs = [_get_relative_path(f) for f in all_created_files] 504 + imported_path = import_dir / "imported.json" 505 + 506 + error_results = { 507 + **processing_results, # Include all the metadata we have 508 + "processing_failed": dt.datetime.now().isoformat(), 509 + "error": str(e), 510 + "error_stage": _current_stage, 511 + "duration_ms": duration_ms, 512 + "total_files_created": len(all_created_files), 513 + "all_created_files": all_created_files, 514 + "stages_run": _stages_run, 515 + } 516 + 517 + try: 518 + with open(imported_path, "w", encoding="utf-8") as f: 519 + json.dump(error_results, f, indent=2) 520 + logger.info(f"Saved error state: {imported_path}") 521 + except Exception as write_err: 522 + logger.warning(f"Failed to write error state: {write_err}") 523 + 524 + # Emit error event 525 + if _callosum: 526 + _callosum.emit( 527 + "importer", 528 + "error", 529 + import_id=_import_id, 530 + stage=_current_stage, 531 + error=str(e), 532 + duration_ms=duration_ms, 533 + partial_outputs=partial_outputs, 534 + ) 535 + 536 + logger.error(f"Import failed: {e}") 537 + raise 538 + 539 + finally: 540 + # Stop status thread 541 + _status_running = False 542 + if _status_thread: 543 + _status_thread.join(timeout=6) 544 + 545 + 546 + if __name__ == "__main__": 547 + main()
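
Note on consuming these events: the started/status/completed/error emits above are correlated by import_id (see docs/CALLOSUM.md). Below is a minimal sketch of a listener in another process; watch_import is hypothetical, and it assumes CallosumConnection can be constructed without defaults and that emit kwargs arrive as top-level keys on the message dict, the same convention cli.py relies on when it reads msg.get("segment").

from think.callosum import CallosumConnection

def watch_import(import_id: str) -> None:
    """Print progress for one import by tailing importer tract events."""
    conn = CallosumConnection()

    def on_message(msg: dict) -> None:
        # Ignore messages from other tracts or other imports.
        if msg.get("tract") != "importer" or msg.get("import_id") != import_id:
            return
        event = msg.get("event")
        if event == "status":
            print(f"[{msg.get('stage')}] {msg.get('elapsed_ms')} ms elapsed")
        elif event == "completed":
            print(f"completed: {len(msg.get('segments') or [])} segments "
                  f"in {msg.get('duration_ms')} ms")
        elif event == "error":
            print(f"failed in stage {msg.get('stage')}: {msg.get('error')}")

    conn.start(callback=on_message)

The field names (stage, elapsed_ms, segments, duration_ms, error) mirror the emit calls in cli.py; any additional keys depend on how CallosumConnection serializes messages.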
+222
think/importers/shared.py
··· 1 + # SPDX-License-Identifier: AGPL-3.0-only 2 + # Copyright (c) 2026 sol pbc 3 + 4 + import datetime as dt 5 + import json 6 + import logging 7 + import os 8 + import shutil 9 + from pathlib import Path 10 + 11 + from think.importers.utils import save_import_file, write_import_metadata 12 + from think.utils import get_journal, now_ms 13 + 14 + logger = logging.getLogger(__name__) 15 + 16 + 17 + def _get_relative_path(path: str) -> str: 18 + """Get path relative to journal, or return as-is if not under journal.""" 19 + journal_path = get_journal() 20 + try: 21 + return os.path.relpath(path, journal_path) 22 + except ValueError: 23 + return path 24 + 25 + 26 + def _write_import_jsonl( 27 + file_path: str, 28 + entries: list[dict], 29 + *, 30 + import_id: str, 31 + raw_filename: str | None = None, 32 + facet: str | None = None, 33 + setting: str | None = None, 34 + ) -> None: 35 + """Write imported transcript entries in JSONL format. 36 + 37 + First line contains imported metadata, subsequent lines contain entries. 38 + Each entry gets source="import" added to match the imported_audio.jsonl convention. 39 + 40 + Args: 41 + file_path: Path to write JSONL file 42 + entries: List of transcript entries 43 + import_id: Import identifier 44 + raw_filename: Source file name (relative path from segment to imports/) 45 + facet: Optional facet name 46 + setting: Optional setting description 47 + """ 48 + imported_meta: dict[str, str] = {"id": import_id} 49 + if facet: 50 + imported_meta["facet"] = facet 51 + if setting: 52 + imported_meta["setting"] = setting 53 + 54 + # Build top-level metadata with imported info 55 + metadata: dict[str, object] = {"imported": imported_meta} 56 + 57 + # Add raw file reference (path relative from segment to imports directory) 58 + if raw_filename: 59 + metadata["raw"] = f"../../imports/{import_id}/{raw_filename}" 60 + 61 + # Write JSONL: metadata first, then entries with source field 62 + jsonl_lines = [json.dumps(metadata)] 63 + for entry in entries: 64 + # Add source field if not already present (skip metadata entries like topics/setting) 65 + if "text" in entry and "source" not in entry: 66 + entry = {**entry, "source": "import"} 67 + jsonl_lines.append(json.dumps(entry)) 68 + 69 + with open(file_path, "w", encoding="utf-8") as f: 70 + f.write("\n".join(jsonl_lines) + "\n") 71 + 72 + 73 + def _run_import_summary( 74 + import_dir: Path, 75 + day: str, 76 + segments: list[str], 77 + ) -> bool: 78 + """Create a summary for imported segments using cortex generator. 
79 + 80 + Args: 81 + import_dir: Directory where the summary will be saved 82 + day: Day string (YYYYMMDD format) 83 + segments: List of segment keys to summarize 84 + 85 + Returns: 86 + True if summary was created successfully, False otherwise 87 + """ 88 + from think.cortex_client import cortex_request, wait_for_agents 89 + 90 + if not segments: 91 + logger.info("No segments to summarize") 92 + return False 93 + 94 + summary_path = import_dir / "summary.md" 95 + 96 + try: 97 + logger.info(f"Creating summary for {len(segments)} segments via cortex") 98 + 99 + # Spawn generator via cortex 100 + agent_id = cortex_request( 101 + prompt="", # Generators don't use prompt 102 + name="importer", 103 + config={ 104 + "day": day, 105 + "span": segments, 106 + "output": "md", 107 + "output_path": str(summary_path), 108 + }, 109 + ) 110 + 111 + if agent_id is None: 112 + logger.error("Failed to send cortex request for import summary") 113 + return False 114 + 115 + # Wait for completion 116 + completed, timed_out = wait_for_agents([agent_id], timeout=300) 117 + 118 + if timed_out: 119 + logger.error(f"Import summary timed out (ID: {agent_id})") 120 + return False 121 + 122 + if completed: 123 + end_state = completed.get(agent_id, "unknown") 124 + if end_state == "finish" and summary_path.exists(): 125 + logger.info(f"Created import summary: {summary_path}") 126 + return True 127 + else: 128 + logger.warning( 129 + f"Generator ended with state {end_state}, " 130 + f"summary exists: {summary_path.exists()}" 131 + ) 132 + return False 133 + 134 + logger.error("Generator did not complete") 135 + return False 136 + 137 + except Exception as e: 138 + logger.error(f"Failed to create summary: {e}") 139 + return False 140 + 141 + 142 + # MIME type mapping for import metadata 143 + _MIME_TYPES = { 144 + ".m4a": "audio/mp4", 145 + ".mp3": "audio/mpeg", 146 + ".wav": "audio/wav", 147 + ".flac": "audio/flac", 148 + ".ogg": "audio/ogg", 149 + ".mp4": "video/mp4", 150 + ".webm": "video/webm", 151 + ".mov": "video/quicktime", 152 + ".txt": "text/plain", 153 + ".md": "text/markdown", 154 + ".pdf": "application/pdf", 155 + } 156 + 157 + 158 + def _is_in_imports(media_path: str) -> bool: 159 + """Check if file path is already under journal/imports/.""" 160 + imports_dir = os.path.join(get_journal(), "imports") 161 + abs_media = os.path.abspath(media_path) 162 + abs_imports = os.path.abspath(imports_dir) 163 + return abs_media.startswith(abs_imports + os.sep) 164 + 165 + 166 + def _setup_import( 167 + media_path: str, 168 + timestamp: str, 169 + facet: str | None, 170 + setting: str | None, 171 + detection_result: dict | None, 172 + force: bool = False, 173 + ) -> str: 174 + """Copy file to imports/ and write metadata. 
Returns new file path.""" 175 + journal_root = Path(get_journal()) 176 + import_dir = journal_root / "imports" / timestamp 177 + 178 + # Check for conflict 179 + if import_dir.exists(): 180 + if force: 181 + logger.info(f"Removing existing import directory: {import_dir}") 182 + shutil.rmtree(import_dir) 183 + else: 184 + raise SystemExit( 185 + f"Error: Import already exists for timestamp {timestamp}\n" 186 + f"To re-import, use --force to delete existing data and start over" 187 + ) 188 + 189 + # Copy file to imports/ 190 + filename = os.path.basename(media_path) 191 + new_path = save_import_file( 192 + journal_root=journal_root, 193 + timestamp=timestamp, 194 + source_path=Path(media_path), 195 + filename=filename, 196 + ) 197 + 198 + # Build metadata matching app structure 199 + upload_ts = now_ms() 200 + ext = os.path.splitext(filename)[1].lower() 201 + metadata = { 202 + "original_filename": filename, 203 + "upload_timestamp": upload_ts, 204 + "upload_datetime": dt.datetime.fromtimestamp(upload_ts / 1000).isoformat(), 205 + "detection_result": detection_result, 206 + "detected_timestamp": timestamp, 207 + "user_timestamp": timestamp, 208 + "file_size": new_path.stat().st_size if new_path.exists() else 0, 209 + "mime_type": _MIME_TYPES.get(ext, "application/octet-stream"), 210 + "facet": facet, 211 + "setting": setting, 212 + "file_path": str(new_path), 213 + } 214 + 215 + write_import_metadata( 216 + journal_root=journal_root, 217 + timestamp=timestamp, 218 + metadata=metadata, 219 + ) 220 + 221 + logger.info(f"Copied to journal: {new_path}") 222 + return str(new_path)
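
To make the JSONL layout produced by _write_import_jsonl concrete, here is a short sketch with hypothetical values; real entries come from the transcript detectors, so the start/text keys and the output path are illustrative only.

from think.importers.shared import _write_import_jsonl

_write_import_jsonl(
    "/tmp/imported_audio.jsonl",                                        # illustrative path
    [
        {"start": "09:30:00", "text": "Hello there."},                  # gets source="import"
        {"start": "09:30:07", "text": "Let's begin.", "source": "plaud"},  # source preserved
    ],
    import_id="20260115_093000",
    raw_filename="meeting.m4a",
    facet="work",
)
# Resulting file, one JSON object per line (metadata first, then entries):
#   {"imported": {"id": "20260115_093000", "facet": "work"}, "raw": "../../imports/20260115_093000/meeting.m4a"}
#   {"start": "09:30:00", "text": "Hello there.", "source": "import"}
#   {"start": "09:30:07", "text": "Let's begin.", "source": "plaud"}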
+131
think/importers/text.py
··· 1 + # SPDX-License-Identifier: AGPL-3.0-only 2 + # Copyright (c) 2026 sol pbc 3 + 4 + import datetime as dt 5 + import logging 6 + import os 7 + 8 + from think.detect_transcript import detect_transcript_json, detect_transcript_segment 9 + from think.importers.shared import _write_import_jsonl 10 + 11 + try: 12 + from pypdf import PdfReader 13 + except ImportError: # pragma: no cover - optional dependency 14 + PdfReader = None 15 + 16 + logger = logging.getLogger(__name__) 17 + 18 + 19 + def _read_transcript(path: str) -> str: 20 + """Return transcript text from a .txt/.md/.pdf file.""" 21 + ext = os.path.splitext(path)[1].lower() 22 + if ext in {".txt", ".md"}: 23 + with open(path, "r", encoding="utf-8") as f: 24 + return f.read() 25 + if ext == ".pdf": 26 + if PdfReader is None: 27 + raise RuntimeError("pypdf required for PDF support") 28 + reader = PdfReader(path) 29 + parts = [] 30 + for page in reader.pages: 31 + text = page.extract_text() or "" 32 + parts.append(text) 33 + return "\n".join(parts) 34 + raise ValueError("unsupported transcript format") 35 + 36 + 37 + def _time_to_seconds(time_str: str) -> int: 38 + """Convert HH:MM:SS time string to seconds from midnight.""" 39 + h, m, s = map(int, time_str.split(":")) 40 + return h * 3600 + m * 60 + s 41 + 42 + 43 + def process_transcript( 44 + path: str, 45 + day_dir: str, 46 + base_dt: dt.datetime, 47 + *, 48 + import_id: str, 49 + stream: str, 50 + facet: str | None = None, 51 + setting: str | None = None, 52 + audio_duration: int | None = None, 53 + ) -> list[str]: 54 + """Process a transcript file and write imported JSONL segments. 55 + 56 + Args: 57 + path: Path to transcript file 58 + day_dir: Journal day directory 59 + base_dt: Base datetime for the import 60 + import_id: Import identifier 61 + stream: Stream name for directory layout (day/stream/segment/) 62 + facet: Optional facet name 63 + setting: Optional setting description 64 + audio_duration: Optional total audio duration in seconds (for last segment) 65 + 66 + Returns: 67 + List of created file paths. 
68 + """ 69 + created_files = [] 70 + text = _read_transcript(path) 71 + stream_dir = os.path.join(day_dir, stream) 72 + 73 + # Get start time from base_dt for segmentation 74 + start_time = base_dt.strftime("%H:%M:%S") 75 + 76 + # Get segments with their absolute start times 77 + segments = detect_transcript_segment(text, start_time) 78 + 79 + for idx, (start_at, seg_text) in enumerate(segments): 80 + # Convert segment text to structured JSON with absolute timestamps 81 + json_data = detect_transcript_json(seg_text, start_at) 82 + if not json_data: 83 + continue 84 + 85 + # Parse absolute time for segment directory name 86 + time_part = start_at.replace(":", "") # "12:05:30" -> "120530" 87 + 88 + # Compute segment duration from absolute times 89 + start_seconds = _time_to_seconds(start_at) 90 + if idx + 1 < len(segments): 91 + next_start_at, _ = segments[idx + 1] 92 + next_seconds = _time_to_seconds(next_start_at) 93 + duration = next_seconds - start_seconds 94 + else: 95 + # Last segment: use remaining audio duration or default +5s 96 + if audio_duration: 97 + # audio_duration is total length, start_seconds is time-of-day 98 + # Need to calculate offset from recording start 99 + recording_start_seconds = _time_to_seconds(start_time) 100 + segment_offset = start_seconds - recording_start_seconds 101 + duration = audio_duration - segment_offset 102 + else: 103 + duration = 5 104 + 105 + # Negative duration indicates corrupted/invalid timestamp data 106 + if duration < 0: 107 + raise ValueError( 108 + f"Invalid segment duration: {duration}s for segment at {time_part}. " 109 + "Timestamps may be out of order or audio_duration is incorrect." 110 + ) 111 + 112 + # Ensure minimum duration of 1 second 113 + duration = max(1, duration) 114 + 115 + segment_name = f"{time_part}_{duration}" 116 + ts_dir = os.path.join(stream_dir, segment_name) 117 + os.makedirs(ts_dir, exist_ok=True) 118 + json_path = os.path.join(ts_dir, "imported_audio.jsonl") 119 + 120 + _write_import_jsonl( 121 + json_path, 122 + json_data, 123 + import_id=import_id, 124 + raw_filename=os.path.basename(path), 125 + facet=facet, 126 + setting=setting, 127 + ) 128 + logger.info(f"Added transcript segment to journal: {json_path}") 129 + created_files.append(json_path) 130 + 131 + return created_files
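
A worked example of the last-segment duration arithmetic in process_transcript, using hypothetical numbers: a recording that starts at 09:30:00, runs 1800 s in total, and whose final detected segment begins at 09:55:30.

# Values mirror the variables in process_transcript; the numbers are hypothetical.
recording_start_seconds = 9 * 3600 + 30 * 60       # _time_to_seconds("09:30:00") == 34200
start_seconds = 9 * 3600 + 55 * 60 + 30             # _time_to_seconds("09:55:30") == 35730
audio_duration = 1800                                # total recording length in seconds

segment_offset = start_seconds - recording_start_seconds   # 1530 s into the recording
duration = audio_duration - segment_offset                  # 270 s of audio remain
assert duration == 270
# The segment directory name becomes "095530_270" under <day_dir>/<stream>/.

Without audio_duration the last segment falls back to a 5-second duration, and a negative result raises ValueError, as handled in the code above.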