personal memory agent
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

Refactor importer segment writing

Extract a shared write_segment() helper from the segment-writing path and refactor the text importer to use it without changing behavior.

Rewrite the ChatGPT importer to emit segment-aligned imported_audio.jsonl output using 5-minute windows and extend ImportResult with optional segment metadata.

Update the CLI to prepare file-import directories and handle segment-producing file importers, including stream marker and segment tracking support.

+436 -93
+1 -1
tests/test_import_dedup.py
··· 209 209 assert header["entry_count"] == 2 210 210 211 211 # Verify content 212 - entries = [json.loads(l) for l in lines[1:]] 212 + entries = [json.loads(line) for line in lines[1:]] 213 213 titles = [e["title"] for e in entries] 214 214 assert "Standup" in titles 215 215 assert "New meeting" in titles
+181
tests/test_importer.py
··· 5 5 import importlib 6 6 import json 7 7 import subprocess 8 + import zipfile 8 9 from pathlib import Path 9 10 from unittest.mock import MagicMock, patch 10 11 ··· 261 262 # Verify segments.json written 262 263 segments_json = tmp_path / "imports" / "20251205_163000" / "segments.json" 263 264 assert segments_json.exists() 265 + 266 + 267 + def test_write_segment(tmp_path): 268 + """Test write_segment creates a segment directory and JSONL file.""" 269 + mod = importlib.import_module("think.importers.shared") 270 + 271 + json_path = mod.write_segment( 272 + str(tmp_path / "20240101"), 273 + "import.text", 274 + "120000_300", 275 + [{"start": "00:00:00", "speaker": "Alice", "text": "Hello"}], 276 + import_id="20240101_120000", 277 + raw_filename="notes.txt", 278 + facet="work", 279 + setting="standup", 280 + model="gpt-4", 281 + ) 282 + 283 + written = Path(json_path) 284 + assert ( 285 + written 286 + == tmp_path / "20240101" / "import.text" / "120000_300" / "imported_audio.jsonl" 287 + ) 288 + assert written.exists() 289 + 290 + lines = written.read_text().strip().split("\n") 291 + metadata = json.loads(lines[0]) 292 + entry = json.loads(lines[1]) 293 + 294 + assert metadata["imported"] == { 295 + "id": "20240101_120000", 296 + "facet": "work", 297 + "setting": "standup", 298 + } 299 + assert metadata["raw"] == "../../../imports/20240101_120000/notes.txt" 300 + assert metadata["model"] == "gpt-4" 301 + assert entry["source"] == "import" 302 + 303 + 304 + def test_chatgpt_importer_segments(tmp_path, monkeypatch): 305 + """ChatGPT importer should write message windows as import segments.""" 306 + mod = importlib.import_module("think.importers.chatgpt") 307 + 308 + base = dt.datetime(2026, 1, 15, 12, 0, 0).timestamp() 309 + conversations = [ 310 + { 311 + "title": "First", 312 + "current_node": "a3", 313 + "mapping": { 314 + "a1": { 315 + "parent": None, 316 + "message": { 317 + "author": {"role": "user"}, 318 + "content": {"parts": ["Hello"]}, 319 + "create_time": base, 320 + }, 321 + }, 322 + "a2": { 323 + "parent": "a1", 324 + "message": { 325 + "author": {"role": "assistant"}, 326 + "content": {"parts": ["Hi there"]}, 327 + "create_time": base + 60, 328 + "metadata": {"model_slug": "gpt-4"}, 329 + }, 330 + }, 331 + "a3": { 332 + "parent": "a2", 333 + "message": { 334 + "author": {"role": "user"}, 335 + "content": {"parts": ["New topic"]}, 336 + "create_time": base + 301, 337 + }, 338 + }, 339 + }, 340 + }, 341 + { 342 + "title": "Second", 343 + "current_node": "b2", 344 + "mapping": { 345 + "b1": { 346 + "parent": None, 347 + "message": { 348 + "author": {"role": "assistant"}, 349 + "content": {"parts": ["Missing time"]}, 350 + }, 351 + }, 352 + "b2": { 353 + "parent": "b1", 354 + "message": { 355 + "author": {"role": "assistant"}, 356 + "content": {"parts": ["Next day reply"]}, 357 + "create_time": base + 12 * 3600, 358 + "metadata": {"model_slug": "gpt-4o"}, 359 + }, 360 + }, 361 + }, 362 + }, 363 + ] 364 + 365 + archive = tmp_path / "chatgpt.zip" 366 + with zipfile.ZipFile(archive, "w") as zf: 367 + zf.writestr("conversations.json", json.dumps(conversations)) 368 + 369 + monkeypatch.setenv("JOURNAL_PATH", str(tmp_path)) 370 + 371 + fixed_dt = dt.datetime(2026, 1, 20, 8, 30, 0) 372 + 373 + class FixedDateTime(dt.datetime): 374 + @classmethod 375 + def now(cls, tz=None): 376 + return fixed_dt 377 + 378 + monkeypatch.setattr(mod.dt, "datetime", FixedDateTime) 379 + 380 + result = mod.ChatGPTImporter().process(archive, tmp_path, facet="work") 381 + 382 + assert result.entries_written == 4 383 + assert result.errors == [] 384 + assert result.segments == [ 385 + ("20260115", "120000_300"), 386 + ("20260115", "120501_300"), 387 + ("20260116", "000000_300"), 388 + ] 389 + assert len(result.files_created) == 3 390 + 391 + first_segment = ( 392 + day_path("20260115") / "import.chatgpt" / "120000_300" / "imported_audio.jsonl" 393 + ) 394 + second_segment = ( 395 + day_path("20260115") / "import.chatgpt" / "120501_300" / "imported_audio.jsonl" 396 + ) 397 + third_segment = ( 398 + day_path("20260116") / "import.chatgpt" / "000000_300" / "imported_audio.jsonl" 399 + ) 400 + 401 + assert first_segment.exists() 402 + assert second_segment.exists() 403 + assert third_segment.exists() 404 + 405 + first_lines = first_segment.read_text().strip().split("\n") 406 + first_meta = json.loads(first_lines[0]) 407 + first_entries = [json.loads(line) for line in first_lines[1:]] 408 + assert first_meta["imported"] == {"id": "20260120_083000", "facet": "work"} 409 + assert first_meta["model"] == "gpt-4" 410 + assert first_entries == [ 411 + {"start": "00:00:00", "speaker": "Human", "text": "Hello", "source": "import"}, 412 + { 413 + "start": "00:01:00", 414 + "speaker": "Assistant", 415 + "text": "Hi there", 416 + "source": "import", 417 + }, 418 + ] 419 + 420 + second_lines = second_segment.read_text().strip().split("\n") 421 + second_meta = json.loads(second_lines[0]) 422 + second_entries = [json.loads(line) for line in second_lines[1:]] 423 + assert second_meta == {"imported": {"id": "20260120_083000", "facet": "work"}} 424 + assert second_entries == [ 425 + { 426 + "start": "00:00:00", 427 + "speaker": "Human", 428 + "text": "New topic", 429 + "source": "import", 430 + } 431 + ] 432 + 433 + third_lines = third_segment.read_text().strip().split("\n") 434 + third_meta = json.loads(third_lines[0]) 435 + third_entries = [json.loads(line) for line in third_lines[1:]] 436 + assert third_meta["model"] == "gpt-4o" 437 + assert third_entries == [ 438 + { 439 + "start": "00:00:00", 440 + "speaker": "Assistant", 441 + "text": "Next day reply", 442 + "source": "import", 443 + } 444 + ] 264 445 265 446 266 447 def test_format_audio_stream_path():
+22
tests/test_importer_jsonl.py
··· 136 136 } 137 137 assert metadata["topics"] == "project updates, sprint planning" 138 138 assert metadata["setting"] == "workplace" 139 + 140 + 141 + def test_write_import_jsonl_with_model(): 142 + """Test writing imported JSONL with model metadata.""" 143 + with tempfile.TemporaryDirectory() as tmpdir: 144 + json_path = Path(tmpdir) / "test_audio.jsonl" 145 + 146 + _write_import_jsonl( 147 + str(json_path), 148 + [{"start": "00:00:00", "speaker": "Assistant", "text": "Hello"}], 149 + import_id="20240101_120000", 150 + model="gpt-4", 151 + ) 152 + 153 + with open(json_path, "r") as f: 154 + lines = f.read().strip().split("\n") 155 + 156 + metadata = json.loads(lines[0]) 157 + assert metadata == { 158 + "imported": {"id": "20240101_120000"}, 159 + "model": "gpt-4", 160 + }
+145 -84
think/importers/chatgpt.py
··· 11 11 from typing import Any, Callable 12 12 13 13 from think.importers.file_importer import ImportPreview, ImportResult 14 - from think.importers.shared import write_structured_import 14 + from think.importers.shared import write_segment 15 + from think.utils import day_path 15 16 16 17 logger = logging.getLogger(__name__) 17 18 ··· 50 51 return chain 51 52 52 53 53 - def _format_messages(messages: list[dict[str, Any]]) -> tuple[str, int, str | None]: 54 - """Format ChatGPT messages into readable text. 54 + def _extract_messages( 55 + conversations: list[dict[str, Any]], 56 + ) -> tuple[list[dict[str, Any]], dict[str, int], int]: 57 + """Extract timestamped user and assistant messages from conversations.""" 58 + messages: list[dict[str, Any]] = [] 59 + model_counts: dict[str, int] = {} 60 + skipped = 0 61 + 62 + for conv in conversations: 63 + mapping = conv.get("mapping", {}) 64 + if not mapping: 65 + skipped += 1 66 + continue 67 + 68 + conv_messages = _walk_message_tree(conv) 69 + conv_has_content = False 70 + 71 + for msg in conv_messages: 72 + author = msg.get("author", {}) 73 + role = author.get("role", "") 74 + if role not in ("user", "assistant"): 75 + continue 76 + 77 + content = msg.get("content", {}) 78 + parts = content.get("parts", []) 79 + text_parts = [p for p in parts if isinstance(p, str)] 80 + text = "\n".join(text_parts).strip() 81 + if not text: 82 + continue 83 + 84 + create_time = msg.get("create_time") 85 + if create_time is None or not isinstance(create_time, (int, float)): 86 + continue 87 + 88 + model_slug = None 89 + if role == "assistant": 90 + meta = msg.get("metadata", {}) 91 + model_slug = meta.get("model_slug") 92 + if model_slug: 93 + model_counts[model_slug] = model_counts.get(model_slug, 0) + 1 94 + 95 + messages.append( 96 + { 97 + "create_time": float(create_time), 98 + "speaker": "Human" if role == "user" else "Assistant", 99 + "text": text, 100 + "model_slug": model_slug, 101 + } 102 + ) 103 + conv_has_content = True 104 + 105 + if not conv_has_content: 106 + skipped += 1 107 + 108 + return messages, model_counts, skipped 109 + 110 + 111 + def _window_messages( 112 + messages: list[dict[str, Any]], 113 + window_duration: int = 300, 114 + ) -> list[tuple[str, str, str | None, list[dict[str, Any]]]]: 115 + """Group sorted messages into fixed-duration windows per day.""" 116 + if not messages: 117 + return [] 55 118 56 - Returns (formatted_content, message_count, model_slug). 57 - """ 58 - lines: list[str] = [] 59 - count = 0 60 - model_slug: str | None = None 119 + windows: list[tuple[str, str, str | None, list[dict[str, Any]]]] = [] 120 + window_start: float | None = None 121 + window_day: str | None = None 122 + window_entries: list[dict[str, Any]] = [] 123 + window_model: str | None = None 61 124 62 125 for msg in messages: 63 - author = msg.get("author", {}) 64 - role = author.get("role", "") 126 + msg_dt = dt.datetime.fromtimestamp(msg["create_time"]) 127 + msg_day = msg_dt.strftime("%Y%m%d") 128 + 129 + if ( 130 + window_start is None 131 + or msg_day != window_day 132 + or msg["create_time"] - window_start >= window_duration 133 + ): 134 + if window_entries and window_day and window_start is not None: 135 + start_dt = dt.datetime.fromtimestamp(window_start) 136 + seg_key = f"{start_dt.strftime('%H%M%S')}_{window_duration}" 137 + windows.append((window_day, seg_key, window_model, window_entries)) 65 138 66 - # Skip system and tool messages 67 - if role in {"system", "tool"}: 68 - continue 139 + window_start = msg["create_time"] 140 + window_day = msg_day 141 + window_entries = [] 142 + window_model = None 69 143 70 - content = msg.get("content", {}) 71 - parts = content.get("parts", []) 72 - # Filter out non-string parts (e.g., image refs) 73 - text_parts = [p for p in parts if isinstance(p, str)] 74 - text = "\n".join(text_parts).strip() 75 - if not text: 76 - continue 144 + offset = int(msg["create_time"] - window_start) 145 + h, remainder = divmod(offset, 3600) 146 + m, s = divmod(remainder, 60) 147 + window_entries.append( 148 + { 149 + "start": f"{h:02d}:{m:02d}:{s:02d}", 150 + "speaker": msg["speaker"], 151 + "text": msg["text"], 152 + } 153 + ) 77 154 78 - # Extract model info from assistant messages 79 - if role == "assistant" and model_slug is None: 80 - meta = msg.get("metadata", {}) 81 - model_slug = meta.get("model_slug") 155 + if msg["model_slug"] and window_model is None: 156 + window_model = msg["model_slug"] 82 157 83 - label = "Human" if role == "user" else "Assistant" 84 - lines.append(f"{label}: {text}") 85 - count += 1 158 + if window_entries and window_day and window_start is not None: 159 + start_dt = dt.datetime.fromtimestamp(window_start) 160 + seg_key = f"{start_dt.strftime('%H%M%S')}_{window_duration}" 161 + windows.append((window_day, seg_key, window_model, window_entries)) 86 162 87 - return "\n\n".join(lines), count, model_slug 163 + return windows 88 164 89 165 90 166 class ChatGPTImporter: ··· 174 250 ) -> ImportResult: 175 251 conversations = _open_conversations(path) 176 252 import_id = dt.datetime.now().strftime("%Y%m%d_%H%M%S") 253 + messages, model_counts, skipped = _extract_messages(conversations) 254 + if not messages: 255 + return ImportResult( 256 + entries_written=0, 257 + entities_seeded=0, 258 + files_created=[], 259 + errors=[], 260 + summary="No messages found in ChatGPT export", 261 + ) 177 262 178 - entries: list[dict[str, Any]] = [] 179 - errors: list[str] = [] 180 - skipped = 0 181 - model_counts: dict[str, int] = {} 263 + messages.sort(key=lambda m: m["create_time"]) 182 264 183 - for i, conv in enumerate(conversations): 184 - mapping = conv.get("mapping", {}) 185 - if not mapping: 186 - skipped += 1 187 - continue 265 + if progress_callback: 266 + progress_callback(len(conversations), len(conversations)) 188 267 189 - title = conv.get("title", "Untitled") 190 - create_time = conv.get("create_time") 268 + windows = _window_messages(messages) 269 + created_files: list[str] = [] 270 + segments: list[tuple[str, str]] = [] 271 + errors: list[str] = [] 272 + written_count = 0 191 273 192 - # Parse timestamp (Unix epoch float) 193 - if create_time is None: 194 - errors.append(f"Missing create_time for conversation: {title!r}") 195 - continue 274 + for day, seg_key, model_slug, entries in windows: 275 + day_dir = str(day_path(day)) 196 276 try: 197 - ts = dt.datetime.fromtimestamp(create_time).isoformat() 198 - except (ValueError, OSError): 199 - errors.append(f"Bad timestamp for conversation: {title!r}") 200 - continue 201 - 202 - # Walk message tree 203 - messages = _walk_message_tree(conv) 204 - content, message_count, model_slug = _format_messages(messages) 205 - if not content: 206 - skipped += 1 207 - continue 208 - 209 - if model_slug: 210 - model_counts[model_slug] = model_counts.get(model_slug, 0) + 1 211 - 212 - entry: dict[str, Any] = { 213 - "type": "ai_chat", 214 - "ts": ts, 215 - "title": title, 216 - "source": "chatgpt", 217 - "message_count": message_count, 218 - "content": content, 219 - } 220 - if model_slug: 221 - entry["model"] = model_slug 222 - 223 - entries.append(entry) 224 - 225 - if progress_callback and (i + 1) % 100 == 0: 226 - progress_callback(i + 1, len(conversations)) 227 - 228 - # Write to journal 229 - created_files = write_structured_import( 230 - "chatgpt", 231 - entries, 232 - import_id=import_id, 233 - facet=facet, 234 - ) 277 + json_path = write_segment( 278 + day_dir, 279 + "import.chatgpt", 280 + seg_key, 281 + entries, 282 + import_id=import_id, 283 + facet=facet, 284 + model=model_slug, 285 + ) 286 + created_files.append(json_path) 287 + segments.append((day, seg_key)) 288 + written_count += len(entries) 289 + except Exception as exc: 290 + errors.append(f"Failed to write segment {day}/{seg_key}: {exc}") 291 + logger.warning("Failed to write segment %s/%s: %s", day, seg_key, exc) 235 292 236 293 if skipped: 237 294 logger.info("Skipped %d conversations with no content", skipped) 238 295 239 - # Build summary with model distribution 296 + days = sorted({day for day, _ in segments}) 240 297 model_info = "" 241 298 if model_counts: 242 299 top_models = sorted(model_counts.items(), key=lambda x: -x[1])[:5] 243 300 model_info = " — models: " + ", ".join(f"{m} ({n})" for m, n in top_models) 244 301 245 302 return ImportResult( 246 - entries_written=len(entries), 303 + entries_written=written_count, 247 304 entities_seeded=0, 248 305 files_created=created_files, 249 306 errors=errors, 250 - summary=f"Imported {len(entries)} ChatGPT conversations across {len(created_files)} days{model_info}", 307 + summary=( 308 + f"Imported {len(messages)} messages across {len(days)} days into " 309 + f"{len(segments)} segments{model_info}" 310 + ), 311 + segments=segments, 251 312 ) 252 313 253 314
+37
think/importers/cli.py
··· 18 18 from think.importers.shared import ( 19 19 _get_relative_path, 20 20 _is_in_imports, 21 + _setup_file_import, 21 22 _setup_import, 22 23 ) 23 24 from think.importers.text import _read_transcript, process_transcript ··· 594 595 595 596 _source_hash = hash_source(Path(args.media)) 596 597 598 + _setup_file_import(_import_id) 597 599 result = _file_importer.process( 598 600 Path(args.media), journal_root, facet=args.facet 599 601 ) ··· 631 633 errors=len(result.errors), 632 634 stream=stream, 633 635 ) 636 + 637 + if result.segments: 638 + for seg_day, seg_key in result.segments: 639 + if seg_key not in created_segments: 640 + created_segments.append(seg_key) 641 + try: 642 + seg_dir = day_path(seg_day) / stream / seg_key 643 + stream_result = update_stream( 644 + stream, seg_day, seg_key, type="import", host=None 645 + ) 646 + write_segment_stream( 647 + seg_dir, 648 + stream, 649 + stream_result["prev_day"], 650 + stream_result["prev_segment"], 651 + stream_result["seq"], 652 + ) 653 + except Exception as e: 654 + logger.warning(f"Failed to write stream identity: {e}") 655 + 656 + all_seg_keys = [seg_key for _, seg_key in result.segments] 657 + first_day = result.segments[0][0] 658 + save_import_segments(journal_root, _import_id, all_seg_keys, first_day) 659 + 660 + for seg_day, seg_key in result.segments: 661 + _callosum.emit( 662 + "observe", 663 + "observed", 664 + segment=seg_key, 665 + day=seg_day, 666 + stream=stream, 667 + ) 668 + logger.info( 669 + f"Emitted observe.observed for segment: {seg_day}/{seg_key}" 670 + ) 634 671 635 672 logger.info( 636 673 "%s import complete: %d entries, %d entities, %d files",
+1
think/importers/file_importer.py
··· 30 30 files_created: list[str] 31 31 errors: list[str] 32 32 summary: str 33 + segments: list[tuple[str, str]] | None = None 33 34 34 35 35 36 @runtime_checkable
+44
think/importers/shared.py
··· 40 40 setting: str | None = None, 41 41 topics: str | None = None, 42 42 detected_setting: str | None = None, 43 + model: str | None = None, 43 44 ) -> None: 44 45 """Write imported transcript entries in JSONL format. 45 46 ··· 74 75 metadata["topics"] = topics 75 76 if detected_setting: 76 77 metadata["setting"] = detected_setting 78 + if model: 79 + metadata["model"] = model 77 80 78 81 # Write JSONL: metadata first, then entries with source field 79 82 jsonl_lines = [json.dumps(metadata)] ··· 87 90 f.write("\n".join(jsonl_lines) + "\n") 88 91 89 92 93 + def write_segment( 94 + day_dir: str, 95 + stream: str, 96 + segment_key: str, 97 + entries: list[dict], 98 + *, 99 + import_id: str, 100 + raw_filename: str | None = None, 101 + facet: str | None = None, 102 + setting: str | None = None, 103 + topics: str | None = None, 104 + detected_setting: str | None = None, 105 + model: str | None = None, 106 + ) -> str: 107 + """Write a single segment's imported_audio.jsonl file.""" 108 + ts_dir = os.path.join(day_dir, stream, segment_key) 109 + os.makedirs(ts_dir, exist_ok=True) 110 + json_path = os.path.join(ts_dir, "imported_audio.jsonl") 111 + 112 + _write_import_jsonl( 113 + json_path, 114 + entries, 115 + import_id=import_id, 116 + raw_filename=raw_filename, 117 + facet=facet, 118 + setting=setting, 119 + topics=topics, 120 + detected_setting=detected_setting, 121 + model=model, 122 + ) 123 + return json_path 124 + 125 + 90 126 # MIME type mapping for import metadata 91 127 _MIME_TYPES = { 92 128 ".m4a": "audio/mp4", ··· 171 207 172 208 logger.info(f"Copied to journal: {new_path}") 173 209 return str(new_path) 210 + 211 + 212 + def _setup_file_import(import_id: str) -> Path: 213 + """Create imports/{import_id}/ directory for file importers.""" 214 + journal_root = Path(get_journal()) 215 + import_dir = journal_root / "imports" / import_id 216 + import_dir.mkdir(parents=True, exist_ok=True) 217 + return import_dir 174 218 175 219 176 220 def write_structured_import(
+5 -8
think/importers/text.py
··· 6 6 import os 7 7 8 8 from think.detect_transcript import detect_transcript_json, detect_transcript_segment 9 - from think.importers.shared import _write_import_jsonl 9 + from think.importers.shared import write_segment 10 10 11 11 try: 12 12 from pypdf import PdfReader ··· 68 68 """ 69 69 created_files = [] 70 70 text = _read_transcript(path) 71 - stream_dir = os.path.join(day_dir, stream) 72 71 73 72 # Get start time from base_dt for segmentation 74 73 start_time = base_dt.strftime("%H:%M:%S") ··· 140 139 duration = max(1, duration) 141 140 142 141 segment_name = f"{time_part}_{duration}" 143 - ts_dir = os.path.join(stream_dir, segment_name) 144 - os.makedirs(ts_dir, exist_ok=True) 145 - json_path = os.path.join(ts_dir, "imported_audio.jsonl") 146 - 147 - _write_import_jsonl( 148 - json_path, 142 + json_path = write_segment( 143 + day_dir, 144 + stream, 145 + segment_name, 149 146 json_data, 150 147 import_id=import_id, 151 148 raw_filename=os.path.basename(path),