Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

Merge pull request #17 from at-microcosm/ufos-popularity-contest

UFOs rank indices

authored by

phil and committed by
GitHub
84f63b52 ba4fe675

+387 -109
+24
ufos/src/lib.rs
··· 249 249 } 250 250 } 251 251 252 + #[derive(Debug)] 253 + pub struct QueryPeriod { 254 + from: Option<Cursor>, 255 + until: Option<Cursor>, 256 + } 257 + impl QueryPeriod { 258 + pub fn all_time() -> Self { 259 + QueryPeriod { 260 + from: None, 261 + until: None, 262 + } 263 + } 264 + pub fn is_all_time(&self) -> bool { 265 + self.from.is_none() && self.until.is_none() 266 + } 267 + } 268 + 269 + #[derive(Debug, Serialize, JsonSchema)] 270 + pub struct Count { 271 + thing: String, 272 + records: u64, 273 + dids_estimate: u64, 274 + } 275 + 252 276 #[cfg(test)] 253 277 mod tests { 254 278 use super::*;
+33 -1
ufos/src/server.rs
··· 1 1 use crate::index_html::INDEX_HTML; 2 2 use crate::storage::StoreReader; 3 - use crate::{ConsumerInfo, Nsid, TopCollections, UFOsRecord}; 3 + use crate::{ConsumerInfo, Count, Nsid, QueryPeriod, TopCollections, UFOsRecord}; 4 4 use dropshot::endpoint; 5 5 use dropshot::ApiDescription; 6 6 use dropshot::Body; ··· 213 213 ok_cors(seen_by_collection) 214 214 } 215 215 216 + /// Get top collections by record count 217 + #[endpoint { 218 + method = GET, 219 + path = "/collections/by-count" 220 + }] 221 + async fn get_top_collections_by_count(ctx: RequestContext<Context>) -> OkCorsResponse<Vec<Count>> { 222 + let Context { storage, .. } = ctx.context(); 223 + let collections = storage 224 + .get_top_collections_by_count(100, QueryPeriod::all_time()) 225 + .await 226 + .map_err(|e| HttpError::for_internal_error(format!("oh shoot: {e:?}")))?; 227 + 228 + ok_cors(collections) 229 + } 230 + 231 + /// Get top collections by estimated unique DIDs 232 + #[endpoint { 233 + method = GET, 234 + path = "/collections/by-dids" 235 + }] 236 + async fn get_top_collections_by_dids(ctx: RequestContext<Context>) -> OkCorsResponse<Vec<Count>> { 237 + let Context { storage, .. } = ctx.context(); 238 + let collections = storage 239 + .get_top_collections_by_dids(100, QueryPeriod::all_time()) 240 + .await 241 + .map_err(|e| HttpError::for_internal_error(format!("oh shoot: {e:?}")))?; 242 + 243 + ok_cors(collections) 244 + } 245 + 216 246 /// Get top collections 217 247 /// 218 248 /// The format of this API response will be changing soon. ··· 244 274 api.register(get_meta_info).unwrap(); 245 275 api.register(get_records_by_collections).unwrap(); 246 276 api.register(get_records_total_seen).unwrap(); 277 + api.register(get_top_collections_by_count).unwrap(); 278 + api.register(get_top_collections_by_dids).unwrap(); 247 279 api.register(get_top_collections).unwrap(); 248 280 249 281 let context = Context {
+16 -1
ufos/src/storage.rs
··· 1 - use crate::{error::StorageError, ConsumerInfo, Cursor, EventBatch, TopCollections, UFOsRecord}; 1 + use crate::{ 2 + error::StorageError, ConsumerInfo, Count, Cursor, EventBatch, QueryPeriod, TopCollections, 3 + UFOsRecord, 4 + }; 2 5 use async_trait::async_trait; 3 6 use jetstream::exports::{Did, Nsid}; 4 7 use std::collections::HashSet; ··· 66 69 async fn get_storage_stats(&self) -> StorageResult<serde_json::Value>; 67 70 68 71 async fn get_consumer_info(&self) -> StorageResult<ConsumerInfo>; 72 + 73 + async fn get_top_collections_by_count( 74 + &self, 75 + limit: usize, 76 + period: QueryPeriod, 77 + ) -> StorageResult<Vec<Count>>; 78 + 79 + async fn get_top_collections_by_dids( 80 + &self, 81 + limit: usize, 82 + period: QueryPeriod, 83 + ) -> StorageResult<Vec<Count>>; 69 84 70 85 async fn get_top_collections(&self) -> StorageResult<TopCollections>; 71 86
+188 -29
ufos/src/storage_fjall.rs
··· 2 2 use crate::error::StorageError; 3 3 use crate::storage::{StorageResult, StorageWhatever, StoreBackground, StoreReader, StoreWriter}; 4 4 use crate::store_types::{ 5 - AllTimeRollupKey, CountsValue, DeleteAccountQueueKey, DeleteAccountQueueVal, 6 - HourTruncatedCursor, HourlyRollupKey, JetstreamCursorKey, JetstreamCursorValue, 7 - JetstreamEndpointKey, JetstreamEndpointValue, LiveCountsKey, NewRollupCursorKey, 8 - NewRollupCursorValue, NsidRecordFeedKey, NsidRecordFeedVal, RecordLocationKey, 9 - RecordLocationMeta, RecordLocationVal, RecordRawValue, TakeoffKey, TakeoffValue, 10 - TrimCollectionCursorKey, WeekTruncatedCursor, WeeklyRollupKey, 5 + AllTimeDidsKey, AllTimeRecordsKey, AllTimeRollupKey, CountsValue, DeleteAccountQueueKey, 6 + DeleteAccountQueueVal, HourTruncatedCursor, HourlyDidsKey, HourlyRecordsKey, HourlyRollupKey, 7 + JetstreamCursorKey, JetstreamCursorValue, JetstreamEndpointKey, JetstreamEndpointValue, 8 + LiveCountsKey, NewRollupCursorKey, NewRollupCursorValue, NsidRecordFeedKey, NsidRecordFeedVal, 9 + RecordLocationKey, RecordLocationMeta, RecordLocationVal, RecordRawValue, TakeoffKey, 10 + TakeoffValue, TrimCollectionCursorKey, WeekTruncatedCursor, WeeklyDidsKey, WeeklyRecordsKey, 11 + WeeklyRollupKey, 11 12 }; 12 - use crate::{CommitAction, ConsumerInfo, Did, EventBatch, Nsid, TopCollections, UFOsRecord}; 13 + use crate::{ 14 + CommitAction, ConsumerInfo, Count, Did, EventBatch, Nsid, QueryPeriod, TopCollections, 15 + UFOsRecord, 16 + }; 13 17 use async_trait::async_trait; 14 18 use fjall::{Batch as FjallBatch, Config, Keyspace, PartitionCreateOptions, PartitionHandle}; 15 19 use jetstream::events::Cursor; ··· 70 74 /// - key: "live_counts" || u64 || nullstr (js_cursor, nsid) 71 75 /// - val: u64 || HLL (count (not cursor), estimator) 72 76 /// 77 + /// 73 78 /// - Hourly total record counts and dids estimate per collection 74 79 /// - key: "hourly_counts" || u64 || nullstr (hour, nsid) 75 80 /// - val: u64 || HLL (count (not cursor), estimator) 76 81 /// 82 + /// - Hourly record count ranking 83 + /// - key: "hourly_rank_records" || u64 || u64 || nullstr (hour, count, nsid) 84 + /// - val: [empty] 85 + /// 86 + /// - Hourly did estimate ranking 87 + /// - key: "hourly_rank_dids" || u64 || u64 || nullstr (hour, dids estimate, nsid) 88 + /// - val: [empty] 89 + /// 90 + /// 77 91 /// - Weekly total record counts and dids estimate per collection 78 - /// - key: "weekly_counts" || u64 || nullstr (hour, nsid) 92 + /// - key: "weekly_counts" || u64 || nullstr (week, nsid) 79 93 /// - val: u64 || HLL (count (not cursor), estimator) 80 94 /// 95 + /// - Weekly record count ranking 96 + /// - key: "weekly_rank_records" || u64 || u64 || nullstr (week, count, nsid) 97 + /// - val: [empty] 98 + /// 99 + /// - Weekly did estimate ranking 100 + /// - key: "weekly_rank_dids" || u64 || u64 || nullstr (week, dids estimate, nsid) 101 + /// - val: [empty] 102 + /// 103 + /// 81 104 /// - All-time total record counts and dids estimate per collection 82 105 /// - key: "ever_counts" || nullstr (nsid) 83 106 /// - val: u64 || HLL (count (not cursor), estimator) 84 107 /// 85 - /// - TODO: sorted indexes for all-times? 108 + /// - All-time total record record count ranking 109 + /// - key: "ever_rank_records" || u64 || nullstr (count, nsid) 110 + /// - val: [empty] 111 + /// 112 + /// - All-time did estimate ranking 113 + /// - key: "ever_rank_dids" || u64 || nullstr (dids estimate, nsid) 114 + /// - val: [empty] 86 115 /// 87 116 /// 88 117 /// Partition: 'queues' ··· 313 342 }) 314 343 } 315 344 345 + fn get_top_collections_by_count( 346 + &self, 347 + limit: usize, 348 + period: QueryPeriod, 349 + ) -> StorageResult<Vec<Count>> { 350 + Ok(if period.is_all_time() { 351 + let snapshot = self.rollups.snapshot(); 352 + let mut out = Vec::with_capacity(limit); 353 + let prefix = AllTimeRecordsKey::from_prefix_to_db_bytes(&Default::default())?; 354 + for kv in snapshot.prefix(prefix).rev().take(limit) { 355 + let (key_bytes, _) = kv?; 356 + let key = db_complete::<AllTimeRecordsKey>(&key_bytes)?; 357 + let rollup_key = AllTimeRollupKey::new(key.collection()); 358 + let db_count_bytes = snapshot.get(rollup_key.to_db_bytes()?)?.expect( 359 + "integrity: all-time rank rollup must have corresponding all-time count rollup", 360 + ); 361 + let db_counts = db_complete::<CountsValue>(&db_count_bytes)?; 362 + assert_eq!(db_counts.records(), key.count()); 363 + out.push(Count { 364 + thing: key.collection().to_string(), 365 + records: db_counts.records(), 366 + dids_estimate: db_counts.dids().estimate() as u64, 367 + }); 368 + } 369 + out 370 + } else { 371 + todo!() 372 + }) 373 + } 374 + 375 + fn get_top_collections_by_dids( 376 + &self, 377 + limit: usize, 378 + period: QueryPeriod, 379 + ) -> StorageResult<Vec<Count>> { 380 + Ok(if period.is_all_time() { 381 + let snapshot = self.rollups.snapshot(); 382 + let mut out = Vec::with_capacity(limit); 383 + let prefix = AllTimeDidsKey::from_prefix_to_db_bytes(&Default::default())?; 384 + for kv in snapshot.prefix(prefix).rev().take(limit) { 385 + let (key_bytes, _) = kv?; 386 + let key = db_complete::<AllTimeDidsKey>(&key_bytes)?; 387 + let rollup_key = AllTimeRollupKey::new(key.collection()); 388 + let db_count_bytes = snapshot.get(rollup_key.to_db_bytes()?)?.expect( 389 + "integrity: all-time rank rollup must have corresponding all-time count rollup", 390 + ); 391 + let db_counts = db_complete::<CountsValue>(&db_count_bytes)?; 392 + assert_eq!(db_counts.dids().estimate() as u64, key.count()); 393 + out.push(Count { 394 + thing: key.collection().to_string(), 395 + records: db_counts.records(), 396 + dids_estimate: db_counts.dids().estimate() as u64, 397 + }); 398 + } 399 + out 400 + } else { 401 + todo!() 402 + }) 403 + } 404 + 316 405 fn get_top_collections(&self) -> Result<TopCollections, StorageError> { 317 406 // TODO: limit nsid traversal depth 318 407 // TODO: limit nsid traversal breadth ··· 454 543 let s = self.clone(); 455 544 tokio::task::spawn_blocking(move || FjallReader::get_consumer_info(&s)).await? 456 545 } 546 + async fn get_top_collections_by_count( 547 + &self, 548 + limit: usize, 549 + period: QueryPeriod, 550 + ) -> StorageResult<Vec<Count>> { 551 + let s = self.clone(); 552 + tokio::task::spawn_blocking(move || { 553 + FjallReader::get_top_collections_by_count(&s, limit, period) 554 + }) 555 + .await? 556 + } 557 + async fn get_top_collections_by_dids( 558 + &self, 559 + limit: usize, 560 + period: QueryPeriod, 561 + ) -> StorageResult<Vec<Count>> { 562 + let s = self.clone(); 563 + tokio::task::spawn_blocking(move || { 564 + FjallReader::get_top_collections_by_dids(&s, limit, period) 565 + }) 566 + .await? 567 + } 457 568 async fn get_top_collections(&self) -> Result<TopCollections, StorageError> { 458 569 let s = self.clone(); 459 570 tokio::task::spawn_blocking(move || FjallReader::get_top_collections(&s)).await? ··· 572 683 last_cursor = key.cursor(); 573 684 } 574 685 686 + // go through each new rollup thing and merge it with whatever might already be in the db 575 687 for ((nsid, rollup), counts) in counts_by_rollup { 576 - let key_bytes = match rollup { 688 + let rollup_key_bytes = match rollup { 577 689 Rollup::Hourly(hourly_cursor) => { 578 - let k = HourlyRollupKey::new(hourly_cursor, &nsid); 579 - k.to_db_bytes()? 690 + HourlyRollupKey::new(hourly_cursor, &nsid).to_db_bytes()? 580 691 } 581 692 Rollup::Weekly(weekly_cursor) => { 582 - let k = WeeklyRollupKey::new(weekly_cursor, &nsid); 583 - k.to_db_bytes()? 584 - } 585 - Rollup::AllTime => { 586 - let k = AllTimeRollupKey::new(&nsid); 587 - k.to_db_bytes()? 693 + WeeklyRollupKey::new(weekly_cursor, &nsid).to_db_bytes()? 588 694 } 695 + Rollup::AllTime => AllTimeRollupKey::new(&nsid).to_db_bytes()?, 589 696 }; 590 697 let mut rolled: CountsValue = self 591 698 .rollups 592 - .get(&key_bytes)? 699 + .get(&rollup_key_bytes)? 593 700 .as_deref() 594 701 .map(db_complete::<CountsValue>) 595 702 .transpose()? 596 703 .unwrap_or_default(); 597 704 598 - // try to round-trip before inserting, for funsies 599 - let tripppin = counts.to_db_bytes()?; 600 - let (and_back, n) = CountsValue::from_db_bytes(&tripppin)?; 601 - assert_eq!(n, tripppin.len()); 602 - assert_eq!(counts.prefix, and_back.prefix); 603 - assert_eq!(counts.dids().estimate(), and_back.dids().estimate()); 604 - if counts.records() > 200_000_000_000 { 605 - panic!("COUNTS maybe wtf? {counts:?}") 606 - } 705 + // now that we have values, we can know the exising ranks 706 + let before_records_count = rolled.records(); 707 + let before_dids_estimate = rolled.dids().estimate() as u64; 607 708 709 + // update the rollup 608 710 rolled.merge(&counts); 609 - batch.insert(&self.rollups, &key_bytes, &rolled.to_db_bytes()?); 711 + 712 + // replace rank entries 713 + let (old_records, new_records, dids) = match rollup { 714 + Rollup::Hourly(hourly_cursor) => { 715 + let old_records = 716 + HourlyRecordsKey::new(hourly_cursor, before_records_count.into(), &nsid); 717 + let new_records = old_records.with_rank(rolled.records().into()); 718 + let new_estimate = rolled.dids().estimate() as u64; 719 + let dids = if new_estimate == before_dids_estimate { 720 + None 721 + } else { 722 + let old_dids = 723 + HourlyDidsKey::new(hourly_cursor, before_dids_estimate.into(), &nsid); 724 + let new_dids = old_dids.with_rank(new_estimate.into()); 725 + Some((old_dids.to_db_bytes()?, new_dids.to_db_bytes()?)) 726 + }; 727 + (old_records.to_db_bytes()?, new_records.to_db_bytes()?, dids) 728 + } 729 + Rollup::Weekly(weekly_cursor) => { 730 + let old_records = 731 + WeeklyRecordsKey::new(weekly_cursor, before_records_count.into(), &nsid); 732 + let new_records = old_records.with_rank(rolled.records().into()); 733 + let new_estimate = rolled.dids().estimate() as u64; 734 + let dids = if new_estimate == before_dids_estimate { 735 + None 736 + } else { 737 + let old_dids = 738 + WeeklyDidsKey::new(weekly_cursor, before_dids_estimate.into(), &nsid); 739 + let new_dids = old_dids.with_rank(new_estimate.into()); 740 + Some((old_dids.to_db_bytes()?, new_dids.to_db_bytes()?)) 741 + }; 742 + (old_records.to_db_bytes()?, new_records.to_db_bytes()?, dids) 743 + } 744 + Rollup::AllTime => { 745 + let old_records = AllTimeRecordsKey::new(before_records_count.into(), &nsid); 746 + let new_records = old_records.with_rank(rolled.records().into()); 747 + let new_estimate = rolled.dids().estimate() as u64; 748 + let dids = if new_estimate == before_dids_estimate { 749 + None 750 + } else { 751 + let old_dids = AllTimeDidsKey::new(before_dids_estimate.into(), &nsid); 752 + let new_dids = old_dids.with_rank(new_estimate.into()); 753 + Some((old_dids.to_db_bytes()?, new_dids.to_db_bytes()?)) 754 + }; 755 + (old_records.to_db_bytes()?, new_records.to_db_bytes()?, dids) 756 + } 757 + }; 758 + 759 + // replace the ranks 760 + batch.remove(&self.rollups, &old_records); 761 + batch.insert(&self.rollups, &new_records, ""); 762 + if let Some((old_dids, new_dids)) = dids { 763 + batch.remove(&self.rollups, &old_dids); 764 + batch.insert(&self.rollups, &new_dids, ""); 765 + } 766 + 767 + // replace the rollup 768 + batch.insert(&self.rollups, &rollup_key_bytes, &rolled.to_db_bytes()?); 610 769 } 611 770 612 771 insert_batch_static_neu::<NewRollupCursorKey>(&mut batch, &self.global, last_cursor)?;
+21 -7
ufos/src/storage_mem.rs
··· 12 12 RecordLocationMeta, RecordLocationVal, RecordRawValue, TakeoffKey, TakeoffValue, 13 13 WeekTruncatedCursor, WeeklyRollupKey, 14 14 }; 15 - use crate::{CommitAction, ConsumerInfo, Did, EventBatch, Nsid, TopCollections, UFOsRecord}; 15 + use crate::{ 16 + CommitAction, ConsumerInfo, Count, Did, EventBatch, Nsid, QueryPeriod, TopCollections, 17 + UFOsRecord, 18 + }; 16 19 use async_trait::async_trait; 17 20 use jetstream::events::Cursor; 18 21 use lsm_tree::range::prefix_to_range; 19 - use std::collections::BTreeMap; 20 - use std::collections::HashMap; 21 - use std::collections::HashSet; 22 + use std::collections::{BTreeMap, HashMap, HashSet}; 22 23 use std::path::Path; 23 - use std::sync::Mutex; 24 - use std::sync::RwLock; 24 + use std::sync::{Mutex, RwLock}; 25 25 use std::time::SystemTime; 26 26 27 27 const MAX_BATCHED_CLEANUP_SIZE: usize = 1024; // try to commit progress for longer feeds ··· 584 584 let s = self.clone(); 585 585 tokio::task::spawn_blocking(move || MemReader::get_consumer_info(&s)).await? 586 586 } 587 - async fn get_top_collections(&self) -> Result<TopCollections, StorageError> { 587 + async fn get_top_collections(&self) -> StorageResult<TopCollections> { 588 588 let s = self.clone(); 589 589 tokio::task::spawn_blocking(move || MemReader::get_top_collections(&s)).await? 590 + } 591 + async fn get_top_collections_by_count( 592 + &self, 593 + _: usize, 594 + _: QueryPeriod, 595 + ) -> StorageResult<Vec<Count>> { 596 + todo!() 597 + } 598 + async fn get_top_collections_by_dids( 599 + &self, 600 + _: usize, 601 + _: QueryPeriod, 602 + ) -> StorageResult<Vec<Count>> { 603 + todo!() 590 604 } 591 605 async fn get_counts_by_collection(&self, collection: &Nsid) -> StorageResult<(u64, u64)> { 592 606 let s = self.clone();
+105 -71
ufos/src/store_types.rs
··· 6 6 use cardinality_estimator_safe::CardinalityEstimator; 7 7 use std::ops::Range; 8 8 9 - /// key format: ["js_cursor"] 10 - #[derive(Debug, PartialEq)] 11 - pub struct JetstreamCursorKey {} 12 - impl StaticStr for JetstreamCursorKey { 13 - fn static_str() -> &'static str { 14 - "js_cursor" 15 - } 9 + macro_rules! static_str { 10 + ($prefix:expr, $name:ident) => { 11 + #[derive(Debug, PartialEq)] 12 + pub struct $name {} 13 + impl StaticStr for $name { 14 + fn static_str() -> &'static str { 15 + $prefix 16 + } 17 + } 18 + }; 16 19 } 20 + 21 + // key format: ["js_cursor"] 22 + static_str!("js_cursor", JetstreamCursorKey); 17 23 pub type JetstreamCursorValue = Cursor; 18 24 19 - /// key format: ["rollup_cursor"] 20 - #[derive(Debug, PartialEq)] 21 - pub struct NewRollupCursorKey {} 22 - impl StaticStr for NewRollupCursorKey { 23 - fn static_str() -> &'static str { 24 - "rollup_cursor" 25 - } 26 - } 25 + // key format: ["rollup_cursor"] 26 + static_str!("rollup_cursor", NewRollupCursorKey); 27 27 // pub type NewRollupCursorKey = DbStaticStr<_NewRollupCursorKey>; 28 28 /// value format: [rollup_cursor(Cursor)|collection(Nsid)] 29 29 pub type NewRollupCursorValue = Cursor; 30 30 31 - #[derive(Debug, PartialEq)] 32 - pub struct _TrimCollectionStaticStr {} 33 - impl StaticStr for _TrimCollectionStaticStr { 34 - fn static_str() -> &'static str { 35 - "trim_cursor" 36 - } 37 - } 31 + static_str!("trim_cursor", _TrimCollectionStaticStr); 38 32 type TrimCollectionCursorPrefix = DbStaticStr<_TrimCollectionStaticStr>; 39 33 pub type TrimCollectionCursorKey = DbConcat<TrimCollectionCursorPrefix, Nsid>; 40 34 impl TrimCollectionCursorKey { ··· 44 38 } 45 39 pub type TrimCollectionCursorVal = Cursor; 46 40 47 - /// key format: ["js_endpoint"] 48 - #[derive(Debug, PartialEq)] 49 - pub struct TakeoffKey {} 50 - impl StaticStr for TakeoffKey { 51 - fn static_str() -> &'static str { 52 - "takeoff" 53 - } 54 - } 41 + // key format: ["js_endpoint"] 42 + static_str!("takeoff", TakeoffKey); 55 43 pub type TakeoffValue = Cursor; 56 44 57 - /// key format: ["js_endpoint"] 58 - #[derive(Debug, PartialEq)] 59 - pub struct JetstreamEndpointKey {} 60 - impl StaticStr for JetstreamEndpointKey { 61 - fn static_str() -> &'static str { 62 - "js_endpoint" 63 - } 64 - } 45 + // key format: ["js_endpoint"] 46 + static_str!("js_endpoint", JetstreamEndpointKey); 65 47 #[derive(Debug, PartialEq)] 66 48 pub struct JetstreamEndpointValue(pub String); 67 49 /// String wrapper for jetstream endpoint value ··· 187 169 } 188 170 } 189 171 190 - #[derive(Debug, PartialEq)] 191 - pub struct _LiveRecordsStaticStr {} 192 - impl StaticStr for _LiveRecordsStaticStr { 193 - fn static_str() -> &'static str { 194 - "live_counts" 195 - } 196 - } 172 + static_str!("live_counts", _LiveRecordsStaticStr); 197 173 198 174 type LiveCountsStaticPrefix = DbStaticStr<_LiveRecordsStaticStr>; 199 175 type LiveCountsCursorPrefix = DbConcat<LiveCountsStaticPrefix, Cursor>; ··· 273 249 } 274 250 } 275 251 276 - #[derive(Debug, PartialEq)] 277 - pub struct _DeleteAccountStaticStr {} 278 - impl StaticStr for _DeleteAccountStaticStr { 279 - fn static_str() -> &'static str { 280 - "delete_acount" 281 - } 282 - } 252 + static_str!("delete_acount", _DeleteAccountStaticStr); 283 253 pub type DeleteAccountStaticPrefix = DbStaticStr<_DeleteAccountStaticStr>; 284 254 pub type DeleteAccountQueueKey = DbConcat<DeleteAccountStaticPrefix, Cursor>; 285 255 impl DeleteAccountQueueKey { ··· 289 259 } 290 260 pub type DeleteAccountQueueVal = Did; 291 261 292 - #[derive(Debug, PartialEq)] 293 - pub struct _HourlyRollupStaticStr {} 294 - impl StaticStr for _HourlyRollupStaticStr { 295 - fn static_str() -> &'static str { 296 - "hourly_counts" 262 + /// big-endian encoded u64 for LSM prefix-fiendly key 263 + #[derive(Debug, Clone, Copy, PartialEq)] 264 + pub struct KeyRank(u64); 265 + impl DbBytes for KeyRank { 266 + fn to_db_bytes(&self) -> Result<Vec<u8>, EncodingError> { 267 + Ok(self.0.to_be_bytes().to_vec()) 268 + } 269 + fn from_db_bytes(bytes: &[u8]) -> Result<(Self, usize), EncodingError> { 270 + if bytes.len() < 8 { 271 + return Err(EncodingError::DecodeNotEnoughBytes); 272 + } 273 + let bytes8 = TryInto::<[u8; 8]>::try_into(&bytes[..8])?; 274 + let rank = KeyRank(u64::from_be_bytes(bytes8)); 275 + Ok((rank, 8)) 297 276 } 298 277 } 278 + impl From<u64> for KeyRank { 279 + fn from(n: u64) -> Self { 280 + Self(n) 281 + } 282 + } 283 + impl From<KeyRank> for u64 { 284 + fn from(kr: KeyRank) -> Self { 285 + kr.0 286 + } 287 + } 288 + 289 + pub type BucketedRankRecordsKey<P, C> = 290 + DbConcat<DbConcat<DbStaticStr<P>, C>, DbConcat<KeyRank, Nsid>>; 291 + impl<P, C> BucketedRankRecordsKey<P, C> 292 + where 293 + P: StaticStr + PartialEq + std::fmt::Debug, 294 + C: DbBytes + PartialEq + std::fmt::Debug + Clone, 295 + { 296 + pub fn new(cursor: C, rank: KeyRank, nsid: &Nsid) -> Self { 297 + Self::from_pair( 298 + DbConcat::from_pair(Default::default(), cursor), 299 + DbConcat::from_pair(rank, nsid.clone()), 300 + ) 301 + } 302 + pub fn with_rank(&self, new_rank: KeyRank) -> Self { 303 + Self::new(self.prefix.suffix.clone(), new_rank, &self.suffix.suffix) 304 + } 305 + } 306 + 307 + static_str!("hourly_counts", _HourlyRollupStaticStr); 299 308 pub type HourlyRollupStaticPrefix = DbStaticStr<_HourlyRollupStaticStr>; 300 309 pub type HourlyRollupKey = DbConcat<DbConcat<HourlyRollupStaticPrefix, HourTruncatedCursor>, Nsid>; 301 310 impl HourlyRollupKey { ··· 308 317 } 309 318 pub type HourlyRollupVal = CountsValue; 310 319 311 - #[derive(Debug, PartialEq)] 312 - pub struct _WeeklyRollupStaticStr {} 313 - impl StaticStr for _WeeklyRollupStaticStr { 314 - fn static_str() -> &'static str { 315 - "weekly_counts" 316 - } 317 - } 320 + static_str!("hourly_rank_records", _HourlyRecordsStaticStr); 321 + pub type HourlyRecordsKey = BucketedRankRecordsKey<_HourlyRecordsStaticStr, HourTruncatedCursor>; 322 + 323 + static_str!("hourly_rank_dids", _HourlyDidsStaticStr); 324 + pub type HourlyDidsKey = BucketedRankRecordsKey<_HourlyDidsStaticStr, HourTruncatedCursor>; 325 + 326 + static_str!("weekly_counts", _WeeklyRollupStaticStr); 318 327 pub type WeeklyRollupStaticPrefix = DbStaticStr<_WeeklyRollupStaticStr>; 319 328 pub type WeeklyRollupKey = DbConcat<DbConcat<WeeklyRollupStaticPrefix, WeekTruncatedCursor>, Nsid>; 320 329 impl WeeklyRollupKey { ··· 327 336 } 328 337 pub type WeeklyRollupVal = CountsValue; 329 338 330 - #[derive(Debug, PartialEq)] 331 - pub struct _AllTimeRollupStaticStr {} 332 - impl StaticStr for _AllTimeRollupStaticStr { 333 - fn static_str() -> &'static str { 334 - "ever_counts" 335 - } 336 - } 339 + static_str!("weekly_rank_records", _WeeklyRecordsStaticStr); 340 + pub type WeeklyRecordsKey = BucketedRankRecordsKey<_WeeklyRecordsStaticStr, WeekTruncatedCursor>; 341 + 342 + static_str!("weekly_rank_dids", _WeeklyDidsStaticStr); 343 + pub type WeeklyDidsKey = BucketedRankRecordsKey<_WeeklyDidsStaticStr, WeekTruncatedCursor>; 344 + 345 + static_str!("ever_counts", _AllTimeRollupStaticStr); 337 346 pub type AllTimeRollupStaticPrefix = DbStaticStr<_AllTimeRollupStaticStr>; 338 347 pub type AllTimeRollupKey = DbConcat<AllTimeRollupStaticPrefix, Nsid>; 339 348 impl AllTimeRollupKey { ··· 345 354 } 346 355 } 347 356 pub type AllTimeRollupVal = CountsValue; 357 + 358 + pub type AllTimeRankRecordsKey<P> = DbConcat<DbStaticStr<P>, DbConcat<KeyRank, Nsid>>; 359 + impl<P> AllTimeRankRecordsKey<P> 360 + where 361 + P: StaticStr + PartialEq + std::fmt::Debug, 362 + { 363 + pub fn new(rank: KeyRank, nsid: &Nsid) -> Self { 364 + Self::from_pair(Default::default(), DbConcat::from_pair(rank, nsid.clone())) 365 + } 366 + pub fn with_rank(&self, new_rank: KeyRank) -> Self { 367 + Self::new(new_rank, &self.suffix.suffix) 368 + } 369 + pub fn count(&self) -> u64 { 370 + self.suffix.prefix.0 371 + } 372 + pub fn collection(&self) -> &Nsid { 373 + &self.suffix.suffix 374 + } 375 + } 376 + 377 + static_str!("ever_rank_records", _AllTimeRecordsStaticStr); 378 + pub type AllTimeRecordsKey = AllTimeRankRecordsKey<_AllTimeRecordsStaticStr>; 379 + 380 + static_str!("ever_rank_dids", _AllTimeDidsStaticStr); 381 + pub type AllTimeDidsKey = AllTimeRankRecordsKey<_AllTimeDidsStaticStr>; 348 382 349 383 #[derive(Debug, Copy, Clone, PartialEq, Hash, PartialOrd, Eq)] 350 384 pub struct TruncatedCursor<const MOD: u64>(u64);