very fast at protocol indexer with flexible filtering, xrpc queries, cursor-backed event stream, and more, built on fjall
rust fjall at-protocol atproto indexer
59
fork

Configure Feed

Select the types of activity you want to include in your feed.

[all] dont thread ephemeral bool through the code, use from app state

dawn 6d64eb9a 56ea2004

+15 -35
+4 -11
src/backfill/mod.rs
··· 40 40 http: reqwest::Client, 41 41 semaphore: Arc<Semaphore>, 42 42 verify_signatures: bool, 43 - ephemeral: bool, 44 43 in_flight: Arc<scc::HashSet<Did<'static>>>, 45 44 enabled: tokio::sync::watch::Receiver<bool>, 46 45 } ··· 52 51 timeout: Duration, 53 52 concurrency_limit: usize, 54 53 verify_signatures: bool, 55 - ephemeral: bool, 56 54 enabled: tokio::sync::watch::Receiver<bool>, 57 55 ) -> Self { 58 56 Self { ··· 67 65 .expect("failed to build http client"), 68 66 semaphore: Arc::new(Semaphore::new(concurrency_limit)), 69 67 verify_signatures, 70 - ephemeral, 71 68 in_flight: Arc::new(scc::HashSet::new()), 72 69 enabled, 73 70 } ··· 143 140 let did = did.clone(); 144 141 let buffer_tx = self.buffer_tx.clone(); 145 142 let verify = self.verify_signatures; 146 - let ephemeral = self.ephemeral; 147 143 148 144 let span = tracing::info_span!("backfill", did = %did); 149 145 tokio::spawn( 150 146 async move { 151 147 let _guard = guard; 152 - let res = did_task( 153 - &state, http, buffer_tx, &did, key, permit, verify, ephemeral, 154 - ) 155 - .await; 148 + let res = 149 + did_task(&state, http, buffer_tx, &did, key, permit, verify).await; 156 150 157 151 if let Err(e) = res { 158 152 error!(err = %e, "process failed"); ··· 187 181 pending_key: Slice, 188 182 _permit: tokio::sync::OwnedSemaphorePermit, 189 183 verify_signatures: bool, 190 - ephemeral: bool, 191 184 ) -> Result<(), BackfillError> { 192 185 let db = &state.db; 193 186 194 - match process_did(&state, &http, &did, verify_signatures, ephemeral).await { 187 + match process_did(&state, &http, &did, verify_signatures).await { 195 188 Ok(Some(_repo_state)) => { 196 189 let did_key = keys::repo_key(&did); 197 190 ··· 391 384 http: &reqwest::Client, 392 385 did: &Did<'static>, 393 386 verify_signatures: bool, 394 - ephemeral: bool, 395 387 ) -> Result<Option<RepoState<'static>>, BackfillError> { 396 388 debug!("starting..."); 397 389 ··· 567 559 568 560 tokio::task::spawn_blocking(move || { 569 561 let filter = app_state.filter.load(); 562 + let ephemeral = app_state.ephemeral; 570 563 let only_index_links = app_state.only_index_links; 571 564 let mut count = 0; 572 565 let mut delta = 0;
+1 -10
src/control/mod.rs
··· 268 268 config.verify_signatures, 269 269 SignatureVerification::Full | SignatureVerification::BackfillOnly 270 270 ), 271 - config.ephemeral, 272 271 state.backfill_enabled.subscribe(), 273 272 ) 274 273 .run() ··· 618 617 let state = state.clone(); 619 618 let handle = tokio::runtime::Handle::current(); 620 619 let config = config.clone(); 621 - move || { 622 - FirehoseWorker::new( 623 - state, 624 - indexer_rx, 625 - config.ephemeral, 626 - config.firehose_workers, 627 - ) 628 - .run(handle) 629 - } 620 + move || FirehoseWorker::new(state, indexer_rx, config.firehose_workers).run(handle) 630 621 }); 631 622 632 623 #[cfg(feature = "indexer")]
+3 -11
src/ingest/indexer.rs
··· 116 116 pub struct FirehoseWorker { 117 117 state: Arc<AppState>, 118 118 rx: IndexerRx, 119 - ephemeral: bool, 120 119 num_shards: usize, 121 120 } 122 121 123 122 struct WorkerContext<'a> { 124 - ephemeral: bool, 125 123 state: &'a AppState, 126 124 batch: OwnedWriteBatch, 127 125 added_blocks: &'a mut i64, ··· 130 128 } 131 129 132 130 impl FirehoseWorker { 133 - pub fn new(state: Arc<AppState>, rx: IndexerRx, ephemeral: bool, num_shards: usize) -> Self { 131 + pub fn new(state: Arc<AppState>, rx: IndexerRx, num_shards: usize) -> Self { 134 132 Self { 135 133 state, 136 134 rx, 137 - ephemeral, 138 135 num_shards, 139 136 } 140 137 } ··· 152 149 shards.push(tx); 153 150 154 151 let state = self.state.clone(); 155 - let ephemeral = self.ephemeral; 156 152 let handle = handle.clone(); 157 153 std::thread::Builder::new() 158 154 .name(format!("ingest-shard-{i}")) 159 155 .spawn(move || { 160 - Self::shard(i, rx, state, ephemeral, handle); 156 + Self::shard(i, rx, state, handle); 161 157 }) 162 158 .into_diagnostic()?; 163 159 } ··· 193 189 id: usize, 194 190 mut rx: mpsc::UnboundedReceiver<IndexerMessage>, 195 191 state: Arc<AppState>, 196 - ephemeral: bool, 197 192 handle: TokioHandle, 198 193 ) { 199 194 let _guard = handle.enter(); ··· 214 209 added_blocks: &mut added_blocks, 215 210 records_delta: &mut records_delta, 216 211 broadcast_events: &mut broadcast_events, 217 - ephemeral, 218 212 }; 219 213 220 214 match msg { ··· 471 465 472 466 let res = ops::apply_commit( 473 467 &mut ctx.batch, 474 - db, 468 + ctx.state, 475 469 repo_state, 476 470 validated, 477 471 &ctx.state.filter.load(), 478 - ctx.ephemeral, 479 - ctx.state.only_index_links, 480 472 )?; 481 473 let repo_state = res.repo_state; 482 474 *ctx.added_blocks += res.blocks_count;
+5 -3
src/ops.rs
··· 15 15 use crate::filter::FilterConfig; 16 16 use crate::ingest::stream::Commit; 17 17 use crate::ingest::validation::ValidatedCommit; 18 + use crate::state::AppState; 18 19 use crate::types::StoredData; 19 20 use crate::types::{ 20 21 AccountEvt, BroadcastEvent, IdentityEvt, MarshallableEvt, RepoState, RepoStatus, ResyncState, ··· 205 206 206 207 pub fn apply_commit<'s>( 207 208 batch: &mut OwnedWriteBatch, 208 - db: &Db, 209 + state: &AppState, 209 210 mut repo_state: RepoState<'s>, 210 211 validated: ValidatedCommit<'_>, 211 212 filter: &FilterConfig, 212 - ephemeral: bool, 213 - only_index_links: bool, 214 213 ) -> Result<ApplyCommitResults<'s>> { 214 + let db = &state.db; 215 + let ephemeral = state.ephemeral; 216 + let only_index_links = state.only_index_links; 215 217 let commit = validated.commit; 216 218 let parsed = validated.parsed_blocks; 217 219 let did = &commit.repo;
+2
src/state.rs
··· 33 33 pub firehose_enabled: watch::Sender<bool>, 34 34 #[cfg(feature = "indexer")] 35 35 pub backfill_enabled: watch::Sender<bool>, 36 + pub ephemeral: bool, 36 37 pub ephemeral_ttl: Duration, 37 38 pub only_index_links: bool, 38 39 pub throttler: Throttler, ··· 107 108 firehose_enabled, 108 109 #[cfg(feature = "indexer")] 109 110 backfill_enabled, 111 + ephemeral: config.ephemeral, 110 112 ephemeral_ttl: config.ephemeral_ttl.clone(), 111 113 only_index_links: config.only_index_links, 112 114 throttler: Throttler::new(),