don't
5
fork

Configure Feed

Select the types of activity you want to include in your feed.

feat(knot): simplify knot initialisation

Signed-off-by: tjh <x@tjh.dev>

tjh 42ab59c6 accf4b33

+67 -72
+25
crates/knot/src/lib.rs
··· 1 + use std::io; 2 + 3 + use axum::Router; 4 + use tokio::{net::TcpListener, task::JoinSet}; 5 + 1 6 pub mod model; 2 7 pub mod private; 3 8 pub mod public; ··· 13 8 pub const BASE32_SORTABLE: data_encoding::Encoding = data_encoding_macro::new_encoding! { 14 9 symbols: "234567abcdefghijklmnopqrstuvwxyz", 15 10 }; 11 + 12 + pub async fn serve_all( 13 + router: Router, 14 + listeners: impl IntoIterator<Item = TcpListener>, 15 + ) -> io::Result<()> { 16 + let mut service = JoinSet::new(); 17 + for listener in listeners { 18 + let router = router.clone(); 19 + let addr = listener.local_addr()?; 20 + tracing::info!(?addr, "listening on socket"); 21 + 22 + service.spawn(async move { axum::serve(listener, router).await }); 23 + } 24 + 25 + for task in service.join_all().await { 26 + task?; 27 + } 28 + 29 + Ok(()) 30 + }
+8 -48
crates/knot/src/main.rs
··· 2 2 mod hooks; 3 3 4 4 use anyhow::Context; 5 - use axum::{ 6 - Router, 7 - http::{HeaderName, Request, Response}, 8 - }; 5 + use axum::http::{HeaderName, Request, Response}; 9 6 use futures_util::StreamExt; 10 7 use identity::Resolver; 11 8 use jetstream::client_config::JetstreamConfig; ··· 15 18 use std::{ 16 19 env, 17 20 ffi::OsString, 18 - net::{SocketAddr, ToSocketAddrs}, 21 + net::ToSocketAddrs, 19 22 path::PathBuf, 20 23 process, 21 24 sync::{Arc, atomic::AtomicU64}, ··· 189 192 .await 190 193 .context("Failed to resolve owner handle")?; 191 194 192 - let router = Router::new() 193 - .without_v07_checks() 194 - .merge(knot::public::router()); 195 - 196 195 let jetstream_config = { 197 196 let cursor = db 198 197 .get_jetstream_cursor() ··· 219 226 }; 220 227 221 228 let mut service = JoinSet::new(); 222 - let mut private_sockets = Vec::new(); 223 - for socket in "localhost:0".to_socket_addrs()? { 224 - let bound_socket = TcpListener::bind(socket).await?; 225 - private_sockets.push(bound_socket); 226 - } 227 - 228 - let private_addrs: Vec<_> = private_sockets 229 - .iter() 230 - .map(|listener| listener.local_addr().unwrap()) 231 - .collect(); 232 - 233 - tracing::info!(?private_addrs, "bound internal API"); 234 229 235 230 let config = KnotConfiguration::builder() 236 231 .instance_name(&arguments.name) ··· 226 245 .repo_path(&arguments.repos) 227 246 .hook_path(&hooks_path) 228 247 .git_config_path(&git_config_path) 229 - .private_sockets(&private_addrs) 230 248 .build()?; 231 249 232 250 let (knot_state, knot_tasks) = 233 - KnotState::new(config, resolver, public_http, jetstream_config, db); 251 + KnotState::new(config, resolver, public_http, jetstream_config, db)?; 234 252 let knot: Knot = knot_state.into(); 235 253 236 254 let owner = resolved_owner.as_str(); ··· 249 269 Err(error) => tracing::error!(?error), 250 270 } 251 271 252 - let router = router 272 + let router = knot::public::router() 253 273 .layer(SetRequestIdLayer::new( 254 274 X_REQUEST_ID, 255 275 IncrementRequestId::default(), ··· 277 297 ) 278 298 .with_state(knot.clone()); 279 299 280 - let internal = knot::private::router().with_state(knot); 281 - for socket in private_sockets { 282 - let router = internal.clone(); 283 - service.spawn(async move { axum::serve(socket, router).await }); 284 - } 285 - 286 - let mut sockets = Vec::with_capacity(arguments.addr.len()); 300 + let mut public_listeners = Vec::with_capacity(arguments.addr.len()); 287 301 for addr in &arguments.addr { 288 302 for socket in addr.to_socket_addrs()? { 289 - sockets.push(socket); 303 + let listener = TcpListener::bind(socket).await?; 304 + public_listeners.push(listener); 290 305 } 291 306 } 292 307 293 - for socket in sockets { 294 - serve(&mut service, socket, router.clone()).await; 295 - } 308 + service.spawn(knot::serve_all(router, public_listeners)); 296 309 297 310 service.spawn(async move { 298 311 tracing::debug!("starting knot tasks"); ··· 302 329 } 303 330 304 331 Ok(()) 305 - } 306 - 307 - async fn serve(set: &mut JoinSet<std::io::Result<()>>, socket: SocketAddr, router: Router) { 308 - let listener = TcpListener::bind(socket) 309 - .await 310 - .expect("Failed to bind socket"); 311 - 312 - let addr = listener 313 - .local_addr() 314 - .expect("Failed to acquire local socket address"); 315 - 316 - tracing::info!(?addr, "listening on socket"); 317 - set.spawn(async move { axum::serve(listener, router).await }); 318 332 }
-21
crates/knot/src/model/config.rs
··· 3 3 use gix::bstr::BString; 4 4 use std::{ 5 5 collections::HashSet, 6 - net, 7 6 path::{Path, PathBuf}, 8 7 }; 9 8 ··· 32 33 repo_path: PathBuf, 33 34 hook_path: PathBuf, 34 35 git_config: PathBuf, 35 - private_sockets: String, 36 36 readmes: HashSet<BString>, 37 37 } 38 38 ··· 74 76 self.git_config.as_path() 75 77 } 76 78 77 - #[inline] 78 - pub fn private_endpoints(&self) -> &str { 79 - &self.private_sockets 80 - } 81 - 82 79 pub fn readmes(&self) -> &HashSet<BString> { 83 80 &self.readmes 84 81 } ··· 86 93 repo_path: Option<&'a Path>, 87 94 hook_path: Option<&'a Path>, 88 95 git_config: Option<&'a Path>, 89 - private_addrs: Vec<net::SocketAddr>, 90 96 } 91 97 92 98 impl<'a> KnotConfigurationBuilder<'a> { ··· 114 122 self 115 123 } 116 124 117 - pub fn private_sockets(&mut self, addrs: &[net::SocketAddr]) -> &mut Self { 118 - self.private_addrs.extend_from_slice(addrs); 119 - self 120 - } 121 - 122 125 pub fn build(&self) -> Result<KnotConfiguration, Error> { 123 126 let instance_name = self.instance_name.ok_or(Error::InstanceName)?.into(); 124 127 let owner_did = self.owner_did.ok_or(Error::OwnerDid)?.into(); ··· 130 143 ))); 131 144 } 132 145 133 - let private_sockets = self 134 - .private_addrs 135 - .iter() 136 - .map(|addr| format!("http://{addr}/")) 137 - .collect::<Vec<_>>() 138 - .join(" "); 139 - 140 146 let instance_audience = format!("did:web:{instance_name}").try_into()?; 141 147 Ok(KnotConfiguration { 142 148 instance_name, ··· 138 158 repo_path, 139 159 hook_path, 140 160 git_config, 141 - private_sockets, 142 161 readmes: DEFAULT_READMES 143 162 .iter() 144 163 .map(|v| BString::new(v.to_vec()))
+34 -3
crates/knot/src/model/knot_state.rs
··· 1 1 use std::{ 2 2 collections::HashMap, 3 + io, 4 + net::{TcpListener, ToSocketAddrs as _}, 3 5 ops, 4 6 sync::{Arc, Mutex, MutexGuard, RwLock}, 5 7 time::Duration, ··· 80 78 81 79 push_seed: Mutex<HashMap<RepositoryKey, Box<str>>>, 82 80 81 + private_addrs: String, 82 + 83 83 #[cfg(feature = "repository-cache")] 84 84 repo_handle_cache: Mutex<HashMap<RepositoryKey, gix::ThreadSafeRepository>>, 85 85 } ··· 93 89 public_http: HttpClient, 94 90 jetstream_config: JetstreamConfig, 95 91 database: DataStore, 96 - ) -> (Arc<Self>, JoinSet<()>) { 92 + ) -> io::Result<(Arc<Self>, JoinSet<io::Result<()>>)> { 97 93 let pool = ThreadPoolBuilder::new() 98 94 .build() 99 95 .expect("Failed to build thread pool"); ··· 101 97 let (events, _) = tokio::sync::broadcast::channel(16); 102 98 103 99 let (jetstream, jetstream_rx, jetstream_task) = jetstream_config.connect(); 100 + 101 + // Setup the private API. 102 + let private_listeners = "localhost:0" 103 + .to_socket_addrs()? 104 + .map(|socket| { 105 + let bound = TcpListener::bind(socket)?; 106 + bound.set_nonblocking(true)?; 107 + Ok(tokio::net::TcpListener::from_std(bound)?) 108 + }) 109 + .collect::<Result<Vec<_>, io::Error>>()?; 110 + 111 + let private_addrs = private_listeners 112 + .iter() 113 + .map(|listener| Ok(format!("http://{}/", listener.local_addr()?))) 114 + .collect::<Result<Vec<_>, io::Error>>()? 115 + .join(" "); 116 + 117 + tracing::info!(?private_addrs, "bound internal API"); 104 118 105 119 let inner = Arc::new(Self { 106 120 config, ··· 131 109 jwt_claims: Default::default(), 132 110 repo_cache: Default::default(), 133 111 push_seed: Default::default(), 112 + private_addrs, 134 113 #[cfg(feature = "repository-cache")] 135 114 repo_handle_cache: Default::default(), 136 115 }); ··· 147 124 ); 148 125 149 126 panic!("jetstream consumer/task completed"); 150 - }) 127 + }); 128 + let internal = crate::private::router().with_state(Arc::clone(&inner).into()); 129 + tasks.spawn(crate::serve_all(internal, private_listeners)) 151 130 }; 152 131 153 132 let state = Arc::clone(&inner); ··· 173 148 n => tracing::debug!("evicted {n} expired jti claims"), 174 149 } 175 150 } 151 + 152 + Ok(()) 176 153 }); 177 154 178 - (inner, tasks) 155 + Ok((inner, tasks)) 179 156 } 180 157 181 158 /// Return a reference to the identity resolver. ··· 356 329 .await?; 357 330 358 331 Ok(response) 332 + } 333 + 334 + pub fn private_endpoints(&self) -> &str { 335 + &self.private_addrs 359 336 } 360 337 361 338 pub async fn resolve_repo_path(