A lexicon-driven AppView for ATProto. happyview.dev
backfill firehose jetstream atproto appview oauth lexicon
8
fork

Configure Feed

Select the types of activity you want to include in your feed.

refactor!: split admin, repo, and xrpc into directory modules

Trezy 46782a93 a9faf48f

+1603 -1548
-597
src/admin.rs
··· 1 - use axum::extract::{Path, State}; 2 - use axum::http::StatusCode; 3 - use axum::routing::{delete, get, post}; 4 - use axum::{Json, Router}; 5 - use serde::{Deserialize, Serialize}; 6 - use serde_json::Value; 7 - use sha2::{Digest, Sha256}; 8 - 9 - use crate::AppState; 10 - use crate::error::AppError; 11 - use crate::lexicon::{LexiconType, ParsedLexicon}; 12 - 13 - // --------------------------------------------------------------------------- 14 - // Helpers 15 - // --------------------------------------------------------------------------- 16 - 17 - /// SHA-256 hash a plaintext API key for storage/comparison. 18 - pub(crate) fn hash_api_key(key: &str) -> String { 19 - let hash = Sha256::digest(key.as_bytes()); 20 - hex::encode(hash) 21 - } 22 - 23 - // --------------------------------------------------------------------------- 24 - // Admin auth middleware 25 - // --------------------------------------------------------------------------- 26 - 27 - pub fn admin_routes(_state: AppState) -> Router<AppState> { 28 - Router::new() 29 - .route("/lexicons", post(upload_lexicon).get(list_lexicons)) 30 - .route("/lexicons/{id}", get(get_lexicon).delete(delete_lexicon)) 31 - .route("/stats", get(stats)) 32 - .route("/backfill", post(create_backfill)) 33 - .route("/backfill/status", get(backfill_status)) 34 - .route("/admins", post(create_admin).get(list_admins)) 35 - .route("/admins/{id}", delete(delete_admin)) 36 - } 37 - 38 - /// Axum extractor for admin auth. Checks the Bearer token against: 39 - /// 1. The `admins` table (hashed key lookup) 40 - /// 2. Falls back to `ADMIN_SECRET` env var for bootstrap 41 - pub struct AdminAuth; 42 - 43 - impl axum::extract::FromRequestParts<AppState> for AdminAuth { 44 - type Rejection = AppError; 45 - 46 - async fn from_request_parts( 47 - parts: &mut axum::http::request::Parts, 48 - state: &AppState, 49 - ) -> Result<Self, Self::Rejection> { 50 - let header = parts 51 - .headers 52 - .get("authorization") 53 - .and_then(|v| v.to_str().ok()) 54 - .ok_or_else(|| AppError::Auth("missing Authorization header".into()))?; 55 - 56 - let token = header 57 - .strip_prefix("Bearer ") 58 - .ok_or_else(|| AppError::Auth("invalid Authorization scheme".into()))?; 59 - 60 - // Check admins table first 61 - let key_hash = hash_api_key(token); 62 - let found: Option<(String,)> = 63 - sqlx::query_as("SELECT id::text FROM admins WHERE api_key_hash = $1") 64 - .bind(&key_hash) 65 - .fetch_optional(&state.db) 66 - .await 67 - .map_err(|e| AppError::Internal(format!("admin auth query failed: {e}")))?; 68 - 69 - if let Some((admin_id,)) = found { 70 - // Update last_used_at in the background 71 - let db = state.db.clone(); 72 - let admin_id = admin_id.clone(); 73 - tokio::spawn(async move { 74 - let _ = sqlx::query("UPDATE admins SET last_used_at = NOW() WHERE id::text = $1") 75 - .bind(&admin_id) 76 - .execute(&db) 77 - .await; 78 - }); 79 - return Ok(AdminAuth); 80 - } 81 - 82 - // Fall back to ADMIN_SECRET env var 83 - if let Some(ref secret) = state.config.admin_secret 84 - && token == secret 85 - { 86 - return Ok(AdminAuth); 87 - } 88 - 89 - Err(AppError::Auth("invalid admin credentials".into())) 90 - } 91 - } 92 - 93 - /// Bootstrap: if no admins exist and ADMIN_SECRET is set, create a bootstrap admin. 94 - pub async fn bootstrap(db: &sqlx::PgPool, admin_secret: &Option<String>) { 95 - let count: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM admins") 96 - .fetch_one(db) 97 - .await 98 - .unwrap_or((0,)); 99 - 100 - if count.0 > 0 { 101 - return; 102 - } 103 - 104 - if let Some(secret) = admin_secret { 105 - let key_hash = hash_api_key(secret); 106 - let _ = sqlx::query( 107 - "INSERT INTO admins (name, api_key_hash) VALUES ($1, $2) ON CONFLICT DO NOTHING", 108 - ) 109 - .bind("bootstrap") 110 - .bind(&key_hash) 111 - .execute(db) 112 - .await; 113 - tracing::info!("created bootstrap admin from ADMIN_SECRET"); 114 - } 115 - } 116 - 117 - // --------------------------------------------------------------------------- 118 - // Jetstream notification 119 - // --------------------------------------------------------------------------- 120 - 121 - /// Send the current record collection list to the Jetstream task so it 122 - /// reconnects with the updated filter. 123 - async fn notify_jetstream(state: &AppState) { 124 - let collections = state.lexicons.get_record_collections().await; 125 - let _ = state.collections_tx.send(collections); 126 - } 127 - 128 - // --------------------------------------------------------------------------- 129 - // Request / response types 130 - // --------------------------------------------------------------------------- 131 - 132 - #[derive(Serialize)] 133 - struct LexiconSummary { 134 - id: String, 135 - revision: i32, 136 - lexicon_type: String, 137 - backfill: bool, 138 - created_at: chrono::DateTime<chrono::Utc>, 139 - updated_at: chrono::DateTime<chrono::Utc>, 140 - } 141 - 142 - #[derive(Deserialize)] 143 - struct UploadLexiconBody { 144 - lexicon_json: Value, 145 - #[serde(default = "default_backfill")] 146 - backfill: bool, 147 - target_collection: Option<String>, 148 - } 149 - 150 - fn default_backfill() -> bool { 151 - true 152 - } 153 - 154 - #[derive(Serialize)] 155 - struct StatsResponse { 156 - total_records: i64, 157 - collections: Vec<CollectionStat>, 158 - } 159 - 160 - #[derive(Serialize)] 161 - struct CollectionStat { 162 - collection: String, 163 - count: i64, 164 - } 165 - 166 - // --------------------------------------------------------------------------- 167 - // Handlers 168 - // --------------------------------------------------------------------------- 169 - 170 - /// POST /admin/lexicons — upload (upsert) a lexicon. 171 - async fn upload_lexicon( 172 - State(state): State<AppState>, 173 - _admin: AdminAuth, 174 - Json(body): Json<UploadLexiconBody>, 175 - ) -> Result<(StatusCode, Json<Value>), AppError> { 176 - // Validate basic structure 177 - let lexicon_version = body 178 - .lexicon_json 179 - .get("lexicon") 180 - .and_then(|v| v.as_i64()) 181 - .ok_or_else(|| { 182 - AppError::BadRequest("lexicon JSON must have a numeric 'lexicon' field".into()) 183 - })?; 184 - 185 - if lexicon_version != 1 { 186 - return Err(AppError::BadRequest(format!( 187 - "unsupported lexicon version: {lexicon_version}" 188 - ))); 189 - } 190 - 191 - let id = body 192 - .lexicon_json 193 - .get("id") 194 - .and_then(|v| v.as_str()) 195 - .ok_or_else(|| AppError::BadRequest("lexicon JSON must have a string 'id' field".into()))? 196 - .to_string(); 197 - 198 - // Validate it parses correctly 199 - ParsedLexicon::parse(body.lexicon_json.clone(), 1, body.target_collection.clone()) 200 - .map_err(|e| AppError::BadRequest(format!("failed to parse lexicon: {e}")))?; 201 - 202 - // Upsert into database 203 - let row: (i32,) = sqlx::query_as( 204 - r#" 205 - INSERT INTO lexicons (id, lexicon_json, backfill, target_collection) 206 - VALUES ($1, $2, $3, $4) 207 - ON CONFLICT (id) DO UPDATE SET 208 - lexicon_json = EXCLUDED.lexicon_json, 209 - backfill = EXCLUDED.backfill, 210 - target_collection = EXCLUDED.target_collection, 211 - revision = lexicons.revision + 1, 212 - updated_at = NOW() 213 - RETURNING revision 214 - "#, 215 - ) 216 - .bind(&id) 217 - .bind(&body.lexicon_json) 218 - .bind(body.backfill) 219 - .bind(&body.target_collection) 220 - .fetch_one(&state.db) 221 - .await 222 - .map_err(|e| AppError::Internal(format!("failed to upsert lexicon: {e}")))?; 223 - 224 - let revision = row.0; 225 - 226 - // Update in-memory registry with correct revision 227 - let parsed = ParsedLexicon::parse(body.lexicon_json, revision, body.target_collection) 228 - .map_err(|e| AppError::Internal(format!("failed to re-parse lexicon: {e}")))?; 229 - let is_record = parsed.lexicon_type == LexiconType::Record; 230 - state.lexicons.upsert(parsed).await; 231 - 232 - if is_record { 233 - notify_jetstream(&state).await; 234 - } 235 - 236 - let status = if revision == 1 { 237 - StatusCode::CREATED 238 - } else { 239 - StatusCode::OK 240 - }; 241 - 242 - Ok(( 243 - status, 244 - Json(serde_json::json!({ 245 - "id": id, 246 - "revision": revision, 247 - })), 248 - )) 249 - } 250 - 251 - /// GET /admin/lexicons — list all lexicons. 252 - async fn list_lexicons( 253 - State(state): State<AppState>, 254 - _admin: AdminAuth, 255 - ) -> Result<Json<Vec<LexiconSummary>>, AppError> { 256 - #[allow(clippy::type_complexity)] 257 - let rows: Vec<(String, i32, Value, bool, chrono::DateTime<chrono::Utc>, chrono::DateTime<chrono::Utc>)> = 258 - sqlx::query_as( 259 - "SELECT id, revision, lexicon_json, backfill, created_at, updated_at FROM lexicons ORDER BY id", 260 - ) 261 - .fetch_all(&state.db) 262 - .await 263 - .map_err(|e| AppError::Internal(format!("failed to list lexicons: {e}")))?; 264 - 265 - let summaries: Vec<LexiconSummary> = rows 266 - .into_iter() 267 - .map(|(id, revision, json, backfill, created_at, updated_at)| { 268 - let lexicon_type = ParsedLexicon::parse(json, revision, None) 269 - .map(|p| format!("{:?}", p.lexicon_type).to_lowercase()) 270 - .unwrap_or_else(|_| "unknown".into()); 271 - 272 - LexiconSummary { 273 - id, 274 - revision, 275 - lexicon_type, 276 - backfill, 277 - created_at, 278 - updated_at, 279 - } 280 - }) 281 - .collect(); 282 - 283 - Ok(Json(summaries)) 284 - } 285 - 286 - /// GET /admin/lexicons/:id — get a single lexicon. 287 - async fn get_lexicon( 288 - State(state): State<AppState>, 289 - _admin: AdminAuth, 290 - Path(id): Path<String>, 291 - ) -> Result<Json<Value>, AppError> { 292 - #[allow(clippy::type_complexity)] 293 - let row: Option<(String, i32, Value, bool, chrono::DateTime<chrono::Utc>, chrono::DateTime<chrono::Utc>)> = 294 - sqlx::query_as( 295 - "SELECT id, revision, lexicon_json, backfill, created_at, updated_at FROM lexicons WHERE id = $1", 296 - ) 297 - .bind(&id) 298 - .fetch_optional(&state.db) 299 - .await 300 - .map_err(|e| AppError::Internal(format!("failed to get lexicon: {e}")))?; 301 - 302 - let (id, revision, lexicon_json, backfill, created_at, updated_at) = 303 - row.ok_or_else(|| AppError::NotFound(format!("lexicon '{id}' not found")))?; 304 - 305 - Ok(Json(serde_json::json!({ 306 - "id": id, 307 - "revision": revision, 308 - "lexicon_json": lexicon_json, 309 - "backfill": backfill, 310 - "created_at": created_at, 311 - "updated_at": updated_at, 312 - }))) 313 - } 314 - 315 - /// DELETE /admin/lexicons/:id — remove a lexicon. 316 - async fn delete_lexicon( 317 - State(state): State<AppState>, 318 - _admin: AdminAuth, 319 - Path(id): Path<String>, 320 - ) -> Result<StatusCode, AppError> { 321 - let result = sqlx::query("DELETE FROM lexicons WHERE id = $1") 322 - .bind(&id) 323 - .execute(&state.db) 324 - .await 325 - .map_err(|e| AppError::Internal(format!("failed to delete lexicon: {e}")))?; 326 - 327 - if result.rows_affected() == 0 { 328 - return Err(AppError::NotFound(format!("lexicon '{id}' not found"))); 329 - } 330 - 331 - state.lexicons.remove(&id).await; 332 - notify_jetstream(&state).await; 333 - 334 - Ok(StatusCode::NO_CONTENT) 335 - } 336 - 337 - /// GET /admin/stats — system statistics. 338 - async fn stats( 339 - State(state): State<AppState>, 340 - _admin: AdminAuth, 341 - ) -> Result<Json<StatsResponse>, AppError> { 342 - let total: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM records") 343 - .fetch_one(&state.db) 344 - .await 345 - .map_err(|e| AppError::Internal(format!("failed to count records: {e}")))?; 346 - 347 - let collections: Vec<(String, i64)> = sqlx::query_as( 348 - "SELECT collection, COUNT(*) FROM records GROUP BY collection ORDER BY collection", 349 - ) 350 - .fetch_all(&state.db) 351 - .await 352 - .map_err(|e| AppError::Internal(format!("failed to count by collection: {e}")))?; 353 - 354 - Ok(Json(StatsResponse { 355 - total_records: total.0, 356 - collections: collections 357 - .into_iter() 358 - .map(|(collection, count)| CollectionStat { collection, count }) 359 - .collect(), 360 - })) 361 - } 362 - 363 - // --------------------------------------------------------------------------- 364 - // Backfill endpoints 365 - // --------------------------------------------------------------------------- 366 - 367 - #[derive(Deserialize)] 368 - struct CreateBackfillBody { 369 - collection: Option<String>, 370 - did: Option<String>, 371 - } 372 - 373 - #[derive(Serialize)] 374 - struct BackfillJob { 375 - id: String, 376 - collection: Option<String>, 377 - did: Option<String>, 378 - status: String, 379 - total_repos: Option<i32>, 380 - processed_repos: Option<i32>, 381 - total_records: Option<i32>, 382 - error: Option<String>, 383 - started_at: Option<chrono::DateTime<chrono::Utc>>, 384 - completed_at: Option<chrono::DateTime<chrono::Utc>>, 385 - created_at: chrono::DateTime<chrono::Utc>, 386 - } 387 - 388 - /// POST /admin/backfill — create a new backfill job. 389 - async fn create_backfill( 390 - State(state): State<AppState>, 391 - _admin: AdminAuth, 392 - Json(body): Json<CreateBackfillBody>, 393 - ) -> Result<(StatusCode, Json<Value>), AppError> { 394 - let row: (String,) = sqlx::query_as( 395 - "INSERT INTO backfill_jobs (collection, did) VALUES ($1, $2) RETURNING id::text", 396 - ) 397 - .bind(&body.collection) 398 - .bind(&body.did) 399 - .fetch_one(&state.db) 400 - .await 401 - .map_err(|e| AppError::Internal(format!("failed to create backfill job: {e}")))?; 402 - 403 - Ok(( 404 - StatusCode::CREATED, 405 - Json(serde_json::json!({ 406 - "id": row.0, 407 - "status": "pending", 408 - })), 409 - )) 410 - } 411 - 412 - /// GET /admin/backfill/status — list all backfill jobs. 413 - async fn backfill_status( 414 - State(state): State<AppState>, 415 - _admin: AdminAuth, 416 - ) -> Result<Json<Vec<BackfillJob>>, AppError> { 417 - #[allow(clippy::type_complexity)] 418 - let rows: Vec<( 419 - String, 420 - Option<String>, 421 - Option<String>, 422 - String, 423 - Option<i32>, 424 - Option<i32>, 425 - Option<i32>, 426 - Option<String>, 427 - Option<chrono::DateTime<chrono::Utc>>, 428 - Option<chrono::DateTime<chrono::Utc>>, 429 - chrono::DateTime<chrono::Utc>, 430 - )> = sqlx::query_as( 431 - "SELECT id::text, collection, did, status, total_repos, processed_repos, total_records, error, started_at, completed_at, created_at FROM backfill_jobs ORDER BY created_at DESC", 432 - ) 433 - .fetch_all(&state.db) 434 - .await 435 - .map_err(|e| AppError::Internal(format!("failed to list backfill jobs: {e}")))?; 436 - 437 - let jobs: Vec<BackfillJob> = rows 438 - .into_iter() 439 - .map( 440 - |( 441 - id, 442 - collection, 443 - did, 444 - status, 445 - total_repos, 446 - processed_repos, 447 - total_records, 448 - error, 449 - started_at, 450 - completed_at, 451 - created_at, 452 - )| { 453 - BackfillJob { 454 - id, 455 - collection, 456 - did, 457 - status, 458 - total_repos, 459 - processed_repos, 460 - total_records, 461 - error, 462 - started_at, 463 - completed_at, 464 - created_at, 465 - } 466 - }, 467 - ) 468 - .collect(); 469 - 470 - Ok(Json(jobs)) 471 - } 472 - 473 - // --------------------------------------------------------------------------- 474 - // Admin management endpoints 475 - // --------------------------------------------------------------------------- 476 - 477 - #[derive(Deserialize)] 478 - struct CreateAdminBody { 479 - name: String, 480 - } 481 - 482 - #[derive(Serialize)] 483 - struct AdminSummary { 484 - id: String, 485 - name: String, 486 - created_at: chrono::DateTime<chrono::Utc>, 487 - last_used_at: Option<chrono::DateTime<chrono::Utc>>, 488 - } 489 - 490 - /// POST /admin/admins — create a new admin. Returns the API key once. 491 - async fn create_admin( 492 - State(state): State<AppState>, 493 - _admin: AdminAuth, 494 - Json(body): Json<CreateAdminBody>, 495 - ) -> Result<(StatusCode, Json<Value>), AppError> { 496 - let api_key = uuid::Uuid::new_v4().to_string(); 497 - let key_hash = hash_api_key(&api_key); 498 - 499 - let row: (String,) = sqlx::query_as( 500 - "INSERT INTO admins (name, api_key_hash) VALUES ($1, $2) RETURNING id::text", 501 - ) 502 - .bind(&body.name) 503 - .bind(&key_hash) 504 - .fetch_one(&state.db) 505 - .await 506 - .map_err(|e| AppError::Internal(format!("failed to create admin: {e}")))?; 507 - 508 - Ok(( 509 - StatusCode::CREATED, 510 - Json(serde_json::json!({ 511 - "id": row.0, 512 - "name": body.name, 513 - "api_key": api_key, 514 - })), 515 - )) 516 - } 517 - 518 - /// GET /admin/admins — list all admins (without keys). 519 - async fn list_admins( 520 - State(state): State<AppState>, 521 - _admin: AdminAuth, 522 - ) -> Result<Json<Vec<AdminSummary>>, AppError> { 523 - #[allow(clippy::type_complexity)] 524 - let rows: Vec<( 525 - String, 526 - String, 527 - chrono::DateTime<chrono::Utc>, 528 - Option<chrono::DateTime<chrono::Utc>>, 529 - )> = sqlx::query_as( 530 - "SELECT id::text, name, created_at, last_used_at FROM admins ORDER BY created_at", 531 - ) 532 - .fetch_all(&state.db) 533 - .await 534 - .map_err(|e| AppError::Internal(format!("failed to list admins: {e}")))?; 535 - 536 - let admins: Vec<AdminSummary> = rows 537 - .into_iter() 538 - .map(|(id, name, created_at, last_used_at)| AdminSummary { 539 - id, 540 - name, 541 - created_at, 542 - last_used_at, 543 - }) 544 - .collect(); 545 - 546 - Ok(Json(admins)) 547 - } 548 - 549 - /// DELETE /admin/admins/:id — remove an admin. 550 - async fn delete_admin( 551 - State(state): State<AppState>, 552 - _admin: AdminAuth, 553 - Path(id): Path<String>, 554 - ) -> Result<StatusCode, AppError> { 555 - let result = sqlx::query("DELETE FROM admins WHERE id::text = $1") 556 - .bind(&id) 557 - .execute(&state.db) 558 - .await 559 - .map_err(|e| AppError::Internal(format!("failed to delete admin: {e}")))?; 560 - 561 - if result.rows_affected() == 0 { 562 - return Err(AppError::NotFound(format!("admin '{id}' not found"))); 563 - } 564 - 565 - Ok(StatusCode::NO_CONTENT) 566 - } 567 - 568 - #[cfg(test)] 569 - mod tests { 570 - use super::*; 571 - 572 - #[test] 573 - fn hash_api_key_produces_deterministic_sha256_hex() { 574 - let h1 = hash_api_key("test-key"); 575 - let h2 = hash_api_key("test-key"); 576 - assert_eq!(h1, h2); 577 - assert_eq!(h1.len(), 64); 578 - assert!(h1.chars().all(|c| c.is_ascii_hexdigit())); 579 - } 580 - 581 - #[test] 582 - fn hash_api_key_different_inputs_differ() { 583 - let h1 = hash_api_key("key-a"); 584 - let h2 = hash_api_key("key-b"); 585 - assert_ne!(h1, h2); 586 - } 587 - 588 - #[test] 589 - fn hash_api_key_known_value() { 590 - // SHA-256 of "hello" is well-known 591 - let hash = hash_api_key("hello"); 592 - assert_eq!( 593 - hash, 594 - "2cf24dba5fb0a30e26e83b2ac5b9e29e1b161e5c1fa7425e73043362938b9824" 595 - ); 596 - } 597 - }
+89
src/admin/admins.rs
··· 1 + use axum::extract::{Path, State}; 2 + use axum::http::StatusCode; 3 + use axum::Json; 4 + use serde_json::Value; 5 + 6 + use crate::AppState; 7 + use crate::error::AppError; 8 + 9 + use super::auth::AdminAuth; 10 + use super::hash::hash_api_key; 11 + use super::types::{AdminSummary, CreateAdminBody}; 12 + 13 + /// POST /admin/admins — create a new admin. Returns the API key once. 14 + pub(super) async fn create_admin( 15 + State(state): State<AppState>, 16 + _admin: AdminAuth, 17 + Json(body): Json<CreateAdminBody>, 18 + ) -> Result<(StatusCode, Json<Value>), AppError> { 19 + let api_key = uuid::Uuid::new_v4().to_string(); 20 + let key_hash = hash_api_key(&api_key); 21 + 22 + let row: (String,) = sqlx::query_as( 23 + "INSERT INTO admins (name, api_key_hash) VALUES ($1, $2) RETURNING id::text", 24 + ) 25 + .bind(&body.name) 26 + .bind(&key_hash) 27 + .fetch_one(&state.db) 28 + .await 29 + .map_err(|e| AppError::Internal(format!("failed to create admin: {e}")))?; 30 + 31 + Ok(( 32 + StatusCode::CREATED, 33 + Json(serde_json::json!({ 34 + "id": row.0, 35 + "name": body.name, 36 + "api_key": api_key, 37 + })), 38 + )) 39 + } 40 + 41 + /// GET /admin/admins — list all admins (without keys). 42 + pub(super) async fn list_admins( 43 + State(state): State<AppState>, 44 + _admin: AdminAuth, 45 + ) -> Result<Json<Vec<AdminSummary>>, AppError> { 46 + #[allow(clippy::type_complexity)] 47 + let rows: Vec<( 48 + String, 49 + String, 50 + chrono::DateTime<chrono::Utc>, 51 + Option<chrono::DateTime<chrono::Utc>>, 52 + )> = sqlx::query_as( 53 + "SELECT id::text, name, created_at, last_used_at FROM admins ORDER BY created_at", 54 + ) 55 + .fetch_all(&state.db) 56 + .await 57 + .map_err(|e| AppError::Internal(format!("failed to list admins: {e}")))?; 58 + 59 + let admins: Vec<AdminSummary> = rows 60 + .into_iter() 61 + .map(|(id, name, created_at, last_used_at)| AdminSummary { 62 + id, 63 + name, 64 + created_at, 65 + last_used_at, 66 + }) 67 + .collect(); 68 + 69 + Ok(Json(admins)) 70 + } 71 + 72 + /// DELETE /admin/admins/:id — remove an admin. 73 + pub(super) async fn delete_admin( 74 + State(state): State<AppState>, 75 + _admin: AdminAuth, 76 + Path(id): Path<String>, 77 + ) -> Result<StatusCode, AppError> { 78 + let result = sqlx::query("DELETE FROM admins WHERE id::text = $1") 79 + .bind(&id) 80 + .execute(&state.db) 81 + .await 82 + .map_err(|e| AppError::Internal(format!("failed to delete admin: {e}")))?; 83 + 84 + if result.rows_affected() == 0 { 85 + return Err(AppError::NotFound(format!("admin '{id}' not found"))); 86 + } 87 + 88 + Ok(StatusCode::NO_CONTENT) 89 + }
+59
src/admin/auth.rs
··· 1 + use crate::AppState; 2 + use crate::error::AppError; 3 + 4 + use super::hash::hash_api_key; 5 + 6 + /// Axum extractor for admin auth. Checks the Bearer token against: 7 + /// 1. The `admins` table (hashed key lookup) 8 + /// 2. Falls back to `ADMIN_SECRET` env var for bootstrap 9 + pub struct AdminAuth; 10 + 11 + impl axum::extract::FromRequestParts<AppState> for AdminAuth { 12 + type Rejection = AppError; 13 + 14 + async fn from_request_parts( 15 + parts: &mut axum::http::request::Parts, 16 + state: &AppState, 17 + ) -> Result<Self, Self::Rejection> { 18 + let header = parts 19 + .headers 20 + .get("authorization") 21 + .and_then(|v| v.to_str().ok()) 22 + .ok_or_else(|| AppError::Auth("missing Authorization header".into()))?; 23 + 24 + let token = header 25 + .strip_prefix("Bearer ") 26 + .ok_or_else(|| AppError::Auth("invalid Authorization scheme".into()))?; 27 + 28 + // Check admins table first 29 + let key_hash = hash_api_key(token); 30 + let found: Option<(String,)> = 31 + sqlx::query_as("SELECT id::text FROM admins WHERE api_key_hash = $1") 32 + .bind(&key_hash) 33 + .fetch_optional(&state.db) 34 + .await 35 + .map_err(|e| AppError::Internal(format!("admin auth query failed: {e}")))?; 36 + 37 + if let Some((admin_id,)) = found { 38 + // Update last_used_at in the background 39 + let db = state.db.clone(); 40 + let admin_id = admin_id.clone(); 41 + tokio::spawn(async move { 42 + let _ = sqlx::query("UPDATE admins SET last_used_at = NOW() WHERE id::text = $1") 43 + .bind(&admin_id) 44 + .execute(&db) 45 + .await; 46 + }); 47 + return Ok(AdminAuth); 48 + } 49 + 50 + // Fall back to ADMIN_SECRET env var 51 + if let Some(ref secret) = state.config.admin_secret 52 + && token == secret 53 + { 54 + return Ok(AdminAuth); 55 + } 56 + 57 + Err(AppError::Auth("invalid admin credentials".into())) 58 + } 59 + }
+95
src/admin/backfill.rs
··· 1 + use axum::extract::State; 2 + use axum::http::StatusCode; 3 + use axum::Json; 4 + use serde_json::Value; 5 + 6 + use crate::AppState; 7 + use crate::error::AppError; 8 + 9 + use super::auth::AdminAuth; 10 + use super::types::{BackfillJob, CreateBackfillBody}; 11 + 12 + /// POST /admin/backfill — create a new backfill job. 13 + pub(super) async fn create_backfill( 14 + State(state): State<AppState>, 15 + _admin: AdminAuth, 16 + Json(body): Json<CreateBackfillBody>, 17 + ) -> Result<(StatusCode, Json<Value>), AppError> { 18 + let row: (String,) = sqlx::query_as( 19 + "INSERT INTO backfill_jobs (collection, did) VALUES ($1, $2) RETURNING id::text", 20 + ) 21 + .bind(&body.collection) 22 + .bind(&body.did) 23 + .fetch_one(&state.db) 24 + .await 25 + .map_err(|e| AppError::Internal(format!("failed to create backfill job: {e}")))?; 26 + 27 + Ok(( 28 + StatusCode::CREATED, 29 + Json(serde_json::json!({ 30 + "id": row.0, 31 + "status": "pending", 32 + })), 33 + )) 34 + } 35 + 36 + /// GET /admin/backfill/status — list all backfill jobs. 37 + pub(super) async fn backfill_status( 38 + State(state): State<AppState>, 39 + _admin: AdminAuth, 40 + ) -> Result<Json<Vec<BackfillJob>>, AppError> { 41 + #[allow(clippy::type_complexity)] 42 + let rows: Vec<( 43 + String, 44 + Option<String>, 45 + Option<String>, 46 + String, 47 + Option<i32>, 48 + Option<i32>, 49 + Option<i32>, 50 + Option<String>, 51 + Option<chrono::DateTime<chrono::Utc>>, 52 + Option<chrono::DateTime<chrono::Utc>>, 53 + chrono::DateTime<chrono::Utc>, 54 + )> = sqlx::query_as( 55 + "SELECT id::text, collection, did, status, total_repos, processed_repos, total_records, error, started_at, completed_at, created_at FROM backfill_jobs ORDER BY created_at DESC", 56 + ) 57 + .fetch_all(&state.db) 58 + .await 59 + .map_err(|e| AppError::Internal(format!("failed to list backfill jobs: {e}")))?; 60 + 61 + let jobs: Vec<BackfillJob> = rows 62 + .into_iter() 63 + .map( 64 + |( 65 + id, 66 + collection, 67 + did, 68 + status, 69 + total_repos, 70 + processed_repos, 71 + total_records, 72 + error, 73 + started_at, 74 + completed_at, 75 + created_at, 76 + )| { 77 + BackfillJob { 78 + id, 79 + collection, 80 + did, 81 + status, 82 + total_repos, 83 + processed_repos, 84 + total_records, 85 + error, 86 + started_at, 87 + completed_at, 88 + created_at, 89 + } 90 + }, 91 + ) 92 + .collect(); 93 + 94 + Ok(Json(jobs)) 95 + }
+25
src/admin/bootstrap.rs
··· 1 + use super::hash::hash_api_key; 2 + 3 + /// Bootstrap: if no admins exist and ADMIN_SECRET is set, create a bootstrap admin. 4 + pub async fn bootstrap(db: &sqlx::PgPool, admin_secret: &Option<String>) { 5 + let count: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM admins") 6 + .fetch_one(db) 7 + .await 8 + .unwrap_or((0,)); 9 + 10 + if count.0 > 0 { 11 + return; 12 + } 13 + 14 + if let Some(secret) = admin_secret { 15 + let key_hash = hash_api_key(secret); 16 + let _ = sqlx::query( 17 + "INSERT INTO admins (name, api_key_hash) VALUES ($1, $2) ON CONFLICT DO NOTHING", 18 + ) 19 + .bind("bootstrap") 20 + .bind(&key_hash) 21 + .execute(db) 22 + .await; 23 + tracing::info!("created bootstrap admin from ADMIN_SECRET"); 24 + } 25 + }
+38
src/admin/hash.rs
··· 1 + use sha2::{Digest, Sha256}; 2 + 3 + /// SHA-256 hash a plaintext API key for storage/comparison. 4 + pub(crate) fn hash_api_key(key: &str) -> String { 5 + let hash = Sha256::digest(key.as_bytes()); 6 + hex::encode(hash) 7 + } 8 + 9 + #[cfg(test)] 10 + mod tests { 11 + use super::*; 12 + 13 + #[test] 14 + fn hash_api_key_produces_deterministic_sha256_hex() { 15 + let h1 = hash_api_key("test-key"); 16 + let h2 = hash_api_key("test-key"); 17 + assert_eq!(h1, h2); 18 + assert_eq!(h1.len(), 64); 19 + assert!(h1.chars().all(|c| c.is_ascii_hexdigit())); 20 + } 21 + 22 + #[test] 23 + fn hash_api_key_different_inputs_differ() { 24 + let h1 = hash_api_key("key-a"); 25 + let h2 = hash_api_key("key-b"); 26 + assert_ne!(h1, h2); 27 + } 28 + 29 + #[test] 30 + fn hash_api_key_known_value() { 31 + // SHA-256 of "hello" is well-known 32 + let hash = hash_api_key("hello"); 33 + assert_eq!( 34 + hash, 35 + "2cf24dba5fb0a30e26e83b2ac5b9e29e1b161e5c1fa7425e73043362938b9824" 36 + ); 37 + } 38 + }
+185
src/admin/lexicons.rs
··· 1 + use axum::extract::{Path, State}; 2 + use axum::http::StatusCode; 3 + use axum::Json; 4 + use serde_json::Value; 5 + 6 + use crate::AppState; 7 + use crate::error::AppError; 8 + use crate::lexicon::{LexiconType, ParsedLexicon}; 9 + 10 + use super::auth::AdminAuth; 11 + use super::types::{LexiconSummary, UploadLexiconBody}; 12 + 13 + /// Send the current record collection list to the Jetstream task so it 14 + /// reconnects with the updated filter. 15 + async fn notify_jetstream(state: &AppState) { 16 + let collections = state.lexicons.get_record_collections().await; 17 + let _ = state.collections_tx.send(collections); 18 + } 19 + 20 + /// POST /admin/lexicons — upload (upsert) a lexicon. 21 + pub(super) async fn upload_lexicon( 22 + State(state): State<AppState>, 23 + _admin: AdminAuth, 24 + Json(body): Json<UploadLexiconBody>, 25 + ) -> Result<(StatusCode, Json<Value>), AppError> { 26 + // Validate basic structure 27 + let lexicon_version = body 28 + .lexicon_json 29 + .get("lexicon") 30 + .and_then(|v| v.as_i64()) 31 + .ok_or_else(|| { 32 + AppError::BadRequest("lexicon JSON must have a numeric 'lexicon' field".into()) 33 + })?; 34 + 35 + if lexicon_version != 1 { 36 + return Err(AppError::BadRequest(format!( 37 + "unsupported lexicon version: {lexicon_version}" 38 + ))); 39 + } 40 + 41 + let id = body 42 + .lexicon_json 43 + .get("id") 44 + .and_then(|v| v.as_str()) 45 + .ok_or_else(|| AppError::BadRequest("lexicon JSON must have a string 'id' field".into()))? 46 + .to_string(); 47 + 48 + // Validate it parses correctly 49 + ParsedLexicon::parse(body.lexicon_json.clone(), 1, body.target_collection.clone()) 50 + .map_err(|e| AppError::BadRequest(format!("failed to parse lexicon: {e}")))?; 51 + 52 + // Upsert into database 53 + let row: (i32,) = sqlx::query_as( 54 + r#" 55 + INSERT INTO lexicons (id, lexicon_json, backfill, target_collection) 56 + VALUES ($1, $2, $3, $4) 57 + ON CONFLICT (id) DO UPDATE SET 58 + lexicon_json = EXCLUDED.lexicon_json, 59 + backfill = EXCLUDED.backfill, 60 + target_collection = EXCLUDED.target_collection, 61 + revision = lexicons.revision + 1, 62 + updated_at = NOW() 63 + RETURNING revision 64 + "#, 65 + ) 66 + .bind(&id) 67 + .bind(&body.lexicon_json) 68 + .bind(body.backfill) 69 + .bind(&body.target_collection) 70 + .fetch_one(&state.db) 71 + .await 72 + .map_err(|e| AppError::Internal(format!("failed to upsert lexicon: {e}")))?; 73 + 74 + let revision = row.0; 75 + 76 + // Update in-memory registry with correct revision 77 + let parsed = ParsedLexicon::parse(body.lexicon_json, revision, body.target_collection) 78 + .map_err(|e| AppError::Internal(format!("failed to re-parse lexicon: {e}")))?; 79 + let is_record = parsed.lexicon_type == LexiconType::Record; 80 + state.lexicons.upsert(parsed).await; 81 + 82 + if is_record { 83 + notify_jetstream(&state).await; 84 + } 85 + 86 + let status = if revision == 1 { 87 + StatusCode::CREATED 88 + } else { 89 + StatusCode::OK 90 + }; 91 + 92 + Ok(( 93 + status, 94 + Json(serde_json::json!({ 95 + "id": id, 96 + "revision": revision, 97 + })), 98 + )) 99 + } 100 + 101 + /// GET /admin/lexicons — list all lexicons. 102 + pub(super) async fn list_lexicons( 103 + State(state): State<AppState>, 104 + _admin: AdminAuth, 105 + ) -> Result<Json<Vec<LexiconSummary>>, AppError> { 106 + #[allow(clippy::type_complexity)] 107 + let rows: Vec<(String, i32, Value, bool, chrono::DateTime<chrono::Utc>, chrono::DateTime<chrono::Utc>)> = 108 + sqlx::query_as( 109 + "SELECT id, revision, lexicon_json, backfill, created_at, updated_at FROM lexicons ORDER BY id", 110 + ) 111 + .fetch_all(&state.db) 112 + .await 113 + .map_err(|e| AppError::Internal(format!("failed to list lexicons: {e}")))?; 114 + 115 + let summaries: Vec<LexiconSummary> = rows 116 + .into_iter() 117 + .map(|(id, revision, json, backfill, created_at, updated_at)| { 118 + let lexicon_type = ParsedLexicon::parse(json, revision, None) 119 + .map(|p| format!("{:?}", p.lexicon_type).to_lowercase()) 120 + .unwrap_or_else(|_| "unknown".into()); 121 + 122 + LexiconSummary { 123 + id, 124 + revision, 125 + lexicon_type, 126 + backfill, 127 + created_at, 128 + updated_at, 129 + } 130 + }) 131 + .collect(); 132 + 133 + Ok(Json(summaries)) 134 + } 135 + 136 + /// GET /admin/lexicons/:id — get a single lexicon. 137 + pub(super) async fn get_lexicon( 138 + State(state): State<AppState>, 139 + _admin: AdminAuth, 140 + Path(id): Path<String>, 141 + ) -> Result<Json<Value>, AppError> { 142 + #[allow(clippy::type_complexity)] 143 + let row: Option<(String, i32, Value, bool, chrono::DateTime<chrono::Utc>, chrono::DateTime<chrono::Utc>)> = 144 + sqlx::query_as( 145 + "SELECT id, revision, lexicon_json, backfill, created_at, updated_at FROM lexicons WHERE id = $1", 146 + ) 147 + .bind(&id) 148 + .fetch_optional(&state.db) 149 + .await 150 + .map_err(|e| AppError::Internal(format!("failed to get lexicon: {e}")))?; 151 + 152 + let (id, revision, lexicon_json, backfill, created_at, updated_at) = 153 + row.ok_or_else(|| AppError::NotFound(format!("lexicon '{id}' not found")))?; 154 + 155 + Ok(Json(serde_json::json!({ 156 + "id": id, 157 + "revision": revision, 158 + "lexicon_json": lexicon_json, 159 + "backfill": backfill, 160 + "created_at": created_at, 161 + "updated_at": updated_at, 162 + }))) 163 + } 164 + 165 + /// DELETE /admin/lexicons/:id — remove a lexicon. 166 + pub(super) async fn delete_lexicon( 167 + State(state): State<AppState>, 168 + _admin: AdminAuth, 169 + Path(id): Path<String>, 170 + ) -> Result<StatusCode, AppError> { 171 + let result = sqlx::query("DELETE FROM lexicons WHERE id = $1") 172 + .bind(&id) 173 + .execute(&state.db) 174 + .await 175 + .map_err(|e| AppError::Internal(format!("failed to delete lexicon: {e}")))?; 176 + 177 + if result.rows_affected() == 0 { 178 + return Err(AppError::NotFound(format!("lexicon '{id}' not found"))); 179 + } 180 + 181 + state.lexicons.remove(&id).await; 182 + notify_jetstream(&state).await; 183 + 184 + Ok(StatusCode::NO_CONTENT) 185 + }
+26
src/admin/mod.rs
··· 1 + mod admins; 2 + pub(crate) mod auth; 3 + pub(crate) mod bootstrap; 4 + mod backfill; 5 + pub(crate) mod hash; 6 + mod lexicons; 7 + mod stats; 8 + mod types; 9 + 10 + use axum::routing::{delete, get, post}; 11 + use axum::Router; 12 + 13 + use crate::AppState; 14 + 15 + pub use bootstrap::bootstrap; 16 + 17 + pub fn admin_routes(_state: AppState) -> Router<AppState> { 18 + Router::new() 19 + .route("/lexicons", post(lexicons::upload_lexicon).get(lexicons::list_lexicons)) 20 + .route("/lexicons/{id}", get(lexicons::get_lexicon).delete(lexicons::delete_lexicon)) 21 + .route("/stats", get(stats::stats)) 22 + .route("/backfill", post(backfill::create_backfill)) 23 + .route("/backfill/status", get(backfill::backfill_status)) 24 + .route("/admins", post(admins::create_admin).get(admins::list_admins)) 25 + .route("/admins/{id}", delete(admins::delete_admin)) 26 + }
+34
src/admin/stats.rs
··· 1 + use axum::extract::State; 2 + use axum::Json; 3 + 4 + use crate::AppState; 5 + use crate::error::AppError; 6 + 7 + use super::auth::AdminAuth; 8 + use super::types::{CollectionStat, StatsResponse}; 9 + 10 + /// GET /admin/stats — system statistics. 11 + pub(super) async fn stats( 12 + State(state): State<AppState>, 13 + _admin: AdminAuth, 14 + ) -> Result<Json<StatsResponse>, AppError> { 15 + let total: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM records") 16 + .fetch_one(&state.db) 17 + .await 18 + .map_err(|e| AppError::Internal(format!("failed to count records: {e}")))?; 19 + 20 + let collections: Vec<(String, i64)> = sqlx::query_as( 21 + "SELECT collection, COUNT(*) FROM records GROUP BY collection ORDER BY collection", 22 + ) 23 + .fetch_all(&state.db) 24 + .await 25 + .map_err(|e| AppError::Internal(format!("failed to count by collection: {e}")))?; 26 + 27 + Ok(Json(StatsResponse { 28 + total_records: total.0, 29 + collections: collections 30 + .into_iter() 31 + .map(|(collection, count)| CollectionStat { collection, count }) 32 + .collect(), 33 + })) 34 + }
+86
src/admin/types.rs
··· 1 + use serde::{Deserialize, Serialize}; 2 + use serde_json::Value; 3 + 4 + // --------------------------------------------------------------------------- 5 + // Lexicon types 6 + // --------------------------------------------------------------------------- 7 + 8 + #[derive(Serialize)] 9 + pub(super) struct LexiconSummary { 10 + pub(super) id: String, 11 + pub(super) revision: i32, 12 + pub(super) lexicon_type: String, 13 + pub(super) backfill: bool, 14 + pub(super) created_at: chrono::DateTime<chrono::Utc>, 15 + pub(super) updated_at: chrono::DateTime<chrono::Utc>, 16 + } 17 + 18 + #[derive(Deserialize)] 19 + pub(super) struct UploadLexiconBody { 20 + pub(super) lexicon_json: Value, 21 + #[serde(default = "default_backfill")] 22 + pub(super) backfill: bool, 23 + pub(super) target_collection: Option<String>, 24 + } 25 + 26 + fn default_backfill() -> bool { 27 + true 28 + } 29 + 30 + // --------------------------------------------------------------------------- 31 + // Stats types 32 + // --------------------------------------------------------------------------- 33 + 34 + #[derive(Serialize)] 35 + pub(super) struct StatsResponse { 36 + pub(super) total_records: i64, 37 + pub(super) collections: Vec<CollectionStat>, 38 + } 39 + 40 + #[derive(Serialize)] 41 + pub(super) struct CollectionStat { 42 + pub(super) collection: String, 43 + pub(super) count: i64, 44 + } 45 + 46 + // --------------------------------------------------------------------------- 47 + // Backfill types 48 + // --------------------------------------------------------------------------- 49 + 50 + #[derive(Deserialize)] 51 + pub(super) struct CreateBackfillBody { 52 + pub(super) collection: Option<String>, 53 + pub(super) did: Option<String>, 54 + } 55 + 56 + #[derive(Serialize)] 57 + pub(super) struct BackfillJob { 58 + pub(super) id: String, 59 + pub(super) collection: Option<String>, 60 + pub(super) did: Option<String>, 61 + pub(super) status: String, 62 + pub(super) total_repos: Option<i32>, 63 + pub(super) processed_repos: Option<i32>, 64 + pub(super) total_records: Option<i32>, 65 + pub(super) error: Option<String>, 66 + pub(super) started_at: Option<chrono::DateTime<chrono::Utc>>, 67 + pub(super) completed_at: Option<chrono::DateTime<chrono::Utc>>, 68 + pub(super) created_at: chrono::DateTime<chrono::Utc>, 69 + } 70 + 71 + // --------------------------------------------------------------------------- 72 + // Admin management types 73 + // --------------------------------------------------------------------------- 74 + 75 + #[derive(Deserialize)] 76 + pub(super) struct CreateAdminBody { 77 + pub(super) name: String, 78 + } 79 + 80 + #[derive(Serialize)] 81 + pub(super) struct AdminSummary { 82 + pub(super) id: String, 83 + pub(super) name: String, 84 + pub(super) created_at: chrono::DateTime<chrono::Utc>, 85 + pub(super) last_used_at: Option<chrono::DateTime<chrono::Utc>>, 86 + }
-600
src/repo.rs
··· 1 - use axum::body::Bytes; 2 - use axum::extract::State; 3 - use axum::http::{HeaderMap, StatusCode}; 4 - use axum::response::{IntoResponse, Response}; 5 - use base64::Engine; 6 - use p256::pkcs8::EncodePrivateKey; 7 - use serde::Deserialize; 8 - use serde_json::{Value, json}; 9 - use sha2::{Digest, Sha256}; 10 - 11 - use crate::AppState; 12 - use crate::auth::Claims; 13 - use crate::error::AppError; 14 - 15 - // --------------------------------------------------------------------------- 16 - // AT URI parsing 17 - // --------------------------------------------------------------------------- 18 - 19 - /// Extract the DID from an AT URI (at://did/collection/rkey). 20 - pub(crate) fn parse_did_from_at_uri(uri: &str) -> Result<String, AppError> { 21 - let stripped = uri 22 - .strip_prefix("at://") 23 - .ok_or_else(|| AppError::Internal("AT URI must start with at://".into()))?; 24 - 25 - stripped 26 - .split('/') 27 - .next() 28 - .map(|s| s.to_string()) 29 - .ok_or_else(|| AppError::Internal("invalid AT URI".into())) 30 - } 31 - 32 - // --------------------------------------------------------------------------- 33 - // AIP session types 34 - // --------------------------------------------------------------------------- 35 - 36 - #[derive(Deserialize)] 37 - pub(crate) struct AtpSession { 38 - pub(crate) access_token: String, 39 - pub(crate) pds_endpoint: String, 40 - pub(crate) dpop_jwk: DpopJwk, 41 - } 42 - 43 - #[derive(Deserialize)] 44 - pub(crate) struct DpopJwk { 45 - pub(crate) x: String, 46 - pub(crate) y: String, 47 - pub(crate) d: String, 48 - } 49 - 50 - // --------------------------------------------------------------------------- 51 - // DPoP proof generation 52 - // --------------------------------------------------------------------------- 53 - 54 - #[derive(serde::Serialize)] 55 - struct DpopClaims { 56 - jti: String, 57 - htm: String, 58 - htu: String, 59 - iat: i64, 60 - exp: i64, 61 - ath: String, 62 - #[serde(skip_serializing_if = "Option::is_none")] 63 - nonce: Option<String>, 64 - } 65 - 66 - /// Fetch the user's AT Protocol session (PDS credentials) from AIP. 67 - pub(crate) async fn get_atp_session(state: &AppState, token: &str) -> Result<AtpSession, AppError> { 68 - let url = format!( 69 - "{}/api/atprotocol/session", 70 - state.config.aip_url.trim_end_matches('/') 71 - ); 72 - 73 - let resp = state 74 - .http 75 - .get(&url) 76 - .header("authorization", format!("Bearer {token}")) 77 - .send() 78 - .await 79 - .map_err(|e| AppError::Internal(format!("AIP session request failed: {e}")))?; 80 - 81 - if !resp.status().is_success() { 82 - return Err(AppError::Auth(format!( 83 - "AIP session returned {}", 84 - resp.status() 85 - ))); 86 - } 87 - 88 - resp.json() 89 - .await 90 - .map_err(|e| AppError::Internal(format!("invalid AIP session response: {e}"))) 91 - } 92 - 93 - /// Generate a DPoP proof JWT for a PDS request. 94 - pub(crate) fn generate_dpop_proof( 95 - method: &str, 96 - url: &str, 97 - dpop_jwk: &DpopJwk, 98 - access_token: &str, 99 - nonce: Option<&str>, 100 - ) -> Result<String, AppError> { 101 - // Decode the private key from base64url 102 - let d_bytes = base64::engine::general_purpose::URL_SAFE_NO_PAD 103 - .decode(&dpop_jwk.d) 104 - .map_err(|e| AppError::Internal(format!("invalid DPoP key d: {e}")))?; 105 - 106 - let secret_key = p256::SecretKey::from_slice(&d_bytes) 107 - .map_err(|e| AppError::Internal(format!("invalid P-256 key: {e}")))?; 108 - 109 - let pkcs8_der = secret_key 110 - .to_pkcs8_der() 111 - .map_err(|e| AppError::Internal(format!("PKCS#8 conversion failed: {e}")))?; 112 - 113 - let encoding_key = jsonwebtoken::EncodingKey::from_ec_der(pkcs8_der.as_bytes()); 114 - 115 - // Public JWK for the header (no private component) 116 - let public_jwk = jsonwebtoken::jwk::Jwk { 117 - common: jsonwebtoken::jwk::CommonParameters { 118 - public_key_use: None, 119 - key_operations: None, 120 - key_algorithm: None, 121 - key_id: None, 122 - x509_url: None, 123 - x509_chain: None, 124 - x509_sha1_fingerprint: None, 125 - x509_sha256_fingerprint: None, 126 - }, 127 - algorithm: jsonwebtoken::jwk::AlgorithmParameters::EllipticCurve( 128 - jsonwebtoken::jwk::EllipticCurveKeyParameters { 129 - key_type: jsonwebtoken::jwk::EllipticCurveKeyType::EC, 130 - curve: jsonwebtoken::jwk::EllipticCurve::P256, 131 - x: dpop_jwk.x.clone(), 132 - y: dpop_jwk.y.clone(), 133 - }, 134 - ), 135 - }; 136 - 137 - let mut header = jsonwebtoken::Header::new(jsonwebtoken::Algorithm::ES256); 138 - header.typ = Some("dpop+jwt".to_string()); 139 - header.jwk = Some(public_jwk); 140 - 141 - // Access token hash (ath) 142 - let ath_hash = Sha256::digest(access_token.as_bytes()); 143 - let ath = base64::engine::general_purpose::URL_SAFE_NO_PAD.encode(ath_hash); 144 - 145 - let now = chrono::Utc::now().timestamp(); 146 - let claims = DpopClaims { 147 - jti: uuid::Uuid::new_v4().to_string(), 148 - htm: method.to_uppercase(), 149 - htu: url.to_string(), 150 - iat: now, 151 - exp: now + 300, 152 - ath, 153 - nonce: nonce.map(|n| n.to_string()), 154 - }; 155 - 156 - jsonwebtoken::encode(&header, &claims, &encoding_key) 157 - .map_err(|e| AppError::Internal(format!("DPoP proof signing failed: {e}"))) 158 - } 159 - 160 - // --------------------------------------------------------------------------- 161 - // PDS request helpers with DPoP + nonce retry 162 - // --------------------------------------------------------------------------- 163 - 164 - /// Forward a PDS response back to the client, preserving status and body. 165 - pub(crate) async fn forward_pds_response(resp: reqwest::Response) -> Result<Response, AppError> { 166 - let status = resp.status(); 167 - let body = resp 168 - .bytes() 169 - .await 170 - .map_err(|e| AppError::Internal(format!("failed to read PDS response: {e}")))?; 171 - 172 - let axum_status = StatusCode::from_u16(status.as_u16()).unwrap(); 173 - 174 - if status.is_success() { 175 - Ok(( 176 - axum_status, 177 - [(axum::http::header::CONTENT_TYPE, "application/json")], 178 - body, 179 - ) 180 - .into_response()) 181 - } else { 182 - let body_str = String::from_utf8_lossy(&body); 183 - tracing::warn!(status = %axum_status, body = %body_str, "PDS returned error"); 184 - Err(AppError::PdsError(axum_status, body)) 185 - } 186 - } 187 - 188 - /// POST JSON to a PDS XRPC endpoint with DPoP auth and nonce retry. 189 - /// Returns the raw reqwest::Response so callers can inspect the body. 190 - pub(crate) async fn pds_post_json_raw( 191 - state: &AppState, 192 - session: &AtpSession, 193 - xrpc_method: &str, 194 - body: &Value, 195 - ) -> Result<reqwest::Response, AppError> { 196 - let url = format!( 197 - "{}/xrpc/{xrpc_method}", 198 - session.pds_endpoint.trim_end_matches('/') 199 - ); 200 - 201 - let dpop = generate_dpop_proof("POST", &url, &session.dpop_jwk, &session.access_token, None)?; 202 - 203 - let resp = state 204 - .http 205 - .post(&url) 206 - .header("authorization", format!("DPoP {}", session.access_token)) 207 - .header("dpop", &dpop) 208 - .json(body) 209 - .send() 210 - .await 211 - .map_err(|e| AppError::Internal(format!("PDS request failed: {e}")))?; 212 - 213 - // Retry with nonce if PDS requires it 214 - if resp.status() == reqwest::StatusCode::UNAUTHORIZED 215 - && let Some(nonce) = resp 216 - .headers() 217 - .get("dpop-nonce") 218 - .and_then(|v| v.to_str().ok()) 219 - { 220 - let nonce = nonce.to_string(); 221 - tracing::debug!("retrying with DPoP nonce"); 222 - 223 - let dpop = generate_dpop_proof( 224 - "POST", 225 - &url, 226 - &session.dpop_jwk, 227 - &session.access_token, 228 - Some(&nonce), 229 - )?; 230 - 231 - let resp = state 232 - .http 233 - .post(&url) 234 - .header("authorization", format!("DPoP {}", session.access_token)) 235 - .header("dpop", &dpop) 236 - .json(body) 237 - .send() 238 - .await 239 - .map_err(|e| AppError::Internal(format!("PDS request retry failed: {e}")))?; 240 - 241 - return Ok(resp); 242 - } 243 - 244 - Ok(resp) 245 - } 246 - 247 - /// POST a binary blob to the PDS with DPoP auth and nonce retry. 248 - async fn pds_post_blob( 249 - state: &AppState, 250 - session: &AtpSession, 251 - content_type: &str, 252 - blob: Bytes, 253 - ) -> Result<Response, AppError> { 254 - let url = format!( 255 - "{}/xrpc/com.atproto.repo.uploadBlob", 256 - session.pds_endpoint.trim_end_matches('/') 257 - ); 258 - 259 - let dpop = generate_dpop_proof("POST", &url, &session.dpop_jwk, &session.access_token, None)?; 260 - 261 - let resp = state 262 - .http 263 - .post(&url) 264 - .header("authorization", format!("DPoP {}", session.access_token)) 265 - .header("dpop", &dpop) 266 - .header("content-type", content_type) 267 - .body(blob.clone()) 268 - .send() 269 - .await 270 - .map_err(|e| AppError::Internal(format!("PDS uploadBlob failed: {e}")))?; 271 - 272 - if resp.status() == reqwest::StatusCode::UNAUTHORIZED 273 - && let Some(nonce) = resp 274 - .headers() 275 - .get("dpop-nonce") 276 - .and_then(|v| v.to_str().ok()) 277 - { 278 - let nonce = nonce.to_string(); 279 - tracing::debug!("retrying uploadBlob with DPoP nonce"); 280 - 281 - let dpop = generate_dpop_proof( 282 - "POST", 283 - &url, 284 - &session.dpop_jwk, 285 - &session.access_token, 286 - Some(&nonce), 287 - )?; 288 - 289 - let resp = state 290 - .http 291 - .post(&url) 292 - .header("authorization", format!("DPoP {}", session.access_token)) 293 - .header("dpop", &dpop) 294 - .header("content-type", content_type) 295 - .body(blob) 296 - .send() 297 - .await 298 - .map_err(|e| AppError::Internal(format!("PDS uploadBlob retry failed: {e}")))?; 299 - 300 - return forward_pds_response(resp).await; 301 - } 302 - 303 - forward_pds_response(resp).await 304 - } 305 - 306 - // --------------------------------------------------------------------------- 307 - // Public handlers 308 - // --------------------------------------------------------------------------- 309 - 310 - pub async fn upload_blob( 311 - State(state): State<AppState>, 312 - claims: Claims, 313 - headers: HeaderMap, 314 - body: Bytes, 315 - ) -> Result<Response, AppError> { 316 - let session = get_atp_session(&state, claims.token()).await?; 317 - 318 - let content_type = headers 319 - .get("content-type") 320 - .and_then(|v| v.to_str().ok()) 321 - .unwrap_or("application/octet-stream"); 322 - 323 - pds_post_blob(&state, &session, content_type, body).await 324 - } 325 - 326 - /// Walk `media[]` and add a `url` field to each blob so the frontend can 327 - /// display images directly. 328 - pub(crate) fn enrich_media_blobs(record: &mut Value, pds: &str, did: &str) { 329 - let media = match record.get_mut("media").and_then(|m| m.as_array_mut()) { 330 - Some(arr) => arr, 331 - None => return, 332 - }; 333 - 334 - let pds_base = pds.trim_end_matches('/'); 335 - 336 - for item in media.iter_mut() { 337 - let cid = item 338 - .get("blob") 339 - .and_then(|b| b.get("ref")) 340 - .and_then(|r| r.get("$link")) 341 - .and_then(|l| l.as_str()) 342 - .map(|s| s.to_string()); 343 - 344 - if let Some(cid) = cid 345 - && let Some(blob) = item.get_mut("blob") 346 - && let Some(obj) = blob.as_object_mut() 347 - { 348 - obj.insert( 349 - "url".to_string(), 350 - json!(format!( 351 - "{pds_base}/xrpc/com.atproto.sync.getBlob?did={did}&cid={cid}" 352 - )), 353 - ); 354 - } 355 - } 356 - } 357 - 358 - #[cfg(test)] 359 - mod tests { 360 - use super::*; 361 - 362 - // ----------------------------------------------------------------------- 363 - // parse_did_from_at_uri 364 - // ----------------------------------------------------------------------- 365 - 366 - #[test] 367 - fn parse_did_from_valid_at_uri() { 368 - let did = parse_did_from_at_uri("at://did:plc:abc123/app.bsky.feed.post/3k2bqxyz").unwrap(); 369 - assert_eq!(did, "did:plc:abc123"); 370 - } 371 - 372 - #[test] 373 - fn parse_did_from_uri_with_no_rkey() { 374 - let did = parse_did_from_at_uri("at://did:plc:abc123/collection").unwrap(); 375 - assert_eq!(did, "did:plc:abc123"); 376 - } 377 - 378 - #[test] 379 - fn parse_did_from_did_web_uri() { 380 - let did = parse_did_from_at_uri("at://did:web:example.com/collection/rkey").unwrap(); 381 - assert_eq!(did, "did:web:example.com"); 382 - } 383 - 384 - #[test] 385 - fn parse_did_from_uri_missing_prefix() { 386 - let result = parse_did_from_at_uri("did:plc:abc123/collection/rkey"); 387 - assert!(result.is_err()); 388 - } 389 - 390 - // ----------------------------------------------------------------------- 391 - // enrich_media_blobs 392 - // ----------------------------------------------------------------------- 393 - 394 - #[test] 395 - fn enrich_media_adds_url() { 396 - let mut record = json!({ 397 - "media": [{ 398 - "blob": { 399 - "ref": { "$link": "bafyreiabc" }, 400 - "mimeType": "image/jpeg", 401 - "size": 1024 402 - } 403 - }] 404 - }); 405 - 406 - enrich_media_blobs(&mut record, "https://pds.example.com", "did:plc:test"); 407 - 408 - let url = record["media"][0]["blob"]["url"].as_str().unwrap(); 409 - assert_eq!( 410 - url, 411 - "https://pds.example.com/xrpc/com.atproto.sync.getBlob?did=did:plc:test&cid=bafyreiabc" 412 - ); 413 - } 414 - 415 - #[test] 416 - fn enrich_media_noop_without_media() { 417 - let mut record = json!({"title": "test"}); 418 - enrich_media_blobs(&mut record, "https://pds.example.com", "did:plc:test"); 419 - assert!(record.get("media").is_none()); 420 - } 421 - 422 - #[test] 423 - fn enrich_media_skips_items_without_ref() { 424 - let mut record = json!({ 425 - "media": [{ 426 - "blob": { "mimeType": "image/png" } 427 - }] 428 - }); 429 - 430 - enrich_media_blobs(&mut record, "https://pds.example.com", "did:plc:test"); 431 - assert!(record["media"][0]["blob"].get("url").is_none()); 432 - } 433 - 434 - #[test] 435 - fn enrich_media_handles_multiple_items() { 436 - let mut record = json!({ 437 - "media": [ 438 - { "blob": { "ref": { "$link": "cid1" } } }, 439 - { "blob": { "ref": { "$link": "cid2" } } } 440 - ] 441 - }); 442 - 443 - enrich_media_blobs(&mut record, "https://pds.example.com/", "did:plc:x"); 444 - 445 - let url1 = record["media"][0]["blob"]["url"].as_str().unwrap(); 446 - let url2 = record["media"][1]["blob"]["url"].as_str().unwrap(); 447 - assert!(url1.contains("cid1")); 448 - assert!(url2.contains("cid2")); 449 - } 450 - 451 - #[test] 452 - fn enrich_media_trims_trailing_slash() { 453 - let mut record = json!({ 454 - "media": [{ 455 - "blob": { "ref": { "$link": "bafytest" } } 456 - }] 457 - }); 458 - 459 - enrich_media_blobs(&mut record, "https://pds.example.com/", "did:plc:test"); 460 - 461 - let url = record["media"][0]["blob"]["url"].as_str().unwrap(); 462 - assert!(url.starts_with("https://pds.example.com/xrpc/")); 463 - assert!(!url.contains("//xrpc")); 464 - } 465 - 466 - // ----------------------------------------------------------------------- 467 - // generate_dpop_proof 468 - // ----------------------------------------------------------------------- 469 - 470 - fn test_dpop_jwk() -> DpopJwk { 471 - use p256::elliptic_curve::rand_core::OsRng; 472 - use p256::elliptic_curve::sec1::ToEncodedPoint; 473 - // Generate a valid P-256 key for testing 474 - let secret = p256::SecretKey::random(&mut OsRng); 475 - let public = secret.public_key(); 476 - let point = public.to_encoded_point(false); 477 - 478 - DpopJwk { 479 - x: base64::engine::general_purpose::URL_SAFE_NO_PAD.encode(point.x().unwrap()), 480 - y: base64::engine::general_purpose::URL_SAFE_NO_PAD.encode(point.y().unwrap()), 481 - d: base64::engine::general_purpose::URL_SAFE_NO_PAD.encode(secret.to_bytes()), 482 - } 483 - } 484 - 485 - #[test] 486 - fn dpop_proof_produces_valid_jwt_structure() { 487 - let jwk = test_dpop_jwk(); 488 - let token = generate_dpop_proof( 489 - "POST", 490 - "https://pds.example.com/xrpc/test", 491 - &jwk, 492 - "access-tok", 493 - None, 494 - ) 495 - .unwrap(); 496 - 497 - let parts: Vec<&str> = token.split('.').collect(); 498 - assert_eq!(parts.len(), 3, "JWT should have 3 parts"); 499 - } 500 - 501 - #[test] 502 - fn dpop_proof_header_has_correct_fields() { 503 - let jwk = test_dpop_jwk(); 504 - let token = generate_dpop_proof( 505 - "POST", 506 - "https://pds.example.com/xrpc/test", 507 - &jwk, 508 - "access-tok", 509 - None, 510 - ) 511 - .unwrap(); 512 - 513 - let header_b64 = token.split('.').next().unwrap(); 514 - let header_bytes = base64::engine::general_purpose::URL_SAFE_NO_PAD 515 - .decode(header_b64) 516 - .unwrap(); 517 - let header: serde_json::Value = serde_json::from_slice(&header_bytes).unwrap(); 518 - 519 - assert_eq!(header["typ"], "dpop+jwt"); 520 - assert_eq!(header["alg"], "ES256"); 521 - assert!(header.get("jwk").is_some()); 522 - } 523 - 524 - #[test] 525 - fn dpop_proof_claims_have_correct_fields() { 526 - let jwk = test_dpop_jwk(); 527 - let token = generate_dpop_proof( 528 - "GET", 529 - "https://pds.example.com/xrpc/test", 530 - &jwk, 531 - "my-access-token", 532 - None, 533 - ) 534 - .unwrap(); 535 - 536 - let payload_b64 = token.split('.').nth(1).unwrap(); 537 - let payload_bytes = base64::engine::general_purpose::URL_SAFE_NO_PAD 538 - .decode(payload_b64) 539 - .unwrap(); 540 - let claims: serde_json::Value = serde_json::from_slice(&payload_bytes).unwrap(); 541 - 542 - assert_eq!(claims["htm"], "GET"); 543 - assert_eq!(claims["htu"], "https://pds.example.com/xrpc/test"); 544 - assert!(claims.get("jti").is_some()); 545 - assert!(claims.get("iat").is_some()); 546 - assert!(claims.get("exp").is_some()); 547 - assert!(claims.get("ath").is_some()); 548 - assert!(claims.get("nonce").is_none()); 549 - } 550 - 551 - #[test] 552 - fn dpop_proof_includes_nonce_when_provided() { 553 - let jwk = test_dpop_jwk(); 554 - let token = generate_dpop_proof( 555 - "POST", 556 - "https://pds.example.com/xrpc/test", 557 - &jwk, 558 - "tok", 559 - Some("abc123"), 560 - ) 561 - .unwrap(); 562 - 563 - let payload_b64 = token.split('.').nth(1).unwrap(); 564 - let payload_bytes = base64::engine::general_purpose::URL_SAFE_NO_PAD 565 - .decode(payload_b64) 566 - .unwrap(); 567 - let claims: serde_json::Value = serde_json::from_slice(&payload_bytes).unwrap(); 568 - 569 - assert_eq!(claims["nonce"], "abc123"); 570 - } 571 - 572 - #[test] 573 - fn dpop_proof_ath_is_sha256_of_access_token() { 574 - let jwk = test_dpop_jwk(); 575 - let access_token = "test-access-token"; 576 - let token = 577 - generate_dpop_proof("POST", "https://example.com", &jwk, access_token, None).unwrap(); 578 - 579 - let payload_b64 = token.split('.').nth(1).unwrap(); 580 - let payload_bytes = base64::engine::general_purpose::URL_SAFE_NO_PAD 581 - .decode(payload_b64) 582 - .unwrap(); 583 - let claims: serde_json::Value = serde_json::from_slice(&payload_bytes).unwrap(); 584 - 585 - let expected_hash = Sha256::digest(access_token.as_bytes()); 586 - let expected_ath = base64::engine::general_purpose::URL_SAFE_NO_PAD.encode(expected_hash); 587 - assert_eq!(claims["ath"], expected_ath); 588 - } 589 - 590 - #[test] 591 - fn dpop_proof_invalid_key_returns_error() { 592 - let jwk = DpopJwk { 593 - x: "invalid".into(), 594 - y: "invalid".into(), 595 - d: "invalid".into(), 596 - }; 597 - let result = generate_dpop_proof("POST", "https://example.com", &jwk, "tok", None); 598 - assert!(result.is_err()); 599 - } 600 - }
+43
src/repo/at_uri.rs
··· 1 + use crate::error::AppError; 2 + 3 + /// Extract the DID from an AT URI (at://did/collection/rkey). 4 + pub(crate) fn parse_did_from_at_uri(uri: &str) -> Result<String, AppError> { 5 + let stripped = uri 6 + .strip_prefix("at://") 7 + .ok_or_else(|| AppError::Internal("AT URI must start with at://".into()))?; 8 + 9 + stripped 10 + .split('/') 11 + .next() 12 + .map(|s| s.to_string()) 13 + .ok_or_else(|| AppError::Internal("invalid AT URI".into())) 14 + } 15 + 16 + #[cfg(test)] 17 + mod tests { 18 + use super::*; 19 + 20 + #[test] 21 + fn parse_did_from_valid_at_uri() { 22 + let did = parse_did_from_at_uri("at://did:plc:abc123/app.bsky.feed.post/3k2bqxyz").unwrap(); 23 + assert_eq!(did, "did:plc:abc123"); 24 + } 25 + 26 + #[test] 27 + fn parse_did_from_uri_with_no_rkey() { 28 + let did = parse_did_from_at_uri("at://did:plc:abc123/collection").unwrap(); 29 + assert_eq!(did, "did:plc:abc123"); 30 + } 31 + 32 + #[test] 33 + fn parse_did_from_did_web_uri() { 34 + let did = parse_did_from_at_uri("at://did:web:example.com/collection/rkey").unwrap(); 35 + assert_eq!(did, "did:web:example.com"); 36 + } 37 + 38 + #[test] 39 + fn parse_did_from_uri_missing_prefix() { 40 + let result = parse_did_from_at_uri("did:plc:abc123/collection/rkey"); 41 + assert!(result.is_err()); 42 + } 43 + }
+222
src/repo/dpop.rs
··· 1 + use base64::Engine; 2 + use p256::pkcs8::EncodePrivateKey; 3 + use sha2::{Digest, Sha256}; 4 + 5 + use crate::error::AppError; 6 + 7 + use super::session::DpopJwk; 8 + 9 + #[derive(serde::Serialize)] 10 + struct DpopClaims { 11 + jti: String, 12 + htm: String, 13 + htu: String, 14 + iat: i64, 15 + exp: i64, 16 + ath: String, 17 + #[serde(skip_serializing_if = "Option::is_none")] 18 + nonce: Option<String>, 19 + } 20 + 21 + /// Generate a DPoP proof JWT for a PDS request. 22 + pub(crate) fn generate_dpop_proof( 23 + method: &str, 24 + url: &str, 25 + dpop_jwk: &DpopJwk, 26 + access_token: &str, 27 + nonce: Option<&str>, 28 + ) -> Result<String, AppError> { 29 + // Decode the private key from base64url 30 + let d_bytes = base64::engine::general_purpose::URL_SAFE_NO_PAD 31 + .decode(&dpop_jwk.d) 32 + .map_err(|e| AppError::Internal(format!("invalid DPoP key d: {e}")))?; 33 + 34 + let secret_key = p256::SecretKey::from_slice(&d_bytes) 35 + .map_err(|e| AppError::Internal(format!("invalid P-256 key: {e}")))?; 36 + 37 + let pkcs8_der = secret_key 38 + .to_pkcs8_der() 39 + .map_err(|e| AppError::Internal(format!("PKCS#8 conversion failed: {e}")))?; 40 + 41 + let encoding_key = jsonwebtoken::EncodingKey::from_ec_der(pkcs8_der.as_bytes()); 42 + 43 + // Public JWK for the header (no private component) 44 + let public_jwk = jsonwebtoken::jwk::Jwk { 45 + common: jsonwebtoken::jwk::CommonParameters { 46 + public_key_use: None, 47 + key_operations: None, 48 + key_algorithm: None, 49 + key_id: None, 50 + x509_url: None, 51 + x509_chain: None, 52 + x509_sha1_fingerprint: None, 53 + x509_sha256_fingerprint: None, 54 + }, 55 + algorithm: jsonwebtoken::jwk::AlgorithmParameters::EllipticCurve( 56 + jsonwebtoken::jwk::EllipticCurveKeyParameters { 57 + key_type: jsonwebtoken::jwk::EllipticCurveKeyType::EC, 58 + curve: jsonwebtoken::jwk::EllipticCurve::P256, 59 + x: dpop_jwk.x.clone(), 60 + y: dpop_jwk.y.clone(), 61 + }, 62 + ), 63 + }; 64 + 65 + let mut header = jsonwebtoken::Header::new(jsonwebtoken::Algorithm::ES256); 66 + header.typ = Some("dpop+jwt".to_string()); 67 + header.jwk = Some(public_jwk); 68 + 69 + // Access token hash (ath) 70 + let ath_hash = Sha256::digest(access_token.as_bytes()); 71 + let ath = base64::engine::general_purpose::URL_SAFE_NO_PAD.encode(ath_hash); 72 + 73 + let now = chrono::Utc::now().timestamp(); 74 + let claims = DpopClaims { 75 + jti: uuid::Uuid::new_v4().to_string(), 76 + htm: method.to_uppercase(), 77 + htu: url.to_string(), 78 + iat: now, 79 + exp: now + 300, 80 + ath, 81 + nonce: nonce.map(|n| n.to_string()), 82 + }; 83 + 84 + jsonwebtoken::encode(&header, &claims, &encoding_key) 85 + .map_err(|e| AppError::Internal(format!("DPoP proof signing failed: {e}"))) 86 + } 87 + 88 + #[cfg(test)] 89 + mod tests { 90 + use super::*; 91 + 92 + fn test_dpop_jwk() -> DpopJwk { 93 + use p256::elliptic_curve::rand_core::OsRng; 94 + use p256::elliptic_curve::sec1::ToEncodedPoint; 95 + // Generate a valid P-256 key for testing 96 + let secret = p256::SecretKey::random(&mut OsRng); 97 + let public = secret.public_key(); 98 + let point = public.to_encoded_point(false); 99 + 100 + DpopJwk { 101 + x: base64::engine::general_purpose::URL_SAFE_NO_PAD.encode(point.x().unwrap()), 102 + y: base64::engine::general_purpose::URL_SAFE_NO_PAD.encode(point.y().unwrap()), 103 + d: base64::engine::general_purpose::URL_SAFE_NO_PAD.encode(secret.to_bytes()), 104 + } 105 + } 106 + 107 + #[test] 108 + fn dpop_proof_produces_valid_jwt_structure() { 109 + let jwk = test_dpop_jwk(); 110 + let token = generate_dpop_proof( 111 + "POST", 112 + "https://pds.example.com/xrpc/test", 113 + &jwk, 114 + "access-tok", 115 + None, 116 + ) 117 + .unwrap(); 118 + 119 + let parts: Vec<&str> = token.split('.').collect(); 120 + assert_eq!(parts.len(), 3, "JWT should have 3 parts"); 121 + } 122 + 123 + #[test] 124 + fn dpop_proof_header_has_correct_fields() { 125 + let jwk = test_dpop_jwk(); 126 + let token = generate_dpop_proof( 127 + "POST", 128 + "https://pds.example.com/xrpc/test", 129 + &jwk, 130 + "access-tok", 131 + None, 132 + ) 133 + .unwrap(); 134 + 135 + let header_b64 = token.split('.').next().unwrap(); 136 + let header_bytes = base64::engine::general_purpose::URL_SAFE_NO_PAD 137 + .decode(header_b64) 138 + .unwrap(); 139 + let header: serde_json::Value = serde_json::from_slice(&header_bytes).unwrap(); 140 + 141 + assert_eq!(header["typ"], "dpop+jwt"); 142 + assert_eq!(header["alg"], "ES256"); 143 + assert!(header.get("jwk").is_some()); 144 + } 145 + 146 + #[test] 147 + fn dpop_proof_claims_have_correct_fields() { 148 + let jwk = test_dpop_jwk(); 149 + let token = generate_dpop_proof( 150 + "GET", 151 + "https://pds.example.com/xrpc/test", 152 + &jwk, 153 + "my-access-token", 154 + None, 155 + ) 156 + .unwrap(); 157 + 158 + let payload_b64 = token.split('.').nth(1).unwrap(); 159 + let payload_bytes = base64::engine::general_purpose::URL_SAFE_NO_PAD 160 + .decode(payload_b64) 161 + .unwrap(); 162 + let claims: serde_json::Value = serde_json::from_slice(&payload_bytes).unwrap(); 163 + 164 + assert_eq!(claims["htm"], "GET"); 165 + assert_eq!(claims["htu"], "https://pds.example.com/xrpc/test"); 166 + assert!(claims.get("jti").is_some()); 167 + assert!(claims.get("iat").is_some()); 168 + assert!(claims.get("exp").is_some()); 169 + assert!(claims.get("ath").is_some()); 170 + assert!(claims.get("nonce").is_none()); 171 + } 172 + 173 + #[test] 174 + fn dpop_proof_includes_nonce_when_provided() { 175 + let jwk = test_dpop_jwk(); 176 + let token = generate_dpop_proof( 177 + "POST", 178 + "https://pds.example.com/xrpc/test", 179 + &jwk, 180 + "tok", 181 + Some("abc123"), 182 + ) 183 + .unwrap(); 184 + 185 + let payload_b64 = token.split('.').nth(1).unwrap(); 186 + let payload_bytes = base64::engine::general_purpose::URL_SAFE_NO_PAD 187 + .decode(payload_b64) 188 + .unwrap(); 189 + let claims: serde_json::Value = serde_json::from_slice(&payload_bytes).unwrap(); 190 + 191 + assert_eq!(claims["nonce"], "abc123"); 192 + } 193 + 194 + #[test] 195 + fn dpop_proof_ath_is_sha256_of_access_token() { 196 + let jwk = test_dpop_jwk(); 197 + let access_token = "test-access-token"; 198 + let token = 199 + generate_dpop_proof("POST", "https://example.com", &jwk, access_token, None).unwrap(); 200 + 201 + let payload_b64 = token.split('.').nth(1).unwrap(); 202 + let payload_bytes = base64::engine::general_purpose::URL_SAFE_NO_PAD 203 + .decode(payload_b64) 204 + .unwrap(); 205 + let claims: serde_json::Value = serde_json::from_slice(&payload_bytes).unwrap(); 206 + 207 + let expected_hash = Sha256::digest(access_token.as_bytes()); 208 + let expected_ath = base64::engine::general_purpose::URL_SAFE_NO_PAD.encode(expected_hash); 209 + assert_eq!(claims["ath"], expected_ath); 210 + } 211 + 212 + #[test] 213 + fn dpop_proof_invalid_key_returns_error() { 214 + let jwk = DpopJwk { 215 + x: "invalid".into(), 216 + y: "invalid".into(), 217 + d: "invalid".into(), 218 + }; 219 + let result = generate_dpop_proof("POST", "https://example.com", &jwk, "tok", None); 220 + assert!(result.is_err()); 221 + } 222 + }
+110
src/repo/media.rs
··· 1 + use serde_json::{Value, json}; 2 + 3 + /// Walk `media[]` and add a `url` field to each blob so the frontend can 4 + /// display images directly. 5 + pub(crate) fn enrich_media_blobs(record: &mut Value, pds: &str, did: &str) { 6 + let media = match record.get_mut("media").and_then(|m| m.as_array_mut()) { 7 + Some(arr) => arr, 8 + None => return, 9 + }; 10 + 11 + let pds_base = pds.trim_end_matches('/'); 12 + 13 + for item in media.iter_mut() { 14 + let cid = item 15 + .get("blob") 16 + .and_then(|b| b.get("ref")) 17 + .and_then(|r| r.get("$link")) 18 + .and_then(|l| l.as_str()) 19 + .map(|s| s.to_string()); 20 + 21 + if let Some(cid) = cid 22 + && let Some(blob) = item.get_mut("blob") 23 + && let Some(obj) = blob.as_object_mut() 24 + { 25 + obj.insert( 26 + "url".to_string(), 27 + json!(format!( 28 + "{pds_base}/xrpc/com.atproto.sync.getBlob?did={did}&cid={cid}" 29 + )), 30 + ); 31 + } 32 + } 33 + } 34 + 35 + #[cfg(test)] 36 + mod tests { 37 + use super::*; 38 + 39 + #[test] 40 + fn enrich_media_adds_url() { 41 + let mut record = json!({ 42 + "media": [{ 43 + "blob": { 44 + "ref": { "$link": "bafyreiabc" }, 45 + "mimeType": "image/jpeg", 46 + "size": 1024 47 + } 48 + }] 49 + }); 50 + 51 + enrich_media_blobs(&mut record, "https://pds.example.com", "did:plc:test"); 52 + 53 + let url = record["media"][0]["blob"]["url"].as_str().unwrap(); 54 + assert_eq!( 55 + url, 56 + "https://pds.example.com/xrpc/com.atproto.sync.getBlob?did=did:plc:test&cid=bafyreiabc" 57 + ); 58 + } 59 + 60 + #[test] 61 + fn enrich_media_noop_without_media() { 62 + let mut record = json!({"title": "test"}); 63 + enrich_media_blobs(&mut record, "https://pds.example.com", "did:plc:test"); 64 + assert!(record.get("media").is_none()); 65 + } 66 + 67 + #[test] 68 + fn enrich_media_skips_items_without_ref() { 69 + let mut record = json!({ 70 + "media": [{ 71 + "blob": { "mimeType": "image/png" } 72 + }] 73 + }); 74 + 75 + enrich_media_blobs(&mut record, "https://pds.example.com", "did:plc:test"); 76 + assert!(record["media"][0]["blob"].get("url").is_none()); 77 + } 78 + 79 + #[test] 80 + fn enrich_media_handles_multiple_items() { 81 + let mut record = json!({ 82 + "media": [ 83 + { "blob": { "ref": { "$link": "cid1" } } }, 84 + { "blob": { "ref": { "$link": "cid2" } } } 85 + ] 86 + }); 87 + 88 + enrich_media_blobs(&mut record, "https://pds.example.com/", "did:plc:x"); 89 + 90 + let url1 = record["media"][0]["blob"]["url"].as_str().unwrap(); 91 + let url2 = record["media"][1]["blob"]["url"].as_str().unwrap(); 92 + assert!(url1.contains("cid1")); 93 + assert!(url2.contains("cid2")); 94 + } 95 + 96 + #[test] 97 + fn enrich_media_trims_trailing_slash() { 98 + let mut record = json!({ 99 + "media": [{ 100 + "blob": { "ref": { "$link": "bafytest" } } 101 + }] 102 + }); 103 + 104 + enrich_media_blobs(&mut record, "https://pds.example.com/", "did:plc:test"); 105 + 106 + let url = record["media"][0]["blob"]["url"].as_str().unwrap(); 107 + assert!(url.starts_with("https://pds.example.com/xrpc/")); 108 + assert!(!url.contains("//xrpc")); 109 + } 110 + }
+12
src/repo/mod.rs
··· 1 + mod at_uri; 2 + mod dpop; 3 + mod media; 4 + mod pds; 5 + pub(crate) mod session; 6 + mod upload_blob; 7 + 8 + pub use upload_blob::upload_blob; 9 + pub(crate) use at_uri::parse_did_from_at_uri; 10 + pub(crate) use media::enrich_media_blobs; 11 + pub(crate) use pds::{forward_pds_response, pds_post_json_raw}; 12 + pub(crate) use session::{AtpSession, get_atp_session};
+152
src/repo/pds.rs
··· 1 + use axum::body::Bytes; 2 + use axum::http::StatusCode; 3 + use axum::response::{IntoResponse, Response}; 4 + use serde_json::Value; 5 + 6 + use crate::AppState; 7 + use crate::error::AppError; 8 + 9 + use super::dpop::generate_dpop_proof; 10 + use super::session::AtpSession; 11 + 12 + /// Forward a PDS response back to the client, preserving status and body. 13 + pub(crate) async fn forward_pds_response(resp: reqwest::Response) -> Result<Response, AppError> { 14 + let status = resp.status(); 15 + let body = resp 16 + .bytes() 17 + .await 18 + .map_err(|e| AppError::Internal(format!("failed to read PDS response: {e}")))?; 19 + 20 + let axum_status = StatusCode::from_u16(status.as_u16()).unwrap(); 21 + 22 + if status.is_success() { 23 + Ok(( 24 + axum_status, 25 + [(axum::http::header::CONTENT_TYPE, "application/json")], 26 + body, 27 + ) 28 + .into_response()) 29 + } else { 30 + let body_str = String::from_utf8_lossy(&body); 31 + tracing::warn!(status = %axum_status, body = %body_str, "PDS returned error"); 32 + Err(AppError::PdsError(axum_status, body)) 33 + } 34 + } 35 + 36 + /// POST JSON to a PDS XRPC endpoint with DPoP auth and nonce retry. 37 + /// Returns the raw reqwest::Response so callers can inspect the body. 38 + pub(crate) async fn pds_post_json_raw( 39 + state: &AppState, 40 + session: &AtpSession, 41 + xrpc_method: &str, 42 + body: &Value, 43 + ) -> Result<reqwest::Response, AppError> { 44 + let url = format!( 45 + "{}/xrpc/{xrpc_method}", 46 + session.pds_endpoint.trim_end_matches('/') 47 + ); 48 + 49 + let dpop = generate_dpop_proof("POST", &url, &session.dpop_jwk, &session.access_token, None)?; 50 + 51 + let resp = state 52 + .http 53 + .post(&url) 54 + .header("authorization", format!("DPoP {}", session.access_token)) 55 + .header("dpop", &dpop) 56 + .json(body) 57 + .send() 58 + .await 59 + .map_err(|e| AppError::Internal(format!("PDS request failed: {e}")))?; 60 + 61 + // Retry with nonce if PDS requires it 62 + if resp.status() == reqwest::StatusCode::UNAUTHORIZED 63 + && let Some(nonce) = resp 64 + .headers() 65 + .get("dpop-nonce") 66 + .and_then(|v| v.to_str().ok()) 67 + { 68 + let nonce = nonce.to_string(); 69 + tracing::debug!("retrying with DPoP nonce"); 70 + 71 + let dpop = generate_dpop_proof( 72 + "POST", 73 + &url, 74 + &session.dpop_jwk, 75 + &session.access_token, 76 + Some(&nonce), 77 + )?; 78 + 79 + let resp = state 80 + .http 81 + .post(&url) 82 + .header("authorization", format!("DPoP {}", session.access_token)) 83 + .header("dpop", &dpop) 84 + .json(body) 85 + .send() 86 + .await 87 + .map_err(|e| AppError::Internal(format!("PDS request retry failed: {e}")))?; 88 + 89 + return Ok(resp); 90 + } 91 + 92 + Ok(resp) 93 + } 94 + 95 + /// POST a binary blob to the PDS with DPoP auth and nonce retry. 96 + pub(super) async fn pds_post_blob( 97 + state: &AppState, 98 + session: &AtpSession, 99 + content_type: &str, 100 + blob: Bytes, 101 + ) -> Result<Response, AppError> { 102 + let url = format!( 103 + "{}/xrpc/com.atproto.repo.uploadBlob", 104 + session.pds_endpoint.trim_end_matches('/') 105 + ); 106 + 107 + let dpop = generate_dpop_proof("POST", &url, &session.dpop_jwk, &session.access_token, None)?; 108 + 109 + let resp = state 110 + .http 111 + .post(&url) 112 + .header("authorization", format!("DPoP {}", session.access_token)) 113 + .header("dpop", &dpop) 114 + .header("content-type", content_type) 115 + .body(blob.clone()) 116 + .send() 117 + .await 118 + .map_err(|e| AppError::Internal(format!("PDS uploadBlob failed: {e}")))?; 119 + 120 + if resp.status() == reqwest::StatusCode::UNAUTHORIZED 121 + && let Some(nonce) = resp 122 + .headers() 123 + .get("dpop-nonce") 124 + .and_then(|v| v.to_str().ok()) 125 + { 126 + let nonce = nonce.to_string(); 127 + tracing::debug!("retrying uploadBlob with DPoP nonce"); 128 + 129 + let dpop = generate_dpop_proof( 130 + "POST", 131 + &url, 132 + &session.dpop_jwk, 133 + &session.access_token, 134 + Some(&nonce), 135 + )?; 136 + 137 + let resp = state 138 + .http 139 + .post(&url) 140 + .header("authorization", format!("DPoP {}", session.access_token)) 141 + .header("dpop", &dpop) 142 + .header("content-type", content_type) 143 + .body(blob) 144 + .send() 145 + .await 146 + .map_err(|e| AppError::Internal(format!("PDS uploadBlob retry failed: {e}")))?; 147 + 148 + return forward_pds_response(resp).await; 149 + } 150 + 151 + forward_pds_response(resp).await 152 + }
+45
src/repo/session.rs
··· 1 + use serde::Deserialize; 2 + 3 + use crate::AppState; 4 + use crate::error::AppError; 5 + 6 + #[derive(Deserialize)] 7 + pub(crate) struct AtpSession { 8 + pub(crate) access_token: String, 9 + pub(crate) pds_endpoint: String, 10 + pub(crate) dpop_jwk: DpopJwk, 11 + } 12 + 13 + #[derive(Deserialize)] 14 + pub(crate) struct DpopJwk { 15 + pub(crate) x: String, 16 + pub(crate) y: String, 17 + pub(crate) d: String, 18 + } 19 + 20 + /// Fetch the user's AT Protocol session (PDS credentials) from AIP. 21 + pub(crate) async fn get_atp_session(state: &AppState, token: &str) -> Result<AtpSession, AppError> { 22 + let url = format!( 23 + "{}/api/atprotocol/session", 24 + state.config.aip_url.trim_end_matches('/') 25 + ); 26 + 27 + let resp = state 28 + .http 29 + .get(&url) 30 + .header("authorization", format!("Bearer {token}")) 31 + .send() 32 + .await 33 + .map_err(|e| AppError::Internal(format!("AIP session request failed: {e}")))?; 34 + 35 + if !resp.status().is_success() { 36 + return Err(AppError::Auth(format!( 37 + "AIP session returned {}", 38 + resp.status() 39 + ))); 40 + } 41 + 42 + resp.json() 43 + .await 44 + .map_err(|e| AppError::Internal(format!("invalid AIP session response: {e}"))) 45 + }
+27
src/repo/upload_blob.rs
··· 1 + use axum::body::Bytes; 2 + use axum::extract::State; 3 + use axum::http::HeaderMap; 4 + use axum::response::Response; 5 + 6 + use crate::AppState; 7 + use crate::auth::Claims; 8 + use crate::error::AppError; 9 + 10 + use super::pds::pds_post_blob; 11 + use super::session::get_atp_session; 12 + 13 + pub async fn upload_blob( 14 + State(state): State<AppState>, 15 + claims: Claims, 16 + headers: HeaderMap, 17 + body: Bytes, 18 + ) -> Result<Response, AppError> { 19 + let session = get_atp_session(&state, claims.token()).await?; 20 + 21 + let content_type = headers 22 + .get("content-type") 23 + .and_then(|v| v.to_str().ok()) 24 + .unwrap_or("application/octet-stream"); 25 + 26 + pds_post_blob(&state, &session, content_type, body).await 27 + }
-351
src/xrpc.rs
··· 1 - use axum::Json; 2 - use axum::extract::{Path, Query, State}; 3 - use axum::http::StatusCode; 4 - use axum::response::{IntoResponse, Response}; 5 - use serde_json::{Value, json}; 6 - use std::collections::{HashMap, HashSet}; 7 - 8 - use crate::AppState; 9 - use crate::auth::Claims; 10 - use crate::error::AppError; 11 - use crate::lexicon::LexiconType; 12 - use crate::profile; 13 - use crate::repo; 14 - 15 - // --------------------------------------------------------------------------- 16 - // Catch-all handler 17 - // --------------------------------------------------------------------------- 18 - 19 - /// Catch-all GET handler for XRPC queries. 20 - pub async fn xrpc_get( 21 - State(state): State<AppState>, 22 - Path(method): Path<String>, 23 - Query(params): Query<HashMap<String, String>>, 24 - ) -> Result<Response, AppError> { 25 - let lexicon = state 26 - .lexicons 27 - .get(&method) 28 - .await 29 - .ok_or_else(|| AppError::BadRequest(format!("method not found: {method}")))?; 30 - 31 - if lexicon.lexicon_type != LexiconType::Query { 32 - return Err(AppError::BadRequest(format!( 33 - "{method} is not a query endpoint" 34 - ))); 35 - } 36 - 37 - handle_query(&state, &method, &params, &lexicon).await 38 - } 39 - 40 - /// Catch-all POST handler for XRPC procedures. 41 - pub async fn xrpc_post( 42 - State(state): State<AppState>, 43 - Path(method): Path<String>, 44 - claims: Claims, 45 - Json(body): Json<Value>, 46 - ) -> Result<Response, AppError> { 47 - let lexicon = state 48 - .lexicons 49 - .get(&method) 50 - .await 51 - .ok_or_else(|| AppError::BadRequest(format!("method not found: {method}")))?; 52 - 53 - if lexicon.lexicon_type != LexiconType::Procedure { 54 - return Err(AppError::BadRequest(format!( 55 - "{method} is not a procedure endpoint" 56 - ))); 57 - } 58 - 59 - handle_procedure(&state, &method, &claims, &body, &lexicon).await 60 - } 61 - 62 - // --------------------------------------------------------------------------- 63 - // Generic query handler 64 - // --------------------------------------------------------------------------- 65 - 66 - async fn handle_query( 67 - state: &AppState, 68 - method: &str, 69 - params: &HashMap<String, String>, 70 - lexicon: &crate::lexicon::ParsedLexicon, 71 - ) -> Result<Response, AppError> { 72 - // Single-record query: has a `uri` parameter 73 - if let Some(uri) = params.get("uri") { 74 - return handle_get_record(state, uri).await; 75 - } 76 - 77 - // List query: needs a target collection to know what to query 78 - let collection = lexicon.target_collection.as_deref().ok_or_else(|| { 79 - AppError::BadRequest(format!( 80 - "{method} has no target_collection configured for list queries" 81 - )) 82 - })?; 83 - 84 - let limit: i64 = params 85 - .get("limit") 86 - .and_then(|l| l.parse().ok()) 87 - .unwrap_or(20) 88 - .min(100); 89 - 90 - let offset: i64 = params 91 - .get("cursor") 92 - .and_then(|c| c.parse().ok()) 93 - .unwrap_or(0); 94 - 95 - let did = params.get("did"); 96 - 97 - let rows: Vec<(String, String, Value)> = if let Some(did) = did { 98 - sqlx::query_as( 99 - "SELECT uri, did, record FROM records WHERE collection = $1 AND did = $2 ORDER BY indexed_at DESC LIMIT $3 OFFSET $4", 100 - ) 101 - .bind(collection) 102 - .bind(did) 103 - .bind(limit) 104 - .bind(offset) 105 - .fetch_all(&state.db) 106 - .await 107 - .map_err(|e| AppError::Internal(format!("DB query failed: {e}")))? 108 - } else { 109 - sqlx::query_as( 110 - "SELECT uri, did, record FROM records WHERE collection = $1 ORDER BY indexed_at DESC LIMIT $2 OFFSET $3", 111 - ) 112 - .bind(collection) 113 - .bind(limit) 114 - .bind(offset) 115 - .fetch_all(&state.db) 116 - .await 117 - .map_err(|e| AppError::Internal(format!("DB query failed: {e}")))? 118 - }; 119 - 120 - let has_next_page = rows.len() as i64 == limit; 121 - 122 - // Resolve PDS endpoints for blob URL enrichment. 123 - let unique_dids: HashSet<&str> = rows.iter().map(|(_, did, _)| did.as_str()).collect(); 124 - let mut pds_map: HashMap<String, String> = HashMap::new(); 125 - for did in unique_dids { 126 - if let Ok(pds) = 127 - profile::resolve_pds_endpoint(&state.http, &state.config.plc_url, did).await 128 - { 129 - pds_map.insert(did.to_string(), pds); 130 - } 131 - } 132 - 133 - let records: Vec<Value> = rows 134 - .into_iter() 135 - .map(|(uri, did, mut record)| { 136 - if let Some(pds) = pds_map.get(&did) { 137 - repo::enrich_media_blobs(&mut record, pds, &did); 138 - } 139 - record 140 - .as_object_mut() 141 - .map(|obj| obj.insert("uri".to_string(), json!(uri))); 142 - record 143 - }) 144 - .collect(); 145 - 146 - let mut result = json!({ "records": records }); 147 - if has_next_page { 148 - let next_cursor = (offset + limit).to_string(); 149 - result 150 - .as_object_mut() 151 - .unwrap() 152 - .insert("cursor".to_string(), json!(next_cursor)); 153 - } 154 - 155 - Ok(Json(result).into_response()) 156 - } 157 - 158 - async fn handle_get_record(state: &AppState, uri: &str) -> Result<Response, AppError> { 159 - let did = repo::parse_did_from_at_uri(uri)?; 160 - 161 - let row: Option<(Value,)> = sqlx::query_as("SELECT record FROM records WHERE uri = $1") 162 - .bind(uri) 163 - .fetch_optional(&state.db) 164 - .await 165 - .map_err(|e| AppError::Internal(format!("DB query failed: {e}")))?; 166 - 167 - let (mut record,) = row.ok_or_else(|| AppError::NotFound("record not found".into()))?; 168 - 169 - let pds = profile::resolve_pds_endpoint(&state.http, &state.config.plc_url, &did).await?; 170 - repo::enrich_media_blobs(&mut record, &pds, &did); 171 - 172 - record 173 - .as_object_mut() 174 - .map(|obj| obj.insert("uri".to_string(), json!(uri))); 175 - 176 - Ok(Json(json!({ "record": record })).into_response()) 177 - } 178 - 179 - // --------------------------------------------------------------------------- 180 - // Generic procedure handler 181 - // --------------------------------------------------------------------------- 182 - 183 - async fn handle_procedure( 184 - state: &AppState, 185 - method: &str, 186 - claims: &Claims, 187 - input: &Value, 188 - lexicon: &crate::lexicon::ParsedLexicon, 189 - ) -> Result<Response, AppError> { 190 - let collection = lexicon.target_collection.as_deref().ok_or_else(|| { 191 - AppError::BadRequest(format!("{method} has no target_collection configured")) 192 - })?; 193 - 194 - let session = repo::get_atp_session(state, claims.token()).await?; 195 - 196 - // Determine create vs put based on whether input has a `uri` field. 197 - let has_uri = input.get("uri").and_then(|v| v.as_str()).is_some(); 198 - 199 - if has_uri { 200 - handle_put_record(state, claims, input, collection, &session).await 201 - } else { 202 - handle_create_record(state, claims, input, collection, &session).await 203 - } 204 - } 205 - 206 - async fn handle_create_record( 207 - state: &AppState, 208 - claims: &Claims, 209 - input: &Value, 210 - collection: &str, 211 - session: &repo::AtpSession, 212 - ) -> Result<Response, AppError> { 213 - // Build record from input, adding $type 214 - let mut record = input.clone(); 215 - if let Some(obj) = record.as_object_mut() { 216 - obj.insert("$type".to_string(), json!(collection)); 217 - // Remove fields that are procedure params, not record fields 218 - obj.remove("shouldPublish"); 219 - } 220 - 221 - let pds_body = json!({ 222 - "repo": claims.did(), 223 - "collection": collection, 224 - "record": record, 225 - }); 226 - 227 - let resp = 228 - repo::pds_post_json_raw(state, session, "com.atproto.repo.createRecord", &pds_body).await?; 229 - 230 - if resp.status().is_success() { 231 - let bytes = resp 232 - .bytes() 233 - .await 234 - .map_err(|e| AppError::Internal(format!("failed to read PDS response: {e}")))?; 235 - 236 - let pds_result: Value = serde_json::from_slice(&bytes) 237 - .map_err(|e| AppError::Internal(format!("invalid PDS JSON: {e}")))?; 238 - 239 - if let (Some(uri), Some(cid)) = ( 240 - pds_result.get("uri").and_then(|v| v.as_str()), 241 - pds_result.get("cid").and_then(|v| v.as_str()), 242 - ) { 243 - let rkey = uri.split('/').next_back().unwrap_or_default(); 244 - let _ = sqlx::query( 245 - r#" 246 - INSERT INTO records (uri, did, collection, rkey, record, cid) 247 - VALUES ($1, $2, $3, $4, $5, $6) 248 - ON CONFLICT (uri) DO UPDATE 249 - SET record = EXCLUDED.record, 250 - cid = EXCLUDED.cid 251 - "#, 252 - ) 253 - .bind(uri) 254 - .bind(claims.did()) 255 - .bind(collection) 256 - .bind(rkey) 257 - .bind(&record) 258 - .bind(cid) 259 - .execute(&state.db) 260 - .await; 261 - } 262 - 263 - Ok(( 264 - StatusCode::OK, 265 - [(axum::http::header::CONTENT_TYPE, "application/json")], 266 - bytes, 267 - ) 268 - .into_response()) 269 - } else { 270 - repo::forward_pds_response(resp).await 271 - } 272 - } 273 - 274 - async fn handle_put_record( 275 - state: &AppState, 276 - claims: &Claims, 277 - input: &Value, 278 - collection: &str, 279 - session: &repo::AtpSession, 280 - ) -> Result<Response, AppError> { 281 - let uri = input 282 - .get("uri") 283 - .and_then(|v| v.as_str()) 284 - .ok_or_else(|| AppError::BadRequest("missing uri field".into()))?; 285 - 286 - let rkey = uri 287 - .split('/') 288 - .next_back() 289 - .ok_or_else(|| AppError::Internal("invalid AT URI".into()))?; 290 - 291 - // Build record from input, adding $type 292 - let mut record = input.clone(); 293 - if let Some(obj) = record.as_object_mut() { 294 - obj.insert("$type".to_string(), json!(collection)); 295 - obj.remove("uri"); 296 - obj.remove("shouldPublish"); 297 - } 298 - 299 - let pds_body = json!({ 300 - "repo": claims.did(), 301 - "collection": collection, 302 - "rkey": rkey, 303 - "record": record, 304 - }); 305 - 306 - let resp = 307 - repo::pds_post_json_raw(state, session, "com.atproto.repo.putRecord", &pds_body).await?; 308 - 309 - if resp.status().is_success() { 310 - let bytes = resp 311 - .bytes() 312 - .await 313 - .map_err(|e| AppError::Internal(format!("failed to read PDS response: {e}")))?; 314 - 315 - let pds_result: Value = serde_json::from_slice(&bytes) 316 - .map_err(|e| AppError::Internal(format!("invalid PDS JSON: {e}")))?; 317 - 318 - let cid = pds_result 319 - .get("cid") 320 - .and_then(|v| v.as_str()) 321 - .unwrap_or_default(); 322 - 323 - let _ = sqlx::query( 324 - r#" 325 - INSERT INTO records (uri, did, collection, rkey, record, cid) 326 - VALUES ($1, $2, $3, $4, $5, $6) 327 - ON CONFLICT (uri) DO UPDATE 328 - SET record = EXCLUDED.record, 329 - cid = EXCLUDED.cid, 330 - indexed_at = NOW() 331 - "#, 332 - ) 333 - .bind(uri) 334 - .bind(claims.did()) 335 - .bind(collection) 336 - .bind(rkey) 337 - .bind(&record) 338 - .bind(cid) 339 - .execute(&state.db) 340 - .await; 341 - 342 - Ok(( 343 - StatusCode::OK, 344 - [(axum::http::header::CONTENT_TYPE, "application/json")], 345 - bytes, 346 - ) 347 - .into_response()) 348 - } else { 349 - repo::forward_pds_response(resp).await 350 - } 351 - }
+55
src/xrpc/mod.rs
··· 1 + mod procedure; 2 + mod query; 3 + 4 + use axum::Json; 5 + use axum::extract::{Path, Query, State}; 6 + use axum::response::Response; 7 + use std::collections::HashMap; 8 + 9 + use crate::AppState; 10 + use crate::auth::Claims; 11 + use crate::error::AppError; 12 + use crate::lexicon::LexiconType; 13 + 14 + /// Catch-all GET handler for XRPC queries. 15 + pub async fn xrpc_get( 16 + State(state): State<AppState>, 17 + Path(method): Path<String>, 18 + Query(params): Query<HashMap<String, String>>, 19 + ) -> Result<Response, AppError> { 20 + let lexicon = state 21 + .lexicons 22 + .get(&method) 23 + .await 24 + .ok_or_else(|| AppError::BadRequest(format!("method not found: {method}")))?; 25 + 26 + if lexicon.lexicon_type != LexiconType::Query { 27 + return Err(AppError::BadRequest(format!( 28 + "{method} is not a query endpoint" 29 + ))); 30 + } 31 + 32 + query::handle_query(&state, &method, &params, &lexicon).await 33 + } 34 + 35 + /// Catch-all POST handler for XRPC procedures. 36 + pub async fn xrpc_post( 37 + State(state): State<AppState>, 38 + Path(method): Path<String>, 39 + claims: Claims, 40 + Json(body): Json<serde_json::Value>, 41 + ) -> Result<Response, AppError> { 42 + let lexicon = state 43 + .lexicons 44 + .get(&method) 45 + .await 46 + .ok_or_else(|| AppError::BadRequest(format!("method not found: {method}")))?; 47 + 48 + if lexicon.lexicon_type != LexiconType::Procedure { 49 + return Err(AppError::BadRequest(format!( 50 + "{method} is not a procedure endpoint" 51 + ))); 52 + } 53 + 54 + procedure::handle_procedure(&state, &method, &claims, &body, &lexicon).await 55 + }
+178
src/xrpc/procedure.rs
··· 1 + use axum::http::StatusCode; 2 + use axum::response::{IntoResponse, Response}; 3 + use serde_json::{Value, json}; 4 + 5 + use crate::AppState; 6 + use crate::auth::Claims; 7 + use crate::error::AppError; 8 + use crate::repo; 9 + 10 + pub(super) async fn handle_procedure( 11 + state: &AppState, 12 + method: &str, 13 + claims: &Claims, 14 + input: &Value, 15 + lexicon: &crate::lexicon::ParsedLexicon, 16 + ) -> Result<Response, AppError> { 17 + let collection = lexicon.target_collection.as_deref().ok_or_else(|| { 18 + AppError::BadRequest(format!("{method} has no target_collection configured")) 19 + })?; 20 + 21 + let session = repo::get_atp_session(state, claims.token()).await?; 22 + 23 + // Determine create vs put based on whether input has a `uri` field. 24 + let has_uri = input.get("uri").and_then(|v| v.as_str()).is_some(); 25 + 26 + if has_uri { 27 + handle_put_record(state, claims, input, collection, &session).await 28 + } else { 29 + handle_create_record(state, claims, input, collection, &session).await 30 + } 31 + } 32 + 33 + async fn handle_create_record( 34 + state: &AppState, 35 + claims: &Claims, 36 + input: &Value, 37 + collection: &str, 38 + session: &repo::AtpSession, 39 + ) -> Result<Response, AppError> { 40 + // Build record from input, adding $type 41 + let mut record = input.clone(); 42 + if let Some(obj) = record.as_object_mut() { 43 + obj.insert("$type".to_string(), json!(collection)); 44 + // Remove fields that are procedure params, not record fields 45 + obj.remove("shouldPublish"); 46 + } 47 + 48 + let pds_body = json!({ 49 + "repo": claims.did(), 50 + "collection": collection, 51 + "record": record, 52 + }); 53 + 54 + let resp = 55 + repo::pds_post_json_raw(state, session, "com.atproto.repo.createRecord", &pds_body).await?; 56 + 57 + if resp.status().is_success() { 58 + let bytes = resp 59 + .bytes() 60 + .await 61 + .map_err(|e| AppError::Internal(format!("failed to read PDS response: {e}")))?; 62 + 63 + let pds_result: Value = serde_json::from_slice(&bytes) 64 + .map_err(|e| AppError::Internal(format!("invalid PDS JSON: {e}")))?; 65 + 66 + if let (Some(uri), Some(cid)) = ( 67 + pds_result.get("uri").and_then(|v| v.as_str()), 68 + pds_result.get("cid").and_then(|v| v.as_str()), 69 + ) { 70 + let rkey = uri.split('/').next_back().unwrap_or_default(); 71 + let _ = sqlx::query( 72 + r#" 73 + INSERT INTO records (uri, did, collection, rkey, record, cid) 74 + VALUES ($1, $2, $3, $4, $5, $6) 75 + ON CONFLICT (uri) DO UPDATE 76 + SET record = EXCLUDED.record, 77 + cid = EXCLUDED.cid 78 + "#, 79 + ) 80 + .bind(uri) 81 + .bind(claims.did()) 82 + .bind(collection) 83 + .bind(rkey) 84 + .bind(&record) 85 + .bind(cid) 86 + .execute(&state.db) 87 + .await; 88 + } 89 + 90 + Ok(( 91 + StatusCode::OK, 92 + [(axum::http::header::CONTENT_TYPE, "application/json")], 93 + bytes, 94 + ) 95 + .into_response()) 96 + } else { 97 + repo::forward_pds_response(resp).await 98 + } 99 + } 100 + 101 + async fn handle_put_record( 102 + state: &AppState, 103 + claims: &Claims, 104 + input: &Value, 105 + collection: &str, 106 + session: &repo::AtpSession, 107 + ) -> Result<Response, AppError> { 108 + let uri = input 109 + .get("uri") 110 + .and_then(|v| v.as_str()) 111 + .ok_or_else(|| AppError::BadRequest("missing uri field".into()))?; 112 + 113 + let rkey = uri 114 + .split('/') 115 + .next_back() 116 + .ok_or_else(|| AppError::Internal("invalid AT URI".into()))?; 117 + 118 + // Build record from input, adding $type 119 + let mut record = input.clone(); 120 + if let Some(obj) = record.as_object_mut() { 121 + obj.insert("$type".to_string(), json!(collection)); 122 + obj.remove("uri"); 123 + obj.remove("shouldPublish"); 124 + } 125 + 126 + let pds_body = json!({ 127 + "repo": claims.did(), 128 + "collection": collection, 129 + "rkey": rkey, 130 + "record": record, 131 + }); 132 + 133 + let resp = 134 + repo::pds_post_json_raw(state, session, "com.atproto.repo.putRecord", &pds_body).await?; 135 + 136 + if resp.status().is_success() { 137 + let bytes = resp 138 + .bytes() 139 + .await 140 + .map_err(|e| AppError::Internal(format!("failed to read PDS response: {e}")))?; 141 + 142 + let pds_result: Value = serde_json::from_slice(&bytes) 143 + .map_err(|e| AppError::Internal(format!("invalid PDS JSON: {e}")))?; 144 + 145 + let cid = pds_result 146 + .get("cid") 147 + .and_then(|v| v.as_str()) 148 + .unwrap_or_default(); 149 + 150 + let _ = sqlx::query( 151 + r#" 152 + INSERT INTO records (uri, did, collection, rkey, record, cid) 153 + VALUES ($1, $2, $3, $4, $5, $6) 154 + ON CONFLICT (uri) DO UPDATE 155 + SET record = EXCLUDED.record, 156 + cid = EXCLUDED.cid, 157 + indexed_at = NOW() 158 + "#, 159 + ) 160 + .bind(uri) 161 + .bind(claims.did()) 162 + .bind(collection) 163 + .bind(rkey) 164 + .bind(&record) 165 + .bind(cid) 166 + .execute(&state.db) 167 + .await; 168 + 169 + Ok(( 170 + StatusCode::OK, 171 + [(axum::http::header::CONTENT_TYPE, "application/json")], 172 + bytes, 173 + ) 174 + .into_response()) 175 + } else { 176 + repo::forward_pds_response(resp).await 177 + } 178 + }
+122
src/xrpc/query.rs
··· 1 + use axum::Json; 2 + use axum::response::{IntoResponse, Response}; 3 + use serde_json::{Value, json}; 4 + use std::collections::{HashMap, HashSet}; 5 + 6 + use crate::AppState; 7 + use crate::error::AppError; 8 + use crate::profile; 9 + use crate::repo; 10 + 11 + pub(super) async fn handle_query( 12 + state: &AppState, 13 + method: &str, 14 + params: &HashMap<String, String>, 15 + lexicon: &crate::lexicon::ParsedLexicon, 16 + ) -> Result<Response, AppError> { 17 + // Single-record query: has a `uri` parameter 18 + if let Some(uri) = params.get("uri") { 19 + return handle_get_record(state, uri).await; 20 + } 21 + 22 + // List query: needs a target collection to know what to query 23 + let collection = lexicon.target_collection.as_deref().ok_or_else(|| { 24 + AppError::BadRequest(format!( 25 + "{method} has no target_collection configured for list queries" 26 + )) 27 + })?; 28 + 29 + let limit: i64 = params 30 + .get("limit") 31 + .and_then(|l| l.parse().ok()) 32 + .unwrap_or(20) 33 + .min(100); 34 + 35 + let offset: i64 = params 36 + .get("cursor") 37 + .and_then(|c| c.parse().ok()) 38 + .unwrap_or(0); 39 + 40 + let did = params.get("did"); 41 + 42 + let rows: Vec<(String, String, Value)> = if let Some(did) = did { 43 + sqlx::query_as( 44 + "SELECT uri, did, record FROM records WHERE collection = $1 AND did = $2 ORDER BY indexed_at DESC LIMIT $3 OFFSET $4", 45 + ) 46 + .bind(collection) 47 + .bind(did) 48 + .bind(limit) 49 + .bind(offset) 50 + .fetch_all(&state.db) 51 + .await 52 + .map_err(|e| AppError::Internal(format!("DB query failed: {e}")))? 53 + } else { 54 + sqlx::query_as( 55 + "SELECT uri, did, record FROM records WHERE collection = $1 ORDER BY indexed_at DESC LIMIT $2 OFFSET $3", 56 + ) 57 + .bind(collection) 58 + .bind(limit) 59 + .bind(offset) 60 + .fetch_all(&state.db) 61 + .await 62 + .map_err(|e| AppError::Internal(format!("DB query failed: {e}")))? 63 + }; 64 + 65 + let has_next_page = rows.len() as i64 == limit; 66 + 67 + // Resolve PDS endpoints for blob URL enrichment. 68 + let unique_dids: HashSet<&str> = rows.iter().map(|(_, did, _)| did.as_str()).collect(); 69 + let mut pds_map: HashMap<String, String> = HashMap::new(); 70 + for did in unique_dids { 71 + if let Ok(pds) = 72 + profile::resolve_pds_endpoint(&state.http, &state.config.plc_url, did).await 73 + { 74 + pds_map.insert(did.to_string(), pds); 75 + } 76 + } 77 + 78 + let records: Vec<Value> = rows 79 + .into_iter() 80 + .map(|(uri, did, mut record)| { 81 + if let Some(pds) = pds_map.get(&did) { 82 + repo::enrich_media_blobs(&mut record, pds, &did); 83 + } 84 + record 85 + .as_object_mut() 86 + .map(|obj| obj.insert("uri".to_string(), json!(uri))); 87 + record 88 + }) 89 + .collect(); 90 + 91 + let mut result = json!({ "records": records }); 92 + if has_next_page { 93 + let next_cursor = (offset + limit).to_string(); 94 + result 95 + .as_object_mut() 96 + .unwrap() 97 + .insert("cursor".to_string(), json!(next_cursor)); 98 + } 99 + 100 + Ok(Json(result).into_response()) 101 + } 102 + 103 + pub(super) async fn handle_get_record(state: &AppState, uri: &str) -> Result<Response, AppError> { 104 + let did = repo::parse_did_from_at_uri(uri)?; 105 + 106 + let row: Option<(Value,)> = sqlx::query_as("SELECT record FROM records WHERE uri = $1") 107 + .bind(uri) 108 + .fetch_optional(&state.db) 109 + .await 110 + .map_err(|e| AppError::Internal(format!("DB query failed: {e}")))?; 111 + 112 + let (mut record,) = row.ok_or_else(|| AppError::NotFound("record not found".into()))?; 113 + 114 + let pds = profile::resolve_pds_endpoint(&state.http, &state.config.plc_url, &did).await?; 115 + repo::enrich_media_blobs(&mut record, &pds, &did); 116 + 117 + record 118 + .as_object_mut() 119 + .map(|obj| obj.insert("uri".to_string(), json!(uri))); 120 + 121 + Ok(Json(json!({ "record": record })).into_response()) 122 + }