personal memory agent
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

Add segment collision resolution for remote ingest

When a remote observer uploads files with a segment key that already
exists, the ingest endpoint now uses a random walk algorithm to find
an available slot by adjusting time (+/-1 second) or duration (+/-1).
After 100 failed attempts, files are saved to a quarantine directory
for manual review.

- Add _randomize_segment() for random +/-1 modifications
- Add _segment_exists() to check for directory or file conflicts
- Add _find_available_segment() with random walk up to 100 attempts
- Add _save_to_failed() to quarantine files on exhaustion
- Update ingest_upload() to detect collisions and adjust segment keys
- Failed uploads saved to remote/failed/<segment>/<timestamp>/

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

+457 -1
+170 -1
apps/remote/routes.py
··· 14 14 import base64 15 15 import json 16 16 import logging 17 + import random 17 18 import re 18 19 import secrets 19 20 import time ··· 239 240 ) 240 241 241 242 243 + # === Segment collision helpers === 244 + 245 + # Maximum attempts to find available segment key 246 + MAX_SEGMENT_ATTEMPTS = 100 247 + 248 + 249 + def _randomize_segment(segment: str) -> str | None: 250 + """Apply random +/-1 to either time or duration component. 251 + 252 + Args: 253 + segment: Segment key in HHMMSS_LEN format 254 + 255 + Returns: 256 + Modified segment key, or None if modification would be invalid 257 + (crosses midnight in either direction, or duration would be <= 0) 258 + """ 259 + time_part, duration_str = segment.split("_") 260 + h = int(time_part[:2]) 261 + m = int(time_part[2:4]) 262 + s = int(time_part[4:6]) 263 + dur = int(duration_str) 264 + 265 + modify_time = random.choice([True, False]) 266 + delta = random.choice([1, -1]) 267 + 268 + if modify_time: 269 + # Modify time component 270 + total_seconds = h * 3600 + m * 60 + s + delta 271 + if total_seconds < 0 or total_seconds >= 86400: 272 + return None # Would cross midnight 273 + h = total_seconds // 3600 274 + m = (total_seconds % 3600) // 60 275 + s = total_seconds % 60 276 + else: 277 + # Modify duration component 278 + dur = dur + delta 279 + if dur <= 0: 280 + return None # Duration can't be zero or negative 281 + 282 + return f"{h:02d}{m:02d}{s:02d}_{dur}" 283 + 284 + 285 + def _segment_exists(day_dir: Path, segment: str) -> bool: 286 + """Check if segment key is already in use. 287 + 288 + Args: 289 + day_dir: Path to day directory 290 + segment: Segment key in HHMMSS_LEN format 291 + 292 + Returns: 293 + True if segment directory or files with segment prefix exist 294 + """ 295 + # Check for segment directory 296 + if (day_dir / segment).exists(): 297 + return True 298 + # Check for files starting with segment key 299 + if list(day_dir.glob(f"{segment}_*")): 300 + return True 301 + return False 302 + 303 + 304 + def _find_available_segment( 305 + day_dir: Path, segment: str, max_attempts: int = MAX_SEGMENT_ATTEMPTS 306 + ) -> str | None: 307 + """Find an available segment key using random modifications. 308 + 309 + Uses a random walk approach: each attempt randomly modifies either 310 + the time or duration by +/-1, exploring the space around the original. 311 + 312 + Args: 313 + day_dir: Path to day directory 314 + segment: Original segment key in HHMMSS_LEN format 315 + max_attempts: Maximum modification attempts before giving up 316 + 317 + Returns: 318 + Available segment key (may be original or modified), or None if 319 + no available slot found after max_attempts 320 + """ 321 + # Check if original is available 322 + if not _segment_exists(day_dir, segment): 323 + return segment 324 + 325 + current = segment 326 + tried = {segment} 327 + 328 + for _ in range(max_attempts): 329 + modified = _randomize_segment(current) 330 + 331 + if modified is None: 332 + # Invalid modification (hit boundary), try again from same position 333 + continue 334 + 335 + current = modified # Always move to valid position 336 + 337 + if modified in tried: 338 + continue # Already checked, don't recheck filesystem 339 + 340 + tried.add(modified) 341 + 342 + if not _segment_exists(day_dir, modified): 343 + return modified 344 + 345 + return None # Exhausted attempts 346 + 347 + 348 + def _save_to_failed(day_dir: Path, files: list, segment: str) -> Path: 349 + """Save files to failed directory for manual review. 350 + 351 + Files are saved with their original segment key (not adjusted) since 352 + the collision resolution failed. 353 + 354 + Args: 355 + day_dir: Path to day directory 356 + files: List of file upload objects from request 357 + segment: Original segment key (used in directory name) 358 + 359 + Returns: 360 + Path to the failed directory where files were saved 361 + """ 362 + # Use segment in path for easier identification of failed uploads 363 + failed_dir = day_dir / "remote" / "failed" / segment / str(int(time.time() * 1000)) 364 + failed_dir.mkdir(parents=True, exist_ok=True) 365 + 366 + for upload in files: 367 + if not upload.filename: 368 + continue 369 + filename = secure_filename(upload.filename) 370 + if not filename: 371 + continue 372 + target_path = failed_dir / filename 373 + upload.save(target_path) 374 + 375 + return failed_dir 376 + 377 + 242 378 # === Ingest API (key-protected) === 243 379 244 380 ··· 292 428 target_dir = day_path(day) 293 429 target_dir.mkdir(parents=True, exist_ok=True) 294 430 295 - # Save files 431 + # Find available segment key (may differ from original if collision) 432 + original_segment = segment 433 + available_segment = _find_available_segment(target_dir, segment) 434 + 435 + if available_segment is None: 436 + # Exhausted attempts, save to failed directory 437 + logger.error( 438 + f"No available segment slot for {day}/{segment} from " 439 + f"{remote.get('name')} after {MAX_SEGMENT_ATTEMPTS} attempts" 440 + ) 441 + failed_dir = _save_to_failed(target_dir, files, segment) 442 + return ( 443 + jsonify( 444 + { 445 + "status": "failed", 446 + "error": f"No available segment slot after {MAX_SEGMENT_ATTEMPTS} attempts", 447 + "failed_path": str(failed_dir.relative_to(target_dir.parent)), 448 + } 449 + ), 450 + 507, 451 + ) # Insufficient Storage 452 + 453 + segment = available_segment 454 + if segment != original_segment: 455 + logger.info( 456 + f"Segment collision resolved: {original_segment} -> {segment} " 457 + f"for remote {remote.get('name')}" 458 + ) 459 + 460 + # Save files with adjusted segment key in filenames 296 461 saved_files = [] 297 462 total_bytes = 0 298 463 ··· 304 469 filename = secure_filename(upload.filename) 305 470 if not filename: 306 471 continue 472 + 473 + # Replace original segment with adjusted segment in filename 474 + if original_segment != segment and original_segment in filename: 475 + filename = filename.replace(original_segment, segment, 1) 307 476 308 477 target_path = target_dir / filename 309 478
+287
apps/remote/tests/test_routes.py
··· 435 435 436 436 resp = env.client.get("/app/remote/api/nonexistent/key") 437 437 assert resp.status_code == 404 438 + 439 + 440 + # === Segment collision helper tests === 441 + 442 + 443 + def test_randomize_segment_produces_valid_output(): 444 + """Test that _randomize_segment produces valid segment keys.""" 445 + from apps.remote.routes import _randomize_segment 446 + 447 + result = _randomize_segment("120000_300") 448 + 449 + # Result is either None (boundary hit) or a valid segment 450 + if result is not None: 451 + assert "_" in result 452 + time_part, dur_part = result.split("_") 453 + assert len(time_part) == 6 454 + assert time_part.isdigit() 455 + assert dur_part.isdigit() 456 + assert int(dur_part) > 0 457 + 458 + 459 + def test_randomize_segment_never_produces_invalid_time(): 460 + """Test that _randomize_segment never produces times outside 00:00:00-23:59:59.""" 461 + from apps.remote.routes import _randomize_segment 462 + 463 + # Test at boundaries - should return None or valid, never invalid 464 + for segment in ["000000_300", "235959_300"]: 465 + for _ in range(20): 466 + result = _randomize_segment(segment) 467 + if result is not None: 468 + time_part = result.split("_")[0] 469 + hours = int(time_part[:2]) 470 + assert 0 <= hours <= 23 471 + 472 + 473 + def test_randomize_segment_never_produces_zero_duration(): 474 + """Test that _randomize_segment never produces duration <= 0.""" 475 + from apps.remote.routes import _randomize_segment 476 + 477 + # Test at duration boundary 478 + for _ in range(20): 479 + result = _randomize_segment("120000_1") 480 + if result is not None: 481 + dur = int(result.split("_")[1]) 482 + assert dur > 0 483 + 484 + 485 + def test_segment_exists_with_directory(remote_env): 486 + """Test _segment_exists detects existing segment directory.""" 487 + from apps.remote.routes import _segment_exists 488 + 489 + env = remote_env() 490 + day_dir = env.journal / "20250103" 491 + day_dir.mkdir(parents=True) 492 + 493 + # Create a segment directory 494 + segment_dir = day_dir / "120000_300" 495 + segment_dir.mkdir() 496 + 497 + assert _segment_exists(day_dir, "120000_300") is True 498 + assert _segment_exists(day_dir, "120001_300") is False 499 + 500 + 501 + def test_segment_exists_with_files(remote_env): 502 + """Test _segment_exists detects files with segment prefix.""" 503 + from apps.remote.routes import _segment_exists 504 + 505 + env = remote_env() 506 + day_dir = env.journal / "20250103" 507 + day_dir.mkdir(parents=True) 508 + 509 + # Create a file with segment prefix 510 + (day_dir / "120000_300_audio.flac").write_bytes(b"test") 511 + 512 + assert _segment_exists(day_dir, "120000_300") is True 513 + assert _segment_exists(day_dir, "120001_300") is False 514 + 515 + 516 + def test_segment_exists_empty_directory(remote_env): 517 + """Test _segment_exists returns False for empty directory.""" 518 + from apps.remote.routes import _segment_exists 519 + 520 + env = remote_env() 521 + day_dir = env.journal / "20250103" 522 + day_dir.mkdir(parents=True) 523 + 524 + assert _segment_exists(day_dir, "120000_300") is False 525 + 526 + 527 + def test_find_available_segment_no_conflict(remote_env): 528 + """Test _find_available_segment returns original when no conflict.""" 529 + from apps.remote.routes import _find_available_segment 530 + 531 + env = remote_env() 532 + day_dir = env.journal / "20250103" 533 + day_dir.mkdir(parents=True) 534 + 535 + result = _find_available_segment(day_dir, "120000_300") 536 + assert result == "120000_300" 537 + 538 + 539 + def test_find_available_segment_with_conflict(remote_env): 540 + """Test _find_available_segment finds alternative when conflict exists.""" 541 + from apps.remote.routes import _find_available_segment 542 + 543 + env = remote_env() 544 + day_dir = env.journal / "20250103" 545 + day_dir.mkdir(parents=True) 546 + 547 + # Create conflicting file 548 + (day_dir / "120000_300_audio.flac").write_bytes(b"test") 549 + 550 + result = _find_available_segment(day_dir, "120000_300") 551 + 552 + # Should find a different segment 553 + assert result is not None 554 + assert result != "120000_300" 555 + # Should be a valid segment format 556 + assert "_" in result 557 + time_part, dur_part = result.split("_") 558 + assert len(time_part) == 6 559 + assert dur_part.isdigit() 560 + 561 + 562 + def test_find_available_segment_with_limited_attempts(remote_env): 563 + """Test _find_available_segment respects max_attempts limit.""" 564 + from apps.remote.routes import _find_available_segment 565 + 566 + env = remote_env() 567 + day_dir = env.journal / "20250103" 568 + day_dir.mkdir(parents=True) 569 + 570 + # Create conflicting file 571 + (day_dir / "120000_300_audio.flac").write_bytes(b"test") 572 + 573 + # With max_attempts=0, should return None immediately (no attempts allowed) 574 + result = _find_available_segment(day_dir, "120000_300", max_attempts=0) 575 + assert result is None 576 + 577 + 578 + def test_save_to_failed_creates_directory(remote_env): 579 + """Test _save_to_failed creates failed directory structure.""" 580 + from io import BytesIO 581 + 582 + from werkzeug.datastructures import FileStorage 583 + 584 + from apps.remote.routes import _save_to_failed 585 + 586 + env = remote_env() 587 + day_dir = env.journal / "20250103" 588 + day_dir.mkdir(parents=True) 589 + 590 + # Create mock file uploads 591 + files = [ 592 + FileStorage(stream=BytesIO(b"audio data"), filename="120000_300_audio.flac"), 593 + FileStorage(stream=BytesIO(b"video data"), filename="120000_300_screen.webm"), 594 + ] 595 + 596 + failed_dir = _save_to_failed(day_dir, files, "120000_300") 597 + 598 + # Verify structure includes segment key 599 + assert failed_dir.exists() 600 + assert "remote/failed/120000_300/" in str(failed_dir) 601 + assert (failed_dir / "120000_300_audio.flac").exists() 602 + assert (failed_dir / "120000_300_screen.webm").exists() 603 + 604 + 605 + # === Integration tests for collision handling === 606 + 607 + 608 + def test_ingest_collision_adjusts_segment(remote_env): 609 + """Test that ingest adjusts segment key on collision.""" 610 + env = remote_env() 611 + 612 + # Create a remote 613 + resp = env.client.post( 614 + "/app/remote/api/create", 615 + json={"name": "collision-test"}, 616 + content_type="application/json", 617 + ) 618 + key = resp.get_json()["key"] 619 + 620 + # Create a conflicting file 621 + day_dir = env.journal / "20250103" 622 + day_dir.mkdir(parents=True) 623 + (day_dir / "120000_300_audio.flac").write_bytes(b"existing") 624 + 625 + # Upload with same segment key 626 + test_data = b"new audio content" 627 + resp = env.client.post( 628 + f"/app/remote/ingest/{key}", 629 + data={ 630 + "day": "20250103", 631 + "segment": "120000_300", 632 + "files": (io.BytesIO(test_data), "120000_300_audio.flac"), 633 + }, 634 + ) 635 + 636 + assert resp.status_code == 200 637 + data = resp.get_json() 638 + assert data["status"] == "ok" 639 + 640 + # The filename should have been adjusted 641 + saved_file = data["files"][0] 642 + assert saved_file != "120000_300_audio.flac" 643 + assert "_audio.flac" in saved_file 644 + 645 + # Verify both files exist 646 + assert (day_dir / "120000_300_audio.flac").exists() # Original 647 + assert (day_dir / saved_file).exists() # New adjusted 648 + 649 + 650 + def test_ingest_no_collision_preserves_segment(remote_env): 651 + """Test that ingest preserves segment key when no collision.""" 652 + env = remote_env() 653 + 654 + # Create a remote 655 + resp = env.client.post( 656 + "/app/remote/api/create", 657 + json={"name": "no-collision-test"}, 658 + content_type="application/json", 659 + ) 660 + key = resp.get_json()["key"] 661 + 662 + # Upload without any conflicting files 663 + test_data = b"audio content" 664 + resp = env.client.post( 665 + f"/app/remote/ingest/{key}", 666 + data={ 667 + "day": "20250103", 668 + "segment": "120000_300", 669 + "files": (io.BytesIO(test_data), "120000_300_audio.flac"), 670 + }, 671 + ) 672 + 673 + assert resp.status_code == 200 674 + data = resp.get_json() 675 + assert data["status"] == "ok" 676 + assert data["files"] == ["120000_300_audio.flac"] 677 + 678 + # Verify file saved with original name 679 + expected_file = env.journal / "20250103" / "120000_300_audio.flac" 680 + assert expected_file.exists() 681 + 682 + 683 + def test_ingest_stats_use_adjusted_segment(remote_env): 684 + """Test that remote stats record the adjusted segment key.""" 685 + env = remote_env() 686 + 687 + # Create a remote 688 + resp = env.client.post( 689 + "/app/remote/api/create", 690 + json={"name": "stats-adjust-test"}, 691 + content_type="application/json", 692 + ) 693 + key = resp.get_json()["key"] 694 + 695 + # Create a conflicting file 696 + day_dir = env.journal / "20250103" 697 + day_dir.mkdir(parents=True) 698 + (day_dir / "120000_300_audio.flac").write_bytes(b"existing") 699 + 700 + # Upload with same segment key 701 + test_data = b"new audio" 702 + resp = env.client.post( 703 + f"/app/remote/ingest/{key}", 704 + data={ 705 + "day": "20250103", 706 + "segment": "120000_300", 707 + "files": (io.BytesIO(test_data), "120000_300_audio.flac"), 708 + }, 709 + ) 710 + 711 + assert resp.status_code == 200 712 + 713 + # Check stats - last_segment should be the adjusted one 714 + resp = env.client.get("/app/remote/api/list") 715 + remotes = resp.get_json() 716 + assert len(remotes) == 1 717 + # The stored segment should be different from original 718 + last_segment = remotes[0]["last_segment"] 719 + assert last_segment is not None 720 + # It should be adjusted (not the original conflicting one) 721 + assert ( 722 + last_segment != "120000_300" 723 + or (day_dir / f"{last_segment}_audio.flac").exists() 724 + )