Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm
75
fork

Configure Feed

Select the types of activity you want to include in your feed.

add some metrics

authored by

phil and committed by tangled.org ac6d48c1 bd5a9b12

+82 -33
+41 -25
slingshot/src/consumer.rs
··· 1 - use crate::CachedRecord; 2 1 use crate::error::ConsumerError; 2 + use crate::{CachedRecord, Identity, IdentityKey}; 3 3 use foyer::HybridCache; 4 4 use jetstream::{ 5 5 DefaultJetstreamEndpoints, JetstreamCompression, JetstreamConfig, JetstreamConnector, ··· 11 11 jetstream_endpoint: String, 12 12 cursor: Option<Cursor>, 13 13 no_zstd: bool, 14 + identity: Identity, 14 15 shutdown: CancellationToken, 15 16 cache: HybridCache<String, CachedRecord>, 16 17 ) -> Result<(), ConsumerError> { ··· 46 47 break; 47 48 }; 48 49 49 - if event.kind != EventKind::Commit { 50 - continue; 51 - } 52 - let Some(ref mut commit) = event.commit else { 53 - log::warn!("consumer: commit event missing commit data, ignoring"); 54 - continue; 55 - }; 50 + match event.kind { 51 + EventKind::Commit => { 52 + let Some(ref mut commit) = event.commit else { 53 + log::warn!("consumer: commit event missing commit data, ignoring"); 54 + continue; 55 + }; 56 56 57 - // TODO: something a bit more robust 58 - let at_uri = format!( 59 - "at://{}/{}/{}", 60 - &*event.did, &*commit.collection, &*commit.rkey 61 - ); 57 + // TODO: something a bit more robust 58 + let at_uri = format!( 59 + "at://{}/{}/{}", 60 + &*event.did, &*commit.collection, &*commit.rkey 61 + ); 62 62 63 - if commit.operation == CommitOp::Delete { 64 - cache.insert(at_uri, CachedRecord::Deleted); 65 - } else { 66 - let Some(record) = commit.record.take() else { 67 - log::warn!("consumer: commit insert or update missing record, ignoring"); 68 - continue; 69 - }; 70 - let Some(cid) = commit.cid.take() else { 71 - log::warn!("consumer: commit insert or update missing CID, ignoring"); 72 - continue; 73 - }; 63 + if commit.operation == CommitOp::Delete { 64 + cache.insert(at_uri, CachedRecord::Deleted); 65 + } else { 66 + let Some(record) = commit.record.take() else { 67 + log::warn!("consumer: commit insert or update missing record, ignoring"); 68 + continue; 69 + }; 70 + let Some(cid) = commit.cid.take() else { 71 + log::warn!("consumer: commit insert or update missing CID, ignoring"); 72 + continue; 73 + }; 74 74 75 - cache.insert(at_uri, CachedRecord::Found((cid, record).into())); 75 + cache.insert(at_uri, CachedRecord::Found((cid, record).into())); 76 + } 77 + } 78 + EventKind::Identity => { 79 + let Some(ident) = event.identity else { 80 + log::warn!("consumer: identity event missing identity data, ignoring"); 81 + continue; 82 + }; 83 + if let Some(handle) = ident.handle { 84 + metrics::counter!("identity_handle_refresh_queued", "reason" => "identity event").increment(1); 85 + identity.queue_refresh(IdentityKey::Handle(handle)).await; 86 + } 87 + metrics::counter!("identity_did_refresh_queued", "reason" => "identity event") 88 + .increment(1); 89 + identity.queue_refresh(IdentityKey::Did(ident.did)).await; 90 + } 91 + EventKind::Account => {} // TODO: handle account events (esp hiding content on deactivate, clearing on delete) 76 92 } 77 93 } 78 94
+19 -5
slingshot/src/identity.rs
··· 38 38 const MIN_NOT_FOUND_TTL: Duration = Duration::from_secs(60); 39 39 40 40 #[derive(Debug, Clone, Hash, PartialEq, Eq, Serialize, Deserialize)] 41 - enum IdentityKey { 41 + pub enum IdentityKey { 42 42 Handle(Handle), 43 43 Did(Did), 44 44 } ··· 186 186 /// multi-producer *single consumer* queue 187 187 refresh_queue: Arc<Mutex<RefreshQueue>>, 188 188 /// just a lock to ensure only one refresher (queue consumer) is running (to be improved with a better refresher) 189 - refresher: Arc<Mutex<()>>, 189 + refresher_task: Arc<Mutex<()>>, 190 190 } 191 191 192 192 impl Identity { ··· 225 225 did_resolver: Arc::new(did_resolver), 226 226 cache, 227 227 refresh_queue: Default::default(), 228 - refresher: Default::default(), 228 + refresher_task: Default::default(), 229 229 }) 230 230 } 231 231 ··· 293 293 } 294 294 IdentityData::NotFound => { 295 295 if (now - *last_fetch) >= MIN_NOT_FOUND_TTL { 296 + metrics::counter!("identity_handle_refresh_queued", "reason" => "ttl", "found" => "false").increment(1); 296 297 self.queue_refresh(key).await; 297 298 } 298 299 Ok(None) 299 300 } 300 301 IdentityData::Did(did) => { 301 302 if (now - *last_fetch) >= MIN_TTL { 303 + metrics::counter!("identity_handle_refresh_queued", "reason" => "ttl", "found" => "true").increment(1); 302 304 self.queue_refresh(key).await; 303 305 } 304 306 Ok(Some(did.clone())) ··· 347 349 } 348 350 IdentityData::NotFound => { 349 351 if (now - *last_fetch) >= MIN_NOT_FOUND_TTL { 352 + metrics::counter!("identity_did_refresh_queued", "reason" => "ttl", "found" => "false").increment(1); 350 353 self.queue_refresh(key).await; 351 354 } 352 355 Ok(None) 353 356 } 354 357 IdentityData::Doc(mini_did) => { 355 358 if (now - *last_fetch) >= MIN_TTL { 359 + metrics::counter!("identity_did_refresh_queued", "reason" => "ttl", "found" => "true").increment(1); 356 360 self.queue_refresh(key).await; 357 361 } 358 362 Ok(Some(mini_did.clone())) ··· 363 367 /// put a refresh task on the queue 364 368 /// 365 369 /// this can be safely called from multiple concurrent tasks 366 - async fn queue_refresh(&self, key: IdentityKey) { 370 + pub async fn queue_refresh(&self, key: IdentityKey) { 367 371 // todo: max queue size 368 372 let mut q = self.refresh_queue.lock().await; 369 373 if !q.items.contains(&key) { ··· 440 444 /// run the refresh queue consumer 441 445 pub async fn run_refresher(&self, shutdown: CancellationToken) -> Result<(), IdentityError> { 442 446 let _guard = self 443 - .refresher 447 + .refresher_task 444 448 .try_lock() 445 449 .expect("there to only be one refresher running"); 446 450 loop { ··· 462 466 log::trace!("refreshing handle {handle:?}"); 463 467 match self.handle_resolver.resolve(handle).await { 464 468 Ok(did) => { 469 + metrics::counter!("identity_handle_refresh", "success" => "true") 470 + .increment(1); 465 471 self.cache.insert( 466 472 task_key.clone(), 467 473 IdentityVal(UtcDateTime::now(), IdentityData::Did(did)), 468 474 ); 469 475 } 470 476 Err(atrium_identity::Error::NotFound) => { 477 + metrics::counter!("identity_handle_refresh", "success" => "false", "reason" => "not found").increment(1); 471 478 self.cache.insert( 472 479 task_key.clone(), 473 480 IdentityVal(UtcDateTime::now(), IdentityData::NotFound), 474 481 ); 475 482 } 476 483 Err(err) => { 484 + metrics::counter!("identity_handle_refresh", "success" => "false", "reason" => "other").increment(1); 477 485 log::warn!( 478 486 "failed to refresh handle: {err:?}. leaving stale (should we eventually do something?)" 479 487 ); ··· 488 496 Ok(did_doc) => { 489 497 // TODO: fix in atrium: should verify id is did 490 498 if did_doc.id != did.to_string() { 499 + metrics::counter!("identity_did_refresh", "success" => "false", "reason" => "wrong did").increment(1); 491 500 log::warn!( 492 501 "refreshed did doc failed: wrong did doc id. dropping refresh." 493 502 ); ··· 496 505 let mini_doc = match did_doc.try_into() { 497 506 Ok(md) => md, 498 507 Err(e) => { 508 + metrics::counter!("identity_did_refresh", "success" => "false", "reason" => "bad doc").increment(1); 499 509 log::warn!( 500 510 "converting mini doc failed: {e:?}. dropping refresh." 501 511 ); 502 512 continue; 503 513 } 504 514 }; 515 + metrics::counter!("identity_did_refresh", "success" => "true") 516 + .increment(1); 505 517 self.cache.insert( 506 518 task_key.clone(), 507 519 IdentityVal(UtcDateTime::now(), IdentityData::Doc(mini_doc)), 508 520 ); 509 521 } 510 522 Err(atrium_identity::Error::NotFound) => { 523 + metrics::counter!("identity_did_refresh", "success" => "false", "reason" => "not found").increment(1); 511 524 self.cache.insert( 512 525 task_key.clone(), 513 526 IdentityVal(UtcDateTime::now(), IdentityData::NotFound), 514 527 ); 515 528 } 516 529 Err(err) => { 530 + metrics::counter!("identity_did_refresh", "success" => "false", "reason" => "other").increment(1); 517 531 log::warn!( 518 532 "failed to refresh did doc: {err:?}. leaving stale (should we eventually do something?)" 519 533 );
+1 -1
slingshot/src/lib.rs
··· 9 9 pub use consumer::consume; 10 10 pub use firehose_cache::firehose_cache; 11 11 pub use healthcheck::healthcheck; 12 - pub use identity::Identity; 12 + pub use identity::{Identity, IdentityKey}; 13 13 pub use record::{CachedRecord, ErrorResponseObject, Repo}; 14 14 pub use server::serve;
+4 -1
slingshot/src/main.rs
··· 154 154 155 155 let repo = Repo::new(identity.clone()); 156 156 157 + let identity_for_server = identity.clone(); 157 158 let server_shutdown = shutdown.clone(); 158 159 let server_cache_handle = cache.clone(); 159 160 let bind = args.bind; 160 161 tasks.spawn(async move { 161 162 serve( 162 163 server_cache_handle, 163 - identity, 164 + identity_for_server, 164 165 repo, 165 166 args.acme_domain, 166 167 args.acme_contact, ··· 173 174 Ok(()) 174 175 }); 175 176 177 + let identity_refreshable = identity.clone(); 176 178 let consumer_shutdown = shutdown.clone(); 177 179 let consumer_cache = cache.clone(); 178 180 tasks.spawn(async move { ··· 180 182 args.jetstream, 181 183 None, 182 184 args.jetstream_no_zstd, 185 + identity_refreshable, 183 186 consumer_shutdown, 184 187 consumer_cache, 185 188 )
+17 -1
slingshot/src/server.rs
··· 12 12 use tokio_util::sync::CancellationToken; 13 13 14 14 use poem::{ 15 - Endpoint, EndpointExt, Route, Server, 15 + Endpoint, EndpointExt, IntoResponse, Route, Server, 16 16 endpoint::{StaticFileEndpoint, make_sync}, 17 17 http::Method, 18 18 listener::{ ··· 772 772 .allow_credentials(false), 773 773 ) 774 774 .with(CatchPanic::new()) 775 + .around(request_counter) 775 776 .with(Tracing); 777 + 776 778 Server::new(listener) 777 779 .name("slingshot") 778 780 .run_with_graceful_shutdown(app, shutdown.cancelled(), None) ··· 780 782 .map_err(ServerError::ServerExited) 781 783 .inspect(|()| log::info!("server ended. goodbye.")) 782 784 } 785 + 786 + async fn request_counter<E: Endpoint>(next: E, req: poem::Request) -> poem::Result<poem::Response> { 787 + let t0 = std::time::Instant::now(); 788 + let method = req.method().to_string(); 789 + let path = req.uri().path().to_string(); 790 + let res = next.call(req).await?.into_response(); 791 + metrics::histogram!( 792 + "server_request", 793 + "endpoint" => format!("{method} {path}"), 794 + "status" => res.status().to_string(), 795 + ) 796 + .record(t0.elapsed()); 797 + Ok(res) 798 + }