A very fast AT Protocol indexer with flexible filtering, XRPC queries, a cursor-backed event stream, and more, built on fjall
rust fjall at-protocol atproto indexer
58
fork

Configure Feed

Select the types of activity you want to include in your feed.

[ingest] change the unbounded channels to be bounded

dawn fe41ba1f 13953dff

+108 -64
+49 -25
src/ingest/indexer.rs
··· 141 141 } 142 142 } 143 143 144 - // starts the worker threads and the main dispatch loop 145 - // the dispatch loop reads from the firehose channel and 146 - // distributes messages to shards based on the hash of the DID 147 - pub fn run(mut self, handle: TokioHandle) -> Result<()> { 148 - let mut shards = Vec::with_capacity(self.num_shards); 144 + pub fn run(self, handle: TokioHandle) -> Result<()> { 145 + use futures::{StreamExt, future::BoxFuture, stream::FuturesUnordered}; 146 + 147 + let mut shards: Vec<mpsc::Sender<IndexerMessage>> = Vec::with_capacity(self.num_shards); 149 148 150 149 for i in 0..self.num_shards { 151 - // unbounded here so we dont block other shards potentially 152 - // if one has a small lag or something 153 - let (tx, rx) = mpsc::unbounded_channel(); 150 + let (tx, rx) = mpsc::channel(64); 154 151 shards.push(tx); 155 152 156 153 let state = self.state.clone(); ··· 165 162 166 163 info!(num = self.num_shards, "started shards"); 167 164 168 - while let Some(msg) = self.rx.blocking_recv() { 169 - let did = match &msg { 170 - IndexerMessage::Event(e) => match &e.data { 171 - IndexerEventData::Commit(m) => &m.commit.repo, 172 - IndexerEventData::Identity(m) => &m.identity.did, 173 - IndexerEventData::Account(m) => &m.account.did, 174 - IndexerEventData::Sync(did) => did, 175 - }, 176 - IndexerMessage::NewRepo(did) => did, 177 - IndexerMessage::BackfillFinished(did) => did, 178 - }; 165 + let num_shards = self.num_shards; 166 + let mut rx = self.rx; 167 + 168 + handle.block_on(async move { 169 + let mut pending: FuturesUnordered< 170 + BoxFuture<'_, Result<(), mpsc::error::SendError<IndexerMessage>>>, 171 + > = FuturesUnordered::new(); 179 172 180 - let shard_idx = (util::hash(did) as usize) % self.num_shards; 181 - if let Err(e) = shards[shard_idx].send(msg) { 182 - error!(shard = shard_idx, err = %e, "failed to send message to shard, shard panicked?"); 183 - break; 173 + loop { 174 + tokio::select! 
{ 175 + msg = rx.recv(), if pending.len() < num_shards => { 176 + let Some(msg) = msg else { break; }; 177 + let shard_idx = { 178 + let did = match &msg { 179 + IndexerMessage::Event(e) => match &e.data { 180 + IndexerEventData::Commit(m) => &m.commit.repo, 181 + IndexerEventData::Identity(m) => &m.identity.did, 182 + IndexerEventData::Account(m) => &m.account.did, 183 + IndexerEventData::Sync(did) => did, 184 + }, 185 + IndexerMessage::NewRepo(did) => did, 186 + IndexerMessage::BackfillFinished(did) => did, 187 + }; 188 + (util::hash(did) as usize) % num_shards 189 + }; 190 + match shards[shard_idx].try_send(msg) { 191 + Ok(()) => {} 192 + Err(mpsc::error::TrySendError::Full(msg)) => { 193 + pending.push(Box::pin(shards[shard_idx].send(msg))); 194 + } 195 + Err(mpsc::error::TrySendError::Closed(_)) => { 196 + error!(shard = shard_idx, "shard closed unexpectedly"); 197 + break; 198 + } 199 + } 200 + } 201 + Some(result) = pending.next(), if !pending.is_empty() => { 202 + if let Err(e) = result { 203 + error!(err = %e, "failed to send to shard, shard panicked?"); 204 + break; 205 + } 206 + } 207 + } 184 208 } 185 - } 209 + }); 186 210 187 211 Err(miette::miette!( 188 212 "firehose worker dispatcher shutting down, shard died?" ··· 192 216 #[inline(always)] 193 217 fn shard( 194 218 id: usize, 195 - mut rx: mpsc::UnboundedReceiver<IndexerMessage>, 219 + mut rx: mpsc::Receiver<IndexerMessage>, 196 220 state: Arc<AppState>, 197 221 handle: TokioHandle, 198 222 ) {
+59 -39
src/ingest/relay.rs
··· 90 90 } 91 91 } 92 92 93 - pub fn run(mut self, handle: Handle) -> Result<()> { 94 - let mut shards = Vec::with_capacity(self.num_shards); 93 + pub fn run(self, handle: Handle) -> Result<()> { 94 + use futures::{StreamExt, future::BoxFuture, stream::FuturesUnordered}; 95 + 96 + let mut shards: Vec<mpsc::Sender<WorkerMessage>> = Vec::with_capacity(self.num_shards); 95 97 96 98 for i in 0..self.num_shards { 97 - let (tx, rx) = mpsc::unbounded_channel(); 99 + let (tx, rx) = mpsc::channel(64); 98 100 shards.push(tx); 99 101 100 102 let state = self.state.clone(); ··· 125 127 126 128 info!(num = self.num_shards, "relay worker: started shards"); 127 129 128 - let _g = handle.enter(); 130 + let num_shards = self.num_shards; 131 + let mut rx = self.rx; 132 + 133 + handle.block_on(async move { 134 + let mut pending: FuturesUnordered< 135 + BoxFuture<'_, Result<(), mpsc::error::SendError<WorkerMessage>>>, 136 + > = FuturesUnordered::new(); 137 + 138 + loop { 139 + tokio::select! { 140 + msg = rx.recv(), if pending.len() < num_shards => { 141 + let Some(msg) = msg else { break; }; 142 + let IngestMessage::Firehose { url, is_pds, msg } = msg; 129 143 130 - while let Some(msg) = self.rx.blocking_recv() { 131 - let IngestMessage::Firehose { url, is_pds, msg } = msg; 144 + if let SubscribeReposMessage::Info(inf) = msg { 145 + match inf.name { 146 + InfoName::OutdatedCursor => {} 147 + InfoName::Other(name) => { 148 + let message = inf 149 + .message 150 + .unwrap_or_else(|| CowStr::Borrowed("<no message>")); 151 + info!(name = %name, "relay sent info: {message}"); 152 + } 153 + } 154 + continue; 155 + } 132 156 133 - // #info only pertains to us, the direct consumer 134 - if let SubscribeReposMessage::Info(inf) = msg { 135 - match inf.name { 136 - InfoName::OutdatedCursor => { 137 - // todo: handle 157 + let shard_idx = { 158 + let did = match &msg { 159 + SubscribeReposMessage::Commit(c) => &c.repo, 160 + SubscribeReposMessage::Identity(i) => &i.did, 161 + 
SubscribeReposMessage::Account(a) => &a.did, 162 + SubscribeReposMessage::Sync(s) => &s.did, 163 + _ => continue, 164 + }; 165 + (util::hash(did) as usize) % num_shards 166 + }; 167 + 168 + let worker_msg = WorkerMessage { firehose: url, is_pds, msg }; 169 + match shards[shard_idx].try_send(worker_msg) { 170 + Ok(()) => {} 171 + Err(mpsc::error::TrySendError::Full(worker_msg)) => { 172 + pending.push(Box::pin(shards[shard_idx].send(worker_msg))); 173 + } 174 + Err(mpsc::error::TrySendError::Closed(_)) => { 175 + error!(shard = shard_idx, "relay shard closed unexpectedly"); 176 + break; 177 + } 178 + } 138 179 } 139 - InfoName::Other(name) => { 140 - let message = inf 141 - .message 142 - .unwrap_or_else(|| CowStr::Borrowed("<no message>")); 143 - info!(name = %name, "relay sent info: {message}"); 180 + Some(result) = pending.next(), if !pending.is_empty() => { 181 + if let Err(e) = result { 182 + error!(err = %e, "relay worker: failed to send to shard, shard panicked?"); 183 + break; 184 + } 144 185 } 145 186 } 146 - continue; 147 187 } 148 - 149 - let shard_idx = { 150 - let did = match &msg { 151 - SubscribeReposMessage::Commit(c) => &c.repo, 152 - SubscribeReposMessage::Identity(i) => &i.did, 153 - SubscribeReposMessage::Account(a) => &a.did, 154 - SubscribeReposMessage::Sync(s) => &s.did, 155 - _ => continue, 156 - }; 157 - (util::hash(did) as usize) % self.num_shards 158 - }; 159 - 160 - if let Err(e) = shards[shard_idx].send(WorkerMessage { 161 - firehose: url, 162 - is_pds, 163 - msg, 164 - }) { 165 - error!(shard = shard_idx, err = %e, "relay worker: failed to send to shard"); 166 - break; 167 - } 168 - } 188 + }); 169 189 170 190 Err(miette::miette!("relay worker dispatcher shutting down")) 171 191 } 172 192 173 193 fn shard( 174 194 id: usize, 175 - mut rx: mpsc::UnboundedReceiver<WorkerMessage>, 195 + mut rx: mpsc::Receiver<WorkerMessage>, 176 196 state: Arc<AppState>, 177 197 #[cfg(feature = "indexer")] hook: crate::ingest::indexer::IndexerTx, 178 198 
verify_signatures: bool,