very fast at protocol indexer with flexible filtering, xrpc queries, cursor-backed event stream, and more, built on fjall
rust fjall at-protocol atproto indexer
59
fork

Configure Feed

Select the types of activity you want to include in your feed.

[firehose] send pings to hosts

dawn f1265e9f b1bb208e

+27 -9
+27 -8
src/ingest/stream.rs
··· 1 1 use std::convert::Infallible; 2 + use std::time::Duration; 2 3 3 4 use axum::http::Uri; 4 5 use bytes::Bytes; ··· 56 57 57 58 pub struct FirehoseStream { 58 59 ws: WebSocketStream<tokio_websockets::MaybeTlsStream<tokio::net::TcpStream>>, 60 + ping_timer: tokio::time::Interval, 59 61 } 60 62 61 63 impl FirehoseStream { ··· 76 78 .map_err(|e| FirehoseError::Cbor(format!("invalid uri: {e}")))?; 77 79 78 80 let (ws, _) = ClientBuilder::from_uri(uri).connect().await?; 79 - Ok(Self { ws }) 81 + 82 + let mut ping_timer = tokio::time::interval(Duration::from_secs(45)); 83 + ping_timer.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay); 84 + ping_timer.tick().await; // consume the first tick 85 + 86 + Ok(Self { ws, ping_timer }) 80 87 } 81 88 82 89 /// gets the next message bytes from the firehose 83 90 /// none means the stream is closed 84 91 pub async fn next(&mut self) -> Result<Bytes, FirehoseError> { 85 92 loop { 86 - let res = self 87 - .ws 88 - .next() 89 - .await 90 - .map(|m| m.map_err(Into::into)) 91 - .unwrap_or(Err(FirehoseError::TcpDropped))?; 93 + let res = tokio::select! { 94 + _ = self.ping_timer.tick() => { 95 + self.ping().await?; 96 + continue; 97 + } 98 + res = self.ws.next() => { 99 + res.map(|m| m.map_err(Into::into)) 100 + .unwrap_or(Err(FirehoseError::TcpDropped))? 101 + } 102 + }; 92 103 match res { 93 104 msg if msg.is_binary() => { 94 105 let bytes: Bytes = msg.into_payload().into(); ··· 107 118 ); 108 119 return Err(FirehoseError::StreamClosed { code, reason }); 109 120 } 121 + msg if msg.is_pong() => continue, 110 122 x => { 111 - trace!(msg = ?x, "relay sent unexpected message"); 123 + trace!(msg = ?x, "host sent unexpected message"); 112 124 continue; 113 125 } 114 126 } 115 127 } 128 + } 129 + 130 + async fn ping(&mut self) -> Result<(), FirehoseError> { 131 + self.ws 132 + .send(WsMsg::ping(Bytes::new())) 133 + .await 134 + .map_err(Into::into) 116 135 } 117 136 } 118 137
-1
src/util/mod.rs
··· 67 67 } 68 68 69 69 pub fn is_tls_error_their_fault(e: &rustls::Error) -> bool { 70 - use rustls::AlertDescription; 71 70 use rustls::Error::*; 72 71 73 72 matches!(