Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

paginate the full collections list

phil 4cef69bd e7f3fa81

+145 -39
+1
Cargo.lock
··· 3809 3809 dependencies = [ 3810 3810 "anyhow", 3811 3811 "async-trait", 3812 + "base64 0.22.1", 3812 3813 "bincode 2.0.1", 3813 3814 "cardinality-estimator-safe", 3814 3815 "clap",
+1
ufos/Cargo.toml
··· 6 6 [dependencies] 7 7 anyhow = "1.0.97" 8 8 async-trait = "0.1.88" 9 + base64 = "0.22.1" 9 10 bincode = { version = "2.0.1", features = ["serde"] } 10 11 cardinality-estimator-safe = { version = "4.0.1", features = ["with_serde", "with_digest"] } 11 12 clap = { version = "4.5.31", features = ["derive"] }
+2 -2
ufos/src/lib.rs
··· 279 279 } 280 280 281 281 #[derive(Debug, Serialize, JsonSchema)] 282 - pub struct Count { 283 - thing: String, 282 + pub struct NsidCount { 283 + nsid: String, 284 284 records: u64, 285 285 dids_estimate: u64, 286 286 }
+67 -12
ufos/src/server.rs
··· 1 1 use crate::index_html::INDEX_HTML; 2 2 use crate::storage::StoreReader; 3 - use crate::{ConsumerInfo, Count, Nsid, QueryPeriod, TopCollections, UFOsRecord}; 3 + use crate::{ConsumerInfo, Nsid, NsidCount, QueryPeriod, TopCollections, UFOsRecord}; 4 + use base64::{engine::general_purpose::URL_SAFE_NO_PAD, Engine as _}; 4 5 use dropshot::endpoint; 5 6 use dropshot::ApiDescription; 6 7 use dropshot::Body; ··· 213 214 ok_cors(seen_by_collection) 214 215 } 215 216 216 - /// Get all collections 217 - /// 218 - /// TODO: paginate 219 - /// 220 - /// WARNING: this endpoint will return an object instead of array when pagination is added 217 + #[derive(Debug, Serialize, JsonSchema)] 218 + struct CollectionsResponse { 219 + /// Each known collection and its associated statistics 220 + /// 221 + /// The order is unspecified. 222 + collections: Vec<NsidCount>, 223 + /// Include in a follow-up request to get the next page of results, if more are available 224 + cursor: Option<String>, 225 + } 226 + #[derive(Debug, Deserialize, JsonSchema)] 227 + struct AllCollectionsQuery { 228 + /// The maximum number of collections to return in one request. 229 + #[schemars(range(min = 1, max = 200), default = "all_collections_default_limit")] 230 + limit: usize, 231 + /// Always omit the cursor for the first request. If more collections than the limit are available, the response will contain a non-null `cursor` to include with the next request. 
232 + cursor: Option<String>, 233 + } 234 + fn all_collections_default_limit() -> usize { 235 + 100 236 + } 221 237 #[endpoint { 222 238 method = GET, 223 239 path = "/collections/all" 224 240 }] 225 - async fn get_all_collections(ctx: RequestContext<Context>) -> OkCorsResponse<Vec<Count>> { 241 + /// Get all collections 242 + /// 243 + /// There have been a lot of collections seen in the ATmosphere, well over 400 at time of writing, so you *will* need to make a series of paginated requests using the `cursor` response property and request parameter to get them all. 244 + /// 245 + /// The set of collections across multiple requests is not guaranteed to be a perfectly consistent snapshot: 246 + /// 247 + /// - all collection NSIDs observed before the first request will be included in the results 248 + /// 249 + /// - *new* NSIDs observed in the firehose *while paging* might be included or excluded from the final set 250 + /// 251 + /// - no duplicate NSIDs will occur in the combined results 252 + /// 253 + /// In practice this is close enough for most use-cases to not worry about. 254 + async fn get_all_collections( 255 + ctx: RequestContext<Context>, 256 + query: Query<AllCollectionsQuery>, 257 + ) -> OkCorsResponse<CollectionsResponse> { 226 258 let Context { storage, .. 
} = ctx.context(); 227 - let collections = storage 228 - .get_all_collections(QueryPeriod::all_time()) 259 + let q = query.into_inner(); 260 + 261 + if !(1..=200).contains(&q.limit) { 262 + let msg = format!("limit not in 1..=200: {}", q.limit); 263 + return Err(HttpError::for_bad_request(None, msg)); 264 + } 265 + 266 + let cursor = q 267 + .cursor 268 + .and_then(|c| if c.is_empty() { None } else { Some(c) }) 269 + .map(|c| URL_SAFE_NO_PAD.decode(&c)) 270 + .transpose() 271 + .map_err(|e| HttpError::for_bad_request(None, format!("invalid cursor: {e:?}")))?; 272 + 273 + let (collections, next_cursor) = storage 274 + .get_all_collections(QueryPeriod::all_time(), q.limit, cursor) 229 275 .await 230 276 .map_err(|e| HttpError::for_internal_error(format!("oh shoot: {e:?}")))?; 231 277 232 - ok_cors(collections) 278 + let next_cursor = next_cursor.map(|c| URL_SAFE_NO_PAD.encode(c)); 279 + 280 + ok_cors(CollectionsResponse { 281 + collections, 282 + cursor: next_cursor, 283 + }) 233 284 } 234 285 235 286 /// Get top collections by record count ··· 237 288 method = GET, 238 289 path = "/collections/by-count" 239 290 }] 240 - async fn get_top_collections_by_count(ctx: RequestContext<Context>) -> OkCorsResponse<Vec<Count>> { 291 + async fn get_top_collections_by_count( 292 + ctx: RequestContext<Context>, 293 + ) -> OkCorsResponse<Vec<NsidCount>> { 241 294 let Context { storage, .. } = ctx.context(); 242 295 let collections = storage 243 296 .get_top_collections_by_count(100, QueryPeriod::all_time()) ··· 252 305 method = GET, 253 306 path = "/collections/by-dids" 254 307 }] 255 - async fn get_top_collections_by_dids(ctx: RequestContext<Context>) -> OkCorsResponse<Vec<Count>> { 308 + async fn get_top_collections_by_dids( 309 + ctx: RequestContext<Context>, 310 + ) -> OkCorsResponse<Vec<NsidCount>> { 256 311 let Context { storage, .. } = ctx.context(); 257 312 let collections = storage 258 313 .get_top_collections_by_dids(100, QueryPeriod::all_time())
+9 -4
ufos/src/storage.rs
··· 1 1 use crate::store_types::SketchSecretPrefix; 2 2 use crate::{ 3 - error::StorageError, ConsumerInfo, Count, Cursor, EventBatch, QueryPeriod, TopCollections, 3 + error::StorageError, ConsumerInfo, Cursor, EventBatch, NsidCount, QueryPeriod, TopCollections, 4 4 UFOsRecord, 5 5 }; 6 6 use async_trait::async_trait; ··· 76 76 77 77 async fn get_consumer_info(&self) -> StorageResult<ConsumerInfo>; 78 78 79 - async fn get_all_collections(&self, period: QueryPeriod) -> StorageResult<Vec<Count>>; 79 + async fn get_all_collections( 80 + &self, 81 + period: QueryPeriod, 82 + limit: usize, 83 + cursor: Option<Vec<u8>>, 84 + ) -> StorageResult<(Vec<NsidCount>, Option<Vec<u8>>)>; 80 85 81 86 async fn get_top_collections_by_count( 82 87 &self, 83 88 limit: usize, 84 89 period: QueryPeriod, 85 - ) -> StorageResult<Vec<Count>>; 90 + ) -> StorageResult<Vec<NsidCount>>; 86 91 87 92 async fn get_top_collections_by_dids( 88 93 &self, 89 94 limit: usize, 90 95 period: QueryPeriod, 91 - ) -> StorageResult<Vec<Count>>; 96 + ) -> StorageResult<Vec<NsidCount>>; 92 97 93 98 async fn get_top_collections(&self) -> StorageResult<TopCollections>; 94 99
+56 -17
ufos/src/storage_fjall.rs
··· 11 11 WeeklyDidsKey, WeeklyRecordsKey, WeeklyRollupKey, 12 12 }; 13 13 use crate::{ 14 - CommitAction, ConsumerInfo, Count, Did, EventBatch, Nsid, QueryPeriod, TopCollections, 14 + CommitAction, ConsumerInfo, Did, EventBatch, Nsid, NsidCount, QueryPeriod, TopCollections, 15 15 UFOsRecord, 16 16 }; 17 17 use async_trait::async_trait; 18 18 use fjall::{Batch as FjallBatch, Config, Keyspace, PartitionCreateOptions, PartitionHandle}; 19 19 use jetstream::events::Cursor; 20 20 use std::collections::{HashMap, HashSet}; 21 + use std::ops::Bound; 21 22 use std::path::Path; 22 23 use std::sync::{ 23 24 atomic::{AtomicBool, Ordering}, ··· 372 373 }) 373 374 } 374 375 375 - fn get_all_collections(&self, period: QueryPeriod) -> StorageResult<Vec<Count>> { 376 + fn get_all_collections( 377 + &self, 378 + period: QueryPeriod, 379 + limit: usize, 380 + cursor: Option<Vec<u8>>, 381 + ) -> StorageResult<(Vec<NsidCount>, Option<Vec<u8>>)> { 376 382 Ok(if period.is_all_time() { 377 383 let snapshot = self.rollups.snapshot(); 384 + 385 + let start = if let Some(cursor_bytes) = cursor { 386 + let nsid = db_complete::<Nsid>(&cursor_bytes)?; // TODO: bubble a *client* error type 387 + Bound::Excluded( 388 + AllTimeRollupKey::from_pair(Default::default(), nsid).to_db_bytes()?, 389 + ) 390 + } else { 391 + Bound::Included(AllTimeRollupKey::from_prefix_to_db_bytes( 392 + &Default::default(), 393 + )?) 
394 + }; 395 + 396 + let end_bytes = AllTimeRollupKey::prefix_range_end(&Default::default())?; 397 + let end = Bound::Excluded(end_bytes.clone()); 398 + 378 399 let mut out = Vec::new(); 379 - let prefix = AllTimeRollupKey::from_prefix_to_db_bytes(&Default::default())?; 380 - for kv in snapshot.prefix(prefix) { 400 + let mut next_cursor = None; 401 + log::warn!( 402 + "ranging snapshot with limit: {limit}, end: {:?}", 403 + str::from_utf8(&end_bytes) 404 + ); 405 + for (i, kv) in snapshot.range((start, end)).take(limit).enumerate() { 381 406 let (key_bytes, val_bytes) = kv?; 382 407 let key = db_complete::<AllTimeRollupKey>(&key_bytes)?; 383 408 let db_counts = db_complete::<CountsValue>(&val_bytes)?; 384 - out.push(Count { 385 - thing: key.collection().to_string(), 409 + out.push(NsidCount { 410 + nsid: key.collection().to_string(), 386 411 records: db_counts.records(), 387 412 dids_estimate: db_counts.dids().estimate() as u64, 388 413 }); 414 + if i == limit - 1 { 415 + log::warn!("reached limit, setting next cursor"); 416 + let nsid_bytes = key.collection().to_db_bytes()?; 417 + next_cursor = Some(nsid_bytes); 418 + } 389 419 } 390 - out 420 + 421 + (out, next_cursor) 391 422 } else { 392 423 todo!() 393 424 }) ··· 397 428 &self, 398 429 limit: usize, 399 430 period: QueryPeriod, 400 - ) -> StorageResult<Vec<Count>> { 431 + ) -> StorageResult<Vec<NsidCount>> { 401 432 Ok(if period.is_all_time() { 402 433 let snapshot = self.rollups.snapshot(); 403 434 let mut out = Vec::with_capacity(limit); ··· 411 442 ); 412 443 let db_counts = db_complete::<CountsValue>(&db_count_bytes)?; 413 444 assert_eq!(db_counts.records(), key.count()); 414 - out.push(Count { 415 - thing: key.collection().to_string(), 445 + out.push(NsidCount { 446 + nsid: key.collection().to_string(), 416 447 records: db_counts.records(), 417 448 dids_estimate: db_counts.dids().estimate() as u64, 418 449 }); ··· 427 458 &self, 428 459 limit: usize, 429 460 period: QueryPeriod, 430 - ) -> 
StorageResult<Vec<Count>> { 461 + ) -> StorageResult<Vec<NsidCount>> { 431 462 Ok(if period.is_all_time() { 432 463 let snapshot = self.rollups.snapshot(); 433 464 let mut out = Vec::with_capacity(limit); ··· 441 472 ); 442 473 let db_counts = db_complete::<CountsValue>(&db_count_bytes)?; 443 474 assert_eq!(db_counts.dids().estimate() as u64, key.count()); 444 - out.push(Count { 445 - thing: key.collection().to_string(), 475 + out.push(NsidCount { 476 + nsid: key.collection().to_string(), 446 477 records: db_counts.records(), 447 478 dids_estimate: db_counts.dids().estimate() as u64, 448 479 }); ··· 594 625 let s = self.clone(); 595 626 tokio::task::spawn_blocking(move || FjallReader::get_consumer_info(&s)).await? 596 627 } 597 - async fn get_all_collections(&self, period: QueryPeriod) -> StorageResult<Vec<Count>> { 628 + async fn get_all_collections( 629 + &self, 630 + period: QueryPeriod, 631 + limit: usize, 632 + cursor: Option<Vec<u8>>, 633 + ) -> StorageResult<(Vec<NsidCount>, Option<Vec<u8>>)> { 598 634 let s = self.clone(); 599 - tokio::task::spawn_blocking(move || FjallReader::get_all_collections(&s, period)).await? 635 + tokio::task::spawn_blocking(move || { 636 + FjallReader::get_all_collections(&s, period, limit, cursor) 637 + }) 638 + .await? 600 639 } 601 640 async fn get_top_collections_by_count( 602 641 &self, 603 642 limit: usize, 604 643 period: QueryPeriod, 605 - ) -> StorageResult<Vec<Count>> { 644 + ) -> StorageResult<Vec<NsidCount>> { 606 645 let s = self.clone(); 607 646 tokio::task::spawn_blocking(move || { 608 647 FjallReader::get_top_collections_by_count(&s, limit, period) ··· 613 652 &self, 614 653 limit: usize, 615 654 period: QueryPeriod, 616 - ) -> StorageResult<Vec<Count>> { 655 + ) -> StorageResult<Vec<NsidCount>> { 617 656 let s = self.clone(); 618 657 tokio::task::spawn_blocking(move || { 619 658 FjallReader::get_top_collections_by_dids(&s, limit, period)
+9 -4
ufos/src/storage_mem.rs
··· 13 13 TakeoffValue, WeekTruncatedCursor, WeeklyRollupKey, 14 14 }; 15 15 use crate::{ 16 - CommitAction, ConsumerInfo, Count, Did, EventBatch, Nsid, QueryPeriod, TopCollections, 16 + CommitAction, ConsumerInfo, Did, EventBatch, Nsid, NsidCount, QueryPeriod, TopCollections, 17 17 UFOsRecord, 18 18 }; 19 19 use async_trait::async_trait; ··· 594 594 let s = self.clone(); 595 595 tokio::task::spawn_blocking(move || MemReader::get_top_collections(&s)).await? 596 596 } 597 - async fn get_all_collections(&self, _: QueryPeriod) -> StorageResult<Vec<Count>> { 597 + async fn get_all_collections( 598 + &self, 599 + _: QueryPeriod, 600 + _: usize, 601 + _: Option<Vec<u8>>, 602 + ) -> StorageResult<(Vec<NsidCount>, Option<Vec<u8>>)> { 598 603 todo!() 599 604 } 600 605 async fn get_top_collections_by_count( 601 606 &self, 602 607 _: usize, 603 608 _: QueryPeriod, 604 - ) -> StorageResult<Vec<Count>> { 609 + ) -> StorageResult<Vec<NsidCount>> { 605 610 todo!() 606 611 } 607 612 async fn get_top_collections_by_dids( 608 613 &self, 609 614 _: usize, 610 615 _: QueryPeriod, 611 - ) -> StorageResult<Vec<Count>> { 616 + ) -> StorageResult<Vec<NsidCount>> { 612 617 todo!() 613 618 } 614 619 async fn get_counts_by_collection(&self, collection: &Nsid) -> StorageResult<(u64, u64)> {