Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

refactor iter setup to be bounds-focused

It might be possible to go a step further, but this is OK for now.

phil 31ac134e 6f17e4a5

+119 -78
+2
ufos/src/db_types.rs
··· 48 48 InvalidTruncated(u64, u64), 49 49 } 50 50 51 + pub type EncodingResult<T> = Result<T, EncodingError>; 52 + 51 53 fn bincode_conf() -> impl Config { 52 54 standard() 53 55 .with_big_endian()
+57 -64
ufos/src/storage_fjall.rs
··· 9 9 NewRollupCursorKey, NewRollupCursorValue, NsidRecordFeedKey, NsidRecordFeedVal, 10 10 RecordLocationKey, RecordLocationMeta, RecordLocationVal, RecordRawValue, SketchSecretKey, 11 11 SketchSecretPrefix, TakeoffKey, TakeoffValue, TrimCollectionCursorKey, WeekTruncatedCursor, 12 - WeeklyDidsKey, WeeklyRecordsKey, WeeklyRollupKey, 12 + WeeklyDidsKey, WeeklyRecordsKey, WeeklyRollupKey, WithCollection, 13 13 }; 14 14 use crate::{ 15 15 CommitAction, ConsumerInfo, Did, EventBatch, Nsid, NsidCount, OrderCollectionsBy, UFOsRecord, 16 16 }; 17 17 use async_trait::async_trait; 18 - use fjall::Snapshot; 19 - use fjall::{Batch as FjallBatch, Config, Keyspace, PartitionCreateOptions, PartitionHandle}; 18 + use fjall::{ 19 + Batch as FjallBatch, Config, Keyspace, PartitionCreateOptions, PartitionHandle, Snapshot, 20 + }; 20 21 use jetstream::events::Cursor; 21 22 use std::collections::{HashMap, HashSet}; 23 + use std::iter::Peekable; 22 24 use std::ops::Bound; 23 25 use std::path::Path; 24 26 use std::sync::{ ··· 328 330 } 329 331 } 330 332 333 + type GetCounts = Box<dyn FnOnce() -> StorageResult<CountsValue>>; 334 + type GetByterCounts = StorageResult<(Nsid, GetCounts)>; 335 + type NsidCounter = Box<dyn Iterator<Item = GetByterCounts>>; 336 + fn get_lexi_iter<T: WithCollection + DbBytes + 'static>( 337 + snapshot: &Snapshot, 338 + start: Bound<Vec<u8>>, 339 + end: Bound<Vec<u8>>, 340 + ) -> StorageResult<NsidCounter> { 341 + Ok(Box::new(snapshot.range((start, end)).map(|kv| { 342 + let (k_bytes, v_bytes) = kv?; 343 + let key = db_complete::<T>(&k_bytes)?; 344 + let nsid = key.collection().clone(); 345 + let get_counts: GetCounts = Box::new(move || Ok(db_complete::<CountsValue>(&v_bytes)?)); 346 + Ok((nsid, get_counts)) 347 + }))) 348 + } 349 + 331 350 impl FjallReader { 332 351 fn get_storage_stats(&self) -> StorageResult<serde_json::Value> { 333 352 let rollup_cursor = ··· 409 428 CursorBucket::buckets_spanning(lower, upper) 410 429 }; 411 430 412 - let 
OrderCollectionsBy::Lexi { cursor } = order else { 413 - todo!() 414 - }; 415 - 416 - let cursor_nsid = cursor.as_deref().map(db_complete::<Nsid>).transpose()?; // TODO: bubble a *client* error type 431 + let mut iters: Vec<Peekable<NsidCounter>> = Vec::with_capacity(buckets.len()); 417 432 418 - type Item = StorageResult<(Nsid, fjall::Slice)>; 419 - let mut iters = Vec::with_capacity(buckets.len()); 420 - for bucket in &buckets { 421 - match bucket { 422 - CursorBucket::Hour(hour) => { 423 - let prefix = HourlyRollupKey::week_prefix(*hour); 424 - let start = if let Some(ref nsid) = cursor_nsid { 425 - Bound::Excluded(HourlyRollupKey::new(*hour, &nsid.clone()).to_db_bytes()?) 426 - } else { 427 - Bound::Included(HourlyRollupKey::from_prefix_to_db_bytes(&prefix)?) 433 + match order { 434 + OrderCollectionsBy::Lexi { cursor } => { 435 + let cursor_nsid = cursor.as_deref().map(db_complete::<Nsid>).transpose()?; 436 + for bucket in &buckets { 437 + let it: NsidCounter = match bucket { 438 + CursorBucket::Hour(t) => { 439 + let start = cursor_nsid 440 + .as_ref() 441 + .map(|nsid| HourlyRollupKey::after_nsid(*t, nsid)) 442 + .unwrap_or_else(|| HourlyRollupKey::start(*t))?; 443 + let end = HourlyRollupKey::end(*t)?; 444 + get_lexi_iter::<HourlyRollupKey>(&snapshot, start, end)? 445 + } 446 + CursorBucket::Week(t) => { 447 + let start = cursor_nsid 448 + .as_ref() 449 + .map(|nsid| WeeklyRollupKey::after_nsid(*t, nsid)) 450 + .unwrap_or_else(|| WeeklyRollupKey::start(*t))?; 451 + let end = WeeklyRollupKey::end(*t)?; 452 + get_lexi_iter::<WeeklyRollupKey>(&snapshot, start, end)? 453 + } 454 + CursorBucket::AllTime => { 455 + let start = cursor_nsid 456 + .as_ref() 457 + .map(AllTimeRollupKey::after_nsid) 458 + .unwrap_or_else(AllTimeRollupKey::start)?; 459 + let end = AllTimeRollupKey::end()?; 460 + get_lexi_iter::<AllTimeRollupKey>(&snapshot, start, end)? 
461 + } 428 462 }; 429 - let end = Bound::Excluded(HourlyRollupKey::prefix_range_end(&prefix)?); 430 - let it = snapshot.range((start, end)).map(|kv| match kv { 431 - Ok((k_bytes, v_bytes)) => db_complete::<HourlyRollupKey>(&k_bytes) 432 - .map(|key| (key.collection().clone(), v_bytes)) 433 - .map_err(|e| e.into()), 434 - Err(e) => Err(e.into()), // lsm-tree error into fjall error 435 - }); 436 - let boxed: Box<dyn Iterator<Item = Item>> = Box::new(it); 437 - iters.push(boxed.peekable()); 438 - } 439 - CursorBucket::Week(week) => { 440 - let prefix = WeeklyRollupKey::week_prefix(*week); 441 - let start = if let Some(ref nsid) = cursor_nsid { 442 - Bound::Excluded(WeeklyRollupKey::new(*week, &nsid.clone()).to_db_bytes()?) 443 - } else { 444 - Bound::Included(WeeklyRollupKey::from_prefix_to_db_bytes(&prefix)?) 445 - }; 446 - let end = Bound::Excluded(WeeklyRollupKey::prefix_range_end(&prefix)?); 447 - let it = snapshot.range((start, end)).map(|kv| match kv { 448 - Ok((k_bytes, v_bytes)) => db_complete::<WeeklyRollupKey>(&k_bytes) 449 - .map(|key| (key.collection().clone(), v_bytes)) 450 - .map_err(|e| e.into()), 451 - Err(e) => Err(e.into()), // lsm-tree error into fjall error 452 - }); 453 - let boxed: Box<dyn Iterator<Item = Item>> = Box::new(it); 454 - iters.push(boxed.peekable()); 455 - } 456 - CursorBucket::AllTime => { 457 - let prefix = Default::default(); 458 - let start = if let Some(ref nsid) = cursor_nsid { 459 - Bound::Excluded(AllTimeRollupKey::new(nsid).to_db_bytes()?) 460 - } else { 461 - Bound::Included(AllTimeRollupKey::from_prefix_to_db_bytes(&prefix)?) 
462 - }; 463 - let end = Bound::Excluded(AllTimeRollupKey::prefix_range_end(&prefix)?); 464 - let it = snapshot.range((start, end)).map(|kv| match kv { 465 - Ok((k_bytes, v_bytes)) => db_complete::<AllTimeRollupKey>(&k_bytes) 466 - .map(|key| (key.collection().clone(), v_bytes)) 467 - .map_err(|e| e.into()), 468 - Err(e) => Err(e.into()), // lsm-tree error into fjall error 469 - }); 470 - let boxed: Box<dyn Iterator<Item = Item>> = Box::new(it); 471 - iters.push(boxed.peekable()); 463 + iters.push(it.peekable()); 472 464 } 473 465 } 466 + OrderCollectionsBy::RecordsCreated => todo!(), 467 + OrderCollectionsBy::DidsEstimate => todo!(), 474 468 } 475 469 476 470 let mut out = Vec::new(); ··· 498 492 let mut merged = CountsValue::default(); 499 493 for iter in &mut iters { 500 494 // unwrap: potential fjall error was already checked & bailed over when peeking in the first loop 501 - if let Some(Ok((_, count_bytes))) = iter.next_if(|v| v.as_ref().unwrap().0 == nsid) 502 - { 503 - let counts = db_complete::<CountsValue>(&count_bytes)?; 495 + if let Some(Ok((_, get_counts))) = iter.next_if(|v| v.as_ref().unwrap().0 == nsid) { 496 + let counts = get_counts()?; 504 497 merged.merge(&counts); 505 498 } 506 499 }
+1 -1
ufos/src/storage_mem.rs
··· 10 10 JetstreamEndpointKey, JetstreamEndpointValue, LiveCountsKey, NewRollupCursorKey, 11 11 NewRollupCursorValue, NsidRecordFeedKey, NsidRecordFeedVal, RecordLocationKey, 12 12 RecordLocationMeta, RecordLocationVal, RecordRawValue, SketchSecretPrefix, TakeoffKey, 13 - TakeoffValue, WeekTruncatedCursor, WeeklyRollupKey, 13 + TakeoffValue, WeekTruncatedCursor, WeeklyRollupKey, WithCollection, 14 14 }; 15 15 use crate::{ 16 16 CommitAction, ConsumerInfo, Did, EventBatch, Nsid, NsidCount, OrderCollectionsBy, UFOsRecord,
+59 -13
ufos/src/store_types.rs
··· 1 1 use crate::db_types::{ 2 - DbBytes, DbConcat, DbStaticStr, EncodingError, SerdeBytes, StaticStr, UseBincodePlz, 2 + DbBytes, DbConcat, DbStaticStr, EncodingError, EncodingResult, SerdeBytes, StaticStr, 3 + UseBincodePlz, 3 4 }; 4 5 use crate::{Cursor, Did, Nsid, PutAction, RecordKey, UFOsCommit}; 5 6 use bincode::{Decode, Encode}; 6 7 use cardinality_estimator_safe::Sketch; 7 - use std::ops::Range; 8 + use std::ops::{Bound, Range}; 8 9 9 10 macro_rules! static_str { 10 11 ($prefix:expr, $name:ident) => { ··· 62 63 let s = std::str::from_utf8(bytes)?.to_string(); 63 64 Ok((Self(s), bytes.len())) 64 65 } 66 + } 67 + 68 + pub trait WithCollection { 69 + fn collection(&self) -> &Nsid; 65 70 } 66 71 67 72 pub type NsidRecordFeedKey = DbConcat<Nsid, Cursor>; ··· 186 191 pub fn cursor(&self) -> Cursor { 187 192 self.prefix.suffix 188 193 } 189 - pub fn collection(&self) -> &Nsid { 194 + } 195 + impl WithCollection for LiveCountsKey { 196 + fn collection(&self) -> &Nsid { 190 197 &self.suffix 191 198 } 192 199 } ··· 319 326 nsid.clone(), 320 327 ) 321 328 } 322 - pub fn week_prefix(cursor: HourTruncatedCursor) -> HourlyRollupKeyHourPrefix { 323 - HourlyRollupKeyHourPrefix::from_pair(Default::default(), cursor) 324 - } 325 329 pub fn cursor(&self) -> HourTruncatedCursor { 326 330 self.prefix.suffix 327 331 } 328 - pub fn collection(&self) -> &Nsid { 332 + pub fn start(hour: HourTruncatedCursor) -> EncodingResult<Bound<Vec<u8>>> { 333 + let prefix = HourlyRollupKeyHourPrefix::from_pair(Default::default(), hour); 334 + let prefix_bytes = Self::from_prefix_to_db_bytes(&prefix)?; 335 + Ok(Bound::Included(prefix_bytes)) 336 + } 337 + pub fn after_nsid(hour: HourTruncatedCursor, nsid: &Nsid) -> EncodingResult<Bound<Vec<u8>>> { 338 + Ok(Bound::Excluded(Self::new(hour, nsid).to_db_bytes()?)) 339 + } 340 + pub fn end(hour: HourTruncatedCursor) -> EncodingResult<Bound<Vec<u8>>> { 341 + let prefix = HourlyRollupKeyHourPrefix::from_pair(Default::default(), hour); 342 + 
Ok(Bound::Excluded(Self::prefix_range_end(&prefix)?)) 343 + } 344 + } 345 + impl WithCollection for HourlyRollupKey { 346 + fn collection(&self) -> &Nsid { 329 347 &self.suffix 330 348 } 331 349 } ··· 348 366 nsid.clone(), 349 367 ) 350 368 } 351 - pub fn week_prefix(cursor: WeekTruncatedCursor) -> WeeklyRollupKeyWeekPrefix { 352 - WeeklyRollupKeyWeekPrefix::from_pair(Default::default(), cursor) 353 - } 354 369 pub fn cursor(&self) -> WeekTruncatedCursor { 355 370 self.prefix.suffix 356 371 } 357 - pub fn collection(&self) -> &Nsid { 372 + pub fn start(hour: WeekTruncatedCursor) -> EncodingResult<Bound<Vec<u8>>> { 373 + let prefix = WeeklyRollupKeyWeekPrefix::from_pair(Default::default(), hour); 374 + let prefix_bytes = Self::from_prefix_to_db_bytes(&prefix)?; 375 + Ok(Bound::Included(prefix_bytes)) 376 + } 377 + pub fn after_nsid(hour: WeekTruncatedCursor, nsid: &Nsid) -> EncodingResult<Bound<Vec<u8>>> { 378 + Ok(Bound::Excluded(Self::new(hour, nsid).to_db_bytes()?)) 379 + } 380 + pub fn end(hour: WeekTruncatedCursor) -> EncodingResult<Bound<Vec<u8>>> { 381 + let prefix = WeeklyRollupKeyWeekPrefix::from_pair(Default::default(), hour); 382 + Ok(Bound::Excluded(Self::prefix_range_end(&prefix)?)) 383 + } 384 + } 385 + impl WithCollection for WeeklyRollupKey { 386 + fn collection(&self) -> &Nsid { 358 387 &self.suffix 359 388 } 360 389 } ··· 373 402 pub fn new(nsid: &Nsid) -> Self { 374 403 Self::from_pair(Default::default(), nsid.clone()) 375 404 } 376 - pub fn collection(&self) -> &Nsid { 405 + pub fn start() -> EncodingResult<Bound<Vec<u8>>> { 406 + Ok(Bound::Included(Self::from_prefix_to_db_bytes( 407 + &Default::default(), 408 + )?)) 409 + } 410 + pub fn after_nsid(nsid: &Nsid) -> EncodingResult<Bound<Vec<u8>>> { 411 + Ok(Bound::Excluded(Self::new(nsid).to_db_bytes()?)) 412 + } 413 + pub fn end() -> EncodingResult<Bound<Vec<u8>>> { 414 + Ok(Bound::Excluded( 415 + Self::prefix_range_end(&Default::default())?, 416 + )) 417 + } 418 + } 419 + impl WithCollection for 
AllTimeRollupKey { 420 + fn collection(&self) -> &Nsid { 377 421 &self.suffix 378 422 } 379 423 } ··· 393 437 pub fn count(&self) -> u64 { 394 438 self.suffix.prefix.0 395 439 } 396 - pub fn collection(&self) -> &Nsid { 440 + } 441 + impl<P: StaticStr> WithCollection for AllTimeRankRecordsKey<P> { 442 + fn collection(&self) -> &Nsid { 397 443 &self.suffix.suffix 398 444 } 399 445 }