Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

remove TopCollections, fix /records nsid dups

phil 8799d94c c265181b

+124 -579
-71
ufos/src/lib.rs
··· 234 234 }, 235 235 } 236 236 237 - #[derive(Debug, Default, PartialEq, Serialize, JsonSchema)] 238 - pub struct TopCollections { 239 - total_records: u64, 240 - dids_estimate: u64, 241 - nsid_child_segments: HashMap<String, TopCollections>, 242 - } 243 - 244 - // this is not safe from ~DOS 245 - // todo: remove this and just iterate the all-time rollups to get nsids? (or recent rollups?) 246 - impl From<TopCollections> for Vec<String> { 247 - fn from(tc: TopCollections) -> Self { 248 - let mut me = vec![]; 249 - for (segment, children) in tc.nsid_child_segments { 250 - let child_segments: Self = children.into(); 251 - if child_segments.is_empty() { 252 - me.push(segment); 253 - } else { 254 - for ch in child_segments { 255 - let nsid = format!("{segment}.{ch}"); 256 - me.push(nsid); 257 - } 258 - } 259 - } 260 - me 261 - } 262 - } 263 - 264 - #[derive(Debug)] 265 - pub struct QueryPeriod { 266 - from: Option<Cursor>, 267 - until: Option<Cursor>, 268 - } 269 - impl QueryPeriod { 270 - pub fn all_time() -> Self { 271 - QueryPeriod { 272 - from: None, 273 - until: None, 274 - } 275 - } 276 - pub fn is_all_time(&self) -> bool { 277 - self.from.is_none() && self.until.is_none() 278 - } 279 - } 280 - 281 237 #[derive(Debug, Serialize, JsonSchema)] 282 238 pub struct NsidCount { 283 239 nsid: String, ··· 288 244 #[cfg(test)] 289 245 mod tests { 290 246 use super::*; 291 - 292 - #[test] 293 - fn test_top_collections_to_nsids() { 294 - let empty_tc = TopCollections::default(); 295 - assert_eq!(Into::<Vec<String>>::into(empty_tc), Vec::<String>::new()); 296 - 297 - let tc = TopCollections { 298 - nsid_child_segments: HashMap::from([ 299 - ( 300 - "a".to_string(), 301 - TopCollections { 302 - nsid_child_segments: HashMap::from([ 303 - ("b".to_string(), TopCollections::default()), 304 - ("c".to_string(), TopCollections::default()), 305 - ]), 306 - ..Default::default() 307 - }, 308 - ), 309 - ("z".to_string(), TopCollections::default()), 310 - ]), 311 - ..Default::default() 312 - }; 313 - 314 - let mut nsids: Vec<String> = tc.into(); 315 - nsids.sort(); 316 - assert_eq!(nsids, ["a.b", "a.c", "z"]); 317 - } 318 247 319 248 #[test] 320 249 fn test_truncating_insert_truncates() -> anyhow::Result<()> {
+20 -48
ufos/src/server.rs
··· 1 1 use crate::index_html::INDEX_HTML; 2 2 use crate::storage::StoreReader; 3 - use crate::store_types::HourTruncatedCursor; 4 - use crate::{ConsumerInfo, Nsid, NsidCount, QueryPeriod, TopCollections, UFOsRecord}; 3 + use crate::store_types::{HourTruncatedCursor, WeekTruncatedCursor}; 4 + use crate::{ConsumerInfo, Cursor, Nsid, NsidCount, UFOsRecord}; 5 5 use base64::{engine::general_purpose::URL_SAFE_NO_PAD, Engine as _}; 6 6 use chrono::{DateTime, Utc}; 7 7 use dropshot::endpoint; ··· 19 19 use http::{Response, StatusCode}; 20 20 use schemars::JsonSchema; 21 21 use serde::{Deserialize, Serialize}; 22 - use std::collections::HashMap; 22 + use std::collections::{HashMap, HashSet}; 23 23 use std::sync::Arc; 24 - use std::time::{SystemTime, UNIX_EPOCH}; 24 + use std::time::{Duration, SystemTime, UNIX_EPOCH}; 25 25 26 26 struct Context { 27 27 pub spec: Arc<serde_json::Value>, ··· 109 109 consumer, 110 110 }) 111 111 } 112 - fn to_multiple_nsids(s: &str) -> Result<Vec<Nsid>, String> { 113 - let mut out = Vec::new(); 112 + fn to_multiple_nsids(s: &str) -> Result<HashSet<Nsid>, String> { 113 + let mut out = HashSet::new(); 114 114 for collection in s.split(',') { 115 115 let Ok(nsid) = Nsid::new(collection.to_string()) else { 116 116 return Err(format!("collection {collection:?} was not a valid NSID")); 117 117 }; 118 - out.push(nsid); 118 + out.insert(nsid); 119 119 } 120 120 Ok(out) 121 121 } ··· 162 162 to_multiple_nsids(&provided_collection) 163 163 .map_err(|reason| HttpError::for_bad_request(None, reason))? 164 164 } else { 165 - let all_collections_should_be_nsids: Vec<String> = storage 166 - .get_top_collections() 167 - .await 168 - .map_err(|e| { 169 - HttpError::for_internal_error(format!("failed to get top collections: {e:?}")) 170 - })? 171 - .into(); 172 - let mut all_collections = Vec::with_capacity(all_collections_should_be_nsids.len()); 173 - for raw_nsid in all_collections_should_be_nsids { 174 - let nsid = Nsid::new(raw_nsid).map_err(|e| { 175 - HttpError::for_internal_error(format!("failed to parse nsid: {e:?}")) 176 - })?; 177 - all_collections.push(nsid); 178 - } 179 - 180 165 limit = 12; 181 - all_collections 166 + let min_time_ago = SystemTime::now() - Duration::from_secs(86_400 * 3); // we want at least 3 days of data 167 + let since: WeekTruncatedCursor = Cursor::at(min_time_ago).into(); 168 + let (collections, _) = storage 169 + .get_all_collections(1000, None, Some(since.try_as().unwrap()), None) 170 + .await 171 + .map_err(|e| HttpError::for_internal_error(e.to_string()))?; 172 + collections 173 + .into_iter() 174 + .map(|c| Nsid::new(c.nsid).unwrap()) 175 + .collect() 182 176 }; 183 177 184 178 let records = storage 185 - .get_records_by_collections(&collections, limit, true) 179 + .get_records_by_collections(collections, limit, true) 186 180 .await 187 181 .map_err(|e| HttpError::for_internal_error(e.to_string()))? 188 182 .into_iter() ··· 327 321 ) -> OkCorsResponse<Vec<NsidCount>> { 328 322 let Context { storage, .. } = ctx.context(); 329 323 let collections = storage 330 - .get_top_collections_by_count(100, QueryPeriod::all_time()) 324 + .get_top_collections_by_count(100, None, None) 331 325 .await 332 326 .map_err(|e| HttpError::for_internal_error(format!("oh shoot: {e:?}")))?; 333 327 ··· 344 338 ) -> OkCorsResponse<Vec<NsidCount>> { 345 339 let Context { storage, .. } = ctx.context(); 346 340 let collections = storage 347 - .get_top_collections_by_dids(100, QueryPeriod::all_time()) 341 + .get_top_collections_by_dids(100, None, None) 348 342 .await 349 343 .map_err(|e| HttpError::for_internal_error(format!("oh shoot: {e:?}")))?; 350 344 351 345 ok_cors(collections) 352 346 } 353 347 354 - /// Get top collections 355 - /// 356 - /// The format of this API response will be changing soon. 357 - #[endpoint { 358 - method = GET, 359 - path = "/collections", 360 - /* 361 - * this is going away 362 - */ 363 - unpublished = true, 364 - }] 365 - async fn get_top_collections(ctx: RequestContext<Context>) -> OkCorsResponse<TopCollections> { 366 - let Context { storage, .. } = ctx.context(); 367 - let collections = storage 368 - .get_top_collections() 369 - .await 370 - .map_err(|e| HttpError::for_internal_error(format!("boooo: {e:?}")))?; 371 - 372 - ok_cors(collections) 373 - } 374 - 375 348 pub async fn serve(storage: impl StoreReader + 'static) -> Result<(), String> { 376 349 let log = ConfigLogging::StderrTerminal { 377 350 level: ConfigLoggingLevel::Info, ··· 389 362 api.register(get_all_collections).unwrap(); 390 363 api.register(get_top_collections_by_count).unwrap(); 391 364 api.register(get_top_collections_by_dids).unwrap(); 392 - api.register(get_top_collections).unwrap(); 393 365 394 366 let context = Context { 395 367 spec: Arc::new(
+6 -9
ufos/src/storage.rs
··· 1 1 use crate::store_types::{HourTruncatedCursor, SketchSecretPrefix}; 2 - use crate::{ 3 - error::StorageError, ConsumerInfo, Cursor, EventBatch, NsidCount, QueryPeriod, TopCollections, 4 - UFOsRecord, 5 - }; 2 + use crate::{error::StorageError, ConsumerInfo, Cursor, EventBatch, NsidCount, UFOsRecord}; 6 3 use async_trait::async_trait; 7 4 use jetstream::exports::{Did, Nsid}; 8 5 use std::collections::HashSet; ··· 87 84 async fn get_top_collections_by_count( 88 85 &self, 89 86 limit: usize, 90 - period: QueryPeriod, 87 + since: Option<HourTruncatedCursor>, 88 + until: Option<HourTruncatedCursor>, 91 89 ) -> StorageResult<Vec<NsidCount>>; 92 90 93 91 async fn get_top_collections_by_dids( 94 92 &self, 95 93 limit: usize, 96 - period: QueryPeriod, 94 + since: Option<HourTruncatedCursor>, 95 + until: Option<HourTruncatedCursor>, 97 96 ) -> StorageResult<Vec<NsidCount>>; 98 - 99 - async fn get_top_collections(&self) -> StorageResult<TopCollections>; 100 97 101 98 async fn get_counts_by_collection(&self, collection: &Nsid) -> StorageResult<(u64, u64)>; 102 99 103 100 async fn get_records_by_collections( 104 101 &self, 105 - collections: &[Nsid], 102 + collections: HashSet<Nsid>, 106 103 limit: usize, 107 104 expand_each_collection: bool, 108 105 ) -> StorageResult<Vec<UFOsRecord>>;
+54 -265
ufos/src/storage_fjall.rs
··· 11 11 SketchSecretPrefix, TakeoffKey, TakeoffValue, TrimCollectionCursorKey, WeekTruncatedCursor, 12 12 WeeklyDidsKey, WeeklyRecordsKey, WeeklyRollupKey, 13 13 }; 14 - use crate::{ 15 - CommitAction, ConsumerInfo, Did, EventBatch, Nsid, NsidCount, QueryPeriod, TopCollections, 16 - UFOsRecord, 17 - }; 14 + use crate::{CommitAction, ConsumerInfo, Did, EventBatch, Nsid, NsidCount, UFOsRecord}; 18 15 use async_trait::async_trait; 19 16 use fjall::Snapshot; 20 17 use fjall::{Batch as FjallBatch, Config, Keyspace, PartitionCreateOptions, PartitionHandle}; ··· 515 512 fn get_top_collections_by_count( 516 513 &self, 517 514 limit: usize, 518 - period: QueryPeriod, 515 + since: Option<HourTruncatedCursor>, 516 + until: Option<HourTruncatedCursor>, 519 517 ) -> StorageResult<Vec<NsidCount>> { 520 - Ok(if period.is_all_time() { 518 + Ok(if since.is_none() && until.is_none() { 521 519 let snapshot = self.rollups.snapshot(); 522 520 let mut out = Vec::with_capacity(limit); 523 521 let prefix = AllTimeRecordsKey::from_prefix_to_db_bytes(&Default::default())?; ··· 545 543 fn get_top_collections_by_dids( 546 544 &self, 547 545 limit: usize, 548 - period: QueryPeriod, 546 + since: Option<HourTruncatedCursor>, 547 + until: Option<HourTruncatedCursor>, 549 548 ) -> StorageResult<Vec<NsidCount>> { 550 - Ok(if period.is_all_time() { 549 + Ok(if since.is_none() && until.is_none() { 551 550 let snapshot = self.rollups.snapshot(); 552 551 let mut out = Vec::with_capacity(limit); 553 552 let prefix = AllTimeDidsKey::from_prefix_to_db_bytes(&Default::default())?; ··· 572 571 }) 573 572 } 574 573 575 - fn get_top_collections(&self) -> Result<TopCollections, StorageError> { 576 - // TODO: limit nsid traversal depth 577 - // TODO: limit nsid traversal breadth 578 - // TODO: be serious about anything 579 - 580 - // TODO: probably use a stack of segments to reduce to ~log-n merges 581 - 582 - #[derive(Default)] 583 - struct Blah { 584 - counts: CountsValue, 585 - children: HashMap<String, Blah>, 586 - } 587 - impl From<&Blah> for TopCollections { 588 - fn from(bla: &Blah) -> Self { 589 - Self { 590 - total_records: bla.counts.records(), 591 - dids_estimate: bla.counts.dids().estimate() as u64, 592 - nsid_child_segments: HashMap::from_iter( 593 - bla.children.iter().map(|(k, v)| (k.to_string(), v.into())), 594 - ), 595 - } 596 - } 597 - } 598 - 599 - let mut b = Blah::default(); 600 - let prefix = AllTimeRollupKey::from_prefix_to_db_bytes(&Default::default())?; 601 - for kv in self.rollups.prefix(&prefix.to_db_bytes()?) { 602 - let (key_bytes, val_bytes) = kv?; 603 - let key = db_complete::<AllTimeRollupKey>(&key_bytes)?; 604 - let val = db_complete::<CountsValue>(&val_bytes)?; 605 - 606 - let mut node = &mut b; 607 - node.counts.merge(&val); 608 - for segment in key.collection().split('.') { 609 - node = node.children.entry(segment.to_string()).or_default(); 610 - node.counts.merge(&val); 611 - } 612 - } 613 - 614 - Ok((&b).into()) 615 - } 616 - 617 574 fn get_counts_by_collection(&self, collection: &Nsid) -> StorageResult<(u64, u64)> { 618 575 // 0. grab a snapshot in case rollups happen while we're working 619 576 let instant = self.keyspace.instant(); ··· 652 609 653 610 fn get_records_by_collections( 654 611 &self, 655 - collections: &[Nsid], 612 + collections: HashSet<Nsid>, 656 613 limit: usize, 657 614 expand_each_collection: bool, 658 615 ) -> StorageResult<Vec<UFOsRecord>> { ··· 661 618 } 662 619 let mut record_iterators = Vec::new(); 663 620 for collection in collections { 664 - let iter = RecordIterator::new(&self.feeds, self.records.clone(), collection, limit)?; 621 + let iter = RecordIterator::new(&self.feeds, self.records.clone(), &collection, limit)?; 665 622 record_iterators.push(iter.peekable()); 666 623 } 667 624 let mut merged = Vec::new(); ··· 729 686 async fn get_top_collections_by_count( 730 687 &self, 731 688 limit: usize, 732 - period: QueryPeriod, 689 + since: Option<HourTruncatedCursor>, 690 + until: Option<HourTruncatedCursor>, 733 691 ) -> StorageResult<Vec<NsidCount>> { 734 692 let s = self.clone(); 735 693 tokio::task::spawn_blocking(move || { 736 - FjallReader::get_top_collections_by_count(&s, limit, period) 694 + FjallReader::get_top_collections_by_count(&s, limit, since, until) 737 695 }) 738 696 .await? 739 697 } 740 698 async fn get_top_collections_by_dids( 741 699 &self, 742 700 limit: usize, 743 - period: QueryPeriod, 701 + since: Option<HourTruncatedCursor>, 702 + until: Option<HourTruncatedCursor>, 744 703 ) -> StorageResult<Vec<NsidCount>> { 745 704 let s = self.clone(); 746 705 tokio::task::spawn_blocking(move || { 747 - FjallReader::get_top_collections_by_dids(&s, limit, period) 706 + FjallReader::get_top_collections_by_dids(&s, limit, since, until) 748 707 }) 749 708 .await? 750 709 } 751 - async fn get_top_collections(&self) -> Result<TopCollections, StorageError> { 752 - let s = self.clone(); 753 - tokio::task::spawn_blocking(move || FjallReader::get_top_collections(&s)).await? 754 - } 755 710 async fn get_counts_by_collection(&self, collection: &Nsid) -> StorageResult<(u64, u64)> { 756 711 let s = self.clone(); 757 712 let collection = collection.clone(); ··· 760 715 } 761 716 async fn get_records_by_collections( 762 717 &self, 763 - collections: &[Nsid], 718 + collections: HashSet<Nsid>, 764 719 limit: usize, 765 720 expand_each_collection: bool, 766 721 ) -> StorageResult<Vec<UFOsRecord>> { 767 722 let s = self.clone(); 768 - let collections = collections.to_vec(); 769 723 tokio::task::spawn_blocking(move || { 770 - FjallReader::get_records_by_collections(&s, &collections, limit, expand_each_collection) 724 + FjallReader::get_records_by_collections(&s, collections, limit, expand_each_collection) 771 725 }) 772 726 .await? 773 727 } ··· 1550 1504 assert_eq!(records, 0); 1551 1505 assert_eq!(dids, 0); 1552 1506 1553 - let records = read.get_records_by_collections(&[collection], 2, false)?; 1507 + let records = read.get_records_by_collections([collection].into(), 2, false)?; 1554 1508 assert_eq!(records.len(), 1); 1555 1509 let rec = &records[0]; 1556 1510 assert_eq!(rec.record.get(), "{}"); 1557 1511 assert!(!rec.is_update); 1558 1512 1559 - let records = 1560 - read.get_records_by_collections(&[Nsid::new("d.e.f".to_string()).unwrap()], 2, false)?; 1513 + let records = read.get_records_by_collections( 1514 + [Nsid::new("d.e.f".to_string()).unwrap()].into(), 1515 + 2, 1516 + false, 1517 + )?; 1561 1518 assert_eq!(records.len(), 0); 1562 1519 1563 1520 Ok(()) ··· 1598 1555 write.insert_batch(batch.batch)?; 1599 1556 1600 1557 let records = read.get_records_by_collections( 1601 - &[ 1558 + HashSet::from([ 1602 1559 Nsid::new("a.a.a".to_string()).unwrap(), 1603 1560 Nsid::new("a.a.b".to_string()).unwrap(), 1604 1561 Nsid::new("a.a.c".to_string()).unwrap(), 1605 - ], 1562 + ]), 1606 1563 100, 1607 1564 false, 1608 1565 )?; ··· 1658 1615 write.insert_batch(batch.batch)?; 1659 1616 1660 1617 let records = read.get_records_by_collections( 1661 - &[ 1618 + HashSet::from([ 1662 1619 Nsid::new("a.a.a".to_string()).unwrap(), 1663 1620 Nsid::new("a.a.b".to_string()).unwrap(), 1664 1621 Nsid::new("a.a.c".to_string()).unwrap(), 1665 - ], 1622 + ]), 1666 1623 2, 1667 1624 true, 1668 1625 )?; ··· 1714 1671 assert_eq!(records, 1); 1715 1672 assert_eq!(dids, 1); 1716 1673 1717 - let records = read.get_records_by_collections(&[collection], 2, false)?; 1674 + let records = read.get_records_by_collections([collection].into(), 2, false)?; 1718 1675 assert_eq!(records.len(), 1); 1719 1676 let rec = &records[0]; 1720 1677 assert_eq!(rec.record.get(), r#"{"ch": "ch-ch-ch-changes"}"#); ··· 1752 1709 assert_eq!(records, 1); 1753 1710 assert_eq!(dids, 1); 1754 1711 1755 - let records = read.get_records_by_collections(&[collection], 2, false)?; 1712 + let records = read.get_records_by_collections([collection].into(), 2, false)?; 1756 1713 assert_eq!(records.len(), 0); 1757 1714 1758 1715 Ok(()) ··· 1798 1755 write.insert_batch(batch.batch)?; 1799 1756 1800 1757 let records = read.get_records_by_collections( 1801 - &[Nsid::new("a.a.a".to_string()).unwrap()], 1758 + HashSet::from([Nsid::new("a.a.a".to_string()).unwrap()]), 1802 1759 100, 1803 1760 false, 1804 1761 )?; 1805 1762 assert_eq!(records.len(), 1); 1806 1763 let records = read.get_records_by_collections( 1807 - &[Nsid::new("a.a.b".to_string()).unwrap()], 1764 + HashSet::from([Nsid::new("a.a.b".to_string()).unwrap()]), 1808 1765 100, 1809 1766 false, 1810 1767 )?; 1811 1768 assert_eq!(records.len(), 10); 1812 1769 let records = read.get_records_by_collections( 1813 - &[Nsid::new("a.a.c".to_string()).unwrap()], 1770 + HashSet::from([Nsid::new("a.a.c".to_string()).unwrap()]), 1814 1771 100, 1815 1772 false, 1816 1773 )?; 1817 1774 assert_eq!(records.len(), 1); 1818 1775 let records = read.get_records_by_collections( 1819 - &[Nsid::new("a.a.d".to_string()).unwrap()], 1776 + HashSet::from([Nsid::new("a.a.d".to_string()).unwrap()]), 1820 1777 100, 1821 1778 false, 1822 1779 )?; ··· 1828 1785 write.trim_collection(&Nsid::new("a.a.d".to_string()).unwrap(), 6, false)?; 1829 1786 1830 1787 let records = read.get_records_by_collections( 1831 - &[Nsid::new("a.a.a".to_string()).unwrap()], 1788 + HashSet::from([Nsid::new("a.a.a".to_string()).unwrap()]), 1832 1789 100, 1833 1790 false, 1834 1791 )?; 1835 1792 assert_eq!(records.len(), 1); 1836 1793 let records = read.get_records_by_collections( 1837 - &[Nsid::new("a.a.b".to_string()).unwrap()], 1794 + HashSet::from([Nsid::new("a.a.b".to_string()).unwrap()]), 1838 1795 100, 1839 1796 false, 1840 1797 )?; 1841 1798 assert_eq!(records.len(), 6); 1842 1799 let records = read.get_records_by_collections( 1843 - &[Nsid::new("a.a.c".to_string()).unwrap()], 1800 + HashSet::from([Nsid::new("a.a.c".to_string()).unwrap()]), 1844 1801 100, 1845 1802 false, 1846 1803 )?; 1847 1804 assert_eq!(records.len(), 1); 1848 1805 let records = read.get_records_by_collections( 1849 - &[Nsid::new("a.a.d".to_string()).unwrap()], 1806 + HashSet::from([Nsid::new("a.a.d".to_string()).unwrap()]), 1850 1807 100, 1851 1808 false, 1852 1809 )?; ··· 1883 1840 write.insert_batch(batch.batch)?; 1884 1841 1885 1842 let records = read.get_records_by_collections( 1886 - &[Nsid::new("a.a.a".to_string()).unwrap()], 1843 + HashSet::from([Nsid::new("a.a.a".to_string()).unwrap()]), 1887 1844 100, 1888 1845 false, 1889 1846 )?; ··· 1894 1851 assert_eq!(records_deleted, 2); 1895 1852 1896 1853 let records = read.get_records_by_collections( 1897 - &[Nsid::new("a.a.a".to_string()).unwrap()], 1854 + HashSet::from([Nsid::new("a.a.a".to_string()).unwrap()]), 1898 1855 100, 1899 1856 false, 1900 1857 )?; ··· 1925 1882 1926 1883 write.step_rollup()?; 1927 1884 1928 - let records = 1929 - read.get_records_by_collections(&[Nsid::new("a.a.a".to_string()).unwrap()], 1, false)?; 1885 + let records = read.get_records_by_collections( 1886 + [Nsid::new("a.a.a".to_string()).unwrap()].into(), 1887 + 1, 1888 + false, 1889 + )?; 1930 1890 assert_eq!(records.len(), 0); 1931 1891 1932 1892 Ok(()) ··· 1955 1915 batch.delete_account("did:plc:person-a", 10_001); 1956 1916 write.insert_batch(batch.batch)?; 1957 1917 1958 - let records = 1959 - read.get_records_by_collections(&[Nsid::new("a.a.a".to_string()).unwrap()], 1, false)?; 1918 + let records = read.get_records_by_collections( 1919 + [Nsid::new("a.a.a".to_string()).unwrap()].into(), 1920 + 1, 1921 + false, 1922 + )?; 1960 1923 assert_eq!(records.len(), 1); 1961 1924 1962 1925 let (n, _) = write.step_rollup()?; 1963 1926 assert_eq!(n, 1); 1964 1927 1965 - let records = 1966 - read.get_records_by_collections(&[Nsid::new("a.a.a".to_string()).unwrap()], 1, false)?; 1928 + let records = read.get_records_by_collections( 1929 + [Nsid::new("a.a.a".to_string()).unwrap()].into(), 1930 + 1, 1931 + false, 1932 + )?; 1967 1933 assert_eq!(records.len(), 0); 1968 1934 1969 1935 let mut batch = TestBatch::default(); ··· 2090 2056 // no more rollups left 2091 2057 let (n, _) = write.step_rollup()?; 2092 2058 assert_eq!(n, 0); 2093 - 2094 - Ok(()) 2095 - } 2096 - 2097 - #[test] 2098 - fn get_top_collections() -> anyhow::Result<()> { 2099 - let (read, mut write) = fjall_db(); 2100 - 2101 - let mut batch = TestBatch::default(); 2102 - batch.create( 2103 - "did:plc:person-a", 2104 - "a.a.a", 2105 - "rkey-aaa", 2106 - "{}", 2107 - Some("rev-aaa"), 2108 - None, 2109 - 10_000, 2110 - ); 2111 - batch.create( 2112 - "did:plc:person-b", 2113 - "a.a.b", 2114 - "rkey-bbb", 2115 - "{}", 2116 - Some("rev-bbb"), 2117 - None, 2118 - 10_001, 2119 - ); 2120 - batch.create( 2121 - "did:plc:person-c", 2122 - "a.b.c", 2123 - "rkey-ccc", 2124 - "{}", 2125 - Some("rev-ccc"), 2126 - None, 2127 - 10_002, 2128 - ); 2129 - batch.create( 2130 - "did:plc:person-a", 2131 - "a.a.a", 2132 - "rkey-aaa-2", 2133 - "{}", 2134 - Some("rev-aaa-2"), 2135 - None, 2136 - 10_003, 2137 - ); 2138 - write.insert_batch(batch.batch)?; 2139 - 2140 - let (n, _) = write.step_rollup()?; 2141 - assert_eq!(n, 3); // 3 collections 2142 - 2143 - let tops = read.get_top_collections()?; 2144 - assert_eq!( 2145 - tops, 2146 - TopCollections { 2147 - total_records: 4, 2148 - dids_estimate: 3, 2149 - nsid_child_segments: HashMap::from([( 2150 - "a".to_string(), 2151 - TopCollections { 2152 - total_records: 4, 2153 - dids_estimate: 3, 2154 - nsid_child_segments: HashMap::from([ 2155 - ( 2156 - "a".to_string(), 2157 - TopCollections { 2158 - total_records: 3, 2159 - dids_estimate: 2, 2160 - nsid_child_segments: HashMap::from([ 2161 - ( 2162 - "a".to_string(), 2163 - TopCollections { 2164 - total_records: 2, 2165 - dids_estimate: 1, 2166 - nsid_child_segments: HashMap::from([]), 2167 - }, 2168 - ), 2169 - ( 2170 - "b".to_string(), 2171 - TopCollections { 2172 - total_records: 1, 2173 - dids_estimate: 1, 2174 - nsid_child_segments: HashMap::from([]), 2175 - } 2176 - ), 2177 - ]), 2178 - }, 2179 - ), 2180 - ( 2181 - "b".to_string(), 2182 - TopCollections { 2183 - total_records: 1, 2184 - dids_estimate: 1, 2185 - nsid_child_segments: HashMap::from([( 2186 - "c".to_string(), 2187 - TopCollections { 2188 - total_records: 1, 2189 - dids_estimate: 1, 2190 - nsid_child_segments: HashMap::from([]), 2191 - }, 2192 - ),]), 2193 - }, 2194 - ), 2195 - ]), 2196 - }, 2197 - ),]), 2198 - } 2199 - ); 2200 - Ok(()) 2201 - } 2202 - 2203 - #[test] 2204 - fn get_top_collections_with_parent_nsid() -> anyhow::Result<()> { 2205 - let (read, mut write) = fjall_db(); 2206 - 2207 - let mut batch = TestBatch::default(); 2208 - batch.create( 2209 - "did:plc:inze6wrmsm7pjl7yta3oig77", 2210 - "a.a.a.a", 2211 - "aaaa", 2212 - r#""child nsid""#, 2213 - Some("rev-aaaa"), 2214 - None, 2215 - 100, 2216 - ); 2217 - batch.create( 2218 - "did:plc:inze6wrmsm7pjl7yta3oig77", 2219 - "a.a.a", 2220 - "aaa", 2221 - r#""parent nsid""#, 2222 - Some("rev-aaa"), 2223 - None, 2224 - 101, 2225 - ); 2226 - write.insert_batch(batch.batch)?; 2227 - 2228 - let (n, _) = write.step_rollup()?; 2229 - assert_eq!(n, 2); // 3 collections 2230 - 2231 - let tops = read.get_top_collections()?; 2232 - assert_eq!( 2233 - tops, 2234 - TopCollections { 2235 - total_records: 2, 2236 - dids_estimate: 1, 2237 - nsid_child_segments: HashMap::from([( 2238 - "a".to_string(), 2239 - TopCollections { 2240 - total_records: 2, 2241 - dids_estimate: 1, 2242 - nsid_child_segments: HashMap::from([( 2243 - "a".to_string(), 2244 - TopCollections { 2245 - total_records: 2, 2246 - dids_estimate: 1, 2247 - nsid_child_segments: HashMap::from([( 2248 - "a".to_string(), 2249 - TopCollections { 2250 - total_records: 2, 2251 - dids_estimate: 1, 2252 - nsid_child_segments: HashMap::from([( 2253 - "a".to_string(), 2254 - TopCollections { 2255 - total_records: 1, 2256 - dids_estimate: 1, 2257 - nsid_child_segments: HashMap::from([]), 2258 - }, 2259 - ),]), 2260 - }, 2261 - ),]), 2262 - }, 2263 - ),]), 2264 - }, 2265 - ),]), 2266 - } 2267 - ); 2268 - 2269 - // TODO: handle leaf node counts explicitly, since parent NSIDs can be leaves themselves 2270 2059 2271 2060 Ok(()) 2272 2061 }
+44 -186
ufos/src/storage_mem.rs
··· 12 12 RecordLocationMeta, RecordLocationVal, RecordRawValue, SketchSecretPrefix, TakeoffKey, 13 13 TakeoffValue, WeekTruncatedCursor, WeeklyRollupKey, 14 14 }; 15 - use crate::{ 16 - CommitAction, ConsumerInfo, Did, EventBatch, Nsid, NsidCount, QueryPeriod, TopCollections, 17 - UFOsRecord, 18 - }; 15 + use crate::{CommitAction, ConsumerInfo, Did, EventBatch, Nsid, NsidCount, UFOsRecord}; 19 16 use async_trait::async_trait; 20 17 use jetstream::events::Cursor; 21 18 use lsm_tree::range::prefix_to_range; ··· 453 450 }) 454 451 } 455 452 456 - fn get_top_collections(&self) -> Result<TopCollections, StorageError> { 457 - // TODO: limit nsid traversal depth 458 - // TODO: limit nsid traversal breadth 459 - // TODO: be serious about anything 460 - 461 - // TODO: probably use a stack of segments to reduce to ~log-n merges 462 - 463 - #[derive(Default)] 464 - struct Blah { 465 - counts: CountsValue, 466 - children: HashMap<String, Blah>, 467 - } 468 - impl From<&Blah> for TopCollections { 469 - fn from(bla: &Blah) -> Self { 470 - Self { 471 - total_records: bla.counts.records(), 472 - dids_estimate: bla.counts.dids().estimate() as u64, 473 - nsid_child_segments: HashMap::from_iter( 474 - bla.children.iter().map(|(k, v)| (k.to_string(), v.into())), 475 - ), 476 - } 477 - } 478 - } 479 - 480 - let mut b = Blah::default(); 481 - let prefix = AllTimeRollupKey::from_prefix_to_db_bytes(&Default::default())?; 482 - for kv in self.rollups.prefix(&prefix.to_db_bytes()?) { 483 - let (key_bytes, val_bytes) = kv?; 484 - let key = db_complete::<AllTimeRollupKey>(&key_bytes)?; 485 - let val = db_complete::<CountsValue>(&val_bytes)?; 486 - 487 - let mut node = &mut b; 488 - node.counts.merge(&val); 489 - for segment in key.collection().split('.') { 490 - node = node.children.entry(segment.to_string()).or_default(); 491 - node.counts.merge(&val); 492 - } 493 - } 494 - 495 - Ok((&b).into()) 496 - } 497 - 498 453 fn get_counts_by_collection(&self, collection: &Nsid) -> StorageResult<(u64, u64)> { 499 454 // 0. grab a snapshot in case rollups happen while we're working 500 455 let instant = self.keyspace.instant(); ··· 533 488 534 489 fn get_records_by_collections( 535 490 &self, 536 - collections: &[Nsid], 491 + collections: HashSet<Nsid>, 537 492 limit: usize, 538 493 _expand_each_collection: bool, 539 494 ) -> StorageResult<Vec<UFOsRecord>> { ··· 542 497 } 543 498 let mut record_iterators = Vec::new(); 544 499 for collection in collections { 545 - let iter = RecordIterator::new(&self.feeds, self.records.clone(), collection, limit)?; 500 + let iter = RecordIterator::new(&self.feeds, self.records.clone(), &collection, limit)?; 546 501 record_iterators.push(iter.peekable()); 547 502 } 548 503 let mut merged = Vec::new(); ··· 590 545 let s = self.clone(); 591 546 tokio::task::spawn_blocking(move || MemReader::get_consumer_info(&s)).await? 592 547 } 593 - async fn get_top_collections(&self) -> StorageResult<TopCollections> { 594 - let s = self.clone(); 595 - tokio::task::spawn_blocking(move || MemReader::get_top_collections(&s)).await? 596 - } 597 548 async fn get_all_collections( 598 549 &self, 599 550 _: usize, ··· 606 557 async fn get_top_collections_by_count( 607 558 &self, 608 559 _: usize, 609 - _: QueryPeriod, 560 + _: Option<HourTruncatedCursor>, 561 + _: Option<HourTruncatedCursor>, 610 562 ) -> StorageResult<Vec<NsidCount>> { 611 563 todo!() 612 564 } 613 565 async fn get_top_collections_by_dids( 614 566 &self, 615 567 _: usize, 616 - _: QueryPeriod, 568 + _: Option<HourTruncatedCursor>, 569 + _: Option<HourTruncatedCursor>, 617 570 ) -> StorageResult<Vec<NsidCount>> { 618 571 todo!() 619 572 } ··· 625 578 } 626 579 async fn get_records_by_collections( 627 580 &self, 628 - collections: &[Nsid], 581 + collections: HashSet<Nsid>, 629 582 limit: usize, 630 583 expand_each_collection: bool, 631 584 ) -> StorageResult<Vec<UFOsRecord>> { 632 585 let s = self.clone(); 633 - let collections = collections.to_vec(); 634 586 tokio::task::spawn_blocking(move || { 635 - MemReader::get_records_by_collections(&s, &collections, limit, expand_each_collection) 587 + MemReader::get_records_by_collections(&s, collections, limit, expand_each_collection) 636 588 }) 637 589 .await? 638 590 } ··· 1301 1253 assert_eq!(records, 0); 1302 1254 assert_eq!(dids, 0); 1303 1255 1304 - let records = read.get_records_by_collections(&[collection], 2, false)?; 1256 + let records = read.get_records_by_collections(HashSet::from([collection]), 2, false)?; 1305 1257 assert_eq!(records.len(), 1); 1306 1258 let rec = &records[0]; 1307 1259 assert_eq!(rec.record.get(), "{}"); 1308 1260 assert!(!rec.is_update); 1309 1261 1310 - let records = 1311 - read.get_records_by_collections(&[Nsid::new("d.e.f".to_string()).unwrap()], 2, false)?; 1262 + let records = read.get_records_by_collections( 1263 + HashSet::from([Nsid::new("d.e.f".to_string()).unwrap()]), 1264 + 2, 1265 + false, 1266 + )?; 1312 1267 assert_eq!(records.len(), 0); 1313 1268 1314 1269 Ok(()) ··· 1349 1304 write.insert_batch(batch.batch)?; 1350 1305 1351 1306 let records = read.get_records_by_collections( 1352 - &[ 1307 + HashSet::from([ 1353 1308 Nsid::new("a.a.a".to_string()).unwrap(), 1354 1309 Nsid::new("a.a.b".to_string()).unwrap(), 1355 1310 Nsid::new("a.a.c".to_string()).unwrap(), 1356 - ], 1311 + ]), 1357 1312 100, 1358 1313 false, 1359 1314 )?; ··· 1409 1364 assert_eq!(records, 1); 1410 1365 assert_eq!(dids, 1); 1411 1366 1412 - let records = read.get_records_by_collections(&[collection], 2, false)?; 1367 + let records = read.get_records_by_collections(HashSet::from([collection]), 2, false)?; 1413 1368 assert_eq!(records.len(), 1); 1414 1369 let rec = &records[0]; 1415 1370 assert_eq!(rec.record.get(), r#"{"ch": "ch-ch-ch-changes"}"#); ··· 1447 1402 assert_eq!(records, 1); 1448 1403 assert_eq!(dids, 1); 1449 1404 1450 - let records = read.get_records_by_collections(&[collection], 2, false)?; 1405 + let records = read.get_records_by_collections(HashSet::from([collection]), 2, false)?; 1451 1406 assert_eq!(records.len(), 0); 1452 1407 1453 1408 Ok(()) ··· 1493 1448 write.insert_batch(batch.batch)?; 1494 1449 1495 1450 let records = read.get_records_by_collections( 1496 - &[Nsid::new("a.a.a".to_string()).unwrap()], 1451 + HashSet::from([Nsid::new("a.a.a".to_string()).unwrap()]), 1497 1452 100, 1498 1453 false, 1499 1454 )?; 1500 1455 assert_eq!(records.len(), 1); 1501 1456 let records = read.get_records_by_collections( 1502 - &[Nsid::new("a.a.b".to_string()).unwrap()], 1457 + HashSet::from([Nsid::new("a.a.b".to_string()).unwrap()]), 1503 1458 100, 1504 1459 false, 1505 1460 )?; 1506 1461 assert_eq!(records.len(), 10); 1507 1462 let records = read.get_records_by_collections( 1508 - &[Nsid::new("a.a.c".to_string()).unwrap()], 1463 + HashSet::from([Nsid::new("a.a.c".to_string()).unwrap()]), 1509 1464 100, 1510 1465 false, 1511 1466 )?; 1512 1467 assert_eq!(records.len(), 1); 1513 1468 let records = read.get_records_by_collections( 1514 - &[Nsid::new("a.a.d".to_string()).unwrap()], 1469 + HashSet::from([Nsid::new("a.a.d".to_string()).unwrap()]), 1515 1470 100, 1516 1471 false, 1517 1472 )?; ··· 1523 1478 write.trim_collection(&Nsid::new("a.a.d".to_string()).unwrap(), 6, false)?; 1524 1479 1525 1480 let records = read.get_records_by_collections( 1526 - &[Nsid::new("a.a.a".to_string()).unwrap()], 1481 + HashSet::from([Nsid::new("a.a.a".to_string()).unwrap()]), 1527 1482 100, 1528 1483 false, 1529 1484 )?; 1530 1485 assert_eq!(records.len(), 1); 1531 1486 let records = read.get_records_by_collections( 1532 - &[Nsid::new("a.a.b".to_string()).unwrap()], 1487 + HashSet::from([Nsid::new("a.a.b".to_string()).unwrap()]), 1533 1488 100, 1534 1489 false, 1535 1490 )?; 1536 1491 assert_eq!(records.len(), 6); 1537 1492 let records = read.get_records_by_collections( 1538 - &[Nsid::new("a.a.c".to_string()).unwrap()], 1493 + HashSet::from([Nsid::new("a.a.c".to_string()).unwrap()]), 1539 1494 100, 1540 1495 false, 1541 1496 )?; 1542 1497 assert_eq!(records.len(), 1); 1543 1498 let records = read.get_records_by_collections( 1544 - &[Nsid::new("a.a.d".to_string()).unwrap()], 1499 + HashSet::from([Nsid::new("a.a.d".to_string()).unwrap()]), 1545 1500 100, 1546 1501 false, 1547 1502 )?; ··· 1578 1533 write.insert_batch(batch.batch)?; 1579 1534 1580 1535 let records = read.get_records_by_collections( 1581 - &[Nsid::new("a.a.a".to_string()).unwrap()], 1536 + HashSet::from([Nsid::new("a.a.a".to_string()).unwrap()]), 1582 1537 100, 1583 1538 false, 1584 1539 )?; ··· 1589 1544 assert_eq!(records_deleted, 2); 1590 1545 1591 1546 let records = read.get_records_by_collections( 1592 - &[Nsid::new("a.a.a".to_string()).unwrap()], 1547 + HashSet::from([Nsid::new("a.a.a".to_string()).unwrap()]), 1593 1548 100, 1594 1549 false, 1595 1550 )?; ··· 1620 1575 1621 1576 write.step_rollup()?; 1622 1577 1623 - let records = 1624 - read.get_records_by_collections(&[Nsid::new("a.a.a".to_string()).unwrap()], 1, false)?; 1578 + let records = read.get_records_by_collections( 1579 + HashSet::from([Nsid::new("a.a.a".to_string()).unwrap()]), 1580 + 1, 1581 + false, 1582 + )?; 1625 1583 assert_eq!(records.len(), 0); 1626 1584 1627 1585 Ok(()) ··· 1650 1608 batch.delete_account("did:plc:person-a", 10_001); 1651 1609 write.insert_batch(batch.batch)?; 1652 1610 1653 - let records = 1654 - read.get_records_by_collections(&[Nsid::new("a.a.a".to_string()).unwrap()], 1, false)?; 1611 + let records = read.get_records_by_collections( 1612 + HashSet::from([Nsid::new("a.a.a".to_string()).unwrap()]), 1613 + 1, 1614 + false, 1615 + )?; 1655 1616 assert_eq!(records.len(), 1); 1656 1617 1657 1618 let (n, _) = write.step_rollup()?; 1658 1619 assert_eq!(n, 1); 1659 1620 1660 - let records = 1661 - read.get_records_by_collections(&[Nsid::new("a.a.a".to_string()).unwrap()], 1, false)?; 1621 + let records = read.get_records_by_collections( 1622 + HashSet::from([Nsid::new("a.a.a".to_string()).unwrap()]), 1623 + 1, 1624 + false, 1625 + )?; 1662 1626 assert_eq!(records.len(), 0); 1663 1627 1664 1628 let mut batch = TestBatch::default(); ··· 1786 1750 let (n, _) = write.step_rollup()?; 1787 1751 assert_eq!(n, 0); 1788 1752 1789 - Ok(()) 1790 - } 1791 - 1792 - #[test] 1793 - fn get_top_collections() -> anyhow::Result<()> { 1794 - let (read, mut write) = fjall_db(); 1795 - 1796 - let mut batch = TestBatch::default(); 1797 - batch.create( 1798 - "did:plc:person-a", 1799 - "a.a.a", 1800 - "rkey-aaa", 1801 - "{}", 1802 - Some("rev-aaa"), 1803 - None, 1804 - 10_000, 1805 - ); 1806 - batch.create( 1807 - "did:plc:person-b", 1808 - "a.a.b", 1809 - "rkey-bbb", 1810 - "{}", 1811 - Some("rev-bbb"), 1812 - None, 1813 - 10_001, 1814 - ); 1815 - batch.create( 1816 - "did:plc:person-c", 1817 - "a.b.c", 1818 - "rkey-ccc", 1819 - "{}", 1820 - Some("rev-ccc"), 1821 - None, 1822 - 10_002, 1823 - ); 1824 - batch.create( 1825 - "did:plc:person-a", 1826 - "a.a.a", 1827 - "rkey-aaa-2", 1828 - "{}", 1829 - Some("rev-aaa-2"), 1830 - None, 1831 - 10_003, 1832 - ); 1833 - write.insert_batch(batch.batch)?; 1834 - 1835 - let (n, _) = write.step_rollup()?; 1836 - assert_eq!(n, 3); // 3 collections 1837 - 1838 - let tops = read.get_top_collections()?; 1839 - assert_eq!( 1840 - tops, 1841 - TopCollections { 1842 - total_records: 4, 1843 - dids_estimate: 3, 1844 - nsid_child_segments: HashMap::from([( 1845 - "a".to_string(), 1846 - TopCollections { 1847 - total_records: 4, 1848 - dids_estimate: 3, 1849 - nsid_child_segments: HashMap::from([ 1850 - ( 1851 - "a".to_string(), 1852 - TopCollections { 1853 - total_records: 3, 1854 - dids_estimate: 2, 1855 - nsid_child_segments: HashMap::from([ 1856 - ( 1857 - "a".to_string(), 1858 - TopCollections { 1859 - total_records: 2, 1860 - dids_estimate: 1, 1861 - nsid_child_segments: HashMap::from([]), 1862 - }, 1863 - ), 1864 - ( 1865 - "b".to_string(), 1866 - TopCollections { 1867 - total_records: 1, 1868 - dids_estimate: 1, 1869 - nsid_child_segments: HashMap::from([]), 1870 - } 1871 - ), 1872 - ]), 1873 - }, 1874 - ), 1875 - ( 1876 - "b".to_string(), 1877 - TopCollections { 1878 - total_records: 1, 1879 - dids_estimate: 1, 1880 - nsid_child_segments: HashMap::from([( 1881 - "c".to_string(), 1882 - TopCollections { 1883 - total_records: 1, 1884 - dids_estimate: 1, 1885 - nsid_child_segments: HashMap::from([]), 1886 - }, 1887 - ),]), 1888 - }, 1889 - ), 1890 - ]), 1891 - }, 1892 - ),]), 1893 - } 1894 - ); 1895 1753 Ok(()) 1896 1754 } 1897 1755 }