don't
5
fork

Configure Feed

Select the types of activity you want to include in your feed.

refactor: merge git-service crate into gordian-knot

Signed-off-by: tjh <x@tjh.dev>

tjh cf60e3d2 624ff92c

+427 -785
-18
Cargo.lock
··· 1165 1165 ] 1166 1166 1167 1167 [[package]] 1168 - name = "git-service" 1169 - version = "0.0.0" 1170 - dependencies = [ 1171 - "anyhow", 1172 - "axum", 1173 - "axum-extra", 1174 - "bytes", 1175 - "clap", 1176 - "serde", 1177 - "tokio", 1178 - "tokio-stream", 1179 - "tower", 1180 - "tracing", 1181 - "tracing-subscriber", 1182 - ] 1183 - 1184 - [[package]] 1185 1168 name = "gix" 1186 1169 version = "0.78.0" 1187 1170 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 2124 2141 "dashmap", 2125 2142 "data-encoding", 2126 2143 "futures-util", 2127 - "git-service", 2128 2144 "gix", 2129 2145 "gordian-auth", 2130 2146 "gordian-identity",
+1 -2
Cargo.toml
··· 8 8 "crates/gordian-jetstream", 9 9 "crates/gordian-knot", 10 10 "crates/gordian-lexicon", 11 - "crates/git-service", "crates/mock-pds", 11 + "crates/mock-pds", 12 12 ] 13 13 default-members = ["crates/gordian-knot"] 14 14 ··· 26 26 gordian-identity = { path = "crates/gordian-identity" } 27 27 gordian-jetstream = { path = "crates/gordian-jetstream" } 28 28 gordian-lexicon = { path = "crates/gordian-lexicon" } 29 - git-service = { path = "crates/git-service"} 30 29 31 30 anyhow = "1.0.100" 32 31 axum = "0.8.4"
-22
crates/git-service/Cargo.toml
··· 1 - [package] 2 - name = "git-service" 3 - version.workspace = true 4 - authors.workspace = true 5 - repository.workspace = true 6 - license.workspace = true 7 - edition.workspace = true 8 - publish.workspace = true 9 - 10 - [dependencies] 11 - anyhow.workspace = true 12 - axum.workspace = true 13 - serde.workspace = true 14 - tracing.workspace = true 15 - 16 - axum-extra = { version = "0.12.1", features = ["async-read-body"] } 17 - clap = { version = "4.5.47", features = ["derive", "env", "string"] } 18 - tokio = { version = "1.47.1", features = ["io-util", "macros", "net", "process", "signal", "rt-multi-thread"] } 19 - tokio-stream = "0.1.17" 20 - tower = "0.5.2" 21 - tracing-subscriber = "0.3.20" 22 - bytes = "1.10.1"
-99
crates/git-service/src/commands.rs
··· 1 - //! Basic command constructors 2 - //! 3 - use std::{ffi::OsStr, path::Path}; 4 - use tokio::process::Command; 5 - 6 - pub fn upload_archive<S, P>(git: S, path: P) -> Command 7 - where 8 - S: AsRef<OsStr>, 9 - P: AsRef<Path>, 10 - { 11 - let path = path.as_ref(); 12 - 13 - let mut command = Command::new(git); 14 - command 15 - .env_clear() 16 - .current_dir(path) 17 - .args(["upload-archive"]) 18 - .arg(path.as_os_str()); 19 - 20 - command 21 - } 22 - 23 - pub fn upload_pack_info_refs<S, P>(git: S, path: P) -> Command 24 - where 25 - S: AsRef<OsStr>, 26 - P: AsRef<Path>, 27 - { 28 - let path = path.as_ref(); 29 - 30 - let mut command = Command::new(git); 31 - command 32 - .env_clear() 33 - .current_dir(path) 34 - .args([ 35 - "upload-pack", 36 - "--http-backend-info-refs", 37 - "--stateless-rpc", 38 - "--strict", 39 - "--timeout=10", 40 - ]) 41 - .arg(path.as_os_str()); 42 - 43 - command 44 - } 45 - 46 - pub fn upload_pack<S, P>(git: S, path: P) -> Command 47 - where 48 - S: AsRef<OsStr>, 49 - P: AsRef<Path>, 50 - { 51 - let path = path.as_ref(); 52 - 53 - let mut command = Command::new(git); 54 - command 55 - .env_clear() 56 - .current_dir(path) 57 - .args(["upload-pack", "--strict", "--stateless-rpc"]) 58 - .arg(path.as_os_str()); 59 - 60 - command 61 - } 62 - 63 - pub fn receive_pack_info_refs<S, P>(git: S, path: P) -> Command 64 - where 65 - S: AsRef<OsStr>, 66 - P: AsRef<Path>, 67 - { 68 - let path = path.as_ref(); 69 - 70 - let mut command = Command::new(git); 71 - command 72 - .env_clear() 73 - .current_dir(path) 74 - .args([ 75 - "receive-pack", 76 - "--http-backend-info-refs", 77 - "--stateless-rpc", 78 - ]) 79 - .arg(path.as_os_str()); 80 - 81 - command 82 - } 83 - 84 - pub fn receive_pack<S, P>(git: S, path: P) -> Command 85 - where 86 - S: AsRef<OsStr>, 87 - P: AsRef<Path>, 88 - { 89 - let path = path.as_ref(); 90 - 91 - let mut command = Command::new(git); 92 - command 93 - .env_clear() 94 - .current_dir(path) 95 - .args(["receive-pack", "--stateless-rpc"]) 96 - .arg(path.as_os_str()); 97 - 98 - command 99 - }
-60
crates/git-service/src/lib.rs
··· 1 - pub mod commands; 2 - pub mod state; 3 - pub mod types; 4 - 5 - mod receive_pack; 6 - mod upload_archive; 7 - mod upload_pack; 8 - 9 - pub mod util; 10 - 11 - use axum::{ 12 - Router, 13 - extract::{Query, Request, State}, 14 - response::{IntoResponse, Response}, 15 - }; 16 - use state::GitServiceState; 17 - use types::{GitProtocol, GitService}; 18 - 19 - pub fn router<S>() -> Router<S> 20 - where 21 - S: GitServiceState + Send + Sync + Clone + 'static, 22 - S::Rejection: From<std::io::Error>, 23 - S::Rejection: From<axum::Error>, 24 - { 25 - use axum::routing::{get, post}; 26 - Router::<S>::new() 27 - .route("/info/refs", get(info_refs::<S>)) 28 - .route( 29 - "/git-upload-archive", 30 - post(upload_archive::upload_archive::<S>), 31 - ) 32 - .route("/git-upload-pack", post(upload_pack::upload_pack::<S>)) 33 - .route("/git-receive-pack", post(receive_pack::receive_pack::<S>)) 34 - } 35 - 36 - /// Parameters for 'info/refs' 37 - #[derive(Debug, serde::Deserialize)] 38 - pub struct InfoRefsQuery { 39 - pub service: GitService, 40 - } 41 - 42 - pub async fn info_refs<S>( 43 - state: State<S>, 44 - Query(params): Query<InfoRefsQuery>, 45 - protocol: Option<GitProtocol>, 46 - req: Request, 47 - ) -> Response 48 - where 49 - S: GitServiceState, 50 - S::Rejection: From<std::io::Error>, 51 - { 52 - match params.service { 53 - GitService::GitUploadPack => upload_pack::advertise_upload_pack(state, protocol, req) 54 - .await 55 - .into_response(), 56 - GitService::GitReceivePack => receive_pack::advertise_receive_pack(state, protocol, req) 57 - .await 58 - .into_response(), 59 - } 60 - }
-32
crates/git-service/src/main.rs
··· 1 - use axum::Router; 2 - use clap::Parser; 3 - use std::path::PathBuf; 4 - use tokio::net::TcpListener; 5 - 6 - #[derive(Parser)] 7 - struct Arguments { 8 - #[arg(long, default_value = "localhost:8000")] 9 - addr: String, 10 - 11 - #[arg(default_value = ".")] 12 - path: PathBuf, 13 - } 14 - 15 - #[tokio::main] 16 - async fn main() -> anyhow::Result<()> { 17 - tracing_subscriber::fmt::init(); 18 - 19 - let args = Arguments::parse(); 20 - 21 - tracing::info!("serving repositories from {:?}", args.path); 22 - let app = Router::new() 23 - .nest("/{owner}/{name}", git_service::router()) 24 - .with_state(args.path); 25 - 26 - let listener = TcpListener::bind(args.addr).await?; 27 - tracing::info!("listening on {:?}", listener.local_addr().unwrap()); 28 - 29 - axum::serve(listener, app).await?; 30 - 31 - Ok(()) 32 - }
-83
crates/git-service/src/receive_pack.rs
··· 1 - use crate::{state::GitServiceState, types::GitProtocol, util::SetOptionEnv as _}; 2 - use axum::{ 3 - extract::{Request, State}, 4 - http::header::{CACHE_CONTROL, CONNECTION, CONTENT_TYPE}, 5 - response::IntoResponse, 6 - }; 7 - use axum_extra::body::AsyncReadBody; 8 - use std::process::Stdio; 9 - use tokio::net::unix::pipe::{Sender, pipe as async_pipe}; 10 - 11 - const RECEIVE_PACK_ADVERTISEMENT: &str = "application/x-git-receive-pack-advertisement"; 12 - const RECEIVE_PACK_RESULT: &str = "application/x-git-receive-pack-result"; 13 - 14 - const NO_CACHE: &str = "no-cache, max-age=0, must-revalidate"; 15 - const KEEP_ALIVE: &str = "keep-alive"; 16 - 17 - pub async fn advertise_receive_pack<S>( 18 - State(state): State<S>, 19 - protocol: Option<GitProtocol>, 20 - req: Request, 21 - ) -> Result<impl IntoResponse, S::Rejection> 22 - where 23 - S: GitServiceState, 24 - S::Rejection: From<std::io::Error>, 25 - { 26 - let (mut parts, _) = req.into_parts(); 27 - let mut command = state.init_receive_pack_advertisement(&mut parts).await?; 28 - 29 - let (mut stdout_tx, stdout_rx) = async_pipe()?; 30 - 31 - crate::util::pack_line(&mut stdout_tx, "# service=git-receive-pack\n").await?; 32 - crate::util::pack_flush(&mut stdout_tx).await?; 33 - 34 - let child = command 35 - .option_env("GIT_PROTOCOL", protocol.as_deref()) 36 - .stdout(stdout_tx.into_blocking_fd()?) 37 - .stderr(Stdio::piped()) 38 - .stdin(Stdio::null()) 39 - .spawn()?; 40 - 41 - tokio::task::spawn(crate::util::await_child(child)); 42 - 43 - let headers = [ 44 - (CONTENT_TYPE, RECEIVE_PACK_ADVERTISEMENT), 45 - (CONNECTION, KEEP_ALIVE), 46 - (CACHE_CONTROL, NO_CACHE), 47 - ]; 48 - 49 - Ok((headers, AsyncReadBody::new(stdout_rx))) 50 - } 51 - 52 - pub async fn receive_pack<S>( 53 - State(state): State<S>, 54 - protocol: Option<GitProtocol>, 55 - req: Request, 56 - ) -> Result<impl IntoResponse, S::Rejection> 57 - where 58 - S: GitServiceState, 59 - S::Rejection: From<std::io::Error>, 60 - { 61 - let (mut parts, body) = req.into_parts(); 62 - let mut command = state.init_receive_pack(&mut parts).await?; 63 - 64 - let mut child = command 65 - .option_env("GIT_PROTOCOL", protocol.as_deref()) 66 - .stdin(Stdio::piped()) 67 - .stdout(Stdio::piped()) 68 - .stderr(Stdio::piped()) 69 - .spawn()?; 70 - 71 - let stdin = child.stdin.take().expect("handle present"); 72 - let stdout = child.stdout.take().expect("handle present"); 73 - 74 - tokio::task::spawn(crate::util::stream_to_pipe( 75 - body.into_data_stream(), 76 - Sender::from_owned_fd(stdin.into_owned_fd()?)?, 77 - )); 78 - 79 - tokio::spawn(crate::util::await_child(child)); 80 - 81 - let headers = [(CONTENT_TYPE, RECEIVE_PACK_RESULT)]; 82 - Ok((headers, AsyncReadBody::new(stdout)).into_response()) 83 - }
-162
crates/git-service/src/state.rs
··· 1 - use axum::{http::request::Parts, response::IntoResponse}; 2 - use tokio::process::Command; 3 - 4 - const GIT_COMMAND: &str = "/usr/bin/git"; 5 - 6 - pub trait GitServiceState: Sized + Sync { 7 - type Rejection: IntoResponse; 8 - 9 - /// Maximum body size in bytes for requests. 10 - const MAX_REQUEST_BODY_SIZE: usize = 1024 * 1024; 11 - 12 - /// Initialise the [`Command`] to use for serving "$GIT_URL/git-upload-archive" 13 - /// requests. 14 - fn init_upload_archive( 15 - &self, 16 - parts: &mut Parts, 17 - ) -> impl Future<Output = Result<Command, Self::Rejection>> + Send; 18 - 19 - /// Initialise the [`Command`] to use for serving "$GIT_URL/info/refs?service=git-upload-pack" 20 - /// requests. 21 - fn init_upload_pack_advertisement( 22 - &self, 23 - parts: &mut Parts, 24 - ) -> impl Future<Output = Result<Command, Self::Rejection>> + Send; 25 - 26 - /// Initialise the [`Command`] to use for serving "$GIT_URL/git-upload-pack" 27 - /// requests. 28 - fn init_upload_pack( 29 - &self, 30 - parts: &mut Parts, 31 - ) -> impl Future<Output = Result<Command, Self::Rejection>> + Send; 32 - 33 - /// Initialise the [`Command`] to use for serving "$GIT_URL/info/refs?service=git-receive-pack" 34 - /// requests. 35 - fn init_receive_pack_advertisement( 36 - &self, 37 - parts: &mut Parts, 38 - ) -> impl Future<Output = Result<Command, Self::Rejection>> + Send; 39 - 40 - /// Initialise the [`Command`] to use for serving "$GIT_URL/git-receive-pack" 41 - /// requests. 42 - fn init_receive_pack( 43 - &self, 44 - parts: &mut Parts, 45 - ) -> impl Future<Output = Result<Command, Self::Rejection>> + Send; 46 - } 47 - 48 - mod path_impl { 49 - use super::GIT_COMMAND; 50 - use super::GitServiceState; 51 - use axum::extract::rejection::PathRejection; 52 - use axum::http::request::Parts; 53 - use axum::{ 54 - body::Body, 55 - http::{StatusCode, header::CONTENT_TYPE}, 56 - response::{IntoResponse, Response}, 57 - }; 58 - use std::path::PathBuf; 59 - use tokio::process::Command; 60 - 61 - pub struct Error(StatusCode, String); 62 - 63 - impl IntoResponse for Error { 64 - fn into_response(self) -> Response { 65 - let Self(status, error) = self; 66 - Response::builder() 67 - .status(status) 68 - .header(CONTENT_TYPE, "text/plain; charset=utf-8") 69 - .body(Body::from(error)) 70 - .expect("Failed to build body") 71 - } 72 - } 73 - 74 - pub struct NotFound<E: std::fmt::Display>(pub E); 75 - 76 - impl<E: std::fmt::Display> From<NotFound<E>> for Error { 77 - fn from(value: NotFound<E>) -> Self { 78 - Self(StatusCode::NOT_FOUND, value.0.to_string()) 79 - } 80 - } 81 - 82 - pub struct Forbidden<E: std::fmt::Display>(pub E); 83 - 84 - impl<E: std::fmt::Display> From<Forbidden<E>> for Error { 85 - fn from(value: Forbidden<E>) -> Self { 86 - Self(StatusCode::FORBIDDEN, value.0.to_string()) 87 - } 88 - } 89 - 90 - macro_rules! internal_error { 91 - ($typ:ty, $status:expr) => { 92 - impl From<$typ> for Error { 93 - fn from(value: $typ) -> Self { 94 - Self($status, value.to_string()) 95 - } 96 - } 97 - }; 98 - ($typ:ty) => { 99 - internal_error!($typ, StatusCode::INTERNAL_SERVER_ERROR); 100 - }; 101 - } 102 - 103 - internal_error!(std::io::Error); 104 - internal_error!(axum::Error); 105 - internal_error!(axum::extract::rejection::PathRejection); 106 - 107 - impl<P> GitServiceState for P 108 - where 109 - P: AsRef<std::path::Path> + Sync + 'static, 110 - { 111 - type Rejection = Error; 112 - 113 - async fn init_upload_archive(&self, parts: &mut Parts) -> Result<Command, Self::Rejection> { 114 - let repo_path = resolve_path(self.as_ref(), parts).await?; 115 - let command = crate::commands::upload_archive(GIT_COMMAND, repo_path); 116 - Ok(command) 117 - } 118 - 119 - async fn init_upload_pack_advertisement( 120 - &self, 121 - parts: &mut Parts, 122 - ) -> Result<Command, Self::Rejection> { 123 - let repo_path = resolve_path(self.as_ref(), parts).await?; 124 - let command = crate::commands::upload_pack_info_refs(GIT_COMMAND, repo_path); 125 - Ok(command) 126 - } 127 - 128 - async fn init_upload_pack(&self, parts: &mut Parts) -> Result<Command, Self::Rejection> { 129 - let repo_path = resolve_path(self.as_ref(), parts).await?; 130 - let command = crate::commands::upload_pack(GIT_COMMAND, repo_path); 131 - Ok(command) 132 - } 133 - 134 - async fn init_receive_pack_advertisement( 135 - &self, 136 - _: &mut Parts, 137 - ) -> Result<Command, Self::Rejection> { 138 - return Err(Forbidden("Push disabled"))?; 139 - } 140 - 141 - async fn init_receive_pack(&self, _: &mut Parts) -> Result<Command, Self::Rejection> { 142 - return Err(Forbidden("Push disabled"))?; 143 - } 144 - } 145 - 146 - pub async fn resolve_path( 147 - base: &std::path::Path, 148 - parts: &mut Parts, 149 - ) -> Result<PathBuf, PathRejection> { 150 - use axum::extract::FromRequestParts as _; 151 - use axum::extract::Path; 152 - 153 - let Path(path_parts) = Path::<Vec<String>>::from_request_parts(parts, &()).await?; 154 - 155 - let mut path = base.to_path_buf(); 156 - for part in path_parts { 157 - path.push(part); 158 - } 159 - 160 - Ok(path) 161 - } 162 - }
-46
crates/git-service/src/types.rs
··· 1 - use axum::{extract::OptionalFromRequestParts, http::request::Parts}; 2 - use serde::Deserialize; 3 - use std::{convert::Infallible, ops::Deref}; 4 - 5 - #[derive(Debug, Deserialize)] 6 - #[serde(rename_all = "kebab-case")] 7 - pub enum GitService { 8 - GitUploadPack, 9 - GitReceivePack, 10 - } 11 - 12 - /// Extract the "Git-Protocol" header from a request. 13 - #[derive(Debug)] 14 - pub struct GitProtocol(pub Box<str>); 15 - 16 - impl AsRef<str> for GitProtocol { 17 - #[inline] 18 - fn as_ref(&self) -> &str { 19 - &self.0 20 - } 21 - } 22 - 23 - impl Deref for GitProtocol { 24 - type Target = str; 25 - 26 - #[inline] 27 - fn deref(&self) -> &Self::Target { 28 - &self.0 29 - } 30 - } 31 - 32 - impl<S> OptionalFromRequestParts<S> for GitProtocol 33 - where 34 - S: Send + Sync, 35 - { 36 - type Rejection = Infallible; 37 - 38 - async fn from_request_parts(parts: &mut Parts, _: &S) -> Result<Option<Self>, Self::Rejection> { 39 - let header_value = parts 40 - .headers 41 - .get("Git-Protocol") 42 - .and_then(|header_value| header_value.to_str().ok()); 43 - 44 - Ok(header_value.map(|value| Self(value.into()))) 45 - } 46 - }
+22 -18
crates/git-service/src/upload_archive.rs crates/gordian-knot/src/public/git/upload_archive.rs
··· 8 8 use axum_extra::body::AsyncReadBody; 9 9 use tokio::{io::AsyncWriteExt as _, net::unix::pipe::Sender}; 10 10 11 - use crate::{state::GitServiceState, types::GitProtocol, util::SetOptionEnv as _}; 11 + use crate::{ 12 + command::{SetOptionEnv as _, TraceProcessCompletion as _}, 13 + extractors::{GitProtocol, request_id::RequestId}, 14 + model::{Knot, repository::TangledRepository}, 15 + }; 12 16 13 17 const UPLOAD_ARCHIVE_RESULT: &str = "application/x-git-upload-archive-result"; 14 18 15 - pub async fn upload_archive<S>( 16 - State(state): State<S>, 19 + pub async fn upload_archive( 20 + State(knot): State<Knot>, 17 21 protocol: Option<GitProtocol>, 18 - req: Request, 19 - ) -> Result<impl IntoResponse, S::Rejection> 20 - where 21 - S: GitServiceState, 22 - S::Rejection: From<std::io::Error>, 23 - S::Rejection: From<axum::Error>, 24 - { 25 - tracing::warn!(?req); 26 - let (mut parts, body) = req.into_parts(); 27 - let mut command = state.init_upload_archive(&mut parts).await?; 22 + request_id: Option<RequestId>, 23 + request: Request, 24 + ) -> Result<impl IntoResponse, super::Error> { 25 + let (mut parts, body) = request.into_parts(); 26 + let repository = TangledRepository::from_git_request(&mut parts, &knot).await?; 28 27 29 28 // The request body for git-upload-pack should be quite small, so just 30 29 // buffer it. 31 - let mut request_body = axum::body::to_bytes(body, S::MAX_REQUEST_BODY_SIZE) 30 + let mut request_body = axum::body::to_bytes(body, knot.max_git_request_size) 32 31 .await 33 32 .inspect_err(|error| tracing::error!(?error))?; 34 33 35 - let mut child = command 34 + let mut command: tokio::process::Command = repository.git().into(); 35 + command 36 36 .option_env("GIT_PROTOCOL", protocol.as_deref()) 37 + .option_env("X_REQUEST_ID", request_id) 38 + .args(["upload-archive"]) 39 + .arg(repository.path()) 37 40 .stdin(Stdio::piped()) 38 41 .stdout(Stdio::piped()) 39 - .stderr(Stdio::piped()) 40 - .spawn()?; 42 + .stderr(Stdio::piped()); 43 + 44 + let mut child = command.spawn()?; 41 45 42 46 let stdin = child.stdin.take().expect("handle present"); 43 47 let stdout = child.stdout.take().expect("handle present"); ··· 54 50 } 55 51 }); 56 52 57 - tokio::spawn(crate::util::await_child(child)); 53 + tokio::spawn(child.wait_with_completion_trace()); 58 54 59 55 let headers = [(CONTENT_TYPE, UPLOAD_ARCHIVE_RESULT)]; 60 56 Ok((headers, AsyncReadBody::new(stdout)).into_response())
+48 -34
crates/git-service/src/upload_pack.rs crates/gordian-knot/src/public/git/upload_pack.rs
··· 1 - use crate::{state::GitServiceState, types::GitProtocol, util::SetOptionEnv as _}; 2 1 use axum::{ 3 2 extract::{Request, State}, 4 3 http::header::{CACHE_CONTROL, CONNECTION, CONTENT_TYPE}, ··· 10 11 net::unix::pipe::{Sender, pipe as async_pipe}, 11 12 }; 12 13 14 + use crate::{ 15 + command::{SetOptionEnv as _, TraceProcessCompletion}, 16 + extractors::{GitProtocol, request_id::RequestId}, 17 + model::{Knot, repository::TangledRepository}, 18 + }; 19 + 13 20 const UPLOAD_PACK_ADVERTISEMENT: &str = "application/x-git-upload-pack-advertisement"; 14 21 const UPLOAD_PACK_RESULT: &str = "application/x-git-upload-pack-result"; 15 22 const NO_CACHE: &str = "no-cache, max-age=0, must-revalidate"; 16 23 const KEEP_ALIVE: &str = "keep-alive"; 17 24 18 25 /// Serve the "/info/refs?service=git-upload-pack" phase of a `git fetch` operation. 19 - pub async fn advertise_upload_pack<S>( 20 - State(state): State<S>, 26 + pub async fn advertise_upload_pack( 27 + State(knot): State<Knot>, 21 28 protocol: Option<GitProtocol>, 22 - req: Request, 23 - ) -> Result<impl IntoResponse, S::Rejection> 24 - where 25 - S: GitServiceState, 26 - S::Rejection: From<std::io::Error>, 27 - { 28 - let (mut parts, _) = req.into_parts(); 29 - let mut command = state.init_upload_pack_advertisement(&mut parts).await?; 29 + request_id: Option<RequestId>, 30 + request: Request, 31 + ) -> Result<impl IntoResponse, super::Error> { 32 + let (mut parts, _) = request.into_parts(); 33 + let repository = TangledRepository::from_git_request(&mut parts, &knot).await?; 30 34 31 35 let (mut stdout_tx, stdout_rx) = async_pipe()?; 32 - crate::util::pack_line(&mut stdout_tx, "# service=git-upload-pack\n").await?; 33 - crate::util::pack_flush(&mut stdout_tx).await?; 34 36 35 - let child = command 37 + super::pack_line(&mut stdout_tx, "# service=git-upload-pack\n").await?; 38 + super::pack_flush(&mut stdout_tx).await?; 39 + 40 + let mut command: tokio::process::Command = repository.git().into(); 41 + command 36 42 .option_env("GIT_PROTOCOL", protocol.as_deref()) 37 - .stdout(stdout_tx.into_blocking_fd()?) 38 - .stderr(Stdio::piped()) 43 + .option_env("X_REQUEST_ID", request_id) 44 + .args([ 45 + "upload-pack", 46 + "--http-backend-info-refs", 47 + "--stateless-rpc", 48 + "--strict", 49 + "--timeout=10", 50 + ]) 51 + .arg(repository.path()) 39 52 .stdin(Stdio::null()) 40 - .spawn()?; 53 + .stdout(stdout_tx.into_blocking_fd()?) 54 + .stderr(Stdio::piped()); 41 55 42 - tokio::task::spawn(crate::util::await_child(child)); 56 + let child = command.spawn()?; 57 + tokio::task::spawn(child.wait_with_completion_trace()); 43 58 44 59 let headers = [ 45 60 (CONTENT_TYPE, UPLOAD_PACK_ADVERTISEMENT), ··· 64 51 Ok((headers, AsyncReadBody::new(stdout_rx))) 65 52 } 66 53 67 - pub async fn upload_pack<S>( 68 - State(state): State<S>, 54 + pub async fn upload_pack( 55 + State(knot): State<Knot>, 69 56 protocol: Option<GitProtocol>, 70 - req: Request, 71 - ) -> Result<impl IntoResponse, S::Rejection> 72 - where 73 - S: GitServiceState, 74 - S::Rejection: From<std::io::Error>, 75 - S::Rejection: From<axum::Error>, 76 - { 77 - let (mut parts, body) = req.into_parts(); 78 - let mut command = state.init_upload_pack(&mut parts).await?; 57 + request_id: Option<RequestId>, 58 + request: Request, 59 + ) -> Result<impl IntoResponse, super::Error> { 60 + let (mut parts, body) = request.into_parts(); 61 + let repository = TangledRepository::from_git_request(&mut parts, &knot).await?; 79 62 80 63 // The request body for git-upload-pack should be quite small, so just 81 64 // buffer it. 82 - let mut request_body = axum::body::to_bytes(body, S::MAX_REQUEST_BODY_SIZE) 65 + let mut request_body = axum::body::to_bytes(body, knot.max_git_request_size) 83 66 .await 84 67 .inspect_err(|error| tracing::error!(?error))?; 85 68 86 - let mut child = command 69 + let mut command: tokio::process::Command = repository.git().into(); 70 + command 87 71 .option_env("GIT_PROTOCOL", protocol.as_deref()) 72 + .option_env("X_REQUEST_ID", request_id) 73 + .args(["upload-pack", "--strict", "--stateless-rpc"]) 74 + .arg(repository.path()) 88 75 .stdin(Stdio::piped()) 89 76 .stdout(Stdio::piped()) 90 - .stderr(Stdio::piped()) 91 - .spawn()?; 77 + .stderr(Stdio::piped()); 78 + 79 + let mut child = command.spawn()?; 92 80 93 81 let stdin = child.stdin.take().expect("handle present"); 94 82 let stdout = child.stdout.take().expect("handle present"); ··· 102 88 } 103 89 }); 104 90 105 - tokio::spawn(crate::util::await_child(child)); 91 + tokio::spawn(child.wait_with_completion_trace()); 106 92 107 93 let headers = [(CONTENT_TYPE, UPLOAD_PACK_RESULT)]; 108 94 Ok((headers, AsyncReadBody::new(stdout)).into_response())
+45 -60
crates/git-service/src/util.rs crates/gordian-knot/src/command.rs
··· 1 1 use std::ffi::OsStr; 2 2 3 - use axum::body::{BodyDataStream, HttpBody as _}; 4 - use bytes::BytesMut; 5 - use tokio::{ 6 - io::{AsyncWrite, AsyncWriteExt as _}, 7 - net::unix::pipe::Sender, 8 - }; 9 - use tokio_stream::StreamExt as _; 10 - 11 3 trait SetEnv { 12 4 fn env<K, V>(&mut self, key: K, val: V) -> &mut Self 13 5 where ··· 86 94 } 87 95 } 88 96 89 - pub async fn pack_line(mut w: impl AsyncWrite + Unpin, s: &str) -> std::io::Result<()> { 90 - let buffer = format!("{:04x}{s}", s.len() + 4); 91 - w.write_all(buffer.as_bytes()).await?; 92 - Ok(()) 97 + pub trait TraceProcessCompletion { 98 + fn wait_with_completion_trace(self) -> impl Future<Output = ()>; 93 99 } 94 100 95 - #[allow(unused)] 96 - pub async fn pack_message(mut w: impl AsyncWrite + Unpin, message: &str) -> std::io::Result<()> { 97 - let buffer = format!("{:04x}\u{2}{message}", message.len() + 5); 98 - w.write_all(buffer.as_bytes()).await?; 99 - Ok(()) 100 - } 101 + impl TraceProcessCompletion for tokio::process::Child { 102 + async fn wait_with_completion_trace(self) { 103 + use std::process::Output; 101 104 102 - pub async fn pack_flush(mut w: impl AsyncWrite + Unpin) -> std::io::Result<()> { 103 - w.write_all(b"0000").await?; 104 - Ok(()) 105 + match self.wait_with_output().await { 106 + Ok(Output { 107 + status, 108 + stdout: _, 109 + stderr, 110 + }) if status.success() => { 111 + if !stderr.is_empty() { 112 + let stderr = String::from_utf8_lossy(&stderr); 113 + tracing::warn!("subprocess finished with errors. stderr:\n{stderr}"); 114 + } 115 + } 116 + Ok(Output { 117 + status, 118 + stdout: _, 119 + stderr, 120 + }) => { 121 + let stderr = String::from_utf8_lossy(&stderr); 122 + tracing::error!( 123 + ?status, 124 + "subprocess finished with errors. stderr:\n{stderr}" 125 + ) 126 + } 127 + Err(error) => { 128 + tracing::error!(?error); 129 + } 130 + } 131 + } 105 132 } 106 133 107 134 pub async fn await_child(child: tokio::process::Child) -> bool { ··· 134 123 } 135 124 Ok(output) => { 136 125 let status = output.status; 137 - if let Ok(stderr) = std::str::from_utf8(&output.stderr) { tracing::error!( 138 - ?status, 139 - "error waiting for git child process. stderr:\n{stderr}" 140 - ) } else { tracing::error!( 141 - ?status, 142 - "error waiting for git child process. stderr:\n{:?}\n{}", 143 - &output.stderr, 144 - String::from_utf8_lossy(&output.stderr) 145 - ) } 126 + if let Ok(stderr) = std::str::from_utf8(&output.stderr) { 127 + tracing::error!( 128 + ?status, 129 + "error waiting for git child process. stderr:\n{stderr}" 130 + ) 131 + } else { 132 + tracing::error!( 133 + ?status, 134 + "error waiting for git child process. stderr:\n{:?}\n{}", 135 + &output.stderr, 136 + String::from_utf8_lossy(&output.stderr) 137 + ) 138 + } 146 139 false 147 140 } 148 141 Err(error) => { ··· 154 139 false 155 140 } 156 141 } 157 - } 158 - 159 - pub async fn stream_to_pipe( 160 - mut body_stream: BodyDataStream, 161 - mut pipe: Sender, 162 - ) -> std::result::Result<(), anyhow::Error> { 163 - let mut buffer = BytesMut::new(); 164 - let mut bytes_len = 0; 165 - loop { 166 - if !buffer.is_empty() { 167 - pipe.writable().await?; 168 - pipe.write_buf(&mut buffer).await?; 169 - } 170 - 171 - match body_stream.next().await { 172 - None if !buffer.is_empty() => continue, 173 - None => break, 174 - Some(Ok(chunk)) => { 175 - bytes_len += chunk.len(); 176 - buffer.extend_from_slice(&chunk); 177 - } 178 - Some(Err(error)) => { 179 - tracing::error!(?error, "error chunk"); 180 - break; 181 - } 182 - } 183 - } 184 - 185 - tracing::trace!(end = ?body_stream.is_end_stream(), ?bytes_len, "finished reading request body"); 186 - Ok(()) 187 142 }
-1
crates/gordian-knot/Cargo.toml
··· 14 14 gordian-identity.workspace = true 15 15 gordian-jetstream.workspace = true 16 16 gordian-lexicon.workspace = true 17 - git-service.workspace = true 18 17 19 18 anyhow.workspace = true 20 19 gix.workspace = true
+1 -3
crates/gordian-knot/src/cli/serve.rs
··· 135 135 let instance = format!("did:web:{name}").parse()?; 136 136 137 137 Ok(KnotConfiguration { 138 - owner, 139 - instance, 140 - repo_path, 141 138 git_config, 142 139 readmes: config::DEFAULT_READMES 143 140 .iter() ··· 145 148 idle: Duration::from_secs(repo_cache_idle), 146 149 live: Duration::from_secs(repo_cache_live), 147 150 }, 151 + ..KnotConfiguration::new(owner, instance, repo_path) 148 152 }) 149 153 } 150 154
+3
crates/gordian-knot/src/extractors.rs
··· 1 + mod git_protocol; 2 + pub use git_protocol::{GitProtocol, GitProtocolRejection}; 3 + 1 4 use core::fmt; 2 5 3 6 use axum::{
+48
crates/gordian-knot/src/extractors/git_protocol.rs
··· 1 + use std::ops; 2 + 3 + use axum::{extract::OptionalFromRequestParts, http::request::Parts, response::IntoResponse}; 4 + use reqwest::header::ToStrError; 5 + 6 + /// Extract the "Git-Protocol" header from a request. 7 + #[derive(Debug)] 8 + pub struct GitProtocol(pub String); 9 + 10 + impl GitProtocol { 11 + pub fn as_str(&self) -> &str { 12 + &self.0 13 + } 14 + } 15 + 16 + impl ops::Deref for GitProtocol { 17 + type Target = str; 18 + fn deref(&self) -> &Self::Target { 19 + &self.0 20 + } 21 + } 22 + 23 + impl<S> OptionalFromRequestParts<S> for GitProtocol 24 + where 25 + S: Send + Sync, 26 + { 27 + type Rejection = GitProtocolRejection; 28 + 29 + async fn from_request_parts(parts: &mut Parts, _: &S) -> Result<Option<Self>, Self::Rejection> { 30 + let header_value = parts 31 + .headers 32 + .get("Git-Protocol") 33 + .map(|header_value| header_value.to_str()) 34 + .transpose() 35 + .map_err(GitProtocolRejection)?; 36 + 37 + Ok(header_value.map(|value| Self(value.to_string()))) 38 + } 39 + } 40 + 41 + #[derive(Debug)] 42 + pub struct GitProtocolRejection(ToStrError); 43 + 44 + impl IntoResponse for GitProtocolRejection { 45 + fn into_response(self) -> axum::response::Response { 46 + todo!() 47 + } 48 + }
+1
crates/gordian-knot/src/lib.rs
··· 5 5 use tokio::{net::TcpListener, task::JoinSet}; 6 6 use tokio_util::sync::CancellationToken; 7 7 8 + pub mod command; 8 9 pub mod extractors; 9 10 pub mod model; 10 11 pub mod private;
+3 -142
crates/gordian-knot/src/model.rs
··· 6 6 pub mod repository; 7 7 8 8 use core::ops; 9 - use std::{borrow::Cow, ffi::OsString, net::SocketAddr, sync::Arc}; 9 + use std::{borrow::Cow, net::SocketAddr, sync::Arc}; 10 10 11 - use axum::{ 12 - extract::{FromRef, FromRequestParts, OptionalFromRequestParts}, 13 - http::request::Parts, 14 - }; 11 + use axum::extract::FromRef; 15 12 use futures_util::future::BoxFuture; 16 - use git_service::{state::GitServiceState, util::SetOptionEnv as _}; 17 13 use gordian_auth::jwt; 18 14 use gordian_identity::{HttpClient, Resolver}; 19 15 use gordian_lexicon::sh_tangled::knot::Member; 20 16 use gordian_types::Tid; 21 17 use time::OffsetDateTime; 22 - use tokio::process::Command; 23 18 24 19 use crate::{ 25 - extractors::request_id::RequestId, 26 - model::{config::KnotConfiguration, repository::TangledRepository}, 27 - private, 28 - public::git::{Error, GitAuthorization}, 20 + model::config::KnotConfiguration, 29 21 services::{ 30 22 authorization::{AuthorizationClaimsStore, AuthorizationClaimsStoreError}, 31 23 database::DataStore, ··· 117 125 now: i64, 118 126 ) -> BoxFuture<'_, Result<(), AuthorizationClaimsStoreError>> { 119 127 self.inner.store_claims(claims, now) 120 - } 121 - } 122 - 123 - impl GitServiceState for Knot { 124 - type Rejection = Error; 125 - 126 - async fn init_upload_archive(&self, parts: &mut Parts) -> Result<Command, Self::Rejection> { 127 - let request_id = RequestId::from_request_parts(parts, self).await.unwrap(); 128 - let repository = TangledRepository::from_git_request(parts, self).await?; 129 - let mut command = repository.git(); 130 - command 131 - .option_env("X_REQUEST_ID", request_id) 132 - .args(["upload-archive"]) 133 - .arg(repository.path()); 134 - 135 - Ok(command.into()) 136 - } 137 - 138 - async fn init_upload_pack_advertisement( 139 - &self, 140 - parts: &mut Parts, 141 - ) -> Result<tokio::process::Command, Self::Rejection> { 142 - let request_id = RequestId::from_request_parts(parts, self).await.unwrap(); 143 - let repository = TangledRepository::from_git_request(parts, self).await?; 144 - let mut command = repository.git(); 145 - command 146 - .option_env("X_REQUEST_ID", request_id) 147 - .args([ 148 - "upload-pack", 149 - "--http-backend-info-refs", 150 - "--stateless-rpc", 151 - "--strict", 152 - "--timeout=10", 153 - ]) 154 - .arg(repository.path()); 155 - 156 - Ok(command.into()) 157 - } 158 - 159 - async fn init_upload_pack( 160 - &self, 161 - parts: &mut Parts, 162 - ) -> Result<tokio::process::Command, Self::Rejection> { 163 - let request_id = RequestId::from_request_parts(parts, self).await.unwrap(); 164 - let repository = TangledRepository::from_git_request(parts, self).await?; 165 - let mut command = repository.git(); 166 - command 167 - .option_env("X_REQUEST_ID", request_id) 168 - .args(["upload-pack", "--strict", "--stateless-rpc"]) 169 - .arg(repository.path()); 170 - 171 - Ok(command.into()) 172 - } 173 - 174 - async fn init_receive_pack_advertisement( 175 - &self, 176 - parts: &mut Parts, 177 - ) -> Result<tokio::process::Command, Self::Rejection> { 178 - let GitAuthorization(auth) = GitAuthorization::from_request_parts(parts, self).await?; 179 - let request_id = RequestId::from_request_parts(parts, self).await.unwrap(); 180 - let repository = TangledRepository::from_git_request(parts, self).await?; 181 - 182 - if !self.can_push(repository.repository_key(), &auth.iss).await { 183 - tracing::error!(did = %auth.iss, "push denied"); 184 - return Err(Error::forbidden( 185 - self, 186 - format!( 187 - "'{}' does not have permission to push to this repository", 188 - auth.iss 189 - ), 190 - ))?; 191 - } 192 - 193 - let nonce_seed = self.generate_push_seed(repository.repository_key()); 194 - let mut command = repository.git(); 195 - command 196 - .env(private::ENV_USER_DID, auth.iss.as_str()) 197 - .option_env("X_REQUEST_ID", request_id) 198 - .args([ 199 - "-c", 200 - &nonce_seed, 201 - "receive-pack", 202 - "--http-backend-info-refs", 203 - "--stateless-rpc", 204 - ]) 205 - .arg(repository.path()); 206 - 207 - Ok(command.into()) 208 - } 209 - 210 - async fn init_receive_pack( 211 - &self, 212 - parts: &mut Parts, 213 - ) -> Result<tokio::process::Command, Self::Rejection> { 214 - let GitAuthorization(auth) = GitAuthorization::from_request_parts(parts, self).await?; 215 - let request_id = RequestId::from_request_parts(parts, self).await.unwrap(); 216 - let repository = TangledRepository::from_git_request(parts, self).await?; 217 - 218 - if !self.can_push(repository.repository_key(), &auth.iss).await { 219 - tracing::error!(did = %auth.iss, "push denied"); 220 - return Err(Error::forbidden( 221 - self, 222 - format!( 223 - "'{}' does not have permission to push to this repository", 224 - auth.iss 225 - ), 226 - ))?; 227 - } 228 - 229 - let allowed_signers_path = std::env::current_dir() 230 - .unwrap() 231 - .join("allowed_signers") 232 - .join(auth.iss.as_str()); 233 - 234 - let mut allowed_signers_option = OsString::with_capacity( 235 - "gpg.ssh.allowedSignersFile=".len() + allowed_signers_path.as_os_str().len(), 236 - ); 237 - allowed_signers_option.push("gpg.ssh.allowedSignersFile="); 238 - allowed_signers_option.push(&allowed_signers_path); 239 - 240 - let nonce_seed = self.generate_push_seed(repository.repository_key()); 241 - let mut command = repository.git(); 242 - command 243 - .env(private::ENV_USER_DID, auth.iss.as_str()) 244 - .option_env("X_REQUEST_ID", request_id) 245 - .args(["-c", &nonce_seed, "-c"]) 246 - .arg(&allowed_signers_option) 247 - .args(["receive-pack", "--stateless-rpc"]) 248 - .arg(repository.path()); 249 - 250 - Ok(command.into()) 251 128 } 252 129 }
+2
crates/gordian-knot/src/model/config.rs
··· 33 33 pub git_config: PathBuf, 34 34 pub readmes: FxHashSet<BString>, 35 35 pub repo_cache: RepoCacheConfig, 36 + pub max_git_request_size: usize, 36 37 } 37 38 38 39 #[derive(Default, Debug)] ··· 61 60 .map(|readme| BString::new(readme.to_vec())) 62 61 .collect(), 63 62 repo_cache: RepoCacheConfig::default(), 63 + max_git_request_size: 1024 * 1024, 64 64 } 65 65 } 66 66
+1 -1
crates/gordian-knot/src/model/repository.rs
··· 12 12 Json, 13 13 extract::{FromRef, FromRequestParts}, 14 14 }; 15 - use git_service::util::{SetOptionArg, SetOptionEnv as _}; 16 15 use gix::{ 17 16 ObjectId, 18 17 bstr::{BString, ByteSlice}, ··· 24 25 use serde::Deserialize; 25 26 26 27 use crate::{ 28 + command::{SetOptionArg as _, SetOptionEnv as _}, 27 29 model::{convert, errors, nicediff}, 28 30 public::xrpc::{XrpcError, XrpcQuery, XrpcResponse, XrpcResult}, 29 31 types::{
+1 -1
crates/gordian-knot/src/public.rs
··· 8 8 axum::Router::new() 9 9 .without_v07_checks() 10 10 .nest("/xrpc", xrpc::router()) 11 - .nest("/{owner}/{name}", git_service::router()) 11 + .nest("/{owner}/{name}", git::router()) 12 12 .route("/events", axum::routing::get(events::handler)) 13 13 }
+75
crates/gordian-knot/src/public/git.rs
··· 1 1 mod authorization; 2 2 mod error; 3 + mod receive_pack; 4 + mod upload_archive; 5 + mod upload_pack; 3 6 4 7 pub use authorization::GitAuthorization; 5 8 pub use error::{Error, NotFound}; 9 + 10 + use axum::{ 11 + extract::{Query, Request, State}, 12 + response::{IntoResponse as _, Response}, 13 + }; 14 + use tokio::io::{AsyncWrite, AsyncWriteExt as _}; 15 + 16 + use crate::{ 17 + extractors::{GitProtocol, request_id::RequestId}, 18 + model::Knot, 19 + }; 20 + 21 + pub fn router() -> axum::Router<Knot> { 22 + use axum::routing::{get, post}; 23 + axum::Router::new() 24 + .route("/info/refs", get(info_refs)) 25 + .route("/git-upload-archive", post(upload_archive::upload_archive)) 26 + .route("/git-upload-pack", post(upload_pack::upload_pack)) 27 + .route("/git-receive-pack", post(receive_pack::receive_pack)) 28 + } 29 + 30 + #[derive(Debug, serde::Deserialize)] 31 + #[serde(rename_all = "kebab-case")] 32 + pub enum GitService { 33 + GitUploadPack, 34 + GitReceivePack, 35 + } 36 + 37 + #[derive(Debug, serde::Deserialize)] 38 + pub struct InfoRefsQuery { 39 + pub service: GitService, 40 + } 41 + 42 + pub async fn info_refs( 43 + state: State<Knot>, 44 + Query(InfoRefsQuery { service }): Query<InfoRefsQuery>, 45 + protocol: Option<GitProtocol>, 46 + request_id: Option<RequestId>, 47 + request: Request, 48 + ) -> Response { 49 + tracing::info!("hello"); 50 + match service { 51 + GitService::GitUploadPack => { 52 + upload_pack::advertise_upload_pack(state, protocol, request_id, request) 53 + .await 54 + .into_response() 55 + } 56 + GitService::GitReceivePack => { 57 + receive_pack::advertise_receive_pack(state, protocol, request_id, request) 58 + .await 59 + .into_response() 60 + } 61 + } 62 + } 63 + 64 + pub async fn pack_line(mut w: impl AsyncWrite + Unpin, s: &str) -> std::io::Result<()> { 65 + let buffer = format!("{:04x}{s}", s.len() + 4); 66 + w.write_all(buffer.as_bytes()).await?; 67 + Ok(()) 68 + } 69 + 70 + #[allow(unused)] 71 + pub async fn pack_message(mut w: impl AsyncWrite + Unpin, message: &str) -> std::io::Result<()> { 72 + let buffer = format!("{:04x}\u{2}{message}", message.len() + 5); 73 + w.write_all(buffer.as_bytes()).await?; 74 + Ok(()) 75 + } 76 + 77 + pub async fn pack_flush(mut w: impl AsyncWrite + Unpin) -> std::io::Result<()> { 78 + w.write_all(b"0000").await?; 79 + Ok(()) 80 + }
+140
crates/gordian-knot/src/public/git/receive_pack.rs
··· 1 + use axum::{ 2 + extract::{FromRequestParts as _, Request, State}, 3 + http::header::{CACHE_CONTROL, CONNECTION, CONTENT_TYPE}, 4 + response::IntoResponse, 5 + }; 6 + use axum_extra::body::AsyncReadBody; 7 + use std::{ffi::OsString, process::Stdio}; 8 + use tokio::net::unix::pipe::{Sender, pipe as async_pipe}; 9 + 10 + use crate::{ 11 + command::{SetOptionEnv as _, TraceProcessCompletion as _}, 12 + extractors::{GitProtocol, request_id::RequestId}, 13 + model::{Knot, repository::TangledRepository}, 14 + private, 15 + public::git::GitAuthorization, 16 + services::git::stream_to_pipe, 17 + }; 18 + 19 + const RECEIVE_PACK_ADVERTISEMENT: &str = "application/x-git-receive-pack-advertisement"; 20 + const RECEIVE_PACK_RESULT: &str = "application/x-git-receive-pack-result"; 21 + 22 + const NO_CACHE: &str = "no-cache, max-age=0, must-revalidate"; 23 + const KEEP_ALIVE: &str = "keep-alive"; 24 + 25 + pub async fn advertise_receive_pack( 26 + State(knot): State<Knot>, 27 + protocol: Option<GitProtocol>, 28 + request_id: Option<RequestId>, 29 + request: Request, 30 + ) -> Result<impl IntoResponse, super::Error> { 31 + let (mut parts, _) = request.into_parts(); 32 + let repository = TangledRepository::from_git_request(&mut parts, &knot).await?; 33 + let GitAuthorization(auth) = GitAuthorization::from_request_parts(&mut parts, &knot).await?; 34 + 35 + if !knot.can_push(repository.repository_key(), &auth.iss).await { 36 + tracing::error!(did = %auth.iss, "push denied"); 37 + return Err(super::Error::forbidden( 38 + &knot, 39 + format!( 40 + "'{}' does not have permission to push to this repository", 41 + auth.iss 42 + ), 43 + ))?; 44 + } 45 + 46 + let nonce_seed = knot.generate_push_seed(repository.repository_key()); 47 + 48 + let (mut stdout_tx, stdout_rx) = async_pipe()?; 49 + super::pack_line(&mut stdout_tx, "# service=git-receive-pack\n").await?; 50 + super::pack_flush(&mut stdout_tx).await?; 51 + 52 + let mut command: tokio::process::Command = repository.git().into(); 53 + command 54 + .option_env("GIT_PROTOCOL", protocol.as_deref()) 55 + .option_env("X_REQUEST_ID", request_id) 56 + .env(private::ENV_USER_DID, auth.iss.as_str()) 57 + .args([ 58 + "-c", 59 + &nonce_seed, 60 + "receive-pack", 61 + "--http-backend-info-refs", 62 + "--stateless-rpc", 63 + ]) 64 + .arg(repository.path()) 65 + .stdin(Stdio::null()) 66 + .stdout(stdout_tx.into_blocking_fd()?) 67 + .stderr(Stdio::piped()); 68 + 69 + let child = command.spawn()?; 70 + tokio::task::spawn(child.wait_with_completion_trace()); 71 + 72 + let headers = [ 73 + (CONTENT_TYPE, RECEIVE_PACK_ADVERTISEMENT), 74 + (CONNECTION, KEEP_ALIVE), 75 + (CACHE_CONTROL, NO_CACHE), 76 + ]; 77 + 78 + Ok((headers, AsyncReadBody::new(stdout_rx))) 79 + } 80 + 81 + pub async fn receive_pack( 82 + State(knot): State<Knot>, 83 + protocol: Option<GitProtocol>, 84 + request_id: Option<RequestId>, 85 + request: Request, 86 + ) -> Result<impl IntoResponse, super::Error> { 87 + let (mut parts, body) = request.into_parts(); 88 + let repository = TangledRepository::from_git_request(&mut parts, &knot).await?; 89 + let GitAuthorization(auth) = GitAuthorization::from_request_parts(&mut parts, &knot).await?; 90 + 91 + if !knot.can_push(repository.repository_key(), &auth.iss).await { 92 + tracing::error!(did = %auth.iss, "push denied"); 93 + return Err(super::Error::forbidden( 94 + &knot, 95 + format!( 96 + "'{}' does not have permission to push to this repository", 97 + auth.iss 98 + ), 99 + ))?; 100 + } 101 + 102 + let nonce_seed = knot.generate_push_seed(repository.repository_key()); 103 + let allowed_signers_path = std::env::current_dir() 104 + .unwrap() 105 + .join("allowed_signers") 106 + .join(auth.iss.as_str()); 107 + 108 + let mut allowed_signers_option = OsString::with_capacity( 109 + "gpg.ssh.allowedSignersFile=".len() + allowed_signers_path.as_os_str().len(), 110 + ); 111 + allowed_signers_option.push("gpg.ssh.allowedSignersFile="); 112 + allowed_signers_option.push(&allowed_signers_path); 113 + 114 + let mut command: tokio::process::Command = repository.git().into(); 115 + command 116 + .option_env("GIT_PROTOCOL", protocol.as_deref()) 117 + .option_env("X_REQUEST_ID", request_id) 118 + .env(private::ENV_USER_DID, auth.iss.as_str()) 119 + .args(["-c", &nonce_seed, "-c"]) 120 + .arg(&allowed_signers_option) 121 + .args(["receive-pack", "--stateless-rpc"]) 122 + .arg(repository.path()) 123 + .stdin(Stdio::piped()) 124 + .stdout(Stdio::piped()) 125 + .stderr(Stdio::piped()); 126 + 127 + let mut child = command.spawn()?; 128 + let stdin = child.stdin.take().expect("handle present"); 129 + let stdout = child.stdout.take().expect("handle present"); 130 + 131 + tokio::task::spawn(stream_to_pipe( 132 + body.into_data_stream(), 133 + Sender::from_owned_fd(stdin.into_owned_fd()?)?, 134 + )); 135 + 136 + tokio::spawn(child.wait_with_completion_trace()); 137 + 138 + let headers = [(CONTENT_TYPE, RECEIVE_PACK_RESULT)]; 139 + Ok((headers, AsyncReadBody::new(stdout)).into_response()) 140 + }
+1 -1
crates/gordian-knot/src/public/xrpc/sh_tangled/repo/impl_archive.rs
··· 1 + use crate::command::SetOptionArg as _; 1 2 use crate::model::Knot; 2 3 use crate::model::errors; 3 4 use crate::model::repository::ResolvedRevspec; ··· 12 11 use axum::http::StatusCode; 13 12 use axum::response::IntoResponse; 14 13 use axum_extra::body::AsyncReadBody; 15 - use git_service::util::SetOptionArg as _; 16 14 use gordian_lexicon::sh_tangled::repo::archive::Format; 17 15 use gordian_lexicon::sh_tangled::repo::archive::Input; 18 16 use std::process::Stdio;
+1
crates/gordian-knot/src/services.rs
··· 1 1 pub mod atrepo; 2 2 pub mod authorization; 3 3 pub mod database; 4 + pub mod git; 4 5 pub mod jetstream; 5 6 pub mod rbac; 6 7 pub mod seed;
+34
crates/gordian-knot/src/services/git.rs
··· 1 + use std::io; 2 + 3 + use axum::body::{BodyDataStream, HttpBody as _}; 4 + use tokio::{io::AsyncWriteExt as _, net::unix::pipe::Sender}; 5 + 6 + pub async fn stream_to_pipe(mut body_stream: BodyDataStream, mut pipe: Sender) -> io::Result<()> { 7 + use tokio_stream::StreamExt as _; 8 + 9 + let mut buffer = bytes::BytesMut::new(); 10 + let mut bytes_len = 0; 11 + loop { 12 + if !buffer.is_empty() { 13 + pipe.writable().await?; 14 + pipe.write_buf(&mut buffer).await?; 15 + } 16 + 17 + match body_stream.next().await { 18 + None if !buffer.is_empty() => continue, 19 + None => break, 20 + Some(Ok(chunk)) => { 21 + bytes_len += chunk.len(); 22 + buffer.extend_from_slice(&chunk); 23 + } 24 + Some(Err(error)) => { 25 + tracing::error!(?error, "error chunk"); 26 + return Err(io::Error::new(io::ErrorKind::BrokenPipe, error)); 27 + } 28 + } 29 + } 30 + 31 + tracing::trace!(end = ?body_stream.is_end_stream(), ?bytes_len, "finished reading request body"); 32 + 33 + Ok(()) 34 + }