very fast at protocol indexer with flexible filtering, xrpc queries, cursor-backed event stream, and more, built on fjall
rust fjall at-protocol atproto indexer
58
fork

Configure Feed

Select the types of activity you want to include in your feed.

[firehose] growing backoff on stream connect retry, also wait on enabled changes when waiting for rate allowance

dawn 97bdeb2e 3ccb5332

+30 -12
+30 -12
src/ingest/firehose.rs
··· 54 54 let host = self.relay_host.host_str().unwrap_or("").to_string(); 55 55 let count_key = crate::db::keys::pds_account_count_key(&host); 56 56 57 + let mut backoff = Duration::from_secs(5); 58 + const MAX_BACKOFF: Duration = Duration::from_secs(60 * 15); // 15 mins 59 + 57 60 loop { 58 61 self.enabled.wait_enabled("firehose").await; 59 62 ··· 71 74 None => info!("no cursor found, live tailing"), 72 75 } 73 76 74 - let mut stream = 75 - match FirehoseStream::connect(self.relay_host.clone(), start_cursor).await { 76 - Ok(s) => s, 77 - Err(e) => { 78 - error!(err = %e, "failed to connect to firehose, retrying in 5s"); 79 - tokio::time::sleep(Duration::from_secs(5)).await; 80 - continue; 81 - } 82 - }; 77 + let mut stream = match FirehoseStream::connect(self.relay_host.clone(), start_cursor) 78 + .await 79 + { 80 + Ok(s) => s, 81 + Err(e) => { 82 + error!(err = %e, backoff_secs = backoff.as_secs(), "failed to connect to firehose, retrying"); 83 + tokio::time::sleep(backoff).await; 84 + backoff = (backoff * 2).min(MAX_BACKOFF); 85 + continue; 86 + } 87 + }; 83 88 84 89 info!("firehose connected"); 90 + backoff = Duration::from_secs(5); 85 91 86 92 let disconnected_by_error = loop { 87 93 tokio::select! { ··· 99 105 if self.is_pds { 100 106 let accounts = self.state.db.get_count(&count_key).await; 101 107 let tier = self.state.pds_tier_for(&host); 102 - self.throttle.wait_for_allow(accounts, &tier).await; 108 + tokio::select! { 109 + _ = self.throttle.wait_for_allow(accounts, &tier) => {} 110 + _ = self.enabled.changed() => { 111 + if !*self.enabled.borrow() { 112 + info!("firehose disabled, disconnecting"); 113 + break false; 114 + } 115 + } 116 + } 103 117 } 104 118 self.handle_message(msg).await 105 119 }, ··· 135 149 }; 136 150 137 151 if disconnected_by_error { 138 - error!("firehose disconnected, reconnecting in 5s..."); 139 - tokio::time::sleep(Duration::from_secs(5)).await; 152 + error!( 153 + backoff_secs = backoff.as_secs(), 154 + "firehose disconnected, reconnecting" 155 + ); 156 + tokio::time::sleep(backoff).await; 157 + backoff = (backoff * 2).min(MAX_BACKOFF); 140 158 } 141 159 } 142 160 }