lightweight com.atproto.sync.listReposByCollection
45
fork

Configure Feed

Select the types of activity you want to include in your feed.

configs and timeouts

phil 452d98b0 ca9ce226

+133 -26
+50 -4
src/main.rs
··· 1 1 use std::net::SocketAddr; 2 2 use std::path::PathBuf; 3 + use std::time::Duration; 3 4 4 5 use clap::Parser; 5 6 use tokio::task::JoinSet; ··· 43 44 )] 44 45 slingshot_url: Option<jacquard_common::url::Url>, 45 46 46 - /// Max identities kept in in-process identity cache 47 - #[arg(long, env = "LIGHTRAIL_IDENT_CACHE_SIZE", default_value_t = 1_000_000)] 47 + /// Max identities kept in in-process identity cache. 48 + #[arg(long, env = "LIGHTRAIL_IDENT_CACHE_SIZE", default_value_t = 2_000_000)] 48 49 ident_cache_size: u64, 49 50 51 + /// Maximum concurrent firehose commit worker tasks. 52 + #[arg(long, env = "LIGHTRAIL_MAX_FIREHOSE_WORKERS", default_value_t = 10)] 53 + max_firehose_workers: usize, 54 + 55 + /// Maximum concurrent resync worker tasks. 56 + #[arg(long, env = "LIGHTRAIL_MAX_RESYNC_WORKERS", default_value_t = 16)] 57 + max_resync_workers: usize, 58 + 59 + /// How often to flush the firehose cursor watermark to storage, in seconds. 60 + #[arg(long, env = "LIGHTRAIL_CURSOR_SAVE_INTERVAL", default_value_t = 1)] 61 + cursor_save_interval_secs: u64, 62 + 63 + /// HTTP timeout for describeRepo + getLatestCommit during resync, in seconds. 64 + #[arg( 65 + long, 66 + env = "LIGHTRAIL_DESCRIBE_REPO_FETCH_TIMEOUT", 67 + default_value_t = 30 68 + )] 69 + describe_repo_fetch_timeout_secs: u64, 70 + 71 + /// HTTP timeout for getRepo (full CAR download) during resync, in seconds. 72 + #[arg(long, env = "LIGHTRAIL_GET_REPO_FETCH_TIMEOUT", default_value_t = 300)] 73 + get_repo_fetch_timeout_secs: u64, 74 + 50 75 /// TCP address for the Prometheus metrics HTTP endpoint. 51 76 /// If not set, metrics are not exported. 52 77 #[arg(long, env = "LIGHTRAIL_METRICS_BIND", num_args = 0..=1, default_missing_value = "0.0.0.0:6789")] 53 78 metrics_bind: Option<SocketAddr>, 79 + 80 + /// Admin password for privileged API endpoints. 81 + #[arg(long, env = "LIGHTRAIL_ADMIN_PASSWORD")] 82 + admin_password: Option<String>, 54 83 55 84 /// Log an error when a commit claims a collection birth but the index 56 85 /// already has that collection for the DID (temporary diagnostic flag). ··· 103 132 let resolver = resolver.clone(); 104 133 let validate_births = args.validate_births; 105 134 async move { 106 - let mut sub = firehose::Subscriber::new(host, db, resolver, validate_births); 135 + let mut sub = firehose::Subscriber::new( 136 + host, 137 + db, 138 + resolver, 139 + validate_births, 140 + args.max_firehose_workers, 141 + Duration::from_secs(args.cursor_save_interval_secs), 142 + ); 107 143 sub.run(token).await 108 144 } 109 145 }); ··· 124 160 let token = token.clone(); 125 161 let db = db.clone(); 126 162 let resolver = resolver.clone(); 127 - async move { resync::dispatcher::run(resolver, db, 20, token).await } 163 + async move { 164 + resync::dispatcher::run( 165 + resolver, 166 + db, 167 + args.max_resync_workers, 168 + Duration::from_secs(args.describe_repo_fetch_timeout_secs), 169 + Duration::from_secs(args.get_repo_fetch_timeout_secs), 170 + token, 171 + ) 172 + .await 173 + } 128 174 }); 129 175 130 176 tasks.spawn({
+10
src/sync/firehose/commit_event.rs
··· 496 496 // All checks passed — atomically update the chain tip and the collection 497 497 // index (born → insert, died → remove). Also record each born collection 498 498 // in the global collection list (blind overwrite, never deleted). 499 + let n_born = born.len() as u64; 500 + let n_died = died.len() as u64; 499 501 let mut batch = db.database.batch(); 500 502 storage::repo::put_prev_into( 501 503 &mut batch, ··· 516 518 batch 517 519 .commit() 518 520 .map_err(Into::<crate::storage::StorageError>::into)?; 521 + 522 + if n_born > 0 { 523 + metrics::counter!("lightrail_collection_births_total").increment(n_born); 524 + } 525 + if n_died > 0 { 526 + metrics::counter!("lightrail_collection_deaths_total").increment(n_died); 527 + } 528 + metrics::counter!("lightrail_commits_indexed_total").increment(1); 519 529 520 530 Ok(()) 521 531 }
+8 -8
src/sync/firehose/mod.rs
··· 43 43 /// Maximum reconnect delay. 44 44 const MAX_BACKOFF_SECS: u64 = 64; 45 45 46 - /// How often to flush the watermark cursor to storage. 47 - const CURSOR_FLUSH_INTERVAL: Duration = Duration::from_secs(1); 48 - 49 - /// Maximum commit worker tasks running concurrently. 50 - const MAX_COMMIT_WORKERS: usize = 16; 51 - 52 46 /// Manages a single logical connection to a relay firehose, with reconnection. 53 47 pub struct Subscriber { 54 48 host: Host, 55 49 db: DbRef, 56 50 resolver: Arc<crate::identity::Resolver>, 57 51 validate_births: bool, 52 + max_workers: usize, 53 + cursor_save_interval: Duration, 58 54 } 59 55 60 56 impl Subscriber { ··· 63 59 db: DbRef, 64 60 resolver: Arc<crate::identity::Resolver>, 65 61 validate_births: bool, 62 + max_workers: usize, 63 + cursor_save_interval: Duration, 66 64 ) -> Self { 67 65 Self { 68 66 host, 69 67 db, 70 68 resolver, 71 69 validate_births, 70 + max_workers, 71 + cursor_save_interval, 72 72 } 73 73 } 74 74 ··· 89 89 let mut dispatcher = CommitDispatcher::new( 90 90 self.resolver.clone(), 91 91 self.db.clone(), 92 - MAX_COMMIT_WORKERS, 92 + self.max_workers, 93 93 self.validate_births, 94 94 ); 95 95 // When Some, use this cursor on the next reconnect instead of loading ··· 234 234 Event::Worker(None) => {} // JoinSet drained (shouldn't happen with guard) 235 235 } 236 236 237 - if cursor_tick.elapsed() >= CURSOR_FLUSH_INTERVAL { 237 + if cursor_tick.elapsed() >= self.cursor_save_interval { 238 238 dispatcher.evict_stalled(); 239 239 let wm = dispatcher.watermark(last_seq); 240 240 let db = self.db.clone();
+25 -3
src/sync/resync/dispatcher.rs
··· 44 44 resolver: std::sync::Arc<crate::identity::Resolver>, 45 45 db: DbRef, 46 46 max_concurrent: usize, 47 + describe_timeout: std::time::Duration, 48 + get_repo_timeout: std::time::Duration, 47 49 token: tokio_util::sync::CancellationToken, 48 50 ) -> Result<()> { 49 51 let client = crate::http::build_client(); ··· 117 119 let client = client.clone(); 118 120 let resolver = resolver.clone(); 119 121 let db = db.clone(); 120 - let handle = workers 121 - .spawn(async move { run_worker(item, &resolver, &client, &db).await }); 122 + let handle = workers.spawn(async move { 123 + run_worker( 124 + item, 125 + &resolver, 126 + &client, 127 + &db, 128 + describe_timeout, 129 + get_repo_timeout, 130 + ) 131 + .await 132 + }); 122 133 task_dids.insert(handle.id(), did_str.clone()); 123 134 metrics::gauge!("lightrail_resync_workers").set(workers.len() as f64); 124 135 trace!(did = %did_str, running = workers.len(), "spawned resync worker"); ··· 232 243 resolver: &crate::identity::Resolver, 233 244 client: &crate::http::ThrottledClient, 234 245 db: &DbRef, 246 + describe_timeout: std::time::Duration, 247 + get_repo_timeout: std::time::Duration, 235 248 ) -> WorkerOutcome { 236 249 let did_str = item.did.as_str().to_string(); 237 - match super::index_repo(client, resolver, item.did, db).await { 250 + match super::index_repo( 251 + client, 252 + resolver, 253 + item.did, 254 + db, 255 + describe_timeout, 256 + get_repo_timeout, 257 + ) 258 + .await 259 + { 238 260 Ok(()) => { 239 261 trace!(did = %did_str, "resync completed"); 240 262 WorkerOutcome::Success
+40 -11
src/sync/resync/mod.rs
··· 19 19 pub mod dispatcher; 20 20 pub mod get_repo; 21 21 22 + use std::time::Duration; 23 + 22 24 use jacquard_common::IntoStatic; 23 25 use jacquard_common::http_client::HttpClient; 24 26 use jacquard_common::types::{cid::Cid, nsid::Nsid, string::Did, tid::Tid}; ··· 103 105 resolver: &crate::identity::Resolver, 104 106 did: Did<'_>, 105 107 db: &DbRef, 108 + describe_timeout: Duration, 109 + get_repo_timeout: Duration, 106 110 ) -> Result<()> 107 111 where 108 112 C: HttpClient + Sync, ··· 114 118 let resolved = resolver.resolve(&did).await?; 115 119 let base = &*resolved.pds; 116 120 117 - let snapshot = match fetch_collections(client, base, did.clone()).await { 121 + let snapshot = match fetch_collections( 122 + client, 123 + base, 124 + did.clone(), 125 + describe_timeout, 126 + get_repo_timeout, 127 + ) 128 + .await 129 + { 118 130 Ok(s) => s, 119 131 Err(GetCollectionsError::RepoNotFound) => return Ok(()), 120 132 Err(GetCollectionsError::RepoGone(reason)) => { ··· 184 196 client: &C, 185 197 base: &jacquard_common::url::Url, 186 198 did: Did<'_>, 199 + describe_timeout: Duration, 200 + get_repo_timeout: Duration, 187 201 ) -> std::result::Result<RepoSnapshot, GetCollectionsError> 188 202 where 189 203 C: HttpClient + Sync, 190 204 { 191 - match describe_repo::fetch_collections(client, base, did.clone()).await { 192 - Ok(snapshot) if !snapshot.collections.is_empty() => return Ok(snapshot), 205 + let describe_result = tokio::time::timeout( 206 + describe_timeout, 207 + describe_repo::fetch_collections(client, base, did.clone()), 208 + ) 209 + .await; 210 + 211 + match describe_result { 212 + Ok(Ok(snapshot)) if !snapshot.collections.is_empty() => return Ok(snapshot), 193 213 // Empty list: PDS may have a bug or not paginate large collection sets. 194 214 // Fall through to the full CAR walk. 195 - Ok(_) => {} 215 + Ok(Ok(_)) => {} 196 216 // Rate-limited: don't escalate to a heavier getRepo request. 197 - Err(e @ GetCollectionsError::RateLimited(_)) => return Err(e), 217 + Ok(Err(e @ GetCollectionsError::RateLimited(_))) => return Err(e), 198 218 // Definitively gone: getRepo would return the same answer. 199 - Err(e @ (GetCollectionsError::RepoNotFound | GetCollectionsError::RepoGone(_))) => { 219 + Ok(Err(e @ (GetCollectionsError::RepoNotFound | GetCollectionsError::RepoGone(_)))) => { 200 220 return Err(e); 201 221 } 202 - // Any other failure (404, generic 4xx/5xx, decode error, unrecognised XRPC 203 - // error): fall through. The PDS may not implement describeRepo, or may have 204 - // a bug this endpoint doesn't hit. 205 - Err(_) => {} 222 + // Any other failure or timeout: fall through. The PDS may not implement 223 + // describeRepo, or may have a bug this endpoint doesn't hit. 224 + Ok(Err(_)) | Err(_) => {} 206 225 } 207 - get_repo::fetch_collections(client, base, did).await 226 + 227 + tokio::time::timeout( 228 + get_repo_timeout, 229 + get_repo::fetch_collections(client, base, did), 230 + ) 231 + .await 232 + .unwrap_or_else(|_| { 233 + Err(GetCollectionsError::Request( 234 + "getRepo timed out".to_string(), 235 + )) 236 + }) 208 237 }