lightweight com.atproto.sync.listReposByCollection


reshuffle everything

phil cb3b3e1d 285adc5c

+386 -292
-1
Cargo.lock
··· 2187 2187 "bytes", 2188 2188 "clap", 2189 2189 "fjall", 2190 - "iroh-car", 2191 2190 "jacquard-api", 2192 2191 "jacquard-axum", 2193 2192 "jacquard-common",
+1 -2
Cargo.toml
··· 7 7 axum = "0.8.8" 8 8 clap = { version = "4.5.60", features = ["derive", "env"] } 9 9 fjall = "3.0.3" 10 - iroh-car = "0.5.1" 11 10 jacquard-api = { version = "0.9.5", default-features = false, features = ["com_atproto"] } 12 11 jacquard-axum = { version = "0.9.6", default-features = false, features = ["tracing"] } 13 12 jacquard-common = { version = "0.9.5", features = ["websocket", "reqwest-client"] } 14 13 jacquard-repo = "0.9.6" 15 14 metrics = "0.24.3" 16 - metrics-exporter-prometheus = { version = "0.18.1", features = ["http-listener"] } 17 15 bytes = "1" 16 + metrics-exporter-prometheus = { version = "0.18.1", features = ["http-listener"] } 18 17 reqwest = { version = "0.12", default-features = false, features = ["rustls-tls"] } 19 18 serde = { version = "1", features = ["derive"] } 20 19 thiserror = "2.0.18"
+92
authenticated-collection-list.md
··· 1 + # authenticated collection listing (future work) 2 + 3 + right now we're just doing `describeRepo` to get collections, which *is* what 4 + `collectiondir` also does, but it's not what we want to stick with because: 5 + 6 + - the collections list isn't paginated. not clear what happens if a repo has a huge 7 + number of collections -- will `describeRepo` eventually fail? 8 + - the contents of the collections list aren't authenticated. it's *possible* for a 9 + PDS to lie and make our index incorrect, but the threat we're considering here is 10 + more about just PDS bugs causing the list to be wrong. 11 + - there is no `commit` or even `rev` in the response, so we can't actually know if 12 + firehose commits after `describeRepo` follow correctly/without gaps. 13 + 14 + there are a few ways we can do better. 15 + 16 + 17 + ## `com.atproto.sync.getRepo` 18 + 19 + obviously we can just do full backfill of repo contents. but then we couldn't call 20 + ourselves *light*rail. 21 + 22 + what we can do is detect small repos and use `getRepo` just for them. repo size can 23 + be estimated from any CAR slice by measuring the root node height. we get a CAR slice 24 + from firehose commits and from any `sync.getRecord` request. 25 + 26 + 27 + ## collection-boundary `com.atproto.sync.getRecord` probing 28 + 29 + mst keys have the form `<collection>/<rkey>` (lexicographic order). 30 + `com.atproto.sync.getRecord` returns a CAR proof path from the repo root to the 31 + queried key, and that usually includes keys immediately adjacent to the queried key. 32 + in particular, when the record does *not* exist, the proof path must include adjacent 33 + keys (required to prove the key is absent). 34 + 35 + we can exploit this: 36 + 37 + 1. query `getRecord` with the **minimum legal MST key** 38 + (`a-----...0.0-----...0.A/-`). the record usually won't exist, but the right- 39 + adjacent key in the CAR slice reveals the first collection present in the repo. 40 + 2. 
for that collection, compute the **maximum** legal rkey (`~` × 512) and query 41 + `getRecord` with `<collection>/<max_rkey>`. The right-adjacent key, if present, is 42 + the first key of the *next* collection. 43 + 3. if we don't have an immediate-right-adjacent key, we can *increment* the rkey to 44 + the next minimum legal key and retry until we do get the next collection. 45 + 4. repeat from step 2 until the end (no more right-adjacent collections). 46 + 47 + 48 + this probing costs ~one request per collection discovered. wrinkles: 49 + 50 + - on the first request, estimate repo size and just do `getRepo` if it's small. 51 + probing requests count toward the PDS rate limit. 52 + 53 + - the repository can update while we are probing. this is easily detected because 54 + every CAR slice response includes the commit object and MST root, which updates 55 + for any update to the MST. the really nice way to deal with this is to maintain 56 + a sparse MST built up from all the probe requests, which can usually be 57 + *updated* directly from the upper changed nodes. at the end, we have a repo- 58 + spanning valid-but-sparse MST that proves all collection boundaries simultaneously. 59 + 60 + what do we do if a collection is added or removed by a mid-probe update? TODO! 61 + 62 + 63 + ## skeleton shower from `com.atproto.sync.getBlocks` 64 + 65 + instead of scanning across the key range on collection boundaries, we could build 66 + our own sparse collection-boundary tree top-down: 67 + 68 + 1. make any `..sync.getRecord` query, to obtain the MST root node 69 + 2. request every MST child node that spans a collection change, using 70 + `com.atproto.sync.getBlocks`. 71 + 3. continue down like this, layer by layer, until reaching the bottom layer. since 72 + `getBlocks` accepts multiple CIDs, we can fetch everything required from each 73 + layer together in one request per layer (unless we need too many blocks to fit 74 + in the querystring). 
75 + 76 + we end up with a nice sparse tree that proves all collection boundaries. MSTs are not 77 + very tall so this might actually be pretty nice, and we directly build a consistent 78 + point-in-time snapshot. 79 + 80 + this fails when 81 + 82 + - any block we need is updated or removed while we're climbing down. in that case we 83 + can retry or fall back to `getRepo`. 84 + - a PDS doesn't implement `getBlocks`. (i have no idea how common that is) 85 + 86 + 87 + ## we can dream: `"com.atproto.sync.getRepoCollections"` 88 + 89 + maybe one day a PDS endpoint like this will exist, serving the sparse MST 90 + containing blocks on all collection-boundary paths that our approaches here end up 91 + building, proving the exact set of collections present in the repo associated with an 92 + exact commit.
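the boundary-probing steps described in the new doc can be sketched as a couple of standalone helpers. these are hypothetical names for illustration, not functions from this commit (the commit's `probe.rs` variants are being removed here):

```rust
/// extract `<collection>` from an MST key of the form `<collection>/<rkey>`.
fn collection_from_key(key: &str) -> Option<&str> {
    key.split_once('/').map(|(col, _rkey)| col)
}

/// probe key for step 2: the collection followed by the maximum legal rkey
/// (512 `~` characters). this sorts after every real key in the collection,
/// so the right-adjacent key in the proof path belongs to the *next* one.
fn max_probe_key(collection: &str) -> String {
    format!("{}/{}", collection, "~".repeat(512))
}

fn main() {
    // an adjacent key returned in a CAR slice reveals its collection
    assert_eq!(
        collection_from_key("app.bsky.feed.post/3juabcdefgh2x"),
        Some("app.bsky.feed.post")
    );

    // the max probe key sorts after every plausible rkey in the collection
    let probe = max_probe_key("app.bsky.feed.post");
    assert!(probe.as_str() > "app.bsky.feed.post/3juabcdefgh2x");
    assert_eq!(probe.len(), "app.bsky.feed.post".len() + 1 + 512);
}
```

the `~` × 512 constant mirrors the `MAX_RKEY` definition in the removed `src/backfill/probe.rs`.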
+1
hacking.md
··· 24 24 - iroh-car: robust, simple, async 25 25 - manual CAR processing: since we need access to adjacent keys 26 26 - TODO: repo-stream will expose this soon probably 27 + - TODO: right now we use jacquard_repo but i think it's easier in our case to handle it more manually. 27 28 - fjall: workload is write-heavy so LSM is a good fit, space efficiency also very desirable 28 29 29 30
+4 -8
readme.md
··· 2 2 3 3 **status: in development** 4 4 5 - lightrail uses the adjacent keys included in CAR slices from firehose commits and `com.atproto.sync.getRecord` responses to detect the first record added and last record removed from a collection in an atproto repo. 6 - 7 - for backfill of large repositories, lightrail probes the repo with `getRecord` requests instead of trusting the `collections` property from `com.atproto.repo.describeRepo`. since there are concrete minimum and maximum `rkey`s for collections, and since `getRecord` always returns adjacent keys *even when a key is not found in a repo*, lightrail can precisely probe the repo along collection boundaries to enumerate every collection. 8 - 9 - repo `#sync` events similarly probe the repository to diff against the recorded repo collections. 10 - 11 - small repositories are detected by inspecting the MST nodes near the root of a CAR slice (eg., from the first `getRecord` probe): small repos are statistically unlikely to contain keys with high MST levels (every CAR slice must include the maximum-level keys of a repository), providing a statistical basis for estimating total repo size. Small repos are fetched in their entirety with `com.atproto.sync.getRepo` instead of probing with numerous `getRecord` requests. 5 + lightrail uses the adjacent keys included in CAR slices from firehose commits to detect the first record added and last record removed from a collection in an atproto repo. 
12 6 13 7 compared to Bluesky's [`collectiondir`](https://github.com/bluesky-social/indigo/tree/main/cmd/collectiondir) service, lightrail: 14 8 ··· 16 10 - handles sync1.1 `#sync` events 17 11 - avoids updating its index unless commits actually add or remove collections 18 12 - removes repos from the index when the last record from a repo's collection is removed 19 - - uses authenticated repo contents for backfill instead of `com.atproto.repo.describeRepo` 20 13 21 14 lightrail's CAR slice techniques enable its lightweight implementation, but its primary focus is on accuracy and correctness. 22 15 16 + for backfill, lightrail currently uses `com.atproto.repo.describeRepo`, like Bluesky's `collectiondir`. This is not a robust approach, and will hopefully be replaced by probing the authenticated repo contents (see [./authenticated-collection-list.md](./authenticated-collection-list.md)) soon. 17 + 23 18 24 19 ### wishlist features (probably doable?): 25 20 26 21 - accept multiple collections for `listReposbyCollection` (merge + dedup by DID; works bc key is `<collection>||<did>`) 27 22 - `listReposByCollectionPrefix`, either with additional indexes up the NSID hierarchy, or via merge+dedup. 28 23 - subscribe to multiple relays 24 + - use authenticated repo contents for backfill instead of `com.atproto.repo.describeRepo` (see [./authenticated-collection-list.md](./authenticated-collection-list.md)) 29 25 30 26 31 27 ## contributing
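the readme's multi-collection wishlist item (merge + dedup by DID) could look roughly like this sketch. the helper name is hypothetical and not part of this commit; since each per-collection rbc scan already yields DIDs in key order, a k-way merge would avoid the sort, but sort + dedup keeps the sketch short:

```rust
/// hypothetical sketch: union the DID lists from several per-collection
/// rbc scans, deduplicating because one DID can appear under many
/// collections. each input list is already sorted by DID.
fn merge_dedup_dids(scans: Vec<Vec<String>>) -> Vec<String> {
    let mut all: Vec<String> = scans.into_iter().flatten().collect();
    all.sort();
    all.dedup();
    all
}

fn main() {
    let posts = vec!["did:plc:aaa".to_string(), "did:plc:ccc".to_string()];
    let likes = vec!["did:plc:aaa".to_string(), "did:plc:bbb".to_string()];
    let merged = merge_dedup_dids(vec![posts, likes]);
    assert_eq!(merged, vec!["did:plc:aaa", "did:plc:bbb", "did:plc:ccc"]);
}
```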
+2 -2
src/api/handler.rs src/server/handler.rs
··· 3 3 //! Uses `jacquard-axum`'s `ExtractXrpc` extractor to deserialise query parameters 4 4 //! directly into the lexicon-generated request types. 5 5 6 - use axum::{extract::State, http::StatusCode, Json}; 6 + use axum::{Json, extract::State, http::StatusCode}; 7 7 use jacquard_api::com_atproto::sync::{ 8 8 get_repo_status::{GetRepoStatusOutput, GetRepoStatusRequest}, 9 9 list_repos_by_collection::{ListReposByCollectionOutput, ListReposByCollectionRequest}, 10 10 }; 11 11 use jacquard_axum::ExtractXrpc; 12 12 13 - use crate::db::DbRef; 13 + use crate::storage::DbRef; 14 14 15 15 /// Handler for `GET /xrpc/com.atproto.sync.listReposByCollection`. 16 16 ///
+2 -3
src/api/mod.rs src/server/mod.rs
··· 8 8 use std::net::SocketAddr; 9 9 10 10 use jacquard_api::com_atproto::sync::{ 11 - get_repo_status::GetRepoStatusRequest, 12 - list_repos_by_collection::ListReposByCollectionRequest, 11 + get_repo_status::GetRepoStatusRequest, list_repos_by_collection::ListReposByCollectionRequest, 13 12 }; 14 13 use jacquard_axum::IntoRouter; 15 14 16 - use crate::db::DbRef; 17 15 use crate::error::Result; 16 + use crate::storage::DbRef; 18 17 19 18 /// Build and serve the axum application on `addr`. 20 19 ///
-17
src/backfill/list_repos.rs
··· 1 - //! Walk `com.atproto.sync.listRepos` with cursor pagination. 2 - //! 3 - //! For each DID encountered, either enqueues it for probing (large repos) or 4 - //! dispatches it to the small-repo fast path. 5 - 6 - use crate::db::DbRef; 7 - use crate::error::Result; 8 - 9 - /// Walk the full `listRepos` feed for `host`, persisting progress after each 10 - /// page so it can be resumed on restart. 11 - /// 12 - /// Uses `jacquard-api`'s `com_atproto::sync::list_repos` XRPC call with cursor 13 - /// pagination. Per-page progress is written via `db::cursor::set_list_repos_progress`. 14 - pub async fn run(host: &str, db: DbRef) -> Result<()> { 15 - let _ = (host, db); 16 - todo!("paginate listRepos, probe each DID, persist cursor after each page") 17 - }
-25
src/backfill/mod.rs
··· 1 - //! Backfill subsystem. 2 - //! 3 - //! Walks `com.atproto.sync.listRepos` and probes each repository to populate 4 - //! the rbc/cbr index before or alongside the live firehose feed. 5 - //! 6 - //! Large repos are enumerated via sequential `getRecord` probing (`probe`): 7 - //! one request per collection, walking right-adjacent MST keys from the minimum 8 - //! legal key to the end of the repo. 9 - //! Small repos take the fast path of fetching the full repo CAR (`small_repo`). 10 - 11 - pub mod list_repos; 12 - pub mod probe; 13 - pub mod small_repo; 14 - 15 - use crate::db::DbRef; 16 - use crate::error::Result; 17 - 18 - /// Run the backfill subsystem for `host`. 19 - /// 20 - /// Resumes from the last-saved listRepos cursor if one exists, then pages 21 - /// through all repos and probes each one. Runs indefinitely until an error 22 - /// occurs (fatal errors) or until the full backfill completes. 23 - pub async fn run(host: String, db: DbRef) -> Result<()> { 24 - list_repos::run(&host, db).await 25 - }
-172
src/backfill/probe.rs
··· 1 - //! `getRecord`-probing for large-repo backfill. 2 - //! 3 - //! MST keys have the form `<collection>/<rkey>`, where `collection` is an NSID 4 - //! and `rkey` is a Record Key, both subject to format restrictions defined in 5 - //! the AT Protocol specs. 6 - //! 7 - //! `getRecord` usually includes the keys adjacent to the queried key in its CAR 8 - //! slice response, and always does when the requested key does not exist. The 9 - //! probing algorithm exploits this to enumerate every collection with one 10 - //! request per collection: 11 - //! 12 - //! 1. Query `getRecord` with the **minimum legal MST key** — the 13 - //! lexicographically lowest string that is a valid `<collection>/<rkey>`. 14 - //! The record won't exist, but the right-adjacent key in the CAR slice is 15 - //! the lowest key actually present in the repo, revealing the first 16 - //! collection. 17 - //! 2. For that collection, compute the **maximum legal rkey** and query 18 - //! `getRecord` with `<collection>/<max_rkey>`. The right-adjacent key in 19 - //! the response is the first key of the *next* collection in the repo. 20 - //! 3. Repeat step 2 for each newly discovered collection until no right- 21 - //! adjacent key is returned, signalling that all collections have been 22 - //! found. 23 - //! 24 - //! It is *possible* for the maximum legal rkey of a collection to be present in 25 - //! a repo, and for its immediate right-adjacent key *not* to be present in the 26 - //! CAR slice response. In such cases, we can compute the very next legal 27 - //! collection and request it with the minimum legal rkey. 28 - //! 29 - //! Repositories can update while being probed. This is detectable because every 30 - //! probe response includes at least one parent block on any changed key's path. 31 - //! Need to get in the weeds but I think if we maintain a sparse tree from the 32 - //! probes, we might even be able to know whether an update added or removed any 33 - //! 
collections within the area we've already covered? 34 - //! 35 - //! TODO also: there is a `getBlocks` endpoint, which might be an alternative 36 - //! to probing: we could do one probe to get the root, then walk down the tree 37 - //! (on parallel paths even) to build out the sparse collection-boundary 38 - //! skeleton tree. Is this more efficient than probing with min/max rkeys? 39 - //! 40 - //! in either case, if handling repo updates leads to too many re-fetches, we 41 - //! should fall back to `getRepo` and full mst walking. 42 - //! 43 - //! Each discovered `(did, collection)` pair is written to the rbc/cbr index 44 - //! via `db::index::insert`. 45 - 46 - use bytes::Bytes; 47 - use jacquard_api::com_atproto::sync::get_record::{GetRecord, GetRecordError}; 48 - use jacquard_common::{ 49 - error::ClientErrorKind, 50 - types::string::{Did, Nsid, RecordKey, Rkey}, 51 - xrpc::{XrpcError, XrpcExt}, 52 - }; 53 - 54 - use crate::db::DbRef; 55 - use crate::error::{Error, Result}; 56 - 57 - /// minimum legal NSID 58 - /// 59 - /// - whole domain authority must be lowercase 60 - /// - top level domain must start with an alphabetic character 61 - /// - other domain segments cannot begin or end with hyphens 62 - /// - max 253 chars of domain authority before the last name segment 63 - /// - name segment accepts uppercase and must begin with an alphabetic 64 - const MIN_COLLECTION: &str = "a-------------------------------------------------------------0.0-------------------------------------------------------------0.0-------------------------------------------------------------0.0-----------------------------------------------------------0.A"; 65 - 66 - /// minimum legal rkey: `-` = ordinal 45 (.:_~ are 46, 58, 95, 127 respectively) 67 - const MIN_RKEY: &str = "-"; 68 - 69 - /// maximum legal rkey: 512 of the max legal character 70 - const MAX_RKEY: &str = 
"~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~"; 71 - 72 - /// Extract the collection segment from an MST key of the form 73 - /// `<collection>/<rkey>`. 74 - fn collection_from_key(key: &str) -> Option<&str> { 75 - key.split_once('/').map(|(col, _)| col) 76 - } 77 - 78 - /// Probe `did` to enumerate its collections via sequential `getRecord` requests. 79 - /// 80 - /// Starts from the minimum legal MST key and follows right-adjacent keys one 81 - /// collection at a time until the end of the repo is reached. One XRPC request 82 - /// is issued per collection present in the repo. 83 - pub async fn probe_repo(host: &str, did: &str, db: DbRef) -> Result<()> { 84 - // MAX_RKEY is 512 '~' characters — the lexicographically largest valid rkey. 85 - let max_rkey: String = "~".repeat(512); 86 - 87 - let client = reqwest::Client::new(); 88 - let base: jacquard_common::url::Url = format!("https://{}", host) 89 - .parse() 90 - .map_err(|e: jacquard_common::url::ParseError| Error::Other(e.to_string()))?; 91 - 92 - // Step 1: probe the minimum legal MST key to discover the first collection. 93 - let probe_key = format!("{}/{}", MIN_COLLECTION, MIN_RKEY); 94 - let car = match fetch_car(&client, &base, did, MIN_COLLECTION, MIN_RKEY).await? 
{ 95 - Some(bytes) => bytes, 96 - None => return Ok(()), // repo is inaccessible or does not exist 97 - }; 98 - let adjacent = crate::mst::adjacent::extract_adjacent(&car, &probe_key).await?; 99 - let mut current_collection = match adjacent.next.as_deref().and_then(collection_from_key) { 100 - Some(col) => col.to_owned(), 101 - None => return Ok(()), // repo has no records 102 - }; 103 - 104 - // Steps 2+: for each discovered collection, insert it and walk to the next. 105 - loop { 106 - crate::db::index::insert(&db, did, &current_collection)?; 107 - 108 - let probe_key = format!("{}/{}", current_collection, max_rkey); 109 - let car = match fetch_car(&client, &base, did, &current_collection, &max_rkey).await? { 110 - Some(bytes) => bytes, 111 - None => break, // repo became inaccessible mid-probe 112 - }; 113 - let adjacent = crate::mst::adjacent::extract_adjacent(&car, &probe_key).await?; 114 - let next_collection = match adjacent.next.as_deref().and_then(collection_from_key) { 115 - Some(col) => col.to_owned(), 116 - None => break, // no more collections 117 - }; 118 - 119 - if next_collection == current_collection { 120 - // Safety guard: the adjacent key should always be in the next 121 - // collection, but avoid an infinite loop if it is not. 122 - break; 123 - } 124 - current_collection = next_collection; 125 - } 126 - 127 - Ok(()) 128 - } 129 - 130 - /// Make one `com.atproto.sync.getRecord` request and return the raw CAR bytes. 131 - /// 132 - /// Returns `None` if the repository is inaccessible (taken down, suspended, 133 - /// deactivated, or not found) or if an unexpected HTTP error occurs. 
134 - async fn fetch_car( 135 - client: &reqwest::Client, 136 - base: &jacquard_common::url::Url, 137 - did: &str, 138 - collection: &str, 139 - rkey: &str, 140 - ) -> Result<Option<Bytes>> { 141 - let req = GetRecord { 142 - collection: Nsid::new_owned(collection).map_err(|e| Error::Other(e.to_string()))?, 143 - did: Did::new_owned(did).map_err(|e| Error::Other(e.to_string()))?, 144 - rkey: RecordKey(Rkey::new_owned(rkey).map_err(|e| Error::Other(e.to_string()))?), 145 - }; 146 - 147 - let resp = match client.xrpc(base.clone()).send(&req).await { 148 - Ok(resp) => resp, 149 - Err(e) => { 150 - return match e.kind() { 151 - // Network or unexpected HTTP-level errors: skip this repo. 152 - ClientErrorKind::Transport | ClientErrorKind::Http { .. } => Ok(None), 153 - _ => Err(Error::Other(e.to_string())), 154 - }; 155 - } 156 - }; 157 - 158 - // resp is HTTP 200 or 400 at this point (401 with WWW-Authenticate is 159 - // already surfaced as Err by send()). 160 - match resp.parse() { 161 - Ok(output) => Ok(Some(output.body)), 162 - Err(XrpcError::Xrpc(err)) => match err { 163 - GetRecordError::RepoNotFound(_) 164 - | GetRecordError::RepoTakendown(_) 165 - | GetRecordError::RepoSuspended(_) 166 - | GetRecordError::RepoDeactivated(_) 167 - | GetRecordError::RecordNotFound(_) 168 - | GetRecordError::Unknown(_) => Ok(None), 169 - }, 170 - Err(e) => Err(Error::Other(e.to_string())), 171 - } 172 - }
-20
src/backfill/small_repo.rs
··· 1 - //! Fast path for small repositories: fetch the full repo CAR and extract all 2 - //! collections in a single pass. 3 - //! 4 - //! Small repos are detected by inspecting MST node levels from an initial 5 - //! `getRecord` probe — a high-level node present in a partial CAR slice means 6 - //! the repo is large; absence of high-level nodes suggests a small repo. 7 - 8 - use crate::db::DbRef; 9 - use crate::error::Result; 10 - 11 - /// Fetch the entire repo CAR for `did` via `com.atproto.sync.getRepo` and 12 - /// extract all collection NSIDs in one pass. 13 - /// 14 - /// Streams the CAR response (via `iroh-car` or `jacquard-repo`'s CAR reader), 15 - /// walks every MST leaf to collect all record keys, groups them by collection, 16 - /// and writes the result to the rbc/cbr index. 17 - pub async fn index_small_repo(host: &str, did: &str, db: DbRef) -> Result<()> { 18 - let _ = (host, did, db); 19 - todo!("stream getRepo CAR, parse all MST leaves, index collections") 20 - }
+10 -5
src/db/cursor.rs src/storage/cursor.rs
··· 1 1 //! Firehose cursor and listRepos backfill progress tracking. 2 2 3 - use crate::db::{keys, DbRef}; 4 3 use crate::error::Result; 4 + use crate::storage::{DbRef, keys}; 5 5 6 6 /// Read the last-saved firehose cursor for `host`. 7 7 /// ··· 11 11 match db.cursors.get(key)? { 12 12 None => Ok(None), 13 13 Some(v) => { 14 - let bytes: [u8; 8] = v.as_ref().try_into().map_err(|_| { 15 - crate::error::Error::Other("corrupt cursor value".into()) 16 - })?; 14 + let bytes: [u8; 8] = v 15 + .as_ref() 16 + .try_into() 17 + .map_err(|_| crate::error::Error::Other("corrupt cursor value".into()))?; 17 18 Ok(Some(u64::from_be_bytes(bytes))) 18 19 } 19 20 } ··· 41 42 } 42 43 43 44 /// Write the listRepos backfill progress for `host`. 44 - pub fn set_list_repos_progress(_db: &DbRef, _host: &str, _progress: &ListReposProgress) -> Result<()> { 45 + pub fn set_list_repos_progress( 46 + _db: &DbRef, 47 + _host: &str, 48 + _progress: &ListReposProgress, 49 + ) -> Result<()> { 45 50 todo!("serialize and write ListReposProgress") 46 51 }
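the cursor handling in this hunk amounts to a u64 big-endian round-trip with a length check; a minimal standalone sketch (hypothetical function names, not from the commit):

```rust
/// store the firehose sequence cursor as 8 big-endian bytes; big-endian
/// keeps byte order and numeric order aligned, which matters for ordered
/// key-value stores like fjall.
fn encode_cursor(seq: u64) -> [u8; 8] {
    seq.to_be_bytes()
}

/// decode a stored cursor, rejecting corrupt (wrong-length) values the
/// same way the hunk's fixed-width `try_into` does.
fn decode_cursor(v: &[u8]) -> Option<u64> {
    let bytes: [u8; 8] = v.try_into().ok()?;
    Some(u64::from_be_bytes(bytes))
}

fn main() {
    let enc = encode_cursor(123_456_789);
    assert_eq!(decode_cursor(&enc), Some(123_456_789));
    assert_eq!(decode_cursor(&[0u8; 7]), None); // corrupt value
}
```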
+7 -2
src/db/index.rs src/storage/index.rs
··· 1 1 //! rbc / cbr index operations. 2 2 3 - use crate::db::{keys, DbRef}; 4 3 use crate::error::Result; 4 + use crate::storage::{DbRef, keys}; 5 5 6 6 /// Insert a `(collection, did)` pair into both the rbc and cbr indexes. 7 7 pub fn insert(db: &DbRef, did: &str, collection: &str) -> Result<()> { ··· 55 55 /// Iterate over DIDs in the rbc index for `collection`, starting after `cursor`. 56 56 /// 57 57 /// Returns at most `limit` DID strings. 58 - pub fn scan_rbc(db: &DbRef, collection: &str, cursor: Option<&str>, limit: usize) -> Result<Vec<String>> { 58 + pub fn scan_rbc( 59 + db: &DbRef, 60 + collection: &str, 61 + cursor: Option<&str>, 62 + limit: usize, 63 + ) -> Result<Vec<String>> { 59 64 let prefix = keys::rbc_prefix(collection); 60 65 let prefix_len = prefix.len(); 61 66
+2 -1
src/db/keys.rs src/storage/keys.rs
··· 81 81 /// `"subscribeRepos"\0<host>\0"cursor"` — firehose cursor for a relay host. 82 82 pub fn subscribe_cursor(host: &str) -> Vec<u8> { 83 83 const SUFFIX: &[u8] = b"cursor"; 84 - let mut key = Vec::with_capacity(PREFIX_SUBSCRIBE_REPOS.len() + 1 + host.len() + 1 + SUFFIX.len()); 84 + let mut key = 85 + Vec::with_capacity(PREFIX_SUBSCRIBE_REPOS.len() + 1 + host.len() + 1 + SUFFIX.len()); 85 86 key.extend_from_slice(PREFIX_SUBSCRIBE_REPOS); 86 87 key.push(SEP); 87 88 key.extend_from_slice(host.as_bytes());
src/db/mod.rs src/storage/mod.rs
+1 -1
src/db/repo.rs src/storage/repo.rs
··· 1 1 //! Per-repo state storage. 2 2 3 - use crate::db::{keys, DbRef}; 4 3 use crate::error::Result; 4 + use crate::storage::{DbRef, keys}; 5 5 6 6 /// High-level lifecycle state of a repo. 7 7 #[derive(Debug, Clone, PartialEq, Eq)]
+1 -1
src/db/resync.rs src/storage/resync.rs
··· 3 3 //! Keys: `"repoResyncQueue"\0<ts_be:u64>\0<did>` 4 4 //! Values: CBOR payload with the triggering commit, retry count, and retry reason. 5 5 6 - use crate::db::DbRef; 7 6 use crate::error::Result; 7 + use crate::storage::DbRef; 8 8 9 9 /// An item waiting in the resync queue. 10 10 #[derive(Debug, Clone)]
-20
src/firehose/mod.rs
··· 1 - //! Firehose subsystem. 2 - //! 3 - //! Connects to an ATProto relay, validates incoming commits via sync1.1 inductive 4 - //! proofs, and updates the rbc/cbr index on collection additions/removals. 5 - 6 - mod subscriber; 7 - 8 - pub use subscriber::Subscriber; 9 - 10 - use crate::db::DbRef; 11 - use crate::error::Result; 12 - 13 - /// Spawn the firehose subscriber task for `host` and run until it returns. 14 - /// 15 - /// This is the top-level entry point called from `main`. The subscriber handles 16 - /// reconnection internally, so this future only resolves on a fatal error. 17 - pub async fn run(host: String, db: DbRef) -> Result<()> { 18 - let mut sub = Subscriber::new(host, db); 19 - sub.run().await 20 - }
+8 -1
src/firehose/subscriber.rs src/sync/subscribe_repos.rs
··· 1 + //! Firehose subsystem. 2 + //! 3 + //! Connects to an ATProto relay, validates incoming commits via sync1.1 inductive 4 + //! proofs, and updates the rbc/cbr index on collection additions/removals. 5 + 1 6 //! Firehose WebSocket subscriber. 2 7 //! 3 8 //! Connects to an ATProto relay using `jacquard-common`'s `SubscriptionExt` + 4 9 //! `TungsteniteClient`, persists/restores the sequence cursor via `db::cursor`, 5 10 //! and dispatches decoded events to the appropriate handlers. 6 11 7 - use crate::db::DbRef; 12 + // pub use subscriber::Subscriber; 13 + 8 14 use crate::error::Result; 15 + use crate::storage::DbRef; 9 16 10 17 /// Manages a single WebSocket connection to a relay firehose. 11 18 pub struct Subscriber {
+4 -11
src/main.rs
··· 1 - mod api; 2 - mod backfill; 3 - mod db; 4 1 mod error; 5 - mod firehose; 6 2 mod mst; 3 + mod server; 4 + mod storage; 5 + mod sync; 7 6 8 7 use clap::Parser; 9 8 use std::net::SocketAddr; ··· 30 29 #[tokio::main] 31 30 async fn main() -> Result<()> { 32 31 let args = Args::parse(); 33 - let db = db::open(&args.db_path)?; 34 - 35 - tokio::select! { 36 - result = firehose::run(args.subscribe.clone(), db.clone()) => result?, 37 - result = backfill::run(args.subscribe, db.clone()) => result?, 38 - result = api::serve(args.listen, db) => result?, 39 - } 32 + let _db = storage::open(&args.db_path)?; 40 33 41 34 Ok(()) 42 35 }
+19
src/sync/backfill.rs
··· 1 + //! Walk `com.atproto.sync.listRepos` with cursor pagination. 2 + //! 3 + //! For each DID encountered, calls `small_repo::index_repo` to enumerate its 4 + //! collections via `describeRepo` (or `getRepo` fallback) and write them to 5 + //! the rbc/cbr index. 6 + 7 + use crate::error::Result; 8 + use crate::storage::DbRef; 9 + 10 + /// Walk the full `listRepos` feed for `host`, persisting progress after each 11 + /// page so it can be resumed on restart. 12 + /// 13 + /// Uses `jacquard-api`'s `com_atproto::sync::list_repos` XRPC call with cursor 14 + /// pagination. For each DID, calls `small_repo::index_repo`. Per-page progress 15 + /// is written via `db::cursor::set_list_repos_progress`. 16 + pub async fn run(host: &str, db: DbRef) -> Result<()> { 17 + let _ = (host, db); 18 + todo!("paginate listRepos, call small_repo::index_repo for each DID, persist cursor") 19 + }
+48
src/sync/describe_repo.rs
··· 1 + //! Per-repository indexing via `com.atproto.repo.describeRepo` with a 2 + //! `com.atproto.sync.getRepo` full-CAR fallback. 3 + //! 4 + //! `describeRepo` is tried first because it is cheap: one request, no CAR 5 + //! parsing, and the PDS directly returns its `collections` array. If the 6 + //! request fails or returns no collections, the full repository CAR is fetched 7 + //! and every MST leaf is walked to enumerate collections. 8 + 9 + use jacquard_api::com_atproto::repo::describe_repo::DescribeRepo; 10 + use jacquard_common::{error::ClientErrorKind, types::ident::AtIdentifier, xrpc::XrpcExt}; 11 + 12 + use crate::error::{Error, Result}; 13 + 14 + /// Call `com.atproto.repo.describeRepo` and return the collections list. 15 + /// 16 + /// Returns `None` on any network or XRPC error (the caller falls back to 17 + /// getRepo). Returns `Some(vec![])` if the response contains no collections. 18 + async fn try_describe_repo( 19 + client: &reqwest::Client, 20 + base: &jacquard_common::url::Url, 21 + did: &str, 22 + ) -> Result<Option<Vec<String>>> { 23 + let req = DescribeRepo { 24 + repo: AtIdentifier::new_owned(did).map_err(|e| Error::Other(e.to_string()))?, 25 + }; 26 + 27 + let resp = match client.xrpc(base.clone()).send(&req).await { 28 + Ok(resp) => resp, 29 + Err(e) => { 30 + return match e.kind() { 31 + ClientErrorKind::Transport | ClientErrorKind::Http { .. } => Ok(None), 32 + _ => Err(Error::Other(e.to_string())), 33 + }; 34 + } 35 + }; 36 + 37 + match resp.parse() { 38 + Ok(output) => { 39 + let collections = output 40 + .collections 41 + .iter() 42 + .map(|c| c.as_str().to_owned()) 43 + .collect(); 44 + Ok(Some(collections)) 45 + } 46 + Err(_) => Ok(None), 47 + } 48 + }
+138
src/sync/get_repo.rs
··· 1 + use crate::storage::DbRef; 2 + use crate::sync::try_describe_repo; 3 + use bytes::Bytes; 4 + use jacquard_api::com_atproto::sync::get_repo::GetRepoError; 5 + use jacquard_common::error::ClientErrorKind; 6 + use jacquard_common::xrpc::XrpcError; 7 + use jacquard_common::xrpc::XrpcExt; 8 + use jacquard_repo::MemoryBlockStore; 9 + use jacquard_repo::Mst; 10 + use jacquard_repo::car::parse_car_bytes; 11 + use jacquard_repo::commit::Commit; 12 + use jacquard_repo::mst::CursorPosition; 13 + use jacquard_repo::mst::MstCursor; 14 + use std::sync::Arc; 15 + 16 + use crate::error::{Error, Result}; 17 + use jacquard_api::com_atproto::sync::get_repo::GetRepo; 18 + 19 + use jacquard_common::types::string::Did; 20 + 21 + /// Fetch the full repo CAR via `com.atproto.sync.getRepo`. 22 + /// 23 + /// Returns `None` if the repo is inaccessible (taken down, suspended, etc.). 24 + async fn fetch_repo_car( 25 + client: &reqwest::Client, 26 + base: &jacquard_common::url::Url, 27 + did: &str, 28 + ) -> Result<Option<Bytes>> { 29 + let req = GetRepo { 30 + did: Did::new_owned(did).map_err(|e| Error::Other(e.to_string()))?, 31 + since: None, 32 + }; 33 + 34 + let resp = match client.xrpc(base.clone()).send(&req).await { 35 + Ok(resp) => resp, 36 + Err(e) => { 37 + return match e.kind() { 38 + ClientErrorKind::Transport | ClientErrorKind::Http { .. } => Ok(None), 39 + _ => Err(Error::Other(e.to_string())), 40 + }; 41 + } 42 + }; 43 + 44 + match resp.parse() { 45 + Ok(output) => Ok(Some(output.body)), 46 + Err(XrpcError::Xrpc(err)) => match err { 47 + GetRepoError::RepoNotFound(_) 48 + | GetRepoError::RepoTakendown(_) 49 + | GetRepoError::RepoSuspended(_) 50 + | GetRepoError::RepoDeactivated(_) 51 + | GetRepoError::Unknown(_) => Ok(None), 52 + }, 53 + Err(e) => Err(Error::Other(e.to_string())), 54 + } 55 + } 56 + 57 + /// Fetch the full repo CAR via `com.atproto.sync.getRepo`, walk every MST 58 + /// leaf, and write each discovered collection to the index. 
59 + async fn index_via_get_repo( 60 + client: &reqwest::Client, 61 + base: &jacquard_common::url::Url, 62 + did: &str, 63 + db: &DbRef, 64 + ) -> Result<()> { 65 + let car = match fetch_repo_car(client, base, did).await? { 66 + Some(b) => b, 67 + None => return Ok(()), // repo inaccessible; skip silently 68 + }; 69 + 70 + let parsed = parse_car_bytes(&car) 71 + .await 72 + .map_err(|e| Error::Other(e.to_string()))?; 73 + 74 + let mst_root = { 75 + let commit_bytes = parsed 76 + .blocks 77 + .get(&parsed.root) 78 + .ok_or_else(|| Error::Other("getRepo CAR has no commit block".into()))?; 79 + let commit = Commit::from_cbor(commit_bytes.as_ref()) 80 + .map_err(|e| Error::Other(format!("bad commit in getRepo CAR: {}", e)))?; 81 + *commit.data() 82 + }; 83 + 84 + let storage = Arc::new(MemoryBlockStore::new_from_blocks(parsed.blocks)); 85 + let mst = Mst::load(storage, mst_root, None); 86 + 87 + // MST keys are `<collection>/<rkey>` in sorted order. Records within a 88 + // collection are consecutive, so tracking the last-seen collection avoids 89 + // redundant index writes. 90 + let mut cursor = MstCursor::new(mst); 91 + let mut last_col: Option<String> = None; 92 + 93 + loop { 94 + match cursor.current() { 95 + CursorPosition::End => break, 96 + CursorPosition::Leaf { key, .. } => { 97 + if let Some(col) = key.as_str().split_once('/').map(|(c, _)| c) { 98 + if last_col.as_deref() != Some(col) { 99 + crate::storage::index::insert(db, did, col)?; 100 + last_col = Some(col.to_owned()); 101 + } 102 + } 103 + // Propagate advance failures: swallowing them would spin 104 + // forever on the same position. 105 + cursor.advance().await.map_err(|e| Error::Other(e.to_string()))?; 106 + } 107 + CursorPosition::Tree { .. } => { 108 + cursor.advance().await.map_err(|e| Error::Other(e.to_string()))?; 109 + } 110 + } 111 + } 112 + 113 + Ok(()) 114 + } 115 + 116 + /// Index a repository by enumerating its collections. 117 + /// 118 + /// Tries `com.atproto.repo.describeRepo` first. On success, inserts each 119 + /// returned collection into the rbc/cbr index and returns.
On failure or an 118 + /// empty collection list, fetches the full repo CAR via 119 + /// `com.atproto.sync.getRepo` and walks every MST leaf instead. 120 + pub async fn index_repo(host: &str, did: &str, db: DbRef) -> Result<()> { 121 + let client = reqwest::Client::new(); 122 + let base: jacquard_common::url::Url = format!("https://{}", host) 123 + .parse() 124 + .map_err(|e: jacquard_common::url::ParseError| Error::Other(e.to_string()))?; 125 + 126 + // Try describeRepo first — it is cheap and usually sufficient. 127 + if let Some(collections) = try_describe_repo(&client, &base, did).await? { 128 + if !collections.is_empty() { 129 + for col in &collections { 130 + crate::storage::index::insert(&db, did, col)?; 131 + } 132 + return Ok(()); 133 + } 134 + } 135 + 136 + // Fall back to getRepo: fetch the full CAR and walk the MST. 137 + index_via_get_repo(&client, &base, did, &db).await 138 + }
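The MST walk above relies on one property: keys of the form `<collection>/<rkey>` are stored in lexicographic order, so all records of a collection are consecutive and a single last-seen comparison is enough to deduplicate. A standalone sketch of that pass (the helper is hypothetical, not part of the crate):

```rust
/// Extract the distinct collections from MST keys in sorted order, in one
/// pass. Mirrors the dedup logic in `index_via_get_repo`: because records of
/// a collection are consecutive, comparing each key's collection prefix
/// against the last one pushed is sufficient.
fn distinct_collections<'a>(sorted_keys: impl IntoIterator<Item = &'a str>) -> Vec<String> {
    let mut out: Vec<String> = Vec::new();
    for key in sorted_keys {
        // Keys without a '/' are malformed for a repo MST; skip them.
        if let Some((col, _rkey)) = key.split_once('/') {
            if out.last().map(String::as_str) != Some(col) {
                out.push(col.to_owned());
            }
        }
    }
    out
}

fn main() {
    let keys = [
        "app.bsky.feed.like/3lbx",
        "app.bsky.feed.like/3lby",
        "app.bsky.feed.post/3lbz",
    ];
    assert_eq!(
        distinct_collections(keys),
        vec!["app.bsky.feed.like", "app.bsky.feed.post"]
    );
}
```

Note the one-pass dedup only holds for sorted input; an unsorted key stream would need a set instead of the last-seen comparison.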
+46
src/sync/mod.rs
··· 1 + use jacquard_api::com_atproto::repo::describe_repo::DescribeRepo; 2 + use jacquard_common::error::ClientErrorKind; 3 + 4 + use crate::error::{Error, Result}; 5 + use jacquard_common::types::string::AtIdentifier; 6 + use jacquard_common::xrpc::XrpcExt; 7 + pub mod backfill; 8 + pub mod describe_repo; 9 + pub mod get_repo; 10 + pub mod subscribe_repos; 11 + 12 + /// Call `com.atproto.repo.describeRepo` and return the collections list. 13 + /// 14 + /// Returns `None` on any network or XRPC error (the caller falls back to 15 + /// getRepo). Returns `Some(vec![])` if the response contains no collections. 16 + async fn try_describe_repo( 17 + client: &reqwest::Client, 18 + base: &jacquard_common::url::Url, 19 + did: &str, 20 + ) -> Result<Option<Vec<String>>> { 21 + let req = DescribeRepo { 22 + repo: AtIdentifier::new_owned(did).map_err(|e| Error::Other(e.to_string()))?, 23 + }; 24 + 25 + let resp = match client.xrpc(base.clone()).send(&req).await { 26 + Ok(resp) => resp, 27 + Err(e) => { 28 + return match e.kind() { 29 + ClientErrorKind::Transport | ClientErrorKind::Http { .. } => Ok(None), 30 + _ => Err(Error::Other(e.to_string())), 31 + }; 32 + } 33 + }; 34 + 35 + match resp.parse() { 36 + Ok(output) => { 37 + let collections = output 38 + .collections 39 + .iter() 40 + .map(|c| c.as_str().to_owned()) 41 + .collect(); 42 + Ok(Some(collections)) 43 + } 44 + Err(_) => Ok(None), 45 + } 46 + }