···11+pub const DISALLOWED_TLDS: &[&str] = &[22+ "alt",33+ "arpa",44+ "example",55+ "internal",66+ "invalid",77+ "local",88+ "localhost",99+ "onion",1010+ #[cfg(not(test))]1111+ "test",1212+];1313+1414+#[derive(Debug, thiserror::Error)]1515+#[error("Handle is invalid")]1616+pub struct InvalidHandle;1717+1818+pub fn validate_handle(handle: &str) -> Result<(), InvalidHandle> {1919+ if handle.is_empty() || handle.len() > 253 {2020+ return Err(InvalidHandle);2121+ }2222+2323+ let mut last = "";2424+ let mut segments = 0;2525+ for segment in handle.split('.') {2626+ segments += 1;2727+ if segment.is_empty()2828+ || segment.len() > 632929+ || segment.starts_with('-')3030+ || segment.ends_with('-')3131+ || !segment3232+ .chars()3333+ .all(|c| c.is_ascii_alphanumeric() || c == '-')3434+ {3535+ return Err(InvalidHandle);3636+ }3737+3838+ last = segment;3939+ }4040+4141+ if segments < 2 {4242+ return Err(InvalidHandle);4343+ }4444+4545+ if DISALLOWED_TLDS.contains(&last) || last.starts_with(|c: char| c.is_ascii_digit()) {4646+ return Err(InvalidHandle);4747+ }4848+4949+ Ok(())5050+}5151+5252+#[cfg(test)]5353+mod tests {5454+ use super::validate_handle;5555+5656+ #[test]5757+ fn accept_valid() {5858+ for handle in [5959+ "jay.bsky.social",6060+ "8.cn",6161+ "name.t--t",6262+ "XX.LCS.MIT.EDU",6363+ "a.co",6464+ "xn--notarealidn.com",6565+ "xn--fiqa61au8b7zsevnm8ak20mc4a87e.xn--fiqs8s",6666+ "xn--ls8h.test",6767+ "example.t",6868+ ] {6969+ assert!(validate_handle(handle).is_ok(), "{handle} is invalid");7070+ }7171+ }7272+7373+ #[test]7474+ fn reject_invalid_syntax() {7575+ for handle in [7676+ "jo@hn.test",7777+ "💩.test",7878+ "john..test",7979+ "xn--bcher-.tld",8080+ "john.0",8181+ "cn.8",8282+ "www.masełkowski.pl.com",8383+ "org",8484+ "name.org.",8585+ ] {8686+ assert!(validate_handle(handle).is_err(), "{handle} is valid");8787+ }8888+ }8989+9090+ #[test]9191+ fn reject_restricted() {9292+ for handle in [9393+ 
"2gzyxa5ihm7nsggfxnu52rck2vv4rvmdlkiu3zzui5du4xyclen53wid.onion",9494+ "laptop.local",9595+ "blah.arpa",9696+ ] {9797+ assert!(validate_handle(handle).is_err(), "{handle} is valid");9898+ }9999+ }100100+}
+71
crates/identity/src/lib.rs
···11+mod did;22+mod document;33+pub mod handles;44+mod resolvers;55+66+pub use did::Did;77+pub use document::*;88+99+use std::sync::Arc;1010+1111+pub const DEFAULT_PLC: &str = "https://plc.directory";1212+1313+#[async_trait::async_trait]1414+trait HandleResolve: std::fmt::Debug {1515+ /// Resolve a handle to a DID.1616+ ///1717+ /// Related: <https://docs.bsky.app/docs/api/com-atproto-identity-resolve-handle>1818+ async fn resolve_handle(&self, handle: &str) -> Result<Did, ResolveError>;1919+2020+ /// Resolve a DID to DID document.2121+ ///2222+ /// Related: <https://docs.bsky.app/docs/api/com-atproto-identity-resolve-did>2323+ async fn resolve_did(&self, did: &Did) -> Result<DidDocument, ResolveError>;2424+}2525+2626+#[derive(Debug, thiserror::Error)]2727+#[error("Failed to resolve something")]2828+pub struct ResolveError;2929+3030+impl ResolveError {3131+ pub fn not_found() -> Self {3232+ Self3333+ }3434+}3535+3636+#[derive(Clone, Debug)]3737+pub struct Resolver {3838+ inner: Arc<dyn HandleResolve + Sync + Send + 'static>,3939+}4040+4141+impl Resolver {4242+ /// Resolves an ATproto handle to a DID document.4343+ ///4444+ /// The resolved DID is always bidirectionally confirmed.4545+ pub async fn resolve_handle(&self, handle: &str) -> Result<(Did, DidDocument), ResolveError> {4646+ let handle = handle.trim_start_matches('@');4747+ let did = self.inner.resolve_handle(handle).await?;4848+ let doc = self.inner.resolve_did(&did).await?;4949+ Ok((did, doc))5050+ }5151+5252+ #[inline]5353+ pub async fn resolve_did(&self, handle: &str) -> Result<Did, ResolveError> {5454+ let handle = handle.trim_start_matches('@');5555+ self.inner.resolve_handle(handle).await5656+ }5757+5858+ #[inline]5959+ pub async fn resolve_identity(&self, did: &Did) -> Result<DidDocument, ResolveError> {6060+ self.inner.resolve_did(did).await6161+ }6262+}6363+6464+impl Default for Resolver {6565+ fn default() -> Self {6666+ let inner = resolvers::MemcachedResolver::default();6767+ Self {6868+ 
inner: Arc::new(inner),6969+ }7070+ }7171+}
+18
crates/identity/src/main.rs
···11+use identity::Resolver;22+33+#[tokio::main(flavor = "current_thread")]44+async fn main() {55+ tracing_subscriber::fmt::init();66+77+ let resolver = Resolver::default();88+ for handle in std::env::args().skip(1) {99+ match resolver.resolve_handle(&handle).await {1010+ Ok((_, document)) => {1111+ let pretty = serde_json::to_string_pretty(&document)1212+ .expect("Failed to serialize deserialized document");1313+ println!("{handle}: {pretty}");1414+ }1515+ Err(error) => eprintln!("{handle}: {error}"),1616+ };1717+ }1818+}
+232
crates/identity/src/resolvers.rs
···11+use crate::{DEFAULT_PLC, Did, ResolveError, document::DidDocument};22+pub use hickory_resolver::name_server::ConnectionProvider;33+use hickory_resolver::{44+ ResolveError as DnsResolveError, Resolver as DnsClient, TokioResolver,55+ name_server::TokioConnectionProvider,66+};77+use moka::sync::{Cache, CacheBuilder};88+use reqwest::Client as HttpClient;99+use std::{borrow::Cow, str::FromStr, time::Duration};1010+use tokio::time::Instant;1111+1212+#[derive(Debug)]1313+pub struct MemcachedResolver {1414+ did_cache: Cache<Box<str>, Did>,1515+ doc_cache: Cache<Did, DidDocument>,1616+ inner: DirectResolver<'static, TokioConnectionProvider>, //1717+}1818+1919+#[async_trait::async_trait]2020+impl super::HandleResolve for MemcachedResolver {2121+ // async fn resolve(&self, handle: &str) -> Result<Arc<(Did, DidDocument)>, ResolveError> {2222+ // if let Some(cached) = self.did_cache.get(handle) {2323+ // tracing::debug!(?handle, "reusing resolved did & document from cache");2424+ // return Ok(cached);2525+ // }2626+2727+ // let resolved = Arc::new(self.inner.resolve(handle).await?);2828+ // self.cache.insert(handle.into(), Arc::clone(&resolved));2929+3030+ // Ok(resolved)3131+ // }3232+3333+ async fn resolve_handle(&self, handle: &str) -> Result<Did, ResolveError> {3434+ if let Some(did) = self.did_cache.get(handle) {3535+ tracing::debug!(?handle, ?did, "reusing resolved did from cache");3636+ return Ok(did);3737+ }3838+3939+ let (did, doc) = self.inner.resolve(handle).await?;4040+ self.did_cache.insert(handle.into(), did.clone());4141+ self.doc_cache.insert(did.clone(), doc);4242+4343+ Ok(did.clone())4444+ }4545+4646+ async fn resolve_did(&self, did: &Did) -> Result<DidDocument, ResolveError> {4747+ if let Some(doc) = self.doc_cache.get(did) {4848+ return Ok(doc);4949+ }5050+5151+ let doc = self.inner.fetch_did_document(did).await?;5252+5353+ // @TODO Verify handle(s) in DID Document resolve to the DID.5454+5555+ self.doc_cache.insert(did.clone(), doc.clone());5656+ 
Ok(doc)5757+ }5858+}5959+6060+impl Default for MemcachedResolver {6161+ fn default() -> Self {6262+ let did_cache = CacheBuilder::new(1000)6363+ .time_to_live(Duration::from_secs(600))6464+ .build();6565+6666+ let doc_cache = CacheBuilder::new(1000)6767+ .time_to_live(Duration::from_secs(600))6868+ .build();6969+7070+ Self {7171+ did_cache,7272+ doc_cache,7373+ inner: DirectResolver::default(),7474+ }7575+ }7676+}7777+7878+pub struct DirectResolver<'plc, R: ConnectionProvider> {7979+ directory: Cow<'plc, str>,8080+ dns: DnsClient<R>,8181+ http: HttpClient,8282+}8383+8484+impl<'a, R: ConnectionProvider> std::fmt::Debug for DirectResolver<'a, R> {8585+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {8686+ f.debug_struct("DirectResolver")8787+ .field("directory", &self.directory)8888+ .finish_non_exhaustive()8989+ }9090+}9191+9292+impl<'plc, R: ConnectionProvider> DirectResolver<'plc, R> {9393+ pub async fn resolve(&self, handle: &str) -> Result<(Did, DidDocument), ResolveError> {9494+ let did = self.resolve_handle(handle).await?;9595+ let Ok(document) = self.fetch_did_document(&did).await else {9696+ return Err(ResolveError);9797+ };9898+9999+ // Verify the document has a matching handle.100100+ for alias in &document.also_known_as {101101+ if alias.host_str().is_some_and(|host| host == handle) {102102+ return Ok((did, document));103103+ }104104+ }105105+106106+ Err(ResolveError)107107+ }108108+109109+ /// Resolves an ATproto handle to a DID.110110+ ///111111+ /// Resolution is attempted via DNS and HTTP .well-known methods. The *first*112112+ /// method to return a valid result is used.113113+ pub async fn resolve_handle(&self, handle: &str) -> Result<Did, ResolveError> {114114+ let dns = resolve_handle_dns(&self.dns, handle);115115+ let http = resolve_handle_http(&self.http, handle);116116+117117+ let start = Instant::now();118118+ tokio::select! 
{119119+ Ok(Some(did)) = dns => {120120+ tracing::trace!(?handle, %did, elapsed = ?start.elapsed(), "resolved via dns");121121+ Ok(did)122122+ },123123+ Ok(Some(did)) = http => {124124+ tracing::trace!(?handle, %did, elapsed = ?start.elapsed(), "resolved via http");125125+ Ok(did)126126+ }127127+ else => Err(ResolveError::not_found()),128128+ }129129+ }130130+131131+ /// Fetches and parses the DID document.132132+ pub async fn fetch_did_document(&self, did: &Did) -> Result<DidDocument, ResolveError> {133133+ match did.method() {134134+ "plc" => self135135+ .fetch_plc_did_document(did)136136+ .await137137+ .map_err(|_| ResolveError),138138+ "web" => self139139+ .fetch_web_did_document(did)140140+ .await141141+ .map_err(|_| ResolveError),142142+ _ => unimplemented!(),143143+ }144144+ }145145+146146+ pub async fn fetch_plc_did_document(&self, did: &Did) -> Result<DidDocument, reqwest::Error> {147147+ self.http148148+ .get(format!("{}/{did}", self.directory))149149+ .send()150150+ .await?151151+ .error_for_status()?152152+ .json()153153+ .await154154+ }155155+156156+ pub async fn fetch_web_did_document(&self, did: &Did) -> Result<DidDocument, reqwest::Error> {157157+ self.http158158+ .get(format!("https://{}/.well-known/did.json", did.ident()))159159+ .send()160160+ .await?161161+ .error_for_status()?162162+ .json()163163+ .await164164+ }165165+}166166+167167+impl Default for DirectResolver<'static, TokioConnectionProvider> {168168+ fn default() -> Self {169169+ Self {170170+ directory: Cow::Borrowed(DEFAULT_PLC),171171+ dns: TokioResolver::builder_tokio()172172+ .expect("Failed to build default DNS resolver")173173+ .build(),174174+ http: HttpClient::new(),175175+ }176176+ }177177+}178178+179179+pub async fn resolve_handle_dns<R>(180180+ client: &DnsClient<R>,181181+ handle: &str,182182+) -> Result<Option<Did>, DnsResolveError>183183+where184184+ R: ConnectionProvider,185185+{186186+ let mut resolved_did = None;187187+ let txt_lookup = 
client.txt_lookup(format!("_atproto.{handle}.")).await?;188188+ for record in txt_lookup.iter() {189189+ for txt_data in record.txt_data() {190190+ let Ok(txt) = std::str::from_utf8(txt_data) else {191191+ continue;192192+ };193193+ let Some(txt_did) = txt.strip_prefix("did=") else {194194+ continue;195195+ };196196+ let Ok(did) = Did::from_str(txt_did) else {197197+ continue;198198+ };199199+200200+ if let Some(old_did) = resolved_did.replace(did.clone())201201+ && old_did != did202202+ {203203+ tracing::error!(204204+ ?handle,205205+ ?did,206206+ ?old_did,207207+ "multiple conflicting DIDs found for handle"208208+ );209209+ // @TODO Replace this with an error so we can retry with a210210+ // recursive dns resolver.211211+ return Ok(None);212212+ }213213+ }214214+ }215215+216216+ Ok(resolved_did)217217+}218218+219219+pub async fn resolve_handle_http(220220+ client: &HttpClient,221221+ handle: &str,222222+) -> Result<Option<Did>, reqwest::Error> {223223+ let response = client224224+ .get(format!("https://{handle}/.well-known/atproto-did"))225225+ .send()226226+ .await?227227+ .error_for_status()?228228+ .text()229229+ .await?;230230+231231+ Ok(Did::from_str(&response).ok())232232+}
+35
crates/knot/Cargo.toml
···11+[package]22+name = "knot"33+version.workspace = true44+edition.workspace = true55+authors.workspace = true66+publish.workspace = true77+88+[dependencies]99+anyhow = "1.0.99"1010+axum = { workspace = true, features = ["ws"] }1111+axum-extra = { version = "0.10.1", features = ["async-read-body"] }1212+bytes = "1.10.1"1313+clap = { version = "4.5.47", features = ["derive", "env", "string"] }1414+data-encoding = "2.9.0"1515+gix = { version = "0.73.0", features = ["max-performance"] }1616+hyper-util = { version = "0.1.17", features = ["client"] }1717+identity.workspace = true1818+reqwest = { version = "0.12.23", features = ["json"] }1919+rustc-hash = "2.1.1"2020+serde = { workspace = true, features = ["derive", "rc"] }2121+serde_json = { version = "1.0.145" }2222+thiserror = "2.0.16"2323+time = { version = "0.3.43", features = ["formatting", "macros", "parsing", "serde"] }2424+tokio = { version = "1.47.1", features = ["io-util", "macros", "net", "process", "signal", "rt-multi-thread"] }2525+tower = "0.5.2"2626+tower-http = { version = "0.6.6", features = ["decompression-gzip", "request-id", "trace", "tracing"] }2727+tracing.workspace = true2828+tracing-journald = "0.3.1"2929+tracing-subscriber = "0.3.20"3030+url = { version = "2.5.7", features = ["serde"] }3131+xrpc.workspace = true3232+3333+[[bin]]3434+name = "gordian-knot"3535+path = "src/main.rs"
+68
crates/knot/src/cli.rs
···11+use clap::Parser;22+use identity::{Did, ResolveError, Resolver};33+use std::{path::PathBuf, str::FromStr};44+55+#[derive(Debug, Parser)]66+pub struct Arguments {77+ /// Address(s) to bind the public API.88+ #[arg(99+ long,1010+ short,1111+ env = "KNOT_SERVER_ADDR",1212+ default_value = "localhost:5556"1313+ )]1414+ pub addr: Vec<String>,1515+1616+ /// DID or handle of the knot owner.1717+ #[arg(long, env = "KNOT_SERVER_OWNER")]1818+ pub owner: HandleOrDid,1919+2020+ /// Base directory to serve repositories from.2121+ #[arg(long, env = "KNOT_SERVER_BASE", default_value = default_repository_base().into_os_string())]2222+ pub repos: PathBuf,2323+2424+ /// Port number for the real knotserver.2525+ #[arg(long, env = "KNOT_SERVER_UPSTREAM")]2626+ pub upstream_port: Option<u16>,2727+}2828+2929+fn default_repository_base() -> PathBuf {3030+ std::env::current_dir().unwrap_or_default()3131+}3232+3333+pub fn parse() -> Arguments {3434+ Arguments::parse()3535+}3636+3737+#[derive(Clone, Debug)]3838+pub enum HandleOrDid {3939+ Did(Did),4040+ Handle(String),4141+}4242+4343+impl HandleOrDid {4444+ pub async fn into_did(self, resolver: &Resolver) -> Result<Did, ResolveError> {4545+ match self {4646+ Self::Did(did) => Ok(did),4747+ Self::Handle(handle) => {4848+ let did = resolver.resolve_did(&handle).await?;4949+ tracing::info!(?handle, ?did, "resolved owner");5050+ Ok(did)5151+ }5252+ }5353+ }5454+}5555+5656+impl FromStr for HandleOrDid {5757+ type Err = anyhow::Error;5858+5959+ fn from_str(s: &str) -> Result<Self, Self::Err> {6060+ if let Ok(did) = Did::from_str(s) {6161+ return Ok(Self::Did(did));6262+ }6363+6464+ // @TODO Check the handle is valid before trying to resolve it.6565+ let maybe_handle = s.trim_start_matches('@');6666+ Ok(Self::Handle(maybe_handle.to_owned()))6767+ }6868+}
+17
crates/knot/src/convert.rs
···11+//!22+//! Utility conversions.33+//!44+use gix::date::Time;55+use time::OffsetDateTime;66+use time::UtcOffset;77+use time::error::ComponentRange;88+99+/// Convert a [`gix::date::Time`] to a [`time::OffsetDateTime`].1010+///1111+#[allow(unused)]1212+pub fn time_to_offsetdatetime(time: &Time) -> Result<OffsetDateTime, ComponentRange> {1313+ let odt = OffsetDateTime::from_unix_timestamp(time.seconds)?1414+ .to_offset(UtcOffset::from_whole_seconds(time.offset)?);1515+1616+ Ok(odt)1717+}
+125
crates/knot/src/hex_object_id.rs
···11+use data_encoding::Encoding;22+use data_encoding::HEXLOWER_PERMISSIVE;33+use serde::Deserialize;44+use serde::Deserializer;55+use serde::Serializer;66+use serde::de::Error;77+88+const ENCODING: Encoding = HEXLOWER_PERMISSIVE;99+1010+pub fn serialize<S>(bytes: &[u8], serializer: S) -> Result<S::Ok, S::Error>1111+where1212+ S: Serializer,1313+{1414+ let encoded = ENCODING.encode(bytes);1515+ serializer.serialize_str(&encoded)1616+}1717+1818+pub fn deserialize<'de, D>(deserializer: D) -> Result<Vec<u8>, D::Error>1919+where2020+ D: Deserializer<'de>,2121+{2222+ let encoded = <&[u8] as Deserialize>::deserialize(deserializer)?;2323+ let decoded = ENCODING.decode(encoded).map_err(Error::custom)?;2424+ Ok(decoded)2525+}2626+2727+pub mod vec {2828+ use super::ENCODING;2929+ use serde::Deserializer;3030+ use serde::Serializer;3131+ use serde::de::Visitor;3232+ use serde::ser::SerializeSeq;3333+3434+ pub fn serialize<S>(hashes: &[Vec<u8>], serializer: S) -> Result<S::Ok, S::Error>3535+ where3636+ S: Serializer,3737+ {3838+ let mut seq = serializer.serialize_seq(Some(hashes.len()))?;3939+ for hash in hashes {4040+ let encoded = ENCODING.encode(hash);4141+ seq.serialize_element(&encoded)?;4242+ }4343+ seq.end()4444+ }4545+4646+ struct VecVisitor;4747+4848+ impl<'de> Visitor<'de> for VecVisitor {4949+ type Value = Vec<Vec<u8>>;5050+5151+ fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {5252+ formatter.write_str("an sequence of commit hashes")5353+ }5454+5555+ fn visit_seq<A>(self, mut seq: A) -> Result<Self::Value, A::Error>5656+ where5757+ A: serde::de::SeqAccess<'de>,5858+ {5959+ while let Some(value) = seq.next_element::<&str>()? 
{6060+ eprintln!("{value}");6161+ //6262+ }6363+6464+ Ok(Vec::new())6565+ }6666+ }6767+6868+ pub fn deserialize<'de, D>(deserializer: D) -> Result<Vec<Vec<u8>>, D::Error>6969+ where7070+ D: Deserializer<'de>,7171+ {7272+ deserializer.deserialize_seq(VecVisitor)7373+ }7474+}7575+7676+// pub mod smallvec {7777+// use super::super::ObjectId;7878+// use super::ENCODING;7979+// use serde::Deserializer;8080+// use serde::Serializer;8181+// use serde::de::Visitor;8282+// use serde::ser::SerializeSeq;8383+// use smallvec::SmallVec;8484+8585+// pub fn serialize<S>(hashes: &[ObjectId], serializer: S) -> Result<S::Ok, S::Error>8686+// where8787+// S: Serializer,8888+// {8989+// let mut seq = serializer.serialize_seq(Some(hashes.len()))?;9090+// for hash in hashes {9191+// let encoded = ENCODING.encode(hash);9292+// seq.serialize_element(&encoded)?;9393+// }9494+// seq.end()9595+// }9696+9797+// struct VecVisitor;9898+9999+// impl<'de> Visitor<'de> for VecVisitor {100100+// type Value = SmallVec<[ObjectId; 1]>;101101+102102+// fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {103103+// formatter.write_str("an sequence of commit hashes")104104+// }105105+106106+// fn visit_seq<A>(self, mut seq: A) -> Result<Self::Value, A::Error>107107+// where108108+// A: serde::de::SeqAccess<'de>,109109+// {110110+// while let Some(value) = seq.next_element::<&str>()? {111111+// eprintln!("{value}");112112+// //113113+// }114114+115115+// Ok(SmallVec::new())116116+// }117117+// }118118+119119+// pub fn deserialize<'de, D>(deserializer: D) -> Result<SmallVec<[ObjectId; 1]>, D::Error>120120+// where121121+// D: Deserializer<'de>,122122+// {123123+// deserializer.deserialize_seq(VecVisitor)124124+// }125125+// }
+10
crates/knot/src/lib.rs
···11+pub(crate) mod convert;22+pub mod model;33+pub mod public;44+pub mod types;55+66+mod objectid;77+pub use objectid::ObjectId;88+99+mod repoid;1010+pub use repoid::RepoId;
+153
crates/knot/src/main.rs
···11+use axum::{22+ Router,33+ extract::Query,44+ http::{HeaderName, Request, Response, Uri},55+ response::IntoResponse,66+};77+use hyper_util::{88+ client::legacy::{Client, connect::HttpConnector},99+ rt::TokioExecutor,1010+};1111+use identity::Resolver;1212+use knot::{RepoId, model::Knot};1313+use reqwest::StatusCode;1414+use serde::Deserialize;1515+use std::{1616+ net::{SocketAddr, ToSocketAddrs},1717+ time::Duration,1818+};1919+use tokio::runtime::Builder;2020+use tokio::{net::TcpListener, task::JoinSet};2121+use tower_http::{2222+ decompression::RequestDecompressionLayer, request_id::PropagateRequestIdLayer,2323+ trace::TraceLayer,2424+};2525+use tracing::{Span, field::Empty};2626+use url::Url;2727+2828+mod cli;2929+3030+fn main() {3131+ tracing_subscriber::fmt::init();3232+3333+ let arguments = cli::parse();3434+ tracing::debug!(?arguments);3535+3636+ Builder::new_multi_thread()3737+ .enable_all()3838+ .build()3939+ .expect("Failed to build runtime")4040+ .block_on(run(arguments))4141+ .unwrap();4242+}4343+4444+const REQUEST_ID: HeaderName = HeaderName::from_static("x-request-id");4545+4646+#[derive(Deserialize)]4747+struct Repo {4848+ repo: RepoId,4949+}5050+5151+fn extract_request_id<B>(request: &Request<B>) -> Option<&str> {5252+ request5353+ .headers()5454+ .get(&REQUEST_ID)5555+ .and_then(|hv| std::str::from_utf8(hv.as_bytes()).ok())5656+}5757+5858+pub async fn run(arguments: cli::Arguments) -> anyhow::Result<()> {5959+ let resolver = Resolver::default();6060+ let owner = arguments6161+ .owner6262+ .into_did(&resolver)6363+ .await6464+ .expect("Failed to resolve owner handle");6565+6666+ let mut router = Router::new()6767+ .without_v07_checks()6868+ .merge(knot::public::router());6969+7070+ if let Some(upstream_port) = arguments.upstream_port {7171+ let url = Url::parse(&format!("http://localhost:{upstream_port}"))7272+ .expect("localhost URL should be valid");7373+ let client = Client::builder(TokioExecutor::new()).build(HttpConnector::new());7474+ 
router = router.fallback(async move |mut req: axum::extract::Request| -> Result<axum::response::Response, StatusCode>{7575+ // Re-write the request URI.7676+ let mut uri = url.clone();7777+ uri.set_path(req.uri().path());7878+ uri.set_query(req.uri().query());7979+ *req.uri_mut() = Uri::try_from(uri.as_str()).unwrap();8080+8181+ tracing::info!(%uri, "forwarding request to upstream");8282+8383+ Ok(client8484+ .request(req)8585+ .await8686+ .map_err(|_| StatusCode::BAD_GATEWAY)?8787+ .into_response())8888+ });8989+ }9090+9191+ let router = router9292+ .layer(RequestDecompressionLayer::new())9393+ .layer(PropagateRequestIdLayer::x_request_id())9494+ .layer(9595+ TraceLayer::new_for_http()9696+ .make_span_with(|request: &Request<_>| {9797+ let method = request.method();9898+ let uri = request.uri();9999+ let path = uri.path();100100+101101+ let span =102102+ tracing::info_span!("public", id = Empty, ?method, ?path, repo = Empty);103103+104104+ if let Some(request_id) = extract_request_id(request) {105105+ span.record("id", request_id);106106+ }107107+108108+ if let Ok(Query(Repo { repo })) = Query::try_from_uri(uri) {109109+ span.record("repo", format!("{}/{}", repo.owner(), repo.name()));110110+ }111111+112112+ span113113+ })114114+ .on_request(|_: &Request<_>, _: &Span| {})115115+ .on_response(|response: &Response<_>, latency: Duration, _: &Span| {116116+ tracing::info!(?latency, status = ?response.status());117117+ }),118118+ )119119+ .with_state(Knot::new(owner, arguments.repos));120120+121121+ let mut sockets = Vec::with_capacity(arguments.addr.len());122122+ for addr in &arguments.addr {123123+ for socket in addr.to_socket_addrs()? 
{124124+ sockets.push(socket);125125+ }126126+ }127127+128128+ let mut service = JoinSet::new();129129+ for socket in sockets {130130+ serve(&mut service, socket, router.clone()).await;131131+ }132132+133133+ for task in service.join_all().await {134134+ if let Err(error) = task {135135+ tracing::error!(?error, "failed to join axum::serve task");136136+ }137137+ }138138+139139+ Ok(())140140+}141141+142142+pub async fn serve(set: &mut JoinSet<std::io::Result<()>>, socket: SocketAddr, router: Router) {143143+ let listener = TcpListener::bind(socket)144144+ .await145145+ .expect("Failed to bind socket");146146+147147+ let addr = listener148148+ .local_addr()149149+ .expect("Failed to acquire local socket address");150150+151151+ tracing::info!(?addr, "listening on socket");152152+ set.spawn(async move { axum::serve(listener, router).await });153153+}
+508
crates/knot/src/model.rs
···11+use crate::{22+ RepoId,33+ convert::time_to_offsetdatetime,44+ model::errors::{HeadDetached, PathNotFound, RefNotFound, RepoEmpty, RepoError, RepoNotFound},55+ public::xrpc::XrpcError,66+ types::sh::tangled::repo::{77+ BlobEncoding, BlobParams, BlobResponse, Branch, BranchesParams, BranchesResponse,88+ DefaultBranchResponse, Diff, DiffParams, DiffResponse, GetDefaultBranchParams, JsonBlob,99+ LogParams, LogResponse, Readme, Reference, Tag, TagParams, TagsResponse, TreeEntry,1010+ TreeParams, TreeResponse,1111+ },1212+};1313+use gix::{1414+ Commit, ObjectId, Repository, ThreadSafeRepository, bstr::ByteSlice as _, open::Options,1515+};1616+use identity::Did;1717+use rustc_hash::FxHashMap;1818+use std::{1919+ cmp::Reverse,2020+ path::{Path, PathBuf},2121+ str::FromStr,2222+ sync::{Arc, Mutex},2323+};2424+2525+const READMES: &[&[u8]] = &[2626+ b"README.md",2727+ b"readme.md",2828+ b"README",2929+ b"readme",3030+ b"README.markdown",3131+ b"readme.markdown",3232+ b"README.txt",3333+ b"readme.txt",3434+ b"README.rst",3535+ b"readme.rst",3636+ b"README.org",3737+ b"readme.org",3838+ b"README.asciidoc",3939+ b"readme.asciidoc",4040+ b"index.rst",4141+];4242+4343+pub mod errors;4444+mod gitoxide;4545+4646+#[derive(Clone)]4747+pub struct Knot {4848+ inner: Arc<KnotState>,4949+}5050+5151+impl std::fmt::Debug for Knot {5252+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {5353+ f.debug_struct("Knot").finish_non_exhaustive()5454+ }5555+}5656+5757+struct KnotState {5858+ owner: Did,5959+ repository_base: PathBuf,6060+ repository_cache: Mutex<FxHashMap<RepoId, ThreadSafeRepository>>,6161+}6262+6363+trait RepositoryManager {6464+ fn open(&self, repo: &RepoId) -> Result<Repository, XrpcError>;6565+}6666+6767+impl RepositoryManager for Knot {6868+ fn open(&self, repo: &RepoId) -> Result<Repository, XrpcError> {6969+ let mut cache = self7070+ .inner7171+ .repository_cache7272+ .lock()7373+ .unwrap_or_else(|mut poison| {7474+ poison.get_mut().clear();7575+ 
poison.into_inner()7676+ });7777+7878+ if let Some(repository) = cache.get(repo) {7979+ return Ok(repository.to_thread_local());8080+ }8181+8282+ let mut path = self.inner.repository_base.clone();8383+ path.push(repo.owner());8484+ path.push(repo.name());8585+8686+ let repository = Options::default()8787+ .strict_config(true)8888+ .open_path_as_is(true)8989+ .open(path)9090+ .map_err(RepoNotFound)?;9191+9292+ let local = repository.to_thread_local();9393+ assert!(local.is_bare());9494+9595+ match std::env::var("NO_REPO_CACHE").as_deref() {9696+ Ok("true") => Ok(repository.to_thread_local()),9797+ _ => {9898+ cache.insert(repo.clone(), repository.clone());9999+ Ok(repository.to_thread_local())100100+ }101101+ }102102+ }103103+}104104+105105+impl Knot {106106+ pub fn new(owner: Did, repository_base: impl AsRef<Path>) -> Self {107107+ let inner = Arc::new(KnotState {108108+ owner,109109+ repository_base: repository_base.as_ref().to_owned(),110110+ repository_cache: Default::default(),111111+ });112112+113113+ Self { inner }114114+ }115115+116116+ #[inline]117117+ pub fn owner(&self) -> &Did {118118+ &self.inner.owner119119+ }120120+121121+ pub fn base_path(&self) -> &Path {122122+ &self.inner.repository_base123123+ }124124+}125125+126126+impl Knot {127127+ fn resolve_rev<'repo>(128128+ repo: &'repo Repository,129129+ rev: Option<&str>,130130+ ) -> Result<Commit<'repo>, XrpcError> {131131+ let revision = if let Some(refspec) = rev {132132+ match ObjectId::from_str(refspec) {133133+ Ok(id) => repo.find_commit(id).map_err(RefNotFound)?,134134+ Err(_) => {135135+ // Assume the refspec is a branch or tag.136136+ let mut reference = repo.find_reference(refspec).map_err(RefNotFound)?;137137+ reference.peel_to_commit().map_err(RefNotFound)?138138+ }139139+ }140140+ } else {141141+ repo.head_commit().map_err(RefNotFound)?142142+ };143143+144144+ Ok(revision)145145+ }146146+147147+ pub async fn get_default_branch(148148+ &self,149149+ params: &GetDefaultBranchParams,150150+ ) 
-> Result<DefaultBranchResponse, XrpcError> {151151+ tokio::task::block_in_place(|| {152152+ let repo = self.open(¶ms.repo)?;153153+154154+ // Assume HEAD points the intended default branch. This *should* be true155155+ // for a bare repository.156156+ let mut head = repo.head()?;157157+ let name = head158158+ .referent_name()159159+ .ok_or(HeadDetached)?160160+ .shorten()161161+ .to_string();162162+163163+ let hash = head.id().map(|id| id.into());164164+ let when = head165165+ .peel_to_commit_in_place()166166+ .ok()167167+ .and_then(|commit| {168168+ commit169169+ .committer()170170+ .ok()171171+ .and_then(|committer| committer.time().ok())172172+ })173173+ .and_then(|time| time_to_offsetdatetime(&time).ok());174174+175175+ Ok(DefaultBranchResponse { name, hash, when })176176+ })177177+ }178178+179179+ pub async fn branches(&self, params: &BranchesParams) -> Result<BranchesResponse, XrpcError> {180180+ tokio::task::block_in_place(|| {181181+ let repo = self.open(¶ms.repo)?;182182+183183+ // Assume HEAD points to the intended default branch. 
This *should* be
            // true for a bare repository.
            let head = repo.head()?;
            let default_name = head
                .referent_name()
                .ok_or(HeadDetached)?
                .shorten()
                .to_string();

            let mut branches = Vec::new();
            for branch in repo
                .references()?
                .local_branches()?
                .skip(params.cursor)
                .take(params.limit.into())
            {
                let Ok(branch) = branch.inspect_err(|error| tracing::error!(?error)) else {
                    continue;
                };

                let name = branch.name().shorten().to_string();
                let Some(id) = branch.try_id() else {
                    tracing::warn!(?name, "branch unborn, skipping");
                    continue;
                };

                let Ok(commit) = repo.find_commit(id) else {
                    tracing::error!(?name, ?id, "failed to find commit for branch");
                    continue;
                };

                let is_default = name == default_name;
                branches.push(Branch {
                    reference: Reference {
                        name,
                        hash: commit.id.into(),
                    },
                    commit: commit.try_into()?,
                    is_default,
                });
            }

            Ok(BranchesResponse { branches })
        })
    }

    /// Returns one page of the commit history reachable from `params.rev`.
    ///
    /// `params.cursor` commits are skipped and at most `params.limit` commits
    /// are returned. The walk stops early (returning what was collected so
    /// far) if the revision walk itself yields an error.
    ///
    /// # Errors
    ///
    /// Fails if the repository cannot be opened, the revision cannot be
    /// resolved, the walk cannot be started, or a walked commit cannot be
    /// loaded or converted.
    pub async fn log(&self, params: &LogParams) -> Result<LogResponse, XrpcError> {
        tokio::task::block_in_place(|| {
            let repo = self.open(&params.repo)?;

            // The commit-graph is only an optimisation: if it cannot be loaded,
            // log the problem and fall back to the slow path instead of panicking.
            let commit_graph = repo
                .commit_graph_if_enabled()
                .inspect_err(|error| tracing::error!(?error, "failed to load commit-graph"))
                .ok()
                .flatten();

            let total = match &commit_graph {
                Some(cg) => cg
                    .num_commits()
                    .try_into()
                    .expect("You must be at least 32 bits tall to enjoy this ride"),
                None => {
                    tracing::warn!(?repo, "no commit-graph, counting commits manually");
                    // NOTE(review): this counts from HEAD rather than from the
                    // requested revision — confirm that is intended.
                    repo.rev_walk([repo.head_id().map_err(RepoEmpty)?])
                        .all()
                        .map_err(RepoError)?
                        .count()
                }
            };

            let tip = Self::resolve_rev(&repo, params.rev.as_deref())?;

            let mut commits = Vec::new();
            for commit in repo
                .rev_walk([tip.id()])
                .with_commit_graph(commit_graph)
                .all()
                .map_err(RepoError)?
                .skip(params.cursor)
                .take(params.limit.into())
            {
                match commit {
                    Ok(commit) => {
                        let commit = repo.find_commit(commit.id()).map_err(RepoError)?;
                        commits.push(commit.try_into().map_err(RepoError)?);
                    }
                    Err(error) => {
                        tracing::error!(?error);
                        break;
                    }
                }
            }

            // A zero limit would otherwise divide by zero; report page 1 in that case.
            let page = params
                .cursor
                .checked_div(usize::from(params.limit))
                .map_or(1, |full_pages| full_pages + 1);

            Ok(LogResponse {
                commits,
                log: true,
                total,
                page,
                per_page: params.limit,
            })
        })
    }

    /// Lists the repository's tags, truncated to `params.limit` entries.
    ///
    /// Tags that cannot be read or converted are logged and skipped. The
    /// result is sorted in descending order of the annotation's tagger
    /// timestamp; tags without an annotation sort last.
    ///
    /// # Errors
    ///
    /// Fails if the repository cannot be opened or its references cannot be
    /// enumerated.
    pub async fn tags(&self, params: &TagParams) -> Result<TagsResponse, XrpcError> {
        // @TODO Implement cursor.
        tokio::task::block_in_place(|| {
            let repo = self.open(&params.repo)?;

            let mut tags: Vec<_> = repo
                .references()?
                .tags()?
                .filter_map(|tag| {
                    tag.inspect_err(|error| tracing::error!(?error))
                        .ok()?
                        .try_into()
                        .inspect_err(|error| tracing::error!(?error))
                        .ok()
                })
                .collect();

            // `Reverse` turns the ascending sort into newest-first. The nested
            // `Option` keeps "annotated without tagger" distinct from "not annotated".
            tags.sort_by_key(|tag: &Tag| {
                Reverse(
                    tag.annotation
                        .as_ref()
                        .map(|an| an.tagger.as_ref().map(|tagger| tagger.when)),
                )
            });

            tags.truncate(params.limit);

            Ok(TagsResponse { tags })
        })
    }

    /// Lists the entries of the tree at `params.path` (or the root tree when
    /// no path is given) for the revision `params.rev`.
    ///
    /// If the path resolves to something other than a tree, an empty listing
    /// is returned with `parent` set to the requested path. The first blob
    /// whose file name is in `READMES` and whose contents are valid UTF-8 is
    /// returned as `readme`. Entries that fail to decode or convert are skipped.
    ///
    /// # Errors
    ///
    /// Fails if the repository or revision cannot be resolved, the path does
    /// not exist, or a required tree/blob object cannot be loaded.
    pub async fn tree(&self, params: &TreeParams) -> Result<TreeResponse, XrpcError> {
        tokio::task::block_in_place(|| {
            let repo = self.open(&params.repo)?;
            let tip = Self::resolve_rev(&repo, params.rev.as_deref())?;

            // Parent directory of the requested path; `None` when there is no
            // path or the path has a single component.
            let dotdot = params.path.clone().and_then(|mut path| {
                path.pop();
                match path.as_os_str().is_empty() {
                    true => None,
                    false => Some(path),
                }
            });

            let mut parent = None;
            let mut tree = tip.tree()?;
            if let Some(subpath) = &params.path {
                let entry = tree
                    .lookup_entry_by_path(subpath)?
                    .ok_or_else(|| PathNotFound(subpath.to_string_lossy()))?;

                if !entry.mode().is_tree() {
                    return Ok(TreeResponse {
                        files: vec![],
                        dotdot,
                        parent: params.path.clone(),
                        rev: params.rev.as_deref().unwrap_or_default().to_string(),
                        readme: None,
                    });
                }

                // Propagate a missing/corrupt object as an error instead of panicking.
                tree = repo.find_tree(entry.id())?;
                parent = Some(subpath.to_path_buf());
            }

            // Single pass over the tree: pick up the README and collect the listing.
            let mut files: Vec<TreeEntry> = vec![];
            let mut readme = None;
            for entry in tree.iter() {
                let Ok(entry) = entry else {
                    continue;
                };

                if readme.is_none()
                    && entry.mode().is_blob()
                    && READMES.contains(&entry.filename().as_bytes())
                {
                    let mut file = repo.find_blob(entry.id())?;
                    // A README that is not valid UTF-8 is ignored; a later candidate may match.
                    if let Ok(contents) = String::from_utf8(file.take_data()) {
                        readme = Some(Readme {
                            contents,
                            filename: entry.filename().to_string(),
                        });
                    }
                }

                let Ok(tree_entry) = entry.try_into() else {
                    continue;
                };
                files.push(tree_entry);
            }

            Ok(TreeResponse {
                files,
                dotdot,
                parent,
                rev: params.rev.as_deref().unwrap_or_default().to_string(),
                readme,
            })
        })
    }

    /// Returns the contents of the blob (or symlink) at `params.path` for the
    /// revision `params.rev`.
    ///
    /// With `params.raw` set, the bytes are returned as-is. Otherwise the
    /// contents are returned as UTF-8 text, or base64-encoded and flagged as
    /// binary when they are not valid UTF-8.
    ///
    /// # Errors
    ///
    /// Fails if the repository or revision cannot be resolved, the object
    /// cannot be loaded, or the path does not exist or does not refer to a
    /// blob or symlink.
    pub async fn blob(&self, params: &BlobParams) -> Result<BlobResponse, XrpcError> {
        tokio::task::block_in_place(|| {
            let repo = self.open(&params.repo)?;
            let tip = Self::resolve_rev(&repo, params.rev.as_deref())?;
            let entry = tip
                .tree()?
                .lookup_entry_by_path(&params.path)?
                .ok_or_else(|| PathNotFound(params.path.to_string_lossy()))?;

            // The path is caller-supplied, so a directory or submodule here is
            // a request error, not a broken invariant: report it, don't panic.
            if !(entry.mode().is_blob() || entry.mode().is_link()) {
                return Err(PathNotFound(params.path.to_string_lossy()).into());
            }

            let mut blob = entry.object()?.into_blob();
            let data = blob.take_data();
            if params.raw {
                return Ok(BlobResponse::Raw(data));
            }

            let size = data.len();
            let (content, is_binary, encoding) = match String::from_utf8(data) {
                Ok(content) => (content, false, BlobEncoding::Utf8),
                // `into_bytes` recovers the original buffer without copying.
                Err(error) => (
                    data_encoding::BASE64.encode(&error.into_bytes()),
                    true,
                    BlobEncoding::Base64,
                ),
            };

            Ok(BlobResponse::Json(JsonBlob {
                content,
                encoding,
                is_binary,
                mime_type: "".to_string(),
                path: params.path.to_owned(),
                rev: params.rev.as_deref().unwrap_or_default().to_owned(),
                size,
            }))
        })
    }

    /// Resolves `params.rev` to a commit and returns it wrapped in a diff response.
    ///
    /// `params.rev` is first parsed as an object id; if that fails it is
    /// looked up as a reference and peeled to a commit.
    ///
    /// The diff itself is not computed yet: `stat` is defaulted and `deltas`
    /// is always empty.
    ///
    /// # Errors
    ///
    /// Fails if the repository cannot be opened, the revision cannot be
    /// resolved to a commit, or the commit cannot be converted.
    pub async fn diff(&self, params: &DiffParams) -> Result<DiffResponse, XrpcError> {
        tokio::task::block_in_place(|| {
            let repo = self.open(&params.repo)?;

            let revision = match ObjectId::from_str(&params.rev) {
                Ok(id) => repo.find_commit(id).map_err(RefNotFound)?,
                Err(_) => {
                    // Assume the refspec is a branch or tag.
                    let mut reference = repo.find_reference(&params.rev).map_err(RefNotFound)?;
                    reference.peel_to_commit().map_err(RefNotFound)?
                }
            };

            // @TODO Compute the actual diff. Planned approach: diff the commit's
            // tree against its first parent's tree (or the empty tree for a root
            // commit) using a diff resource cache, and for every changed blob
            // emit a unified diff (histogram algorithm, 3 lines of context).
            let diff = Diff {
                commit: revision.try_into()?,
                stat: Default::default(),
                deltas: vec![],
            };

            Ok(DiffResponse {
                rev: params.rev.to_owned(),
                diff,
            })
        })
    }
}