personal memory agent
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

Add POST entity ingest endpoint with tiered merge

Add the journal source entity ingest route in apps/import/ingest.py with tiered matching, idempotency tracking, staging for ambiguous inputs, and source stats updates.

Add tests/test_entity_ingest.py covering high-confidence merges, staging paths, auto-create, idempotency, auth, state, and response behavior.

+827 -1
+301 -1
apps/import/ingest.py
# This route depends on module-level imports that ship with it: ``hashlib``;
# ``EntityDict`` and ``entity_slug`` from ``think.entities.core``;
# ``has_journal_principal``, ``load_all_journal_entities`` and
# ``save_journal_entity`` from ``think.entities.journal``; and
# ``find_matching_entity`` from ``think.entities.matching``.


def _load_entity_state(state_path) -> dict:
    """Read the per-source entity state, falling back to an empty manifest.

    Args:
        state_path: Path-like object exposing ``read_text``.

    Returns:
        A fresh ``{"id_map": {...}, "received": {...}}`` dict. A missing,
        unreadable, undecodable, or differently shaped file yields empty maps.
    """
    try:
        loaded = json.loads(state_path.read_text(encoding="utf-8"))
    except (OSError, ValueError):
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError alike.
        loaded = None
    if isinstance(loaded, dict):
        id_map = loaded.get("id_map")
        received = loaded.get("received")
        if isinstance(id_map, dict) and isinstance(received, dict):
            return {"id_map": dict(id_map), "received": dict(received)}
    return {"id_map": {}, "received": {}}


def _is_plain_file_component(value: str) -> bool:
    """Return True when *value* is safe to use as a single file-name component."""
    if value in {".", ".."}:
        return False
    return not any(sep in value for sep in ("/", "\\", "\x00"))


def _union_case_insensitive(*value_lists) -> list[str]:
    """Join list arguments in order, dropping falsy items and case-insensitive repeats.

    Non-list arguments are ignored; surviving items are converted with ``str``.
    """
    seen: set[str] = set()
    merged: list[str] = []
    for values in value_lists:
        if not isinstance(values, list):
            continue
        for value in values:
            if not value:
                continue
            text = str(value)
            folded = text.lower()
            if folded not in seen:
                seen.add(folded)
                merged.append(text)
    return merged


def _merge_entity(existing: dict, incoming: dict) -> dict:
    """Return a copy of *existing* enriched with aka/emails/created_at from *incoming*.

    ``aka`` becomes the case-insensitive union sorted case-insensitively,
    ``emails`` the case-insensitive union in first-seen order, and
    ``created_at`` the smaller of the two values when both are present.
    """
    merged: EntityDict = dict(existing)

    akas = _union_case_insensitive(merged.get("aka", []), incoming.get("aka", []))
    if akas:
        merged["aka"] = sorted(akas, key=str.lower)

    emails = _union_case_insensitive(
        merged.get("emails", []), incoming.get("emails", [])
    )
    if emails:
        merged["emails"] = emails

    source_created = incoming.get("created_at")
    target_created = merged.get("created_at")
    if source_created is not None and target_created is not None:
        merged["created_at"] = min(source_created, target_created)
    elif source_created is not None:
        merged["created_at"] = source_created
    return merged


def _decision_record(
    action: str,
    source_id: str,
    entity_data: dict,
    reason: str,
    *,
    match_tier=None,
    target=None,
    fields_changed=(),
) -> dict:
    """Build one decision-log entry for an ingested entity."""
    return {
        "ts": datetime.now(timezone.utc).isoformat(),
        "action": action,
        "item_type": "entity",
        "item_id": source_id,
        "match_tier": match_tier,
        "reason": reason,
        "source": entity_data,
        "target": target,
        "fields_changed": list(fields_changed),
    }


def _write_staged_entity(
    staged_dir, source_id: str, entity_data: dict, reason: str, candidates: list
) -> None:
    """Write ``<staged_dir>/<source_id>.json`` describing an entity held for review."""
    staged_dir.mkdir(parents=True, exist_ok=True)
    staged_payload = {
        "source_entity": entity_data,
        "match_candidates": candidates,
        "reason": reason,
        "staged_at": datetime.now(timezone.utc).isoformat(),
    }
    (staged_dir / f"{source_id}.json").write_text(
        json.dumps(staged_payload, indent=2, ensure_ascii=False) + "\n",
        encoding="utf-8",
    )


