I would like to bind multiple addresses! Linux may do funky stuff with v6es in and of themselves, but just making it specific that one can do a v4 and v6 separately if they want
+145
-34
Diff
round #0
+1
Cargo.lock
+1
Cargo.lock
+1
Cargo.toml
+1
Cargo.toml
+2
-2
docs/configuration.md
+2
-2
docs/configuration.md
···
10
10
| :--- | :--- | :--- |
11
11
| `DATABASE_PATH` | `./hydrant.db` | path to the database folder |
12
12
| `RUST_LOG` | `info` | log filter directives (e.g., `debug`, `hydrant=trace`). [tracing env-filter syntax](https://docs.rs/tracing-subscriber/latest/tracing_subscriber/filter/struct.EnvFilter.html) |
13
-
| `API_PORT` | `3000` | port for the API server |
13
+
| `API_BIND` | `0.0.0.0:3000,[::]:3000` | comma-separated list of `<ip>:<port>` socket addresses to bind the API server to. literal IPs only (hostnames not resolved). when both an ipv4 and ipv6 entry share the same port, the v6 listener is set to v6-only to avoid bind collision; a lone `[::]:<port>` listens dual-stack |
14
14
| `ENABLE_DEBUG` | `false` | enable debug endpoints |
15
-
| `DEBUG_PORT` | `API_PORT + 1` | port for debug endpoints (if enabled) |
15
+
| `DEBUG_PORT` | first `API_BIND` port + 1 | port for debug endpoints (if enabled) |
16
16
17
17
## indexing mode
18
18
+57
-12
src/api/mod.rs
+57
-12
src/api/mod.rs
···
1
-
use crate::control::Hydrant;
1
+
use crate::control::{ApiBinds, Hydrant};
2
2
use crate::state::AppState;
3
3
use axum::{Router, routing::get};
4
4
use std::{net::SocketAddr, sync::Arc};
5
5
use tower_http::cors::CorsLayer;
6
6
use tower_http::trace::TraceLayer;
7
7
8
+
const LISTEN_BACKLOG: i32 = 1024;
9
+
8
10
#[cfg(feature = "indexer")]
9
11
mod crawler;
10
12
mod db;
···
19
21
mod stream;
20
22
mod xrpc;
21
23
22
-
pub async fn serve(hydrant: Hydrant, port: u16) -> miette::Result<()> {
24
+
pub async fn serve(hydrant: Hydrant, binds: ApiBinds) -> miette::Result<()> {
23
25
let blocks_available = hydrant.state.is_block_storage_enabled();
24
26
let app = Router::new()
25
27
.route(
···
61
63
.layer(TraceLayer::new_for_http())
62
64
.layer(CorsLayer::permissive());
63
65
64
-
let listener = tokio::net::TcpListener::bind(format!("0.0.0.0:{port}"))
65
-
.await
66
-
.map_err(|e| miette::miette!("failed to bind to port {port}: {e}"))?;
66
+
let bind_list: Vec<SocketAddr> = binds.iter().collect();
67
+
let listeners: Vec<tokio::net::TcpListener> = bind_list
68
+
.iter()
69
+
.map(|&addr| {
70
+
let v6only = v6only_for(addr, &bind_list);
71
+
let listener = bind_listener(addr, v6only)
72
+
.map_err(|e| miette::miette!("failed to bind to {addr}: {e}"))?;
73
+
tracing::info!("API server listening on {}", listener.local_addr().unwrap());
74
+
Ok::<_, miette::Report>(listener)
75
+
})
76
+
.collect::<Result<_, _>>()?;
67
77
68
-
tracing::info!("API server listening on {}", listener.local_addr().unwrap());
78
+
let services = listeners.into_iter().map(|listener| {
79
+
let app = app.clone();
80
+
async move {
81
+
axum::serve(
82
+
listener,
83
+
app.into_make_service_with_connect_info::<SocketAddr>(),
84
+
)
85
+
.await
86
+
.map_err(|e| miette::miette!("axum server error: {e}"))
87
+
}
88
+
});
69
89
70
-
axum::serve(
71
-
listener,
72
-
app.into_make_service_with_connect_info::<SocketAddr>(),
73
-
)
74
-
.await
75
-
.map_err(|e| miette::miette!("axum server error: {e}"))?;
90
+
futures::future::try_join_all(services).await?;
76
91
77
92
Ok(())
78
93
}
79
94
95
+
fn v6only_for(addr: SocketAddr, all: &[SocketAddr]) -> Option<bool> {
96
+
let SocketAddr::V6(v6) = addr else {
97
+
return None;
98
+
};
99
+
let has_v4_sibling = all
100
+
.iter()
101
+
.any(|a| matches!(a, SocketAddr::V4(v4) if v4.port() == v6.port()));
102
+
Some(has_v4_sibling)
103
+
}
104
+
105
+
fn bind_listener(
106
+
addr: SocketAddr,
107
+
v6only: Option<bool>,
108
+
) -> std::io::Result<tokio::net::TcpListener> {
109
+
let domain = match addr {
110
+
SocketAddr::V4(_) => socket2::Domain::IPV4,
111
+
SocketAddr::V6(_) => socket2::Domain::IPV6,
112
+
};
113
+
let socket = socket2::Socket::new(domain, socket2::Type::STREAM, Some(socket2::Protocol::TCP))?;
114
+
if let Some(only) = v6only {
115
+
socket.set_only_v6(only)?;
116
+
}
117
+
socket.set_reuse_address(true)?;
118
+
socket.set_nonblocking(true)?;
119
+
socket.bind(&addr.into())?;
120
+
socket.listen(LISTEN_BACKLOG)?;
121
+
let std_listener: std::net::TcpListener = socket.into();
122
+
tokio::net::TcpListener::from_std(std_listener)
123
+
}
124
+
80
125
pub async fn serve_debug(state: Arc<AppState>, port: u16) -> miette::Result<()> {
81
126
let app = debug::router()
82
127
.with_state(state)
+35
-6
src/control/mod.rs
+35
-6
src/control/mod.rs
···
57
57
use stream::relay_stream_thread;
58
58
use url::Url;
59
59
60
+
#[derive(Debug, Clone)]
61
+
pub struct ApiBinds {
62
+
head: std::net::SocketAddr,
63
+
tail: Vec<std::net::SocketAddr>,
64
+
}
65
+
66
+
impl ApiBinds {
67
+
pub fn new(addr: std::net::SocketAddr) -> Self {
68
+
Self {
69
+
head: addr,
70
+
tail: Vec::new(),
71
+
}
72
+
}
73
+
74
+
pub fn try_from_iter<I: IntoIterator<Item = std::net::SocketAddr>>(iter: I) -> Option<Self> {
75
+
let mut iter = iter.into_iter();
76
+
let head = iter.next()?;
77
+
Some(Self {
78
+
head,
79
+
tail: iter.collect(),
80
+
})
81
+
}
82
+
83
+
pub fn iter(&self) -> impl Iterator<Item = std::net::SocketAddr> + '_ {
84
+
std::iter::once(self.head).chain(self.tail.iter().copied())
85
+
}
86
+
}
87
+
60
88
#[derive(Debug, Clone)]
61
89
/// infromation about a host hydrant is consuming from.
62
90
pub struct Host {
···
90
118
/// # example
91
119
///
92
120
/// ```rust,no_run
93
-
/// use hydrant::control::Hydrant;
121
+
/// use hydrant::control::{ApiBinds, Hydrant};
94
122
///
95
123
/// #[tokio::main]
96
124
/// async fn main() -> miette::Result<()> {
97
125
/// let hydrant = Hydrant::from_env().await?;
126
+
/// let binds = ApiBinds::new("0.0.0.0:3000".parse().unwrap());
98
127
///
99
128
/// tokio::select! {
100
-
/// r = hydrant.run()? => r,
101
-
/// r = hydrant.serve(3000) => r,
129
+
/// r = hydrant.run()? => r,
130
+
/// r = hydrant.serve(binds) => r,
102
131
/// }
103
132
/// }
104
133
/// ```
···
806
835
Ok(StatsResponse { counts, sizes })
807
836
}
808
837
809
-
/// returns a future that runs the HTTP management API server on `0.0.0.0:{port}`.
838
+
/// returns a future that runs the HTTP management API server on the given bind addresses.
810
839
///
811
840
/// the server exposes all management endpoints (`/filter`, `/repos`, `/ingestion`,
812
841
/// `/stream`, `/stats`, `/db/*`, `/xrpc/*`). it runs indefinitely and resolves
···
816
845
/// of `self` is deferred until the future is first polled.
817
846
///
818
847
/// to disable the HTTP API entirely, simply don't call this method.
819
-
pub fn serve(&self, port: u16) -> impl Future<Output = Result<()>> {
848
+
pub fn serve(&self, binds: ApiBinds) -> impl Future<Output = Result<()>> {
820
849
let hydrant = self.clone();
821
-
async move { crate::api::serve(hydrant, port).await }
850
+
async move { crate::api::serve(hydrant, binds).await }
822
851
}
823
852
824
853
/// returns a future that runs the debug HTTP API server on `127.0.0.1:{port}`.
+46
-11
src/main.rs
+46
-11
src/main.rs
···
1
1
use futures::FutureExt;
2
2
use hydrant::config::Config;
3
-
use hydrant::control::Hydrant;
3
+
use hydrant::control::{ApiBinds, Hydrant};
4
4
use mimalloc::MiMalloc;
5
+
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};
6
+
7
+
const DEFAULT_API_PORT: u16 = 3000;
8
+
const DEFAULT_DEBUG_PORT: u16 = DEFAULT_API_PORT + 1;
9
+
10
+
fn default_api_binds() -> ApiBinds {
11
+
ApiBinds::try_from_iter([
12
+
SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), DEFAULT_API_PORT),
13
+
SocketAddr::new(IpAddr::V6(Ipv6Addr::UNSPECIFIED), DEFAULT_API_PORT),
14
+
])
15
+
.expect("two-element literal is non-empty")
16
+
}
5
17
6
18
struct AppConfig {
7
-
api_port: u16,
19
+
api_binds: ApiBinds,
8
20
enable_debug: bool,
9
21
debug_port: u16,
10
22
}
11
23
12
24
impl AppConfig {
13
-
fn from_env() -> Self {
25
+
fn from_env() -> miette::Result<Self> {
14
26
use hydrant::__cfg as cfg;
15
-
let api_port = cfg!("API_PORT", 3000u16);
27
+
let api_binds = parse_api_binds()?;
16
28
let enable_debug = cfg!("ENABLE_DEBUG", false);
17
-
let debug_port: u16 = api_port + 1;
18
-
let debug_port = cfg!("DEBUG_PORT", debug_port);
19
-
Self {
20
-
api_port,
29
+
let debug_port_default = api_binds
30
+
.iter()
31
+
.next()
32
+
.expect("ApiBinds is non-empty by construction")
33
+
.port()
34
+
.checked_add(1)
35
+
.unwrap_or(DEFAULT_DEBUG_PORT);
36
+
let debug_port = cfg!("DEBUG_PORT", debug_port_default);
37
+
Ok(Self {
38
+
api_binds,
21
39
enable_debug,
22
40
debug_port,
23
-
}
41
+
})
24
42
}
25
43
}
26
44
45
+
fn parse_api_binds() -> miette::Result<ApiBinds> {
46
+
let Ok(raw) = std::env::var("HYDRANT_API_BIND") else {
47
+
return Ok(default_api_binds());
48
+
};
49
+
let parsed = raw
50
+
.split(',')
51
+
.map(str::trim)
52
+
.filter(|s| !s.is_empty())
53
+
.map(|s| {
54
+
s.parse::<SocketAddr>()
55
+
.map_err(|e| miette::miette!("invalid HYDRANT_API_BIND entry `{s}`: {e}"))
56
+
})
57
+
.collect::<miette::Result<Vec<_>>>()?;
58
+
ApiBinds::try_from_iter(parsed)
59
+
.ok_or_else(|| miette::miette!("HYDRANT_API_BIND is set but contains no addresses"))
60
+
}
61
+
27
62
#[global_allocator]
28
63
static GLOBAL: MiMalloc = MiMalloc;
29
64
···
34
69
.ok();
35
70
36
71
let cfg = Config::from_env()?;
37
-
let app = AppConfig::from_env();
72
+
let app = AppConfig::from_env()?;
38
73
39
74
let env_filter = tracing_subscriber::EnvFilter::builder()
40
75
.with_default_directive(tracing::Level::INFO.into())
···
50
85
51
86
tokio::select! {
52
87
r = hydrant.run()? => r,
53
-
r = hydrant.serve(app.api_port) => r,
88
+
r = hydrant.serve(app.api_binds) => r,
54
89
r = debug_fut => r,
55
90
}
56
91
}
+1
-1
tests/common.nu
+1
-1
tests/common.nu
···
141
141
let env_vars = {
142
142
HYDRANT_DATABASE_PATH: ($db_path),
143
143
HYDRANT_FULL_NETWORK: "false",
144
-
HYDRANT_API_PORT: ($port | into string),
144
+
HYDRANT_API_BIND: $"127.0.0.1:($port)",
145
145
HYDRANT_ENABLE_DEBUG: "true",
146
146
HYDRANT_DEBUG_PORT: (resolve-test-debug-port ($port + 1) | into string),
147
147
HYDRANT_PLC_URL: "https://plc.klbr.net",
+1
-1
tests/throttling.nu
+1
-1
tests/throttling.nu
···
40
40
HYDRANT_RELAY_HOST: ($mock_url),
41
41
HYDRANT_DISABLE_FIREHOSE: "true",
42
42
HYDRANT_DISABLE_BACKFILL: "true", # disable backfill so pending count stays up
43
-
HYDRANT_API_PORT: ($port | into string),
43
+
HYDRANT_API_BIND: $"127.0.0.1:($port)",
44
44
HYDRANT_LOG_LEVEL: "debug",
45
45
RUST_LOG: "debug",
46
46
HYDRANT_CRAWLER_MAX_PENDING_REPOS: "2",
+1
-1
tests/verify_crawler.nu
+1
-1
tests/verify_crawler.nu
···
45
45
HYDRANT_RELAY_HOST: ($mock_url),
46
46
HYDRANT_DISABLE_FIREHOSE: "true",
47
47
HYDRANT_DISABLE_BACKFILL: "true",
48
-
HYDRANT_API_PORT: ($port | into string),
48
+
HYDRANT_API_BIND: $"127.0.0.1:($port)",
49
49
HYDRANT_ENABLE_DEBUG: "true", # for stats checking
50
50
HYDRANT_DEBUG_PORT: (resolve-test-debug-port ($port + 1) | into string),
51
51
HYDRANT_LOG_LEVEL: "debug",
History
1 round
0 comments
oyster.cafe
submitted
#0
patch application failed: error: patch failed: Cargo.lock:1533
error: Cargo.lock: patch does not apply
error: patch failed: Cargo.toml:14
error: Cargo.toml: patch does not apply
error: patch failed: docs/configuration.md:10
error: docs/configuration.md: patch does not apply
error: patch failed: src/api/mod.rs:1
error: src/api/mod.rs: patch does not apply
error: patch failed: src/control/mod.rs:57
error: src/control/mod.rs: patch does not apply
error: patch failed: src/main.rs:1
error: src/main.rs: patch does not apply
error: patch failed: tests/common.nu:141
error: tests/common.nu: patch does not apply
error: tests/throttling.nu: No such file or directory
error: tests/verify_crawler.nu: No such file or directory