Server tools to backfill, tail, mirror, and verify PLC logs
50
fork

Configure Feed

Select the types of activity you want to include in your feed.

at debug 182 lines 5.9 kB view raw
1use crate::logo; 2 3use governor::{ 4 NotUntil, Quota, RateLimiter, 5 clock::{Clock, DefaultClock}, 6 state::keyed::DefaultKeyedStateStore, 7}; 8use poem::{Endpoint, Middleware, Request, Response, Result, http::StatusCode}; 9use std::{ 10 convert::TryInto, 11 net::{IpAddr, Ipv6Addr}, 12 sync::{Arc, LazyLock}, 13 time::Duration, 14}; 15use tokio::sync::oneshot; 16 17static CLOCK: LazyLock<DefaultClock> = LazyLock::new(DefaultClock::default); 18 19const IP6_64_MASK: Ipv6Addr = Ipv6Addr::from_bits(0xFFFF_FFFF_FFFF_FFFF_0000_0000_0000_0000); 20type IP6_56 = [u8; 7]; 21type IP6_48 = [u8; 6]; 22 23fn scale_quota(quota: Quota, factor: u32) -> Option<Quota> { 24 let period = quota.replenish_interval() / factor; 25 let burst = quota 26 .burst_size() 27 .checked_mul(factor.try_into().expect("factor to be non-zero")) 28 .expect("burst to be able to multiply"); 29 Quota::with_period(period).map(|q| q.allow_burst(burst)) 30} 31 32#[derive(Debug)] 33struct IpLimiters { 34 per_ip: RateLimiter<IpAddr, DefaultKeyedStateStore<IpAddr>, DefaultClock>, 35 ip6_56: RateLimiter<IP6_56, DefaultKeyedStateStore<IP6_56>, DefaultClock>, 36 ip6_48: RateLimiter<IP6_48, DefaultKeyedStateStore<IP6_48>, DefaultClock>, 37} 38 39impl IpLimiters { 40 pub fn new(quota: Quota) -> Self { 41 Self { 42 per_ip: RateLimiter::keyed(quota), 43 ip6_56: RateLimiter::keyed(scale_quota(quota, 8).expect("to scale quota")), 44 ip6_48: RateLimiter::keyed(scale_quota(quota, 256).expect("to scale quota")), 45 } 46 } 47 pub fn check_key(&self, ip: IpAddr) -> Result<(), Duration> { 48 let asdf = |n: NotUntil<_>| n.wait_time_from(CLOCK.now()); 49 match ip { 50 addr @ IpAddr::V4(_) => self.per_ip.check_key(&addr).map_err(asdf), 51 IpAddr::V6(a) => { 52 // always check all limiters 53 let check_ip = self 54 .per_ip 55 .check_key(&IpAddr::V6(a & IP6_64_MASK)) 56 .map_err(asdf); 57 let check_56 = self 58 .ip6_56 59 .check_key( 60 a.octets()[..7] 61 .try_into() 62 .expect("to check ip6 /56 limiter"), 63 ) 64 .map_err(asdf); 65 let check_48 = self 66 .ip6_48 67 .check_key( 68 a.octets()[..6] 69 .try_into() 70 .expect("to check ip6 /48 limiter"), 71 ) 72 .map_err(asdf); 73 check_ip.and(check_56).and(check_48) 74 } 75 } 76 } 77} 78 79/// Once the rate limit has been reached, the middleware will respond with 80/// status code 429 (too many requests) and a `Retry-After` header with the amount 81/// of time that needs to pass before another request will be allowed. 82#[derive(Debug)] 83pub struct GovernorMiddleware { 84 #[allow(dead_code)] 85 stop_on_drop: oneshot::Sender<()>, 86 limiters: Arc<IpLimiters>, 87} 88 89impl GovernorMiddleware { 90 /// Limit request rates 91 /// 92 /// a little gross but this spawns a tokio task for housekeeping: 93 /// https://docs.rs/governor/latest/governor/struct.RateLimiter.html#keyed-rate-limiters---housekeeping 94 pub fn new(quota: Quota) -> Self { 95 let limiters = Arc::new(IpLimiters::new(quota)); 96 let (stop_on_drop, mut stopped) = oneshot::channel(); 97 tokio::task::spawn({ 98 let limiters = limiters.clone(); 99 async move { 100 loop { 101 tokio::select! { 102 _ = &mut stopped => break, 103 _ = tokio::time::sleep(Duration::from_secs(60)) => {}, 104 }; 105 log::debug!( 106 "limiter sizes before housekeeping: {}/ip {}/v6_56 {}/v6_48", 107 limiters.per_ip.len(), 108 limiters.ip6_56.len(), 109 limiters.ip6_48.len(), 110 ); 111 limiters.per_ip.retain_recent(); 112 limiters.ip6_56.retain_recent(); 113 limiters.ip6_48.retain_recent(); 114 } 115 } 116 }); 117 Self { 118 stop_on_drop, 119 limiters, 120 } 121 } 122} 123 124impl<E: Endpoint> Middleware<E> for GovernorMiddleware { 125 type Output = GovernorMiddlewareImpl<E>; 126 fn transform(&self, ep: E) -> Self::Output { 127 GovernorMiddlewareImpl { 128 ep, 129 limiters: self.limiters.clone(), 130 } 131 } 132} 133 134pub struct GovernorMiddlewareImpl<E> { 135 ep: E, 136 limiters: Arc<IpLimiters>, 137} 138 139impl<E: Endpoint> Endpoint for GovernorMiddlewareImpl<E> { 140 type Output = E::Output; 141 142 async fn call(&self, req: Request) -> Result<Self::Output> { 143 let remote = req 144 .remote_addr() 145 .as_socket_addr() 146 .expect("failed to get request's remote addr") // TODO 147 .ip(); 148 149 log::trace!("remote: {remote}"); 150 151 match self.limiters.check_key(remote) { 152 Ok(_) => { 153 log::debug!("allowing remote {remote}"); 154 self.ep.call(req).await 155 } 156 Err(d) => { 157 let wait_time = d.as_secs(); 158 159 log::debug!("rate limit exceeded for {remote}, quota reset in {wait_time}s"); 160 161 let res = Response::builder() 162 .status(StatusCode::TOO_MANY_REQUESTS) 163 .header("x-ratelimit-after", wait_time) 164 .header("retry-after", wait_time) 165 .body(booo()); 166 Err(poem::Error::from_response(res)) 167 } 168 } 169 } 170} 171 172fn booo() -> String { 173 format!( 174 r#"{} 175 176You're going a bit too fast. 177 178Tip: check out the `x-ratelimit-after` response header. 179"#, 180 logo("mirror 429") 181 ) 182}