Parakeet is a Rust-based Bluesky AppServer that aims to implement most of the functionality required to support the Bluesky client.
appview atproto bluesky rust appserver
67
fork

Configure Feed

Select the types of activity you want to include in your feed.

Merge branch 'bulk-backfill' into 'main'

Feat: Faster Backfill

See merge request parakeet-social/parakeet!18

Mia 3f6e355e 745ab2bd

+419 -128
+25
Cargo.lock
··· 752 752 "did-resolver", 753 753 "eyre", 754 754 "figment", 755 + "flume", 755 756 "foldhash", 756 757 "futures", 757 758 "ipld-core", ··· 1338 1339 version = "0.5.7" 1339 1340 source = "registry+https://github.com/rust-lang/crates.io-index" 1340 1341 checksum = "1d674e81391d1e1ab681a28d99df07927c6d4aa5b027d7da16ba32d1d21ecd99" 1342 + 1343 + [[package]] 1344 + name = "flume" 1345 + version = "0.11.1" 1346 + source = "registry+https://github.com/rust-lang/crates.io-index" 1347 + checksum = "da0e4dd2a88388a1f4ccc7c9ce104604dab68d9f408dc34cd45823d5a9069095" 1348 + dependencies = [ 1349 + "futures-core", 1350 + "futures-sink", 1351 + "nanorand", 1352 + "spin", 1353 + ] 1341 1354 1342 1355 [[package]] 1343 1356 name = "fnv" ··· 2527 2540 checksum = "defc4c55412d89136f966bbb339008b474350e5e6e78d2714439c386b3137a03" 2528 2541 2529 2542 [[package]] 2543 + name = "nanorand" 2544 + version = "0.7.0" 2545 + source = "registry+https://github.com/rust-lang/crates.io-index" 2546 + checksum = "6a51313c5820b0b02bd422f4b44776fbf47961755c74ce64afc73bfad10226c3" 2547 + dependencies = [ 2548 + "getrandom 0.2.15", 2549 + ] 2550 + 2551 + [[package]] 2530 2552 name = "native-tls" 2531 2553 version = "0.2.12" 2532 2554 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 3972 3994 version = "0.9.8" 3973 3995 source = "registry+https://github.com/rust-lang/crates.io-index" 3974 3996 checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67" 3997 + dependencies = [ 3998 + "lock_api", 3999 + ] 3975 4000 3976 4001 [[package]] 3977 4002 name = "spki"
+1
consumer/Cargo.toml
··· 11 11 did-resolver = { path = "../did-resolver" } 12 12 eyre = "0.6.12" 13 13 figment = { version = "0.10.19", features = ["env", "toml"] } 14 + flume = { version = "0.11", features = ["async"] } 14 15 foldhash = "0.1.4" 15 16 futures = "0.3.31" 16 17 ipld-core = "0.4.1"
+334
consumer/src/backfill/downloader.rs
··· 1 + use super::{DL_DONE_KEY, PDS_SERVICE_ID}; 2 + use crate::db; 3 + use chrono::prelude::*; 4 + use deadpool_postgres::{Client as PgClient, Pool}; 5 + use did_resolver::Resolver; 6 + use futures::TryStreamExt; 7 + use metrics::{counter, histogram}; 8 + use parakeet_db::types::{ActorStatus, ActorSyncState}; 9 + use redis::aio::MultiplexedConnection; 10 + use redis::AsyncTypedCommands; 11 + use reqwest::header::HeaderMap; 12 + use reqwest::Client as HttpClient; 13 + use std::path::{Path, PathBuf}; 14 + use std::sync::Arc; 15 + use tokio::sync::watch::Receiver as WatchReceiver; 16 + use tokio::time::{Duration, Instant}; 17 + use tokio_postgres::types::Type; 18 + use tokio_util::io::StreamReader; 19 + use tokio_util::task::TaskTracker; 20 + use tracing::instrument; 21 + 22 + const BF_RESET_KEY: &str = "bf_download_ratelimit_reset"; 23 + const BF_REM_KEY: &str = "bf_download_ratelimit_rem"; 24 + const DL_DUP_KEY: &str = "bf_downloaded"; 25 + 26 + pub async fn downloader( 27 + mut rc: MultiplexedConnection, 28 + pool: Pool, 29 + resolver: Arc<Resolver>, 30 + tmp_dir: PathBuf, 31 + concurrency: usize, 32 + buffer: usize, 33 + tracker: TaskTracker, 34 + stop: WatchReceiver<bool>, 35 + ) { 36 + let (tx, rx) = flume::bounded(64); 37 + let mut conn = pool.get().await.unwrap(); 38 + 39 + let http = HttpClient::new(); 40 + 41 + for _ in 0..concurrency { 42 + tracker.spawn(download_thread( 43 + rc.clone(), 44 + pool.clone(), 45 + resolver.clone(), 46 + http.clone(), 47 + rx.clone(), 48 + tmp_dir.clone(), 49 + )); 50 + } 51 + 52 + let status_stmt = conn.prepare_typed_cached( 53 + "INSERT INTO actors (did, sync_state, last_indexed) VALUES ($1, 'processing', NOW()) ON CONFLICT (did) DO UPDATE SET sync_state = 'processing', last_indexed=NOW()", 54 + &[Type::TEXT] 55 + ).await.unwrap(); 56 + 57 + loop { 58 + if stop.has_changed().unwrap_or(true) { 59 + tracing::info!("stopping downloader"); 60 + break; 61 + } 62 + 63 + if let Ok(count) = rc.llen(DL_DONE_KEY).await { 64 + if 
count > buffer { 65 + tracing::info!("waiting due to full buffer"); 66 + tokio::time::sleep(Duration::from_secs(5)).await; 67 + continue; 68 + } 69 + } 70 + 71 + let did: String = match rc.lpop("backfill_queue", None).await { 72 + Ok(Some(did)) => did, 73 + Ok(None) => { 74 + tokio::time::sleep(Duration::from_millis(250)).await; 75 + continue; 76 + } 77 + Err(e) => { 78 + tracing::error!("failed to get item from backfill queue: {e}"); 79 + continue; 80 + } 81 + }; 82 + 83 + tracing::trace!("resolving repo {did}"); 84 + 85 + // has the repo already been downloaded? 86 + if rc.sismember(DL_DUP_KEY, &did).await.unwrap_or_default() { 87 + tracing::warn!("skipping duplicate repo {did}"); 88 + continue; 89 + } 90 + 91 + // check if they're already synced in DB too 92 + match db::actor_get_statuses(&mut conn, &did).await { 93 + Ok(Some((_, state))) => { 94 + if state == ActorSyncState::Synced || state == ActorSyncState::Processing { 95 + tracing::warn!("skipping duplicate repo {did}"); 96 + continue; 97 + } 98 + } 99 + Ok(None) => {} 100 + Err(e) => { 101 + tracing::error!(did, "failed to check current repo status: {e}"); 102 + db::backfill_job_write(&mut conn, &did, "failed.resolve") 103 + .await 104 + .unwrap(); 105 + } 106 + } 107 + 108 + match resolver.resolve_did(&did).await { 109 + Ok(Some(did_doc)) => { 110 + let Some(service) = did_doc.find_service_by_id(PDS_SERVICE_ID) else { 111 + tracing::warn!("bad DID doc for {did}"); 112 + db::backfill_job_write(&mut conn, &did, "failed.resolve") 113 + .await 114 + .unwrap(); 115 + continue; 116 + }; 117 + let service = service.service_endpoint.clone(); 118 + 119 + // set the repo to processing 120 + if let Err(e) = conn.execute(&status_stmt, &[&did]).await { 121 + tracing::error!("failed to update repo status for {did}: {e}"); 122 + continue; 123 + } 124 + 125 + let handle = did_doc 126 + .also_known_as 127 + .and_then(|akas| akas.first().map(|v| v[5..].to_owned())); 128 + 129 + tracing::trace!("resolved repo {did} 
{service}"); 130 + if let Err(e) = tx.send_async((service, did, handle)).await { 131 + tracing::error!("failed to send: {e}"); 132 + } 133 + } 134 + Ok(None) => { 135 + tracing::warn!(did, "bad DID doc"); 136 + db::backfill_job_write(&mut conn, &did, "failed.resolve") 137 + .await 138 + .unwrap(); 139 + } 140 + Err(e) => { 141 + tracing::error!(did, "failed to resolve DID doc: {e}"); 142 + db::backfill_job_write(&mut conn, &did, "failed.resolve") 143 + .await 144 + .unwrap(); 145 + } 146 + } 147 + } 148 + } 149 + 150 + async fn download_thread( 151 + mut rc: MultiplexedConnection, 152 + pool: Pool, 153 + resolver: Arc<Resolver>, 154 + http: reqwest::Client, 155 + rx: flume::Receiver<(String, String, Option<String>)>, 156 + tmp_dir: PathBuf, 157 + ) { 158 + tracing::debug!("spawning thread"); 159 + 160 + // this will return Err(_) and exit when all senders (only held above) are dropped 161 + while let Ok((pds, did, maybe_handle)) = rx.recv_async().await { 162 + if let Err(e) = enforce_ratelimit(&mut rc, &pds).await { 163 + tracing::error!("ratelimiter error: {e}"); 164 + continue; 165 + }; 166 + 167 + { 168 + tracing::trace!("getting DB conn..."); 169 + let mut conn = pool.get().await.unwrap(); 170 + tracing::trace!("got DB conn..."); 171 + match check_and_update_repo_status(&http, &mut conn, &pds, &did).await { 172 + Ok(true) => {} 173 + Ok(false) => continue, 174 + Err(e) => { 175 + tracing::error!(pds, did, "failed to check repo status: {e}"); 176 + db::backfill_job_write(&mut conn, &did, "failed.resolve") 177 + .await 178 + .unwrap(); 179 + continue; 180 + } 181 + } 182 + 183 + tracing::debug!("trying to resolve handle..."); 184 + if let Some(handle) = maybe_handle { 185 + if let Err(e) = resolve_and_set_handle(&conn, &resolver, &did, &handle).await { 186 + tracing::error!(pds, did, "failed to resolve handle: {e}"); 187 + db::backfill_job_write(&mut conn, &did, "failed.resolve") 188 + .await 189 + .unwrap(); 190 + } 191 + } 192 + } 193 + 194 + let start = 
Instant::now(); 195 + 196 + tracing::trace!("downloading repo {did}"); 197 + 198 + match download_car(&http, &tmp_dir, &pds, &did).await { 199 + Ok(Some((rem, reset))) => { 200 + let _ = rc.zadd(BF_REM_KEY, &pds, rem).await; 201 + let _ = rc.zadd(BF_RESET_KEY, &pds, reset).await; 202 + } 203 + Ok(_) => tracing::warn!(pds, "got response with no ratelimit headers."), 204 + Err(e) => { 205 + tracing::error!(pds, did, "failed to download repo: {e}"); 206 + continue; 207 + } 208 + } 209 + 210 + histogram!("backfill_download_dur", "pds" => pds).record(start.elapsed().as_secs_f64()); 211 + 212 + let _ = rc.sadd(DL_DUP_KEY, &did).await; 213 + if let Err(e) = rc.rpush(DL_DONE_KEY, &did).await { 214 + tracing::error!(did, "failed to mark download complete: {e}"); 215 + } else { 216 + counter!("backfill_downloaded").increment(1); 217 + } 218 + } 219 + 220 + tracing::debug!("thread exiting"); 221 + } 222 + 223 + async fn enforce_ratelimit(rc: &mut MultiplexedConnection, pds: &str) -> eyre::Result<()> { 224 + let score = rc.zscore(BF_REM_KEY, pds).await?; 225 + 226 + if let Some(rem) = score { 227 + if (rem as i32) < 100 { 228 + // if we've got None for some reason, just hope that the next req will contain the reset header. 229 + if let Some(at) = rc.zscore(BF_RESET_KEY, pds).await? { 230 + tracing::debug!("rate limit for {pds} resets at {at}"); 231 + let time = chrono::DateTime::from_timestamp(at as i64, 0).unwrap(); 232 + let delta = (time - Utc::now()).num_milliseconds().max(0); 233 + 234 + tokio::time::sleep(Duration::from_millis(delta as u64)).await; 235 + }; 236 + } 237 + } 238 + 239 + Ok(()) 240 + } 241 + 242 + // you wouldn't... 
243 + #[instrument(skip(http, tmp_dir, pds))] 244 + async fn download_car( 245 + http: &HttpClient, 246 + tmp_dir: &Path, 247 + pds: &str, 248 + did: &str, 249 + ) -> eyre::Result<Option<(i32, i32)>> { 250 + let mut file = tokio::fs::File::create_new(tmp_dir.join(did)).await?; 251 + 252 + let res = http 253 + .get(format!("{pds}/xrpc/com.atproto.sync.getRepo?did={did}")) 254 + .send() 255 + .await? 256 + .error_for_status()?; 257 + 258 + let headers = res.headers(); 259 + let ratelimit_rem = header_to_int(headers, "ratelimit-remaining"); 260 + let ratelimit_reset = header_to_int(headers, "ratelimit-reset"); 261 + 262 + let strm = res.bytes_stream().map_err(std::io::Error::other); 263 + let mut reader = StreamReader::new(strm); 264 + 265 + tokio::io::copy(&mut reader, &mut file).await?; 266 + 267 + Ok(ratelimit_rem.zip(ratelimit_reset)) 268 + } 269 + 270 + // there's no ratelimit handling here because we pretty much always call download_car after. 271 + #[instrument(skip(http, conn, pds))] 272 + async fn check_and_update_repo_status( 273 + http: &HttpClient, 274 + conn: &mut PgClient, 275 + pds: &str, 276 + repo: &str, 277 + ) -> eyre::Result<bool> { 278 + match super::check_pds_repo_status(http, pds, repo).await? { 279 + Some(status) => { 280 + if !status.active { 281 + tracing::debug!("repo is inactive"); 282 + 283 + let status = status 284 + .status 285 + .unwrap_or(crate::firehose::AtpAccountStatus::Deleted); 286 + conn.execute( 287 + "UPDATE actors SET sync_state='dirty', status=$2 WHERE did=$1", 288 + &[&repo, &ActorStatus::from(status)], 289 + ) 290 + .await?; 291 + 292 + Ok(false) 293 + } else { 294 + Ok(true) 295 + } 296 + } 297 + None => { 298 + // this repo can't be found - set dirty and assume deleted. 
299 + tracing::debug!("repo was deleted"); 300 + conn.execute( 301 + "UPDATE actors SET sync_state='dirty', status='deleted' WHERE did=$1", 302 + &[&repo], 303 + ) 304 + .await?; 305 + 306 + Ok(false) 307 + } 308 + } 309 + } 310 + 311 + async fn resolve_and_set_handle( 312 + conn: &PgClient, 313 + resolver: &Resolver, 314 + did: &str, 315 + handle: &str, 316 + ) -> eyre::Result<()> { 317 + if let Some(handle_did) = resolver.resolve_handle(handle).await? { 318 + if handle_did == did { 319 + conn.execute("UPDATE actors SET handle=$2 WHERE did=$1", &[&did, &handle]) 320 + .await?; 321 + } else { 322 + tracing::warn!("requested DID ({did}) doesn't match handle"); 323 + } 324 + } 325 + 326 + Ok(()) 327 + } 328 + 329 + fn header_to_int(headers: &HeaderMap, name: &str) -> Option<i32> { 330 + headers 331 + .get(name) 332 + .and_then(|v| v.to_str().ok()) 333 + .and_then(|v| v.parse().ok()) 334 + }
+29 -105
consumer/src/backfill/mod.rs
··· 9 9 use metrics::counter; 10 10 use parakeet_db::types::{ActorStatus, ActorSyncState}; 11 11 use redis::aio::MultiplexedConnection; 12 - use redis::{AsyncCommands, Direction}; 12 + use redis::AsyncTypedCommands; 13 13 use reqwest::{Client, StatusCode}; 14 + use std::path::PathBuf; 14 15 use std::str::FromStr; 15 16 use std::sync::Arc; 16 17 use tokio::sync::watch::Receiver as WatchReceiver; ··· 18 19 use tokio_util::task::TaskTracker; 19 20 use tracing::instrument; 20 21 22 + mod downloader; 21 23 mod repo; 22 24 mod types; 23 25 26 + const DL_DONE_KEY: &str = "bf_download_complete"; 24 27 const PDS_SERVICE_ID: &str = "#atproto_pds"; 25 28 // There's a 4MiB limit on parakeet-index, so break delta batches up if there's loads. 26 29 // this should be plenty low enough to not trigger the size limit. (59k did slightly) ··· 28 31 29 32 #[derive(Clone)] 30 33 pub struct BackfillManagerInner { 31 - resolver: Arc<Resolver>, 32 - client: Client, 33 34 index_client: Option<parakeet_index::Client>, 34 - opts: BackfillConfig, 35 + tmp_dir: PathBuf, 35 36 } 36 37 37 38 pub struct BackfillManager { 38 39 pool: Pool, 39 40 redis: MultiplexedConnection, 41 + resolver: Arc<Resolver>, 40 42 semaphore: Arc<Semaphore>, 43 + opts: BackfillConfig, 41 44 inner: BackfillManagerInner, 42 45 } 43 46 ··· 49 52 index_client: Option<parakeet_index::Client>, 50 53 opts: BackfillConfig, 51 54 ) -> eyre::Result<Self> { 52 - let client = Client::builder().brotli(true).build()?; 53 55 let semaphore = Arc::new(Semaphore::new(opts.backfill_workers as usize)); 54 56 55 57 Ok(BackfillManager { 56 58 pool, 57 59 redis, 60 + resolver, 58 61 semaphore, 59 62 inner: BackfillManagerInner { 60 - resolver, 61 - client, 62 63 index_client, 63 - opts, 64 + tmp_dir: PathBuf::from_str(&opts.download_tmp_dir)?, 64 65 }, 66 + opts, 65 67 }) 66 68 } 67 69 68 70 pub async fn run(mut self, stop: WatchReceiver<bool>) -> eyre::Result<()> { 69 71 let tracker = TaskTracker::new(); 70 72 73 + 
tracker.spawn(downloader::downloader( 74 + self.redis.clone(), 75 + self.pool.clone(), 76 + self.resolver, 77 + self.inner.tmp_dir.clone(), 78 + self.opts.download_workers, 79 + self.opts.download_buffer, 80 + tracker.clone(), 81 + stop.clone(), 82 + )); 83 + 71 84 loop { 72 85 if stop.has_changed().unwrap_or(true) { 73 86 tracker.close(); 87 + tracing::info!("stopping backfiller"); 74 88 break; 75 89 } 76 90 77 - let Some(job) = self 78 - .redis 79 - .lmove::<_, _, Option<String>>( 80 - "backfill_queue", 81 - "backfill_processing", 82 - Direction::Left, 83 - Direction::Right, 84 - ) 85 - .await? 86 - else { 91 + let Some(job): Option<String> = self.redis.lpop(DL_DONE_KEY, None).await? else { 87 92 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; 88 93 continue; 89 94 }; ··· 92 97 93 98 let mut inner = self.inner.clone(); 94 99 let mut conn = self.pool.get().await?; 95 - let mut redis = self.redis.clone(); 96 100 97 101 tracker.spawn(async move { 98 102 let _p = p; ··· 102 106 tracing::error!(did = &job, "backfill failed: {e}"); 103 107 counter!("backfill_failure").increment(1); 104 108 105 - db::backfill_job_write(&mut conn, &job, "failed") 109 + db::backfill_job_write(&mut conn, &job, "failed.write") 106 110 .await 107 111 .unwrap(); 108 112 } else { ··· 113 117 .unwrap(); 114 118 } 115 119 116 - redis 117 - .lrem::<_, _, i32>("backfill_processing", 1, &job) 118 - .await 119 - .unwrap(); 120 + if let Err(e) = tokio::fs::remove_file(inner.tmp_dir.join(&job)).await { 121 + tracing::error!(did = &job, "failed to remove file: {e}"); 122 + } 120 123 }); 121 124 } 122 125 ··· 132 135 inner: &mut BackfillManagerInner, 133 136 did: &str, 134 137 ) -> eyre::Result<()> { 135 - let Some((status, sync_state)) = db::actor_get_statuses(conn, did).await? 
else { 136 - tracing::error!("skipping backfill on unknown repo"); 137 - return Ok(()); 138 - }; 139 - 140 - if sync_state != ActorSyncState::Dirty || status != ActorStatus::Active { 141 - tracing::debug!("skipping non-dirty or inactive repo"); 142 - return Ok(()); 143 - } 144 - 145 - // resolve the did to a PDS (also validates the handle) 146 - let Some(did_doc) = inner.resolver.resolve_did(did).await? else { 147 - eyre::bail!("missing did doc"); 148 - }; 149 - 150 - let Some(service) = did_doc.find_service_by_id(PDS_SERVICE_ID) else { 151 - eyre::bail!("DID doc contained no service endpoint"); 152 - }; 153 - 154 - let pds_url = service.service_endpoint.clone(); 155 - 156 - // check the repo status before we attempt to resolve the handle. There's a case where we can't 157 - // resolve the handle in the DID doc because the acc is already deleted. 158 - let Some(repo_status) = check_pds_repo_status(&inner.client, &pds_url, did).await? else { 159 - // this repo can't be found - set dirty and assume deleted. 160 - tracing::debug!("repo was deleted"); 161 - db::actor_upsert( 162 - conn, 163 - did, 164 - ActorStatus::Deleted, 165 - ActorSyncState::Dirty, 166 - Utc::now(), 167 - ) 168 - .await?; 169 - return Ok(()); 170 - }; 171 - 172 - if !repo_status.active { 173 - tracing::debug!("repo is inactive"); 174 - let status = repo_status 175 - .status 176 - .unwrap_or(crate::firehose::AtpAccountStatus::Deleted); 177 - db::actor_upsert(conn, did, status.into(), ActorSyncState::Dirty, Utc::now()).await?; 178 - return Ok(()); 179 - } 180 - 181 - if !inner.opts.skip_handle_validation { 182 - // at this point, the account will be active and we can attempt to resolve the handle. 
183 - let Some(handle) = did_doc 184 - .also_known_as 185 - .and_then(|aka| aka.first().cloned()) 186 - .and_then(|handle| handle.strip_prefix("at://").map(String::from)) 187 - else { 188 - eyre::bail!("DID doc contained no handle"); 189 - }; 190 - 191 - // in theory, we can use com.atproto.identity.resolveHandle against a PDS, but that seems 192 - // like a way to end up with really sus handles. 193 - if let Some(handle_did) = inner.resolver.resolve_handle(&handle).await? { 194 - if handle_did != did { 195 - tracing::warn!("requested DID doesn't match handle"); 196 - } else { 197 - // set the handle from above 198 - db::actor_upsert_handle( 199 - conn, 200 - did, 201 - ActorSyncState::Processing, 202 - Some(handle), 203 - Utc::now(), 204 - ) 205 - .await?; 206 - } 207 - } 208 - } 209 - 210 - // now we can start actually backfilling 211 - db::actor_set_sync_status(conn, did, ActorSyncState::Processing, Utc::now()).await?; 212 - 213 138 let mut t = conn.transaction().await?; 214 139 t.execute("SET CONSTRAINTS ALL DEFERRED", &[]).await?; 215 140 216 - tracing::trace!("pulling repo"); 141 + tracing::trace!("loading repo"); 217 142 218 - let (commit, mut deltas, copies) = 219 - repo::stream_and_insert_repo(&mut t, &inner.client, did, &pds_url).await?; 143 + let (commit, mut deltas, copies) = repo::insert_repo(&mut t, &inner.tmp_dir, did).await?; 220 144 221 145 db::actor_set_repo_state(&mut t, did, &commit.rev, commit.data).await?; 222 146
+5 -18
consumer/src/backfill/repo.rs
··· 6 6 use crate::indexer::types::{AggregateDeltaStore, RecordTypes}; 7 7 use crate::{db, indexer}; 8 8 use deadpool_postgres::Transaction; 9 - use futures::TryStreamExt; 10 9 use ipld_core::cid::Cid; 11 10 use iroh_car::CarReader; 12 11 use metrics::counter; 13 12 use parakeet_index::AggregateType; 14 - use reqwest::Client; 15 13 use std::collections::HashMap; 16 - use std::io::ErrorKind; 14 + use std::path::Path; 17 15 use tokio::io::BufReader; 18 - use tokio_util::io::StreamReader; 19 16 20 17 type BackfillDeltaStore = HashMap<(String, i32), i32>; 21 18 22 - pub async fn stream_and_insert_repo( 19 + pub async fn insert_repo( 23 20 t: &mut Transaction<'_>, 24 - client: &Client, 21 + tmp_dir: &Path, 25 22 repo: &str, 26 - pds: &str, 27 23 ) -> eyre::Result<(CarCommitEntry, BackfillDeltaStore, CopyStore)> { 28 - let res = client 29 - .get(format!("{pds}/xrpc/com.atproto.sync.getRepo?did={repo}")) 30 - .send() 31 - .await? 32 - .error_for_status()?; 33 - 34 - let strm = res 35 - .bytes_stream() 36 - .map_err(|err| std::io::Error::new(ErrorKind::Other, err)); 37 - let reader = StreamReader::new(strm); 38 - let mut car_stream = CarReader::new(BufReader::new(reader)).await?; 24 + let car = tokio::fs::File::open(tmp_dir.join(repo)).await?; 25 + let mut car_stream = CarReader::new(BufReader::new(car)).await?; 39 26 40 27 // the root should be the commit block 41 28 let root = car_stream.header().roots().first().cloned().unwrap();
+16 -2
consumer/src/config.rs
··· 40 40 /// You can use this to move handle resolution out of event handling and into another place. 41 41 #[serde(default)] 42 42 pub skip_handle_validation: bool, 43 + /// Whether to submit backfill requests for new repos. (Only when history_mode == BackfillHistory). 44 + #[serde(default)] 45 + pub request_backfill: bool, 43 46 } 44 47 45 48 #[derive(Copy, Clone, Debug, PartialEq, PartialOrd, Deserialize)] ··· 57 60 pub backfill_workers: u8, 58 61 #[serde(default)] 59 62 pub skip_aggregation: bool, 60 - #[serde(default)] 61 - pub skip_handle_validation: bool, 63 + #[serde(default = "default_download_workers")] 64 + pub download_workers: usize, 65 + #[serde(default = "default_download_buffer")] 66 + pub download_buffer: usize, 67 + pub download_tmp_dir: String, 62 68 } 63 69 64 70 fn default_backfill_workers() -> u8 { ··· 68 74 fn default_indexer_workers() -> u8 { 69 75 4 70 76 } 77 + 78 + fn default_download_workers() -> usize { 79 + 25 80 + } 81 + 82 + fn default_download_buffer() -> usize { 83 + 25_000 84 + }
+8 -3
consumer/src/indexer/mod.rs
··· 30 30 pub struct RelayIndexerOpts { 31 31 pub history_mode: HistoryMode, 32 32 pub skip_handle_validation: bool, 33 + pub request_backfill: bool, 33 34 } 34 35 35 36 #[derive(Clone)] ··· 38 39 resolver: Arc<Resolver>, 39 40 do_backfill: bool, 40 41 do_handle_res: bool, 42 + req_backfill: bool, 41 43 } 42 44 43 45 pub struct RelayIndexer { ··· 66 68 state: RelayIndexerState { 67 69 resolver, 68 70 do_backfill: opts.history_mode == HistoryMode::BackfillHistory, 71 + req_backfill: opts.request_backfill, 69 72 do_handle_res: !opts.skip_handle_validation, 70 73 idxc_tx, 71 74 }, ··· 275 278 .map(ActorStatus::from) 276 279 .unwrap_or(ActorStatus::Active); 277 280 278 - let trigger_bf = if state.do_backfill && status == ActorStatus::Active { 281 + let trigger_bf = if state.do_backfill && state.req_backfill && status == ActorStatus::Active { 279 282 // check old status - if they exist (Some(*)), AND were previously != Active but not Deleted, 280 283 // AND have a rev == null, then trigger backfill. 281 284 db::actor_get_status_and_rev(conn, &account.did) ··· 325 328 // backfill for them and they can be marked active and indexed normally. 326 329 // TODO: bridgy doesn't implement since atm - we need a special case 327 330 if commit.since.is_some() { 328 - if state.do_backfill { 331 + if state.do_backfill && state.req_backfill { 329 332 rc.rpush::<_, _, i32>("backfill_queue", commit.repo).await?; 330 333 } 331 334 return Ok(()); ··· 356 359 .await?; 357 360 358 361 if trigger_backfill { 359 - rc.rpush::<_, _, i32>("backfill_queue", commit.repo).await?; 362 + if state.req_backfill { 363 + rc.rpush::<_, _, i32>("backfill_queue", commit.repo).await?; 364 + } 360 365 return Ok(()); 361 366 } 362 367
+1
consumer/src/main.rs
··· 115 115 let indexer_opts = indexer::RelayIndexerOpts { 116 116 history_mode: indexer_cfg.history_mode, 117 117 skip_handle_validation: indexer_cfg.skip_handle_validation, 118 + request_backfill: indexer_cfg.request_backfill, 118 119 }; 119 120 120 121 let relay_indexer = indexer::RelayIndexer::new(