very fast at protocol indexer with flexible filtering, xrpc queries, cursor-backed event stream, and more, built on fjall
rust fjall at-protocol atproto indexer
58
fork

Configure Feed

Select the types of activity you want to include in your feed.

[all] ephemeral mode

dawn 0081e8af b738c536

+187 -96
+13 -12
Cargo.lock
··· 1041 1041 1042 1042 [[package]] 1043 1043 name = "fjall" 1044 - version = "3.0.4" 1044 + version = "3.1.0" 1045 1045 source = "registry+https://github.com/rust-lang/crates.io-index" 1046 - checksum = "0ebf22b812878dcd767879cb19e03124fd62563dce6410f96538175fba0c132d" 1046 + checksum = "40cb1eb0cef3792900897b32c8282f6417bc978f6af46400a2f14bf0e649ae30" 1047 1047 dependencies = [ 1048 1048 "byteorder-lite", 1049 1049 "byteview", ··· 1536 1536 "jacquard-derive", 1537 1537 "jacquard-identity", 1538 1538 "jacquard-repo", 1539 + "lsm-tree", 1539 1540 "miette", 1540 1541 "mimalloc", 1541 1542 "multibase", ··· 2088 2089 2089 2090 [[package]] 2090 2091 name = "libc" 2091 - version = "0.2.182" 2092 + version = "0.2.183" 2092 2093 source = "registry+https://github.com/rust-lang/crates.io-index" 2093 - checksum = "6800badb6cb2082ffd7b6a67e6125bb39f18782f793520caee8cb8846be06112" 2094 + checksum = "b5b646652bf6661599e1da8901b3b9522896f01e736bad5f723fe7a3a27f899d" 2094 2095 2095 2096 [[package]] 2096 2097 name = "libmimalloc-sys" ··· 2150 2151 2151 2152 [[package]] 2152 2153 name = "lsm-tree" 2153 - version = "3.0.4" 2154 + version = "3.1.0" 2154 2155 source = "registry+https://github.com/rust-lang/crates.io-index" 2155 - checksum = "e9bfd2a6ea0c1d430c13643002f35800a87f200fc8ac4827f18a2db9d9fd0644" 2156 + checksum = "fc5fa40c207eed45c811085aaa1b0a25fead22e298e286081cd4b98785fe759b" 2156 2157 dependencies = [ 2157 2158 "byteorder-lite", 2158 2159 "byteview", ··· 2673 2674 2674 2675 [[package]] 2675 2676 name = "quinn-proto" 2676 - version = "0.11.13" 2677 + version = "0.11.14" 2677 2678 source = "registry+https://github.com/rust-lang/crates.io-index" 2678 - checksum = "f1906b49b0c3bc04b5fe5d86a77925ae6524a19b816ae38ce1e426255f1d8a31" 2679 + checksum = "434b42fec591c96ef50e21e886936e66d3cc3f737104fdb9b737c40ffb94c098" 2679 2680 dependencies = [ 2680 2681 "aws-lc-rs", 2681 2682 "bytes", ··· 4690 4691 4691 4692 [[package]] 4692 4693 name = "zerocopy" 4693 - version = "0.8.40" 4694 + version = "0.8.42" 4694 4695 source = "registry+https://github.com/rust-lang/crates.io-index" 4695 - checksum = "a789c6e490b576db9f7e6b6d661bcc9799f7c0ac8352f56ea20193b2681532e5" 4696 + checksum = "f2578b716f8a7a858b7f02d5bd870c14bf4ddbbcf3a4c05414ba6503640505e3" 4696 4697 dependencies = [ 4697 4698 "zerocopy-derive", 4698 4699 ] 4699 4700 4700 4701 [[package]] 4701 4702 name = "zerocopy-derive" 4702 - version = "0.8.40" 4703 + version = "0.8.42" 4703 4704 source = "registry+https://github.com/rust-lang/crates.io-index" 4704 - checksum = "f65c489a7071a749c849713807783f70672b28094011623e200cb86dcb835953" 4705 + checksum = "7e6cc098ea4d3bd6246687de65af3f920c430e236bee1e3bf2e441463f08a02f" 4705 4706 dependencies = [ 4706 4707 "proc-macro2", 4707 4708 "quote",
+2 -1
Cargo.toml
··· 15 15 serde_json = "1.0" 16 16 rmp-serde = { git = "https://github.com/90-008/msgpack-rust.git" } 17 17 18 - fjall = "3.0" 18 + fjall = "3.1" 19 19 serde_ipld_dagcbor = "0.6" 20 20 serde_urlencoded = "0.7" 21 21 ··· 46 46 rustls = { version = "0.23", features = ["aws-lc-rs"] } 47 47 tokio-tungstenite = { version = "0.28.0", features = ["rustls-tls-native-roots"] } 48 48 multibase = "0.9.2" 49 + lsm-tree = "3.1.0" 49 50 50 51 [dev-dependencies] 51 52 tempfile = "3.26.0"
+1
README.md
··· 45 45 | `ENABLE_DEBUG` | `false` | enable debug endpoints. | 46 46 | `DEBUG_PORT` | `3001` | port for debug endpoints (if enabled). | 47 47 | `NO_LZ4_COMPRESSION` | `false` | disable lz4 compression for storage. | 48 + | `EPHEMERAL` | `false` | if enabled, no records are stored (XRPCs won't be reliable). events are only stored up to an hour for playback. | 48 49 | `ENABLE_FIREHOSE` | `true` | whether to ingest relay subscriptions. | 49 50 | `ENABLE_BACKFILL` | `true` | whether to backfill from PDS instances. | 50 51 | `ENABLE_CRAWLER` | `false` (if Filter), `true` (if Full) | whether to actively query the network for unknown repositories. |
+57 -66
src/config.rs
··· 41 41 pub relay_host: Url, 42 42 pub plc_urls: Vec<Url>, 43 43 pub full_network: bool, 44 + pub ephemeral: bool, 44 45 pub cursor_save_interval: Duration, 45 46 pub repo_fetch_timeout: Duration, 46 47 pub log_level: SmolStr, ··· 111 112 let cursor_save_interval = cfg!("CURSOR_SAVE_INTERVAL", 5, sec); 112 113 let repo_fetch_timeout = cfg!("REPO_FETCH_TIMEOUT", 300, sec); 113 114 115 + let ephemeral: bool = cfg!("EPHEMERAL", false); 114 116 let database_path = cfg!("DATABASE_PATH", "./hydrant.db"); 115 117 let cache_size = cfg!("CACHE_SIZE", 256u64); 116 118 let disable_lz4_compression = cfg!("NO_LZ4_COMPRESSION", false); ··· 184 186 database_path, 185 187 relay_host, 186 188 plc_urls, 189 + ephemeral, 187 190 full_network, 188 191 cursor_save_interval, 189 192 repo_fetch_timeout, ··· 216 219 } 217 220 } 218 221 222 + macro_rules! config_line { 223 + ($f:expr, $label:expr, $value:expr) => { 224 + writeln!($f, " {:<width$}{}", $label, $value, width = LABEL_WIDTH) 225 + }; 226 + } 227 + 219 228 impl fmt::Display for Config { 220 229 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 230 + const LABEL_WIDTH: usize = 27; 231 + 221 232 writeln!(f, "hydrant configuration:")?; 222 - writeln!(f, " log level: {}", self.log_level)?; 223 - writeln!(f, " relay host: {}", self.relay_host)?; 224 - writeln!(f, " plc urls: {:?}", self.plc_urls)?; 225 - writeln!(f, " full network indexing: {}", self.full_network)?; 226 - writeln!(f, " verify signatures: {}", self.verify_signatures)?; 227 - writeln!( 233 + config_line!(f, "log level", self.log_level)?; 234 + config_line!(f, "relay host", self.relay_host)?; 235 + config_line!(f, "plc urls", format_args!("{:?}", self.plc_urls))?; 236 + config_line!(f, "full network indexing", self.full_network)?; 237 + config_line!(f, "verify signatures", self.verify_signatures)?; 238 + config_line!(f, "backfill concurrency", self.backfill_concurrency_limit)?; 239 + config_line!(f, "identity cache size", self.identity_cache_size)?; 240 + config_line!( 228 241 f, 229 - " backfill concurrency: {}", 230 - self.backfill_concurrency_limit 242 + "cursor save interval", 243 + format_args!("{}sec", self.cursor_save_interval.as_secs()) 231 244 )?; 232 - writeln!( 245 + config_line!( 233 246 f, 234 - " identity cache size: {}", 235 - self.identity_cache_size 247 + "repo fetch timeout", 248 + format_args!("{}sec", self.repo_fetch_timeout.as_secs()) 236 249 )?; 237 - writeln!( 250 + config_line!(f, "ephemeral", self.ephemeral)?; 251 + config_line!(f, "database path", self.database_path.to_string_lossy())?; 252 + config_line!(f, "cache size", format_args!("{} mb", self.cache_size))?; 253 + config_line!(f, "disable lz4 compression", self.disable_lz4_compression)?; 254 + config_line!(f, "api port", self.api_port)?; 255 + config_line!(f, "firehose workers", self.firehose_workers)?; 256 + config_line!(f, "db worker threads", self.db_worker_threads)?; 257 + config_line!( 238 258 f, 239 - " cursor save interval: {}sec", 240 - self.cursor_save_interval.as_secs() 259 + "db journal size", 260 + format_args!("{} mb", self.db_max_journaling_size_mb) 241 261 )?; 242 - writeln!( 262 + config_line!( 243 263 f, 244 - " repo fetch timeout: {}sec", 245 - self.repo_fetch_timeout.as_secs() 264 + "db pending memtable", 265 + format_args!("{} mb", self.db_pending_memtable_size_mb) 246 266 )?; 247 - writeln!( 267 + config_line!( 248 268 f, 249 - " database path: {}", 250 - self.database_path.to_string_lossy() 269 + "db blocks memtable", 270 + format_args!("{} mb", self.db_blocks_memtable_size_mb) 251 271 )?; 252 - writeln!(f, " cache size: {} mb", self.cache_size)?; 253 - writeln!( 272 + config_line!( 254 273 f, 255 - " disable lz4 compression: {}", 256 - self.disable_lz4_compression 274 + "db repos memtable", 275 + format_args!("{} mb", self.db_repos_memtable_size_mb) 257 276 )?; 258 - writeln!(f, " api port: {}", self.api_port)?; 259 - writeln!(f, " firehose workers: {}", self.firehose_workers)?; 260 - writeln!(f, " db worker threads: {}", self.db_worker_threads)?; 261 - writeln!( 277 + config_line!( 262 278 f, 263 - " db journal size: {} mb", 264 - self.db_max_journaling_size_mb 279 + "db events memtable", 280 + format_args!("{} mb", self.db_events_memtable_size_mb) 265 281 )?; 266 - writeln!( 282 + config_line!( 267 283 f, 268 - " db pending memtable: {} mb", 269 - self.db_pending_memtable_size_mb 284 + "db records memtable", 285 + format_args!("{} mb", self.db_records_memtable_size_mb) 270 286 )?; 271 - writeln!( 287 + config_line!(f, "crawler max pending", self.crawler_max_pending_repos)?; 288 + config_line!( 272 289 f, 273 - " db blocks memtable: {} mb", 274 - self.db_blocks_memtable_size_mb 275 - )?; 276 - writeln!( 277 - f, 278 - " db repos memtable: {} mb", 279 - self.db_repos_memtable_size_mb 280 - )?; 281 - writeln!( 282 - f, 283 - " db events memtable: {} mb", 284 - self.db_events_memtable_size_mb 285 - )?; 286 - writeln!( 287 - f, 288 - " db records memtable: {} mb", 289 - self.db_records_memtable_size_mb 290 - )?; 291 - 292 - writeln!( 293 - f, 294 - " crawler max pending: {}", 295 - self.crawler_max_pending_repos 296 - )?; 297 - writeln!( 298 - f, 299 - " crawler resume pending: {}", 290 + "crawler resume pending", 300 291 self.crawler_resume_pending_repos 301 292 )?; 302 293 if let Some(signals) = &self.filter_signals { 303 - writeln!(f, " filter signals: {:?}", signals)?; 294 + config_line!(f, "filter signals", format_args!("{:?}", signals))?; 304 295 } 305 296 if let Some(collections) = &self.filter_collections { 306 - writeln!(f, " filter collections: {:?}", collections)?; 297 + config_line!(f, "filter collections", format_args!("{:?}", collections))?; 307 298 } 308 299 if let Some(excludes) = &self.filter_excludes { 309 - writeln!(f, " filter excludes: {:?}", excludes)?; 300 + config_line!(f, "filter excludes", format_args!("{:?}", excludes))?; 310 301 } 311 - writeln!(f, " enable debug: {}", self.enable_debug)?; 302 + config_line!(f, "enable debug", self.enable_debug)?; 312 303 if self.enable_debug { 313 - writeln!(f, " debug port: {}", self.debug_port)?; 304 + config_line!(f, "debug port", self.debug_port)?; 314 305 } 315 306 Ok(()) 316 307 }
+5 -7
src/crawler/mod.rs
··· 24 24 use url::Url; 25 25 26 26 const MAX_RETRY_ATTEMPTS: u32 = 5; 27 - const MAX_RETRY_BATCH: usize = 512; 27 + const MAX_RETRY_BATCH: usize = 1000; 28 28 29 29 #[derive(Debug, Serialize, Deserialize)] 30 30 struct RetryState { ··· 192 192 } 193 193 } 194 194 195 - async fn get_cursor(crawler: &Self) -> Result<Cursor> { 196 - let cursor_bytes = Db::get(crawler.state.db.cursors.clone(), CURSOR_KEY).await?; 195 + async fn get_cursor(&self) -> Result<Cursor> { 196 + let cursor_bytes = Db::get(self.state.db.cursors.clone(), CURSOR_KEY).await?; 197 197 let cursor: Cursor = cursor_bytes 198 198 .as_deref() 199 199 .map(rmp_serde::from_slice) ··· 297 297 let mut rng: SmallRng = rand::make_rng(); 298 298 let db = &crawler.state.db; 299 299 300 - let mut cursor = Self::get_cursor(&crawler).await?; 300 + let mut cursor = crawler.get_cursor().await?; 301 301 302 302 match &cursor { 303 303 Cursor::Next(Some(c)) => info!(cursor = %c, "resuming"), ··· 401 401 }; 402 402 403 403 let mut batch = db.inner.batch(); 404 - let mut to_queue = Vec::new(); 405 404 let filter = crawler.state.filter.load(); 406 405 407 406 struct ParseResult { ··· 510 509 let state = RepoState::untracked(rng.next_u64()); 511 510 batch.insert(&db.repos, &did_key, ser_repo_state(&state)?); 512 511 batch.insert(&db.pending, keys::pending_key(state.index_id), &did_key); 513 - to_queue.push(did.clone()); 514 512 } 515 513 516 514 if let Some(new_cursor) = next_cursor { ··· 545 543 }) 546 544 .ok(); 547 545 548 - crawler.account_new_repos(to_queue.len()).await; 546 + crawler.account_new_repos(valid_dids.len()).await; 549 547 550 548 if matches!(cursor, Cursor::Done(_)) { 551 549 tokio::time::sleep(Duration::from_secs(3600)).await;
+67
src/db/compaction.rs
··· 1 + use fjall::compaction::filter::Context; 2 + use lsm_tree::compaction::{CompactionFilter, Factory}; 3 + use lsm_tree::compaction::{ItemAccessor, Verdict}; 4 + 5 + mod drop_all { 6 + use super::*; 7 + 8 + pub struct DropAllFilter; 9 + 10 + impl CompactionFilter for DropAllFilter { 11 + fn filter_item(&mut self, _: ItemAccessor<'_>, _: &Context) -> lsm_tree::Result<Verdict> { 12 + Ok(Verdict::Destroy) 13 + } 14 + } 15 + 16 + pub struct DropAllFilterFactory; 17 + 18 + impl Factory for DropAllFilterFactory { 19 + fn name(&self) -> &str { 20 + "drop_all" 21 + } 22 + 23 + fn make_filter(&self, _: &Context) -> Box<dyn CompactionFilter> { 24 + Box::new(DropAllFilter) 25 + } 26 + } 27 + } 28 + pub use drop_all::*; 29 + 30 + mod drop_prefix { 31 + use super::*; 32 + 33 + pub struct DropPrefixFilter { 34 + prefix: &'static [u8], 35 + } 36 + 37 + impl CompactionFilter for DropPrefixFilter { 38 + fn filter_item( 39 + &mut self, 40 + item: ItemAccessor<'_>, 41 + _: &Context, 42 + ) -> lsm_tree::Result<Verdict> { 43 + Ok(item 44 + .key() 45 + .starts_with(&self.prefix) 46 + .then_some(Verdict::Destroy) 47 + .unwrap_or(Verdict::Keep)) 48 + } 49 + } 50 + 51 + pub struct DropPrefixFilterFactory { 52 + pub prefix: &'static [u8], 53 + } 54 + 55 + impl Factory for DropPrefixFilterFactory { 56 + fn name(&self) -> &str { 57 + "drop_prefix" 58 + } 59 + 60 + fn make_filter(&self, _: &Context) -> Box<dyn CompactionFilter> { 61 + Box::new(DropPrefixFilter { 62 + prefix: self.prefix, 63 + }) 64 + } 65 + } 66 + } 67 + pub use drop_prefix::*;
+42 -10
src/db/mod.rs
··· 1 + use crate::db::compaction::{DropAllFilterFactory, DropPrefixFilterFactory}; 1 2 use crate::types::{BroadcastEvent, RepoState}; 3 + use fjall::compaction::{Fifo, Levelled}; 2 4 use fjall::config::BlockSizePolicy; 3 5 use fjall::{Database, Keyspace, KeyspaceCreateOptions, OwnedWriteBatch, PersistMode, Slice}; 4 6 use jacquard_common::IntoStatic; 5 7 use jacquard_common::types::string::Did; 8 + use lsm_tree::compaction::Factory; 6 9 use miette::{Context, IntoDiagnostic, Result}; 7 10 use scc::HashMap; 8 11 use smol_str::SmolStr; 9 12 10 13 use std::sync::Arc; 11 14 15 + pub mod compaction; 12 16 pub mod filter; 13 17 pub mod keys; 14 18 pub mod types; ··· 90 94 const fn kb(v: u32) -> u32 { 91 95 v * 1024 92 96 } 97 + const fn mb(v: u64) -> u64 { 98 + v * 1024 * 1024 99 + } 93 100 94 101 let db = Database::builder(&cfg.database_path) 95 102 .cache_size(cfg.cache_size * 2_u64.pow(20) / 2) ··· 100 107 .unwrap_or(fjall::CompressionType::Lz4), 101 108 ) 102 109 .worker_threads(cfg.db_worker_threads) 103 - .max_journaling_size(cfg.db_max_journaling_size_mb * 1024 * 1024) 110 + .max_journaling_size(mb(cfg.db_max_journaling_size_mb)) 111 + .with_compaction_filter_factories({ 112 + let ephemeral = cfg.ephemeral; 113 + let f = move |ks: &str| match ks { 114 + "records" => { 115 + ephemeral.then(|| -> Arc<dyn Factory> { Arc::new(DropAllFilterFactory) }) 116 + } 117 + "counts" => ephemeral.then(|| -> Arc<dyn Factory> { 118 + Arc::new(DropPrefixFilterFactory { 119 + prefix: keys::COUNT_COLLECTION_PREFIX, 120 + }) 121 + }), 122 + _ => None, 123 + }; 124 + Arc::new(f) 125 + }) 104 126 .open() 105 127 .into_diagnostic()?; 106 128 let db = Arc::new(db); ··· 115 137 opts() 116 138 // crawler checks if a repo doesn't exist 117 139 .expect_point_read_hits(false) 118 - .max_memtable_size(cfg.db_repos_memtable_size_mb * 1024 * 1024) 140 + .max_memtable_size(mb(cfg.db_repos_memtable_size_mb)) 119 141 .data_block_size_policy(BlockSizePolicy::all(kb(4))), 120 142 )?; 121 143 let blocks = open_ks( ··· 123 145 opts() 124 146 // point reads are used a lot by stream 125 147 .expect_point_read_hits(true) 126 - .max_memtable_size(cfg.db_blocks_memtable_size_mb * 1024 * 1024) 148 + .max_memtable_size(mb(cfg.db_blocks_memtable_size_mb)) 127 149 // 32 - 64 kb is probably fine, as the newer blocks will be in the first levels 128 150 // and any consumers will probably be streaming the newer events... 129 151 .data_block_size_policy(BlockSizePolicy::new([kb(4), kb(8), kb(32), kb(64)])), ··· 135 157 // since this keyspace is big, turning off bloom filters will help a lot 136 158 opts() 137 159 .expect_point_read_hits(true) 138 - .max_memtable_size(cfg.db_records_memtable_size_mb * 1024 * 1024) 160 + .max_memtable_size(mb(cfg.db_records_memtable_size_mb)) 139 161 .data_block_size_policy(BlockSizePolicy::all(kb(8))), 140 162 )?; 141 163 let cursors = open_ks( ··· 143 165 opts() 144 166 // cursor point reads hit almost 100% of the time 145 167 .expect_point_read_hits(true) 168 + .max_memtable_size(mb(4)) 146 169 .data_block_size_policy(BlockSizePolicy::all(kb(1))), 147 170 )?; 148 171 let pending = open_ks( ··· 150 173 opts() 151 174 // iterated over as a queue, no point reads are used so bloom filters are disabled 152 175 .expect_point_read_hits(true) 153 - .max_memtable_size(cfg.db_pending_memtable_size_mb * 1024 * 1024) 176 + .max_memtable_size(mb(cfg.db_pending_memtable_size_mb)) 154 177 .data_block_size_policy(BlockSizePolicy::all(kb(4))), 155 178 )?; 156 179 // resync point reads often miss (because most repos aren't resyncing), so keeping the bloom filter helps avoid disk hits 157 180 let resync = open_ks( 158 181 "resync", 159 - opts().data_block_size_policy(BlockSizePolicy::all(kb(8))), 182 + opts() 183 + .max_memtable_size(mb(cfg.db_pending_memtable_size_mb)) 184 + .data_block_size_policy(BlockSizePolicy::all(kb(8))), 160 185 )?; 161 186 let resync_buffer = open_ks( 162 187 "resync_buffer", 163 188 opts() 164 189 // iterated during backfill, no point reads 165 190 .expect_point_read_hits(true) 191 + .max_memtable_size(mb(16)) 166 192 .data_block_size_policy(BlockSizePolicy::all(kb(32))), 167 193 )?; 168 194 let events = open_ks( ··· 170 196 opts() 171 197 // only iterators are used here, no point reads 172 198 .expect_point_read_hits(true) 173 - .max_memtable_size(cfg.db_events_memtable_size_mb * 1024 * 1024) 174 - .data_block_size_policy(BlockSizePolicy::new([kb(16), kb(32)])), 199 + .max_memtable_size(mb(cfg.db_events_memtable_size_mb)) 200 + .data_block_size_policy(BlockSizePolicy::new([kb(16), kb(32)])) 201 + .compaction_strategy(if cfg.ephemeral { 202 + Arc::new(Fifo::new(mb(512), Some(60 * 60))) 203 + } else { 204 + Arc::new(Levelled::default()) 205 + }), 175 206 )?; 176 207 let counts = open_ks( 177 208 "counts", 178 209 opts() 179 210 // count increments hit because counters are mostly pre-initialized 180 211 .expect_point_read_hits(true) 212 + .max_memtable_size(mb(32)) 181 213 // the data is very small 182 214 .data_block_size_policy(BlockSizePolicy::all(kb(1))), 183 215 )?; ··· 188 220 "filter", 189 221 // this can be pretty small since the DIDs wont be compressed that well anyhow 190 222 opts() 191 - .max_memtable_size((kb(1024) * 16) as u64) 223 + .max_memtable_size(mb(16)) 192 224 .data_block_size_policy(BlockSizePolicy::all(kb(1))), 193 225 )?; 194 226 195 227 let crawler = open_ks( 196 228 "crawler", 197 229 opts() 198 - .max_memtable_size((kb(1024) * 16) as u64) 230 + .max_memtable_size(mb(16)) 199 231 .data_block_size_policy(BlockSizePolicy::all(kb(1))), 200 232 )?; 201 233