Our Personal Data Server from scratch! tranquil.farm
pds rust database fun oauth atproto
238
fork

Configure Feed

Select the types of activity you want to include in your feed.

fix(tranquil-pds): firehose car carries inductive proof

Lewis: May this revision serve well! <lu5a@proton.me>

authored by did:plc:mb5to35neicxt4gemstoro… and committed by

Tangled 180de299 0455dc20

+799 -41
+22 -22
Cargo.lock
··· 7455 7455 7456 7456 [[package]] 7457 7457 name = "tranquil-api" 7458 - version = "0.5.5" 7458 + version = "0.5.6" 7459 7459 dependencies = [ 7460 7460 "anyhow", 7461 7461 "axum", ··· 7506 7506 7507 7507 [[package]] 7508 7508 name = "tranquil-auth" 7509 - version = "0.5.5" 7509 + version = "0.5.6" 7510 7510 dependencies = [ 7511 7511 "anyhow", 7512 7512 "base32", ··· 7529 7529 7530 7530 [[package]] 7531 7531 name = "tranquil-cache" 7532 - version = "0.5.5" 7532 + version = "0.5.6" 7533 7533 dependencies = [ 7534 7534 "async-trait", 7535 7535 "base64 0.22.1", ··· 7543 7543 7544 7544 [[package]] 7545 7545 name = "tranquil-comms" 7546 - version = "0.5.5" 7546 + version = "0.5.6" 7547 7547 dependencies = [ 7548 7548 "async-trait", 7549 7549 "base64 0.22.1", ··· 7561 7561 7562 7562 [[package]] 7563 7563 name = "tranquil-config" 7564 - version = "0.5.5" 7564 + version = "0.5.6" 7565 7565 dependencies = [ 7566 7566 "confique", 7567 7567 "serde", ··· 7569 7569 7570 7570 [[package]] 7571 7571 name = "tranquil-crypto" 7572 - version = "0.5.5" 7572 + version = "0.5.6" 7573 7573 dependencies = [ 7574 7574 "aes-gcm", 7575 7575 "base64 0.22.1", ··· 7585 7585 7586 7586 [[package]] 7587 7587 name = "tranquil-db" 7588 - version = "0.5.5" 7588 + version = "0.5.6" 7589 7589 dependencies = [ 7590 7590 "async-trait", 7591 7591 "chrono", ··· 7602 7602 7603 7603 [[package]] 7604 7604 name = "tranquil-db-traits" 7605 - version = "0.5.5" 7605 + version = "0.5.6" 7606 7606 dependencies = [ 7607 7607 "async-trait", 7608 7608 "base64 0.22.1", ··· 7618 7618 7619 7619 [[package]] 7620 7620 name = "tranquil-infra" 7621 - version = "0.5.5" 7621 + version = "0.5.6" 7622 7622 dependencies = [ 7623 7623 "async-trait", 7624 7624 "bytes", ··· 7629 7629 7630 7630 [[package]] 7631 7631 name = "tranquil-lexicon" 7632 - version = "0.5.5" 7632 + version = "0.5.6" 7633 7633 dependencies = [ 7634 7634 "chrono", 7635 7635 "futures", ··· 7648 7648 7649 7649 [[package]] 7650 7650 name = "tranquil-oauth" 7651 - version = "0.5.5" 7651 + version = "0.5.6" 7652 7652 dependencies = [ 7653 7653 "anyhow", 7654 7654 "axum", ··· 7671 7671 7672 7672 [[package]] 7673 7673 name = "tranquil-oauth-server" 7674 - version = "0.5.5" 7674 + version = "0.5.6" 7675 7675 dependencies = [ 7676 7676 "axum", 7677 7677 "base64 0.22.1", ··· 7704 7704 7705 7705 [[package]] 7706 7706 name = "tranquil-pds" 7707 - version = "0.5.5" 7707 + version = "0.5.6" 7708 7708 dependencies = [ 7709 7709 "aes-gcm", 7710 7710 "anyhow", ··· 7796 7796 7797 7797 [[package]] 7798 7798 name = "tranquil-repo" 7799 - version = "0.5.5" 7799 + version = "0.5.6" 7800 7800 dependencies = [ 7801 7801 "bytes", 7802 7802 "cid", ··· 7808 7808 7809 7809 [[package]] 7810 7810 name = "tranquil-ripple" 7811 - version = "0.5.5" 7811 + version = "0.5.6" 7812 7812 dependencies = [ 7813 7813 "async-trait", 7814 7814 "backon", ··· 7833 7833 7834 7834 [[package]] 7835 7835 name = "tranquil-scopes" 7836 - version = "0.5.5" 7836 + version = "0.5.6" 7837 7837 dependencies = [ 7838 7838 "axum", 7839 7839 "futures", ··· 7849 7849 7850 7850 [[package]] 7851 7851 name = "tranquil-server" 7852 - version = "0.5.5" 7852 + version = "0.5.6" 7853 7853 dependencies = [ 7854 7854 "axum", 7855 7855 "clap", ··· 7870 7870 7871 7871 [[package]] 7872 7872 name = "tranquil-signal" 7873 - version = "0.5.5" 7873 + version = "0.5.6" 7874 7874 dependencies = [ 7875 7875 "async-trait", 7876 7876 "chrono", ··· 7893 7893 7894 7894 [[package]] 7895 7895 name = "tranquil-storage" 7896 - version = "0.5.5" 7896 + version = "0.5.6" 7897 7897 dependencies = [ 7898 7898 "async-trait", 7899 7899 "aws-config", ··· 7910 7910 7911 7911 [[package]] 7912 7912 name = "tranquil-store" 7913 - version = "0.5.5" 7913 + version = "0.5.6" 7914 7914 dependencies = [ 7915 7915 "async-trait", 7916 7916 "bytes", ··· 7959 7959 7960 7960 [[package]] 7961 7961 name = "tranquil-sync" 7962 - version = "0.5.5" 7962 + version = "0.5.6" 7963 7963 dependencies = [ 7964 7964 "anyhow", 7965 7965 "axum", ··· 7981 7981 7982 7982 [[package]] 7983 7983 name = "tranquil-types" 7984 - version = "0.5.5" 7984 + version = "0.5.6" 7985 7985 dependencies = [ 7986 7986 "chrono", 7987 7987 "cid",
+1 -1
Cargo.toml
··· 26 26 ] 27 27 28 28 [workspace.package] 29 - version = "0.5.5" 29 + version = "0.5.6" 30 30 edition = "2024" 31 31 license = "AGPL-3.0-or-later" 32 32
+3
crates/tranquil-config/src/lib.rs
··· 65 65 if env::var("ENABLE_PDS_HOSTED_DID_WEB").is_err() { 66 66 env::set_var("ENABLE_PDS_HOSTED_DID_WEB", "true"); 67 67 } 68 + if env::var("TRANQUIL_LEXICON_OFFLINE").is_err() { 69 + env::set_var("TRANQUIL_LEXICON_OFFLINE", "1"); 70 + } 68 71 } 69 72 TranquilConfig::builder() 70 73 .env()
+88 -7
crates/tranquil-pds/src/repo_ops.rs
··· 6 6 use backon::{ExponentialBuilder, Retryable}; 7 7 use bytes::Bytes; 8 8 use cid::Cid; 9 + use jacquard_common::smol_str::SmolStr; 9 10 use jacquard_common::types::{integer::LimitedU32, string::Tid}; 10 11 use jacquard_repo::commit::Commit; 11 - use jacquard_repo::mst::Mst; 12 12 use jacquard_repo::mst::util::compute_cid; 13 + use jacquard_repo::mst::{Mst, VerifiedWriteOp}; 13 14 use jacquard_repo::storage::BlockStore; 14 15 use k256::ecdsa::SigningKey; 15 16 use serde_json::{Value, json}; 17 + use std::collections::{BTreeMap, HashSet}; 16 18 use std::str::FromStr; 17 19 use std::sync::Arc; 18 20 use tokio::sync::OwnedMutexGuard; 19 - use tracing::error; 21 + use tracing::{error, warn}; 20 22 use tranquil_db_traits::SequenceNumber; 21 23 use uuid::Uuid; 22 24 ··· 236 238 ApiError::InternalError(None) 237 239 })?; 238 240 239 - let block_bytes = ctx.tracking_store.take_written_blocks(); 241 + let written_bytes = ctx.tracking_store.take_written_blocks(); 242 + let new_tree_cids: Vec<Cid> = written_bytes.keys().copied().collect(); 240 243 241 - let storage_for_diff = Arc::new(ctx.tracking_store.clone()); 242 - let original_settled = Mst::load(storage_for_diff.clone(), ctx.prev_data_cid, None); 243 - let new_settled = Mst::load(storage_for_diff, new_mst_root, None); 244 + let storage_for_proof = Arc::new(ctx.tracking_store.clone()); 245 + let original_settled = Mst::load(storage_for_proof.clone(), ctx.prev_data_cid, None); 246 + let new_settled = Mst::load(storage_for_proof.clone(), new_mst_root, None); 247 + 248 + let mut inverse_trace = new_settled.clone(); 249 + let mut non_invertible: Vec<String> = Vec::new(); 250 + let mut invert_errors: Vec<String> = Vec::new(); 251 + for op in params.ops.iter() { 252 + let (collection, rkey) = match op { 253 + RecordOp::Create { 254 + collection, rkey, .. 255 + } 256 + | RecordOp::Update { 257 + collection, rkey, .. 258 + } 259 + | RecordOp::Delete { 260 + collection, rkey, .. 261 + } => (collection, rkey), 262 + }; 263 + let key = SmolStr::new(format!("{}/{}", collection, rkey)); 264 + let verified = match op { 265 + RecordOp::Create { cid, .. } => VerifiedWriteOp::Create { 266 + key, 267 + cid: *cid.as_cid(), 268 + }, 269 + RecordOp::Update { cid, prev, .. } => VerifiedWriteOp::Update { 270 + key, 271 + cid: *cid.as_cid(), 272 + prev: *prev.as_cid(), 273 + }, 274 + RecordOp::Delete { prev, .. } => VerifiedWriteOp::Delete { 275 + key, 276 + prev: *prev.as_cid(), 277 + }, 278 + }; 279 + match inverse_trace.invert_op(verified.clone()).await { 280 + Ok(true) => {} 281 + Ok(false) => non_invertible.push(format!("{:?}", verified)), 282 + Err(e) => invert_errors.push(format!("{:?} -> {:?}", verified, e)), 283 + } 284 + } 285 + if !non_invertible.is_empty() { 286 + warn!( 287 + user_id = %params.user_id, 288 + count = non_invertible.len(), 289 + ops = ?non_invertible, 290 + "firehose proof walk: ops not invertible on new MST, consumer will reject frame" 291 + ); 292 + } 293 + if !invert_errors.is_empty() { 294 + warn!( 295 + user_id = %params.user_id, 296 + count = invert_errors.len(), 297 + failures = ?invert_errors, 298 + "firehose proof walk: invert_op errored, cover blocks may be incomplete" 299 + ); 300 + } 244 301 245 - let new_tree_cids: Vec<Cid> = block_bytes.keys().copied().collect(); 302 + let read_cid_set: HashSet<Cid> = ctx.tracking_store.get_read_cids().into_iter().collect(); 303 + let missing_read_cids: Vec<Cid> = read_cid_set 304 + .iter() 305 + .copied() 306 + .filter(|cid| !written_bytes.contains_key(cid)) 307 + .collect(); 308 + let mut relevant: BTreeMap<Cid, Bytes> = BTreeMap::new(); 309 + if !missing_read_cids.is_empty() { 310 + let fetched = ctx 311 + .tracking_store 312 + .get_many(&missing_read_cids) 313 + .await 314 + .map_err(|e| { 315 + error!("fetch cover read bytes: {e}"); 316 + ApiError::InternalError(None) 317 + })?; 318 + for (cid, maybe) in missing_read_cids.into_iter().zip(fetched) { 319 + if let Some(bytes) = maybe { 320 + relevant.insert(cid, bytes); 321 + } 322 + } 323 + } 246 324 247 325 let obsolete_cids = match original_settled.diff(&new_settled).await { 248 326 Ok(diff) => { ··· 262 340 vec![ctx.current_root_cid] 263 341 } 264 342 }; 343 + 344 + let mut block_bytes = written_bytes; 345 + block_bytes.extend(relevant); 265 346 266 347 let result = commit_and_log( 267 348 state,
+640
crates/tranquil-pds/tests/mst_inductive_firehose.rs
··· 1 + mod common; 2 + mod mst_verify; 3 + 4 + use std::collections::BTreeMap; 5 + use std::str::FromStr; 6 + use std::sync::Arc; 7 + 8 + use cid::Cid; 9 + use common::*; 10 + use jacquard_common::smol_str::SmolStr; 11 + use jacquard_repo::commit::Commit; 12 + use jacquard_repo::mst::{Mst, VerifiedWriteOp}; 13 + use jacquard_repo::storage::{BlockStore, MemoryBlockStore}; 14 + use mst_verify::{extract_event_blocks, inline_to_store}; 15 + use reqwest::StatusCode; 16 + use serde_json::{Value, json}; 17 + use tranquil_db_traits::{RepoEventType, SequenceNumber, SequencedEvent}; 18 + use tranquil_types::Did; 19 + 20 + async fn new_commit_data_cid( 21 + storage: &Arc<MemoryBlockStore>, 22 + commit_cid: &Cid, 23 + ) -> Result<Cid, String> { 24 + let commit_bytes = storage 25 + .get(commit_cid) 26 + .await 27 + .map_err(|e| format!("get commit: {e:?}"))? 28 + .ok_or_else(|| format!("CAR missing commit block {commit_cid}"))?; 29 + let commit = Commit::from_cbor(&commit_bytes).map_err(|e| format!("parse commit: {e:?}"))?; 30 + Ok(*commit.data()) 31 + } 32 + 33 + fn ops_json(event: &SequencedEvent) -> Result<&Vec<Value>, String> { 34 + event 35 + .ops 36 + .as_ref() 37 + .and_then(|v| v.as_array()) 38 + .ok_or_else(|| "event.ops not an array".into()) 39 + } 40 + 41 + fn parse_op_to_verified(op: &Value) -> Result<VerifiedWriteOp, String> { 42 + let action = op["action"].as_str().ok_or("op.action missing")?; 43 + let path = op["path"].as_str().ok_or("op.path missing")?; 44 + let key = SmolStr::new(path); 45 + match action { 46 + "create" => { 47 + let cid_str = op["cid"].as_str().ok_or("create missing cid")?; 48 + let cid = Cid::from_str(cid_str).map_err(|e| format!("parse cid: {e:?}"))?; 49 + Ok(VerifiedWriteOp::Create { key, cid }) 50 + } 51 + "update" => { 52 + let cid_str = op["cid"].as_str().ok_or("update missing cid")?; 53 + let cid = Cid::from_str(cid_str).map_err(|e| format!("parse cid: {e:?}"))?; 54 + let prev_str = op["prev"].as_str().ok_or("update missing prev")?; 55 + let prev = Cid::from_str(prev_str).map_err(|e| format!("parse prev: {e:?}"))?; 56 + Ok(VerifiedWriteOp::Update { key, cid, prev }) 57 + } 58 + "delete" => { 59 + let prev_str = op["prev"].as_str().ok_or("delete missing prev")?; 60 + let prev = Cid::from_str(prev_str).map_err(|e| format!("parse prev: {e:?}"))?; 61 + Ok(VerifiedWriteOp::Delete { key, prev }) 62 + } 63 + other => Err(format!("unknown op action: {other}")), 64 + } 65 + } 66 + 67 + async fn verify_inductive_forward(event: &SequencedEvent) -> Result<(Cid, Cid), String> { 68 + let prev_data_cid = event 69 + .prev_data_cid 70 + .as_ref() 71 + .and_then(|c| c.to_cid()) 72 + .ok_or_else(|| "event missing prev_data_cid".to_string())?; 73 + let commit_cid = event 74 + .commit_cid 75 + .as_ref() 76 + .and_then(|c| c.to_cid()) 77 + .ok_or_else(|| "event missing commit_cid".to_string())?; 78 + 79 + let storage = inline_to_store(extract_event_blocks(event)?); 80 + let expected_new_data = new_commit_data_cid(&storage, &commit_cid).await?; 81 + 82 + let mut mst = Mst::load(storage.clone(), prev_data_cid, None); 83 + for op_value in ops_json(event)? { 84 + let action = op_value["action"].as_str().ok_or("op.action missing")?; 85 + let path = op_value["path"].as_str().ok_or("op.path missing")?; 86 + match action { 87 + "create" | "update" => { 88 + let cid = Cid::from_str(op_value["cid"].as_str().ok_or("op.cid missing")?) 89 + .map_err(|e| format!("parse op.cid: {e:?}"))?; 90 + mst = mst 91 + .add(path, cid) 92 + .await 93 + .map_err(|e| format!("mst.add({path}): {e:?}"))?; 94 + } 95 + "delete" => { 96 + mst = mst 97 + .delete(path) 98 + .await 99 + .map_err(|e| format!("mst.delete({path}): {e:?}"))?; 100 + } 101 + other => return Err(format!("unknown op action: {other}")), 102 + } 103 + } 104 + let computed = mst 105 + .persist() 106 + .await 107 + .map_err(|e| format!("mst.persist: {e:?}"))?; 108 + Ok((expected_new_data, computed)) 109 + } 110 + 111 + async fn verify_inductive_inverse(event: &SequencedEvent) -> Result<(Cid, Cid), String> { 112 + let prev_data_cid = event 113 + .prev_data_cid 114 + .as_ref() 115 + .and_then(|c| c.to_cid()) 116 + .ok_or_else(|| "event missing prev_data_cid".to_string())?; 117 + let commit_cid = event 118 + .commit_cid 119 + .as_ref() 120 + .and_then(|c| c.to_cid()) 121 + .ok_or_else(|| "event missing commit_cid".to_string())?; 122 + 123 + let storage = inline_to_store(extract_event_blocks(event)?); 124 + let new_data_cid = new_commit_data_cid(&storage, &commit_cid).await?; 125 + 126 + let mut mst = Mst::load(storage.clone(), new_data_cid, None); 127 + for op_value in ops_json(event)? { 128 + let verified = parse_op_to_verified(op_value)?; 129 + let inverted = mst 130 + .invert_op(verified.clone()) 131 + .await 132 + .map_err(|e| format!("invert_op({verified:?}): {e:?}"))?; 133 + if !inverted { 134 + return Err(format!("op not invertible: {verified:?}")); 135 + } 136 + } 137 + let computed_prev = mst 138 + .get_pointer() 139 + .await 140 + .map_err(|e| format!("get_pointer: {e:?}"))?; 141 + Ok((prev_data_cid, computed_prev)) 142 + } 143 + 144 + fn report_failures(total: usize, failures: &[String], mode: &str) { 145 + assert!( 146 + failures.is_empty(), 147 + "{} of {total} {mode} commit events failed inductive verification:\n - {}", 148 + failures.len(), 149 + failures.join("\n - "), 150 + ); 151 + } 152 + 153 + async fn apply_writes_batch(client: &reqwest::Client, token: &str, did: &str, writes: Vec<Value>) { 154 + let payload = json!({ "repo": did, "writes": writes }); 155 + let res = client 156 + .post(format!( 157 + "{}/xrpc/com.atproto.repo.applyWrites", 158 + base_url().await 159 + )) 160 + .bearer_auth(token) 161 + .json(&payload) 162 + .send() 163 + .await 164 + .expect("applyWrites request failed"); 165 + assert_eq!( 166 + res.status(), 167 + StatusCode::OK, 168 + "applyWrites failed: {:?}", 169 + res.text().await 170 + ); 171 + } 172 + 173 + async fn create_record(client: &reqwest::Client, token: &str, did: &str, col: &str, rkey: &str) { 174 + let now = chrono::Utc::now().to_rfc3339(); 175 + let res = client 176 + .post(format!( 177 + "{}/xrpc/com.atproto.repo.createRecord", 178 + base_url().await 179 + )) 180 + .bearer_auth(token) 181 + .json(&json!({ 182 + "repo": did, 183 + "collection": col, 184 + "rkey": rkey, 185 + "record": { 186 + "$type": col, 187 + "text": format!("post {rkey}"), 188 + "createdAt": now, 189 + } 190 + })) 191 + .send() 192 + .await 193 + .expect("createRecord request failed"); 194 + assert_eq!(res.status(), StatusCode::OK, "createRecord failed"); 195 + } 196 + 197 + async fn put_record( 198 + client: &reqwest::Client, 199 + token: &str, 200 + did: &str, 201 + col: &str, 202 + rkey: &str, 203 + text: &str, 204 + ) { 205 + let now = chrono::Utc::now().to_rfc3339(); 206 + let res = client 207 + .post(format!( 208 + "{}/xrpc/com.atproto.repo.putRecord", 209 + base_url().await 210 + )) 211 + .bearer_auth(token) 212 + .json(&json!({ 213 + "repo": did, 214 + "collection": col, 215 + "rkey": rkey, 216 + "record": { 217 + "$type": col, 218 + "text": text, 219 + "createdAt": now, 220 + } 221 + })) 222 + .send() 223 + .await 224 + .expect("putRecord request failed"); 225 + assert_eq!(res.status(), StatusCode::OK, "putRecord failed"); 226 + } 227 + 228 + async fn delete_record(client: &reqwest::Client, token: &str, did: &str, col: &str, rkey: &str) { 229 + let res = client 230 + .post(format!( 231 + "{}/xrpc/com.atproto.repo.deleteRecord", 232 + base_url().await 233 + )) 234 + .bearer_auth(token) 235 + .json(&json!({ "repo": did, "collection": col, "rkey": rkey })) 236 + .send() 237 + .await 238 + .expect("deleteRecord request failed"); 239 + assert_eq!(res.status(), StatusCode::OK, "deleteRecord failed"); 240 + } 241 + 242 + const COLLECTION: &str = "app.bsky.feed.post"; 243 + fn rkey_for(prefix: &str, i: usize) -> String { 244 + format!("3k{prefix}{:08}", i) 245 + } 246 + 247 + async fn our_commit_events(did: &str) -> Vec<SequencedEvent> { 248 + let repos = get_test_repos().await; 249 + let typed_did = Did::new(did.to_string()).unwrap(); 250 + let events = repos 251 + .repo 252 + .get_events_since_seq(SequenceNumber::ZERO, None) 253 + .await 254 + .expect("get_events_since_seq"); 255 + events 256 + .into_iter() 257 + .filter(|e| e.did == typed_did && e.event_type == RepoEventType::Commit) 258 + .collect() 259 + } 260 + 261 + #[tokio::test] 262 + async fn inductive_forward_verifies_delete_commits() { 263 + let client = client(); 264 + let (token, did) = create_account_and_login(&client).await; 265 + 266 + let now = chrono::Utc::now().to_rfc3339(); 267 + const N_CREATE: usize = 200; 268 + 269 + let all_writes: Vec<Value> = (0..N_CREATE) 270 + .map(|i| { 271 + json!({ 272 + "$type": "com.atproto.repo.applyWrites#create", 273 + "collection": COLLECTION, 274 + "rkey": rkey_for("del", i), 275 + "value": { 276 + "$type": COLLECTION, 277 + "text": format!("record {i}"), 278 + "createdAt": now, 279 + } 280 + }) 281 + }) 282 + .collect(); 283 + for chunk in all_writes.chunks(50) { 284 + apply_writes_batch(&client, &token, &did, chunk.to_vec()).await; 285 + } 286 + 287 + let delete_indices: Vec<usize> = (10..N_CREATE).step_by(7).collect(); 288 + for i in &delete_indices { 289 + delete_record(&client, &token, &did, COLLECTION, &rkey_for("del", *i)).await; 290 + } 291 + 292 + let our = our_commit_events(&did).await; 293 + let delete_events: Vec<&SequencedEvent> = our 294 + .iter() 295 + .filter(|e| { 296 + ops_json(e) 297 + .map(|arr| arr.iter().any(|op| op["action"].as_str() == Some("delete"))) 298 + .unwrap_or(false) 299 + }) 300 + .collect(); 301 + assert_eq!(delete_events.len(), delete_indices.len()); 302 + 303 + let mut failures = Vec::new(); 304 + for e in &delete_events { 305 + match verify_inductive_forward(e).await { 306 + Ok((exp, got)) if exp == got => {} 307 + Ok((exp, got)) => failures.push(format!( 308 + "seq={}: root mismatch exp={exp} got={got}", 309 + e.seq.as_i64() 310 + )), 311 + Err(msg) => failures.push(format!("seq={}: {msg}", e.seq.as_i64())), 312 + } 313 + } 314 + report_failures(delete_events.len(), &failures, "delete forward"); 315 + } 316 + 317 + #[tokio::test] 318 + async fn inductive_forward_verifies_create_commits() { 319 + let client = client(); 320 + let (token, did) = create_account_and_login(&client).await; 321 + for i in 0..60usize { 322 + create_record(&client, &token, &did, COLLECTION, &rkey_for("cre", i)).await; 323 + } 324 + 325 + let our = our_commit_events(&did).await; 326 + let create_events: Vec<&SequencedEvent> = our 327 + .iter() 328 + .filter(|e| { 329 + ops_json(e) 330 + .map(|arr| arr.iter().all(|op| op["action"].as_str() == Some("create"))) 331 + .unwrap_or(false) 332 + && e.prev_data_cid.is_some() 333 + }) 334 + .collect(); 335 + assert!(!create_events.is_empty()); 336 + 337 + let mut failures = Vec::new(); 338 + for e in &create_events { 339 + match verify_inductive_forward(e).await { 340 + Ok((exp, got)) if exp == got => {} 341 + Ok((exp, got)) => failures.push(format!( 342 + "seq={}: root mismatch exp={exp} got={got}", 343 + e.seq.as_i64() 344 + )), 345 + Err(msg) => failures.push(format!("seq={}: {msg}", e.seq.as_i64())), 346 + } 347 + } 348 + report_failures(create_events.len(), &failures, "create forward"); 349 + } 350 + 351 + #[tokio::test] 352 + async fn inductive_forward_verifies_update_commits() { 353 + let client = client(); 354 + let (token, did) = create_account_and_login(&client).await; 355 + 356 + let now = chrono::Utc::now().to_rfc3339(); 357 + let creates: Vec<Value> = (0..80) 358 + .map(|i| { 359 + json!({ 360 + "$type": "com.atproto.repo.applyWrites#create", 361 + "collection": COLLECTION, 362 + "rkey": rkey_for("upd", i), 363 + "value": { 364 + "$type": COLLECTION, 365 + "text": format!("original {i}"), 366 + "createdAt": now, 367 + } 368 + }) 369 + }) 370 + .collect(); 371 + for chunk in creates.chunks(40) { 372 + apply_writes_batch(&client, &token, &did, chunk.to_vec()).await; 373 + } 374 + 375 + for i in (0..80).step_by(3) { 376 + put_record( 377 + &client, 378 + &token, 379 + &did, 380 + COLLECTION, 381 + &rkey_for("upd", i), 382 + &format!("updated {i}"), 383 + ) 384 + .await; 385 + } 386 + 387 + let our = our_commit_events(&did).await; 388 + let update_events: Vec<&SequencedEvent> = our 389 + .iter() 390 + .filter(|e| { 391 + ops_json(e) 392 + .map(|arr| arr.iter().any(|op| op["action"].as_str() == Some("update"))) 393 + .unwrap_or(false) 394 + }) 395 + .collect(); 396 + assert!(!update_events.is_empty()); 397 + 398 + let mut failures = Vec::new(); 399 + for e in &update_events { 400 + match verify_inductive_forward(e).await { 401 + Ok((exp, got)) if exp == got => {} 402 + Ok((exp, got)) => failures.push(format!( 403 + "seq={}: root mismatch exp={exp} got={got}", 404 + e.seq.as_i64() 405 + )), 406 + Err(msg) => failures.push(format!("seq={}: {msg}", e.seq.as_i64())), 407 + } 408 + } 409 + report_failures(update_events.len(), &failures, "update forward"); 410 + } 411 + 412 + #[tokio::test] 413 + async fn inductive_forward_verifies_mixed_applywrites() { 414 + let client = client(); 415 + let (token, did) = create_account_and_login(&client).await; 416 + 417 + let now = chrono::Utc::now().to_rfc3339(); 418 + let seed: Vec<Value> = (0..120) 419 + .map(|i| { 420 + json!({ 421 + "$type": "com.atproto.repo.applyWrites#create", 422 + "collection": COLLECTION, 423 + "rkey": rkey_for("mix", i), 424 + "value": { 425 + "$type": COLLECTION, 426 + "text": format!("seed {i}"), 427 + "createdAt": now, 428 + } 429 + }) 430 + }) 431 + .collect(); 432 + for chunk in seed.chunks(40) { 433 + apply_writes_batch(&client, &token, &did, chunk.to_vec()).await; 434 + } 435 + 436 + let mixed: Vec<Value> = (0..40) 437 + .flat_map(|i| { 438 + vec![ 439 + json!({ 440 + "$type": "com.atproto.repo.applyWrites#create", 441 + "collection": COLLECTION, 442 + "rkey": rkey_for("mxc", i), 443 + "value": { 444 + "$type": COLLECTION, 445 + "text": format!("new {i}"), 446 + "createdAt": now, 447 + } 448 + }), 449 + json!({ 450 + "$type": "com.atproto.repo.applyWrites#update", 451 + "collection": COLLECTION, 452 + "rkey": rkey_for("mix", i), 453 + "value": { 454 + "$type": COLLECTION, 455 + "text": format!("updated-mix {i}"), 456 + "createdAt": now, 457 + } 458 + }), 459 + json!({ 460 + "$type": "com.atproto.repo.applyWrites#delete", 461 + "collection": COLLECTION, 462 + "rkey": rkey_for("mix", i + 60), 463 + }), 464 + ] 465 + }) 466 + .collect(); 467 + apply_writes_batch(&client, &token, &did, mixed).await; 468 + 469 + let our = our_commit_events(&did).await; 470 + let last = our 471 + .iter() 472 + .rfind(|e| e.prev_data_cid.is_some()) 473 + .expect("at least one non-genesis commit"); 474 + 475 + let actions: Vec<&str> = ops_json(last) 476 + .unwrap() 477 + .iter() 478 + .filter_map(|op| op["action"].as_str()) 479 + .collect(); 480 + assert!(actions.contains(&"create")); 481 + assert!(actions.contains(&"update")); 482 + assert!(actions.contains(&"delete")); 483 + 484 + let (exp, got) = verify_inductive_forward(last) 485 + .await 486 + .expect("mixed applyWrites forward verify"); 487 + assert_eq!(exp, got, "mixed applyWrites commit forward-verify mismatch"); 488 + } 489 + 490 + #[tokio::test] 491 + async fn inductive_inverse_verifies_every_commit() { 492 + let client = client(); 493 + let (token, did) = create_account_and_login(&client).await; 494 + 495 + let now = chrono::Utc::now().to_rfc3339(); 496 + let seed: Vec<Value> = (0..100) 497 + .map(|i| { 498 + json!({ 499 + "$type": "com.atproto.repo.applyWrites#create", 500 + "collection": COLLECTION, 501 + "rkey": rkey_for("inv", i), 502 + "value": { 503 + "$type": COLLECTION, 504 + "text": format!("seed {i}"), 505 + "createdAt": now, 506 + } 507 + }) 508 + }) 509 + .collect(); 510 + for chunk in seed.chunks(50) { 511 + apply_writes_batch(&client, &token, &did, chunk.to_vec()).await; 512 + } 513 + for i in (0..100).step_by(5) { 514 + put_record( 515 + &client, 516 + &token, 517 + &did, 518 + COLLECTION, 519 + &rkey_for("inv", i), 520 + &format!("upd {i}"), 521 + ) 522 + .await; 523 + } 524 + for i in (2..100).step_by(11) { 525 + delete_record(&client, &token, &did, COLLECTION, &rkey_for("inv", i)).await; 526 + } 527 + 528 + let our = our_commit_events(&did).await; 529 + let non_genesis: Vec<&SequencedEvent> = our 530 + .iter() 531 + .filter(|e| e.prev_data_cid.is_some() && ops_json(e).is_ok()) 532 + .collect(); 533 + assert!(!non_genesis.is_empty()); 534 + 535 + let mut failures = Vec::new(); 536 + for e in &non_genesis { 537 + match verify_inductive_inverse(e).await { 538 + Ok((exp, got)) if exp == got => {} 539 + Ok((exp, got)) => failures.push(format!( 540 + "seq={}: inverse root mismatch exp={exp} got={got}", 541 + e.seq.as_i64() 542 + )), 543 + Err(msg) => failures.push(format!("seq={}: {msg}", e.seq.as_i64())), 544 + } 545 + } 546 + report_failures(non_genesis.len(), &failures, "any inverse"); 547 + } 548 + 549 + #[tokio::test] 550 + async fn prev_cid_chain_walks_to_genesis() { 551 + let client = client(); 552 + let (token, did) = create_account_and_login(&client).await; 553 + for i in 0..8 { 554 + create_record(&client, &token, &did, COLLECTION, &rkey_for("cha", i)).await; 555 + } 556 + 557 + let our = our_commit_events(&did).await; 558 + assert!(our.len() >= 2); 559 + 560 + let last = our.last().unwrap(); 561 + let mut current_prev: Option<Cid> = last.prev_cid.as_ref().and_then(|c| c.to_cid()); 562 + let head_commit_cid = last 563 + .commit_cid 564 + .as_ref() 565 + .and_then(|c| c.to_cid()) 566 + .expect("head commit_cid"); 567 + 568 + let by_commit: BTreeMap<Cid, &SequencedEvent> = our 569 + .iter() 570 + .filter_map(|e| { 571 + e.commit_cid 572 + .as_ref() 573 + .and_then(|c| c.to_cid()) 574 + .map(|c| (c, e)) 575 + }) 576 + .collect(); 577 + 578 + let mut visited = 1; 579 + while let Some(prev) = current_prev { 580 + let e = by_commit 581 + .get(&prev) 582 + .unwrap_or_else(|| panic!("prev commit {prev} missing from event list")); 583 + visited += 1; 584 + current_prev = e.prev_cid.as_ref().and_then(|c| c.to_cid()); 585 + } 586 + assert!( 587 + visited >= 2, 588 + "chain too short: visited={visited}, head_commit={head_commit_cid}" 589 + ); 590 + assert_eq!( 591 + visited, 592 + our.len(), 593 + "chain did not reach genesis: walked {visited}, have {}", 594 + our.len() 595 + ); 596 + } 597 + 598 + #[tokio::test] 599 + async fn record_bytes_present_in_car_for_creates() { 600 + let client = client(); 601 + let (token, did) = create_account_and_login(&client).await; 602 + 603 + let now = chrono::Utc::now().to_rfc3339(); 604 + let writes: Vec<Value> = (0..5) 605 + .map(|i| { 606 + json!({ 607 + "$type": "com.atproto.repo.applyWrites#create", 608 + "collection": COLLECTION, 609 + "rkey": rkey_for("rec", i), 610 + "value": { 611 + "$type": COLLECTION, 612 + "text": format!("rec {i}"), 613 + "createdAt": now, 614 + } 615 + }) 616 + }) 617 + .collect(); 618 + apply_writes_batch(&client, &token, &did, writes).await; 619 + 620 + let our = our_commit_events(&did).await; 621 + let latest = our.iter().rfind(|e| e.prev_data_cid.is_some()).unwrap(); 622 + 623 + let inline = extract_event_blocks(latest).unwrap(); 624 + let have_cids: std::collections::HashSet<Cid> = inline 625 + .iter() 626 + .map(|b| Cid::read_bytes(b.cid_bytes.as_slice()).unwrap()) 627 + .collect(); 628 + 629 + for op in ops_json(latest).unwrap() { 630 + if op["action"].as_str() == Some("create") 631 + && let Some(cid_str) = op["cid"].as_str() 632 + { 633 + let cid = Cid::from_str(cid_str).unwrap(); 634 + assert!( 635 + have_cids.contains(&cid), 636 + "create op record CID {cid} not present in CAR inline blocks" 637 + ); 638 + } 639 + } 640 + }
+26
crates/tranquil-pds/tests/mst_verify/mod.rs
··· 1 + use std::collections::BTreeMap; 2 + use std::sync::Arc; 3 + 4 + use bytes::Bytes; 5 + use cid::Cid; 6 + use jacquard_repo::storage::MemoryBlockStore; 7 + use tranquil_db_traits::{EventBlockInline, EventBlocks, SequencedEvent}; 8 + 9 + pub fn extract_event_blocks(event: &SequencedEvent) -> Result<&[EventBlockInline], String> { 10 + match event.blocks.as_ref() { 11 + Some(EventBlocks::Inline(v)) => Ok(v.as_slice()), 12 + Some(EventBlocks::LegacyCids(_)) => Err("legacy cids, not inline".into()), 13 + None => Err("event missing blocks".into()), 14 + } 15 + } 16 + 17 + pub fn inline_to_store(inline: &[EventBlockInline]) -> Arc<MemoryBlockStore> { 18 + let map: BTreeMap<Cid, Bytes> = inline 19 + .iter() 20 + .map(|b| { 21 + let cid = Cid::read_bytes(b.cid_bytes.as_slice()).expect("valid cid bytes"); 22 + (cid, Bytes::from(b.data.clone())) 23 + }) 24 + .collect(); 25 + Arc::new(MemoryBlockStore::new_from_blocks(map)) 26 + }
+1 -5
crates/tranquil-signal/src/client.rs
··· 122 122 123 123 impl fmt::Display for MessageTooLong { 124 124 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 125 - write!( 126 - f, 127 - "message body is {} bytes, max {}", 128 - self.len, self.max 129 - ) 125 + write!(f, "message body is {} bytes, max {}", self.len, self.max) 130 126 } 131 127 } 132 128
+18 -6
crates/tranquil-store/src/gauntlet/runner.rs
··· 577 577 oracle.record_crash(); 578 578 } 579 579 shutdown_harness(&mut harness); 580 - match reopen_with_recovery(&mut open, &mut crash, tolerate_op_errors, reopen_backoff).await { 580 + match reopen_with_recovery(&mut open, &mut crash, tolerate_op_errors, reopen_backoff).await 581 + { 581 582 Ok(reopened) => { 582 583 harness = Some(reopened); 583 584 let n = restarts_counter.fetch_add(1, Ordering::Relaxed) + 1; ··· 613 614 crash(); 614 615 oracle.record_crash(); 615 616 shutdown_harness(&mut harness); 616 - match reopen_with_recovery(&mut open, &mut crash, tolerate_op_errors, reopen_backoff).await { 617 + match reopen_with_recovery(&mut open, &mut crash, tolerate_op_errors, reopen_backoff).await 618 + { 617 619 Ok(reopened) => harness = Some(reopened), 618 620 Err(detail) => { 619 621 violations.push(InvariantViolation { ··· 654 656 { 655 657 let pre_snapshot = snapshot_block_index(&live.store); 656 658 shutdown_harness(&mut harness); 657 - match reopen_with_recovery(&mut open, &mut crash, tolerate_op_errors, reopen_backoff).await { 659 + match reopen_with_recovery(&mut open, &mut crash, tolerate_op_errors, reopen_backoff).await 660 + { 658 661 Ok(reopened) => { 659 662 let post_snapshot = snapshot_block_index(&reopened.store); 660 663 if let Some(detail) = diff_snapshots(&pre_snapshot, &post_snapshot) { ··· 1676 1679 oracle.record_crash(); 1677 1680 } 1678 1681 shutdown_harness(&mut harness); 1679 - match reopen_with_recovery(&mut open, &mut crash, tolerate_op_errors, reopen_backoff).await { 1682 + match reopen_with_recovery( 1683 + &mut open, 1684 + &mut crash, 1685 + tolerate_op_errors, 1686 + reopen_backoff, 1687 + ) 1688 + .await 1689 + { 1680 1690 Ok(reopened) => { 1681 1691 harness = Some(reopened); 1682 1692 let n = restarts_counter.fetch_add(1, Ordering::Relaxed) + 1; ··· 1713 1723 crash(); 1714 1724 oracle.record_crash(); 1715 1725 shutdown_harness(&mut harness); 1716 - match reopen_with_recovery(&mut open, &mut crash, tolerate_op_errors, reopen_backoff).await { 1726 + match reopen_with_recovery(&mut open, &mut crash, tolerate_op_errors, reopen_backoff).await 1727 + { 1717 1728 Ok(reopened) => harness = Some(reopened), 1718 1729 Err(detail) => { 1719 1730 violations.push(InvariantViolation { ··· 1755 1766 { 1756 1767 let pre_snapshot = snapshot_block_index(&live.store); 1757 1768 shutdown_harness(&mut harness); 1758 - match reopen_with_recovery(&mut open, &mut crash, tolerate_op_errors, reopen_backoff).await { 1769 + match reopen_with_recovery(&mut open, &mut crash, tolerate_op_errors, reopen_backoff).await 1770 + { 1759 1771 Ok(reopened) => { 1760 1772 let post_snapshot = snapshot_block_index(&reopened.store); 1761 1773 if let Some(detail) = diff_snapshots(&pre_snapshot, &post_snapshot) {