A lexicon-driven AppView for ATProto. happyview.dev
backfill firehose jetstream atproto appview oauth lexicon
8
fork

Configure Feed

Select the types of activity you want to include in your feed.

feat: add AIP reverse proxy and runtime config endpoint

Trezy 0bdf7b4c b9279d4e

+316 -20
+241
src/aip.rs
··· 1 + use axum::body::Body; 2 + use axum::extract::{Path, State}; 3 + use axum::http::{HeaderMap, Method, StatusCode, Uri}; 4 + use axum::response::{IntoResponse, Response}; 5 + 6 + use crate::AppState; 7 + 8 + /// Reverse-proxy requests from `/aip/*` to the configured AIP server. 9 + pub async fn aip_proxy( 10 + State(state): State<AppState>, 11 + method: Method, 12 + Path(path): Path<String>, 13 + uri: Uri, 14 + headers: HeaderMap, 15 + body: Body, 16 + ) -> impl IntoResponse { 17 + let query = uri.query().map(|q| format!("?{q}")).unwrap_or_default(); 18 + let upstream_url = format!("{}/{path}{query}", state.config.aip_url); 19 + 20 + let mut req = state.http.request(method.clone(), &upstream_url); 21 + 22 + // Copy relevant request headers 23 + for name in ["content-type", "authorization", "dpop", "accept"] { 24 + if let Some(val) = headers.get(name) { 25 + req = req.header(name, val); 26 + } 27 + } 28 + 29 + // Attach body for non-GET requests 30 + if method != Method::GET { 31 + let bytes = match axum::body::to_bytes(body, 10 * 1024 * 1024).await { 32 + Ok(b) => b, 33 + Err(_) => { 34 + return Response::builder() 35 + .status(StatusCode::BAD_REQUEST) 36 + .body(Body::from("request body too large")) 37 + .unwrap(); 38 + } 39 + }; 40 + req = req.body(bytes); 41 + } 42 + 43 + let upstream_resp = match req.send().await { 44 + Ok(r) => r, 45 + Err(e) => { 46 + tracing::error!("AIP proxy error: {e}"); 47 + return Response::builder() 48 + .status(StatusCode::BAD_GATEWAY) 49 + .body(Body::from("upstream request failed")) 50 + .unwrap(); 51 + } 52 + }; 53 + 54 + let status = upstream_resp.status(); 55 + let mut resp_headers = HeaderMap::new(); 56 + 57 + // Copy relevant response headers 58 + for name in [ 59 + "content-type", 60 + "dpop-nonce", 61 + "www-authenticate", 62 + "cache-control", 63 + ] { 64 + if let Some(val) = upstream_resp.headers().get(name) { 65 + resp_headers.insert( 66 + name.parse::<axum::http::header::HeaderName>().unwrap(), 67 + val.clone(), 68 + ); 69 + } 70 + } 71 + 72 + let bytes = match upstream_resp.bytes().await { 73 + Ok(b) => b, 74 + Err(e) => { 75 + tracing::error!("AIP proxy read error: {e}"); 76 + return Response::builder() 77 + .status(StatusCode::BAD_GATEWAY) 78 + .body(Body::from("failed to read upstream response")) 79 + .unwrap(); 80 + } 81 + }; 82 + 83 + let mut response = Response::new(Body::from(bytes)); 84 + *response.status_mut() = status; 85 + *response.headers_mut() = resp_headers; 86 + response 87 + } 88 + 89 + #[cfg(test)] 90 + mod tests { 91 + use super::*; 92 + use axum::Router; 93 + use axum::body::to_bytes; 94 + use axum::extract::Request; 95 + use axum::routing::{get, post}; 96 + use tokio::sync::watch; 97 + use tower::ServiceExt; 98 + 99 + fn test_state(aip_url: &str) -> AppState { 100 + let config = crate::config::Config { 101 + host: "127.0.0.1".into(), 102 + port: 3000, 103 + database_url: String::new(), 104 + aip_url: aip_url.into(), 105 + tap_url: String::new(), 106 + tap_admin_password: None, 107 + relay_url: String::new(), 108 + plc_url: String::new(), 109 + static_dir: String::new(), 110 + }; 111 + let (tx, _) = watch::channel(vec![]); 112 + AppState { 113 + config, 114 + http: reqwest::Client::new(), 115 + db: sqlx::PgPool::connect_lazy("postgres://localhost/fake").unwrap(), 116 + lexicons: crate::lexicon::LexiconRegistry::new(), 117 + collections_tx: tx, 118 + } 119 + } 120 + 121 + #[tokio::test] 122 + async fn proxy_forwards_get_request() { 123 + let mock = wiremock::MockServer::start().await; 124 + wiremock::Mock::given(wiremock::matchers::method("GET")) 125 + .and(wiremock::matchers::path("/oauth/authorize")) 126 + .respond_with( 127 + wiremock::ResponseTemplate::new(200) 128 + .set_body_string("ok") 129 + .insert_header("content-type", "text/plain") 130 + .insert_header("dpop-nonce", "test-nonce"), 131 + ) 132 + .mount(&mock) 133 + .await; 134 + 135 + let state = test_state(&mock.uri()); 136 + let app = Router::new() 137 + .route("/aip/{*path}", get(aip_proxy)) 138 + .with_state(state); 139 + 140 + let req = Request::builder() 141 + .method("GET") 142 + .uri("/aip/oauth/authorize") 143 + .body(Body::empty()) 144 + .unwrap(); 145 + 146 + let resp = app.oneshot(req).await.unwrap(); 147 + assert_eq!(resp.status(), StatusCode::OK); 148 + assert_eq!(resp.headers().get("dpop-nonce").unwrap(), "test-nonce"); 149 + let body = to_bytes(resp.into_body(), usize::MAX).await.unwrap(); 150 + assert_eq!(&body[..], b"ok"); 151 + } 152 + 153 + #[tokio::test] 154 + async fn proxy_forwards_post_with_body() { 155 + let mock = wiremock::MockServer::start().await; 156 + wiremock::Mock::given(wiremock::matchers::method("POST")) 157 + .and(wiremock::matchers::path("/oauth/token")) 158 + .respond_with( 159 + wiremock::ResponseTemplate::new(200) 160 + .set_body_json(serde_json::json!({"access_token": "tok"})) 161 + .insert_header("content-type", "application/json"), 162 + ) 163 + .mount(&mock) 164 + .await; 165 + 166 + let state = test_state(&mock.uri()); 167 + let app = Router::new() 168 + .route("/aip/{*path}", post(aip_proxy)) 169 + .with_state(state); 170 + 171 + let req = Request::builder() 172 + .method("POST") 173 + .uri("/aip/oauth/token") 174 + .header("content-type", "application/x-www-form-urlencoded") 175 + .body(Body::from("grant_type=authorization_code")) 176 + .unwrap(); 177 + 178 + let resp = app.oneshot(req).await.unwrap(); 179 + assert_eq!(resp.status(), StatusCode::OK); 180 + assert_eq!( 181 + resp.headers().get("content-type").unwrap(), 182 + "application/json" 183 + ); 184 + } 185 + 186 + #[tokio::test] 187 + async fn proxy_forwards_query_string() { 188 + let mock = wiremock::MockServer::start().await; 189 + wiremock::Mock::given(wiremock::matchers::method("GET")) 190 + .and(wiremock::matchers::path("/oauth/authorize")) 191 + .and(wiremock::matchers::query_param("client_id", "abc")) 192 + .respond_with(wiremock::ResponseTemplate::new(200).set_body_string("found")) 193 + .mount(&mock) 194 + .await; 195 + 196 + let state = test_state(&mock.uri()); 197 + let app = Router::new() 198 + .route("/aip/{*path}", get(aip_proxy)) 199 + .with_state(state); 200 + 201 + let req = Request::builder() 202 + .method("GET") 203 + .uri("/aip/oauth/authorize?client_id=abc") 204 + .body(Body::empty()) 205 + .unwrap(); 206 + 207 + let resp = app.oneshot(req).await.unwrap(); 208 + assert_eq!(resp.status(), StatusCode::OK); 209 + let body = to_bytes(resp.into_body(), usize::MAX).await.unwrap(); 210 + assert_eq!(&body[..], b"found"); 211 + } 212 + 213 + #[tokio::test] 214 + async fn proxy_preserves_error_status() { 215 + let mock = wiremock::MockServer::start().await; 216 + wiremock::Mock::given(wiremock::matchers::method("POST")) 217 + .and(wiremock::matchers::path("/oauth/token")) 218 + .respond_with( 219 + wiremock::ResponseTemplate::new(400) 220 + .set_body_string("bad request") 221 + .insert_header("www-authenticate", "DPoP error=\"use_dpop_nonce\""), 222 + ) 223 + .mount(&mock) 224 + .await; 225 + 226 + let state = test_state(&mock.uri()); 227 + let app = Router::new() 228 + .route("/aip/{*path}", post(aip_proxy)) 229 + .with_state(state); 230 + 231 + let req = Request::builder() 232 + .method("POST") 233 + .uri("/aip/oauth/token") 234 + .body(Body::from("grant_type=authorization_code")) 235 + .unwrap(); 236 + 237 + let resp = app.oneshot(req).await.unwrap(); 238 + assert_eq!(resp.status(), StatusCode::BAD_REQUEST); 239 + assert!(resp.headers().get("www-authenticate").is_some()); 240 + } 241 + }
+1
src/lib.rs
··· 1 1 pub mod admin; 2 + pub mod aip; 2 3 pub mod auth; 3 4 pub mod config; 4 5 pub mod error;
+7
src/server.rs
··· 7 7 8 8 use crate::AppState; 9 9 use crate::admin; 10 + use crate::aip; 10 11 use crate::auth::Claims; 11 12 use crate::error::AppError; 12 13 use crate::profile; ··· 28 29 ) 29 30 // Catch-all for dynamically registered lexicons 30 31 .route("/xrpc/{method}", get(xrpc::xrpc_get).post(xrpc::xrpc_post)) 32 + .route("/config", get(config_endpoint)) 33 + .route("/aip/{*path}", get(aip::aip_proxy).post(aip::aip_proxy)) 31 34 .fallback_service(serve_dir) 32 35 .layer(TraceLayer::new_for_http()) 33 36 .layer(CorsLayer::permissive()) ··· 36 39 37 40 async fn health() -> &'static str { 38 41 "ok" 42 + } 43 + 44 + async fn config_endpoint(State(state): State<AppState>) -> Json<serde_json::Value> { 45 + Json(serde_json::json!({ "aip_url": state.config.aip_url })) 39 46 } 40 47 41 48 async fn get_profile(
+1
web/next.config.ts
··· 16 16 { source: "/xrpc/:path*", destination: `${apiBase}/xrpc/:path*` }, 17 17 { source: "/health", destination: `${apiBase}/health` }, 18 18 { source: "/aip/:path*", destination: `${aipBase}/:path*` }, 19 + { source: "/config", destination: `${apiBase}/config` }, 19 20 ]; 20 21 } 21 22
+6 -3
web/src/app/layout.tsx
··· 1 1 import type { Metadata } from "next" 2 2 import { Geist, Geist_Mono } from "next/font/google" 3 3 import "./globals.css" 4 + import { ConfigProvider } from "@/lib/config-context" 4 5 import { AuthProvider } from "@/lib/auth-context" 5 6 import { TooltipProvider } from "@/components/ui/tooltip" 6 7 ··· 29 30 <body 30 31 className={`${geistSans.variable} ${geistMono.variable} antialiased`} 31 32 > 32 - <AuthProvider> 33 - <TooltipProvider>{children}</TooltipProvider> 34 - </AuthProvider> 33 + <ConfigProvider> 34 + <AuthProvider> 35 + <TooltipProvider>{children}</TooltipProvider> 36 + </AuthProvider> 37 + </ConfigProvider> 35 38 </body> 36 39 </html> 37 40 )
+4 -3
web/src/lib/api.ts
··· 2 2 3 3 // The DPoP proof for admin API calls must target AIP's userinfo URL, 4 4 // because the backend forwards the proof to AIP for token validation. 5 - const AIP_URL = process.env.NEXT_PUBLIC_AIP_URL || "" 6 - const AIP_USERINFO_URL = `${AIP_URL}/oauth/userinfo` 5 + // Set at runtime via ConfigProvider. 6 + let aipUrl = "" 7 + export function setAipUrl(url: string) { aipUrl = url } 7 8 8 9 export class ApiError extends Error { 9 10 status: number ··· 24 25 25 26 // Proof targets AIP's userinfo endpoint (GET) since the backend 26 27 // forwards it there for token validation. 27 - const dpopProof = await createDpopProof("GET", AIP_USERINFO_URL, token, dpopNonce) 28 + const dpopProof = await createDpopProof("GET", `${aipUrl}/oauth/userinfo`, token, dpopNonce) 28 29 29 30 const headers: Record<string, string> = { 30 31 Authorization: `DPoP ${token}`,
+14 -14
web/src/lib/auth-context.tsx
··· 9 9 } from "react" 10 10 11 11 import { clearDpopKeypair, createDpopProof, ensureDpopKeypair, setDpopNonce } from "./dpop" 12 + import { useConfig } from "./config-context" 12 13 13 14 interface AuthContextType { 14 15 did: string | null ··· 27 28 loading: true, 28 29 error: null, 29 30 }) 30 - 31 - // AIP URL for browser redirects (authorization endpoint) 32 - const AIP_URL = process.env.NEXT_PUBLIC_AIP_URL || "" 33 31 34 32 // PKCE helpers 35 33 ··· 54 52 55 53 // Dynamic client registration with AIP. 56 54 // Caches the client_id in localStorage so we only register once. 57 - async function getOrRegisterClient(redirectUri: string): Promise<string> { 58 - const cacheKey = `oauth_client_id:${AIP_URL}:${redirectUri}` 55 + async function getOrRegisterClient(aipUrl: string, redirectUri: string): Promise<string> { 56 + const cacheKey = `oauth_client_id:${aipUrl}:${redirectUri}` 59 57 const cached = localStorage.getItem(cacheKey) 60 58 if (cached) return cached 61 59 ··· 84 82 } 85 83 86 84 export function AuthProvider({ children }: { children: React.ReactNode }) { 85 + const { aip_url } = useConfig() 87 86 const [accessToken, setAccessToken] = useState<string | null>(null) 88 87 const [did, setDid] = useState<string | null>(null) 89 88 const [loading, setLoading] = useState(true) ··· 106 105 107 106 if (code && state) { 108 107 console.log("[auth] OAuth callback detected, exchanging code") 109 - await handleOAuthCallback(code, state, cancelled, { 108 + await handleOAuthCallback(aip_url, code, state, cancelled, { 110 109 setAccessToken, 111 110 setDid, 112 111 }) ··· 149 148 return () => { 150 149 cancelled = true 151 150 } 152 - }, []) 151 + }, [aip_url]) 153 152 154 153 const getToken = useCallback(async (): Promise<string | null> => { 155 154 return accessToken 156 155 }, [accessToken]) 157 156 158 157 const login = useCallback(async (handle: string) => { 159 - if (!AIP_URL) { 160 - throw new Error("AIP URL not configured (set NEXT_PUBLIC_AIP_URL)") 158 + if (!aip_url) { 159 + throw new Error("AIP URL not configured") 161 160 } 162 161 163 162 setError(null) ··· 165 164 await ensureDpopKeypair() 166 165 167 166 const redirectUri = `${window.location.origin}/` 168 - const clientId = await getOrRegisterClient(redirectUri) 167 + const clientId = await getOrRegisterClient(aip_url, redirectUri) 169 168 170 169 const codeVerifier = generateRandomString(32) 171 170 const codeChallenge = await generateCodeChallenge(codeVerifier) ··· 186 185 login_hint: handle, 187 186 }) 188 187 189 - window.location.href = `${AIP_URL}/oauth/authorize?${params.toString()}` 190 - }, []) 188 + window.location.href = `${aip_url}/oauth/authorize?${params.toString()}` 189 + }, [aip_url]) 191 190 192 191 const logout = useCallback(async () => { 193 192 const clientId = sessionStorage.getItem("oauth_client_id") ··· 232 231 } 233 232 234 233 async function handleOAuthCallback( 234 + aipUrl: string, 235 235 code: string, 236 236 state: string, 237 237 cancelled: boolean, ··· 269 269 270 270 // Token exchange via proxied path (avoids CORS). 271 271 // AIP may require a DPoP nonce — retry once if we get one back. 272 - const tokenUrl = `${AIP_URL}/oauth/token` 272 + const tokenUrl = `${aipUrl}/oauth/token` 273 273 const tokenBody = new URLSearchParams({ 274 274 grant_type: "authorization_code", 275 275 code, ··· 331 331 // Get DID from token response or userinfo 332 332 let userDid: string | undefined = tokens.sub 333 333 if (!userDid) { 334 - const userinfoUrl = `${AIP_URL}/oauth/userinfo` 334 + const userinfoUrl = `${aipUrl}/oauth/userinfo` 335 335 // Use the nonce from the token response if available 336 336 let currentNonce = dpopNonce 337 337 let userinfoDpopProof = await createDpopProof("GET", userinfoUrl, accessToken, currentNonce ?? undefined)
+42
web/src/lib/config-context.tsx
··· 1 + "use client" 2 + 3 + import { createContext, useContext, useEffect, useState } from "react" 4 + import { setAipUrl } from "./api" 5 + 6 + interface ConfigContextType { 7 + aip_url: string 8 + } 9 + 10 + const ConfigContext = createContext<ConfigContextType>({ aip_url: "" }) 11 + 12 + export function ConfigProvider({ children }: { children: React.ReactNode }) { 13 + const [config, setConfig] = useState<ConfigContextType | null>(null) 14 + const [error, setError] = useState<string | null>(null) 15 + 16 + useEffect(() => { 17 + fetch("/config") 18 + .then((res) => { 19 + if (!res.ok) throw new Error(`Config fetch failed: ${res.status}`) 20 + return res.json() 21 + }) 22 + .then((data) => { 23 + setAipUrl(data.aip_url) 24 + setConfig({ aip_url: data.aip_url }) 25 + }) 26 + .catch((e) => setError(e.message)) 27 + }, []) 28 + 29 + if (error) { 30 + return <div style={{ padding: "2rem", color: "red" }}>Failed to load config: {error}</div> 31 + } 32 + 33 + if (!config) return null 34 + 35 + return ( 36 + <ConfigContext.Provider value={config}>{children}</ConfigContext.Provider> 37 + ) 38 + } 39 + 40 + export function useConfig() { 41 + return useContext(ConfigContext) 42 + }