lightweight com.atproto.sync.listReposByCollection
45
fork

Configure Feed

Select the types of activity you want to include in your feed.

db collection sync

phil e2a3db7e 0bc3d74d

+250 -16
+234 -12
src/storage/index.rs
··· 1 1 //! rbc / cbr index operations. 2 2 3 + use std::cmp::Ordering; 4 + 3 5 use jacquard_common::types::nsid::Nsid; 4 6 use jacquard_common::types::string::Did; 5 7 6 8 use crate::error::Result; 7 9 use crate::storage::{DbRef, keys}; 8 - use crate::sync::resync::RepoSnapshot; 10 + 11 + /// Add a `(collection, did)` pair to both indexes within an existing batch. 12 + /// 13 + /// Use this when building a larger multi-operation batch. For standalone 14 + /// inserts, prefer [`insert`]. 15 + pub fn insert_into( 16 + batch: &mut fjall::OwnedWriteBatch, 17 + db: &DbRef, 18 + did: Did<'_>, 19 + collection: Nsid<'_>, 20 + ) { 21 + batch.insert(&db.rbc, keys::rbc(collection.clone(), did.clone()), b""); 22 + batch.insert(&db.cbr, keys::cbr(did, collection), b""); 23 + } 24 + 25 + /// Remove a `(collection, did)` pair from both indexes within an existing batch. 26 + /// 27 + /// Use this when building a larger multi-operation batch. For standalone 28 + /// removes, prefer [`remove`]. 29 + pub fn remove_into( 30 + batch: &mut fjall::OwnedWriteBatch, 31 + db: &DbRef, 32 + did: Did<'_>, 33 + collection: Nsid<'_>, 34 + ) { 35 + batch.remove(&db.rbc, keys::rbc(collection.clone(), did.clone())); 36 + batch.remove(&db.cbr, keys::cbr(did, collection)); 37 + } 9 38 10 39 /// Insert a `(collection, did)` pair into both the rbc and cbr indexes. 11 40 pub fn insert(db: &DbRef, did: Did<'_>, collection: Nsid<'_>) -> Result<()> { 12 41 let mut batch = db.database.batch(); 13 - batch.insert(&db.rbc, keys::rbc(collection.clone(), did.clone()), b""); 14 - batch.insert(&db.cbr, keys::cbr(did, collection), b""); 42 + insert_into(&mut batch, db, did, collection); 15 43 batch.commit()?; 16 44 Ok(()) 17 45 } ··· 19 47 /// Remove a `(collection, did)` pair from both indexes. 20 48 pub fn remove(db: &DbRef, did: Did<'_>, collection: Nsid<'_>) -> Result<()> { 21 49 let mut batch = db.database.batch(); 22 - batch.remove(&db.rbc, keys::rbc(collection.clone(), did.clone())); 23 - batch.remove(&db.cbr, keys::cbr(did, collection)); 50 + remove_into(&mut batch, db, did, collection); 24 51 batch.commit()?; 25 52 Ok(()) 26 53 } ··· 52 79 Ok(()) 53 80 } 54 81 82 + /// Sync the rbc/cbr indexes for `did` to match `collections`. 83 + /// 84 + /// Reads the current collection set from the cbr index and merge-joins it 85 + /// against `collections` (which must be **sorted**). Collections present in 86 + /// the index but absent from the snapshot are deleted; collections in the 87 + /// snapshot but absent from the index are inserted. The whole diff is applied 88 + /// in a single batch. 89 + /// 90 + /// Typically called as `sync_collections(db, did, &snapshot.collections)`. 91 + pub fn sync_collections(db: &DbRef, did: Did<'_>, collections: &[Nsid<'static>]) -> Result<()> { 92 + // Read the current set from the cbr index. Fjall iterates in key order, and 93 + // the cbr prefix puts the collection NSID last, so the suffix scan is already 94 + // sorted lexicographically. 95 + let prefix = keys::cbr_prefix(did.clone()); 96 + let prefix_len = prefix.len(); 97 + let existing: Vec<Nsid<'static>> = db 98 + .cbr 99 + .prefix(prefix) 100 + .map(|guard| { 101 + guard 102 + .into_inner() 103 + .map(|(k, _v)| keys::cbr_parse_collection(&k, prefix_len)) 104 + }) 105 + .collect::<fjall::Result<Vec<_>>>()? 106 + .into_iter() 107 + .flatten() 108 + .collect(); 109 + 110 + let mut batch = db.database.batch(); 111 + 112 + // Merge-join two sorted sequences. 113 + // We separate the peek (to determine ordering) from the advance (next()) 114 + // so that the temporary peek borrows are dropped before we mutate the iterators. 115 + let mut ei = existing.into_iter().peekable(); 116 + let mut si = collections.iter().cloned().peekable(); 117 + 118 + loop { 119 + let ord = match (ei.peek(), si.peek()) { 120 + (None, None) => break, 121 + (Some(_), None) => Ordering::Less, 122 + (None, Some(_)) => Ordering::Greater, 123 + (Some(e), Some(s)) => e.cmp(s), 124 + }; 125 + // Peek borrows are dropped here; safe to call next() below. 126 + match ord { 127 + Ordering::Less => { 128 + remove_into(&mut batch, db, did.clone(), ei.next().unwrap()); 129 + } 130 + Ordering::Greater => { 131 + insert_into(&mut batch, db, did.clone(), si.next().unwrap()); 132 + } 133 + Ordering::Equal => { 134 + ei.next(); 135 + si.next(); 136 + } 137 + } 138 + } 139 + 140 + batch.commit()?; 141 + Ok(()) 142 + } 143 + 144 + #[cfg(test)] 145 + mod tests { 146 + use super::*; 147 + use crate::storage::open_temporary; 148 + 149 + fn nsid(s: &str) -> Nsid<'static> { 150 + Nsid::new_owned(s).unwrap() 151 + } 152 + 153 + fn did(s: &str) -> Did<'static> { 154 + Did::new_owned(s).unwrap() 155 + } 156 + 157 + fn collections_for(db: &crate::storage::DbRef, d: &Did<'static>) -> Vec<Nsid<'static>> { 158 + scan_cbr(db, d.clone(), None, 1000).unwrap() 159 + } 160 + 161 + #[test] 162 + fn sync_adds_to_empty_index() { 163 + let db = open_temporary().unwrap(); 164 + let d = did("did:web:alice.com"); 165 + 166 + sync_collections( 167 + &db, 168 + d.clone(), 169 + &[nsid("app.bsky.actor.profile"), nsid("app.bsky.feed.post")], 170 + ) 171 + .unwrap(); 172 + 173 + assert_eq!( 174 + collections_for(&db, &d), 175 + vec![nsid("app.bsky.actor.profile"), nsid("app.bsky.feed.post"),] 176 + ); 177 + } 178 + 179 + #[test] 180 + fn sync_removes_all_when_snapshot_empty() { 181 + let db = open_temporary().unwrap(); 182 + let d = did("did:web:alice.com"); 183 + 184 + insert(&db, d.clone(), nsid("app.bsky.actor.profile")).unwrap(); 185 + insert(&db, d.clone(), nsid("app.bsky.feed.post")).unwrap(); 186 + insert(&db, d.clone(), nsid("app.bsky.graph.follow")).unwrap(); 187 + 188 + sync_collections(&db, d.clone(), &[]).unwrap(); 189 + 190 + assert_eq!(collections_for(&db, &d), vec![]); 191 + } 192 + 193 + #[test] 194 + fn sync_noop_when_equal() { 195 + let db = open_temporary().unwrap(); 196 + let d = did("did:web:alice.com"); 197 + 198 + let cols = [nsid("app.bsky.feed.post"), nsid("app.bsky.graph.follow")]; 199 + for col in &cols { 200 + insert(&db, d.clone(), col.clone()).unwrap(); 201 + } 202 + 203 + sync_collections(&db, d.clone(), &cols).unwrap(); 204 + 205 + assert_eq!(collections_for(&db, &d), cols.to_vec()); 206 + } 207 + 208 + #[test] 209 + fn sync_adds_and_removes_in_one_batch() { 210 + let db = open_temporary().unwrap(); 211 + let d = did("did:web:alice.com"); 212 + 213 + // Existing: profile, post. 214 + insert(&db, d.clone(), nsid("app.bsky.actor.profile")).unwrap(); 215 + insert(&db, d.clone(), nsid("app.bsky.feed.post")).unwrap(); 216 + 217 + // Snapshot: post, follow — profile removed, follow added. 218 + sync_collections( 219 + &db, 220 + d.clone(), 221 + &[nsid("app.bsky.feed.post"), nsid("app.bsky.graph.follow")], 222 + ) 223 + .unwrap(); 224 + 225 + assert_eq!( 226 + collections_for(&db, &d), 227 + vec![nsid("app.bsky.feed.post"), nsid("app.bsky.graph.follow"),] 228 + ); 229 + } 230 + 231 + #[test] 232 + fn sync_keeps_rbc_consistent_with_cbr() { 233 + // Verify both sides of the index are updated atomically. 234 + let db = open_temporary().unwrap(); 235 + let d = did("did:web:alice.com"); 236 + 237 + insert(&db, d.clone(), nsid("app.bsky.actor.profile")).unwrap(); 238 + insert(&db, d.clone(), nsid("app.bsky.feed.post")).unwrap(); 239 + 240 + // Remove profile, keep post, add follow. 241 + sync_collections( 242 + &db, 243 + d.clone(), 244 + &[nsid("app.bsky.feed.post"), nsid("app.bsky.graph.follow")], 245 + ) 246 + .unwrap(); 247 + 248 + assert_eq!( 249 + scan_rbc(&db, nsid("app.bsky.actor.profile"), None, 10).unwrap(), 250 + vec![] as Vec<Did<'static>>, 251 + "removed collection should not appear in rbc", 252 + ); 253 + assert_eq!( 254 + scan_rbc(&db, nsid("app.bsky.feed.post"), None, 10).unwrap(), 255 + vec![d.clone()], 256 + "unchanged collection should remain in rbc", 257 + ); 258 + assert_eq!( 259 + scan_rbc(&db, nsid("app.bsky.graph.follow"), None, 10).unwrap(), 260 + vec![d.clone()], 261 + "added collection should appear in rbc", 262 + ); 263 + } 264 + 265 + #[test] 266 + fn sync_does_not_affect_other_dids() { 267 + let db = open_temporary().unwrap(); 268 + let alice = did("did:web:alice.com"); 269 + let bob = did("did:web:bob.com"); 270 + 271 + insert(&db, alice.clone(), nsid("app.bsky.feed.post")).unwrap(); 272 + insert(&db, bob.clone(), nsid("app.bsky.feed.post")).unwrap(); 273 + 274 + // Sync alice to empty; bob should be unaffected. 275 + sync_collections(&db, alice.clone(), &[]).unwrap(); 276 + 277 + assert_eq!(collections_for(&db, &alice), vec![]); 278 + assert_eq!(collections_for(&db, &bob), vec![nsid("app.bsky.feed.post")]); 279 + } 280 + } 281 + 55 282 /// Iterate over DIDs in the rbc index for `collection`, starting after `cursor`. 56 283 /// 57 284 /// Returns at most `limit` DIDs. ··· 95 322 Ok(dids) 96 323 } 97 324 98 - /// Iterate over DIDs in the rbc index for `collection`, starting after `cursor`. 325 + /// Iterate over collections in the cbr index for `did`, starting after `cursor`. 99 326 /// 100 - /// Returns at most `limit` DIDs. 327 + /// Returns at most `limit` NSIDs. 101 328 /// 102 329 /// TODO: we can fjall range to the collection's next-after-max (might even be 103 330 /// exposed now?) or maybe use prefix + seek for the start? ··· 137 364 } 138 365 Ok(cols) 139 366 } 140 - 141 - pub fn sync_repo(db: &DbRef, did: Did<'_>, snapshot: RepoSnapshot) -> Result<()> { 142 - // TODO 143 - Ok(()) 144 - }
+15 -1
src/storage/mod.rs
··· 32 32 33 33 /// Open (or create) the fjall database at `path` and return a shared handle. 34 34 pub fn open(path: &Path) -> Result<DbRef> { 35 - let database = fjall::Database::builder(path).open()?; 35 + open_inner(path, false) 36 + } 37 + 38 + /// Open a temporary database that deletes itself on drop. For tests only. 39 + #[cfg(test)] 40 + pub(crate) fn open_temporary() -> Result<DbRef> { 41 + use std::sync::atomic::{AtomicU64, Ordering}; 42 + static COUNTER: AtomicU64 = AtomicU64::new(0); 43 + let n = COUNTER.fetch_add(1, Ordering::Relaxed); 44 + let path = std::env::temp_dir().join(format!("lightrail-test-{}-{}", std::process::id(), n)); 45 + open_inner(&path, true) 46 + } 47 + 48 + fn open_inner(path: &Path, temporary: bool) -> Result<DbRef> { 49 + let database = fjall::Database::builder(path).temporary(temporary).open()?; 36 50 37 51 let rbc = database.keyspace("rbc", fjall::KeyspaceCreateOptions::default)?; 38 52 let cbr = database.keyspace("cbr", fjall::KeyspaceCreateOptions::default)?;
+1 -3
src/sync/resync/mod.rs
··· 87 87 Err(e) => return Err(crate::error::Error::Other(e.to_string())), 88 88 }; 89 89 90 - for col in snapshot.collections { 91 - crate::storage::index::insert(db, did.clone(), col)?; 92 - } 90 + crate::storage::index::sync_collections(db, did, &snapshot.collections)?; 93 91 Ok(()) 94 92 } 95 93