···2727url.workspace = true28282929async-trait = "0.1.89"3030+aws-lc-rs = { version = "1.14.1", default-features = false, features = ["alloc", "aws-lc-sys"] }3031axum = { workspace = true, features = ["ws"] }3132axum-extra = { version = "0.12.1", features = ["async-read-body"] }3233bytes = "1.10.1"3334clap = { version = "4.5.47", features = ["derive", "env", "string"] }3435data-encoding.workspace = true3636+data-encoding-macro = "0.1.18"3537futures-util = "0.3.31"3638hyper-util = { version = "0.1.17", features = ["client"] }3939+rand = "0.9.2"3740rayon = "1.11.0"3841rustc-hash = "2.1.1"3942sqlx = { version = "0.8.6", features = ["runtime-tokio", "tls-native-tls", "postgres", "time", "json", "macros", "derive"] }
+133
crates/knot/src/hooks.rs
···11+use core::fmt;22+use std::{33+ collections::HashMap,44+ env, fs,55+ io::{self, Write},66+ path::Path,77+};88+99+use atproto::Did;1010+use axum::http::{HeaderMap, HeaderName, header::InvalidHeaderName};1111+use bytes::Bytes;1212+use knot::private::{self, Hook};1313+use url::Url;1414+1515+/// Setup the global hooks directory at `path`.1616+///1717+/// Currently, this just creates symlinks to the currently running executable for the1818+/// hook names returned by [`Hook::iter_names()`].1919+///2020+pub fn setup_global_hooks<P: AsRef<Path>>(path: P) -> io::Result<()> {2121+ let current_exe = env::current_exe()?;2222+ let _ = fs::create_dir_all(&path);2323+ for hook_name in Hook::iter_names() {2424+ let hook_path = path.as_ref().join(hook_name);2525+ update_symlink(hook_path, ¤t_exe)?;2626+ }2727+ Ok(())2828+}2929+3030+/// Creates or updates a symlink pointing to the specified target.3131+///3232+/// # Errors3333+///3434+/// Will return an error in the following situations, but is not limited to just these3535+/// cases:3636+///3737+/// * If the path `symlink` already exists, but is not a symlink.3838+///3939+fn update_symlink<P: AsRef<Path> + fmt::Debug, Q: AsRef<Path>>(4040+ symlink: P,4141+ target: Q,4242+) -> io::Result<fs::Metadata> {4343+ match fs::symlink_metadata(&symlink) {4444+ Ok(attr) if attr.is_symlink() => {4545+ fs::remove_file(&symlink)?;4646+ }4747+ Ok(_) => Err(io::Error::new(4848+ io::ErrorKind::AlreadyExists,4949+ format!("{symlink:?} already exists and is not a symlink"),5050+ ))?,5151+ Err(error) if matches!(error.kind(), io::ErrorKind::NotFound) => {}5252+ Err(error) => Err(error)?,5353+ }5454+5555+ std::os::unix::fs::symlink(target, &symlink)?;5656+ fs::metadata(symlink)5757+}5858+5959+#[tracing::instrument]6060+pub async fn post_hook(hook: &Hook) -> anyhow::Result<()> {6161+ let mut environment_vars: HashMap<_, _> = env::vars().collect();6262+6363+ // Take the environment variables we need to post the hook to the internal API.6464+ let endpoints = take_var(&mut environment_vars, private::ENV_PRIVATE_ENDPOINTS)?;6565+ let repo_did: Box<Did> = take_var(&mut environment_vars, private::ENV_REPO_DID)?.parse()?;6666+ let repo_rkey = take_var(&mut environment_vars, private::ENV_REPO_RKEY)?;6767+6868+ // Build a header map with the remaining environment variables.6969+ let mut headers = HeaderMap::with_capacity(environment_vars.len());7070+ for (key, value) in environment_vars {7171+ let header_name = variable_to_header_name(&key)?;7272+ headers.insert(header_name, value.try_into()?);7373+ }7474+7575+ let stdin = Bytes::from(io::read_to_string(io::stdin())?);7676+7777+ let client = reqwest::Client::new();7878+ let url_path = format!("/hook/{repo_did}/{repo_rkey}/{hook}");7979+ for endpoint in endpoints.split_whitespace() {8080+ let mut hook_url = match Url::parse(endpoint) {8181+ Ok(hook_url) => hook_url,8282+ Err(error) => {8383+ tracing::error!(?error, ?endpoint, "failed to parse internal endpoint");8484+ continue;8585+ }8686+ };8787+8888+ hook_url.set_path(&url_path);8989+ let response = client9090+ .post(hook_url)9191+ .headers(headers.clone())9292+ .body(stdin.clone())9393+ .send()9494+ .await;9595+9696+ match response {9797+ Ok(response) if response.status().is_success() => {9898+ let body = response.bytes().await?;9999+ io::stdout().write_all(&body)?;100100+ return Ok(());101101+ }102102+ Ok(response) => {103103+ let status = response.status();104104+ let body = response.bytes().await?;105105+ io::stdout().write_all(&body)?;106106+ return Err(anyhow::anyhow!("Knot returned error status {status}"));107107+ }108108+ Err(error) => {109109+ tracing::error!(?error, "failed to post hook to internal API");110110+ continue;111111+ }112112+ }113113+ }114114+115115+ Err(anyhow::anyhow!("Failed to find a valid internal endpoint"))116116+}117117+118118+fn take_var(vars: &mut HashMap<String, String>, name: &str) -> anyhow::Result<String> {119119+ vars.remove(name).ok_or(anyhow::anyhow!(120120+ "Expected environment variable {name:?} to be set",121121+ ))122122+}123123+124124+fn variable_to_header_name(name: &str) -> Result<HeaderName, InvalidHeaderName> {125125+ format!(126126+ "{}-{}",127127+ private::ENV_HEADER_PREFIX,128128+ name.trim_start_matches("GORDIAN_")129129+ )130130+ .replace('_', "-")131131+ .to_lowercase()132132+ .try_into()133133+}
+6
crates/knot/src/lib.rs
···11pub mod model;22+pub mod private;23pub mod public;34pub mod services;45pub mod types;66+77+/// Encoding used for [Timestamp Identitiers](https://atproto.com/specs/tid).88+pub const BASE32_SORTABLE: data_encoding::Encoding = data_encoding_macro::new_encoding! {99+ symbols: "234567abcdefghijklmnopqrstuvwxyz",1010+};
+88-9
crates/knot/src/main.rs
···11mod cli;22+mod hooks;2334use anyhow::Context;45use axum::{···1413use reqwest::ClientBuilder;1514use sqlx::postgres::PgPoolOptions;1615use std::{1616+ env,1717+ ffi::OsString,1718 net::{SocketAddr, ToSocketAddrs},1919+ path::PathBuf,2020+ process,1821 sync::{Arc, atomic::AtomicU64},1922 time::Duration,2023};···3530use url::Url;36313732fn main() {3333+ use knot::private::Hook;3434+3835 let stderr_layer = tracing_subscriber::fmt::layer().with_writer(std::io::stderr);3936 tracing_subscriber::registry()4037 .with(EnvFilter::from_default_env())···4437 .try_init()4538 .unwrap();46394747- let arguments = cli::parse();4848- tracing::debug!(?arguments);4949-5050- Builder::new_current_thread()4040+ let runtime = Builder::new_current_thread()5141 .enable_all()5242 .build()5353- .expect("Failed to build runtime")5454- .block_on(run(arguments))5555- .unwrap();4343+ .expect("Failed to build runtime");4444+4545+ let name = executable_name().expect("Executable should have a name");4646+ match Hook::try_from(name.as_os_str()) {4747+ Ok(hook) => {4848+ runtime.block_on(hooks::post_hook(&hook)).unwrap();4949+ }5050+ Err(_) => {5151+ let arguments = cli::parse();5252+ tracing::debug!(?arguments);5353+ runtime.block_on(run(arguments)).unwrap();5454+ }5555+ }5656+}5757+5858+fn executable_name() -> anyhow::Result<OsString> {5959+ Ok(env::args_os()6060+ .next()6161+ .map(PathBuf::from)6262+ .ok_or(anyhow::anyhow!("Expected at least one argument"))?6363+ .file_name()6464+ .ok_or(anyhow::anyhow!(6565+ "Failed to extract file name from executable"6666+ ))?6767+ .to_os_string())5668}57695870const X_REQUEST_ID: HeaderName = HeaderName::from_static("x-request-id");···10781const USER_AGENT: &str = concat!(env!("CARGO_PKG_NAME"), "/", env!("CARGO_PKG_VERSION"));1088210983pub async fn run(arguments: cli::Arguments) -> anyhow::Result<()> {8484+ let git_config_path = env::current_dir().unwrap().join("git_config");8585+ unsafe {8686+ env::set_var("GIT_CONFIG_GLOBAL", &git_config_path);8787+ }8888+8989+ // Setup git hooks9090+ let hooks_path = env::current_dir()?.join("hooks");9191+ hooks::setup_global_hooks(&hooks_path)?;9292+9393+ assert!(9494+ process::Command::new("/usr/bin/git")9595+ .args(["config", "set", "--global", "core.hooksPath"])9696+ .arg(&hooks_path)9797+ .spawn()?9898+ .wait()?9999+ .success()100100+ );101101+102102+ assert!(103103+ process::Command::new("/usr/bin/git")104104+ .args([105105+ "config",106106+ "set",107107+ "--global",108108+ "receive.advertisePushOptions",109109+ "true"110110+ ])111111+ .spawn()?112112+ .wait()?113113+ .success()114114+ );115115+110116 let pool = PgPoolOptions::new()111117 .max_connections(5)112118 .connect(arguments.db.as_str())···196138 .cursor(jetstream_cursor.map(|(_, ts)| ts))197139 .build(Url::parse(jetstream::PUBLIC_JETSTREAM_INSTANCES[0])?);198140141141+ let mut service = JoinSet::new();142142+ let mut private_sockets = Vec::new();143143+ for socket in "localhost:0".to_socket_addrs()? {144144+ let bound_socket = TcpListener::bind(socket).await?;145145+ private_sockets.push(bound_socket);146146+ }147147+148148+ let private_addrs: Vec<_> = private_sockets149149+ .iter()150150+ .map(|listener| listener.local_addr().unwrap())151151+ .collect();152152+199153 let config = KnotConfiguration::builder()200154 .instance_name(&arguments.name)201155 .owner_did(&resolved_owner)202156 .repo_path(&arguments.repos)157157+ .hook_path(&hooks_path)158158+ .git_config_path(&git_config_path)159159+ .private_sockets(&private_addrs)203160 .build()?;204161205162 let knot: Knot = KnotState::new(config, resolver, public_http, jetstream, store).into();163163+206164 let router = router207165 .layer(SetRequestIdLayer::new(208166 X_REQUEST_ID,···245171 tracing::trace!(?latency, status = ?response.status());246172 }),247173 )248248- .with_state(knot);174174+ .with_state(knot.clone());175175+176176+ let internal = knot::private::router().with_state(knot);177177+ for socket in private_sockets {178178+ let router = internal.clone();179179+ service.spawn(async move { axum::serve(socket, router).await });180180+ }249181250182 let mut sockets = Vec::with_capacity(arguments.addr.len());251183 for addr in &arguments.addr {···260180 }261181 }262182263263- let mut service = JoinSet::new();264183 for socket in sockets {265184 serve(&mut service, socket, router.clone()).await;266185 }
+85-4
crates/knot/src/model.rs
···66pub mod repository;7788use core::ops;99-use std::sync::Arc;99+use std::{ffi::OsString, sync::Arc};10101111use axum::{1212 extract::{FromRef, FromRequestParts},···1414};1515use identity::Resolver;1616use serve_git::state::GitServiceState;1717+use tokio::process::Command;17181819use crate::{2020+ private,1921 public::git::{Error, GitAuthorization, NotFound},2022 services::authorization::AuthorizationClaimsStore,2123 types::repository_path::RepositoryPath,···7472 ) -> Result<tokio::process::Command, Self::Rejection> {7573 use axum::extract::Path;7674 let Path(repopath) = Path::<RepositoryPath>::from_request_parts(parts, &()).await?;7575+ let resolved = self.resolve_repo_path(&repopath).await.map_err(NotFound)?;7776 let repo = self.open_repository(repopath).await.map_err(NotFound)?;7878- let command = serve_git::commands::upload_pack_info_refs("/usr/bin/git", repo.path());7777+ let path = repo.path();7878+7979+ let mut command = Command::new("/usr/bin/git");8080+ command8181+ .env_clear()8282+ .env(private::ENV_PRIVATE_ENDPOINTS, self.private_endpoints())8383+ .env(private::ENV_REPO_DID, resolved.owner.as_str())8484+ .env(private::ENV_REPO_RKEY, resolved.rkey.as_ref())8585+ .env("GIT_CONFIG_GLOBAL", self.git_config_path())8686+ .current_dir(path)8787+ .args([8888+ "upload-pack",8989+ "--http-backend-info-refs",9090+ "--stateless-rpc",9191+ "--strict",9292+ "--timeout=10",9393+ ])9494+ .arg(path.as_os_str());9595+7996 Ok(command)8097 }8198···10483 ) -> Result<tokio::process::Command, Self::Rejection> {10584 use axum::extract::Path;10685 let Path(repopath) = Path::<RepositoryPath>::from_request_parts(parts, &()).await?;8686+ let resolved = self.resolve_repo_path(&repopath).await.map_err(NotFound)?;10787 let repo = self.open_repository(repopath).await.map_err(NotFound)?;8888+ let path = repo.path();8989+9090+ let mut command = Command::new("/usr/bin/git");9191+ command9292+ .env_clear()9393+ .env(private::ENV_PRIVATE_ENDPOINTS, self.private_endpoints())9494+ .env(private::ENV_REPO_DID, resolved.owner.as_str())9595+ .env(private::ENV_REPO_RKEY, resolved.rkey.as_ref())9696+ .env("GIT_CONFIG_GLOBAL", self.git_config_path())9797+ .current_dir(path)9898+ .args(["upload-pack", "--strict", "--stateless-rpc"])9999+ .arg(path.as_os_str());100100+108101 let command = serve_git::commands::upload_pack("/usr/bin/git", repo.path());109102 Ok(command)110103 }···145110 }146111147112 let repo = self.open_repository(repopath).await.map_err(NotFound)?;148148- let command = serve_git::commands::receive_pack_info_refs("/usr/bin/git", repo.path());113113+ let path = repo.path();114114+115115+ let nonce_seed = self.generate_push_seed(&resolved);116116+ let mut command = Command::new("/usr/bin/git");117117+ command118118+ .env_clear()119119+ .env(private::ENV_PRIVATE_ENDPOINTS, self.private_endpoints())120120+ .env(private::ENV_REPO_DID, resolved.owner.as_str())121121+ .env(private::ENV_REPO_RKEY, resolved.rkey.as_ref())122122+ .env(private::ENV_USER_DID, auth.iss.as_str())123123+ .env("GIT_CONFIG_GLOBAL", self.git_config_path())124124+ .current_dir(path)125125+ .args([126126+ "-c",127127+ &nonce_seed,128128+ "receive-pack",129129+ "--http-backend-info-refs",130130+ "--stateless-rpc",131131+ ])132132+ .arg(path.as_os_str());149133 Ok(command)150134 }151135···190136 }191137192138 let repo = self.open_repository(repopath).await.map_err(NotFound)?;193193- let command = serve_git::commands::receive_pack("/usr/bin/git", repo.path());139139+ let path = repo.path();140140+141141+ let allowed_signers_path = std::env::current_dir()142142+ .unwrap()143143+ .join("allowed_signers")144144+ .join(auth.iss.as_str());145145+146146+ let mut allowed_signers_option = OsString::with_capacity(147147+ "gpg.ssh.allowedSignersFile=".len() + allowed_signers_path.as_os_str().len(),148148+ );149149+ allowed_signers_option.push("gpg.ssh.allowedSignersFile=");150150+ allowed_signers_option.push(&allowed_signers_path);151151+152152+ let nonce_seed = self.generate_push_seed(&resolved);153153+ let mut command = Command::new("/usr/bin/git");154154+ command155155+ .env_clear()156156+ .env(private::ENV_PRIVATE_ENDPOINTS, self.private_endpoints())157157+ .env(private::ENV_REPO_DID, resolved.owner.as_str())158158+ .env(private::ENV_REPO_RKEY, resolved.rkey.as_ref())159159+ .env(private::ENV_USER_DID, auth.iss.as_str())160160+ .env("GIT_CONFIG_GLOBAL", self.git_config_path())161161+ .current_dir(path)162162+ .args(["-c", &nonce_seed, "-c"])163163+ .arg(&allowed_signers_option)164164+ .args(["receive-pack", "--stateless-rpc"])165165+ .arg(path.as_os_str());166166+194167 Ok(command)195168 }196169}