Parakeet is a Rust-based Bluesky AppServer aiming to implement most of the functionality required to support the Bluesky client
appview atproto bluesky rust appserver
66
fork

Configure Feed

Select the types of activity you want to include in your feed.

feat(consumer): backfill default rate limit

Mia 807cd8c5 f09cf903

+35 -2
+7 -1
crates/consumer/src/backfill/mod.rs
··· 198 198 let _ = rc.zadd(utils::BF_REM_KEY, &pds, rem).await; 199 199 let _ = rc.zadd(utils::BF_RESET_KEY, &pds, reset).await; 200 200 } 201 - Ok(None) => tracing::debug!(pds, "got response with no ratelimit headers."), 201 + Ok(None) => { 202 + tracing::debug!( 203 + pds, 204 + "got response with no ratelimit headers, using defaults." 205 + ); 206 + utils::handle_default_ratelimit(&mut rc, &pds).await?; 207 + } 202 208 Err(e) => { 203 209 tracing::error!(did, "backfill failed: {e}"); 204 210 db::actor_set_sync_status(conn, did, ActorSyncState::Dirty, Utc::now()).await?;
+28 -1
crates/consumer/src/backfill/utils.rs
··· 1 - use chrono::prelude::*; 1 + use chrono::{prelude::*, TimeDelta}; 2 2 use deadpool_postgres::Object; 3 3 use jacquard_common::types::{did::Did, string::Handle}; 4 4 use jacquard_identity::{resolver::IdentityResolver, JacquardResolver}; ··· 11 11 12 12 pub const BF_RESET_KEY: &str = "bf_ratelimit_reset"; 13 13 pub const BF_REM_KEY: &str = "bf_ratelimit_rem"; 14 + const BF_REM_DEFAULT: i32 = 1000; 14 15 15 16 #[derive(Debug, Deserialize)] 16 17 pub struct GetRepoStatusRes { ··· 129 130 }; 130 131 } 131 132 } 133 + 134 + Ok(()) 135 + } 136 + 137 + pub async fn handle_default_ratelimit( 138 + rc: &mut MultiplexedConnection, 139 + pds: &str, 140 + ) -> eyre::Result<()> { 141 + let score = rc.zscore(BF_REM_KEY, pds).await?; 142 + let reset_at = rc 143 + .zscore(BF_RESET_KEY, pds) 144 + .await? 145 + .and_then(|at| DateTime::from_timestamp(at as i64, 0)); 146 + let now = Utc::now(); 147 + 148 + let (new_reset, new_rem) = match (reset_at, score) { 149 + // if reset is in the future, decrease the counter. in all other cases, set defaults 150 + (Some(reset_at), Some(rem)) if reset_at > now => (reset_at.timestamp(), rem as i32 - 1), 151 + (_, _) => { 152 + let new_reset = now + TimeDelta::seconds(300); 153 + (new_reset.timestamp(), BF_REM_DEFAULT) 154 + } 155 + }; 156 + 157 + let _ = rc.zadd(BF_REM_KEY, &pds, new_rem).await; 158 + let _ = rc.zadd(BF_RESET_KEY, &pds, new_reset).await; 132 159 133 160 Ok(()) 134 161 }