···4848tower = { version = "0.5.2", features = ["buffer", "filter", "limit"] }4949tower-http = { version = "0.6.6", features = ["decompression-gzip", "request-id", "trace", "tracing", "util"] }5050tracing-subscriber = { version = "0.3.20", features = ["env-filter"] }5151+dashmap = "6.1.0"5252+mock-pds = { version = "0.0.0", path = "../mock-pds" }51535254[dev-dependencies]5355http-body-util = "0.1.3"
+190-340
crates/knot/src/lib.rs
···99pub mod private;1010pub mod public;1111pub mod services;1212+pub mod sync;1213pub mod types;1314mod util;1515+1616+#[cfg(test)]1717+pub(crate) mod mock;14181519pub async fn serve_all(1620 router: Router,···44404541#[cfg(test)]4642mod tests {4747- use std::{4848- borrow::Cow,4949- collections::HashMap,5050- net::{SocketAddr, TcpListener},5151- sync::{Arc, Mutex},5252- };4343+ use std::borrow::Cow;53445454- use atproto::{Did, did::OwnedDid, tid::Tid};5555- use auth::jwt::{Claims, Header};5656- use aws_lc_rs::{5757- encoding::{AsBigEndian, EcPublicKeyCompressedBin},5858- rand::SystemRandom,5959- signature::{ECDSA_P256K1_SHA256_FIXED_SIGNING, EcdsaKeyPair, KeyPair},6060- };4545+ use atproto::{Did, tid::Tid};4646+ use auth::jwt::Claims;4747+6148 use axum::{6262- Json, Router,6349 body::Body,6464- extract::{Query, State},6565- http::{HeaderValue, Method, Request, Response, StatusCode, header},5050+ http::{Request, StatusCode},6651 };6767- use futures_util::FutureExt;6868- use identity::{DidDocument, ResolveIdentity, Resolver};6969- use sqlx::SqlitePool;7070- use tempfile::TempDir;7152 use time::{OffsetDateTime, format_description::well_known::Rfc3339};7253 use tower::ServiceExt;73547474- use crate::{7575- model::{Knot, config::KnotConfiguration},7676- services::database::DataStore,7777- };5555+ use crate::model::Knot;78567957 const TEST_DID: &str = "did:plc:65gha4t3avpfpzmvpbwovss7";8080- const TEST_INSTANCE: &str = "did:web:test-knot";8181-8282- /// Mock PDS, PLC directory and identity resolver.8383- #[derive(Clone, Debug)]8484- struct Pds {8585- addr: Option<SocketAddr>,8686- identities: Arc<Mutex<Vec<(DidDocument, EcdsaKeyPair)>>>,8787- documents: Arc<Mutex<HashMap<String, serde_json::Value>>>,8888- }8989-9090- impl Default for Pds {9191- fn default() -> Self {9292- Self::new()9393- }9494- }9595-9696- impl Pds {9797- fn new() -> Self {9898- Self {9999- addr: None,100100- identities: Default::default(),101101- documents: Default::default(),102102- }103103- }104104-105105- fn service_endpoint(&self) -> Option<url::Url> {106106- self.addr.map(|addr| {107107- format!("http://{addr}/")108108- .parse()109109- .expect("service endpoint should be a valid URL")110110- })111111- }112112-113113- fn get_record(&self, uri: &str) -> Option<serde_json::Value> {114114- self.documents.lock().unwrap().get(uri).cloned()115115- }116116-117117- fn insert_record(118118- &self,119119- did: &Did,120120- collection: &str,121121- rkey: &str,122122- doc: serde_json::Value,123123- ) -> String {124124- let uri = format!("at://{did}/{collection}/{rkey}");125125- self.documents.lock().unwrap().insert(uri.clone(), doc);126126- uri127127- }128128-129129- /// Add a DID document created from `did`, `handle`, and a random ecdsa key-pair to the PDS.130130- ///131131- /// If [`Self::serve`] has been called before adding the identity, the local URL of the132132- /// PDS will be set as the new DID document's '#atproto_pds' service endpoint.133133- ///134134- fn add_identity(&self, did: &Did, handle: &str) {135135- let mut doc = DidDocument::new(did, handle).expect("valid did for did document");136136- if let Some(service_endpoint) = self.service_endpoint() {137137- doc.service138138- .push(identity::Service::atproto_pds(service_endpoint));139139- }140140-141141- // Generate a key pair and encode the public key as verification method for142142- // the mock user.143143- let key_pair = EcdsaKeyPair::generate(&ECDSA_P256K1_SHA256_FIXED_SIGNING).unwrap();144144- let public_key: EcPublicKeyCompressedBin = key_pair.public_key().as_be_bytes().unwrap();145145- let mut key_data = vec![0xe7, 0x01];146146- key_data.extend_from_slice(public_key.as_ref());147147- let public_key_multibase = multibase::encode(multibase::Base::Base58Btc, key_data);148148- doc.verification_method149149- .push(identity::VerificationMethod::Multikey {150150- id: format!("{}#atproto", doc.id),151151- controller: doc.id.clone(),152152- public_key_multibase,153153- });154154-155155- self.identities.lock().unwrap().push((doc, key_pair));156156- }157157-158158- // Create an inter-service auth header for an account in the fake PDS.159159- fn service_auth(&self, claims: &Claims) -> HeaderValue {160160- use data_encoding::BASE64URL_NOPAD as Encoding;161161-162162- let mut token = String::new();163163- let header = Encoding.encode(164164- &serde_json::to_vec(&Header {165165- typ: auth::jwt::Type::JWT,166166- alg: auth::jwt::Algorithm::ES256K,167167- crv: None,168168- })169169- .unwrap(),170170- );171171-172172- token.push_str(&header);173173-174174- let claims_enc = Encoding.encode(&serde_json::to_vec(claims).unwrap());175175- token.push('.');176176- token.push_str(&claims_enc);177177-178178- let guard = self.identities.lock().unwrap();179179- let key_pair = guard180180- .iter()181181- .find(|(doc, _)| doc.id == claims.iss)182182- .map(|(_, key_pair)| key_pair)183183- .expect("DID should exist to issue a service auth request");184184-185185- let signature = key_pair186186- .sign(&SystemRandom::new(), token.as_bytes())187187- .unwrap();188188-189189- let signature = Encoding.encode(signature.as_ref());190190- token.push('.');191191- token.push_str(&signature);192192-193193- HeaderValue::from_str(&format!("Bearer {token}"))194194- .expect("Service auth header should be valid")195195- }196196-197197- fn service_auth_with<F>(&self, iss: &Did, aud: &Did, modify_claims: F) -> HeaderValue198198- where199199- F: FnOnce(&mut Claims),200200- {201201- let jti: [u8; 16] = rand::random();202202- let jti = data_encoding::BASE32_NOPAD_VISUAL203203- .encode(&jti)204204- .to_lowercase();205205-206206- let mut claims = Claims {207207- iss: iss.into(),208208- aud: aud.into(),209209- iat: OffsetDateTime::now_utc().unix_timestamp(),210210- exp: OffsetDateTime::now_utc().unix_timestamp() + 10,211211- lxm: None,212212- jti: jti.into(),213213- };214214-215215- modify_claims(&mut claims);216216-217217- self.service_auth(&claims)218218- }219219-220220- fn service_auth_from(&self, iss: &Did, aud: &Did, lxm: &str) -> HeaderValue {221221- self.service_auth_with(iss, aud, |claims| {222222- claims.lxm = Some(223223- lxm.try_into()224224- .expect("Lexicon method should be a valid NSID"),225225- );226226- })227227- }228228-229229- fn serve(&mut self) {230230- #[derive(serde::Deserialize)]231231- struct GetRecord {232232- repo: String,233233- collection: String,234234- rkey: String,235235- }236236-237237- assert!(self.addr.is_none(), "serve() already called");238238- let pds = Router::new()239239- .route(240240- "/xrpc/com.atproto.repo.getRecord",241241- axum::routing::get(242242- async move |State(state): State<Pds>,243243- Query(GetRecord {244244- repo,245245- collection,246246- rkey,247247- }): Query<GetRecord>| {248248- Json(state.get_record(&format!("at://{repo}/{collection}/{rkey}")))249249- },250250- ),251251- )252252- .with_state(self.clone());253253-254254- let listener = TcpListener::bind("127.0.0.1:0").unwrap();255255- listener.set_nonblocking(true).unwrap();256256- self.addr = Some(listener.local_addr().unwrap());257257-258258- tokio::spawn(async move {259259- axum::serve(tokio::net::TcpListener::from_std(listener).unwrap(), pds)260260- .await261261- .unwrap();262262- });263263- }264264- }265265-266266- impl ResolveIdentity for Pds {267267- fn resolve_handle<'s: 'h, 'h>(268268- &'s self,269269- handle: &'h str,270270- ) -> futures_util::future::BoxFuture<'h, Result<OwnedDid, identity::ResolveError>> {271271- async move {272272- self.identities273273- .lock()274274- .unwrap()275275- .iter()276276- .find_map(|(doc, _)| {277277- match doc.primary_alias().is_some_and(|alias| alias == handle) {278278- true => Some(doc.id.clone()),279279- false => None,280280- }281281- })282282- .ok_or(identity::ResolveError::UnresolvedHandle)283283- }284284- .boxed()285285- }286286-287287- fn resolve_did<'s: 'd, 'd>(288288- &'s self,289289- did: &'d Did,290290- ) -> futures_util::future::BoxFuture<'d, Result<DidDocument, identity::ResolveError>>291291- {292292- async move {293293- self.identities294294- .lock()295295- .unwrap()296296- .iter()297297- .find_map(|(doc, _)| match doc.id == did {298298- true => Some(doc.clone()),299299- false => None,300300- })301301- .ok_or(identity::ResolveError::UnresolvedHandle)302302- }303303- .boxed()304304- }305305- }306306-307307- async fn setup(owner_did: Option<&str>) -> (TempDir, Pds, Knot) {308308- let base = tempfile::tempdir().expect("temporary directory");309309- let pool = SqlitePool::connect("sqlite://:memory:").await.unwrap();310310- sqlx::migrate!().run(&pool).await.unwrap();311311-312312- let mut pds = Pds::new();313313- pds.serve();314314-315315- let config = KnotConfiguration::new(316316- OwnedDid::from_static(TEST_DID),317317- OwnedDid::from_static(TEST_INSTANCE),318318- base.path(),319319- );320320-321321- let database = DataStore::new(pool);322322- let resolver = Resolver::new(pds.clone());323323-324324- let knot = Knot::new(config, resolver, reqwest::Client::new(), database, []).unwrap();325325- if let Some(owner_did) = owner_did326326- .map(Did::parse)327327- .transpose()328328- .expect("knot owner did should be valid")329329- {330330- knot.database()331331- .upsert_knot_member(332332- "",333333- "",334334- "",335335- &lexicon::sh_tangled::knot::Member {336336- subject: Cow::Borrowed(&owner_did),337337- domain: Cow::Borrowed(knot.instance_ident()),338338- created_at: OffsetDateTime::now_utc(),339339- },340340- )341341- .await342342- .expect("knot member inserted into db");343343- }344344-345345- (base, pds, knot)346346- }5858+ const TEST_INSTANCE: &str = "lib-knot-test";3475934860 fn get(uri: &str) -> Request<Body> {34961 Request::builder().uri(uri).body(Body::empty()).unwrap()···6734768348 #[tokio::test]69349 async fn can_query_knot_owner() {7070- let (_, _, knot) = setup(None).await;350350+ let (_, _, knot) = crate::mock::setup(TEST_DID, TEST_INSTANCE).await;71351 let response = super::public::router()72352 .with_state(knot)73353 .oneshot(get("/xrpc/sh.tangled.owner"))···9037091371 #[tokio::test]92372 async fn xrpc_sh_tangled_repo_missing_repo() {9393- let (_, _, knot) = setup(None).await;373373+ let (_, _, knot) = crate::mock::setup(TEST_DID, TEST_INSTANCE).await;94374 for particle in ["tree", "log", "tags", "branches"] {95375 let response = super::public::router()96376 .with_state(knot.clone())···104384105385 #[tokio::test]106386 async fn xrpc_sh_tangled_repo_bad_repo_format() {107107- let (_, _, knot) = setup(None).await;387387+ let (_, _, knot) = crate::mock::setup(TEST_DID, TEST_INSTANCE).await;108388 for particle in ["tree", "log", "tags", "branches"] {109389 // Missing repo name110390 let response = super::public::router()···135415136416 #[tokio::test]137417 async fn xrpc_sh_tangled_repo_not_found() {138138- let (_, _, knot) = setup(None).await;418418+ let (_, _, knot) = crate::mock::setup(TEST_DID, TEST_INSTANCE).await;139419 for particle in ["tree", "log", "tags", "branches"] {140420 let response = super::public::router()141421 .with_state(knot.clone())···152432 mod sh_tangled_repo_create {153433 use super::super::public;154434 use super::*;435435+ use axum::http::{HeaderValue, Method, Response, header};436436+437437+ fn make_claims<F>(iss: &Did, aud: &Did, modify_claims: F) -> Claims438438+ where439439+ F: FnOnce(&mut Claims),440440+ {441441+ let jti: [u8; 16] = rand::random();442442+ let jti = data_encoding::BASE32_NOPAD_VISUAL443443+ .encode(&jti)444444+ .to_lowercase();445445+446446+ let mut claims = Claims {447447+ iss: iss.into(),448448+ aud: aud.into(),449449+ iat: OffsetDateTime::now_utc().unix_timestamp(),450450+ exp: OffsetDateTime::now_utc().unix_timestamp() + 10,451451+ lxm: None,452452+ jti: jti.into(),453453+ };454454+455455+ modify_claims(&mut claims);456456+ claims457457+ }458458+459459+ async fn service_auth_with<F>(460460+ pds: &mock_pds::Pds,461461+ iss: &Did,462462+ aud: &Did,463463+ modify_claims: F,464464+ ) -> HeaderValue465465+ where466466+ F: FnOnce(&mut Claims),467467+ {468468+ let claims = make_claims(iss, aud, modify_claims);469469+ let authorization = pds.service_auth(&claims).await;470470+ HeaderValue::from_str(&authorization).unwrap()471471+ }155472156473 #[tokio::test]157474 async fn reject_wrong_method() {158158- let (_, _, knot) = setup(None).await;475475+ let (_, _, knot) = crate::mock::setup(TEST_DID, TEST_INSTANCE).await;159476 let response = public::router()160477 .with_state(knot.clone())161478 .oneshot(get("/xrpc/sh.tangled.repo.create"))···204447205448 async fn create_repo_with<F>(206449 knot: &Knot,207207- pds: Pds,450450+ pds: mock_pds::Pds,208451 did: &Did,209452 rkey: &str,210453 repo_name: &str,···219462 did,220463 "sh.tangled.repo",221464 rkey,222222- serde_json::json!({223223- "uri": format!("at://{did}/sh.tangled.repo/{rkey}"),224224- "cid": "bafyreie7ym6v4gepcdi2ul2kchylo5aahlw3nmvjg3veipoi76kziixfoa",225225- "value": {226226- "name": repo_name,227227- "knot": knot.instance_ident(),228228- "source": source,229229- "createdAt": OffsetDateTime::now_utc().format(&Rfc3339).unwrap()230230- }465465+ &serde_json::json!({466466+ "name": repo_name,467467+ "knot": knot.instance_ident(),468468+ "source": source,469469+ "createdAt": OffsetDateTime::now_utc().format(&Rfc3339).unwrap()231470 }),232232- );471471+ )472472+ .await;233473234474 // Generate the body of the 'sh.tangled.repo.create' request.235475 let create = lexicon::sh_tangled::repo::create::Input {···235481 source: None,236482 };237483238238- let auth = pds.service_auth_with(&did, &knot.instance, |claims| {484484+ let auth = service_auth_with(&pds, &did, &knot.instance, |claims| {239485 claims.lxm = Some("sh.tangled.repo.create".try_into().unwrap());240486 modify_claims(claims);241241- });487487+ })488488+ .await;242489243490 let response = public::router()244491 .with_state(knot.clone())···258503259504 async fn create_repo(260505 knot: &Knot,261261- pds: Pds,506506+ pds: mock_pds::Pds,262507 did: &Did,263508 rkey: &str,264509 repo_name: &str,···278523279524 #[tokio::test]280525 async fn can_create_repo() {281281- let (_base, pds, knot) = setup(Some(TEST_DID)).await;526526+ let (_base, pds, knot) = crate::mock::setup(TEST_DID, TEST_INSTANCE).await;282527283528 let did = Did::from_static(TEST_DID);284284- pds.add_identity(did, "tjh.dev");529529+ pds.insert_identity(did, "tjh.dev").await;530530+ knot.add_member(531531+ "",532532+ "",533533+ "",534534+ &lexicon::sh_tangled::knot::Member::new(535535+ &did,536536+ knot.instance_ident(),537537+ OffsetDateTime::now_utc(),538538+ ),539539+ )540540+ .await541541+ .unwrap();285542286543 let rkey = Tid::from_datetime(OffsetDateTime::now_utc(), 0).to_string();287544 assert_eq!(···308541309542 #[tokio::test]310543 async fn can_create_fork_from_at() {311311- let (_base, pds, knot) = setup(Some(TEST_DID)).await;544544+ let (_base, pds, knot) = crate::mock::setup(TEST_DID, TEST_INSTANCE).await;312545313546 let did = Did::from_static(TEST_DID);314314- pds.add_identity(did, "tjh.dev");547547+ pds.insert_identity(did, "tjh.dev").await;548548+ knot.add_member(549549+ "",550550+ "",551551+ "",552552+ &lexicon::sh_tangled::knot::Member::new(553553+ &did,554554+ knot.instance_ident(),555555+ OffsetDateTime::now_utc(),556556+ ),557557+ )558558+ .await559559+ .unwrap();315560316561 // Create a record for the repository to fork from.317562 // <https://pdsls.dev/at://did:plc:65gha4t3avpfpzmvpbwovss7/sh.tangled.repo/3m24udbjajf22#record>318318- let aturi = pds.insert_record(319319- did,320320- "sh.tangled.repo",321321- "3m24udbjajf22",322322- serde_json::json!({323323- "uri": format!("at://{did}/sh.tangled.repo/3m24udbjajf22"),324324- "cid": "some_cid",325325- "value": {563563+ let aturi = pds564564+ .insert_record(565565+ did,566566+ "sh.tangled.repo",567567+ "3m24udbjajf22",568568+ &serde_json::json!({326569 "name": "gordian",327570 "knot": "gordian.tjh.dev",328571 "createdAt": "2025-10-01T10:45:52Z"329329- }330330- }),331331- );572572+ }),573573+ )574574+ .await;332575333576 let rkey = Tid::from_datetime(OffsetDateTime::now_utc(), 0).to_string();334577 assert_eq!(···353576354577 #[tokio::test]355578 async fn can_create_fork_from_http() {356356- let (_base, pds, knot) = setup(Some(TEST_DID)).await;579579+ let (_base, pds, knot) = crate::mock::setup(TEST_DID, TEST_INSTANCE).await;357580358581 let did = Did::from_static(TEST_DID);359359- pds.add_identity(did, "tjh.dev");582582+ pds.insert_identity(did, "tjh.dev").await;583583+ knot.add_member(584584+ "",585585+ "",586586+ "",587587+ &lexicon::sh_tangled::knot::Member::new(588588+ &did,589589+ knot.instance_ident(),590590+ OffsetDateTime::now_utc(),591591+ ),592592+ )593593+ .await594594+ .unwrap();360595361596 let rkey = Tid::from_datetime(OffsetDateTime::now_utc(), 0).to_string();362597 let source =···385596386597 #[tokio::test]387598 async fn can_create_fork_from_http_fail() {388388- let (base, pds, knot) = setup(Some(TEST_DID)).await;599599+ let (base, pds, knot) = crate::mock::setup(TEST_DID, TEST_INSTANCE).await;389600390601 let did = Did::from_static(TEST_DID);391391- pds.add_identity(did, "tjh.dev");602602+ pds.insert_identity(did, "tjh.dev").await;603603+ knot.add_member(604604+ "",605605+ "",606606+ "",607607+ &lexicon::sh_tangled::knot::Member::new(608608+ &did,609609+ knot.instance_ident(),610610+ OffsetDateTime::now_utc(),611611+ ),612612+ )613613+ .await614614+ .unwrap();392615393616 let rkey = Tid::from_datetime(OffsetDateTime::now_utc(), 0).to_string();394617 let source =···423622424623 #[tokio::test]425624 async fn rejects_if_owner_is_not_a_member() {426426- let (_base, pds, knot) = setup(None).await;625625+ let (_base, pds, knot) = crate::mock::setup(TEST_DID, TEST_INSTANCE).await;427626428627 let did = Did::from_static(TEST_DID);429429- pds.add_identity(did, "tjh.dev");628628+ pds.insert_identity(did, "tjh.dev").await;430629431630 let rkey = Tid::from_datetime(OffsetDateTime::now_utc(), 0).to_string();432631 assert_ne!(···441640442641 #[tokio::test]443642 async fn rejects_auth_issued_in_future() {444444- let (_base, pds, knot) = setup(Some(TEST_DID)).await;643643+ let (_base, pds, knot) = crate::mock::setup(TEST_DID, TEST_INSTANCE).await;445644446645 let did = Did::from_static(TEST_DID);447447- pds.add_identity(did, "tjh.dev");646646+ pds.insert_identity(did, "tjh.dev").await;647647+ knot.add_member(648648+ "",649649+ "",650650+ "",651651+ &lexicon::sh_tangled::knot::Member::new(652652+ &did,653653+ knot.instance_ident(),654654+ OffsetDateTime::now_utc(),655655+ ),656656+ )657657+ .await658658+ .unwrap();448659449660 let rkey = Tid::from_datetime(OffsetDateTime::now_utc(), 0).to_string();450661 assert_eq!(···475662476663 #[tokio::test]477664 async fn rejects_auth_expired() {478478- let (_base, pds, knot) = setup(Some(TEST_DID)).await;665665+ let (_base, pds, knot) = crate::mock::setup(TEST_DID, TEST_INSTANCE).await;479666480667 let did = Did::from_static(TEST_DID);481481- pds.add_identity(did, "tjh.dev");668668+ pds.insert_identity(did, "tjh.dev").await;669669+ knot.add_member(670670+ "",671671+ "",672672+ "",673673+ &lexicon::sh_tangled::knot::Member::new(674674+ &did,675675+ knot.instance_ident(),676676+ OffsetDateTime::now_utc(),677677+ ),678678+ )679679+ .await680680+ .unwrap();482681483682 let rkey = Tid::from_datetime(OffsetDateTime::now_utc(), 0).to_string();484683 assert_eq!(···507682508683 #[tokio::test]509684 async fn can_delete_repo() {510510- let (base, pds, knot) = setup(Some(TEST_DID)).await;685685+ let (base, pds, knot) = crate::mock::setup(TEST_DID, TEST_INSTANCE).await;511686512687 let did = Did::from_static(TEST_DID);513513- pds.add_identity(did, "tjh.dev");688688+ pds.insert_identity(did, "tjh.dev").await;689689+ knot.add_member(690690+ "",691691+ "",692692+ "",693693+ &lexicon::sh_tangled::knot::Member::new(694694+ &did,695695+ knot.instance_ident(),696696+ OffsetDateTime::now_utc(),697697+ ),698698+ )699699+ .await700700+ .unwrap();514701515702 let rkey = Tid::from_datetime(OffsetDateTime::now_utc(), 0).to_string();516703 let name = "another-test-repo";···566729 assert!(repo_exists_in_db(&knot, &did, &rkey).await);567730568731 // Or with the wrong auth.569569- let auth = pds.service_auth_from(&did, &knot.instance(), "sh.tangled.repo.create");732732+ let auth = service_auth_with(&pds, &did, &knot.instance(), |claims| {733733+ claims.lxm = Some("sh.tangled.repo.create".try_into().unwrap());734734+ })735735+ .await;736736+570737 assert_eq!(571738 public::router()572739 .with_state(knot.clone())···594753 assert!(repo_exists_in_db(&knot, &did, &rkey).await);595754596755 // Valid auth, empty request body.597597- let auth = pds.service_auth_from(&did, &knot.instance(), "sh.tangled.repo.delete");756756+ // Or with the wrong auth.757757+ let auth = service_auth_with(&pds, &did, &knot.instance(), |claims| {758758+ claims.lxm = Some("sh.tangled.repo.delete".try_into().unwrap());759759+ })760760+ .await;598761 assert_eq!(599762 public::router()600763 .with_state(knot.clone())···621776 gix::open(base.path().join(did.as_str()).join(&rkey)).expect("repository should exist");622777 assert!(repo_exists_in_db(&knot, &did, &rkey).await);623778624624- let auth = pds.service_auth_from(&did, &knot.instance(), "sh.tangled.repo.delete");779779+ // Or with the wrong auth.780780+ let auth = service_auth_with(&pds, &did, &knot.instance(), |claims| {781781+ claims.lxm = Some("sh.tangled.repo.delete".try_into().unwrap());782782+ })783783+ .await;784784+625785 assert_eq!(626786 public::router()627787 .with_state(knot.clone())
+35
crates/knot/src/mock.rs
···11+use crate::{22+ model::{Knot, config::KnotConfiguration},33+ services::database::DataStore,44+};55+use atproto::did::OwnedDid;66+use identity::Resolver;77+88+pub async fn setup(99+ owner_did: &str,1010+ instance_name: &str,1111+) -> (tempfile::TempDir, mock_pds::Pds, Knot) {1212+ let base = tempfile::tempdir().expect("temporary directory");1313+ let pool = sqlx::SqlitePool::connect("sqlite://:memory:")1414+ .await1515+ .unwrap();1616+1717+ sqlx::migrate!().run(&pool).await.unwrap();1818+1919+ let (pds, listener) = mock_pds::init().await;2020+ let pds_api = mock_pds::router(pds.clone());2121+ tokio::spawn(async move {2222+ axum::serve(listener, pds_api).await.unwrap();2323+ });2424+2525+ let owner_did = OwnedDid::parse(owner_did).expect("owner DID must be valid");2626+ let instance = OwnedDid::parse(format!("did:web:{instance_name}"))2727+ .expect("instance name should form a valid DID");2828+2929+ let database = DataStore::new(pool);3030+ let resolver = Resolver::new(pds.clone());3131+ let config = KnotConfiguration::new(owner_did.clone(), instance, base.path());3232+ let knot = Knot::new(config, resolver, reqwest::Client::new(), database, []).unwrap();3333+3434+ (base, pds, knot)3535+}
+5
crates/knot/src/sync.rs
···11+//!22+//! Atmosphere synchronization.33+//!44+55+pub mod tap;
···11+use crate::Pds;22+33+pub fn router<S>(state: Pds) -> axum::Router<S> {44+ axum::Router::new()55+ .merge(com_atproto::repo::get_record())66+ .merge(com_atproto::repo::list_records())77+ .with_state(state)88+}99+1010+pub mod com_atproto {1111+ pub mod repo {1212+ use atproto::did::OwnedDid;1313+ use axum::{1414+ Json, Router,1515+ extract::{FromRef, Query, State},1616+ http::StatusCode,1717+ response::IntoResponse,1818+ };1919+ use serde_json::Value;2020+ use sqlx::Row as _;2121+2222+ use crate::Pds;2323+2424+ #[derive(serde::Serialize)]2525+ pub struct Record {2626+ uri: String,2727+ cid: String,2828+ value: Value,2929+ }3030+3131+ pub fn get_record<S: Clone + Send + Sync + 'static>() -> Router<S>3232+ where3333+ Pds: FromRef<S>,3434+ {3535+ const LXM: &str = "com.atproto.repo.getRecord";3636+3737+ #[derive(serde::Deserialize)]3838+ pub struct Params {3939+ repo: OwnedDid,4040+ collection: String,4141+ rkey: String,4242+ cid: Option<String>,4343+ }4444+4545+ #[tracing::instrument(target = "com_atproto::repo::get_record", skip(pds))]4646+ async fn handle(4747+ State(pds): State<Pds>,4848+ Query(Params {4949+ repo,5050+ collection,5151+ rkey,5252+ cid,5353+ }): Query<Params>,5454+ ) -> Result<Json<Record>, StatusCode> {5555+ assert_eq!(cid, None, "Get record by CID not supported");5656+5757+ match sqlx::query("SELECT cid, data FROM record LEFT JOIN identity USING (did) WHERE did = ? AND collection = ? AND rkey = ?")5858+ .bind(repo.as_ref())5959+ .bind(&collection)6060+ .bind(&rkey)6161+ .fetch_optional(pds.db())6262+ .await {6363+ Err(error) => {6464+ tracing::error!(?error);6565+ Err(StatusCode::INTERNAL_SERVER_ERROR)6666+ }6767+ Ok(None) => Err(StatusCode::NOT_FOUND),6868+ Ok(Some(row)) => {6969+ let cid: String = row.get("cid");7070+ let data: &str = row.get("data");7171+7272+ let uri = format!("at://{repo}/{collection}/{rkey}");7373+ let value: Value = serde_json::from_str(data).expect("Record value in db should be valid json");7474+7575+ Ok(Json(Record { uri, cid, value }))7676+ }7777+ }7878+ }7979+8080+ Router::new().route(&format!("/xrpc/{LXM}"), axum::routing::get(handle))8181+ }8282+8383+ pub fn list_records<S: Clone + Send + Sync + 'static>() -> Router<S>8484+ where8585+ Pds: FromRef<S>,8686+ {8787+ const LXM: &str = "com.atproto.repo.listRecords";8888+8989+ #[derive(serde::Deserialize)]9090+ struct Params {9191+ repo: OwnedDid,9292+ collection: String,9393+ }9494+9595+ #[tracing::instrument(target = "com_atproto::repo::list_records")]9696+ async fn handle(9797+ State(pds): State<Pds>,9898+ Query(Params { repo, collection }): Query<Params>,9999+ ) -> impl IntoResponse {100100+ let rows = sqlx::query(101101+ "SELECT rkey, data FROM record WHERE did = ? AND collection = ? ORDER BY rkey",102102+ )103103+ .bind(repo.as_ref())104104+ .bind(&collection)105105+ .fetch_all(pds.db())106106+ .await107107+ .unwrap();108108+109109+ let records = rows110110+ .into_iter()111111+ .map(|row| {112112+ let rkey: &str = row.get("rkey");113113+ let data: &str = row.get("data");114114+115115+ let uri = format!("at://{repo}/{collection}/{rkey}");116116+ let cid = "bafyreie7ym6v4gepcdi2ul2kchylo5aahlw3nmvjg3veipoi76kziixfoa"117117+ .to_string();118118+ let value: Value = serde_json::from_str(data)119119+ .expect("Record value in db should be valid json");120120+121121+ Record { uri, cid, value }122122+ })123123+ .collect::<Vec<_>>();124124+125125+ Json(serde_json::json!({"records": records }))126126+ }127127+128128+ Router::new().route(&format!("/xrpc/{LXM}"), axum::routing::get(handle))129129+ }130130+ }131131+}
+6
crates/mock-pds/src/lib.rs
···11+mod api;22+mod state;33+44+pub use api::router;55+pub use state::Pds;66+pub use state::init;
+278
crates/mock-pds/src/state.rs
···11+use std::{fmt::Debug, net::SocketAddr, sync::Arc};22+33+use atproto::{did::OwnedDid, tid::Tid};44+use auth::jwt::{Claims, Header};55+use aws_lc_rs::{66+ encoding::{AsBigEndian as _, EcPublicKeyCompressedBin},77+ rand::SystemRandom,88+ signature::{ECDSA_P256K1_SHA256_FIXED_SIGNING, EcdsaKeyPair, KeyPair as _},99+};1010+use futures_util::FutureExt as _;1111+use identity::DidDocument;1212+use sqlx::{1313+ SqlitePool,1414+ sqlite::{SqliteConnectOptions, SqlitePoolOptions},1515+ types::time::OffsetDateTime,1616+};1717+use tokio::{1818+ net::TcpListener,1919+ sync::broadcast::{self, Receiver, Sender},2020+};2121+2222+pub type Event = ();2323+2424+#[derive(Debug)]2525+pub struct Pds {2626+ inner: Arc<Inner>,2727+}2828+2929+impl Pds {3030+ pub fn events(&self) -> Receiver<Event> {3131+ self.inner.tx.subscribe()3232+ }3333+3434+ pub fn db(&self) -> &SqlitePool {3535+ &self.inner.db3636+ }3737+3838+ /// Get the service endpoint for the PDS.3939+ pub fn service_endpoint(&self) -> url::Url {4040+ format!("http://{}/", self.inner.addr)4141+ .parse()4242+ .expect("service endpoint should be a valid URL")4343+ }4444+4545+ /// Add a DID document created from `did`, `handle`, and a random ecdsa key-pair to the PDS.4646+ ///4747+ /// The internal address of the mock PDS will be set as the "#atproto_pds" service for4848+ /// the new identity.4949+ ///5050+ pub async fn insert_identity(&self, did: &atproto::Did, handle: &str) {5151+ let mut doc = DidDocument::new(did, handle).expect("valid did for did document");5252+ doc.service5353+ .push(identity::Service::atproto_pds(self.service_endpoint()));5454+5555+ // Generate a key pair and encode the public key as verification method for5656+ // the mock user.5757+ let key_pair = EcdsaKeyPair::generate(&ECDSA_P256K1_SHA256_FIXED_SIGNING).unwrap();5858+ let public_key: EcPublicKeyCompressedBin = key_pair.public_key().as_be_bytes().unwrap();5959+ let mut key_data = vec![0xe7, 0x01];6060+ key_data.extend_from_slice(public_key.as_ref());6161+ let public_key_multibase = multibase::encode(multibase::Base::Base58Btc, key_data);6262+ doc.verification_method6363+ .push(identity::VerificationMethod::Multikey {6464+ id: format!("{}#atproto", doc.id),6565+ controller: doc.id.clone(),6666+ public_key_multibase,6767+ });6868+6969+ let rev = Tid::from_datetime(OffsetDateTime::now_utc(), 0).to_string();7070+ let doc = serde_json::to_string(&doc).unwrap();7171+ let key = key_pair.to_pkcs8v1().unwrap();7272+7373+ sqlx::query("INSERT INTO identity (handle, did, rev, doc, key) VALUES (?, ?, ?, ?, ?)")7474+ .bind(handle)7575+ .bind(did)7676+ .bind(rev)7777+ .bind(doc)7878+ .bind(key.as_ref())7979+ .execute(self.db())8080+ .await8181+ .unwrap();8282+ }8383+8484+ pub async fn insert_record<T>(8585+ &self,8686+ repo: &atproto::Did,8787+ collection: &str,8888+ rkey: &str,8989+ value: &T,9090+ ) -> String9191+ where9292+ T: serde::Serialize,9393+ {9494+ let rev = Tid::from_datetime(OffsetDateTime::now_utc(), 0).to_string();9595+ let cid = "bafyreie7ym6v4gepcdi2ul2kchylo5aahlw3nmvjg3veipoi76kziixfoa";9696+ let data = serde_json::to_string(value).expect("Value must serialize to json");9797+9898+ let mut tx = self.db().begin().await.unwrap();9999+ sqlx::query("INSERT INTO record (did, collection, rkey, cid, data) VALUES (?, ?, ?, ?, ?)")100100+ .bind(repo)101101+ .bind(collection)102102+ .bind(rkey)103103+ .bind(cid)104104+ .bind(data)105105+ .execute(&mut *tx)106106+ .await107107+ .unwrap();108108+109109+ sqlx::query("UPDATE identity SET rev = ? WHERE did = ?")110110+ .bind(rev)111111+ .bind(repo)112112+ .execute(&mut *tx)113113+ .await114114+ .unwrap();115115+116116+ tx.commit().await.unwrap();117117+118118+ format!("at://{repo}/{collection}/{rkey}")119119+ }120120+121121+ // Create an inter-service auth header for an account in the fake PDS.122122+ pub async fn service_auth(&self, claims: &Claims) -> String {123123+ use data_encoding::BASE64URL_NOPAD as Encoding;124124+ use sqlx::Row as _;125125+126126+ let mut token = String::new();127127+ let header = Encoding.encode(128128+ &serde_json::to_vec(&Header {129129+ typ: auth::jwt::Type::JWT,130130+ alg: auth::jwt::Algorithm::ES256K,131131+ crv: None,132132+ })133133+ .unwrap(),134134+ );135135+136136+ token.push_str(&header);137137+138138+ let claims_enc = Encoding.encode(&serde_json::to_vec(claims).unwrap());139139+ token.push('.');140140+ token.push_str(&claims_enc);141141+142142+ let result = sqlx::query("SELECT key FROM identity WHERE did = ?")143143+ .bind(claims.iss.as_ref())144144+ .fetch_one(self.db())145145+ .await146146+ .unwrap();147147+148148+ let pkcs8: &[u8] = result.get("key");149149+ let key_pair = EcdsaKeyPair::from_pkcs8(&ECDSA_P256K1_SHA256_FIXED_SIGNING, pkcs8)150150+ .expect("PKCSv8 key must be valid");151151+152152+ let signature = key_pair153153+ .sign(&SystemRandom::new(), token.as_bytes())154154+ .unwrap();155155+156156+ let signature = Encoding.encode(signature.as_ref());157157+ token.push('.');158158+ token.push_str(&signature);159159+160160+ format!("Bearer {token}")161161+ }162162+}163163+164164+impl Clone for Pds {165165+ fn clone(&self) -> Self {166166+ let inner = Arc::clone(&self.inner);167167+ Self { inner }168168+ }169169+}170170+171171+impl identity::ResolveIdentity for Pds {172172+ fn resolve_handle<'s: 'h, 'h>(173173+ &'s self,174174+ handle: &'h str,175175+ ) -> futures_util::future::BoxFuture<'h, Result<OwnedDid, identity::ResolveError>> {176176+ use sqlx::Row as _;177177+ async move {178178+ let result = sqlx::query("SELECT did FROM identity WHERE handle = ?")179179+ .bind(handle)180180+ .fetch_one(self.db())181181+ .await182182+ .inspect_err(|error| eprintln!("{error:?}"))183183+ .map_err(|_| identity::ResolveError::UnresolvedHandle)?;184184+185185+ let did: &atproto::Did = result.get("did");186186+ Ok(did.to_owned())187187+ }188188+ .boxed()189189+ }190190+191191+ fn resolve_did<'s: 'd, 'd>(192192+ &'s self,193193+ did: &'d atproto::Did,194194+ ) -> futures_util::future::BoxFuture<'d, Result<DidDocument, identity::ResolveError>> {195195+ use sqlx::Row as _;196196+ async move {197197+ let result = sqlx::query("SELECT doc FROM identity WHERE did = ?")198198+ .bind(did)199199+ .fetch_one(self.db())200200+ .await201201+ .inspect_err(|error| eprintln!("{error:?}"))202202+ .map_err(|_| identity::ResolveError::UnresolvedHandle)?;203203+204204+ let doc: &str = result.get("doc");205205+ let doc = serde_json::from_str(doc).unwrap();206206+ Ok(doc)207207+ }208208+ .boxed()209209+ }210210+}211211+212212+#[derive(Debug)]213213+struct Inner {214214+ db: SqlitePool,215215+ tx: Sender<Event>,216216+ addr: SocketAddr,217217+}218218+219219+pub async fn init() -> (Pds, TcpListener) {220220+ let db = SqlitePoolOptions::new()221221+ .max_connections(1)222222+ .connect_with(223223+ SqliteConnectOptions::new()224224+ .in_memory(true)225225+ .shared_cache(true)226226+ .foreign_keys(true),227227+ )228228+ .await229229+ .unwrap();230230+231231+ sqlx::query(232232+ "CREATE TABLE identity (233233+ handle text NOT NULL,234234+ did text NOT NULL,235235+ rev text NOT NULL,236236+ doc json NOT NULL,237237+ key blob NOT NULL,238238+239239+ PRIMARY KEY (did),240240+ UNIQUE (handle)241241+ )",242242+ )243243+ .execute(&db)244244+ .await245245+ .unwrap();246246+247247+ sqlx::query(248248+ "CREATE TABLE record (249249+ did text NOT NULL,250250+ collection text NOT NULL,251251+ rkey text NOT NULL,252252+ cid text NOT NULL,253253+ data json NOT NULL,254254+255255+ PRIMARY KEY (did, collection, rkey),256256+ FOREIGN KEY (did) REFERENCES identity (did) ON DELETE CASCADE257257+ )",258258+ )259259+ .execute(&db)260260+ .await261261+ .unwrap();262262+263263+ let (tx, _) = broadcast::channel(1);264264+ let listener = TcpListener::bind("127.0.0.1:0")265265+ .await266266+ .expect("Must be able to bind a socket");267267+268268+ let addr = listener269269+ .local_addr()270270+ .expect("Listener must have a local socket");271271+272272+ (273273+ Pds {274274+ inner: Arc::new(Inner { addr, db, tx }),275275+ },276276+ listener,277277+ )278278+}