An easy-to-host PDS on the ATProtocol, iPhone and MacOS. Maintain control of your keys and data, always.
1
fork

Configure Feed

Select the types of activity you want to include in your feed.

fix: address PR review feedback for refreshSession

- Non-fatal handle lookup after token rotation (prevents replay lockout)
- rows_affected check on rotation UPDATE for concurrency safety
- jti and structured context added to rotation/replay tracing logs
- audience-absent warning in verify_refresh_token matches access token path
- Improved doc comments on RefreshTokenClaims and verify_refresh_token
- deduplicate create_session test helpers via routes::test_utils
- New tests: chained rotation, handle.invalid fallback, DB-deleted token,
specific HTTP status and error code assertions on all error paths
- Update crates/relay/CLAUDE.md: add refresh_session.rs route entry and
expand jwt.rs description to include token issuance and refresh verification

authored by

Malpercio and committed by
Tangled
8a7ff9cf e1726c69

+165 -79
+3 -2
crates/relay/CLAUDE.md
··· 1 1 # Relay Crate 2 2 3 - Last verified: 2026-03-24 3 + Last verified: 2026-03-25 4 4 5 5 ## Purpose 6 6 ··· 27 27 |---|---|---| 28 28 | `dpop.rs` | Mixed (unavoidable) | DPoP proof validation, nonce store | 29 29 | `extractors.rs` | Imperative Shell | `AuthenticatedUser` axum extractor | 30 - | `jwt.rs` | Functional Core | JWT parsing, scope validation, access token verification | 30 + | `jwt.rs` | Functional Core | JWT parsing, scope validation, access/refresh token verification, HS256 token issuance | 31 31 | `password.rs` | Functional Core | `hash_password`, `verify_password` (argon2id) | 32 32 | `rate_limit.rs` | Functional Core | Sliding-window login-failure rate limiter | 33 33 | `signing_key.rs` | Imperative Shell | ES256 signing key load-or-create | ··· 66 66 | `oauth_templates.rs` | Pure HTML rendering helpers (Functional Core, no handler) | 67 67 | `create_session.rs` | `POST /xrpc/com.atproto.server.createSession` | 68 68 | `get_session.rs` | `GET /xrpc/com.atproto.server.getSession` | 69 + | `refresh_session.rs` | `POST /xrpc/com.atproto.server.refreshSession` | 69 70 | `create_did.rs` | `POST /v1/dids` | 70 71 | `create_account.rs` | `POST /v1/accounts` | 71 72 | `create_handle.rs` | `POST /v1/handles` |
+11 -6
crates/relay/src/auth/jwt.rs
··· 154 154 155 155 /// Claims decoded from a refresh JWT (scope: com.atproto.refresh). 156 156 /// 157 - /// `sub` is intentionally omitted — the authoritative DID is read from the DB 158 - /// after confirming the token exists, rather than trusting the JWT claim directly. 159 - /// JWT library still enforces `sub` presence via `set_required_spec_claims`. 157 + /// `sub` is present in the JWT payload but intentionally not decoded here: 158 + /// the library enforces its presence via `set_required_spec_claims`, and the 159 + /// authoritative DID is read from the DB row after the token is confirmed to 160 + /// exist — never trusted directly from the JWT claim. 160 161 #[derive(Debug, Deserialize)] 161 162 pub(crate) struct RefreshTokenClaims { 162 163 pub scope: String, 163 164 /// Token ID embedded in the JWT and stored in `refresh_tokens.jti`. 164 - /// `None` when an access token (no `jti`) is mistakenly presented here. 165 + /// `None` when an access token (which has no `jti`) is mistakenly presented. 165 166 pub jti: Option<String>, 166 167 } 167 168 ··· 172 173 /// for that check so that the error message can be precise. 173 174 pub fn verify_refresh_token( 174 175 token: &str, 175 - state: &crate::app::AppState, 176 + state: &AppState, 176 177 ) -> Result<RefreshTokenClaims, ApiError> { 177 178 let decoding_key = DecodingKey::from_secret(&state.jwt_secret); 178 179 let mut validation = Validation::new(Algorithm::HS256); ··· 180 181 Some(did) => validation.set_audience(&[did]), 181 182 None => { 182 183 validation.validate_aud = false; 184 + tracing::warn!( 185 + "server_did not configured; JWT audience validation is disabled — \ 186 + set server_did in config for production deployments" 187 + ); 183 188 } 184 189 } 185 190 validation.set_required_spec_claims(&["exp", "sub"]); ··· 194 199 ApiError::new(ErrorCode::TokenExpired, "token has expired") 195 200 } 196 201 _ => { 197 - tracing::debug!(error = %e, error_kind = ?e.kind(), "refresh token verification failed"); 202 + tracing::warn!(error = %e, error_kind = ?e.kind(), "refresh token verification failed"); 198 203 ApiError::new(ErrorCode::InvalidToken, "invalid token") 199 204 } 200 205 }
+1 -43
crates/relay/src/routes/create_session.rs
··· 209 209 210 210 #[cfg(test)] 211 211 mod tests { 212 - use argon2::{ 213 - password_hash::{rand_core::OsRng, SaltString}, 214 - Argon2, PasswordHasher, 215 - }; 216 212 use axum::{ 217 213 body::Body, 218 214 http::{Request, StatusCode}, ··· 221 217 222 218 use crate::app::{app, test_state}; 223 219 use crate::auth::rate_limit::RATE_LIMIT_MAX_FAILURES; 220 + use crate::routes::test_utils::{body_json, insert_account_with_password}; 224 221 225 222 // ── Helpers ─────────────────────────────────────────────────────────────── 226 223 ··· 233 230 r#"{{"identifier":"{identifier}","password":"{password}"}}"# 234 231 ))) 235 232 .unwrap() 236 - } 237 - 238 - async fn insert_account_with_password( 239 - db: &sqlx::SqlitePool, 240 - did: &str, 241 - handle: &str, 242 - email: &str, 243 - password: &str, 244 - ) { 245 - let salt = SaltString::generate(&mut OsRng); 246 - let hash = Argon2::default() 247 - .hash_password(password.as_bytes(), &salt) 248 - .unwrap() 249 - .to_string(); 250 - 251 - sqlx::query( 252 - "INSERT INTO accounts (did, email, password_hash, created_at, updated_at) \ 253 - VALUES (?, ?, ?, datetime('now'), datetime('now'))", 254 - ) 255 - .bind(did) 256 - .bind(email) 257 - .bind(&hash) 258 - .execute(db) 259 - .await 260 - .unwrap(); 261 - 262 - sqlx::query("INSERT INTO handles (handle, did, created_at) VALUES (?, ?, datetime('now'))") 263 - .bind(handle) 264 - .bind(did) 265 - .execute(db) 266 - .await 267 - .unwrap(); 268 - } 269 - 270 - async fn body_json(response: axum::response::Response) -> serde_json::Value { 271 - let bytes = axum::body::to_bytes(response.into_body(), usize::MAX) 272 - .await 273 - .unwrap(); 274 - serde_json::from_slice(&bytes).unwrap() 275 233 } 276 234 277 235 // ── Happy path ────────────────────────────────────────────────────────────
+150 -28
crates/relay/src/routes/refresh_session.rs
··· 35 35 /// POST /xrpc/com.atproto.server.refreshSession 36 36 /// 37 37 /// Exchanges a refresh JWT for a new access + refresh token pair. 38 - /// Implements token rotation: the old refresh token is invalidated on first use 39 - /// and a new one is issued. Replay detection: if an already-rotated refresh token 40 - /// is presented, the entire session is revoked as a security measure. 38 + /// Token rotation: the old refresh token is marked as used (via next_jti) on 39 + /// first use and a new one is issued. Replay detection: if an already-rotated 40 + /// refresh token is presented, the entire session is revoked as a security measure. 41 41 pub async fn refresh_session( 42 42 State(state): State<AppState>, 43 43 headers: HeaderMap, ··· 110 110 tracing::warn!( 111 111 did = %did, 112 112 session_id = %session_id, 113 + jti = %jti, 113 114 "refresh token replay detected; session revoked" 114 115 ); 115 116 return Err(ApiError::new( ··· 140 141 141 142 // --- Atomically rotate: insert new token, mark old as used --- 142 143 let mut tx = state.db.begin().await.map_err(|e| { 143 - tracing::error!(error = %e, "failed to begin token rotation transaction"); 144 + tracing::error!(error = %e, did = %did, session_id = %session_id, jti = %jti, "failed to begin token rotation transaction"); 144 145 ApiError::new(ErrorCode::InternalError, "failed to refresh session") 145 146 })?; 146 147 ··· 154 155 .execute(&mut *tx) 155 156 .await 156 157 .map_err(|e| { 157 - tracing::error!(error = %e, "failed to insert new refresh token"); 158 + tracing::error!(error = %e, did = %did, session_id = %session_id, jti = %jti, "failed to insert new refresh token"); 158 159 ApiError::new(ErrorCode::InternalError, "failed to refresh session") 159 160 })?; 160 161 161 - sqlx::query("UPDATE refresh_tokens SET next_jti = ? WHERE jti = ?") 162 + let updated = sqlx::query("UPDATE refresh_tokens SET next_jti = ? WHERE jti = ?") 162 163 .bind(&new_refresh_jti) 163 164 .bind(&jti) 164 165 .execute(&mut *tx) 165 166 .await 166 167 .map_err(|e| { 167 - tracing::error!(error = %e, "failed to mark old refresh token as used"); 168 + tracing::error!(error = %e, did = %did, session_id = %session_id, jti = %jti, "failed to mark old refresh token as used"); 168 169 ApiError::new(ErrorCode::InternalError, "failed to refresh session") 169 170 })?; 170 171 172 + if updated.rows_affected() != 1 { 173 + tracing::error!( 174 + did = %did, 175 + session_id = %session_id, 176 + jti = %jti, 177 + rows = updated.rows_affected(), 178 + "rotation UPDATE affected unexpected row count; token may have been deleted concurrently" 179 + ); 180 + return Err(ApiError::new( 181 + ErrorCode::InternalError, 182 + "failed to refresh session", 183 + )); 184 + } 185 + 171 186 tx.commit().await.map_err(|e| { 172 - tracing::error!(error = %e, "failed to commit token rotation transaction"); 187 + tracing::error!(error = %e, did = %did, session_id = %session_id, jti = %jti, "failed to commit token rotation transaction"); 173 188 ApiError::new(ErrorCode::InternalError, "failed to refresh session") 174 189 })?; 175 190 176 191 // --- Look up handle for the response --- 177 - let handle: Option<String> = sqlx::query_scalar( 192 + // Non-fatal: token rotation already committed. A handle lookup failure must not 193 + // force the client to retry with the old token (which would trigger replay detection). 194 + let handle = sqlx::query_scalar::<_, String>( 178 195 "SELECT h.handle FROM handles h WHERE h.did = ? LIMIT 1", 179 196 ) 180 197 .bind(&did) 181 198 .fetch_optional(&state.db) 182 199 .await 183 - .map_err(|e| { 184 - tracing::error!(error = %e, did = %did, "DB error fetching handle for refreshSession"); 185 - ApiError::new(ErrorCode::InternalError, "internal error") 186 - })?; 200 + .unwrap_or_else(|e| { 201 + tracing::warn!(error = %e, did = %did, "handle lookup failed after token rotation; using handle.invalid"); 202 + None 203 + }); 187 204 188 205 // ATProto spec: "handle.invalid" is the sentinel for accounts without a resolvable handle. 189 206 let handle = handle.unwrap_or_else(|| "handle.invalid".to_string()); ··· 417 434 assert!(new_exists.is_some(), "new refresh token must be persisted in DB"); 418 435 } 419 436 420 - // ── Replay detection (AC2 + AC3) ────────────────────────────────────────── 437 + #[tokio::test] 438 + async fn second_rotation_succeeds_with_new_token() { 439 + let state = test_state().await; 440 + insert_account_with_password( 441 + &state.db, 442 + "did:plc:chain", 443 + "chain.test.example.com", 444 + "chain@example.com", 445 + "hunter2", 446 + ) 447 + .await; 448 + 449 + // First rotation. 450 + let tokens1 = create_session_tokens(&state, "did:plc:chain", "hunter2").await; 451 + let refresh_jwt1 = tokens1["refreshJwt"].as_str().unwrap().to_string(); 452 + let resp1 = app(state.clone()) 453 + .oneshot(post_refresh_session(&refresh_jwt1)) 454 + .await 455 + .unwrap(); 456 + assert_eq!(resp1.status(), StatusCode::OK); 457 + let json1 = body_json(resp1).await; 458 + let refresh_jwt2 = json1["refreshJwt"].as_str().unwrap().to_string(); 459 + 460 + // Second rotation using the token issued by the first rotation. 461 + let resp2 = app(state) 462 + .oneshot(post_refresh_session(&refresh_jwt2)) 463 + .await 464 + .unwrap(); 465 + assert_eq!(resp2.status(), StatusCode::OK); 466 + let json2 = body_json(resp2).await; 467 + assert!(json2["accessJwt"].as_str().is_some(), "second rotation must issue new accessJwt"); 468 + assert!(json2["refreshJwt"].as_str().is_some(), "second rotation must issue new refreshJwt"); 469 + } 470 + 471 + #[tokio::test] 472 + async fn account_without_handle_returns_handle_invalid() { 473 + let state = test_state().await; 474 + // Insert account with a handle, then delete the handle to simulate no-handle account. 475 + insert_account_with_password( 476 + &state.db, 477 + "did:plc:nohandle", 478 + "nohandle.test.example.com", 479 + "nohandle@example.com", 480 + "hunter2", 481 + ) 482 + .await; 483 + sqlx::query("DELETE FROM handles WHERE did = 'did:plc:nohandle'") 484 + .execute(&state.db) 485 + .await 486 + .unwrap(); 487 + 488 + let tokens = create_session_tokens(&state, "did:plc:nohandle", "hunter2").await; 489 + let refresh_jwt = tokens["refreshJwt"].as_str().unwrap().to_string(); 490 + 491 + let response = app(state) 492 + .oneshot(post_refresh_session(&refresh_jwt)) 493 + .await 494 + .unwrap(); 495 + 496 + assert_eq!(response.status(), StatusCode::OK); 497 + let json = body_json(response).await; 498 + assert_eq!(json["handle"], "handle.invalid"); 499 + } 500 + 501 + // ── Replay detection ────────────────────────────────────────────────────── 421 502 422 503 #[tokio::test] 423 504 async fn old_refresh_token_rejected_after_rotation() { ··· 446 527 .oneshot(post_refresh_session(&refresh_jwt)) 447 528 .await 448 529 .unwrap(); 449 - assert_ne!(replay.status(), StatusCode::OK, "replay must be rejected"); 530 + assert_eq!(replay.status(), StatusCode::UNAUTHORIZED, "replay must be rejected"); 531 + let json = body_json(replay).await; 532 + assert_eq!(json["error"]["code"], "INVALID_TOKEN"); 450 533 } 451 534 452 535 #[tokio::test] ··· 479 562 .unwrap(); 480 563 481 564 // Replay triggers full session revocation. 482 - app(state) 565 + let replay = app(state) 483 566 .oneshot(post_refresh_session(&refresh_jwt)) 484 567 .await 485 568 .unwrap(); 486 569 570 + assert_eq!(replay.status(), StatusCode::UNAUTHORIZED, "replay must return 401"); 571 + let json = body_json(replay).await; 572 + assert_eq!(json["error"]["code"], "INVALID_TOKEN"); 573 + 487 574 let session_count: i64 = 488 575 sqlx::query_scalar("SELECT COUNT(*) FROM sessions WHERE id = ?") 489 576 .bind(&session_id) ··· 504 591 // ── Error cases ─────────────────────────────────────────────────────────── 505 592 506 593 #[tokio::test] 507 - async fn expired_refresh_token_returns_error() { 594 + async fn expired_refresh_token_returns_401() { 508 595 let state = test_state().await; 509 596 insert_account_with_password( 510 597 &state.db, ··· 521 608 .await 522 609 .unwrap(); 523 610 524 - assert_ne!( 525 - response.status(), 526 - StatusCode::OK, 527 - "expired token must be rejected" 528 - ); 611 + assert_eq!(response.status(), StatusCode::UNAUTHORIZED); 612 + let json = body_json(response).await; 613 + assert_eq!(json["error"]["code"], "TOKEN_EXPIRED"); 614 + } 615 + 616 + #[tokio::test] 617 + async fn token_deleted_from_db_returns_401() { 618 + let state = test_state().await; 619 + insert_account_with_password( 620 + &state.db, 621 + "did:plc:deleted", 622 + "deleted.test.example.com", 623 + "deleted@example.com", 624 + "hunter2", 625 + ) 626 + .await; 627 + 628 + let tokens = create_session_tokens(&state, "did:plc:deleted", "hunter2").await; 629 + let refresh_jwt = tokens["refreshJwt"].as_str().unwrap().to_string(); 630 + 631 + // Simulate out-of-band revocation by deleting the DB row directly. 632 + sqlx::query("DELETE FROM refresh_tokens WHERE did = 'did:plc:deleted'") 633 + .execute(&state.db) 634 + .await 635 + .unwrap(); 636 + 637 + let response = app(state) 638 + .oneshot(post_refresh_session(&refresh_jwt)) 639 + .await 640 + .unwrap(); 641 + 642 + assert_eq!(response.status(), StatusCode::UNAUTHORIZED); 643 + let json = body_json(response).await; 644 + assert_eq!(json["error"]["code"], "INVALID_TOKEN"); 529 645 } 530 646 531 647 #[tokio::test] 532 - async fn invalid_token_signature_returns_error() { 648 + async fn invalid_token_signature_returns_401() { 533 649 let response = app(test_state().await) 534 650 .oneshot(post_refresh_session("not.a.valid.jwt")) 535 651 .await 536 652 .unwrap(); 537 653 538 - assert_ne!(response.status(), StatusCode::OK); 654 + assert_eq!(response.status(), StatusCode::UNAUTHORIZED); 655 + let json = body_json(response).await; 656 + assert_eq!(json["error"]["code"], "INVALID_TOKEN"); 539 657 } 540 658 541 659 #[tokio::test] ··· 558 676 .await 559 677 .unwrap(); 560 678 561 - assert_ne!( 679 + assert_eq!( 562 680 response.status(), 563 - StatusCode::OK, 681 + StatusCode::UNAUTHORIZED, 564 682 "access token must be rejected at the refresh endpoint" 565 683 ); 684 + let json = body_json(response).await; 685 + assert_eq!(json["error"]["code"], "INVALID_TOKEN"); 566 686 } 567 687 568 688 #[tokio::test] 569 - async fn missing_authorization_header_returns_error() { 689 + async fn missing_authorization_header_returns_401() { 570 690 let request = Request::builder() 571 691 .method("POST") 572 692 .uri("/xrpc/com.atproto.server.refreshSession") ··· 575 695 576 696 let response = app(test_state().await).oneshot(request).await.unwrap(); 577 697 578 - assert_ne!(response.status(), StatusCode::OK); 698 + assert_eq!(response.status(), StatusCode::UNAUTHORIZED); 699 + let json = body_json(response).await; 700 + assert_eq!(json["error"]["code"], "AUTHENTICATION_REQUIRED"); 579 701 } 580 702 }