Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm
75
fork

Configure Feed

Select the types of activity you want to include in your feed.

add some rocksdb metrics

finally!!!!!

phil 94fdd0f2 adb75378

+91 -26
+3 -3
constellation/readme.md
···
147 147   - [ ] nginx: support http2
148 148   - [x] nginx metrics
149 149   - [ ] add TimeoutLayer for axum
150     - - [ ] rocksdb metrics
151     - - [ ] write ops (count? per actionable?)
152     - - [ ] write time hist
    150 + - [~] rocksdb metrics
    151 + - [x] write ops (count? per actionable?)
    152 + - [x] write time hist
153 153   - [ ] read ops (api)
154 154   - [ ] expose internal stats?
155 155   - [ ] figure out what's the right thing to do if merge op fails. happened on startup after an unclean reboot.
+4
constellation/src/bin/main.rs
···
  1   1   use anyhow::Result;
  2   2   use clap::{Parser, ValueEnum};
  3   3   use metrics_exporter_prometheus::PrometheusBuilder;
      4 + use std::num::NonZero;
  4   5   use std::path::PathBuf;
  5   6   use std::sync::{atomic::AtomicU32, Arc};
  6   7   use std::thread;
···
206 207       let host = [0, 0, 0, 0];
207 208       let port = 8765;
208 209       PrometheusBuilder::new()
    210 +         .set_quantiles(&[0.5, 0.9, 0.99, 1.0])?
    211 +         .set_bucket_duration(time::Duration::from_secs(30))?
    212 +         .set_bucket_count(NonZero::new(10).unwrap()) // count * duration = 5 mins. stuff doesn't happen that fast here.
209 213           .set_enable_unit_suffix(true)
210 214           .with_http_listener((host, port))
211 215           .install()?;
+84 -23
constellation/src/storage/rocks_store.rs
···
  3   3   use anyhow::{bail, Result};
  4   4   use bincode::Options as BincodeOptions;
  5   5   use links::CollectedLink;
      6 + use metrics::{counter, describe_counter, describe_histogram, histogram, Unit};
  6   7   use rocksdb::{
  7   8       AsColumnFamilyRef, ColumnFamilyDescriptor, DBWithThreadMode, IteratorMode, MergeOperands,
  8   9       MultiThreaded, Options, PrefixRange, ReadOptions, WriteBatch,
···
 16  17       atomic::{AtomicU64, Ordering},
 17  18       Arc,
 18  19   };
     20 + use std::time::Instant;
 19  21
 20  22   static DID_IDS_CF: &str = "did_ids";
 21  23   static TARGET_IDS_CF: &str = "target_ids";
···
239 241
240 242   impl RocksStorage {
241 243       pub fn new(path: impl AsRef<Path>) -> Result<Self> {
    244 +         Self::describe_metrics();
242 245           let did_id_table = IdTable::<_, _, true>::setup(DID_IDS_CF);
243 246           let target_id_table = IdTable::<_, _, false>::setup(TARGET_IDS_CF);
244 247
···
273 276           })
274 277       }
275 278
    279 +     fn describe_metrics() {
    280 +         describe_histogram!(
    281 +             "storage_rocksdb_read_seconds",
    282 +             Unit::Seconds,
    283 +             "duration of the read stage of actions"
    284 +         );
    285 +         describe_histogram!(
    286 +             "storage_rocksdb_action_seconds",
    287 +             Unit::Seconds,
    288 +             "duration of read + write of actions"
    289 +         );
    290 +         describe_counter!(
    291 +             "storage_rocksdb_batch_ops_total",
    292 +             Unit::Count,
    293 +             "total batched operations from actions"
    294 +         );
    295 +         describe_histogram!(
    296 +             "storage_rocksdb_delete_account_ops",
    297 +             Unit::Count,
    298 +             "total batched ops for account deletions"
    299 +         );
    300 +     }
    301 +
276 302       fn merge_op_extend_did_ids(
277 303           key: &[u8],
278 304           existing: Option<&[u8]>,
···
522 548           Ok(())
523 549       }
524 550
525     -     fn update_links(
526     -         &mut self,
527     -         record_id: &RecordId,
528     -         new_links: &[CollectedLink],
529     -         batch: &mut WriteBatch,
530     -     ) -> Result<()> {
531     -         self.remove_links(record_id, batch)?;
532     -         self.add_links(record_id, new_links, batch)?;
533     -         Ok(())
534     -     }
535     -
536 551       fn set_account(&mut self, did: &Did, active: bool, batch: &mut WriteBatch) -> Result<()> {
537 552           // this needs to be read-modify-write since the did_id needs to stay the same,
538 553           // which has a benefit of allowing to avoid adding entries for dids we don't
···
544 559           Ok(())
545 560       }
546 561
547     -     fn delete_account(&mut self, did: &Did, batch: &mut WriteBatch) -> Result<()> {
    562 +     fn delete_account(&mut self, did: &Did, batch: &mut WriteBatch) -> Result<usize> {
    563 +         let mut total_batched_ops = 0;
548 564           let Some(DidIdValue(did_id, _)) = self.did_id_table.get_id_val(&self.db, did)? else {
549     -             return Ok(()); // ignore updates for dids we don't know about
    565 +             return Ok(total_batched_ops); // ignore updates for dids we don't know about
550 566           };
551 567           self.delete_did_id_value(batch, did);
552 568           // TODO: also delete the reverse!!
···
572 588                       })?;
573 589                   }
574 590               }
    591 +             total_batched_ops += mini_batch.len();
