very fast AT Protocol indexer with flexible filtering, XRPC queries, a cursor-backed event stream, and more, built on fjall
rust fjall at-protocol atproto indexer
58
fork

Configure Feed

Select the types of activity you want to include in your feed.

[config] implement functions for standard / filtered and full-network config defaults

dawn a882c07e b757f20f

+111 -55
+111 -55
src/config.rs
··· 170 170 pub crawler_sources: Vec<CrawlerSource>, 171 171 } 172 172 173 + impl Default for Config { 174 + fn default() -> Self { 175 + const BASE_MEMTABLE_MB: u64 = 32; 176 + Self { 177 + database_path: PathBuf::from("./hydrant.db"), 178 + relays: vec![Url::parse("wss://relay.fire.hose.cam/").unwrap()], 179 + plc_urls: vec![Url::parse("https://plc.wtf").unwrap()], 180 + full_network: false, 181 + ephemeral: false, 182 + ephemeral_ttl: Duration::from_secs(3600), 183 + cursor_save_interval: Duration::from_secs(3), 184 + repo_fetch_timeout: Duration::from_secs(300), 185 + cache_size: 256, 186 + backfill_concurrency_limit: 16, 187 + data_compression: Compression::Lz4, 188 + journal_compression: Compression::Lz4, 189 + verify_signatures: SignatureVerification::Full, 190 + identity_cache_size: 1_000_000, 191 + enable_firehose: true, 192 + enable_crawler: None, 193 + firehose_workers: 8, 194 + db_worker_threads: 4, 195 + db_max_journaling_size_mb: 400, 196 + db_blocks_memtable_size_mb: BASE_MEMTABLE_MB, 197 + db_repos_memtable_size_mb: BASE_MEMTABLE_MB / 2, 198 + db_events_memtable_size_mb: BASE_MEMTABLE_MB, 199 + db_records_memtable_size_mb: BASE_MEMTABLE_MB / 3 * 2, 200 + crawler_max_pending_repos: 2000, 201 + crawler_resume_pending_repos: 1000, 202 + filter_signals: None, 203 + filter_collections: None, 204 + filter_excludes: None, 205 + enable_backlinks: false, 206 + crawler_sources: vec![CrawlerSource { 207 + url: Url::parse("https://lightrail.microcosm.blue").unwrap(), 208 + mode: CrawlerMode::ByCollection, 209 + }], 210 + } 211 + } 212 + } 213 + 173 214 impl Config { 215 + /// returns the default config for full network usage. 
216 + pub fn full_network() -> Self { 217 + const BASE_MEMTABLE_MB: u64 = 192; 218 + Self { 219 + full_network: true, 220 + plc_urls: vec![Url::parse("https://plc.directory").unwrap()], 221 + backfill_concurrency_limit: 64, 222 + firehose_workers: 24, 223 + db_worker_threads: 8, 224 + db_max_journaling_size_mb: 1024, 225 + db_blocks_memtable_size_mb: BASE_MEMTABLE_MB, 226 + db_events_memtable_size_mb: BASE_MEMTABLE_MB, 227 + db_repos_memtable_size_mb: BASE_MEMTABLE_MB / 2, 228 + db_records_memtable_size_mb: BASE_MEMTABLE_MB / 3 * 2, 229 + crawler_sources: vec![CrawlerSource { 230 + url: Url::parse("wss://relay.fire.hose.cam/").unwrap(), 231 + mode: CrawlerMode::Relay, 232 + }], 233 + ..Self::default() 234 + } 235 + } 236 + 237 + /// reads and builds the config from environment variables. 174 238 pub fn from_env() -> Result<Self> { 175 239 macro_rules! cfg { 176 240 (@val $key:expr) => { ··· 180 244 cfg!(@val $key) 181 245 .ok() 182 246 .and_then(|s| humantime::parse_duration(&s).ok()) 183 - .unwrap_or(Duration::from_secs($default)) 247 + .unwrap_or($default) 184 248 }; 185 249 ($key:expr, $default:expr) => { 186 250 cfg!(@val $key) ··· 191 255 }; 192 256 } 193 257 194 - let relay_host: Url = cfg!( 195 - "RELAY_HOST", 196 - Url::parse("wss://relay.fire.hose.cam/").unwrap() 197 - ); 258 + // full_network is read first since it determines which defaults to use. 
259 + let full_network: bool = cfg!("FULL_NETWORK", false); 260 + let defaults = full_network 261 + .then(Self::full_network) 262 + .unwrap_or_else(Self::default); 263 + 264 + let relay_host: Url = cfg!("RELAY_HOST", defaults.relays[0].clone()); 198 265 let relay_hosts = std::env::var("HYDRANT_RELAY_HOSTS") 199 266 .ok() 200 267 .and_then(|hosts| { ··· 211 278 .then(|| vec![relay_host]) 212 279 .unwrap_or(relay_hosts); 213 280 214 - let full_network: bool = cfg!("FULL_NETWORK", false); 215 - 216 281 let plc_urls: Vec<Url> = std::env::var("HYDRANT_PLC_URL") 217 282 .ok() 218 283 .map(|s| { ··· 221 286 .collect::<Result<Vec<_>, _>>() 222 287 .map_err(|e| miette::miette!("invalid PLC URL: {e}")) 223 288 }) 224 - .unwrap_or_else(|| { 225 - Ok(vec![ 226 - full_network 227 - .then_some(Url::parse("https://plc.directory").unwrap()) 228 - .unwrap_or(Url::parse("https://plc.wtf").unwrap()), 229 - ]) 230 - })?; 289 + .unwrap_or_else(|| Ok(defaults.plc_urls.clone()))?; 231 290 232 - let cursor_save_interval = cfg!("CURSOR_SAVE_INTERVAL", 3, sec); 233 - let repo_fetch_timeout = cfg!("REPO_FETCH_TIMEOUT", 300, sec); 291 + let cursor_save_interval = cfg!("CURSOR_SAVE_INTERVAL", defaults.cursor_save_interval, sec); 292 + let repo_fetch_timeout = cfg!("REPO_FETCH_TIMEOUT", defaults.repo_fetch_timeout, sec); 234 293 235 - let ephemeral: bool = cfg!("EPHEMERAL", false); 236 - let ephemeral_ttl = cfg!("EPHEMERAL_TTL", 60 * 60, sec); 237 - let database_path = cfg!("DATABASE_PATH", "./hydrant.db"); 238 - let cache_size = cfg!("CACHE_SIZE", 256u64); 239 - let data_compression = cfg!("DATA_COMPRESSION", Compression::Lz4); 240 - let journal_compression = cfg!("JOURNAL_COMPRESSION", Compression::Lz4); 294 + let ephemeral: bool = cfg!("EPHEMERAL", defaults.ephemeral); 295 + let ephemeral_ttl = cfg!("EPHEMERAL_TTL", defaults.ephemeral_ttl, sec); 296 + let database_path = cfg!("DATABASE_PATH", defaults.database_path); 297 + let cache_size = cfg!("CACHE_SIZE", defaults.cache_size); 298 + let 
data_compression = cfg!("DATA_COMPRESSION", defaults.data_compression); 299 + let journal_compression = cfg!("JOURNAL_COMPRESSION", defaults.journal_compression); 241 300 242 - let verify_signatures = cfg!("VERIFY_SIGNATURES", SignatureVerification::Full); 243 - let identity_cache_size = cfg!("IDENTITY_CACHE_SIZE", 1_000_000u64); 244 - let enable_firehose = cfg!("ENABLE_FIREHOSE", true); 301 + let verify_signatures = cfg!("VERIFY_SIGNATURES", defaults.verify_signatures); 302 + let identity_cache_size = cfg!("IDENTITY_CACHE_SIZE", defaults.identity_cache_size); 303 + let enable_firehose = cfg!("ENABLE_FIREHOSE", defaults.enable_firehose); 245 304 let enable_crawler = std::env::var("HYDRANT_ENABLE_CRAWLER") 246 305 .ok() 247 306 .and_then(|s| s.parse().ok()); 248 307 249 308 let backfill_concurrency_limit = cfg!( 250 309 "BACKFILL_CONCURRENCY_LIMIT", 251 - full_network.then_some(64usize).unwrap_or(16usize) 310 + defaults.backfill_concurrency_limit 252 311 ); 253 - let firehose_workers = cfg!( 254 - "FIREHOSE_WORKERS", 255 - full_network.then_some(24usize).unwrap_or(8usize) 256 - ); 312 + let firehose_workers = cfg!("FIREHOSE_WORKERS", defaults.firehose_workers); 257 313 258 - let ( 259 - default_db_worker_threads, 260 - default_db_max_journaling_size_mb, 261 - default_db_memtable_size_mb, 262 - ): (usize, u64, u64) = full_network 263 - .then_some((8usize, 1024u64, 192u64)) 264 - .unwrap_or((4usize, 400u64, 32u64)); 265 - 266 - let db_worker_threads = cfg!("DB_WORKER_THREADS", default_db_worker_threads); 314 + let db_worker_threads = cfg!("DB_WORKER_THREADS", defaults.db_worker_threads); 267 315 let db_max_journaling_size_mb = cfg!( 268 316 "DB_MAX_JOURNALING_SIZE_MB", 269 - default_db_max_journaling_size_mb 317 + defaults.db_max_journaling_size_mb 318 + ); 319 + let db_blocks_memtable_size_mb = cfg!( 320 + "DB_BLOCKS_MEMTABLE_SIZE_MB", 321 + defaults.db_blocks_memtable_size_mb 322 + ); 323 + let db_events_memtable_size_mb = cfg!( 324 + "DB_EVENTS_MEMTABLE_SIZE_MB", 
325 + defaults.db_events_memtable_size_mb 270 326 ); 271 - let db_blocks_memtable_size_mb = 272 - cfg!("DB_BLOCKS_MEMTABLE_SIZE_MB", default_db_memtable_size_mb); 273 - let db_events_memtable_size_mb = 274 - cfg!("DB_EVENTS_MEMTABLE_SIZE_MB", default_db_memtable_size_mb); 275 327 let db_records_memtable_size_mb = cfg!( 276 328 "DB_RECORDS_MEMTABLE_SIZE_MB", 277 - // records is did + col + rkey -> CID so its pretty cheap 278 - default_db_memtable_size_mb / 3 * 2 329 + defaults.db_records_memtable_size_mb 330 + ); 331 + let db_repos_memtable_size_mb = cfg!( 332 + "DB_REPOS_MEMTABLE_SIZE_MB", 333 + defaults.db_repos_memtable_size_mb 279 334 ); 280 - let db_repos_memtable_size_mb = 281 - cfg!("DB_REPOS_MEMTABLE_SIZE_MB", default_db_memtable_size_mb / 2); 282 335 283 - let crawler_max_pending_repos = cfg!("CRAWLER_MAX_PENDING_REPOS", 2000usize); 284 - let crawler_resume_pending_repos = cfg!("CRAWLER_RESUME_PENDING_REPOS", 1000usize); 336 + let crawler_max_pending_repos = cfg!( 337 + "CRAWLER_MAX_PENDING_REPOS", 338 + defaults.crawler_max_pending_repos 339 + ); 340 + let crawler_resume_pending_repos = cfg!( 341 + "CRAWLER_RESUME_PENDING_REPOS", 342 + defaults.crawler_resume_pending_repos 343 + ); 285 344 286 345 let filter_signals = std::env::var("HYDRANT_FILTER_SIGNALS").ok().map(|s| { 287 346 s.split(',') ··· 304 363 .collect() 305 364 }); 306 365 307 - let enable_backlinks: bool = cfg!("ENABLE_BACKLINKS", false); 366 + let enable_backlinks: bool = cfg!("ENABLE_BACKLINKS", defaults.enable_backlinks); 308 367 309 368 let default_mode = CrawlerMode::default_for(full_network); 310 369 let crawler_sources = match std::env::var("HYDRANT_CRAWLER_URLS") { ··· 322 381 mode: CrawlerMode::Relay, 323 382 }) 324 383 .collect(), 325 - CrawlerMode::ByCollection => vec![CrawlerSource { 326 - url: Url::parse("https://lightrail.microcosm.blue").unwrap(), 327 - mode: CrawlerMode::ByCollection, 328 - }], 384 + CrawlerMode::ByCollection => defaults.crawler_sources.clone(), 329 385 }, 330 
386 }; 331 387