very fast at protocol indexer with flexible filtering, xrpc queries, cursor-backed event stream, and more, built on fjall
rust fjall at-protocol atproto indexer
58
fork

Configure Feed

Select the types of activity you want to include in your feed.

[ingest] return error clearly in dispatcher

dawn 978c38e2 cef95082

+9 -6
+9 -6
src/ingest/worker.rs
··· 115 115 let mut shards = Vec::with_capacity(self.num_shards); 116 116 117 117 for i in 0..self.num_shards { 118 + // unbounded here so we dont block other shards potentially 119 + // if one has a small lag or something 118 120 let (tx, rx) = mpsc::unbounded_channel(); 119 121 shards.push(tx); 120 122 ··· 131 133 .into_diagnostic()?; 132 134 } 133 135 134 - info!("started {} ingest shards", self.num_shards); 136 + info!(num = self.num_shards, "started shards"); 135 137 136 138 let _g = handle.enter(); 137 139 ··· 148 150 IngestMessage::BackfillFinished(did) => did, 149 151 }; 150 152 153 + // todo: consider using a different hasher? 151 154 let mut hasher = DefaultHasher::new(); 152 155 did.hash(&mut hasher); 153 156 let hash = hasher.finish(); 154 157 let shard_idx = (hash as usize) % self.num_shards; 155 158 156 159 if let Err(e) = shards[shard_idx].send(msg) { 157 - error!(shard = shard_idx, err = %e, "failed to send message to shard"); 158 - // break if send fails; receiver likely closed 160 + error!(shard = shard_idx, err = %e, "failed to send message to shard, shard panicked?"); 159 161 break; 160 162 } 161 163 } 162 164 163 - error!("firehose worker dispatcher shutting down"); 164 - 165 - Ok(()) 165 + Err(miette::miette!( 166 + "firehose worker dispatcher shutting down, shard died?" 167 + )) 166 168 } 167 169 168 170 #[inline(always)] ··· 357 359 } 358 360 } 359 361 362 + // todo: consider not using seqcst 360 363 state 361 364 .relay_cursors 362 365 .peek_with(&relay, |_, c| c.store(seq, SeqCst));