Our Personal Data Server from scratch! tranquil.farm
pds rust database fun oauth atproto
238
fork

Configure Feed

Select the types of activity you want to include in your feed.

test(tranquil-pds): websocket firehose end-to-end mst verification

Lewis: May this revision serve well! <lu5a@proton.me>

authored by did:plc:mb5to35neicxt4gemstoro… and committed by

Tangled ccc99161 bc8fd66a

+475
+475
crates/tranquil-pds/tests/mst_firehose_e2e.rs
··· 1 + mod common; 2 + mod firehose; 3 + 4 + use std::collections::BTreeMap; 5 + use std::io::Cursor; 6 + use std::str::FromStr; 7 + use std::sync::Arc; 8 + use std::time::Duration; 9 + 10 + use bytes::Bytes; 11 + use cid::Cid; 12 + use common::*; 13 + use firehose::{FirehoseConsumer, ParsedCommitFrame}; 14 + use iroh_car::CarReader; 15 + use jacquard_common::smol_str::SmolStr; 16 + use jacquard_repo::commit::Commit; 17 + use jacquard_repo::mst::{Mst, VerifiedWriteOp}; 18 + use jacquard_repo::storage::{BlockStore, MemoryBlockStore}; 19 + use reqwest::StatusCode; 20 + use serde_json::{Value, json}; 21 + use tranquil_scopes::RepoAction; 22 + 23 + async fn car_to_blocks(car_bytes: &[u8]) -> BTreeMap<Cid, Bytes> { 24 + let mut reader = CarReader::new(Cursor::new(car_bytes)) 25 + .await 26 + .expect("parse CAR header"); 27 + let mut blocks = BTreeMap::new(); 28 + while let Ok(Some((cid, data))) = reader.next_block().await { 29 + blocks.insert(cid, Bytes::from(data)); 30 + } 31 + blocks 32 + } 33 + 34 + fn op_to_verified(op: &firehose::ParsedRepoOp) -> Result<VerifiedWriteOp, String> { 35 + let key = SmolStr::new(&op.path); 36 + match op.action { 37 + RepoAction::Create => { 38 + let cid = op.cid.ok_or("create op missing cid")?; 39 + Ok(VerifiedWriteOp::Create { key, cid }) 40 + } 41 + RepoAction::Update => { 42 + let cid = op.cid.ok_or("update op missing cid")?; 43 + let prev = op.prev.ok_or("update op missing prev")?; 44 + Ok(VerifiedWriteOp::Update { key, cid, prev }) 45 + } 46 + RepoAction::Delete => { 47 + let prev = op.prev.ok_or("delete op missing prev")?; 48 + Ok(VerifiedWriteOp::Delete { key, prev }) 49 + } 50 + } 51 + } 52 + 53 + async fn verify_frame_forward(frame: &ParsedCommitFrame) -> Result<(), String> { 54 + let prev_data = frame 55 + .prev_data 56 + .ok_or_else(|| "frame missing prev_data (v1.1 required)".to_string())?; 57 + 58 + let blocks = car_to_blocks(&frame.blocks).await; 59 + let storage = Arc::new(MemoryBlockStore::new_from_blocks(blocks)); 60 + 61 + let commit_bytes = storage 62 + .get(&frame.commit) 63 + .await 64 + .map_err(|e| format!("get commit: {e:?}"))? 65 + .ok_or_else(|| format!("CAR missing commit {}", frame.commit))?; 66 + let commit = Commit::from_cbor(&commit_bytes).map_err(|e| format!("parse commit: {e:?}"))?; 67 + let expected = *commit.data(); 68 + 69 + let mut mst = Mst::load(storage, prev_data, None); 70 + for op in &frame.ops { 71 + let path = &op.path; 72 + match op.action { 73 + RepoAction::Create | RepoAction::Update => { 74 + let cid = op.cid.ok_or_else(|| format!("{path}: op missing cid"))?; 75 + mst = mst 76 + .add(path, cid) 77 + .await 78 + .map_err(|e| format!("forward {path}: {e:?}"))?; 79 + } 80 + RepoAction::Delete => { 81 + mst = mst 82 + .delete(path) 83 + .await 84 + .map_err(|e| format!("forward delete {path}: {e:?}"))?; 85 + } 86 + } 87 + } 88 + let computed = mst.persist().await.map_err(|e| format!("persist: {e:?}"))?; 89 + if computed != expected { 90 + return Err(format!( 91 + "root mismatch expected={expected} computed={computed}" 92 + )); 93 + } 94 + Ok(()) 95 + } 96 + 97 + async fn verify_frame_inverse(frame: &ParsedCommitFrame) -> Result<(), String> { 98 + let prev_data = frame 99 + .prev_data 100 + .ok_or_else(|| "frame missing prev_data (v1.1 required)".to_string())?; 101 + 102 + let blocks = car_to_blocks(&frame.blocks).await; 103 + let storage = Arc::new(MemoryBlockStore::new_from_blocks(blocks)); 104 + 105 + let commit_bytes = storage 106 + .get(&frame.commit) 107 + .await 108 + .map_err(|e| format!("get commit: {e:?}"))? 109 + .ok_or_else(|| format!("CAR missing commit {}", frame.commit))?; 110 + let commit = Commit::from_cbor(&commit_bytes).map_err(|e| format!("parse commit: {e:?}"))?; 111 + let new_data = *commit.data(); 112 + 113 + let mut mst = Mst::load(storage, new_data, None); 114 + for op in &frame.ops { 115 + let verified = op_to_verified(op)?; 116 + let inverted = mst 117 + .invert_op(verified.clone()) 118 + .await 119 + .map_err(|e| format!("invert {verified:?}: {e:?}"))?; 120 + if !inverted { 121 + return Err(format!("op not invertible: {verified:?}")); 122 + } 123 + } 124 + let computed_prev = mst 125 + .get_pointer() 126 + .await 127 + .map_err(|e| format!("get_pointer: {e:?}"))?; 128 + if computed_prev != prev_data { 129 + return Err(format!( 130 + "inverse root mismatch expected={prev_data} computed={computed_prev}" 131 + )); 132 + } 133 + Ok(()) 134 + } 135 + 136 + async fn create_record(client: &reqwest::Client, token: &str, did: &str, rkey: &str, text: &str) { 137 + let now = chrono::Utc::now().to_rfc3339(); 138 + let res = client 139 + .post(format!( 140 + "{}/xrpc/com.atproto.repo.createRecord", 141 + base_url().await 142 + )) 143 + .bearer_auth(token) 144 + .json(&json!({ 145 + "repo": did, 146 + "collection": "app.bsky.feed.post", 147 + "rkey": rkey, 148 + "record": { 149 + "$type": "app.bsky.feed.post", 150 + "text": text, 151 + "createdAt": now, 152 + } 153 + })) 154 + .send() 155 + .await 156 + .expect("createRecord"); 157 + assert_eq!(res.status(), StatusCode::OK); 158 + } 159 + 160 + async fn put_record(client: &reqwest::Client, token: &str, did: &str, rkey: &str, text: &str) { 161 + let now = chrono::Utc::now().to_rfc3339(); 162 + let res = client 163 + .post(format!( 164 + "{}/xrpc/com.atproto.repo.putRecord", 165 + base_url().await 166 + )) 167 + .bearer_auth(token) 168 + .json(&json!({ 169 + "repo": did, 170 + "collection": "app.bsky.feed.post", 171 + "rkey": rkey, 172 + "record": { 173 + "$type": "app.bsky.feed.post", 174 + "text": text, 175 + "createdAt": now, 176 + } 177 + })) 178 + .send() 179 + .await 180 + .expect("putRecord"); 181 + assert_eq!(res.status(), StatusCode::OK); 182 + } 183 + 184 + async fn delete_record(client: &reqwest::Client, token: &str, did: &str, rkey: &str) { 185 + let res = client 186 + .post(format!( 187 + "{}/xrpc/com.atproto.repo.deleteRecord", 188 + base_url().await 189 + )) 190 + .bearer_auth(token) 191 + .json(&json!({ 192 + "repo": did, 193 + "collection": "app.bsky.feed.post", 194 + "rkey": rkey, 195 + })) 196 + .send() 197 + .await 198 + .expect("deleteRecord"); 199 + assert_eq!(res.status(), StatusCode::OK); 200 + } 201 + 202 + async fn apply_writes_batch(client: &reqwest::Client, token: &str, did: &str, writes: Vec<Value>) { 203 + let res = client 204 + .post(format!( 205 + "{}/xrpc/com.atproto.repo.applyWrites", 206 + base_url().await 207 + )) 208 + .bearer_auth(token) 209 + .json(&json!({ "repo": did, "writes": writes })) 210 + .send() 211 + .await 212 + .expect("applyWrites"); 213 + assert_eq!(res.status(), StatusCode::OK); 214 + } 215 + 216 + fn rkey_for(i: usize) -> String { 217 + format!("3ke2e{:08}", i) 218 + } 219 + 220 + #[tokio::test] 221 + async fn websocket_firehose_frames_pass_inductive_forward_and_inverse() { 222 + let client = client(); 223 + let (token, did) = create_account_and_login(&client).await; 224 + 225 + let repos = get_test_repos().await; 226 + let cursor = repos.repo.get_max_seq().await.unwrap().as_i64(); 227 + let consumer = FirehoseConsumer::connect_with_cursor(app_port(), cursor).await; 228 + tokio::time::sleep(Duration::from_millis(100)).await; 229 + 230 + let now = chrono::Utc::now().to_rfc3339(); 231 + let seed: Vec<Value> = (0..120) 232 + .map(|i| { 233 + json!({ 234 + "$type": "com.atproto.repo.applyWrites#create", 235 + "collection": "app.bsky.feed.post", 236 + "rkey": rkey_for(i), 237 + "value": { 238 + "$type": "app.bsky.feed.post", 239 + "text": format!("e2e {i}"), 240 + "createdAt": now, 241 + } 242 + }) 243 + }) 244 + .collect(); 245 + for chunk in seed.chunks(40) { 246 + apply_writes_batch(&client, &token, &did, chunk.to_vec()).await; 247 + } 248 + for i in (0..120).step_by(6) { 249 + put_record(&client, &token, &did, &rkey_for(i), &format!("upd {i}")).await; 250 + } 251 + for i in (2..120).step_by(11) { 252 + delete_record(&client, &token, &did, &rkey_for(i)).await; 253 + } 254 + create_record(&client, &token, &did, "3ke2efinal001", "final").await; 255 + 256 + let target_commits = 3 + 20 + 11 + 1; 257 + let frames = consumer 258 + .wait_for_commits(&did, target_commits, Duration::from_secs(90)) 259 + .await; 260 + assert!( 261 + frames.len() >= target_commits, 262 + "expected {} commit frames, got {}", 263 + target_commits, 264 + frames.len() 265 + ); 266 + 267 + let mut forward_failures = Vec::new(); 268 + let mut inverse_failures = Vec::new(); 269 + for frame in &frames { 270 + if frame.prev_data.is_none() { 271 + continue; 272 + } 273 + if frame.ops.is_empty() { 274 + continue; 275 + } 276 + if let Err(msg) = verify_frame_forward(frame).await { 277 + forward_failures.push(format!("seq={}: {msg}", frame.seq)); 278 + } 279 + if let Err(msg) = verify_frame_inverse(frame).await { 280 + inverse_failures.push(format!("seq={}: {msg}", frame.seq)); 281 + } 282 + } 283 + assert!( 284 + forward_failures.is_empty(), 285 + "forward verification failures:\n - {}", 286 + forward_failures.join("\n - ") 287 + ); 288 + assert!( 289 + inverse_failures.is_empty(), 290 + "inverse verification failures:\n - {}", 291 + inverse_failures.join("\n - ") 292 + ); 293 + } 294 + 295 + #[tokio::test] 296 + async fn websocket_firehose_car_root_matches_commit_cid() { 297 + let client = client(); 298 + let (token, did) = create_account_and_login(&client).await; 299 + 300 + let repos = get_test_repos().await; 301 + let cursor = repos.repo.get_max_seq().await.unwrap().as_i64(); 302 + let consumer = FirehoseConsumer::connect_with_cursor(app_port(), cursor).await; 303 + tokio::time::sleep(Duration::from_millis(100)).await; 304 + 305 + for i in 0..4 { 306 + create_record(&client, &token, &did, &rkey_for(i), "ck").await; 307 + } 308 + 309 + let frames = consumer 310 + .wait_for_commits(&did, 4, Duration::from_secs(10)) 311 + .await; 312 + 313 + for frame in &frames { 314 + let mut reader = CarReader::new(Cursor::new(&frame.blocks)) 315 + .await 316 + .expect("CAR header"); 317 + let roots = reader.header().roots(); 318 + assert_eq!(roots.len(), 1, "CAR must have exactly one root"); 319 + assert_eq!( 320 + roots[0], frame.commit, 321 + "CAR root must equal frame commit CID" 322 + ); 323 + let mut found = false; 324 + while let Ok(Some((cid, _))) = reader.next_block().await { 325 + if cid == frame.commit { 326 + found = true; 327 + } 328 + } 329 + assert!(found, "CAR body must contain commit block"); 330 + } 331 + } 332 + 333 + #[tokio::test] 334 + async fn websocket_firehose_resumption_from_cursor_yields_valid_frames() { 335 + let client = client(); 336 + let (token, did) = create_account_and_login(&client).await; 337 + let repos = get_test_repos().await; 338 + 339 + for i in 0..5 { 340 + create_record(&client, &token, &did, &rkey_for(i), "pre").await; 341 + } 342 + 343 + let resume_cursor = repos.repo.get_max_seq().await.unwrap().as_i64(); 344 + 345 + for i in 5..12 { 346 + create_record(&client, &token, &did, &rkey_for(i), "post").await; 347 + } 348 + 349 + let consumer = FirehoseConsumer::connect_with_cursor(app_port(), resume_cursor).await; 350 + let frames = consumer 351 + .wait_for_commits(&did, 7, Duration::from_secs(20)) 352 + .await; 353 + assert!( 354 + frames.len() >= 7, 355 + "expected 7+ frames after cursor resume, got {}", 356 + frames.len() 357 + ); 358 + 359 + for frame in &frames { 360 + if frame.prev_data.is_none() || frame.ops.is_empty() { 361 + continue; 362 + } 363 + verify_frame_forward(frame) 364 + .await 365 + .unwrap_or_else(|e| panic!("resumed frame seq={} invalid: {e}", frame.seq)); 366 + } 367 + } 368 + 369 + #[tokio::test] 370 + async fn websocket_firehose_ops_include_prev_field_for_update_delete() { 371 + let client = client(); 372 + let (token, did) = create_account_and_login(&client).await; 373 + let repos = get_test_repos().await; 374 + let cursor = repos.repo.get_max_seq().await.unwrap().as_i64(); 375 + let consumer = FirehoseConsumer::connect_with_cursor(app_port(), cursor).await; 376 + tokio::time::sleep(Duration::from_millis(100)).await; 377 + 378 + create_record(&client, &token, &did, "3ke2eprev01", "v1").await; 379 + put_record(&client, &token, &did, "3ke2eprev01", "v2").await; 380 + delete_record(&client, &token, &did, "3ke2eprev01").await; 381 + 382 + let frames = consumer 383 + .wait_for_commits(&did, 3, Duration::from_secs(10)) 384 + .await; 385 + assert!(frames.len() >= 3); 386 + 387 + for frame in &frames { 388 + for op in &frame.ops { 389 + match op.action { 390 + RepoAction::Create => { 391 + assert!(op.cid.is_some(), "create must have cid"); 392 + assert!(op.prev.is_none(), "create must not have prev"); 393 + } 394 + RepoAction::Update => { 395 + assert!(op.cid.is_some(), "update must have cid"); 396 + assert!( 397 + op.prev.is_some(), 398 + "v1.1 update must carry prev CID (seq={})", 399 + frame.seq 400 + ); 401 + } 402 + RepoAction::Delete => { 403 + assert!(op.cid.is_none(), "delete must have null cid"); 404 + assert!( 405 + op.prev.is_some(), 406 + "v1.1 delete must carry prev CID (seq={})", 407 + frame.seq 408 + ); 409 + } 410 + } 411 + } 412 + } 413 + } 414 + 415 + #[tokio::test] 416 + async fn websocket_firehose_rebuild_new_mst_from_car_matches_commit_data() { 417 + let client = client(); 418 + let (token, did) = create_account_and_login(&client).await; 419 + let repos = get_test_repos().await; 420 + let cursor = repos.repo.get_max_seq().await.unwrap().as_i64(); 421 + let consumer = FirehoseConsumer::connect_with_cursor(app_port(), cursor).await; 422 + tokio::time::sleep(Duration::from_millis(100)).await; 423 + 424 + let now = chrono::Utc::now().to_rfc3339(); 425 + let writes: Vec<Value> = (0..30) 426 + .map(|i| { 427 + json!({ 428 + "$type": "com.atproto.repo.applyWrites#create", 429 + "collection": "app.bsky.feed.post", 430 + "rkey": rkey_for(i), 431 + "value": { 432 + "$type": "app.bsky.feed.post", 433 + "text": format!("rb {i}"), 434 + "createdAt": now, 435 + } 436 + }) 437 + }) 438 + .collect(); 439 + apply_writes_batch(&client, &token, &did, writes).await; 440 + 441 + let frames = consumer 442 + .wait_for_commits(&did, 1, Duration::from_secs(10)) 443 + .await; 444 + let last = frames.last().expect("frame"); 445 + 446 + let blocks = car_to_blocks(&last.blocks).await; 447 + let storage = Arc::new(MemoryBlockStore::new_from_blocks(blocks)); 448 + let commit_bytes = storage 449 + .get(&last.commit) 450 + .await 451 + .unwrap() 452 + .expect("commit block"); 453 + let commit = Commit::from_cbor(&commit_bytes).unwrap(); 454 + 455 + let new_root_cid = *commit.data(); 456 + let mst = Mst::load(storage, new_root_cid, None); 457 + let rehydrated_cid = mst.get_pointer().await.expect("rebuild mst"); 458 + assert_eq!( 459 + rehydrated_cid, new_root_cid, 460 + "MST loaded from CAR must yield same root as commit.data()" 461 + ); 462 + 463 + for op in &last.ops { 464 + if op.action == RepoAction::Create { 465 + let expected_cid = op.cid.unwrap(); 466 + let got = mst 467 + .get(&op.path) 468 + .await 469 + .expect("mst.get") 470 + .unwrap_or_else(|| panic!("key {} missing from rebuilt tree", op.path)); 471 + assert_eq!(got, expected_cid, "record CID mismatch for {}", op.path); 472 + let _ = Cid::from_str(&expected_cid.to_string()).unwrap(); 473 + } 474 + } 475 + }