lightweight com.atproto.sync.listReposByCollection
45
fork

Configure Feed

Select the types of activity you want to include in your feed.

test resync queue interface

phil 049236f0 9f90b9e3

+181 -6
+41
src/storage/keys.rs
··· 331 331 assert_eq!(cbr_parse_collection(&key, prefix_len), Some(col)); 332 332 } 333 333 334 + // --- rbc_suffix --- 335 + 336 + #[test] 337 + fn rbc_suffix_concatenates_to_full_key() { 338 + let col = nsid("app.bsky.feed.post"); 339 + let d = did("did:web:example.com"); 340 + let mut assembled = rbc_prefix(col.clone()); 341 + assembled.extend_from_slice(&rbc_suffix(d.clone())); 342 + assert_eq!(assembled, rbc(col, d)); 343 + } 344 + 334 345 // --- resync_queue --- 335 346 336 347 #[test] ··· 365 376 let (parsed_ts, parsed_did) = resync_queue_parse(&key).unwrap(); 366 377 assert_eq!(parsed_ts, ts); 367 378 assert_eq!(parsed_did, d); 379 + } 380 + 381 + #[test] 382 + fn resync_queue_prefix_all_is_prefix_of_any_entry() { 383 + let prefix_all = resync_queue_prefix_all(); 384 + let key = resync_queue(100, did("did:web:example.com")); 385 + assert!(key.starts_with(&prefix_all)); 386 + } 387 + 388 + #[test] 389 + fn resync_queue_ts_midfix_upper_bound_excludes_entries_at_that_ts() { 390 + // An entry at ts=100 should sort AFTER the midfix for ts=100, 391 + // so it falls outside an exclusive upper bound of ts_midfix(100). 392 + let prefix_all = resync_queue_prefix_all(); 393 + let entry = resync_queue(100, did("did:web:example.com")); 394 + let mut upper = prefix_all.clone(); 395 + upper.extend_from_slice(&resync_queue_ts_midfix(100)); 396 + // entry at ts=100 sorts >= upper bound → excluded from ..upper range 397 + assert!(entry >= upper); 398 + } 399 + 400 + #[test] 401 + fn resync_queue_ts_midfix_upper_bound_includes_earlier_ts() { 402 + // An entry at ts=99 should sort strictly before the midfix for ts=100. 403 + let prefix_all = resync_queue_prefix_all(); 404 + let entry = resync_queue(99, did("did:web:example.com")); 405 + let mut upper = prefix_all.clone(); 406 + upper.extend_from_slice(&resync_queue_ts_midfix(100)); 407 + // entry at ts=99 sorts < upper bound → included in ..upper range 408 + assert!(entry < upper); 368 409 } 369 410 370 411 #[test]
+140 -6
src/storage/resync.rs
··· 96 96 97 97 let Some(guard) = db 98 98 .ks 99 - .range(prefixed_range(prefix, lower_suffix..upper_suffix)) 99 + .range(prefixed_range(&prefix, lower_suffix..upper_suffix)) 100 100 .next() 101 101 else { 102 102 return Ok(None); 103 103 }; 104 104 105 105 let (key_slice, val_slice) = guard.into_inner()?; 106 - let key_bytes = key_slice.to_vec(); 107 - let (ts, did) = keys::resync_queue_parse(&key_bytes)?; 106 + let key_bytes = key_slice.as_ref(); 107 + let (ts, did) = keys::resync_queue_parse(key_bytes)?; 108 108 assert!(ts < now); 109 - let key_str = String::from_utf8_lossy(&key_bytes).into_owned(); 109 + let key_str = String::from_utf8_lossy(key_bytes).into_owned(); 110 110 let item = decode(val_slice.as_ref(), &key_str, did)?; 111 - db.ks.remove(&key_bytes)?; 112 - Ok(Some((item, key_bytes))) 111 + db.ks.remove(key_bytes)?; 112 + let next_since = key_bytes 113 + .get(prefix.len()..) 114 + .expect("a resync queue key that parsed to have the resync queue prefix"); 115 + Ok(Some((item, next_since.to_vec()))) 116 + } 117 + 118 + #[cfg(test)] 119 + mod tests { 120 + use super::*; 121 + use crate::storage::open_temporary; 122 + 123 + fn did(s: &str) -> Did<'static> { 124 + Did::new_owned(s).unwrap() 125 + } 126 + 127 + fn item(did_str: &str, retry_count: u16, reason: &str, cbor: &[u8]) -> ResyncItem { 128 + ResyncItem { 129 + did: did(did_str), 130 + retry_count, 131 + retry_reason: reason.to_owned(), 132 + commit_cbor: cbor.to_vec(), 133 + } 134 + } 135 + 136 + // --- encode / decode --- 137 + 138 + #[test] 139 + fn encode_decode_roundtrips() { 140 + let original = item("did:web:example.com", 3, "detected gap", &[0xAB, 0xCD]); 141 + let bytes = encode(&original); 142 + let decoded = decode(&bytes, "test-key", did("did:web:example.com")).unwrap(); 143 + assert_eq!(decoded.retry_count, 3); 144 + assert_eq!(decoded.retry_reason, "detected gap"); 145 + assert_eq!(decoded.commit_cbor, vec![0xAB, 0xCD]); 146 + } 147 + 148 + #[test] 149 + fn encode_decode_empty_commit_cbor() { 150 + let original = item("did:web:example.com", 0, "first attempt", &[]); 151 + let bytes = encode(&original); 152 + let decoded = decode(&bytes, "test-key", did("did:web:example.com")).unwrap(); 153 + assert_eq!(decoded.retry_count, 0); 154 + assert_eq!(decoded.retry_reason, "first attempt"); 155 + assert!(decoded.commit_cbor.is_empty()); 156 + } 157 + 158 + #[test] 159 + fn decode_rejects_truncated_header() { 160 + assert!(decode(&[0, 1, 2], "k", did("did:web:example.com")).is_err()); 161 + } 162 + 163 + // --- enqueue / dequeue_ready --- 164 + 165 + #[test] 166 + fn dequeue_returns_none_on_empty_queue() { 167 + let db = open_temporary().unwrap(); 168 + assert!(dequeue_ready(&db, 9999, None).unwrap().is_none()); 169 + } 170 + 171 + #[test] 172 + fn enqueue_and_dequeue_basic() { 173 + let db = open_temporary().unwrap(); 174 + enqueue(&db, 100, &item("did:web:a.com", 0, "backfill", &[1, 2, 3])).unwrap(); 175 + 176 + let (got, _cursor) = dequeue_ready(&db, 101, None).unwrap().unwrap(); 177 + assert_eq!(got.did.as_str(), "did:web:a.com"); 178 + assert_eq!(got.retry_reason, "backfill"); 179 + assert_eq!(got.commit_cbor, vec![1, 2, 3]); 180 + 181 + // Item was removed from the queue. 182 + assert!(dequeue_ready(&db, 101, None).unwrap().is_none()); 183 + } 184 + 185 + #[test] 186 + fn dequeue_excludes_items_at_or_after_now() { 187 + let db = open_temporary().unwrap(); 188 + enqueue(&db, 100, &item("did:web:a.com", 0, "test", &[])).unwrap(); 189 + 190 + // ts=100 is the upper bound (exclusive): not returned. 191 + assert!(dequeue_ready(&db, 100, None).unwrap().is_none()); 192 + // ts=99 is also before the entry: still not returned. 193 + assert!(dequeue_ready(&db, 99, None).unwrap().is_none()); 194 + // ts=101 is strictly after: returned. 195 + assert!(dequeue_ready(&db, 101, None).unwrap().is_some()); 196 + } 197 + 198 + #[test] 199 + fn dequeue_returns_oldest_entry_first() { 200 + let db = open_temporary().unwrap(); 201 + enqueue(&db, 200, &item("did:web:b.com", 0, "later", &[])).unwrap(); 202 + enqueue(&db, 100, &item("did:web:a.com", 0, "earlier", &[])).unwrap(); 203 + 204 + let (first, _) = dequeue_ready(&db, 9999, None).unwrap().unwrap(); 205 + assert_eq!(first.retry_reason, "earlier"); 206 + 207 + let (second, _) = dequeue_ready(&db, 9999, None).unwrap().unwrap(); 208 + assert_eq!(second.retry_reason, "later"); 209 + } 210 + 211 + #[test] 212 + fn since_cursor_skips_entries_before_cursor_position() { 213 + let db = open_temporary().unwrap(); 214 + enqueue(&db, 10, &item("did:web:a.com", 0, "first", &[])).unwrap(); 215 + enqueue(&db, 20, &item("did:web:b.com", 0, "second", &[])).unwrap(); 216 + 217 + // Dequeue the first item and hold the cursor. 218 + let (first, cursor) = dequeue_ready(&db, 9999, None).unwrap().unwrap(); 219 + assert_eq!(first.retry_reason, "first"); 220 + 221 + // A late-arriving entry enqueued at ts=5 (before the cursor position). 222 + enqueue(&db, 5, &item("did:web:late.com", 0, "late", &[])).unwrap(); 223 + 224 + // Without cursor, the late entry at ts=5 would be returned first. 225 + // With cursor, everything at or before ts=10/a.com is skipped; 226 + // ts=20 is the next result. 227 + let (second, _) = dequeue_ready(&db, 9999, Some(cursor)).unwrap().unwrap(); 228 + assert_eq!(second.retry_reason, "second"); 229 + } 230 + 231 + #[test] 232 + fn consecutive_dequeues_drain_in_order() { 233 + let db = open_temporary().unwrap(); 234 + enqueue(&db, 10, &item("did:web:a.com", 0, "first", &[])).unwrap(); 235 + enqueue(&db, 20, &item("did:web:b.com", 0, "second", &[])).unwrap(); 236 + enqueue(&db, 30, &item("did:web:c.com", 0, "third", &[])).unwrap(); 237 + 238 + let (a, _) = dequeue_ready(&db, 9999, None).unwrap().unwrap(); 239 + let (b, _) = dequeue_ready(&db, 9999, None).unwrap().unwrap(); 240 + let (c, _) = dequeue_ready(&db, 9999, None).unwrap().unwrap(); 241 + assert!(dequeue_ready(&db, 9999, None).unwrap().is_none()); 242 + 243 + assert_eq!(a.retry_reason, "first"); 244 + assert_eq!(b.retry_reason, "second"); 245 + assert_eq!(c.retry_reason, "third"); 246 + } 113 247 }