lightweight com.atproto.sync.listReposByCollection
45
fork

Configure Feed

Select the types of activity you want to include in your feed.

implement the collections list

this will be used for setting up the parallel merge walker for multi-collecion support

phil ca9ce226 9e2afd33

+229 -12
+23 -10
hacking.md
··· 35 35 - [x] #commit and #sync 36 36 - [x] make sure blocking db calls are in `spawn_blocking`!! 37 37 - [x] db queries 38 - - [~] configuration 39 - - [~] copy applicable from tap 40 - - [ ] copy applicable from collectiondir 38 + - [x] implement `com.atproto.sync.listRepos` 41 39 - [x] sync1.1!!! 42 40 - [x] verify #commit event 43 41 - [x] verify #sync event 44 42 - [x] inductive proof for #commits 43 + - [~] configuration 44 + - [~] copy applicable from tap 45 + - [ ] copy applicable from collectiondir 45 46 - [~] actually firehose-index!! 46 47 - [x] extract collections-added/removed directly from CAR slice 47 48 - [ ] (spend some time on tests here) 48 49 - [x] do the thing (write them to the db) 49 50 - [ ] swap in repo-stream 51 + - [~] metrics 52 + - [x] basic metrics 53 + - [x] serve prom-style 54 + - [ ] copy applicable ones from collectiondir 55 + - [~] prefix-merge walker (limit by total collections to be merged?) 56 + - [x] add an all-collections index 57 + - [ ] swap in repo-stream for backfill 58 + - [ ] with memory limit 59 + - [ ] with a global concurrency limit for big repos 60 + - [ ] with disk spilling for huge repo 61 + - [ ] with queueing resync for large repos if resources are taken?? 62 + - [ ] (self-reminder: get_repo should be rare in lightrail) 50 63 - [ ] lenient sync1.1 51 64 - [ ] *don't* allow non-validating commits that look like sync1.1 52 65 - [ ] rachet by PDS host: be lenient if we have never seen a sync1.1-looking commit, always strict after we see one. ··· 57 70 - [ ] "deep crawl" mode for relays that listHosts -> listRepos on host instead of relying on relay listRepos 58 71 - [ ] special did:web behaviour to keep reusing a stale resolution on failure 59 72 - [ ] filter dids from inactive accounts 60 - - [ ] implement `com.atproto.sync.listRepos` 61 73 - [ ] multi-collection parallel walk/merge 62 - - [ ] add collection-list index 63 - - [ ] prefix-merge walker (limit by total collections to be merged?) 64 - - [ ] do we need 65 - - [ ] metrics 66 - - [ ] copy applicable ones from collectiondir 67 - - [ ] serve prom-style 68 74 - [ ] admin view of backfill state etc 69 75 - [ ] vanity stats for optimizations, like how many in-flight repos were saved from resync due to high-water-mark firehose cursor persistence 70 76 - [ ] account status convergeance: if we receive commits from apparently-inactive accounts, should we check upstream status to make sure we're not stale? ··· 106 112 ## state/db models 107 113 108 114 taking [inspiration from tap](https://github.com/bluesky-social/indigo/blob/main/cmd/tap/models/models.go) here! 115 + 116 + TODO: fix outdated prefixes here 109 117 110 118 ``` 111 119 main index: ··· 152 160 "repoPrev"||<did> => <rev:string>||<prevData:cid> 153 161 154 162 note: kept separate and small because it very frequently updates! 163 + 164 + 165 + all-collections list: 166 + 167 + "col"||<collection> => () 155 168 156 169 157 170 resync queue:
+196
src/storage/collection_list.rs
··· 1 + //! Global collection list — every NSID ever seen across all repos. 2 + //! 3 + //! Key: `"col"<nsid>` — presence indicates the collection exists (or once existed). 4 + //! Value: empty. 5 + //! 6 + //! Entries are written on every collection birth and are never deleted, so the 7 + //! set can retain NSIDs after all repos have removed their last record. This is 8 + //! acceptable: the index is a superset of live collections, not an exact set. 9 + //! 10 + //! Writes are blind overwrites — no read-before-write. With only a few thousand 11 + //! collections in the network the key space is tiny and compaction handles the 12 + //! redundant writes cheaply. 13 + 14 + use jacquard_common::types::nsid::Nsid; 15 + 16 + use crate::storage::{DbRef, PREFIX_COLLECTION_LIST, StorageResult, error::StorageError}; 17 + 18 + // --------------------------------------------------------------------------- 19 + // Key encoding 20 + // --------------------------------------------------------------------------- 21 + 22 + fn key(collection: Nsid<'_>) -> Vec<u8> { 23 + let s = collection.as_str(); 24 + let mut k = Vec::with_capacity(PREFIX_COLLECTION_LIST.len() + s.len()); 25 + k.extend_from_slice(&PREFIX_COLLECTION_LIST); 26 + k.extend_from_slice(s.as_bytes()); 27 + k 28 + } 29 + 30 + fn parse_nsid(raw_key: &[u8]) -> StorageResult<Nsid<'static>> { 31 + let prefix_len = PREFIX_COLLECTION_LIST.len(); 32 + let key_str = String::from_utf8_lossy(raw_key).to_string(); 33 + let nsid_bytes = raw_key.get(prefix_len..).ok_or(StorageError::Corrupt { 34 + key: key_str.clone(), 35 + reason: "collection list key shorter than prefix", 36 + })?; 37 + let nsid_str = std::str::from_utf8(nsid_bytes).map_err(|_| StorageError::Corrupt { 38 + key: key_str.clone(), 39 + reason: "non-UTF-8 NSID in collection list key", 40 + })?; 41 + Nsid::new_owned(nsid_str).map_err(|_| StorageError::Corrupt { 42 + key: key_str, 43 + reason: "invalid NSID in collection list key", 44 + }) 45 + } 46 + 47 + // --------------------------------------------------------------------------- 48 + // Write 49 + // --------------------------------------------------------------------------- 50 + 51 + /// Add `collection` to the global list within an existing batch. 52 + /// 53 + /// Blindly overwrites any existing entry — no read needed. 54 + pub fn insert_into(batch: &mut fjall::OwnedWriteBatch, db: &DbRef, collection: Nsid<'_>) { 55 + batch.insert(&db.ks, key(collection), b""); 56 + } 57 + 58 + // --------------------------------------------------------------------------- 59 + // Read 60 + // --------------------------------------------------------------------------- 61 + 62 + /// Iterate over all known collections, starting at `cursor` (inclusive). 63 + /// 64 + /// Returns at most `limit` NSIDs sorted lexicographically. `next` is the first 65 + /// NSID of the following page, or `None` if this is the last page. 66 + pub fn scan( 67 + db: &DbRef, 68 + cursor: Option<Nsid<'_>>, 69 + limit: usize, 70 + ) -> StorageResult<(Vec<Nsid<'static>>, Option<Nsid<'static>>)> { 71 + let start_key: Vec<u8> = { 72 + let mut k = PREFIX_COLLECTION_LIST.to_vec(); 73 + if let Some(ref nsid) = cursor { 74 + k.extend_from_slice(nsid.as_str().as_bytes()); 75 + } 76 + k 77 + }; 78 + 79 + let mut ranger = db.ks.range(start_key..); 80 + let mut collections = Vec::with_capacity(limit); 81 + 82 + for guard in ranger.by_ref() { 83 + let (k, _v) = guard.into_inner()?; 84 + if !k.starts_with(&PREFIX_COLLECTION_LIST) { 85 + break; 86 + } 87 + collections.push(parse_nsid(&k)?); 88 + if collections.len() >= limit { 89 + break; 90 + } 91 + } 92 + 93 + let next = loop { 94 + let Some(guard) = ranger.next() else { 95 + break None; 96 + }; 97 + let key = guard.key()?; 98 + if !key.starts_with(&PREFIX_COLLECTION_LIST) { 99 + break None; 100 + } 101 + match parse_nsid(&key) { 102 + Ok(nsid) => break Some(nsid), 103 + Err(_) => continue, 104 + } 105 + }; 106 + 107 + Ok((collections, next)) 108 + } 109 + 110 + // --------------------------------------------------------------------------- 111 + // Tests 112 + // --------------------------------------------------------------------------- 113 + 114 + #[cfg(test)] 115 + mod tests { 116 + use super::*; 117 + use crate::storage::open_temporary; 118 + 119 + fn nsid(s: &str) -> Nsid<'static> { 120 + Nsid::new_owned(s.to_owned()).unwrap() 121 + } 122 + 123 + #[test] 124 + fn insert_and_scan_single() { 125 + let db = open_temporary().unwrap(); 126 + let mut batch = db.database.batch(); 127 + insert_into(&mut batch, &db, nsid("app.bsky.feed.post")); 128 + batch.commit().unwrap(); 129 + 130 + let (cols, next) = scan(&db, None, 100).unwrap(); 131 + assert_eq!(cols, vec![nsid("app.bsky.feed.post")]); 132 + assert!(next.is_none()); 133 + } 134 + 135 + #[test] 136 + fn scan_is_sorted() { 137 + let db = open_temporary().unwrap(); 138 + let mut batch = db.database.batch(); 139 + insert_into(&mut batch, &db, nsid("app.bsky.graph.follow")); 140 + insert_into(&mut batch, &db, nsid("app.bsky.actor.profile")); 141 + insert_into(&mut batch, &db, nsid("app.bsky.feed.post")); 142 + batch.commit().unwrap(); 143 + 144 + let (cols, _) = scan(&db, None, 100).unwrap(); 145 + assert_eq!( 146 + cols, 147 + vec![ 148 + nsid("app.bsky.actor.profile"), 149 + nsid("app.bsky.feed.post"), 150 + nsid("app.bsky.graph.follow"), 151 + ] 152 + ); 153 + } 154 + 155 + #[test] 156 + fn blind_overwrite_is_idempotent() { 157 + let db = open_temporary().unwrap(); 158 + for _ in 0..3 { 159 + let mut batch = db.database.batch(); 160 + insert_into(&mut batch, &db, nsid("app.bsky.feed.post")); 161 + batch.commit().unwrap(); 162 + } 163 + let (cols, _) = scan(&db, None, 100).unwrap(); 164 + assert_eq!(cols, vec![nsid("app.bsky.feed.post")]); 165 + } 166 + 167 + #[test] 168 + fn scan_pagination() { 169 + let db = open_temporary().unwrap(); 170 + let mut batch = db.database.batch(); 171 + for col in &[ 172 + "app.bsky.actor.profile", 173 + "app.bsky.feed.like", 174 + "app.bsky.feed.post", 175 + "app.bsky.graph.follow", 176 + ] { 177 + insert_into(&mut batch, &db, nsid(col)); 178 + } 179 + batch.commit().unwrap(); 180 + 181 + let (page1, next) = scan(&db, None, 2).unwrap(); 182 + assert_eq!( 183 + page1, 184 + vec![nsid("app.bsky.actor.profile"), nsid("app.bsky.feed.like")] 185 + ); 186 + let cursor = next.unwrap(); 187 + assert_eq!(cursor, nsid("app.bsky.feed.post")); 188 + 189 + let (page2, next2) = scan(&db, Some(cursor), 2).unwrap(); 190 + assert_eq!( 191 + page2, 192 + vec![nsid("app.bsky.feed.post"), nsid("app.bsky.graph.follow")] 193 + ); 194 + assert!(next2.is_none()); 195 + } 196 + }
+3
src/storage/mod.rs
··· 1 1 pub mod backfill_progress; 2 2 pub mod collection_index; 3 + pub mod collection_list; 3 4 pub mod error; 4 5 pub mod firehose_cursor; 5 6 pub mod repo; ··· 19 20 /// Fixed-length (3 byte) key prefix per data type 20 21 type KeyPrefix = [u8; 3]; 21 22 23 + /// Global collection list (collection → ()). See [`collection_list`]. 24 + pub(crate) const PREFIX_COLLECTION_LIST: KeyPrefix = *b"col"; 22 25 /// Main collection index (collection → did). See [`collection_index`]. 23 26 pub(crate) const PREFIX_RBC: KeyPrefix = *b"rbc"; 24 27 /// Reversed collection index (did → collection). See [`collection_index`].
+3
src/storage/repo.rs
··· 346 346 Vec<(Did<'static>, RepoInfo, Option<RepoPrev>)>, 347 347 Option<Did<'static>>, 348 348 )> { 349 + // TODO: use fjall prefix_range 350 + // TODO: probably snapshot so we get a consistent account view? 351 + 349 352 let prefix_len = PREFIX_REPO.len(); 350 353 351 354 let start_key: Vec<u8> = {
+4 -2
src/sync/firehose/commit_event.rs
··· 494 494 } 495 495 496 496 // All checks passed — atomically update the chain tip and the collection 497 - // index (born → insert, died → remove). 497 + // index (born → insert, died → remove). Also record each born collection 498 + // in the global collection list (blind overwrite, never deleted). 498 499 let mut batch = db.database.batch(); 499 500 storage::repo::put_prev_into( 500 501 &mut batch, ··· 506 507 }, 507 508 ); 508 509 for coll in born { 509 - storage::collection_index::insert_into(&mut batch, db, did.clone(), coll); 510 + storage::collection_index::insert_into(&mut batch, db, did.clone(), coll.clone()); 511 + storage::collection_list::insert_into(&mut batch, db, coll); 510 512 } 511 513 for coll in died { 512 514 storage::collection_index::remove_into(&mut batch, db, did.clone(), coll);