Our Personal Data Server from scratch! tranquil.farm
pds rust database fun oauth atproto


fix(tranquil-pds): same-rkey batch semantics and firehose lag recovery

Lewis: May this revision serve well! <lu5a@proton.me>

authored by lu5a.myatproto.social and committed by Tangled

8f7aad37 75b9e316

+215 -140
+22 -22
Cargo.lock
···
[[package]]
name = "tranquil-api"
-version = "0.5.6"
+version = "0.5.7"
dependencies = [
 "anyhow",
 "axum",
···
[[package]]
name = "tranquil-auth"
-version = "0.5.6"
+version = "0.5.7"
dependencies = [
 "anyhow",
 "base32",
···
[[package]]
name = "tranquil-cache"
-version = "0.5.6"
+version = "0.5.7"
dependencies = [
 "async-trait",
 "base64 0.22.1",
···
[[package]]
name = "tranquil-comms"
-version = "0.5.6"
+version = "0.5.7"
dependencies = [
 "async-trait",
 "base64 0.22.1",
···
[[package]]
name = "tranquil-config"
-version = "0.5.6"
+version = "0.5.7"
dependencies = [
 "confique",
 "serde",
···
[[package]]
name = "tranquil-crypto"
-version = "0.5.6"
+version = "0.5.7"
dependencies = [
 "aes-gcm",
 "base64 0.22.1",
···
[[package]]
name = "tranquil-db"
-version = "0.5.6"
+version = "0.5.7"
dependencies = [
 "async-trait",
 "chrono",
···
[[package]]
name = "tranquil-db-traits"
-version = "0.5.6"
+version = "0.5.7"
dependencies = [
 "async-trait",
 "base64 0.22.1",
···
[[package]]
name = "tranquil-infra"
-version = "0.5.6"
+version = "0.5.7"
dependencies = [
 "async-trait",
 "bytes",
···
[[package]]
name = "tranquil-lexicon"
-version = "0.5.6"
+version = "0.5.7"
dependencies = [
 "chrono",
 "futures",
···
[[package]]
name = "tranquil-oauth"
-version = "0.5.6"
+version = "0.5.7"
dependencies = [
 "anyhow",
 "axum",
···
[[package]]
name = "tranquil-oauth-server"
-version = "0.5.6"
+version = "0.5.7"
dependencies = [
 "axum",
 "base64 0.22.1",
···
[[package]]
name = "tranquil-pds"
-version = "0.5.6"
+version = "0.5.7"
dependencies = [
 "aes-gcm",
 "anyhow",
···
[[package]]
name = "tranquil-repo"
-version = "0.5.6"
+version = "0.5.7"
dependencies = [
 "bytes",
 "cid",
···
[[package]]
name = "tranquil-ripple"
-version = "0.5.6"
+version = "0.5.7"
dependencies = [
 "async-trait",
 "backon",
···
[[package]]
name = "tranquil-scopes"
-version = "0.5.6"
+version = "0.5.7"
dependencies = [
 "axum",
 "futures",
···
[[package]]
name = "tranquil-server"
-version = "0.5.6"
+version = "0.5.7"
dependencies = [
 "axum",
 "clap",
···
[[package]]
name = "tranquil-signal"
-version = "0.5.6"
+version = "0.5.7"
dependencies = [
 "async-trait",
 "chrono",
···
[[package]]
name = "tranquil-storage"
-version = "0.5.6"
+version = "0.5.7"
dependencies = [
 "async-trait",
 "aws-config",
···
[[package]]
name = "tranquil-store"
-version = "0.5.6"
+version = "0.5.7"
dependencies = [
 "async-trait",
 "bytes",
···
[[package]]
name = "tranquil-sync"
-version = "0.5.6"
+version = "0.5.7"
dependencies = [
 "anyhow",
 "axum",
···
[[package]]
name = "tranquil-types"
-version = "0.5.6"
+version = "0.5.7"
dependencies = [
 "chrono",
 "cid",
+1 -1
Cargo.toml
···
]

[workspace.package]
-version = "0.5.6"
+version = "0.5.7"
edition = "2024"
license = "AGPL-3.0-or-later"
+20 -21
crates/tranquil-api/src/repo/record/batch.rs
···
    mst: Mst<TrackingBlockStore>,
    results: Vec<WriteResult>,
    ops: Vec<RecordOp>,
-   modified_keys: Vec<String>,
    all_blob_cids: Vec<String>,
    backlinks_to_add: Vec<Backlink>,
    backlinks_to_remove: Vec<AtUri>,
···
    mst,
    mut results,
    mut ops,
-   mut modified_keys,
    mut all_blob_cids,
    mut backlinks_to_add,
    mut backlinks_to_remove,
···
                .await?,
            )
        };
+       let rkey = rkey.clone().unwrap_or_else(Rkey::generate);
+       let key = format!("{}/{}", collection, rkey);
+       if mst
+           .get(&key)
+           .await
+           .map_err(|e| ApiError::InternalError(Some(format!("Failed to read MST: {e}"))))?
+           .is_some()
+       {
+           return Err(ApiError::InvalidRequest(format!(
+               "Record already exists at {key}"
+           )));
+       }
        all_blob_cids.extend(extract_blob_cids(value));
-       let rkey = rkey.clone().unwrap_or_else(Rkey::generate);
        let record_ipld = tranquil_pds::util::json_to_ipld(value);
        let record_bytes = serde_ipld_dagcbor::to_vec(&record_ipld)
            .map_err(|_| ApiError::InvalidRecord("Failed to serialize record".into()))?;
···
            .put(&record_bytes)
            .await
            .map_err(|_| ApiError::InternalError(Some("Failed to store record".into())))?;
-       let key = format!("{}/{}", collection, rkey);
-       modified_keys.push(key.clone());
        let new_mst = mst
            .add(&key, record_cid)
            .await
···
            mst: new_mst,
            results,
            ops,
-           modified_keys,
            all_blob_cids,
            backlinks_to_add,
            backlinks_to_remove,
···
                .await?,
            )
        };
-       all_blob_cids.extend(extract_blob_cids(value));
-       let record_ipld = tranquil_pds::util::json_to_ipld(value);
-       let record_bytes = serde_ipld_dagcbor::to_vec(&record_ipld)
-           .map_err(|_| ApiError::InvalidRecord("Failed to serialize record".into()))?;
-       let record_cid = tracking_store
-           .put(&record_bytes)
-           .await
-           .map_err(|_| ApiError::InternalError(Some("Failed to store record".into())))?;
        let key = format!("{}/{}", collection, rkey);
-       modified_keys.push(key.clone());
        let prev_record_cid = mst
            .get(&key)
            .await
···
            .ok_or_else(|| {
                ApiError::InvalidRequest("Update target record does not exist".into())
            })?;
+       all_blob_cids.extend(extract_blob_cids(value));
+       let record_ipld = tranquil_pds::util::json_to_ipld(value);
+       let record_bytes = serde_ipld_dagcbor::to_vec(&record_ipld)
+           .map_err(|_| ApiError::InvalidRecord("Failed to serialize record".into()))?;
+       let record_cid = tracking_store
+           .put(&record_bytes)
+           .await
+           .map_err(|_| ApiError::InternalError(Some("Failed to store record".into())))?;
        let new_mst = mst
            .update(&key, record_cid)
            .await
···
            mst: new_mst,
            results,
            ops,
-           modified_keys,
            all_blob_cids,
            backlinks_to_add,
            backlinks_to_remove,
···
        }
        WriteOp::Delete { collection, rkey } => {
            let key = format!("{}/{}", collection, rkey);
-           modified_keys.push(key.clone());
            let prev_record_cid = mst
                .get(&key)
                .await
···
            mst: new_mst,
            results,
            ops,
-           modified_keys,
            all_blob_cids,
            backlinks_to_add,
            backlinks_to_remove,
···
        mst: initial_mst,
        results: Vec::new(),
        ops: Vec::new(),
-       modified_keys: Vec::new(),
        all_blob_cids: Vec::new(),
        backlinks_to_add: Vec::new(),
        backlinks_to_remove: Vec::new(),
···
        mst: final_mst,
        results,
        ops,
-       modified_keys,
        all_blob_cids,
        backlinks_to_add,
        backlinks_to_remove,
···
        controller_did: controller_did.as_ref(),
        delegation_detail: write_summary,
        ops,
-       modified_keys: &modified_keys,
        blob_cids: &all_blob_cids,
        backlinks_to_add,
        backlinks_to_remove,
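Because the MST is threaded through the fold, the new guard catches duplicate rkeys within a single applyWrites batch as well as collisions with already-committed records. A toy model of that threaded check (illustrative names only, not the real tranquil-api types):

    use std::collections::BTreeMap;

    #[derive(Debug)]
    enum WriteError {
        AlreadyExists(String),
    }

    // The tree is moved through each write, so a later create in the
    // same batch observes keys added by earlier writes.
    fn apply_creates(
        mut tree: BTreeMap<String, u64>,
        creates: &[(&str, u64)],
    ) -> Result<BTreeMap<String, u64>, WriteError> {
        for (key, cid) in creates {
            // Mirrors the new mst.get(&key) guard: fail fast instead of
            // silently letting the last same-rkey write win.
            if tree.contains_key(*key) {
                return Err(WriteError::AlreadyExists((*key).to_string()));
            }
            tree.insert((*key).to_string(), *cid);
        }
        Ok(tree)
    }

    fn main() {
        let creates = [("app.bsky.feed.post/3kabc", 1), ("app.bsky.feed.post/3kabc", 2)];
        let err = apply_creates(BTreeMap::new(), &creates).unwrap_err();
        println!("{err:?}"); // AlreadyExists("app.bsky.feed.post/3kabc")
    }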
-2
crates/tranquil-api/src/repo/record/delete.rs
···
        prev: RecordCid::from(prev_record_cid),
    };

-   let modified_keys = [key];
    let deleted_uri = AtUri::from_parts(&did, &input.collection, &input.rkey);

    let commit_result = finalize_repo_write(
···
        })
    }),
    ops: vec![op],
-   modified_keys: &modified_keys,
    blob_cids: &[],
    backlinks_to_add: vec![],
    backlinks_to_remove: vec![deleted_uri],
+12 -19
crates/tranquil-api/src/repo/record/write.rs
···
        }
    }

+   let key = format!("{}/{}", input.collection, rkey);
+   if mst
+       .get(&key)
+       .await
+       .map_err(|e| ApiError::InternalError(Some(format!("Failed to read MST: {e}"))))?
+       .is_some()
+   {
+       return Err(ApiError::InvalidRequest(format!(
+           "Record already exists at {key}"
+       )));
+   }
+
    let record_ipld = tranquil_pds::util::json_to_ipld(&input.record);
    let record_bytes = serde_ipld_dagcbor::to_vec(&record_ipld)
        .map_err(|_| ApiError::InvalidRecord("Failed to serialize record".into()))?;
···
        .put(&record_bytes)
        .await
        .map_err(|_| ApiError::InternalError(Some("Failed to save record block".into())))?;
-
-   let key = format!("{}/{}", input.collection, rkey);
    mst = mst
        .add(&key, record_cid)
        .await
···
        cid: tranquil_pds::cid_types::RecordCid::from(record_cid),
    });

-   let modified_keys: Vec<String> = ops
-       .iter()
-       .map(|op| match op {
-           RecordOp::Create {
-               collection, rkey, ..
-           }
-           | RecordOp::Update {
-               collection, rkey, ..
-           }
-           | RecordOp::Delete {
-               collection, rkey, ..
-           } => format!("{}/{}", collection, rkey),
-       })
-       .collect();
    let blob_cids = extract_blob_cids(&input.record);

    let created_uri = AtUri::from_parts(&did, &input.collection, &rkey);
···
        })
    }),
    ops,
-   modified_keys: &modified_keys,
    blob_cids: &blob_cids,
    backlinks_to_add,
    backlinks_to_remove: conflict_uris_to_cleanup,
···
        }
    };

-   let modified_keys = [key];
    let blob_cids = extract_blob_cids(&input.record);
    let backlinks_to_add = extract_backlinks(&record_uri, &input.record);

···
        })
    }),
    ops: vec![op],
-   modified_keys: &modified_keys,
    blob_cids: &blob_cids,
    backlinks_to_add,
    backlinks_to_remove,
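The ordering is the other half of the fix: the existence check runs before the record block is written, so a rejected create never touches the block store and leaves no orphaned block behind. A minimal check-before-put sketch, with hypothetical stand-in types:

    use std::collections::HashMap;

    // Stand-ins for the MST and the tracking block store.
    struct Tree(HashMap<String, u64>);
    struct Blocks(Vec<Vec<u8>>);

    #[derive(Debug)]
    enum WriteError {
        AlreadyExists(String),
    }

    fn create_checked(
        tree: &mut Tree,
        blocks: &mut Blocks,
        key: &str,
        record: &[u8],
    ) -> Result<(), WriteError> {
        // Check first: on the error path nothing has been written yet,
        // so there are no orphaned blocks to clean up afterwards.
        if tree.0.contains_key(key) {
            return Err(WriteError::AlreadyExists(key.to_string()));
        }
        blocks.0.push(record.to_vec()); // stand-in for tracking_store.put()
        let cid = blocks.0.len() as u64; // stand-in for the returned CID
        tree.0.insert(key.to_string(), cid);
        Ok(())
    }

    fn main() {
        let (mut tree, mut blocks) = (Tree(HashMap::new()), Blocks(Vec::new()));
        create_checked(&mut tree, &mut blocks, "app.bsky.feed.post/3kabc", b"rec").unwrap();
        assert!(create_checked(&mut tree, &mut blocks, "app.bsky.feed.post/3kabc", b"rec").is_err());
        assert_eq!(blocks.0.len(), 1); // the rejected create stored nothing
    }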
-4
crates/tranquil-config/src/lib.rs
···
    #[config(env = "FIREHOSE_BACKFILL_HOURS", default = 72)]
    pub backfill_hours: i64,

-   /// Maximum number of lagged events before disconnecting a slow consumer.
-   #[config(env = "FIREHOSE_MAX_LAG", default = 5000)]
-   pub max_lag: u64,
-
    /// Maximum concurrent full-repo exports, eg. getRepo without `since`.
    #[config(env = "MAX_CONCURRENT_REPO_EXPORTS", default = 4)]
    pub max_concurrent_repo_exports: usize,
+1 -1
crates/tranquil-db-traits/src/backlink.rs
···

use crate::DbError;

-#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum BacklinkPath {
    Subject,
    SubjectUri,
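The added Hash derive is what lets repo_ops.rs (below) key its backlink dedup map on (uri, path). A tiny illustration of that requirement, using toy types and a made-up URI:

    use std::collections::HashMap;

    // A HashMap key must be Hash + Eq, and a composite key needs every
    // field to be Hash; this mirrors keying backlink dedup on (uri, path).
    #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
    enum BacklinkPath {
        Subject,
        SubjectUri,
    }

    fn main() {
        let uri = "at://did:plc:example/app.bsky.feed.like/3kabc".to_string();
        let mut dedup: HashMap<(String, BacklinkPath), u32> = HashMap::new();
        dedup.insert((uri.clone(), BacklinkPath::Subject), 1);
        dedup.insert((uri, BacklinkPath::Subject), 2); // same (uri, path): overwritten
        assert_eq!(dedup.len(), 1);
    }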
+94 -53
crates/tranquil-pds/src/repo_ops.rs
···
use jacquard_repo::storage::BlockStore;
use k256::ecdsa::SigningKey;
use serde_json::{Value, json};
-use std::collections::{BTreeMap, HashSet};
+use std::collections::{BTreeMap, HashMap, HashSet};
use std::str::FromStr;
use std::sync::Arc;
use tokio::sync::OwnedMutexGuard;
···
    MstOperationFailed(String),
    RecordSerializationFailed(String),
    InvalidCid(String),
+   RecordAlreadyExists(String),
}

impl std::fmt::Display for CommitError {
···
            Self::RecordSerializationFailed(e) => {
                write!(f, "Failed to serialize record: {}", e)
            }
            Self::InvalidCid(e) => write!(f, "Invalid CID: {}", e),
+           Self::RecordAlreadyExists(key) => write!(f, "Record already exists at {}", key),
        }
    }
}
···
        }
        CommitError::RepoNotFound => ApiError::RepoNotFound(None),
        CommitError::UserNotFound => ApiError::RepoNotFound(Some("User not found".into())),
+       CommitError::RecordAlreadyExists(key) => {
+           ApiError::InvalidRequest(format!("Record already exists at {key}"))
+       }
        other => {
            error!("Commit failed: {}", other);
            ApiError::InternalError(Some("Failed to commit changes".into()))
···
    pub controller_did: Option<&'a Did>,
    pub delegation_detail: Option<serde_json::Value>,
    pub ops: Vec<RecordOp>,
-   pub modified_keys: &'a [String],
    pub blob_cids: &'a [String],
    pub backlinks_to_add: Vec<Backlink>,
    pub backlinks_to_remove: Vec<AtUri>,
···
    let mut inverse_trace = new_settled.clone();
    let mut non_invertible: Vec<String> = Vec::new();
    let mut invert_errors: Vec<String> = Vec::new();
-   for op in params.ops.iter() {
-       let (collection, rkey) = match op {
-           RecordOp::Create {
-               collection, rkey, ..
-           }
-           | RecordOp::Update {
-               collection, rkey, ..
-           }
-           | RecordOp::Delete {
-               collection, rkey, ..
-           } => (collection, rkey),
-       };
+   for op in params.ops.iter().rev() {
+       let (collection, rkey) = op.collection_rkey();
        let key = SmolStr::new(format!("{}/{}", collection, rkey));
        let verified = match op {
            RecordOp::Create { cid, .. } => VerifiedWriteOp::Create {
···
        },
    }
}

+impl RecordOp {
+    pub fn collection_rkey(&self) -> (&Nsid, &Rkey) {
+        match self {
+            Self::Create {
+                collection, rkey, ..
+            }
+            | Self::Update {
+                collection, rkey, ..
+            }
+            | Self::Delete {
+                collection, rkey, ..
+            } => (collection, rkey),
+        }
+    }
+}
+
pub struct CommitResult {
    pub commit_cid: Cid,
    pub rev: String,
···
        RecordUpsert, RepoEventType,
    };

-   let backlinks_to_add = params.backlinks_to_add;
-   let backlinks_to_remove = params.backlinks_to_remove;
    let CommitParams {
        did,
        user_id,
···
        new_tree_cids,
        blobs,
        obsolete_cids,
-       ..
+       backlinks_to_add,
+       backlinks_to_remove,
    } = params;
    debug_assert_eq!(
        current_root_cid.is_some(),
···

    let obsolete_bytes: Vec<Vec<u8>> = obsolete_cids.iter().map(|c| c.to_bytes()).collect();

-   let (record_upserts, record_deletes): (Vec<RecordUpsert>, Vec<RecordDelete>) = ops.iter().fold(
-       (Vec::new(), Vec::new()),
-       |(mut upserts, mut deletes), op| {
-           match op {
-               RecordOp::Create {
-                   collection,
-                   rkey,
-                   cid,
-               }
-               | RecordOp::Update {
-                   collection,
-                   rkey,
-                   cid,
-                   ..
-               } => {
-                   upserts.push(RecordUpsert {
-                       collection: collection.clone(),
-                       rkey: rkey.clone(),
-                       cid: crate::types::CidLink::from(cid.as_cid()),
-                   });
-               }
-               RecordOp::Delete {
-                   collection, rkey, ..
-               } => {
-                   deletes.push(RecordDelete {
-                       collection: collection.clone(),
-                       rkey: rkey.clone(),
-                   });
-               }
-           }
-           (upserts, deletes)
-       },
-   );
+   let final_ops: HashMap<(&Nsid, &Rkey), &RecordOp> = ops
+       .iter()
+       .map(|op| (op.collection_rkey(), op))
+       .collect();
+
+   let final_record_uris: HashSet<AtUri> = final_ops
+       .iter()
+       .filter(|(_, op)| !matches!(op, RecordOp::Delete { .. }))
+       .map(|((c, r), _)| AtUri::from_parts(did, c, r))
+       .collect();
+
+   let record_upserts: Vec<RecordUpsert> = final_ops
+       .values()
+       .filter_map(|op| match op {
+           RecordOp::Create {
+               collection,
+               rkey,
+               cid,
+           }
+           | RecordOp::Update {
+               collection,
+               rkey,
+               cid,
+               ..
+           } => Some(RecordUpsert {
+               collection: collection.clone(),
+               rkey: rkey.clone(),
+               cid: crate::types::CidLink::from(cid.as_cid()),
+           }),
+           RecordOp::Delete { .. } => None,
+       })
+       .collect();
+
+   let record_deletes: Vec<RecordDelete> = final_ops
+       .values()
+       .filter_map(|op| match op {
+           RecordOp::Delete {
+               collection, rkey, ..
+           } => Some(RecordDelete {
+               collection: collection.clone(),
+               rkey: rkey.clone(),
+           }),
+           _ => None,
+       })
+       .collect();
+
+   let backlinks_to_add: Vec<Backlink> = backlinks_to_add
+       .into_iter()
+       .filter(|b| final_record_uris.contains(&b.uri))
+       .map(|b| ((b.uri.clone(), b.path), b))
+       .collect::<HashMap<_, _>>()
+       .into_values()
+       .collect();
+
+   let backlinks_to_remove: Vec<AtUri> = backlinks_to_remove
+       .into_iter()
+       .collect::<HashSet<_>>()
+       .into_iter()
+       .collect();

    let ops_json: Vec<serde_json::Value> = ops
        .iter()
···
        .await
        .map_err(to_commit_err)?;

+   let key = format!("{}/{}", collection, rkey);
+   if mst
+       .get(&key)
+       .await
+       .map_err(|e| CommitError::MstOperationFailed(e.to_string()))?
+       .is_some()
+   {
+       return Err(CommitError::RecordAlreadyExists(key));
+   }
+
    let record_ipld = crate::util::json_to_ipld(record);
    let mut record_bytes = Vec::new();
    serde_ipld_dagcbor::to_writer(&mut record_bytes, &record_ipld)
···
        .put(&record_bytes)
        .await
        .map_err(|e| CommitError::BlockStoreFailed(e.to_string()))?;
-
-   let key = format!("{}/{}", collection, rkey);
    let new_mst = mst
        .add(&key, record_cid)
        .await
···
        rkey: rkey.clone(),
        cid: RecordCid::from(record_cid),
    };
-   let modified_keys = [key];
    let blob_cids = extract_blob_cids(record);
    let record_uri = AtUri::from_parts(did.as_str(), collection.as_str(), rkey.as_str());
    let backlinks = extract_backlinks(&record_uri, record);
···
    controller_did: None,
    delegation_detail: None,
    ops: vec![op],
-   modified_keys: &modified_keys,
    blob_cids: &blob_cids,
    backlinks_to_add: backlinks,
    backlinks_to_remove: vec![],
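Collecting the ops into a HashMap keyed by (collection, rkey) gives last-op-wins semantics: when a batch touches the same rkey more than once, only the final op drives the record-table upserts and deletes, while the MST has already applied every step in order. A standalone sketch of the collapse, with simplified types:

    use std::collections::HashMap;

    #[derive(Clone, Debug, PartialEq)]
    enum Op {
        Create(u64),
        Update(u64),
        Delete,
    }

    // Collecting into a HashMap keeps the last value per key, so a
    // create-then-delete of one rkey collapses to a single delete for
    // the DB layer (the MST already saw every step in order).
    fn collapse(ops: &[(String, Op)]) -> HashMap<String, Op> {
        ops.iter().cloned().collect()
    }

    fn main() {
        let ops = vec![
            ("app.bsky.feed.post/3kabc".to_string(), Op::Create(1)),
            ("app.bsky.feed.post/3kabc".to_string(), Op::Delete),
        ];
        let last = collapse(&ops);
        assert_eq!(last["app.bsky.feed.post/3kabc"], Op::Delete);
    }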
+60 -5
crates/tranquil-sync/src/subscribe_repos.rs
···
    SUBSCRIBER_COUNT.load(Ordering::SeqCst)
}

+async fn recover_lagged_events(
+    socket: &mut WebSocket,
+    state: &AppState,
+    last_seen: &mut SequenceNumber,
+) -> Result<(), ()> {
+    if !last_seen.is_valid() {
+        *last_seen = state.repos.repo.get_max_seq().await.map_err(|e| {
+            error!("Lag recovery failed to read head sequence: {:?}", e);
+        })?;
+        return Ok(());
+    }
+    loop {
+        let events = match state
+            .repos
+            .repo
+            .get_events_since_cursor(*last_seen, BACKFILL_BATCH_SIZE)
+            .await
+        {
+            Ok(e) => e,
+            Err(e) => {
+                error!("Lag recovery DB query failed: {:?}", e);
+                return Err(());
+            }
+        };
+        if events.is_empty() {
+            return Ok(());
+        }
+        let batch_len = events.len();
+        let prefetched = match prefetch_blocks_for_events(state, &events).await {
+            Ok(b) => b,
+            Err(e) => {
+                error!("Lag recovery prefetch failed: {:?}", e);
+                return Err(());
+            }
+        };
+        for event in events {
+            *last_seen = event.seq;
+            let bytes =
+                match format_event_with_prefetched_blocks(state, event, &prefetched).await {
+                    Ok(b) => b,
+                    Err(e) => {
+                        warn!("Lag recovery format failed: {}", e);
+                        return Err(());
+                    }
+                };
+            if let Err(e) = socket.send(Message::Binary(bytes.into())).await {
+                warn!("Lag recovery send failed: {}", e);
+                return Err(());
+            }
+            tranquil_pds::metrics::record_firehose_event();
+        }
+        if batch_len < BACKFILL_BATCH_SIZE as usize {
+            return Ok(());
+        }
+    }
+}
+
async fn handle_socket(mut socket: WebSocket, state: AppState, params: SubscribeReposParams) {
    let count = SUBSCRIBER_COUNT.fetch_add(1, Ordering::SeqCst) + 1;
    tranquil_pds::metrics::set_firehose_subscribers(count);
···
            }
        }
    }
-   let max_lag_before_disconnect: u64 = tranquil_config::get().firehose.max_lag;
    loop {
        tokio::select! {
            result = rx.recv() => match result {
···
                    tranquil_pds::metrics::record_firehose_event();
                }
                Err(RecvError::Lagged(skipped)) => {
-                   warn!(skipped = skipped, "Firehose subscriber lagged behind");
-                   if skipped > max_lag_before_disconnect {
-                       warn!(skipped = skipped, max_lag = max_lag_before_disconnect,
-                           "Disconnecting slow firehose consumer");
+                   warn!(skipped, last_seen = last_seen.as_i64(),
+                       "Firehose subscriber lagged, recovering missed events from DB");
+                   if let Err(()) = recover_lagged_events(socket, state, &mut last_seen).await {
                        break;
                    }
                }
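Rather than disconnecting a lagged consumer (the old max_lag behavior), the handler now replays the gap from the persistent event log and rejoins the live channel. A minimal standalone model of that pattern around tokio's broadcast RecvError::Lagged; send and fetch_since here are hypothetical stand-ins for the socket write and the DB cursor query:

    use tokio::sync::broadcast::{self, error::RecvError};

    // Hypothetical stand-ins for the socket write and the DB cursor query.
    async fn send(_frame: Vec<u8>) {}
    async fn fetch_since(_cursor: u64) -> Vec<(u64, Vec<u8>)> {
        Vec::new()
    }

    // On Lagged(n) the channel has dropped n events for this receiver,
    // but `last_seen` still marks the last frame actually delivered, so
    // the gap can be replayed from any ordered, persistent event log.
    async fn run(mut rx: broadcast::Receiver<(u64, Vec<u8>)>, mut last_seen: u64) {
        loop {
            match rx.recv().await {
                // Guard: skip frames already replayed from the log below.
                Ok((seq, frame)) if seq > last_seen => {
                    last_seen = seq;
                    send(frame).await;
                }
                Ok(_) => {}
                Err(RecvError::Lagged(_skipped)) => {
                    // Replay seq > last_seen from storage, then fall
                    // through and resume the live channel; the cursor
                    // only moves forward, so the guard above filters
                    // any overlap.
                    for (seq, frame) in fetch_since(last_seen).await {
                        last_seen = seq;
                        send(frame).await;
                    }
                }
                Err(RecvError::Closed) => break,
            }
        }
    }

    #[tokio::main]
    async fn main() {
        let (tx, rx) = broadcast::channel(8);
        drop(tx); // no producer in this sketch; run exits on Closed
        run(rx, 0).await;
    }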
-7
example.toml
···
# Default value: 72
#backfill_hours = 72

-# Maximum number of lagged events before disconnecting a slow consumer.
-#
-# Can also be specified via environment variable `FIREHOSE_MAX_LAG`.
-#
-# Default value: 5000
-#max_lag = 5000
-
# Maximum concurrent full-repo exports, eg. getRepo without `since`.
#
# Can also be specified via environment variable `MAX_CONCURRENT_REPO_EXPORTS`.
+5 -5
justfile
···
    SQLX_OFFLINE=true GAUNTLET_DURATION_HOURS={{HOURS}} cargo nextest run -p tranquil-store --features tranquil-store/test-harness --profile gauntlet-nightly --test gauntlet_smoke --run-ignored all

gauntlet-farm SCENARIO HOURS="6" DUMP="proptest-regressions":
-   SQLX_OFFLINE=true cargo run --release -p tranquil-store --bin tranquil-gauntlet --features tranquil-store/gauntlet-cli -- farm --scenario {{SCENARIO}} --hours {{HOURS}} --dump-regressions {{DUMP}}
+   SQLX_OFFLINE=true cargo run --release --bin tranquil-gauntlet --features tranquil-store/gauntlet-cli -- farm --scenario {{SCENARIO}} --hours {{HOURS}} --dump-regressions {{DUMP}}

gauntlet-repro SEED SCENARIO="smoke-pr":
-   SQLX_OFFLINE=true cargo run --release -p tranquil-store --bin tranquil-gauntlet --features tranquil-store/gauntlet-cli -- repro --scenario {{SCENARIO}} --seed {{SEED}}
+   SQLX_OFFLINE=true cargo run --release --bin tranquil-gauntlet --features tranquil-store/gauntlet-cli -- repro --scenario {{SCENARIO}} --seed {{SEED}}

gauntlet-repro-config CONFIG SEED:
-   SQLX_OFFLINE=true cargo run --release -p tranquil-store --bin tranquil-gauntlet --features tranquil-store/gauntlet-cli -- repro --config {{CONFIG}} --seed {{SEED}}
+   SQLX_OFFLINE=true cargo run --release --bin tranquil-gauntlet --features tranquil-store/gauntlet-cli -- repro --config {{CONFIG}} --seed {{SEED}}

gauntlet-repro-from FILE:
-   SQLX_OFFLINE=true cargo run --release -p tranquil-store --bin tranquil-gauntlet --features tranquil-store/gauntlet-cli -- repro --from {{FILE}}
+   SQLX_OFFLINE=true cargo run --release --bin tranquil-gauntlet --features tranquil-store/gauntlet-cli -- repro --from {{FILE}}

gauntlet-sweep CONFIG SEEDS="8" DUMP="proptest-regressions":
-   SQLX_OFFLINE=true cargo run --release -p tranquil-store --bin tranquil-gauntlet --features tranquil-store/gauntlet-cli -- sweep --config {{CONFIG}} --seeds {{SEEDS}} --dump-regressions {{DUMP}}
+   SQLX_OFFLINE=true cargo run --release --bin tranquil-gauntlet --features tranquil-store/gauntlet-cli -- sweep --config {{CONFIG}} --seeds {{SEEDS}} --dump-regressions {{DUMP}}

gauntlet-soak HOURS="24" OUTPUT="":
    SQLX_OFFLINE=true GAUNTLET_SOAK_HOURS={{HOURS}} GAUNTLET_SOAK_OUTPUT={{OUTPUT}} cargo nextest run -p tranquil-store --features tranquil-store/test-harness --profile gauntlet-soak --test gauntlet_soak --run-ignored all -- soak_long_leak_gate