Our Personal Data Server from scratch! tranquil.farm
pds rust database fun oauth atproto
238
fork

Configure Feed

Select the types of activity you want to include in your feed.

delayed loading spinner in frontend, more functional-style code

+813 -562
+16
.sqlx/query-7c914c71e0340325e99495a1867fea9c814b056bd978c67a0eab55ed61278197.json
··· 1 + { 2 + "db_name": "PostgreSQL", 3 + "query": "\n INSERT INTO record_blobs (repo_id, record_uri, blob_cid)\n SELECT $1, record_uri, blob_cid\n FROM UNNEST($2::text[], $3::text[]) AS t(record_uri, blob_cid)\n ON CONFLICT (repo_id, record_uri, blob_cid) DO NOTHING\n ", 4 + "describe": { 5 + "columns": [], 6 + "parameters": { 7 + "Left": [ 8 + "Uuid", 9 + "TextArray", 10 + "TextArray" 11 + ] 12 + }, 13 + "nullable": [] 14 + }, 15 + "hash": "7c914c71e0340325e99495a1867fea9c814b056bd978c67a0eab55ed61278197" 16 + }
+19
.sqlx/query-a97815493ba7b9b20f6759e2e96a9000473ec5e85d865325500d2e193d5dcf8c.json
··· 1 + { 2 + "db_name": "PostgreSQL", 3 + "query": "\n INSERT INTO account_backups (user_id, storage_key, repo_root_cid, repo_rev, block_count, size_bytes)\n VALUES ($1, $2, $3, $4, $5, $6)\n ", 4 + "describe": { 5 + "columns": [], 6 + "parameters": { 7 + "Left": [ 8 + "Uuid", 9 + "Text", 10 + "Text", 11 + "Text", 12 + "Int4", 13 + "Int8" 14 + ] 15 + }, 16 + "nullable": [] 17 + }, 18 + "hash": "a97815493ba7b9b20f6759e2e96a9000473ec5e85d865325500d2e193d5dcf8c" 19 + }
+55 -23
crates/tranquil-pds/src/api/actor/preferences.rs
··· 127 127 )) 128 128 .into_response(); 129 129 } 130 - let mut forbidden_prefs: Vec<String> = Vec::new(); 131 - for pref in &input.preferences { 132 - let pref_str = serde_json::to_string(pref).unwrap_or_default(); 133 - if pref_str.len() > MAX_PREFERENCE_SIZE { 134 - return ApiError::InvalidRequest(format!( 130 + enum PrefValidation { 131 + Ok(Option<String>), 132 + TooLarge(usize), 133 + MissingType, 134 + WrongNamespace, 135 + } 136 + 137 + let validation_results: Vec<PrefValidation> = input 138 + .preferences 139 + .iter() 140 + .map(|pref| { 141 + let pref_str = serde_json::to_string(pref).unwrap_or_default(); 142 + if pref_str.len() > MAX_PREFERENCE_SIZE { 143 + return PrefValidation::TooLarge(pref_str.len()); 144 + } 145 + let pref_type = match pref.get("$type").and_then(|t| t.as_str()) { 146 + Some(t) => t, 147 + None => return PrefValidation::MissingType, 148 + }; 149 + if !pref_type.starts_with(APP_BSKY_NAMESPACE) { 150 + return PrefValidation::WrongNamespace; 151 + } 152 + if pref_type == PERSONAL_DETAILS_PREF && !has_full_access { 153 + PrefValidation::Ok(Some(pref_type.to_string())) 154 + } else { 155 + PrefValidation::Ok(None) 156 + } 157 + }) 158 + .collect(); 159 + 160 + if let Some(err) = validation_results.iter().find_map(|v| match v { 161 + PrefValidation::TooLarge(size) => Some( 162 + ApiError::InvalidRequest(format!( 135 163 "Preference too large: {} bytes exceeds limit of {}", 136 - pref_str.len(), 137 - MAX_PREFERENCE_SIZE 164 + size, MAX_PREFERENCE_SIZE 138 165 )) 139 - .into_response(); 140 - } 141 - let pref_type = match pref.get("$type").and_then(|t| t.as_str()) { 142 - Some(t) => t, 143 - None => { 144 - return ApiError::InvalidRequest("Preference is missing a $type".into()) 145 - .into_response(); 146 - } 147 - }; 148 - if !pref_type.starts_with(APP_BSKY_NAMESPACE) { 149 - return ApiError::InvalidRequest(format!( 166 + .into_response(), 167 + ), 168 + PrefValidation::MissingType => Some( 169 + ApiError::InvalidRequest("Preference is missing a $type".into()).into_response(), 170 + ), 171 + PrefValidation::WrongNamespace => Some( 172 + ApiError::InvalidRequest(format!( 150 173 "Some preferences are not in the {} namespace", 151 174 APP_BSKY_NAMESPACE 152 175 )) 153 - .into_response(); 154 - } 155 - if pref_type == PERSONAL_DETAILS_PREF && !has_full_access { 156 - forbidden_prefs.push(pref_type.to_string()); 157 - } 176 + .into_response(), 177 + ), 178 + PrefValidation::Ok(_) => None, 179 + }) { 180 + return err; 158 181 } 182 + 183 + let forbidden_prefs: Vec<String> = validation_results 184 + .into_iter() 185 + .filter_map(|v| match v { 186 + PrefValidation::Ok(Some(s)) => Some(s), 187 + _ => None, 188 + }) 189 + .collect(); 190 + 159 191 if !forbidden_prefs.is_empty() { 160 192 return ApiError::InvalidRequest(format!( 161 193 "Do not have authorization to set preferences: {}",
+47 -42
crates/tranquil-pds/src/api/repo/record/batch.rs
··· 343 343 }) 344 344 .collect(); 345 345 346 - for collection in create_collections { 347 - if let Err(e) = crate::auth::scope_check::check_repo_scope( 348 - is_oauth, 349 - scope.as_deref(), 350 - crate::oauth::RepoAction::Create, 351 - collection, 352 - ) { 353 - return e; 354 - } 355 - } 356 - for collection in update_collections { 357 - if let Err(e) = crate::auth::scope_check::check_repo_scope( 358 - is_oauth, 359 - scope.as_deref(), 360 - crate::oauth::RepoAction::Update, 361 - collection, 362 - ) { 363 - return e; 364 - } 365 - } 366 - for collection in delete_collections { 367 - if let Err(e) = crate::auth::scope_check::check_repo_scope( 368 - is_oauth, 369 - scope.as_deref(), 370 - crate::oauth::RepoAction::Delete, 371 - collection, 372 - ) { 373 - return e; 374 - } 346 + let scope_checks = create_collections 347 + .iter() 348 + .map(|c| (crate::oauth::RepoAction::Create, c)) 349 + .chain( 350 + update_collections 351 + .iter() 352 + .map(|c| (crate::oauth::RepoAction::Update, c)), 353 + ) 354 + .chain( 355 + delete_collections 356 + .iter() 357 + .map(|c| (crate::oauth::RepoAction::Delete, c)), 358 + ); 359 + 360 + if let Some(err) = scope_checks 361 + .filter_map(|(action, collection)| { 362 + crate::auth::scope_check::check_repo_scope( 363 + is_oauth, 364 + scope.as_deref(), 365 + action, 366 + collection, 367 + ) 368 + .err() 369 + }) 370 + .next() 371 + { 372 + return err; 375 373 } 376 374 } 377 375 ··· 439 437 return ApiError::InternalError(Some("Failed to persist MST".into())).into_response(); 440 438 } 441 439 }; 442 - let mut new_mst_blocks = std::collections::BTreeMap::new(); 443 - let mut old_mst_blocks = std::collections::BTreeMap::new(); 444 - for key in &modified_keys { 445 - if mst.blocks_for_path(key, &mut new_mst_blocks).await.is_err() { 446 - return ApiError::InternalError(Some("Failed to get new MST blocks for path".into())) 440 + let (new_mst_blocks, old_mst_blocks) = { 441 + let mut new_blocks = std::collections::BTreeMap::new(); 442 + let mut old_blocks = std::collections::BTreeMap::new(); 443 + for key in &modified_keys { 444 + if mst.blocks_for_path(key, &mut new_blocks).await.is_err() { 445 + return ApiError::InternalError(Some( 446 + "Failed to get new MST blocks for path".into(), 447 + )) 447 448 .into_response(); 448 - } 449 - if original_mst 450 - .blocks_for_path(key, &mut old_mst_blocks) 451 - .await 452 - .is_err() 453 - { 454 - return ApiError::InternalError(Some("Failed to get old MST blocks for path".into())) 449 + } 450 + if original_mst 451 + .blocks_for_path(key, &mut old_blocks) 452 + .await 453 + .is_err() 454 + { 455 + return ApiError::InternalError(Some( 456 + "Failed to get old MST blocks for path".into(), 457 + )) 455 458 .into_response(); 459 + } 456 460 } 457 - } 461 + (new_blocks, old_blocks) 462 + }; 458 463 let mut relevant_blocks = new_mst_blocks.clone(); 459 464 relevant_blocks.extend(old_mst_blocks.iter().map(|(k, v)| (*k, v.clone()))); 460 465 let written_cids: Vec<Cid> = tracking_store
+4 -6
crates/tranquil-pds/src/api/repo/record/utils.rs
··· 25 25 { 26 26 blobs.push(link.to_string()); 27 27 } 28 - for v in map.values() { 29 - extract_blob_cids_recursive(v, blobs); 30 - } 28 + map.values() 29 + .for_each(|v| extract_blob_cids_recursive(v, blobs)); 31 30 } 32 31 Value::Array(arr) => { 33 - for v in arr { 34 - extract_blob_cids_recursive(v, blobs); 35 - } 32 + arr.iter() 33 + .for_each(|v| extract_blob_cids_recursive(v, blobs)); 36 34 } 37 35 _ => {} 38 36 }
+18 -17
crates/tranquil-pds/src/appview/mod.rs
··· 231 231 } 232 232 233 233 fn extract_service_endpoint(&self, doc: &DidDocument) -> Option<ResolvedService> { 234 - for service in &doc.service { 235 - if service.service_type == "AtprotoAppView" 236 - || service.id.contains("atproto_appview") 237 - || service.id.ends_with("#bsky_appview") 238 - { 239 - return Some(ResolvedService { 240 - url: service.service_endpoint.clone(), 241 - did: doc.id.clone(), 242 - }); 243 - } 234 + if let Some(service) = doc.service.iter().find(|s| { 235 + s.service_type == "AtprotoAppView" 236 + || s.id.contains("atproto_appview") 237 + || s.id.ends_with("#bsky_appview") 238 + }) { 239 + return Some(ResolvedService { 240 + url: service.service_endpoint.clone(), 241 + did: doc.id.clone(), 242 + }); 244 243 } 245 244 246 - for service in &doc.service { 247 - if service.service_type.contains("AppView") || service.id.contains("appview") { 248 - return Some(ResolvedService { 249 - url: service.service_endpoint.clone(), 250 - did: doc.id.clone(), 251 - }); 252 - } 245 + if let Some(service) = doc 246 + .service 247 + .iter() 248 + .find(|s| s.service_type.contains("AppView") || s.id.contains("appview")) 249 + { 250 + return Some(ResolvedService { 251 + url: service.service_endpoint.clone(), 252 + did: doc.id.clone(), 253 + }); 253 254 } 254 255 255 256 if let Some(service) = doc.service.first()
+2 -6
crates/tranquil-pds/src/circuit_breaker.rs
··· 265 265 async fn test_circuit_breaker_half_open_closes_after_successes() { 266 266 let cb = CircuitBreaker::new("test", 3, 2, 0); 267 267 268 - for _ in 0..3 { 269 - cb.record_failure().await; 270 - } 268 + futures::future::join_all((0..3).map(|_| cb.record_failure())).await; 271 269 assert_eq!(cb.state().await, CircuitState::Open); 272 270 273 271 tokio::time::sleep(Duration::from_millis(100)).await; ··· 285 283 async fn test_circuit_breaker_half_open_reopens_on_failure() { 286 284 let cb = CircuitBreaker::new("test", 3, 2, 0); 287 285 288 - for _ in 0..3 { 289 - cb.record_failure().await; 290 - } 286 + futures::future::join_all((0..3).map(|_| cb.record_failure())).await; 291 287 292 288 tokio::time::sleep(Duration::from_millis(100)).await; 293 289 cb.can_execute().await;
+1 -3
crates/tranquil-pds/src/comms/service.rs
··· 115 115 return Ok(()); 116 116 } 117 117 debug!(count = items.len(), "Processing comms batch"); 118 - for item in items { 119 - self.process_item(item).await; 120 - } 118 + futures::future::join_all(items.into_iter().map(|item| self.process_item(item))).await; 121 119 Ok(()) 122 120 } 123 121
+2 -2
crates/tranquil-pds/src/crawlers.rs
··· 91 91 self.mark_notified(); 92 92 let circuit_breaker = self.circuit_breaker.clone(); 93 93 94 - for crawler_url in &self.crawler_urls { 94 + self.crawler_urls.iter().for_each(|crawler_url| { 95 95 let url = format!( 96 96 "{}/xrpc/com.atproto.sync.requestCrawl", 97 97 crawler_url.trim_end_matches('/') ··· 136 136 } 137 137 } 138 138 }); 139 - } 139 + }); 140 140 } 141 141 } 142 142
+24 -29
crates/tranquil-pds/src/delegation/scopes.rs
··· 57 57 return granted_set.into_iter().collect::<Vec<_>>().join(" "); 58 58 } 59 59 60 - let mut result: Vec<&str> = Vec::new(); 61 - 62 - for requested_scope in &requested_set { 63 - if granted_set.contains(requested_scope) { 64 - result.push(requested_scope); 65 - continue; 66 - } 67 - 68 - if let Some(match_result) = find_matching_scope(requested_scope, &granted_set) { 69 - result.push(match_result); 70 - } 71 - } 60 + let mut result: Vec<&str> = requested_set 61 + .iter() 62 + .filter_map(|requested_scope| { 63 + if granted_set.contains(requested_scope) { 64 + Some(*requested_scope) 65 + } else { 66 + find_matching_scope(requested_scope, &granted_set) 67 + } 68 + }) 69 + .collect(); 72 70 73 71 result.sort(); 74 72 result.join(" ") ··· 118 116 return Ok(()); 119 117 } 120 118 121 - for scope in scopes.split_whitespace() { 122 - let (base, _) = split_scope(scope); 123 - 124 - if !is_valid_scope_prefix(base) { 125 - return Err(format!("Invalid scope: {}", scope)); 126 - } 127 - } 128 - 129 - Ok(()) 119 + scopes 120 + .split_whitespace() 121 + .try_for_each(|scope| { 122 + let (base, _) = split_scope(scope); 123 + if is_valid_scope_prefix(base) { 124 + Ok(()) 125 + } else { 126 + Err(format!("Invalid scope: {}", scope)) 127 + } 128 + }) 130 129 } 131 130 132 131 fn is_valid_scope_prefix(base: &str) -> bool { 133 - let valid_prefixes = [ 132 + const VALID_PREFIXES: [&str; 7] = [ 134 133 "atproto", 135 134 "repo:", 136 135 "blob:", ··· 140 139 "transition:", 141 140 ]; 142 141 143 - for prefix in valid_prefixes { 144 - if base == prefix.trim_end_matches(':') || base.starts_with(prefix) { 145 - return true; 146 - } 147 - } 148 - 149 - false 142 + VALID_PREFIXES 143 + .iter() 144 + .any(|prefix| base == prefix.trim_end_matches(':') || base.starts_with(prefix)) 150 145 } 151 146 152 147 #[cfg(test)]
+12 -19
crates/tranquil-pds/src/handle/mod.rs
··· 25 25 .txt_lookup(&query_name) 26 26 .await 27 27 .map_err(|e| HandleResolutionError::DnsError(e.to_string()))?; 28 - for record in txt_lookup.iter() { 29 - for txt in record.txt_data() { 28 + txt_lookup 29 + .iter() 30 + .flat_map(|record| record.txt_data()) 31 + .find_map(|txt| { 30 32 let txt_str = String::from_utf8_lossy(txt); 31 - if let Some(did) = txt_str.strip_prefix("did=") { 33 + txt_str.strip_prefix("did=").and_then(|did| { 32 34 let did = did.trim(); 33 - if did.starts_with("did:") { 34 - return Ok(did.to_string()); 35 - } 36 - } 37 - } 38 - } 39 - Err(HandleResolutionError::NotFound) 35 + did.starts_with("did:").then(|| did.to_string()) 36 + }) 37 + }) 38 + .ok_or(HandleResolutionError::NotFound) 40 39 } 41 40 42 41 pub async fn resolve_handle_http(handle: &str) -> Result<String, HandleResolutionError> { ··· 95 94 let service_domains: Vec<String> = std::env::var("PDS_SERVICE_HANDLE_DOMAINS") 96 95 .map(|s| s.split(',').map(|d| d.trim().to_string()).collect()) 97 96 .unwrap_or_else(|_| vec![hostname.to_string()]); 98 - for domain in service_domains { 99 - if handle.ends_with(&format!(".{}", domain)) { 100 - return true; 101 - } 102 - if handle == domain { 103 - return true; 104 - } 105 - } 106 - false 97 + service_domains 98 + .iter() 99 + .any(|domain| handle.ends_with(&format!(".{}", domain)) || handle == domain) 107 100 } 108 101 109 102 #[cfg(test)]
+6 -13
crates/tranquil-pds/src/handle/reserved.rs
··· 1029 1029 ]; 1030 1030 1031 1031 pub static RESERVED_SUBDOMAINS: LazyLock<HashSet<&'static str>> = LazyLock::new(|| { 1032 - let mut set = HashSet::with_capacity( 1033 - ATP_SPECIFIC.len() + COMMONLY_RESERVED.len() + FAMOUS_ACCOUNTS.len(), 1034 - ); 1035 - for s in ATP_SPECIFIC { 1036 - set.insert(*s); 1037 - } 1038 - for s in COMMONLY_RESERVED { 1039 - set.insert(*s); 1040 - } 1041 - for s in FAMOUS_ACCOUNTS { 1042 - set.insert(*s); 1043 - } 1044 - set 1032 + ATP_SPECIFIC 1033 + .iter() 1034 + .chain(COMMONLY_RESERVED.iter()) 1035 + .chain(FAMOUS_ACCOUNTS.iter()) 1036 + .copied() 1037 + .collect() 1045 1038 }); 1046 1039 1047 1040 pub fn is_reserved_subdomain(subdomain: &str) -> bool {
+37 -35
crates/tranquil-pds/src/oauth/endpoints/authorize.rs
··· 1352 1352 ) 1353 1353 .await 1354 1354 .unwrap_or(true); 1355 - let mut scopes = Vec::new(); 1356 - for scope in &requested_scopes { 1357 - let (category, required, description, display_name) = 1358 - if let Some(def) = crate::oauth::scopes::SCOPE_DEFINITIONS.get(*scope) { 1359 - ( 1360 - def.category.display_name().to_string(), 1361 - def.required, 1362 - def.description.to_string(), 1363 - def.display_name.to_string(), 1364 - ) 1365 - } else if scope.starts_with("ref:") { 1366 - ( 1367 - "Reference".to_string(), 1368 - false, 1369 - "Referenced scope".to_string(), 1370 - scope.to_string(), 1371 - ) 1372 - } else { 1373 - ( 1374 - "Other".to_string(), 1375 - false, 1376 - format!("Access to {}", scope), 1377 - scope.to_string(), 1378 - ) 1379 - }; 1380 - let granted = pref_map.get(*scope).copied(); 1381 - scopes.push(ScopeInfo { 1382 - scope: scope.to_string(), 1383 - category, 1384 - required, 1385 - description, 1386 - display_name, 1387 - granted, 1388 - }); 1389 - } 1355 + let scopes: Vec<ScopeInfo> = requested_scopes 1356 + .iter() 1357 + .map(|scope| { 1358 + let (category, required, description, display_name) = 1359 + if let Some(def) = crate::oauth::scopes::SCOPE_DEFINITIONS.get(*scope) { 1360 + ( 1361 + def.category.display_name().to_string(), 1362 + def.required, 1363 + def.description.to_string(), 1364 + def.display_name.to_string(), 1365 + ) 1366 + } else if scope.starts_with("ref:") { 1367 + ( 1368 + "Reference".to_string(), 1369 + false, 1370 + "Referenced scope".to_string(), 1371 + scope.to_string(), 1372 + ) 1373 + } else { 1374 + ( 1375 + "Other".to_string(), 1376 + false, 1377 + format!("Access to {}", scope), 1378 + scope.to_string(), 1379 + ) 1380 + }; 1381 + let granted = pref_map.get(*scope).copied(); 1382 + ScopeInfo { 1383 + scope: scope.to_string(), 1384 + category, 1385 + required, 1386 + description, 1387 + display_name, 1388 + granted, 1389 + } 1390 + }) 1391 + .collect(); 1390 1392 let (is_delegation, controller_did, controller_handle, delegation_level) = 1391 1393 if let Some(ref ctrl_did) = request_data.controller_did { 1392 1394 let ctrl_handle =
+6 -6
crates/tranquil-pds/src/plc/mod.rs
··· 526 526 } 527 527 let cbor_bytes = serde_ipld_dagcbor::to_vec(&unsigned_op) 528 528 .map_err(|e| PlcError::Serialization(e.to_string()))?; 529 - for key_did in rotation_keys { 530 - if let Ok(true) = verify_signature_with_did_key(key_did, &cbor_bytes, &signature) { 531 - return Ok(true); 532 - } 533 - } 534 - Ok(false) 529 + let verified = rotation_keys 530 + .iter() 531 + .any(|key_did| { 532 + verify_signature_with_did_key(key_did, &cbor_bytes, &signature).unwrap_or(false) 533 + }); 534 + Ok(verified) 535 535 } 536 536 537 537 fn verify_signature_with_did_key(
+443 -344
crates/tranquil-pds/src/scheduled.rs
··· 14 14 use crate::storage::{BackupStorage, BlobStorage}; 15 15 use crate::sync::car::encode_car_header; 16 16 17 + async fn update_genesis_blocks_cids(db: &PgPool, blocks_cids: &[String], seq: i64) -> Result<(), sqlx::Error> { 18 + sqlx::query!( 19 + "UPDATE repo_seq SET blocks_cids = $1 WHERE seq = $2", 20 + blocks_cids, 21 + seq 22 + ) 23 + .execute(db) 24 + .await?; 25 + Ok(()) 26 + } 27 + 28 + async fn update_repo_rev(db: &PgPool, rev: &str, user_id: uuid::Uuid) -> Result<(), sqlx::Error> { 29 + sqlx::query!( 30 + "UPDATE repos SET repo_rev = $1 WHERE user_id = $2", 31 + rev, 32 + user_id 33 + ) 34 + .execute(db) 35 + .await?; 36 + Ok(()) 37 + } 38 + 39 + async fn insert_user_blocks(db: &PgPool, user_id: uuid::Uuid, block_cids: &[Vec<u8>]) -> Result<(), sqlx::Error> { 40 + sqlx::query!( 41 + r#" 42 + INSERT INTO user_blocks (user_id, block_cid) 43 + SELECT $1, block_cid FROM UNNEST($2::bytea[]) AS t(block_cid) 44 + ON CONFLICT (user_id, block_cid) DO NOTHING 45 + "#, 46 + user_id, 47 + block_cids 48 + ) 49 + .execute(db) 50 + .await?; 51 + Ok(()) 52 + } 53 + 54 + async fn fetch_user_records(db: &PgPool, user_id: uuid::Uuid) -> Result<Vec<(String, String, String)>, sqlx::Error> { 55 + let rows = sqlx::query!( 56 + "SELECT collection, rkey, record_cid FROM records WHERE repo_id = $1", 57 + user_id 58 + ) 59 + .fetch_all(db) 60 + .await?; 61 + Ok(rows.into_iter().map(|r| (r.collection, r.rkey, r.record_cid)).collect()) 62 + } 63 + 64 + async fn insert_record_blobs(db: &PgPool, user_id: uuid::Uuid, record_uris: &[String], blob_cids: &[String]) -> Result<(), sqlx::Error> { 65 + sqlx::query!( 66 + r#" 67 + INSERT INTO record_blobs (repo_id, record_uri, blob_cid) 68 + SELECT $1, record_uri, blob_cid 69 + FROM UNNEST($2::text[], $3::text[]) AS t(record_uri, blob_cid) 70 + ON CONFLICT (repo_id, record_uri, blob_cid) DO NOTHING 71 + "#, 72 + user_id, 73 + record_uris, 74 + blob_cids 75 + ) 76 + .execute(db) 77 + .await?; 78 + Ok(()) 79 + } 80 + 81 + async fn delete_backup_record(db: &PgPool, id: uuid::Uuid) -> Result<(), sqlx::Error> { 82 + sqlx::query!("DELETE FROM account_backups WHERE id = $1", id) 83 + .execute(db) 84 + .await?; 85 + Ok(()) 86 + } 87 + 88 + async fn fetch_old_backups( 89 + db: &PgPool, 90 + user_id: uuid::Uuid, 91 + retention_count: i64, 92 + ) -> Result<Vec<(uuid::Uuid, String)>, sqlx::Error> { 93 + let rows = sqlx::query!( 94 + r#" 95 + SELECT id, storage_key 96 + FROM account_backups 97 + WHERE user_id = $1 98 + ORDER BY created_at DESC 99 + OFFSET $2 100 + "#, 101 + user_id, 102 + retention_count 103 + ) 104 + .fetch_all(db) 105 + .await?; 106 + Ok(rows.into_iter().map(|r| (r.id, r.storage_key)).collect()) 107 + } 108 + 109 + async fn insert_backup_record( 110 + db: &PgPool, 111 + user_id: uuid::Uuid, 112 + storage_key: &str, 113 + repo_root_cid: &str, 114 + repo_rev: &str, 115 + block_count: i32, 116 + size_bytes: i64, 117 + ) -> Result<(), sqlx::Error> { 118 + sqlx::query!( 119 + r#" 120 + INSERT INTO account_backups (user_id, storage_key, repo_root_cid, repo_rev, block_count, size_bytes) 121 + VALUES ($1, $2, $3, $4, $5, $6) 122 + "#, 123 + user_id, 124 + storage_key, 125 + repo_root_cid, 126 + repo_rev, 127 + block_count, 128 + size_bytes 129 + ) 130 + .execute(db) 131 + .await?; 132 + Ok(()) 133 + } 134 + 135 + struct GenesisCommitRow { 136 + seq: i64, 137 + did: String, 138 + commit_cid: Option<String>, 139 + } 140 + 141 + async fn process_genesis_commit( 142 + db: &PgPool, 143 + block_store: &PostgresBlockStore, 144 + row: GenesisCommitRow, 145 + ) -> Result<(String, i64), (i64, &'static str)> { 146 + let commit_cid_str = row.commit_cid.ok_or((row.seq, "missing commit_cid"))?; 147 + let commit_cid = Cid::from_str(&commit_cid_str).map_err(|_| (row.seq, "invalid CID"))?; 148 + let block = block_store 149 + .get(&commit_cid) 150 + .await 151 + .map_err(|_| (row.seq, "failed to fetch block"))? 152 + .ok_or((row.seq, "block not found"))?; 153 + let commit = Commit::from_cbor(&block).map_err(|_| (row.seq, "failed to parse commit"))?; 154 + let blocks_cids = vec![commit.data.to_string(), commit_cid.to_string()]; 155 + update_genesis_blocks_cids(db, &blocks_cids, row.seq) 156 + .await 157 + .map_err(|_| (row.seq, "failed to update"))?; 158 + Ok((row.did, row.seq)) 159 + } 160 + 17 161 pub async fn backfill_genesis_commit_blocks(db: &PgPool, block_store: PostgresBlockStore) { 18 162 let broken_genesis_commits = match sqlx::query!( 19 163 r#" ··· 44 188 "Backfilling blocks_cids for genesis commits" 45 189 ); 46 190 47 - let mut success = 0; 48 - let mut failed = 0; 49 - 50 - for commit_row in broken_genesis_commits { 51 - let commit_cid_str = match &commit_row.commit_cid { 52 - Some(c) => c.clone(), 53 - None => { 54 - warn!(seq = commit_row.seq, "Genesis commit missing commit_cid"); 55 - failed += 1; 56 - continue; 57 - } 58 - }; 59 - 60 - let commit_cid = match Cid::from_str(&commit_cid_str) { 61 - Ok(c) => c, 62 - Err(_) => { 63 - warn!(seq = commit_row.seq, "Invalid commit CID"); 64 - failed += 1; 65 - continue; 66 - } 67 - }; 68 - 69 - let block = match block_store.get(&commit_cid).await { 70 - Ok(Some(b)) => b, 71 - Ok(None) => { 72 - warn!(seq = commit_row.seq, cid = %commit_cid_str, "Commit block not found in store"); 73 - failed += 1; 74 - continue; 75 - } 76 - Err(e) => { 77 - warn!(seq = commit_row.seq, error = %e, "Failed to fetch commit block"); 78 - failed += 1; 79 - continue; 80 - } 81 - }; 82 - 83 - let commit = match Commit::from_cbor(&block) { 84 - Ok(c) => c, 85 - Err(e) => { 86 - warn!(seq = commit_row.seq, error = %e, "Failed to parse commit"); 87 - failed += 1; 88 - continue; 89 - } 90 - }; 91 - 92 - let mst_root_cid = commit.data; 93 - let blocks_cids: Vec<String> = vec![mst_root_cid.to_string(), commit_cid.to_string()]; 94 - 95 - if let Err(e) = sqlx::query!( 96 - "UPDATE repo_seq SET blocks_cids = $1 WHERE seq = $2", 97 - &blocks_cids, 98 - commit_row.seq 191 + let results = futures::future::join_all(broken_genesis_commits.into_iter().map(|row| { 192 + process_genesis_commit( 193 + db, 194 + &block_store, 195 + GenesisCommitRow { 196 + seq: row.seq, 197 + did: row.did, 198 + commit_cid: row.commit_cid, 199 + }, 99 200 ) 100 - .execute(db) 101 - .await 102 - { 103 - warn!(seq = commit_row.seq, error = %e, "Failed to update blocks_cids"); 104 - failed += 1; 105 - } else { 106 - info!(seq = commit_row.seq, did = %commit_row.did, "Fixed genesis commit blocks_cids"); 107 - success += 1; 201 + })) 202 + .await; 203 + 204 + let (success, failed) = results.iter().fold((0, 0), |(s, f), r| match r { 205 + Ok((did, seq)) => { 206 + info!(seq = seq, did = %did, "Fixed genesis commit blocks_cids"); 207 + (s + 1, f) 108 208 } 109 - } 209 + Err((seq, reason)) => { 210 + warn!(seq = seq, reason = reason, "Failed to process genesis commit"); 211 + (s, f + 1) 212 + } 213 + }); 110 214 111 215 info!( 112 216 success, 113 217 failed, "Completed genesis commit blocks_cids backfill" 114 218 ); 219 + } 220 + 221 + async fn process_repo_rev( 222 + db: &PgPool, 223 + block_store: &PostgresBlockStore, 224 + user_id: uuid::Uuid, 225 + repo_root_cid: String, 226 + ) -> Result<uuid::Uuid, uuid::Uuid> { 227 + let cid = Cid::from_str(&repo_root_cid).map_err(|_| user_id)?; 228 + let block = block_store 229 + .get(&cid) 230 + .await 231 + .ok() 232 + .flatten() 233 + .ok_or(user_id)?; 234 + let commit = Commit::from_cbor(&block).map_err(|_| user_id)?; 235 + let rev = commit.rev().to_string(); 236 + update_repo_rev(db, &rev, user_id) 237 + .await 238 + .map_err(|_| user_id)?; 239 + Ok(user_id) 115 240 } 116 241 117 242 pub async fn backfill_repo_rev(db: &PgPool, block_store: PostgresBlockStore) { ··· 137 262 "Backfilling repo_rev for existing repos" 138 263 ); 139 264 140 - let mut success = 0; 141 - let mut failed = 0; 265 + let results = futures::future::join_all(repos_missing_rev.into_iter().map(|repo| { 266 + process_repo_rev(db, &block_store, repo.user_id, repo.repo_root_cid) 267 + })) 268 + .await; 142 269 143 - for repo in repos_missing_rev { 144 - let cid = match Cid::from_str(&repo.repo_root_cid) { 145 - Ok(c) => c, 146 - Err(_) => { 147 - failed += 1; 148 - continue; 270 + let (success, failed) = results 271 + .iter() 272 + .fold((0, 0), |(s, f), r| match r { 273 + Ok(_) => (s + 1, f), 274 + Err(user_id) => { 275 + warn!(user_id = %user_id, "Failed to update repo_rev"); 276 + (s, f + 1) 149 277 } 150 - }; 278 + }); 151 279 152 - let block = match block_store.get(&cid).await { 153 - Ok(Some(b)) => b, 154 - _ => { 155 - failed += 1; 156 - continue; 157 - } 158 - }; 280 + info!(success, failed, "Completed repo_rev backfill"); 281 + } 159 282 160 - let commit = match Commit::from_cbor(&block) { 161 - Ok(c) => c, 162 - Err(_) => { 163 - failed += 1; 164 - continue; 165 - } 166 - }; 167 - 168 - let rev = commit.rev().to_string(); 169 - 170 - if let Err(e) = sqlx::query!( 171 - "UPDATE repos SET repo_rev = $1 WHERE user_id = $2", 172 - rev, 173 - repo.user_id 174 - ) 175 - .execute(db) 283 + async fn process_user_blocks( 284 + db: &PgPool, 285 + block_store: &PostgresBlockStore, 286 + user_id: uuid::Uuid, 287 + repo_root_cid: String, 288 + ) -> Result<(uuid::Uuid, usize), uuid::Uuid> { 289 + let root_cid = Cid::from_str(&repo_root_cid).map_err(|_| user_id)?; 290 + let block_cids = collect_current_repo_blocks(block_store, &root_cid) 176 291 .await 177 - { 178 - warn!(user_id = %repo.user_id, error = %e, "Failed to update repo_rev"); 179 - failed += 1; 180 - } else { 181 - success += 1; 182 - } 292 + .map_err(|_| user_id)?; 293 + if block_cids.is_empty() { 294 + return Err(user_id); 183 295 } 184 - 185 - info!(success, failed, "Completed repo_rev backfill"); 296 + let count = block_cids.len(); 297 + insert_user_blocks(db, user_id, &block_cids) 298 + .await 299 + .map_err(|_| user_id)?; 300 + Ok((user_id, count)) 186 301 } 187 302 188 303 pub async fn backfill_user_blocks(db: &PgPool, block_store: PostgresBlockStore) { ··· 214 329 "Backfilling user_blocks for existing repos" 215 330 ); 216 331 217 - let mut success = 0; 218 - let mut failed = 0; 332 + let results = futures::future::join_all(users_without_blocks.into_iter().map(|user| { 333 + process_user_blocks(db, &block_store, user.user_id, user.repo_root_cid) 334 + })) 335 + .await; 219 336 220 - for user in users_without_blocks { 221 - let root_cid = match Cid::from_str(&user.repo_root_cid) { 222 - Ok(c) => c, 223 - Err(_) => { 224 - failed += 1; 225 - continue; 226 - } 227 - }; 228 - 229 - match collect_current_repo_blocks(&block_store, &root_cid).await { 230 - Ok(block_cids) => { 231 - if block_cids.is_empty() { 232 - failed += 1; 233 - continue; 234 - } 235 - 236 - if let Err(e) = sqlx::query!( 237 - r#" 238 - INSERT INTO user_blocks (user_id, block_cid) 239 - SELECT $1, block_cid FROM UNNEST($2::bytea[]) AS t(block_cid) 240 - ON CONFLICT (user_id, block_cid) DO NOTHING 241 - "#, 242 - user.user_id, 243 - &block_cids 244 - ) 245 - .execute(db) 246 - .await 247 - { 248 - warn!(user_id = %user.user_id, error = %e, "Failed to backfill user_blocks"); 249 - failed += 1; 250 - } else { 251 - info!(user_id = %user.user_id, block_count = block_cids.len(), "Backfilled user_blocks"); 252 - success += 1; 253 - } 254 - } 255 - Err(e) => { 256 - warn!(user_id = %user.user_id, error = %e, "Failed to collect repo blocks for backfill"); 257 - failed += 1; 258 - } 337 + let (success, failed) = results.iter().fold((0, 0), |(s, f), r| match r { 338 + Ok((user_id, count)) => { 339 + info!(user_id = %user_id, block_count = count, "Backfilled user_blocks"); 340 + (s + 1, f) 341 + } 342 + Err(user_id) => { 343 + warn!(user_id = %user_id, "Failed to backfill user_blocks"); 344 + (s, f + 1) 259 345 } 260 - } 346 + }); 261 347 262 348 info!(success, failed, "Completed user_blocks backfill"); 263 349 } ··· 314 400 Ok(block_cids) 315 401 } 316 402 403 + async fn process_record_blobs( 404 + db: &PgPool, 405 + block_store: &PostgresBlockStore, 406 + user_id: uuid::Uuid, 407 + did: String, 408 + ) -> Result<(uuid::Uuid, String, usize), (uuid::Uuid, &'static str)> { 409 + let records = fetch_user_records(db, user_id) 410 + .await 411 + .map_err(|_| (user_id, "failed to fetch records"))?; 412 + 413 + let mut batch_record_uris: Vec<String> = Vec::new(); 414 + let mut batch_blob_cids: Vec<String> = Vec::new(); 415 + 416 + futures::future::join_all(records.into_iter().map(|(collection, rkey, record_cid)| { 417 + let did = did.clone(); 418 + async move { 419 + let cid = Cid::from_str(&record_cid).ok()?; 420 + let block_bytes = block_store.get(&cid).await.ok()??; 421 + let record_ipld: Ipld = serde_ipld_dagcbor::from_slice(&block_bytes).ok()?; 422 + let blob_refs = crate::sync::import::find_blob_refs_ipld(&record_ipld, 0); 423 + Some( 424 + blob_refs 425 + .into_iter() 426 + .map(|blob_ref| { 427 + let record_uri = format!("at://{}/{}/{}", did, collection, rkey); 428 + (record_uri, blob_ref.cid) 429 + }) 430 + .collect::<Vec<_>>(), 431 + ) 432 + } 433 + })) 434 + .await 435 + .into_iter() 436 + .flatten() 437 + .flatten() 438 + .for_each(|(uri, cid)| { 439 + batch_record_uris.push(uri); 440 + batch_blob_cids.push(cid); 441 + }); 442 + 443 + let blob_refs_found = batch_record_uris.len(); 444 + if !batch_record_uris.is_empty() { 445 + insert_record_blobs(db, user_id, &batch_record_uris, &batch_blob_cids) 446 + .await 447 + .map_err(|_| (user_id, "failed to insert"))?; 448 + } 449 + Ok((user_id, did, blob_refs_found)) 450 + } 451 + 317 452 pub async fn backfill_record_blobs(db: &PgPool, block_store: PostgresBlockStore) { 318 453 let users_needing_backfill = match sqlx::query!( 319 454 r#" ··· 344 479 "Backfilling record_blobs for existing repos" 345 480 ); 346 481 347 - let mut success = 0; 348 - let mut failed = 0; 482 + let results = futures::future::join_all(users_needing_backfill.into_iter().map(|user| { 483 + process_record_blobs(db, &block_store, user.user_id, user.did) 484 + })) 485 + .await; 349 486 350 - for user in users_needing_backfill { 351 - let records = match sqlx::query!( 352 - "SELECT collection, rkey, record_cid FROM records WHERE repo_id = $1", 353 - user.user_id 354 - ) 355 - .fetch_all(db) 356 - .await 357 - { 358 - Ok(r) => r, 359 - Err(e) => { 360 - warn!(user_id = %user.user_id, error = %e, "Failed to fetch records for backfill"); 361 - failed += 1; 362 - continue; 487 + let (success, failed) = results.iter().fold((0, 0), |(s, f), r| match r { 488 + Ok((user_id, did, blob_refs)) => { 489 + if *blob_refs > 0 { 490 + info!(user_id = %user_id, did = %did, blob_refs = blob_refs, "Backfilled record_blobs"); 363 491 } 364 - }; 365 - 366 - let mut batch_record_uris: Vec<String> = Vec::new(); 367 - let mut batch_blob_cids: Vec<String> = Vec::new(); 368 - 369 - for record in records { 370 - let record_cid = match Cid::from_str(&record.record_cid) { 371 - Ok(c) => c, 372 - Err(_) => continue, 373 - }; 374 - 375 - let block_bytes = match block_store.get(&record_cid).await { 376 - Ok(Some(b)) => b, 377 - _ => continue, 378 - }; 379 - 380 - let record_ipld: Ipld = match serde_ipld_dagcbor::from_slice(&block_bytes) { 381 - Ok(v) => v, 382 - Err(_) => continue, 383 - }; 384 - 385 - let blob_refs = crate::sync::import::find_blob_refs_ipld(&record_ipld, 0); 386 - for blob_ref in blob_refs { 387 - let record_uri = format!("at://{}/{}/{}", user.did, record.collection, record.rkey); 388 - batch_record_uris.push(record_uri); 389 - batch_blob_cids.push(blob_ref.cid); 390 - } 492 + (s + 1, f) 391 493 } 392 - 393 - let blob_refs_found = batch_record_uris.len(); 394 - if !batch_record_uris.is_empty() { 395 - if let Err(e) = sqlx::query!( 396 - r#" 397 - INSERT INTO record_blobs (repo_id, record_uri, blob_cid) 398 - SELECT $1, record_uri, blob_cid 399 - FROM UNNEST($2::text[], $3::text[]) AS t(record_uri, blob_cid) 400 - ON CONFLICT (repo_id, record_uri, blob_cid) DO NOTHING 401 - "#, 402 - user.user_id, 403 - &batch_record_uris, 404 - &batch_blob_cids 405 - ) 406 - .execute(db) 407 - .await 408 - { 409 - warn!(error = %e, "Failed to batch insert record_blobs during backfill"); 410 - } else { 411 - info!( 412 - user_id = %user.user_id, 413 - did = %user.did, 414 - blob_refs = blob_refs_found, 415 - "Backfilled record_blobs" 416 - ); 417 - } 494 + Err((user_id, reason)) => { 495 + warn!(user_id = %user_id, reason = reason, "Failed to backfill record_blobs"); 496 + (s, f + 1) 418 497 } 419 - success += 1; 420 - } 498 + }); 421 499 422 500 info!(success, failed, "Completed record_blobs backfill"); 423 501 } ··· 487 565 "Processing scheduled account deletions" 488 566 ); 489 567 490 - for account in accounts_to_delete { 491 - if let Err(e) = delete_account_data(db, blob_store, &account.did, &account.handle).await { 492 - warn!( 493 - did = %account.did, 494 - handle = %account.handle, 495 - error = %e, 496 - "Failed to delete scheduled account" 497 - ); 498 - } else { 499 - info!( 500 - did = %account.did, 501 - handle = %account.handle, 502 - "Successfully deleted scheduled account" 503 - ); 504 - } 505 - } 568 + futures::future::join_all(accounts_to_delete.into_iter().map(|account| async move { 569 + let result = delete_account_data(db, blob_store, &account.did, &account.handle).await; 570 + (account.did, account.handle, result) 571 + })) 572 + .await 573 + .into_iter() 574 + .for_each(|(did, handle, result)| match result { 575 + Ok(()) => info!(did = %did, handle = %handle, "Successfully deleted scheduled account"), 576 + Err(e) => warn!(did = %did, handle = %handle, error = %e, "Failed to delete scheduled account"), 577 + }); 506 578 507 579 Ok(()) 508 580 } ··· 526 598 .await 527 599 .map_err(|e| format!("DB error fetching blob keys: {}", e))?; 528 600 529 - for storage_key in &blob_storage_keys { 530 - if let Err(e) = blob_store.delete(storage_key).await { 531 - warn!( 532 - storage_key = %storage_key, 533 - error = %e, 534 - "Failed to delete blob from storage (continuing anyway)" 535 - ); 536 - } 537 - } 601 + futures::future::join_all(blob_storage_keys.iter().map(|storage_key| async move { 602 + (storage_key, blob_store.delete(storage_key).await) 603 + })) 604 + .await 605 + .into_iter() 606 + .filter_map(|(key, result)| result.err().map(|e| (key, e))) 607 + .for_each(|(key, e)| { 608 + warn!(storage_key = %key, error = %e, "Failed to delete blob from storage (continuing anyway)"); 609 + }); 538 610 539 611 let mut tx = db 540 612 .begin() ··· 624 696 } 625 697 } 626 698 699 + struct BackupResult { 700 + did: String, 701 + repo_rev: String, 702 + size_bytes: i64, 703 + block_count: i32, 704 + user_id: uuid::Uuid, 705 + } 706 + 707 + enum BackupOutcome { 708 + Success(BackupResult), 709 + Skipped(String, &'static str), 710 + Failed(String, String), 711 + } 712 + 713 + async fn process_single_backup( 714 + db: &PgPool, 715 + block_store: &PostgresBlockStore, 716 + backup_storage: &BackupStorage, 717 + user_id: uuid::Uuid, 718 + did: String, 719 + repo_root_cid: String, 720 + repo_rev: Option<String>, 721 + ) -> BackupOutcome { 722 + let repo_rev = match repo_rev { 723 + Some(rev) => rev, 724 + None => return BackupOutcome::Skipped(did, "no repo_rev"), 725 + }; 726 + 727 + let head_cid = match Cid::from_str(&repo_root_cid) { 728 + Ok(c) => c, 729 + Err(_) => return BackupOutcome::Skipped(did, "invalid repo_root_cid"), 730 + }; 731 + 732 + let car_bytes = match generate_full_backup(db, block_store, user_id, &head_cid).await { 733 + Ok(bytes) => bytes, 734 + Err(e) => return BackupOutcome::Failed(did, format!("CAR generation: {}", e)), 735 + }; 736 + 737 + let block_count = count_car_blocks(&car_bytes); 738 + let size_bytes = car_bytes.len() as i64; 739 + 740 + let storage_key = match backup_storage.put_backup(&did, &repo_rev, &car_bytes).await { 741 + Ok(key) => key, 742 + Err(e) => return BackupOutcome::Failed(did, format!("S3 upload: {}", e)), 743 + }; 744 + 745 + if let Err(e) = insert_backup_record( 746 + db, 747 + user_id, 748 + &storage_key, 749 + &repo_root_cid, 750 + &repo_rev, 751 + block_count, 752 + size_bytes, 753 + ) 754 + .await 755 + { 756 + if let Err(rollback_err) = backup_storage.delete_backup(&storage_key).await { 757 + error!( 758 + did = %did, 759 + storage_key = %storage_key, 760 + error = %rollback_err, 761 + "Failed to rollback orphaned backup from S3" 762 + ); 763 + } 764 + return BackupOutcome::Failed(did, format!("DB insert: {}", e)); 765 + } 766 + 767 + BackupOutcome::Success(BackupResult { 768 + did, 769 + repo_rev, 770 + size_bytes, 771 + block_count, 772 + user_id, 773 + }) 774 + } 775 + 627 776 async fn process_scheduled_backups( 628 777 db: &PgPool, 629 778 block_store: &PostgresBlockStore, ··· 665 814 "Processing scheduled backups" 666 815 ); 667 816 668 - for user in users_needing_backup { 669 - let repo_root_cid = user.repo_root_cid.clone(); 670 - 671 - let repo_rev = match &user.repo_rev { 672 - Some(rev) => rev.clone(), 673 - None => { 674 - warn!(did = %user.did, "User has no repo_rev, skipping backup"); 675 - continue; 676 - } 677 - }; 678 - 679 - let head_cid = match Cid::from_str(&repo_root_cid) { 680 - Ok(c) => c, 681 - Err(e) => { 682 - warn!(did = %user.did, error = %e, "Invalid repo_root_cid, skipping backup"); 683 - continue; 684 - } 685 - }; 686 - 687 - let car_result = generate_full_backup(db, block_store, user.user_id, &head_cid).await; 688 - let car_bytes = match car_result { 689 - Ok(bytes) => bytes, 690 - Err(e) => { 691 - warn!(did = %user.did, error = %e, "Failed to generate CAR for backup"); 692 - continue; 693 - } 694 - }; 695 - 696 - let block_count = count_car_blocks(&car_bytes); 697 - let size_bytes = car_bytes.len() as i64; 698 - 699 - let storage_key = match backup_storage 700 - .put_backup(&user.did, &repo_rev, &car_bytes) 701 - .await 702 - { 703 - Ok(key) => key, 704 - Err(e) => { 705 - warn!(did = %user.did, error = %e, "Failed to upload backup to storage"); 706 - continue; 707 - } 708 - }; 709 - 710 - if let Err(e) = sqlx::query!( 711 - r#" 712 - INSERT INTO account_backups (user_id, storage_key, repo_root_cid, repo_rev, block_count, size_bytes) 713 - VALUES ($1, $2, $3, $4, $5, $6) 714 - "#, 817 + let results = futures::future::join_all(users_needing_backup.into_iter().map(|user| { 818 + process_single_backup( 819 + db, 820 + block_store, 821 + backup_storage, 715 822 user.user_id, 716 - storage_key, 717 - repo_root_cid, 718 - repo_rev, 719 - block_count, 720 - size_bytes 823 + user.did, 824 + user.repo_root_cid, 825 + user.repo_rev, 721 826 ) 722 - .execute(db) 723 - .await 724 - { 725 - warn!(did = %user.did, error = %e, "Failed to insert backup record, rolling back S3 upload"); 726 - if let Err(rollback_err) = backup_storage.delete_backup(&storage_key).await { 727 - error!( 728 - did = %user.did, 729 - storage_key = %storage_key, 730 - error = %rollback_err, 731 - "Failed to rollback orphaned backup from S3" 827 + })) 828 + .await; 829 + 830 + futures::future::join_all(results.into_iter().map(|outcome| async move { 831 + match outcome { 832 + BackupOutcome::Success(result) => { 833 + info!( 834 + did = %result.did, 835 + rev = %result.repo_rev, 836 + size_bytes = result.size_bytes, 837 + block_count = result.block_count, 838 + "Created backup" 732 839 ); 840 + if let Err(e) = 841 + cleanup_old_backups(db, backup_storage, result.user_id, retention_count).await 842 + { 843 + warn!(did = %result.did, error = %e, "Failed to cleanup old backups"); 844 + } 733 845 } 734 - continue; 846 + BackupOutcome::Skipped(did, reason) => { 847 + warn!(did = %did, reason = reason, "Skipped backup"); 848 + } 849 + BackupOutcome::Failed(did, error) => { 850 + warn!(did = %did, error = %error, "Failed backup"); 851 + } 735 852 } 736 - 737 - info!( 738 - did = %user.did, 739 - rev = %repo_rev, 740 - size_bytes, 741 - block_count, 742 - "Created backup" 743 - ); 744 - 745 - if let Err(e) = cleanup_old_backups(db, backup_storage, user.user_id, retention_count).await 746 - { 747 - warn!(did = %user.did, error = %e, "Failed to cleanup old backups"); 748 - } 749 - } 853 + })) 854 + .await; 750 855 751 856 Ok(()) 752 857 } ··· 877 982 user_id: uuid::Uuid, 878 983 retention_count: u32, 879 984 ) -> Result<(), String> { 880 - let old_backups = sqlx::query!( 881 - r#" 882 - SELECT id, storage_key 883 - FROM account_backups 884 - WHERE user_id = $1 885 - ORDER BY created_at DESC 886 - OFFSET $2 887 - "#, 888 - user_id, 889 - retention_count as i64 890 - ) 891 - .fetch_all(db) 892 - .await 893 - .map_err(|e| format!("DB error fetching old backups: {}", e))?; 985 + let old_backups = fetch_old_backups(db, user_id, retention_count as i64) 986 + .await 987 + .map_err(|e| format!("DB error fetching old backups: {}", e))?; 894 988 895 - for backup in old_backups { 896 - if let Err(e) = backup_storage.delete_backup(&backup.storage_key).await { 897 - warn!( 898 - storage_key = %backup.storage_key, 899 - error = %e, 900 - "Failed to delete old backup from storage, skipping DB cleanup to avoid orphan" 901 - ); 902 - continue; 989 + let results = futures::future::join_all(old_backups.into_iter().map(|(id, storage_key)| async move { 990 + match backup_storage.delete_backup(&storage_key).await { 991 + Ok(()) => match delete_backup_record(db, id).await { 992 + Ok(()) => Ok(()), 993 + Err(e) => Err(format!("DB delete failed for {}: {}", storage_key, e)), 994 + }, 995 + Err(e) => { 996 + warn!( 997 + storage_key = %storage_key, 998 + error = %e, 999 + "Failed to delete old backup from storage, skipping DB cleanup to avoid orphan" 1000 + ); 1001 + Ok(()) 1002 + } 903 1003 } 1004 + })) 1005 + .await; 904 1006 905 - sqlx::query!("DELETE FROM account_backups WHERE id = $1", backup.id) 906 - .execute(db) 907 - .await 908 - .map_err(|e| format!("Failed to delete old backup record: {}", e))?; 909 - } 910 - 911 - Ok(()) 1007 + results 1008 + .into_iter() 1009 + .find_map(|r| r.err()) 1010 + .map_or(Ok(()), Err) 912 1011 }
+4 -4
crates/tranquil-pds/src/sync/listener.rs
··· 48 48 from_seq = catchup_start, 49 49 "Broadcasting catch-up events" 50 50 ); 51 - for event in events { 51 + events.into_iter().for_each(|event| { 52 52 let seq = event.seq; 53 53 let _ = state.firehose_tx.send(event); 54 54 LAST_BROADCAST_SEQ.store(seq, Ordering::SeqCst); 55 - } 55 + }); 56 56 } 57 57 loop { 58 58 let notification = listener.recv().await?; ··· 93 93 .await?; 94 94 if !gap_events.is_empty() { 95 95 debug!(count = gap_events.len(), "Filling sequence gap"); 96 - for event in gap_events { 96 + gap_events.into_iter().for_each(|event| { 97 97 let seq = event.seq; 98 98 let _ = state.firehose_tx.send(event); 99 99 LAST_BROADCAST_SEQ.store(seq, Ordering::SeqCst); 100 - } 100 + }); 101 101 } 102 102 } 103 103 let event = sqlx::query_as!(
+5 -8
crates/tranquil-pds/src/util.rs
··· 257 257 assert_eq!(parts[0].len(), 5); 258 258 assert_eq!(parts[1].len(), 5); 259 259 260 - for c in code.chars() { 261 - if c != '-' { 262 - assert!(BASE32_ALPHABET.contains(c)); 263 - } 264 - } 260 + assert!(code 261 + .chars() 262 + .filter(|&c| c != '-') 263 + .all(|c| BASE32_ALPHABET.contains(c))); 265 264 } 266 265 267 266 #[test] ··· 270 269 let parts: Vec<&str> = code.split('-').collect(); 271 270 assert_eq!(parts.len(), 3); 272 271 273 - for part in parts { 274 - assert_eq!(part.len(), 4); 275 - } 272 + assert!(parts.iter().all(|part| part.len() == 4)); 276 273 } 277 274 278 275 #[test]
+3 -2
crates/tranquil-pds/src/validation/mod.rs
··· 534 534 "Collection NSID must have at least 3 segments".to_string(), 535 535 )); 536 536 } 537 - for part in &parts { 537 + parts.iter().try_for_each(|part| { 538 538 if part.is_empty() { 539 539 return Err(ValidationError::InvalidRecord( 540 540 "Collection NSID segments cannot be empty".to_string(), ··· 545 545 "Collection NSID segments must be alphanumeric or hyphens".to_string(), 546 546 )); 547 547 } 548 - } 548 + Ok(()) 549 + })?; 549 550 Ok(()) 550 551 } 551 552
+45 -2
frontend/src/App.svelte
··· 35 35 import ActAs from './routes/ActAs.svelte' 36 36 import Migration from './routes/Migration.svelte' 37 37 import DidDocumentEditor from './routes/DidDocumentEditor.svelte' 38 + import { _ } from './lib/i18n' 38 39 initI18n() 39 40 40 41 const auth = $derived(getAuthState()) 41 42 42 43 let oauthCallbackPending = $state(hasOAuthCallback()) 44 + let showSpinner = $state(false) 45 + let loadingTimer: ReturnType<typeof setTimeout> | null = null 43 46 44 47 function hasOAuthCallback(): boolean { 45 48 if (window.location.pathname === '/app/migrate') { ··· 50 53 } 51 54 52 55 $effect(() => { 56 + loadingTimer = setTimeout(() => { 57 + showSpinner = true 58 + }, 5000) 59 + 53 60 initServerConfig() 54 61 initAuth().then(({ oauthLoginCompleted }) => { 55 62 if (oauthLoginCompleted) { 56 63 navigate('/dashboard', { replace: true }) 57 64 } 58 65 oauthCallbackPending = false 66 + if (loadingTimer) { 67 + clearTimeout(loadingTimer) 68 + loadingTimer = null 69 + } 59 70 }) 71 + 72 + return () => { 73 + if (loadingTimer) { 74 + clearTimeout(loadingTimer) 75 + } 76 + } 60 77 }) 78 + 79 + const isLoading = $derived( 80 + auth.kind === 'loading' || $i18nLoading || oauthCallbackPending 81 + ) 61 82 62 83 $effect(() => { 63 84 if (auth.kind === 'loading') return ··· 143 164 </script> 144 165 145 166 <main> 146 - {#if auth.kind === 'loading' || $i18nLoading || oauthCallbackPending} 147 - <div class="loading"></div> 167 + {#if isLoading} 168 + <div class="loading"> 169 + {#if showSpinner} 170 + <div class="loading-content"> 171 + <div class="spinner"></div> 172 + <p>{$_('common.loading')}</p> 173 + </div> 174 + {/if} 175 + </div> 148 176 {:else} 149 177 <CurrentComponent /> 150 178 {/if} ··· 158 186 159 187 .loading { 160 188 min-height: 100vh; 189 + display: flex; 190 + align-items: center; 191 + justify-content: center; 192 + } 193 + 194 + .loading-content { 195 + display: flex; 196 + flex-direction: column; 197 + align-items: center; 198 + gap: var(--space-4); 199 + } 200 + 201 + .loading-content p { 202 + margin: 0; 203 + color: var(--text-secondary); 161 204 } 162 205 </style>
+37 -1
frontend/src/routes/OAuthConsent.svelte
··· 27 27 } 28 28 29 29 let loading = $state(true) 30 + let showSpinner = $state(false) 31 + let loadingTimer: ReturnType<typeof setTimeout> | null = null 30 32 let error = $state<string | null>(null) 31 33 let submitting = $state(false) 32 34 let consentData = $state<ConsentData | null>(null) ··· 71 73 error = $_('oauth.error.genericError') 72 74 } finally { 73 75 loading = false 76 + showSpinner = false 77 + if (loadingTimer) { 78 + clearTimeout(loadingTimer) 79 + loadingTimer = null 80 + } 74 81 } 75 82 } 76 83 ··· 151 158 } 152 159 153 160 $effect(() => { 161 + loadingTimer = setTimeout(() => { 162 + if (loading) { 163 + showSpinner = true 164 + } 165 + }, 5000) 154 166 fetchConsentData() 167 + return () => { 168 + if (loadingTimer) { 169 + clearTimeout(loadingTimer) 170 + } 171 + } 155 172 }) 156 173 157 174 let scopeGroups = $derived(consentData ? groupScopesByCategory(consentData.scopes) : {}) ··· 159 176 160 177 <div class="consent-container"> 161 178 {#if loading} 162 - <div class="loading"></div> 179 + <div class="loading"> 180 + {#if showSpinner} 181 + <div class="loading-content"> 182 + <div class="spinner"></div> 183 + <p>{$_('common.loading')}</p> 184 + </div> 185 + {/if} 186 + </div> 163 187 {:else if error} 164 188 <div class="error-container"> 165 189 <h1>{$_('oauth.error.title')}</h1> ··· 293 317 align-items: center; 294 318 justify-content: center; 295 319 min-height: 200px; 320 + color: var(--text-secondary); 321 + } 322 + 323 + .loading-content { 324 + display: flex; 325 + flex-direction: column; 326 + align-items: center; 327 + gap: var(--space-4); 328 + } 329 + 330 + .loading-content p { 331 + margin: 0; 296 332 color: var(--text-secondary); 297 333 } 298 334
+27
frontend/src/styles/base.css
··· 494 494 .info-panel p:last-child { 495 495 margin-bottom: 0; 496 496 } 497 + 498 + .spinner { 499 + width: 40px; 500 + height: 40px; 501 + border: 3px solid var(--border-color); 502 + border-top-color: var(--accent); 503 + border-radius: 50%; 504 + animation: spin 1s linear infinite; 505 + } 506 + 507 + .spinner.sm { 508 + width: 20px; 509 + height: 20px; 510 + border-width: 2px; 511 + } 512 + 513 + .spinner.lg { 514 + width: 60px; 515 + height: 60px; 516 + border-width: 4px; 517 + } 518 + 519 + @keyframes spin { 520 + to { 521 + transform: rotate(360deg); 522 + } 523 + }