575 592               self.db.write(mini_batch)?; // todo
576 593           }
577     -         Ok(())
    594 +         Ok(total_batched_ops)
578 595       }
579 596   }
580 597
···
609 626       }
610 627
611 628       fn push(&mut self, event: &ActionableEvent, cursor: u64) -> Result<()> {
    629 +         // normal ops
612 630           let mut batch = WriteBatch::default();
613     -         match event {
    631 +         let t0 = Instant::now();
    632 +         if let Some(action) = match event {
614 633               ActionableEvent::CreateLinks { record_id, links } => {
615     -                 self.add_links(record_id, links, &mut batch)?
    634 +                 self.add_links(record_id, links, &mut batch)?;
    635 +                 Some("create_links")
616 636               }
617 637               ActionableEvent::UpdateLinks {
618 638                   record_id,
619 639                   new_links,
620     -             } => self.update_links(record_id, new_links, &mut batch)?,
621     -             ActionableEvent::DeleteRecord(record_id) => self.remove_links(record_id, &mut batch)?,
622     -             ActionableEvent::ActivateAccount(did) => self.set_account(did, true, &mut batch)?,
623     -             ActionableEvent::DeactivateAccount(did) => self.set_account(did, false, &mut batch)?,
624     -             ActionableEvent::DeleteAccount(did) => self.delete_account(did, &mut batch)?,
    640 +             } => {
    641 +                 self.remove_links(record_id, &mut batch)?;
    642 +                 self.add_links(record_id, new_links, &mut batch)?;
    643 +                 Some("update_links")
    644 +             }
    645 +             ActionableEvent::DeleteRecord(record_id) => {
    646 +                 self.remove_links(record_id, &mut batch)?;
    647 +                 Some("delete_record")
    648 +             }
    649 +             ActionableEvent::ActivateAccount(did) => {
    650 +                 self.set_account(did, true, &mut batch)?;
    651 +                 Some("set_account_status")
    652 +             }
    653 +             ActionableEvent::DeactivateAccount(did) => {
    654 +                 self.set_account(did, false, &mut batch)?;
    655 +                 Some("set_account_status")
    656 +             }
    657 +             ActionableEvent::DeleteAccount(_) => None, // delete account is handled specially
    658 +         } {
    659 +             let t_read = t0.elapsed();
    660 +             batch.put(JETSTREAM_CURSOR_KEY.as_bytes(), _rv(cursor));
    661 +             let batch_ops = batch.len();
    662 +             self.db.write(batch)?;
    663 +             let t_total = t0.elapsed();
    664 +
    665 +             histogram!("storage_rocksdb_read_seconds", "action" => action)
    666 +                 .record(t_read.as_secs_f64());
    667 +             histogram!("storage_rocksdb_action_seconds", "action" => action)
    668 +                 .record(t_total.as_secs_f64());
    669 +             counter!("storage_rocksdb_batch_ops_total", "action" => action)
    670 +                 .increment(batch_ops as u64);
625 671           }
626     -         batch.put(JETSTREAM_CURSOR_KEY.as_bytes(), _rv(cursor));
627     -         self.db.write(batch)?;
    672 +
    673 +         // special metrics for account deletion which can be arbitrarily expensive
    674 +         let mut outer_batch = WriteBatch::default();
    675 +         let t0 = Instant::now();
    676 +         if let ActionableEvent::DeleteAccount(did) = event {
    677 +             let inner_batch_ops = self.delete_account(did, &mut outer_batch)?;
    678 +             let total_batch_ops = inner_batch_ops + outer_batch.len();
    679 +             self.db.write(outer_batch)?;
    680 +             let t_total = t0.elapsed();
    681 +
    682 +             histogram!("storage_rocksdb_action_seconds", "action" => "delete_account")
    683 +                 .record(t_total.as_secs_f64());
    684 +             counter!("storage_rocksdb_batch_ops_total", "action" => "delete_account")
    685 +                 .increment(total_batch_ops as u64);
    686 +             histogram!("storage_rocksdb_delete_account_ops").record(total_batch_ops as f64);
    687 +         }
    688 +
628 689           Ok(())
629 690       }
630 691