@bp.route("/journal/<key_prefix>/ingest/entities", methods=["POST"])
@require_journal_source
def ingest_entities(key_prefix: str):
    """Ingest a batch of entities from a journal source.

    Expects a JSON object with an ``entities`` array. Each entity is handled
    independently and ends in exactly one outcome:

    * ``skipped`` - its content hash equals the one recorded for its id.
    * ``auto_merged`` - a high-confidence match exists; aka, emails and
      created_at are merged into the matched entity, which is saved.
    * ``staged`` - written to the source's ``entities/staged`` directory
      because the match is low confidence, the id collides with an unrelated
      existing entity, or it claims to be principal while one already exists.
    * ``created`` - no match and no conflict; saved as a new entity.
    * an ``errors`` entry - validation or processing raised; the remaining
      entities are still processed.

    Returns:
        JSON with ``auto_merged``, ``created``, ``staged``, ``skipped`` counts
        and an ``errors`` list, or a 400 JSON error for a malformed body.
        Aborts with 403 when the URL prefix does not match the caller's key.
    """
    if g.journal_source["key"][:8] != key_prefix:
        abort(403, description="Key prefix mismatch")

    payload = request.get_json(silent=True)
    if not isinstance(payload, dict):
        return jsonify({"error": "Invalid JSON body"}), 400

    entities = payload.get("entities")
    if not isinstance(entities, list):
        return jsonify({"error": "Missing entities array"}), 400

    entities_dir = get_state_directory(key_prefix) / "entities"
    log_path = entities_dir / "log.jsonl"
    state_path = entities_dir / "state.json"
    staged_dir = entities_dir / "staged"

    entity_state = _load_entity_state(state_path)
    target_entities = load_all_journal_entities()
    target_has_principal = has_journal_principal()

    auto_merged = created = staged = skipped = 0
    errors: list[dict[str, str]] = []

    for entity_data in entities:
        # One bad entity must not abort the batch: any failure below is
        # reported in ``errors`` and processing moves on to the next item.
        try:
            if not isinstance(entity_data, dict):
                raise ValueError("Entity data must be an object")

            name = str(entity_data.get("name", "")).strip()
            if not name:
                raise ValueError("Entity name is required")

            source_id = str(entity_data.get("id") or entity_slug(name))
            if not source_id:
                raise ValueError("Entity id is required")
            # The id comes from the request body and is used below as a file
            # name, so refuse anything that could escape the staging directory.
            if not _is_plain_file_component(source_id):
                raise ValueError("Entity id must not contain path separators")
            entity_data["id"] = source_id

            content_hash = hashlib.sha256(
                json.dumps(entity_data, sort_keys=True, ensure_ascii=False).encode()
            ).hexdigest()

            if entity_state["received"].get(source_id) == content_hash:
                skipped += 1
                _append_decision(
                    log_path,
                    _decision_record("skipped", source_id, entity_data, "idempotent"),
                )
                continue

            match = find_matching_entity(
                entity_data["name"], list(target_entities.values())
            )

            # Set by the branches that hold the entity back for review.
            stage_reason = None
            candidates: list[dict] = []
            match_tier = None

            if match is not None and match.is_high_confidence:
                target_id = str(match["id"])
                before = target_entities[target_id]
                merged = _merge_entity(before, entity_data)

                save_journal_entity(merged)
                target_entities[target_id] = merged
                fields_changed = sorted(
                    key
                    for key in set(before) | set(merged)
                    if before.get(key) != merged.get(key)
                )
                entity_state["id_map"][source_id] = target_id
                auto_merged += 1
                _append_decision(
                    log_path,
                    _decision_record(
                        "auto_merged",
                        source_id,
                        entity_data,
                        "high_confidence_match",
                        match_tier=int(match.tier),
                        target=merged,
                        fields_changed=fields_changed,
                    ),
                )
            elif match is not None:
                match_tier = int(match.tier)
                stage_reason = "low_confidence_match"
                candidates = [
                    {"id": match["id"], "name": match["name"], "tier": match_tier}
                ]
            elif source_id in target_entities:
                stage_reason = "id_collision"
                candidates = [
                    {
                        "id": source_id,
                        "name": target_entities[source_id]["name"],
                        "tier": None,
                    }
                ]
            elif entity_data.get("is_principal") and target_has_principal:
                stage_reason = "principal_conflict"
            else:
                save_journal_entity(entity_data)
                target_entities[source_id] = entity_data
                if entity_data.get("is_principal"):
                    # Later principals in the same batch must be staged.
                    target_has_principal = True
                entity_state["id_map"][source_id] = source_id
                created += 1
                _append_decision(
                    log_path,
                    _decision_record("created", source_id, entity_data, "no_match"),
                )

            if stage_reason is not None:
                _write_staged_entity(
                    staged_dir, source_id, entity_data, stage_reason, candidates
                )
                staged += 1
                _append_decision(
                    log_path,
                    _decision_record(
                        "staged",
                        source_id,
                        entity_data,
                        stage_reason,
                        match_tier=match_tier,
                    ),
                )

            # Remember the hash so an identical re-send is skipped next time
            # (this includes staged entities).
            entity_state["received"][source_id] = content_hash
        except Exception as exc:
            entity_id = entity_data.get("id", "") if isinstance(entity_data, dict) else ""
            errors.append({"entity_id": entity_id, "error": str(exc)})

    _write_state_atomic(state_path, entity_state)

    # Only entities actually written to the journal count towards the stats.
    written = auto_merged + created
    if written > 0:
        source = g.journal_source
        source.setdefault("stats", {})
        source["stats"]["entities_received"] = (
            source["stats"].get("entities_received", 0) + written
        )
        save_journal_source(source)

    return jsonify(
        {
            "auto_merged": auto_merged,
            "created": created,
            "staged": staged,
            "skipped": skipped,
            "errors": errors,
        }
    )
