Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

first step toward rollups

hourly truncated cursor

phil 3bf1f2b7 24865745

+66 -1
+2
ufos/src/db_types.rs
··· 42 42 DecodeTooManyBytes(usize), 43 43 #[error("expected exclusive bound from lsm_tree (likely bug)")] 44 44 BadRangeBound, 45 + #[error("expected an hourly-truncated u64, found remainder: {0}")] 46 + InvalidHourlyTruncated(u64) 45 47 } 46 48 47 49 fn bincode_conf() -> impl Config {
+13
ufos/src/storage_fjall.rs
··· 195 195 queues: PartitionHandle, 196 196 } 197 197 198 + impl FjallWriter { 199 + pub fn step_rollup(&mut self) -> Result<(), StorageError> { 200 + let mut batch = self.keyspace.batch(); 201 + 202 + // timelies 203 + // trim records 204 + // delete accounts 205 + 206 + batch.commit()?; 207 + Ok(()) 208 + } 209 + } 210 + 198 211 impl StoreWriter for FjallWriter { 199 212 fn insert_batch(&mut self, event_batch: EventBatch) -> Result<(), StorageError> { 200 213 let mut batch = self.keyspace.batch();
+51 -1
ufos/src/store_types.rs
··· 426 426 } 427 427 } 428 428 429 + 430 + const HOUR_IN_MICROS: u64 = 1_000_000 * 3600; 431 + #[derive(Debug, Copy, Clone, PartialEq, PartialOrd)] 432 + pub struct HourTrucatedCursor(u64); 433 + impl HourTrucatedCursor { 434 + fn truncate(raw: u64) -> u64 { 435 + let hours_ts = raw / HOUR_IN_MICROS; 436 + let truncated = hours_ts * HOUR_IN_MICROS; 437 + truncated 438 + } 439 + pub fn try_from_raw_u64(time_us: u64) -> Result<Self, EncodingError> { 440 + let rem = time_us % HOUR_IN_MICROS; 441 + if rem != 0 { 442 + return Err(EncodingError::InvalidHourlyTruncated(rem)) 443 + } 444 + Ok(Self(time_us)) 445 + } 446 + pub fn truncate_cursor(cursor: Cursor) -> Self { 447 + let raw = cursor.to_raw_u64(); 448 + let truncated = Self::truncate(raw); 449 + Self(truncated) 450 + } 451 + } 452 + impl From<HourTrucatedCursor> for Cursor { 453 + fn from(hour_truncated: HourTrucatedCursor) -> Self { 454 + Cursor::from_raw_u64(hour_truncated.0) 455 + } 456 + } 457 + 458 + 429 459 #[cfg(test)] 430 460 mod test { 431 - use super::{ByCollectionKey, ByCollectionValue, Cursor, Did, EncodingError, Nsid, RecordKey}; 461 + use super::{ByCollectionKey, ByCollectionValue, Cursor, Did, EncodingError, Nsid, RecordKey, HourTrucatedCursor, HOUR_IN_MICROS}; 432 462 use crate::db_types::DbBytes; 433 463 434 464 #[test] ··· 462 492 assert_eq!(bytes_consumed, serialized.len()); 463 493 464 494 Ok(()) 495 + } 496 + 497 + #[test] 498 + fn test_hour_truncated_cursor() { 499 + let us = Cursor::from_raw_u64(1_743_778_483_483_895); 500 + let hr = HourTrucatedCursor::truncate_cursor(us); 501 + let back: Cursor = hr.into(); 502 + assert!(back < us); 503 + let diff = us.to_raw_u64() - back.to_raw_u64(); 504 + assert!(diff < HOUR_IN_MICROS); 505 + } 506 + 507 + #[test] 508 + fn test_hour_truncated_cursor_already_truncated() { 509 + let us = Cursor::from_raw_u64(1_743_775_200_000_000); 510 + let hr = HourTrucatedCursor::truncate_cursor(us); 511 + let back: Cursor = hr.into(); 512 + assert_eq!(back, us); 513 + let diff = us.to_raw_u64() - back.to_raw_u64(); 514 + assert_eq!(diff, 0); 465 515 } 466 516 }