don't
5
fork

Configure Feed

Select the types of activity you want to include in your feed.

refactor(knot): restructure cli

Signed-off-by: tjh <x@tjh.dev>

tjh cab16d95 a4e76c4b

+744 -666
+26 -261
crates/gordian-knot/src/cli.rs
··· 1 - use clap::{ArgAction, Args, CommandFactory, Parser, Subcommand, ValueEnum, ValueHint}; 2 - use clap_complete::Shell; 3 - use core::fmt; 4 - use gix::bstr::BString; 5 - use gordian_identity::HttpClient; 6 - use gordian_knot::model::config::{DEFAULT_READMES, KnotConfiguration, RepoCacheConfig}; 7 - use gordian_types::OwnedDid; 8 - use std::{env, path::PathBuf, time::Duration}; 9 - use url::Url; 1 + pub mod generate; 2 + pub mod hook; 3 + pub mod serve; 10 4 11 - pub fn parse() -> KnotCommand { 12 - match Arguments::parse().command { 13 - KnotCommand::Generate(arguments) => { 14 - let mut command = Arguments::command(); 15 - let name = command.get_name().to_string(); 16 - clap_complete::generate(arguments.shell, &mut command, name, &mut std::io::stdout()); 17 - std::process::exit(0); 18 - } 19 - KnotCommand::Serve(mut arguments) => { 20 - if let Some("") = arguments.archive_bz2_command.as_deref() { 21 - arguments.archive_bz2_command = None; 22 - } 5 + use clap::{Parser, Subcommand}; 23 6 24 - if let Some("") = arguments.archive_xz_command.as_deref() { 25 - arguments.archive_xz_command = None; 26 - } 7 + pub trait RunCommand { 8 + type Error; 27 9 28 - KnotCommand::Serve(arguments) 29 - } 30 - hook @ KnotCommand::Hook(_) => hook, 31 - } 10 + fn run(self) -> Result<(), Self::Error>; 32 11 } 33 12 34 13 #[derive(Debug, Parser)] 35 14 #[command(about, author, version)] 36 15 pub struct Arguments { 37 16 #[clap(subcommand)] 38 - command: KnotCommand, 17 + pub command: KnotCommand, 39 18 } 40 19 41 20 #[derive(Debug, Subcommand, Clone)] 42 21 pub enum KnotCommand { 43 - Generate(GenerateArguments), 44 - Serve(ServeArguments), 45 - Hook(HookArguments), 22 + Generate(generate::Generate), 23 + Serve(serve::Serve), 24 + Hook(hook::Hook), 46 25 } 47 26 48 - /// Generate shell completions. 49 - #[derive(Clone, Debug, Args)] 50 - pub struct GenerateArguments { 51 - shell: Shell, 52 - } 27 + pub fn parse() -> Arguments { 28 + let mut arguments = Arguments::parse(); 53 29 54 - /// Serve the tangled knot. 55 - #[derive(Clone, Debug, Args)] 56 - pub struct ServeArguments { 57 - /// FQDN of the knot. 58 - #[arg(long, short, value_hint = ValueHint::Hostname, env = "KNOT_NAME")] 59 - #[cfg_attr(debug_assertions, arg(default_value = "localhost:5555"))] 60 - pub name: String, 30 + // Fix some of the arguments to 'serve'. 31 + if let KnotCommand::Serve(arguments) = &mut arguments.command { 32 + let is_empty = |val: &mut String| val.is_empty(); 33 + arguments.archive_bz2_command.take_if(is_empty); 34 + arguments.archive_xz_command.take_if(is_empty); 61 35 62 - /// Handle or DID of the knot owner. 63 - #[arg(long, short, env = "KNOT_OWNER")] 64 - pub owner: OwnedDid, 65 - 66 - /// Base path for repositories. 67 - #[arg(long, short, value_hint = ValueHint::DirPath, env = "KNOT_REPO_BASE")] 68 - #[arg(default_value = default_repository_base().into_os_string())] 69 - pub repos: PathBuf, 70 - 71 - /// Path to knot-level git hooks. 72 - #[arg(long, short = 'H', value_hint = ValueHint::DirPath, env = "KNOT_HOOKS_PATH")] 73 - pub hooks: Option<PathBuf>, 74 - 75 - /// Path to knot-level git config. 76 - #[arg(long, value_hint = ValueHint::FilePath, env = "KNOT_GIT_CONFIG_PATH")] 77 - #[arg(default_value = default_repository_base().join("git_config").into_os_string())] 78 - pub git_config: PathBuf, 79 - 80 - /// Address to bind the the public knot API. 81 - #[arg(long, value_delimiter = ',', env = "KNOT_ADDR")] 82 - #[arg(default_value = "localhost:5555")] 83 - pub bind: Vec<String>, 84 - 85 - /// Path to the knot sqlite database. 86 - #[arg(long, env = "KNOT_DATABASE_PATH", default_value = "knot.db")] 87 - pub db: PathBuf, 88 - 89 - /// PLC directory for DID resolution. 90 - #[arg(long, value_hint = ValueHint::Url, env = "KNOT_PLC_DIRECTORY")] 91 - #[arg(default_value = "https://plc.directory")] 92 - pub plc_directory: String, 93 - 94 - #[arg(long, short, value_delimiter = ',', value_hint = ValueHint::Url, env = "KNOT_JETSTREAM")] 95 - #[arg(default_value = default_jetstream_instances())] 96 - pub jetstream: Vec<String>, 97 - 98 - /// Acceptable authorization methods for git pushes over http. 99 - #[arg(hide = true, long, require_equals = true, value_delimiter = ',')] 100 - #[arg(env = "KNOT_AUTH_METHODS")] 101 - #[arg(default_value = "service-auth,public-key")] 102 - pub auth_methods: Vec<AuthenticationMethods>, 103 - 104 - /// Require git pushes to be signed by a public key from a 'sh.tangled.publicKey'. 105 - /// 106 - /// See: <https://git-scm.com/docs/git-push#Documentation/git-push.txt---signed> 107 - #[arg(long, action = ArgAction::Set, require_equals = true)] 108 - #[arg(default_value_t = true)] 109 - pub require_signed_push: bool, 110 - 111 - /// Number of open repository handles to cache. 112 - /// 113 - /// Keeping open handles reduces the overhead of opening a repository at the 114 - /// expense of increased memory usage. 115 - #[arg(long, env = "KNOT_REPO_CACHE_SIZE", default_value_t = 0)] 116 - pub repo_cache_size: u64, 117 - 118 - /// Seconds to retain an idle repository handle in cache. 119 - #[arg(long, env = "KNOT_REPO_CACHE_IDLE", default_value_t = 60)] 120 - pub repo_cache_idle: u64, 121 - 122 - /// Seconds to retain a repository handle in cache. 123 - #[arg(long, env = "KNOT_REPO_CACHE_LIVE", default_value_t = 600)] 124 - pub repo_cache_live: u64, 125 - 126 - /// Command to use to compress bzip2 archives. 127 - #[arg(long, env = "KNOT_ARCHIVE_BZ2", default_value = find_command("bzip2").unwrap_or_default())] 128 - pub archive_bz2_command: Option<String>, 129 - 130 - /// Command to use to compress xz archives. 131 - #[arg(long, env = "KNOT_ARCHIVE_XZ", default_value = find_command("xz").unwrap_or_default())] 132 - pub archive_xz_command: Option<String>, 133 - } 134 - 135 - fn find_command(name: &str) -> Option<String> { 136 - use std::process::Command; 137 - 138 - let output = Command::new("which").arg(name).output().ok()?; 139 - if !output.status.success() { 140 - return None; 36 + arguments.jetstream.retain(|val| !val.is_empty()); 141 37 } 142 38 143 - let full_path = String::from_utf8(output.stdout).ok()?; 144 - Some(full_path.trim().to_string()) 39 + arguments 145 40 } 146 41 147 - impl ServeArguments { 148 - pub fn to_knot_config(&self) -> Result<KnotConfiguration, Error> { 149 - let Self { 150 - name, 151 - owner, 152 - repos: repo_path, 153 - hooks: _, 154 - git_config, 155 - bind: _, 156 - db: _, 157 - plc_directory: _, 158 - jetstream: _, 159 - auth_methods: _, 160 - require_signed_push: _, 161 - repo_cache_size, 162 - repo_cache_idle, 163 - repo_cache_live, 164 - archive_bz2_command: _, 165 - archive_xz_command: _, 166 - } = self.clone(); 42 + pub fn run() { 43 + let arguments = parse(); 167 44 168 - // @TODO Validate? 169 - 170 - let instance = format!("did:web:{name}").parse()?; 171 - 172 - Ok(KnotConfiguration { 173 - owner, 174 - instance, 175 - repo_path, 176 - git_config, 177 - readmes: DEFAULT_READMES 178 - .iter() 179 - .map(|v| BString::new(v.to_vec())) 180 - .collect(), 181 - repo_cache: RepoCacheConfig { 182 - size: repo_cache_size, 183 - idle: Duration::from_secs(repo_cache_idle), 184 - live: Duration::from_secs(repo_cache_live), 185 - }, 186 - }) 187 - } 188 - 189 - pub fn init_resolver(&self, http: HttpClient) -> gordian_identity::Resolver { 190 - let plc_url = Url::parse(&self.plc_directory).expect("PLC directory should be a valid URL"); 191 - assert!(["http", "https"].contains(&plc_url.scheme())); 192 - 193 - gordian_identity::Resolver::builder() 194 - .plc_directory(self.plc_directory.clone()) 195 - .build_with(http) 196 - } 197 - } 198 - 199 - #[derive(Debug, thiserror::Error)] 200 - pub enum Error { 201 - #[error("unable to build 'did:web:{{name}}' from knot fqdn: {0}")] 202 - Name(#[from] gordian_types::did::Error), 203 - } 204 - 205 - #[derive(Clone, Debug, ValueEnum)] 206 - pub enum AuthenticationMethods { 207 - ServiceAuth, 208 - PublicKey, 209 - } 210 - 211 - fn default_repository_base() -> PathBuf { 212 - env::current_dir().expect("current working directory should be readable") 213 - } 214 - 215 - fn default_jetstream_instances() -> String { 216 - gordian_jetstream::PUBLIC_JETSTREAM_INSTANCES.join(",") 217 - } 218 - 219 - /// Forward a git hook to the internal API. 220 - /// 221 - /// This command is expected to be invoked by git during operations via 222 - /// the global hook shims. 223 - #[derive(Clone, Args)] 224 - pub struct HookArguments { 225 - /// Internal API endpoints. 226 - #[arg(long, value_delimiter = ',', env = gordian_knot::private::ENV_PRIVATE_ENDPOINTS)] 227 - pub api: Vec<Url>, 228 - 229 - /// DID of the repository owner. 230 - #[arg(long, env = gordian_knot::private::ENV_REPO_DID)] 231 - pub repo_did: OwnedDid, 232 - 233 - /// Record key of the repository. 234 - #[arg(long, env = gordian_knot::private::ENV_REPO_RKEY)] 235 - pub repo_rkey: String, 236 - 237 - /// Name of the hook to forward. 238 - pub hook: HookName, 239 - } 240 - 241 - impl fmt::Debug for HookArguments { 242 - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 243 - f.debug_struct("HookArguments") 244 - // Suppress `url::Url`'s god-awful debug output. 245 - .field("api", &self.api.iter().map(Url::as_str).collect::<Vec<_>>()) 246 - .field("repo_did", &self.repo_did) 247 - .field("repo_rkey", &self.repo_rkey) 248 - .field("hook", &self.hook) 249 - .finish() 250 - } 251 - } 252 - 253 - #[derive(Clone, Copy, Debug, ValueEnum)] 254 - #[clap(rename_all = "kebab-case")] 255 - pub enum HookName { 256 - PreReceive, 257 - PostReceive, 258 - PostUpdate, 259 - } 260 - 261 - impl fmt::Display for HookName { 262 - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 263 - f.write_str(match self { 264 - Self::PreReceive => "pre-receive", 265 - Self::PostReceive => "post-receive", 266 - Self::PostUpdate => "post-update", 267 - }) 268 - } 269 - } 270 - 271 - impl AsRef<std::path::Path> for HookName { 272 - fn as_ref(&self) -> &std::path::Path { 273 - std::path::Path::new(match self { 274 - Self::PreReceive => "pre-receive", 275 - Self::PostReceive => "post-receive", 276 - Self::PostUpdate => "post-update", 277 - }) 278 - } 279 - } 280 - 281 - impl HookName { 282 - pub fn iter_variants() -> impl Iterator<Item = HookName> { 283 - [Self::PreReceive, Self::PostReceive, Self::PostUpdate].into_iter() 45 + match arguments.command { 46 + KnotCommand::Generate(generate) => generate.run().unwrap(), 47 + KnotCommand::Serve(serve) => serve.run().unwrap(), 48 + KnotCommand::Hook(hook) => hook.run().unwrap(), 284 49 } 285 50 }
+23
crates/gordian-knot/src/cli/generate.rs
··· 1 + use clap::{Args, CommandFactory as _}; 2 + use clap_complete::Shell; 3 + 4 + /// Generate shell completions. 5 + #[derive(Clone, Debug, Args)] 6 + pub struct Generate { 7 + /// Shell to generate a completion script for. 8 + pub shell: Shell, 9 + } 10 + 11 + impl super::RunCommand for Generate { 12 + type Error = std::convert::Infallible; 13 + 14 + fn run(self) -> Result<(), Self::Error> { 15 + use super::Arguments; 16 + 17 + let mut command = Arguments::command(); 18 + let name = command.get_name().to_string(); 19 + clap_complete::generate(self.shell, &mut command, name, &mut std::io::stdout()); 20 + 21 + Ok(()) 22 + } 23 + }
+199
crates/gordian-knot/src/cli/hook.rs
··· 1 + use core::fmt; 2 + use std::{ 3 + collections::HashMap, 4 + env, 5 + io::{self, Write as _}, 6 + path, 7 + }; 8 + 9 + use axum::http::{HeaderMap, HeaderName, HeaderValue}; 10 + use bytes::Bytes; 11 + use gordian_types::OwnedDid; 12 + use reqwest::header::InvalidHeaderName; 13 + use url::Url; 14 + 15 + /// Forward a git hook to the internal API. 16 + /// 17 + /// This command is expected to be invoked by git during operations via 18 + /// the global hook shims. 19 + #[derive(Clone, clap::Args)] 20 + pub struct Hook { 21 + /// Internal API endpoints. 22 + #[arg(long, value_delimiter = ',', env = gordian_knot::private::ENV_PRIVATE_ENDPOINTS)] 23 + pub api: Vec<Url>, 24 + 25 + /// DID of the repository owner. 26 + #[arg(long, env = gordian_knot::private::ENV_REPO_DID)] 27 + pub repo_did: OwnedDid, 28 + 29 + /// Record key of the repository. 30 + #[arg(long, env = gordian_knot::private::ENV_REPO_RKEY)] 31 + pub repo_rkey: String, 32 + 33 + /// Name of the hook to forward. 34 + pub hook: HookName, 35 + } 36 + 37 + impl fmt::Debug for Hook { 38 + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 39 + f.debug_struct("HookArguments") 40 + // Suppress `url::Url`'s god-awful debug output. 41 + .field("api", &self.api.iter().map(Url::as_str).collect::<Vec<_>>()) 42 + .field("repo_did", &self.repo_did) 43 + .field("repo_rkey", &self.repo_rkey) 44 + .field("hook", &self.hook) 45 + .finish() 46 + } 47 + } 48 + 49 + #[derive(Clone, Copy, Debug, clap::ValueEnum)] 50 + #[clap(rename_all = "kebab-case")] 51 + pub enum HookName { 52 + PreReceive, 53 + PostReceive, 54 + PostUpdate, 55 + } 56 + 57 + impl fmt::Display for HookName { 58 + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 59 + f.write_str(match self { 60 + Self::PreReceive => "pre-receive", 61 + Self::PostReceive => "post-receive", 62 + Self::PostUpdate => "post-update", 63 + }) 64 + } 65 + } 66 + 67 + impl AsRef<path::Path> for HookName { 68 + fn as_ref(&self) -> &path::Path { 69 + std::path::Path::new(match self { 70 + Self::PreReceive => "pre-receive", 71 + Self::PostReceive => "post-receive", 72 + Self::PostUpdate => "post-update", 73 + }) 74 + } 75 + } 76 + 77 + impl super::RunCommand for Hook { 78 + type Error = anyhow::Error; 79 + 80 + fn run(self) -> Result<(), Self::Error> { 81 + use tokio::runtime::Builder; 82 + 83 + let runtime = Builder::new_current_thread() 84 + .enable_all() 85 + .build() 86 + .expect("Failed to build runtime"); 87 + 88 + runtime.block_on(forward_hook(self)) 89 + } 90 + } 91 + 92 + /// [`core::fmt::Debug`] an [`url::Url`] without causing eye-cancer. 93 + #[repr(transparent)] 94 + struct DebugUrl(url::Url); 95 + 96 + impl core::fmt::Debug for DebugUrl { 97 + fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { 98 + core::fmt::Display::fmt(&self.0, f) 99 + } 100 + } 101 + 102 + /// [`core::fmt::Debug`] a slice [`url::Url`] without causing eye-cancer. 103 + pub struct DebugUrls<'a>(pub &'a [url::Url]); 104 + 105 + impl<'a> core::fmt::Debug for DebugUrls<'a> { 106 + fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { 107 + let urls = unsafe { 108 + // SAFETY: #[repr(transparent)] on DebugUrl guarantees Url === DebugUrl. 109 + &*(self.0 as *const [url::Url] as *const [DebugUrl]) 110 + }; 111 + core::fmt::Debug::fmt(&urls, f) 112 + } 113 + } 114 + 115 + #[tracing::instrument(fields(api = ?DebugUrls(&api)))] 116 + pub async fn forward_hook( 117 + Hook { 118 + api, 119 + repo_did, 120 + repo_rkey, 121 + hook, 122 + }: Hook, 123 + ) -> anyhow::Result<()> { 124 + if api.is_empty() { 125 + tracing::warn!("internal API not specified, skipping hook"); 126 + return Ok(()); 127 + }; 128 + 129 + let mut environment_vars: HashMap<_, _> = env::vars() 130 + .filter(|(key, _)| !key.trim().is_empty()) 131 + .collect(); 132 + 133 + let request_id = take_var(&mut environment_vars, "X_REQUEST_ID").ok(); 134 + 135 + // Build a header map with the remaining environment variables. 136 + let mut headers = HeaderMap::with_capacity(environment_vars.len()); 137 + if let Some(request_id) = request_id { 138 + headers.insert("X-Request-ID", HeaderValue::from_str(&request_id)?); 139 + } 140 + 141 + for (key, value) in environment_vars { 142 + match (variable_to_header_name(&key), HeaderValue::try_from(&value)) { 143 + (Ok(key), Ok(value)) => _ = headers.insert(key, value), 144 + (Err(error), _) => tracing::warn!(?error, ?key, ?value, "ignoring header"), 145 + (_, Err(error)) => tracing::warn!(?error, ?key, ?value, "ignoring header"), 146 + } 147 + } 148 + 149 + let stdin = Bytes::from(io::read_to_string(io::stdin())?); 150 + 151 + let client = reqwest::Client::new(); 152 + let url_path = format!("/hook/{repo_did}/{repo_rkey}/{hook}"); 153 + for mut hook_url in api { 154 + hook_url.set_path(&url_path); 155 + let response = client 156 + .post(hook_url) 157 + .headers(headers.clone()) 158 + .body(stdin.clone()) 159 + .send() 160 + .await; 161 + 162 + match response { 163 + Ok(response) if response.status().is_success() => { 164 + let body = response.bytes().await?; 165 + io::stdout().write_all(&body)?; 166 + return Ok(()); 167 + } 168 + Ok(response) => { 169 + let status = response.status(); 170 + let body = response.bytes().await?; 171 + io::stdout().write_all(&body)?; 172 + return Err(anyhow::anyhow!("Knot returned error status {status}")); 173 + } 174 + Err(error) => { 175 + tracing::error!(?error, "failed to post hook to internal API"); 176 + continue; 177 + } 178 + } 179 + } 180 + 181 + Err(anyhow::anyhow!("Failed to find a valid internal endpoint")) 182 + } 183 + 184 + fn take_var(vars: &mut HashMap<String, String>, name: &str) -> anyhow::Result<String> { 185 + vars.remove(name).ok_or(anyhow::anyhow!( 186 + "Expected environment variable {name:?} to be set", 187 + )) 188 + } 189 + 190 + fn variable_to_header_name(name: &str) -> Result<HeaderName, InvalidHeaderName> { 191 + format!( 192 + "{}-{}", 193 + gordian_knot::private::ENV_HEADER_PREFIX, 194 + name.trim_start_matches("GORDIAN_") 195 + ) 196 + .replace('_', "-") 197 + .to_lowercase() 198 + .try_into() 199 + }
+444
crates/gordian-knot/src/cli/serve.rs
··· 1 + use std::{env, ffi, net::ToSocketAddrs as _, path, time::Duration}; 2 + 3 + use anyhow::Context as _; 4 + use axum::http::{Request, Response}; 5 + use clap::{ArgAction, Args, ValueEnum, ValueHint}; 6 + use futures_util::FutureExt as _; 7 + use gix::bstr::BString; 8 + use gordian_identity::HttpClient; 9 + use gordian_knot::{ 10 + model::{ 11 + Knot, KnotState, 12 + config::{self, KnotConfiguration}, 13 + }, 14 + services::database::DataStore, 15 + }; 16 + use gordian_types::OwnedDid; 17 + use tokio::{net::TcpListener, signal, task::JoinSet}; 18 + use tokio_util::sync::CancellationToken; 19 + use tower::ServiceBuilder; 20 + use tower_http::{ 21 + ServiceBuilderExt as _, 22 + decompression::RequestDecompressionLayer, 23 + request_id::{MakeRequestUuid, RequestId}, 24 + trace::{MakeSpan, OnResponse, TraceLayer}, 25 + }; 26 + use tracing::{Span, field::Empty}; 27 + use url::Url; 28 + 29 + const USER_AGENT: &str = concat!(env!("CARGO_PKG_NAME"), "/", env!("CARGO_PKG_VERSION")); 30 + 31 + /// Serve the tangled knot. 32 + #[derive(Clone, Debug, Args)] 33 + pub struct Serve { 34 + /// FQDN of the knot. 35 + #[arg(long, short, value_hint = ValueHint::Hostname, env = "KNOT_NAME")] 36 + #[cfg_attr(debug_assertions, arg(default_value = "localhost:5555"))] 37 + pub name: String, 38 + 39 + /// Handle or DID of the knot owner. 40 + #[arg(long, short, env = "KNOT_OWNER")] 41 + pub owner: OwnedDid, 42 + 43 + /// Base path for repositories. 44 + #[arg(long, short, value_hint = ValueHint::DirPath, env = "KNOT_REPO_BASE")] 45 + #[arg(default_value = default_repository_base().into_os_string())] 46 + pub repos: path::PathBuf, 47 + 48 + /// Path to knot-level git hooks. 49 + #[arg(long, short = 'H', value_hint = ValueHint::DirPath, env = "KNOT_HOOKS_PATH")] 50 + pub hooks: Option<path::PathBuf>, 51 + 52 + /// Path to knot-level git config. 53 + #[arg(long, value_hint = ValueHint::FilePath, env = "KNOT_GIT_CONFIG_PATH")] 54 + #[arg(default_value = default_repository_base().join("git_config").into_os_string())] 55 + pub git_config: path::PathBuf, 56 + 57 + /// Address to bind the the public knot API. 58 + #[arg(long, value_delimiter = ',', env = "KNOT_ADDR")] 59 + #[arg(default_value = "localhost:5555")] 60 + pub bind: Vec<String>, 61 + 62 + /// Path to the knot sqlite database. 63 + #[arg(long, env = "KNOT_DATABASE_PATH", default_value = "knot.db")] 64 + pub db: path::PathBuf, 65 + 66 + /// PLC directory for DID resolution. 67 + #[arg(long, value_hint = ValueHint::Url, env = "KNOT_PLC_DIRECTORY")] 68 + #[arg(default_value = "https://plc.directory")] 69 + pub plc_directory: String, 70 + 71 + #[arg(long, short, value_delimiter = ',', value_hint = ValueHint::Url, env = "KNOT_JETSTREAM")] 72 + #[arg(default_value = default_jetstream_instances())] 73 + pub jetstream: Vec<String>, 74 + 75 + /// Acceptable authorization methods for git pushes over http. 76 + #[arg(hide = true, long, require_equals = true, value_delimiter = ',')] 77 + #[arg(env = "KNOT_AUTH_METHODS")] 78 + #[arg(default_value = "service-auth,public-key")] 79 + pub auth_methods: Vec<AuthenticationMethods>, 80 + 81 + /// Require git pushes to be signed by a public key from a 'sh.tangled.publicKey'. 82 + /// 83 + /// See: <https://git-scm.com/docs/git-push#Documentation/git-push.txt---signed> 84 + #[arg(long, action = ArgAction::Set, require_equals = true)] 85 + #[arg(default_value_t = true)] 86 + pub require_signed_push: bool, 87 + 88 + /// Number of open repository handles to cache. 89 + /// 90 + /// Keeping open handles reduces the overhead of opening a repository at the 91 + /// expense of increased memory usage. 92 + #[arg(long, env = "KNOT_REPO_CACHE_SIZE", default_value_t = 0)] 93 + pub repo_cache_size: u64, 94 + 95 + /// Seconds to retain an idle repository handle in cache. 96 + #[arg(long, env = "KNOT_REPO_CACHE_IDLE", default_value_t = 60)] 97 + pub repo_cache_idle: u64, 98 + 99 + /// Seconds to retain a repository handle in cache. 100 + #[arg(long, env = "KNOT_REPO_CACHE_LIVE", default_value_t = 600)] 101 + pub repo_cache_live: u64, 102 + 103 + /// Command to use to compress bzip2 archives. 104 + #[arg(long, env = "KNOT_ARCHIVE_BZ2", default_value = find_command("bzip2").unwrap_or_default())] 105 + pub archive_bz2_command: Option<String>, 106 + 107 + /// Command to use to compress xz archives. 108 + #[arg(long, env = "KNOT_ARCHIVE_XZ", default_value = find_command("xz").unwrap_or_default())] 109 + pub archive_xz_command: Option<String>, 110 + } 111 + 112 + impl Serve { 113 + pub fn to_knot_config(&self) -> Result<KnotConfiguration, Error> { 114 + let Self { 115 + name, 116 + owner, 117 + repos: repo_path, 118 + hooks: _, 119 + git_config, 120 + bind: _, 121 + db: _, 122 + plc_directory: _, 123 + jetstream: _, 124 + auth_methods: _, 125 + require_signed_push: _, 126 + repo_cache_size, 127 + repo_cache_idle, 128 + repo_cache_live, 129 + archive_bz2_command: _, 130 + archive_xz_command: _, 131 + } = self.clone(); 132 + 133 + // @TODO Validate? 134 + 135 + let instance = format!("did:web:{name}").parse()?; 136 + 137 + Ok(KnotConfiguration { 138 + owner, 139 + instance, 140 + repo_path, 141 + git_config, 142 + readmes: config::DEFAULT_READMES 143 + .iter() 144 + .map(|v| BString::new(v.to_vec())) 145 + .collect(), 146 + repo_cache: config::RepoCacheConfig { 147 + size: repo_cache_size, 148 + idle: Duration::from_secs(repo_cache_idle), 149 + live: Duration::from_secs(repo_cache_live), 150 + }, 151 + }) 152 + } 153 + 154 + pub fn init_resolver(&self, http: HttpClient) -> gordian_identity::Resolver { 155 + let plc_url = Url::parse(&self.plc_directory).expect("PLC directory should be a valid URL"); 156 + assert!(["http", "https"].contains(&plc_url.scheme())); 157 + 158 + gordian_identity::Resolver::builder() 159 + .plc_directory(self.plc_directory.clone()) 160 + .build_with(http) 161 + } 162 + } 163 + 164 + fn find_command(name: &str) -> Option<String> { 165 + use std::process::Command; 166 + 167 + let output = Command::new("which").arg(name).output().ok()?; 168 + if !output.status.success() { 169 + return None; 170 + } 171 + 172 + let full_path = String::from_utf8(output.stdout).ok()?; 173 + Some(full_path.trim().to_string()) 174 + } 175 + 176 + #[derive(Debug, thiserror::Error)] 177 + pub enum Error { 178 + #[error("unable to build 'did:web:{{name}}' from knot fqdn: {0}")] 179 + Name(#[from] gordian_types::did::Error), 180 + } 181 + 182 + #[derive(Clone, Debug, ValueEnum)] 183 + pub enum AuthenticationMethods { 184 + ServiceAuth, 185 + PublicKey, 186 + } 187 + 188 + fn default_repository_base() -> path::PathBuf { 189 + env::current_dir().expect("current working directory should be readable") 190 + } 191 + 192 + fn default_jetstream_instances() -> String { 193 + gordian_jetstream::PUBLIC_JETSTREAM_INSTANCES.join(",") 194 + } 195 + 196 + impl super::RunCommand for Serve { 197 + type Error = anyhow::Error; 198 + 199 + fn run(self) -> Result<(), Self::Error> { 200 + use tokio::runtime::Builder; 201 + 202 + let runtime = Builder::new_current_thread() 203 + .enable_all() 204 + .build() 205 + .expect("Failed to build runtime"); 206 + 207 + runtime.block_on(serve_knot(self)) 208 + } 209 + } 210 + 211 + pub async fn serve_knot(arguments: Serve) -> anyhow::Result<()> { 212 + unsafe { env::set_var("GIT_CONFIG_GLOBAL", &arguments.git_config) }; 213 + 214 + let tempdir = tempfile::TempDir::with_prefix("gordian-knot-")?; 215 + let hooks_path = if let Some(path) = &arguments.hooks { 216 + // @TODO Verify hooks exist in the specified path. 217 + tracing::warn!(?path, "assuming existence of hooks at path"); 218 + path.to_path_buf() 219 + } else { 220 + let path = tempdir.path().join("hooks"); 221 + crate::hooks::install_global_hooks(&path)?; 222 + path 223 + }; 224 + 225 + assert!(git_config_global("core.hooksPath", &hooks_path)?); 226 + assert!(git_config_global("receive.advertisePushOptions", "true")?); 227 + if let Some(command) = &arguments.archive_bz2_command { 228 + assert!(git_config_global("tar.tar.bz2.command", command)?); 229 + } 230 + if let Some(command) = &arguments.archive_xz_command { 231 + assert!(git_config_global("tar.tar.xz.command", command)?); 232 + } 233 + 234 + let database = { 235 + use sqlx::sqlite::{SqliteConnectOptions, SqlitePoolOptions}; 236 + 237 + let pool = { 238 + let connect_options = SqliteConnectOptions::new() 239 + .filename(&arguments.db) 240 + .create_if_missing(true) 241 + .foreign_keys(true) 242 + .journal_mode(sqlx::sqlite::SqliteJournalMode::Wal); 243 + 244 + SqlitePoolOptions::new() 245 + .connect_with(connect_options) 246 + .await? 247 + }; 248 + 249 + sqlx::migrate!().run(&pool).await?; 250 + DataStore::new(pool) 251 + }; 252 + 253 + let public_http = reqwest::ClientBuilder::new() 254 + .timeout(Duration::from_secs(2)) 255 + .user_agent(USER_AGENT) 256 + .http2_keep_alive_while_idle(true) 257 + .https_only(true) 258 + .build() 259 + .context("Failed to build public HTTP client")?; 260 + 261 + let resolver = arguments.init_resolver(public_http.clone()); 262 + 263 + // Bind listeners for the public API. 264 + let mut public_listeners = Vec::with_capacity(arguments.bind.len()); 265 + for addr in &arguments.bind { 266 + for socket in addr.to_socket_addrs()? { 267 + let listener = TcpListener::bind(socket).await?; 268 + public_listeners.push(listener); 269 + } 270 + } 271 + 272 + // Bind listeners for the private API. 273 + let mut private_listeners = Vec::with_capacity(2); 274 + for socket in "localhost:0".to_socket_addrs()? { 275 + let listener = TcpListener::bind(socket).await?; 276 + private_listeners.push(listener); 277 + } 278 + 279 + // The knot needs to know the sockets we've bound the private API. 280 + let private_addrs = private_listeners 281 + .iter() 282 + .map(tokio::net::TcpListener::local_addr) 283 + .collect::<Result<Vec<_>, std::io::Error>>()?; 284 + 285 + tracing::info!(?private_addrs, "bound internal API"); 286 + 287 + let config: KnotConfiguration = arguments.to_knot_config()?; 288 + let knot_state = KnotState::new(config, resolver, public_http, database, &private_addrs)?; 289 + let knot = Knot::from(knot_state); 290 + 291 + // Ensure the knot owner's records are seeded. 292 + knot.seed_owner() 293 + .await 294 + .context("seeding knot owner's records")?; 295 + 296 + let mut tasks = JoinSet::new(); 297 + let shutdown = CancellationToken::new(); 298 + 299 + // Spawn the internal API. 300 + tasks.spawn(gordian_knot::serve_all( 301 + gordian_knot::private::router() 302 + .layer( 303 + ServiceBuilder::new() 304 + .set_x_request_id(MakeRequestUuid) 305 + .layer( 306 + TraceLayer::new_for_http() 307 + .make_span_with(PrivateHttpSpan) 308 + .on_request(|_: &Request<_>, _: &Span| {}) 309 + .on_response(TraceResponse), 310 + ) 311 + .propagate_x_request_id(), 312 + ) 313 + .with_state(knot.clone()), 314 + private_listeners, 315 + shutdown.child_token(), 316 + )); 317 + 318 + // Spawn the jetstream consumer. 319 + tasks.spawn( 320 + gordian_knot::services::jetstream::init_consumer( 321 + &knot, 322 + arguments.jetstream.as_slice(), 323 + shutdown.child_token(), 324 + ) 325 + .map(|_| Ok(())), 326 + ); 327 + 328 + // Build the public API. 329 + let router = gordian_knot::public::router() 330 + .layer(RequestDecompressionLayer::new()) 331 + .layer( 332 + ServiceBuilder::new() 333 + .set_x_request_id(MakeRequestUuid) 334 + .layer( 335 + TraceLayer::new_for_http() 336 + .make_span_with(PublicHttpSpan) 337 + .on_request(|_: &Request<_>, _: &Span| {}) 338 + .on_response(TraceResponse), 339 + ) 340 + .propagate_x_request_id(), 341 + ) 342 + .with_state(knot); 343 + 344 + tasks.spawn(gordian_knot::serve_all( 345 + router, 346 + public_listeners, 347 + shutdown.child_token(), 348 + )); 349 + 350 + tasks.spawn(wait_for_shutdown(shutdown)); 351 + 352 + for task in tasks.join_all().await { 353 + if let Err(error) = task { 354 + tracing::error!(?error, "knot task completed with error"); 355 + } 356 + } 357 + 358 + Ok(()) 359 + } 360 + 361 + async fn wait_for_shutdown(shutdown: CancellationToken) -> std::io::Result<()> { 362 + use tokio::signal::unix::SignalKind; 363 + 364 + let mut sigterm = signal::unix::signal(SignalKind::terminate())?; 365 + 366 + tokio::select! { 367 + Ok(()) = signal::ctrl_c() => { 368 + eprintln!(); 369 + tracing::info!("ctrl+c received, shutting down ..."); 370 + }, 371 + Some(()) = sigterm.recv() => { 372 + tracing::info!("SIGTERM received, shutting down ..."); 373 + } 374 + } 375 + 376 + shutdown.cancel(); 377 + 378 + Ok(()) 379 + } 380 + 381 + fn git_config_global<K, V>(key: K, value: V) -> std::io::Result<bool> 382 + where 383 + K: AsRef<ffi::OsStr>, 384 + V: AsRef<ffi::OsStr>, 385 + { 386 + use std::process::Stdio; 387 + 388 + let success = std::process::Command::new("/usr/bin/git") 389 + .args(["config", "set", "--global"]) 390 + .arg(key) 391 + .arg(value) 392 + .stdout(Stdio::inherit()) 393 + .stderr(Stdio::inherit()) 394 + .spawn()? 395 + .wait()? 396 + .success(); 397 + 398 + Ok(success) 399 + } 400 + 401 + macro_rules! make_span { 402 + ($name:ident, $label:literal) => { 403 + #[derive(Clone)] 404 + struct $name; 405 + 406 + impl<B> MakeSpan<B> for $name { 407 + fn make_span(&mut self, request: &axum::http::Request<B>) -> tracing::Span { 408 + let method = request.method(); 409 + let path = request.uri().path(); 410 + 411 + let span = tracing::error_span!($label, id = Empty, method = Empty, path = Empty); 412 + if let Some(id) = request 413 + .extensions() 414 + .get::<RequestId>() 415 + .and_then(|request_id| request_id.header_value().to_str().ok()) 416 + { 417 + span.record("id", &id); 418 + } 419 + 420 + span.record("method", tracing::field::debug(&method)); 421 + span.record("path", tracing::field::debug(&path)); 422 + 423 + span 424 + } 425 + } 426 + }; 427 + } 428 + 429 + make_span!(PublicHttpSpan, "public"); 430 + make_span!(PrivateHttpSpan, "private"); 431 + 432 + #[derive(Clone)] 433 + pub struct TraceResponse; 434 + 435 + impl<B> OnResponse<B> for TraceResponse { 436 + fn on_response(self, response: &Response<B>, latency: Duration, _: &Span) { 437 + match response.status() { 438 + status if status.is_success() => tracing::trace!(?status, ?latency), 439 + status if status.is_client_error() => tracing::warn!(?status, ?latency), 440 + status if status.is_server_error() => tracing::error!(?status, ?latency), 441 + status => tracing::info!(?status, ?latency), 442 + } 443 + } 444 + }
+36 -128
crates/gordian-knot/src/hooks.rs
··· 1 - use std::{ 2 - collections::HashMap, 3 - env, 4 - fs::{self, Permissions}, 5 - io::{self, Write}, 6 - os::unix::fs::PermissionsExt, 7 - path::Path, 8 - }; 1 + use std::{env, fs, io, path}; 9 2 10 - use axum::http::{HeaderMap, HeaderName, HeaderValue, header::InvalidHeaderName}; 11 - use bytes::Bytes; 12 - use gordian_knot::private; 3 + use crate::cli::hook::HookName; 13 4 14 - use crate::cli::{HookArguments, HookName}; 5 + /// Install the knot-global git-hooks in `path`. If `path` does not exist it will be created. 6 + /// 7 + /// # Panics 8 + /// 9 + /// Panics if the path to the currently running executable is not utf8. 10 + /// 11 + #[cfg(unix)] 12 + pub fn install_global_hooks<P: AsRef<path::Path>>(path: P) -> io::Result<()> { 13 + use std::os::unix::fs::PermissionsExt as _; 15 14 16 - /// Setup the global hooks directory at `path`. 17 - pub fn setup_global_hooks<P: AsRef<Path>>(path: P) -> io::Result<()> { 15 + use clap::ValueEnum as _; 16 + 18 17 let executable = env::current_exe() 19 - .map(|path| path.to_str().map(ToOwned::to_owned)) 20 - .expect("Current executable must be defined") 21 - .expect("Current executable must be valid utf8"); 18 + .map(|path| path.to_str().map(ToOwned::to_owned))? 19 + .expect("executable path must be valid utf8"); 22 20 23 - let _ = fs::create_dir_all(&path); 24 - for hook_name in HookName::iter_variants() { 25 - let hook_path = path.as_ref().join(hook_name); 21 + ensure_hooks_dir_exists(&path)?; 22 + 23 + for hook_name in HookName::value_variants() { 26 24 let script = format!( 27 - "#!/usr/bin/sh\n# This file is generated by gordian-knot. Do not modify.\n{executable} hook {hook_name}\n" 25 + "#!/usr/bin/sh\n\ 26 + # This file is generated by gordian-knot. Do not modify.\n\ 27 + {executable} hook {hook_name}\n" 28 28 ); 29 - std::fs::write(&hook_path, script)?; 30 29 31 - let permissions = Permissions::from_mode(0o755); 32 - std::fs::set_permissions(&hook_path, permissions)?; 33 - tracing::info!(?executable, ?hook_path, "git hook installed"); 30 + let hook_path = path.as_ref().join(hook_name); 31 + fs::write(&hook_path, script)?; 32 + fs::set_permissions(&hook_path, fs::Permissions::from_mode(0o755))?; 34 33 } 34 + 35 35 Ok(()) 36 36 } 37 37 38 - /// [`core::fmt::Debug`] an [`url::Url`] without causing eye-cancer. 39 - #[repr(transparent)] 40 - struct DebugUrl(url::Url); 41 - 42 - impl core::fmt::Debug for DebugUrl { 43 - fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { 44 - core::fmt::Display::fmt(&self.0, f) 45 - } 46 - } 47 - 48 - /// [`core::fmt::Debug`] a slice [`url::Url`] without causing eye-cancer. 49 - pub struct DebugUrls<'a>(pub &'a [url::Url]); 50 - 51 - impl<'a> core::fmt::Debug for DebugUrls<'a> { 52 - fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { 53 - let urls = unsafe { 54 - // SAFETY: Close your eyes an pray! 55 - &*(self.0 as *const [url::Url] as *const [DebugUrl]) 56 - }; 57 - core::fmt::Debug::fmt(&urls, f) 58 - } 59 - } 60 - 61 - #[tracing::instrument(fields(api = ?DebugUrls(&api)))] 62 - pub async fn run_hook( 63 - HookArguments { 64 - api, 65 - repo_did, 66 - repo_rkey, 67 - hook, 68 - }: HookArguments, 69 - ) -> anyhow::Result<()> { 70 - if api.is_empty() { 71 - tracing::warn!("internal API not specified, skipping hook"); 72 - return Ok(()); 73 - }; 74 - 75 - let mut environment_vars: HashMap<_, _> = env::vars() 76 - .filter(|(key, _)| !key.trim().is_empty()) 77 - .collect(); 78 - 79 - let request_id = take_var(&mut environment_vars, "X_REQUEST_ID").ok(); 80 - 81 - // Build a header map with the remaining environment variables. 82 - let mut headers = HeaderMap::with_capacity(environment_vars.len()); 83 - if let Some(request_id) = request_id { 84 - headers.insert("X-Request-ID", HeaderValue::from_str(&request_id)?); 38 + fn ensure_hooks_dir_exists<P: AsRef<path::Path>>(path: P) -> io::Result<()> { 39 + if let Err(error) = fs::create_dir_all(&path) 40 + && error.kind() == io::ErrorKind::AlreadyExists 41 + { 42 + return Err(error); 85 43 } 86 44 87 - for (key, value) in environment_vars { 88 - match (variable_to_header_name(&key), HeaderValue::try_from(&value)) { 89 - (Ok(key), Ok(value)) => _ = headers.insert(key, value), 90 - (Err(error), _) => tracing::warn!(?error, ?key, ?value, "ignoring header"), 91 - (_, Err(error)) => tracing::warn!(?error, ?key, ?value, "ignoring header"), 92 - } 45 + if !fs::metadata(path)?.is_dir() { 46 + return Err(io::Error::new( 47 + io::ErrorKind::NotADirectory, 48 + "global hook path is not a directory", 49 + )); 93 50 } 94 51 95 - let stdin = Bytes::from(io::read_to_string(io::stdin())?); 96 - 97 - let client = reqwest::Client::new(); 98 - let url_path = format!("/hook/{repo_did}/{repo_rkey}/{hook}"); 99 - for mut hook_url in api { 100 - hook_url.set_path(&url_path); 101 - let response = client 102 - .post(hook_url) 103 - .headers(headers.clone()) 104 - .body(stdin.clone()) 105 - .send() 106 - .await; 107 - 108 - match response { 109 - Ok(response) if response.status().is_success() => { 110 - let body = response.bytes().await?; 111 - io::stdout().write_all(&body)?; 112 - return Ok(()); 113 - } 114 - Ok(response) => { 115 - let status = response.status(); 116 - let body = response.bytes().await?; 117 - io::stdout().write_all(&body)?; 118 - return Err(anyhow::anyhow!("Knot returned error status {status}")); 119 - } 120 - Err(error) => { 121 - tracing::error!(?error, "failed to post hook to internal API"); 122 - continue; 123 - } 124 - } 125 - } 126 - 127 - Err(anyhow::anyhow!("Failed to find a valid internal endpoint")) 128 - } 129 - 130 - fn take_var(vars: &mut HashMap<String, String>, name: &str) -> anyhow::Result<String> { 131 - vars.remove(name).ok_or(anyhow::anyhow!( 132 - "Expected environment variable {name:?} to be set", 133 - )) 134 - } 135 - 136 - fn variable_to_header_name(name: &str) -> Result<HeaderName, InvalidHeaderName> { 137 - format!( 138 - "{}-{}", 139 - private::ENV_HEADER_PREFIX, 140 - name.trim_start_matches("GORDIAN_") 141 - ) 142 - .replace('_', "-") 143 - .to_lowercase() 144 - .try_into() 52 + Ok(()) 145 53 }
+16 -277
crates/gordian-knot/src/main.rs
··· 1 1 mod cli; 2 2 mod hooks; 3 3 4 - use anyhow::Context as _; 5 - use axum::http::{Request, Response}; 6 - use futures_util::FutureExt as _; 7 - use gordian_knot::{ 8 - model::{Knot, KnotState, config::KnotConfiguration}, 9 - services::database::DataStore, 10 - }; 11 - use sqlx::sqlite::{SqliteConnectOptions, SqlitePoolOptions}; 12 - use std::{env, ffi::OsStr, net::ToSocketAddrs as _, time::Duration}; 13 - use tokio::{net::TcpListener, signal::unix::SignalKind, task::JoinSet}; 14 - use tokio::{runtime::Builder, signal}; 15 - use tokio_util::sync::CancellationToken; 16 - use tower::ServiceBuilder; 17 - use tower_http::{ 18 - ServiceBuilderExt as _, 19 - decompression::RequestDecompressionLayer, 20 - request_id::{MakeRequestUuid, RequestId}, 21 - trace::{MakeSpan, OnResponse, TraceLayer}, 22 - }; 23 - use tracing::{Span, field::Empty, level_filters::LevelFilter}; 4 + use tracing::level_filters::LevelFilter; 24 5 use tracing_subscriber::{EnvFilter, layer::SubscriberExt as _, util::SubscriberInitExt as _}; 25 6 26 7 #[cfg(all(not(target_env = "msvc"), feature = "jemalloc"))] 27 - use tikv_jemallocator::Jemalloc; 28 - 29 - #[cfg(all(not(target_env = "msvc"), feature = "jemalloc"))] 30 8 #[global_allocator] 31 - static GLOBAL: Jemalloc = Jemalloc; 9 + static GLOBAL: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc; 32 10 33 - const USER_AGENT: &str = concat!(env!("CARGO_PKG_NAME"), "/", env!("CARGO_PKG_VERSION")); 11 + fn main() { 12 + let filter = EnvFilter::builder() 13 + .with_default_directive(LevelFilter::INFO.into()) 14 + .from_env_lossy(); 34 15 35 - fn main() -> anyhow::Result<()> { 16 + let stderr = tracing_subscriber::fmt::layer() 17 + .with_writer(std::io::stderr) 18 + .without_time(); 19 + 20 + #[cfg(debug_assertions)] 21 + let stderr = stderr.pretty(); 22 + 36 23 tracing_subscriber::registry() 37 - .with( 38 - EnvFilter::builder() 39 - .with_default_directive(LevelFilter::INFO.into()) 40 - .from_env_lossy(), 41 - ) 42 - .with( 43 - tracing_subscriber::fmt::layer() 44 - .with_writer(std::io::stderr) 45 - .without_time(), 46 - ) 24 + .with(filter) 25 + .with(stderr) 47 26 .init(); 48 27 49 - let runtime = Builder::new_current_thread() 50 - .enable_all() 51 - .build() 52 - .expect("Failed to build runtime"); 53 - 54 - match cli::parse() { 55 - cli::KnotCommand::Generate(_) => unreachable!("Handled by cli module"), 56 - cli::KnotCommand::Serve(arguments) => runtime.block_on(knot_main(arguments)), 57 - cli::KnotCommand::Hook(arguments) => runtime.block_on(hooks::run_hook(arguments)), 58 - } 59 - } 60 - 61 - pub async fn knot_main(arguments: cli::ServeArguments) -> anyhow::Result<()> { 62 - unsafe { env::set_var("GIT_CONFIG_GLOBAL", &arguments.git_config) }; 63 - 64 - let tempdir = tempfile::TempDir::with_prefix("gordian-knot-")?; 65 - let hooks_path = if let Some(path) = &arguments.hooks { 66 - // @TODO Verify hooks exist in the specified path. 67 - tracing::warn!(?path, "assuming existence of hooks at path"); 68 - path.to_path_buf() 69 - } else { 70 - let path = tempdir.path().join("hooks"); 71 - hooks::setup_global_hooks(&path)?; 72 - path 73 - }; 74 - 75 - assert!(git_config_global("core.hooksPath", &hooks_path)?); 76 - assert!(git_config_global("receive.advertisePushOptions", "true")?); 77 - if let Some(command) = &arguments.archive_bz2_command { 78 - assert!(git_config_global("tar.tar.bz2.command", command)?); 79 - } 80 - if let Some(command) = &arguments.archive_xz_command { 81 - assert!(git_config_global("tar.tar.xz.command", command)?); 82 - } 83 - 84 - let database = { 85 - let pool = { 86 - let connect_options = SqliteConnectOptions::new() 87 - .filename(&arguments.db) 88 - .create_if_missing(true) 89 - .foreign_keys(true) 90 - .journal_mode(sqlx::sqlite::SqliteJournalMode::Wal); 91 - 92 - SqlitePoolOptions::new() 93 - .connect_with(connect_options) 94 - .await? 95 - }; 96 - 97 - sqlx::migrate!().run(&pool).await?; 98 - DataStore::new(pool) 99 - }; 100 - 101 - let public_http = reqwest::ClientBuilder::new() 102 - .timeout(Duration::from_secs(2)) 103 - .user_agent(USER_AGENT) 104 - .http2_keep_alive_while_idle(true) 105 - .https_only(true) 106 - .build() 107 - .context("Failed to build public HTTP client")?; 108 - 109 - let resolver = arguments.init_resolver(public_http.clone()); 110 - 111 - // Bind listeners for the public API. 112 - let mut public_listeners = Vec::with_capacity(arguments.bind.len()); 113 - for addr in &arguments.bind { 114 - for socket in addr.to_socket_addrs()? { 115 - let listener = TcpListener::bind(socket).await?; 116 - public_listeners.push(listener); 117 - } 118 - } 119 - 120 - // Bind listeners for the private API. 121 - let mut private_listeners = Vec::with_capacity(2); 122 - for socket in "localhost:0".to_socket_addrs()? { 123 - let listener = TcpListener::bind(socket).await?; 124 - private_listeners.push(listener); 125 - } 126 - 127 - // The knot needs to know the sockets we've bound the private API. 128 - let private_addrs = private_listeners 129 - .iter() 130 - .map(tokio::net::TcpListener::local_addr) 131 - .collect::<Result<Vec<_>, std::io::Error>>()?; 132 - 133 - tracing::info!(?private_addrs, "bound internal API"); 134 - 135 - let config: KnotConfiguration = arguments.to_knot_config()?; 136 - let knot_state = KnotState::new(config, resolver, public_http, database, &private_addrs)?; 137 - let knot = Knot::from(knot_state); 138 - 139 - // Ensure the knot owner's records are seeded. 140 - knot.seed_owner() 141 - .await 142 - .context("seeding knot owner's records")?; 143 - 144 - let mut tasks = JoinSet::new(); 145 - let shutdown = CancellationToken::new(); 146 - 147 - // Spawn the internal API. 148 - tasks.spawn(gordian_knot::serve_all( 149 - gordian_knot::private::router() 150 - .layer( 151 - ServiceBuilder::new() 152 - .set_x_request_id(MakeRequestUuid) 153 - .layer( 154 - TraceLayer::new_for_http() 155 - .make_span_with(PrivateHttpSpan) 156 - .on_request(|_: &Request<_>, _: &Span| {}) 157 - .on_response(TraceResponse), 158 - ) 159 - .propagate_x_request_id(), 160 - ) 161 - .with_state(knot.clone()), 162 - private_listeners, 163 - shutdown.child_token(), 164 - )); 165 - 166 - // Spawn the jetstream consumer. 167 - tasks.spawn( 168 - gordian_knot::services::jetstream::init_consumer( 169 - &knot, 170 - arguments.jetstream.as_slice(), 171 - shutdown.child_token(), 172 - ) 173 - .map(|_| Ok(())), 174 - ); 175 - 176 - // Build the public API. 177 - let router = gordian_knot::public::router() 178 - .layer(RequestDecompressionLayer::new()) 179 - .layer( 180 - ServiceBuilder::new() 181 - .set_x_request_id(MakeRequestUuid) 182 - .layer( 183 - TraceLayer::new_for_http() 184 - .make_span_with(PublicHttpSpan) 185 - .on_request(|_: &Request<_>, _: &Span| {}) 186 - .on_response(TraceResponse), 187 - ) 188 - .propagate_x_request_id(), 189 - ) 190 - .with_state(knot); 191 - 192 - tasks.spawn(gordian_knot::serve_all( 193 - router, 194 - public_listeners, 195 - shutdown.child_token(), 196 - )); 197 - 198 - tasks.spawn(wait_for_shutdown(shutdown)); 199 - 200 - for task in tasks.join_all().await { 201 - if let Err(error) = task { 202 - tracing::error!(?error, "knot task completed with error"); 203 - } 204 - } 205 - 206 - Ok(()) 207 - } 208 - 209 - async fn wait_for_shutdown(shutdown: CancellationToken) -> std::io::Result<()> { 210 - let mut sigterm = signal::unix::signal(SignalKind::terminate())?; 211 - 212 - tokio::select! { 213 - Ok(()) = signal::ctrl_c() => { 214 - eprintln!(); 215 - tracing::info!("ctrl+c received, shutting down ..."); 216 - }, 217 - Some(()) = sigterm.recv() => { 218 - tracing::info!("SIGTERM received, shutting down ..."); 219 - } 220 - } 221 - 222 - shutdown.cancel(); 223 - 224 - Ok(()) 225 - } 226 - 227 - fn git_config_global<K, V>(key: K, value: V) -> std::io::Result<bool> 228 - where 229 - K: AsRef<OsStr>, 230 - V: AsRef<OsStr>, 231 - { 232 - use std::process::Stdio; 233 - 234 - let success = std::process::Command::new("/usr/bin/git") 235 - .args(["config", "set", "--global"]) 236 - .arg(key) 237 - .arg(value) 238 - .stdout(Stdio::inherit()) 239 - .stderr(Stdio::inherit()) 240 - .spawn()? 241 - .wait()? 242 - .success(); 243 - 244 - Ok(success) 245 - } 246 - 247 - macro_rules! make_span { 248 - ($name:ident, $label:literal) => { 249 - #[derive(Clone)] 250 - struct $name; 251 - 252 - impl<B> MakeSpan<B> for $name { 253 - fn make_span(&mut self, request: &axum::http::Request<B>) -> tracing::Span { 254 - let method = request.method(); 255 - let path = request.uri().path(); 256 - 257 - let span = tracing::error_span!($label, id = Empty, method = Empty, path = Empty); 258 - if let Some(id) = request 259 - .extensions() 260 - .get::<RequestId>() 261 - .and_then(|request_id| request_id.header_value().to_str().ok()) 262 - { 263 - span.record("id", &id); 264 - } 265 - 266 - span.record("method", tracing::field::debug(&method)); 267 - span.record("path", tracing::field::debug(&path)); 268 - 269 - span 270 - } 271 - } 272 - }; 273 - } 274 - 275 - make_span!(PublicHttpSpan, "public"); 276 - make_span!(PrivateHttpSpan, "private"); 277 - 278 - #[derive(Clone)] 279 - pub struct TraceResponse; 280 - 281 - impl<B> OnResponse<B> for TraceResponse { 282 - fn on_response(self, response: &Response<B>, latency: Duration, _: &Span) { 283 - match response.status() { 284 - status if status.is_success() => tracing::trace!(?status, ?latency), 285 - status if status.is_client_error() => tracing::warn!(?status, ?latency), 286 - status if status.is_server_error() => tracing::error!(?status, ?latency), 287 - status => tracing::info!(?status, ?latency), 288 - } 289 - } 28 + cli::run(); 290 29 }