Parakeet is a Rust-based Bluesky AppServer aiming to implement most of the functionality required to support the Bluesky client
appview atproto bluesky rust appserver
66
fork

Configure Feed

Select the types of activity you want to include in your feed.

fix(consumer): more duplicate backfill protections

Mia 1396b58e 2a36164b

+48 -30
+32 -25
crates/consumer/src/backfill/mod.rs
··· 13 13 use redis::aio::MultiplexedConnection; 14 14 use redis::AsyncTypedCommands; 15 15 use reqwest::Client; 16 - use std::collections::HashSet; 17 16 use std::path::PathBuf; 18 17 use std::str::FromStr; 19 18 use std::sync::Arc; 20 19 use std::time::Duration; 21 20 use tokio::sync::watch::Receiver as WatchReceiver; 22 - use tokio::sync::{Mutex, Semaphore}; 21 + use tokio::sync::Semaphore; 23 22 use tokio_util::task::TaskTracker; 24 23 use tracing::instrument; 25 24 26 25 mod repo; 27 26 mod utils; 28 27 29 - const BF_QUEUE: &str = "backfill_queue"; 28 + // The main backfill queue's key 29 + pub const BF_QUEUE: &str = "backfill_queue"; 30 + // A set which is added to when a new backfill is started for deduplication. 31 + pub const BF_DUPL_SET: &str = "bf_duplicate"; 32 + // A set which contains exactly the same info as backfill_queue. 33 + pub const BF_PENDING_SET: &str = "bf_pending"; 30 34 // There's a 4MiB limit on parakeet-index, so break delta batches up if there's loads. 31 35 // this should be plenty low enough to not trigger the size limit. (59k did slightly) 32 36 const DELTA_BATCH_SIZE: usize = 32 * 1024; 33 37 34 38 #[derive(Clone)] 35 39 pub struct BackfillManagerInner { 36 - // we don't need to store anything, just ensure only one thread in the status check at a time 37 - status_lookup_lock: Arc<Mutex<HashSet<String>>>, 38 40 index_client: Option<parakeet_index::Client>, 39 41 tmp_dir: PathBuf, 40 42 resolver: JacquardResolver, ··· 60 62 61 63 let client = Client::builder().brotli(true).build()?; 62 64 63 - let current_tasks = Arc::new(Mutex::new(HashSet::new())); 64 - 65 65 Ok(BackfillManager { 66 66 pool, 67 67 redis, 68 68 semaphore, 69 69 inner: BackfillManagerInner { 70 - status_lookup_lock: current_tasks, 71 70 index_client, 72 71 tmp_dir: PathBuf::from_str(&opts.download_tmp_dir)?, 73 72 resolver, ··· 107 106 tracker.spawn(async move { 108 107 let _p = p; 109 108 tracing::trace!("backfilling {did}"); 109 + if do_actor_duplicate_check(&mut rc, &did).await { 110 + return; 111 + } 110 112 111 113 if let Err(e) = do_actor_backfill(&mut conn, &mut rc, inner, &did).await { 112 114 tracing::error!(did, "backfill failed: {e}"); ··· 131 133 } 132 134 } 133 135 136 + /// returns `true` if backfill should stop. 137 + async fn do_actor_duplicate_check(rc: &mut MultiplexedConnection, did: &str) -> bool { 138 + match rc.sismember(BF_DUPL_SET, &did).await { 139 + Ok(true) => { 140 + tracing::info!("skipping duplicate repo {did}"); 141 + true 142 + } 143 + Ok(false) => { 144 + if let Err(e) = rc.sadd(BF_DUPL_SET, &did).await { 145 + tracing::error!("failed to update bf_duplicate: {e}"); 146 + true 147 + } else { 148 + false 149 + } 150 + } 151 + Err(e) => { 152 + tracing::error!("failed to check for duplicate (backfill continues): {e}"); 153 + false 154 + } 155 + } 156 + } 157 + 134 158 async fn do_actor_backfill( 135 159 conn: &mut Object, 136 160 rc: &mut MultiplexedConnection, 137 161 mut inner: BackfillManagerInner, 138 162 did: &str, 139 163 ) -> eyre::Result<()> { 140 - let mut l = inner.status_lookup_lock.lock().await; 141 - 142 - // has the repo already been downloaded? 143 - if l.contains(did) { 144 - tracing::info!("skipping duplicate repo {did}"); 145 - return Ok(()); 146 - } else { 147 - l.insert(did.to_string()); 148 - } 149 - 150 164 match db::actor_get_statuses(conn, did).await { 151 165 Ok(Some((_, state))) => { 152 166 if state == ActorSyncState::Synced || state == ActorSyncState::Processing { ··· 168 182 &[&did], 169 183 ) 170 184 .await?; 171 - 172 - drop(l); 173 185 174 186 let jd = Did::raw(did); 175 187 let (pds, handle) = match utils::resolve_service(&inner.resolver, &jd).await { ··· 225 237 db::actor_set_sync_status(conn, did, ActorSyncState::Dirty, Utc::now()).await?; 226 238 db::backfill_job_write(conn, did, "failed.write").await?; 227 239 } 228 - } 229 - 230 - { 231 - let mut l = inner.status_lookup_lock.lock().await; 232 - l.remove(did); 233 240 } 234 241 235 242 Ok(())
+16 -5
crates/consumer/src/indexer/mod.rs
··· 1 + use crate::backfill::{BF_PENDING_SET, BF_QUEUE}; 1 2 use crate::config::HistoryMode; 2 3 use crate::db; 3 4 use crate::firehose::{ ··· 18 19 use parakeet_db::types::{ActorStatus, ActorSyncState}; 19 20 use parakeet_index::AggregateType; 20 21 use redis::aio::MultiplexedConnection; 21 - use redis::AsyncCommands; 22 + use redis::AsyncTypedCommands; 22 23 use std::collections::{HashMap, HashSet}; 23 24 use std::hash::BuildHasher; 24 25 use tokio::sync::mpsc::{channel, Sender}; ··· 226 227 // don't care if we're not synced. also no point if !do_backfill bc we might not have a worker 227 228 if sync_state == ActorSyncState::Synced && state.do_backfill && sync.rev > current_rev { 228 229 tracing::debug!("triggering backfill due to #sync"); 229 - rc.rpush::<_, _, i32>("backfill_queue", sync.did).await?; 230 + request_backfill(rc, &sync.did).await?; 230 231 } 231 232 232 233 Ok(()) ··· 311 312 312 313 if trigger_bf { 313 314 tracing::debug!("triggering backfill due to account coming out of inactive state"); 314 - rc.rpush::<_, _, i32>("backfill_queue", account.did).await?; 315 + request_backfill(rc, &account.did).await?; 315 316 } 316 317 317 318 Ok(()) ··· 339 340 // TODO: bridgy doesn't implement since atm - we need a special case 340 341 if commit.since.is_some() { 341 342 if state.do_backfill && state.req_backfill { 342 - rc.rpush::<_, _, i32>("backfill_queue", commit.repo).await?; 343 + request_backfill(rc, &commit.repo).await?; 343 344 } 344 345 return Ok(()); 345 346 } ··· 370 371 371 372 if trigger_backfill { 372 373 if state.req_backfill { 373 - rc.rpush::<_, _, i32>("backfill_queue", commit.repo).await?; 374 + request_backfill(rc, &commit.repo).await?; 374 375 } 375 376 return Ok(()); 376 377 } ··· 855 856 856 857 Ok(()) 857 858 } 859 + 860 + pub async fn request_backfill(rc: &mut MultiplexedConnection, did: &str) -> redis::RedisResult<()> { 861 + if !rc.sismember(BF_PENDING_SET, did).await? { 862 + // trigger backfill 863 + rc.rpush(BF_QUEUE, did).await?; 864 + rc.sadd(BF_PENDING_SET, did).await?; 865 + } 866 + 867 + Ok(()) 868 + }