don't
5
fork

Configure Feed

Select the types of activity you want to include in your feed.

feat: simplify database setup

* Use sqlite as default database.
* Perform poor-man's backfill on first run.

Signed-off-by: tjh <x@tjh.dev>

tjh d8206397 b694a017

+849 -1011
+1
.gitignore
··· 6 6 jetstream.json 7 7 git_config 8 8 .env 9 + knot.db
+46 -5
Cargo.lock
··· 178 178 source = "registry+https://github.com/rust-lang/crates.io-index" 179 179 checksum = "107a4e9d9cab9963e04e84bb8dee0e25f2a987f9a8bad5ed054abd439caa8f8c" 180 180 dependencies = [ 181 - "bindgen", 181 + "bindgen 0.72.1", 182 182 "cc", 183 183 "cmake", 184 184 "dunce", ··· 299 299 300 300 [[package]] 301 301 name = "bindgen" 302 + version = "0.69.5" 303 + source = "registry+https://github.com/rust-lang/crates.io-index" 304 + checksum = "271383c67ccabffb7381723dea0672a673f292304fcb45c01cc648c7a8d58088" 305 + dependencies = [ 306 + "bitflags", 307 + "cexpr", 308 + "clang-sys", 309 + "itertools 0.12.1", 310 + "lazy_static", 311 + "lazycell", 312 + "proc-macro2", 313 + "quote", 314 + "regex", 315 + "rustc-hash 1.1.0", 316 + "shlex", 317 + "syn 2.0.110", 318 + ] 319 + 320 + [[package]] 321 + name = "bindgen" 302 322 version = "0.72.1" 303 323 source = "registry+https://github.com/rust-lang/crates.io-index" 304 324 checksum = "993776b509cfb49c750f11b8f07a46fa23e0a1386ffc01fb1e7d343efc387895" ··· 326 306 "bitflags", 327 307 "cexpr", 328 308 "clang-sys", 329 - "itertools", 309 + "itertools 0.13.0", 330 310 "log", 331 311 "prettyplease", 332 312 "proc-macro2", 333 313 "quote", 334 314 "regex", 335 - "rustc-hash", 315 + "rustc-hash 2.1.1", 336 316 "shlex", 337 317 "syn 2.0.110", 338 318 ] ··· 2647 2627 2648 2628 [[package]] 2649 2629 name = "itertools" 2630 + version = "0.12.1" 2631 + source = "registry+https://github.com/rust-lang/crates.io-index" 2632 + checksum = "ba291022dbbd398a455acf126c1e341954079855bc60dfdda641363bd6922569" 2633 + dependencies = [ 2634 + "either", 2635 + ] 2636 + 2637 + [[package]] 2638 + name = "itertools" 2650 2639 version = "0.13.0" 2651 2640 source = "registry+https://github.com/rust-lang/crates.io-index" 2652 2641 checksum = "413ee7dfc52ee1a4949ceeb7dbc8a33f2d6c088194d9f922fb8318faf1f01186" ··· 2777 2748 "rand 0.9.2", 2778 2749 "rayon", 2779 2750 "reqwest", 2780 - "rustc-hash", 2751 + "rustc-hash 2.1.1", 2781 2752 "serde", 2782 2753 
"serde_json", 2783 2754 "sqlx", ··· 2812 2783 dependencies = [ 2813 2784 "spin", 2814 2785 ] 2786 + 2787 + [[package]] 2788 + name = "lazycell" 2789 + version = "1.3.0" 2790 + source = "registry+https://github.com/rust-lang/crates.io-index" 2791 + checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55" 2815 2792 2816 2793 [[package]] 2817 2794 name = "lexicon" ··· 2872 2837 source = "registry+https://github.com/rust-lang/crates.io-index" 2873 2838 checksum = "2e99fb7a497b1e3339bc746195567ed8d3e24945ecd636e3619d20b9de9e9149" 2874 2839 dependencies = [ 2840 + "bindgen 0.69.5", 2875 2841 "pkg-config", 2876 2842 "vcpkg", 2877 2843 ] ··· 3652 3616 3653 3617 [[package]] 3654 3618 name = "rustc-hash" 3619 + version = "1.1.0" 3620 + source = "registry+https://github.com/rust-lang/crates.io-index" 3621 + checksum = "08d43f7aa6b08d49f382cde6a7982047c3426db949b1424bc4b7ec9ae12c6ce2" 3622 + 3623 + [[package]] 3624 + name = "rustc-hash" 3655 3625 version = "2.1.1" 3656 3626 source = "registry+https://github.com/rust-lang/crates.io-index" 3657 3627 checksum = "357703d41365b4b27c590e3ed91eabb1b663f07c4c084095e60cbed4362dff0d" ··· 4047 4005 "indexmap", 4048 4006 "log", 4049 4007 "memchr", 4050 - "native-tls", 4051 4008 "once_cell", 4052 4009 "percent-encoding", 4053 4010 "serde",
+26 -2
crates/atproto/src/aturi.rs
··· 2 2 //! 3 3 //! <https://atproto.com/specs/at-uri-scheme> 4 4 //! 5 + use serde::{Deserialize, Serialize}; 6 + 5 7 use crate::{did::Did, handle::Handle, nsid::Nsid}; 6 8 7 9 #[derive(Debug, Hash, PartialEq, Eq)] ··· 14 12 } 15 13 16 14 impl<'a> AtUri<'a> { 17 - #[inline] 18 15 pub const fn did(&self) -> Option<&'a Did> { 19 16 match self.authority { 20 17 Authority::Did(did) => Some(did), ··· 21 20 } 22 21 } 23 22 24 - #[inline] 25 23 pub const fn handle(&self) -> Option<&'a Handle> { 26 24 match self.authority { 27 25 Authority::Handle(handle) => Some(handle), 28 26 _ => None, 29 27 } 28 + } 29 + 30 + pub fn collection_str(&self) -> Option<&str> { 31 + self.collection.map(|col| col.as_str()) 30 32 } 31 33 } 32 34 ··· 73 69 collection, 74 70 rkey, 75 71 }) 72 + } 73 + } 74 + 75 + impl<'de: 'a, 'a> Deserialize<'de> for AtUri<'a> { 76 + fn deserialize<D>(deserializer: D) -> Result<Self, D::Error> 77 + where 78 + D: serde::Deserializer<'de>, 79 + { 80 + let s = <&str as Deserialize>::deserialize(deserializer)?; 81 + let uri = Self::parse(s).map_err(serde::de::Error::custom)?; 82 + Ok(uri) 83 + } 84 + } 85 + 86 + impl Serialize for AtUri<'_> { 87 + fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error> 88 + where 89 + S: serde::Serializer, 90 + { 91 + unimplemented!() 76 92 } 77 93 } 78 94
+1 -1
crates/atproto/src/tid.rs
··· 13 13 /// Timestamp Identifier 14 14 /// 15 15 /// See: <https://atproto.com/specs/tid> 16 - #[derive(Clone, Default, Hash, PartialEq, Eq, PartialOrd, Ord)] 16 + #[derive(Clone, Copy, Default, Hash, PartialEq, Eq, PartialOrd, Ord)] 17 17 pub struct Tid(u64); 18 18 19 19 impl Tid {
+19
crates/jetstream/src/de.rs
··· 57 57 58 58 #[derive(Debug, Default)] 59 59 pub enum AccountStatus { 60 + Active, 60 61 Takendown, 61 62 Suspended, 62 63 Deleted, ··· 87 86 E: serde::de::Error, 88 87 { 89 88 match v { 89 + "active" => Ok(AccountStatus::Active), 90 90 "takendown" => Ok(AccountStatus::Takendown), 91 91 "suspened" => Ok(AccountStatus::Suspended), 92 92 "deleted" => Ok(AccountStatus::Deleted), ··· 125 123 S: serde::Serializer, 126 124 { 127 125 match self { 126 + Self::Active => serializer.serialize_str("active"), 128 127 Self::Takendown => serializer.serialize_str("takendown"), 129 128 Self::Suspended => serializer.serialize_str("suspended"), 130 129 Self::Deleted => serializer.serialize_str("deleted"), ··· 153 150 Self::Identity(identity) => identity.did, 154 151 } 155 152 } 153 + 154 + pub const fn ts(&self) -> OffsetDateTime { 155 + match self { 156 + Self::Commit(commit) => commit.ts(), 157 + Self::Account(account) => account.ts, 158 + Self::Identity(identity) => identity.ts, 159 + } 160 + } 156 161 } 157 162 158 163 #[derive(Debug)] ··· 171 160 } 172 161 173 162 impl<'a> CommitEvent<'a> { 163 + pub const fn ts(&self) -> OffsetDateTime { 164 + match self { 165 + Self::Create(commit) => commit.ts, 166 + Self::Update(commit) => commit.ts, 167 + Self::Delete(commit) => commit.ts, 168 + } 169 + } 170 + 174 171 pub const fn did(&'a self) -> &'a Did { 175 172 match self { 176 173 Self::Create(commit) => commit.did,
+1 -1
crates/knot/Cargo.toml
··· 39 39 rand = "0.9.2" 40 40 rayon = "1.11.0" 41 41 rustc-hash = "2.1.1" 42 - sqlx = { version = "0.8.6", features = ["runtime-tokio", "tls-native-tls", "postgres", "time", "json", "macros", "derive"] } 42 + sqlx = { version = "0.8.6", features = ["runtime-tokio", "sqlite-unbundled", "time", "json", "macros", "derive"] } 43 43 time.workspace = true 44 44 tokio = { version = "1.47.1", features = ["io-util", "macros", "net", "process", "signal", "rt-multi-thread"] } 45 45 tokio-rayon = "2.1.0"
+4 -3
crates/knot/migrations/20251103141538_init.down.sql
··· 1 + DROP TABLE jetstream_cursor; 1 2 DROP TABLE public_key; 2 3 DROP TABLE repository; 3 - -- DROP TABLE jetstream_log; 4 - -- DROP TYPE jetstream_action; 5 - DROP FUNCTION updated_at_trigger; 4 + DROP TABLE knot_member; 5 + DROP TABLE repository_member; 6 + DROP TABLE event;
+48 -59
crates/knot/migrations/20251103141538_init.up.sql
··· 1 - CREATE OR REPLACE FUNCTION updated_at_trigger() RETURNS TRIGGER 2 - LANGUAGE plpgsql AS 3 - $$BEGIN 4 - NEW.updated_at := current_timestamp; 5 - RETURN NEW; 6 - END;$$; 1 + CREATE TABLE jetstream_cursor ( 2 + id integer NOT NULL, 3 + cursor integer NOT NULL, 7 4 8 - DO $$ BEGIN 9 - CREATE TYPE jetstream_action AS ENUM ( 10 - 'create', 11 - 'update', 12 - 'delete' 13 - ); 14 - EXCEPTION 15 - WHEN duplicate_object THEN null; 16 - END $$; 5 + PRIMARY KEY (id) 6 + ); 17 7 18 - -- Commit updates from Jetstream (create, update, and delete). 19 - CREATE TABLE IF NOT EXISTS jetstream_log ( 20 - ts timestamp WITH TIME ZONE NOT NULL, 21 - did text NOT NULL, 22 - collection text NOT NULL, 23 - rkey text NOT NULL, 24 - rev text NOT NULL, 25 - action jetstream_action NOT NULL, 26 - cid text, 27 - payload json, 8 + CREATE TABLE knot_member ( 9 + did text NOT NULL, 28 10 29 - PRIMARY KEY (did, collection, rkey, rev) 11 + PRIMARY KEY (did) 30 12 ); 31 13 32 14 -- Public keys, derived from 'sh.tangled.publicKey' records. 
33 - CREATE TABLE IF NOT EXISTS public_key ( 34 - did text NOT NULL, 35 - rkey text NOT NULL, 36 - cid text NOT NULL, 15 + CREATE TABLE public_key ( 16 + did text NOT NULL, 17 + rkey text NOT NULL, 18 + rev text NOT NULL, 19 + cid text NOT NULL, 37 20 38 21 -- 'sh.tangled.publicKey' fields 39 22 -- <https://tangled.org/@tangled.org/core/blob/master/lexicons/publicKey.json> 40 - name text NOT NULL, 41 - key text NOT NULL, 42 - created_at timestamp WITH TIME ZONE NOT NULL, 23 + name text NOT NULL, 24 + key text NOT NULL, 25 + created_at datetime NOT NULL, 43 26 44 - updated_at timestamp WITH TIME ZONE NOT NULL DEFAULT current_timestamp, 27 + PRIMARY KEY (did, rkey), 28 + FOREIGN KEY (did) REFERENCES knot_member (did) ON DELETE CASCADE 45 29 46 - PRIMARY KEY (did, rkey) 47 30 ); 48 31 49 - CREATE OR REPLACE TRIGGER public_key_update_trigger 50 - BEFORE UPDATE ON public_key 51 - FOR EACH ROW EXECUTE PROCEDURE updated_at_trigger(); 52 - 53 32 -- Repositories, derived from 'sh.tangled.repo' records. 
54 - CREATE TABLE IF NOT EXISTS repository ( 55 - did text NOT NULL, 56 - rkey text NOT NULL, 57 - cid text NOT NULL, 33 + CREATE TABLE repository ( 34 + did text NOT NULL, 35 + rkey text NOT NULL, 36 + rev text NOT NULL, 37 + cid text NOT NULL, 58 38 59 39 -- 'sh.tangled.repo' fields 60 40 -- <https://tangled.org/@tangled.org/core/blob/master/lexicons/repo/repo.json> 61 - name text NOT NULL, 62 - knot text NOT NULL, 63 - spindle text, 64 - description text, 65 - website text, 66 - topics text[], 67 - source text, 68 - labels text[], 69 - created_at timestamp WITH TIME ZONE NOT NULL, 70 - 71 - xrpc_create_at timestamp WITH TIME ZONE, 72 - jetstream_at timestamp WITH TIME ZONE, 73 - updated_at timestamp WITH TIME ZONE NOT NULL DEFAULT current_timestamp, 41 + name text NOT NULL, 42 + knot text NOT NULL, 43 + spindle text, 44 + source text, 45 + created_at datetime NOT NULL, 74 46 75 47 PRIMARY KEY (did, rkey) 48 + FOREIGN KEY (did) REFERENCES knot_member (did) ON DELETE CASCADE 76 49 ); 77 50 78 - CREATE OR REPLACE TRIGGER repository_update_trigger 79 - BEFORE UPDATE ON repository 80 - FOR EACH ROW EXECUTE PROCEDURE updated_at_trigger(); 51 + CREATE TABLE repository_member ( 52 + repo_did text NOT NULL, 53 + repo_rkey text NOT NULL, 54 + did text NOT NULL, 55 + 56 + PRIMARY KEY (repo_did, repo_rkey, did), 57 + FOREIGN KEY (repo_did, repo_rkey) REFERENCES repository (did, rkey) ON DELETE CASCADE 58 + ); 59 + 60 + CREATE TABLE event ( 61 + id integer PRIMARY KEY AUTOINCREMENT NOT NULL, 62 + ts datetime NOT NULL, 63 + repo_did text NOT NULL, 64 + repo_rkey text NOT NULL, 65 + collection text NOT NULL, 66 + record jsonb NOT NULL, 67 + 68 + FOREIGN KEY (repo_did, repo_rkey) REFERENCES repository (did, rkey) ON DELETE CASCADE 69 + );
-2
crates/knot/migrations/20251109121913_members.down.sql
··· 1 - DROP TABLE repository_member; 2 - DROP TABLE knot_member;
-15
crates/knot/migrations/20251109121913_members.up.sql
··· 1 - CREATE TABLE repository_member ( 2 - repo_did text NOT NULL, 3 - repo_rkey text NOT NULL, 4 - member_did text NOT NULL, 5 - 6 - PRIMARY KEY (repo_did, repo_rkey, member_did), 7 - FOREIGN KEY (repo_did, repo_rkey) REFERENCES repository (did, rkey) ON DELETE CASCADE 8 - ); 9 - 10 - CREATE TABLE knot_member ( 11 - instance_name text NOT NULL, 12 - member_did text NOT NULL, 13 - 14 - PRIMARY KEY (instance_name, member_did) 15 - );
-1
crates/knot/migrations/20251126224502_events.down.sql
··· 1 - DROP TABLE events;
-9
crates/knot/migrations/20251126224502_events.up.sql
··· 1 - CREATE TABLE events ( 2 - id serial PRIMARY KEY, 3 - collection text NOT NULL, 4 - rkey text NOT NULL, 5 - event jsonb NOT NULL, 6 - 7 - UNIQUE (collection, rkey), 8 - UNIQUE (event) 9 - );
+8 -3
crates/knot/src/cli.rs
··· 2 2 use core::fmt; 3 3 use identity::{Did, ResolveError, Resolver}; 4 4 use std::{path::PathBuf, str::FromStr}; 5 - use url::Url; 6 5 7 6 #[derive(Parser)] 8 7 #[command(about, author, version)] ··· 32 33 )] 33 34 pub addr: Vec<String>, 34 35 35 - #[arg(long, short = 'D', env = "KNOT_SERVER_DATABASE_URL")] 36 - pub db: Url, 36 + /// Database filename 37 + #[arg( 38 + long, 39 + short = 'D', 40 + env = "KNOT_SERVER_DATABASE_PATH", 41 + default_value = "knot.db" 42 + )] 43 + pub db: PathBuf, 37 44 38 45 /// Port number of the real knot-server. 39 46 #[arg(long, short = 'U', env = "KNOT_SERVER_UPSTREAM")]
+55 -24
crates/knot/src/main.rs
··· 9 9 use identity::Resolver; 10 10 use knot::{ 11 11 model::{Knot, KnotState, config::KnotConfiguration}, 12 - services::database::{DataStore, PgDatabase}, 12 + services::database::DataStore, 13 13 }; 14 14 use reqwest::ClientBuilder; 15 - use sqlx::postgres::PgPoolOptions; 15 + use sqlx::sqlite::{SqliteConnectOptions, SqlitePoolOptions}; 16 16 use std::{ 17 17 env, 18 18 ffi::OsString, ··· 139 139 .success() 140 140 ); 141 141 142 - let pool = PgPoolOptions::new() 143 - .max_connections(5) 144 - .connect(arguments.db.as_str()) 145 - .await?; 142 + let pool = { 143 + let connect_options = SqliteConnectOptions::new() 144 + .filename("knot.db") 145 + .create_if_missing(true) 146 + .foreign_keys(true); 146 147 147 - let row: (String,) = sqlx::query_as("SELECT version()").fetch_one(&pool).await?; 148 - tracing::info!(version = %row.0, "connected to db"); 148 + SqlitePoolOptions::new() 149 + .connect_with(connect_options) 150 + .await? 151 + }; 152 + 153 + // Run database migrations. 149 154 sqlx::migrate!().run(&pool).await?; 155 + 156 + let db = DataStore::new(pool.clone()); 150 157 151 158 let rate_limit: &'static _ = Box::leak(Box::new( 152 159 ServiceBuilder::new() ··· 187 180 .without_v07_checks() 188 181 .merge(knot::public::router()); 189 182 190 - let store = DataStore::new(PgDatabase::new(pool)); 191 - let jetstream_cursor = store 192 - .jetstream_cursor() 193 - .await 194 - .context("Failed to query last jetstream cursor from datastore")? 195 - .map(|odt| (odt, (odt.unix_timestamp_nanos() / 1000).unsigned_abs())); 183 + let jetstream = { 184 + let cursor = db 185 + .get_jetstream_cursor() 186 + .await 187 + .context("Failed to query last jetstream cursor from datastore")? 
188 + .map(|odt| (odt, (odt.unix_timestamp_nanos() / 1000).unsigned_abs())); 196 189 197 - if let Some((cursor, cursor_us)) = &jetstream_cursor { 198 - tracing::info!(?cursor, ?cursor_us, "found jetstream cursor"); 199 - } 190 + if let Some((cursor, cursor_us)) = &cursor { 191 + tracing::info!(?cursor, ?cursor_us, "found jetstream cursor"); 192 + } 200 193 201 - let jetstream = jetstream::builder() 202 - .collection("sh.tangled.*".try_into()?) 203 - .cursor(jetstream_cursor.map(|(_, ts)| ts)) 204 - .build(Url::parse(jetstream::PUBLIC_JETSTREAM_INSTANCES[0])?); 194 + jetstream::builder() 195 + .collection("sh.tangled.publicKey".try_into()?) 196 + .collection("sh.tangled.repo".try_into()?) 197 + .cursor(cursor.map(|(_, ts)| ts)) 198 + .build(Url::parse(jetstream::PUBLIC_JETSTREAM_INSTANCES[0])?) 199 + }; 205 200 206 201 let mut service = JoinSet::new(); 207 202 let mut private_sockets = Vec::new(); ··· 217 208 .map(|listener| listener.local_addr().unwrap()) 218 209 .collect(); 219 210 211 + tracing::info!(?private_addrs, "bound internal API"); 212 + 220 213 let config = KnotConfiguration::builder() 221 214 .instance_name(&arguments.name) 222 215 .owner_did(&resolved_owner) ··· 228 217 .private_sockets(&private_addrs) 229 218 .build()?; 230 219 231 - let knot: Knot = KnotState::new(config, resolver, public_http, jetstream, store).into(); 220 + let knot: Knot = KnotState::new(config, resolver, public_http, jetstream, db).into(); 221 + 222 + { 223 + let knot = knot.clone(); 224 + tokio::spawn(async move { 225 + let owner = resolved_owner.as_str(); 226 + match sqlx::query!( 227 + "INSERT INTO knot_member (did) VALUES (?) 
ON CONFLICT (did) DO NOTHING RETURNING did", 228 + owner 229 + ) 230 + .fetch_optional(&pool) 231 + .await { 232 + Ok(Some(_)) => { 233 + knot.backfill_public_keys().await.unwrap(); 234 + knot.backfill_repositories(&resolved_owner).await.unwrap(); 235 + }, 236 + Ok(None) => tracing::debug!("skipping public key backfill"), 237 + Err(error) => tracing::error!(?error) 238 + } 239 + }); 240 + } 232 241 233 242 let router = router 234 243 .layer(SetRequestIdLayer::new( ··· 264 233 let uri = request.uri(); 265 234 let path = uri.path(); 266 235 267 - let span = tracing::trace_span!("public", id = Empty, ?method, ?path); 236 + let span = tracing::info_span!("public", id = Empty, ?method, ?path); 268 237 if let Some(request_id) = extract_request_id(request) { 269 238 span.record("id", request_id); 270 239 } ··· 273 242 }) 274 243 .on_request(|_: &Request<_>, _: &Span| {}) 275 244 .on_response(|response: &Response<_>, latency: Duration, _: &Span| { 276 - tracing::trace!(?latency, status = ?response.status()); 245 + tracing::info!(?latency, status = ?response.status()); 277 246 }), 278 247 ) 279 248 .with_state(knot.clone());
+5 -5
crates/knot/src/model.rs
··· 80 80 .env_clear() 81 81 .env(private::ENV_PRIVATE_ENDPOINTS, self.private_endpoints()) 82 82 .env(private::ENV_REPO_DID, resolved.owner.as_str()) 83 - .env(private::ENV_REPO_RKEY, resolved.rkey.as_ref()) 83 + .env(private::ENV_REPO_RKEY, resolved.rkey) 84 84 .env("GIT_CONFIG_GLOBAL", self.git_config_path()) 85 85 .current_dir(path) 86 86 .args(["upload-archive"]) ··· 104 104 .env_clear() 105 105 .env(private::ENV_PRIVATE_ENDPOINTS, self.private_endpoints()) 106 106 .env(private::ENV_REPO_DID, resolved.owner.as_str()) 107 - .env(private::ENV_REPO_RKEY, resolved.rkey.as_ref()) 107 + .env(private::ENV_REPO_RKEY, resolved.rkey) 108 108 .env("GIT_CONFIG_GLOBAL", self.git_config_path()) 109 109 .current_dir(path) 110 110 .args([ ··· 134 134 .env_clear() 135 135 .env(private::ENV_PRIVATE_ENDPOINTS, self.private_endpoints()) 136 136 .env(private::ENV_REPO_DID, resolved.owner.as_str()) 137 - .env(private::ENV_REPO_RKEY, resolved.rkey.as_ref()) 137 + .env(private::ENV_REPO_RKEY, resolved.rkey) 138 138 .env("GIT_CONFIG_GLOBAL", self.git_config_path()) 139 139 .current_dir(path) 140 140 .args(["upload-pack", "--strict", "--stateless-rpc"]) ··· 173 173 .env_clear() 174 174 .env(private::ENV_PRIVATE_ENDPOINTS, self.private_endpoints()) 175 175 .env(private::ENV_REPO_DID, resolved.owner.as_str()) 176 - .env(private::ENV_REPO_RKEY, resolved.rkey.as_ref()) 176 + .env(private::ENV_REPO_RKEY, resolved.rkey) 177 177 .env(private::ENV_USER_DID, auth.iss.as_str()) 178 178 .env("GIT_CONFIG_GLOBAL", self.git_config_path()) 179 179 .current_dir(path) ··· 230 230 .env_clear() 231 231 .env(private::ENV_PRIVATE_ENDPOINTS, self.private_endpoints()) 232 232 .env(private::ENV_REPO_DID, resolved.owner.as_str()) 233 - .env(private::ENV_REPO_RKEY, resolved.rkey.as_ref()) 233 + .env(private::ENV_REPO_RKEY, resolved.rkey) 234 234 .env(private::ENV_USER_DID, auth.iss.as_str()) 235 235 .env("GIT_CONFIG_GLOBAL", self.git_config_path()) 236 236 .current_dir(path)
+104 -75
crates/knot/src/model/knot_state.rs
··· 6 6 }; 7 7 8 8 use atproto::{did::Did, tid::Tid}; 9 - use auth::{jwt, public_key}; 9 + use auth::jwt; 10 10 use bytes::Bytes; 11 11 use identity::{HttpClient, Resolver}; 12 12 use jetstream::JetstreamClient; 13 - use lexicon::{com::atproto::repo::list_records, sh::tangled::git::RefUpdate}; 13 + use lexicon::sh::tangled::{git::RefUpdate, repo::Repo}; 14 14 use rayon::{ThreadPool, ThreadPoolBuilder}; 15 15 use serde::Serialize; 16 16 use time::OffsetDateTime; ··· 18 18 19 19 use crate::{ 20 20 services::{ 21 + atrepo, 21 22 authorization::AuthorizationClaimsStore, 22 23 database::{DataStore, DataStoreError}, 23 24 }, ··· 65 64 /// Thread pool for running synchronous tasks. 66 65 pool: ThreadPool, 67 66 68 - events: tokio::sync::broadcast::Sender<(i32, Tid, Event)>, 67 + events: tokio::sync::broadcast::Sender<(i64, OffsetDateTime, Event)>, 69 68 70 69 /// Stores JWT claims to prevent re-use. 71 70 jwt_claims: Mutex<HashMap<Box<str>, jwt::Claims>>, ··· 153 152 &self.pool 154 153 } 155 154 156 - pub(crate) fn subscribe_events(&self) -> tokio::sync::broadcast::Receiver<(i32, Tid, Event)> { 155 + pub(crate) fn subscribe_events( 156 + &self, 157 + ) -> tokio::sync::broadcast::Receiver<(i64, OffsetDateTime, Event)> { 157 158 self.events.subscribe() 158 159 } 159 160 160 - pub(crate) async fn send_event(&self, id: i32, rkey: Tid, event: Event) { 161 - if self.events.send((id, rkey, event)).is_err() { 161 + pub(crate) async fn send_event(&self, id: i64, ts: OffsetDateTime, event: Event) { 162 + if self.events.send((id, ts, event)).is_err() { 162 163 tracing::warn!("no external listeners to consume events"); 163 164 } 164 165 } ··· 176 173 self.jwt_claims.lock().unwrap() 177 174 } 178 175 179 - /// Get public keys the knot holds for the specified DID. 180 - /// 181 - /// If the knot has no public keys stored, they will be fetched from the PDS associated with 182 - /// the DID. 
183 - pub async fn public_keys(&self, did: &Did) -> Vec<public_key::PublicKey> { 184 - let keys: Vec<_> = self 185 - .store 186 - .public_keys_for_did(did) 187 - .await 188 - .inspect_err(|error| { 189 - tracing::error!(?error, "failed to query public keys from database") 190 - }) 191 - .unwrap_or_default() 192 - .into_iter() 193 - .filter_map(|pk| { 194 - public_key::PublicKey::from_openssh(&pk.key) 195 - .inspect_err(|error| { 196 - tracing::error!(?error, ?pk, "failed to parse public key from db") 197 - }) 198 - .ok() 199 - }) 200 - .collect(); 176 + pub async fn fetch_public_keys(&self, did: &Did) -> anyhow::Result<()> { 177 + let db = self.store().clone(); 178 + let did = did.to_owned(); 179 + let rev = Tid::MIN.to_string(); 201 180 202 - match keys.is_empty() { 203 - true => { 204 - tracing::info!(?did, "fetching sh.tangled.publicKeys for DID"); 205 - self.fetch_public_keys(did).await.unwrap_or_default() 206 - } 207 - false => keys, 208 - } 181 + atrepo::fetch_collection::<_, anyhow::Error>( 182 + self.resolver(), 183 + &self.public_http, 184 + &did.clone(), 185 + "sh.tangled.publicKey", 186 + async move |records| { 187 + for record in records { 188 + let rkey = record 189 + .uri 190 + .rkey 191 + .ok_or(anyhow::anyhow!("record uri missing rkey"))?; 192 + 193 + let Ok(public_key) = serde_json::from_str(record.value.get()) else { 194 + continue; 195 + }; 196 + 197 + db.upsert_public_key(&did, &rkey, &rev, &record.cid, &public_key) 198 + .await?; 199 + } 200 + 201 + Ok(()) 202 + }, 203 + ) 204 + .await?; 205 + 206 + Ok(()) 209 207 } 210 208 211 - pub async fn fetch_public_keys(&self, did: &Did) -> anyhow::Result<Vec<public_key::PublicKey>> { 212 - use lexicon::sh::tangled::PublicKey as LexiconPublicKey; 213 - use url::Url; 209 + pub async fn backfill_knot_members(&self) -> anyhow::Result<()> { 210 + Ok(()) 211 + } 214 212 215 - fn list_records_url(mut pds: Url, collection: &str, repo: &Did) -> Url { 216 - pds.set_path("/xrpc/com.atproto.repo.listRecords"); 217 
- 218 - let mut query = pds.query_pairs_mut(); 219 - query.append_pair("repo", repo.as_str()); 220 - query.append_pair("collection", collection); 221 - drop(query); 222 - 223 - pds 213 + /// Fetch public keys for all knot members. 214 + pub async fn backfill_public_keys(&self) -> anyhow::Result<()> { 215 + let mut knot_members = self.store().knot_members(); 216 + while let Some(member) = knot_members.next().await { 217 + let member = member?; 218 + if let Err(error) = self.fetch_public_keys(&member).await { 219 + tracing::error!( 220 + ?error, 221 + ?member, 222 + "failed to backfill knot member's public keys" 223 + ); 224 + } 224 225 } 226 + Ok(()) 227 + } 225 228 226 - let (_, doc) = self.resolver.resolve(did.as_str()).await?; 227 - let pds = &doc 228 - .atproto_pds() 229 - .ok_or(anyhow::anyhow!("DID document does not declare a pds"))? 230 - .service_endpoint; 229 + pub async fn backfill_repositories(&self, did: &Did) -> anyhow::Result<()> { 230 + let base = self.config.repository_path().to_path_buf(); 231 + let db = self.store().clone(); 232 + let instance = self.instance_name().to_owned(); 233 + let did = did.to_owned(); 234 + let rev = Tid::MIN.to_string(); 231 235 232 - let response = self 233 - .public_http 234 - .get(list_records_url(pds.clone(), "sh.tangled.publicKey", did)) 235 - .send() 236 - .await? 237 - .error_for_status()? 
238 - .bytes() 239 - .await?; 236 + atrepo::fetch_collection::<_, anyhow::Error>( 237 + self.resolver(), 238 + &self.public_http, 239 + &did.clone(), 240 + "sh.tangled.repo", 241 + async move |records| { 242 + for record in records { 243 + let rkey = record 244 + .uri 245 + .rkey 246 + .ok_or(anyhow::anyhow!("record uri missing rkey"))?; 240 247 241 - let public_keys: list_records::Output<LexiconPublicKey> = 242 - serde_json::from_slice(&response)?; 248 + let Ok(repo) = serde_json::from_str::<Repo>(record.value.get()) else { 249 + tracing::error!(?record, "error parsing record"); 250 + continue; 251 + }; 243 252 244 - for record in public_keys.records() { 245 - self.store.upsert_public_key_from_record(record).await?; 246 - } 253 + if repo.knot != instance { 254 + continue; 255 + } 247 256 248 - Ok(public_keys 249 - .records() 250 - .filter_map(|rec| { 251 - public_key::PublicKey::from_openssh(&rec.value.key) 252 - .inspect_err(|e| { 253 - tracing::error!(?e, ?did, "failed to parse public key from collection") 254 - }) 255 - .ok() 256 - }) 257 - .collect()) 257 + if let Ok(true) = db 258 + .insert_repository(&did, &rkey, &rev, &record.cid, &repo) 259 + .await 260 + { 261 + let path = base.join(did.as_str()).join(rkey); 262 + let Ok(new_repo) = gix::init_bare(&path) else { 263 + continue; 264 + }; 265 + 266 + tracing::info!(?new_repo, "created repository"); 267 + 268 + // Create a symlink to map the repository name -> rkey. 269 + let symlink_path = base.join(did.as_str()).join(repo.name.as_ref()); 270 + let _ = std::fs::remove_file(&symlink_path); 271 + let _ = std::os::unix::fs::symlink(rkey, &symlink_path); 272 + } 273 + } 274 + 275 + Ok(()) 276 + }, 277 + ) 278 + .await?; 279 + 280 + Ok(()) 258 281 } 259 282 260 283 pub async fn fetch_pds_record(
+14 -14
crates/knot/src/private.rs
··· 1 1 use core::fmt; 2 2 use std::{borrow::Cow, sync::Arc}; 3 3 4 - use atproto::{Did, tid::TidClock}; 4 + use atproto::Did; 5 5 use axum::{ 6 6 extract::{FromRequestParts, Path, State}, 7 7 http::{HeaderMap, StatusCode, request::Parts}, ··· 9 9 }; 10 10 use lexicon::sh::tangled::git::{Meta, RefUpdate}; 11 11 use serde::{Deserialize, Serialize}; 12 + use time::OffsetDateTime; 12 13 13 14 /// Environment variable containing one or more whitespace separated URLs for the internal API. 14 15 /// ··· 35 34 pub const ENV_HEADER_PREFIX: &str = "X-Gordian"; 36 35 37 36 use crate::{ 38 - model::{Knot, errors}, 37 + model::{Knot, errors, knot_state::Event}, 39 38 public::xrpc::XrpcError, 40 39 types::{push_certificate::PushCertificate, repository_key::RepositoryKey}, 41 40 }; 42 - 43 - static TID_CLOCK: TidClock = TidClock::with_id(0); 44 41 45 42 /// Build a new router for the internal API. 46 43 #[rustfmt::skip] ··· 304 305 r#ref: Cow::Owned(refname.into()), 305 306 committer_did: Cow::Owned(user_did.as_ref().into()), 306 307 repo_did: Cow::Owned(repo.owner.as_ref().into()), 307 - repo_name: Cow::Owned(repo_name.as_ref().into()), 308 + repo_name: Cow::Owned(repo_name.clone()), 308 309 old_sha: Cow::Owned(old_sha.into()), 309 310 new_sha: Cow::Owned(new_sha.into()), 310 311 meta: Meta { ··· 313 314 }, 314 315 }; 315 316 316 - let rkey = TID_CLOCK.next(); 317 + let ts = OffsetDateTime::now_utc(); 318 + let event = Event::RefUpdate(Arc::new(ref_update)); 317 319 let id = state 318 320 .store() 319 - .insert_event("sh.tangled.git.refUpdate", &rkey, &ref_update) 321 + .insert_event( 322 + ts, 323 + &repo.owner, 324 + &repo.rkey, 325 + "sh.tangled.git.refUpdate", 326 + &event, 327 + ) 320 328 .await 321 329 .unwrap(); 322 330 323 331 tracing::info!(?id); 324 - state 325 - .send_event( 326 - id, 327 - rkey, 328 - crate::model::knot_state::Event::RefUpdate(Arc::new(ref_update)), 329 - ) 330 - .await; 332 + state.send_event(id, ts, event).await; 331 333 } 332 334 333 335 
Ok(StatusCode::NO_CONTENT)
+14 -12
crates/knot/src/public/events.rs
··· 1 1 use std::time::Duration; 2 2 3 + use atproto::tid::Tid; 3 4 use axum::{ 4 5 extract::{ 5 6 Query, State, WebSocketUpgrade, ··· 66 65 67 66 let (mut sender, mut receiver) = socket.split(); 68 67 69 - let mut past_events = state.store().get_events(start_ts); 70 - while let Some(Ok(db_event)) = past_events.next().await { 71 - cursor = db_event.id; 68 + let mut past_events = state.store().get_events(&start_ts); 69 + while let Some(Ok(event)) = past_events.next().await { 70 + cursor = event.id; 72 71 let wrapper = EventWrapper { 73 - nsid: &db_event.collection, 74 - rkey: &db_event.rkey.to_string(), 75 - event: &db_event.event, 72 + nsid: &event.collection, 73 + rkey: &Tid::from_datetime(event.ts, event.id.rem_euclid(1023).try_into().unwrap()) 74 + .to_string(), 75 + event: &event.record, 76 76 }; 77 77 78 78 let serialized = serde_json::to_string(&wrapper).unwrap(); ··· 84 82 } 85 83 86 84 loop { 87 - let (event_id, rkey, event) = tokio::select! { 85 + let (id, ts, event) = tokio::select! 
{ 88 86 now = keep_alive.tick() => { 89 87 let bytes = (now.duration_since(start)).as_secs().to_string().into(); 90 88 if let Err(error) = sender.send(Message::Ping(bytes)).await { ··· 94 92 continue; 95 93 } 96 94 Ok(Some(message)) = receiver.try_next() => { 97 - tracing::debug!(?message); 95 + tracing::trace!(?message); 98 96 continue; 99 97 } 100 98 Ok(event) = events.recv() => event, 101 99 else => break, 102 100 }; 103 101 104 - if event_id < cursor { 105 - tracing::debug!(?event_id, "skipping event, client has already seen"); 102 + if id < cursor { 103 + tracing::debug!(?id, "skipping event, client has already seen"); 106 104 continue; 107 105 } 108 106 109 107 let wrapper = EventWrapper { 110 108 nsid: event.collection(), 111 - rkey: &rkey.to_string(), 109 + rkey: &Tid::from_datetime(ts, id.rem_euclid(1023).try_into().unwrap()).to_string(), 112 110 event: &event, 113 111 }; 114 112 ··· 118 116 return; 119 117 } 120 118 121 - cursor = event_id; 119 + cursor = id; 122 120 } 123 121 }
+10 -3
crates/knot/src/public/git/authorization.rs
··· 80 80 81 81 // Read the 'sh.tangled.publicKey' records the knot has associated 82 82 // with claimed issuer. 83 - let public_keys = knot.public_keys(&unverified_claims.iss).await; 83 + // let public_keys = knot.public_keys(&unverified_claims.iss).await; 84 + let public_keys = knot 85 + .store() 86 + .public_keys_for_did(&unverified_claims.iss) 87 + .await 88 + .unwrap_or_default() 89 + .into_iter() 90 + .filter_map(|public_key| PublicKey::from_openssh(&public_key.key).ok()); 84 91 85 92 // Try to decode and verify the JWT using any one of the public keys 86 93 // we have for the DID. 87 - for verification_key in verification_keys.iter().chain(public_keys.iter()) { 88 - if let Ok(token) = decode::<Claims>(credential, verification_key) { 94 + for verification_key in verification_keys.into_iter().chain(public_keys) { 95 + if let Ok(token) = decode::<Claims>(credential, &verification_key) { 89 96 let claims = token.claims; 90 97 91 98 // Re-verify the claims for the sake of paranoia.
+42 -13
crates/knot/src/public/xrpc/sh/tangled/repo.rs
··· 1 - use axum::{Json, extract::State}; 2 - use lexicon::sh::tangled::repo::{create, delete, get_default_branch, languages, tree}; 1 + use atproto::tid::Tid; 2 + use axum::{Json, extract::State, http::StatusCode}; 3 + use lexicon::{ 4 + com::atproto::repo::list_records::Record, 5 + sh::tangled::repo::{Repo, create, delete, get_default_branch, languages, tree}, 6 + }; 7 + use time::OffsetDateTime; 3 8 use tokio_rayon::AsyncThreadPool as _; 4 9 5 10 use crate::{ 6 11 model::{Knot, errors, repository::GixRepository}, 7 - public::xrpc::{XrpcQuery, XrpcResult}, 12 + public::xrpc::{XrpcError, XrpcQuery, XrpcResult}, 8 13 services::authorization::{Authorization, Verification}, 9 14 types::sh::tangled::repo::{blob, branches, diff, log, tags}, 10 15 }; ··· 75 70 .await 76 71 .map_err(errors::RepoError)?; 77 72 78 - let record = serde_json::from_slice(&response).map_err(errors::RepoError)?; 73 + let record = serde_json::from_slice::<Record>(&response) 74 + .inspect_err(|error| tracing::error!(?error)) 75 + .map_err(errors::RepoError)?; 79 76 77 + let repo: Repo = serde_json::from_str(record.value.get()).map_err(|error| { 78 + XrpcError::new( 79 + StatusCode::INTERNAL_SERVER_ERROR, 80 + "LexiconError", 81 + error.to_string(), 82 + ) 83 + })?; 84 + 85 + // Use the minimum rev value so *any* firehose-derived entry will have priority. 
86 + let rev = Tid::MIN.to_string(); 80 87 let is_new = knot 81 88 .store() 82 - .insert_repository_from_record(&record) 89 + .insert_repository(&claims.iss, &params.rkey, &rev, "", &repo) 83 90 .await 84 91 .map_err(errors::RepoError)?; 85 92 86 - if is_new && record.value.knot == knot.instance_name() { 87 - knot.create_repo(&claims.iss, &params.rkey, &record.value.name) 93 + if is_new && repo.knot == knot.instance_name() { 94 + knot.create_repo(&claims.iss, &params.rkey, &repo.name) 88 95 .map_err(errors::RepoError)?; 89 96 } 90 97 ··· 139 122 )))?; 140 123 } 141 124 142 - knot.store() 143 - .delete_repository(&params.did, &params.rkey) 144 - .await 145 - .map_err(errors::RepoError)?; 125 + // Synthesize a jetstream delete commit. 126 + let ts = OffsetDateTime::now_utc(); 127 + let delete = jetstream::Delete { 128 + ts, 129 + did: &params.did, 130 + collection: "sh.tangled.repo", 131 + rkey: &params.rkey, 132 + rev: &Tid::from_datetime(ts, 0).to_string(), 133 + }; 146 134 147 - knot.delete_repo(&params.did, &params.rkey) 148 - .map_err(errors::RepoError)?; 135 + if let Some(record) = knot 136 + .store() 137 + .delete_repository(&delete) 138 + .await 139 + .map_err(errors::RepoError)? 140 + { 141 + knot.delete_repo(&record.did, &record.rkey) 142 + .map_err(errors::RepoError)?; 143 + } 149 144 150 145 Ok(().into()) 151 146 }
+1
crates/knot/src/services.rs
··· 1 + pub mod atrepo; 1 2 pub mod authorization; 2 3 pub mod database; 3 4 pub mod jetstream;
+77
crates/knot/src/services/atrepo.rs
··· 1 + use atproto::Did; 2 + use gix::bstr::ByteSlice as _; 3 + use lexicon::com::atproto::repo::list_records; 4 + 5 + #[derive(Debug, thiserror::Error)] 6 + pub enum Error<E> { 7 + #[error(transparent)] 8 + Reqwest(#[from] reqwest::Error), 9 + #[error(transparent)] 10 + Resolve(#[from] identity::ResolveError), 11 + #[error("DID document fails to declare PDS service endpoint")] 12 + MissingPDS, 13 + #[error(transparent)] 14 + InvalidAtUri(#[from] atproto::aturi::Error), 15 + #[error(transparent)] 16 + Serde(#[from] serde_json::Error), 17 + #[error("Error in callback")] 18 + Callback(E), 19 + } 20 + 21 + pub async fn fetch_collection<'de, F, E>( 22 + resolver: &identity::Resolver, 23 + http: &reqwest::Client, 24 + did: &Did, 25 + collection: &str, 26 + mut callback: F, 27 + ) -> Result<(), Error<E>> 28 + where 29 + F: AsyncFnMut(&[list_records::Record<'_>]) -> Result<(), E> + Send + 'static, 30 + { 31 + use url::Url; 32 + 33 + fn list_records_url(mut pds: Url, collection: &str, repo: &Did, cursor: Option<&str>) -> Url { 34 + pds.set_path("/xrpc/com.atproto.repo.listRecords"); 35 + 36 + let mut query = pds.query_pairs_mut(); 37 + query.append_pair("repo", repo.as_str()); 38 + query.append_pair("collection", collection); 39 + if let Some(cursor) = cursor { 40 + query.append_pair("cursor", cursor); 41 + } 42 + drop(query); 43 + 44 + pds 45 + } 46 + 47 + let (_, doc) = resolver.resolve(did.as_str()).await?; 48 + let pds = &doc.atproto_pds().ok_or(Error::MissingPDS)?.service_endpoint; 49 + 50 + let mut complete = false; 51 + let mut cursor: Option<String> = None; 52 + while !complete { 53 + let response = http 54 + .get(list_records_url( 55 + pds.clone(), 56 + collection, 57 + did, 58 + cursor.as_deref(), 59 + )) 60 + .send() 61 + .await? 62 + .error_for_status()? 
63 + .bytes() 64 + .await?; 65 + 66 + let parsed: list_records::Output = serde_json::from_slice(response.as_bytes())?; 67 + cursor.replace(parsed.cursor.to_owned()); 68 + complete = parsed 69 + .records 70 + .last() 71 + .is_none_or(|last| last.uri.rkey == cursor.as_deref()); 72 + 73 + callback(&parsed.records).await.map_err(Error::Callback)?; 74 + } 75 + 76 + Ok(()) 77 + }
-2
crates/knot/src/services/authorization.rs
··· 74 74 } 75 75 } 76 76 77 - #[tracing::instrument(skip(store), ret)] 78 77 fn verify_unique( 79 78 store: &dyn AuthorizationClaimsStore<Claims>, 80 79 now: i64, ··· 130 131 { 131 132 type Rejection = XrpcError; 132 133 133 - #[tracing::instrument(skip(parts, state), ret)] 134 134 async fn from_request_parts(parts: &mut Parts, state: &S) -> Result<Self, Self::Rejection> { 135 135 let knot = Knot::from_ref(state); 136 136 let resolver = Resolver::from_ref(state);
+237 -231
crates/knot/src/services/database.rs
··· 1 - mod pg_impl; 1 + // mod pg_impl; 2 2 pub mod types; 3 3 4 - use atproto::{Did, tid::Tid}; 5 - use core::fmt; 6 - use futures_util::{FutureExt, StreamExt, TryStreamExt as _, future::BoxFuture, stream::BoxStream}; 7 - use jetstream::{Commit, CommitEvent}; 8 - use lexicon::{ 9 - com::atproto::repo::list_records::Record, 10 - sh::tangled::{PublicKey, repo::Repo}, 11 - }; 4 + use atproto::Did; 5 + use futures_util::{StreamExt, stream::BoxStream}; 6 + use jetstream::{Delete, Value}; 7 + use lexicon::sh::tangled::{PublicKey, repo::Repo}; 12 8 use serde::Serialize; 13 - use std::sync::Arc; 9 + use sqlx::SqlitePool; 14 10 use time::OffsetDateTime; 15 - use types::{EventRow, InsertRepositoryResult}; 16 - 17 - pub use pg_impl::PgDatabase; 18 - 19 - pub trait Database: fmt::Debug + Send + 'static { 20 - type Error: std::error::Error + fmt::Debug + Send + Sync; 21 - 22 - /// Get the most recent cursor timestamp stored in the Jetstream log. 23 - /// 24 - /// Default implementation return `None`. 
25 - /// 26 - fn jetstream_cursor(&self) -> BoxFuture<Result<Option<OffsetDateTime>, Self::Error>> { 27 - async move { Ok(None) }.boxed() 28 - } 29 - 30 - fn upsert_jetstream_commit<'db: 'a, 'a>( 31 - &'db self, 32 - commit: &'a CommitEvent<'_>, 33 - ) -> BoxFuture<'a, Result<(), Self::Error>>; 34 - 35 - fn public_keys_for_did( 36 - &self, 37 - did: &Did, 38 - ) -> BoxStream<'_, Result<types::PublicKeyRecord, Self::Error>>; 39 - 40 - fn upsert_public_key<'d: 'a, 'a>( 41 - &'d self, 42 - did: &'a Did, 43 - rkey: &'a str, 44 - cid: &'a str, 45 - public_key: &'a lexicon::sh::tangled::PublicKey<'a>, 46 - ) -> BoxFuture<'a, Result<(), Self::Error>>; 47 - 48 - fn delete_public_key<'d: 'a, 'a>( 49 - &'d self, 50 - did: &'a Did, 51 - rkey: &'a str, 52 - ) -> BoxFuture<'a, Result<(), Self::Error>>; 53 - 54 - fn insert_repository<'d: 'a, 'a>( 55 - &'d self, 56 - did: &'a Did, 57 - rkey: &'a str, 58 - cid: &'a str, 59 - repo: &'a Repo<'a>, 60 - xrpc_create_at: Option<&'a OffsetDateTime>, 61 - jetstream_at: Option<&'a OffsetDateTime>, 62 - ) -> BoxFuture<'a, Result<Option<InsertRepositoryResult>, Self::Error>>; 63 - 64 - fn update_repository<'d: 'a, 'a>( 65 - &'d self, 66 - did: &'a Did, 67 - rkey: &'a str, 68 - cid: &'a str, 69 - repo: &'a Repo<'a>, 70 - ) -> BoxFuture<'a, Result<(), Self::Error>>; 71 - 72 - fn delete_repository<'d: 'a, 'a>( 73 - &'d self, 74 - did: &'a Did, 75 - rkey: &'a str, 76 - ) -> BoxFuture<'a, Result<(), Self::Error>>; 77 - 78 - fn resolve_repository<'d: 'a, 'a>( 79 - &'d self, 80 - did: &'a Did, 81 - name_or_rkey: &'a str, 82 - ) -> BoxFuture<'a, Result<Option<(Box<str>, Box<str>)>, Self::Error>>; 83 - 84 - fn repository_members<'d: 'a, 'a>( 85 - &'d self, 86 - did: &'a Did, 87 - rkey: &'a str, 88 - ) -> BoxStream<'a, Result<Box<Did>, Self::Error>>; 89 - 90 - fn knot_members<'d: 'a, 'a>( 91 - &'d self, 92 - instance_name: &'a str, 93 - ) -> BoxStream<'a, Result<Box<Did>, Self::Error>>; 94 - 95 - fn insert_event<'a: 'b, 'b>( 96 - &'a self, 97 - 
collection: &'b str, 98 - rkey: &'b Tid, 99 - event: &'b serde_json::Value, 100 - ) -> BoxFuture<'b, Result<i32, Self::Error>>; 101 - 102 - fn get_events<'a: 'b, 'b>( 103 - &'a self, 104 - from: OffsetDateTime, 105 - ) -> BoxStream<'b, Result<EventRow, Self::Error>>; 106 - } 11 + use types::{DeletedRecord, EventRow}; 107 12 108 13 #[derive(Debug, thiserror::Error)] 109 14 pub enum DataStoreError { ··· 18 113 Did(#[from] atproto::did::Error), 19 114 #[error("Failed to extract AT-URI: {0}")] 20 115 AtUri(#[from] atproto::aturi::Error), 116 + #[error(transparent)] 117 + DateTime(#[from] time::error::ComponentRange), 21 118 #[error("{0}")] 22 119 Other(#[from] anyhow::Error), 23 120 } 24 121 25 - #[derive(Debug)] 122 + #[derive(Clone, Debug)] 26 123 pub struct DataStore { 27 - inner: Arc<dyn Database<Error = DataStoreError> + Sync>, 124 + db: SqlitePool, 28 125 } 29 126 30 127 impl DataStore { 31 - pub fn new<DB: Database<Error = DataStoreError> + Sync>(db: DB) -> Self 32 - where 33 - DataStoreError: From<<DB as Database>::Error>, 34 - { 35 - Self { 36 - inner: Arc::new(db), 37 - } 128 + pub fn new(db: SqlitePool) -> Self { 129 + Self { db } 38 130 } 39 131 40 - pub async fn jetstream_cursor(&self) -> Result<Option<OffsetDateTime>, DataStoreError> { 41 - let cursor = self.inner.jetstream_cursor().await?; 132 + pub async fn get_jetstream_cursor(&self) -> Result<Option<OffsetDateTime>, DataStoreError> { 133 + let result = sqlx::query!("SELECT cursor FROM jetstream_cursor WHERE id = 1") 134 + .fetch_optional(&self.db) 135 + .await?; 136 + 137 + let cursor = result 138 + .map(|record| { 139 + OffsetDateTime::from_unix_timestamp_nanos(i128::from(record.cursor) * 1000) 140 + }) 141 + .transpose()?; 142 + 42 143 Ok(cursor) 43 144 } 44 145 45 - /// Store a [`jetstream::CommitEvent`] in the Jetstream log. 
46 - pub async fn store_jetstream_commit( 146 + pub async fn store_jetstream_cursor( 47 147 &self, 48 - commit: &CommitEvent<'_>, 148 + cursor: OffsetDateTime, 49 149 ) -> Result<(), DataStoreError> { 50 - self.inner.upsert_jetstream_commit(commit).await?; 150 + let cursor = i64::try_from(cursor.unix_timestamp_nanos() / 1000).unwrap(); 151 + sqlx::query!("INSERT INTO jetstream_cursor (id, cursor) VALUES (1, ?) ON CONFLICT (id) DO UPDATE SET cursor = excluded.cursor", cursor).execute(&self.db).await?; 51 152 Ok(()) 153 + } 154 + 155 + pub fn knot_members(&self) -> BoxStream<'_, Result<Box<Did>, DataStoreError>> { 156 + sqlx::query!("SELECT did FROM knot_member") 157 + .fetch(&self.db) 158 + .map(|record| { 159 + let record = record?; 160 + let did = record.did.parse()?; 161 + Ok(did) 162 + }) 163 + .boxed() 52 164 } 53 165 54 166 /// Get the OpenSSH public keys for the specified DID. ··· 73 151 &self, 74 152 did: &Did, 75 153 ) -> Result<Vec<PublicKey<'static>>, DataStoreError> { 76 - let keys = self 77 - .inner 78 - .public_keys_for_did(did) 79 - .map(|key| key.map(|raw_key| raw_key.into())) 80 - .try_collect() 81 - .await?; 154 + let did = did.as_str(); 155 + let keys = sqlx::query_as!( 156 + PublicKey, 157 + "SELECT name, key, created_at FROM public_key WHERE did = ? ORDER BY rkey, rev", 158 + did 159 + ) 160 + .fetch_all(&self.db) 161 + .await?; 82 162 83 163 Ok(keys) 84 164 } 85 165 86 - pub async fn upsert_public_key_from_jetstream( 166 + pub async fn upsert_public_key( 87 167 &self, 88 - commit: &Commit<'_>, 89 - public_key: PublicKey<'_>, 168 + did: &str, 169 + rkey: &str, 170 + rev: &str, 171 + cid: &str, 172 + public_key: &PublicKey<'_>, 90 173 ) -> Result<(), DataStoreError> { 91 - self.inner 92 - .upsert_public_key(commit.did, commit.rkey, commit.cid, &public_key) 93 - .await?; 174 + sqlx::query!( 175 + "INSERT INTO public_key 176 + (did, rkey, rev, cid, name, key, created_at) 177 + VALUES (?, ?, ?, ?, ?, ?, ?) 
178 + ON CONFLICT (did, rkey) 179 + DO UPDATE 180 + SET 181 + rev = excluded.rev, 182 + cid = excluded.cid, 183 + name = excluded.name, 184 + key = excluded.key, 185 + created_at = excluded.created_at 186 + WHERE excluded.rev > public_key.rev", 187 + did, 188 + rkey, 189 + rev, 190 + cid, 191 + public_key.name, 192 + public_key.key, 193 + public_key.created_at 194 + ) 195 + .execute(&self.db) 196 + .await?; 94 197 95 198 Ok(()) 96 199 } 97 200 98 - pub async fn upsert_public_key_from_record( 201 + pub async fn delete_public_key( 99 202 &self, 100 - record: &Record<'_, PublicKey<'_>>, 101 - ) -> Result<(), DataStoreError> { 102 - let uri = record.aturi()?; 103 - self.inner 104 - .upsert_public_key( 105 - uri.did() 106 - .ok_or(anyhow::anyhow!("AT-URI with a DID authority required"))?, 107 - uri.rkey.ok_or(anyhow::anyhow!("Missing rkey"))?, 108 - &record.cid, 109 - &record.value, 110 - ) 111 - .await?; 112 - Ok(()) 113 - } 203 + did: &str, 204 + rkey: &str, 205 + rev: &str, 206 + ) -> Result<Option<DeletedRecord>, DataStoreError> { 207 + let record = sqlx::query!( 208 + "DELETE FROM public_key WHERE did = ? AND rkey = ? AND rev <= ? RETURNING did, rkey, rev, cid", 209 + did, 210 + rkey, 211 + rev 212 + ) 213 + .fetch_optional(&self.db) 214 + .await?; 114 215 115 - pub async fn delete_public_key(&self, did: &Did, rkey: &str) -> Result<(), DataStoreError> { 116 - self.inner.delete_public_key(did, rkey).await?; 117 - Ok(()) 216 + Ok(match record { 217 + Some(record) => Some(DeletedRecord { 218 + did: record.did.parse()?, 219 + rkey: record.rkey, 220 + rev: record.rev, 221 + cid: record.cid, 222 + }), 223 + None => None, 224 + }) 118 225 } 119 226 120 227 /// Insert a new repository entry from a jetstream commit, returning `true` if the repository ··· 153 202 /// 154 203 /// This is *not* an UPSERT. 
155 204 /// 156 - pub async fn insert_repository_from_jetstream( 205 + pub async fn insert_repository( 157 206 &self, 158 - commit: &Commit<'_>, 207 + did: &str, 208 + rkey: &str, 209 + rev: &str, 210 + cid: &str, 159 211 repository: &Repo<'_>, 160 212 ) -> Result<bool, DataStoreError> { 161 - let result = self 162 - .inner 163 - .insert_repository( 164 - commit.did, 165 - commit.rkey, 166 - commit.cid, 167 - repository, 168 - None, 169 - Some(&commit.ts), 170 - ) 171 - .await?; 213 + let result = sqlx::query!( 214 + "INSERT INTO repository (did, rkey, rev, cid, name, knot, spindle, source, created_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) ON CONFLICT (did, rkey) DO UPDATE SET rev = excluded.rev, cid = excluded.cid, knot = excluded.knot, name = excluded.name, created_at = excluded.created_at WHERE excluded.rev > repository.rev AND excluded.cid <> '' RETURNING *", 215 + did, 216 + rkey, 217 + rev, 218 + cid, 219 + repository.name, 220 + repository.knot, 221 + repository.spindle, 222 + repository.source, 223 + repository.created_at 224 + ).fetch_optional(&self.db).await?; 172 225 173 - let is_new = match result { 174 - None => false, 175 - Some(result) => { 176 - result.old_xrpc_create_at.is_none() 177 - && result.new_xrpc_create_at.is_none() 178 - && result.old_jetstream_at.is_none() 179 - && result.new_jetstream_at.is_some() 180 - } 181 - }; 226 + tracing::debug!(?result); 182 227 183 - Ok(is_new) 184 - } 185 - 186 - pub async fn insert_repository_from_record( 187 - &self, 188 - record: &Record<'_, Repo<'_>>, 189 - ) -> Result<bool, DataStoreError> { 190 - let uri = record.aturi()?; 191 - let result = self 192 - .inner 193 - .insert_repository( 194 - uri.did() 195 - .ok_or(anyhow::anyhow!("AT-URI with DID authority required"))?, 196 - uri.rkey.ok_or(anyhow::anyhow!("Missing rkey"))?, 197 - &record.cid, 198 - &record.value, 199 - Some(&OffsetDateTime::now_utc()), 200 - None, 201 - ) 202 - .await?; 203 - 204 - let is_new = match result { 205 - None => false, 206 - 
Some(result) => { 207 - result.old_xrpc_create_at.is_none() 208 - && result.new_xrpc_create_at.is_some() 209 - && result.old_jetstream_at.is_none() 210 - && result.new_jetstream_at.is_none() 211 - } 212 - }; 213 - 214 - Ok(is_new) 228 + Ok(result.is_some_and(|record| record.rev == rev && record.cid == cid)) 215 229 } 216 230 217 231 pub async fn update_repository( 218 232 &self, 219 - did: &Did, 233 + did: &str, 220 234 rkey: &str, 235 + rev: &str, 221 236 cid: &str, 222 237 repository: &Repo<'_>, 223 238 ) -> Result<(), DataStoreError> { 224 - self.inner 225 - .update_repository(did, rkey, cid, repository) 226 - .await?; 239 + sqlx::query!( 240 + "UPDATE repository \ 241 + SET rev = ?,\ 242 + cid = ?,\ 243 + name = ?,\ 244 + knot = ?,\ 245 + spindle = ?,\ 246 + source = ?,\ 247 + created_at = ?\ 248 + WHERE 249 + did = ? 250 + AND rkey = ? 251 + AND rev <= ?", 252 + rev, 253 + cid, 254 + repository.name, 255 + repository.knot, 256 + repository.spindle, 257 + repository.source, 258 + repository.created_at, 259 + did, 260 + rkey, 261 + rev 262 + ) 263 + .execute(&self.db) 264 + .await?; 265 + 227 266 Ok(()) 228 267 } 229 268 230 - pub async fn delete_repository(&self, did: &Did, rkey: &str) -> Result<(), DataStoreError> { 231 - self.inner.delete_repository(did, rkey).await?; 232 - Ok(()) 269 + pub async fn delete_repository( 270 + &self, 271 + commit: &Delete<'_>, 272 + ) -> Result<Option<DeletedRecord>, DataStoreError> { 273 + assert_eq!(commit.collection, "sh.tangled.repo"); 274 + let did = commit.did.as_str(); 275 + let rkey = commit.rkey; 276 + let rev = commit.rev; 277 + 278 + let record = sqlx::query!( 279 + "DELETE FROM repository WHERE did = ? AND rkey = ? AND rev <= ? 
RETURNING did, rkey, rev, cid", 280 + did, 281 + rkey, 282 + rev 283 + ) 284 + .fetch_optional(&self.db) 285 + .await?; 286 + 287 + Ok(match record { 288 + Some(record) => Some(DeletedRecord { 289 + did: record.did.parse()?, 290 + rkey: record.rkey, 291 + rev: record.rev, 292 + cid: record.cid, 293 + }), 294 + None => None, 295 + }) 233 296 } 234 297 235 298 pub async fn resolve_repository( 236 299 &self, 237 300 did: &Did, 238 301 name_or_rkey: &str, 239 - ) -> Result<Option<(Box<str>, Box<str>)>, DataStoreError> { 240 - let result = self.inner.resolve_repository(did, name_or_rkey).await?; 241 - Ok(result) 302 + ) -> Result<Option<(String, String)>, DataStoreError> { 303 + let did = did.as_str(); 304 + let resolved = sqlx::query!( 305 + "SELECT rkey, name FROM repository WHERE did = ? AND (name = ? OR rkey = ?)", 306 + did, 307 + name_or_rkey, 308 + name_or_rkey 309 + ) 310 + .fetch_optional(&self.db) 311 + .await?; 312 + 313 + tracing::debug!(?resolved); 314 + 315 + Ok(resolved.map(|record| (record.rkey, record.name))) 242 316 } 243 317 244 318 pub async fn is_repository_member( ··· 271 295 repo_did: &Did, 272 296 repo_rkey: &str, 273 297 member_did: &Did, 274 - ) -> bool { 275 - self.inner 276 - .repository_members(repo_did, repo_rkey) 277 - .any(|did| async move { did.is_ok_and(|value| value == member_did) }) 278 - .await 298 + ) -> Result<bool, DataStoreError> { 299 + let repo_did = repo_did.as_str(); 300 + let did = member_did.as_str(); 301 + 302 + let member = sqlx::query!( 303 + "SELECT repo_did, repo_rkey, did FROM repository_member WHERE repo_did = ? AND repo_rkey = ? 
AND did = ?", 304 + repo_did, 305 + repo_rkey, 306 + did 307 + ).fetch_optional(&self.db).await?; 308 + 309 + Ok(member.is_some_and(|record| { 310 + record.repo_did == repo_did && record.repo_rkey == repo_rkey && record.did == did 311 + })) 279 312 } 280 313 281 - pub async fn is_knot_member(&self, instance_name: &str, did: &Did) -> bool { 282 - self.inner 283 - .knot_members(instance_name) 284 - .any(|member_did| async move { member_did.is_ok_and(|value| value == did) }) 285 - .await 314 + pub async fn is_knot_member(&self, did: &Did) -> Result<bool, DataStoreError> { 315 + let did = did.as_str(); 316 + let member = sqlx::query!("SELECT did FROM knot_member WHERE did = ? LIMIT 1", did,) 317 + .fetch_optional(&self.db) 318 + .await?; 319 + 320 + Ok(member.map(|record| record.did == did).unwrap_or_default()) 286 321 } 287 322 288 323 pub async fn insert_event<T>( 289 324 &self, 325 + ts: OffsetDateTime, 326 + repo_did: &str, 327 + repo_rkey: &str, 290 328 collection: &str, 291 - rkey: &Tid, 292 329 event: &T, 293 - ) -> Result<i32, DataStoreError> 330 + ) -> Result<i64, DataStoreError> 294 331 where 295 332 T: Serialize, 296 333 { 297 - let serialized = serde_json::to_value(event).unwrap(); 298 - let id = self 299 - .inner 300 - .insert_event(collection, rkey, &serialized) 301 - .await?; 334 + let record = serde_json::to_value(event).unwrap(); 335 + let result = sqlx::query!( 336 + "INSERT INTO event (ts, repo_did, repo_rkey, collection, record) VALUES (?, ?, ?, ?, ?) 
RETURNING id", 337 + ts, 338 + repo_did, 339 + repo_rkey, 340 + collection, 341 + record 342 + ).fetch_one(&self.db).await?; 302 343 303 - Ok(id) 344 + Ok(result.id) 304 345 } 305 346 306 - pub fn get_events(&self, from: OffsetDateTime) -> BoxStream<Result<EventRow, DataStoreError>> { 307 - self.inner.get_events(from) 347 + pub fn get_events<'a: 'b, 'b>( 348 + &'a self, 349 + from: &'b OffsetDateTime, 350 + ) -> BoxStream<'b, Result<EventRow, DataStoreError>> { 351 + sqlx::query_as!( 352 + EventRow, 353 + r#"SELECT id, ts, collection, record as "record: Value" FROM event WHERE ts >= ? ORDER BY id"#, 354 + *from 355 + ) 356 + .fetch(&self.db) 357 + .map(|record| { 358 + let record = record?; 359 + Ok(record) 360 + }) 361 + .boxed() 308 362 } 309 363 }
-403
crates/knot/src/services/database/pg_impl.rs
··· 1 - use super::{ 2 - DataStoreError, Database, 3 - types::{EventRow, InsertRepositoryResult, PublicKeyRecord}, 4 - }; 5 - use atproto::{Did, tid::Tid}; 6 - use futures_util::{FutureExt as _, StreamExt, TryStreamExt, future::BoxFuture, stream::BoxStream}; 7 - use jetstream::CommitEvent; 8 - use lexicon::sh::tangled::repo::Repo; 9 - use time::OffsetDateTime; 10 - 11 - #[derive(Debug, Clone)] 12 - pub struct PgDatabase { 13 - pool: sqlx::PgPool, 14 - } 15 - 16 - impl PgDatabase { 17 - pub fn new(pool: sqlx::PgPool) -> Self { 18 - Self { pool } 19 - } 20 - } 21 - 22 - impl Database for PgDatabase { 23 - type Error = super::DataStoreError; 24 - 25 - fn jetstream_cursor(&self) -> BoxFuture<Result<Option<OffsetDateTime>, Self::Error>> { 26 - async move { 27 - let cursor = sqlx::query_as("SELECT max(ts) from jetstream_log") 28 - .fetch_optional(&self.pool) 29 - .await? 30 - .and_then(|(odt,): (Option<OffsetDateTime>,)| odt); 31 - 32 - Ok(cursor) 33 - } 34 - .boxed() 35 - } 36 - 37 - fn upsert_jetstream_commit<'db: 'c, 'c>( 38 - &'db self, 39 - commit: &'c CommitEvent<'_>, 40 - ) -> BoxFuture<'c, Result<(), Self::Error>> { 41 - async move { 42 - use jetstream::CommitEvent; 43 - 44 - #[derive(sqlx::Type)] 45 - #[sqlx(type_name = "jetstream_action", rename_all = "lowercase")] 46 - pub enum Action { 47 - Create, 48 - Update, 49 - Delete, 50 - } 51 - 52 - let query = sqlx::query(include_str!("../../../sql/insert_jetstream_log.sql")); 53 - let query = match &commit { 54 - CommitEvent::Create(commit) => query 55 - .bind(commit.ts) 56 - .bind(commit.did.as_str()) 57 - .bind(commit.collection) 58 - .bind(commit.rkey) 59 - .bind(commit.rev) 60 - .bind(commit.cid) 61 - .bind(Action::Create) 62 - .bind(sqlx::types::Json(commit.record)), 63 - CommitEvent::Update(commit) => query 64 - .bind(commit.ts) 65 - .bind(commit.did.as_str()) 66 - .bind(commit.collection) 67 - .bind(commit.rkey) 68 - .bind(commit.rev) 69 - .bind(commit.cid) 70 - .bind(Action::Update) 71 - 
.bind(sqlx::types::Json(commit.record)), 72 - CommitEvent::Delete(delete) => query 73 - .bind(delete.ts) 74 - .bind(delete.did.as_str()) 75 - .bind(delete.collection) 76 - .bind(delete.rkey) 77 - .bind(delete.rev) 78 - .bind(None::<&str>) 79 - .bind(Action::Delete) 80 - .bind(sqlx::types::Json(())), 81 - }; 82 - query.execute(&self.pool).await?; 83 - 84 - Ok(()) 85 - } 86 - .boxed() 87 - } 88 - 89 - fn public_keys_for_did( 90 - &self, 91 - did: &Did, 92 - ) -> BoxStream<'_, Result<super::types::PublicKeyRecord, Self::Error>> { 93 - sqlx::query_as!( 94 - PublicKeyRecord, 95 - "SELECT did, rkey, cid, name, key, created_at FROM public_key WHERE did = $1", 96 - did.as_str() 97 - ) 98 - .fetch(&self.pool) 99 - .map_err(DataStoreError::Sqlx) 100 - .boxed() 101 - } 102 - 103 - fn upsert_public_key<'d: 'a, 'a>( 104 - &'d self, 105 - did: &'a Did, 106 - rkey: &'a str, 107 - cid: &'a str, 108 - public_key: &'a lexicon::sh::tangled::PublicKey<'a>, 109 - ) -> BoxFuture<'a, Result<(), Self::Error>> { 110 - async move { 111 - sqlx::query_file!( 112 - "sql/insert_public_key.sql", 113 - did.as_str(), 114 - rkey, 115 - cid, 116 - &public_key.name, 117 - &public_key.key, 118 - &public_key.created_at 119 - ) 120 - .execute(&self.pool) 121 - .await?; 122 - Ok(()) 123 - } 124 - .boxed() 125 - } 126 - 127 - fn delete_public_key<'d: 'a, 'a>( 128 - &'d self, 129 - did: &'a Did, 130 - rkey: &'a str, 131 - ) -> BoxFuture<'a, Result<(), Self::Error>> { 132 - async move { 133 - sqlx::query!( 134 - r#"DELETE FROM public_key WHERE did = $1 AND rkey = $2"#, 135 - did.as_str(), 136 - rkey 137 - ) 138 - .execute(&self.pool) 139 - .await?; 140 - 141 - Ok(()) 142 - } 143 - .boxed() 144 - } 145 - 146 - fn insert_repository<'d: 'a, 'a>( 147 - &'d self, 148 - did: &'a Did, 149 - rkey: &'a str, 150 - cid: &'a str, 151 - repo: &'a Repo<'a>, 152 - xrpc_create_at: Option<&'a OffsetDateTime>, 153 - jetstream_at: Option<&'a OffsetDateTime>, 154 - ) -> BoxFuture<'a, Result<Option<InsertRepositoryResult>, 
Self::Error>> { 155 - let topics: Vec<String> = repo 156 - .topics 157 - .iter() 158 - .map(|topic| topic.clone().into()) 159 - .collect(); 160 - 161 - let labels: Vec<String> = repo 162 - .labels 163 - .iter() 164 - .map(|label| label.clone().into()) 165 - .collect(); 166 - 167 - match (xrpc_create_at, jetstream_at) { 168 - (Some(xrpc_ts), None) => async move { 169 - let result = sqlx::query_file_as!( 170 - InsertRepositoryResult, 171 - "sql/insert_repository_xrpc.sql", 172 - did.as_str(), 173 - rkey, 174 - cid, 175 - &repo.name, 176 - &repo.knot, 177 - repo.spindle.as_deref(), 178 - repo.description.as_deref(), 179 - repo.website.as_deref(), 180 - topics.as_slice(), 181 - repo.source.as_deref(), 182 - labels.as_slice(), 183 - &repo.created_at, 184 - Some(xrpc_ts), 185 - None::<OffsetDateTime>, 186 - ) 187 - .fetch_optional(&self.pool) 188 - .await?; 189 - 190 - Ok(result) 191 - } 192 - .boxed(), 193 - (None, Some(jetstream_ts)) => async move { 194 - let result = sqlx::query_file_as!( 195 - InsertRepositoryResult, 196 - "sql/insert_repository_jetstream.sql", 197 - did.as_str(), 198 - rkey, 199 - cid, 200 - &repo.name, 201 - &repo.knot, 202 - repo.spindle.as_deref(), 203 - repo.description.as_deref(), 204 - repo.website.as_deref(), 205 - topics.as_slice(), 206 - repo.source.as_deref(), 207 - labels.as_slice(), 208 - &repo.created_at, 209 - None::<OffsetDateTime>, 210 - Some(jetstream_ts), 211 - ) 212 - .fetch_optional(&self.pool) 213 - .await?; 214 - 215 - Ok(result) 216 - } 217 - .boxed(), 218 - (Some(_), Some(_)) | (None, None) => panic!(), 219 - } 220 - } 221 - 222 - fn update_repository<'d: 'a, 'a>( 223 - &'d self, 224 - did: &'a Did, 225 - rkey: &'a str, 226 - cid: &'a str, 227 - repo: &'a Repo<'a>, 228 - ) -> BoxFuture<'a, Result<(), Self::Error>> { 229 - let topics: Vec<String> = repo 230 - .topics 231 - .iter() 232 - .map(|topic| topic.clone().into()) 233 - .collect(); 234 - 235 - let labels: Vec<String> = repo 236 - .labels 237 - .iter() 238 - .map(|label| 
label.clone().into()) 239 - .collect(); 240 - 241 - async move { 242 - sqlx::query_file!( 243 - "sql/update_repository.sql", 244 - did.as_str(), 245 - rkey, 246 - cid, 247 - &repo.name, 248 - &repo.knot, 249 - repo.spindle.as_deref(), 250 - repo.description.as_deref(), 251 - repo.website.as_deref(), 252 - topics.as_slice(), 253 - repo.source.as_deref(), 254 - labels.as_slice(), 255 - &repo.created_at, 256 - ) 257 - .execute(&self.pool) 258 - .await?; 259 - 260 - Ok(()) 261 - } 262 - .boxed() 263 - } 264 - 265 - fn delete_repository<'d: 'a, 'a>( 266 - &'d self, 267 - did: &'a Did, 268 - rkey: &'a str, 269 - ) -> BoxFuture<'a, Result<(), Self::Error>> { 270 - async move { 271 - sqlx::query!( 272 - "DELETE FROM repository WHERE did = $1 AND rkey = $2", 273 - did.as_str(), 274 - rkey 275 - ) 276 - .execute(&self.pool) 277 - .await?; 278 - Ok(()) 279 - } 280 - .boxed() 281 - } 282 - 283 - fn resolve_repository<'d: 'a, 'a>( 284 - &'d self, 285 - did: &'a Did, 286 - name_or_rkey: &'a str, 287 - ) -> BoxFuture<'a, Result<Option<(Box<str>, Box<str>)>, Self::Error>> { 288 - async move { 289 - #[derive(sqlx::FromRow)] 290 - struct Record { 291 - rkey: Box<str>, 292 - name: Box<str>, 293 - } 294 - 295 - let result: Option<Record> = sqlx::query_as!( 296 - Record, 297 - "SELECT rkey, name FROM repository WHERE did = $1 AND (rkey = $2 OR name = $2)", 298 - did.as_str(), 299 - name_or_rkey 300 - ) 301 - .fetch_optional(&self.pool) 302 - .await?; 303 - 304 - Ok(result.map(|record| (record.rkey, record.name))) 305 - } 306 - .boxed() 307 - } 308 - 309 - fn repository_members<'d: 'a, 'a>( 310 - &'d self, 311 - did: &'a Did, 312 - rkey: &'a str, 313 - ) -> BoxStream<'a, Result<Box<Did>, Self::Error>> { 314 - #[derive(sqlx::FromRow)] 315 - struct Record { 316 - member_did: Box<str>, 317 - } 318 - 319 - sqlx::query_as!( 320 - Record, 321 - "SELECT member_did FROM repository_member WHERE repo_did = $1 AND repo_rkey = $2", 322 - did.as_str(), 323 - rkey 324 - ) 325 - .fetch(&self.pool) 326 
- .map(|record| { 327 - let did = record?.member_did.parse()?; 328 - Ok(did) 329 - }) 330 - .boxed() 331 - } 332 - 333 - fn knot_members<'d: 'a, 'a>( 334 - &'d self, 335 - instance_name: &'a str, 336 - ) -> BoxStream<'a, Result<Box<Did>, Self::Error>> { 337 - #[derive(sqlx::FromRow)] 338 - struct Record { 339 - member_did: Box<str>, 340 - } 341 - 342 - sqlx::query_as!( 343 - Record, 344 - "SELECT member_did FROM knot_member WHERE instance_name = $1", 345 - instance_name 346 - ) 347 - .fetch(&self.pool) 348 - .map(|record| { 349 - let did = record?.member_did.parse()?; 350 - Ok(did) 351 - }) 352 - .boxed() 353 - } 354 - 355 - fn insert_event<'a: 'b, 'b>( 356 - &'a self, 357 - collection: &'b str, 358 - rkey: &'b atproto::tid::Tid, 359 - event: &'b serde_json::Value, 360 - ) -> BoxFuture<'b, Result<i32, Self::Error>> { 361 - #[derive(sqlx::FromRow)] 362 - struct Row { 363 - id: i32, 364 - } 365 - 366 - async move { 367 - let Row { id } = sqlx::query_as!( 368 - Row, 369 - "INSERT INTO events (collection, rkey, event) VALUES ($1, $2, $3) RETURNING id", 370 - collection, 371 - rkey.to_string(), 372 - event 373 - ) 374 - .fetch_one(&self.pool) 375 - .await?; 376 - 377 - Ok(id) 378 - } 379 - .boxed() 380 - } 381 - 382 - fn get_events<'a: 'b, 'b>( 383 - &'a self, 384 - from: OffsetDateTime, 385 - ) -> BoxStream<'b, Result<EventRow, Self::Error>> { 386 - let rkey = Tid::from_datetime(from, 0); 387 - sqlx::query!( 388 - "SELECT id, collection, rkey, event FROM events WHERE rkey >= $1 ORDER BY id", 389 - rkey.to_string() 390 - ) 391 - .fetch(&self.pool) 392 - .map(|record| { 393 - let record = record?; 394 - Ok(EventRow { 395 - id: record.id, 396 - collection: record.collection, 397 - rkey: record.rkey.parse().unwrap(), 398 - event: record.event, 399 - }) 400 - }) 401 - .boxed() 402 - } 403 - }
+11 -48
crates/knot/src/services/database/types.rs
··· 25 25 } 26 26 } 27 27 28 - impl<'a> TryFrom<&'a lexicon::com::atproto::repo::list_records::Record<'a, PublicKey<'a>>> 29 - for PublicKeyRecordRef<'a> 30 - { 31 - type Error = anyhow::Error; 32 - fn try_from( 33 - value: &'a lexicon::com::atproto::repo::list_records::Record<'a, PublicKey<'a>>, 34 - ) -> Result<Self, Self::Error> { 35 - let uri = value.aturi()?; 36 - Ok(Self { 37 - did: uri 38 - .did() 39 - .ok_or(anyhow::anyhow!("AT-URI with a DID authority required"))?, 40 - rkey: uri.rkey.ok_or(anyhow::anyhow!("Missing rkey"))?, 41 - cid: &value.cid, 42 - name: &value.value.name, 43 - key: &value.value.key, 44 - created_at: &value.value.created_at, 45 - }) 46 - } 47 - } 48 - 49 28 /// An owned, flattened public key record. 50 29 pub struct PublicKeyRecord { 51 30 pub did: String, ··· 45 66 key: key.key.into(), 46 67 created_at: key.created_at, 47 68 } 48 - } 49 - } 50 - 51 - impl<'a> TryFrom<&'a lexicon::com::atproto::repo::list_records::Record<'a, PublicKey<'a>>> 52 - for PublicKeyRecord 53 - { 54 - type Error = anyhow::Error; 55 - 56 - fn try_from( 57 - value: &'a lexicon::com::atproto::repo::list_records::Record<'a, PublicKey<'a>>, 58 - ) -> Result<Self, Self::Error> { 59 - let uri = value.aturi()?; 60 - Ok(Self { 61 - did: uri 62 - .did() 63 - .ok_or(anyhow::anyhow!("AT-URI with a DID authority required"))? 
64 - .as_str() 65 - .into(), 66 - rkey: uri.rkey.ok_or(anyhow::anyhow!("Missing rkey"))?.into(), 67 - cid: value.cid.clone().into(), 68 - name: value.value.name.clone().into(), 69 - key: value.value.key.clone().into(), 70 - created_at: value.value.created_at, 71 - }) 72 69 } 73 70 } 74 71 ··· 69 114 70 115 #[derive(Debug)] 71 116 pub struct EventRow { 72 - pub id: i32, 117 + pub id: i64, 118 + pub ts: OffsetDateTime, 73 119 pub collection: String, 74 - pub rkey: atproto::tid::Tid, 75 - pub event: serde_json::Value, 120 + pub record: serde_json::Value, 121 + } 122 + 123 + #[derive(Debug)] 124 + pub struct DeletedRecord { 125 + pub did: Box<Did>, 126 + pub rkey: String, 127 + pub rev: String, 128 + pub cid: String, 76 129 }
+80 -51
crates/knot/src/services/jetstream.rs
··· 1 - use super::database::DataStore; 2 1 use crate::{ 3 2 model::KnotState, 4 3 services::rbac::{Policy, RepositoryCreatePolicy, RepositoryDeletePolicy, RepositoryRef}, 5 4 }; 6 5 use jetstream::{CommitEvent, Event}; 7 6 use lexicon::Lexicon; 8 - use std::sync::Arc; 7 + use std::{sync::Arc, time::Duration}; 8 + use tokio::time::Instant; 9 9 10 10 pub async fn jetstream_task(state: Arc<KnotState>, jetstream_rx: jetstream::Receiver) { 11 + let mut last_jetstream_sync: Option<Instant> = None; 12 + 11 13 while let Some(event) = jetstream_rx.recv_async().await { 12 14 let event = match event.deserialize() { 13 15 Ok(event) => event, ··· 20 18 } 21 19 }; 22 20 23 - tracing::trace!(?event); 24 - 25 21 match &event { 26 - Event::Commit(commit) => { 27 - if let Err(error) = state.store().store_jetstream_commit(commit).await { 28 - tracing::error!(?error, "failed to log jetstream event"); 29 - }; 30 - 31 - // If we failed to log the event we probably won't be able to 32 - // process it, but we'll try anyway. 
33 - 34 - match commit.collection() { 35 - "sh.tangled.publicKey" => { 36 - if let Err(error) = process_public_key(state.store(), commit).await { 37 - tracing::error!( 38 - ?error, 39 - "failed to process 'sh.tangled.publicKey' record" 40 - ) 41 - } 22 + Event::Commit(commit) => match commit.collection() { 23 + "sh.tangled.publicKey" => { 24 + if let Err(error) = process_public_key(&state, commit).await { 25 + tracing::error!(?error, "failed to process 'sh.tangled.publicKey' record") 42 26 } 43 - "sh.tangled.repo" => { 44 - if let Err(error) = process_repo(&state, commit).await { 45 - tracing::error!(?error, "failed to process 'sh.tangled.repo' record") 46 - } 47 - } 48 - _ => {} 49 27 } 50 - } 28 + "sh.tangled.repo" => { 29 + if let Err(error) = process_repo(&state, commit).await { 30 + tracing::error!(?error, "failed to process 'sh.tangled.repo' record") 31 + } 32 + } 33 + _ => {} 34 + }, 51 35 Event::Account(account) => { 52 36 state.resolver().invalidate_did(account.did).await; 53 37 } ··· 41 53 state.resolver().invalidate_did(identity.did).await; 42 54 } 43 55 } 56 + 57 + if last_jetstream_sync.is_none_or(|value| value.elapsed() > Duration::from_secs(1)) { 58 + match state.store().store_jetstream_cursor(event.ts()).await { 59 + Ok(()) => last_jetstream_sync = Some(Instant::now()), 60 + Err(error) => tracing::error!(?error, "failed to log jetstream event"), 61 + }; 62 + } 44 63 } 45 64 46 65 tracing::warn!("jetstream consumer task completed"); 47 66 } 48 67 49 68 async fn process_public_key<'db, 'c, 'k>( 50 - db: &'db DataStore, 69 + state: &KnotState, 51 70 event: &'c CommitEvent<'c>, 52 71 ) -> anyhow::Result<()> 53 72 where ··· 63 68 { 64 69 match event { 65 70 CommitEvent::Create(commit) | CommitEvent::Update(commit) => { 66 - let record = commit.record.get(); 67 - let Lexicon::PublicKey(key) = serde_json::from_str(record)? else { 71 + let Lexicon::PublicKey(key) = serde_json::from_str(commit.record.get())? 
else { 68 72 return Err(anyhow::anyhow!("expected a 'sh.tangled.publicKey' record")); 69 73 }; 70 - db.upsert_public_key_from_jetstream(commit, key).await?; 74 + 75 + if !state.store().is_knot_member(event.did()).await? { 76 + return Ok(()); 77 + } 78 + 79 + state 80 + .store() 81 + .upsert_public_key(commit.did, commit.rkey, commit.rev, commit.cid, &key) 82 + .await?; 71 83 } 72 84 CommitEvent::Delete(delete) => { 73 - db.delete_public_key(delete.did, delete.rkey).await?; 85 + assert_eq!(delete.collection, "sh.tangled.publicKey"); 86 + state 87 + .store() 88 + .delete_public_key(delete.did, delete.rkey, delete.rev) 89 + .await?; 74 90 } 75 91 } 76 92 77 93 Ok(()) 78 94 } 79 95 96 + #[tracing::instrument(skip(state))] 80 97 async fn process_repo(state: &KnotState, event: &CommitEvent<'_>) -> anyhow::Result<()> { 81 98 use crate::services::rbac::{Action, PolicyResult::*}; 82 99 ··· 97 90 let Lexicon::Repo(repository) = serde_json::from_str(commit.record.get())? else { 98 91 return Err(anyhow::anyhow!("expected a 'sh.tangled.repo' record")); 99 92 }; 93 + 94 + if repository.knot != state.instance_name() { 95 + tracing::debug!( 96 + did = %event.did(), 97 + rkey = %event.rkey(), 98 + name = %repository.name, 99 + "repository is not for this knot, ignoring" 100 + ); 101 + return Ok(()); 102 + } 100 103 101 104 let policy = RepositoryCreatePolicy; 102 105 let can_create = policy ··· 120 103 121 104 let is_new = state 122 105 .store() 123 - .insert_repository_from_jetstream(commit, &repository) 106 + .insert_repository(commit.did, commit.rkey, commit.rev, commit.cid, &repository) 124 107 .await?; 125 108 126 109 if !is_new { ··· 133 116 return Ok(()); 134 117 } 135 118 119 + state.create_repo(commit.did, commit.rkey, &repository.name)?; 120 + } 121 + CommitEvent::Update(commit) => { 122 + let Lexicon::Repo(repository) = serde_json::from_str(commit.record.get())? 
else { 123 + return Err(anyhow::anyhow!("expected a 'sh.tangled.repo' record")); 124 + }; 125 + 136 126 if repository.knot != state.instance_name() { 137 127 tracing::debug!( 138 128 did = %event.did(), ··· 150 126 return Ok(()); 151 127 } 152 128 153 - state.create_repo(commit.did, commit.rkey, &repository.name)?; 154 - } 155 - CommitEvent::Update(commit) => { 156 - let Lexicon::Repo(repository) = serde_json::from_str(commit.record.get())? else { 157 - return Err(anyhow::anyhow!("expected a 'sh.tangled.repo' record")); 158 - }; 159 - 160 129 // @TODO Does this need auth? 161 130 162 131 state 163 132 .store() 164 - .update_repository(commit.did, commit.rkey, commit.cid, &repository) 133 + .update_repository(commit.did, commit.rkey, commit.rev, commit.cid, &repository) 165 134 .await?; 166 135 } 167 - CommitEvent::Delete(delete) => { 136 + CommitEvent::Delete(commit) => { 137 + // First determine whether the repository exists on this knot 138 + if state 139 + .store() 140 + .resolve_repository(commit.did, commit.rkey) 141 + .await? 
142 + .is_none() 143 + { 144 + tracing::debug!( 145 + did = %event.did(), 146 + rkey = %event.rkey(), 147 + "repository is not for this knot, ignoring" 148 + ); 149 + return Ok(()); 150 + }; 151 + 168 152 let policy = RepositoryDeletePolicy; 169 - let repository = RepositoryRef::new(delete.did, delete.rkey); 153 + let repository = RepositoryRef::new(commit.did, commit.rkey); 170 154 let can_create = policy 171 - .evaluate_access(&delete.did, &Action::RepositoryDelete, &repository, state) 155 + .evaluate_access(&commit.did, &Action::RepositoryDelete, &repository, state) 172 156 .await; 173 157 174 158 if !matches!(can_create, Granted) { 175 - tracing::warn!(?delete, "RepositoryDelete permission denied"); 159 + tracing::warn!(?commit, "RepositoryDelete permission denied"); 176 160 return Ok(()); 177 161 } 178 162 179 - state 180 - .store() 181 - .delete_repository(delete.did, delete.rkey) 182 - .await?; 183 - 184 - state.delete_repo(delete.did, delete.rkey)?; 163 + if let Some(record) = state.store().delete_repository(commit).await? { 164 + state.delete_repo(&record.did, &record.rkey)?; 165 + } 185 166 } 186 167 } 187 168
+5 -3
crates/knot/src/services/rbac.rs
··· 75 75 || context 76 76 .store() 77 77 .is_repository_member(&resource.owner, &resource.rkey, subject) 78 - .await; 78 + .await 79 + .is_ok_and(|val| val); 79 80 80 81 match (action, is_member) { 81 82 (Action::RepositoryPush, true) => PolicyResult::Granted, ··· 101 100 let is_member = subject == resource.owner_did() 102 101 || context 103 102 .store() 104 - .is_knot_member(context.instance_name(), subject) 105 - .await; 103 + .is_knot_member(subject) 104 + .await 105 + .is_ok_and(|val| val); 106 106 107 107 match (action, is_member) { 108 108 (Action::RepositoryCreate, true) => PolicyResult::Granted,
+1 -1
crates/knot/src/types/repository_key.rs
··· 11 11 pub owner: Box<Did>, 12 12 13 13 /// Repository record key. 14 - pub rkey: Box<str>, 14 + pub rkey: String, 15 15 } 16 16 17 17 impl RepositoryKey {
+1 -3
crates/lexicon/Cargo.toml
··· 13 13 14 14 data-encoding.workspace = true 15 15 serde.workspace = true 16 + serde_json.workspace = true 16 17 thiserror.workspace = true 17 18 time.workspace = true 18 19 19 20 gix-hash = "^0.20.0" 20 - 21 - [dev-dependencies] 22 - serde_json.workspace = true
+8 -20
crates/lexicon/src/com/atproto/repo.rs
··· 7 7 //! 8 8 use atproto::aturi::AtUri; 9 9 use serde::{Deserialize, Serialize}; 10 + use serde_json::value::RawValue; 10 11 use std::borrow::Cow; 11 12 12 13 #[derive(Debug, Deserialize, Serialize)] ··· 35 34 } 36 35 37 36 #[derive(Debug, Deserialize, Serialize)] 38 - pub struct Output<'a, T> { 39 - #[serde(skip_serializing_if = "Option::is_none")] 40 - pub cursor: Option<Cow<'a, str>>, 41 - 37 + pub struct Output<'a> { 42 38 #[serde(borrow)] 43 - pub records: Vec<Record<'a, T>>, 44 - } 39 + pub cursor: &'a str, 45 40 46 - impl<'a, T> Output<'a, T> { 47 - pub fn records(&'a self) -> impl Iterator<Item = &'a Record<'a, T>> { 48 - self.records.iter() 49 - } 41 + pub records: Vec<Record<'a>>, 50 42 } 51 43 52 44 #[derive(Debug, Deserialize, Serialize)] 53 - pub struct Record<'a, T> { 45 + pub struct Record<'a> { 54 46 #[serde(borrow)] 55 - pub uri: Cow<'a, str>, 47 + pub uri: AtUri<'a>, 56 48 57 - pub cid: Cow<'a, str>, 49 + pub cid: String, 58 50 59 - pub value: T, 60 - } 61 - 62 - impl<T> Record<'_, T> { 63 - pub fn aturi(&self) -> Result<AtUri, atproto::aturi::Error> { 64 - AtUri::parse(&self.uri) 65 - } 51 + pub value: &'a RawValue, 66 52 } 67 53 }
+28 -1
crates/lexicon/src/sh/tangled.rs
··· 33 33 /// `sh.tangled.publicKey` record. 34 34 /// 35 35 /// <https://tangled.org/@tangled.org/core/blob/master/lexicons/publicKey.json> 36 - #[derive(Debug, Hash, PartialEq, Eq, Deserialize, Serialize)] 36 + #[derive(Clone, Debug, Hash, PartialEq, Eq, Deserialize, Serialize)] 37 37 #[serde(rename_all = "camelCase")] 38 38 pub struct PublicKey<'a> { 39 39 /// Public key contents ··· 46 46 /// Key upload timestamp 47 47 #[serde(alias = "created", with = "time::serde::rfc3339")] 48 48 pub created_at: OffsetDateTime, 49 + } 50 + 51 + /// `sh.tangled.publicKey` record. 52 + /// 53 + /// <https://tangled.org/@tangled.org/core/blob/master/lexicons/publicKey.json> 54 + #[derive(Clone, Debug, Hash, PartialEq, Eq, Deserialize, Serialize)] 55 + #[serde(rename_all = "camelCase")] 56 + pub struct PublicKeyOwned { 57 + /// Public key contents 58 + pub key: String, 59 + 60 + /// Human-readable name for this key 61 + pub name: String, 62 + 63 + /// Key upload timestamp 64 + #[serde(alias = "created", with = "time::serde::rfc3339")] 65 + pub created_at: OffsetDateTime, 66 + } 67 + 68 + impl<'a> From<&'a PublicKeyOwned> for PublicKey<'a> { 69 + fn from(value: &'a PublicKeyOwned) -> Self { 70 + Self { 71 + key: Cow::Borrowed(&value.key), 72 + name: Cow::Borrowed(&value.name), 73 + created_at: value.created_at, 74 + } 75 + } 49 76 } 50 77 51 78 #[derive(Debug, Hash, PartialEq, Eq, Deserialize, Serialize)]
+1
crates/lexicon/src/sh/tangled/repo.rs
··· 68 68 pub labels: Vec<Cow<'a, str>>, 69 69 70 70 #[serde(with = "time::serde::rfc3339")] 71 + #[serde(alias = "addedAt")] 71 72 pub created_at: OffsetDateTime, 72 73 } 73 74
+1 -1
rust-toolchain.toml
··· 1 1 [toolchain] 2 - channel = "1.86" 2 + channel = "stable" 3 3 profile = "default"