A better Rust ATProto crate
102
fork

Configure Feed

Select the types of activity you want to include in your feed.

at pretty-codegen 1659 lines 62 kB view raw
1//! # Stateless XRPC utilities and request/response mapping 2//! 3//! Mapping overview: 4//! - Success (2xx): parse body into the endpoint's typed output. 5//! - 400: try typed error; on failure, fall back to a generic XRPC error (with 6//! `nsid`, `method`, and `http_status`) and map common auth errors. 7//! - 401: if `WWW-Authenticate` is present, return 8//! `ClientError::Auth(AuthError::Other(header))` so higher layers (OAuth/DPoP) 9//! can inspect `error="invalid_token"` or `error="use_dpop_nonce"` and refresh/retry. 10//! If the header is absent, parse the body and map auth errors to 11//! `AuthError::TokenExpired`/`InvalidToken`. 12 13#[cfg(feature = "streaming")] 14pub mod streaming; 15 16/// Hand-written XRPC types for com.atproto endpoints (bootstrap types). 17pub mod atproto; 18 19use alloc::borrow::ToOwned; 20use alloc::boxed::Box; 21use alloc::string::{String, ToString}; 22use alloc::vec::Vec; 23use ipld_core::ipld::Ipld; 24#[cfg(feature = "streaming")] 25pub use streaming::{ 26 StreamingResponse, XrpcProcedureSend, XrpcProcedureStream, XrpcResponseStream, XrpcStreamResp, 27}; 28 29#[cfg(feature = "websocket")] 30pub mod subscription; 31 32#[cfg(feature = "streaming")] 33use crate::StreamError; 34use crate::error::DecodeError; 35use crate::http_client::HttpClient; 36#[cfg(feature = "streaming")] 37use crate::http_client::HttpClientExt; 38use crate::types::value::Data; 39use crate::{AuthorizationToken, error::AuthError}; 40use crate::{CowStr, error::XrpcResult}; 41use crate::{IntoStatic, types::value::RawData}; 42use bytes::Bytes; 43use core::error::Error; 44use core::fmt::{self, Debug}; 45use core::marker::PhantomData; 46use http::{ 47 HeaderName, HeaderValue, Request, StatusCode, 48 header::{AUTHORIZATION, CONTENT_TYPE}, 49}; 50use serde::{Deserialize, Serialize}; 51use smol_str::SmolStr; 52 53use crate::deps::fluent_uri::Uri; 54#[cfg(feature = "websocket")] 55pub use subscription::{ 56 BasicSubscriptionClient, MessageEncoding, SubscriptionCall, SubscriptionClient, 57 SubscriptionEndpoint, SubscriptionExt, SubscriptionOptions, SubscriptionResp, 58 SubscriptionStream, TungsteniteSubscriptionClient, XrpcSubscription, 59}; 60 61/// Normalize a base URI by removing trailing slashes. 62/// 63/// This is useful for XRPC clients where the base URI might be provided with 64/// a trailing slash (e.g., "<https://bsky.social/>") but needs to be normalized 65/// for consistent path building. Since trimming a trailing slash from a valid URI 66/// always yields a valid URI, the result is guaranteed to be valid. 67pub fn normalize_base_uri(uri: Uri<String>) -> Uri<String> { 68 let s = uri.as_str(); 69 if s.ends_with('/') && s.len() > 1 { 70 let trimmed = s.trim_end_matches('/'); 71 // Invariant: trimming trailing slashes from a valid URI always yields a valid URI. 72 Uri::parse(trimmed.to_string()) 73 .expect("trimming trailing slash from valid URI yields valid URI") 74 } else { 75 uri 76 } 77} 78 79/// Error type for encoding XRPC requests 80#[derive(Debug, thiserror::Error)] 81#[cfg_attr(feature = "std", derive(miette::Diagnostic))] 82#[non_exhaustive] 83pub enum EncodeError { 84 /// Failed to serialize query parameters 85 #[error("Failed to serialize query: {0}")] 86 Query( 87 #[from] 88 #[source] 89 serde_html_form::ser::Error, 90 ), 91 /// Failed to serialize JSON body 92 #[error("Failed to serialize JSON: {0}")] 93 Json( 94 #[from] 95 #[source] 96 serde_json::Error, 97 ), 98 /// Other encoding error 99 #[error("Encoding error: {0}")] 100 Other(String), 101} 102 103/// XRPC method type 104#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] 105pub enum XrpcMethod { 106 /// Query (HTTP GET) 107 Query, 108 /// Procedure (HTTP POST) 109 Procedure(&'static str), 110} 111 112impl XrpcMethod { 113 /// Get the HTTP method string 114 pub const fn as_str(&self) -> &'static str { 115 match self { 116 Self::Query => "GET", 117 Self::Procedure(_) => "POST", 118 } 119 } 120 121 /// Get the body encoding type for this method (procedures only) 122 pub const fn body_encoding(&self) -> Option<&'static str> { 123 match self { 124 Self::Query => None, 125 Self::Procedure(enc) => Some(enc), 126 } 127 } 128} 129 130/// Trait for XRPC request types (queries and procedures) 131/// 132/// This trait provides metadata about XRPC endpoints including the NSID, 133/// HTTP method, encoding, and associated output type. 134/// 135/// The trait is implemented on the request parameters/input type itself. 136pub trait XrpcRequest: Serialize { 137 /// The NSID for this XRPC method 138 const NSID: &'static str; 139 140 /// XRPC method (query/GET or procedure/POST) 141 const METHOD: XrpcMethod; 142 143 /// Response type returned from the XRPC call (marker struct) 144 type Response: XrpcResp; 145 146 /// Encode the request body for procedures. 147 /// 148 /// Default implementation serializes to JSON. Override for non-JSON encodings. 149 fn encode_body(&self) -> Result<Vec<u8>, EncodeError> { 150 Ok(serde_json::to_vec(self)?) 151 } 152 153 /// Decode the request body for procedures. 154 /// 155 /// Default implementation deserializes from JSON. Override for non-JSON encodings. 156 fn decode_body<'de>(body: &'de [u8]) -> Result<Box<Self>, DecodeError> 157 where 158 Self: Deserialize<'de>, 159 { 160 let body: Self = serde_json::from_slice(body)?; 161 162 Ok(Box::new(body)) 163 } 164} 165 166/// Trait for XRPC Response types 167/// 168/// It mirrors the NSID and carries the encoding types as well as Output (success) and Err types 169pub trait XrpcResp { 170 /// The NSID for this XRPC method 171 const NSID: &'static str; 172 173 /// Output encoding (MIME type) 174 const ENCODING: &'static str; 175 176 /// Response output type 177 type Output<'de>: Serialize + Deserialize<'de> + IntoStatic; 178 179 /// Error type for this request 180 type Err<'de>: Error + Deserialize<'de> + Serialize + IntoStatic; 181 182 /// Output body encoding function, similar to the request-side type 183 fn encode_output(output: &Self::Output<'_>) -> Result<Vec<u8>, EncodeError> { 184 Ok(serde_json::to_vec(output)?) 185 } 186 187 /// Decode the response output body. 188 /// 189 /// Default implementation deserializes from JSON. Override for non-JSON encodings. 190 fn decode_output<'de>(body: &'de [u8]) -> core::result::Result<Self::Output<'de>, DecodeError> 191 where 192 Self::Output<'de>: Deserialize<'de>, 193 { 194 let body = serde_json::from_slice(body).map_err(|e| DecodeError::Json(e))?; 195 Ok(body) 196 } 197} 198 199/// XRPC server endpoint trait 200/// 201/// Defines the fully-qualified path and method, as well as request and response types 202/// This exists primarily to work around lifetime issues for crates like Axum 203/// by moving the lifetime from the trait itself into an associated type. 204/// 205/// It is implemented by the code generation on a marker struct, like the client-side [XrpcResp] trait. 206pub trait XrpcEndpoint { 207 /// Fully-qualified path ('/xrpc/\[nsid\]') where this endpoint should live on the server 208 const PATH: &'static str; 209 /// XRPC method (query/GET or procedure/POST) 210 const METHOD: XrpcMethod; 211 /// XRPC Request data type 212 type Request<'de>: XrpcRequest + Deserialize<'de> + IntoStatic; 213 /// XRPC Response data type 214 type Response: XrpcResp; 215} 216 217/// Error type for XRPC endpoints that don't define any errors 218#[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize)] 219pub struct GenericError<'a>(#[serde(borrow)] Data<'a>); 220 221impl<'de> fmt::Display for GenericError<'de> { 222 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 223 self.0.fmt(f) 224 } 225} 226 227impl Error for GenericError<'_> {} 228 229impl IntoStatic for GenericError<'_> { 230 type Output = GenericError<'static>; 231 fn into_static(self) -> Self::Output { 232 GenericError(self.0.into_static()) 233 } 234} 235 236/// Per-request options for XRPC calls. 237#[derive(Debug, Default, Clone)] 238pub struct CallOptions<'a> { 239 /// Optional Authorization to apply (`Bearer` or `DPoP`). 240 pub auth: Option<AuthorizationToken<'a>>, 241 /// `atproto-proxy` header value. 242 pub atproto_proxy: Option<CowStr<'a>>, 243 /// `atproto-accept-labelers` header values. 244 pub atproto_accept_labelers: Option<Vec<CowStr<'a>>>, 245 /// Extra headers to attach to this request. 246 pub extra_headers: Vec<(HeaderName, HeaderValue)>, 247} 248 249impl IntoStatic for CallOptions<'_> { 250 type Output = CallOptions<'static>; 251 252 fn into_static(self) -> Self::Output { 253 CallOptions { 254 auth: self.auth.map(|auth| auth.into_static()), 255 atproto_proxy: self.atproto_proxy.map(|proxy| proxy.into_static()), 256 atproto_accept_labelers: self 257 .atproto_accept_labelers 258 .map(|labelers| labelers.into_static()), 259 extra_headers: self.extra_headers, 260 } 261 } 262} 263 264/// Extension for stateless XRPC calls on any `HttpClient`. 265/// 266/// Example 267/// ```no_run 268/// # #[tokio::main] 269/// # async fn main() -> Result<(), Box<dyn std::error::Error>> { 270/// use jacquard_common::xrpc::XrpcExt; 271/// use jacquard_common::http_client::HttpClient; 272/// use jacquard_common::deps::fluent_uri::Uri; 273/// 274/// let http = reqwest::Client::new(); 275/// let base = Uri::parse("https://public.api.bsky.app").unwrap().to_owned(); 276/// // let resp = http.xrpc(base).send(&request).await?; 277/// # Ok(()) 278/// # } 279/// ``` 280pub trait XrpcExt: HttpClient { 281 /// Start building an XRPC call for the given base URI. 282 fn xrpc<'a>(&'a self, base: Uri<String>) -> XrpcCall<'a, Self> 283 where 284 Self: Sized, 285 { 286 XrpcCall { 287 client: self, 288 base, 289 opts: CallOptions::default(), 290 } 291 } 292} 293 294impl<T: HttpClient> XrpcExt for T {} 295 296/// Nicer alias for Xrpc response type 297pub type XrpcResponse<R> = Response<<R as XrpcRequest>::Response>; 298 299/// Stateful XRPC call trait 300#[cfg_attr(not(target_arch = "wasm32"), trait_variant::make(Send))] 301pub trait XrpcClient: HttpClient { 302 /// Get the base URI for the client. 303 fn base_uri(&self) -> impl Future<Output = Uri<String>>; 304 305 /// Set the base URI for the client. 306 /// 307 /// The implementation should strip any trailing slash from the URI path before storing. 308 fn set_base_uri(&self, uri: Uri<String>) -> impl Future<Output = ()> { 309 let _ = uri; 310 async {} 311 } 312 313 /// Get the call options for the client. 314 fn opts(&self) -> impl Future<Output = CallOptions<'_>> { 315 async { CallOptions::default() } 316 } 317 318 /// Set the call options for the client. 319 fn set_opts(&self, opts: CallOptions) -> impl Future<Output = ()> { 320 let _ = opts; 321 async {} 322 } 323 324 /// Send an XRPC request and parse the response 325 #[cfg(not(target_arch = "wasm32"))] 326 fn send<R>(&self, request: R) -> impl Future<Output = XrpcResult<XrpcResponse<R>>> 327 where 328 R: XrpcRequest + Send + Sync, 329 <R as XrpcRequest>::Response: Send + Sync, 330 Self: Sync; 331 332 /// Send an XRPC request and parse the response 333 #[cfg(target_arch = "wasm32")] 334 fn send<R>(&self, request: R) -> impl Future<Output = XrpcResult<XrpcResponse<R>>> 335 where 336 R: XrpcRequest + Send + Sync, 337 <R as XrpcRequest>::Response: Send + Sync; 338 339 /// Send an XRPC request and parse the response 340 #[cfg(not(target_arch = "wasm32"))] 341 fn send_with_opts<R>( 342 &self, 343 request: R, 344 opts: CallOptions<'_>, 345 ) -> impl Future<Output = XrpcResult<XrpcResponse<R>>> 346 where 347 R: XrpcRequest + Send + Sync, 348 <R as XrpcRequest>::Response: Send + Sync, 349 Self: Sync; 350 351 /// Send an XRPC request with custom options and parse the response 352 #[cfg(target_arch = "wasm32")] 353 fn send_with_opts<R>( 354 &self, 355 request: R, 356 opts: CallOptions<'_>, 357 ) -> impl Future<Output = XrpcResult<XrpcResponse<R>>> 358 where 359 R: XrpcRequest + Send + Sync, 360 <R as XrpcRequest>::Response: Send + Sync; 361} 362 363/// Stateful XRPC streaming client trait 364#[cfg(feature = "streaming")] 365pub trait XrpcStreamingClient: XrpcClient + HttpClientExt { 366 /// Send an XRPC request and stream the response 367 #[cfg(not(target_arch = "wasm32"))] 368 fn download<R>( 369 &self, 370 request: R, 371 ) -> impl Future<Output = Result<StreamingResponse, StreamError>> + Send 372 where 373 R: XrpcRequest + Send + Sync, 374 <R as XrpcRequest>::Response: Send + Sync, 375 Self: Sync; 376 377 /// Send an XRPC request and stream the response 378 #[cfg(target_arch = "wasm32")] 379 fn download<R>( 380 &self, 381 request: R, 382 ) -> impl Future<Output = Result<StreamingResponse, StreamError>> 383 where 384 R: XrpcRequest + Send + Sync, 385 <R as XrpcRequest>::Response: Send + Sync; 386 387 /// Stream an XRPC procedure call and its response 388 #[cfg(not(target_arch = "wasm32"))] 389 fn stream<S>( 390 &self, 391 stream: XrpcProcedureSend<S::Frame<'static>>, 392 ) -> impl Future< 393 Output = Result< 394 XrpcResponseStream< 395 <<S as XrpcProcedureStream>::Response as XrpcStreamResp>::Frame<'static>, 396 >, 397 StreamError, 398 >, 399 > 400 where 401 S: XrpcProcedureStream + 'static, 402 <<S as XrpcProcedureStream>::Response as XrpcStreamResp>::Frame<'static>: XrpcStreamResp, 403 Self: Sync; 404 405 /// Stream an XRPC procedure call and its response 406 #[cfg(target_arch = "wasm32")] 407 fn stream<S>( 408 &self, 409 stream: XrpcProcedureSend<S::Frame<'static>>, 410 ) -> impl Future< 411 Output = Result< 412 XrpcResponseStream< 413 <<S as XrpcProcedureStream>::Response as XrpcStreamResp>::Frame<'static>, 414 >, 415 StreamError, 416 >, 417 > 418 where 419 S: XrpcProcedureStream + 'static, 420 <<S as XrpcProcedureStream>::Response as XrpcStreamResp>::Frame<'static>: XrpcStreamResp; 421} 422 423/// Stateless XRPC call builder. 424/// 425/// Example (per-request overrides) 426/// ```no_run 427/// # #[tokio::main] 428/// # async fn main() -> Result<(), Box<dyn std::error::Error>> { 429/// use jacquard_common::xrpc::XrpcExt; 430/// use jacquard_common::{AuthorizationToken, CowStr}; 431/// use jacquard_common::deps::fluent_uri::Uri; 432/// 433/// let http = reqwest::Client::new(); 434/// let base = Uri::parse("https://public.api.bsky.app").unwrap().to_owned(); 435/// let call = http 436/// .xrpc(base) 437/// .auth(AuthorizationToken::Bearer(CowStr::from("ACCESS_JWT"))) 438/// .accept_labelers(vec![CowStr::from("did:plc:labelerid")]) 439/// .header(http::header::USER_AGENT, http::HeaderValue::from_static("jacquard-example")); 440/// // let resp = call.send(&request).await?; 441/// # Ok(()) 442/// # } 443/// ``` 444pub struct XrpcCall<'a, C: HttpClient> { 445 pub(crate) client: &'a C, 446 pub(crate) base: Uri<String>, 447 pub(crate) opts: CallOptions<'a>, 448} 449 450impl<'a, C: HttpClient> XrpcCall<'a, C> { 451 /// Apply Authorization to this call. 452 pub fn auth(mut self, token: AuthorizationToken<'a>) -> Self { 453 self.opts.auth = Some(token); 454 self 455 } 456 /// Set `atproto-proxy` header for this call. 457 pub fn proxy(mut self, proxy: CowStr<'a>) -> Self { 458 self.opts.atproto_proxy = Some(proxy); 459 self 460 } 461 /// Set `atproto-accept-labelers` header(s) for this call. 462 pub fn accept_labelers(mut self, labelers: Vec<CowStr<'a>>) -> Self { 463 self.opts.atproto_accept_labelers = Some(labelers); 464 self 465 } 466 /// Add an extra header. 467 pub fn header(mut self, name: HeaderName, value: HeaderValue) -> Self { 468 self.opts.extra_headers.push((name, value)); 469 self 470 } 471 /// Replace the builder's options entirely. 472 pub fn with_options(mut self, opts: CallOptions<'a>) -> Self { 473 self.opts = opts; 474 self 475 } 476 477 /// Send the given typed XRPC request and return a response wrapper. 478 /// 479 /// Note on 401 handling: 480 /// - When the server returns 401 with a `WWW-Authenticate` header, this surfaces as 481 /// `ClientError::Auth(AuthError::Other(header))` so higher layers (e.g., OAuth/DPoP) can 482 /// inspect the header for `error="invalid_token"` or `error="use_dpop_nonce"` and react 483 /// (refresh/retry). If the header is absent, the 401 body flows through to `Response` and 484 /// can be parsed/mapped to `AuthError` as appropriate. 485 #[cfg_attr(feature = "tracing", tracing::instrument(level = "debug", skip(self, request), fields(nsid = R::NSID)))] 486 pub async fn send<R>(self, request: &R) -> XrpcResult<Response<<R as XrpcRequest>::Response>> 487 where 488 R: XrpcRequest, 489 <R as XrpcRequest>::Response: Send + Sync, 490 { 491 let http_request = build_http_request(&self.base, request, &self.opts)?; 492 493 let http_response = self 494 .client 495 .send_http(http_request) 496 .await 497 .map_err(|e| crate::error::ClientError::transport(e).for_nsid(R::NSID))?; 498 499 process_response(http_response) 500 } 501} 502 503/// Process the HTTP response from the server into a proper xrpc response statelessly. 504/// 505/// Exposed to make things more easily pluggable 506#[inline] 507pub fn process_response<Resp>(http_response: http::Response<Vec<u8>>) -> XrpcResult<Response<Resp>> 508where 509 Resp: XrpcResp, 510{ 511 let status = http_response.status(); 512 513 // If the server returned 401 with a WWW-Authenticate header, expose it so higher layers 514 // (e.g., DPoP handling) can detect `error="invalid_token"` and trigger refresh. 515 #[allow(deprecated)] 516 if status.as_u16() == 401 { 517 if let Some(hv) = http_response.headers().get(http::header::WWW_AUTHENTICATE) { 518 return Err( 519 crate::error::ClientError::auth(crate::error::AuthError::Other(hv.clone())) 520 .for_nsid(Resp::NSID), 521 ); 522 } 523 } 524 let buffer = Bytes::from(http_response.into_body()); 525 526 if !status.is_success() && !matches!(status.as_u16(), 400 | 401) { 527 return Err(crate::error::ClientError::from(crate::error::HttpError { 528 status, 529 body: Some(buffer), 530 }) 531 .for_nsid(Resp::NSID)); 532 } 533 534 Ok(Response::new(buffer, status)) 535} 536 537/// HTTP headers commonly used in XRPC requests 538pub enum Header { 539 /// Content-Type header 540 ContentType, 541 /// Authorization header 542 Authorization, 543 /// `atproto-proxy` header - specifies which service (app server or other atproto service) the user's PDS should forward requests to as appropriate. 544 /// 545 /// See: <https://atproto.com/specs/xrpc#service-proxying> 546 AtprotoProxy, 547 /// `atproto-accept-labelers` header used by clients to request labels from specific labelers to be included and applied in the response. See [label](https://atproto.com/specs/label) specification for details. 548 AtprotoAcceptLabelers, 549} 550 551impl From<Header> for HeaderName { 552 fn from(value: Header) -> Self { 553 match value { 554 Header::ContentType => CONTENT_TYPE, 555 Header::Authorization => AUTHORIZATION, 556 Header::AtprotoProxy => HeaderName::from_static("atproto-proxy"), 557 Header::AtprotoAcceptLabelers => HeaderName::from_static("atproto-accept-labelers"), 558 } 559 } 560} 561 562/// Construct an XRPC endpoint URI from a base URI, NSID, and optional query string. 563/// 564/// This helper: 565/// 1. Extracts scheme and authority from the base URI 566/// 2. Gets the base path (already guaranteed no trailing slash from `set_base_uri`) 567/// 3. Builds new path: `{base_path}/xrpc/{nsid}` 568/// 4. Optionally sets query from serialized parameters 569/// 5. Returns the constructed URI 570fn xrpc_endpoint_uri( 571 base: &Uri<String>, 572 nsid: &str, 573 query: Option<&str>, 574) -> XrpcResult<Uri<String>> { 575 use crate::error::ClientError; 576 577 let base_path = base.path().as_str().trim_end_matches('/'); 578 579 // Calculate approximate capacity: scheme + "://" + authority + base_path + "/xrpc/" + nsid + optional query 580 let capacity = base.scheme().as_str().len() 581 + 3 // "://" 582 + base.authority().map(|a| a.as_str().len()).unwrap_or(0) 583 + base_path.len() 584 + 6 // "/xrpc/" 585 + nsid.len() 586 + query.map(|q| q.len() + 1).unwrap_or(0); // query + "?" 587 588 // Build new path string: {base_path}/xrpc/{nsid} 589 let mut uri_str = String::with_capacity(capacity); 590 uri_str.push_str(base.scheme().as_str()); 591 uri_str.push_str("://"); 592 593 if let Some(authority) = base.authority() { 594 uri_str.push_str(authority.as_str()); 595 } 596 597 uri_str.push_str(base_path); 598 uri_str.push_str("/xrpc/"); 599 uri_str.push_str(nsid); 600 601 if let Some(q) = query { 602 uri_str.push('?'); 603 uri_str.push_str(q); 604 } 605 606 Uri::parse(uri_str) 607 .map(|u| u.to_owned()) 608 .map_err(|_| ClientError::invalid_request("Failed to construct XRPC endpoint URI")) 609} 610 611/// Build an HTTP request for an XRPC call given base URI and options 612pub fn build_http_request<'s, R>( 613 base: &Uri<String>, 614 req: &R, 615 opts: &CallOptions<'_>, 616) -> XrpcResult<Request<Vec<u8>>> 617where 618 R: XrpcRequest, 619{ 620 use crate::error::ClientError; 621 622 // Determine query string for Query methods 623 let query_string = if let XrpcMethod::Query = <R as XrpcRequest>::METHOD { 624 let qs = serde_html_form::to_string(&req).map_err(|e| { 625 ClientError::invalid_request(format!("Failed to serialize query: {}", e)) 626 })?; 627 if !qs.is_empty() { Some(qs) } else { None } 628 } else { 629 None 630 }; 631 632 // Construct the XRPC endpoint URI using the helper 633 let uri = xrpc_endpoint_uri(base, <R as XrpcRequest>::NSID, query_string.as_deref())?; 634 635 let method = match <R as XrpcRequest>::METHOD { 636 XrpcMethod::Query => http::Method::GET, 637 XrpcMethod::Procedure(_) => http::Method::POST, 638 }; 639 640 let mut builder = Request::builder().method(method).uri(uri.as_str()); 641 642 let has_content_type = opts 643 .extra_headers 644 .iter() 645 .any(|(name, _)| name == CONTENT_TYPE); 646 647 if let XrpcMethod::Procedure(encoding) = <R as XrpcRequest>::METHOD { 648 // Only set default Content-Type if not provided in extra_headers 649 if !has_content_type { 650 builder = builder.header(Header::ContentType, encoding); 651 } 652 } 653 let output_encoding = <R::Response as XrpcResp>::ENCODING; 654 builder = builder.header(http::header::ACCEPT, output_encoding); 655 656 if let Some(token) = &opts.auth { 657 let hv = match token { 658 AuthorizationToken::Bearer(t) => { 659 HeaderValue::from_str(&format!("Bearer {}", t.as_ref())) 660 } 661 AuthorizationToken::Dpop(t) => HeaderValue::from_str(&format!("DPoP {}", t.as_ref())), 662 } 663 .map_err(|e| ClientError::invalid_request(format!("Invalid authorization token: {}", e)))?; 664 builder = builder.header(Header::Authorization, hv); 665 } 666 667 if let Some(proxy) = &opts.atproto_proxy { 668 builder = builder.header(Header::AtprotoProxy, proxy.as_ref()); 669 } 670 if let Some(labelers) = &opts.atproto_accept_labelers { 671 if !labelers.is_empty() { 672 let joined = labelers 673 .iter() 674 .map(|s| s.as_ref()) 675 .collect::<Vec<_>>() 676 .join(", "); 677 builder = builder.header(Header::AtprotoAcceptLabelers, joined); 678 } 679 } 680 for (name, value) in &opts.extra_headers { 681 builder = builder.header(name, value); 682 } 683 684 let body = if let XrpcMethod::Procedure(_) = R::METHOD { 685 req.encode_body() 686 .map_err(|e| ClientError::invalid_request(format!("Failed to encode body: {}", e)))? 687 } else { 688 vec![] 689 }; 690 691 builder 692 .body(body) 693 .map_err(|e| ClientError::invalid_request(format!("Failed to build request: {}", e))) 694} 695 696/// XRPC response wrapper that owns the response buffer 697/// 698/// Allows borrowing from the buffer when parsing to avoid unnecessary allocations. 699/// Generic over the response marker type (e.g., `GetAuthorFeedResponse`), not the request. 700pub struct Response<Resp> 701where 702 Resp: XrpcResp, // HRTB: Resp works with any lifetime 703{ 704 _marker: PhantomData<fn() -> Resp>, 705 buffer: Bytes, 706 status: StatusCode, 707} 708 709impl<R> Response<R> 710where 711 R: XrpcResp, 712{ 713 /// Create a new response from a buffer and status code 714 pub fn new(buffer: Bytes, status: StatusCode) -> Self { 715 Self { 716 buffer, 717 status, 718 _marker: PhantomData, 719 } 720 } 721 722 /// Get the HTTP status code 723 pub fn status(&self) -> StatusCode { 724 self.status 725 } 726 727 /// Get the raw buffer 728 pub fn buffer(&self) -> &Bytes { 729 &self.buffer 730 } 731 732 /// Parse the response, borrowing from the internal buffer 733 pub fn parse<'s>(&'s self) -> Result<RespOutput<'s, R>, XrpcError<RespErr<'s, R>>> { 734 // 200: parse as output 735 if self.status.is_success() { 736 match R::decode_output(&self.buffer) { 737 Ok(output) => Ok(output), 738 Err(e) => Err(XrpcError::Decode(e)), 739 } 740 // 400: try typed XRPC error, fallback to generic error 741 } else if self.status.as_u16() == 400 { 742 match serde_json::from_slice::<R::Err<'_>>(&self.buffer) { 743 Ok(error) => { 744 use alloc::string::ToString; 745 if error.to_string().contains("InvalidToken") { 746 Err(XrpcError::Auth(AuthError::InvalidToken)) 747 } else if error.to_string().contains("ExpiredToken") { 748 Err(XrpcError::Auth(AuthError::TokenExpired)) 749 } else { 750 Err(XrpcError::Xrpc(error)) 751 } 752 } 753 Err(_) => { 754 // Fallback to generic error (InvalidRequest, ExpiredToken, etc.) 755 match serde_json::from_slice::<GenericXrpcError>(&self.buffer) { 756 Ok(mut generic) => { 757 generic.nsid = R::NSID; 758 generic.method = ""; // method info only available on request 759 generic.http_status = self.status; 760 // Map auth-related errors to AuthError 761 match generic.error.as_str() { 762 "ExpiredToken" => Err(XrpcError::Auth(AuthError::TokenExpired)), 763 "InvalidToken" => Err(XrpcError::Auth(AuthError::InvalidToken)), 764 _ => Err(XrpcError::Generic(generic)), 765 } 766 } 767 Err(e) => Err(XrpcError::Decode(DecodeError::Json(e))), 768 } 769 } 770 } 771 // 401: always auth error 772 } else { 773 match serde_json::from_slice::<GenericXrpcError>(&self.buffer) { 774 Ok(mut generic) => { 775 generic.nsid = R::NSID; 776 generic.method = ""; // method info only available on request 777 generic.http_status = self.status; 778 match generic.error.as_str() { 779 "ExpiredToken" => Err(XrpcError::Auth(AuthError::TokenExpired)), 780 "InvalidToken" => Err(XrpcError::Auth(AuthError::InvalidToken)), 781 _ => Err(XrpcError::Auth(AuthError::NotAuthenticated)), 782 } 783 } 784 Err(e) => Err(XrpcError::Decode(DecodeError::Json(e))), 785 } 786 } 787 } 788 789 /// Parse this as validated, loosely typed atproto data. 790 /// 791 /// NOTE: If the response is an error, it will still parse as the matching error type for the request. 792 pub fn parse_data<'s>(&'s self) -> Result<Data<'s>, XrpcError<RespErr<'s, R>>> { 793 // 200: parse as output 794 if self.status.is_success() { 795 match serde_json::from_slice::<_>(&self.buffer) { 796 Ok(output) => Ok(output), 797 Err(_) => { 798 if let Ok(data) = serde_ipld_dagcbor::from_slice::<Ipld>(&self.buffer) { 799 if let Ok(data) = Data::from_cbor(&data) { 800 Ok(data.into_static()) 801 } else { 802 Ok(Data::Bytes(self.buffer.clone())) 803 } 804 } else { 805 Ok(Data::Bytes(self.buffer.clone())) 806 } 807 } 808 } 809 // 400: try typed XRPC error, fallback to generic error 810 } else if self.status.as_u16() == 400 { 811 match serde_json::from_slice::<R::Err<'_>>(&self.buffer) { 812 Ok(error) => { 813 use alloc::string::ToString; 814 if error.to_string().contains("InvalidToken") { 815 Err(XrpcError::Auth(AuthError::InvalidToken)) 816 } else if error.to_string().contains("ExpiredToken") { 817 Err(XrpcError::Auth(AuthError::TokenExpired)) 818 } else { 819 Err(XrpcError::Xrpc(error)) 820 } 821 } 822 Err(_) => { 823 // Fallback to generic error (InvalidRequest, ExpiredToken, etc.) 824 match serde_json::from_slice::<GenericXrpcError>(&self.buffer) { 825 Ok(mut generic) => { 826 generic.nsid = R::NSID; 827 generic.method = ""; // method info only available on request 828 generic.http_status = self.status; 829 // Map auth-related errors to AuthError 830 match generic.error.as_str() { 831 "ExpiredToken" => Err(XrpcError::Auth(AuthError::TokenExpired)), 832 "InvalidToken" => Err(XrpcError::Auth(AuthError::InvalidToken)), 833 _ => Err(XrpcError::Generic(generic)), 834 } 835 } 836 Err(e) => Err(XrpcError::Decode(DecodeError::Json(e))), 837 } 838 } 839 } 840 // 401: always auth error 841 } else { 842 match serde_json::from_slice::<GenericXrpcError>(&self.buffer) { 843 Ok(mut generic) => { 844 generic.nsid = R::NSID; 845 generic.method = ""; // method info only available on request 846 generic.http_status = self.status; 847 match generic.error.as_str() { 848 "ExpiredToken" => Err(XrpcError::Auth(AuthError::TokenExpired)), 849 "InvalidToken" => Err(XrpcError::Auth(AuthError::InvalidToken)), 850 _ => Err(XrpcError::Auth(AuthError::NotAuthenticated)), 851 } 852 } 853 Err(e) => Err(XrpcError::Decode(DecodeError::Json(e))), 854 } 855 } 856 } 857 858 /// Parse this as raw atproto data with minimal validation. 859 /// 860 /// NOTE: If the response is an error, it will still parse as the matching error type for the request. 861 pub fn parse_raw<'s>(&'s self) -> Result<RawData<'s>, XrpcError<RespErr<'s, R>>> { 862 // 200: parse as output 863 if self.status.is_success() { 864 match serde_json::from_slice::<_>(&self.buffer) { 865 Ok(output) => Ok(output), 866 Err(_) => { 867 if let Ok(data) = serde_ipld_dagcbor::from_slice::<Ipld>(&self.buffer) { 868 if let Ok(data) = RawData::from_cbor(&data) { 869 Ok(data.into_static()) 870 } else { 871 Ok(RawData::Bytes(self.buffer.clone())) 872 } 873 } else { 874 Ok(RawData::Bytes(self.buffer.clone())) 875 } 876 } 877 } 878 // 400: try typed XRPC error, fallback to generic error 879 } else if self.status.as_u16() == 400 { 880 match serde_json::from_slice::<R::Err<'_>>(&self.buffer) { 881 Ok(error) => { 882 use alloc::string::ToString; 883 if error.to_string().contains("InvalidToken") { 884 Err(XrpcError::Auth(AuthError::InvalidToken)) 885 } else if error.to_string().contains("ExpiredToken") { 886 Err(XrpcError::Auth(AuthError::TokenExpired)) 887 } else { 888 Err(XrpcError::Xrpc(error)) 889 } 890 } 891 Err(_) => { 892 // Fallback to generic error (InvalidRequest, ExpiredToken, etc.) 893 match serde_json::from_slice::<GenericXrpcError>(&self.buffer) { 894 Ok(mut generic) => { 895 generic.nsid = R::NSID; 896 generic.method = ""; // method info only available on request 897 generic.http_status = self.status; 898 // Map auth-related errors to AuthError 899 match generic.error.as_str() { 900 "ExpiredToken" => Err(XrpcError::Auth(AuthError::TokenExpired)), 901 "InvalidToken" => Err(XrpcError::Auth(AuthError::InvalidToken)), 902 _ => Err(XrpcError::Generic(generic)), 903 } 904 } 905 Err(e) => Err(XrpcError::Decode(DecodeError::Json(e))), 906 } 907 } 908 } 909 // 401: always auth error 910 } else { 911 match serde_json::from_slice::<GenericXrpcError>(&self.buffer) { 912 Ok(mut generic) => { 913 generic.nsid = R::NSID; 914 generic.method = ""; // method info only available on request 915 generic.http_status = self.status; 916 match generic.error.as_str() { 917 "ExpiredToken" => Err(XrpcError::Auth(AuthError::TokenExpired)), 918 "InvalidToken" => Err(XrpcError::Auth(AuthError::InvalidToken)), 919 _ => Err(XrpcError::Auth(AuthError::NotAuthenticated)), 920 } 921 } 922 Err(e) => Err(XrpcError::Decode(DecodeError::Json(e))), 923 } 924 } 925 } 926 927 /// Reinterpret this response as a different response type. 928 /// 929 /// This transmutes the response by keeping the same buffer and status code, 930 /// but changing the type-level marker. Useful for converting generic XRPC responses 931 /// into collection-specific typed responses. 932 /// 933 /// # Safety 934 /// 935 /// This is safe in the sense that no memory unsafety occurs, but logical correctness 936 /// depends on ensuring the buffer actually contains data that can deserialize to `NEW`. 937 /// Incorrect conversion will cause deserialization errors at runtime. 938 pub fn transmute<NEW: XrpcResp>(self) -> Response<NEW> { 939 Response { 940 buffer: self.buffer, 941 status: self.status, 942 _marker: PhantomData, 943 } 944 } 945} 946 947/// doc 948pub type RespOutput<'a, Resp> = <Resp as XrpcResp>::Output<'a>; 949/// doc 950pub type RespErr<'a, Resp> = <Resp as XrpcResp>::Err<'a>; 951 952impl<R> Response<R> 953where 954 R: XrpcResp, 955{ 956 /// Parse the response into an owned output 957 pub fn into_output(self) -> Result<RespOutput<'static, R>, XrpcError<RespErr<'static, R>>> 958 where 959 for<'a> RespOutput<'a, R>: IntoStatic<Output = RespOutput<'static, R>>, 960 for<'a> RespErr<'a, R>: IntoStatic<Output = RespErr<'static, R>>, 961 { 962 fn parse_error<'b, R: XrpcResp>(buffer: &'b [u8]) -> Result<R::Err<'b>, serde_json::Error> { 963 serde_json::from_slice(buffer) 964 } 965 966 // 200: parse as output 967 if self.status.is_success() { 968 match R::decode_output(&self.buffer) { 969 Ok(output) => Ok(output.into_static()), 970 Err(e) => Err(XrpcError::Decode(e)), 971 } 972 // 400: try typed XRPC error, fallback to generic error 973 } else if self.status.as_u16() == 400 { 974 let error = match parse_error::<R>(&self.buffer) { 975 Ok(error) => { 976 use alloc::string::ToString; 977 if error.to_string().contains("InvalidToken") { 978 XrpcError::Auth(AuthError::InvalidToken) 979 } else if error.to_string().contains("ExpiredToken") { 980 XrpcError::Auth(AuthError::TokenExpired) 981 } else { 982 XrpcError::Xrpc(error) 983 } 984 } 985 Err(_) => { 986 // Fallback to generic error (InvalidRequest, ExpiredToken, etc.) 987 match serde_json::from_slice::<GenericXrpcError>(&self.buffer) { 988 Ok(mut generic) => { 989 generic.nsid = R::NSID; 990 generic.method = ""; // method info only available on request 991 generic.http_status = self.status; 992 // Map auth-related errors to AuthError 993 match generic.error.as_ref() { 994 "ExpiredToken" => XrpcError::Auth(AuthError::TokenExpired), 995 "InvalidToken" => XrpcError::Auth(AuthError::InvalidToken), 996 _ => XrpcError::Generic(generic), 997 } 998 } 999 Err(e) => XrpcError::Decode(DecodeError::Json(e)), 1000 } 1001 } 1002 }; 1003 Err(error.into_static()) 1004 // 401: always auth error 1005 } else { 1006 let error: XrpcError<<R as XrpcResp>::Err<'_>> = 1007 match serde_json::from_slice::<GenericXrpcError>(&self.buffer) { 1008 Ok(mut generic) => { 1009 let status = self.status; 1010 generic.nsid = R::NSID; 1011 generic.method = ""; // method info only available on request 1012 generic.http_status = status; 1013 match generic.error.as_ref() { 1014 "ExpiredToken" => XrpcError::Auth(AuthError::TokenExpired), 1015 "InvalidToken" => XrpcError::Auth(AuthError::InvalidToken), 1016 _ => XrpcError::Auth(AuthError::NotAuthenticated), 1017 } 1018 } 1019 Err(e) => XrpcError::Decode(DecodeError::Json(e)), 1020 }; 1021 1022 Err(error.into_static()) 1023 } 1024 } 1025} 1026 1027/// Generic XRPC error format for untyped errors like InvalidRequest 1028/// 1029/// Used when the error doesn't match the endpoint's specific error enum 1030#[derive(Debug, Clone, Deserialize, Serialize)] 1031pub struct GenericXrpcError { 1032 /// Error code (e.g., "InvalidRequest") 1033 pub error: SmolStr, 1034 /// Optional error message with details 1035 pub message: Option<SmolStr>, 1036 /// XRPC method NSID that produced this error (context only; not serialized) 1037 #[serde(skip)] 1038 pub nsid: &'static str, 1039 /// HTTP method used (GET/POST) (context only; not serialized) 1040 #[serde(skip)] 1041 pub method: &'static str, 1042 /// HTTP status code (context only; not serialized) 1043 #[serde(skip)] 1044 pub http_status: StatusCode, 1045} 1046 1047impl core::fmt::Display for GenericXrpcError { 1048 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { 1049 if let Some(msg) = &self.message { 1050 write!( 1051 f, 1052 "{}: {} (nsid={}, method={}, status={})", 1053 self.error, msg, self.nsid, self.method, self.http_status 1054 ) 1055 } else { 1056 write!( 1057 f, 1058 "{} (nsid={}, method={}, status={})", 1059 self.error, self.nsid, self.method, self.http_status 1060 ) 1061 } 1062 } 1063} 1064 1065impl IntoStatic for GenericXrpcError { 1066 type Output = Self; 1067 1068 fn into_static(self) -> Self::Output { 1069 self 1070 } 1071} 1072 1073impl core::error::Error for GenericXrpcError {} 1074 1075/// XRPC-specific errors returned from endpoints 1076/// 1077/// Represents errors returned in the response body 1078/// Type parameter `E` is the endpoint's specific error enum type. 1079#[derive(Debug, thiserror::Error)] 1080#[cfg_attr(feature = "std", derive(miette::Diagnostic))] 1081#[non_exhaustive] 1082pub enum XrpcError<E: core::error::Error + IntoStatic> { 1083 /// Typed XRPC error from the endpoint's specific error enum 1084 #[error("XRPC error: {0}")] 1085 #[cfg_attr(feature = "std", diagnostic(code(jacquard_common::xrpc::typed)))] 1086 Xrpc(E), 1087 1088 /// Authentication error (ExpiredToken, InvalidToken, etc.) 1089 #[error("Authentication error: {0}")] 1090 #[cfg_attr(feature = "std", diagnostic(code(jacquard_common::xrpc::auth)))] 1091 Auth(#[from] AuthError), 1092 1093 /// Generic XRPC error not in the endpoint's error enum (e.g., InvalidRequest) 1094 #[error("XRPC error: {0}")] 1095 #[cfg_attr(feature = "std", diagnostic(code(jacquard_common::xrpc::generic)))] 1096 Generic(GenericXrpcError), 1097 1098 /// Failed to decode the response body 1099 #[error("Failed to decode response: {0}")] 1100 #[cfg_attr(feature = "std", diagnostic(code(jacquard_common::xrpc::decode)))] 1101 Decode(#[from] DecodeError), 1102} 1103 1104impl<E> IntoStatic for XrpcError<E> 1105where 1106 E: core::error::Error + IntoStatic, 1107 E::Output: core::error::Error + IntoStatic, 1108 <E as IntoStatic>::Output: core::error::Error + IntoStatic, 1109{ 1110 type Output = XrpcError<E::Output>; 1111 fn into_static(self) -> Self::Output { 1112 match self { 1113 XrpcError::Xrpc(e) => XrpcError::Xrpc(e.into_static()), 1114 XrpcError::Auth(e) => XrpcError::Auth(e.into_static()), 1115 XrpcError::Generic(e) => XrpcError::Generic(e), 1116 XrpcError::Decode(e) => XrpcError::Decode(e), 1117 } 1118 } 1119} 1120 1121impl<E> Serialize for XrpcError<E> 1122where 1123 E: core::error::Error + IntoStatic + Serialize, 1124{ 1125 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error> 1126 where 1127 S: serde::Serializer, 1128 { 1129 use serde::ser::SerializeStruct; 1130 1131 match self { 1132 // Typed errors already serialize to correct atproto format 1133 XrpcError::Xrpc(e) => e.serialize(serializer), 1134 // Generic errors already have correct format 1135 XrpcError::Generic(g) => g.serialize(serializer), 1136 // Auth and Decode need manual mapping to {"error": "...", "message": ...} 1137 XrpcError::Auth(auth) => { 1138 let mut state = serializer.serialize_struct("XrpcError", 2)?; 1139 let (error, message) = match auth { 1140 AuthError::TokenExpired => ("ExpiredToken", Some("Access token has expired")), 1141 AuthError::InvalidToken => { 1142 ("InvalidToken", Some("Access token is invalid or malformed")) 1143 } 1144 AuthError::RefreshFailed => { 1145 ("RefreshFailed", Some("Token refresh request failed")) 1146 } 1147 AuthError::NotAuthenticated => ( 1148 "AuthenticationRequired", 1149 Some("Request requires authentication but none was provided"), 1150 ), 1151 AuthError::DpopProofFailed => { 1152 ("DpopProofFailed", Some("DPoP proof construction failed")) 1153 } 1154 AuthError::DpopNonceFailed => { 1155 ("DpopNonceFailed", Some("DPoP nonce negotiation failed")) 1156 } 1157 AuthError::Other(hv) => { 1158 let msg = hv.to_str().unwrap_or("[non-utf8 header]"); 1159 ("AuthenticationError", Some(msg)) 1160 } 1161 }; 1162 state.serialize_field("error", error)?; 1163 if let Some(msg) = message { 1164 state.serialize_field("message", msg)?; 1165 } 1166 state.end() 1167 } 1168 XrpcError::Decode(decode_err) => { 1169 let mut state = serializer.serialize_struct("XrpcError", 2)?; 1170 state.serialize_field("error", "ResponseDecodeError")?; 1171 // Convert DecodeError to string for message field 1172 let msg = format!("{:?}", decode_err); 1173 state.serialize_field("message", &msg)?; 1174 state.end() 1175 } 1176 } 1177 } 1178} 1179 1180#[cfg(feature = "streaming")] 1181impl<'a, C: HttpClient + HttpClientExt> XrpcCall<'a, C> { 1182 /// Send an XRPC call and stream the binary response. 1183 /// 1184 /// Useful for downloading blobs and entire repository archives 1185 pub async fn download<R>(self, request: &R) -> Result<StreamingResponse, StreamError> 1186 where 1187 R: XrpcRequest, 1188 <R as XrpcRequest>::Response: Send + Sync, 1189 { 1190 let http_request = 1191 build_http_request(&self.base, request, &self.opts).map_err(StreamError::transport)?; 1192 1193 let http_response = self 1194 .client 1195 .send_http_streaming(http_request) 1196 .await 1197 .map_err(StreamError::transport)?; 1198 let (parts, body) = http_response.into_parts(); 1199 1200 Ok(StreamingResponse::new(parts, body)) 1201 } 1202 1203 /// Stream an XRPC procedure call and its response 1204 /// 1205 /// Useful for streaming upload of large payloads, or for "pipe-through" operations 1206 /// where you are processing a large payload. 1207 pub async fn stream<S>( 1208 self, 1209 stream: XrpcProcedureSend<S::Frame<'static>>, 1210 ) -> Result<XrpcResponseStream<<S::Response as XrpcStreamResp>::Frame<'static>>, StreamError> 1211 where 1212 S: XrpcProcedureStream + 'static, 1213 <<S as XrpcProcedureStream>::Response as XrpcStreamResp>::Frame<'static>: XrpcStreamResp, 1214 { 1215 use futures::TryStreamExt; 1216 1217 let uri = xrpc_endpoint_uri(&self.base, <S::Request as XrpcRequest>::NSID, None).map_err( 1218 |e| StreamError::protocol(format!("Failed to construct endpoint URI: {}", e)), 1219 )?; 1220 1221 let mut builder = http::Request::post(uri.as_str()); 1222 1223 if let Some(token) = &self.opts.auth { 1224 let hv = match token { 1225 AuthorizationToken::Bearer(t) => { 1226 HeaderValue::from_str(&format!("Bearer {}", t.as_ref())) 1227 } 1228 AuthorizationToken::Dpop(t) => { 1229 HeaderValue::from_str(&format!("DPoP {}", t.as_ref())) 1230 } 1231 } 1232 .map_err(|e| StreamError::protocol(format!("Invalid authorization token: {}", e)))?; 1233 builder = builder.header(Header::Authorization, hv); 1234 } 1235 1236 if let Some(proxy) = &self.opts.atproto_proxy { 1237 builder = builder.header(Header::AtprotoProxy, proxy.as_ref()); 1238 } 1239 if let Some(labelers) = &self.opts.atproto_accept_labelers { 1240 if !labelers.is_empty() { 1241 let joined = labelers 1242 .iter() 1243 .map(|s| s.as_ref()) 1244 .collect::<Vec<_>>() 1245 .join(", "); 1246 builder = builder.header(Header::AtprotoAcceptLabelers, joined); 1247 } 1248 } 1249 1250 for (name, value) in &self.opts.extra_headers { 1251 builder = builder.header(name, value); 1252 } 1253 1254 let (parts, _) = builder 1255 .body(()) 1256 .map_err(|e| StreamError::protocol(e.to_string()))? 1257 .into_parts(); 1258 1259 let body_stream = Box::pin(stream.0.map_ok(|f| f.buffer)); 1260 1261 let resp = self 1262 .client 1263 .send_http_bidirectional(parts, body_stream) 1264 .await 1265 .map_err(StreamError::transport)?; 1266 1267 let (parts, body) = resp.into_parts(); 1268 1269 Ok(XrpcResponseStream::< 1270 <<S as XrpcProcedureStream>::Response as XrpcStreamResp>::Frame<'static>, 1271 >::from_typed_parts(parts, body)) 1272 } 1273} 1274 1275#[cfg(test)] 1276mod tests { 1277 use super::*; 1278 use serde::{Deserialize, Serialize}; 1279 1280 #[derive(Serialize, Deserialize)] 1281 #[allow(dead_code)] 1282 struct DummyReq; 1283 1284 #[derive(Deserialize, Serialize, Debug, thiserror::Error)] 1285 #[error("{0}")] 1286 struct DummyErr<'a>(#[serde(borrow)] CowStr<'a>); 1287 1288 impl IntoStatic for DummyErr<'_> { 1289 type Output = DummyErr<'static>; 1290 fn into_static(self) -> Self::Output { 1291 DummyErr(self.0.into_static()) 1292 } 1293 } 1294 1295 struct DummyResp; 1296 1297 impl XrpcResp for DummyResp { 1298 const NSID: &'static str = "test.dummy"; 1299 const ENCODING: &'static str = "application/json"; 1300 type Output<'de> = (); 1301 type Err<'de> = DummyErr<'de>; 1302 } 1303 1304 impl XrpcRequest for DummyReq { 1305 const NSID: &'static str = "test.dummy"; 1306 const METHOD: XrpcMethod = XrpcMethod::Procedure("application/json"); 1307 type Response = DummyResp; 1308 } 1309 1310 #[test] 1311 fn generic_error_carries_context() { 1312 let body = serde_json::json!({"error":"InvalidRequest","message":"missing"}); 1313 let buf = Bytes::from(serde_json::to_vec(&body).unwrap()); 1314 let resp: Response<DummyResp> = Response::new(buf, StatusCode::BAD_REQUEST); 1315 match resp.parse().unwrap_err() { 1316 XrpcError::Generic(g) => { 1317 assert_eq!(g.error.as_str(), "InvalidRequest"); 1318 assert_eq!(g.message.as_deref(), Some("missing")); 1319 assert_eq!(g.nsid, DummyResp::NSID); 1320 assert_eq!(g.method, ""); // method info only on request 1321 assert_eq!(g.http_status, StatusCode::BAD_REQUEST); 1322 } 1323 other => panic!("unexpected: {other:?}"), 1324 } 1325 } 1326 1327 #[test] 1328 fn auth_error_mapping() { 1329 for (code, expect) in [ 1330 ("ExpiredToken", AuthError::TokenExpired), 1331 ("InvalidToken", AuthError::InvalidToken), 1332 ] { 1333 let body = serde_json::json!({"error": code}); 1334 let buf = Bytes::from(serde_json::to_vec(&body).unwrap()); 1335 let resp: Response<DummyResp> = Response::new(buf, StatusCode::UNAUTHORIZED); 1336 match resp.parse().unwrap_err() { 1337 XrpcError::Auth(e) => match (e, expect) { 1338 (AuthError::TokenExpired, AuthError::TokenExpired) => {} 1339 (AuthError::InvalidToken, AuthError::InvalidToken) => {} 1340 other => panic!("mismatch: {other:?}"), 1341 }, 1342 other => panic!("unexpected: {other:?}"), 1343 } 1344 } 1345 } 1346 1347 #[test] 1348 fn xrpc_uri_construction_basic() { 1349 use crate::alloc::string::ToString; 1350 #[derive(Serialize, Deserialize)] 1351 struct Req; 1352 #[derive(Deserialize, Serialize, Debug, thiserror::Error)] 1353 #[error("{0}")] 1354 struct Err<'a>(#[serde(borrow)] CowStr<'a>); 1355 impl IntoStatic for Err<'_> { 1356 type Output = Err<'static>; 1357 fn into_static(self) -> Self::Output { 1358 Err(self.0.into_static()) 1359 } 1360 } 1361 struct Resp; 1362 impl XrpcResp for Resp { 1363 const NSID: &'static str = "com.example.test"; 1364 const ENCODING: &'static str = "application/json"; 1365 type Output<'de> = (); 1366 type Err<'de> = Err<'de>; 1367 } 1368 impl XrpcRequest for Req { 1369 const NSID: &'static str = "com.example.test"; 1370 const METHOD: XrpcMethod = XrpcMethod::Query; 1371 type Response = Resp; 1372 } 1373 1374 let opts = CallOptions::default(); 1375 1376 // AC1.1: Base URI without trailing slash + NSID produces correct `/xrpc/{nsid}` path 1377 let base1 = Uri::parse("https://pds.example.com") 1378 .expect("URI should be valid") 1379 .to_owned(); 1380 let req1 = build_http_request(&base1, &Req, &opts).unwrap(); 1381 let uri1 = req1.uri().to_string(); 1382 assert!( 1383 uri1.contains("/xrpc/com.example.test"), 1384 "AC1.1: URI {} should contain '/xrpc/com.example.test'", 1385 uri1 1386 ); 1387 assert_eq!( 1388 uri1, "https://pds.example.com/xrpc/com.example.test", 1389 "AC1.1: URI should be exact match" 1390 ); 1391 1392 // AC1.2: Base URI with sub-path preserves it: `/base/xrpc/{nsid}` 1393 let base2 = Uri::parse("https://pds.example.com/base") 1394 .expect("URI should be valid") 1395 .to_owned(); 1396 let req2 = build_http_request(&base2, &Req, &opts).unwrap(); 1397 let uri2 = req2.uri().to_string(); 1398 assert!( 1399 uri2.contains("/base/xrpc/com.example.test"), 1400 "AC1.2: URI {} should contain '/base/xrpc/com.example.test'", 1401 uri2 1402 ); 1403 assert_eq!( 1404 uri2, "https://pds.example.com/base/xrpc/com.example.test", 1405 "AC1.2: URI should preserve sub-path" 1406 ); 1407 1408 // AC1.5: Base URI with trailing slash is normalized (slash stripped) before construction 1409 let base_with_slash = Uri::parse("https://pds.example.com/") 1410 .expect("URI should be valid") 1411 .to_owned(); 1412 let req_slash = build_http_request(&base_with_slash, &Req, &opts).unwrap(); 1413 let uri_slash = req_slash.uri().to_string(); 1414 assert!( 1415 !uri_slash.contains("//xrpc"), 1416 "AC1.5: URI {} should not contain '//xrpc'", 1417 uri_slash 1418 ); 1419 assert_eq!( 1420 uri_slash, "https://pds.example.com/xrpc/com.example.test", 1421 "AC1.5: URI should handle trailing slash" 1422 ); 1423 } 1424 1425 #[test] 1426 fn xrpc_uri_query_parameters() { 1427 use crate::alloc::string::ToString; 1428 use serde::Serialize; 1429 1430 #[derive(Serialize)] 1431 struct QueryReq { 1432 #[serde(skip_serializing_if = "Option::is_none")] 1433 param1: Option<String>, 1434 #[serde(skip_serializing_if = "Option::is_none")] 1435 param2: Option<String>, 1436 } 1437 1438 #[derive(Serialize, Deserialize, Debug, thiserror::Error)] 1439 #[error("test error")] 1440 struct Err; 1441 impl IntoStatic for Err { 1442 type Output = Err; 1443 fn into_static(self) -> Self::Output { 1444 self 1445 } 1446 } 1447 1448 struct Resp; 1449 impl XrpcResp for Resp { 1450 const NSID: &'static str = "com.example.test"; 1451 const ENCODING: &'static str = "application/json"; 1452 type Output<'de> = (); 1453 type Err<'de> = Err; 1454 } 1455 impl XrpcRequest for QueryReq { 1456 const NSID: &'static str = "com.example.test"; 1457 const METHOD: XrpcMethod = XrpcMethod::Query; 1458 type Response = Resp; 1459 } 1460 1461 let opts = CallOptions::default(); 1462 let base = Uri::parse("https://pds.example.com") 1463 .expect("URI should be valid") 1464 .to_owned(); 1465 1466 // AC1.3: Query parameters from serde serialisation are set correctly 1467 let req_with_params = QueryReq { 1468 param1: Some("value1".to_string()), 1469 param2: Some("value2".to_string()), 1470 }; 1471 let http_req = build_http_request(&base, &req_with_params, &opts).unwrap(); 1472 let uri_str = http_req.uri().to_string(); 1473 assert!( 1474 uri_str.contains("?"), 1475 "AC1.3: URI should contain query string" 1476 ); 1477 assert!( 1478 uri_str.contains("param1=value1"), 1479 "AC1.3: URI should contain param1" 1480 ); 1481 assert!( 1482 uri_str.contains("param2=value2"), 1483 "AC1.3: URI should contain param2" 1484 ); 1485 1486 // AC1.4: Empty/default query parameters result in no `?` in the constructed URI 1487 let req_empty_params = QueryReq { 1488 param1: None, 1489 param2: None, 1490 }; 1491 let http_req_empty = build_http_request(&base, &req_empty_params, &opts).unwrap(); 1492 let uri_str_empty = http_req_empty.uri().to_string(); 1493 assert!( 1494 !uri_str_empty.contains("?"), 1495 "AC1.4: URI {} should not contain '?' with empty params", 1496 uri_str_empty 1497 ); 1498 assert_eq!( 1499 uri_str_empty, "https://pds.example.com/xrpc/com.example.test", 1500 "AC1.4: URI should have no query string" 1501 ); 1502 } 1503 1504 #[test] 1505 fn xrpc_uri_special_characters_in_query() { 1506 use crate::alloc::string::ToString; 1507 use serde::Serialize; 1508 1509 #[derive(Serialize)] 1510 struct QueryReq { 1511 #[serde(skip_serializing_if = "Option::is_none")] 1512 search: Option<String>, 1513 #[serde(skip_serializing_if = "Option::is_none")] 1514 filter: Option<String>, 1515 #[serde(skip_serializing_if = "Option::is_none")] 1516 unicode_param: Option<String>, 1517 } 1518 1519 #[derive(Serialize, Deserialize, Debug, thiserror::Error)] 1520 #[error("test error")] 1521 struct Err; 1522 impl IntoStatic for Err { 1523 type Output = Err; 1524 fn into_static(self) -> Self::Output { 1525 self 1526 } 1527 } 1528 1529 struct Resp; 1530 impl XrpcResp for Resp { 1531 const NSID: &'static str = "com.example.test"; 1532 const ENCODING: &'static str = "application/json"; 1533 type Output<'de> = (); 1534 type Err<'de> = Err; 1535 } 1536 impl XrpcRequest for QueryReq { 1537 const NSID: &'static str = "com.example.test"; 1538 const METHOD: XrpcMethod = XrpcMethod::Query; 1539 type Response = Resp; 1540 } 1541 1542 let opts = CallOptions::default(); 1543 let base = Uri::parse("https://pds.example.com") 1544 .expect("URI should be valid") 1545 .to_owned(); 1546 1547 // AC1.3: Test with spaces (serde_html_form uses + for spaces per application/x-www-form-urlencoded) 1548 let req_spaces = QueryReq { 1549 search: Some("hello world".to_string()), 1550 filter: None, 1551 unicode_param: None, 1552 }; 1553 let http_req_spaces = build_http_request(&base, &req_spaces, &opts).unwrap(); 1554 let uri_spaces = http_req_spaces.uri().to_string(); 1555 assert!( 1556 uri_spaces.contains("search=hello"), 1557 "AC1.3: URI should contain search param" 1558 ); 1559 // serde_html_form encodes spaces as + 1560 assert!( 1561 uri_spaces.contains("hello+world") || uri_spaces.contains("hello%20world"), 1562 "AC1.3: URI {} should encode space in 'hello world'", 1563 uri_spaces 1564 ); 1565 1566 // AC1.3: Test with special characters: &, =, + 1567 let req_special = QueryReq { 1568 search: Some("a=b&c+d".to_string()), 1569 filter: None, 1570 unicode_param: None, 1571 }; 1572 let http_req_special = build_http_request(&base, &req_special, &opts).unwrap(); 1573 let uri_special = http_req_special.uri().to_string(); 1574 assert!( 1575 uri_special.contains("?"), 1576 "AC1.3: URI should contain query string for special chars" 1577 ); 1578 // Verify the URI can be parsed successfully (fluent-uri handles encoded values) 1579 let parsed = Uri::parse(uri_special.clone()); 1580 assert!( 1581 parsed.is_ok(), 1582 "AC1.3: URI {} should be parseable by fluent-uri", 1583 uri_special 1584 ); 1585 1586 // AC1.3: Test with unicode characters 1587 let req_unicode = QueryReq { 1588 search: None, 1589 filter: None, 1590 unicode_param: Some("你好世界".to_string()), 1591 }; 1592 let http_req_unicode = build_http_request(&base, &req_unicode, &opts).unwrap(); 1593 let uri_unicode = http_req_unicode.uri().to_string(); 1594 assert!( 1595 uri_unicode.contains("?"), 1596 "AC1.3: URI should contain query string for unicode" 1597 ); 1598 // Verify the URI can be parsed successfully 1599 let parsed_unicode = Uri::parse(uri_unicode.clone()); 1600 assert!( 1601 parsed_unicode.is_ok(), 1602 "AC1.3: URI {} should be parseable for unicode params", 1603 uri_unicode 1604 ); 1605 } 1606 1607 #[test] 1608 fn no_double_slash_in_path() { 1609 use crate::alloc::string::ToString; 1610 #[derive(Serialize, Deserialize)] 1611 struct Req; 1612 #[derive(Deserialize, Serialize, Debug, thiserror::Error)] 1613 #[error("{0}")] 1614 struct Err<'a>(#[serde(borrow)] CowStr<'a>); 1615 impl IntoStatic for Err<'_> { 1616 type Output = Err<'static>; 1617 fn into_static(self) -> Self::Output { 1618 Err(self.0.into_static()) 1619 } 1620 } 1621 struct Resp; 1622 impl XrpcResp for Resp { 1623 const NSID: &'static str = "com.example.test"; 1624 const ENCODING: &'static str = "application/json"; 1625 type Output<'de> = (); 1626 type Err<'de> = Err<'de>; 1627 } 1628 impl XrpcRequest for Req { 1629 const NSID: &'static str = "com.example.test"; 1630 const METHOD: XrpcMethod = XrpcMethod::Query; 1631 type Response = Resp; 1632 } 1633 1634 let opts = CallOptions::default(); 1635 1636 // Ensure no double slashes in path 1637 let base1 = Uri::parse("https://pds") 1638 .expect("URI should be valid") 1639 .to_owned(); 1640 let req1 = build_http_request(&base1, &Req, &opts).unwrap(); 1641 let uri1 = req1.uri().to_string(); 1642 assert!( 1643 !uri1.contains("//xrpc"), 1644 "URI {} should not contain '//xrpc'", 1645 uri1 1646 ); 1647 1648 let base2 = Uri::parse("https://pds/base") 1649 .expect("URI should be valid") 1650 .to_owned(); 1651 let req2 = build_http_request(&base2, &Req, &opts).unwrap(); 1652 let uri2 = req2.uri().to_string(); 1653 assert!( 1654 !uri2.contains("//xrpc"), 1655 "URI {} should not contain '//xrpc'", 1656 uri2 1657 ); 1658 } 1659}