A lexicon-driven AppView for ATProto. happyview.dev
backfill firehose jetstream atproto appview oauth lexicon
8
fork

Configure Feed

Select the types of activity you want to include in your feed.

test: add unit tests and make codebase testable

Trezy c0fe3da2 f2ae8226

+862 -38
+140
Cargo.lock
··· 33 33 checksum = "5f0e0fee31ef5ed1ba1316088939cea399010ed7731dba877ed44aeb407a75ea" 34 34 35 35 [[package]] 36 + name = "assert-json-diff" 37 + version = "2.0.2" 38 + source = "registry+https://github.com/rust-lang/crates.io-index" 39 + checksum = "47e4f2b81832e72834d7518d8487a0396a28cc408186a2e8854c0f98011faf12" 40 + dependencies = [ 41 + "serde", 42 + "serde_json", 43 + ] 44 + 45 + [[package]] 36 46 name = "atoi" 37 47 version = "2.0.0" 38 48 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 288 298 checksum = "d7a1e2f27636f116493b8b860f5546edb47c8d8f8ea73e1d2a20be88e28d1fea" 289 299 290 300 [[package]] 301 + name = "deadpool" 302 + version = "0.12.3" 303 + source = "registry+https://github.com/rust-lang/crates.io-index" 304 + checksum = "0be2b1d1d6ec8d846f05e137292d0b89133caf95ef33695424c09568bdd39b1b" 305 + dependencies = [ 306 + "deadpool-runtime", 307 + "lazy_static", 308 + "num_cpus", 309 + "tokio", 310 + ] 311 + 312 + [[package]] 313 + name = "deadpool-runtime" 314 + version = "0.1.4" 315 + source = "registry+https://github.com/rust-lang/crates.io-index" 316 + checksum = "092966b41edc516079bdf31ec78a2e0588d1d0c08f78b91d8307215928642b2b" 317 + 318 + [[package]] 291 319 name = "der" 292 320 version = "0.7.10" 293 321 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 496 524 ] 497 525 498 526 [[package]] 527 + name = "futures" 528 + version = "0.3.31" 529 + source = "registry+https://github.com/rust-lang/crates.io-index" 530 + checksum = "65bc07b1a8bc7c85c5f2e110c476c7389b4554ba72af57d8445ea63a576b0876" 531 + dependencies = [ 532 + "futures-channel", 533 + "futures-core", 534 + "futures-executor", 535 + "futures-io", 536 + "futures-sink", 537 + "futures-task", 538 + "futures-util", 539 + ] 540 + 541 + [[package]] 499 542 name = "futures-channel" 500 543 version = "0.3.31" 501 544 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 568 611 source = "registry+https://github.com/rust-lang/crates.io-index" 569 612 checksum = "9fa08315bb612088cc391249efdc3bc77536f16c91f6cf495e6fbe85b20a4a81" 570 613 dependencies = [ 614 + "futures-channel", 571 615 "futures-core", 572 616 "futures-io", 573 617 "futures-macro", ··· 669 713 "dotenvy", 670 714 "futures-util", 671 715 "hex", 716 + "http-body-util", 672 717 "jsonwebtoken", 673 718 "p256", 674 719 "reqwest", 675 720 "serde", 676 721 "serde_json", 722 + "serial_test", 677 723 "sha2", 678 724 "sqlx", 679 725 "tokio", 680 726 "tokio-tungstenite", 727 + "tower", 681 728 "tower-http", 682 729 "tracing", 683 730 "tracing-subscriber", 684 731 "uuid", 732 + "wiremock", 685 733 ] 686 734 687 735 [[package]] ··· 715 763 version = "0.5.0" 716 764 source = "registry+https://github.com/rust-lang/crates.io-index" 717 765 checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea" 766 + 767 + [[package]] 768 + name = "hermit-abi" 769 + version = "0.5.2" 770 + source = "registry+https://github.com/rust-lang/crates.io-index" 771 + checksum = "fc0fef456e4baa96da950455cd02c081ca953b141298e41db3fc7e36b1da849c" 718 772 719 773 [[package]] 720 774 name = "hex" ··· 1277 1331 ] 1278 1332 1279 1333 [[package]] 1334 + name = "num_cpus" 1335 + version = "1.17.0" 1336 + source = "registry+https://github.com/rust-lang/crates.io-index" 1337 + checksum = "91df4bbde75afed763b708b7eee1e8e7651e02d97f6d5dd763e89367e957b23b" 1338 + dependencies = [ 1339 + "hermit-abi", 1340 + "libc", 1341 + ] 1342 + 1343 + [[package]] 1280 1344 name = "once_cell" 1281 1345 version = "1.21.3" 1282 1346 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 1576 1640 ] 1577 1641 1578 1642 [[package]] 1643 + name = "regex" 1644 + version = "1.12.3" 1645 + source = "registry+https://github.com/rust-lang/crates.io-index" 1646 + checksum = "e10754a14b9137dd7b1e3e5b0493cc9171fdd105e0ab477f51b72e7f3ac0e276" 1647 + dependencies = [ 1648 + "aho-corasick", 1649 + "memchr", 1650 + "regex-automata", 1651 + "regex-syntax", 1652 + ] 1653 + 1654 + [[package]] 1579 1655 name = "regex-automata" 1580 1656 version = "0.4.14" 1581 1657 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 1736 1812 checksum = "9774ba4a74de5f7b1c1451ed6cd5285a32eddb5cccb8cc655a4e50009e06477f" 1737 1813 1738 1814 [[package]] 1815 + name = "scc" 1816 + version = "2.4.0" 1817 + source = "registry+https://github.com/rust-lang/crates.io-index" 1818 + checksum = "46e6f046b7fef48e2660c57ed794263155d713de679057f2d0c169bfc6e756cc" 1819 + dependencies = [ 1820 + "sdd", 1821 + ] 1822 + 1823 + [[package]] 1739 1824 name = "schannel" 1740 1825 version = "0.1.28" 1741 1826 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 1749 1834 version = "1.2.0" 1750 1835 source = "registry+https://github.com/rust-lang/crates.io-index" 1751 1836 checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" 1837 + 1838 + [[package]] 1839 + name = "sdd" 1840 + version = "3.0.10" 1841 + source = "registry+https://github.com/rust-lang/crates.io-index" 1842 + checksum = "490dcfcbfef26be6800d11870ff2df8774fa6e86d047e3e8c8a76b25655e41ca" 1752 1843 1753 1844 [[package]] 1754 1845 name = "sec1" ··· 1857 1948 "itoa", 1858 1949 "ryu", 1859 1950 "serde", 1951 + ] 1952 + 1953 + [[package]] 1954 + name = "serial_test" 1955 + version = "3.3.1" 1956 + source = "registry+https://github.com/rust-lang/crates.io-index" 1957 + checksum = "0d0b343e184fc3b7bb44dff0705fffcf4b3756ba6aff420dddd8b24ca145e555" 1958 + dependencies = [ 1959 + "futures-executor", 1960 + "futures-util", 1961 + "log", 1962 + "once_cell", 1963 + "parking_lot", 1964 + "scc", 1965 + "serial_test_derive", 1966 + ] 1967 + 1968 + [[package]] 1969 + name = "serial_test_derive" 1970 + version = "3.3.1" 1971 + source = "registry+https://github.com/rust-lang/crates.io-index" 1972 + checksum = "6f50427f258fb77356e4cd4aa0e87e2bd2c66dbcee41dc405282cae2bfc26c83" 1973 + dependencies = [ 1974 + "proc-macro2", 1975 + "quote", 1976 + "syn", 1860 1977 ] 1861 1978 1862 1979 [[package]] ··· 3133 3250 version = "0.53.1" 3134 3251 source = "registry+https://github.com/rust-lang/crates.io-index" 3135 3252 checksum = "d6bbff5f0aada427a1e5a6da5f1f98158182f26556f345ac9e04d36d0ebed650" 3253 + 3254 + [[package]] 3255 + name = "wiremock" 3256 + version = "0.6.5" 3257 + source = "registry+https://github.com/rust-lang/crates.io-index" 3258 + checksum = "08db1edfb05d9b3c1542e521aea074442088292f00b5f28e435c714a98f85031" 3259 + dependencies = [ 3260 + "assert-json-diff", 3261 + "base64", 3262 + "deadpool", 3263 + "futures", 3264 + "http", 3265 + "http-body-util", 3266 + "hyper", 3267 + "hyper-util", 3268 + "log", 3269 + "once_cell", 3270 + "regex", 3271 + "serde", 3272 + "serde_json", 3273 + "tokio", 3274 + "url", 3275 + ] 3136 3276 3137 3277 [[package]] 3138 3278 name = "wit-bindgen"
+6
Cargo.toml
··· 24 24 tracing = "0.1" 25 25 tracing-subscriber = { version = "0.3", features = ["env-filter", "json"] } 26 26 hex = "0.4.3" 27 + 28 + [dev-dependencies] 29 + wiremock = "0.6" 30 + tower = { version = "0.5", features = ["util"] } 31 + http-body-util = "0.1" 32 + serial_test = "3"
+32 -1
src/admin.rs
··· 15 15 // --------------------------------------------------------------------------- 16 16 17 17 /// SHA-256 hash a plaintext API key for storage/comparison. 18 - fn hash_api_key(key: &str) -> String { 18 + pub(crate) fn hash_api_key(key: &str) -> String { 19 19 let hash = Sha256::digest(key.as_bytes()); 20 20 hex::encode(hash) 21 21 } ··· 545 545 546 546 Ok(StatusCode::NO_CONTENT) 547 547 } 548 + 549 + #[cfg(test)] 550 + mod tests { 551 + use super::*; 552 + 553 + #[test] 554 + fn hash_api_key_produces_deterministic_sha256_hex() { 555 + let h1 = hash_api_key("test-key"); 556 + let h2 = hash_api_key("test-key"); 557 + assert_eq!(h1, h2); 558 + assert_eq!(h1.len(), 64); 559 + assert!(h1.chars().all(|c| c.is_ascii_hexdigit())); 560 + } 561 + 562 + #[test] 563 + fn hash_api_key_different_inputs_differ() { 564 + let h1 = hash_api_key("key-a"); 565 + let h2 = hash_api_key("key-b"); 566 + assert_ne!(h1, h2); 567 + } 568 + 569 + #[test] 570 + fn hash_api_key_known_value() { 571 + // SHA-256 of "hello" is well-known 572 + let hash = hash_api_key("hello"); 573 + assert_eq!( 574 + hash, 575 + "2cf24dba5fb0a30e26e83b2ac5b9e29e1b161e5c1fa7425e73043362938b9824" 576 + ); 577 + } 578 + }
+6
src/auth/middleware.rs
··· 22 22 pub fn token(&self) -> &str { 23 23 &self.token 24 24 } 25 + 26 + /// Test-only constructor. 27 + #[cfg(test)] 28 + pub fn new_for_test(did: String, token: String) -> Self { 29 + Self { did, token } 30 + } 25 31 } 26 32 27 33 #[derive(Deserialize)]
+7 -4
src/backfill.rs
··· 150 150 db: &PgPool, 151 151 http: &reqwest::Client, 152 152 relay_url: &str, 153 + plc_url: &str, 153 154 job_id: &str, 154 155 ) -> Result<(), String> { 155 156 // Fetch the job ··· 232 233 let db = db.clone(); 233 234 let collection = collection.clone(); 234 235 236 + let plc_url = plc_url.to_string(); 235 237 let task = tokio::spawn(async move { 236 238 let _permit = permit; 237 - backfill_repo(&db, &http, &did, &collection).await 239 + backfill_repo(&db, &http, &plc_url, &did, &collection).await 238 240 }); 239 241 tasks.push(task); 240 242 } ··· 286 288 async fn backfill_repo( 287 289 db: &PgPool, 288 290 http: &reqwest::Client, 291 + plc_url: &str, 289 292 did: &str, 290 293 collection: &str, 291 294 ) -> Result<i32, String> { 292 295 // Resolve PDS 293 - let pds = profile::resolve_pds_endpoint(http, did) 296 + let pds = profile::resolve_pds_endpoint(http, plc_url, did) 294 297 .await 295 298 .map_err(|e| format!("PDS resolution failed for {did}: {e}"))?; 296 299 ··· 330 333 // --------------------------------------------------------------------------- 331 334 332 335 /// Spawn a background task that polls for pending backfill jobs and runs them. 333 - pub fn spawn_worker(db: PgPool, http: reqwest::Client, relay_url: String) { 336 + pub fn spawn_worker(db: PgPool, http: reqwest::Client, relay_url: String, plc_url: String) { 334 337 tokio::spawn(async move { 335 338 info!("backfill worker started"); 336 339 loop { ··· 344 347 345 348 if let Some((job_id,)) = job { 346 349 info!(job = %job_id, "picked up backfill job"); 347 - if let Err(e) = run_job(&db, &http, &relay_url, &job_id).await { 350 + if let Err(e) = run_job(&db, &http, &relay_url, &plc_url, &job_id).await { 348 351 error!(job = %job_id, error = %e, "backfill job failed"); 349 352 let _ = sqlx::query( 350 353 "UPDATE backfill_jobs SET status = 'failed', completed_at = NOW(), error = $2 WHERE id::text = $1",
+113
src/config.rs
··· 10 10 pub jetstream_url: String, 11 11 pub admin_secret: Option<String>, 12 12 pub relay_url: String, 13 + pub plc_url: String, 13 14 } 14 15 15 16 impl Config { ··· 29 30 admin_secret: env::var("ADMIN_SECRET").ok(), 30 31 relay_url: env::var("RELAY_URL") 31 32 .unwrap_or_else(|_| "https://bsky.network".into()), 33 + plc_url: env::var("PLC_URL") 34 + .unwrap_or_else(|_| "https://plc.directory".into()), 32 35 } 33 36 } 34 37 ··· 38 41 .expect("invalid HOST/PORT") 39 42 } 40 43 } 44 + 45 + #[cfg(test)] 46 + mod tests { 47 + use super::*; 48 + use serial_test::serial; 49 + 50 + unsafe fn clear_env() { 51 + for key in [ 52 + "HOST", "PORT", "DATABASE_URL", "AIP_URL", "JETSTREAM_URL", 53 + "ADMIN_SECRET", "RELAY_URL", "PLC_URL", 54 + ] { 55 + unsafe { env::remove_var(key); } 56 + } 57 + } 58 + 59 + unsafe fn set_required_env() { 60 + unsafe { 61 + env::set_var("DATABASE_URL", "postgres://localhost/test"); 62 + env::set_var("AIP_URL", "http://localhost:4000"); 63 + } 64 + } 65 + 66 + #[test] 67 + fn listen_addr_combines_host_and_port() { 68 + let config = Config { 69 + host: "127.0.0.1".into(), 70 + port: 8080, 71 + database_url: String::new(), 72 + aip_url: String::new(), 73 + jetstream_url: String::new(), 74 + admin_secret: None, 75 + relay_url: String::new(), 76 + plc_url: String::new(), 77 + }; 78 + assert_eq!( 79 + config.listen_addr(), 80 + "127.0.0.1:8080".parse::<SocketAddr>().unwrap() 81 + ); 82 + } 83 + 84 + #[test] 85 + #[serial] 86 + fn from_env_reads_required_vars() { 87 + unsafe { 88 + clear_env(); 89 + set_required_env(); 90 + } 91 + let config = Config::from_env(); 92 + assert_eq!(config.database_url, "postgres://localhost/test"); 93 + assert_eq!(config.aip_url, "http://localhost:4000"); 94 + } 95 + 96 + #[test] 97 + #[serial] 98 + fn from_env_applies_defaults() { 99 + unsafe { 100 + clear_env(); 101 + set_required_env(); 102 + } 103 + let config = Config::from_env(); 104 + assert_eq!(config.host, "0.0.0.0"); 105 + assert_eq!(config.port, 3000); 106 + assert!(config.jetstream_url.contains("jetstream")); 107 + assert_eq!(config.relay_url, "https://bsky.network"); 108 + assert_eq!(config.plc_url, "https://plc.directory"); 109 + assert!(config.admin_secret.is_none()); 110 + } 111 + 112 + #[test] 113 + #[serial] 114 + fn from_env_reads_optional_overrides() { 115 + unsafe { 116 + clear_env(); 117 + set_required_env(); 118 + env::set_var("HOST", "10.0.0.1"); 119 + env::set_var("PORT", "9090"); 120 + env::set_var("ADMIN_SECRET", "s3cret"); 121 + env::set_var("RELAY_URL", "https://relay.example.com"); 122 + env::set_var("PLC_URL", "https://plc.example.com"); 123 + } 124 + let config = Config::from_env(); 125 + assert_eq!(config.host, "10.0.0.1"); 126 + assert_eq!(config.port, 9090); 127 + assert_eq!(config.admin_secret, Some("s3cret".into())); 128 + assert_eq!(config.relay_url, "https://relay.example.com"); 129 + assert_eq!(config.plc_url, "https://plc.example.com"); 130 + } 131 + 132 + #[test] 133 + #[serial] 134 + #[should_panic(expected = "DATABASE_URL must be set")] 135 + fn from_env_panics_without_database_url() { 136 + unsafe { 137 + clear_env(); 138 + env::set_var("AIP_URL", "http://localhost:4000"); 139 + } 140 + Config::from_env(); 141 + } 142 + 143 + #[test] 144 + #[serial] 145 + #[should_panic(expected = "AIP_URL must be set")] 146 + fn from_env_panics_without_aip_url() { 147 + unsafe { 148 + clear_env(); 149 + env::set_var("DATABASE_URL", "postgres://localhost/test"); 150 + } 151 + Config::from_env(); 152 + } 153 + }
+76
src/error.rs
··· 53 53 } 54 54 } 55 55 } 56 + 57 + #[cfg(test)] 58 + mod tests { 59 + use super::*; 60 + use axum::response::IntoResponse; 61 + use http_body_util::BodyExt; 62 + 63 + async fn response_parts(err: AppError) -> (StatusCode, serde_json::Value) { 64 + let resp = err.into_response(); 65 + let status = resp.status(); 66 + let body = resp.into_body().collect().await.unwrap().to_bytes(); 67 + let json: serde_json::Value = serde_json::from_slice(&body).unwrap(); 68 + (status, json) 69 + } 70 + 71 + #[tokio::test] 72 + async fn auth_error_returns_401() { 73 + let (status, body) = response_parts(AppError::Auth("bad token".into())).await; 74 + assert_eq!(status, StatusCode::UNAUTHORIZED); 75 + assert_eq!(body["error"], "bad token"); 76 + } 77 + 78 + #[tokio::test] 79 + async fn bad_request_returns_400() { 80 + let (status, body) = response_parts(AppError::BadRequest("missing field".into())).await; 81 + assert_eq!(status, StatusCode::BAD_REQUEST); 82 + assert_eq!(body["error"], "missing field"); 83 + } 84 + 85 + #[tokio::test] 86 + async fn internal_error_returns_500_and_hides_detail() { 87 + let (status, body) = response_parts(AppError::Internal("secret details".into())).await; 88 + assert_eq!(status, StatusCode::INTERNAL_SERVER_ERROR); 89 + assert_eq!(body["error"], "internal server error"); 90 + } 91 + 92 + #[tokio::test] 93 + async fn not_found_returns_404() { 94 + let (status, body) = response_parts(AppError::NotFound("no such thing".into())).await; 95 + assert_eq!(status, StatusCode::NOT_FOUND); 96 + assert_eq!(body["error"], "no such thing"); 97 + } 98 + 99 + #[tokio::test] 100 + async fn pds_error_preserves_status_and_body() { 101 + let raw_body = Bytes::from(r#"{"error":"upstream"}"#); 102 + let resp = AppError::PdsError(StatusCode::BAD_GATEWAY, raw_body.clone()).into_response(); 103 + assert_eq!(resp.status(), StatusCode::BAD_GATEWAY); 104 + let body = resp.into_body().collect().await.unwrap().to_bytes(); 105 + assert_eq!(body, raw_body); 106 + } 107 + 108 + #[test] 109 + fn display_formats() { 110 + assert_eq!( 111 + AppError::Auth("x".into()).to_string(), 112 + "auth error: x" 113 + ); 114 + assert_eq!( 115 + AppError::BadRequest("y".into()).to_string(), 116 + "bad request: y" 117 + ); 118 + assert_eq!( 119 + AppError::Internal("z".into()).to_string(), 120 + "internal error: z" 121 + ); 122 + assert_eq!( 123 + AppError::NotFound("w".into()).to_string(), 124 + "not found: w" 125 + ); 126 + assert_eq!( 127 + AppError::PdsError(StatusCode::BAD_GATEWAY, Bytes::new()).to_string(), 128 + "PDS error: 502 Bad Gateway" 129 + ); 130 + } 131 + }
+227
src/lexicon.rs
··· 183 183 inner.len() 184 184 } 185 185 } 186 + 187 + #[cfg(test)] 188 + mod tests { 189 + use super::*; 190 + use serde_json::json; 191 + 192 + // ----------------------------------------------------------------------- 193 + // ParsedLexicon::parse 194 + // ----------------------------------------------------------------------- 195 + 196 + fn record_lexicon_json() -> Value { 197 + json!({ 198 + "lexicon": 1, 199 + "id": "games.gamesgamesgamesgames.game", 200 + "defs": { 201 + "main": { 202 + "type": "record", 203 + "key": "tid", 204 + "record": { 205 + "type": "object", 206 + "properties": { 207 + "title": { "type": "string" } 208 + } 209 + } 210 + } 211 + } 212 + }) 213 + } 214 + 215 + fn query_lexicon_json() -> Value { 216 + json!({ 217 + "lexicon": 1, 218 + "id": "games.gamesgamesgamesgames.listGames", 219 + "defs": { 220 + "main": { 221 + "type": "query", 222 + "parameters": { 223 + "type": "params", 224 + "properties": { 225 + "limit": { "type": "integer" } 226 + } 227 + }, 228 + "output": { 229 + "encoding": "application/json" 230 + } 231 + } 232 + } 233 + }) 234 + } 235 + 236 + fn procedure_lexicon_json() -> Value { 237 + json!({ 238 + "lexicon": 1, 239 + "id": "games.gamesgamesgamesgames.createGame", 240 + "defs": { 241 + "main": { 242 + "type": "procedure", 243 + "input": { 244 + "encoding": "application/json" 245 + }, 246 + "output": { 247 + "encoding": "application/json" 248 + } 249 + } 250 + } 251 + }) 252 + } 253 + 254 + fn definitions_lexicon_json() -> Value { 255 + json!({ 256 + "lexicon": 1, 257 + "id": "games.gamesgamesgamesgames.defs", 258 + "defs": { 259 + "genre": { 260 + "type": "string", 261 + "knownValues": ["action", "rpg"] 262 + } 263 + } 264 + }) 265 + } 266 + 267 + #[test] 268 + fn parse_record_lexicon() { 269 + let parsed = ParsedLexicon::parse(record_lexicon_json(), 1, None).unwrap(); 270 + assert_eq!(parsed.id, "games.gamesgamesgamesgames.game"); 271 + assert_eq!(parsed.lexicon_type, LexiconType::Record); 272 + assert_eq!(parsed.record_key, Some("tid".into())); 273 + assert!(parsed.record_schema.is_some()); 274 + assert!(parsed.parameters.is_none()); 275 + assert!(parsed.input.is_none()); 276 + } 277 + 278 + #[test] 279 + fn parse_query_lexicon() { 280 + let parsed = ParsedLexicon::parse(query_lexicon_json(), 2, Some("games.gamesgamesgamesgames.game".into())).unwrap(); 281 + assert_eq!(parsed.lexicon_type, LexiconType::Query); 282 + assert!(parsed.parameters.is_some()); 283 + assert!(parsed.output.is_some()); 284 + assert_eq!(parsed.target_collection, Some("games.gamesgamesgamesgames.game".into())); 285 + assert_eq!(parsed.revision, 2); 286 + } 287 + 288 + #[test] 289 + fn parse_procedure_lexicon() { 290 + let parsed = ParsedLexicon::parse(procedure_lexicon_json(), 1, None).unwrap(); 291 + assert_eq!(parsed.lexicon_type, LexiconType::Procedure); 292 + assert!(parsed.input.is_some()); 293 + assert!(parsed.output.is_some()); 294 + } 295 + 296 + #[test] 297 + fn parse_definitions_lexicon() { 298 + let parsed = ParsedLexicon::parse(definitions_lexicon_json(), 1, None).unwrap(); 299 + assert_eq!(parsed.lexicon_type, LexiconType::Definitions); 300 + } 301 + 302 + #[test] 303 + fn parse_missing_id_returns_error() { 304 + let raw = json!({"lexicon": 1, "defs": {}}); 305 + let result = ParsedLexicon::parse(raw, 1, None); 306 + assert!(result.is_err()); 307 + assert!(result.unwrap_err().contains("id")); 308 + } 309 + 310 + #[test] 311 + fn parse_preserves_raw_json() { 312 + let raw = record_lexicon_json(); 313 + let parsed = ParsedLexicon::parse(raw.clone(), 1, None).unwrap(); 314 + assert_eq!(parsed.raw, raw); 315 + } 316 + 317 + #[test] 318 + fn parse_target_collection_passthrough() { 319 + let parsed = ParsedLexicon::parse( 320 + query_lexicon_json(), 321 + 1, 322 + Some("custom.collection".into()), 323 + ) 324 + .unwrap(); 325 + assert_eq!(parsed.target_collection, Some("custom.collection".into())); 326 + } 327 + 328 + // ----------------------------------------------------------------------- 329 + // LexiconRegistry 330 + // ----------------------------------------------------------------------- 331 + 332 + #[tokio::test] 333 + async fn registry_new_is_empty() { 334 + let reg = LexiconRegistry::new(); 335 + assert_eq!(reg.count().await, 0); 336 + } 337 + 338 + #[tokio::test] 339 + async fn registry_upsert_and_get() { 340 + let reg = LexiconRegistry::new(); 341 + let parsed = ParsedLexicon::parse(record_lexicon_json(), 1, None).unwrap(); 342 + reg.upsert(parsed).await; 343 + 344 + let got = reg.get("games.gamesgamesgamesgames.game").await; 345 + assert!(got.is_some()); 346 + assert_eq!(got.unwrap().lexicon_type, LexiconType::Record); 347 + } 348 + 349 + #[tokio::test] 350 + async fn registry_upsert_replaces() { 351 + let reg = LexiconRegistry::new(); 352 + let v1 = ParsedLexicon::parse(record_lexicon_json(), 1, None).unwrap(); 353 + reg.upsert(v1).await; 354 + 355 + let v2 = ParsedLexicon::parse(record_lexicon_json(), 5, None).unwrap(); 356 + reg.upsert(v2).await; 357 + 358 + assert_eq!(reg.count().await, 1); 359 + assert_eq!(reg.get("games.gamesgamesgamesgames.game").await.unwrap().revision, 5); 360 + } 361 + 362 + #[tokio::test] 363 + async fn registry_remove_existing() { 364 + let reg = LexiconRegistry::new(); 365 + let parsed = ParsedLexicon::parse(record_lexicon_json(), 1, None).unwrap(); 366 + reg.upsert(parsed).await; 367 + 368 + assert!(reg.remove("games.gamesgamesgamesgames.game").await); 369 + assert_eq!(reg.count().await, 0); 370 + } 371 + 372 + #[tokio::test] 373 + async fn registry_remove_nonexistent() { 374 + let reg = LexiconRegistry::new(); 375 + assert!(!reg.remove("nonexistent").await); 376 + } 377 + 378 + #[tokio::test] 379 + async fn registry_get_nonexistent() { 380 + let reg = LexiconRegistry::new(); 381 + assert!(reg.get("nonexistent").await.is_none()); 382 + } 383 + 384 + #[tokio::test] 385 + async fn registry_type_filtered_collections() { 386 + let reg = LexiconRegistry::new(); 387 + 388 + let record = ParsedLexicon::parse(record_lexicon_json(), 1, None).unwrap(); 389 + let query = ParsedLexicon::parse(query_lexicon_json(), 1, None).unwrap(); 390 + let procedure = ParsedLexicon::parse(procedure_lexicon_json(), 1, None).unwrap(); 391 + let defs = ParsedLexicon::parse(definitions_lexicon_json(), 1, None).unwrap(); 392 + 393 + reg.upsert(record).await; 394 + reg.upsert(query).await; 395 + reg.upsert(procedure).await; 396 + reg.upsert(defs).await; 397 + 398 + assert_eq!(reg.count().await, 4); 399 + 400 + let records = reg.get_record_collections().await; 401 + assert_eq!(records.len(), 1); 402 + assert!(records.contains(&"games.gamesgamesgamesgames.game".to_string())); 403 + 404 + let queries = reg.get_queries().await; 405 + assert_eq!(queries.len(), 1); 406 + assert!(queries.contains(&"games.gamesgamesgamesgames.listGames".to_string())); 407 + 408 + let procedures = reg.get_procedures().await; 409 + assert_eq!(procedures.len(), 1); 410 + assert!(procedures.contains(&"games.gamesgamesgamesgames.createGame".to_string())); 411 + } 412 + }
+24
src/lib.rs
··· 1 + pub mod admin; 2 + pub mod auth; 3 + pub mod backfill; 4 + pub mod config; 5 + pub mod error; 6 + pub mod jetstream; 7 + pub mod lexicon; 8 + pub mod profile; 9 + pub mod repo; 10 + pub mod server; 11 + pub mod xrpc; 12 + 13 + use config::Config; 14 + use lexicon::LexiconRegistry; 15 + use tokio::sync::watch; 16 + 17 + #[derive(Clone)] 18 + pub struct AppState { 19 + pub config: Config, 20 + pub http: reqwest::Client, 21 + pub db: sqlx::PgPool, 22 + pub lexicons: LexiconRegistry, 23 + pub collections_tx: watch::Sender<Vec<String>>, 24 + }
+4 -24
src/main.rs
··· 1 - mod admin; 2 - mod auth; 3 - mod backfill; 4 - mod config; 5 - mod error; 6 - mod jetstream; 7 - mod lexicon; 8 - mod profile; 9 - mod repo; 10 - mod server; 11 - mod xrpc; 12 - 13 - use config::Config; 14 - use lexicon::LexiconRegistry; 1 + use happyview::config::Config; 2 + use happyview::lexicon::LexiconRegistry; 3 + use happyview::{admin, backfill, jetstream, server, AppState}; 15 4 use tokio::sync::watch; 16 5 use tracing::info; 17 - 18 - #[derive(Clone)] 19 - pub struct AppState { 20 - pub config: Config, 21 - pub http: reqwest::Client, 22 - pub db: sqlx::PgPool, 23 - pub lexicons: LexiconRegistry, 24 - pub collections_tx: watch::Sender<Vec<String>>, 25 - } 26 6 27 7 #[tokio::main] 28 8 async fn main() { ··· 69 49 }; 70 50 71 51 jetstream::spawn(state.db.clone(), config.jetstream_url.clone(), collections_rx); 72 - backfill::spawn_worker(state.db.clone(), state.http.clone(), config.relay_url.clone()); 52 + backfill::spawn_worker(state.db.clone(), state.http.clone(), config.relay_url.clone(), config.plc_url.clone()); 73 53 74 54 let app = server::router(state); 75 55 let addr = config.listen_addr();
+6 -6
src/profile.rs
··· 36 36 } 37 37 38 38 /// Resolve a full profile for the given DID: DID document -> handle + PDS -> profile record. 39 - pub async fn resolve_profile(http: &reqwest::Client, did: &str) -> Result<Profile, AppError> { 40 - let did_doc = resolve_did_document(http, did).await?; 39 + pub async fn resolve_profile(http: &reqwest::Client, plc_url: &str, did: &str) -> Result<Profile, AppError> { 40 + let did_doc = resolve_did_document(http, plc_url, did).await?; 41 41 42 42 let handle = did_doc 43 43 .also_known_as ··· 67 67 } 68 68 69 69 /// Resolve the PDS endpoint for a DID by fetching its DID document. 70 - pub async fn resolve_pds_endpoint(http: &reqwest::Client, did: &str) -> Result<String, AppError> { 71 - let did_doc = resolve_did_document(http, did).await?; 70 + pub async fn resolve_pds_endpoint(http: &reqwest::Client, plc_url: &str, did: &str) -> Result<String, AppError> { 71 + let did_doc = resolve_did_document(http, plc_url, did).await?; 72 72 73 73 did_doc 74 74 .service ··· 80 80 81 81 /// Fetch a DID document from the PLC directory. 82 82 // TODO: handle did:web:* resolution (fetch https://{domain}/.well-known/did.json) 83 - async fn resolve_did_document(http: &reqwest::Client, did: &str) -> Result<DidDocument, AppError> { 84 - let url = format!("https://plc.directory/{did}"); 83 + async fn resolve_did_document(http: &reqwest::Client, plc_url: &str, did: &str) -> Result<DidDocument, AppError> { 84 + let url = format!("{}/{did}", plc_url.trim_end_matches('/')); 85 85 86 86 let resp = http 87 87 .get(&url)
+218
src/repo.rs
··· 358 358 } 359 359 } 360 360 } 361 + 362 + #[cfg(test)] 363 + mod tests { 364 + use super::*; 365 + 366 + // ----------------------------------------------------------------------- 367 + // parse_did_from_at_uri 368 + // ----------------------------------------------------------------------- 369 + 370 + #[test] 371 + fn parse_did_from_valid_at_uri() { 372 + let did = parse_did_from_at_uri("at://did:plc:abc123/app.bsky.feed.post/3k2bqxyz").unwrap(); 373 + assert_eq!(did, "did:plc:abc123"); 374 + } 375 + 376 + #[test] 377 + fn parse_did_from_uri_with_no_rkey() { 378 + let did = parse_did_from_at_uri("at://did:plc:abc123/collection").unwrap(); 379 + assert_eq!(did, "did:plc:abc123"); 380 + } 381 + 382 + #[test] 383 + fn parse_did_from_did_web_uri() { 384 + let did = parse_did_from_at_uri("at://did:web:example.com/collection/rkey").unwrap(); 385 + assert_eq!(did, "did:web:example.com"); 386 + } 387 + 388 + #[test] 389 + fn parse_did_from_uri_missing_prefix() { 390 + let result = parse_did_from_at_uri("did:plc:abc123/collection/rkey"); 391 + assert!(result.is_err()); 392 + } 393 + 394 + // ----------------------------------------------------------------------- 395 + // enrich_media_blobs 396 + // ----------------------------------------------------------------------- 397 + 398 + #[test] 399 + fn enrich_media_adds_url() { 400 + let mut record = json!({ 401 + "media": [{ 402 + "blob": { 403 + "ref": { "$link": "bafyreiabc" }, 404 + "mimeType": "image/jpeg", 405 + "size": 1024 406 + } 407 + }] 408 + }); 409 + 410 + enrich_media_blobs(&mut record, "https://pds.example.com", "did:plc:test"); 411 + 412 + let url = record["media"][0]["blob"]["url"].as_str().unwrap(); 413 + assert_eq!( 414 + url, 415 + "https://pds.example.com/xrpc/com.atproto.sync.getBlob?did=did:plc:test&cid=bafyreiabc" 416 + ); 417 + } 418 + 419 + #[test] 420 + fn enrich_media_noop_without_media() { 421 + let mut record = json!({"title": "test"}); 422 + enrich_media_blobs(&mut record, "https://pds.example.com", "did:plc:test"); 423 + assert!(record.get("media").is_none()); 424 + } 425 + 426 + #[test] 427 + fn enrich_media_skips_items_without_ref() { 428 + let mut record = json!({ 429 + "media": [{ 430 + "blob": { "mimeType": "image/png" } 431 + }] 432 + }); 433 + 434 + enrich_media_blobs(&mut record, "https://pds.example.com", "did:plc:test"); 435 + assert!(record["media"][0]["blob"].get("url").is_none()); 436 + } 437 + 438 + #[test] 439 + fn enrich_media_handles_multiple_items() { 440 + let mut record = json!({ 441 + "media": [ 442 + { "blob": { "ref": { "$link": "cid1" } } }, 443 + { "blob": { "ref": { "$link": "cid2" } } } 444 + ] 445 + }); 446 + 447 + enrich_media_blobs(&mut record, "https://pds.example.com/", "did:plc:x"); 448 + 449 + let url1 = record["media"][0]["blob"]["url"].as_str().unwrap(); 450 + let url2 = record["media"][1]["blob"]["url"].as_str().unwrap(); 451 + assert!(url1.contains("cid1")); 452 + assert!(url2.contains("cid2")); 453 + } 454 + 455 + #[test] 456 + fn enrich_media_trims_trailing_slash() { 457 + let mut record = json!({ 458 + "media": [{ 459 + "blob": { "ref": { "$link": "bafytest" } } 460 + }] 461 + }); 462 + 463 + enrich_media_blobs(&mut record, "https://pds.example.com/", "did:plc:test"); 464 + 465 + let url = record["media"][0]["blob"]["url"].as_str().unwrap(); 466 + assert!(url.starts_with("https://pds.example.com/xrpc/")); 467 + assert!(!url.contains("//xrpc")); 468 + } 469 + 470 + // ----------------------------------------------------------------------- 471 + // generate_dpop_proof 472 + // ----------------------------------------------------------------------- 473 + 474 + fn test_dpop_jwk() -> DpopJwk { 475 + use p256::elliptic_curve::rand_core::OsRng; 476 + use p256::elliptic_curve::sec1::ToEncodedPoint; 477 + // Generate a valid P-256 key for testing 478 + let secret = p256::SecretKey::random(&mut OsRng); 479 + let public = secret.public_key(); 480 + let point = public.to_encoded_point(false); 481 + 482 + DpopJwk { 483 + x: base64::engine::general_purpose::URL_SAFE_NO_PAD 484 + .encode(point.x().unwrap()), 485 + y: base64::engine::general_purpose::URL_SAFE_NO_PAD 486 + .encode(point.y().unwrap()), 487 + d: base64::engine::general_purpose::URL_SAFE_NO_PAD 488 + .encode(secret.to_bytes()), 489 + } 490 + } 491 + 492 + #[test] 493 + fn dpop_proof_produces_valid_jwt_structure() { 494 + let jwk = test_dpop_jwk(); 495 + let token = generate_dpop_proof("POST", "https://pds.example.com/xrpc/test", &jwk, "access-tok", None).unwrap(); 496 + 497 + let parts: Vec<&str> = token.split('.').collect(); 498 + assert_eq!(parts.len(), 3, "JWT should have 3 parts"); 499 + } 500 + 501 + #[test] 502 + fn dpop_proof_header_has_correct_fields() { 503 + let jwk = test_dpop_jwk(); 504 + let token = generate_dpop_proof("POST", "https://pds.example.com/xrpc/test", &jwk, "access-tok", None).unwrap(); 505 + 506 + let header_b64 = token.split('.').next().unwrap(); 507 + let header_bytes = base64::engine::general_purpose::URL_SAFE_NO_PAD 508 + .decode(header_b64) 509 + .unwrap(); 510 + let header: serde_json::Value = serde_json::from_slice(&header_bytes).unwrap(); 511 + 512 + assert_eq!(header["typ"], "dpop+jwt"); 513 + assert_eq!(header["alg"], "ES256"); 514 + assert!(header.get("jwk").is_some()); 515 + } 516 + 517 + #[test] 518 + fn dpop_proof_claims_have_correct_fields() { 519 + let jwk = test_dpop_jwk(); 520 + let token = generate_dpop_proof("GET", "https://pds.example.com/xrpc/test", &jwk, "my-access-token", None).unwrap(); 521 + 522 + let payload_b64 = token.split('.').nth(1).unwrap(); 523 + let payload_bytes = base64::engine::general_purpose::URL_SAFE_NO_PAD 524 + .decode(payload_b64) 525 + .unwrap(); 526 + let claims: serde_json::Value = serde_json::from_slice(&payload_bytes).unwrap(); 527 + 528 + assert_eq!(claims["htm"], "GET"); 529 + assert_eq!(claims["htu"], "https://pds.example.com/xrpc/test"); 530 + assert!(claims.get("jti").is_some()); 531 + assert!(claims.get("iat").is_some()); 532 + assert!(claims.get("exp").is_some()); 533 + assert!(claims.get("ath").is_some()); 534 + assert!(claims.get("nonce").is_none()); 535 + } 536 + 537 + #[test] 538 + fn dpop_proof_includes_nonce_when_provided() { 539 + let jwk = test_dpop_jwk(); 540 + let token = generate_dpop_proof("POST", "https://pds.example.com/xrpc/test", &jwk, "tok", Some("abc123")).unwrap(); 541 + 542 + let payload_b64 = token.split('.').nth(1).unwrap(); 543 + let payload_bytes = base64::engine::general_purpose::URL_SAFE_NO_PAD 544 + .decode(payload_b64) 545 + .unwrap(); 546 + let claims: serde_json::Value = serde_json::from_slice(&payload_bytes).unwrap(); 547 + 548 + assert_eq!(claims["nonce"], "abc123"); 549 + } 550 + 551 + #[test] 552 + fn dpop_proof_ath_is_sha256_of_access_token() { 553 + let jwk = test_dpop_jwk(); 554 + let access_token = "test-access-token"; 555 + let token = generate_dpop_proof("POST", "https://example.com", &jwk, access_token, None).unwrap(); 556 + 557 + let payload_b64 = token.split('.').nth(1).unwrap(); 558 + let payload_bytes = base64::engine::general_purpose::URL_SAFE_NO_PAD 559 + .decode(payload_b64) 560 + .unwrap(); 561 + let claims: serde_json::Value = serde_json::from_slice(&payload_bytes).unwrap(); 562 + 563 + let expected_hash = Sha256::digest(access_token.as_bytes()); 564 + let expected_ath = base64::engine::general_purpose::URL_SAFE_NO_PAD.encode(expected_hash); 565 + assert_eq!(claims["ath"], expected_ath); 566 + } 567 + 568 + #[test] 569 + fn dpop_proof_invalid_key_returns_error() { 570 + let jwk = DpopJwk { 571 + x: "invalid".into(), 572 + y: "invalid".into(), 573 + d: "invalid".into(), 574 + }; 575 + let result = generate_dpop_proof("POST", "https://example.com", &jwk, "tok", None); 576 + assert!(result.is_err()); 577 + } 578 + }
+1 -1
src/server.rs
··· 36 36 State(state): State<AppState>, 37 37 claims: Claims, 38 38 ) -> Result<Json<profile::Profile>, AppError> { 39 - let profile = profile::resolve_profile(&state.http, claims.did()).await?; 39 + let profile = profile::resolve_profile(&state.http, &state.config.plc_url, claims.did()).await?; 40 40 Ok(Json(profile)) 41 41 }
+2 -2
src/xrpc.rs
··· 126 126 let unique_dids: HashSet<&str> = rows.iter().map(|(_, did, _)| did.as_str()).collect(); 127 127 let mut pds_map: HashMap<String, String> = HashMap::new(); 128 128 for did in unique_dids { 129 - if let Ok(pds) = profile::resolve_pds_endpoint(&state.http, did).await { 129 + if let Ok(pds) = profile::resolve_pds_endpoint(&state.http, &state.config.plc_url, did).await { 130 130 pds_map.insert(did.to_string(), pds); 131 131 } 132 132 } ··· 169 169 let (mut record,) = 170 170 row.ok_or_else(|| AppError::NotFound("record not found".into()))?; 171 171 172 - let pds = profile::resolve_pds_endpoint(&state.http, &did).await?; 172 + let pds = profile::resolve_pds_endpoint(&state.http, &state.config.plc_url, &did).await?; 173 173 repo::enrich_media_blobs(&mut record, &pds, &did); 174 174 175 175 record