don't
5
fork

Configure Feed

Select the types of activity you want to include in your feed.

refactor(knot): consolidate logic for repository creation

Signed-off-by: tjh <x@tjh.dev>

tjh 283404ae f910acaf

+123 -51
+34 -2
crates/knot/src/model/knot_state.rs
··· 11 11 use atproto::did::Did; 12 12 use futures_util::{FutureExt, future::BoxFuture}; 13 13 use identity::{HttpClient, Resolver}; 14 - use lexicon::sh::tangled::git::RefUpdate; 14 + use lexicon::sh::tangled::{git::RefUpdate, repo::Repo}; 15 15 use moka::future::{Cache, CacheBuilder}; 16 16 use rayon::{ThreadPool, ThreadPoolBuilder}; 17 17 use serde::Serialize; ··· 23 23 database::{DataStore, DataStoreError}, 24 24 }, 25 25 types::{ 26 + RecordKey, 26 27 repository_key::RepositoryKey, 27 28 repository_path::{self, RepositoryPath}, 28 29 }, ··· 238 237 matches!(result, PolicyResult::Granted) 239 238 } 240 239 241 - pub fn create_repo(&self, repo_key: &RepositoryKey, name: &str) -> anyhow::Result<()> { 240 + pub async fn create_repo(&self, rec: &RecordKey<'_>, repo: &Repo<'_>) -> anyhow::Result<()> { 241 + let RecordKey { 242 + did, 243 + collection, 244 + rkey, 245 + rev, 246 + cid, 247 + } = rec; 248 + 249 + assert_eq!(*collection, "sh.tangled.repo"); 250 + assert_eq!(repo.knot, self.instance_ident()); 251 + 252 + repository_path::validate(&did)?; 253 + repository_path::validate(&rkey)?; 254 + repository_path::validate(&repo.name)?; 255 + 256 + let is_new = self 257 + .database() 258 + .insert_repository(did, rkey, rev, cid, &repo) 259 + .await?; 260 + 261 + if !is_new { 262 + return Ok(()); 263 + } 264 + 265 + let repo_key = RepositoryKey::new(*did, *rkey)?; 266 + self.init_repo(&repo_key, &repo.name)?; 267 + 268 + Ok(()) 269 + } 270 + 271 + pub fn init_repo(&self, repo_key: &RepositoryKey, name: &str) -> anyhow::Result<()> { 242 272 repository_path::validate(&repo_key.owner)?; 243 273 repository_path::validate(&repo_key.rkey)?; 244 274 repository_path::validate(name)?;
+5 -14
crates/knot/src/public/xrpc/sh/tangled/repo.rs
··· 17 17 authorization::{Authorization, Verification}, 18 18 }, 19 19 types::{ 20 - repository_key::RepositoryKey, 20 + RecordKey, 21 21 repository_path::RepositoryPath, 22 22 sh::tangled::repo::{blob, branches, compare, diff, log, tags}, 23 23 }, ··· 123 123 ) 124 124 })?; 125 125 126 - // Use the minimum rev value so *any* firehose-derived entry will have priority. 127 - let rev = Tid::MIN.to_string(); 128 - let is_new = knot 129 - .database() 130 - .insert_repository(&claims.iss, &params.rkey, &rev, "", &repo) 126 + let rec = RecordKey::try_from(&record).map_err(errors::InvalidRequest)?; 127 + knot.create_repo(&rec, &repo) 131 128 .await 132 - .map_err(errors::RepoError)?; 133 - 134 - if is_new && repo.knot == knot.instance_ident() { 135 - let repo_key = 136 - RepositoryKey::new(claims.iss, params.rkey).map_err(errors::InvalidRequest)?; 137 - knot.create_repo(&repo_key, &repo.name) 138 - .map_err(errors::RepoError)?; 139 - } 129 + .inspect_err(|error| tracing::error!(?error, "failed to create repository")) 130 + .map_err(errors::Internal)?; 140 131 141 132 Ok(().into()) 142 133 }
+3 -18
crates/knot/src/services/jetstream.rs
··· 5 5 RemoveCollaboratorPolicy, RemoveMemberPolicy, RepositoryCreatePolicy, 6 6 RepositoryDeletePolicy, RepositoryRef, 7 7 }, 8 - types::repository_key::RepositoryKey, 9 8 }; 10 9 use futures_util::StreamExt as _; 11 10 use jetstream::{CommitEvent, Event, JetstreamClient, client_config::JetstreamConfig}; ··· 211 212 return Ok(()); 212 213 } 213 214 214 - let is_new = knot 215 - .database() 216 - .insert_repository(commit.did, commit.rkey, commit.rev, commit.cid, &repository) 217 - .await?; 218 - 219 - if !is_new { 220 - tracing::debug!( 221 - did = %event.did(), 222 - rkey = %event.rkey(), 223 - name = %repository.name, 224 - "repository already known, not creating" 225 - ); 226 - return Ok(()); 227 - } 228 - 229 - let repo_key = RepositoryKey::new(commit.did, commit.rkey)?; 230 - knot.create_repo(&repo_key, &repository.name)?; 215 + knot.create_repo(&commit.try_into()?, &repository) 216 + .await 217 + .inspect_err(|error| tracing::error!(?error, "failed to create repository"))?; 231 218 } 232 219 CommitEvent::Update(commit) => { 233 220 let Lexicon::Repo(repository) = serde_json::from_str(commit.record.get())? else {
+4 -17
crates/knot/src/services/seed.rs
··· 1 1 use atproto::{Did, tid::Tid}; 2 2 use lexicon::sh::tangled::{knot::Member, repo::Repo}; 3 3 4 - use crate::{model::Knot, services::atrepo, types::repository_key::RepositoryKey}; 4 + use crate::{model::Knot, services::atrepo, types::RecordKey}; 5 5 6 6 pub async fn all(knot: &Knot) -> anyhow::Result<()> { 7 7 let knot = knot.clone(); ··· 78 78 79 79 pub async fn repositories(knot: &Knot, did: &Did) -> anyhow::Result<()> { 80 80 let did = did.to_owned(); 81 - let rev = Tid::MIN.to_string(); 82 81 83 82 atrepo::fetch_collection::<_, anyhow::Error>( 84 83 knot.resolver(), ··· 102 103 continue; 103 104 } 104 105 105 - let repo_key = RepositoryKey::new(did.clone(), rkey)?; 106 - 107 - if let Ok(true) = knot 108 - .database() 109 - .insert_repository( 110 - &repo_key.owner, 111 - &repo_key.rkey, 112 - &rev, 113 - &record.cid, 114 - &repo, 115 - ) 116 - .await 117 - && let Err(error) = knot.create_repo(&repo_key, &repo.name) 118 - { 106 + let rec = RecordKey::try_from(record)?; 107 + if let Err(error) = knot.create_repo(&rec, &repo).await { 119 108 tracing::error!(?error, ?repo, "failed to create repository"); 120 109 continue; 121 - }; 110 + } 122 111 123 112 tracing::info!(?did, ?rkey, name = %repo.name, "new repository"); 124 113 }
+77
crates/knot/src/types.rs
··· 1 + use core::fmt; 2 + 3 + use atproto::Did; 4 + use lexicon::com::atproto::repo::list_records::Record; 5 + 1 6 pub mod push_certificate; 2 7 pub mod repository_key; 3 8 pub mod repository_path; 4 9 pub mod sh; 10 + 11 + pub struct RecordKey<'a> { 12 + pub did: &'a Did, 13 + pub collection: &'a str, 14 + pub rkey: &'a str, 15 + pub rev: &'a str, 16 + pub cid: &'a str, 17 + } 18 + 19 + #[derive(Debug)] 20 + pub struct FromRecordError(&'static str); 21 + 22 + impl fmt::Display for FromRecordError { 23 + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { 24 + fmt::Display::fmt(&self.0, f) 25 + } 26 + } 27 + 28 + impl From<&'static str> for FromRecordError { 29 + fn from(value: &'static str) -> Self { 30 + Self(value) 31 + } 32 + } 33 + 34 + impl core::error::Error for FromRecordError {} 35 + 36 + impl<'a> TryFrom<&'a Record<'a>> for RecordKey<'a> { 37 + type Error = FromRecordError; 38 + fn try_from(value: &'a Record<'a>) -> Result<Self, Self::Error> { 39 + let did = value 40 + .uri 41 + .did() 42 + .ok_or("'uri' does not have a did authority")?; 43 + let collection = value 44 + .uri 45 + .collection_str() 46 + .ok_or("'uri' does not declare a collection")?; 47 + let rkey = value.uri.rkey.ok_or("'uri' does not declare an rkey")?; 48 + 49 + Ok(Self { 50 + did, 51 + collection, 52 + rkey, 53 + rev: "2222222222222", 54 + cid: &value.cid, 55 + }) 56 + } 57 + } 58 + 59 + impl<'a> TryFrom<&'a jetstream::Commit<'a>> for RecordKey<'a> { 60 + type Error = FromRecordError; 61 + 62 + fn try_from(value: &'a jetstream::Commit<'a>) -> Result<Self, Self::Error> { 63 + let jetstream::Commit { 64 + ts: _, 65 + did, 66 + collection, 67 + rkey, 68 + rev, 69 + cid, 70 + record: _, 71 + } = value; 72 + 73 + Ok(Self { 74 + did, 75 + collection, 76 + rkey, 77 + rev, 78 + cid, 79 + }) 80 + } 81 + }