personal memory agent
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

apps/import: surface merge artifact paths and decision-log highlights via /api/<timestamp>

- Extends get_import_details() to add top-level merge_artifact_paths, decision_highlights, and summary_errors when the import carries a merge summary.
- New _load_decision_highlights() parses decisions.jsonl, filters to staged-entity and errored-segment rows, caps at 50.

Co-Authored-By: OpenAI Codex <codex@openai.com>

+168
+97
tests/test_importer_utils.py
··· 10 10 import pytest 11 11 12 12 from think.importers.utils import ( 13 + _load_decision_highlights, 13 14 build_import_info, 14 15 calculate_duration_from_files, 15 16 get_import_details, ··· 324 325 325 326 details = get_import_details(temp_journal, timestamp) 326 327 assert details["segments_json"] == segments_data 328 + 329 + 330 + def test_load_decision_highlights_filters_and_caps(temp_journal): 331 + decision_log = temp_journal / "journal.merge" / "run" / "decisions.jsonl" 332 + decision_log.parent.mkdir(parents=True) 333 + 334 + rows = [ 335 + '{"action":"ignored","item_id":"skip-me"}', 336 + "not json at all", 337 + ] 338 + for idx in range(30): 339 + rows.append( 340 + json.dumps( 341 + { 342 + "action": "entity_staged", 343 + "item_id": f"entity-{idx}", 344 + "source": {"name": f"Source {idx}"}, 345 + "target": {"name": f"Target {idx}"}, 346 + "staging_path": f"/tmp/staging/entity-{idx}/entity.json", 347 + } 348 + ) 349 + ) 350 + for idx in range(30): 351 + rows.append( 352 + json.dumps( 353 + { 354 + "action": "segment_errored", 355 + "item_id": f"20260101/default/{idx:06d}_300", 356 + "reason": f"segment failure {idx}", 357 + } 358 + ) 359 + ) 360 + decision_log.write_text("\n".join(rows) + "\n", encoding="utf-8") 361 + 362 + highlights = _load_decision_highlights(decision_log) 363 + 364 + assert highlights is not None 365 + assert len(highlights["staged_entities"]) == 30 366 + assert len(highlights["errored_segments"]) == 20 367 + assert highlights["staged_entities"][0] == { 368 + "source_name": "Source 0", 369 + "target_name": "Target 0", 370 + "staging_path": "/tmp/staging/entity-0/entity.json", 371 + } 372 + assert highlights["errored_segments"][0] == { 373 + "item_id": "20260101/default/000000_300", 374 + "reason": "segment failure 0", 375 + } 376 + 377 + 378 + def test_load_decision_highlights_returns_none_without_qualifying_rows(temp_journal): 379 + decision_log = temp_journal / "journal.merge" / "run" / "decisions.jsonl" 380 + decision_log.parent.mkdir(parents=True) 381 + decision_log.write_text( 382 + "\n".join( 383 + [ 384 + '{"action":"segment_copied","item_id":"20260101/default/090000_300"}', 385 + "not json at all", 386 + '{"action":"entity_created","item_id":"source_person"}', 387 + ] 388 + ) 389 + + "\n", 390 + encoding="utf-8", 391 + ) 392 + 393 + assert _load_decision_highlights(decision_log) is None 394 + 395 + 396 + def test_load_decision_highlights_propagates_non_missing_io_errors( 397 + temp_journal, monkeypatch 398 + ): 399 + decision_log = temp_journal / "journal.merge" / "run" / "decisions.jsonl" 400 + decision_log.parent.mkdir(parents=True) 401 + decision_log.write_text( 402 + json.dumps( 403 + { 404 + "action": "segment_errored", 405 + "item_id": "20260101/default/090000_300", 406 + "reason": "segment copy failed", 407 + } 408 + ) 409 + + "\n", 410 + encoding="utf-8", 411 + ) 412 + 413 + real_open = open 414 + 415 + def broken_open(path, *args, **kwargs): 416 + if Path(path) == decision_log: 417 + raise PermissionError("permission denied") 418 + return real_open(path, *args, **kwargs) 419 + 420 + monkeypatch.setattr("builtins.open", broken_open) 421 + 422 + with pytest.raises(PermissionError, match="permission denied"): 423 + _load_decision_highlights(decision_log)
+71
think/importers/utils.py
··· 423 423 except Exception: 424 424 pass 425 425 426 + imported_json = result.get("imported_json") 427 + if ( 428 + isinstance(imported_json, dict) 429 + and imported_json.get("merge_summary") is not None 430 + ): 431 + merge_log_path = imported_json.get("merge_log_path") 432 + merge_staging_path = imported_json.get("merge_staging_path") 433 + if merge_log_path and merge_staging_path: 434 + result["merge_artifact_paths"] = { 435 + "decisions": merge_log_path, 436 + "staging": merge_staging_path, 437 + } 438 + decision_highlights = _load_decision_highlights(Path(merge_log_path)) 439 + if decision_highlights is not None: 440 + result["decision_highlights"] = decision_highlights 441 + 442 + summary_errors = imported_json.get("summary_errors") 443 + if isinstance(summary_errors, list) and summary_errors: 444 + result["summary_errors"] = summary_errors 445 + 426 446 return result 447 + 448 + 449 + def _load_decision_highlights(decisions_path: Path) -> dict | None: 450 + """Load selected decision-log rows for detail-view highlights.""" 451 + if not decisions_path.exists(): 452 + return None 453 + 454 + staged_entities: list[dict[str, str]] = [] 455 + errored_segments: list[dict[str, str]] = [] 456 + qualifying_rows = 0 457 + 458 + try: 459 + with open(decisions_path, "r", encoding="utf-8") as handle: 460 + for line in handle: 461 + if qualifying_rows >= 50: 462 + break 463 + line = line.strip() 464 + if not line: 465 + continue 466 + try: 467 + row = json.loads(line) 468 + except json.JSONDecodeError: 469 + continue 470 + 471 + action = row.get("action") 472 + if action == "entity_staged": 473 + staged_entities.append( 474 + { 475 + "source_name": row["source"]["name"], 476 + "target_name": row["target"]["name"], 477 + "staging_path": row["staging_path"], 478 + } 479 + ) 480 + qualifying_rows += 1 481 + elif action == "segment_errored": 482 + errored_segments.append( 483 + { 484 + "item_id": row["item_id"], 485 + "reason": row["reason"], 486 + } 487 + ) 488 + qualifying_rows += 1 489 + except FileNotFoundError: 490 + return None 491 + 492 + if not staged_entities and not errored_segments: 493 + return None 494 + return { 495 + "staged_entities": staged_entities, 496 + "errored_segments": errored_segments, 497 + } 427 498 428 499 429 500 def _backfill_item_type(source_type: str) -> str: