very fast AT Protocol indexer with flexible filtering, XRPC queries, a cursor-backed event stream, and more, built on fjall
rust fjall at-protocol atproto indexer
59
fork

Configure Feed

Select the types of activity you want to include in your feed.

[indexer] add config option for only indexing links

dawn 56ea2004 b918a396

+53 -7
+1
README.md
··· 219 219 | `PLC_URL` | `https://plc.wtf`, `https://plc.directory` if full network | base URL(s) of the PLC directory (comma-separated for multiple). | 220 220 | `EPHEMERAL` | `false` | if enabled, no records are stored. events are deleted after a certain duration (`EPHEMERAL_TTL`). | 221 221 | `EPHEMERAL_TTL` | `60min`, `3d` in relay mode | decides after how long events should be deleted. | 222 + | `ONLY_INDEX_LINKS` | `false` | indexer only. if enabled, record blocks are not stored; only the index (records, counts, events) is kept. `getRecord`, `listRecords`, and `getRepo` will return errors. the event stream still works, but create/update events will not include record values. | 222 223 | `FULL_NETWORK` | `false` (indexer), `true` (relay) | if `true`, discovers and indexes all repositories in the network. | 223 224 | `FILTER_SIGNALS` | | comma-separated list of NSID patterns to use for the filter (e.g. `app.bsky.feed.post,app.bsky.graph.*`). | 224 225 | `FILTER_COLLECTIONS` | | comma-separated list of NSID patterns to use for the collections filter. |
+11 -2
src/backfill/mod.rs
··· 567 567 568 568 tokio::task::spawn_blocking(move || { 569 569 let filter = app_state.filter.load(); 570 + let only_index_links = app_state.only_index_links; 570 571 let mut count = 0; 571 572 let mut delta = 0; 572 573 let mut added_blocks = 0; ··· 646 647 let cid_raw = cid.to_bytes(); 647 648 let block_key = Slice::from(keys::block_key(collection, &cid_raw)); 648 649 if !ephemeral { 649 - batch.insert(&app_state.db.blocks, block_key.clone(), val.as_ref()); 650 + if !only_index_links { 651 + batch.insert(&app_state.db.blocks, block_key.clone(), val.as_ref()); 652 + } 650 653 batch.insert(&app_state.db.records, db_key, cid_raw); 651 654 #[cfg(feature = "backlinks")] 652 655 if let Ok(value) = serde_ipld_dagcbor::from_slice::<jacquard_common::Data>(val.as_ref()) { ··· 675 678 collection: CowStr::Borrowed(collection), 676 679 rkey, 677 680 action, 678 - data: ephemeral.then_some(StoredData::Block(val)).unwrap_or_else(|| StoredData::Ptr(cid_obj.to_ipld().expect("valid cid"))), 681 + data: if ephemeral { 682 + StoredData::Block(val) 683 + } else if only_index_links { 684 + StoredData::Nothing 685 + } else { 686 + StoredData::Ptr(cid_obj.to_ipld().expect("valid cid")) 687 + }, 679 688 }; 680 689 let bytes = rmp_serde::to_vec(&evt).into_diagnostic()?; 681 690 batch.insert(&app_state.db.events, keys::event_key(event_id), bytes);
+13
src/config.rs
··· 368 368 /// set via `HYDRANT_ENABLE_BACKLINKS=true`. 369 369 pub enable_backlinks: bool, 370 370 371 + /// if `true`, record blocks are not stored; only the index (records, counts, events) is kept. 372 + /// `getRecord`, `listRecords`, and `getRepo` will return errors when this is enabled. 373 + /// event stream still functions but create/update events will not include record values. 374 + /// only valid in indexer mode (not relay). 375 + /// set via `HYDRANT_ONLY_INDEX_LINKS=true`. 376 + pub only_index_links: bool, 377 + 371 378 /// base URL(s) of relay or aggregator services to seed firehose PDS sources from at startup. 372 379 /// 373 380 /// hydrant calls `com.atproto.sync.listHosts` on each URL and adds the returned PDSes ··· 484 491 filter_collections: None, 485 492 filter_excludes: None, 486 493 enable_backlinks: false, 494 + only_index_links: false, 487 495 tier_rules: vec![], 488 496 tier_policy: { 489 497 let mut tiers = HashMap::new(); ··· 659 667 }); 660 668 661 669 let enable_backlinks: bool = cfg!("ENABLE_BACKLINKS", defaults.enable_backlinks); 670 + let only_index_links: bool = cfg!("ONLY_INDEX_LINKS", defaults.only_index_links); 662 671 663 672 // start with built-in tier definitions, then layer in any env-defined overrides. 664 673 // format: HYDRANT_RATE_TIERS=name:base/mul/hourly/daily,... ··· 772 781 filter_collections, 773 782 filter_excludes, 774 783 enable_backlinks, 784 + only_index_links, 775 785 tier_policy, 776 786 tier_rules, 777 787 cache_size, ··· 885 895 } 886 896 if self.enable_backlinks { 887 897 config_line!(f, "backlinks", "enabled")?; 898 + } 899 + if self.only_index_links { 900 + config_line!(f, "only index links", "true")?; 888 901 } 889 902 if !self.seed_hosts.is_empty() { 890 903 config_line!(
+5
src/control/mod.rs
··· 128 128 pub async fn new(config: Config) -> Result<Self> { 129 129 info!("{config}"); 130 130 131 + #[cfg(feature = "relay")] 132 + if config.only_index_links { 133 + miette::bail!("HYDRANT_ONLY_INDEX_LINKS is not supported in relay mode"); 134 + } 135 + 131 136 // 1. open database and construct AppState 132 137 let state = AppState::new(&config)?; 133 138
+10
src/control/repos/indexer.rs
··· 334 334 impl<'i> RepoHandle<'i> { 335 335 /// gets a record from this repository. 336 336 pub async fn get_record(&self, collection: &str, rkey: &str) -> Result<Option<Record>> { 337 + if self.state.only_index_links { 338 + miette::bail!("block storage is disabled (HYDRANT_ONLY_INDEX_LINKS)"); 339 + } 340 + 337 341 let did = self.did.clone().into_static(); 338 342 let db_key = keys::record_key(&did, collection, &DbRkey::new(rkey)); 339 343 ··· 376 380 reverse: bool, 377 381 cursor: Option<&str>, 378 382 ) -> Result<RecordList> { 383 + if self.state.only_index_links { 384 + miette::bail!("block storage is disabled (HYDRANT_ONLY_INDEX_LINKS)"); 385 + } 379 386 let did = self.did.clone().into_static(); 380 387 381 388 let state = self.state.clone(); ··· 479 486 &self, 480 487 ) -> Result<Option<impl futures::Stream<Item = std::io::Result<bytes::Bytes>> + Send + 'static>> 481 488 { 489 + if self.state.only_index_links { 490 + miette::bail!("block storage is disabled (HYDRANT_ONLY_INDEX_LINKS)"); 491 + } 482 492 use iroh_car::{CarHeader, CarWriter}; 483 493 use jacquard_repo::{BlockStore, MemoryBlockStore, Mst}; 484 494 use miette::WrapErr;
+1
src/ingest/indexer.rs
··· 476 476 validated, 477 477 &ctx.state.filter.load(), 478 478 ctx.ephemeral, 479 + ctx.state.only_index_links, 479 480 )?; 480 481 let repo_state = res.repo_state; 481 482 *ctx.added_blocks += res.blocks_count;
+10 -5
src/ops.rs
··· 210 210 validated: ValidatedCommit<'_>, 211 211 filter: &FilterConfig, 212 212 ephemeral: bool, 213 + only_index_links: bool, 213 214 ) -> Result<ApplyCommitResults<'s>> { 214 215 let commit = validated.commit; 215 216 let parsed = validated.parsed_blocks; ··· 257 258 258 259 blocks_count += 1; 259 260 if !ephemeral { 260 - batch.insert(&db.blocks, block_key.clone(), bytes.as_ref()); 261 + if !only_index_links { 262 + batch.insert(&db.blocks, block_key.clone(), bytes.as_ref()); 263 + } 261 264 batch.insert(&db.records, db_key.clone(), cid_raw); 262 265 // accumulate counts 263 266 if action == DbAction::Create { ··· 314 317 data: block 315 318 .map(StoredData::Block) 316 319 .or_else(|| { 317 - op.cid 318 - .as_ref() 319 - .map(|c| c.to_ipld().expect("valid cid")) 320 - .map(StoredData::Ptr) 320 + (!only_index_links).then(|| { 321 + op.cid 322 + .as_ref() 323 + .map(|c| c.to_ipld().expect("valid cid")) 324 + .map(StoredData::Ptr) 325 + })? 321 326 }) 322 327 .unwrap_or(StoredData::Nothing), 323 328 };
+2
src/state.rs
··· 34 34 #[cfg(feature = "indexer")] 35 35 pub backfill_enabled: watch::Sender<bool>, 36 36 pub ephemeral_ttl: Duration, 37 + pub only_index_links: bool, 37 38 pub throttler: Throttler, 38 39 } 39 40 ··· 107 108 #[cfg(feature = "indexer")] 108 109 backfill_enabled, 109 110 ephemeral_ttl: config.ephemeral_ttl.clone(), 111 + only_index_links: config.only_index_links, 110 112 throttler: Throttler::new(), 111 113 }) 112 114 }