personal memory agent
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

Merge branch 'hopper-jpdfvl4r-segment-ingest'

+822
+262
apps/import/ingest.py
··· 1 + # SPDX-License-Identifier: AGPL-3.0-only 2 + # Copyright (c) 2026 sol pbc 3 + 4 + """Segment ingest endpoint for journal source imports.""" 5 + 6 + from __future__ import annotations 7 + 8 + import json 9 + import logging 10 + import os 11 + import re 12 + import tempfile 13 + from datetime import datetime, timezone 14 + from pathlib import Path 15 + 16 + from flask import abort, g, jsonify, request 17 + from werkzeug.utils import secure_filename 18 + 19 + from convey import emit, state 20 + from observe.utils import ( 21 + compute_bytes_sha256, 22 + compute_file_sha256, 23 + find_available_segment, 24 + ) 25 + 26 + from .journal_sources import ( 27 + get_state_directory, 28 + require_journal_source, 29 + save_journal_source, 30 + ) 31 + 32 + logger = logging.getLogger(__name__) 33 + 34 + _DAY_RE = re.compile(r"^\d{8}$") 35 + _SEGMENT_RE = re.compile(r"^\d{6}_\d+$") 36 + _STREAM_RE = re.compile(r"^[a-z0-9][a-z0-9._-]*$") 37 + 38 + 39 + def _append_decision(log_path: Path, entry: dict) -> None: 40 + log_path.parent.mkdir(parents=True, exist_ok=True) 41 + with open(log_path, "a", encoding="utf-8") as handle: 42 + handle.write(json.dumps(entry, ensure_ascii=False) + "\n") 43 + 44 + 45 + def _write_state_atomic(state_path: Path, state_data: dict) -> None: 46 + state_path.parent.mkdir(parents=True, exist_ok=True) 47 + fd, tmp_path = tempfile.mkstemp(dir=state_path.parent, suffix=".tmp") 48 + try: 49 + with os.fdopen(fd, "w", encoding="utf-8") as handle: 50 + json.dump(state_data, handle, indent=2) 51 + Path(tmp_path).rename(state_path) 52 + except Exception: 53 + Path(tmp_path).unlink(missing_ok=True) 54 + raise 55 + 56 + 57 + def register_ingest_routes(bp) -> None: 58 + @bp.route("/journal/<key_prefix>/ingest/segments", methods=["POST"]) 59 + @require_journal_source 60 + def ingest_segments(key_prefix: str): 61 + if g.journal_source["key"][:8] != key_prefix: 62 + abort(403, description="Key prefix mismatch") 63 + 64 + metadata_raw = request.form.get("metadata") 65 + if not metadata_raw: 66 + return jsonify({"error": "Missing metadata"}), 400 67 + 68 + try: 69 + metadata = json.loads(metadata_raw) 70 + except json.JSONDecodeError: 71 + return jsonify({"error": "Invalid metadata JSON"}), 400 72 + 73 + if not isinstance(metadata, dict): 74 + return jsonify({"error": "Invalid metadata JSON"}), 400 75 + 76 + segments = metadata.get("segments") 77 + if not isinstance(segments, list): 78 + return jsonify({"error": "Missing segments array"}), 400 79 + 80 + journal_root = Path(state.journal_root) 81 + log_path = get_state_directory(key_prefix) / "segments" / "log.jsonl" 82 + 83 + copied = 0 84 + skipped = 0 85 + deconflicted = 0 86 + errors: list[dict[str, str]] = [] 87 + new_state = {} 88 + 89 + for idx, segment in enumerate(segments): 90 + day = "" 91 + segment_key = "" 92 + try: 93 + if not isinstance(segment, dict): 94 + raise ValueError("Segment metadata must be an object") 95 + 96 + day = str(segment.get("day", "")).strip() 97 + stream = str(segment.get("stream", "")).strip() 98 + segment_key = str(segment.get("segment_key", "")).strip() 99 + files = segment.get("files") 100 + 101 + if not _DAY_RE.match(day): 102 + raise ValueError("Invalid day format") 103 + if not _STREAM_RE.match(stream): 104 + raise ValueError("Invalid stream format") 105 + if not _SEGMENT_RE.match(segment_key): 106 + raise ValueError("Invalid segment_key format") 107 + if not isinstance(files, list) or not files: 108 + raise ValueError("Segment must list at least one file") 109 + 110 + expected_names = [] 111 + for raw_name in files: 112 + name = secure_filename(str(raw_name)) 113 + if not name: 114 + raise ValueError("Invalid filename in metadata") 115 + expected_names.append(name) 116 + 117 + if len(set(expected_names)) != len(expected_names): 118 + raise ValueError("Duplicate filenames in metadata") 119 + 120 + uploaded_files = request.files.getlist(f"files_{idx}") 121 + file_infos: dict[str, dict[str, str | int | bytes]] = {} 122 + for upload in uploaded_files: 123 + if not upload.filename: 124 + continue 125 + filename = secure_filename(upload.filename) 126 + if not filename: 127 + continue 128 + content = upload.read() 129 + if len(content) == 0: 130 + continue 131 + if filename in file_infos: 132 + raise ValueError(f"Duplicate uploaded filename: {filename}") 133 + file_infos[filename] = { 134 + "name": filename, 135 + "content": content, 136 + "sha256": compute_bytes_sha256(content), 137 + "size": len(content), 138 + } 139 + 140 + expected_set = set(expected_names) 141 + uploaded_set = set(file_infos.keys()) 142 + if expected_set != uploaded_set: 143 + missing = sorted(expected_set - uploaded_set) 144 + unexpected = sorted(uploaded_set - expected_set) 145 + parts = [] 146 + if missing: 147 + parts.append(f"Missing uploaded files: {', '.join(missing)}") 148 + if unexpected: 149 + parts.append( 150 + f"Unexpected uploaded files: {', '.join(unexpected)}" 151 + ) 152 + raise ValueError("; ".join(parts)) 153 + 154 + original_segment_key = segment_key 155 + arc_key = f"{stream}/{segment_key}" 156 + day_dir = journal_root / day 157 + stream_dir = day_dir / stream 158 + segment_dir = stream_dir / segment_key 159 + action = "copied" 160 + reason = "new segment" 161 + 162 + if segment_dir.exists(): 163 + exact_match = True 164 + for name in expected_names: 165 + file_path = segment_dir / name 166 + if not file_path.is_file(): 167 + exact_match = False 168 + break 169 + if compute_file_sha256(file_path) != file_infos[name]["sha256"]: 170 + exact_match = False 171 + break 172 + 173 + if exact_match: 174 + action = "skipped" 175 + reason = "exact match" 176 + else: 177 + new_key = find_available_segment(stream_dir, segment_key) 178 + if new_key is None: 179 + raise ValueError("No available segment slot") 180 + segment_key = new_key 181 + arc_key = f"{stream}/{segment_key}" 182 + segment_dir = stream_dir / segment_key 183 + action = "deconflicted" 184 + reason = "segment key conflict" 185 + 186 + if action in {"copied", "deconflicted"}: 187 + segment_dir.mkdir(parents=True, exist_ok=True) 188 + for name in expected_names: 189 + (segment_dir / name).write_bytes(file_infos[name]["content"]) 190 + 191 + file_records = [ 192 + { 193 + "name": name, 194 + "sha256": str(file_infos[name]["sha256"]), 195 + "size": int(file_infos[name]["size"]), 196 + } 197 + for name in expected_names 198 + ] 199 + new_state.setdefault(day, {})[arc_key] = {"files": file_records} 200 + 201 + entry = { 202 + "ts": datetime.now(timezone.utc).isoformat(), 203 + "action": action, 204 + "item_type": "segment", 205 + "item_id": f"{day}/{arc_key}", 206 + "reason": reason, 207 + "files": expected_names, 208 + } 209 + if action == "deconflicted": 210 + entry["original_key"] = original_segment_key 211 + _append_decision(log_path, entry) 212 + 213 + if action == "copied": 214 + copied += 1 215 + elif action == "skipped": 216 + skipped += 1 217 + else: 218 + deconflicted += 1 219 + except Exception as exc: 220 + errors.append( 221 + { 222 + "segment_key": segment_key, 223 + "day": day, 224 + "error": str(exc), 225 + } 226 + ) 227 + 228 + if new_state: 229 + state_path = get_state_directory(key_prefix) / "segments" / "state.json" 230 + state_path.parent.mkdir(parents=True, exist_ok=True) 231 + try: 232 + existing = json.loads(state_path.read_text(encoding="utf-8")) 233 + except (OSError, json.JSONDecodeError): 234 + existing = {} 235 + 236 + for day, segments_for_day in new_state.items(): 237 + existing.setdefault(day, {}).update(segments_for_day) 238 + 239 + _write_state_atomic(state_path, existing) 240 + 241 + written = copied + deconflicted 242 + if written > 0: 243 + source = g.journal_source 244 + source.setdefault("stats", {}) 245 + source["stats"]["segments_received"] = ( 246 + source["stats"].get("segments_received", 0) + written 247 + ) 248 + save_journal_source(source) 249 + 250 + try: 251 + emit("supervisor", "request", cmd=["sol", "indexer", "--rescan"]) 252 + except Exception: 253 + logger.warning("Failed to trigger indexer rescan via Callosum") 254 + 255 + return jsonify( 256 + { 257 + "segments_received": written, 258 + "segments_skipped": skipped, 259 + "segments_deconflicted": deconflicted, 260 + "errors": errors, 261 + } 262 + )
+6
apps/import/routes.py
··· 948 948 except (OSError, json.JSONDecodeError): 949 949 data = {} 950 950 return jsonify(data) 951 + 952 + 953 + # Segment ingest routes (separate module to keep routes.py manageable) 954 + from .ingest import register_ingest_routes 955 + 956 + register_ingest_routes(import_bp)
+554
tests/test_segment_ingest.py
··· 1 + # SPDX-License-Identifier: AGPL-3.0-only 2 + # Copyright (c) 2026 sol pbc 3 + 4 + from __future__ import annotations 5 + 6 + import json 7 + from importlib import import_module 8 + from io import BytesIO 9 + 10 + import pytest 11 + from flask import Blueprint, Flask 12 + 13 + import convey.state 14 + from observe.utils import compute_bytes_sha256 15 + 16 + journal_sources = import_module("apps.import.journal_sources") 17 + ingest = import_module("apps.import.ingest") 18 + 19 + create_state_directory = journal_sources.create_state_directory 20 + generate_key = journal_sources.generate_key 21 + get_state_directory = journal_sources.get_state_directory 22 + load_journal_source = journal_sources.load_journal_source 23 + save_journal_source = journal_sources.save_journal_source 24 + register_ingest_routes = ingest.register_ingest_routes 25 + 26 + 27 + @pytest.fixture 28 + def journal_env(tmp_path, monkeypatch): 29 + """Set up journal root and source storage.""" 30 + monkeypatch.setattr(convey.state, "journal_root", str(tmp_path), raising=False) 31 + (tmp_path / "apps" / "import" / "journal_sources").mkdir( 32 + parents=True, exist_ok=True 33 + ) 34 + return tmp_path 35 + 36 + 37 + def _source(name="test-source", key=None, **overrides): 38 + if key is None: 39 + key = generate_key() 40 + source = { 41 + "name": name, 42 + "key": key, 43 + "created_at": 1000, 44 + "enabled": True, 45 + "revoked": False, 46 + "revoked_at": None, 47 + "stats": { 48 + "segments_received": 0, 49 + "entities_received": 0, 50 + "facets_received": 0, 51 + "imports_received": 0, 52 + "config_received": 0, 53 + }, 54 + } 55 + source.update(overrides) 56 + return source 57 + 58 + 59 + @pytest.fixture 60 + def ingest_env(journal_env): 61 + """Set up a source and create an app with the ingest route.""" 62 + key = generate_key() 63 + source = _source(key=key) 64 + save_journal_source(source) 65 + key_prefix = key[:8] 66 + create_state_directory(journal_env, key_prefix) 67 + 68 + app = Flask(__name__) 69 + app.config["TESTING"] = True 70 + bp = Blueprint("import-test", __name__, url_prefix="/app/import") 71 + register_ingest_routes(bp) 72 + app.register_blueprint(bp) 73 + 74 + return { 75 + "root": journal_env, 76 + "key": key, 77 + "key_prefix": key_prefix, 78 + "source": source, 79 + "client": app.test_client(), 80 + } 81 + 82 + 83 + def _build_ingest_payload(segments): 84 + metadata = { 85 + "segments": [ 86 + { 87 + "day": segment["day"], 88 + "stream": segment["stream"], 89 + "segment_key": segment["segment_key"], 90 + "files": [filename for filename, _content in segment["files"]], 91 + } 92 + for segment in segments 93 + ] 94 + } 95 + 96 + data = {"metadata": json.dumps(metadata)} 97 + for idx, segment in enumerate(segments): 98 + data[f"files_{idx}"] = [ 99 + (BytesIO(content), filename) for filename, content in segment["files"] 100 + ] 101 + return data 102 + 103 + 104 + def _post_ingest(client, key, key_prefix, segments): 105 + return client.post( 106 + f"/app/import/journal/{key_prefix}/ingest/segments", 107 + headers={"Authorization": f"Bearer {key}"}, 108 + data=_build_ingest_payload(segments), 109 + content_type="multipart/form-data", 110 + ) 111 + 112 + 113 + def _read_state(key_prefix: str) -> dict: 114 + state_path = get_state_directory(key_prefix) / "segments" / "state.json" 115 + return json.loads(state_path.read_text(encoding="utf-8")) 116 + 117 + 118 + def _read_log(key_prefix: str) -> list[dict]: 119 + log_path = get_state_directory(key_prefix) / "segments" / "log.jsonl" 120 + if not log_path.exists(): 121 + return [] 122 + return [ 123 + json.loads(line) 124 + for line in log_path.read_text(encoding="utf-8").splitlines() 125 + if line.strip() 126 + ] 127 + 128 + 129 + def test_ingest_new_segments(ingest_env): 130 + env = ingest_env 131 + segments = [ 132 + { 133 + "day": "20260413", 134 + "stream": "laptop", 135 + "segment_key": "143022_300", 136 + "files": [ 137 + ("audio.flac", b"audio one"), 138 + ("transcript.jsonl", b'{"text":"one"}\n'), 139 + ], 140 + }, 141 + { 142 + "day": "20260413", 143 + "stream": "laptop", 144 + "segment_key": "143500_300", 145 + "files": [("audio.flac", b"audio two")], 146 + }, 147 + ] 148 + 149 + response = _post_ingest(env["client"], env["key"], env["key_prefix"], segments) 150 + 151 + assert response.status_code == 200 152 + assert response.get_json() == { 153 + "segments_received": 2, 154 + "segments_skipped": 0, 155 + "segments_deconflicted": 0, 156 + "errors": [], 157 + } 158 + 159 + first_dir = env["root"] / "20260413" / "laptop" / "143022_300" 160 + second_dir = env["root"] / "20260413" / "laptop" / "143500_300" 161 + assert (first_dir / "audio.flac").read_bytes() == b"audio one" 162 + assert (first_dir / "transcript.jsonl").read_bytes() == b'{"text":"one"}\n' 163 + assert (second_dir / "audio.flac").read_bytes() == b"audio two" 164 + 165 + state_data = _read_state(env["key_prefix"]) 166 + assert set(state_data["20260413"]) == {"laptop/143022_300", "laptop/143500_300"} 167 + 168 + log_entries = _read_log(env["key_prefix"]) 169 + assert [entry["action"] for entry in log_entries] == ["copied", "copied"] 170 + assert log_entries[0]["item_id"] == "20260413/laptop/143022_300" 171 + assert log_entries[0]["item_type"] == "segment" 172 + assert log_entries[0]["reason"] == "new segment" 173 + 174 + 175 + def test_ingest_duplicate_detection(ingest_env): 176 + env = ingest_env 177 + segments = [ 178 + { 179 + "day": "20260413", 180 + "stream": "laptop", 181 + "segment_key": "143022_300", 182 + "files": [ 183 + ("audio.flac", b"audio one"), 184 + ("transcript.jsonl", b'{"text":"one"}\n'), 185 + ], 186 + }, 187 + { 188 + "day": "20260413", 189 + "stream": "laptop", 190 + "segment_key": "143500_300", 191 + "files": [("audio.flac", b"audio two")], 192 + }, 193 + ] 194 + 195 + first = _post_ingest(env["client"], env["key"], env["key_prefix"], segments) 196 + first_state_raw = ( 197 + get_state_directory(env["key_prefix"]) / "segments" / "state.json" 198 + ).read_text(encoding="utf-8") 199 + 200 + second = _post_ingest(env["client"], env["key"], env["key_prefix"], segments) 201 + 202 + assert first.status_code == 200 203 + assert second.status_code == 200 204 + assert second.get_json() == { 205 + "segments_received": 0, 206 + "segments_skipped": 2, 207 + "segments_deconflicted": 0, 208 + "errors": [], 209 + } 210 + 211 + second_state_raw = ( 212 + get_state_directory(env["key_prefix"]) / "segments" / "state.json" 213 + ).read_text(encoding="utf-8") 214 + assert second_state_raw == first_state_raw 215 + 216 + log_entries = _read_log(env["key_prefix"]) 217 + assert [entry["action"] for entry in log_entries] == [ 218 + "copied", 219 + "copied", 220 + "skipped", 221 + "skipped", 222 + ] 223 + 224 + 225 + def test_ingest_deconfliction(ingest_env, monkeypatch): 226 + env = ingest_env 227 + target_dir = env["root"] / "20260413" / "laptop" / "143022_300" 228 + target_dir.mkdir(parents=True, exist_ok=True) 229 + (target_dir / "audio.flac").write_bytes(b"existing audio") 230 + 231 + monkeypatch.setattr( 232 + ingest, "find_available_segment", lambda _parent, _seg: "143023_300" 233 + ) 234 + 235 + segments = [ 236 + { 237 + "day": "20260413", 238 + "stream": "laptop", 239 + "segment_key": "143022_300", 240 + "files": [("audio.flac", b"new audio")], 241 + } 242 + ] 243 + 244 + response = _post_ingest(env["client"], env["key"], env["key_prefix"], segments) 245 + 246 + assert response.status_code == 200 247 + assert response.get_json() == { 248 + "segments_received": 1, 249 + "segments_skipped": 0, 250 + "segments_deconflicted": 1, 251 + "errors": [], 252 + } 253 + assert ( 254 + env["root"] / "20260413" / "laptop" / "143023_300" / "audio.flac" 255 + ).read_bytes() == b"new audio" 256 + 257 + state_data = _read_state(env["key_prefix"]) 258 + assert "laptop/143023_300" in state_data["20260413"] 259 + 260 + log_entries = _read_log(env["key_prefix"]) 261 + assert log_entries[0]["action"] == "deconflicted" 262 + assert log_entries[0]["original_key"] == "143022_300" 263 + assert log_entries[0]["item_id"] == "20260413/laptop/143023_300" 264 + 265 + 266 + def test_ingest_batch_error_isolation(ingest_env): 267 + env = ingest_env 268 + segments = [ 269 + { 270 + "day": "bad-day", 271 + "stream": "laptop", 272 + "segment_key": "143022_300", 273 + "files": [("audio.flac", b"broken")], 274 + }, 275 + { 276 + "day": "20260413", 277 + "stream": "laptop", 278 + "segment_key": "143500_300", 279 + "files": [("audio.flac", b"good")], 280 + }, 281 + ] 282 + 283 + response = _post_ingest(env["client"], env["key"], env["key_prefix"], segments) 284 + 285 + assert response.status_code == 200 286 + body = response.get_json() 287 + assert body["segments_received"] == 1 288 + assert body["segments_skipped"] == 0 289 + assert body["segments_deconflicted"] == 0 290 + assert body["errors"] == [ 291 + { 292 + "segment_key": "143022_300", 293 + "day": "bad-day", 294 + "error": "Invalid day format", 295 + } 296 + ] 297 + assert ( 298 + env["root"] / "20260413" / "laptop" / "143500_300" / "audio.flac" 299 + ).read_bytes() == b"good" 300 + 301 + 302 + def test_ingest_missing_metadata(ingest_env): 303 + env = ingest_env 304 + response = env["client"].post( 305 + f"/app/import/journal/{env['key_prefix']}/ingest/segments", 306 + headers={"Authorization": f"Bearer {env['key']}"}, 307 + data={}, 308 + content_type="multipart/form-data", 309 + ) 310 + 311 + assert response.status_code == 400 312 + assert response.get_json() == {"error": "Missing metadata"} 313 + 314 + 315 + def test_ingest_malformed_metadata(ingest_env): 316 + env = ingest_env 317 + response = env["client"].post( 318 + f"/app/import/journal/{env['key_prefix']}/ingest/segments", 319 + headers={"Authorization": f"Bearer {env['key']}"}, 320 + data={"metadata": "not-json"}, 321 + content_type="multipart/form-data", 322 + ) 323 + 324 + assert response.status_code == 400 325 + assert response.get_json() == {"error": "Invalid metadata JSON"} 326 + 327 + 328 + def test_ingest_auth_missing(ingest_env): 329 + env = ingest_env 330 + response = env["client"].post( 331 + f"/app/import/journal/{env['key_prefix']}/ingest/segments", 332 + data={"metadata": json.dumps({"segments": []})}, 333 + content_type="multipart/form-data", 334 + ) 335 + 336 + assert response.status_code == 401 337 + 338 + 339 + def test_ingest_auth_invalid(ingest_env): 340 + env = ingest_env 341 + response = env["client"].post( 342 + f"/app/import/journal/{env['key_prefix']}/ingest/segments", 343 + headers={"Authorization": "Bearer wrong-token"}, 344 + data={"metadata": json.dumps({"segments": []})}, 345 + content_type="multipart/form-data", 346 + ) 347 + 348 + assert response.status_code == 401 349 + 350 + 351 + def test_ingest_auth_revoked(ingest_env): 352 + env = ingest_env 353 + env["source"]["revoked"] = True 354 + env["source"]["revoked_at"] = 12345 355 + save_journal_source(env["source"]) 356 + 357 + response = env["client"].post( 358 + f"/app/import/journal/{env['key_prefix']}/ingest/segments", 359 + headers={"Authorization": f"Bearer {env['key']}"}, 360 + data={"metadata": json.dumps({"segments": []})}, 361 + content_type="multipart/form-data", 362 + ) 363 + 364 + assert response.status_code == 403 365 + 366 + 367 + def test_ingest_key_prefix_mismatch(ingest_env): 368 + env = ingest_env 369 + response = env["client"].post( 370 + "/app/import/journal/deadbeef/ingest/segments", 371 + headers={"Authorization": f"Bearer {env['key']}"}, 372 + data={"metadata": json.dumps({"segments": []})}, 373 + content_type="multipart/form-data", 374 + ) 375 + 376 + assert response.status_code == 403 377 + 378 + 379 + def test_ingest_callosum_trigger(ingest_env, monkeypatch): 380 + env = ingest_env 381 + calls = [] 382 + 383 + def mock_emit(*args, **kwargs): 384 + calls.append((args, kwargs)) 385 + 386 + monkeypatch.setattr(ingest, "emit", mock_emit) 387 + 388 + segments = [ 389 + { 390 + "day": "20260413", 391 + "stream": "laptop", 392 + "segment_key": "143022_300", 393 + "files": [("audio.flac", b"audio one")], 394 + } 395 + ] 396 + 397 + first = _post_ingest(env["client"], env["key"], env["key_prefix"], segments) 398 + second = _post_ingest(env["client"], env["key"], env["key_prefix"], segments) 399 + 400 + assert first.status_code == 200 401 + assert second.status_code == 200 402 + assert calls == [ 403 + (("supervisor", "request"), {"cmd": ["sol", "indexer", "--rescan"]}) 404 + ] 405 + 406 + 407 + def test_ingest_callosum_failure_isolated(ingest_env, monkeypatch): 408 + env = ingest_env 409 + 410 + def mock_emit(*_args, **_kwargs): 411 + raise RuntimeError("bridge down") 412 + 413 + monkeypatch.setattr(ingest, "emit", mock_emit) 414 + 415 + segments = [ 416 + { 417 + "day": "20260413", 418 + "stream": "laptop", 419 + "segment_key": "143022_300", 420 + "files": [("audio.flac", b"audio one")], 421 + } 422 + ] 423 + 424 + response = _post_ingest(env["client"], env["key"], env["key_prefix"], segments) 425 + 426 + assert response.status_code == 200 427 + assert response.get_json() == { 428 + "segments_received": 1, 429 + "segments_skipped": 0, 430 + "segments_deconflicted": 0, 431 + "errors": [], 432 + } 433 + 434 + 435 + def test_ingest_skip_ignores_extra_existing_files(ingest_env): 436 + env = ingest_env 437 + segment_dir = env["root"] / "20260413" / "laptop" / "143022_300" 438 + segment_dir.mkdir(parents=True, exist_ok=True) 439 + (segment_dir / "audio.flac").write_bytes(b"audio one") 440 + (segment_dir / "extra.txt").write_bytes(b"keep me") 441 + 442 + segments = [ 443 + { 444 + "day": "20260413", 445 + "stream": "laptop", 446 + "segment_key": "143022_300", 447 + "files": [("audio.flac", b"audio one")], 448 + } 449 + ] 450 + 451 + response = _post_ingest(env["client"], env["key"], env["key_prefix"], segments) 452 + 453 + assert response.status_code == 200 454 + assert response.get_json() == { 455 + "segments_received": 0, 456 + "segments_skipped": 1, 457 + "segments_deconflicted": 0, 458 + "errors": [], 459 + } 460 + assert (segment_dir / "extra.txt").read_bytes() == b"keep me" 461 + 462 + 463 + def test_ingest_stats_update(ingest_env): 464 + env = ingest_env 465 + segments = [ 466 + { 467 + "day": "20260413", 468 + "stream": "laptop", 469 + "segment_key": "143022_300", 470 + "files": [("audio.flac", b"audio one")], 471 + } 472 + ] 473 + 474 + first = _post_ingest(env["client"], env["key"], env["key_prefix"], segments) 475 + source_after_first = load_journal_source(env["key"]) 476 + second = _post_ingest(env["client"], env["key"], env["key_prefix"], segments) 477 + source_after_second = load_journal_source(env["key"]) 478 + 479 + assert first.status_code == 200 480 + assert second.status_code == 200 481 + assert source_after_first["stats"]["segments_received"] == 1 482 + assert source_after_second["stats"]["segments_received"] == 1 483 + 484 + 485 + def test_ingest_state_json_manifest_sync(ingest_env): 486 + env = ingest_env 487 + segments = [ 488 + { 489 + "day": "20260413", 490 + "stream": "laptop", 491 + "segment_key": "143022_300", 492 + "files": [ 493 + ("audio.flac", b"audio one"), 494 + ("transcript.jsonl", b'{"text":"one"}\n'), 495 + ], 496 + } 497 + ] 498 + 499 + response = _post_ingest(env["client"], env["key"], env["key_prefix"], segments) 500 + state_data = _read_state(env["key_prefix"]) 501 + 502 + assert response.status_code == 200 503 + assert state_data == { 504 + "20260413": { 505 + "laptop/143022_300": { 506 + "files": [ 507 + { 508 + "name": "audio.flac", 509 + "sha256": compute_bytes_sha256(b"audio one"), 510 + "size": len(b"audio one"), 511 + }, 512 + { 513 + "name": "transcript.jsonl", 514 + "sha256": compute_bytes_sha256(b'{"text":"one"}\n'), 515 + "size": len(b'{"text":"one"}\n'), 516 + }, 517 + ] 518 + } 519 + } 520 + } 521 + 522 + 523 + def test_ingest_idempotent(ingest_env): 524 + env = ingest_env 525 + segments = [ 526 + { 527 + "day": "20260413", 528 + "stream": "laptop", 529 + "segment_key": "143022_300", 530 + "files": [("audio.flac", b"audio one")], 531 + } 532 + ] 533 + 534 + first = _post_ingest(env["client"], env["key"], env["key_prefix"], segments) 535 + first_state = ( 536 + get_state_directory(env["key_prefix"]) / "segments" / "state.json" 537 + ).read_text(encoding="utf-8") 538 + second = _post_ingest(env["client"], env["key"], env["key_prefix"], segments) 539 + second_state = ( 540 + get_state_directory(env["key_prefix"]) / "segments" / "state.json" 541 + ).read_text(encoding="utf-8") 542 + source = load_journal_source(env["key"]) 543 + 544 + assert first.status_code == 200 545 + assert second.status_code == 200 546 + assert first.get_json()["segments_received"] == 1 547 + assert second.get_json() == { 548 + "segments_received": 0, 549 + "segments_skipped": 1, 550 + "segments_deconflicted": 0, 551 + "errors": [], 552 + } 553 + assert first_state == second_state 554 + assert source["stats"]["segments_received"] == 1