An easy-to-host PDS on the ATProtocol, iPhone and MacOS. Maintain control of your keys and data, always.
1
fork

Configure Feed

Select the types of activity you want to include in your feed.

feat(relay): Axum HTTP server skeleton + XRPC routing (MM-71)

- Convert relay binary to async with #[tokio::main]
- Add AppState (Arc<Config>) and fn app() router factory for testability
- Wire GET/POST /xrpc/:method catch-all returning MethodNotImplemented (501)
- Add TraceLayer + permissive CorsLayer middleware
- Graceful shutdown on SIGTERM/SIGINT via tokio::select!
- Add MethodNotImplemented variant to ErrorCode (PascalCase serde rename for ATProto compat)
- Add tower-http 0.5 and tower 0.4 to workspace deps

authored by

Malpercio and committed by
Tangled
21c7560f 3fdec991

+239 -6
+57 -1
Cargo.lock
··· 112 112 "serde_urlencoded", 113 113 "sync_wrapper", 114 114 "tokio", 115 - "tower", 115 + "tower 0.5.3", 116 116 "tower-layer", 117 117 "tower-service", 118 118 "tracing", ··· 564 564 checksum = "9b4f627cb1b25917193a259e49bdad08f671f8d9708acfd5fe0a8c1455d87220" 565 565 566 566 [[package]] 567 + name = "pin-project" 568 + version = "1.1.11" 569 + source = "registry+https://github.com/rust-lang/crates.io-index" 570 + checksum = "f1749c7ed4bcaf4c3d0a3efc28538844fb29bcdd7d2b67b2be7e20ba861ff517" 571 + dependencies = [ 572 + "pin-project-internal", 573 + ] 574 + 575 + [[package]] 576 + name = "pin-project-internal" 577 + version = "1.1.11" 578 + source = "registry+https://github.com/rust-lang/crates.io-index" 579 + checksum = "d9b20ed30f105399776b9c883e68e536ef602a16ae6f596d2c473591d6ad64c6" 580 + dependencies = [ 581 + "proc-macro2", 582 + "quote", 583 + "syn", 584 + ] 585 + 586 + [[package]] 567 587 name = "pin-project-lite" 568 588 version = "0.2.17" 569 589 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 643 663 "axum", 644 664 "clap", 645 665 "common", 666 + "serde_json", 667 + "tokio", 668 + "tower 0.4.13", 669 + "tower-http", 646 670 "tracing", 647 671 "tracing-subscriber", 648 672 ] ··· 940 964 941 965 [[package]] 942 966 name = "tower" 967 + version = "0.4.13" 968 + source = "registry+https://github.com/rust-lang/crates.io-index" 969 + checksum = "b8fa9be0de6cf49e536ce1851f987bd21a43b771b09473c3549a6c853db37c1c" 970 + dependencies = [ 971 + "futures-core", 972 + "futures-util", 973 + "pin-project", 974 + "pin-project-lite", 975 + "tower-layer", 976 + "tower-service", 977 + "tracing", 978 + ] 979 + 980 + [[package]] 981 + name = "tower" 943 982 version = "0.5.3" 944 983 source = "registry+https://github.com/rust-lang/crates.io-index" 945 984 checksum = "ebe5ef63511595f1344e2d5cfa636d973292adc0eec1f0ad45fae9f0851ab1d4" ··· 949 988 "pin-project-lite", 950 989 "sync_wrapper", 951 990 "tokio", 991 + "tower-layer", 992 + "tower-service", 993 + "tracing", 994 + ] 995 + 996 + [[package]] 997 + name = "tower-http" 998 + version = "0.5.2" 999 + source = "registry+https://github.com/rust-lang/crates.io-index" 1000 + checksum = "1e9cd434a998747dd2c4276bc96ee2e0c7a2eadf3cae88e52be55a05fa9053f5" 1001 + dependencies = [ 1002 + "bitflags", 1003 + "bytes", 1004 + "http", 1005 + "http-body", 1006 + "http-body-util", 1007 + "pin-project-lite", 952 1008 "tower-layer", 953 1009 "tower-service", 954 1010 "tracing",
+4
Cargo.toml
··· 37 37 tracing = "0.1" 38 38 tracing-subscriber = { version = "0.3", features = ["env-filter"] } 39 39 40 + # HTTP middleware 41 + tower-http = { version = "0.5", features = ["trace", "cors"] } 42 + tower = { version = "0.4", features = ["util"] } 43 + 40 44 # ATProto (repo-engine) 41 45 # atrium-api = "0.22" 42 46 # atrium-repo = "0.1"
+7
crates/common/src/error.rs
··· 20 20 WeakPassword, 21 21 RateLimited, 22 22 ExportInProgress, 23 + /// Returned for any XRPC NSID that has no registered handler. 24 + /// 25 + /// Serialized as `"MethodNotImplemented"` (PascalCase) to match the AT Protocol XRPC 26 + /// error format, which uses PascalCase error names rather than SCREAMING_SNAKE_CASE. 27 + #[serde(rename = "MethodNotImplemented")] 28 + MethodNotImplemented, 23 29 // TODO: add remaining codes from Appendix A as endpoints are implemented: 24 30 // 400: INVALID_DOCUMENT, INVALID_PROOF, INVALID_ENDPOINT, INVALID_CONFIRMATION 25 31 // 401: INVALID_CREDENTIALS ··· 44 50 ErrorCode::WeakPassword => 422, 45 51 ErrorCode::RateLimited => 429, 46 52 ErrorCode::ExportInProgress => 503, 53 + ErrorCode::MethodNotImplemented => 501, 47 54 } 48 55 } 49 56 }
+6
crates/relay/Cargo.toml
··· 17 17 anyhow = { workspace = true } 18 18 tracing = { workspace = true } 19 19 tracing-subscriber = { workspace = true } 20 + tokio = { workspace = true } 21 + tower-http = { workspace = true } 22 + 23 + [dev-dependencies] 24 + tower = { workspace = true } 25 + serde_json = { workspace = true }
+117
crates/relay/src/app.rs
··· 1 + // pattern: Functional Core (router construction is pure — no I/O) 2 + 3 + use std::sync::Arc; 4 + 5 + use axum::{extract::Path, routing::get, Router}; 6 + use common::{ApiError, Config, ErrorCode}; 7 + use tower_http::{cors::CorsLayer, trace::TraceLayer}; 8 + 9 + /// Shared application state cloned into every request handler via Axum's `State` extractor. 10 + /// 11 + /// Fields will grow as waves are implemented (MM-72 adds the DB pool, etc.). 12 + #[derive(Clone)] 13 + pub struct AppState { 14 + // Read by handlers from MM-73 onward; suppressed until then. 15 + #[allow(dead_code)] 16 + pub config: Arc<Config>, 17 + } 18 + 19 + /// Build the Axum router with middleware and routes. 20 + /// 21 + /// Keeping router construction separate from `main` makes it testable without a real TCP 22 + /// listener — callers can use `tower::ServiceExt::oneshot` to drive requests in tests. 23 + pub fn app(state: AppState) -> Router { 24 + Router::new() 25 + .route("/xrpc/:method", get(xrpc_handler).post(xrpc_handler)) 26 + .layer(CorsLayer::permissive()) 27 + .layer(TraceLayer::new_for_http()) 28 + .with_state(state) 29 + } 30 + 31 + /// Catch-all XRPC handler — returns `MethodNotImplemented` for any unrecognised NSID. 32 + /// 33 + /// Real XRPC endpoints (MM-73+) will register specific routes that shadow this catch-all 34 + /// for their own NSIDs. 35 + async fn xrpc_handler(Path(method): Path<String>) -> ApiError { 36 + ApiError::new( 37 + ErrorCode::MethodNotImplemented, 38 + format!("XRPC method {method:?} is not implemented"), 39 + ) 40 + } 41 + 42 + #[cfg(test)] 43 + mod tests { 44 + use super::*; 45 + use axum::{ 46 + body::Body, 47 + http::{Request, StatusCode}, 48 + }; 49 + use common::{BlobsConfig, IrohConfig, OAuthConfig}; 50 + use std::path::PathBuf; 51 + use tower::ServiceExt; 52 + 53 + fn test_state() -> AppState { 54 + AppState { 55 + config: Arc::new(Config { 56 + bind_address: "127.0.0.1".to_string(), 57 + port: 8080, 58 + data_dir: PathBuf::from("/tmp"), 59 + database_url: "/tmp/test.db".to_string(), 60 + public_url: "https://test.example.com".to_string(), 61 + blobs: BlobsConfig::default(), 62 + oauth: OAuthConfig::default(), 63 + iroh: IrohConfig::default(), 64 + }), 65 + } 66 + } 67 + 68 + #[tokio::test] 69 + async fn xrpc_get_unknown_method_returns_501() { 70 + let response = app(test_state()) 71 + .oneshot( 72 + Request::builder() 73 + .uri("/xrpc/com.example.unknownMethod") 74 + .body(Body::empty()) 75 + .unwrap(), 76 + ) 77 + .await 78 + .unwrap(); 79 + 80 + assert_eq!(response.status(), StatusCode::NOT_IMPLEMENTED); 81 + } 82 + 83 + #[tokio::test] 84 + async fn xrpc_post_unknown_method_returns_501() { 85 + let response = app(test_state()) 86 + .oneshot( 87 + Request::builder() 88 + .method("POST") 89 + .uri("/xrpc/com.example.unknownMethod") 90 + .body(Body::empty()) 91 + .unwrap(), 92 + ) 93 + .await 94 + .unwrap(); 95 + 96 + assert_eq!(response.status(), StatusCode::NOT_IMPLEMENTED); 97 + } 98 + 99 + #[tokio::test] 100 + async fn xrpc_response_body_is_method_not_implemented() { 101 + let response = app(test_state()) 102 + .oneshot( 103 + Request::builder() 104 + .uri("/xrpc/com.atproto.server.createSession") 105 + .body(Body::empty()) 106 + .unwrap(), 107 + ) 108 + .await 109 + .unwrap(); 110 + 111 + let body = axum::body::to_bytes(response.into_body(), 4096) 112 + .await 113 + .unwrap(); 114 + let json: serde_json::Value = serde_json::from_slice(&body).unwrap(); 115 + assert_eq!(json["error"]["code"], "MethodNotImplemented"); 116 + } 117 + }
+48 -5
crates/relay/src/main.rs
··· 2 2 3 3 use anyhow::Context; 4 4 use clap::Parser; 5 - use std::path::PathBuf; 5 + use std::{path::PathBuf, sync::Arc}; 6 + 7 + mod app; 6 8 7 9 #[derive(Parser)] 8 10 #[command(name = "relay", about = "ezpds relay server")] ··· 12 14 config: Option<PathBuf>, 13 15 } 14 16 15 - fn main() { 16 - if let Err(err) = run() { 17 + #[tokio::main] 18 + async fn main() { 19 + if let Err(err) = run().await { 17 20 eprintln!("error: {err:#}"); 18 21 std::process::exit(1); 19 22 } 20 23 } 21 24 22 - ///Hello! 23 - fn run() -> anyhow::Result<()> { 25 + async fn run() -> anyhow::Result<()> { 24 26 tracing_subscriber::fmt() 25 27 .with_env_filter(tracing_subscriber::EnvFilter::from_default_env()) 26 28 .try_init() ··· 39 41 "relay starting" 40 42 ); 41 43 44 + let addr = format!("{}:{}", config.bind_address, config.port); 45 + let state = app::AppState { 46 + config: Arc::new(config), 47 + }; 48 + 49 + let listener = tokio::net::TcpListener::bind(&addr) 50 + .await 51 + .with_context(|| format!("failed to bind to {addr}"))?; 52 + 53 + tracing::info!(address = %addr, "listening"); 54 + 55 + axum::serve(listener, app::app(state)) 56 + .with_graceful_shutdown(shutdown_signal()) 57 + .await 58 + .context("server error")?; 59 + 60 + tracing::info!("relay shut down"); 42 61 Ok(()) 43 62 } 63 + 64 + async fn shutdown_signal() { 65 + let ctrl_c = async { 66 + tokio::signal::ctrl_c() 67 + .await 68 + .expect("failed to install Ctrl+C handler"); 69 + }; 70 + 71 + #[cfg(unix)] 72 + let terminate = async { 73 + tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate()) 74 + .expect("failed to install SIGTERM handler") 75 + .recv() 76 + .await; 77 + }; 78 + 79 + #[cfg(not(unix))] 80 + let terminate = std::future::pending::<()>(); 81 + 82 + tokio::select! { 83 + _ = ctrl_c => {}, 84 + _ = terminate => {}, 85 + } 86 + }