Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

start merging sorting into a single endpoint

phil 6f17e4a5 8799d94c

+154 -41
+12
ufos/src/lib.rs
··· 241 241 dids_estimate: u64, 242 242 } 243 243 244 + #[derive(Debug)] 245 + pub enum OrderCollectionsBy { 246 + Lexi { cursor: Option<Vec<u8>> }, 247 + RecordsCreated, 248 + DidsEstimate, 249 + } 250 + impl Default for OrderCollectionsBy { 251 + fn default() -> Self { 252 + Self::Lexi { cursor: None } 253 + } 254 + } 255 + 244 256 #[cfg(test)] 245 257 mod tests { 246 258 use super::*;
+113 -29
ufos/src/server.rs
··· 1 1 use crate::index_html::INDEX_HTML; 2 2 use crate::storage::StoreReader; 3 3 use crate::store_types::{HourTruncatedCursor, WeekTruncatedCursor}; 4 - use crate::{ConsumerInfo, Cursor, Nsid, NsidCount, UFOsRecord}; 4 + use crate::{ConsumerInfo, Cursor, Nsid, NsidCount, OrderCollectionsBy, UFOsRecord}; 5 5 use base64::{engine::general_purpose::URL_SAFE_NO_PAD, Engine as _}; 6 6 use chrono::{DateTime, Utc}; 7 7 use dropshot::endpoint; ··· 166 166 let min_time_ago = SystemTime::now() - Duration::from_secs(86_400 * 3); // we want at least 3 days of data 167 167 let since: WeekTruncatedCursor = Cursor::at(min_time_ago).into(); 168 168 let (collections, _) = storage 169 - .get_all_collections(1000, None, Some(since.try_as().unwrap()), None) 169 + .get_collections( 170 + 1000, 171 + Default::default(), 172 + Some(since.try_as().unwrap()), 173 + None, 174 + ) 170 175 .await 171 176 .map_err(|e| HttpError::for_internal_error(e.to_string()))?; 172 177 collections ··· 240 245 cursor: Option<String>, 241 246 } 242 247 #[derive(Debug, Deserialize, JsonSchema)] 243 - struct AllCollectionsQuery { 248 + #[serde(rename_all = "kebab-case")] 249 + pub enum CollectionsQueryOrder { 250 + RecordsCreated, 251 + DidsEstimate, 252 + } 253 + impl From<&CollectionsQueryOrder> for OrderCollectionsBy { 254 + fn from(q: &CollectionsQueryOrder) -> Self { 255 + match q { 256 + CollectionsQueryOrder::RecordsCreated => OrderCollectionsBy::RecordsCreated, 257 + CollectionsQueryOrder::DidsEstimate => OrderCollectionsBy::DidsEstimate, 258 + } 259 + } 260 + } 261 + #[derive(Debug, Deserialize, JsonSchema)] 262 + struct CollectionsQuery { 244 263 /// The maximum number of collections to return in one request. 245 264 /// 246 - /// Default: 100 247 - #[schemars(range(min = 1, max = 200), default = "all_collections_default_limit")] 248 - #[serde(default = "all_collections_default_limit")] 249 - limit: usize, 265 + /// Default: `100` normally, `32` if `order` is specified. 266 + #[schemars(range(min = 1, max = 200))] 267 + limit: Option<usize>, 268 + /// Get a paginated response with more collections. 269 + /// 250 270 /// Always omit the cursor for the first request. If more collections than the limit are available, the response will contain a non-null `cursor` to include with the next request. 271 + /// 272 + /// `cursor` is mutually exclusive with `order`. 251 273 cursor: Option<String>, 252 274 /// Limit collections and statistics to those seen after this UTC datetime 253 275 since: Option<DateTime<Utc>>, 254 276 /// Limit collections and statistics to those seen before this UTC datetime 255 277 until: Option<DateTime<Utc>>, 256 - } 257 - fn all_collections_default_limit() -> usize { 258 - 100 278 + /// Get a limited, sorted list 279 + /// 280 + /// Mutually exclusive with `cursor` -- sorted results cannot be paged. 281 + order: Option<CollectionsQueryOrder>, 259 282 } 260 283 #[endpoint { 261 284 method = GET, 262 - path = "/collections/all" 285 + path = "/collections" 263 286 }] 264 - /// Get all collections 287 + /// Get a list of collection NSIDs with statistics 288 + /// 289 + /// ## To fetch a full list: 265 290 /// 266 - /// There have been a lot of collections seen in the ATmosphere, well over 400 at time of writing, so you *will* need to make a series of paginaged requests with `cursor`s to get them all. 291 + /// Omit the `order` parameter and page through the results using the `cursor`. There have been a lot of collections seen in the ATmosphere, well over 400 at time of writing, so you *will* need to make a series of paginaged requests with `cursor`s to get them all. 267 292 /// 268 293 /// The set of collections across multiple requests is not guaranteed to be a perfectly consistent snapshot: 269 294 /// ··· 275 300 /// 276 301 /// In practice this is close enough for most use-cases to not worry about. 277 302 /// 278 - /// Statistics are bucketed hourly, so the most granular effecitve time boundary for `since` and `until` is one hour. 279 - async fn get_all_collections( 303 + /// ## To fetch the top collection NSIDs: 304 + /// 305 + /// Specify the `order` parameter (must be either `records-created` or `did-estimate`). Note that ordered results cannot be paged. 306 + /// 307 + /// All statistics are bucketed hourly, so the most granular effecitve time boundary for `since` and `until` is one hour. 308 + async fn get_collections( 280 309 ctx: RequestContext<Context>, 281 - query: Query<AllCollectionsQuery>, 310 + query: Query<CollectionsQuery>, 282 311 ) -> OkCorsResponse<CollectionsResponse> { 283 312 let Context { storage, .. } = ctx.context(); 284 313 let q = query.into_inner(); 285 314 286 - if !(1..=200).contains(&q.limit) { 287 - let msg = format!("limit not in 1..=200: {}", q.limit); 288 - return Err(HttpError::for_bad_request(None, msg)); 315 + if q.cursor.is_some() && q.order.is_some() { 316 + let msg = "`cursor` is mutually exclusive with `order`. ordered results cannot be paged."; 317 + return Err(HttpError::for_bad_request(None, msg.to_string())); 289 318 } 290 319 291 - let cursor = q 292 - .cursor 293 - .and_then(|c| if c.is_empty() { None } else { Some(c) }) 294 - .map(|c| URL_SAFE_NO_PAD.decode(&c)) 295 - .transpose() 296 - .map_err(|e| HttpError::for_bad_request(None, format!("invalid cursor: {e:?}")))?; 320 + let order = if let Some(ref o) = q.order { 321 + o.into() 322 + } else { 323 + let cursor = q 324 + .cursor 325 + .and_then(|c| if c.is_empty() { None } else { Some(c) }) 326 + .map(|c| URL_SAFE_NO_PAD.decode(&c)) 327 + .transpose() 328 + .map_err(|e| HttpError::for_bad_request(None, format!("invalid cursor: {e:?}")))?; 329 + OrderCollectionsBy::Lexi { cursor } 330 + }; 331 + 332 + let limit = match (q.limit, q.order) { 333 + (Some(limit), _) => limit, 334 + (None, Some(_)) => 32, 335 + (None, None) => 100, 336 + }; 337 + 338 + if !(1..=200).contains(&limit) { 339 + let msg = format!("limit not in 1..=200: {}", limit); 340 + return Err(HttpError::for_bad_request(None, msg)); 341 + } 297 342 298 343 let since = q.since.map(dt_to_cursor).transpose()?; 299 344 let until = q.until.map(dt_to_cursor).transpose()?; 300 345 301 346 let (collections, next_cursor) = storage 302 - .get_all_collections(q.limit, cursor, since, until) 347 + .get_collections(limit, order, since, until) 303 348 .await 304 349 .map_err(|e| HttpError::for_internal_error(format!("oh shoot: {e:?}")))?; 305 350 ··· 311 356 }) 312 357 } 313 358 359 + #[derive(Debug, Deserialize, JsonSchema)] 360 + struct TopByQuery { 361 + /// The maximum number of collections to return in one request. 362 + /// 363 + /// Default: 32 364 + #[schemars(range(min = 1, max = 200), default = "top_collections_default_limit")] 365 + #[serde(default = "top_collections_default_limit")] 366 + limit: usize, 367 + /// Limit collections and statistics to those seen after this UTC datetime 368 + since: Option<DateTime<Utc>>, 369 + /// Limit collections and statistics to those seen before this UTC datetime 370 + until: Option<DateTime<Utc>>, 371 + } 372 + fn top_collections_default_limit() -> usize { 373 + 32 374 + } 375 + 314 376 /// Get top collections by record count 315 377 #[endpoint { 316 378 method = GET, ··· 318 380 }] 319 381 async fn get_top_collections_by_count( 320 382 ctx: RequestContext<Context>, 383 + query: Query<TopByQuery>, 321 384 ) -> OkCorsResponse<Vec<NsidCount>> { 322 385 let Context { storage, .. } = ctx.context(); 386 + let q = query.into_inner(); 387 + 388 + if !(1..=200).contains(&q.limit) { 389 + let msg = format!("limit not in 1..=200: {}", q.limit); 390 + return Err(HttpError::for_bad_request(None, msg)); 391 + } 392 + 393 + let since = q.since.map(dt_to_cursor).transpose()?; 394 + let until = q.until.map(dt_to_cursor).transpose()?; 395 + 323 396 let collections = storage 324 - .get_top_collections_by_count(100, None, None) 397 + .get_top_collections_by_count(100, since, until) 325 398 .await 326 399 .map_err(|e| HttpError::for_internal_error(format!("oh shoot: {e:?}")))?; 327 400 ··· 335 408 }] 336 409 async fn get_top_collections_by_dids( 337 410 ctx: RequestContext<Context>, 411 + query: Query<TopByQuery>, 338 412 ) -> OkCorsResponse<Vec<NsidCount>> { 339 413 let Context { storage, .. } = ctx.context(); 414 + let q = query.into_inner(); 415 + 416 + if !(1..=200).contains(&q.limit) { 417 + let msg = format!("limit not in 1..=200: {}", q.limit); 418 + return Err(HttpError::for_bad_request(None, msg)); 419 + } 420 + 421 + let since = q.since.map(dt_to_cursor).transpose()?; 422 + let until = q.until.map(dt_to_cursor).transpose()?; 423 + 340 424 let collections = storage 341 - .get_top_collections_by_dids(100, None, None) 425 + .get_top_collections_by_dids(100, since, until) 342 426 .await 343 427 .map_err(|e| HttpError::for_internal_error(format!("oh shoot: {e:?}")))?; 344 428 ··· 359 443 api.register(get_meta_info).unwrap(); 360 444 api.register(get_records_by_collections).unwrap(); 361 445 api.register(get_records_total_seen).unwrap(); 362 - api.register(get_all_collections).unwrap(); 446 + api.register(get_collections).unwrap(); 363 447 api.register(get_top_collections_by_count).unwrap(); 364 448 api.register(get_top_collections_by_dids).unwrap(); 365 449
+6 -3
ufos/src/storage.rs
··· 1 1 use crate::store_types::{HourTruncatedCursor, SketchSecretPrefix}; 2 - use crate::{error::StorageError, ConsumerInfo, Cursor, EventBatch, NsidCount, UFOsRecord}; 2 + use crate::{ 3 + error::StorageError, ConsumerInfo, Cursor, EventBatch, NsidCount, OrderCollectionsBy, 4 + UFOsRecord, 5 + }; 3 6 use async_trait::async_trait; 4 7 use jetstream::exports::{Did, Nsid}; 5 8 use std::collections::HashSet; ··· 73 76 74 77 async fn get_consumer_info(&self) -> StorageResult<ConsumerInfo>; 75 78 76 - async fn get_all_collections( 79 + async fn get_collections( 77 80 &self, 78 81 limit: usize, 79 - cursor: Option<Vec<u8>>, 82 + order: OrderCollectionsBy, 80 83 since: Option<HourTruncatedCursor>, 81 84 until: Option<HourTruncatedCursor>, 82 85 ) -> StorageResult<(Vec<NsidCount>, Option<Vec<u8>>)>;
+12 -6
ufos/src/storage_fjall.rs
··· 11 11 SketchSecretPrefix, TakeoffKey, TakeoffValue, TrimCollectionCursorKey, WeekTruncatedCursor, 12 12 WeeklyDidsKey, WeeklyRecordsKey, WeeklyRollupKey, 13 13 }; 14 - use crate::{CommitAction, ConsumerInfo, Did, EventBatch, Nsid, NsidCount, UFOsRecord}; 14 + use crate::{ 15 + CommitAction, ConsumerInfo, Did, EventBatch, Nsid, NsidCount, OrderCollectionsBy, UFOsRecord, 16 + }; 15 17 use async_trait::async_trait; 16 18 use fjall::Snapshot; 17 19 use fjall::{Batch as FjallBatch, Config, Keyspace, PartitionCreateOptions, PartitionHandle}; ··· 385 387 Ok(cursor) 386 388 } 387 389 388 - fn get_all_collections( 390 + fn get_collections( 389 391 &self, 390 392 limit: usize, 391 - cursor: Option<Vec<u8>>, 393 + order: OrderCollectionsBy, 392 394 since: Option<HourTruncatedCursor>, 393 395 until: Option<HourTruncatedCursor>, 394 396 ) -> StorageResult<(Vec<NsidCount>, Option<Vec<u8>>)> { ··· 405 407 } 406 408 let upper = until.unwrap_or_else(|| Cursor::at(SystemTime::now()).into()); 407 409 CursorBucket::buckets_spanning(lower, upper) 410 + }; 411 + 412 + let OrderCollectionsBy::Lexi { cursor } = order else { 413 + todo!() 408 414 }; 409 415 410 416 let cursor_nsid = cursor.as_deref().map(db_complete::<Nsid>).transpose()?; // TODO: bubble a *client* error type ··· 670 676 let s = self.clone(); 671 677 tokio::task::spawn_blocking(move || FjallReader::get_consumer_info(&s)).await? 672 678 } 673 - async fn get_all_collections( 679 + async fn get_collections( 674 680 &self, 675 681 limit: usize, 676 - cursor: Option<Vec<u8>>, 682 + order: OrderCollectionsBy, 677 683 since: Option<HourTruncatedCursor>, 678 684 until: Option<HourTruncatedCursor>, 679 685 ) -> StorageResult<(Vec<NsidCount>, Option<Vec<u8>>)> { 680 686 let s = self.clone(); 681 687 tokio::task::spawn_blocking(move || { 682 - FjallReader::get_all_collections(&s, limit, cursor, since, until) 688 + FjallReader::get_collections(&s, limit, order, since, until) 683 689 }) 684 690 .await? 685 691 }
+5 -3
ufos/src/storage_mem.rs
··· 12 12 RecordLocationMeta, RecordLocationVal, RecordRawValue, SketchSecretPrefix, TakeoffKey, 13 13 TakeoffValue, WeekTruncatedCursor, WeeklyRollupKey, 14 14 }; 15 - use crate::{CommitAction, ConsumerInfo, Did, EventBatch, Nsid, NsidCount, UFOsRecord}; 15 + use crate::{ 16 + CommitAction, ConsumerInfo, Did, EventBatch, Nsid, NsidCount, OrderCollectionsBy, UFOsRecord, 17 + }; 16 18 use async_trait::async_trait; 17 19 use jetstream::events::Cursor; 18 20 use lsm_tree::range::prefix_to_range; ··· 545 547 let s = self.clone(); 546 548 tokio::task::spawn_blocking(move || MemReader::get_consumer_info(&s)).await? 547 549 } 548 - async fn get_all_collections( 550 + async fn get_collections( 549 551 &self, 550 552 _: usize, 551 - _: Option<Vec<u8>>, 553 + _: OrderCollectionsBy, 552 554 _: Option<HourTruncatedCursor>, 553 555 _: Option<HourTruncatedCursor>, 554 556 ) -> StorageResult<(Vec<NsidCount>, Option<Vec<u8>>)> {
+6
ufos/src/store_types.rs
··· 444 444 pub fn next(&self) -> Self { 445 445 Self(self.0 + MOD) 446 446 } 447 + pub fn prev(&self) -> Self { 448 + if self.0 < MOD { 449 + panic!("underflow: previous truncation start would be less than zero"); 450 + } 451 + Self(self.0 - MOD) 452 + } 447 453 } 448 454 impl<const MOD: u64> From<TruncatedCursor<MOD>> for Cursor { 449 455 fn from(truncated: TruncatedCursor<MOD>) -> Self {