very fast AT Protocol indexer with flexible filtering, XRPC queries, cursor-backed event stream, and more, built on fjall
rust fjall at-protocol atproto indexer
58
fork

Configure Feed

Select the types of activity you want to include in your feed.

[db] fix training to not do a full scan

dawn 6d735f19 6729188d

+56 -59
+36 -9
src/crawler/mod.rs
··· 350 350 tasks.spawn( 351 351 async move { 352 352 loop { 353 - enabled.wait_enabled("crawler").await; 354 - if let Err(e) = Self::crawl(crawler.clone(), &url).await { 353 + if let Err(e) = Self::crawl(crawler.clone(), &url, &mut enabled).await { 355 354 error!(err = ?e, "fatal error, restarting in 30s"); 356 355 tokio::time::sleep(Duration::from_secs(30)).await; 357 356 } ··· 379 378 Ok(url) 380 379 } 381 380 382 - async fn crawl(crawler: Arc<Self>, relay_host: &Url) -> Result<()> { 381 + async fn crawl( 382 + crawler: Arc<Self>, 383 + relay_host: &Url, 384 + enabled: &mut watch::Receiver<bool>, 385 + ) -> Result<()> { 383 386 let base_url = Self::base_url(relay_host)?; 384 387 385 388 let mut rng: SmallRng = rand::make_rng(); ··· 395 398 396 399 let mut was_throttled = false; 397 400 loop { 401 + enabled.wait_enabled("crawler").await; 402 + 398 403 // throttle check 399 404 loop { 400 405 let pending = crawler.state.db.get_count("pending").await; ··· 408 413 was_throttled = true; 409 414 crawler.throttled.store(true, Ordering::Relaxed); 410 415 } 411 - tokio::time::sleep(Duration::from_secs(5)).await; 416 + tokio::select! { 417 + _ = tokio::time::sleep(Duration::from_secs(5)) => {} 418 + _ = enabled.changed() => { 419 + if !*enabled.borrow() { return Ok(()); } 420 + } 421 + } 412 422 } else if pending > crawler.resume_pending as u64 { 413 423 if !was_throttled { 414 424 debug!( ··· 430 440 resume = crawler.resume_pending, 431 441 "cooldown, waiting" 432 442 ); 433 - tokio::time::sleep(Duration::from_secs(5)).await; 443 + tokio::select! { 444 + _ = tokio::time::sleep(Duration::from_secs(5)) => {} 445 + _ = enabled.changed() => { 446 + if !*enabled.borrow() { return Ok(()); } 447 + } 448 + } 434 449 } 435 450 break; 436 451 } else { ··· 573 588 cursor = Cursor::Done(c); 574 589 } 575 590 info!("sleeping 1h before next enumeration pass"); 576 - tokio::time::sleep(Duration::from_secs(3600)).await; 577 - info!("resuming after 1h sleep"); 591 + tokio::select! 
{ 592 + _ = tokio::time::sleep(Duration::from_secs(3600)) => { 593 + info!("resuming after 1h sleep"); 594 + } 595 + _ = enabled.changed() => { 596 + if !*enabled.borrow() { return Ok(()); } 597 + } 598 + } 578 599 continue; 579 600 } 580 601 Err(e) => return Err(e).wrap_err("error while crawling"), ··· 640 661 641 662 if matches!(cursor, Cursor::Done(_)) { 642 663 info!("enumeration complete, sleeping 1h before next pass"); 643 - tokio::time::sleep(Duration::from_secs(3600)).await; 644 - info!("resuming after 1h sleep"); 664 + tokio::select! { 665 + _ = tokio::time::sleep(Duration::from_secs(3600)) => { 666 + info!("resuming after 1h sleep"); 667 + } 668 + _ = enabled.changed() => { 669 + if !*enabled.borrow() { return Ok(()); } 670 + } 671 + } 645 672 } 646 673 } 647 674 }
+20 -50
src/db/mod.rs
··· 422 422 _ => 16_384, 423 423 }; 424 424 425 - let samples = if ks_name == "blocks" { 426 - let mut collections = HashSet::new(); 427 - let mut iter = ks.iter(); 428 - while let Some(guard) = iter.next() { 429 - let key = guard.key().into_diagnostic()?; 430 - if let Some(sep_idx) = key.iter().position(|&b| b == keys::SEP) { 431 - if let Ok(col) = std::str::from_utf8(&key[..sep_idx]) { 432 - collections.insert(col.to_string()); 433 - let mut next_prefix = key[..sep_idx].to_vec(); 434 - next_prefix.push(keys::SEP + 1); 435 - iter = ks.range(next_prefix..); 436 - } else { 437 - break; 438 - } 439 - } else { 440 - break; 441 - } 442 - } 443 - 444 - let mut all_samples = Vec::new(); 445 - let mut seen_keys = HashSet::new(); 446 - let captured_keys = RefCell::new(Vec::new()); 447 - 448 - for t in collections { 449 - let prefix_str = format!("{t}|"); 450 - let prefix = prefix_str.as_bytes(); 451 - let mut end_prefix = prefix.to_vec(); 452 - if let Some(last) = end_prefix.last_mut() { 453 - *last = last.saturating_add(1); 454 - } 455 - 456 - let new = ks 457 - .sample_data_blocks(200, |first, last| { 458 - let passes = first < end_prefix.as_slice() 459 - && last >= prefix 460 - && !seen_keys.contains(&(first.to_vec(), last.to_vec())); 461 - if passes { 462 - captured_keys 463 - .borrow_mut() 464 - .push((first.to_vec(), last.to_vec())); 465 - } 466 - passes 467 - }) 468 - .into_diagnostic()?; 425 + let samples: Vec<Vec<u8>> = if ks_name == "blocks" { 426 + // sample up to 200 data blocks per collection, discovered lazily in the predicate 427 + let per_collection_limit = 200usize; 428 + let collection_counts: RefCell<std::collections::HashMap<Vec<u8>, usize>> = 429 + RefCell::new(std::collections::HashMap::new()); 469 430 470 - for (s, keys) in new.into_iter().zip(captured_keys.borrow_mut().drain(..)) { 471 - if seen_keys.insert(keys) { 472 - all_samples.push(s.to_vec()); 431 + let new = ks 432 + .sample_data_blocks(5000, |first, _last| { 433 + let Some(sep_idx) = 
first.iter().position(|&b| b == keys::SEP) else { 434 + return false; 435 + }; 436 + let mut counts = collection_counts.borrow_mut(); 437 + let count = counts.entry(first[..sep_idx].to_vec()).or_insert(0); 438 + if *count >= per_collection_limit { 439 + return false; 473 440 } 474 - } 475 - } 476 - all_samples 441 + *count += 1; 442 + true 443 + }) 444 + .into_diagnostic()?; 445 + 446 + new.into_iter().map(|s| s.to_vec()).collect() 477 447 } else { 478 448 let mut seen_keys = HashSet::new(); 479 449 let captured_keys = RefCell::new(Vec::new());