A very fast AT Protocol indexer with flexible filtering, XRPC queries, a cursor-backed event stream, and more, built on fjall.
rust fjall at-protocol atproto indexer
58
fork

Configure Feed

Select the types of activity you want to include in your feed.

[firehose] respond to pings, handle close messages better

dawn ceb03614 c1e028f9

+23 -14
+3 -4
src/ingest/firehose.rs
··· 101 101 if self.state.pds_meta.load().is_banned(host) { 102 102 break Ok(()); 103 103 } 104 - 105 104 self.enabled.wait_enabled("firehose").await; 106 105 106 + // sleep stream backoff out if we have any 107 107 tokio::time::sleep(backoff).await; 108 108 109 + // get cursor 109 110 let start_cursor = self 110 111 .state 111 112 .firehose_cursors ··· 114 115 (val > 0).then_some(val) 115 116 }) 116 117 .flatten(); 117 - 118 118 match start_cursor { 119 119 Some(c) => info!(cursor = %c, "resuming from cursor"), 120 120 None => info!("no cursor found, live tailing"), ··· 155 155 let res = loop { 156 156 tokio::select! { 157 157 msg = stream.next() => { 158 - let Some(bytes_res) = msg else { break Err(FirehoseError::EmptyFrame); }; 159 - let bytes = match bytes_res { 158 + let bytes = match msg { 160 159 Ok(b) => b, 161 160 Err(e) => break Err(e), 162 161 };
+20 -10
src/ingest/stream.rs
··· 2 2 3 3 use axum::http::Uri; 4 4 use bytes::Bytes; 5 - use futures::StreamExt; 5 + use futures::{SinkExt, StreamExt}; 6 6 use jacquard_common::error::DecodeError; 7 7 use jacquard_common::{ 8 8 CowStr, ··· 15 15 use serde::{Deserialize, Serialize}; 16 16 use smol_str::format_smolstr; 17 17 use thiserror::Error; 18 - use tokio_websockets::{ClientBuilder, WebSocketStream}; 18 + use tokio_websockets::{ClientBuilder, Message as WsMsg, WebSocketStream}; 19 19 use tracing::trace; 20 20 use url::Url; 21 21 ··· 42 42 UnknownType(String), 43 43 #[error("cbor decode error: {0}")] 44 44 Cbor(String), 45 + #[error("stream closed")] 46 + StreamClosed, 45 47 } 46 48 47 49 impl From<serde_ipld_dagcbor::DecodeError<Infallible>> for FirehoseError { ··· 76 78 } 77 79 78 80 /// gets the next message bytes from the firehose 79 - pub async fn next(&mut self) -> Option<Result<Bytes, FirehoseError>> { 81 + /// none means the stream is closed 82 + pub async fn next(&mut self) -> Result<Bytes, FirehoseError> { 80 83 loop { 81 - match self.ws.next().await? { 82 - Err(e) => return Some(Err(e.into())), 83 - Ok(msg) if msg.is_binary() => { 84 + let res = self 85 + .ws 86 + .next() 87 + .await 88 + .map(|m| m.map_err(Into::into)) 89 + .unwrap_or_else(|| Err(FirehoseError::StreamClosed))?; 90 + match res { 91 + msg if msg.is_binary() => { 84 92 let bytes: Bytes = msg.into_payload().into(); 85 93 if bytes.is_empty() { 86 - return Some(Err(FirehoseError::EmptyFrame)); 94 + return Err(FirehoseError::EmptyFrame); 87 95 } 88 - return Some(Ok(bytes)); 96 + return Ok(bytes); 89 97 } 90 - Ok(msg) if msg.is_close() => return None, 91 - Ok(x) => { 98 + msg if msg.is_ping() => self.ws.send(WsMsg::pong(msg.into_payload())).await?, 99 + // if ws closed treat it as an error, since why would a host close the stream?? 100 + msg if msg.is_close() => return Err(FirehoseError::StreamClosed), 101 + x => { 92 102 trace!(msg = ?x, "relay sent unexpected message"); 93 103 continue; 94 104 }