Code and data for arewedecentralizedyet.online and related projects
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

Add some rate statistics

+134 -2
+134 -2
data-fetchers/at-mau-watcher/rs/src/main.rs
··· 76 76 pds_pool: HashMap<String, Arc<str>>, 77 77 } 78 78 79 + struct RateStats { 80 + current_events: AtomicU64, 81 + current_bytes: AtomicU64, 82 + events_per_sec: AtomicU64, 83 + bytes_per_sec: AtomicU64, 84 + events_per_sec_1m: AtomicU64, 85 + bytes_per_sec_1m: AtomicU64, 86 + events_per_sec_1h: AtomicU64, 87 + bytes_per_sec_1h: AtomicU64, 88 + events_per_sec_1d: AtomicU64, 89 + bytes_per_sec_1d: AtomicU64, 90 + } 91 + 92 + impl RateStats { 93 + fn new() -> Self { 94 + Self { 95 + current_events: AtomicU64::new(0), 96 + current_bytes: AtomicU64::new(0), 97 + events_per_sec: AtomicU64::new(0), 98 + bytes_per_sec: AtomicU64::new(0), 99 + events_per_sec_1m: AtomicU64::new(0), 100 + bytes_per_sec_1m: AtomicU64::new(0), 101 + events_per_sec_1h: AtomicU64::new(0), 102 + bytes_per_sec_1h: AtomicU64::new(0), 103 + events_per_sec_1d: AtomicU64::new(0), 104 + bytes_per_sec_1d: AtomicU64::new(0), 105 + } 106 + } 107 + 108 + fn record(&self, events: u64, bytes: u64) { 109 + self.current_events.fetch_add(events, Ordering::Relaxed); 110 + self.current_bytes.fetch_add(bytes, Ordering::Relaxed); 111 + } 112 + 113 + fn snapshot(&self) -> (u64, u64, u64, u64, u64, u64, u64, u64) { 114 + ( 115 + self.events_per_sec.load(Ordering::Relaxed), 116 + self.bytes_per_sec.load(Ordering::Relaxed), 117 + self.events_per_sec_1m.load(Ordering::Relaxed), 118 + self.bytes_per_sec_1m.load(Ordering::Relaxed), 119 + self.events_per_sec_1h.load(Ordering::Relaxed), 120 + self.bytes_per_sec_1h.load(Ordering::Relaxed), 121 + self.events_per_sec_1d.load(Ordering::Relaxed), 122 + self.bytes_per_sec_1d.load(Ordering::Relaxed), 123 + ) 124 + } 125 + } 126 + 127 + async fn rate_sampler(stats: Arc<RateStats>) { 128 + let mut ticker = tokio::time::interval(Duration::from_secs(1)); 129 + let mut events_window: std::collections::VecDeque<u64> = std::collections::VecDeque::new(); 130 + let mut bytes_window: std::collections::VecDeque<u64> = std::collections::VecDeque::new(); 131 + let mut sum_events_1m = 0u64; 132 + let mut sum_bytes_1m = 0u64; 133 + let mut sum_events_1h = 0u64; 134 + let mut sum_bytes_1h = 0u64; 135 + let mut sum_events_1d = 0u64; 136 + let mut sum_bytes_1d = 0u64; 137 + 138 + loop { 139 + ticker.tick().await; 140 + let events = stats.current_events.swap(0, Ordering::Relaxed); 141 + let bytes = stats.current_bytes.swap(0, Ordering::Relaxed); 142 + 143 + stats.events_per_sec.store(events, Ordering::Relaxed); 144 + stats.bytes_per_sec.store(bytes, Ordering::Relaxed); 145 + 146 + events_window.push_back(events); 147 + bytes_window.push_back(bytes); 148 + sum_events_1m += events; 149 + sum_bytes_1m += bytes; 150 + sum_events_1h += events; 151 + sum_bytes_1h += bytes; 152 + sum_events_1d += events; 153 + sum_bytes_1d += bytes; 154 + 155 + if events_window.len() > 60 { 156 + sum_events_1m -= events_window[events_window.len() - 61]; 157 + sum_bytes_1m -= bytes_window[bytes_window.len() - 61]; 158 + } 159 + if events_window.len() > 3600 { 160 + sum_events_1h -= events_window[events_window.len() - 3601]; 161 + sum_bytes_1h -= bytes_window[bytes_window.len() - 3601]; 162 + } 163 + if events_window.len() > 86400 { 164 + sum_events_1d -= events_window[0]; 165 + sum_bytes_1d -= bytes_window[0]; 166 + events_window.pop_front(); 167 + bytes_window.pop_front(); 168 + } 169 + 170 + let len_1m = events_window.len().min(60) as u64; 171 + let len_1h = events_window.len().min(3600) as u64; 172 + let len_1d = events_window.len().min(86400) as u64; 173 + 174 + stats 175 + .events_per_sec_1m 176 + .store(if len_1m > 0 { sum_events_1m / len_1m } else { 0 }, Ordering::Relaxed); 177 + stats 178 + .bytes_per_sec_1m 179 + .store(if len_1m > 0 { sum_bytes_1m / len_1m } else { 0 }, Ordering::Relaxed); 180 + stats 181 + .events_per_sec_1h 182 + .store(if len_1h > 0 { sum_events_1h / len_1h } else { 0 }, Ordering::Relaxed); 183 + stats 184 + .bytes_per_sec_1h 185 + .store(if len_1h > 0 { sum_bytes_1h / len_1h } else { 0 }, Ordering::Relaxed); 186 + stats 187 + .events_per_sec_1d 188 + .store(if len_1d > 0 { sum_events_1d / len_1d } else { 0 }, Ordering::Relaxed); 189 + stats 190 + .bytes_per_sec_1d 191 + .store(if len_1d > 0 { sum_bytes_1d / len_1d } else { 0 }, Ordering::Relaxed); 192 + } 193 + } 194 + 79 195 impl AccountStore { 80 196 fn intern_pds(&mut self, value: &str) -> Arc<str> { 81 197 if let Some(existing) = self.pds_pool.get(value) { ··· 441 557 resolve_enqueued: Arc<AtomicU64>, 442 558 resolve_processed: Arc<AtomicU64>, 443 559 resolve_dropped: Arc<AtomicU64>, 560 + rate_stats: Arc<RateStats>, 444 561 ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> { 445 562 let base = relay.unwrap_or_else(|| "wss://bsky.network/xrpc".to_string()); 446 563 if base.contains(NSID) { ··· 478 595 let bytes = match message { 479 596 Message::Binary(bytes) => { 480 597 binary_messages += 1; 598 + rate_stats.record(1, bytes.len() as u64); 481 599 bytes 482 600 } 483 601 Message::Text(text) => { 484 602 text_messages += 1; 603 + rate_stats.record(1, text.len() as u64); 485 604 text.into_bytes() 486 605 } 487 606 Message::Close(_) => { ··· 580 699 } 581 700 582 701 if total_messages % 1000 == 0 { 702 + let (eps, bps, eps_1m, bps_1m, eps_1h, bps_1h, eps_1d, bps_1d) = 703 + rate_stats.snapshot(); 583 704 let updates_backlog = update_enqueued.load(Ordering::Relaxed) 584 705 .saturating_sub(update_processed.load(Ordering::Relaxed)); 585 706 let resolves_backlog = resolve_enqueued.load(Ordering::Relaxed) 586 707 .saturating_sub(resolve_processed.load(Ordering::Relaxed)); 587 708 println!( 588 - "Firehose stats: total={}, binary={}, text={}, close={}, other={}, frame_failures={}, body_failures={}, commit={}, account={}, identity={}, other_events={}, error_frames={}, enqueued={}, updates_backlog={}, resolves_backlog={}, resolve_dropped={}", 709 + "Firehose stats: total={}, binary={}, text={}, close={}, other={}, frame_failures={}, body_failures={}, commit={}, account={}, identity={}, other_events={}, error_frames={}, enqueued={}, updates_backlog={}, resolves_backlog={}, resolve_dropped={}, eps={}, bps={}, eps_1m={}, bps_1m={}, eps_1h={}, bps_1h={}, eps_1d={}, bps_1d={}", 589 710 total_messages, 590 711 binary_messages, 591 712 text_messages, ··· 601 722 enqueued_updates, 602 723 updates_backlog, 603 724 resolves_backlog, 604 - resolve_dropped.load(Ordering::Relaxed) 725 + resolve_dropped.load(Ordering::Relaxed), 726 + eps, 727 + bps, 728 + eps_1m, 729 + bps_1m, 730 + eps_1h, 731 + bps_1h, 732 + eps_1d, 733 + bps_1d 605 734 ); 606 735 } 607 736 } ··· 818 947 let resolve_enqueued = Arc::new(AtomicU64::new(0)); 819 948 let resolve_processed = Arc::new(AtomicU64::new(0)); 820 949 let resolve_dropped = Arc::new(AtomicU64::new(0)); 950 + let rate_stats = Arc::new(RateStats::new()); 821 951 822 952 let (resolve_tx, resolve_rx) = mpsc::channel(RESOLUTION_QUEUE_SIZE); 823 953 let (update_tx, update_rx) = mpsc::unbounded_channel(); ··· 849 979 state.clone(), 850 980 args.snapshot_interval, 851 981 )); 982 + tokio::spawn(rate_sampler(rate_stats.clone())); 852 983 853 984 #[cfg(unix)] 854 985 { ··· 896 1027 resolve_enqueued.clone(), 897 1028 resolve_processed.clone(), 898 1029 resolve_dropped.clone(), 1030 + rate_stats.clone(), 899 1031 ) 900 1032 .await; 901 1033