Our Personal Data Server from scratch!
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

fix(pds): do service identifier resolution for proxying correctly

authored by

nelind and committed by tangled.org f777c288 90396487

+467 -464
+4 -3
crates/tranquil-api/src/delegation.rs
··· 37 37 async move { 38 38 if c.handle.is_none() { 39 39 c.handle = did_resolver 40 - .resolve_did_document(c.did.as_str()) 40 + .fetch_did_document(c.did.as_str()) 41 41 .await 42 + .ok() 42 43 .and_then(|doc| tranquil_types::did_doc::extract_handle(&doc)) 43 44 .map(Into::into); 44 45 } ··· 64 65 ) -> Result<Json<SuccessResponse>, ApiError> { 65 66 let resolved = tranquil_pds::delegation::resolve_identity(&state, &input.controller_did) 66 67 .await 67 - .ok_or(ApiError::ControllerNotFound)?; 68 + .map_err(|_| ApiError::ControllerNotFound)?; 68 69 69 70 if !resolved.is_local 70 71 && let Some(ref pds_url) = resolved.pds_url ··· 476 477 477 478 let resolved = tranquil_pds::delegation::resolve_identity(&state, &did) 478 479 .await 479 - .ok_or(ApiError::ControllerNotFound)?; 480 + .map_err(|_| ApiError::ControllerNotFound)?; 480 481 481 482 Ok(Json(resolved)) 482 483 }
+8 -3
crates/tranquil-api/src/identity/account.rs
··· 153 153 Json(CreateAccountOutput { 154 154 handle: handle.to_string().into(), 155 155 did: did_typed.clone(), 156 - did_doc: state.did_resolver.resolve_did_document(did).await, 156 + did_doc: state 157 + .did_resolver 158 + .fetch_did_document(did) 159 + .await 160 + .ok() 161 + .and_then(|f| Some((*f).clone())), 157 162 access_jwt: access_meta.token, 158 163 refresh_jwt: refresh_meta.token, 159 164 verification_required, ··· 580 585 Err(e) => return e.into_response(), 581 586 }; 582 587 583 - let did_doc = state.did_resolver.resolve_did_document(&did).await; 588 + let did_doc = state.did_resolver.fetch_did_document(&did).await.ok(); 584 589 585 590 if is_migration { 586 591 info!( ··· 594 599 Json(CreateAccountOutput { 595 600 handle: handle.clone().into(), 596 601 did: did_for_commit, 597 - did_doc, 602 + did_doc: did_doc.and_then(|f| Some((*f).clone())), 598 603 access_jwt: session.access_jwt, 599 604 refresh_jwt: session.refresh_jwt, 600 605 verification_required: !is_migration,
+1 -1
crates/tranquil-api/src/identity/plc/submit.rs
··· 153 153 .cache 154 154 .delete(&tranquil_pds::cache_keys::plc_data_key(did)) 155 155 .await; 156 - if state.did_resolver.refresh_did(did).await.is_none() { 156 + if state.did_resolver.refresh_did(did).await.is_err() { 157 157 warn!(did = %did, "Failed to refresh DID cache after PLC update"); 158 158 } 159 159 info!(did = %did, "PLC operation submitted successfully");
+1 -1
crates/tranquil-api/src/server/account_status.rs
··· 365 365 .cache 366 366 .delete(&tranquil_pds::cache_keys::plc_data_key(&did)) 367 367 .await; 368 - if state.did_resolver.refresh_did(did.as_str()).await.is_none() { 368 + if state.did_resolver.refresh_did(did.as_str()).await.is_err() { 369 369 warn!( 370 370 "[MIGRATION] activateAccount: Failed to refresh DID cache for {}", 371 371 did
+44 -25
crates/tranquil-api/src/server/session.rs
··· 69 69 input.identifier, normalized_identifier 70 70 ); 71 71 let row = match state 72 - .repos.user 72 + .repos 73 + .user 73 74 .get_login_full_by_identifier(normalized_identifier.as_str()) 74 75 .await 75 76 { ··· 130 131 } 131 132 let is_verified = row.channel_verification.has_any_verified(); 132 133 let is_delegated = state 133 - .repos.delegation 134 + .repos 135 + .delegation 134 136 .is_delegated_account(&row.did) 135 137 .await 136 138 .unwrap_or(false); ··· 270 272 }; 271 273 let (insert_result, did_doc) = tokio::join!( 272 274 state.repos.session.create_session(&session_data), 273 - did_resolver.resolve_did_document(&did_for_doc) 275 + did_resolver.fetch_did_document(&did_for_doc), 274 276 ); 275 277 if let Err(e) = insert_result { 276 278 error!("Failed to insert session: {:?}", e); ··· 311 313 refresh_jwt: refresh_meta.token, 312 314 handle, 313 315 did: row.did, 314 - did_doc, 316 + did_doc: did_doc.ok().and_then(|f| Some((*f).clone())), 315 317 email: row.email, 316 318 email_confirmed: Some(row.channel_verification.email), 317 319 email_auth_factor: email_auth_factor_out, ··· 360 362 let did_resolver = state.did_resolver.clone(); 361 363 let (db_result, did_doc) = tokio::join!( 362 364 state.repos.user.get_session_info_by_did(&auth.did), 363 - did_resolver.resolve_did_document(&did_for_doc) 365 + did_resolver.fetch_did_document(&did_for_doc) 364 366 ); 365 367 match db_result { 366 368 Ok(Some(row)) => { ··· 404 406 status: account_state.status_for_session().map(String::from), 405 407 migrated_to_pds, 406 408 migrated_at, 407 - did_doc, 409 + did_doc: did_doc.ok().and_then(|f| Some((*f).clone())), 408 410 })) 409 411 } 410 412 Ok(None) => Err(ApiError::AuthenticationFailed(None)), ··· 476 478 } 477 479 }; 478 480 if let Ok(Some(_)) = state 479 - .repos.session 481 + .repos 482 + .session 480 483 .check_refresh_token_used(&refresh_jti) 481 484 .await 482 485 { ··· 486 489 ))); 487 490 } 488 491 let session_row = match state 489 - .repos.session 492 + .repos 493 + .session 490 494 .get_session_for_refresh(&refresh_jti) 491 495 .await 492 496 { ··· 548 552 new_refresh_expires_at: new_refresh_meta.expires_at, 549 553 }; 550 554 match state 551 - .repos.session 555 + .repos 556 + .session 552 557 .refresh_session_atomic(&refresh_data) 553 558 .await 554 559 { ··· 577 582 let did_resolver = state.did_resolver.clone(); 578 583 let (db_result, did_doc) = tokio::join!( 579 584 state.repos.user.get_session_info_by_did(&session_row.did), 580 - did_resolver.resolve_did_document(&did_for_doc) 585 + did_resolver.fetch_did_document(&did_for_doc) 581 586 ); 582 587 match db_result { 583 588 Ok(Some(u)) => { ··· 599 604 preferred_locale: u.preferred_locale, 600 605 is_admin: u.is_admin, 601 606 active: account_state.is_active(), 602 - did_doc, 607 + did_doc: did_doc.ok().and_then(|f| Some((*f).clone())), 603 608 status: account_state.status_for_session().map(String::from), 604 609 })) 605 610 } ··· 702 707 }; 703 708 704 709 if let Err(e) = state 705 - .repos.user 710 + .repos 711 + .user 706 712 .set_channel_verified(&input.did, row.channel) 707 713 .await 708 714 { ··· 821 827 ) -> Result<Json<SuccessResponse>, ApiError> { 822 828 info!("resend_verification called for DID: {}", input.did); 823 829 let row = match state 824 - .repos.user 830 + .repos 831 + .user 825 832 .get_resend_verification_by_did(&input.did) 826 833 .await 827 834 { ··· 895 902 let current_jti = tranquil_pds::auth::extract_jti_from_headers(&headers); 896 903 897 904 let jwt_rows = state 898 - .repos.session 905 + .repos 906 + .session 899 907 .list_sessions_by_did(&auth.did) 900 908 .await 901 909 .log_db_err("fetching JWT sessions")?; 902 910 903 911 let oauth_rows = state 904 - .repos.oauth 912 + .repos 913 + .oauth 905 914 .list_sessions_by_did(&auth.did) 906 915 .await 907 916 .log_db_err("fetching OAuth sessions")?; ··· 962 971 .map(SessionId::new) 963 972 .map_err(|_| ApiError::InvalidRequest("Invalid session ID".into()))?; 964 973 let access_jti = state 965 - .repos.session 974 + .repos 975 + .session 966 976 .get_session_access_jti_by_id(session_id, &auth.did) 967 977 .await 968 978 .log_db_err("in revoke_session")? 969 979 .ok_or(ApiError::SessionNotFound)?; 970 980 state 971 - .repos.session 981 + .repos 982 + .session 972 983 .delete_session_by_id(session_id) 973 984 .await 974 985 .log_db_err("deleting session")?; ··· 983 994 .map(TokenFamilyId::new) 984 995 .map_err(|_| ApiError::InvalidRequest("Invalid session ID".into()))?; 985 996 let deleted = state 986 - .repos.oauth 997 + .repos 998 + .oauth 987 999 .delete_session_by_id(session_id, &auth.did) 988 1000 .await 989 1001 .log_db_err("deleting OAuth session")?; ··· 1007 1019 1008 1020 if auth.is_oauth() { 1009 1021 state 1010 - .repos.session 1022 + .repos 1023 + .session 1011 1024 .delete_sessions_by_did(&auth.did) 1012 1025 .await 1013 1026 .log_db_err("revoking JWT sessions")?; 1014 1027 let jti_typed = TokenId::from(jti.clone()); 1015 1028 state 1016 - .repos.oauth 1029 + .repos 1030 + .oauth 1017 1031 .delete_sessions_by_did_except(&auth.did, &jti_typed) 1018 1032 .await 1019 1033 .log_db_err("revoking OAuth sessions")?; 1020 1034 } else { 1021 1035 state 1022 - .repos.session 1036 + .repos 1037 + .session 1023 1038 .delete_sessions_by_did_except_jti(&auth.did, &jti) 1024 1039 .await 1025 1040 .log_db_err("revoking JWT sessions")?; 1026 1041 state 1027 - .repos.oauth 1042 + .repos 1043 + .oauth 1028 1044 .delete_sessions_by_did(&auth.did) 1029 1045 .await 1030 1046 .log_db_err("revoking OAuth sessions")?; ··· 1046 1062 auth: Auth<Active>, 1047 1063 ) -> Result<Json<LegacyLoginPreferenceOutput>, ApiError> { 1048 1064 let pref = state 1049 - .repos.user 1065 + .repos 1066 + .user 1050 1067 .get_legacy_login_pref(&auth.did) 1051 1068 .await 1052 1069 .log_db_err("getting legacy login pref")? ··· 1079 1096 let reauth_mfa = require_reauth_window(&state, &auth).await?; 1080 1097 1081 1098 let updated = state 1082 - .repos.user 1099 + .repos 1100 + .user 1083 1101 .update_legacy_login(reauth_mfa.did(), input.allow_legacy_login) 1084 1102 .await 1085 1103 .log_db_err("updating legacy login")?; ··· 1117 1135 } 1118 1136 1119 1137 let updated = state 1120 - .repos.user 1138 + .repos 1139 + .user 1121 1140 .update_locale(&auth.did, &input.preferred_locale) 1122 1141 .await 1123 1142 .log_db_err("updating locale")?;
+8 -3
crates/tranquil-pds/src/api/proxy.rs
··· 205 205 .into_response(); 206 206 }; 207 207 208 - let did = proxy_header.split('#').next().unwrap_or(&proxy_header); 209 - let Some(resolved) = state.did_resolver.resolve_did(did).await else { 210 - error!(did = %did, "Could not resolve service DID"); 208 + let Some((did, service_id)) = proxy_header.split_once("#") else { 209 + return ApiError::InvalidRequest( 210 + "Invalid atproto-proxy header. Missing service identifier.".into(), 211 + ) 212 + .into_response(); 213 + }; 214 + let Ok(resolved) = state.did_resolver.resolve_service(did, service_id).await else { 215 + error!(did = %did, service_id = %service_id, "Could not resolve service DID"); 211 216 return ApiError::UpstreamFailure.into_response(); 212 217 }; 213 218
-418
crates/tranquil-pds/src/appview/mod.rs
··· 1 - use reqwest::Client; 2 - use serde::{Deserialize, Serialize}; 3 - use std::collections::HashMap; 4 - use std::sync::Arc; 5 - use std::time::{Duration, Instant}; 6 - use tokio::sync::RwLock; 7 - use tracing::{debug, error, info, warn}; 8 - 9 - #[derive(Debug, thiserror::Error)] 10 - pub enum DidResolutionError { 11 - #[error("Invalid did:web format")] 12 - InvalidDidWeb, 13 - #[error("HTTP request failed: {0}")] 14 - HttpFailed(String), 15 - #[error("Invalid DID document: {0}")] 16 - InvalidDocument(String), 17 - #[error("DID not found")] 18 - NotFound, 19 - } 20 - 21 - #[derive(Debug, Clone, Serialize, Deserialize)] 22 - pub struct DidDocument { 23 - pub id: String, 24 - #[serde(default)] 25 - pub service: Vec<DidService>, 26 - } 27 - 28 - #[derive(Debug, Clone, Serialize, Deserialize)] 29 - #[serde(rename_all = "camelCase")] 30 - pub struct DidService { 31 - pub id: String, 32 - #[serde(rename = "type")] 33 - pub service_type: String, 34 - pub service_endpoint: String, 35 - } 36 - 37 - #[derive(Clone)] 38 - struct CachedDid { 39 - url: String, 40 - did: String, 41 - resolved_at: Instant, 42 - } 43 - 44 - #[derive(Clone)] 45 - struct CachedDidDocument { 46 - document: serde_json::Value, 47 - resolved_at: Instant, 48 - } 49 - 50 - #[derive(Debug, Clone)] 51 - pub struct ResolvedService { 52 - pub url: String, 53 - pub did: String, 54 - } 55 - 56 - #[derive(Clone)] 57 - pub struct DidResolver { 58 - did_cache: Arc<RwLock<HashMap<String, CachedDid>>>, 59 - did_doc_cache: Arc<RwLock<HashMap<String, CachedDidDocument>>>, 60 - client: Client, 61 - cache_ttl: Duration, 62 - plc_directory_url: String, 63 - } 64 - 65 - impl DidResolver { 66 - pub fn new() -> Self { 67 - let cfg = tranquil_config::get(); 68 - let cache_ttl_secs = cfg.plc.did_cache_ttl_secs; 69 - 70 - let plc_directory_url = cfg.plc.directory_url.clone(); 71 - 72 - let client = Client::builder() 73 - .timeout(Duration::from_secs(10)) 74 - .connect_timeout(Duration::from_secs(5)) 75 - .pool_max_idle_per_host(10) 76 - .build() 77 - .unwrap_or_else(|_| Client::new()); 78 - 79 - info!("DID resolver initialized"); 80 - 81 - Self { 82 - did_cache: Arc::new(RwLock::new(HashMap::new())), 83 - did_doc_cache: Arc::new(RwLock::new(HashMap::new())), 84 - client, 85 - cache_ttl: Duration::from_secs(cache_ttl_secs), 86 - plc_directory_url, 87 - } 88 - } 89 - 90 - fn build_did_web_url(did: &str) -> Result<String, DidResolutionError> { 91 - let host = did 92 - .strip_prefix("did:web:") 93 - .ok_or(DidResolutionError::InvalidDidWeb)?; 94 - 95 - let (host, path) = if host.contains(':') { 96 - let decoded = host.replace("%3A", ":"); 97 - let parts: Vec<&str> = decoded.splitn(2, '/').collect(); 98 - if parts.len() > 1 { 99 - (parts[0].to_string(), format!("/{}", parts[1])) 100 - } else { 101 - (decoded, String::new()) 102 - } 103 - } else { 104 - let parts: Vec<&str> = host.splitn(2, ':').collect(); 105 - if parts.len() > 1 && parts[1].contains('/') { 106 - let path_parts: Vec<&str> = parts[1].splitn(2, '/').collect(); 107 - if path_parts.len() > 1 { 108 - ( 109 - format!("{}:{}", parts[0], path_parts[0]), 110 - format!("/{}", path_parts[1]), 111 - ) 112 - } else { 113 - (host.to_string(), String::new()) 114 - } 115 - } else { 116 - (host.to_string(), String::new()) 117 - } 118 - }; 119 - 120 - let scheme = 121 - if host.starts_with("localhost") || host.starts_with("127.0.0.1") || host.contains(':') 122 - { 123 - "http" 124 - } else { 125 - "https" 126 - }; 127 - 128 - let url = if path.is_empty() { 129 - format!("{}://{}/.well-known/did.json", scheme, host) 130 - } else { 131 - format!("{}://{}{}/did.json", scheme, host, path) 132 - }; 133 - 134 - Ok(url) 135 - } 136 - 137 - pub async fn resolve_did(&self, did: &str) -> Option<ResolvedService> { 138 - { 139 - let cache = self.did_cache.read().await; 140 - if let Some(cached) = cache.get(did) 141 - && cached.resolved_at.elapsed() < self.cache_ttl 142 - { 143 - return Some(ResolvedService { 144 - url: cached.url.clone(), 145 - did: cached.did.clone(), 146 - }); 147 - } 148 - } 149 - 150 - let resolved = self.resolve_did_internal(did).await?; 151 - 152 - { 153 - let mut cache = self.did_cache.write().await; 154 - cache.insert( 155 - did.to_string(), 156 - CachedDid { 157 - url: resolved.url.clone(), 158 - did: resolved.did.clone(), 159 - resolved_at: Instant::now(), 160 - }, 161 - ); 162 - } 163 - 164 - Some(resolved) 165 - } 166 - 167 - pub async fn refresh_did(&self, did: &str) -> Option<ResolvedService> { 168 - { 169 - let mut cache = self.did_cache.write().await; 170 - cache.remove(did); 171 - } 172 - self.resolve_did(did).await 173 - } 174 - 175 - async fn resolve_did_internal(&self, did: &str) -> Option<ResolvedService> { 176 - let did_doc = if did.starts_with("did:web:") { 177 - self.resolve_did_web(did).await 178 - } else if did.starts_with("did:plc:") { 179 - self.resolve_did_plc(did).await 180 - } else { 181 - warn!("Unsupported DID method: {}", did); 182 - return None; 183 - }; 184 - 185 - let doc = match did_doc { 186 - Ok(doc) => doc, 187 - Err(e) => { 188 - error!("Failed to resolve DID {}: {}", did, e); 189 - return None; 190 - } 191 - }; 192 - 193 - self.extract_service_endpoint(&doc) 194 - } 195 - 196 - async fn resolve_did_web(&self, did: &str) -> Result<DidDocument, DidResolutionError> { 197 - let url = Self::build_did_web_url(did)?; 198 - 199 - debug!("Resolving did:web {} via {}", did, url); 200 - 201 - let resp = self 202 - .client 203 - .get(&url) 204 - .send() 205 - .await 206 - .map_err(|e| DidResolutionError::HttpFailed(e.to_string()))?; 207 - 208 - if !resp.status().is_success() { 209 - return Err(DidResolutionError::HttpFailed(format!( 210 - "HTTP {}", 211 - resp.status() 212 - ))); 213 - } 214 - 215 - resp.json::<DidDocument>() 216 - .await 217 - .map_err(|e| DidResolutionError::InvalidDocument(e.to_string())) 218 - } 219 - 220 - async fn resolve_did_plc(&self, did: &str) -> Result<DidDocument, DidResolutionError> { 221 - let url = format!("{}/{}", self.plc_directory_url, urlencoding::encode(did)); 222 - 223 - debug!("Resolving did:plc {} via {}", did, url); 224 - 225 - let resp = self 226 - .client 227 - .get(&url) 228 - .send() 229 - .await 230 - .map_err(|e| DidResolutionError::HttpFailed(e.to_string()))?; 231 - 232 - if resp.status() == reqwest::StatusCode::NOT_FOUND { 233 - return Err(DidResolutionError::NotFound); 234 - } 235 - 236 - if !resp.status().is_success() { 237 - return Err(DidResolutionError::HttpFailed(format!( 238 - "HTTP {}", 239 - resp.status() 240 - ))); 241 - } 242 - 243 - resp.json::<DidDocument>() 244 - .await 245 - .map_err(|e| DidResolutionError::InvalidDocument(e.to_string())) 246 - } 247 - 248 - fn extract_service_endpoint(&self, doc: &DidDocument) -> Option<ResolvedService> { 249 - if let Some(service) = doc.service.iter().find(|s| { 250 - s.service_type == crate::plc::ServiceType::AppView.as_str() 251 - || s.id.contains("atproto_appview") 252 - || s.id.ends_with("#bsky_appview") 253 - }) { 254 - return Some(ResolvedService { 255 - url: service.service_endpoint.clone(), 256 - did: doc.id.clone(), 257 - }); 258 - } 259 - 260 - if let Some(service) = doc 261 - .service 262 - .iter() 263 - .find(|s| s.service_type.contains("AppView") || s.id.contains("appview")) 264 - { 265 - return Some(ResolvedService { 266 - url: service.service_endpoint.clone(), 267 - did: doc.id.clone(), 268 - }); 269 - } 270 - 271 - if let Some(service) = doc.service.first() 272 - && service.service_endpoint.starts_with("http") 273 - { 274 - warn!( 275 - "No explicit AppView service found for {}, using first service: {}", 276 - doc.id, service.service_endpoint 277 - ); 278 - return Some(ResolvedService { 279 - url: service.service_endpoint.clone(), 280 - did: doc.id.clone(), 281 - }); 282 - } 283 - 284 - if doc.id.starts_with("did:web:") { 285 - let host = doc.id.strip_prefix("did:web:")?; 286 - let decoded_host = host.replace("%3A", ":"); 287 - let base_host = decoded_host.split('/').next()?; 288 - let scheme = if base_host.starts_with("localhost") 289 - || base_host.starts_with("127.0.0.1") 290 - || base_host.contains(':') 291 - { 292 - "http" 293 - } else { 294 - "https" 295 - }; 296 - warn!( 297 - "No service found for {}, deriving URL from DID: {}://{}", 298 - doc.id, scheme, base_host 299 - ); 300 - return Some(ResolvedService { 301 - url: format!("{}://{}", scheme, base_host), 302 - did: doc.id.clone(), 303 - }); 304 - } 305 - 306 - None 307 - } 308 - 309 - pub async fn resolve_did_document(&self, did: &str) -> Option<serde_json::Value> { 310 - { 311 - let cache = self.did_doc_cache.read().await; 312 - if let Some(cached) = cache.get(did) 313 - && cached.resolved_at.elapsed() < self.cache_ttl 314 - { 315 - return Some(cached.document.clone()); 316 - } 317 - } 318 - 319 - let result = if did.starts_with("did:web:") { 320 - self.fetch_did_document_web(did).await 321 - } else if did.starts_with("did:plc:") { 322 - self.fetch_did_document_plc(did).await 323 - } else { 324 - warn!("Unsupported DID method for document resolution: {}", did); 325 - return None; 326 - }; 327 - 328 - match result { 329 - Ok(doc) => { 330 - let mut cache = self.did_doc_cache.write().await; 331 - cache.insert( 332 - did.to_string(), 333 - CachedDidDocument { 334 - document: doc.clone(), 335 - resolved_at: Instant::now(), 336 - }, 337 - ); 338 - Some(doc) 339 - } 340 - Err(e) => { 341 - warn!("Failed to resolve DID document for {}: {}", did, e); 342 - None 343 - } 344 - } 345 - } 346 - 347 - async fn fetch_did_document_web( 348 - &self, 349 - did: &str, 350 - ) -> Result<serde_json::Value, DidResolutionError> { 351 - let url = Self::build_did_web_url(did)?; 352 - 353 - let resp = self 354 - .client 355 - .get(&url) 356 - .send() 357 - .await 358 - .map_err(|e| DidResolutionError::HttpFailed(e.to_string()))?; 359 - 360 - if !resp.status().is_success() { 361 - return Err(DidResolutionError::HttpFailed(format!( 362 - "HTTP {}", 363 - resp.status() 364 - ))); 365 - } 366 - 367 - resp.json::<serde_json::Value>() 368 - .await 369 - .map_err(|e| DidResolutionError::InvalidDocument(e.to_string())) 370 - } 371 - 372 - async fn fetch_did_document_plc( 373 - &self, 374 - did: &str, 375 - ) -> Result<serde_json::Value, DidResolutionError> { 376 - let url = format!("{}/{}", self.plc_directory_url, urlencoding::encode(did)); 377 - 378 - let resp = self 379 - .client 380 - .get(&url) 381 - .send() 382 - .await 383 - .map_err(|e| DidResolutionError::HttpFailed(e.to_string()))?; 384 - 385 - if resp.status() == reqwest::StatusCode::NOT_FOUND { 386 - return Err(DidResolutionError::NotFound); 387 - } 388 - 389 - if !resp.status().is_success() { 390 - return Err(DidResolutionError::HttpFailed(format!( 391 - "HTTP {}", 392 - resp.status() 393 - ))); 394 - } 395 - 396 - resp.json::<serde_json::Value>() 397 - .await 398 - .map_err(|e| DidResolutionError::InvalidDocument(e.to_string())) 399 - } 400 - 401 - pub async fn invalidate_cache(&self, did: &str) { 402 - let mut cache = self.did_cache.write().await; 403 - cache.remove(did); 404 - drop(cache); 405 - let mut doc_cache = self.did_doc_cache.write().await; 406 - doc_cache.remove(did); 407 - } 408 - } 409 - 410 - impl Default for DidResolver { 411 - fn default() -> Self { 412 - Self::new() 413 - } 414 - } 415 - 416 - pub fn create_did_resolver() -> Arc<DidResolver> { 417 - Arc::new(DidResolver::new()) 418 - }
+21 -8
crates/tranquil-pds/src/delegation/mod.rs
··· 10 10 }; 11 11 pub use tranquil_db_traits::DelegationActionType; 12 12 13 + use crate::did::DidResolutionError; 13 14 use crate::state::AppState; 14 15 use crate::types::Did; 15 16 ··· 24 25 pub is_local: bool, 25 26 } 26 27 27 - pub async fn resolve_identity(state: &AppState, did: &Did) -> Option<ResolvedIdentity> { 28 + pub async fn resolve_identity( 29 + state: &AppState, 30 + did: &Did, 31 + ) -> Result<ResolvedIdentity, DidResolutionError> { 28 32 let is_local = state 29 33 .repos.user 30 34 .get_by_did(did) ··· 33 37 .flatten() 34 38 .is_some(); 35 39 36 - let did_doc = state 37 - .did_resolver 38 - .resolve_did_document(did.as_str()) 39 - .await?; 40 + let did_doc = state.did_resolver.resolve_did(did.as_str()).await?; 40 41 41 - let pds_url = tranquil_types::did_doc::extract_pds_endpoint(&did_doc); 42 - let handle = tranquil_types::did_doc::extract_handle(&did_doc); 42 + let pds_url = did_doc.services.iter().find_map(|svc| { 43 + if (svc.id == "#atproto_pds" || svc.id.ends_with("#atproto_pds")) 44 + && svc.service_type == "AtprotoPersonalDataServer" 45 + { 46 + Some(svc.service_endpoint.clone()) 47 + } else { 48 + None 49 + } 50 + }); 51 + let handle = did_doc.also_known_as.iter().find_map(|alias| { 52 + alias 53 + .strip_prefix("at://") 54 + .and_then(|s| Some(s.to_string())) 55 + }); 43 56 44 - Some(ResolvedIdentity { 57 + Ok(ResolvedIdentity { 45 58 did: did.clone(), 46 59 handle, 47 60 pds_url,
+378
crates/tranquil-pds/src/did.rs
··· 1 + use reqwest::Client; 2 + use serde::{Deserialize, Serialize}; 3 + use std::collections::HashMap; 4 + use std::sync::Arc; 5 + use std::time::{Duration, Instant}; 6 + use tokio::sync::RwLock; 7 + use tracing::{debug, error, info, warn}; 8 + 9 + #[derive(Debug, thiserror::Error)] 10 + pub enum DidResolutionError { 11 + #[error("Unsupported DID method: \"{0}\". Only did:web and did:plc are allowed in atproto")] 12 + UnsupportedDidMethod(String), 13 + #[error("Invalid did:web format")] 14 + InvalidDidWeb, 15 + #[error("HTTP request failed: {0}")] 16 + HttpFailed(String), 17 + #[error("Invalid DID document: {0}")] 18 + InvalidDocument(String), 19 + #[error("DID not found")] 20 + NotFound, 21 + } 22 + 23 + #[derive(Debug, thiserror::Error)] 24 + pub enum ServiceResolutionError { 25 + #[error("DID resolution failed: {0}")] 26 + DidResolutionFailed(#[from] DidResolutionError), 27 + #[error("Service ID \"{0}\" not found in DID doc")] 28 + ServiceIdNotFound(String), 29 + } 30 + 31 + #[derive(Debug, Clone, Serialize, Deserialize)] 32 + pub struct DidDocument { 33 + pub id: String, 34 + #[serde(default)] 35 + #[serde(rename = "service")] 36 + pub services: Vec<DidService>, 37 + #[serde(default)] 38 + #[serde(rename = "alsoKnownAs")] 39 + pub also_known_as: Vec<String>, 40 + } 41 + 42 + #[derive(Debug, Clone, Serialize, Deserialize)] 43 + #[serde(rename_all = "camelCase")] 44 + pub struct DidService { 45 + pub id: String, 46 + #[serde(rename = "type")] 47 + pub service_type: String, 48 + pub service_endpoint: String, 49 + } 50 + 51 + #[derive(Debug, Clone)] 52 + pub struct ResolvedService { 53 + pub url: String, 54 + pub did: String, 55 + pub service_id: String, 56 + } 57 + 58 + pub struct DidResolver { 59 + did_doc_cache: RwLock<HashMap<Box<str>, (Instant, Arc<serde_json::Value>)>>, 60 + parsed_did_doc_cache: RwLock<HashMap<Box<str>, (Instant, Arc<DidDocument>)>>, 61 + service_cache: RwLock<HashMap<Box<str>, (Instant, Arc<ResolvedService>)>>, 62 + client: Client, 63 + cache_ttl: Duration, 64 + plc_directory_url: String, 65 + } 66 + 67 + impl DidResolver { 68 + pub fn new() -> Self { 69 + let cfg = tranquil_config::get(); 70 + let cache_ttl_secs = cfg.plc.did_cache_ttl_secs; 71 + 72 + let plc_directory_url = cfg.plc.directory_url.clone(); 73 + 74 + let client = Client::builder() 75 + .timeout(Duration::from_secs(10)) 76 + .connect_timeout(Duration::from_secs(5)) 77 + .pool_max_idle_per_host(10) 78 + .build() 79 + .unwrap_or_else(|_| Client::new()); 80 + 81 + info!("DID resolver initialized"); 82 + 83 + Self { 84 + did_doc_cache: RwLock::new(HashMap::new()), 85 + parsed_did_doc_cache: RwLock::new(HashMap::new()), 86 + service_cache: RwLock::new(HashMap::new()), 87 + client, 88 + cache_ttl: Duration::from_secs(cache_ttl_secs), 89 + plc_directory_url, 90 + } 91 + } 92 + 93 + pub async fn resolve_service( 94 + &self, 95 + did: &str, 96 + service_id: &str, 97 + ) -> Result<Arc<ResolvedService>, ServiceResolutionError> { 98 + { 99 + let cache = self.service_cache.read().await; 100 + if let Some(cached) = cache.get(&*format!("{did}#{service_id}")) 101 + && cached.0.elapsed() < self.cache_ttl 102 + { 103 + return Ok(cached.1.clone()); 104 + } 105 + } 106 + 107 + let did_doc = self.resolve_did(did).await?; 108 + let Some(service) = did_doc 109 + .services 110 + .iter() 111 + .find(|s| s.id.ends_with(&format!("#{service_id}"))) 112 + else { 113 + return Err(ServiceResolutionError::ServiceIdNotFound(service_id.into())); 114 + }; 115 + 116 + let resolved = Arc::new(ResolvedService { 117 + url: service.service_endpoint.clone(), 118 + did: did.into(), 119 + service_id: service_id.into(), 120 + }); 121 + 122 + { 123 + let mut cache = self.service_cache.write().await; 124 + cache.insert( 125 + format!("{did}#{service_id}").into(), 126 + (Instant::now(), resolved.clone()), 127 + ); 128 + } 129 + 130 + Ok(resolved) 131 + } 132 + 133 + pub async fn resolve_did(&self, did: &str) -> Result<Arc<DidDocument>, DidResolutionError> { 134 + { 135 + let cache = self.parsed_did_doc_cache.read().await; 136 + if let Some(cached) = cache.get(did) 137 + && cached.0.elapsed() < self.cache_ttl 138 + { 139 + return Ok(cached.1.clone()); 140 + } 141 + } 142 + 143 + let resolved = Arc::new(self.resolve_did_uncached(did).await?); 144 + 145 + { 146 + let mut cache = self.parsed_did_doc_cache.write().await; 147 + cache.insert(did.into(), (Instant::now(), resolved.clone())); 148 + } 149 + 150 + Ok(resolved) 151 + } 152 + 153 + pub async fn refresh_did(&self, did: &str) -> Result<Arc<DidDocument>, DidResolutionError> { 154 + { 155 + let mut cache = self.parsed_did_doc_cache.write().await; 156 + cache.remove(did); 157 + let mut cache = self.service_cache.write().await; 158 + cache.retain(|k, _| !k.starts_with(did)); 159 + } 160 + self.resolve_did(did).await 161 + } 162 + 163 + async fn resolve_did_uncached(&self, did: &str) -> Result<DidDocument, DidResolutionError> { 164 + if did.starts_with("did:web:") { 165 + self.resolve_did_web(did).await 166 + } else if did.starts_with("did:plc:") { 167 + self.resolve_did_plc(did).await 168 + } else { 169 + warn!("Unsupported DID method: {}", did); 170 + Err(DidResolutionError::UnsupportedDidMethod(did.into())) 171 + } 172 + } 173 + 174 + async fn resolve_did_web(&self, did: &str) -> Result<DidDocument, DidResolutionError> { 175 + let url = build_did_web_url(did)?; 176 + 177 + debug!("Resolving did:web {} via {}", did, url); 178 + 179 + let resp = self 180 + .client 181 + .get(&url) 182 + .send() 183 + .await 184 + .map_err(|e| DidResolutionError::HttpFailed(e.to_string()))?; 185 + 186 + if !resp.status().is_success() { 187 + return Err(DidResolutionError::HttpFailed(format!( 188 + "HTTP {}", 189 + resp.status() 190 + ))); 191 + } 192 + 193 + resp.json::<DidDocument>() 194 + .await 195 + .map_err(|e| DidResolutionError::InvalidDocument(e.to_string())) 196 + } 197 + 198 + async fn resolve_did_plc(&self, did: &str) -> Result<DidDocument, DidResolutionError> { 199 + let url = format!("{}/{}", self.plc_directory_url, urlencoding::encode(did)); 200 + 201 + debug!("Resolving did:plc {} via {}", did, url); 202 + 203 + let resp = self 204 + .client 205 + .get(&url) 206 + .send() 207 + .await 208 + .map_err(|e| DidResolutionError::HttpFailed(e.to_string()))?; 209 + 210 + if resp.status() == reqwest::StatusCode::NOT_FOUND { 211 + return Err(DidResolutionError::NotFound); 212 + } 213 + 214 + if !resp.status().is_success() { 215 + return Err(DidResolutionError::HttpFailed(format!( 216 + "HTTP {}", 217 + resp.status() 218 + ))); 219 + } 220 + 221 + resp.json::<DidDocument>() 222 + .await 223 + .map_err(|e| DidResolutionError::InvalidDocument(e.to_string())) 224 + } 225 + 226 + pub async fn fetch_did_document( 227 + &self, 228 + did: &str, 229 + ) -> Result<Arc<serde_json::Value>, DidResolutionError> { 230 + { 231 + let cache = self.did_doc_cache.read().await; 232 + if let Some(cached) = cache.get(did) 233 + && cached.0.elapsed() < self.cache_ttl 234 + { 235 + return Ok(cached.1.clone()); 236 + } 237 + } 238 + 239 + let resolved = Arc::new(self.fetch_did_document_uncached(did).await?); 240 + 241 + { 242 + let mut cache = self.did_doc_cache.write().await; 243 + cache.insert(did.into(), (Instant::now(), resolved.clone())); 244 + } 245 + 246 + Ok(resolved) 247 + } 248 + 249 + // TODO: make cached version 250 + async fn fetch_did_document_uncached( 251 + &self, 252 + did: &str, 253 + ) -> Result<serde_json::Value, DidResolutionError> { 254 + if did.starts_with("did:web:") { 255 + self.fetch_did_document_web(did).await 256 + } else if did.starts_with("did:plc:") { 257 + self.fetch_did_document_plc(did).await 258 + } else { 259 + warn!("Unsupported DID method: {}", did); 260 + Err(DidResolutionError::UnsupportedDidMethod(did.into())) 261 + } 262 + } 263 + 264 + async fn fetch_did_document_web( 265 + &self, 266 + did: &str, 267 + ) -> Result<serde_json::Value, DidResolutionError> { 268 + let url = build_did_web_url(did)?; 269 + 270 + let resp = self 271 + .client 272 + .get(&url) 273 + .send() 274 + .await 275 + .map_err(|e| DidResolutionError::HttpFailed(e.to_string()))?; 276 + 277 + if !resp.status().is_success() { 278 + return Err(DidResolutionError::HttpFailed(format!( 279 + "HTTP {}", 280 + resp.status() 281 + ))); 282 + } 283 + 284 + resp.json::<serde_json::Value>() 285 + .await 286 + .map_err(|e| DidResolutionError::InvalidDocument(e.to_string())) 287 + } 288 + 289 + async fn fetch_did_document_plc( 290 + &self, 291 + did: &str, 292 + ) -> Result<serde_json::Value, DidResolutionError> { 293 + let url = format!("{}/{}", self.plc_directory_url, urlencoding::encode(did)); 294 + 295 + let resp = self 296 + .client 297 + .get(&url) 298 + .send() 299 + .await 300 + .map_err(|e| DidResolutionError::HttpFailed(e.to_string()))?; 301 + 302 + if resp.status() == reqwest::StatusCode::NOT_FOUND { 303 + return Err(DidResolutionError::NotFound); 304 + } 305 + 306 + if !resp.status().is_success() { 307 + return Err(DidResolutionError::HttpFailed(format!( 308 + "HTTP {}", 309 + resp.status() 310 + ))); 311 + } 312 + 313 + resp.json::<serde_json::Value>() 314 + .await 315 + .map_err(|e| DidResolutionError::InvalidDocument(e.to_string())) 316 + } 317 + 318 + pub async fn invalidate_cache(&self, did: &str) { 319 + let mut doc_cache = self.parsed_did_doc_cache.write().await; 320 + doc_cache.remove(did); 321 + } 322 + } 323 + 324 + impl Default for DidResolver { 325 + fn default() -> Self { 326 + Self::new() 327 + } 328 + } 329 + 330 + pub fn create_did_resolver() -> Arc<DidResolver> { 331 + Arc::new(DidResolver::new()) 332 + } 333 + 334 + fn build_did_web_url(did: &str) -> Result<String, DidResolutionError> { 335 + let host = did 336 + .strip_prefix("did:web:") 337 + .ok_or(DidResolutionError::InvalidDidWeb)?; 338 + 339 + let (host, path) = if host.contains(':') { 340 + let decoded = host.replace("%3A", ":"); 341 + let parts: Vec<&str> = decoded.splitn(2, '/').collect(); 342 + if parts.len() > 1 { 343 + (parts[0].to_string(), format!("/{}", parts[1])) 344 + } else { 345 + (decoded, String::new()) 346 + } 347 + } else { 348 + let parts: Vec<&str> = host.splitn(2, ':').collect(); 349 + if parts.len() > 1 && parts[1].contains('/') { 350 + let path_parts: Vec<&str> = parts[1].splitn(2, '/').collect(); 351 + if path_parts.len() > 1 { 352 + ( 353 + format!("{}:{}", parts[0], path_parts[0]), 354 + format!("/{}", path_parts[1]), 355 + ) 356 + } else { 357 + (host.to_string(), String::new()) 358 + } 359 + } else { 360 + (host.to_string(), String::new()) 361 + } 362 + }; 363 + 364 + let scheme = 365 + if host.starts_with("localhost") || host.starts_with("127.0.0.1") || host.contains(':') { 366 + "http" 367 + } else { 368 + "https" 369 + }; 370 + 371 + let url = if path.is_empty() { 372 + format!("{}://{}/.well-known/did.json", scheme, host) 373 + } else { 374 + format!("{}://{}{}/did.json", scheme, host, path) 375 + }; 376 + 377 + Ok(url) 378 + }
+1 -1
crates/tranquil-pds/src/lib.rs
··· 1 1 pub mod api; 2 - pub mod appview; 3 2 pub mod auth; 4 3 pub mod cache; 5 4 pub mod cache_keys; ··· 9 8 pub mod config; 10 9 pub mod crawlers; 11 10 pub mod delegation; 11 + pub mod did; 12 12 pub mod handle; 13 13 pub mod image; 14 14 pub mod metrics;
+1 -1
crates/tranquil-pds/src/state.rs
··· 1 - use crate::appview::DidResolver; 2 1 use crate::auth::webauthn::WebAuthnConfig; 3 2 use crate::cache::{Cache, DistributedRateLimiter, create_cache}; 4 3 use crate::circuit_breaker::CircuitBreakers; 5 4 use crate::config::AuthConfig; 5 + use crate::did::DidResolver; 6 6 use crate::oauth::client::CrossPdsOAuthClient; 7 7 use crate::plc::PlcClient; 8 8 use crate::rate_limit::RateLimiters;