don't
5
fork

Configure Feed

Select the types of activity you want to include in your feed.

feat(knot): store service auth JWT claims in database

Signed-off-by: tjh <x@tjh.dev>

tjh 9f304183 42ab59c6

+167 -80
+1
crates/knot/migrations/20260114093520_claims.down.sql
··· 1 + DROP TABLE claim;
+6
crates/knot/migrations/20260114093520_claims.up.sql
··· 1 + CREATE TABLE claim ( 2 + id text NOT NULL, 3 + claims jsonb NOT NULL, 4 + 5 + PRIMARY KEY (id) 6 + );
+13 -6
crates/knot/src/model.rs
··· 12 12 extract::{FromRef, FromRequestParts}, 13 13 http::request::Parts, 14 14 }; 15 + use futures_util::future::BoxFuture; 15 16 use git_service::state::GitServiceState; 16 17 use identity::Resolver; 17 18 use tokio::process::Command; ··· 20 19 use crate::{ 21 20 private, 22 21 public::git::{Error, GitAuthorization, NotFound}, 23 - services::authorization::AuthorizationClaimsStore, 22 + services::authorization::{AuthorizationClaimsStore, AuthorizationClaimsStoreError}, 24 23 types::repository_path::RepositoryPath, 25 24 }; 26 25 ··· 55 54 } 56 55 57 56 impl AuthorizationClaimsStore<auth::jwt::Claims> for Knot { 58 - #[inline] 59 - fn get_unexpired_claims(&self, jti: &str, now: i64) -> Option<auth::jwt::Claims> { 57 + fn get_unexpired_claims<'a: 'b, 'b>( 58 + &'a self, 59 + jti: &'b str, 60 + now: i64, 61 + ) -> BoxFuture<'b, Result<Option<auth::jwt::Claims>, AuthorizationClaimsStoreError>> { 60 62 self.inner.get_unexpired_claims(jti, now) 61 63 } 62 64 63 - #[inline] 64 - fn store_claims(&self, claims: auth::jwt::Claims) { 65 - self.inner.store_claims(claims); 65 + fn store_claims( 66 + &self, 67 + claims: auth::jwt::Claims, 68 + now: i64, 69 + ) -> BoxFuture<'_, Result<(), AuthorizationClaimsStoreError>> { 70 + self.inner.store_claims(claims, now) 66 71 } 67 72 } 68 73
+12
crates/knot/src/model/errors.rs
··· 110 110 } 111 111 } 112 112 } 113 + 114 + pub struct Internal<E: fmt::Display>(pub E); 115 + 116 + impl<E: fmt::Display> From<Internal<E>> for XrpcError { 117 + fn from(value: Internal<E>) -> Self { 118 + Self { 119 + status: StatusCode::INTERNAL_SERVER_ERROR, 120 + error: Cow::Borrowed("InternalServerError"), 121 + message: Cow::Owned(value.0.to_string()), 122 + } 123 + } 124 + }
+36 -51
crates/knot/src/model/knot_state.rs
··· 3 3 io, 4 4 net::{TcpListener, ToSocketAddrs as _}, 5 5 ops, 6 - sync::{Arc, Mutex, MutexGuard, RwLock}, 7 - time::Duration, 6 + sync::{Arc, Mutex, RwLock}, 8 7 }; 9 8 10 9 use atproto::{did::Did, tid::Tid}; 11 - use auth::jwt; 12 10 use bytes::Bytes; 11 + use futures_util::{FutureExt, future::BoxFuture}; 13 12 use identity::{HttpClient, Resolver}; 14 13 use jetstream::{JetstreamClient, client_config::JetstreamConfig}; 15 14 use lexicon::sh::tangled::{git::RefUpdate, repo::Repo}; ··· 21 22 use crate::{ 22 23 services::{ 23 24 atrepo, 24 - authorization::AuthorizationClaimsStore, 25 + authorization::{AuthorizationClaimsStore, AuthorizationClaimsStoreError}, 25 26 database::{DataStore, DataStoreError}, 26 27 }, 27 28 types::{repository_key::RepositoryKey, repository_path::RepositoryPath}, ··· 68 69 pool: ThreadPool, 69 70 70 71 events: tokio::sync::broadcast::Sender<(i64, OffsetDateTime, Event)>, 71 - 72 - /// Stores JWT claims to prevent re-use. 73 - jwt_claims: Mutex<HashMap<Box<str>, jwt::Claims>>, 74 72 75 73 /// Resolved repository path lookup cache. 76 74 /// ··· 124 128 store: database, 125 129 pool, 126 130 events, 127 - jwt_claims: Default::default(), 128 131 repo_cache: Default::default(), 129 132 push_seed: Default::default(), 130 133 private_addrs, ··· 146 151 let internal = crate::private::router().with_state(Arc::clone(&inner).into()); 147 152 tasks.spawn(crate::serve_all(internal, private_listeners)) 148 153 }; 149 - 150 - let state = Arc::clone(&inner); 151 - tasks.spawn(async move { 152 - use tokio_stream::wrappers::IntervalStream; 153 - 154 - const EXPIRY_SLOP: i64 = 60; 155 - 156 - let mut interval = tokio::time::interval(Duration::from_secs(120)); 157 - interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); 158 - 159 - let mut interval = IntervalStream::new(interval); 160 - while interval.next().await.is_some() { 161 - let now = OffsetDateTime::now_utc().unix_timestamp() + EXPIRY_SLOP; 162 - 163 - let mut store = state.jwt_claims.lock().unwrap(); 164 - let before = store.len(); 165 - store.retain(|_, claims| claims.exp > now); 166 - match before - store.len() { 167 - 0 => {} 168 - n => tracing::debug!("evicted {n} expired jti claims"), 169 - } 170 - } 171 - 172 - Ok(()) 173 - }); 174 154 175 155 Ok((inner, tasks)) 176 156 } ··· 178 208 #[inline] 179 209 pub fn store(&self) -> &DataStore { 180 210 &self.store 181 - } 182 - 183 - /// Acquire a lock on the jwt claims store. 184 - pub fn jwt_claims(&self) -> MutexGuard<'_, HashMap<Box<str>, jwt::Claims>> { 185 - self.jwt_claims.lock().unwrap() 186 211 } 187 212 188 213 pub async fn fetch_public_keys(&self, did: &Did) -> anyhow::Result<()> { ··· 503 538 } 504 539 505 540 impl AuthorizationClaimsStore<auth::jwt::Claims> for KnotState { 506 - #[inline] 507 - fn get_unexpired_claims(&self, jti: &str, now: i64) -> Option<auth::jwt::Claims> { 508 - let mut store = self.jwt_claims(); 509 - let claims = store.get(jti).cloned(); 541 + fn get_unexpired_claims<'a: 'b, 'b>( 542 + &'a self, 543 + jti: &'b str, 544 + now: i64, 545 + ) -> BoxFuture<'b, Result<Option<auth::jwt::Claims>, AuthorizationClaimsStoreError>> { 546 + async move { 547 + let claims = self.store().get_claims(jti, now).await.ok().flatten(); 510 548 511 - // If the claims have expired, remove them. 512 - if matches!(&claims, Some(claims) if claims.exp < now) { 513 - store.remove(jti); 514 - return None; 549 + // If the claims have expired, remove them. 550 + if matches!(&claims, Some(claims) if claims.exp < now) { 551 + self.store() 552 + .delete_claims(jti) 553 + .await 554 + .map_err(|error| AuthorizationClaimsStoreError(error.into()))?; 555 + 556 + return Ok(None); 557 + } 558 + 559 + Ok(claims) 515 560 } 516 - 517 - claims 561 + .boxed() 518 562 } 519 563 520 - #[inline] 521 - fn store_claims(&self, claims: auth::jwt::Claims) { 522 - self.jwt_claims().insert(claims.jti.clone(), claims); 564 + fn store_claims( 565 + &self, 566 + claims: auth::jwt::Claims, 567 + now: i64, 568 + ) -> BoxFuture<'_, Result<(), AuthorizationClaimsStoreError>> { 569 + async move { 570 + self.store() 571 + .store_claims(claims, now) 572 + .await 573 + .map_err(|error| AuthorizationClaimsStoreError(error.into()))?; 574 + 575 + Ok(()) 576 + } 577 + .boxed() 523 578 } 524 579 } 525 580
+6 -5
crates/knot/src/public/git/authorization.rs
··· 50 50 // Before performing a relatively expensive DID look-up, ensure the token 51 51 // claims are valid. 52 52 let unverified_claims = unverified_token.claims; 53 - GitVerification::verify(&knot, now, knot.instance_audience(), &unverified_claims).map_err( 54 - |error| match error { 53 + GitVerification::verify(&knot, now, knot.instance_audience(), &unverified_claims) 54 + .await 55 + .map_err(|error| match error { 55 56 // Git re-uses the token from the credential helper for each request in a single push. 56 57 // 57 58 // Returning 'Forbidden' here will make git abort. Instead, we return an Unauthorized 58 59 // which will force git to get a new token from the credential helper. 59 60 VerificationError::Reused => Error::unauthorized(&knot, "authorization re-used"), 60 61 error => Error::forbidden(&knot, error.to_string()), 61 - }, 62 - )?; 62 + })?; 63 63 64 64 // Resolve the DID document for the claimed issuer, extract and parse 65 65 // the verification methods into public keys. ··· 97 97 98 98 // Re-verify the claims for the sake of paranoia. 99 99 GitVerification::verify(&knot, now, knot.instance_audience(), &claims) 100 + .await 100 101 .map_err(|error| Error::forbidden(&knot, error.to_string()))?; 101 102 102 103 // Store the JWT so it cannot be re-used within the claim period. 103 - knot.store_claims(claims.clone()); 104 + knot.store_claims(claims.clone(), now).await?; 104 105 return Ok(Self(claims)); 105 106 } 106 107 }
+2 -1
crates/knot/src/public/git/error.rs
··· 1 - use crate::model::Knot; 1 + use crate::{model::Knot, services::authorization::AuthorizationClaimsStoreError}; 2 2 use axum::{ 3 3 extract::rejection::PathRejection, 4 4 http::{ ··· 109 109 110 110 internal_error!(std::io::Error); 111 111 internal_error!(axum::Error); 112 + internal_error!(AuthorizationClaimsStoreError);
+40 -17
crates/knot/src/services/authorization.rs
··· 11 11 request::Parts, 12 12 }, 13 13 }; 14 + use futures_util::future::BoxFuture; 14 15 use identity::Resolver; 15 16 use time::OffsetDateTime; 16 17 ··· 20 19 public::xrpc::XrpcError, 21 20 }; 22 21 23 - pub trait AuthorizationClaimsStore<T> { 24 - fn get_unexpired_claims(&self, jti: &str, now: i64) -> Option<T>; 22 + #[derive(Debug, thiserror::Error)] 23 + #[error("transparent")] 24 + pub struct AuthorizationClaimsStoreError(pub Box<dyn std::error::Error>); 25 25 26 - fn store_claims(&self, claims: T); 26 + pub trait AuthorizationClaimsStore<T>: Send + Sync { 27 + fn get_unexpired_claims<'a: 'b, 'b>( 28 + &'a self, 29 + jti: &'b str, 30 + now: i64, 31 + ) -> BoxFuture<'b, Result<Option<T>, AuthorizationClaimsStoreError>>; 32 + 33 + fn store_claims( 34 + &self, 35 + claims: T, 36 + now: i64, 37 + ) -> BoxFuture<'_, Result<(), AuthorizationClaimsStoreError>>; 27 38 } 28 39 29 40 #[derive(Debug, thiserror::Error)] ··· 50 37 WrongAudience, 51 38 #[error("re-used authorization")] 52 39 Reused, 40 + #[error("failed to read claims storage: {0}")] 41 + Storage(#[from] AuthorizationClaimsStoreError), 53 42 } 54 43 55 - pub trait Verification: fmt::Debug { 44 + pub trait Verification: fmt::Debug + Send { 56 45 const LEXICON_METHOD: &'static str; 57 46 58 47 fn verify_iat(now: i64, claims: &Claims) -> Result<i64, VerificationError> { ··· 93 78 store: &dyn AuthorizationClaimsStore<Claims>, 94 79 now: i64, 95 80 claims: &Claims, 96 - ) -> Result<(), VerificationError> { 97 - match store.get_unexpired_claims(&claims.jti, now) { 98 - Some(stored_claims) if stored_claims.exp < now => Ok(()), 99 - None => Ok(()), 100 - _ => Err(VerificationError::Reused), 81 + ) -> impl Future<Output = Result<(), VerificationError>> + Send { 82 + async move { 83 + match store.get_unexpired_claims(&claims.jti, now).await? { 84 + Some(stored_claims) if stored_claims.exp < now => Ok(()), 85 + None => Ok(()), 86 + _ => Err(VerificationError::Reused), 87 + } 101 88 } 102 89 } 103 90 ··· 108 91 now: i64, 109 92 audience: &atproto::Did, 110 93 claims: &Claims, 111 - ) -> Result<(), VerificationError> { 112 - Self::verify_iat(now, claims)?; 113 - Self::verify_exp(now, claims)?; 114 - Self::verify_lexicon_method(claims)?; 115 - Self::verify_audience(audience, claims)?; 116 - Self::verify_unique(store, now, claims)?; 117 - Ok(()) 94 + ) -> impl Future<Output = Result<(), VerificationError>> + Send { 95 + async move { 96 + Self::verify_iat(now, claims)?; 97 + Self::verify_exp(now, claims)?; 98 + Self::verify_lexicon_method(claims)?; 99 + Self::verify_audience(audience, claims)?; 100 + Self::verify_unique(store, now, claims).await?; 101 + Ok(()) 102 + } 118 103 } 119 104 } 120 105 ··· 164 145 // claims are valid. 165 146 let unverified_claims = unverified_token.claims; 166 147 V::verify(&knot, now, knot.instance_audience(), &unverified_claims) 148 + .await 167 149 .map_err(errors::Forbidden)?; 168 150 169 151 // Resolve the DID document for the claimed issuer, extract and parse ··· 193 173 194 174 // Re-verify the claims for the sake of paranoia. 195 175 V::verify(&knot, now, knot.instance_audience(), &claims) 176 + .await 196 177 .map_err(errors::Forbidden)?; 197 178 198 179 // Store the JWT so it cannot be re-used. 199 - knot.store_claims(claims.clone()); 180 + knot.store_claims(claims.clone(), now) 181 + .await 182 + .map_err(errors::Internal)?; 200 183 201 184 return Ok(Self::new(claims)); 202 185 }
+51
crates/knot/src/services/database.rs
··· 20 20 AtUri(#[from] atproto::aturi::Error), 21 21 #[error(transparent)] 22 22 DateTime(#[from] time::error::ComponentRange), 23 + #[error("Invalid JSON data in database: {0}")] 24 + Json(#[from] serde_json::Error), 23 25 #[error("{0}")] 24 26 Other(#[from] anyhow::Error), 25 27 } ··· 337 335 Ok(record) 338 336 }) 339 337 .boxed() 338 + } 339 + 340 + pub async fn store_claims( 341 + &self, 342 + claims: auth::jwt::Claims, 343 + now: i64, 344 + ) -> Result<(), DataStoreError> { 345 + let mut transaction = self.db.begin().await?; 346 + 347 + // First delete any expired claims. 348 + sqlx::query!( 349 + "DELETE FROM claim WHERE json_extract(claims, '$.exp') < ?", 350 + now 351 + ) 352 + .execute(&mut *transaction) 353 + .await?; 354 + 355 + let id = &claims.jti; 356 + let claims = serde_json::to_value(&claims)?; 357 + sqlx::query!("INSERT INTO claim (id, claims) VALUES (?, ?)", id, claims) 358 + .execute(&mut *transaction) 359 + .await?; 360 + 361 + transaction.commit().await?; 362 + Ok(()) 363 + } 364 + 365 + pub async fn get_claims( 366 + &self, 367 + id: &str, 368 + now: i64, 369 + ) -> Result<Option<auth::jwt::Claims>, DataStoreError> { 370 + let claims = sqlx::query!( 371 + r#"SELECT claims as "claims: Value" FROM claim WHERE id = ? AND json_extract(claims, '$.exp') >= ?"#, 372 + id, now 373 + ) 374 + .fetch_optional(&self.db) 375 + .await? 376 + .map(|record| serde_json::from_value::<auth::jwt::Claims>(record.claims)) 377 + .transpose()?; 378 + 379 + Ok(claims) 380 + } 381 + 382 + pub async fn delete_claims(&self, id: &str) -> Result<(), DataStoreError> { 383 + sqlx::query!("DELETE FROM claim WHERE id = ?", id) 384 + .execute(&self.db) 385 + .await?; 386 + Ok(()) 340 387 } 341 388 }