+526
tests/test_entity_ingest.py
··· 1 + # SPDX-License-Identifier: AGPL-3.0-only 2 + # Copyright (c) 2026 sol pbc 3 + 4 + from __future__ import annotations 5 + 6 + import hashlib 7 + import json 8 + from importlib import import_module 9 + 10 + import pytest 11 + from flask import Blueprint, Flask 12 + 13 + import convey.state 14 + from think.entities.core import entity_slug 15 + from think.entities.journal import ( 16 + load_all_journal_entities, 17 + load_journal_entity, 18 + save_journal_entity, 19 + ) 20 + 21 + journal_sources = import_module("apps.import.journal_sources") 22 + ingest = import_module("apps.import.ingest") 23 + 24 + create_state_directory = journal_sources.create_state_directory 25 + generate_key = journal_sources.generate_key 26 + get_state_directory = journal_sources.get_state_directory 27 + load_journal_source = journal_sources.load_journal_source 28 + save_journal_source = journal_sources.save_journal_source 29 + register_ingest_routes = ingest.register_ingest_routes 30 + 31 + 32 + @pytest.fixture 33 + def journal_env(tmp_path, monkeypatch): 34 + monkeypatch.setattr(convey.state, "journal_root", str(tmp_path), raising=False) 35 + monkeypatch.setenv("_SOLSTONE_JOURNAL_OVERRIDE", str(tmp_path)) 36 + (tmp_path / "apps" / "import" / "journal_sources").mkdir( 37 + parents=True, exist_ok=True 38 + ) 39 + return tmp_path 40 + 41 + 42 + def _source(name="test-source", key=None, **overrides): 43 + if key is None: 44 + key = generate_key() 45 + source = { 46 + "name": name, 47 + "key": key, 48 + "created_at": 1000, 49 + "enabled": True, 50 + "revoked": False, 51 + "revoked_at": None, 52 + "stats": { 53 + "segments_received": 0, 54 + "entities_received": 0, 55 + "facets_received": 0, 56 + "imports_received": 0, 57 + "config_received": 0, 58 + }, 59 + } 60 + source.update(overrides) 61 + return source 62 + 63 + 64 + @pytest.fixture 65 + def ingest_env(journal_env): 66 + key = generate_key() 67 + source = _source(key=key) 68 + save_journal_source(source) 69 + key_prefix = key[:8] 70 + 
create_state_directory(journal_env, key_prefix) 71 + 72 + app = Flask(__name__) 73 + app.config["TESTING"] = True 74 + bp = Blueprint("import-test", __name__, url_prefix="/app/import") 75 + register_ingest_routes(bp) 76 + app.register_blueprint(bp) 77 + 78 + return { 79 + "root": journal_env, 80 + "key": key, 81 + "key_prefix": key_prefix, 82 + "source": source, 83 + "client": app.test_client(), 84 + } 85 + 86 + 87 + def _post_entities(client, key, key_prefix, entities): 88 + return client.post( 89 + f"/app/import/journal/{key_prefix}/ingest/entities", 90 + headers={"Authorization": f"Bearer {key}"}, 91 + json={"entities": entities}, 92 + ) 93 + 94 + 95 + def _read_state(key_prefix: str) -> dict: 96 + state_path = get_state_directory(key_prefix) / "entities" / "state.json" 97 + return json.loads(state_path.read_text(encoding="utf-8")) 98 + 99 + 100 + def _read_log(key_prefix: str) -> list[dict]: 101 + log_path = get_state_directory(key_prefix) / "entities" / "log.jsonl" 102 + if not log_path.exists(): 103 + return [] 104 + return [ 105 + json.loads(line) 106 + for line in log_path.read_text(encoding="utf-8").splitlines() 107 + if line.strip() 108 + ] 109 + 110 + 111 + def _read_staged(key_prefix: str, entity_id: str) -> dict: 112 + staged_path = ( 113 + get_state_directory(key_prefix) / "entities" / "staged" / f"{entity_id}.json" 114 + ) 115 + return json.loads(staged_path.read_text(encoding="utf-8")) 116 + 117 + 118 + def _entity_hash(entity: dict) -> str: 119 + return hashlib.sha256( 120 + json.dumps(entity, sort_keys=True, ensure_ascii=False).encode() 121 + ).hexdigest() 122 + 123 + 124 + def test_auto_merge_exact(ingest_env): 125 + env = ingest_env 126 + save_journal_entity( 127 + { 128 + "id": "alice_johnson", 129 + "name": "Alice Johnson", 130 + "type": "Person", 131 + "aka": ["Ali"], 132 + "emails": ["alice@old.com"], 133 + "created_at": 2000, 134 + } 135 + ) 136 + 137 + source = { 138 + "name": "Alice Johnson", 139 + "type": "Person", 140 + "aka": ["AJ"], 
141 + "emails": ["alice@new.com"], 142 + "created_at": 1000, 143 + } 144 + 145 + response = _post_entities(env["client"], env["key"], env["key_prefix"], [source]) 146 + 147 + assert response.status_code == 200 148 + assert response.get_json() == { 149 + "auto_merged": 1, 150 + "created": 0, 151 + "staged": 0, 152 + "skipped": 0, 153 + "errors": [], 154 + } 155 + 156 + merged = load_journal_entity("alice_johnson") 157 + assert merged is not None 158 + assert merged["aka"] == ["AJ", "Ali"] 159 + assert merged["emails"] == ["alice@old.com", "alice@new.com"] 160 + assert merged["created_at"] == 1000 161 + 162 + state = _read_state(env["key_prefix"]) 163 + assert state["id_map"] == {"alice_johnson": "alice_johnson"} 164 + assert state["received"]["alice_johnson"] == _entity_hash( 165 + {**source, "id": "alice_johnson"} 166 + ) 167 + 168 + log_entries = _read_log(env["key_prefix"]) 169 + assert len(log_entries) == 1 170 + entry = log_entries[0] 171 + assert "ts" in entry 172 + assert entry["action"] == "auto_merged" 173 + assert entry["item_type"] == "entity" 174 + assert entry["item_id"] == "alice_johnson" 175 + assert entry["match_tier"] == 1 176 + assert entry["reason"] == "high_confidence_match" 177 + assert entry["source"]["name"] == "Alice Johnson" 178 + assert entry["target"]["id"] == "alice_johnson" 179 + assert entry["target"]["aka"] == ["AJ", "Ali"] 180 + assert isinstance(entry["fields_changed"], list) 181 + 182 + 183 + def test_auto_merge_case_insensitive(ingest_env): 184 + env = ingest_env 185 + save_journal_entity( 186 + {"id": "alice_johnson", "name": "alice johnson", "type": "Person"} 187 + ) 188 + 189 + response = _post_entities( 190 + env["client"], 191 + env["key"], 192 + env["key_prefix"], 193 + [{"name": "Alice Johnson", "type": "Person"}], 194 + ) 195 + 196 + assert response.status_code == 200 197 + assert response.get_json()["auto_merged"] == 1 198 + 199 + log_entries = _read_log(env["key_prefix"]) 200 + assert log_entries[0]["match_tier"] == 2 201 
+ 202 + 203 + def test_auto_merge_email_match(ingest_env): 204 + env = ingest_env 205 + save_journal_entity( 206 + { 207 + "id": "alice_contact", 208 + "name": "Alice Contact", 209 + "type": "Person", 210 + "emails": ["alice@example.com"], 211 + } 212 + ) 213 + 214 + response = _post_entities( 215 + env["client"], 216 + env["key"], 217 + env["key_prefix"], 218 + [{"name": "alice@example.com", "type": "Person"}], 219 + ) 220 + 221 + assert response.status_code == 200 222 + assert response.get_json()["auto_merged"] == 1 223 + 224 + log_entries = _read_log(env["key_prefix"]) 225 + assert log_entries[0]["match_tier"] == 3 226 + 227 + 228 + def test_auto_merge_slug_match(ingest_env): 229 + env = ingest_env 230 + save_journal_entity({"id": "alice_johnson", "name": "AJ", "type": "Person"}) 231 + 232 + response = _post_entities( 233 + env["client"], 234 + env["key"], 235 + env["key_prefix"], 236 + [{"name": "Alice Johnson", "type": "Person"}], 237 + ) 238 + 239 + assert response.status_code == 200 240 + assert response.get_json()["auto_merged"] == 1 241 + 242 + log_entries = _read_log(env["key_prefix"]) 243 + assert log_entries[0]["match_tier"] == 4 244 + 245 + 246 + def test_stage_low_confidence(ingest_env): 247 + env = ingest_env 248 + save_journal_entity( 249 + {"id": "alice_johnson", "name": "Alice Johnson", "type": "Person"} 250 + ) 251 + 252 + source = {"name": "Alce Jonson", "type": "Person"} 253 + response = _post_entities(env["client"], env["key"], env["key_prefix"], [source]) 254 + 255 + assert response.status_code == 200 256 + assert response.get_json() == { 257 + "auto_merged": 0, 258 + "created": 0, 259 + "staged": 1, 260 + "skipped": 0, 261 + "errors": [], 262 + } 263 + 264 + staged = _read_staged(env["key_prefix"], "alce_jonson") 265 + assert staged["reason"] == "low_confidence_match" 266 + assert staged["match_candidates"] == [ 267 + {"id": "alice_johnson", "name": "Alice Johnson", "tier": 8} 268 + ] 269 + state = _read_state(env["key_prefix"]) 270 + assert 
"alce_jonson" not in state["id_map"] 271 + assert "alce_jonson" in state["received"] 272 + 273 + 274 + def test_stage_id_collision(ingest_env): 275 + env = ingest_env 276 + save_journal_entity({"id": "test", "name": "Test Entity", "type": "Tool"}) 277 + 278 + source = { 279 + "id": "test", 280 + "name": "Completely Different Name", 281 + "type": "Person", 282 + } 283 + response = _post_entities(env["client"], env["key"], env["key_prefix"], [source]) 284 + 285 + assert response.status_code == 200 286 + assert response.get_json()["staged"] == 1 287 + 288 + staged = _read_staged(env["key_prefix"], "test") 289 + assert staged["reason"] == "id_collision" 290 + assert staged["match_candidates"] == [ 291 + {"id": "test", "name": "Test Entity", "tier": None} 292 + ] 293 + state = _read_state(env["key_prefix"]) 294 + assert "test" not in state["id_map"] 295 + assert "test" in state["received"] 296 + 297 + 298 + def test_stage_principal_conflict(ingest_env): 299 + env = ingest_env 300 + save_journal_entity( 301 + { 302 + "id": "existing_principal", 303 + "name": "Existing Principal", 304 + "type": "Person", 305 + "is_principal": True, 306 + } 307 + ) 308 + 309 + source = {"name": "New Principal", "type": "Person", "is_principal": True} 310 + response = _post_entities(env["client"], env["key"], env["key_prefix"], [source]) 311 + 312 + assert response.status_code == 200 313 + assert response.get_json()["staged"] == 1 314 + 315 + staged = _read_staged(env["key_prefix"], "new_principal") 316 + assert staged["reason"] == "principal_conflict" 317 + assert staged["match_candidates"] == [] 318 + state = _read_state(env["key_prefix"]) 319 + assert "new_principal" not in state["id_map"] 320 + assert "new_principal" in state["received"] 321 + 322 + 323 + def test_auto_create(ingest_env): 324 + env = ingest_env 325 + source = {"name": "Fresh Entity", "type": "Tool"} 326 + 327 + response = _post_entities(env["client"], env["key"], env["key_prefix"], [source]) 328 + 329 + assert 
response.status_code == 200 330 + assert response.get_json()["created"] == 1 331 + assert load_journal_entity("fresh_entity") is not None 332 + 333 + state = _read_state(env["key_prefix"]) 334 + assert state["id_map"] == {"fresh_entity": "fresh_entity"} 335 + 336 + 337 + def test_idempotent(ingest_env): 338 + env = ingest_env 339 + source = {"name": "Repeat Entity", "type": "Tool"} 340 + 341 + first = _post_entities(env["client"], env["key"], env["key_prefix"], [source]) 342 + second = _post_entities(env["client"], env["key"], env["key_prefix"], [source]) 343 + 344 + assert first.status_code == 200 345 + assert second.status_code == 200 346 + assert first.get_json()["created"] == 1 347 + assert second.get_json() == { 348 + "auto_merged": 0, 349 + "created": 0, 350 + "staged": 0, 351 + "skipped": 1, 352 + "errors": [], 353 + } 354 + 355 + 356 + def test_idempotent_content_change(ingest_env): 357 + env = ingest_env 358 + first = {"name": "Mutable Entity", "type": "Tool"} 359 + second = {"name": "Mutable Entity", "type": "Tool", "aka": ["Mut"]} 360 + 361 + initial = _post_entities(env["client"], env["key"], env["key_prefix"], [first]) 362 + updated = _post_entities(env["client"], env["key"], env["key_prefix"], [second]) 363 + 364 + assert initial.status_code == 200 365 + assert updated.status_code == 200 366 + assert updated.get_json()["created"] == 0 367 + assert updated.get_json()["auto_merged"] == 1 368 + assert updated.get_json()["skipped"] == 0 369 + state = _read_state(env["key_prefix"]) 370 + expected_second_entity = {**second, "id": "mutable_entity"} 371 + assert state["received"]["mutable_entity"] == _entity_hash(expected_second_entity) 372 + 373 + 374 + def test_error_isolation(ingest_env): 375 + env = ingest_env 376 + entities = [ 377 + {"name": "Valid One", "type": "Person"}, 378 + {"no_name": True}, 379 + {"name": "Valid Two", "type": "Tool"}, 380 + ] 381 + 382 + response = _post_entities(env["client"], env["key"], env["key_prefix"], entities) 383 + 384 + 
assert response.status_code == 200 385 + assert response.get_json() == { 386 + "auto_merged": 0, 387 + "created": 2, 388 + "staged": 0, 389 + "skipped": 0, 390 + "errors": [{"entity_id": "", "error": "Entity name is required"}], 391 + } 392 + assert load_journal_entity("valid_one") is not None 393 + assert load_journal_entity("valid_two") is not None 394 + 395 + 396 + def test_auth_missing(ingest_env): 397 + env = ingest_env 398 + response = env["client"].post( 399 + f"/app/import/journal/{env['key_prefix']}/ingest/entities", 400 + json={"entities": []}, 401 + ) 402 + 403 + assert response.status_code == 401 404 + 405 + 406 + def test_auth_invalid(ingest_env): 407 + env = ingest_env 408 + response = env["client"].post( 409 + f"/app/import/journal/{env['key_prefix']}/ingest/entities", 410 + headers={"Authorization": "Bearer wrong-token"}, 411 + json={"entities": []}, 412 + ) 413 + 414 + assert response.status_code == 401 415 + 416 + 417 + def test_auth_revoked(ingest_env): 418 + env = ingest_env 419 + env["source"]["revoked"] = True 420 + env["source"]["revoked_at"] = 12345 421 + save_journal_source(env["source"]) 422 + 423 + response = env["client"].post( 424 + f"/app/import/journal/{env['key_prefix']}/ingest/entities", 425 + headers={"Authorization": f"Bearer {env['key']}"}, 426 + json={"entities": []}, 427 + ) 428 + 429 + assert response.status_code == 403 430 + 431 + 432 + def test_key_prefix_mismatch(ingest_env): 433 + env = ingest_env 434 + response = env["client"].post( 435 + "/app/import/journal/deadbeef/ingest/entities", 436 + headers={"Authorization": f"Bearer {env['key']}"}, 437 + json={"entities": []}, 438 + ) 439 + 440 + assert response.status_code == 403 441 + 442 + 443 + def test_stats_update(ingest_env): 444 + env = ingest_env 445 + save_journal_entity({"id": "alice_johnson", "name": "Alice Johnson", "type": "Person"}) 446 + 447 + response = _post_entities( 448 + env["client"], 449 + env["key"], 450 + env["key_prefix"], 451 + [ 452 + {"name": "Alice 
Johnson", "type": "Person"}, 453 + {"name": "Fresh Entity", "type": "Tool"}, 454 + {"name": "Alic Johnson", "type": "Person"}, 455 + ], 456 + ) 457 + source = load_journal_source(env["key"]) 458 + 459 + assert response.status_code == 200 460 + assert source["stats"]["entities_received"] == 2 461 + 462 + 463 + def test_state_manifest(ingest_env): 464 + env = ingest_env 465 + source = {"name": "Manifest Entity", "type": "Tool"} 466 + 467 + response = _post_entities(env["client"], env["key"], env["key_prefix"], [source]) 468 + 469 + assert response.status_code == 200 470 + 471 + state = _read_state(env["key_prefix"]) 472 + expected = {**source, "id": "manifest_entity"} 473 + assert state == { 474 + "id_map": {"manifest_entity": "manifest_entity"}, 475 + "received": {"manifest_entity": _entity_hash(expected)}, 476 + } 477 + 478 + 479 + def test_request_body_validation(ingest_env): 480 + env = ingest_env 481 + 482 + missing = env["client"].post( 483 + f"/app/import/journal/{env['key_prefix']}/ingest/entities", 484 + headers={"Authorization": f"Bearer {env['key']}"}, 485 + data="not-json", 486 + content_type="application/json", 487 + ) 488 + invalid = env["client"].post( 489 + f"/app/import/journal/{env['key_prefix']}/ingest/entities", 490 + headers={"Authorization": f"Bearer {env['key']}"}, 491 + json={"wrong": []}, 492 + ) 493 + 494 + assert missing.status_code == 400 495 + assert missing.get_json() == {"error": "Invalid JSON body"} 496 + assert invalid.status_code == 400 497 + assert invalid.get_json() == {"error": "Missing entities array"} 498 + 499 + 500 + def test_empty_entities_list(ingest_env): 501 + env = ingest_env 502 + response = _post_entities(env["client"], env["key"], env["key_prefix"], []) 503 + 504 + assert response.status_code == 200 505 + assert response.get_json() == { 506 + "auto_merged": 0, 507 + "created": 0, 508 + "staged": 0, 509 + "skipped": 0, 510 + "errors": [], 511 + } 512 + 513 + 514 + def 
test_load_all_entities_sees_created_entity(ingest_env): 515 + env = ingest_env 516 + 517 + response = _post_entities( 518 + env["client"], 519 + env["key"], 520 + env["key_prefix"], 521 + [{"name": "Visible Entity", "type": "Tool"}], 522 + ) 523 + 524 + assert response.status_code == 200 525 + assert "visible_entity" in load_all_journal_entities() 526 + assert entity_slug("Visible Entity") == "visible_entity"