lightweight com.atproto.sync.listReposByCollection
45
fork

Configure Feed

Select the types of activity you want to include in your feed.

--crawl-qps and --fjall-cache-mb configs

phil 392ff3b0 558e204c

+59 -27
+5 -1
hacking.md
··· 106 106 107 107 108 108 very much still todo but i'm getting tired 109 - - [x] add a `--heavy` mode that always uses `getRepo` and never `describeRepo` 109 + - [x] config: add a `--heavy` mode that always uses `getRepo` and never `describeRepo` 110 + - [x] config: db mem limit `--fjall-cache-mb` 111 + - [x] config: per-host request rate self-throttling `--crawl-qps` (name from collectiondir) 110 112 - [ ] resync: estimate CAR size from `getRecord` mst height; `getRepo` if it's likely very small 111 113 - [ ] multi-relay subscriber 112 114 - [ ] special did:web behaviour to keep reusing a stale resolution on failure ··· 115 117 - [ ] if the upstream is a PDS (check with describeServer?) then make it only accept events for DIDs that have it as their PDS 116 118 - [ ] use `since` on getRepo for resync to get a smaller partial export in many cases (and then more-carefully do the actual resync) 117 119 - [ ] combine the throttled http client instance, the db, and the admin info into an appstate fineeeee 120 + - [ ] bad word filtering? (collectiondir has it) 121 + - [ ] check response headers and adjust self-throttling rate limits per-host if present 118 122 119 123 120 124 ### special-casing
+1 -1
src/examples/enqueue_resync.rs
··· 27 27 28 28 fn main() -> Result<(), Box<dyn std::error::Error>> { 29 29 let args = Args::parse(); 30 - let db = storage::open(&args.db_path)?; 30 + let db = storage::open(&args.db_path, 64)?; 31 31 let now = SystemTime::now() 32 32 .duration_since(UNIX_EPOCH) 33 33 .unwrap()
+1 -1
src/examples/list_repo_collections.rs
··· 29 29 #[tokio::main] 30 30 async fn main() -> Result<(), Box<dyn std::error::Error>> { 31 31 let args = Args::parse(); 32 - let client = lightrail::http::build_client(); 32 + let client = lightrail::http::build_client(std::num::NonZeroU32::new(10).unwrap()); 33 33 let base: jacquard_common::url::Url = args.base.parse()?; 34 34 let did = Did::new_owned(args.did)?; 35 35
+2 -5
src/http.rs
··· 18 18 use jacquard_common::http_client::{HttpClient, HttpClientExt}; 19 19 use jacquard_common::stream::{ByteStream, StreamError}; 20 20 21 - /// Default per-host request rate: 10 req/s. 22 - const DEFAULT_RATE_PER_SEC: u32 = 10; 23 - 24 21 /// State shared across all clones of a [`ThrottledClient`]. 25 22 struct Shared { 26 23 /// Duration of one token at the configured rate (= 1s / rate). ··· 78 75 } 79 76 80 77 /// Build the shared HTTP client used for all outbound ATProto requests. 81 - pub fn build_client() -> ThrottledClient { 82 - ThrottledClient::new(NonZeroU32::new(DEFAULT_RATE_PER_SEC).unwrap()) 78 + pub fn build_client(rate_per_sec: NonZeroU32) -> ThrottledClient { 79 + ThrottledClient::new(rate_per_sec) 83 80 } 84 81 85 82 impl HttpClient for ThrottledClient {
+10 -2
src/main.rs
··· 90 90 #[arg(long, action, env = "LIGHTRAIL_HEAVY")] 91 91 heavy: bool, 92 92 93 + /// Per-PDS HTTP rate limit for crawl/resync requests, in requests per second. 94 + #[arg(long, env = "LIGHTRAIL_CRAWL_QPS", default_value_t = std::num::NonZeroU32::new(10).unwrap())] 95 + crawl_qps: std::num::NonZeroU32, 96 + 97 + /// fjall block cache size in MiB. 98 + #[arg(long, env = "LIGHTRAIL_FJALL_CACHE_MB", default_value_t = 256)] 99 + fjall_cache_mb: u64, 100 + 93 101 /// Max concurrent per-PDS listRepos workers during deep crawl. 94 102 #[arg( 95 103 long, ··· 133 141 install_metrics(addr)?; 134 142 } 135 143 136 - let db = storage::open(&args.db_path)?; 137 - let client = lightrail::http::build_client(); 144 + let db = storage::open(&args.db_path, args.fjall_cache_mb)?; 145 + let client = lightrail::http::build_client(args.crawl_qps); 138 146 let token = CancellationToken::new(); 139 147 140 148 let mut tasks: JoinSet<Result<()>> = JoinSet::new();
+16 -13
src/storage/mod.rs
··· 65 65 pub type DbRef = Arc<Db>; 66 66 67 67 /// Open (or create) the fjall database at `path` and return a shared handle. 68 - pub fn open(path: &Path) -> StorageResult<DbRef> { 69 - open_inner(path, false) 68 + pub fn open(path: &Path, cache_mb: u64) -> StorageResult<DbRef> { 69 + open_inner(path, DbConfig::ForReal { cache_mb }) 70 + } 71 + 72 + enum DbConfig { 73 + /// temporary db for tests 74 + #[allow(dead_code)] 75 + Testing, 76 + /// bumpable cache for prod 77 + ForReal { cache_mb: u64 }, 70 78 } 71 79 72 80 /// Open a temporary database that deletes itself on drop. For tests only. ··· 76 84 static COUNTER: AtomicU64 = AtomicU64::new(0); 77 85 let n = COUNTER.fetch_add(1, Ordering::Relaxed); 78 86 let path = std::env::temp_dir().join(format!("lightrail-test-{}-{}", std::process::id(), n)); 79 - open_inner(&path, true) 87 + open_inner(&path, DbConfig::Testing) 80 88 } 81 89 82 - fn open_inner(path: &Path, testing: bool) -> StorageResult<DbRef> { 83 - let mut builder = fjall::Database::builder(path); 84 - 85 - builder = if testing { 86 - // for testing, we leave the small default cache and open as temporary 87 - builder.temporary(true) 88 - } else { 89 - // otherwise (prod) we want some more cache moneyyeyyyey 90 - builder.cache_size(256 * 1_024 * 1_024) 90 + fn open_inner(path: &Path, config: DbConfig) -> StorageResult<DbRef> { 91 + let builder = fjall::Database::builder(path); 92 + let builder = match config { 93 + DbConfig::Testing => builder.temporary(true), 94 + DbConfig::ForReal { cache_mb } => builder.cache_size(cache_mb * 2_u64.pow(20)), 91 95 }; 92 - 93 96 let database = builder.open()?; 94 97 let ks = database.keyspace("default", fjall::KeyspaceCreateOptions::default)?; 95 98 let index_ks = database.keyspace("index", || {
+24 -4
src/sync/firehose/event_dispatcher.rs
··· 453 453 async fn commits_for_same_did_are_sequential() { 454 454 let db = crate::storage::open_temporary().unwrap(); 455 455 let resolver = make_resolver(); 456 - let mut d = CommitDispatcher::new(resolver, db, 4, crate::http::build_client()); 456 + let mut d = CommitDispatcher::new( 457 + resolver, 458 + db, 459 + 4, 460 + crate::http::build_client(std::num::NonZeroU32::new(10).unwrap()), 461 + ); 457 462 458 463 let did: Did<'static> = Did::new_owned("did:plc:testsequential").unwrap(); 459 464 let c1 = { ··· 493 498 async fn commits_for_different_dids_run_in_parallel() { 494 499 let db = crate::storage::open_temporary().unwrap(); 495 500 let resolver = make_resolver(); 496 - let mut d = CommitDispatcher::new(resolver, db, 4, crate::http::build_client()); 501 + let mut d = CommitDispatcher::new( 502 + resolver, 503 + db, 504 + 4, 505 + crate::http::build_client(std::num::NonZeroU32::new(10).unwrap()), 506 + ); 497 507 498 508 let did_a: Did<'static> = Did::new_owned("did:plc:testa").unwrap(); 499 509 let did_b: Did<'static> = Did::new_owned("did:plc:testb").unwrap(); ··· 510 520 async fn watermark_advances_after_completion() { 511 521 let db = crate::storage::open_temporary().unwrap(); 512 522 let resolver = make_resolver(); 513 - let mut d = CommitDispatcher::new(resolver, db, 4, crate::http::build_client()); 523 + let mut d = CommitDispatcher::new( 524 + resolver, 525 + db, 526 + 4, 527 + crate::http::build_client(std::num::NonZeroU32::new(10).unwrap()), 528 + ); 514 529 515 530 let did_a: Did<'static> = Did::new_owned("did:plc:testwma").unwrap(); 516 531 let did_b: Did<'static> = Did::new_owned("did:plc:testwmb").unwrap(); ··· 531 546 async fn stalled_seq_evicted_from_watermark() { 532 547 let db = crate::storage::open_temporary().unwrap(); 533 548 let resolver = make_resolver(); 534 - let mut d = CommitDispatcher::new(resolver, db, 4, crate::http::build_client()); 549 + let mut d = CommitDispatcher::new( 550 + resolver, 551 + db, 552 + 4, 553 + 
crate::http::build_client(std::num::NonZeroU32::new(10).unwrap()), 554 + ); 535 555 536 556 // Manually inject an old entry into outstanding without spawning a worker. 537 557 let stale_instant = Instant::now() - std::time::Duration::from_secs(STALL_EVICT_SECS + 1);