A lexicon-driven AppView for ATProto. happyview.dev
backfill firehose jetstream atproto appview oauth lexicon
8
fork

Configure Feed

Select the types of activity you want to include in your feed.

feat: catch-all XRPC router replaces hardcoded game routes

Trezy 86c83e41 09bf19e3

+393 -375
+1
migrations/20260215000000_add_target_collection.sql
··· 1 + ALTER TABLE lexicons ADD COLUMN target_collection TEXT;
+8 -5
src/admin.rs
··· 88 88 lexicon_json: Value, 89 89 #[serde(default = "default_backfill")] 90 90 backfill: bool, 91 + target_collection: Option<String>, 91 92 } 92 93 93 94 fn default_backfill() -> bool { ··· 137 138 .to_string(); 138 139 139 140 // Validate it parses correctly 140 - ParsedLexicon::parse(body.lexicon_json.clone(), 1) 141 + ParsedLexicon::parse(body.lexicon_json.clone(), 1, body.target_collection.clone()) 141 142 .map_err(|e| AppError::BadRequest(format!("failed to parse lexicon: {e}")))?; 142 143 143 144 // Upsert into database 144 145 let row: (i32,) = sqlx::query_as( 145 146 r#" 146 - INSERT INTO lexicons (id, lexicon_json, backfill) 147 - VALUES ($1, $2, $3) 147 + INSERT INTO lexicons (id, lexicon_json, backfill, target_collection) 148 + VALUES ($1, $2, $3, $4) 148 149 ON CONFLICT (id) DO UPDATE SET 149 150 lexicon_json = EXCLUDED.lexicon_json, 150 151 backfill = EXCLUDED.backfill, 152 + target_collection = EXCLUDED.target_collection, 151 153 revision = lexicons.revision + 1, 152 154 updated_at = NOW() 153 155 RETURNING revision ··· 156 158 .bind(&id) 157 159 .bind(&body.lexicon_json) 158 160 .bind(body.backfill) 161 + .bind(&body.target_collection) 159 162 .fetch_one(&state.db) 160 163 .await 161 164 .map_err(|e| AppError::Internal(format!("failed to upsert lexicon: {e}")))?; ··· 163 166 let revision = row.0; 164 167 165 168 // Update in-memory registry with correct revision 166 - let parsed = ParsedLexicon::parse(body.lexicon_json, revision) 169 + let parsed = ParsedLexicon::parse(body.lexicon_json, revision, body.target_collection) 167 170 .map_err(|e| AppError::Internal(format!("failed to re-parse lexicon: {e}")))?; 168 171 let is_record = parsed.lexicon_type == LexiconType::Record; 169 172 state.lexicons.upsert(parsed).await; ··· 203 206 let summaries: Vec<LexiconSummary> = rows 204 207 .into_iter() 205 208 .map(|(id, revision, json, backfill, created_at, updated_at)| { 206 - let lexicon_type = ParsedLexicon::parse(json, revision) 209 + let lexicon_type = ParsedLexicon::parse(json, revision, None) 207 210 .map(|p| format!("{:?}", p.lexicon_type).to_lowercase()) 208 211 .unwrap_or_else(|_| "unknown".into()); 209 212
+8 -5
src/lexicon.rs
··· 37 37 pub raw: Value, 38 38 /// Database revision number. 39 39 pub revision: i32, 40 + /// For queries/procedures: the backing record collection NSID. 41 + pub target_collection: Option<String>, 40 42 } 41 43 42 44 impl ParsedLexicon { 43 45 /// Parse a lexicon JSON document into a `ParsedLexicon`. 44 - pub fn parse(raw: Value, revision: i32) -> Result<Self, String> { 46 + pub fn parse(raw: Value, revision: i32, target_collection: Option<String>) -> Result<Self, String> { 45 47 let id = raw 46 48 .get("id") 47 49 .and_then(|v| v.as_str()) ··· 81 83 record_schema, 82 84 raw, 83 85 revision, 86 + target_collection, 84 87 }) 85 88 } 86 89 } ··· 100 103 101 104 /// Load all lexicons from the database, replacing any existing entries. 102 105 pub async fn load_from_db(&self, db: &sqlx::PgPool) -> Result<(), String> { 103 - let rows: Vec<(String, Value, i32)> = 104 - sqlx::query_as("SELECT id, lexicon_json, revision FROM lexicons") 106 + let rows: Vec<(String, Value, i32, Option<String>)> = 107 + sqlx::query_as("SELECT id, lexicon_json, revision, target_collection FROM lexicons") 105 108 .fetch_all(db) 106 109 .await 107 110 .map_err(|e| format!("failed to load lexicons: {e}"))?; ··· 110 113 inner.clear(); 111 114 112 115 let mut loaded = 0u32; 113 - for (id, json, revision) in rows { 114 - match ParsedLexicon::parse(json, revision) { 116 + for (id, json, revision, target_collection) in rows { 117 + match ParsedLexicon::parse(json, revision, target_collection) { 115 118 Ok(parsed) => { 116 119 inner.insert(id, parsed); 117 120 loaded += 1;
+1
src/main.rs
··· 7 7 mod profile; 8 8 mod repo; 9 9 mod server; 10 + mod xrpc; 10 11 11 12 use config::Config; 12 13 use lexicon::LexiconRegistry;
+15 -349
src/repo.rs
··· 1 1 use axum::body::Bytes; 2 - use axum::extract::{Query, State}; 2 + use axum::extract::State; 3 3 use axum::http::{HeaderMap, StatusCode}; 4 4 use axum::response::{IntoResponse, Response}; 5 - use axum::Json; 6 5 use base64::Engine; 7 6 use p256::pkcs8::EncodePrivateKey; 8 7 use serde::Deserialize; ··· 11 10 12 11 use crate::auth::Claims; 13 12 use crate::error::AppError; 14 - use crate::profile; 15 13 use crate::AppState; 16 - 17 - const COLLECTION: &str = "games.gamesgamesgamesgames.game"; 18 14 19 15 // --------------------------------------------------------------------------- 20 16 // AT URI parsing 21 17 // --------------------------------------------------------------------------- 22 18 23 19 /// Extract the DID from an AT URI (at://did/collection/rkey). 24 - fn parse_did_from_at_uri(uri: &str) -> Result<String, AppError> { 20 + pub(crate) fn parse_did_from_at_uri(uri: &str) -> Result<String, AppError> { 25 21 let stripped = uri 26 22 .strip_prefix("at://") 27 23 .ok_or_else(|| AppError::Internal("AT URI must start with at://".into()))?; ··· 38 34 // --------------------------------------------------------------------------- 39 35 40 36 #[derive(Deserialize)] 41 - struct AtpSession { 42 - access_token: String, 43 - pds_endpoint: String, 44 - dpop_jwk: DpopJwk, 37 + pub(crate) struct AtpSession { 38 + pub(crate) access_token: String, 39 + pub(crate) pds_endpoint: String, 40 + pub(crate) dpop_jwk: DpopJwk, 45 41 } 46 42 47 43 #[derive(Deserialize)] 48 - struct DpopJwk { 49 - x: String, 50 - y: String, 51 - d: String, 44 + pub(crate) struct DpopJwk { 45 + pub(crate) x: String, 46 + pub(crate) y: String, 47 + pub(crate) d: String, 52 48 } 53 49 54 50 // --------------------------------------------------------------------------- ··· 68 64 } 69 65 70 66 /// Fetch the user's AT Protocol session (PDS credentials) from AIP. 71 - async fn get_atp_session(state: &AppState, token: &str) -> Result<AtpSession, AppError> { 67 + pub(crate) async fn get_atp_session(state: &AppState, token: &str) -> Result<AtpSession, AppError> { 72 68 let url = format!( 73 69 "{}/api/atprotocol/session", 74 70 state.config.aip_url.trim_end_matches('/') ··· 95 91 } 96 92 97 93 /// Generate a DPoP proof JWT for a PDS request. 98 - fn generate_dpop_proof( 94 + pub(crate) fn generate_dpop_proof( 99 95 method: &str, 100 96 url: &str, 101 97 dpop_jwk: &DpopJwk, ··· 167 163 // --------------------------------------------------------------------------- 168 164 169 165 /// Forward a PDS response back to the client, preserving status and body. 170 - async fn forward_pds_response(resp: reqwest::Response) -> Result<Response, AppError> { 166 + pub(crate) async fn forward_pds_response(resp: reqwest::Response) -> Result<Response, AppError> { 171 167 let status = resp.status(); 172 168 let body = resp 173 169 .bytes() ··· 192 188 193 189 /// POST JSON to a PDS XRPC endpoint with DPoP auth and nonce retry. 194 190 /// Returns the raw reqwest::Response so callers can inspect the body. 195 - async fn pds_post_json_raw( 191 + pub(crate) async fn pds_post_json_raw( 196 192 state: &AppState, 197 193 session: &AtpSession, 198 194 xrpc_method: &str, ··· 314 310 // Public handlers 315 311 // --------------------------------------------------------------------------- 316 312 317 - pub async fn create_game( 318 - State(state): State<AppState>, 319 - claims: Claims, 320 - Json(input): Json<Value>, 321 - ) -> Result<Response, AppError> { 322 - let session = get_atp_session(&state, claims.token()).await?; 323 - 324 - let now = chrono::Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Millis, true); 325 - 326 - let mut record = json!({ 327 - "$type": COLLECTION, 328 - "name": input["name"], 329 - "createdAt": now, 330 - }); 331 - 332 - let rec = record.as_object_mut().unwrap(); 333 - 334 - for key in &[ 335 - "summary", 336 - "applicationType", 337 - "genres", 338 - "modes", 339 - "themes", 340 - "playerPerspectives", 341 - "releases", 342 - "media", 343 - "parent", 344 - ] { 345 - if let Some(val) = input.get(*key) { 346 - if !val.is_null() { 347 - rec.insert((*key).to_string(), val.clone()); 348 - } 349 - } 350 - } 351 - 352 - if input.get("shouldPublish").and_then(|v| v.as_bool()) == Some(true) { 353 - rec.insert("publishedAt".to_string(), json!(now)); 354 - } 355 - 356 - let pds_body = json!({ 357 - "repo": claims.did(), 358 - "collection": COLLECTION, 359 - "record": record, 360 - }); 361 - 362 - let resp = pds_post_json_raw(&state, &session, "com.atproto.repo.createRecord", &pds_body).await?; 363 - 364 - if resp.status().is_success() { 365 - let bytes = resp 366 - .bytes() 367 - .await 368 - .map_err(|e| AppError::Internal(format!("failed to read PDS response: {e}")))?; 369 - 370 - let pds_result: Value = serde_json::from_slice(&bytes) 371 - .map_err(|e| AppError::Internal(format!("invalid PDS JSON: {e}")))?; 372 - 373 - if let (Some(uri), Some(cid)) = ( 374 - pds_result.get("uri").and_then(|v| v.as_str()), 375 - pds_result.get("cid").and_then(|v| v.as_str()), 376 - ) { 377 - let rkey = uri.split('/').last().unwrap_or_default(); 378 - let _ = sqlx::query( 379 - r#" 380 - INSERT INTO records (uri, did, collection, rkey, record, cid) 381 - VALUES ($1, $2, $3, $4, $5, $6) 382 - ON CONFLICT (uri) DO UPDATE 383 - SET record = EXCLUDED.record, 384 - cid = EXCLUDED.cid 385 - "#, 386 - ) 387 - .bind(uri) 388 - .bind(claims.did()) 389 - .bind(COLLECTION) 390 - .bind(rkey) 391 - .bind(&record) 392 - .bind(cid) 393 - .execute(&state.db) 394 - .await; 395 - } 396 - 397 - Ok(( 398 - StatusCode::OK, 399 - [(axum::http::header::CONTENT_TYPE, "application/json")], 400 - bytes, 401 - ) 402 - .into_response()) 403 - } else { 404 - forward_pds_response(resp).await 405 - } 406 - } 407 - 408 - pub async fn put_game( 409 - State(state): State<AppState>, 410 - claims: Claims, 411 - Json(input): Json<Value>, 412 - ) -> Result<Response, AppError> { 413 - let uri = input 414 - .get("uri") 415 - .and_then(|v| v.as_str()) 416 - .ok_or_else(|| AppError::Auth("missing uri field".into()))?; 417 - 418 - // Extract rkey from AT URI: at://did/collection/rkey 419 - let rkey = uri 420 - .split('/') 421 - .last() 422 - .ok_or_else(|| AppError::Internal("invalid AT URI".into()))?; 423 - 424 - let session = get_atp_session(&state, claims.token()).await?; 425 - 426 - let now = chrono::Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Millis, true); 427 - 428 - let created_at = input 429 - .get("createdAt") 430 - .and_then(|v| v.as_str()) 431 - .unwrap_or(&now); 432 - 433 - let mut record = json!({ 434 - "$type": COLLECTION, 435 - "name": input["name"], 436 - "createdAt": created_at, 437 - }); 438 - 439 - let rec = record.as_object_mut().unwrap(); 440 - 441 - for key in &[ 442 - "summary", 443 - "applicationType", 444 - "genres", 445 - "modes", 446 - "themes", 447 - "playerPerspectives", 448 - "releases", 449 - "media", 450 - "parent", 451 - ] { 452 - if let Some(val) = input.get(*key) { 453 - if !val.is_null() { 454 - rec.insert((*key).to_string(), val.clone()); 455 - } 456 - } 457 - } 458 - 459 - if input.get("shouldPublish").and_then(|v| v.as_bool()) == Some(true) { 460 - rec.insert("publishedAt".to_string(), json!(now)); 461 - } 462 - 463 - let pds_body = json!({ 464 - "repo": claims.did(), 465 - "collection": COLLECTION, 466 - "rkey": rkey, 467 - "record": record, 468 - }); 469 - 470 - let resp = pds_post_json_raw(&state, &session, "com.atproto.repo.putRecord", &pds_body).await?; 471 - 472 - if resp.status().is_success() { 473 - let bytes = resp 474 - .bytes() 475 - .await 476 - .map_err(|e| AppError::Internal(format!("failed to read PDS response: {e}")))?; 477 - 478 - let pds_result: Value = serde_json::from_slice(&bytes) 479 - .map_err(|e| AppError::Internal(format!("invalid PDS JSON: {e}")))?; 480 - 481 - let cid = pds_result 482 - .get("cid") 483 - .and_then(|v| v.as_str()) 484 - .unwrap_or_default(); 485 - 486 - let _ = sqlx::query( 487 - r#" 488 - INSERT INTO records (uri, did, collection, rkey, record, cid) 489 - VALUES ($1, $2, $3, $4, $5, $6) 490 - ON CONFLICT (uri) DO UPDATE 491 - SET record = EXCLUDED.record, 492 - cid = EXCLUDED.cid, 493 - indexed_at = NOW() 494 - "#, 495 - ) 496 - .bind(uri) 497 - .bind(claims.did()) 498 - .bind(COLLECTION) 499 - .bind(rkey) 500 - .bind(&record) 501 - .bind(cid) 502 - .execute(&state.db) 503 - .await; 504 - 505 - Ok(( 506 - StatusCode::OK, 507 - [(axum::http::header::CONTENT_TYPE, "application/json")], 508 - bytes, 509 - ) 510 - .into_response()) 511 - } else { 512 - forward_pds_response(resp).await 513 - } 514 - } 515 - 516 313 pub async fn upload_blob( 517 314 State(state): State<AppState>, 518 315 claims: Claims, ··· 529 326 pds_post_blob(&state, &session, content_type, body).await 530 327 } 531 328 532 - // --------------------------------------------------------------------------- 533 - // getGame (public, unauthenticated) 534 - // --------------------------------------------------------------------------- 535 - 536 - #[derive(Deserialize)] 537 - pub struct GetGameParams { 538 - uri: String, 539 - } 540 - 541 - pub async fn get_game( 542 - State(state): State<AppState>, 543 - Query(params): Query<GetGameParams>, 544 - ) -> Result<Json<Value>, AppError> { 545 - let did = parse_did_from_at_uri(&params.uri)?; 546 - 547 - let row: Option<(Value,)> = sqlx::query_as( 548 - "SELECT record FROM records WHERE uri = $1", 549 - ) 550 - .bind(&params.uri) 551 - .fetch_optional(&state.db) 552 - .await 553 - .map_err(|e| AppError::Internal(format!("DB query failed: {e}")))?; 554 - 555 - let (mut record,) = row 556 - .ok_or_else(|| AppError::NotFound("game record not found".into()))?; 557 - 558 - let pds = profile::resolve_pds_endpoint(&state.http, &did).await?; 559 - enrich_media_blobs(&mut record, &pds, &did); 560 - 561 - record 562 - .as_object_mut() 563 - .unwrap() 564 - .insert("uri".to_string(), json!(params.uri)); 565 - 566 - Ok(Json(json!({ "game": record }))) 567 - } 568 - 569 - // --------------------------------------------------------------------------- 570 - // listGames (public, unauthenticated) 571 - // --------------------------------------------------------------------------- 572 - 573 - #[derive(Deserialize)] 574 - pub struct ListGamesParams { 575 - cursor: Option<String>, 576 - did: Option<String>, 577 - limit: Option<i64>, 578 - } 579 - 580 - pub async fn list_games( 581 - State(state): State<AppState>, 582 - Query(params): Query<ListGamesParams>, 583 - ) -> Result<Json<Value>, AppError> { 584 - let limit = params.limit.unwrap_or(20).min(100); 585 - let offset: i64 = params 586 - .cursor 587 - .as_deref() 588 - .and_then(|c| c.parse().ok()) 589 - .unwrap_or(0); 590 - 591 - let rows: Vec<(String, String, Value)> = if let Some(ref did) = params.did { 592 - sqlx::query_as( 593 - "SELECT uri, did, record FROM records WHERE collection = $1 AND did = $2 ORDER BY indexed_at DESC LIMIT $3 OFFSET $4", 594 - ) 595 - .bind(COLLECTION) 596 - .bind(did) 597 - .bind(limit) 598 - .bind(offset) 599 - .fetch_all(&state.db) 600 - .await 601 - .map_err(|e| AppError::Internal(format!("DB query failed: {e}")))? 602 - } else { 603 - sqlx::query_as( 604 - "SELECT uri, did, record FROM records WHERE collection = $1 ORDER BY indexed_at DESC LIMIT $2 OFFSET $3", 605 - ) 606 - .bind(COLLECTION) 607 - .bind(limit) 608 - .bind(offset) 609 - .fetch_all(&state.db) 610 - .await 611 - .map_err(|e| AppError::Internal(format!("DB query failed: {e}")))? 612 - }; 613 - 614 - let has_next_page = rows.len() as i64 == limit; 615 - 616 - // Collect unique DIDs and resolve their PDS endpoints for blob URL enrichment. 617 - let unique_dids: std::collections::HashSet<&str> = rows.iter().map(|(_, did, _)| did.as_str()).collect(); 618 - let mut pds_map: std::collections::HashMap<String, String> = std::collections::HashMap::new(); 619 - for did in unique_dids { 620 - if let Ok(pds) = profile::resolve_pds_endpoint(&state.http, did).await { 621 - pds_map.insert(did.to_string(), pds); 622 - } 623 - } 624 - 625 - let games: Vec<Value> = rows 626 - .into_iter() 627 - .filter_map(|(uri, did, record)| { 628 - let mut record = record; 629 - if let Some(pds) = pds_map.get(&did) { 630 - enrich_media_blobs(&mut record, pds, &did); 631 - } 632 - 633 - let name = record.get("name")?.as_str()?.to_string(); 634 - let summary = record.get("summary").and_then(|s| s.as_str()).map(|s| s.to_string()); 635 - let media = record.get("media").cloned(); 636 - 637 - let mut game = json!({ 638 - "uri": uri, 639 - "name": name, 640 - }); 641 - 642 - let obj = game.as_object_mut().unwrap(); 643 - if let Some(summary) = summary { 644 - obj.insert("summary".to_string(), json!(summary)); 645 - } 646 - if let Some(media) = media { 647 - obj.insert("media".to_string(), media); 648 - } 649 - 650 - Some(game) 651 - }) 652 - .collect(); 653 - 654 - let mut result = json!({ "games": games }); 655 - if has_next_page { 656 - let next_cursor = (offset + limit).to_string(); 657 - result.as_object_mut().unwrap().insert("cursor".to_string(), json!(next_cursor)); 658 - } 659 - 660 - Ok(Json(result)) 661 - } 662 - 663 329 /// Walk `media[]` and add a `url` field to each blob so the frontend can 664 330 /// display images directly. 665 - fn enrich_media_blobs(record: &mut Value, pds: &str, did: &str) { 331 + pub(crate) fn enrich_media_blobs(record: &mut Value, pds: &str, did: &str) { 666 332 let media = match record.get_mut("media").and_then(|m| m.as_array_mut()) { 667 333 Some(arr) => arr, 668 334 None => return,
+3 -16
src/server.rs
··· 9 9 use crate::error::AppError; 10 10 use crate::profile; 11 11 use crate::repo; 12 + use crate::xrpc; 12 13 use crate::AppState; 13 14 14 15 pub fn router(state: AppState) -> Router { ··· 17 18 .nest("/admin", admin::admin_routes(state.clone())) 18 19 .route("/xrpc/app.bsky.actor.getProfile", get(get_profile)) 19 20 .route( 20 - "/xrpc/games.gamesgamesgamesgames.createGame", 21 - post(repo::create_game), 22 - ) 23 - .route( 24 - "/xrpc/games.gamesgamesgamesgames.getGame", 25 - get(repo::get_game), 26 - ) 27 - .route( 28 - "/xrpc/games.gamesgamesgamesgames.listGames", 29 - get(repo::list_games), 30 - ) 31 - .route( 32 - "/xrpc/games.gamesgamesgamesgames.putGame", 33 - post(repo::put_game), 34 - ) 35 - .route( 36 21 "/xrpc/com.atproto.repo.uploadBlob", 37 22 post(repo::upload_blob).layer(DefaultBodyLimit::max(50 * 1024 * 1024)), 38 23 ) 24 + // Catch-all for dynamically registered lexicons 25 + .route("/xrpc/{method}", get(xrpc::xrpc_get).post(xrpc::xrpc_post)) 39 26 .layer(TraceLayer::new_for_http()) 40 27 .layer(CorsLayer::permissive()) 41 28 .with_state(state)
+357
src/xrpc.rs
··· 1 + use axum::extract::{Path, Query, State}; 2 + use axum::http::StatusCode; 3 + use axum::response::{IntoResponse, Response}; 4 + use axum::Json; 5 + use serde_json::{json, Value}; 6 + use std::collections::{HashMap, HashSet}; 7 + 8 + use crate::auth::Claims; 9 + use crate::error::AppError; 10 + use crate::lexicon::LexiconType; 11 + use crate::profile; 12 + use crate::repo; 13 + use crate::AppState; 14 + 15 + // --------------------------------------------------------------------------- 16 + // Catch-all handler 17 + // --------------------------------------------------------------------------- 18 + 19 + /// Catch-all GET handler for XRPC queries. 20 + pub async fn xrpc_get( 21 + State(state): State<AppState>, 22 + Path(method): Path<String>, 23 + Query(params): Query<HashMap<String, String>>, 24 + ) -> Result<Response, AppError> { 25 + let lexicon = state 26 + .lexicons 27 + .get(&method) 28 + .await 29 + .ok_or_else(|| AppError::BadRequest(format!("method not found: {method}")))?; 30 + 31 + if lexicon.lexicon_type != LexiconType::Query { 32 + return Err(AppError::BadRequest(format!( 33 + "{method} is not a query endpoint" 34 + ))); 35 + } 36 + 37 + handle_query(&state, &method, &params, &lexicon).await 38 + } 39 + 40 + /// Catch-all POST handler for XRPC procedures. 41 + pub async fn xrpc_post( 42 + State(state): State<AppState>, 43 + Path(method): Path<String>, 44 + claims: Claims, 45 + Json(body): Json<Value>, 46 + ) -> Result<Response, AppError> { 47 + let lexicon = state 48 + .lexicons 49 + .get(&method) 50 + .await 51 + .ok_or_else(|| AppError::BadRequest(format!("method not found: {method}")))?; 52 + 53 + if lexicon.lexicon_type != LexiconType::Procedure { 54 + return Err(AppError::BadRequest(format!( 55 + "{method} is not a procedure endpoint" 56 + ))); 57 + } 58 + 59 + handle_procedure(&state, &method, &claims, &body, &lexicon).await 60 + } 61 + 62 + // --------------------------------------------------------------------------- 63 + // Generic query handler 64 + // --------------------------------------------------------------------------- 65 + 66 + async fn handle_query( 67 + state: &AppState, 68 + method: &str, 69 + params: &HashMap<String, String>, 70 + lexicon: &crate::lexicon::ParsedLexicon, 71 + ) -> Result<Response, AppError> { 72 + // Single-record query: has a `uri` parameter 73 + if let Some(uri) = params.get("uri") { 74 + return handle_get_record(state, uri).await; 75 + } 76 + 77 + // List query: needs a target collection to know what to query 78 + let collection = lexicon 79 + .target_collection 80 + .as_deref() 81 + .ok_or_else(|| { 82 + AppError::BadRequest(format!( 83 + "{method} has no target_collection configured for list queries" 84 + )) 85 + })?; 86 + 87 + let limit: i64 = params 88 + .get("limit") 89 + .and_then(|l| l.parse().ok()) 90 + .unwrap_or(20) 91 + .min(100); 92 + 93 + let offset: i64 = params 94 + .get("cursor") 95 + .and_then(|c| c.parse().ok()) 96 + .unwrap_or(0); 97 + 98 + let did = params.get("did"); 99 + 100 + let rows: Vec<(String, String, Value)> = if let Some(did) = did { 101 + sqlx::query_as( 102 + "SELECT uri, did, record FROM records WHERE collection = $1 AND did = $2 ORDER BY indexed_at DESC LIMIT $3 OFFSET $4", 103 + ) 104 + .bind(collection) 105 + .bind(did) 106 + .bind(limit) 107 + .bind(offset) 108 + .fetch_all(&state.db) 109 + .await 110 + .map_err(|e| AppError::Internal(format!("DB query failed: {e}")))? 111 + } else { 112 + sqlx::query_as( 113 + "SELECT uri, did, record FROM records WHERE collection = $1 ORDER BY indexed_at DESC LIMIT $2 OFFSET $3", 114 + ) 115 + .bind(collection) 116 + .bind(limit) 117 + .bind(offset) 118 + .fetch_all(&state.db) 119 + .await 120 + .map_err(|e| AppError::Internal(format!("DB query failed: {e}")))? 121 + }; 122 + 123 + let has_next_page = rows.len() as i64 == limit; 124 + 125 + // Resolve PDS endpoints for blob URL enrichment. 126 + let unique_dids: HashSet<&str> = rows.iter().map(|(_, did, _)| did.as_str()).collect(); 127 + let mut pds_map: HashMap<String, String> = HashMap::new(); 128 + for did in unique_dids { 129 + if let Ok(pds) = profile::resolve_pds_endpoint(&state.http, did).await { 130 + pds_map.insert(did.to_string(), pds); 131 + } 132 + } 133 + 134 + let records: Vec<Value> = rows 135 + .into_iter() 136 + .map(|(uri, did, mut record)| { 137 + if let Some(pds) = pds_map.get(&did) { 138 + repo::enrich_media_blobs(&mut record, pds, &did); 139 + } 140 + record 141 + .as_object_mut() 142 + .map(|obj| obj.insert("uri".to_string(), json!(uri))); 143 + record 144 + }) 145 + .collect(); 146 + 147 + let mut result = json!({ "records": records }); 148 + if has_next_page { 149 + let next_cursor = (offset + limit).to_string(); 150 + result 151 + .as_object_mut() 152 + .unwrap() 153 + .insert("cursor".to_string(), json!(next_cursor)); 154 + } 155 + 156 + Ok(Json(result).into_response()) 157 + } 158 + 159 + async fn handle_get_record(state: &AppState, uri: &str) -> Result<Response, AppError> { 160 + let did = repo::parse_did_from_at_uri(uri)?; 161 + 162 + let row: Option<(Value,)> = 163 + sqlx::query_as("SELECT record FROM records WHERE uri = $1") 164 + .bind(uri) 165 + .fetch_optional(&state.db) 166 + .await 167 + .map_err(|e| AppError::Internal(format!("DB query failed: {e}")))?; 168 + 169 + let (mut record,) = 170 + row.ok_or_else(|| AppError::NotFound("record not found".into()))?; 171 + 172 + let pds = profile::resolve_pds_endpoint(&state.http, &did).await?; 173 + repo::enrich_media_blobs(&mut record, &pds, &did); 174 + 175 + record 176 + .as_object_mut() 177 + .map(|obj| obj.insert("uri".to_string(), json!(uri))); 178 + 179 + Ok(Json(json!({ "record": record })).into_response()) 180 + } 181 + 182 + // --------------------------------------------------------------------------- 183 + // Generic procedure handler 184 + // --------------------------------------------------------------------------- 185 + 186 + async fn handle_procedure( 187 + state: &AppState, 188 + method: &str, 189 + claims: &Claims, 190 + input: &Value, 191 + lexicon: &crate::lexicon::ParsedLexicon, 192 + ) -> Result<Response, AppError> { 193 + let collection = lexicon 194 + .target_collection 195 + .as_deref() 196 + .ok_or_else(|| { 197 + AppError::BadRequest(format!( 198 + "{method} has no target_collection configured" 199 + )) 200 + })?; 201 + 202 + let session = repo::get_atp_session(state, claims.token()).await?; 203 + 204 + // Determine create vs put based on whether input has a `uri` field. 205 + let has_uri = input.get("uri").and_then(|v| v.as_str()).is_some(); 206 + 207 + if has_uri { 208 + handle_put_record(state, claims, input, collection, &session).await 209 + } else { 210 + handle_create_record(state, claims, input, collection, &session).await 211 + } 212 + } 213 + 214 + async fn handle_create_record( 215 + state: &AppState, 216 + claims: &Claims, 217 + input: &Value, 218 + collection: &str, 219 + session: &repo::AtpSession, 220 + ) -> Result<Response, AppError> { 221 + // Build record from input, adding $type 222 + let mut record = input.clone(); 223 + if let Some(obj) = record.as_object_mut() { 224 + obj.insert("$type".to_string(), json!(collection)); 225 + // Remove fields that are procedure params, not record fields 226 + obj.remove("shouldPublish"); 227 + } 228 + 229 + let pds_body = json!({ 230 + "repo": claims.did(), 231 + "collection": collection, 232 + "record": record, 233 + }); 234 + 235 + let resp = repo::pds_post_json_raw(state, session, "com.atproto.repo.createRecord", &pds_body).await?; 236 + 237 + if resp.status().is_success() { 238 + let bytes = resp 239 + .bytes() 240 + .await 241 + .map_err(|e| AppError::Internal(format!("failed to read PDS response: {e}")))?; 242 + 243 + let pds_result: Value = serde_json::from_slice(&bytes) 244 + .map_err(|e| AppError::Internal(format!("invalid PDS JSON: {e}")))?; 245 + 246 + if let (Some(uri), Some(cid)) = ( 247 + pds_result.get("uri").and_then(|v| v.as_str()), 248 + pds_result.get("cid").and_then(|v| v.as_str()), 249 + ) { 250 + let rkey = uri.split('/').last().unwrap_or_default(); 251 + let _ = sqlx::query( 252 + r#" 253 + INSERT INTO records (uri, did, collection, rkey, record, cid) 254 + VALUES ($1, $2, $3, $4, $5, $6) 255 + ON CONFLICT (uri) DO UPDATE 256 + SET record = EXCLUDED.record, 257 + cid = EXCLUDED.cid 258 + "#, 259 + ) 260 + .bind(uri) 261 + .bind(claims.did()) 262 + .bind(collection) 263 + .bind(rkey) 264 + .bind(&record) 265 + .bind(cid) 266 + .execute(&state.db) 267 + .await; 268 + } 269 + 270 + Ok(( 271 + StatusCode::OK, 272 + [(axum::http::header::CONTENT_TYPE, "application/json")], 273 + bytes, 274 + ) 275 + .into_response()) 276 + } else { 277 + repo::forward_pds_response(resp).await 278 + } 279 + } 280 + 281 + async fn handle_put_record( 282 + state: &AppState, 283 + claims: &Claims, 284 + input: &Value, 285 + collection: &str, 286 + session: &repo::AtpSession, 287 + ) -> Result<Response, AppError> { 288 + let uri = input 289 + .get("uri") 290 + .and_then(|v| v.as_str()) 291 + .ok_or_else(|| AppError::BadRequest("missing uri field".into()))?; 292 + 293 + let rkey = uri 294 + .split('/') 295 + .last() 296 + .ok_or_else(|| AppError::Internal("invalid AT URI".into()))?; 297 + 298 + // Build record from input, adding $type 299 + let mut record = input.clone(); 300 + if let Some(obj) = record.as_object_mut() { 301 + obj.insert("$type".to_string(), json!(collection)); 302 + obj.remove("uri"); 303 + obj.remove("shouldPublish"); 304 + } 305 + 306 + let pds_body = json!({ 307 + "repo": claims.did(), 308 + "collection": collection, 309 + "rkey": rkey, 310 + "record": record, 311 + }); 312 + 313 + let resp = repo::pds_post_json_raw(state, session, "com.atproto.repo.putRecord", &pds_body).await?; 314 + 315 + if resp.status().is_success() { 316 + let bytes = resp 317 + .bytes() 318 + .await 319 + .map_err(|e| AppError::Internal(format!("failed to read PDS response: {e}")))?; 320 + 321 + let pds_result: Value = serde_json::from_slice(&bytes) 322 + .map_err(|e| AppError::Internal(format!("invalid PDS JSON: {e}")))?; 323 + 324 + let cid = pds_result 325 + .get("cid") 326 + .and_then(|v| v.as_str()) 327 + .unwrap_or_default(); 328 + 329 + let _ = sqlx::query( 330 + r#" 331 + INSERT INTO records (uri, did, collection, rkey, record, cid) 332 + VALUES ($1, $2, $3, $4, $5, $6) 333 + ON CONFLICT (uri) DO UPDATE 334 + SET record = EXCLUDED.record, 335 + cid = EXCLUDED.cid, 336 + indexed_at = NOW() 337 + "#, 338 + ) 339 + .bind(uri) 340 + .bind(claims.did()) 341 + .bind(collection) 342 + .bind(rkey) 343 + .bind(&record) 344 + .bind(cid) 345 + .execute(&state.db) 346 + .await; 347 + 348 + Ok(( 349 + StatusCode::OK, 350 + [(axum::http::header::CONTENT_TYPE, "application/json")], 351 + bytes, 352 + ) 353 + .into_response()) 354 + } else { 355 + repo::forward_pds_response(resp).await 356 + } 357 + }