Our Personal Data Server from scratch! tranquil.farm
pds rust database fun oauth atproto
238
fork

Configure Feed

Select the types of activity you want to include in your feed.

test(tranquil-pds): same-rkey batch coverage and inductive inverse for in-batch dups

Lewis: May this revision serve well! <lu5a@proton.me>

authored by did:plc:mb5to35neicxt4gemstoro… and committed by

Tangled af382151 8f7aad37

+1079 -2
+1 -1
crates/tranquil-pds/tests/lifecycle_record.rs
··· 456 456 "writes": [ 457 457 { "$type": "com.atproto.repo.applyWrites#create", "collection": "app.bsky.feed.post", "rkey": "batch-post-1", "value": { "$type": "app.bsky.feed.post", "text": "First batch post", "createdAt": now } }, 458 458 { "$type": "com.atproto.repo.applyWrites#create", "collection": "app.bsky.feed.post", "rkey": "batch-post-2", "value": { "$type": "app.bsky.feed.post", "text": "Second batch post", "createdAt": now } }, 459 - { "$type": "com.atproto.repo.applyWrites#create", "collection": "app.bsky.actor.profile", "rkey": "self", "value": { "$type": "app.bsky.actor.profile", "displayName": "Batch User" } } 459 + { "$type": "com.atproto.repo.applyWrites#update", "collection": "app.bsky.actor.profile", "rkey": "self", "value": { "$type": "app.bsky.actor.profile", "displayName": "Batch User" } } 460 460 ] 461 461 }); 462 462 let apply_res = client
+58 -1
crates/tranquil-pds/tests/mst_inductive_firehose.rs
··· 124 124 let new_data_cid = new_commit_data_cid(&storage, &commit_cid).await?; 125 125 126 126 let mut mst = Mst::load(storage.clone(), new_data_cid, None); 127 - for op_value in ops_json(event)? { 127 + for op_value in ops_json(event)?.iter().rev() { 128 128 let verified = parse_op_to_verified(op_value)?; 129 129 let inverted = mst 130 130 .invert_op(verified.clone()) ··· 544 544 } 545 545 } 546 546 report_failures(non_genesis.len(), &failures, "any inverse"); 547 + } 548 + 549 + #[tokio::test] 550 + async fn inductive_inverse_handles_same_rkey_in_batch() { 551 + let client = client(); 552 + let (token, did) = create_account_and_login(&client).await; 553 + 554 + let now = chrono::Utc::now().to_rfc3339(); 555 + let rkey = rkey_for("dup", 0); 556 + create_record(&client, &token, &did, COLLECTION, &rkey).await; 557 + 558 + let writes = vec![ 559 + json!({ 560 + "$type": "com.atproto.repo.applyWrites#update", 561 + "collection": COLLECTION, 562 + "rkey": rkey, 563 + "value": { 564 + "$type": COLLECTION, 565 + "text": "v1", 566 + "createdAt": now, 567 + } 568 + }), 569 + json!({ 570 + "$type": "com.atproto.repo.applyWrites#update", 571 + "collection": COLLECTION, 572 + "rkey": rkey, 573 + "value": { 574 + "$type": COLLECTION, 575 + "text": "v2", 576 + "createdAt": now, 577 + } 578 + }), 579 + ]; 580 + apply_writes_batch(&client, &token, &did, writes).await; 581 + 582 + let our = our_commit_events(&did).await; 583 + let dup_event = our 584 + .iter() 585 + .find(|e| { 586 + ops_json(e) 587 + .map(|arr| { 588 + arr.iter() 589 + .filter(|op| op["action"].as_str() == Some("update")) 590 + .count() 591 + == 2 592 + }) 593 + .unwrap_or(false) 594 + }) 595 + .expect("commit event with two same-rkey updates"); 596 + 597 + let (exp, got) = verify_inductive_inverse(dup_event) 598 + .await 599 + .expect("inverse verify should succeed for same-rkey batch"); 600 + assert_eq!( 601 + exp, got, 602 + "inverse root mismatch for same-rkey batch: exp={exp} got={got}" 603 + ); 547 604 } 548 605 549 606 #[tokio::test]
+1020
crates/tranquil-pds/tests/repo_batch.rs
··· 3 3 use common::*; 4 4 use reqwest::StatusCode; 5 5 use serde_json::{Value, json}; 6 + use tranquil_db_traits::{Backlink, BacklinkPath}; 7 + use tranquil_pds::types::{AtUri, Did, Nsid}; 6 8 7 9 #[tokio::test] 8 10 async fn test_apply_writes_create() { ··· 312 314 .expect("Failed to send request"); 313 315 assert_eq!(res.status(), StatusCode::BAD_REQUEST); 314 316 } 317 + 318 + #[tokio::test] 319 + async fn test_apply_writes_delete_then_create_same_rkey() { 320 + let client = client(); 321 + let (token, did) = create_account_and_login(&client).await; 322 + let now = Utc::now().to_rfc3339(); 323 + let rkey = format!("recreate_{}", Utc::now().timestamp_millis()); 324 + 325 + let create_payload = json!({ 326 + "repo": did, 327 + "collection": "app.bsky.feed.post", 328 + "rkey": rkey, 329 + "record": { 330 + "$type": "app.bsky.feed.post", 331 + "text": "original", 332 + "createdAt": now 333 + } 334 + }); 335 + let res = client 336 + .post(format!( 337 + "{}/xrpc/com.atproto.repo.putRecord", 338 + base_url().await 339 + )) 340 + .bearer_auth(&token) 341 + .json(&create_payload) 342 + .send() 343 + .await 344 + .expect("Failed to create"); 345 + assert_eq!(res.status(), StatusCode::OK); 346 + 347 + let recreate_payload = json!({ 348 + "repo": did, 349 + "writes": [ 350 + { 351 + "$type": "com.atproto.repo.applyWrites#delete", 352 + "collection": "app.bsky.feed.post", 353 + "rkey": rkey 354 + }, 355 + { 356 + "$type": "com.atproto.repo.applyWrites#create", 357 + "collection": "app.bsky.feed.post", 358 + "rkey": rkey, 359 + "value": { 360 + "$type": "app.bsky.feed.post", 361 + "text": "recreated", 362 + "createdAt": now 363 + } 364 + } 365 + ] 366 + }); 367 + let res = client 368 + .post(format!( 369 + "{}/xrpc/com.atproto.repo.applyWrites", 370 + base_url().await 371 + )) 372 + .bearer_auth(&token) 373 + .json(&recreate_payload) 374 + .send() 375 + .await 376 + .expect("Failed to send applyWrites"); 377 + assert_eq!(res.status(), StatusCode::OK); 378 + 379 + let get_res = client 380 + .get(format!( 381 + "{}/xrpc/com.atproto.repo.getRecord", 382 + base_url().await 383 + )) 384 + .query(&[ 385 + ("repo", did.as_str()), 386 + ("collection", "app.bsky.feed.post"), 387 + ("rkey", rkey.as_str()), 388 + ]) 389 + .send() 390 + .await 391 + .expect("Failed to fetch record"); 392 + assert_eq!( 393 + get_res.status(), 394 + StatusCode::OK, 395 + "repo.getRecord must return the recreated record, not 404" 396 + ); 397 + let body: Value = get_res.json().await.expect("Response was not valid JSON"); 398 + assert_eq!(body["value"]["text"], json!("recreated")); 399 + 400 + let list_res = client 401 + .get(format!( 402 + "{}/xrpc/com.atproto.repo.listRecords", 403 + base_url().await 404 + )) 405 + .query(&[ 406 + ("repo", did.as_str()), 407 + ("collection", "app.bsky.feed.post"), 408 + ]) 409 + .send() 410 + .await 411 + .expect("Failed to list records"); 412 + assert_eq!(list_res.status(), StatusCode::OK); 413 + let list_body: Value = list_res.json().await.expect("listRecords not valid JSON"); 414 + let records = list_body["records"] 415 + .as_array() 416 + .expect("records must be array"); 417 + let expected_uri = format!("at://{}/app.bsky.feed.post/{}", did, rkey); 418 + assert!( 419 + records.iter().any(|r| r["uri"] == json!(expected_uri)), 420 + "listRecords must include the recreated rkey" 421 + ); 422 + } 423 + 424 + #[tokio::test] 425 + async fn test_apply_writes_create_then_delete_same_rkey() { 426 + let client = client(); 427 + let (token, did) = create_account_and_login(&client).await; 428 + let now = Utc::now().to_rfc3339(); 429 + let rkey = format!("transient_{}", Utc::now().timestamp_millis()); 430 + 431 + let payload = json!({ 432 + "repo": did, 433 + "writes": [ 434 + { 435 + "$type": "com.atproto.repo.applyWrites#create", 436 + "collection": "app.bsky.feed.post", 437 + "rkey": rkey, 438 + "value": { 439 + "$type": "app.bsky.feed.post", 440 + "text": "transient", 441 + "createdAt": now 442 + } 443 + }, 444 + { 445 + "$type": "com.atproto.repo.applyWrites#delete", 446 + "collection": "app.bsky.feed.post", 447 + "rkey": rkey 448 + } 449 + ] 450 + }); 451 + let res = client 452 + .post(format!( 453 + "{}/xrpc/com.atproto.repo.applyWrites", 454 + base_url().await 455 + )) 456 + .bearer_auth(&token) 457 + .json(&payload) 458 + .send() 459 + .await 460 + .expect("Failed to send applyWrites"); 461 + assert_eq!(res.status(), StatusCode::OK); 462 + 463 + let get_res = client 464 + .get(format!( 465 + "{}/xrpc/com.atproto.repo.getRecord", 466 + base_url().await 467 + )) 468 + .query(&[ 469 + ("repo", did.as_str()), 470 + ("collection", "app.bsky.feed.post"), 471 + ("rkey", rkey.as_str()), 472 + ]) 473 + .send() 474 + .await 475 + .expect("Failed to fetch record"); 476 + assert_eq!( 477 + get_res.status(), 478 + StatusCode::NOT_FOUND, 479 + "create+delete on same rkey must leave no record" 480 + ); 481 + 482 + let list_res = client 483 + .get(format!( 484 + "{}/xrpc/com.atproto.repo.listRecords", 485 + base_url().await 486 + )) 487 + .query(&[ 488 + ("repo", did.as_str()), 489 + ("collection", "app.bsky.feed.post"), 490 + ]) 491 + .send() 492 + .await 493 + .expect("Failed to list records"); 494 + assert_eq!(list_res.status(), StatusCode::OK); 495 + let list_body: Value = list_res.json().await.expect("listRecords not valid JSON"); 496 + let records = list_body["records"] 497 + .as_array() 498 + .expect("records must be array"); 499 + let expected_uri = format!("at://{}/app.bsky.feed.post/{}", did, rkey); 500 + assert!( 501 + !records.iter().any(|r| r["uri"] == json!(expected_uri)), 502 + "listRecords must not include a created-then-deleted rkey" 503 + ); 504 + } 505 + 506 + async fn repo_id_for_did(did: &str) -> uuid::Uuid { 507 + let repos = get_test_repos().await; 508 + let parsed = Did::new(did).expect("valid did"); 509 + repos 510 + .user 511 + .get_id_by_did(&parsed) 512 + .await 513 + .expect("lookup user_id") 514 + .expect("user exists") 515 + } 516 + 517 + async fn follow_uris_pointing_to(repo_id: uuid::Uuid, target_did: &str) -> Vec<String> { 518 + let repos = get_test_repos().await; 519 + let probe = Backlink { 520 + uri: AtUri::from_parts("did:plc:probe", "app.bsky.graph.follow", "probe"), 521 + path: BacklinkPath::Subject, 522 + link_to: target_did.to_string(), 523 + }; 524 + let collection = Nsid::new("app.bsky.graph.follow").expect("valid nsid"); 525 + repos 526 + .backlink 527 + .get_backlink_conflicts(repo_id, &collection, &[probe]) 528 + .await 529 + .expect("backlink query") 530 + .into_iter() 531 + .map(|u| u.as_str().to_string()) 532 + .collect() 533 + } 534 + 535 + #[tokio::test] 536 + async fn test_apply_writes_update_then_update_same_rkey() { 537 + let client = client(); 538 + let (token, did) = create_account_and_login(&client).await; 539 + let now = Utc::now().to_rfc3339(); 540 + let rkey = format!("double_update_{}", Utc::now().timestamp_millis()); 541 + 542 + let create_payload = json!({ 543 + "repo": did, 544 + "collection": "app.bsky.feed.post", 545 + "rkey": rkey, 546 + "record": { 547 + "$type": "app.bsky.feed.post", 548 + "text": "v0", 549 + "createdAt": now 550 + } 551 + }); 552 + let res = client 553 + .post(format!( 554 + "{}/xrpc/com.atproto.repo.putRecord", 555 + base_url().await 556 + )) 557 + .bearer_auth(&token) 558 + .json(&create_payload) 559 + .send() 560 + .await 561 + .expect("Failed to create"); 562 + assert_eq!(res.status(), StatusCode::OK); 563 + 564 + let payload = json!({ 565 + "repo": did, 566 + "writes": [ 567 + { 568 + "$type": "com.atproto.repo.applyWrites#update", 569 + "collection": "app.bsky.feed.post", 570 + "rkey": rkey, 571 + "value": { 572 + "$type": "app.bsky.feed.post", 573 + "text": "v1", 574 + "createdAt": now 575 + } 576 + }, 577 + { 578 + "$type": "com.atproto.repo.applyWrites#update", 579 + "collection": "app.bsky.feed.post", 580 + "rkey": rkey, 581 + "value": { 582 + "$type": "app.bsky.feed.post", 583 + "text": "v2", 584 + "createdAt": now 585 + } 586 + } 587 + ] 588 + }); 589 + let res = client 590 + .post(format!( 591 + "{}/xrpc/com.atproto.repo.applyWrites", 592 + base_url().await 593 + )) 594 + .bearer_auth(&token) 595 + .json(&payload) 596 + .send() 597 + .await 598 + .expect("Failed to send applyWrites"); 599 + assert_eq!(res.status(), StatusCode::OK); 600 + let body: Value = res.json().await.expect("Response was not valid JSON"); 601 + let final_cid = body["results"] 602 + .as_array() 603 + .and_then(|r| r.last()) 604 + .and_then(|r| r["cid"].as_str()) 605 + .expect("last result must carry a cid") 606 + .to_string(); 607 + 608 + let get_res = client 609 + .get(format!( 610 + "{}/xrpc/com.atproto.repo.getRecord", 611 + base_url().await 612 + )) 613 + .query(&[ 614 + ("repo", did.as_str()), 615 + ("collection", "app.bsky.feed.post"), 616 + ("rkey", rkey.as_str()), 617 + ]) 618 + .send() 619 + .await 620 + .expect("Failed to fetch record"); 621 + assert_eq!(get_res.status(), StatusCode::OK); 622 + let stored: Value = get_res.json().await.expect("Response was not valid JSON"); 623 + assert_eq!(stored["value"]["text"], json!("v2")); 624 + assert_eq!(stored["cid"], json!(final_cid)); 625 + } 626 + 627 + #[tokio::test] 628 + async fn test_apply_writes_update_then_delete_same_rkey() { 629 + let client = client(); 630 + let (token, did) = create_account_and_login(&client).await; 631 + let now = Utc::now().to_rfc3339(); 632 + let rkey = format!("update_delete_{}", Utc::now().timestamp_millis()); 633 + 634 + let create_payload = json!({ 635 + "repo": did, 636 + "collection": "app.bsky.feed.post", 637 + "rkey": rkey, 638 + "record": { 639 + "$type": "app.bsky.feed.post", 640 + "text": "v0", 641 + "createdAt": now 642 + } 643 + }); 644 + let res = client 645 + .post(format!( 646 + "{}/xrpc/com.atproto.repo.putRecord", 647 + base_url().await 648 + )) 649 + .bearer_auth(&token) 650 + .json(&create_payload) 651 + .send() 652 + .await 653 + .expect("Failed to create"); 654 + assert_eq!(res.status(), StatusCode::OK); 655 + 656 + let payload = json!({ 657 + "repo": did, 658 + "writes": [ 659 + { 660 + "$type": "com.atproto.repo.applyWrites#update", 661 + "collection": "app.bsky.feed.post", 662 + "rkey": rkey, 663 + "value": { 664 + "$type": "app.bsky.feed.post", 665 + "text": "v1", 666 + "createdAt": now 667 + } 668 + }, 669 + { 670 + "$type": "com.atproto.repo.applyWrites#delete", 671 + "collection": "app.bsky.feed.post", 672 + "rkey": rkey 673 + } 674 + ] 675 + }); 676 + let res = client 677 + .post(format!( 678 + "{}/xrpc/com.atproto.repo.applyWrites", 679 + base_url().await 680 + )) 681 + .bearer_auth(&token) 682 + .json(&payload) 683 + .send() 684 + .await 685 + .expect("Failed to send applyWrites"); 686 + assert_eq!(res.status(), StatusCode::OK); 687 + 688 + let get_res = client 689 + .get(format!( 690 + "{}/xrpc/com.atproto.repo.getRecord", 691 + base_url().await 692 + )) 693 + .query(&[ 694 + ("repo", did.as_str()), 695 + ("collection", "app.bsky.feed.post"), 696 + ("rkey", rkey.as_str()), 697 + ]) 698 + .send() 699 + .await 700 + .expect("Failed to fetch record"); 701 + assert_eq!(get_res.status(), StatusCode::NOT_FOUND); 702 + } 703 + 704 + #[tokio::test] 705 + async fn test_apply_writes_delete_then_update_same_rkey_rejected() { 706 + let client = client(); 707 + let (token, did) = create_account_and_login(&client).await; 708 + let now = Utc::now().to_rfc3339(); 709 + let rkey = format!("delete_update_{}", Utc::now().timestamp_millis()); 710 + 711 + let create_payload = json!({ 712 + "repo": did, 713 + "collection": "app.bsky.feed.post", 714 + "rkey": rkey, 715 + "record": { 716 + "$type": "app.bsky.feed.post", 717 + "text": "v0", 718 + "createdAt": now 719 + } 720 + }); 721 + let res = client 722 + .post(format!( 723 + "{}/xrpc/com.atproto.repo.putRecord", 724 + base_url().await 725 + )) 726 + .bearer_auth(&token) 727 + .json(&create_payload) 728 + .send() 729 + .await 730 + .expect("Failed to seed record"); 731 + assert_eq!(res.status(), StatusCode::OK); 732 + 733 + let payload = json!({ 734 + "repo": did, 735 + "writes": [ 736 + { 737 + "$type": "com.atproto.repo.applyWrites#delete", 738 + "collection": "app.bsky.feed.post", 739 + "rkey": rkey 740 + }, 741 + { 742 + "$type": "com.atproto.repo.applyWrites#update", 743 + "collection": "app.bsky.feed.post", 744 + "rkey": rkey, 745 + "value": { 746 + "$type": "app.bsky.feed.post", 747 + "text": "v1", 748 + "createdAt": now 749 + } 750 + } 751 + ] 752 + }); 753 + let res = client 754 + .post(format!( 755 + "{}/xrpc/com.atproto.repo.applyWrites", 756 + base_url().await 757 + )) 758 + .bearer_auth(&token) 759 + .json(&payload) 760 + .send() 761 + .await 762 + .expect("Failed to send applyWrites"); 763 + assert_eq!( 764 + res.status(), 765 + StatusCode::BAD_REQUEST, 766 + "update of a record deleted earlier in the same batch must be rejected" 767 + ); 768 + 769 + let get_res = client 770 + .get(format!( 771 + "{}/xrpc/com.atproto.repo.getRecord", 772 + base_url().await 773 + )) 774 + .query(&[ 775 + ("repo", did.as_str()), 776 + ("collection", "app.bsky.feed.post"), 777 + ("rkey", rkey.as_str()), 778 + ]) 779 + .send() 780 + .await 781 + .expect("Failed to fetch record"); 782 + assert_eq!(get_res.status(), StatusCode::OK); 783 + let body: Value = get_res.json().await.expect("Response was not valid JSON"); 784 + assert_eq!( 785 + body["value"]["text"], 786 + json!("v0"), 787 + "rejected batch must leave the seed record untouched" 788 + ); 789 + } 790 + 791 + #[tokio::test] 792 + async fn test_apply_writes_create_update_delete_same_rkey() { 793 + let client = client(); 794 + let (token, did) = create_account_and_login(&client).await; 795 + let now = Utc::now().to_rfc3339(); 796 + let rkey = format!("triple_{}", Utc::now().timestamp_millis()); 797 + 798 + let payload = json!({ 799 + "repo": did, 800 + "writes": [ 801 + { 802 + "$type": "com.atproto.repo.applyWrites#create", 803 + "collection": "app.bsky.feed.post", 804 + "rkey": rkey, 805 + "value": { 806 + "$type": "app.bsky.feed.post", 807 + "text": "v0", 808 + "createdAt": now 809 + } 810 + }, 811 + { 812 + "$type": "com.atproto.repo.applyWrites#update", 813 + "collection": "app.bsky.feed.post", 814 + "rkey": rkey, 815 + "value": { 816 + "$type": "app.bsky.feed.post", 817 + "text": "v1", 818 + "createdAt": now 819 + } 820 + }, 821 + { 822 + "$type": "com.atproto.repo.applyWrites#delete", 823 + "collection": "app.bsky.feed.post", 824 + "rkey": rkey 825 + } 826 + ] 827 + }); 828 + let res = client 829 + .post(format!( 830 + "{}/xrpc/com.atproto.repo.applyWrites", 831 + base_url().await 832 + )) 833 + .bearer_auth(&token) 834 + .json(&payload) 835 + .send() 836 + .await 837 + .expect("Failed to send applyWrites"); 838 + assert_eq!(res.status(), StatusCode::OK); 839 + 840 + let get_res = client 841 + .get(format!( 842 + "{}/xrpc/com.atproto.repo.getRecord", 843 + base_url().await 844 + )) 845 + .query(&[ 846 + ("repo", did.as_str()), 847 + ("collection", "app.bsky.feed.post"), 848 + ("rkey", rkey.as_str()), 849 + ]) 850 + .send() 851 + .await 852 + .expect("Failed to fetch record"); 853 + assert_eq!(get_res.status(), StatusCode::NOT_FOUND); 854 + } 855 + 856 + #[tokio::test] 857 + async fn test_apply_writes_distinct_rkeys_not_conflated() { 858 + let client = client(); 859 + let (token, did) = create_account_and_login(&client).await; 860 + let now = Utc::now().to_rfc3339(); 861 + let stamp = Utc::now().timestamp_millis(); 862 + let rkey_a = format!("distinct_a_{}", stamp); 863 + let rkey_b = format!("distinct_b_{}", stamp); 864 + 865 + let create_a = json!({ 866 + "repo": did, 867 + "collection": "app.bsky.feed.post", 868 + "rkey": rkey_a, 869 + "record": { 870 + "$type": "app.bsky.feed.post", 871 + "text": "a0", 872 + "createdAt": now 873 + } 874 + }); 875 + let res = client 876 + .post(format!( 877 + "{}/xrpc/com.atproto.repo.putRecord", 878 + base_url().await 879 + )) 880 + .bearer_auth(&token) 881 + .json(&create_a) 882 + .send() 883 + .await 884 + .expect("Failed to seed rkey_a"); 885 + assert_eq!(res.status(), StatusCode::OK); 886 + 887 + let payload = json!({ 888 + "repo": did, 889 + "writes": [ 890 + { 891 + "$type": "com.atproto.repo.applyWrites#delete", 892 + "collection": "app.bsky.feed.post", 893 + "rkey": rkey_a 894 + }, 895 + { 896 + "$type": "com.atproto.repo.applyWrites#create", 897 + "collection": "app.bsky.feed.post", 898 + "rkey": rkey_a, 899 + "value": { 900 + "$type": "app.bsky.feed.post", 901 + "text": "a1", 902 + "createdAt": now 903 + } 904 + }, 905 + { 906 + "$type": "com.atproto.repo.applyWrites#create", 907 + "collection": "app.bsky.feed.post", 908 + "rkey": rkey_b, 909 + "value": { 910 + "$type": "app.bsky.feed.post", 911 + "text": "b", 912 + "createdAt": now 913 + } 914 + } 915 + ] 916 + }); 917 + let res = client 918 + .post(format!( 919 + "{}/xrpc/com.atproto.repo.applyWrites", 920 + base_url().await 921 + )) 922 + .bearer_auth(&token) 923 + .json(&payload) 924 + .send() 925 + .await 926 + .expect("Failed to send applyWrites"); 927 + assert_eq!(res.status(), StatusCode::OK); 928 + 929 + let fetch = |rkey: String| { 930 + let did = did.clone(); 931 + let client = client.clone(); 932 + async move { 933 + let res = client 934 + .get(format!( 935 + "{}/xrpc/com.atproto.repo.getRecord", 936 + base_url().await 937 + )) 938 + .query(&[ 939 + ("repo", did.as_str()), 940 + ("collection", "app.bsky.feed.post"), 941 + ("rkey", rkey.as_str()), 942 + ]) 943 + .send() 944 + .await 945 + .expect("Failed to fetch record"); 946 + assert_eq!(res.status(), StatusCode::OK); 947 + let body: Value = res.json().await.expect("Response was not valid JSON"); 948 + body["value"]["text"].as_str().unwrap().to_string() 949 + } 950 + }; 951 + 952 + assert_eq!(fetch(rkey_a.clone()).await, "a1"); 953 + assert_eq!(fetch(rkey_b.clone()).await, "b"); 954 + } 955 + 956 + #[tokio::test] 957 + async fn test_apply_writes_follow_create_then_delete_no_orphan_backlink() { 958 + let client = client(); 959 + let (token, did) = create_account_and_login(&client).await; 960 + let now = Utc::now().to_rfc3339(); 961 + let rkey = format!("follow_orphan_{}", Utc::now().timestamp_millis()); 962 + let target = "did:plc:orphantargettestabcdefgh"; 963 + 964 + let payload = json!({ 965 + "repo": did, 966 + "writes": [ 967 + { 968 + "$type": "com.atproto.repo.applyWrites#create", 969 + "collection": "app.bsky.graph.follow", 970 + "rkey": rkey, 971 + "value": { 972 + "$type": "app.bsky.graph.follow", 973 + "subject": target, 974 + "createdAt": now 975 + } 976 + }, 977 + { 978 + "$type": "com.atproto.repo.applyWrites#delete", 979 + "collection": "app.bsky.graph.follow", 980 + "rkey": rkey 981 + } 982 + ] 983 + }); 984 + let res = client 985 + .post(format!( 986 + "{}/xrpc/com.atproto.repo.applyWrites", 987 + base_url().await 988 + )) 989 + .bearer_auth(&token) 990 + .json(&payload) 991 + .send() 992 + .await 993 + .expect("Failed to send applyWrites"); 994 + assert_eq!(res.status(), StatusCode::OK); 995 + 996 + let repo_id = repo_id_for_did(&did).await; 997 + let lingering = follow_uris_pointing_to(repo_id, target).await; 998 + assert!( 999 + lingering.is_empty(), 1000 + "follow record was deleted in same batch but backlink survived: {:?}", 1001 + lingering 1002 + ); 1003 + } 1004 + 1005 + #[tokio::test] 1006 + async fn test_apply_writes_follow_update_update_final_link_wins() { 1007 + let client = client(); 1008 + let (token, did) = create_account_and_login(&client).await; 1009 + let now = Utc::now().to_rfc3339(); 1010 + let rkey = format!("follow_doubleupdate_{}", Utc::now().timestamp_millis()); 1011 + let target_initial = "did:plc:initialtarget1234567890"; 1012 + let target_intermediate = "did:plc:intermediatetarget12345"; 1013 + let target_final = "did:plc:finaltarget1234567890ab"; 1014 + 1015 + let create_payload = json!({ 1016 + "repo": did, 1017 + "collection": "app.bsky.graph.follow", 1018 + "rkey": rkey, 1019 + "record": { 1020 + "$type": "app.bsky.graph.follow", 1021 + "subject": target_initial, 1022 + "createdAt": now 1023 + } 1024 + }); 1025 + let res = client 1026 + .post(format!( 1027 + "{}/xrpc/com.atproto.repo.putRecord", 1028 + base_url().await 1029 + )) 1030 + .bearer_auth(&token) 1031 + .json(&create_payload) 1032 + .send() 1033 + .await 1034 + .expect("Failed to create initial follow"); 1035 + assert_eq!(res.status(), StatusCode::OK); 1036 + 1037 + let payload = json!({ 1038 + "repo": did, 1039 + "writes": [ 1040 + { 1041 + "$type": "com.atproto.repo.applyWrites#update", 1042 + "collection": "app.bsky.graph.follow", 1043 + "rkey": rkey, 1044 + "value": { 1045 + "$type": "app.bsky.graph.follow", 1046 + "subject": target_intermediate, 1047 + "createdAt": now 1048 + } 1049 + }, 1050 + { 1051 + "$type": "com.atproto.repo.applyWrites#update", 1052 + "collection": "app.bsky.graph.follow", 1053 + "rkey": rkey, 1054 + "value": { 1055 + "$type": "app.bsky.graph.follow", 1056 + "subject": target_final, 1057 + "createdAt": now 1058 + } 1059 + } 1060 + ] 1061 + }); 1062 + let res = client 1063 + .post(format!( 1064 + "{}/xrpc/com.atproto.repo.applyWrites", 1065 + base_url().await 1066 + )) 1067 + .bearer_auth(&token) 1068 + .json(&payload) 1069 + .send() 1070 + .await 1071 + .expect("Failed to send applyWrites"); 1072 + assert_eq!(res.status(), StatusCode::OK); 1073 + 1074 + let repo_id = repo_id_for_did(&did).await; 1075 + let expected_uri = format!("at://{}/app.bsky.graph.follow/{}", did, rkey); 1076 + 1077 + let final_links = follow_uris_pointing_to(repo_id, target_final).await; 1078 + assert_eq!( 1079 + final_links, 1080 + vec![expected_uri.clone()], 1081 + "backlink must point to final subject" 1082 + ); 1083 + 1084 + let intermediate_links = follow_uris_pointing_to(repo_id, target_intermediate).await; 1085 + assert!( 1086 + intermediate_links.is_empty(), 1087 + "intermediate subject must not retain a backlink: {:?}", 1088 + intermediate_links 1089 + ); 1090 + 1091 + let initial_links = follow_uris_pointing_to(repo_id, target_initial).await; 1092 + assert!( 1093 + initial_links.is_empty(), 1094 + "initial subject backlink must be cleared: {:?}", 1095 + initial_links 1096 + ); 1097 + } 1098 + 1099 + #[tokio::test] 1100 + async fn test_apply_writes_create_create_same_rkey_rejected() { 1101 + let client = client(); 1102 + let (token, did) = create_account_and_login(&client).await; 1103 + let now = Utc::now().to_rfc3339(); 1104 + let rkey = format!("dup_create_{}", Utc::now().timestamp_millis()); 1105 + 1106 + let payload = json!({ 1107 + "repo": did, 1108 + "writes": [ 1109 + { 1110 + "$type": "com.atproto.repo.applyWrites#create", 1111 + "collection": "app.bsky.feed.post", 1112 + "rkey": rkey, 1113 + "value": { 1114 + "$type": "app.bsky.feed.post", 1115 + "text": "first", 1116 + "createdAt": now 1117 + } 1118 + }, 1119 + { 1120 + "$type": "com.atproto.repo.applyWrites#create", 1121 + "collection": "app.bsky.feed.post", 1122 + "rkey": rkey, 1123 + "value": { 1124 + "$type": "app.bsky.feed.post", 1125 + "text": "second", 1126 + "createdAt": now 1127 + } 1128 + } 1129 + ] 1130 + }); 1131 + let res = client 1132 + .post(format!( 1133 + "{}/xrpc/com.atproto.repo.applyWrites", 1134 + base_url().await 1135 + )) 1136 + .bearer_auth(&token) 1137 + .json(&payload) 1138 + .send() 1139 + .await 1140 + .expect("Failed to send applyWrites"); 1141 + assert_eq!( 1142 + res.status(), 1143 + StatusCode::BAD_REQUEST, 1144 + "duplicate create on same rkey within a batch must be rejected" 1145 + ); 1146 + 1147 + let get_res = client 1148 + .get(format!( 1149 + "{}/xrpc/com.atproto.repo.getRecord", 1150 + base_url().await 1151 + )) 1152 + .query(&[ 1153 + ("repo", did.as_str()), 1154 + ("collection", "app.bsky.feed.post"), 1155 + ("rkey", rkey.as_str()), 1156 + ]) 1157 + .send() 1158 + .await 1159 + .expect("Failed to fetch record"); 1160 + assert_eq!( 1161 + get_res.status(), 1162 + StatusCode::NOT_FOUND, 1163 + "rejected batch must not have produced a record" 1164 + ); 1165 + } 1166 + 1167 + #[tokio::test] 1168 + async fn test_apply_writes_update_then_create_same_rkey_rejected() { 1169 + let client = client(); 1170 + let (token, did) = create_account_and_login(&client).await; 1171 + let now = Utc::now().to_rfc3339(); 1172 + let rkey = format!("update_create_{}", Utc::now().timestamp_millis()); 1173 + 1174 + let create_payload = json!({ 1175 + "repo": did, 1176 + "collection": "app.bsky.feed.post", 1177 + "rkey": rkey, 1178 + "record": { 1179 + "$type": "app.bsky.feed.post", 1180 + "text": "v0", 1181 + "createdAt": now 1182 + } 1183 + }); 1184 + let res = client 1185 + .post(format!( 1186 + "{}/xrpc/com.atproto.repo.putRecord", 1187 + base_url().await 1188 + )) 1189 + .bearer_auth(&token) 1190 + .json(&create_payload) 1191 + .send() 1192 + .await 1193 + .expect("Failed to seed record"); 1194 + assert_eq!(res.status(), StatusCode::OK); 1195 + 1196 + let payload = json!({ 1197 + "repo": did, 1198 + "writes": [ 1199 + { 1200 + "$type": "com.atproto.repo.applyWrites#update", 1201 + "collection": "app.bsky.feed.post", 1202 + "rkey": rkey, 1203 + "value": { 1204 + "$type": "app.bsky.feed.post", 1205 + "text": "v1", 1206 + "createdAt": now 1207 + } 1208 + }, 1209 + { 1210 + "$type": "com.atproto.repo.applyWrites#create", 1211 + "collection": "app.bsky.feed.post", 1212 + "rkey": rkey, 1213 + "value": { 1214 + "$type": "app.bsky.feed.post", 1215 + "text": "v2", 1216 + "createdAt": now 1217 + } 1218 + } 1219 + ] 1220 + }); 1221 + let res = client 1222 + .post(format!( 1223 + "{}/xrpc/com.atproto.repo.applyWrites", 1224 + base_url().await 1225 + )) 1226 + .bearer_auth(&token) 1227 + .json(&payload) 1228 + .send() 1229 + .await 1230 + .expect("Failed to send applyWrites"); 1231 + assert_eq!( 1232 + res.status(), 1233 + StatusCode::BAD_REQUEST, 1234 + "create over a record live in this batch must be rejected" 1235 + ); 1236 + 1237 + let get_res = client 1238 + .get(format!( 1239 + "{}/xrpc/com.atproto.repo.getRecord", 1240 + base_url().await 1241 + )) 1242 + .query(&[ 1243 + ("repo", did.as_str()), 1244 + ("collection", "app.bsky.feed.post"), 1245 + ("rkey", rkey.as_str()), 1246 + ]) 1247 + .send() 1248 + .await 1249 + .expect("Failed to fetch record"); 1250 + assert_eq!(get_res.status(), StatusCode::OK); 1251 + let body: Value = get_res.json().await.expect("Response was not valid JSON"); 1252 + assert_eq!( 1253 + body["value"]["text"], 1254 + json!("v0"), 1255 + "rejected batch must leave the seed record untouched" 1256 + ); 1257 + } 1258 + 1259 + #[tokio::test] 1260 + async fn test_create_record_rejects_existing_rkey() { 1261 + let client = client(); 1262 + let (token, did) = create_account_and_login(&client).await; 1263 + let now = Utc::now().to_rfc3339(); 1264 + let rkey = format!("existing_{}", Utc::now().timestamp_millis()); 1265 + 1266 + let seed = json!({ 1267 + "repo": did, 1268 + "collection": "app.bsky.feed.post", 1269 + "rkey": rkey, 1270 + "record": { 1271 + "$type": "app.bsky.feed.post", 1272 + "text": "first", 1273 + "createdAt": now 1274 + } 1275 + }); 1276 + let res = client 1277 + .post(format!( 1278 + "{}/xrpc/com.atproto.repo.createRecord", 1279 + base_url().await 1280 + )) 1281 + .bearer_auth(&token) 1282 + .json(&seed) 1283 + .send() 1284 + .await 1285 + .expect("Failed initial create"); 1286 + assert_eq!(res.status(), StatusCode::OK); 1287 + 1288 + let dup = json!({ 1289 + "repo": did, 1290 + "collection": "app.bsky.feed.post", 1291 + "rkey": rkey, 1292 + "record": { 1293 + "$type": "app.bsky.feed.post", 1294 + "text": "second", 1295 + "createdAt": now 1296 + } 1297 + }); 1298 + let res = client 1299 + .post(format!( 1300 + "{}/xrpc/com.atproto.repo.createRecord", 1301 + base_url().await 1302 + )) 1303 + .bearer_auth(&token) 1304 + .json(&dup) 1305 + .send() 1306 + .await 1307 + .expect("Failed duplicate create"); 1308 + assert_eq!( 1309 + res.status(), 1310 + StatusCode::BAD_REQUEST, 1311 + "createRecord on an existing rkey must be rejected" 1312 + ); 1313 + 1314 + let get_res = client 1315 + .get(format!( 1316 + "{}/xrpc/com.atproto.repo.getRecord", 1317 + base_url().await 1318 + )) 1319 + .query(&[ 1320 + ("repo", did.as_str()), 1321 + ("collection", "app.bsky.feed.post"), 1322 + ("rkey", rkey.as_str()), 1323 + ]) 1324 + .send() 1325 + .await 1326 + .expect("Failed to fetch record"); 1327 + assert_eq!(get_res.status(), StatusCode::OK); 1328 + let body: Value = get_res.json().await.expect("Response was not valid JSON"); 1329 + assert_eq!( 1330 + body["value"]["text"], 1331 + json!("first"), 1332 + "duplicate create must not have overwritten the original" 1333 + ); 1334 + }