personal memory agent
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

Merge branch 'hopper-ne67urmq-facet-ingest'

+1983
+718
apps/import/facet_ingest.py
··· 1 + # SPDX-License-Identifier: AGPL-3.0-only 2 + # Copyright (c) 2026 sol pbc 3 + 4 + from __future__ import annotations 5 + 6 + import hashlib 7 + import json 8 + import re 9 + from datetime import datetime, timezone 10 + from pathlib import Path, PurePosixPath 11 + from typing import Any 12 + 13 + from think.entities.observations import load_observations, save_observations 14 + from think.entities.relationships import ( 15 + load_facet_relationship, 16 + save_facet_relationship, 17 + ) 18 + 19 + from .ingest import _append_decision 20 + 21 + _DAY_JSONL_RE = re.compile(r"^\d{8}\.jsonl$") 22 + _DAY_MD_RE = re.compile(r"^\d{8}\.md$") 23 + _FACET_NAME_RE = re.compile(r"^[a-z0-9][a-z0-9_-]*$") 24 + _ENTITY_FILE_TYPES = { 25 + "entity_relationship", 26 + "entity_observations", 27 + "detected_entities", 28 + "activity_records", 29 + } 30 + 31 + 32 + def _read_jsonl(path: Path) -> list[dict[str, Any]]: 33 + if not path.is_file(): 34 + return [] 35 + 36 + items: list[dict[str, Any]] = [] 37 + with open(path, encoding="utf-8") as handle: 38 + for line in handle: 39 + line = line.strip() 40 + if not line: 41 + continue 42 + items.append(json.loads(line)) 43 + return items 44 + 45 + 46 + def _append_jsonl(path: Path, items: list[dict[str, Any]]) -> None: 47 + if not items: 48 + return 49 + 50 + path.parent.mkdir(parents=True, exist_ok=True) 51 + with open(path, "a", encoding="utf-8") as handle: 52 + for item in items: 53 + handle.write(json.dumps(item, ensure_ascii=False) + "\n") 54 + 55 + 56 + def _remap_entity_id(entity_id: str, id_map: dict[str, str]) -> str | None: 57 + return id_map.get(entity_id) 58 + 59 + 60 + def _parse_path(path_str: str, file_type: str) -> tuple[PurePosixPath, dict[str, str]]: 61 + path = PurePosixPath(path_str) 62 + if not path_str or path.is_absolute(): 63 + raise ValueError("Invalid path") 64 + 65 + if any(part in {"", ".", ".."} for part in path.parts): 66 + raise ValueError("Invalid path") 67 + 68 + parts = path.parts 69 + 70 + if file_type == "facet_json": 71 + if parts != ("facet.json",): 72 + raise ValueError("facet_json path must be facet.json") 73 + return path, {} 74 + 75 + if file_type == "entity_relationship": 76 + if len(parts) != 3 or parts[0] != "entities" or parts[2] != "entity.json": 77 + raise ValueError("entity_relationship path must be entities/<id>/entity.json") 78 + return path, {"entity_id": parts[1]} 79 + 80 + if file_type == "entity_observations": 81 + if ( 82 + len(parts) != 3 83 + or parts[0] != "entities" 84 + or parts[2] != "observations.jsonl" 85 + ): 86 + raise ValueError( 87 + "entity_observations path must be entities/<id>/observations.jsonl" 88 + ) 89 + return path, {"entity_id": parts[1]} 90 + 91 + if file_type == "detected_entities": 92 + if len(parts) != 2 or parts[0] != "entities" or not _DAY_JSONL_RE.match(parts[1]): 93 + raise ValueError("detected_entities path must be entities/YYYYMMDD.jsonl") 94 + return path, {"day_file": parts[1]} 95 + 96 + if file_type == "activity_config": 97 + if parts != ("activities", "activities.jsonl"): 98 + raise ValueError("activity_config path must be activities/activities.jsonl") 99 + return path, {} 100 + 101 + if file_type == "activity_records": 102 + if len(parts) != 2 or parts[0] != "activities" or not _DAY_JSONL_RE.match(parts[1]): 103 + raise ValueError("activity_records path must be activities/YYYYMMDD.jsonl") 104 + return path, {"day_file": parts[1]} 105 + 106 + if file_type == "activity_output": 107 + if len(parts) < 4 or parts[0] != "activities" or not re.match(r"^\d{8}$", parts[1]): 108 + raise ValueError( 109 + "activity_output path must be activities/YYYYMMDD/<activity_id>/..." 110 + ) 111 + return path, {"output_dir": str(PurePosixPath(*parts[:3]))} 112 + 113 + if file_type == "todos": 114 + if len(parts) != 2 or parts[0] != "todos" or not _DAY_JSONL_RE.match(parts[1]): 115 + raise ValueError("todos path must be todos/YYYYMMDD.jsonl") 116 + return path, {"day_file": parts[1]} 117 + 118 + if file_type == "calendar": 119 + if len(parts) != 2 or parts[0] != "calendar" or not _DAY_JSONL_RE.match(parts[1]): 120 + raise ValueError("calendar path must be calendar/YYYYMMDD.jsonl") 121 + return path, {"day_file": parts[1]} 122 + 123 + if file_type == "news": 124 + if len(parts) != 2 or parts[0] != "news" or not _DAY_MD_RE.match(parts[1]): 125 + raise ValueError("news path must be news/YYYYMMDD.md") 126 + return path, {"news_file": parts[1]} 127 + 128 + if file_type == "logs": 129 + if len(parts) != 2 or parts[0] != "logs" or not _DAY_JSONL_RE.match(parts[1]): 130 + raise ValueError("logs path must be logs/YYYYMMDD.jsonl") 131 + return path, {"day_file": parts[1]} 132 + 133 + raise ValueError(f"Unsupported file type: {file_type}") 134 + 135 + 136 + def _decode_text(raw_bytes: bytes) -> str: 137 + return raw_bytes.decode("utf-8") 138 + 139 + 140 + def _parse_json_bytes(raw_bytes: bytes) -> Any: 141 + return json.loads(_decode_text(raw_bytes)) 142 + 143 + 144 + def _parse_jsonl_bytes(raw_bytes: bytes) -> list[dict[str, Any]]: 145 + items: list[dict[str, Any]] = [] 146 + for line_number, line in enumerate(_decode_text(raw_bytes).splitlines(), start=1): 147 + line = line.strip() 148 + if not line: 149 + continue 150 + try: 151 + value = json.loads(line) 152 + except json.JSONDecodeError as exc: 153 + raise ValueError(f"Invalid JSONL at line {line_number}: {exc.msg}") from exc 154 + if not isinstance(value, dict): 155 + raise ValueError(f"Invalid JSONL at line {line_number}: item must be an object") 156 + items.append(value) 157 + return items 158 + 159 + 160 + def _check_unmapped_entities( 161 + data: Any, 162 + id_map: dict[str, str], 163 + file_type: str, 164 + path_info: dict[str, str], 165 + ) -> list[str]: 166 + unmapped: list[str] = [] 167 + 168 + def add(entity_id: str) -> None: 169 + if entity_id and _remap_entity_id(entity_id, id_map) is None and entity_id not in unmapped: 170 + unmapped.append(entity_id) 171 + 172 + if file_type in {"entity_relationship", "entity_observations"}: 173 + add(path_info["entity_id"]) 174 + elif file_type == "detected_entities": 175 + for item in data: 176 + entity_id = item.get("id") 177 + if entity_id: 178 + add(str(entity_id)) 179 + elif file_type == "activity_records": 180 + for item in data: 181 + active_entities = item.get("active_entities") 182 + if not isinstance(active_entities, list): 183 + continue 184 + for entity_id in active_entities: 185 + if entity_id: 186 + add(str(entity_id)) 187 + 188 + return unmapped 189 + 190 + 191 + def _sanitize_stage_name(relative_path: str) -> str: 192 + return relative_path.replace("/", "__") + ".staged.json" 193 + 194 + 195 + def _stage_unmapped_entity( 196 + staged_dir: Path, 197 + facet_name: str, 198 + file_type: str, 199 + relative_path: str, 200 + entity_id: str, 201 + source_data: str, 202 + ) -> Path: 203 + target_path = staged_dir / facet_name / file_type / _sanitize_stage_name(relative_path) 204 + target_path.parent.mkdir(parents=True, exist_ok=True) 205 + payload = { 206 + "reason": "unmapped_entity", 207 + "source_entity_id": entity_id, 208 + "explanation": ( 209 + f"Entity '{entity_id}' has no mapping in entities/state.json id_map" 210 + ), 211 + "source_path": relative_path, 212 + "source_data": source_data, 213 + "staged_at": datetime.now(timezone.utc).isoformat(), 214 + } 215 + target_path.write_text( 216 + json.dumps(payload, indent=2, ensure_ascii=False) + "\n", 217 + encoding="utf-8", 218 + ) 219 + return target_path 220 + 221 + 222 + def _stage_facet_json_conflict( 223 + staged_dir: Path, 224 + facet_name: str, 225 + relative_path: str, 226 + source_content: Any, 227 + target_content: Any, 228 + ) -> Path: 229 + target_path = staged_dir / facet_name / "facet_json" / _sanitize_stage_name(relative_path) 230 + target_path.parent.mkdir(parents=True, exist_ok=True) 231 + payload = { 232 + "reason": "facet_json_conflict", 233 + "source_content": source_content, 234 + "target_content": target_content, 235 + "staged_at": datetime.now(timezone.utc).isoformat(), 236 + } 237 + target_path.write_text( 238 + json.dumps(payload, indent=2, ensure_ascii=False) + "\n", 239 + encoding="utf-8", 240 + ) 241 + return target_path 242 + 243 + 244 + def _write_bytes(path: Path, raw_bytes: bytes) -> None: 245 + path.parent.mkdir(parents=True, exist_ok=True) 246 + path.write_bytes(raw_bytes) 247 + 248 + 249 + def _merge_facet_json( 250 + target_path: Path, 251 + raw_bytes: bytes, 252 + *, 253 + new_facet: bool, 254 + staged_dir: Path, 255 + facet_name: str, 256 + relative_path: str, 257 + ) -> dict[str, Any]: 258 + source_content = _parse_json_bytes(raw_bytes) 259 + if not target_path.exists() or new_facet: 260 + _write_bytes(target_path, raw_bytes) 261 + return {"status": "written", "reason": "new_facet" if new_facet else "overlap_merged"} 262 + 263 + target_content = json.loads(target_path.read_text(encoding="utf-8")) 264 + if target_content == source_content: 265 + return {"status": "skipped", "reason": "facet_json_match"} 266 + 267 + staged_path = _stage_facet_json_conflict( 268 + staged_dir, facet_name, relative_path, source_content, target_content 269 + ) 270 + return { 271 + "status": "staged", 272 + "reason": "facet_json_conflict", 273 + "staged_path": str(staged_path), 274 + } 275 + 276 + 277 + def _merge_entity_relationship( 278 + facet_name: str, 279 + entity_id: str, 280 + raw_bytes: bytes, 281 + *, 282 + new_facet: bool, 283 + ) -> dict[str, Any]: 284 + source_relationship = _parse_json_bytes(raw_bytes) 285 + if not isinstance(source_relationship, dict): 286 + raise ValueError("entity_relationship content must be a JSON object") 287 + source_relationship["entity_id"] = entity_id 288 + 289 + target_relationship = {} 290 + if not new_facet: 291 + loaded = load_facet_relationship(facet_name, entity_id) 292 + if loaded is not None: 293 + target_relationship = loaded 294 + 295 + merged_relationship = {**source_relationship, **target_relationship} 296 + save_facet_relationship(facet_name, entity_id, merged_relationship) 297 + return {"status": "written", "reason": "new_facet" if new_facet else "overlap_merged"} 298 + 299 + 300 + def _merge_observations( 301 + facet_name: str, 302 + entity_id: str, 303 + raw_bytes: bytes, 304 + *, 305 + new_facet: bool, 306 + ) -> dict[str, Any]: 307 + source_observations = _parse_jsonl_bytes(raw_bytes) 308 + target_observations = [] if new_facet else load_observations(facet_name, entity_id) 309 + seen = { 310 + (item.get("content", ""), item.get("observed_at")) for item in target_observations 311 + } 312 + merged_observations = list(target_observations) 313 + for item in source_observations: 314 + key = (item.get("content", ""), item.get("observed_at")) 315 + if key in seen: 316 + continue 317 + seen.add(key) 318 + merged_observations.append(item) 319 + 320 + save_observations(facet_name, entity_id, merged_observations) 321 + return {"status": "written", "reason": "new_facet" if new_facet else "overlap_merged"} 322 + 323 + 324 + def _merge_detected_entities( 325 + target_path: Path, 326 + raw_bytes: bytes, 327 + *, 328 + new_facet: bool, 329 + ) -> dict[str, Any]: 330 + source_items = _parse_jsonl_bytes(raw_bytes) 331 + target_items = [] if new_facet else _read_jsonl(target_path) 332 + seen_ids = {item.get("id") for item in target_items if item.get("id")} 333 + new_items = [] 334 + for item in source_items: 335 + item_id = item.get("id", "") 336 + if item_id in seen_ids: 337 + continue 338 + new_items.append(item) 339 + _append_jsonl(target_path, new_items) 340 + return {"status": "written", "reason": "new_facet" if new_facet else "overlap_merged"} 341 + 342 + 343 + def _merge_activity_config( 344 + target_path: Path, 345 + raw_bytes: bytes, 346 + *, 347 + new_facet: bool, 348 + ) -> dict[str, Any]: 349 + source_items = _parse_jsonl_bytes(raw_bytes) 350 + target_items = [] if new_facet else _read_jsonl(target_path) 351 + existing_ids = {item.get("id") for item in target_items} 352 + new_items = [item for item in source_items if item.get("id") not in existing_ids] 353 + _append_jsonl(target_path, new_items) 354 + return {"status": "written", "reason": "new_facet" if new_facet else "overlap_merged"} 355 + 356 + 357 + def _merge_activity_records( 358 + target_path: Path, 359 + raw_bytes: bytes, 360 + *, 361 + new_facet: bool, 362 + ) -> dict[str, Any]: 363 + source_items = _parse_jsonl_bytes(raw_bytes) 364 + target_items = [] if new_facet else _read_jsonl(target_path) 365 + existing_ids = {item.get("id") for item in target_items} 366 + new_items = [item for item in source_items if item.get("id") not in existing_ids] 367 + _append_jsonl(target_path, new_items) 368 + return {"status": "written", "reason": "new_facet" if new_facet else "overlap_merged"} 369 + 370 + 371 + def _merge_activity_output( 372 + target_path: Path, 373 + raw_bytes: bytes, 374 + output_dir: Path, 375 + *, 376 + new_facet: bool, 377 + ) -> dict[str, Any]: 378 + if output_dir.exists(): 379 + return {"status": "skipped", "reason": "output_dir_exists"} 380 + _write_bytes(target_path, raw_bytes) 381 + return {"status": "written", "reason": "new_facet" if new_facet else "overlap_merged"} 382 + 383 + 384 + def _merge_todos( 385 + target_path: Path, 386 + raw_bytes: bytes, 387 + *, 388 + new_facet: bool, 389 + ) -> dict[str, Any]: 390 + source_items = _parse_jsonl_bytes(raw_bytes) 391 + target_items = [] if new_facet else _read_jsonl(target_path) 392 + seen = {(item["text"], item.get("created_at")) for item in target_items} 393 + new_items = [ 394 + item 395 + for item in source_items 396 + if (item["text"], item.get("created_at")) not in seen 397 + ] 398 + _append_jsonl(target_path, new_items) 399 + return {"status": "written", "reason": "new_facet" if new_facet else "overlap_merged"} 400 + 401 + 402 + def _merge_calendar( 403 + target_path: Path, 404 + raw_bytes: bytes, 405 + *, 406 + new_facet: bool, 407 + ) -> dict[str, Any]: 408 + source_items = _parse_jsonl_bytes(raw_bytes) 409 + target_items = [] if new_facet else _read_jsonl(target_path) 410 + seen = {(item["title"], item.get("start")) for item in target_items} 411 + new_items = [ 412 + item 413 + for item in source_items 414 + if (item["title"], item.get("start")) not in seen 415 + ] 416 + _append_jsonl(target_path, new_items) 417 + return {"status": "written", "reason": "new_facet" if new_facet else "overlap_merged"} 418 + 419 + 420 + def _merge_news( 421 + target_path: Path, 422 + raw_bytes: bytes, 423 + *, 424 + new_facet: bool, 425 + ) -> dict[str, Any]: 426 + if target_path.exists(): 427 + return {"status": "skipped", "reason": "news_exists"} 428 + _write_bytes(target_path, raw_bytes) 429 + return {"status": "written", "reason": "new_facet" if new_facet else "overlap_merged"} 430 + 431 + 432 + def _merge_logs( 433 + target_path: Path, 434 + raw_bytes: bytes, 435 + *, 436 + new_facet: bool, 437 + ) -> dict[str, Any]: 438 + source_items = _parse_jsonl_bytes(raw_bytes) 439 + _append_jsonl(target_path, source_items) 440 + return {"status": "written", "reason": "new_facet" if new_facet else "overlap_merged"} 441 + 442 + 443 + def _remap_entity_ids( 444 + data: Any, 445 + id_map: dict[str, str], 446 + file_type: str, 447 + path_info: dict[str, str], 448 + ) -> tuple[Any, dict[str, str]]: 449 + updated_path_info = dict(path_info) 450 + 451 + if file_type == "entity_relationship": 452 + entity_id = path_info["entity_id"] 453 + mapped_id = _remap_entity_id(entity_id, id_map) 454 + if mapped_id is None: 455 + raise ValueError(f"Unmapped entity id: {entity_id}") 456 + if not isinstance(data, dict): 457 + raise ValueError("entity_relationship content must be a JSON object") 458 + updated_path_info["entity_id"] = mapped_id 459 + remapped = dict(data) 460 + remapped["entity_id"] = mapped_id 461 + return remapped, updated_path_info 462 + 463 + if file_type == "entity_observations": 464 + entity_id = path_info["entity_id"] 465 + mapped_id = _remap_entity_id(entity_id, id_map) 466 + if mapped_id is None: 467 + raise ValueError(f"Unmapped entity id: {entity_id}") 468 + updated_path_info["entity_id"] = mapped_id 469 + return data, updated_path_info 470 + 471 + if file_type == "detected_entities": 472 + remapped_items = [] 473 + for item in data: 474 + updated = dict(item) 475 + entity_id = updated.get("id") 476 + if entity_id: 477 + mapped_id = _remap_entity_id(str(entity_id), id_map) 478 + if mapped_id is None: 479 + raise ValueError(f"Unmapped entity id: {entity_id}") 480 + updated["id"] = mapped_id 481 + remapped_items.append(updated) 482 + return remapped_items, updated_path_info 483 + 484 + if file_type == "activity_records": 485 + remapped_items = [] 486 + for item in data: 487 + updated = dict(item) 488 + active_entities = updated.get("active_entities") 489 + if isinstance(active_entities, list): 490 + remapped_entities = [] 491 + for entity_id in active_entities: 492 + mapped_id = _remap_entity_id(str(entity_id), id_map) 493 + if mapped_id is None: 494 + raise ValueError(f"Unmapped entity id: {entity_id}") 495 + remapped_entities.append(mapped_id) 496 + updated["active_entities"] = remapped_entities 497 + remapped_items.append(updated) 498 + return remapped_items, updated_path_info 499 + 500 + return data, updated_path_info 501 + 502 + 503 + def _serialize_jsonl(items: list[dict[str, Any]]) -> bytes: 504 + if not items: 505 + return b"" 506 + return "".join(json.dumps(item, ensure_ascii=False) + "\n" for item in items).encode( 507 + "utf-8" 508 + ) 509 + 510 + 511 + def process_facet( 512 + facet_name: str, 513 + files: list[dict], 514 + file_data: list[bytes], 515 + journal_root: Path, 516 + id_map: dict[str, str], 517 + log_path: Path, 518 + staged_dir: Path, 519 + received: dict[str, str], 520 + ) -> dict: 521 + if not _FACET_NAME_RE.match(facet_name): 522 + raise ValueError("Invalid facet name") 523 + 524 + facet_dir = journal_root / "facets" / facet_name 525 + new_facet = not facet_dir.exists() 526 + 527 + result = { 528 + "created": 0, 529 + "merged": 0, 530 + "skipped": 0, 531 + "staged": 0, 532 + "errors": [], 533 + "wrote_files": False, 534 + } 535 + 536 + for metadata, raw_bytes in zip(files, file_data, strict=True): 537 + raw_path = str(metadata.get("path", "")).strip() 538 + file_type = str(metadata.get("type", "")).strip() 539 + 540 + try: 541 + normalized_path, path_info = _parse_path(raw_path, file_type) 542 + relative_path = normalized_path.as_posix() 543 + item_id = f"{facet_name}/{relative_path}" 544 + content_hash = hashlib.sha256(raw_bytes).hexdigest() 545 + 546 + if received.get(item_id) == content_hash: 547 + result["skipped"] += 1 548 + _append_decision( 549 + log_path, 550 + { 551 + "ts": datetime.now(timezone.utc).isoformat(), 552 + "action": "facet_file_skipped", 553 + "item_type": file_type, 554 + "item_id": item_id, 555 + "facet": facet_name, 556 + "reason": "idempotent", 557 + }, 558 + ) 559 + continue 560 + 561 + parsed_data: Any = raw_bytes 562 + if file_type == "facet_json": 563 + parsed_data = _parse_json_bytes(raw_bytes) 564 + elif file_type in { 565 + "entity_observations", 566 + "detected_entities", 567 + "activity_config", 568 + "activity_records", 569 + "todos", 570 + "calendar", 571 + "logs", 572 + }: 573 + parsed_data = _parse_jsonl_bytes(raw_bytes) 574 + elif file_type == "entity_relationship": 575 + parsed_data = _parse_json_bytes(raw_bytes) 576 + 577 + if file_type in _ENTITY_FILE_TYPES: 578 + unmapped = _check_unmapped_entities(parsed_data, id_map, file_type, path_info) 579 + if unmapped: 580 + staged_path = _stage_unmapped_entity( 581 + staged_dir, 582 + facet_name, 583 + file_type, 584 + relative_path, 585 + unmapped[0], 586 + _decode_text(raw_bytes), 587 + ) 588 + result["staged"] += 1 589 + _append_decision( 590 + log_path, 591 + { 592 + "ts": datetime.now(timezone.utc).isoformat(), 593 + "action": "facet_file_staged", 594 + "item_type": file_type, 595 + "item_id": item_id, 596 + "facet": facet_name, 597 + "reason": "unmapped_entity", 598 + "staged_path": str(staged_path), 599 + }, 600 + ) 601 + continue 602 + 603 + parsed_data, path_info = _remap_entity_ids( 604 + parsed_data, id_map, file_type, path_info 605 + ) 606 + if file_type == "entity_relationship": 607 + raw_bytes = ( 608 + json.dumps(parsed_data, ensure_ascii=False, indent=2) + "\n" 609 + ).encode("utf-8") 610 + elif file_type in {"detected_entities", "activity_records"}: 611 + raw_bytes = _serialize_jsonl(parsed_data) 612 + 613 + target_path = facet_dir / normalized_path 614 + 615 + if file_type == "facet_json": 616 + merge_result = _merge_facet_json( 617 + target_path, 618 + raw_bytes, 619 + new_facet=new_facet, 620 + staged_dir=staged_dir, 621 + facet_name=facet_name, 622 + relative_path=relative_path, 623 + ) 624 + elif file_type == "entity_relationship": 625 + merge_result = _merge_entity_relationship( 626 + facet_name, 627 + path_info["entity_id"], 628 + raw_bytes, 629 + new_facet=new_facet, 630 + ) 631 + elif file_type == "entity_observations": 632 + merge_result = _merge_observations( 633 + facet_name, 634 + path_info["entity_id"], 635 + raw_bytes, 636 + new_facet=new_facet, 637 + ) 638 + elif file_type == "detected_entities": 639 + merge_result = _merge_detected_entities( 640 + target_path, raw_bytes, new_facet=new_facet 641 + ) 642 + elif file_type == "activity_config": 643 + merge_result = _merge_activity_config( 644 + target_path, raw_bytes, new_facet=new_facet 645 + ) 646 + elif file_type == "activity_records": 647 + merge_result = _merge_activity_records( 648 + target_path, raw_bytes, new_facet=new_facet 649 + ) 650 + elif file_type == "activity_output": 651 + output_dir = facet_dir / PurePosixPath(path_info["output_dir"]) 652 + merge_result = _merge_activity_output( 653 + target_path, 654 + raw_bytes, 655 + output_dir, 656 + new_facet=new_facet, 657 + ) 658 + elif file_type == "todos": 659 + merge_result = _merge_todos(target_path, raw_bytes, new_facet=new_facet) 660 + elif file_type == "calendar": 661 + merge_result = _merge_calendar(target_path, raw_bytes, new_facet=new_facet) 662 + elif file_type == "news": 663 + merge_result = _merge_news(target_path, raw_bytes, new_facet=new_facet) 664 + elif file_type == "logs": 665 + merge_result = _merge_logs(target_path, raw_bytes, new_facet=new_facet) 666 + else: 667 + raise ValueError(f"Unsupported file type: {file_type}") 668 + 669 + status = merge_result["status"] 670 + if status == "written": 671 + received[item_id] = content_hash 672 + bucket = "created" if new_facet else "merged" 673 + result[bucket] += 1 674 + result["wrote_files"] = True 675 + action = "facet_file_created" if new_facet else "facet_file_merged" 676 + elif status == "staged": 677 + result["staged"] += 1 678 + action = "facet_file_staged" 679 + else: 680 + received[item_id] = content_hash 681 + result["skipped"] += 1 682 + action = "facet_file_skipped" 683 + 684 + entry = { 685 + "ts": datetime.now(timezone.utc).isoformat(), 686 + "action": action, 687 + "item_type": file_type, 688 + "item_id": item_id, 689 + "facet": facet_name, 690 + "reason": merge_result["reason"], 691 + } 692 + if "staged_path" in merge_result: 693 + entry["staged_path"] = merge_result["staged_path"] 694 + _append_decision(log_path, entry) 695 + except Exception as exc: 696 + _append_decision( 697 + log_path, 698 + { 699 + "ts": datetime.now(timezone.utc).isoformat(), 700 + "action": "facet_file_error", 701 + "item_type": file_type, 702 + "item_id": f"{facet_name}/{raw_path}", 703 + "facet": facet_name, 704 + "reason": str(exc), 705 + }, 706 + ) 707 + result["errors"].append( 708 + { 709 + "facet": facet_name, 710 + "path": raw_path, 711 + "error": str(exc), 712 + } 713 + ) 714 + 715 + return result 716 + 717 + 718 + __all__ = ["process_facet"]
+141
apps/import/ingest.py
··· 42 42 _DAY_RE = re.compile(r"^\d{8}$") 43 43 _SEGMENT_RE = re.compile(r"^\d{6}_\d+$") 44 44 _STREAM_RE = re.compile(r"^[a-z0-9][a-z0-9._-]*$") 45 + _FACET_NAME_RE = re.compile(r"^[a-z0-9][a-z0-9_-]*$") 45 46 46 47 47 48 def _append_decision(log_path: Path, entry: dict) -> None: ··· 60 61 except Exception: 61 62 Path(tmp_path).unlink(missing_ok=True) 62 63 raise 64 + 65 + 66 + from .facet_ingest import process_facet 63 67 64 68 65 69 def register_ingest_routes(bp) -> None: ··· 560 564 "errors": errors, 561 565 } 562 566 ) 567 + 568 + @bp.route("/journal/<key_prefix>/ingest/facets", methods=["POST"]) 569 + @require_journal_source 570 + def ingest_facets(key_prefix: str): 571 + if g.journal_source["key"][:8] != key_prefix: 572 + abort(403, description="Key prefix mismatch") 573 + 574 + metadata_raw = request.form.get("metadata") 575 + if not metadata_raw: 576 + return jsonify({"error": "Missing metadata"}), 400 577 + 578 + try: 579 + metadata = json.loads(metadata_raw) 580 + except json.JSONDecodeError: 581 + return jsonify({"error": "Invalid metadata JSON"}), 400 582 + 583 + if not isinstance(metadata, dict): 584 + return jsonify({"error": "Invalid metadata JSON"}), 400 585 + 586 + facets = metadata.get("facets") 587 + if not isinstance(facets, list): 588 + return jsonify({"error": "Missing facets array"}), 400 589 + 590 + state_dir = get_state_directory(key_prefix) 591 + entities_state_path = state_dir / "entities" / "state.json" 592 + facets_state_path = state_dir / "facets" / "state.json" 593 + log_path = state_dir / "facets" / "log.jsonl" 594 + staged_dir = state_dir / "facets" / "staged" 595 + 596 + try: 597 + entities_state = json.loads(entities_state_path.read_text(encoding="utf-8")) 598 + except (OSError, json.JSONDecodeError): 599 + entities_state = {} 600 + if not isinstance(entities_state, dict): 601 + entities_state = {} 602 + id_map = entities_state.get("id_map") 603 + if not isinstance(id_map, dict): 604 + id_map = {} 605 + 606 + try: 607 + facets_state = json.loads(facets_state_path.read_text(encoding="utf-8")) 608 + except (OSError, json.JSONDecodeError): 609 + facets_state = {} 610 + if not isinstance(facets_state, dict): 611 + facets_state = {} 612 + received = facets_state.get("received") 613 + if not isinstance(received, dict): 614 + received = {} 615 + facets_state = {"received": dict(received)} 616 + 617 + created = 0 618 + merged = 0 619 + skipped = 0 620 + staged = 0 621 + errors: list[dict[str, str]] = [] 622 + written_facets: set[str] = set() 623 + 624 + for facet_idx, facet in enumerate(facets): 625 + if not isinstance(facet, dict): 626 + return jsonify({"error": "Facet metadata must be an object"}), 400 627 + 628 + facet_name = str(facet.get("name", "")).strip() 629 + files = facet.get("files") 630 + if not facet_name: 631 + return jsonify({"error": "Facet name is required"}), 400 632 + if not _FACET_NAME_RE.match(facet_name): 633 + return jsonify({"error": "Invalid facet name"}), 400 634 + if not isinstance(files, list): 635 + return jsonify({"error": "Facet files must be an array"}), 400 636 + 637 + file_bytes: list[bytes] = [] 638 + normalized_files: list[dict[str, str]] = [] 639 + for file_idx, file_meta in enumerate(files): 640 + if not isinstance(file_meta, dict): 641 + return jsonify({"error": "Facet file metadata must be an object"}), 400 642 + 643 + path_value = file_meta.get("path") 644 + type_value = file_meta.get("type") 645 + if not isinstance(path_value, str) or not isinstance(type_value, str): 646 + return ( 647 + jsonify({"error": "Facet file metadata must include path and type"}), 648 + 400, 649 + ) 650 + 651 + upload = request.files.get(f"files_{facet_idx}_{file_idx}") 652 + if upload is None: 653 + return ( 654 + jsonify( 655 + { 656 + "error": ( 657 + f"Missing uploaded file for facet {facet_idx} file {file_idx}" 658 + ) 659 + } 660 + ), 661 + 400, 662 + ) 663 + 664 + file_bytes.append(upload.read()) 665 + normalized_files.append({"path": path_value, "type": type_value}) 666 + 667 + facet_result = process_facet( 668 + facet_name=facet_name, 669 + files=normalized_files, 670 + file_data=file_bytes, 671 + journal_root=Path(state.journal_root), 672 + id_map=id_map, 673 + log_path=log_path, 674 + staged_dir=staged_dir, 675 + received=facets_state["received"], 676 + ) 677 + created += facet_result["created"] 678 + merged += facet_result["merged"] 679 + skipped += facet_result["skipped"] 680 + staged += facet_result["staged"] 681 + errors.extend(facet_result["errors"]) 682 + if facet_result["wrote_files"]: 683 + written_facets.add(facet_name) 684 + 685 + _write_state_atomic(facets_state_path, facets_state) 686 + 687 + if written_facets: 688 + source = g.journal_source 689 + source.setdefault("stats", {}) 690 + source["stats"]["facets_received"] = ( 691 + source["stats"].get("facets_received", 0) + len(written_facets) 692 + ) 693 + save_journal_source(source) 694 + 695 + return jsonify( 696 + { 697 + "created": created, 698 + "merged": merged, 699 + "skipped": skipped, 700 + "staged": staged, 701 + "errors": errors, 702 + } 703 + )
+1124
tests/test_facet_ingest.py
··· 1 + # SPDX-License-Identifier: AGPL-3.0-only 2 + # Copyright (c) 2026 sol pbc 3 + 4 + from __future__ import annotations 5 + 6 + import hashlib 7 + import json 8 + from importlib import import_module 9 + from io import BytesIO 10 + from pathlib import Path 11 + 12 + import pytest 13 + from flask import Blueprint, Flask 14 + 15 + import convey.state 16 + 17 + journal_sources = import_module("apps.import.journal_sources") 18 + ingest = import_module("apps.import.ingest") 19 + 20 + create_state_directory = journal_sources.create_state_directory 21 + generate_key = journal_sources.generate_key 22 + get_state_directory = journal_sources.get_state_directory 23 + load_journal_source = journal_sources.load_journal_source 24 + save_journal_source = journal_sources.save_journal_source 25 + register_ingest_routes = ingest.register_ingest_routes 26 + 27 + 28 + @pytest.fixture 29 + def journal_env(tmp_path, monkeypatch): 30 + monkeypatch.setattr(convey.state, "journal_root", str(tmp_path), raising=False) 31 + monkeypatch.setenv("_SOLSTONE_JOURNAL_OVERRIDE", str(tmp_path)) 32 + (tmp_path / "apps" / "import" / "journal_sources").mkdir( 33 + parents=True, exist_ok=True 34 + ) 35 + return tmp_path 36 + 37 + 38 + def _source(name="test-source", key=None, **overrides): 39 + if key is None: 40 + key = generate_key() 41 + source = { 42 + "name": name, 43 + "key": key, 44 + "created_at": 1000, 45 + "enabled": True, 46 + "revoked": False, 47 + "revoked_at": None, 48 + "stats": { 49 + "segments_received": 0, 50 + "entities_received": 0, 51 + "facets_received": 0, 52 + "imports_received": 0, 53 + "config_received": 0, 54 + }, 55 + } 56 + source.update(overrides) 57 + return source 58 + 59 + 60 + @pytest.fixture 61 + def ingest_env(journal_env): 62 + key = generate_key() 63 + source = _source(key=key) 64 + save_journal_source(source) 65 + key_prefix = key[:8] 66 + create_state_directory(journal_env, key_prefix) 67 + 68 + entity_state = { 69 + "id_map": { 70 + "source_entity": "target_entity", 71 + "same_entity": "same_entity", 72 + }, 73 + "received": {}, 74 + } 75 + ( 76 + get_state_directory(key_prefix) / "entities" / "state.json" 77 + ).write_text(json.dumps(entity_state, indent=2), encoding="utf-8") 78 + 79 + app = Flask(__name__) 80 + app.config["TESTING"] = True 81 + bp = Blueprint("import-test", __name__, url_prefix="/app/import") 82 + register_ingest_routes(bp) 83 + app.register_blueprint(bp) 84 + 85 + return { 86 + "root": journal_env, 87 + "key": key, 88 + "key_prefix": key_prefix, 89 + "source": source, 90 + "client": app.test_client(), 91 + } 92 + 93 + 94 + def _post_facets(client, key, key_prefix, facets_metadata, file_map): 95 + data = {"metadata": json.dumps({"facets": facets_metadata})} 96 + data.update(file_map) 97 + return client.post( 98 + f"/app/import/journal/{key_prefix}/ingest/facets", 99 + headers={"Authorization": f"Bearer {key}"}, 100 + data=data, 101 + content_type="multipart/form-data", 102 + ) 103 + 104 + 105 + def _build_request(facets): 106 + facets_metadata = [] 107 + file_map = {} 108 + for facet_idx, facet in enumerate(facets): 109 + facet_meta = {"name": facet["name"], "files": []} 110 + for file_idx, file_info in enumerate(facet["files"]): 111 + facet_meta["files"].append( 112 + {"path": file_info["path"], "type": file_info["type"]} 113 + ) 114 + filename = Path(file_info["path"]).name 115 + file_map[f"files_{facet_idx}_{file_idx}"] = ( 116 + BytesIO(file_info["content"]), 117 + filename, 118 + ) 119 + facets_metadata.append(facet_meta) 120 + return facets_metadata, file_map 121 + 122 + 123 + def _read_state(key_prefix: str) -> dict: 124 + state_path = get_state_directory(key_prefix) / "facets" / "state.json" 125 + return json.loads(state_path.read_text(encoding="utf-8")) 126 + 127 + 128 + def _read_log(key_prefix: str) -> list[dict]: 129 + log_path = get_state_directory(key_prefix) / "facets" / "log.jsonl" 130 + if not log_path.exists(): 131 + return [] 132 + return [ 133 + json.loads(line) 134 + for line in log_path.read_text(encoding="utf-8").splitlines() 135 + if line.strip() 136 + ] 137 + 138 + 139 + def _read_staged(key_prefix: str, facet: str, file_type: str, relative_path: str) -> dict: 140 + staged_name = relative_path.replace("/", "__") + ".staged.json" 141 + staged_path = ( 142 + get_state_directory(key_prefix) 143 + / "facets" 144 + / "staged" 145 + / facet 146 + / file_type 147 + / staged_name 148 + ) 149 + return json.loads(staged_path.read_text(encoding="utf-8")) 150 + 151 + 152 + def _json_bytes(data: dict) -> bytes: 153 + return (json.dumps(data, ensure_ascii=False, indent=2) + "\n").encode("utf-8") 154 + 155 + 156 + def _jsonl_bytes(items: list[dict]) -> bytes: 157 + return "".join(json.dumps(item, ensure_ascii=False) + "\n" for item in items).encode( 158 + "utf-8" 159 + ) 160 + 161 + 162 + def _read_json(path: Path) -> dict: 163 + return json.loads(path.read_text(encoding="utf-8")) 164 + 165 + 166 + def _read_jsonl_file(path: Path) -> list[dict]: 167 + if not path.exists(): 168 + return [] 169 + return [ 170 + json.loads(line) 171 + for line in path.read_text(encoding="utf-8").splitlines() 172 + if line.strip() 173 + ] 174 + 175 + 176 + def _hash_bytes(content: bytes) -> str: 177 + return hashlib.sha256(content).hexdigest() 178 + 179 + 180 + def _write_json(path: Path, data: dict) -> None: 181 + path.parent.mkdir(parents=True, exist_ok=True) 182 + path.write_text( 183 + json.dumps(data, ensure_ascii=False, indent=2) + "\n", 184 + encoding="utf-8", 185 + ) 186 + 187 + 188 + def _write_jsonl(path: Path, items: list[dict]) -> None: 189 + path.parent.mkdir(parents=True, exist_ok=True) 190 + path.write_text(_jsonl_bytes(items).decode("utf-8"), encoding="utf-8") 191 + 192 + 193 + def test_auth_missing(ingest_env): 194 + env = ingest_env 195 + response = env["client"].post( 196 + f"/app/import/journal/{env['key_prefix']}/ingest/facets", 197 + data={"metadata": json.dumps({"facets": []})}, 198 + content_type="multipart/form-data", 199 + ) 200 + assert response.status_code == 401 201 + 202 + 203 + def test_auth_invalid(ingest_env): 204 + env = ingest_env 205 + response = env["client"].post( 206 + f"/app/import/journal/{env['key_prefix']}/ingest/facets", 207 + headers={"Authorization": "Bearer wrong-token"}, 208 + data={"metadata": json.dumps({"facets": []})}, 209 + content_type="multipart/form-data", 210 + ) 211 + assert response.status_code == 401 212 + 213 + 214 + def test_auth_revoked(ingest_env): 215 + env = ingest_env 216 + env["source"]["revoked"] = True 217 + env["source"]["revoked_at"] = 12345 218 + save_journal_source(env["source"]) 219 + 220 + response = env["client"].post( 221 + f"/app/import/journal/{env['key_prefix']}/ingest/facets", 222 + headers={"Authorization": f"Bearer {env['key']}"}, 223 + data={"metadata": json.dumps({"facets": []})}, 224 + content_type="multipart/form-data", 225 + ) 226 + assert response.status_code == 403 227 + 228 + 229 + def test_key_prefix_mismatch(ingest_env): 230 + env = ingest_env 231 + response = env["client"].post( 232 + "/app/import/journal/deadbeef/ingest/facets", 233 + headers={"Authorization": f"Bearer {env['key']}"}, 234 + data={"metadata": json.dumps({"facets": []})}, 235 + content_type="multipart/form-data", 236 + ) 237 + assert response.status_code == 403 238 + 239 + 240 + def test_missing_metadata(ingest_env): 241 + env = ingest_env 242 + response = env["client"].post( 243 + f"/app/import/journal/{env['key_prefix']}/ingest/facets", 244 + headers={"Authorization": f"Bearer {env['key']}"}, 245 + content_type="multipart/form-data", 246 + ) 247 + assert response.status_code == 400 248 + assert response.get_json() == {"error": "Missing metadata"} 249 + 250 + 251 + def test_invalid_metadata_json(ingest_env): 252 + env = ingest_env 253 + response = env["client"].post( 254 + f"/app/import/journal/{env['key_prefix']}/ingest/facets", 255 + headers={"Authorization": f"Bearer {env['key']}"}, 256 + data={"metadata": "not-json"}, 257 + content_type="multipart/form-data", 258 + ) 259 + assert response.status_code == 400 260 + assert response.get_json() == {"error": "Invalid metadata JSON"} 261 + 262 + 263 + def test_unsafe_facet_name(ingest_env): 264 + env = ingest_env 265 + for bad_name in ["../etc", "foo/bar", ".", "..", "FOO"]: 266 + facets = [ 267 + { 268 + "name": bad_name, 269 + "files": [ 270 + { 271 + "path": "news/20260305.md", 272 + "type": "news", 273 + "content": b"x", 274 + } 275 + ], 276 + } 277 + ] 278 + metadata, file_map = _build_request(facets) 279 + response = _post_facets( 280 + env["client"], env["key"], env["key_prefix"], metadata, file_map 281 + ) 282 + assert response.status_code == 400, f"Expected 400 for facet name: {bad_name}" 283 + assert response.get_json() == {"error": "Invalid facet name"} 284 + 285 + 286 + def test_new_facet_all_types(ingest_env): 287 + env = ingest_env 288 + facets = [ 289 + { 290 + "name": "personal", 291 + "files": [ 292 + {"path": "facet.json", "type": "facet_json", "content": _json_bytes({"title": "Personal"})}, 293 + { 294 + "path": "entities/same_entity/entity.json", 295 + "type": "entity_relationship", 296 + "content": _json_bytes({"description": "Close contact", "attached_at": 100}), 297 + }, 298 + { 299 + "path": "entities/same_entity/observations.jsonl", 300 + "type": "entity_observations", 301 + "content": _jsonl_bytes( 302 + [{"content": "Likes tea", "observed_at": 1}] 303 + ), 304 + }, 305 + { 306 + "path": "entities/20260305.jsonl", 307 + "type": "detected_entities", 308 + "content": _jsonl_bytes( 309 + [{"id": "same_entity", "name": "Same Entity", "type": "Person"}] 310 + ), 311 + }, 312 + { 313 + "path": "activities/activities.jsonl", 314 + "type": "activity_config", 315 + "content": _jsonl_bytes( 316 + [{"id": "coding", "name": "Coding", "priority": "high"}] 317 + ), 318 + }, 319 + { 320 + "path": "activities/20260305.jsonl", 321 + "type": "activity_records", 322 + "content": _jsonl_bytes( 323 + [ 324 + { 325 + "id": "coding_093000_300", 326 + "activity": "coding", 327 + "active_entities": ["same_entity"], 328 + } 329 + ] 330 + ), 331 + }, 332 + { 333 + "path": "activities/20260305/coding_093000_300/session_review.md", 334 + "type": "activity_output", 335 + "content": b"# Session\n", 336 + }, 337 + { 338 + "path": "todos/20260305.jsonl", 339 + "type": "todos", 340 + "content": _jsonl_bytes([{"text": "Ship it", "created_at": 10}]), 341 + }, 342 + { 343 + "path": "calendar/20260305.jsonl", 344 + "type": "calendar", 345 + "content": _jsonl_bytes([{"title": "Standup", "start": "09:00"}]), 346 + }, 347 + { 348 + "path": "news/20260305.md", 349 + "type": "news", 350 + "content": b"# News\n", 351 + }, 352 + { 353 + "path": "logs/20260305.jsonl", 354 + "type": "logs", 355 + "content": _jsonl_bytes([{"event": "ingested"}]), 356 + }, 357 + ], 358 + } 359 + ] 360 + metadata, file_map = _build_request(facets) 361 + 362 + response = _post_facets(env["client"], env["key"], env["key_prefix"], metadata, file_map) 363 + 364 + assert response.status_code == 200 365 + assert response.get_json() == { 366 + "created": 11, 367 + "merged": 0, 368 + "skipped": 0, 369 + "staged": 0, 370 + "errors": [], 371 + } 372 + 373 + facet_root = env["root"] / "facets" / "personal" 374 + assert _read_json(facet_root / "facet.json") == {"title": "Personal"} 375 + assert _read_json(facet_root / "entities" / "same_entity" / "entity.json") == { 376 + "description": "Close contact", 377 + "attached_at": 100, 378 + "entity_id": "same_entity", 379 + } 380 + assert _read_jsonl_file( 381 + facet_root / "entities" / "same_entity" / "observations.jsonl" 382 + ) == [{"content": "Likes tea", "observed_at": 1}] 383 + assert _read_jsonl_file(facet_root / "entities" / "20260305.jsonl")[0]["id"] == "same_entity" 384 + assert _read_jsonl_file(facet_root / "activities" / "activities.jsonl")[0]["id"] == "coding" 385 + assert _read_jsonl_file(facet_root / "activities" / "20260305.jsonl")[0]["id"] == "coding_093000_300" 386 + assert ( 387 + facet_root / "activities" / "20260305" / "coding_093000_300" / "session_review.md" 388 + ).read_text(encoding="utf-8") == "# Session\n" 389 + assert _read_jsonl_file(facet_root / "todos" / "20260305.jsonl")[0]["text"] == "Ship it" 390 + assert _read_jsonl_file(facet_root / "calendar" / "20260305.jsonl")[0]["title"] == "Standup" 391 + assert (facet_root / "news" / "20260305.md").read_text(encoding="utf-8") == "# News\n" 392 + assert _read_jsonl_file(facet_root / "logs" / "20260305.jsonl")[0]["event"] == "ingested" 393 + 394 + source = load_journal_source(env["key"]) 395 + assert source["stats"]["facets_received"] == 1 396 + 397 + 398 + def test_existing_facet_merge_entity_relationship(ingest_env): 399 + env = ingest_env 400 + target_path = env["root"] / "facets" / "work" / "entities" / "same_entity" / "entity.json" 401 + _write_json( 402 + target_path, 403 + {"entity_id": "same_entity", "description": "Keep target", "attached_at": 200}, 404 + ) 405 + 406 + facets = [ 407 + { 408 + "name": "work", 409 + "files": [ 410 + { 411 + "path": "entities/same_entity/entity.json", 412 + "type": "entity_relationship", 413 + "content": _json_bytes({"description": "Source desc", "last_seen": 999}), 414 + } 415 + ], 416 + } 417 + ] 418 + metadata, file_map = _build_request(facets) 419 + response = _post_facets(env["client"], env["key"], env["key_prefix"], metadata, file_map) 420 + 421 + assert response.status_code == 200 422 + assert response.get_json()["merged"] == 1 423 + assert _read_json(target_path) == { 424 + "description": "Keep target", 425 + "last_seen": 999, 426 + "attached_at": 200, 427 + "entity_id": "same_entity", 428 + } 429 + 430 + 431 + def test_existing_facet_merge_observations(ingest_env): 432 + env = ingest_env 433 + target_path = ( 434 + env["root"] / "facets" / "work" / "entities" / "same_entity" / "observations.jsonl" 435 + ) 436 + _write_jsonl( 437 + target_path, 438 + [ 439 + {"content": "Likes tea", "observed_at": 1}, 440 + {"content": "Prefers email", "observed_at": 2}, 441 + ], 442 + ) 443 + 444 + facets = [ 445 + { 446 + "name": "work", 447 + "files": [ 448 + { 449 + "path": "entities/same_entity/observations.jsonl", 450 + "type": "entity_observations", 451 + "content": _jsonl_bytes( 452 + [ 453 + {"content": "Likes tea", "observed_at": 1}, 454 + {"content": "Uses vim", "observed_at": 3}, 455 + ] 456 + ), 457 + } 458 + ], 459 + } 460 + ] 461 + metadata, file_map = _build_request(facets) 462 + response = _post_facets(env["client"], env["key"], env["key_prefix"], metadata, file_map) 463 + 464 + assert response.status_code == 200 465 + assert response.get_json()["merged"] == 1 466 + assert _read_jsonl_file(target_path) == [ 467 + {"content": "Likes tea", "observed_at": 1}, 468 + {"content": "Prefers email", "observed_at": 2}, 469 + {"content": "Uses vim", "observed_at": 3}, 470 + ] 471 + 472 + 473 + def test_existing_facet_merge_detected_entities(ingest_env): 474 + env = ingest_env 475 + target_path = env["root"] / "facets" / "work" / "entities" / "20260305.jsonl" 476 + _write_jsonl(target_path, [{"id": "same_entity", "name": "Same", "type": "Person"}]) 477 + 478 + facets = [ 479 + { 480 + "name": "work", 481 + "files": [ 482 + { 483 + "path": "entities/20260305.jsonl", 484 + "type": "detected_entities", 485 + "content": _jsonl_bytes( 486 + [ 487 + {"id": "same_entity", "name": "Same", "type": "Person"}, 488 + {"id": "source_entity", "name": "Source", "type": "Person"}, 489 + ] 490 + ), 491 + } 492 + ], 493 + } 494 + ] 495 + metadata, file_map = _build_request(facets) 496 + response = _post_facets(env["client"], env["key"], env["key_prefix"], metadata, file_map) 497 + 498 + assert response.status_code == 200 499 + assert response.get_json()["merged"] == 1 500 + items = _read_jsonl_file(target_path) 501 + assert [item["id"] for item in items] == ["same_entity", "target_entity"] 502 + 503 + 504 + def test_existing_facet_merge_activity_config(ingest_env): 505 + env = ingest_env 506 + target_path = env["root"] / "facets" / "work" / "activities" / "activities.jsonl" 507 + _write_jsonl(target_path, [{"id": "coding", "name": "Coding"}]) 508 + 509 + facets = [ 510 + { 511 + "name": "work", 512 + "files": [ 513 + { 514 + "path": "activities/activities.jsonl", 515 + "type": "activity_config", 516 + "content": _jsonl_bytes( 517 + [ 518 + {"id": "coding", "name": "Coding"}, 519 + {"id": "meeting", "name": "Meeting"}, 520 + ] 521 + ), 522 + } 523 + ], 524 + } 525 + ] 526 + metadata, file_map = _build_request(facets) 527 + response = _post_facets(env["client"], env["key"], env["key_prefix"], metadata, file_map) 528 + 529 + assert response.status_code == 200 530 + assert response.get_json()["merged"] == 1 531 + assert [item["id"] for item in _read_jsonl_file(target_path)] == ["coding", "meeting"] 532 + 533 + 534 + def test_existing_facet_merge_activity_records(ingest_env): 535 + env = ingest_env 536 + target_path = env["root"] / "facets" / "work" / "activities" / "20260305.jsonl" 537 + _write_jsonl( 538 + target_path, 539 + [{"id": "coding_1", "activity": "coding", "active_entities": ["same_entity"]}], 540 + ) 541 + 542 + facets = [ 543 + { 544 + "name": "work", 545 + "files": [ 546 + { 547 + "path": "activities/20260305.jsonl", 548 + "type": "activity_records", 549 + "content": _jsonl_bytes( 550 + [ 551 + {"id": "coding_1", "activity": "coding", "active_entities": ["same_entity"]}, 552 + {"id": "coding_2", "activity": "coding", "active_entities": ["source_entity"]}, 553 + ] 554 + ), 555 + } 556 + ], 557 + } 558 + ] 559 + metadata, file_map = _build_request(facets) 560 + response = _post_facets(env["client"], env["key"], env["key_prefix"], metadata, file_map) 561 + 562 + assert response.status_code == 200 563 + assert response.get_json()["merged"] == 1 564 + items = _read_jsonl_file(target_path) 565 + assert [item["id"] for item in items] == ["coding_1", "coding_2"] 566 + assert items[1]["active_entities"] == ["target_entity"] 567 + 568 + 569 + def test_existing_facet_merge_activity_output_skip(ingest_env): 570 + env = ingest_env 571 + target_file = ( 572 + env["root"] 573 + / "facets" 574 + / "work" 575 + / "activities" 576 + / "20260305" 577 + / "coding_093000_300" 578 + / "session_review.md" 579 + ) 580 + target_file.parent.mkdir(parents=True, exist_ok=True) 581 + target_file.write_text("existing\n", encoding="utf-8") 582 + 583 + facets = [ 584 + { 585 + "name": "work", 586 + "files": [ 587 + { 588 + "path": "activities/20260305/coding_093000_300/session_review.md", 589 + "type": "activity_output", 590 + "content": b"new\n", 591 + } 592 + ], 593 + } 594 + ] 595 + metadata, file_map = _build_request(facets) 596 + response = _post_facets(env["client"], env["key"], env["key_prefix"], metadata, file_map) 597 + 598 + assert response.status_code == 200 599 + assert response.get_json()["skipped"] == 1 600 + assert target_file.read_text(encoding="utf-8") == "existing\n" 601 + 602 + 603 + def test_existing_facet_merge_activity_output_copy(ingest_env): 604 + env = ingest_env 605 + (env["root"] / "facets" / "work").mkdir(parents=True, exist_ok=True) 606 + 607 + facets = [ 608 + { 609 + "name": "work", 610 + "files": [ 611 + { 612 + "path": "activities/20260305/coding_093000_300/session_review.md", 613 + "type": "activity_output", 614 + "content": b"copied\n", 615 + } 616 + ], 617 + } 618 + ] 619 + metadata, file_map = _build_request(facets) 620 + response = _post_facets(env["client"], env["key"], env["key_prefix"], metadata, file_map) 621 + 622 + target_file = ( 623 + env["root"] 624 + / "facets" 625 + / "work" 626 + / "activities" 627 + / "20260305" 628 + / "coding_093000_300" 629 + / "session_review.md" 630 + ) 631 + assert response.status_code == 200 632 + assert response.get_json()["merged"] == 1 633 + assert target_file.read_text(encoding="utf-8") == "copied\n" 634 + 635 + 636 + def test_existing_facet_merge_todos(ingest_env): 637 + env = ingest_env 638 + target_path = env["root"] / "facets" / "work" / "todos" / "20260305.jsonl" 639 + _write_jsonl(target_path, [{"text": "Ship it", "created_at": 1}]) 640 + 641 + facets = [ 642 + { 643 + "name": "work", 644 + "files": [ 645 + { 646 + "path": "todos/20260305.jsonl", 647 + "type": "todos", 648 + "content": _jsonl_bytes( 649 + [ 650 + {"text": "Ship it", "created_at": 1}, 651 + {"text": "Review PR", "created_at": 2}, 652 + ] 653 + ), 654 + } 655 + ], 656 + } 657 + ] 658 + metadata, file_map = _build_request(facets) 659 + response = _post_facets(env["client"], env["key"], env["key_prefix"], metadata, file_map) 660 + 661 + assert response.status_code == 200 662 + assert response.get_json()["merged"] == 1 663 + assert _read_jsonl_file(target_path) == [ 664 + {"text": "Ship it", "created_at": 1}, 665 + {"text": "Review PR", "created_at": 2}, 666 + ] 667 + 668 + 669 + def test_existing_facet_merge_calendar(ingest_env): 670 + env = ingest_env 671 + target_path = env["root"] / "facets" / "work" / "calendar" / "20260305.jsonl" 672 + _write_jsonl(target_path, [{"title": "Standup", "start": "09:00"}]) 673 + 674 + facets = [ 675 + { 676 + "name": "work", 677 + "files": [ 678 + { 679 + "path": "calendar/20260305.jsonl", 680 + "type": "calendar", 681 + "content": _jsonl_bytes( 682 + [ 683 + {"title": "Standup", "start": "09:00"}, 684 + {"title": "Demo", "start": "14:00"}, 685 + ] 686 + ), 687 + } 688 + ], 689 + } 690 + ] 691 + metadata, file_map = _build_request(facets) 692 + response = _post_facets(env["client"], env["key"], env["key_prefix"], metadata, file_map) 693 + 694 + assert response.status_code == 200 695 + assert response.get_json()["merged"] == 1 696 + assert _read_jsonl_file(target_path) == [ 697 + {"title": "Standup", "start": "09:00"}, 698 + {"title": "Demo", "start": "14:00"}, 699 + ] 700 + 701 + 702 + def test_existing_facet_merge_news_skip(ingest_env): 703 + env = ingest_env 704 + target_path = env["root"] / "facets" / "work" / "news" / "20260305.md" 705 + target_path.parent.mkdir(parents=True, exist_ok=True) 706 + target_path.write_text("existing\n", encoding="utf-8") 707 + 708 + facets = [ 709 + { 710 + "name": "work", 711 + "files": [ 712 + { 713 + "path": "news/20260305.md", 714 + "type": "news", 715 + "content": b"new\n", 716 + } 717 + ], 718 + } 719 + ] 720 + metadata, file_map = _build_request(facets) 721 + response = _post_facets(env["client"], env["key"], env["key_prefix"], metadata, file_map) 722 + 723 + assert response.status_code == 200 724 + assert response.get_json()["skipped"] == 1 725 + assert target_path.read_text(encoding="utf-8") == "existing\n" 726 + 727 + 728 + def test_existing_facet_merge_news_copy(ingest_env): 729 + env = ingest_env 730 + (env["root"] / "facets" / "work").mkdir(parents=True, exist_ok=True) 731 + 732 + facets = [ 733 + { 734 + "name": "work", 735 + "files": [ 736 + { 737 + "path": "news/20260305.md", 738 + "type": "news", 739 + "content": b"headline\n", 740 + } 741 + ], 742 + } 743 + ] 744 + metadata, file_map = _build_request(facets) 745 + response = _post_facets(env["client"], env["key"], env["key_prefix"], metadata, file_map) 746 + 747 + target_path = env["root"] / "facets" / "work" / "news" / "20260305.md" 748 + assert response.status_code == 200 749 + assert response.get_json()["merged"] == 1 750 + assert target_path.read_text(encoding="utf-8") == "headline\n" 751 + 752 + 753 + def test_existing_facet_merge_logs(ingest_env): 754 + env = ingest_env 755 + target_path = env["root"] / "facets" / "work" / "logs" / "20260305.jsonl" 756 + _write_jsonl(target_path, [{"event": "existing"}]) 757 + 758 + facets = [ 759 + { 760 + "name": "work", 761 + "files": [ 762 + { 763 + "path": "logs/20260305.jsonl", 764 + "type": "logs", 765 + "content": _jsonl_bytes([{"event": "new"}]), 766 + } 767 + ], 768 + } 769 + ] 770 + metadata, file_map = _build_request(facets) 771 + response = _post_facets(env["client"], env["key"], env["key_prefix"], metadata, file_map) 772 + 773 + assert response.status_code == 200 774 + assert response.get_json()["merged"] == 1 775 + assert _read_jsonl_file(target_path) == [{"event": "existing"}, {"event": "new"}] 776 + 777 + 778 + def test_entity_id_remapping(ingest_env): 779 + env = ingest_env 780 + facets = [ 781 + { 782 + "name": "work", 783 + "files": [ 784 + { 785 + "path": "entities/source_entity/entity.json", 786 + "type": "entity_relationship", 787 + "content": _json_bytes({"description": "Remapped"}), 788 + }, 789 + { 790 + "path": "entities/source_entity/observations.jsonl", 791 + "type": "entity_observations", 792 + "content": _jsonl_bytes([{"content": "Knows Rust", "observed_at": 1}]), 793 + }, 794 + { 795 + "path": "entities/20260305.jsonl", 796 + "type": "detected_entities", 797 + "content": _jsonl_bytes( 798 + [{"id": "source_entity", "name": "Source Entity", "type": "Person"}] 799 + ), 800 + }, 801 + { 802 + "path": "activities/20260305.jsonl", 803 + "type": "activity_records", 804 + "content": _jsonl_bytes( 805 + [ 806 + { 807 + "id": "coding_1", 808 + "activity": "coding", 809 + "active_entities": ["source_entity"], 810 + } 811 + ] 812 + ), 813 + }, 814 + ], 815 + } 816 + ] 817 + metadata, file_map = _build_request(facets) 818 + response = _post_facets(env["client"], env["key"], env["key_prefix"], metadata, file_map) 819 + 820 + facet_root = env["root"] / "facets" / "work" 821 + assert response.status_code == 200 822 + assert response.get_json()["created"] == 4 823 + assert (facet_root / "entities" / "target_entity" / "entity.json").exists() 824 + assert (facet_root / "entities" / "target_entity" / "observations.jsonl").exists() 825 + assert not (facet_root / "entities" / "source_entity").exists() 826 + assert _read_jsonl_file(facet_root / "entities" / "20260305.jsonl")[0]["id"] == "target_entity" 827 + assert _read_jsonl_file(facet_root / "activities" / "20260305.jsonl")[0]["active_entities"] == [ 828 + "target_entity" 829 + ] 830 + 831 + 832 + def test_unmapped_entity_staging(ingest_env): 833 + env = ingest_env 834 + content = _json_bytes({"description": "Unknown"}) 835 + facets = [ 836 + { 837 + "name": "work", 838 + "files": [ 839 + { 840 + "path": "entities/unknown/entity.json", 841 + "type": "entity_relationship", 842 + "content": content, 843 + } 844 + ], 845 + } 846 + ] 847 + metadata, file_map = _build_request(facets) 848 + response = _post_facets(env["client"], env["key"], env["key_prefix"], metadata, file_map) 849 + 850 + assert response.status_code == 200 851 + assert response.get_json() == { 852 + "created": 0, 853 + "merged": 0, 854 + "skipped": 0, 855 + "staged": 1, 856 + "errors": [], 857 + } 858 + staged = _read_staged(env["key_prefix"], "work", "entity_relationship", "entities/unknown/entity.json") 859 + assert staged["reason"] == "unmapped_entity" 860 + assert staged["source_entity_id"] == "unknown" 861 + assert staged["source_path"] == "entities/unknown/entity.json" 862 + assert "unknown" in staged["explanation"] 863 + 864 + 865 + def test_staged_then_retry(ingest_env): 866 + env = ingest_env 867 + facets = [ 868 + { 869 + "name": "work", 870 + "files": [ 871 + { 872 + "path": "entities/unknown/entity.json", 873 + "type": "entity_relationship", 874 + "content": _json_bytes({"description": "test"}), 875 + } 876 + ], 877 + } 878 + ] 879 + metadata, file_map = _build_request(facets) 880 + first = _post_facets(env["client"], env["key"], env["key_prefix"], metadata, file_map) 881 + 882 + assert first.status_code == 200 883 + assert first.get_json()["staged"] == 1 884 + 885 + state_path = get_state_directory(env["key_prefix"]) / "entities" / "state.json" 886 + entity_state = json.loads(state_path.read_text(encoding="utf-8")) 887 + entity_state["id_map"]["unknown"] = "unknown" 888 + state_path.write_text(json.dumps(entity_state), encoding="utf-8") 889 + 890 + metadata, file_map = _build_request(facets) 891 + second = _post_facets(env["client"], env["key"], env["key_prefix"], metadata, file_map) 892 + 893 + assert second.status_code == 200 894 + body = second.get_json() 895 + assert body["staged"] == 0 896 + assert body["skipped"] == 0 897 + assert body["created"] == 1 or body["merged"] == 1 898 + 899 + 900 + def test_facet_json_conflict_staging(ingest_env): 901 + env = ingest_env 902 + target_path = env["root"] / "facets" / "work" / "facet.json" 903 + _write_json(target_path, {"title": "Current"}) 904 + 905 + facets = [ 906 + { 907 + "name": "work", 908 + "files": [ 909 + { 910 + "path": "facet.json", 911 + "type": "facet_json", 912 + "content": _json_bytes({"title": "Incoming"}), 913 + } 914 + ], 915 + } 916 + ] 917 + metadata, file_map = _build_request(facets) 918 + response = _post_facets(env["client"], env["key"], env["key_prefix"], metadata, file_map) 919 + 920 + assert response.status_code == 200 921 + assert response.get_json()["staged"] == 1 922 + staged = _read_staged(env["key_prefix"], "work", "facet_json", "facet.json") 923 + assert staged["reason"] == "facet_json_conflict" 924 + assert staged["source_content"] == {"title": "Incoming"} 925 + assert staged["target_content"] == {"title": "Current"} 926 + 927 + 928 + def test_idempotent(ingest_env): 929 + env = ingest_env 930 + facets = [ 931 + { 932 + "name": "work", 933 + "files": [ 934 + {"path": "news/20260305.md", "type": "news", "content": b"repeat\n"} 935 + ], 936 + } 937 + ] 938 + metadata, file_map = _build_request(facets) 939 + first = _post_facets(env["client"], env["key"], env["key_prefix"], metadata, file_map) 940 + 941 + metadata, file_map = _build_request(facets) 942 + second = _post_facets(env["client"], env["key"], env["key_prefix"], metadata, file_map) 943 + 944 + assert first.status_code == 200 945 + assert second.status_code == 200 946 + assert first.get_json()["created"] == 1 947 + assert second.get_json() == { 948 + "created": 0, 949 + "merged": 0, 950 + "skipped": 1, 951 + "staged": 0, 952 + "errors": [], 953 + } 954 + 955 + 956 + def test_error_isolation(ingest_env): 957 + env = ingest_env 958 + (env["root"] / "facets" / "work").mkdir(parents=True, exist_ok=True) 959 + facets = [ 960 + { 961 + "name": "work", 962 + "files": [ 963 + { 964 + "path": "entities/20260305.jsonl", 965 + "type": "detected_entities", 966 + "content": b"{bad json\n", 967 + }, 968 + {"path": "news/20260305.md", "type": "news", "content": b"good\n"}, 969 + ], 970 + } 971 + ] 972 + metadata, file_map = _build_request(facets) 973 + response = _post_facets(env["client"], env["key"], env["key_prefix"], metadata, file_map) 974 + 975 + assert response.status_code == 200 976 + body = response.get_json() 977 + assert body["merged"] == 1 978 + assert len(body["errors"]) == 1 979 + assert (env["root"] / "facets" / "work" / "news" / "20260305.md").exists() 980 + 981 + 982 + def test_error_isolation_across_facets(ingest_env): 983 + env = ingest_env 984 + facets = [ 985 + { 986 + "name": "broken", 987 + "files": [ 988 + { 989 + "path": "activities/activities.jsonl", 990 + "type": "activity_config", 991 + "content": b"{bad json\n", 992 + } 993 + ], 994 + }, 995 + { 996 + "name": "good", 997 + "files": [ 998 + {"path": "news/20260305.md", "type": "news", "content": b"ok\n"} 999 + ], 1000 + }, 1001 + ] 1002 + metadata, file_map = _build_request(facets) 1003 + response = _post_facets(env["client"], env["key"], env["key_prefix"], metadata, file_map) 1004 + 1005 + assert response.status_code == 200 1006 + body = response.get_json() 1007 + assert body["created"] == 1 1008 + assert len(body["errors"]) == 1 1009 + assert (env["root"] / "facets" / "good" / "news" / "20260305.md").exists() 1010 + 1011 + 1012 + def test_stats_update(ingest_env): 1013 + env = ingest_env 1014 + facets = [ 1015 + { 1016 + "name": "alpha", 1017 + "files": [ 1018 + {"path": "news/20260305.md", "type": "news", "content": b"alpha\n"}, 1019 + { 1020 + "path": "logs/20260305.jsonl", 1021 + "type": "logs", 1022 + "content": _jsonl_bytes([{"event": "alpha"}]), 1023 + }, 1024 + ], 1025 + }, 1026 + { 1027 + "name": "beta", 1028 + "files": [ 1029 + {"path": "news/20260305.md", "type": "news", "content": b"beta\n"} 1030 + ], 1031 + }, 1032 + { 1033 + "name": "gamma", 1034 + "files": [ 1035 + { 1036 + "path": "entities/unknown/entity.json", 1037 + "type": "entity_relationship", 1038 + "content": _json_bytes({"description": "unknown"}), 1039 + } 1040 + ], 1041 + }, 1042 + ] 1043 + metadata, file_map = _build_request(facets) 1044 + response = _post_facets(env["client"], env["key"], env["key_prefix"], metadata, file_map) 1045 + source = load_journal_source(env["key"]) 1046 + 1047 + assert response.status_code == 200 1048 + assert source["stats"]["facets_received"] == 2 1049 + 1050 + 1051 + def test_state_manifest(ingest_env): 1052 + env = ingest_env 1053 + news = b"news\n" 1054 + logs = _jsonl_bytes([{"event": "log"}]) 1055 + facets = [ 1056 + { 1057 + "name": "work", 1058 + "files": [ 1059 + {"path": "news/20260305.md", "type": "news", "content": news}, 1060 + {"path": "logs/20260305.jsonl", "type": "logs", "content": logs}, 1061 + ], 1062 + } 1063 + ] 1064 + metadata, file_map = _build_request(facets) 1065 + response = _post_facets(env["client"], env["key"], env["key_prefix"], metadata, file_map) 1066 + 1067 + assert response.status_code == 200 1068 + assert _read_state(env["key_prefix"]) == { 1069 + "received": { 1070 + "work/news/20260305.md": _hash_bytes(news), 1071 + "work/logs/20260305.jsonl": _hash_bytes(logs), 1072 + } 1073 + } 1074 + 1075 + 1076 + def test_decision_log(ingest_env): 1077 + env = ingest_env 1078 + facets = [ 1079 + { 1080 + "name": "work", 1081 + "files": [ 1082 + {"path": "news/20260305.md", "type": "news", "content": b"headline\n"} 1083 + ], 1084 + } 1085 + ] 1086 + metadata, file_map = _build_request(facets) 1087 + response = _post_facets(env["client"], env["key"], env["key_prefix"], metadata, file_map) 1088 + 1089 + assert response.status_code == 200 1090 + entries = _read_log(env["key_prefix"]) 1091 + assert len(entries) == 1 1092 + entry = entries[0] 1093 + assert "ts" in entry 1094 + assert entry["action"] == "facet_file_created" 1095 + assert entry["item_type"] == "news" 1096 + assert entry["item_id"] == "work/news/20260305.md" 1097 + assert entry["facet"] == "work" 1098 + assert entry["reason"] == "new_facet" 1099 + 1100 + 1101 + def test_error_logged_to_decision_log(ingest_env): 1102 + env = ingest_env 1103 + facets = [ 1104 + { 1105 + "name": "work", 1106 + "files": [ 1107 + { 1108 + "path": "todos/20260305.jsonl", 1109 + "type": "todos", 1110 + "content": b"{bad\n", 1111 + } 1112 + ], 1113 + } 1114 + ] 1115 + metadata, file_map = _build_request(facets) 1116 + response = _post_facets(env["client"], env["key"], env["key_prefix"], metadata, file_map) 1117 + 1118 + assert response.status_code == 200 1119 + assert len(response.get_json()["errors"]) == 1 1120 + 1121 + entries = _read_log(env["key_prefix"]) 1122 + assert len(entries) == 1 1123 + assert entries[0]["action"] == "facet_file_error" 1124 + assert entries[0]["facet"] == "work"