lightweight com.atproto.sync.listReposByCollection
45
fork

Configure Feed

Select the types of activity you want to include in your feed.

initial deep crawl setup

phil 9e8dd606 f9e47c74

+250 -4
+26 -2
src/main.rs
··· 10 10 use lightrail::error::{Error, Result}; 11 11 use lightrail::identity; 12 12 use lightrail::storage; 13 - use lightrail::sync::{backfill, firehose, resync}; 13 + use lightrail::sync::{self, backfill, firehose, resync}; 14 14 15 15 #[derive(Parser, Debug)] 16 16 #[command(name = "lightrail", about = "listReposByCollection indexing service")] ··· 80 80 /// Admin password for privileged API endpoints. 81 81 #[arg(long, env = "LIGHTRAIL_ADMIN_PASSWORD")] 82 82 admin_password: Option<String>, 83 + 84 + /// Enable deep crawl: discover PDS hosts via listHosts and crawl each one's repos. 85 + #[arg(long, env = "LIGHTRAIL_DEEP_CRAWL")] 86 + deep_crawl: bool, 87 + 88 + /// Max concurrent per-PDS listRepos workers during deep crawl. 89 + #[arg(long, env = "LIGHTRAIL_MAX_DEEP_CRAWL_WORKERS", default_value_t = 4)] 90 + max_deep_crawl_workers: usize, 83 91 } 84 92 85 93 #[tokio::main] ··· 116 124 } 117 125 118 126 let db = storage::open(&args.db_path)?; 127 + let client = lightrail::http::build_client(); 119 128 let token = CancellationToken::new(); 120 129 121 130 let mut tasks: JoinSet<Result<()>> = JoinSet::new(); ··· 140 149 tasks.spawn({ 141 150 let token = token.clone(); 142 151 let db = db.clone(); 152 + let client = client.clone(); 143 153 let host = subscribe_host.clone(); 144 154 async move { 145 - if backfill::run(host, db, token.clone()).await? { 155 + if backfill::run(host, db, client, token.clone()).await? { 146 156 info!("backfill task idling"); 147 157 token.cancelled().await; 148 158 } ··· 153 163 tasks.spawn({ 154 164 let token = token.clone(); 155 165 let db = db.clone(); 166 + let client = client.clone(); 156 167 let resolver = resolver.clone(); 157 168 async move { 158 169 resync::dispatcher::run( 159 170 resolver, 160 171 db, 172 + client, 161 173 args.max_resync_workers, 162 174 Duration::from_secs(args.describe_repo_fetch_timeout_secs), 163 175 Duration::from_secs(args.get_repo_fetch_timeout_secs), ··· 173 185 let addr = args.listen; 174 186 async move { lightrail::server::serve(addr, db, token).await } 175 187 }); 188 + 189 + if args.deep_crawl { 190 + tasks.spawn({ 191 + let token = token.clone(); 192 + let db = db.clone(); 193 + let client = client.clone(); 194 + let host = subscribe_host.clone(); 195 + async move { 196 + sync::deep_crawl::run(host, db, client, args.max_deep_crawl_workers, token).await 197 + } 198 + }); 199 + } 176 200 177 201 // Wait for a shutdown trigger: ctrl-c or any task exiting (including via 178 202 // panic, surfaced as a JoinError).
+2 -1
src/sync/backfill.rs
··· 12 12 use tracing::{info, trace, warn}; 13 13 14 14 use crate::error::Result; 15 + use crate::http::ThrottledClient; 15 16 use crate::storage::{ 16 17 self, DbRef, 17 18 backfill_progress::{BackfillProgress, get, set}, ··· 30 31 pub async fn run( 31 32 host: Host, 32 33 db: DbRef, 34 + client: ThrottledClient, 33 35 token: tokio_util::sync::CancellationToken, 34 36 ) -> Result<bool> { 35 - let client = crate::http::build_client(); 36 37 let base: jacquard_common::url::Url = format!("https://{host}") 37 38 .parse() 38 39 .map_err(|e: jacquard_common::url::ParseError| crate::error::Error::Other(e.to_string()))?;
+220
src/sync/deep_crawl.rs
··· 1 + //! Deep-crawl orchestration: discover PDS hosts via `listHosts`, then crawl 2 + //! each one's repos via the existing `backfill::run()` worker. 3 + //! 4 + //! Runs alongside (not instead of) the normal relay-level backfill. Hosts that 5 + //! have already had a completed backfill are skipped — new repos on those PDSes 6 + //! arrive via the firehose. 7 + 8 + use jacquard_api::com_atproto::sync::list_hosts::ListHosts; 9 + use jacquard_common::{url::Host, xrpc::XrpcExt}; 10 + use tokio::task::JoinSet; 11 + use tokio_util::sync::CancellationToken; 12 + use tracing::{info, warn}; 13 + 14 + use crate::error::Result; 15 + use crate::http::ThrottledClient; 16 + use crate::storage::{DbRef, backfill_progress, list_hosts_cursor}; 17 + use crate::sync::backfill; 18 + 19 + const PAGE_LIMIT: i64 = 500; 20 + /// Delay between retry attempts after a failed page request. 21 + const RETRY_DELAY_SECS: u64 = 10; 22 + /// How long to wait between full passes through all listHosts pages. 23 + const REPOLL_SECS: u64 = 30 * 60; 24 + 25 + /// Discover PDS hosts via `listHosts` on `relay_host` and crawl each one, 26 + /// re-polling every 30 minutes after each full pass. 27 + pub async fn run( 28 + relay_host: Host, 29 + db: DbRef, 30 + client: ThrottledClient, 31 + max_workers: usize, 32 + token: CancellationToken, 33 + ) -> Result<()> { 34 + info!(relay = %relay_host, "deep crawl started"); 35 + 36 + loop { 37 + if token.is_cancelled() { 38 + return Ok(()); 39 + } 40 + 41 + info!(relay = %relay_host, "deep crawl pass started"); 42 + do_pass( 43 + relay_host.clone(), 44 + db.clone(), 45 + client.clone(), 46 + max_workers, 47 + token.clone(), 48 + ) 49 + .await?; 50 + 51 + if token.is_cancelled() { 52 + return Ok(()); 53 + } 54 + 55 + info!( 56 + relay = %relay_host, 57 + repoll_secs = REPOLL_SECS, 58 + "deep crawl pass complete; re-polling in 30 minutes" 59 + ); 60 + 61 + tokio::select! { 62 + _ = token.cancelled() => return Ok(()), 63 + _ = tokio::time::sleep(tokio::time::Duration::from_secs(REPOLL_SECS)) => {} 64 + } 65 + } 66 + } 67 + 68 + async fn do_pass( 69 + relay_host: Host, 70 + db: DbRef, 71 + client: ThrottledClient, 72 + max_workers: usize, 73 + token: CancellationToken, 74 + ) -> Result<()> { 75 + let base: jacquard_common::url::Url = format!("https://{relay_host}") 76 + .parse() 77 + .map_err(|e: jacquard_common::url::ParseError| crate::error::Error::Other(e.to_string()))?; 78 + 79 + // Resume from the last saved cursor. 80 + let mut cursor: Option<String> = { 81 + let db = db.clone(); 82 + let host = relay_host.clone(); 83 + tokio::task::spawn_blocking(move || list_hosts_cursor::get(&db, &host)).await?? 84 + }; 85 + 86 + let mut workers: JoinSet<(Host, Result<bool>)> = JoinSet::new(); 87 + 88 + loop { 89 + if token.is_cancelled() { 90 + drain_workers(&mut workers).await; 91 + return Ok(()); 92 + } 93 + 94 + let (hosts, next_cursor) = { 95 + let req = ListHosts { 96 + cursor: cursor.as_deref().map(Into::into), 97 + limit: Some(PAGE_LIMIT), 98 + }; 99 + loop { 100 + let resp = match client.xrpc(base.clone()).send(&req).await { 101 + Ok(r) => r, 102 + Err(e) => { 103 + warn!( 104 + error = %e, 105 + relay = %relay_host, 106 + "listHosts send failed; retrying in {RETRY_DELAY_SECS}s" 107 + ); 108 + tokio::select! { 109 + _ = token.cancelled() => { 110 + drain_workers(&mut workers).await; 111 + return Ok(()); 112 + } 113 + _ = tokio::time::sleep(tokio::time::Duration::from_secs(RETRY_DELAY_SECS)) => {} 114 + } 115 + continue; 116 + } 117 + }; 118 + match resp.parse() { 119 + Ok(out) => { 120 + let next = out.cursor.as_deref().map(str::to_owned); 121 + let hostnames = out 122 + .hosts 123 + .into_iter() 124 + .map(|h| h.hostname.to_string()) 125 + .collect::<Vec<_>>(); 126 + break (hostnames, next); 127 + } 128 + Err(e) => { 129 + warn!( 130 + error = %e, 131 + relay = %relay_host, 132 + "listHosts parse failed; retrying in {RETRY_DELAY_SECS}s" 133 + ); 134 + tokio::select! { 135 + _ = token.cancelled() => { 136 + drain_workers(&mut workers).await; 137 + return Ok(()); 138 + } 139 + _ = tokio::time::sleep(tokio::time::Duration::from_secs(RETRY_DELAY_SECS)) => {} 140 + } 141 + } 142 + } 143 + } 144 + }; 145 + 146 + // Spawn a per-PDS backfill worker for each new host. 147 + for hostname in hosts { 148 + let pds_host = match Host::parse(&hostname) { 149 + Ok(h) => h, 150 + Err(e) => { 151 + warn!(hostname = %hostname, error = %e, "failed to parse PDS hostname; skipping"); 152 + continue; 153 + } 154 + }; 155 + 156 + // Skip hosts that have already been fully crawled. 157 + let already_done = { 158 + let db = db.clone(); 159 + let h = pds_host.clone(); 160 + tokio::task::spawn_blocking(move || backfill_progress::get(&db, &h)).await?? 161 + }; 162 + if already_done.and_then(|p| p.completed_at).is_some() { 163 + continue; 164 + } 165 + 166 + // If at capacity, wait for one worker to finish before spawning. 167 + if workers.len() >= max_workers 168 + && let Some(result) = workers.join_next().await 169 + { 170 + log_worker_result(result); 171 + } 172 + 173 + let db2 = db.clone(); 174 + let client2 = client.clone(); 175 + let child = token.child_token(); 176 + let host2 = pds_host.clone(); 177 + workers.spawn(async move { 178 + let outcome = backfill::run(host2.clone(), db2, client2, child).await; 179 + (host2, outcome) 180 + }); 181 + 182 + info!(pds = %pds_host, "spawned deep crawl backfill worker"); 183 + } 184 + 185 + // Persist the cursor after processing this page. 186 + let cursor_to_save = next_cursor.clone().unwrap_or_default(); 187 + { 188 + let db = db.clone(); 189 + let host = relay_host.clone(); 190 + tokio::task::spawn_blocking(move || { 191 + list_hosts_cursor::set(&db, &host, &cursor_to_save) 192 + }) 193 + .await??; 194 + } 195 + 196 + match next_cursor { 197 + Some(c) => cursor = Some(c), 198 + None => { 199 + // End of listHosts — drain remaining workers then return. 200 + drain_workers(&mut workers).await; 201 + return Ok(()); 202 + } 203 + } 204 + } 205 + } 206 + 207 + async fn drain_workers(workers: &mut JoinSet<(Host, Result<bool>)>) { 208 + while let Some(result) = workers.join_next().await { 209 + log_worker_result(result); 210 + } 211 + } 212 + 213 + fn log_worker_result(result: std::result::Result<(Host, Result<bool>), tokio::task::JoinError>) { 214 + match result { 215 + Ok((host, Ok(true))) => info!(pds = %host, "deep crawl backfill worker completed"), 216 + Ok((host, Ok(false))) => info!(pds = %host, "deep crawl backfill worker cancelled"), 217 + Ok((host, Err(e))) => warn!(pds = %host, error = %e, "deep crawl backfill worker failed"), 218 + Err(e) => warn!(error = %e, "deep crawl backfill worker panicked"), 219 + } 220 + }
+1
src/sync/mod.rs
··· 1 1 pub mod backfill; 2 + pub mod deep_crawl; 2 3 pub mod firehose; 3 4 pub mod resync;
+1 -1
src/sync/resync/dispatcher.rs
··· 45 45 pub async fn run( 46 46 resolver: std::sync::Arc<crate::identity::Resolver>, 47 47 db: DbRef, 48 + client: crate::http::ThrottledClient, 48 49 max_concurrent: usize, 49 50 describe_timeout: std::time::Duration, 50 51 get_repo_timeout: std::time::Duration, 51 52 token: tokio_util::sync::CancellationToken, 52 53 ) -> Result<()> { 53 - let client = crate::http::build_client(); 54 54 let mut busy: HashSet<Did<'static>> = HashSet::new(); 55 55 let mut task_dids: HashMap<TaskId, Did<'static>> = HashMap::new(); 56 56 let mut since: Option<Vec<u8>> = None;