very fast at protocol indexer with flexible filtering, xrpc queries, cursor-backed event stream, and more, built on fjall
rust fjall at-protocol atproto indexer
58
fork

Configure Feed

Select the types of activity you want to include in your feed.

Binding multiple listen addresses #1

open opened by oyster.cafe targeting main

I would like to bind multiple addresses! Linux may do funky stuff with v6es in and of themselves, but just making it specific that one can do a v4 and v6 separately if they want

Labels

None yet.

assignee

None yet.

Participants 1
AT URI
at://did:plc:3fwecdnvtcscjnrx2p4n7alz/sh.tangled.repo.pull/3ml4dgt54he22
+145 -34
Diff #0
+1
Cargo.lock
··· 1533 1533 "serde_urlencoded", 1534 1534 "sha2", 1535 1535 "smol_str", 1536 + "socket2", 1536 1537 "tempfile", 1537 1538 "thiserror 2.0.18", 1538 1539 "tokio",
+1
Cargo.toml
··· 14 14 [dependencies] 15 15 tokio = { version = "1.0", features = ["full"] } 16 16 tokio-util = { version = "0.7", features = ["io"] } 17 + socket2 = "0.6" 17 18 18 19 tracing = "0.1" 19 20 tracing-subscriber = { version = "0.3", features = ["env-filter"] }
+2 -2
docs/configuration.md
··· 10 10 | :--- | :--- | :--- | 11 11 | `DATABASE_PATH` | `./hydrant.db` | path to the database folder | 12 12 | `RUST_LOG` | `info` | log filter directives (e.g., `debug`, `hydrant=trace`). [tracing env-filter syntax](https://docs.rs/tracing-subscriber/latest/tracing_subscriber/filter/struct.EnvFilter.html) | 13 - | `API_PORT` | `3000` | port for the API server | 13 + | `API_BIND` | `0.0.0.0:3000,[::]:3000` | comma-separated list of `<ip>:<port>` socket addresses to bind the API server to. literal IPs only (hostnames not resolved). when both an ipv4 and ipv6 entry share the same port, the v6 listener is set to v6-only to avoid bind collision; a lone `[::]:<port>` listens dual-stack | 14 14 | `ENABLE_DEBUG` | `false` | enable debug endpoints | 15 - | `DEBUG_PORT` | `API_PORT + 1` | port for debug endpoints (if enabled) | 15 + | `DEBUG_PORT` | first `API_BIND` port + 1 | port for debug endpoints (if enabled) | 16 16 17 17 ## indexing mode 18 18
+57 -12
src/api/mod.rs
··· 1 - use crate::control::Hydrant; 1 + use crate::control::{ApiBinds, Hydrant}; 2 2 use crate::state::AppState; 3 3 use axum::{Router, routing::get}; 4 4 use std::{net::SocketAddr, sync::Arc}; 5 5 use tower_http::cors::CorsLayer; 6 6 use tower_http::trace::TraceLayer; 7 7 8 + const LISTEN_BACKLOG: i32 = 1024; 9 + 8 10 #[cfg(feature = "indexer")] 9 11 mod crawler; 10 12 mod db; ··· 19 21 mod stream; 20 22 mod xrpc; 21 23 22 - pub async fn serve(hydrant: Hydrant, port: u16) -> miette::Result<()> { 24 + pub async fn serve(hydrant: Hydrant, binds: ApiBinds) -> miette::Result<()> { 23 25 let blocks_available = hydrant.state.is_block_storage_enabled(); 24 26 let app = Router::new() 25 27 .route( ··· 61 63 .layer(TraceLayer::new_for_http()) 62 64 .layer(CorsLayer::permissive()); 63 65 64 - let listener = tokio::net::TcpListener::bind(format!("0.0.0.0:{port}")) 65 - .await 66 - .map_err(|e| miette::miette!("failed to bind to port {port}: {e}"))?; 66 + let bind_list: Vec<SocketAddr> = binds.iter().collect(); 67 + let listeners: Vec<tokio::net::TcpListener> = bind_list 68 + .iter() 69 + .map(|&addr| { 70 + let v6only = v6only_for(addr, &bind_list); 71 + let listener = bind_listener(addr, v6only) 72 + .map_err(|e| miette::miette!("failed to bind to {addr}: {e}"))?; 73 + tracing::info!("API server listening on {}", listener.local_addr().unwrap()); 74 + Ok::<_, miette::Report>(listener) 75 + }) 76 + .collect::<Result<_, _>>()?; 67 77 68 - tracing::info!("API server listening on {}", listener.local_addr().unwrap()); 78 + let services = listeners.into_iter().map(|listener| { 79 + let app = app.clone(); 80 + async move { 81 + axum::serve( 82 + listener, 83 + app.into_make_service_with_connect_info::<SocketAddr>(), 84 + ) 85 + .await 86 + .map_err(|e| miette::miette!("axum server error: {e}")) 87 + } 88 + }); 69 89 70 - axum::serve( 71 - listener, 72 - app.into_make_service_with_connect_info::<SocketAddr>(), 73 - ) 74 - .await 75 - .map_err(|e| miette::miette!("axum server error: {e}"))?; 90 + futures::future::try_join_all(services).await?; 76 91 77 92 Ok(()) 78 93 } 79 94 95 + fn v6only_for(addr: SocketAddr, all: &[SocketAddr]) -> Option<bool> { 96 + let SocketAddr::V6(v6) = addr else { 97 + return None; 98 + }; 99 + let has_v4_sibling = all 100 + .iter() 101 + .any(|a| matches!(a, SocketAddr::V4(v4) if v4.port() == v6.port())); 102 + Some(has_v4_sibling) 103 + } 104 + 105 + fn bind_listener( 106 + addr: SocketAddr, 107 + v6only: Option<bool>, 108 + ) -> std::io::Result<tokio::net::TcpListener> { 109 + let domain = match addr { 110 + SocketAddr::V4(_) => socket2::Domain::IPV4, 111 + SocketAddr::V6(_) => socket2::Domain::IPV6, 112 + }; 113 + let socket = socket2::Socket::new(domain, socket2::Type::STREAM, Some(socket2::Protocol::TCP))?; 114 + if let Some(only) = v6only { 115 + socket.set_only_v6(only)?; 116 + } 117 + socket.set_reuse_address(true)?; 118 + socket.set_nonblocking(true)?; 119 + socket.bind(&addr.into())?; 120 + socket.listen(LISTEN_BACKLOG)?; 121 + let std_listener: std::net::TcpListener = socket.into(); 122 + tokio::net::TcpListener::from_std(std_listener) 123 + } 124 + 80 125 pub async fn serve_debug(state: Arc<AppState>, port: u16) -> miette::Result<()> { 81 126 let app = debug::router() 82 127 .with_state(state)
+35 -6
src/control/mod.rs
··· 57 57 use stream::relay_stream_thread; 58 58 use url::Url; 59 59 60 + #[derive(Debug, Clone)] 61 + pub struct ApiBinds { 62 + head: std::net::SocketAddr, 63 + tail: Vec<std::net::SocketAddr>, 64 + } 65 + 66 + impl ApiBinds { 67 + pub fn new(addr: std::net::SocketAddr) -> Self { 68 + Self { 69 + head: addr, 70 + tail: Vec::new(), 71 + } 72 + } 73 + 74 + pub fn try_from_iter<I: IntoIterator<Item = std::net::SocketAddr>>(iter: I) -> Option<Self> { 75 + let mut iter = iter.into_iter(); 76 + let head = iter.next()?; 77 + Some(Self { 78 + head, 79 + tail: iter.collect(), 80 + }) 81 + } 82 + 83 + pub fn iter(&self) -> impl Iterator<Item = std::net::SocketAddr> + '_ { 84 + std::iter::once(self.head).chain(self.tail.iter().copied()) 85 + } 86 + } 87 + 60 88 #[derive(Debug, Clone)] 61 89 /// infromation about a host hydrant is consuming from. 62 90 pub struct Host { ··· 90 118 /// # example 91 119 /// 92 120 /// ```rust,no_run 93 - /// use hydrant::control::Hydrant; 121 + /// use hydrant::control::{ApiBinds, Hydrant}; 94 122 /// 95 123 /// #[tokio::main] 96 124 /// async fn main() -> miette::Result<()> { 97 125 /// let hydrant = Hydrant::from_env().await?; 126 + /// let binds = ApiBinds::new("0.0.0.0:3000".parse().unwrap()); 98 127 /// 99 128 /// tokio::select! { 100 - /// r = hydrant.run()? => r, 101 - /// r = hydrant.serve(3000) => r, 129 + /// r = hydrant.run()? => r, 130 + /// r = hydrant.serve(binds) => r, 102 131 /// } 103 132 /// } 104 133 /// ``` ··· 806 835 Ok(StatsResponse { counts, sizes }) 807 836 } 808 837 809 - /// returns a future that runs the HTTP management API server on `0.0.0.0:{port}`. 838 + /// returns a future that runs the HTTP management API server on the given bind addresses. 810 839 /// 811 840 /// the server exposes all management endpoints (`/filter`, `/repos`, `/ingestion`, 812 841 /// `/stream`, `/stats`, `/db/*`, `/xrpc/*`). it runs indefinitely and resolves ··· 816 845 /// of `self` is deferred until the future is first polled. 817 846 /// 818 847 /// to disable the HTTP API entirely, simply don't call this method. 819 - pub fn serve(&self, port: u16) -> impl Future<Output = Result<()>> { 848 + pub fn serve(&self, binds: ApiBinds) -> impl Future<Output = Result<()>> { 820 849 let hydrant = self.clone(); 821 - async move { crate::api::serve(hydrant, port).await } 850 + async move { crate::api::serve(hydrant, binds).await } 822 851 } 823 852 824 853 /// returns a future that runs the debug HTTP API server on `127.0.0.1:{port}`.
+46 -11
src/main.rs
··· 1 1 use futures::FutureExt; 2 2 use hydrant::config::Config; 3 - use hydrant::control::Hydrant; 3 + use hydrant::control::{ApiBinds, Hydrant}; 4 4 use mimalloc::MiMalloc; 5 + use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr}; 6 + 7 + const DEFAULT_API_PORT: u16 = 3000; 8 + const DEFAULT_DEBUG_PORT: u16 = DEFAULT_API_PORT + 1; 9 + 10 + fn default_api_binds() -> ApiBinds { 11 + ApiBinds::try_from_iter([ 12 + SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), DEFAULT_API_PORT), 13 + SocketAddr::new(IpAddr::V6(Ipv6Addr::UNSPECIFIED), DEFAULT_API_PORT), 14 + ]) 15 + .expect("two-element literal is non-empty") 16 + } 5 17 6 18 struct AppConfig { 7 - api_port: u16, 19 + api_binds: ApiBinds, 8 20 enable_debug: bool, 9 21 debug_port: u16, 10 22 } 11 23 12 24 impl AppConfig { 13 - fn from_env() -> Self { 25 + fn from_env() -> miette::Result<Self> { 14 26 use hydrant::__cfg as cfg; 15 - let api_port = cfg!("API_PORT", 3000u16); 27 + let api_binds = parse_api_binds()?; 16 28 let enable_debug = cfg!("ENABLE_DEBUG", false); 17 - let debug_port: u16 = api_port + 1; 18 - let debug_port = cfg!("DEBUG_PORT", debug_port); 19 - Self { 20 - api_port, 29 + let debug_port_default = api_binds 30 + .iter() 31 + .next() 32 + .expect("ApiBinds is non-empty by construction") 33 + .port() 34 + .checked_add(1) 35 + .unwrap_or(DEFAULT_DEBUG_PORT); 36 + let debug_port = cfg!("DEBUG_PORT", debug_port_default); 37 + Ok(Self { 38 + api_binds, 21 39 enable_debug, 22 40 debug_port, 23 - } 41 + }) 24 42 } 25 43 } 26 44 45 + fn parse_api_binds() -> miette::Result<ApiBinds> { 46 + let Ok(raw) = std::env::var("HYDRANT_API_BIND") else { 47 + return Ok(default_api_binds()); 48 + }; 49 + let parsed = raw 50 + .split(',') 51 + .map(str::trim) 52 + .filter(|s| !s.is_empty()) 53 + .map(|s| { 54 + s.parse::<SocketAddr>() 55 + .map_err(|e| miette::miette!("invalid HYDRANT_API_BIND entry `{s}`: {e}")) 56 + }) 57 + .collect::<miette::Result<Vec<_>>>()?; 58 + ApiBinds::try_from_iter(parsed) 59 + .ok_or_else(|| miette::miette!("HYDRANT_API_BIND is set but contains no addresses")) 60 + } 61 + 27 62 #[global_allocator] 28 63 static GLOBAL: MiMalloc = MiMalloc; 29 64 ··· 34 69 .ok(); 35 70 36 71 let cfg = Config::from_env()?; 37 - let app = AppConfig::from_env(); 72 + let app = AppConfig::from_env()?; 38 73 39 74 let env_filter = tracing_subscriber::EnvFilter::builder() 40 75 .with_default_directive(tracing::Level::INFO.into()) ··· 50 85 51 86 tokio::select! { 52 87 r = hydrant.run()? => r, 53 - r = hydrant.serve(app.api_port) => r, 88 + r = hydrant.serve(app.api_binds) => r, 54 89 r = debug_fut => r, 55 90 } 56 91 }
+1 -1
tests/common.nu
··· 141 141 let env_vars = { 142 142 HYDRANT_DATABASE_PATH: ($db_path), 143 143 HYDRANT_FULL_NETWORK: "false", 144 - HYDRANT_API_PORT: ($port | into string), 144 + HYDRANT_API_BIND: $"127.0.0.1:($port)", 145 145 HYDRANT_ENABLE_DEBUG: "true", 146 146 HYDRANT_DEBUG_PORT: (resolve-test-debug-port ($port + 1) | into string), 147 147 HYDRANT_PLC_URL: "https://plc.klbr.net",
+1 -1
tests/throttling.nu
··· 40 40 HYDRANT_RELAY_HOST: ($mock_url), 41 41 HYDRANT_DISABLE_FIREHOSE: "true", 42 42 HYDRANT_DISABLE_BACKFILL: "true", # disable backfill so pending count stays up 43 - HYDRANT_API_PORT: ($port | into string), 43 + HYDRANT_API_BIND: $"127.0.0.1:($port)", 44 44 HYDRANT_LOG_LEVEL: "debug", 45 45 RUST_LOG: "debug", 46 46 HYDRANT_CRAWLER_MAX_PENDING_REPOS: "2",
+1 -1
tests/verify_crawler.nu
··· 45 45 HYDRANT_RELAY_HOST: ($mock_url), 46 46 HYDRANT_DISABLE_FIREHOSE: "true", 47 47 HYDRANT_DISABLE_BACKFILL: "true", 48 - HYDRANT_API_PORT: ($port | into string), 48 + HYDRANT_API_BIND: $"127.0.0.1:($port)", 49 49 HYDRANT_ENABLE_DEBUG: "true", # for stats checking 50 50 HYDRANT_DEBUG_PORT: (resolve-test-debug-port ($port + 1) | into string), 51 51 HYDRANT_LOG_LEVEL: "debug",

History

1 round 0 comments
sign up or login to add to the discussion
oyster.cafe submitted #0
patch application failed: error: patch failed: Cargo.lock:1533 error: Cargo.lock: patch does not apply error: patch failed: Cargo.toml:14 error: Cargo.toml: patch does not apply error: patch failed: docs/configuration.md:10 error: docs/configuration.md: patch does not apply error: patch failed: src/api/mod.rs:1 error: src/api/mod.rs: patch does not apply error: patch failed: src/control/mod.rs:57 error: src/control/mod.rs: patch does not apply error: patch failed: src/main.rs:1 error: src/main.rs: patch does not apply error: patch failed: tests/common.nu:141 error: tests/common.nu: patch does not apply error: tests/throttling.nu: No such file or directory error: tests/verify_crawler.nu: No such file or directory
expand 0 comments