this repo has no description
1
fork

Configure Feed

Select the types of activity you want to include in your feed.

Add automatic token refresh with centralized session persistence

When the PDS returns ExpiredToken, the client automatically refreshes
the session using refresh_jwt and retries the original request. Session
persistence is handled once in the dispatch layer (main.rs) rather than
in each command — Execute trait now returns Option<Session>.

+387 -146
+7 -5
crates/opake-cli/src/commands/download.rs
··· 5 5 use clap::Args; 6 6 use opake_core::documents; 7 7 8 + use opake_core::client::Session; 9 + 8 10 use crate::commands::Execute; 9 11 use crate::identity; 10 12 use crate::session; ··· 40 42 } 41 43 42 44 impl Execute for DownloadCommand { 43 - async fn execute(self) -> Result<()> { 44 - let client = session::load_client()?; 45 + async fn execute(self) -> Result<Option<Session>> { 46 + let mut client = session::load_client()?; 45 47 let id = identity::load_identity().context("run `opake login` first")?; 46 48 let private_key = id.private_key_bytes()?; 47 49 48 - let uri = documents::resolve_uri(&client, &self.reference).await?; 50 + let uri = documents::resolve_uri(&mut client, &self.reference).await?; 49 51 50 52 let (name, plaintext) = 51 - documents::download_and_decrypt(&client, &id.did, &private_key, &uri).await?; 53 + documents::download_and_decrypt(&mut client, &id.did, &private_key, &uri).await?; 52 54 53 55 let output_path = resolve_output_path(self.output, &name); 54 56 write_output(&output_path, &plaintext)?; ··· 60 62 plaintext.len() 61 63 ); 62 64 63 - Ok(()) 65 + Ok(session::refreshed_session(&client)) 64 66 } 65 67 } 66 68
+9 -5
crates/opake-cli/src/commands/login.rs
··· 1 1 use anyhow::Result; 2 2 use clap::Args; 3 3 use log::debug; 4 - use opake_core::client::XrpcClient; 4 + use opake_core::client::{Session, XrpcClient}; 5 5 6 6 use crate::commands::Execute; 7 + use crate::config; 7 8 use crate::identity; 8 - use crate::session; 9 9 use crate::transport::ReqwestTransport; 10 10 use crate::utils::prefixed_get_env; 11 11 ··· 43 43 } 44 44 45 45 impl Execute for LoginCommand { 46 - async fn execute(self) -> Result<()> { 46 + async fn execute(self) -> Result<Option<Session>> { 47 47 debug!("Starting login command"); 48 48 49 49 let password = resolve_password(prefixed_get_env("PASSWORD"), || { ··· 55 55 56 56 let session = client.login(self.identifier.trim(), &password).await?; 57 57 58 - session::save_session(session, &self.pds)?; 58 + // PDS URL is login-specific config — write it here, session 59 + // persistence is handled by the dispatch layer. 60 + config::save_config(&config::Config { 61 + pds_url: self.pds.clone(), 62 + })?; 59 63 60 64 let (_, generated) = 61 65 identity::ensure_identity(&session.did, &mut opake_core::crypto::OsRng)?; ··· 66 70 67 71 println!("Logged in as {}", session.handle); 68 72 69 - Ok(()) 73 + Ok(Some(session.clone())) 70 74 } 71 75 } 72 76
+7 -5
crates/opake-cli/src/commands/ls.rs
··· 2 2 use clap::Args; 3 3 use opake_core::documents::{self, DocumentEntry}; 4 4 5 + use opake_core::client::Session; 6 + 5 7 use crate::commands::Execute; 6 8 use crate::session; 7 9 ··· 70 72 } 71 73 72 74 impl Execute for LsCommand { 73 - async fn execute(self) -> Result<()> { 74 - let client = session::load_client()?; 75 - let mut entries = documents::list_documents(&client).await?; 75 + async fn execute(self) -> Result<Option<Session>> { 76 + let mut client = session::load_client()?; 77 + let mut entries = documents::list_documents(&mut client).await?; 76 78 77 79 if let Some(ref tag) = self.tag { 78 80 filter_by_tag(&mut entries, tag); ··· 84 86 } else { 85 87 println!("no documents"); 86 88 } 87 - return Ok(()); 89 + return Ok(session::refreshed_session(&client)); 88 90 } 89 91 90 92 if self.long { ··· 95 97 96 98 println!("\n{} document(s)", entries.len()); 97 99 98 - Ok(()) 100 + Ok(session::refreshed_session(&client)) 99 101 } 100 102 } 101 103
+2 -1
crates/opake-cli/src/commands/mod.rs
··· 5 5 pub mod upload; 6 6 7 7 use anyhow::Result; 8 + use opake_core::client::Session; 8 9 9 10 pub trait Execute { 10 - fn execute(self) -> impl std::future::Future<Output = Result<()>>; 11 + fn execute(self) -> impl std::future::Future<Output = Result<Option<Session>>>; 11 12 }
+7 -6
crates/opake-cli/src/commands/rm.rs
··· 1 1 use anyhow::{Context, Result}; 2 2 use clap::Args; 3 + use opake_core::client::Session; 3 4 use opake_core::documents; 4 5 5 6 use crate::commands::Execute; ··· 17 18 } 18 19 19 20 impl Execute for RmCommand { 20 - async fn execute(self) -> Result<()> { 21 - let client = session::load_client()?; 22 - let uri = documents::resolve_uri(&client, &self.reference).await?; 21 + async fn execute(self) -> Result<Option<Session>> { 22 + let mut client = session::load_client()?; 23 + let uri = documents::resolve_uri(&mut client, &self.reference).await?; 23 24 24 25 if !self.yes { 25 26 eprint!("delete {}? [y/N] ", uri); ··· 29 30 .context("failed to read confirmation")?; 30 31 if !answer.trim().eq_ignore_ascii_case("y") { 31 32 println!("aborted"); 32 - return Ok(()); 33 + return Ok(session::refreshed_session(&client)); 33 34 } 34 35 } 35 36 36 - documents::delete_document(&client, &uri).await?; 37 + documents::delete_document(&mut client, &uri).await?; 37 38 println!("deleted {}", uri); 38 39 39 - Ok(()) 40 + Ok(session::refreshed_session(&client)) 40 41 } 41 42 }
+6 -4
crates/opake-cli/src/commands/upload.rs
··· 7 7 use opake_core::crypto::OsRng; 8 8 use opake_core::documents::{self, UploadParams}; 9 9 10 + use opake_core::client::Session; 11 + 10 12 use crate::commands::Execute; 11 13 use crate::identity; 12 14 use crate::session; ··· 27 29 } 28 30 29 31 impl Execute for UploadCommand { 30 - async fn execute(self) -> Result<()> { 32 + async fn execute(self) -> Result<Option<Session>> { 31 33 if self.keyring.is_some() { 32 34 anyhow::bail!("--keyring not yet supported (tracking: chainlink #21)"); 33 35 } 34 36 35 - let client = session::load_client()?; 37 + let mut client = session::load_client()?; 36 38 let id = identity::load_identity()?; 37 39 let owner_pubkey = id.public_key_bytes()?; 38 40 ··· 59 61 created_at: &Utc::now().to_rfc3339(), 60 62 }; 61 63 62 - let uri = documents::encrypt_and_upload(&client, &params, &mut OsRng).await?; 64 + let uri = documents::encrypt_and_upload(&mut client, &params, &mut OsRng).await?; 63 65 64 66 println!("{} → {}", filename, uri); 65 - Ok(()) 67 + Ok(session::refreshed_session(&client)) 66 68 } 67 69 } 68 70
+12 -7
crates/opake-cli/src/main.rs
··· 5 5 mod transport; 6 6 pub mod utils; 7 7 8 - use anyhow::Context; 9 8 use clap::{Parser, Subcommand}; 10 9 use commands::Execute; 11 10 use log::info; ··· 32 31 info!("Starting Opake CLI. Hello!"); 33 32 let cli = Cli::parse(); 34 33 35 - match cli.command { 36 - Command::Login(cmd) => cmd.execute().await.context("Failed to log into your PDS"), 37 - Command::Upload(cmd) => cmd.execute().await, 38 - Command::Download(cmd) => cmd.execute().await, 39 - Command::Ls(cmd) => cmd.execute().await, 40 - Command::Rm(cmd) => cmd.execute().await, 34 + let refreshed = match cli.command { 35 + Command::Login(cmd) => cmd.execute().await?, 36 + Command::Upload(cmd) => cmd.execute().await?, 37 + Command::Download(cmd) => cmd.execute().await?, 38 + Command::Ls(cmd) => cmd.execute().await?, 39 + Command::Rm(cmd) => cmd.execute().await?, 40 + }; 41 + 42 + if let Some(ref s) = refreshed { 43 + session::persist_session(s)?; 41 44 } 45 + 46 + Ok(()) 42 47 }
+20 -16
crates/opake-cli/src/session.rs
··· 1 1 use log::info; 2 2 use opake_core::client::{Session, XrpcClient}; 3 3 4 - use crate::config::{self, Config}; 4 + use crate::config; 5 5 use crate::transport::ReqwestTransport; 6 6 7 7 const FILENAME: &str = "session.json"; 8 8 9 - /// Save the session and PDS URL to disk after successful login. 10 - pub fn save_session(session: &Session, pds_url: &str) -> anyhow::Result<()> { 11 - config::save_json(FILENAME, session)?; 12 - config::save_config(&Config { 13 - pds_url: pds_url.to_string(), 14 - })?; 15 - info!("session saved"); 16 - Ok(()) 17 - } 18 - 19 9 fn load_session() -> anyhow::Result<Session> { 20 10 config::load_json(FILENAME) 21 11 } ··· 28 18 Ok(XrpcClient::with_session(transport, config.pds_url, session)) 29 19 } 30 20 21 + /// Extract the session if it was refreshed during this client's lifetime. 22 + /// Commands return this to the dispatch layer for persistence. 23 + pub fn refreshed_session(client: &XrpcClient<ReqwestTransport>) -> Option<Session> { 24 + if client.session_refreshed() { 25 + client.session().cloned() 26 + } else { 27 + None 28 + } 29 + } 30 + 31 + /// Persist a refreshed session to disk. Called once from the dispatch layer. 32 + pub fn persist_session(session: &Session) -> anyhow::Result<()> { 33 + config::save_json(FILENAME, session)?; 34 + info!("persisted refreshed session tokens"); 35 + Ok(()) 36 + } 37 + 31 38 #[cfg(test)] 32 39 mod tests { 33 40 use super::*; ··· 48 55 } 49 56 50 57 #[test] 51 - fn save_and_load_session_roundtrip() { 58 + fn persist_and_load_session_roundtrip() { 52 59 with_test_dir(|_| { 53 60 let session = fake_session(); 54 - save_session(&session, "https://pds.test").unwrap(); 55 - 56 - let config = config::load_config().unwrap(); 57 - assert_eq!(config.pds_url, "https://pds.test"); 61 + persist_session(&session).unwrap(); 58 62 59 63 let loaded = load_session().unwrap(); 60 64 assert_eq!(loaded.did, session.did);
+238 -24
crates/opake-core/src/client.rs
··· 89 89 transport: T, 90 90 base_url: String, 91 91 session: Option<Session>, 92 + session_refreshed: bool, 92 93 } 93 94 94 95 impl<T: Transport> XrpcClient<T> { ··· 97 98 transport, 98 99 base_url, 99 100 session: None, 101 + session_refreshed: false, 100 102 } 101 103 } 102 104 ··· 105 107 transport, 106 108 base_url, 107 109 session: Some(session), 110 + session_refreshed: false, 108 111 } 109 112 } 110 113 111 114 pub fn session(&self) -> Option<&Session> { 112 115 self.session.as_ref() 116 + } 117 + 118 + /// Whether the session was refreshed during this client's lifetime. 119 + /// The CLI uses this to persist updated tokens to disk. 120 + pub fn session_refreshed(&self) -> bool { 121 + self.session_refreshed 113 122 } 114 123 115 124 /// Authenticate via `com.atproto.server.createSession`. ··· 163 172 .ok_or_else(|| Error::Auth("not logged in".into())) 164 173 } 165 174 175 + /// Replace the Authorization header in a request with the current access token. 176 + fn replace_auth_header(&self, mut request: HttpRequest) -> Result<HttpRequest, Error> { 177 + let (key, value) = self.auth_header()?; 178 + if let Some(h) = request.headers.iter_mut().find(|(k, _)| k == &key) { 179 + h.1 = value; 180 + } 181 + Ok(request) 182 + } 183 + 184 + /// Check whether a PDS response is an expired-token error. 185 + fn is_expired_token(response: &HttpResponse) -> bool { 186 + if response.status != 400 { 187 + return false; 188 + } 189 + 190 + #[derive(Deserialize)] 191 + struct Body { 192 + error: Option<String>, 193 + } 194 + 195 + serde_json::from_slice::<Body>(&response.body) 196 + .ok() 197 + .and_then(|b| b.error) 198 + .is_some_and(|e| e == "ExpiredToken") 199 + } 200 + 201 + /// Refresh the session using the stored refresh_jwt. 202 + async fn refresh_session(&mut self) -> Result<(), Error> { 203 + let refresh_jwt = self 204 + .session 205 + .as_ref() 206 + .map(|s| s.refresh_jwt.clone()) 207 + .ok_or_else(|| Error::Auth("not logged in".into()))?; 208 + 209 + info!("access token expired, refreshing session"); 210 + 211 + let response = self 212 + .transport 213 + .send(HttpRequest { 214 + method: HttpMethod::Post, 215 + url: format!("{}/xrpc/com.atproto.server.refreshSession", self.base_url), 216 + headers: vec![("Authorization".into(), format!("Bearer {}", refresh_jwt))], 217 + body: None, 218 + }) 219 + .await?; 220 + 221 + if response.status != 200 { 222 + warn!("session refresh failed with HTTP {}", response.status); 223 + return Err(Error::Auth(format!( 224 + "session refresh failed (HTTP {}) — run `opake login` again", 225 + response.status 226 + ))); 227 + } 228 + 229 + let new_session: Session = serde_json::from_slice(&response.body)?; 230 + info!("session refreshed for {}", new_session.handle); 231 + self.session = Some(new_session); 232 + self.session_refreshed = true; 233 + 234 + Ok(()) 235 + } 236 + 166 237 /// Send a request and check the response status. Every XRPC method except 167 238 /// `login` (which has custom error handling) goes through here. 168 - async fn send_checked(&self, request: HttpRequest) -> Result<HttpResponse, Error> { 169 - let response = self.transport.send(request).await?; 239 + /// 240 + /// If the PDS returns `ExpiredToken`, the session is automatically refreshed 241 + /// and the request is retried once with the new access token. 242 + async fn send_checked(&mut self, request: HttpRequest) -> Result<HttpResponse, Error> { 243 + let mut response = self.transport.send(request.clone()).await?; 244 + 245 + if Self::is_expired_token(&response) { 246 + self.refresh_session().await?; 247 + let retried = self.replace_auth_header(request)?; 248 + response = self.transport.send(retried).await?; 249 + } 250 + 170 251 Self::check_response(&response)?; 171 252 Ok(response) 172 253 } ··· 206 287 } 207 288 208 289 /// Upload raw bytes as a blob via `com.atproto.repo.uploadBlob`. 209 - pub async fn upload_blob(&self, data: Vec<u8>, mime_type: &str) -> Result<BlobRef, Error> { 290 + pub async fn upload_blob(&mut self, data: Vec<u8>, mime_type: &str) -> Result<BlobRef, Error> { 210 291 debug!("uploading blob ({} bytes, {})", data.len(), mime_type); 211 292 let auth = self.auth_header()?; 212 293 ··· 232 313 } 233 314 234 315 /// Fetch a blob by DID + CID via `com.atproto.sync.getBlob`. 235 - pub async fn get_blob(&self, did: &str, cid: &str) -> Result<Vec<u8>, Error> { 316 + pub async fn get_blob(&mut self, did: &str, cid: &str) -> Result<Vec<u8>, Error> { 236 317 debug!("fetching blob did={} cid={}", did, cid); 237 318 let auth = self.auth_header()?; 238 319 let url = format!( ··· 254 335 255 336 /// Create a record via `com.atproto.repo.createRecord`. 256 337 pub async fn create_record<R: Serialize>( 257 - &self, 338 + &mut self, 258 339 collection: &str, 259 340 record: &R, 260 341 ) -> Result<RecordRef, Error> { ··· 282 363 283 364 /// Fetch a single record via `com.atproto.repo.getRecord`. 284 365 pub async fn get_record( 285 - &self, 366 + &mut self, 286 367 did: &str, 287 368 collection: &str, 288 369 rkey: &str, ··· 308 389 309 390 /// List records in a collection via `com.atproto.repo.listRecords`. 310 391 pub async fn list_records( 311 - &self, 392 + &mut self, 312 393 collection: &str, 313 394 limit: Option<u32>, 314 395 cursor: Option<&str>, ··· 341 422 } 342 423 343 424 /// Delete a record via `com.atproto.repo.deleteRecord`. 344 - pub async fn delete_record(&self, collection: &str, rkey: &str) -> Result<(), Error> { 425 + pub async fn delete_record(&mut self, collection: &str, rkey: &str) -> Result<(), Error> { 345 426 debug!("deleting record {}/{}", collection, rkey); 346 427 let auth = self.auth_header()?; 347 428 let did = self.did()?; ··· 367 448 #[cfg(test)] 368 449 mod tests { 369 450 use super::*; 451 + use crate::test_utils::MockTransport; 370 452 371 453 // Test check_response directly — it's pure logic over HttpResponse, 372 454 // no transport needed. ··· 382 464 383 465 #[test] 384 466 fn ok_200_passes() { 385 - assert!(XrpcClient::<DummyTransport>::check_response(&response(200, "")).is_ok()); 467 + assert!(XrpcClient::<MockTransport>::check_response(&response(200, "")).is_ok()); 386 468 } 387 469 388 470 #[test] 389 471 fn created_201_passes() { 390 - assert!(XrpcClient::<DummyTransport>::check_response(&response(201, "")).is_ok()); 472 + assert!(XrpcClient::<MockTransport>::check_response(&response(201, "")).is_ok()); 391 473 } 392 474 393 475 #[test] 394 476 fn no_content_204_passes() { 395 - assert!(XrpcClient::<DummyTransport>::check_response(&response(204, "")).is_ok()); 477 + assert!(XrpcClient::<MockTransport>::check_response(&response(204, "")).is_ok()); 396 478 } 397 479 398 480 // -- XRPC error bodies -- ··· 403 485 500, 404 486 r#"{"error":"InternalServerError","message":"Internal Server Error"}"#, 405 487 ); 406 - let err = XrpcClient::<DummyTransport>::check_response(&r).unwrap_err(); 488 + let err = XrpcClient::<MockTransport>::check_response(&r).unwrap_err(); 407 489 match err { 408 490 Error::Xrpc { status, message } => { 409 491 assert_eq!(status, 500); ··· 417 499 #[test] 418 500 fn error_400_with_error_code_only() { 419 501 let r = response(400, r#"{"error":"InvalidRequest"}"#); 420 - let err = XrpcClient::<DummyTransport>::check_response(&r).unwrap_err(); 502 + let err = XrpcClient::<MockTransport>::check_response(&r).unwrap_err(); 421 503 match err { 422 504 Error::Xrpc { status, message } => { 423 505 assert_eq!(status, 400); ··· 430 512 #[test] 431 513 fn error_403_with_message_only() { 432 514 let r = response(403, r#"{"message":"not authorized"}"#); 433 - let err = XrpcClient::<DummyTransport>::check_response(&r).unwrap_err(); 515 + let err = XrpcClient::<MockTransport>::check_response(&r).unwrap_err(); 434 516 match err { 435 517 Error::Xrpc { status, message } => { 436 518 assert_eq!(status, 403); ··· 448 530 404, 449 531 r#"{"error":"RecordNotFound","message":"no such record"}"#, 450 532 ); 451 - let err = XrpcClient::<DummyTransport>::check_response(&r).unwrap_err(); 533 + let err = XrpcClient::<MockTransport>::check_response(&r).unwrap_err(); 452 534 assert!(matches!(err, Error::NotFound(_))); 453 535 } 454 536 ··· 457 539 #[test] 458 540 fn error_502_with_html_body() { 459 541 let r = response(502, "<html><body>Bad Gateway</body></html>"); 460 - let err = XrpcClient::<DummyTransport>::check_response(&r).unwrap_err(); 542 + let err = XrpcClient::<MockTransport>::check_response(&r).unwrap_err(); 461 543 match err { 462 544 Error::Xrpc { status, message } => { 463 545 assert_eq!(status, 502); ··· 470 552 #[test] 471 553 fn error_500_with_empty_body() { 472 554 let r = response(500, ""); 473 - let err = XrpcClient::<DummyTransport>::check_response(&r).unwrap_err(); 555 + let err = XrpcClient::<MockTransport>::check_response(&r).unwrap_err(); 474 556 match err { 475 557 Error::Xrpc { status, message } => { 476 558 assert_eq!(status, 500); ··· 483 565 #[test] 484 566 fn error_500_with_empty_json_object() { 485 567 let r = response(500, "{}"); 486 - let err = XrpcClient::<DummyTransport>::check_response(&r).unwrap_err(); 568 + let err = XrpcClient::<MockTransport>::check_response(&r).unwrap_err(); 487 569 match err { 488 570 Error::Xrpc { status, message } => { 489 571 assert_eq!(status, 500); ··· 497 579 498 580 #[test] 499 581 fn redirect_300_is_error() { 500 - assert!(XrpcClient::<DummyTransport>::check_response(&response(300, "")).is_err()); 582 + assert!(XrpcClient::<MockTransport>::check_response(&response(300, "")).is_err()); 501 583 } 502 584 503 - // -- Dummy transport for type parameter (never called) -- 585 + // -- Token refresh tests -- 504 586 505 - struct DummyTransport; 587 + fn expired_token_response() -> HttpResponse { 588 + HttpResponse { 589 + status: 400, 590 + body: br#"{"error":"ExpiredToken","message":"Token has expired"}"#.to_vec(), 591 + } 592 + } 506 593 507 - impl Transport for DummyTransport { 508 - async fn send(&self, _request: HttpRequest) -> Result<HttpResponse, Error> { 509 - unreachable!("check_response tests don't use transport") 594 + fn refresh_session_response() -> HttpResponse { 595 + let body = serde_json::json!({ 596 + "did": "did:plc:test", 597 + "handle": "test.handle", 598 + "accessJwt": "fresh-access-jwt", 599 + "refreshJwt": "fresh-refresh-jwt", 600 + }); 601 + HttpResponse { 602 + status: 200, 603 + body: serde_json::to_vec(&body).unwrap(), 604 + } 605 + } 606 + 607 + fn success_response(body: &str) -> HttpResponse { 608 + HttpResponse { 609 + status: 200, 610 + body: body.as_bytes().to_vec(), 510 611 } 612 + } 613 + 614 + fn mock_client(mock: MockTransport) -> XrpcClient<MockTransport> { 615 + let session = Session { 616 + did: "did:plc:test".into(), 617 + handle: "test.handle".into(), 618 + access_jwt: "stale-access-jwt".into(), 619 + refresh_jwt: "valid-refresh-jwt".into(), 620 + }; 621 + XrpcClient::with_session(mock, "https://pds.test".into(), session) 622 + } 623 + 624 + #[tokio::test] 625 + async fn refresh_on_expired_token_then_retry() { 626 + let mock = MockTransport::new(); 627 + // First request: expired token 628 + mock.enqueue(expired_token_response()); 629 + // Refresh succeeds 630 + mock.enqueue(refresh_session_response()); 631 + // Retry succeeds 632 + mock.enqueue(success_response(r#"{"records":[]}"#)); 633 + 634 + let mut client = mock_client(mock.clone()); 635 + let page = client 636 + .list_records("app.opake.cloud.document", Some(100), None) 637 + .await 638 + .unwrap(); 639 + 640 + assert!(page.records.is_empty()); 641 + assert!(client.session_refreshed()); 642 + 643 + let session = client.session().unwrap(); 644 + assert_eq!(session.access_jwt, "fresh-access-jwt"); 645 + assert_eq!(session.refresh_jwt, "fresh-refresh-jwt"); 646 + 647 + // Verify: 3 requests — original, refresh, retry 648 + let reqs = mock.requests(); 649 + assert_eq!(reqs.len(), 3); 650 + assert!(reqs[0].url.contains("listRecords")); 651 + assert!(reqs[1].url.contains("refreshSession")); 652 + assert!(reqs[2].url.contains("listRecords")); 653 + 654 + // Retry used the new token 655 + let retry_auth = reqs[2] 656 + .headers 657 + .iter() 658 + .find(|(k, _)| k == "Authorization") 659 + .unwrap(); 660 + assert_eq!(retry_auth.1, "Bearer fresh-access-jwt"); 661 + } 662 + 663 + #[tokio::test] 664 + async fn refresh_failure_propagates_error() { 665 + let mock = MockTransport::new(); 666 + mock.enqueue(expired_token_response()); 667 + // Refresh fails 668 + mock.enqueue(HttpResponse { 669 + status: 401, 670 + body: br#"{"error":"InvalidToken","message":"bad refresh token"}"#.to_vec(), 671 + }); 672 + 673 + let mut client = mock_client(mock); 674 + let err = client 675 + .list_records("app.opake.cloud.document", Some(100), None) 676 + .await 677 + .unwrap_err(); 678 + 679 + assert!(err.to_string().contains("session refresh failed")); 680 + assert!(err.to_string().contains("opake login")); 681 + assert!(!client.session_refreshed()); 682 + } 683 + 684 + #[tokio::test] 685 + async fn non_expired_error_passes_through() { 686 + let mock = MockTransport::new(); 687 + mock.enqueue(HttpResponse { 688 + status: 500, 689 + body: br#"{"error":"InternalServerError","message":"oops"}"#.to_vec(), 690 + }); 691 + 692 + let mut client = mock_client(mock); 693 + let err = client 694 + .list_records("app.opake.cloud.document", Some(100), None) 695 + .await 696 + .unwrap_err(); 697 + 698 + assert!(matches!(err, Error::Xrpc { status: 500, .. })); 699 + assert!(!client.session_refreshed()); 700 + } 701 + 702 + #[test] 703 + fn is_expired_token_detects_correctly() { 704 + assert!(XrpcClient::<MockTransport>::is_expired_token( 705 + &expired_token_response() 706 + )); 707 + } 708 + 709 + #[test] 710 + fn is_expired_token_rejects_other_400() { 711 + let r = response(400, r#"{"error":"InvalidRequest"}"#); 712 + assert!(!XrpcClient::<MockTransport>::is_expired_token(&r)); 713 + } 714 + 715 + #[test] 716 + fn is_expired_token_rejects_500() { 717 + let r = response(500, r#"{"error":"ExpiredToken"}"#); 718 + assert!(!XrpcClient::<MockTransport>::is_expired_token(&r)); 719 + } 720 + 721 + #[test] 722 + fn is_expired_token_rejects_no_json() { 723 + let r = response(400, "not json"); 724 + assert!(!XrpcClient::<MockTransport>::is_expired_token(&r)); 511 725 } 512 726 }
+16 -13
crates/opake-core/src/documents/delete.rs
··· 10 10 /// `app.opake.cloud.document` to prevent accidental deletion of other 11 11 /// record types. The blob becomes orphaned and will eventually be 12 12 /// garbage-collected by the PDS. 13 - pub async fn delete_document(client: &XrpcClient<impl Transport>, uri: &str) -> Result<(), Error> { 13 + pub async fn delete_document( 14 + client: &mut XrpcClient<impl Transport>, 15 + uri: &str, 16 + ) -> Result<(), Error> { 14 17 let at_uri = atproto::parse_at_uri(uri)?; 15 18 16 19 if at_uri.collection != DOCUMENT_COLLECTION { ··· 44 47 body: b"{}".to_vec(), 45 48 }); 46 49 47 - let client = mock_client(mock.clone()); 50 + let mut client = mock_client(mock.clone()); 48 51 let uri = format!("at://{}/app.opake.cloud.document/abc123", TEST_DID); 49 - delete_document(&client, &uri).await.unwrap(); 52 + delete_document(&mut client, &uri).await.unwrap(); 50 53 51 54 let requests = mock.requests(); 52 55 assert_eq!(requests.len(), 1); ··· 65 68 #[tokio::test] 66 69 async fn rejects_grant_uri() { 67 70 let mock = MockTransport::new(); 68 - let client = mock_client(mock); 71 + let mut client = mock_client(mock); 69 72 let uri = format!("at://{}/app.opake.cloud.grant/abc123", TEST_DID); 70 - let err = delete_document(&client, &uri).await.unwrap_err(); 73 + let err = delete_document(&mut client, &uri).await.unwrap_err(); 71 74 assert!( 72 75 err.to_string().contains("expected a document URI"), 73 76 "got: {err}" ··· 77 80 #[tokio::test] 78 81 async fn rejects_arbitrary_collection() { 79 82 let mock = MockTransport::new(); 80 - let client = mock_client(mock); 83 + let mut client = mock_client(mock); 81 84 let uri = format!("at://{}/app.bsky.feed.post/abc123", TEST_DID); 82 - let err = delete_document(&client, &uri).await.unwrap_err(); 85 + let err = delete_document(&mut client, &uri).await.unwrap_err(); 83 86 assert!( 84 87 err.to_string().contains("expected a document URI"), 85 88 "got: {err}" ··· 89 92 #[tokio::test] 90 93 async fn rejects_invalid_uri() { 91 94 let mock = MockTransport::new(); 92 - let client = mock_client(mock); 93 - let err = delete_document(&client, "not-a-uri").await.unwrap_err(); 95 + let mut client = mock_client(mock); 96 + let err = delete_document(&mut client, "not-a-uri").await.unwrap_err(); 94 97 assert!(err.to_string().contains("AT-URI"), "got: {err}"); 95 98 } 96 99 ··· 102 105 body: br#"{"error":"RecordNotFound","message":"no such record"}"#.to_vec(), 103 106 }); 104 107 105 - let client = mock_client(mock); 108 + let mut client = mock_client(mock); 106 109 let uri = format!("at://{}/app.opake.cloud.document/gone", TEST_DID); 107 - let err = delete_document(&client, &uri).await.unwrap_err(); 110 + let err = delete_document(&mut client, &uri).await.unwrap_err(); 108 111 assert!(matches!(err, Error::NotFound(_))); 109 112 } 110 113 ··· 116 119 body: br#"{"error":"InternalServerError","message":"storage error"}"#.to_vec(), 117 120 }); 118 121 119 - let client = mock_client(mock); 122 + let mut client = mock_client(mock); 120 123 let uri = format!("at://{}/app.opake.cloud.document/abc", TEST_DID); 121 - let err = delete_document(&client, &uri).await.unwrap_err(); 124 + let err = delete_document(&mut client, &uri).await.unwrap_err(); 122 125 assert!(matches!(err, Error::Xrpc { .. })); 123 126 } 124 127 }
+19 -19
crates/opake-core/src/documents/download.rs
··· 10 10 /// Fetch a document record and its encrypted blob, then decrypt. 11 11 /// Returns `(filename, plaintext_bytes)`. 12 12 pub async fn download_and_decrypt( 13 - client: &XrpcClient<impl Transport>, 13 + client: &mut XrpcClient<impl Transport>, 14 14 did: &str, 15 15 private_key: &[u8; 32], 16 16 uri: &str, ··· 158 158 mock.enqueue(record_response(&doc)); 159 159 mock.enqueue(blob_response(&fixture.ciphertext)); 160 160 161 - let client = mock_client(mock.clone()); 162 - let (name, decrypted) = download_and_decrypt(&client, TEST_DID, &private_key, TEST_URI) 161 + let mut client = mock_client(mock.clone()); 162 + let (name, decrypted) = download_and_decrypt(&mut client, TEST_DID, &private_key, TEST_URI) 163 163 .await 164 164 .unwrap(); 165 165 ··· 182 182 mock.enqueue(record_response(&doc)); 183 183 mock.enqueue(blob_response(&fixture.ciphertext)); 184 184 185 - let client = mock_client(mock); 186 - let (_, decrypted) = download_and_decrypt(&client, TEST_DID, &private_key, TEST_URI) 185 + let mut client = mock_client(mock); 186 + let (_, decrypted) = download_and_decrypt(&mut client, TEST_DID, &private_key, TEST_URI) 187 187 .await 188 188 .unwrap(); 189 189 assert!(decrypted.is_empty()); ··· 198 198 let mock = MockTransport::new(); 199 199 mock.enqueue(record_response(&doc)); 200 200 201 - let client = mock_client(mock); 202 - let err = download_and_decrypt(&client, "did:plc:wrong", &private_key, TEST_URI) 201 + let mut client = mock_client(mock); 202 + let err = download_and_decrypt(&mut client, "did:plc:wrong", &private_key, TEST_URI) 203 203 .await 204 204 .unwrap_err(); 205 205 assert!( ··· 243 243 }); 244 244 245 245 let (_, private_key) = test_keypair(); 246 - let client = mock_client(mock); 247 - let err = download_and_decrypt(&client, TEST_DID, &private_key, TEST_URI) 246 + let mut client = mock_client(mock); 247 + let err = download_and_decrypt(&mut client, TEST_DID, &private_key, TEST_URI) 248 248 .await 249 249 .unwrap_err(); 250 250 assert!(err.to_string().contains("keyring"), "got: {err}"); ··· 259 259 }); 260 260 261 261 let (_, private_key) = test_keypair(); 262 - let client = mock_client(mock); 263 - let err = download_and_decrypt(&client, TEST_DID, &private_key, TEST_URI) 262 + let mut client = mock_client(mock); 263 + let err = download_and_decrypt(&mut client, TEST_DID, &private_key, TEST_URI) 264 264 .await 265 265 .unwrap_err(); 266 266 assert!(matches!(err, Error::NotFound(_))); ··· 279 279 body: br#"{"error":"InternalServerError","message":"blob storage error"}"#.to_vec(), 280 280 }); 281 281 282 - let client = mock_client(mock); 283 - let err = download_and_decrypt(&client, TEST_DID, &private_key, TEST_URI) 282 + let mut client = mock_client(mock); 283 + let err = download_and_decrypt(&mut client, TEST_DID, &private_key, TEST_URI) 284 284 .await 285 285 .unwrap_err(); 286 286 assert!(matches!(err, Error::Xrpc { .. })); ··· 296 296 let mock = MockTransport::new(); 297 297 mock.enqueue(record_response(&doc)); 298 298 299 - let client = mock_client(mock); 300 - let err = download_and_decrypt(&client, TEST_DID, &private_key, TEST_URI) 299 + let mut client = mock_client(mock); 300 + let err = download_and_decrypt(&mut client, TEST_DID, &private_key, TEST_URI) 301 301 .await 302 302 .unwrap_err(); 303 303 assert!(err.to_string().contains("schema version"), "got: {err}"); ··· 307 307 async fn rejects_invalid_uri() { 308 308 let (_, private_key) = test_keypair(); 309 309 let mock = MockTransport::new(); 310 - let client = mock_client(mock); 311 - let err = download_and_decrypt(&client, TEST_DID, &private_key, "not-a-uri") 310 + let mut client = mock_client(mock); 311 + let err = download_and_decrypt(&mut client, TEST_DID, &private_key, "not-a-uri") 312 312 .await 313 313 .unwrap_err(); 314 314 assert!(err.to_string().contains("AT-URI"), "got: {err}"); ··· 324 324 let mock = MockTransport::new(); 325 325 mock.enqueue(record_response(&doc)); 326 326 327 - let client = mock_client(mock); 328 - let err = download_and_decrypt(&client, TEST_DID, &wrong_private_key, TEST_URI) 327 + let mut client = mock_client(mock); 328 + let err = download_and_decrypt(&mut client, TEST_DID, &wrong_private_key, TEST_URI) 329 329 .await 330 330 .unwrap_err(); 331 331 // Wrong key produces either a KeyWrap or Decryption error depending
+15 -15
crates/opake-core/src/documents/list.rs
··· 21 21 /// Silently skips records that can't be parsed or have an unsupported 22 22 /// schema version — these are expected when upgrading clients. 23 23 pub async fn list_documents( 24 - client: &XrpcClient<impl Transport>, 24 + client: &mut XrpcClient<impl Transport>, 25 25 ) -> Result<Vec<DocumentEntry>, Error> { 26 26 let mut entries = Vec::new(); 27 27 let mut cursor: Option<String> = None; ··· 83 83 let mock = MockTransport::new(); 84 84 mock.enqueue(list_records_response(&[("abc", doc)], None)); 85 85 86 - let client = mock_client(mock.clone()); 87 - let entries = list_documents(&client).await.unwrap(); 86 + let mut client = mock_client(mock.clone()); 87 + let entries = list_documents(&mut client).await.unwrap(); 88 88 89 89 assert_eq!(entries.len(), 1); 90 90 assert_eq!(entries[0].name, "notes.txt"); ··· 113 113 let mock = MockTransport::new(); 114 114 mock.enqueue(list_records_response(&docs, None)); 115 115 116 - let client = mock_client(mock); 117 - let entries = list_documents(&client).await.unwrap(); 116 + let mut client = mock_client(mock); 117 + let entries = list_documents(&mut client).await.unwrap(); 118 118 119 119 assert_eq!(entries.len(), 3); 120 120 assert_eq!(entries[0].name, "photo.jpg"); ··· 135 135 None, 136 136 )); 137 137 138 - let client = mock_client(mock.clone()); 139 - let entries = list_documents(&client).await.unwrap(); 138 + let mut client = mock_client(mock.clone()); 139 + let entries = list_documents(&mut client).await.unwrap(); 140 140 141 141 assert_eq!(entries.len(), 2); 142 142 assert_eq!(entries[0].name, "file1.txt"); ··· 152 152 let mock = MockTransport::new(); 153 153 mock.enqueue(list_records_response(&[], None)); 154 154 155 - let client = mock_client(mock); 156 - let entries = list_documents(&client).await.unwrap(); 155 + let mut client = mock_client(mock); 156 + let entries = list_documents(&mut client).await.unwrap(); 157 157 assert!(entries.is_empty()); 158 158 } 159 159 ··· 180 180 body: serde_json::to_vec(&body).unwrap(), 181 181 }); 182 182 183 - let client = mock_client(mock); 184 - let entries = list_documents(&client).await.unwrap(); 183 + let mut client = mock_client(mock); 184 + let entries = list_documents(&mut client).await.unwrap(); 185 185 186 186 assert_eq!(entries.len(), 1); 187 187 assert_eq!(entries[0].name, "good.txt"); ··· 195 195 let mock = MockTransport::new(); 196 196 mock.enqueue(list_records_response(&[("f1", doc)], None)); 197 197 198 - let client = mock_client(mock); 199 - let entries = list_documents(&client).await.unwrap(); 198 + let mut client = mock_client(mock); 199 + let entries = list_documents(&mut client).await.unwrap(); 200 200 assert!(entries.is_empty()); 201 201 } 202 202 ··· 208 208 body: br#"{"error":"InternalServerError","message":"something broke"}"#.to_vec(), 209 209 }); 210 210 211 - let client = mock_client(mock); 212 - let err = list_documents(&client).await.unwrap_err(); 211 + let mut client = mock_client(mock); 212 + let err = list_documents(&mut client).await.unwrap_err(); 213 213 assert!(matches!(err, Error::Xrpc { .. })); 214 214 } 215 215 }
+16 -13
crates/opake-core/src/documents/resolve.rs
··· 10 10 /// matching entry. Exactly one match is required — zero or multiple matches 11 11 /// are errors. 12 12 pub async fn resolve_uri( 13 - client: &XrpcClient<impl Transport>, 13 + client: &mut XrpcClient<impl Transport>, 14 14 reference: &str, 15 15 ) -> Result<String, Error> { 16 16 if reference.starts_with("at://") { ··· 47 47 #[tokio::test] 48 48 async fn passthrough_at_uri() { 49 49 let mock = MockTransport::new(); 50 - let client = mock_client(mock); 50 + let mut client = mock_client(mock); 51 51 52 - let uri = resolve_uri(&client, "at://did:plc:test/app.opake.cloud.document/abc") 53 - .await 54 - .unwrap(); 52 + let uri = resolve_uri( 53 + &mut client, 54 + "at://did:plc:test/app.opake.cloud.document/abc", 55 + ) 56 + .await 57 + .unwrap(); 55 58 assert_eq!(uri, "at://did:plc:test/app.opake.cloud.document/abc"); 56 59 } 57 60 ··· 66 69 None, 67 70 )); 68 71 69 - let client = mock_client(mock); 70 - let uri = resolve_uri(&client, "photo.jpg").await.unwrap(); 72 + let mut client = mock_client(mock); 73 + let uri = resolve_uri(&mut client, "photo.jpg").await.unwrap(); 71 74 assert!(uri.contains("a2")); 72 75 } 73 76 ··· 79 82 None, 80 83 )); 81 84 82 - let client = mock_client(mock); 83 - let err = resolve_uri(&client, "missing.pdf").await.unwrap_err(); 85 + let mut client = mock_client(mock); 86 + let err = resolve_uri(&mut client, "missing.pdf").await.unwrap_err(); 84 87 let msg = err.to_string(); 85 88 assert!(msg.contains("no document named"), "got: {msg}"); 86 89 assert!(msg.contains("opake ls"), "should suggest ls, got: {msg}"); ··· 97 100 None, 98 101 )); 99 102 100 - let client = mock_client(mock); 101 - let err = resolve_uri(&client, "report.pdf").await.unwrap_err(); 103 + let mut client = mock_client(mock); 104 + let err = resolve_uri(&mut client, "report.pdf").await.unwrap_err(); 102 105 let msg = err.to_string(); 103 106 assert!(msg.contains("report.pdf"), "got: {msg}"); 104 107 assert!(msg.contains("2"), "should mention count, got: {msg}"); ··· 109 112 let mock = MockTransport::new(); 110 113 mock.enqueue(list_records_response(&[], None)); 111 114 112 - let client = mock_client(mock); 113 - let err = resolve_uri(&client, "anything.txt").await.unwrap_err(); 115 + let mut client = mock_client(mock); 116 + let err = resolve_uri(&mut client, "anything.txt").await.unwrap_err(); 114 117 assert!(err.to_string().contains("no document named")); 115 118 } 116 119 }
+13 -13
crates/opake-core/src/documents/upload.rs
··· 30 30 /// The caller is responsible for reading the file from disk, detecting the MIME 31 31 /// type, and extracting the filename — this function is platform-agnostic. 32 32 pub async fn encrypt_and_upload( 33 - client: &XrpcClient<impl Transport>, 33 + client: &mut XrpcClient<impl Transport>, 34 34 params: &UploadParams<'_>, 35 35 rng: &mut (impl CryptoRng + RngCore), 36 36 ) -> Result<String, Error> { ··· 157 157 mock.enqueue(upload_blob_response()); 158 158 mock.enqueue(create_record_response()); 159 159 160 - let client = mock_client(mock.clone()); 160 + let mut client = mock_client(mock.clone()); 161 161 let params = test_params( 162 162 b"hello world", 163 163 "hello.txt", 164 164 &public_key, 165 165 vec!["test".into()], 166 166 ); 167 - let uri = encrypt_and_upload(&client, &params, &mut OsRng) 167 + let uri = encrypt_and_upload(&mut client, &params, &mut OsRng) 168 168 .await 169 169 .unwrap(); 170 170 ··· 202 202 async fn rejects_oversized_blob() { 203 203 let (public_key, _) = test_keypair(); 204 204 let mock = MockTransport::new(); 205 - let client = mock_client(mock); 205 + let mut client = mock_client(mock); 206 206 207 207 let oversized = vec![0u8; MAX_BLOB_SIZE + 1]; 208 208 let params = test_params(&oversized, "big.bin", &public_key, vec![]); 209 - let err = encrypt_and_upload(&client, &params, &mut OsRng) 209 + let err = encrypt_and_upload(&mut client, &params, &mut OsRng) 210 210 .await 211 211 .unwrap_err(); 212 212 ··· 220 220 mock.enqueue(upload_blob_response()); 221 221 mock.enqueue(create_record_response()); 222 222 223 - let client = mock_client(mock); 223 + let mut client = mock_client(mock); 224 224 let params = test_params(b"", "empty.txt", &public_key, vec![]); 225 - let uri = encrypt_and_upload(&client, &params, &mut OsRng) 225 + let uri = encrypt_and_upload(&mut client, &params, &mut OsRng) 226 226 .await 227 227 .unwrap(); 228 228 ··· 238 238 body: br#"{"error":"InternalServerError","message":"blob storage down"}"#.to_vec(), 239 239 }); 240 240 241 - let client = mock_client(mock); 241 + let mut client = mock_client(mock); 242 242 let params = test_params(b"data", "file.bin", &public_key, vec![]); 243 - let err = encrypt_and_upload(&client, &params, &mut OsRng) 243 + let err = encrypt_and_upload(&mut client, &params, &mut OsRng) 244 244 .await 245 245 .unwrap_err(); 246 246 ··· 257 257 body: br#"{"error":"InternalServerError","message":"record write failed"}"#.to_vec(), 258 258 }); 259 259 260 - let client = mock_client(mock); 260 + let mut client = mock_client(mock); 261 261 let params = test_params(b"data", "file.bin", &public_key, vec![]); 262 - let err = encrypt_and_upload(&client, &params, &mut OsRng) 262 + let err = encrypt_and_upload(&mut client, &params, &mut OsRng) 263 263 .await 264 264 .unwrap_err(); 265 265 ··· 275 275 mock.enqueue(upload_blob_response()); 276 276 mock.enqueue(create_record_response()); 277 277 278 - let client = mock_client(mock.clone()); 278 + let mut client = mock_client(mock.clone()); 279 279 let params = test_params(plaintext, "roundtrip.txt", &public_key, vec![]); 280 - encrypt_and_upload(&client, &params, &mut OsRng) 280 + encrypt_and_upload(&mut client, &params, &mut OsRng) 281 281 .await 282 282 .unwrap(); 283 283