Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm
75
fork

Configure Feed

Select the types of activity you want to include in your feed.

basic server metrics

phil 9c2091db 530ecc8a

+282 -187
+282 -187
ufos/src/server/mod.rs
··· 19 19 use dropshot::ConfigLogging; 20 20 use dropshot::ConfigLoggingLevel; 21 21 use dropshot::HttpError; 22 + use dropshot::HttpResponse; 22 23 use dropshot::Query; 23 24 use dropshot::RequestContext; 24 25 use dropshot::ServerBuilder; 25 - 26 - use http::{Response, StatusCode}; 26 + use dropshot::ServerContext; 27 + use http::{ 28 + header::{ORIGIN, USER_AGENT}, 29 + Response, StatusCode, 30 + }; 31 + use metrics::{counter, describe_counter, describe_histogram, histogram, Unit}; 27 32 use schemars::JsonSchema; 28 33 use serde::{Deserialize, Serialize}; 29 34 use std::collections::{HashMap, HashSet}; 35 + use std::future::Future; 30 36 use std::sync::Arc; 37 + use std::time::Instant; 31 38 use std::time::{Duration, SystemTime, UNIX_EPOCH}; 32 39 40 + fn describe_metrics() { 41 + describe_counter!( 42 + "server_requests_total", 43 + Unit::Count, 44 + "total requests handled" 45 + ); 46 + describe_histogram!( 47 + "server_handler_latency", 48 + Unit::Microseconds, 49 + "time to respond to a request in microseconds, excluding dropshot overhead" 50 + ); 51 + } 52 + 53 + async fn instrument_handler<T, H, R>(ctx: &RequestContext<T>, handler: H) -> Result<R, HttpError> 54 + where 55 + R: HttpResponse, 56 + H: Future<Output = Result<R, HttpError>>, 57 + T: ServerContext, 58 + { 59 + let start = Instant::now(); 60 + let result = handler.await; 61 + let latency = start.elapsed(); 62 + let status_code = match &result { 63 + Ok(response) => response.status_code(), 64 + Err(ref e) => e.status_code.as_status(), 65 + } 66 + .to_string(); 67 + let endpoint = ctx.endpoint.operation_id.clone(); 68 + let headers = ctx.request.headers(); 69 + let origin = headers 70 + .get(ORIGIN) 71 + .and_then(|v| v.to_str().ok()) 72 + .unwrap_or("") 73 + .to_string(); 74 + let ua = headers 75 + .get(USER_AGENT) 76 + .and_then(|v| v.to_str().ok()) 77 + .map(|ua| { 78 + if ua.starts_with("Mozilla/5.0 ") { 79 + "browser" 80 + } else { 81 + ua 82 + } 83 + }) 84 + .unwrap_or("") 85 + .to_string(); 86 + counter!("server_requests_total", 87 + "endpoint" => endpoint.clone(), 88 + "origin" => origin, 89 + "ua" => ua, 90 + "status_code" => status_code, 91 + ) 92 + .increment(1); 93 + histogram!("server_handler_latency", "endpoint" => endpoint).record(latency.as_micros() as f64); 94 + result 95 + } 96 + 33 97 struct Context { 34 98 pub spec: Arc<serde_json::Value>, 35 99 storage: Box<dyn StoreReader>, ··· 63 127 */ 64 128 unpublished = true, 65 129 }] 66 - async fn index(_ctx: RequestContext<Context>) -> Result<Response<Body>, HttpError> { 67 - Ok(Response::builder() 68 - .status(StatusCode::OK) 69 - .header(http::header::CONTENT_TYPE, "text/html") 70 - .body(INDEX_HTML.into())?) 130 + async fn index(ctx: RequestContext<Context>) -> Result<Response<Body>, HttpError> { 131 + instrument_handler(&ctx, async { 132 + Ok(Response::builder() 133 + .status(StatusCode::OK) 134 + .header(http::header::CONTENT_TYPE, "text/html") 135 + .body(INDEX_HTML.into())?) 136 + }) 137 + .await 71 138 } 72 139 73 140 /// Meta: get the openapi spec for this api ··· 80 147 unpublished = true, 81 148 }] 82 149 async fn get_openapi(ctx: RequestContext<Context>) -> OkCorsResponse<serde_json::Value> { 83 - let spec = (*ctx.context().spec).clone(); 84 - OkCors(spec).into() 150 + instrument_handler(&ctx, async { 151 + let spec = (*ctx.context().spec).clone(); 152 + OkCors(spec).into() 153 + }) 154 + .await 85 155 } 86 156 87 157 #[derive(Debug, Serialize, JsonSchema)] ··· 100 170 let failed_to_get = 101 171 |what| move |e| HttpError::for_internal_error(format!("failed to get {what}: {e:?}")); 102 172 103 - let storage_info = storage 104 - .get_storage_stats() 105 - .await 106 - .map_err(failed_to_get("storage info"))?; 173 + instrument_handler(&ctx, async { 174 + let storage_info = storage 175 + .get_storage_stats() 176 + .await 177 + .map_err(failed_to_get("storage info"))?; 107 178 108 - let consumer = storage 109 - .get_consumer_info() 110 - .await 111 - .map_err(failed_to_get("consumer info"))?; 179 + let consumer = storage 180 + .get_consumer_info() 181 + .await 182 + .map_err(failed_to_get("consumer info"))?; 112 183 113 - OkCors(MetaInfo { 114 - storage_name: storage.name(), 115 - storage: storage_info, 116 - consumer, 184 + OkCors(MetaInfo { 185 + storage_name: storage.name(), 186 + storage: storage_info, 187 + consumer, 188 + }) 189 + .into() 117 190 }) 118 - .into() 191 + .await 119 192 } 120 193 121 194 // TODO: replace with normal (🙃) multi-qs value somehow ··· 168 241 collection_query: Query<RecordsCollectionsQuery>, 169 242 ) -> OkCorsResponse<Vec<ApiRecord>> { 170 243 let Context { storage, .. } = ctx.context(); 171 - let mut limit = 42; 172 - let query = collection_query.into_inner(); 173 - let collections = if let Some(provided_collection) = query.collection { 174 - to_multiple_nsids(&provided_collection) 175 - .map_err(|reason| HttpError::for_bad_request(None, reason))? 176 - } else { 177 - limit = 12; 178 - let min_time_ago = SystemTime::now() - Duration::from_secs(86_400 * 3); // we want at least 3 days of data 179 - let since: WeekTruncatedCursor = Cursor::at(min_time_ago).into(); 180 - let (collections, _) = storage 181 - .get_collections( 182 - 1000, 183 - Default::default(), 184 - Some(since.try_as().unwrap()), 185 - None, 186 - ) 244 + instrument_handler(&ctx, async { 245 + let mut limit = 42; 246 + let query = collection_query.into_inner(); 247 + let collections = if let Some(provided_collection) = query.collection { 248 + to_multiple_nsids(&provided_collection) 249 + .map_err(|reason| HttpError::for_bad_request(None, reason))? 250 + } else { 251 + limit = 12; 252 + let min_time_ago = SystemTime::now() - Duration::from_secs(86_400 * 3); // we want at least 3 days of data 253 + let since: WeekTruncatedCursor = Cursor::at(min_time_ago).into(); 254 + let (collections, _) = storage 255 + .get_collections( 256 + 1000, 257 + Default::default(), 258 + Some(since.try_as().unwrap()), 259 + None, 260 + ) 261 + .await 262 + .map_err(|e| HttpError::for_internal_error(e.to_string()))?; 263 + collections 264 + .into_iter() 265 + .map(|c| Nsid::new(c.nsid).unwrap()) 266 + .collect() 267 + }; 268 + 269 + let records = storage 270 + .get_records_by_collections(collections, limit, true) 187 271 .await 188 - .map_err(|e| HttpError::for_internal_error(e.to_string()))?; 189 - collections 272 + .map_err(|e| HttpError::for_internal_error(e.to_string()))? 190 273 .into_iter() 191 - .map(|c| Nsid::new(c.nsid).unwrap()) 192 - .collect() 193 - }; 274 + .map(|r| r.into()) 275 + .collect(); 194 276 195 - let records = storage 196 - .get_records_by_collections(collections, limit, true) 197 - .await 198 - .map_err(|e| HttpError::for_internal_error(e.to_string()))? 199 - .into_iter() 200 - .map(|r| r.into()) 201 - .collect(); 202 - 203 - OkCors(records).into() 277 + OkCors(records).into() 278 + }) 279 + .await 204 280 } 205 281 206 282 #[derive(Debug, Deserialize, JsonSchema)] ··· 232 308 query: Query<CollectionsStatsQuery>, 233 309 ) -> OkCorsResponse<HashMap<String, JustCount>> { 234 310 let Context { storage, .. } = ctx.context(); 235 - let q = query.into_inner(); 236 - let collections: HashSet<Nsid> = collections_query.try_into()?; 237 311 238 - let since = q.since.map(dt_to_cursor).transpose()?.unwrap_or_else(|| { 239 - let week_ago_secs = 7 * 86_400; 240 - let week_ago = SystemTime::now() - Duration::from_secs(week_ago_secs); 241 - Cursor::at(week_ago).into() 242 - }); 312 + instrument_handler(&ctx, async { 313 + let q = query.into_inner(); 314 + let collections: HashSet<Nsid> = collections_query.try_into()?; 243 315 244 - let until = q.until.map(dt_to_cursor).transpose()?; 316 + let since = q.since.map(dt_to_cursor).transpose()?.unwrap_or_else(|| { 317 + let week_ago_secs = 7 * 86_400; 318 + let week_ago = SystemTime::now() - Duration::from_secs(week_ago_secs); 319 + Cursor::at(week_ago).into() 320 + }); 245 321 246 - let mut seen_by_collection = HashMap::with_capacity(collections.len()); 322 + let until = q.until.map(dt_to_cursor).transpose()?; 247 323 248 - for collection in &collections { 249 - let counts = storage 250 - .get_collection_counts(collection, since, until) 251 - .await 252 - .map_err(|e| HttpError::for_internal_error(format!("boooo: {e:?}")))?; 324 + let mut seen_by_collection = HashMap::with_capacity(collections.len()); 325 + 326 + for collection in &collections { 327 + let counts = storage 328 + .get_collection_counts(collection, since, until) 329 + .await 330 + .map_err(|e| HttpError::for_internal_error(format!("boooo: {e:?}")))?; 253 331 254 - seen_by_collection.insert(collection.to_string(), counts); 255 - } 332 + seen_by_collection.insert(collection.to_string(), counts); 333 + } 256 334 257 - OkCors(seen_by_collection).into() 335 + OkCors(seen_by_collection).into() 336 + }) 337 + .await 258 338 } 259 339 260 340 #[derive(Debug, Serialize, JsonSchema)] ··· 337 417 let Context { storage, .. } = ctx.context(); 338 418 let q = query.into_inner(); 339 419 340 - if q.cursor.is_some() && q.order.is_some() { 341 - let msg = "`cursor` is mutually exclusive with `order`. ordered results cannot be paged."; 342 - return Err(HttpError::for_bad_request(None, msg.to_string())); 343 - } 420 + instrument_handler(&ctx, async { 421 + if q.cursor.is_some() && q.order.is_some() { 422 + let msg = 423 + "`cursor` is mutually exclusive with `order`. ordered results cannot be paged."; 424 + return Err(HttpError::for_bad_request(None, msg.to_string())); 425 + } 344 426 345 - let order = if let Some(ref o) = q.order { 346 - o.into() 347 - } else { 348 - let cursor = q 349 - .cursor 350 - .and_then(|c| if c.is_empty() { None } else { Some(c) }) 351 - .map(|c| URL_SAFE_NO_PAD.decode(&c)) 352 - .transpose() 353 - .map_err(|e| HttpError::for_bad_request(None, format!("invalid cursor: {e:?}")))?; 354 - OrderCollectionsBy::Lexi { cursor } 355 - }; 427 + let order = if let Some(ref o) = q.order { 428 + o.into() 429 + } else { 430 + let cursor = q 431 + .cursor 432 + .and_then(|c| if c.is_empty() { None } else { Some(c) }) 433 + .map(|c| URL_SAFE_NO_PAD.decode(&c)) 434 + .transpose() 435 + .map_err(|e| HttpError::for_bad_request(None, format!("invalid cursor: {e:?}")))?; 436 + OrderCollectionsBy::Lexi { cursor } 437 + }; 356 438 357 - let limit = match (q.limit, q.order) { 358 - (Some(limit), _) => limit, 359 - (None, Some(_)) => 32, 360 - (None, None) => 100, 361 - }; 439 + let limit = match (q.limit, q.order) { 440 + (Some(limit), _) => limit, 441 + (None, Some(_)) => 32, 442 + (None, None) => 100, 443 + }; 362 444 363 - if !(1..=200).contains(&limit) { 364 - let msg = format!("limit not in 1..=200: {}", limit); 365 - return Err(HttpError::for_bad_request(None, msg)); 366 - } 445 + if !(1..=200).contains(&limit) { 446 + let msg = format!("limit not in 1..=200: {}", limit); 447 + return Err(HttpError::for_bad_request(None, msg)); 448 + } 367 449 368 - let since = q.since.map(dt_to_cursor).transpose()?; 369 - let until = q.until.map(dt_to_cursor).transpose()?; 450 + let since = q.since.map(dt_to_cursor).transpose()?; 451 + let until = q.until.map(dt_to_cursor).transpose()?; 370 452 371 - let (collections, next_cursor) = storage 372 - .get_collections(limit, order, since, until) 373 - .await 374 - .map_err(|e| HttpError::for_internal_error(format!("oh shoot: {e:?}")))?; 453 + let (collections, next_cursor) = storage 454 + .get_collections(limit, order, since, until) 455 + .await 456 + .map_err(|e| HttpError::for_internal_error(format!("oh shoot: {e:?}")))?; 375 457 376 - let next_cursor = next_cursor.map(|c| URL_SAFE_NO_PAD.encode(c)); 458 + let next_cursor = next_cursor.map(|c| URL_SAFE_NO_PAD.encode(c)); 377 459 378 - OkCors(CollectionsResponse { 379 - collections, 380 - cursor: next_cursor, 460 + OkCors(CollectionsResponse { 461 + collections, 462 + cursor: next_cursor, 463 + }) 464 + .into() 381 465 }) 382 - .into() 466 + .await 383 467 } 384 468 385 469 #[derive(Debug, Serialize, JsonSchema)] ··· 459 543 let Context { storage, .. } = ctx.context(); 460 544 let q = query.into_inner(); 461 545 462 - let prefix = NsidPrefix::new(&q.prefix).map_err(|e| { 463 - HttpError::for_bad_request( 464 - None, 465 - format!("{:?} was not a valid NSID prefix: {e:?}", q.prefix), 466 - ) 467 - })?; 546 + instrument_handler(&ctx, async { 547 + let prefix = NsidPrefix::new(&q.prefix).map_err(|e| { 548 + HttpError::for_bad_request( 549 + None, 550 + format!("{:?} was not a valid NSID prefix: {e:?}", q.prefix), 551 + ) 552 + })?; 468 553 469 - if q.cursor.is_some() && q.order.is_some() { 470 - let msg = "`cursor` is mutually exclusive with `order`. ordered results cannot be paged."; 471 - return Err(HttpError::for_bad_request(None, msg.to_string())); 472 - } 554 + if q.cursor.is_some() && q.order.is_some() { 555 + let msg = 556 + "`cursor` is mutually exclusive with `order`. ordered results cannot be paged."; 557 + return Err(HttpError::for_bad_request(None, msg.to_string())); 558 + } 473 559 474 - let order = if let Some(ref o) = q.order { 475 - o.into() 476 - } else { 477 - let cursor = q 478 - .cursor 479 - .and_then(|c| if c.is_empty() { None } else { Some(c) }) 480 - .map(|c| URL_SAFE_NO_PAD.decode(&c)) 481 - .transpose() 482 - .map_err(|e| HttpError::for_bad_request(None, format!("invalid cursor: {e:?}")))?; 483 - OrderCollectionsBy::Lexi { cursor } 484 - }; 560 + let order = if let Some(ref o) = q.order { 561 + o.into() 562 + } else { 563 + let cursor = q 564 + .cursor 565 + .and_then(|c| if c.is_empty() { None } else { Some(c) }) 566 + .map(|c| URL_SAFE_NO_PAD.decode(&c)) 567 + .transpose() 568 + .map_err(|e| HttpError::for_bad_request(None, format!("invalid cursor: {e:?}")))?; 569 + OrderCollectionsBy::Lexi { cursor } 570 + }; 485 571 486 - let limit = match (q.limit, q.order) { 487 - (Some(limit), _) => limit, 488 - (None, Some(_)) => 32, 489 - (None, None) => 100, 490 - }; 572 + let limit = match (q.limit, q.order) { 573 + (Some(limit), _) => limit, 574 + (None, Some(_)) => 32, 575 + (None, None) => 100, 576 + }; 491 577 492 - if !(1..=200).contains(&limit) { 493 - let msg = format!("limit not in 1..=200: {}", limit); 494 - return Err(HttpError::for_bad_request(None, msg)); 495 - } 578 + if !(1..=200).contains(&limit) { 579 + let msg = format!("limit not in 1..=200: {}", limit); 580 + return Err(HttpError::for_bad_request(None, msg)); 581 + } 496 582 497 - let since = q.since.map(dt_to_cursor).transpose()?; 498 - let until = q.until.map(dt_to_cursor).transpose()?; 583 + let since = q.since.map(dt_to_cursor).transpose()?; 584 + let until = q.until.map(dt_to_cursor).transpose()?; 499 585 500 - let (total, children, next_cursor) = storage 501 - .get_prefix(prefix, limit, order, since, until) 502 - .await 503 - .map_err(|e| HttpError::for_internal_error(format!("oh shoot: {e:?}")))?; 586 + let (total, children, next_cursor) = storage 587 + .get_prefix(prefix, limit, order, since, until) 588 + .await 589 + .map_err(|e| HttpError::for_internal_error(format!("oh shoot: {e:?}")))?; 504 590 505 - let next_cursor = next_cursor.map(|c| URL_SAFE_NO_PAD.encode(c)); 591 + let next_cursor = next_cursor.map(|c| URL_SAFE_NO_PAD.encode(c)); 506 592 507 - OkCors(PrefixResponse { 508 - total, 509 - children, 510 - cursor: next_cursor, 593 + OkCors(PrefixResponse { 594 + total, 595 + children, 596 + cursor: next_cursor, 597 + }) 598 + .into() 511 599 }) 512 - .into() 600 + .await 513 601 } 514 602 515 603 #[derive(Debug, Deserialize, JsonSchema)] ··· 549 637 let Context { storage, .. } = ctx.context(); 550 638 let q = query.into_inner(); 551 639 552 - let since = q.since.map(dt_to_cursor).transpose()?.unwrap_or_else(|| { 553 - let week_ago_secs = 7 * 86_400; 554 - let week_ago = SystemTime::now() - Duration::from_secs(week_ago_secs); 555 - Cursor::at(week_ago).into() 556 - }); 640 + instrument_handler(&ctx, async { 641 + let since = q.since.map(dt_to_cursor).transpose()?.unwrap_or_else(|| { 642 + let week_ago_secs = 7 * 86_400; 643 + let week_ago = SystemTime::now() - Duration::from_secs(week_ago_secs); 644 + Cursor::at(week_ago).into() 645 + }); 557 646 558 - let until = q.until.map(dt_to_cursor).transpose()?; 647 + let until = q.until.map(dt_to_cursor).transpose()?; 559 648 560 - let step = if let Some(secs) = q.step { 561 - if secs < 3600 { 562 - let msg = format!("step is too small: {}", secs); 563 - Err(HttpError::for_bad_request(None, msg))?; 564 - } 565 - (secs / 3600) * 3600 // trucate to hour 566 - } else { 567 - 86_400 568 - }; 649 + let step = if let Some(secs) = q.step { 650 + if secs < 3600 { 651 + let msg = format!("step is too small: {}", secs); 652 + Err(HttpError::for_bad_request(None, msg))?; 653 + } 654 + (secs / 3600) * 3600 // trucate to hour 655 + } else { 656 + 86_400 657 + }; 569 658 570 - let nsid = Nsid::new(q.collection).map_err(|e| { 571 - HttpError::for_bad_request(None, format!("collection was not a valid NSID: {:?}", e)) 572 - })?; 659 + let nsid = Nsid::new(q.collection).map_err(|e| { 660 + HttpError::for_bad_request(None, format!("collection was not a valid NSID: {:?}", e)) 661 + })?; 573 662 574 - let (range_cursors, series) = storage 575 - .get_timeseries(vec![nsid], since, until, step) 576 - .await 577 - .map_err(|e| HttpError::for_internal_error(format!("oh shoot: {e:?}")))?; 663 + let (range_cursors, series) = storage 664 + .get_timeseries(vec![nsid], since, until, step) 665 + .await 666 + .map_err(|e| HttpError::for_internal_error(format!("oh shoot: {e:?}")))?; 578 667 579 - let range = range_cursors 580 - .into_iter() 581 - .map(|c| DateTime::<Utc>::from_timestamp_micros(c.to_raw_u64() as i64).unwrap()) 582 - .collect(); 668 + let range = range_cursors 669 + .into_iter() 670 + .map(|c| DateTime::<Utc>::from_timestamp_micros(c.to_raw_u64() as i64).unwrap()) 671 + .collect(); 583 672 584 - let series = series 585 - .into_iter() 586 - .map(|(k, v)| (k.to_string(), v.iter().map(Into::into).collect())) 587 - .collect(); 673 + let series = series 674 + .into_iter() 675 + .map(|(k, v)| (k.to_string(), v.iter().map(Into::into).collect())) 676 + .collect(); 588 677 589 - OkCors(CollectionTimeseriesResponse { range, series }).into() 678 + OkCors(CollectionTimeseriesResponse { range, series }).into() 679 + }) 680 + .await 590 681 } 591 682 592 683 #[derive(Debug, Deserialize, JsonSchema)] ··· 611 702 ) -> OkCorsResponse<SearchResponse> { 612 703 let Context { storage, .. } = ctx.context(); 613 704 let q = query.into_inner(); 614 - // TODO: query validation 615 - // TODO: also handle multi-space stuff (ufos-app tries to on client) 616 - let terms: Vec<String> = q.q.split(' ').map(Into::into).collect(); 617 - let matches = storage 618 - .search_collections(terms) 619 - .await 620 - .map_err(|e| HttpError::for_internal_error(format!("oh ugh: {e:?}")))?; 621 - OkCors(SearchResponse { matches }).into() 705 + instrument_handler(&ctx, async { 706 + // TODO: query validation 707 + // TODO: also handle multi-space stuff (ufos-app tries to on client) 708 + let terms: Vec<String> = q.q.split(' ').map(Into::into).collect(); 709 + let matches = storage 710 + .search_collections(terms) 711 + .await 712 + .map_err(|e| HttpError::for_internal_error(format!("oh ugh: {e:?}")))?; 713 + OkCors(SearchResponse { matches }).into() 714 + }) 715 + .await 622 716 } 623 717 624 718 pub async fn serve(storage: impl StoreReader + 'static) -> Result<(), String> { 719 + describe_metrics(); 625 720 let log = ConfigLogging::StderrTerminal { 626 - level: ConfigLoggingLevel::Info, 721 + level: ConfigLoggingLevel::Warn, 627 722 } 628 - .to_logger("hello-ufos") 723 + .to_logger("server") 629 724 .map_err(|e| e.to_string())?; 630 725 631 726 let mut api = ApiDescription::new();