aturi indexer with listRecords and countRecords endpoints
1
fork

Configure Feed

Select the types of activity you want to include in your feed.

stats reporting

dawn 685fb1dd 7d1ad7f3

+29 -3
+2
Cargo.toml
··· 18 18 19 19 [target.'cfg(not(target_env = "msvc"))'.dependencies] 20 20 tikv-jemallocator = "0.6" 21 + 22 +
+27 -3
src/main.rs
··· 7 7 use tracing::{info, error, warn}; 8 8 use tapped::{TapClient, Event, RecordAction, RecordEvent}; 9 9 use fjall::{Database, Keyspace, KeyspaceCreateOptions}; 10 + use std::sync::{Arc, atomic::{AtomicU64, Ordering}}; 11 + 10 12 11 13 #[cfg(not(target_env = "msvc"))] 12 14 #[global_allocator] ··· 31 33 counts: counts.clone(), 32 34 }; 33 35 36 + let ops_count = Arc::new(AtomicU64::new(0)); 37 + 34 38 // start tap consumer 35 39 let db_clone = state.db.clone(); 36 40 let counts_clone = state.counts.clone(); 41 + let ops_count_clone = ops_count.clone(); 37 42 tokio::spawn(async move { 38 - run_tap_consumer(db_clone, counts_clone).await; 43 + run_tap_consumer(db_clone, counts_clone, ops_count_clone).await; 44 + }); 45 + 46 + // start stats reporter 47 + let ops_count_stats = ops_count.clone(); 48 + tokio::spawn(async move { 49 + let mut interval = tokio::time::interval(std::time::Duration::from_secs(60)); 50 + let mut last_count = 0; 51 + // The first tick completes immediately 52 + interval.tick().await; 53 + 54 + loop { 55 + interval.tick().await; 56 + let current_count = ops_count_stats.load(Ordering::Relaxed); 57 + let delta = current_count - last_count; 58 + let ops_sec = delta as f64 / 60.0; 59 + info!("stats: total_ops={} delta_ops={} ops_sec={:.2}", current_count, delta, ops_sec); 60 + last_count = current_count; 61 + } 39 62 }); 40 63 41 64 let app = Router::new() ··· 50 73 Ok(()) 51 74 } 52 75 53 - async fn run_tap_consumer(db: Database, counts: Keyspace) { 76 + async fn run_tap_consumer(db: Database, counts: Keyspace, ops_count: Arc<AtomicU64>) { 54 77 let tap_url = "http://localhost:2480"; 55 78 56 79 loop { ··· 67 90 Ok(mut receiver) => { 68 91 info!("connected to tap firehose"); 69 92 while let Ok(event) = receiver.recv().await { 93 + ops_count.fetch_add(1, Ordering::Relaxed); 70 94 if let Event::Record(rec) = &*event { 71 95 if let Err(e) = handle_record(&db, &counts, rec) { 72 96 error!("error handling record: {}", e); ··· 113 137 } 114 138 RecordAction::Update => { 115 139 // info!("updating {} {} {}", rec.did, rec.collection, rec.rkey); 116 - records.insert(&key, &[])?; 140 + // records.insert(&key, &[])?; 117 141 } 118 142 RecordAction::Delete => { 119 143 // info!("deleting {} {} {}", rec.did, rec.collection, rec.rkey);