very fast at protocol indexer with flexible filtering, xrpc queries, cursor-backed event stream, and more, built on fjall
rust fjall at-protocol atproto indexer
59
fork

Configure Feed

Select the types of activity you want to include in your feed.

[firehose] print better stream closed errors

dawn 05d3493c 5c5bef95

+17 -18
+16 -4
src/ingest/stream.rs
··· 42 42 UnknownType(String), 43 43 #[error("cbor decode error: {0}")] 44 44 Cbor(String), 45 - #[error("stream closed")] 46 - StreamClosed, 45 + #[error("stream closed: {code}: {reason}")] 46 + StreamClosed { code: u16, reason: String }, 47 47 } 48 48 49 49 impl From<serde_ipld_dagcbor::DecodeError<Infallible>> for FirehoseError { ··· 86 86 .next() 87 87 .await 88 88 .map(|m| m.map_err(Into::into)) 89 - .unwrap_or_else(|| Err(FirehoseError::StreamClosed))?; 89 + .unwrap_or_else(|| { 90 + Err(FirehoseError::StreamClosed { 91 + code: 0, 92 + reason: "closed".to_owned(), 93 + }) 94 + })?; 90 95 match res { 91 96 msg if msg.is_binary() => { 92 97 let bytes: Bytes = msg.into_payload().into(); ··· 97 102 } 98 103 msg if msg.is_ping() => self.ws.send(WsMsg::pong(msg.into_payload())).await?, 99 104 // if ws closed treat it as an error, since why would a host close the stream?? 100 - msg if msg.is_close() => return Err(FirehoseError::StreamClosed), 105 + // TODO: treat hosts that return these as offline ?????? 106 + msg if msg.is_close() => { 107 + let (code, reason) = msg.as_close().map_or_else( 108 + || (0, "no reason".to_string()), 109 + |(code, reason)| (code.into(), reason.to_owned()), 110 + ); 111 + return Err(FirehoseError::StreamClosed { code, reason }); 112 + } 101 113 x => { 102 114 trace!(msg = ?x, "relay sent unexpected message"); 103 115 continue;
+1 -14
src/util/mod.rs
··· 70 70 use rustls::AlertDescription; 71 71 use rustls::Error::*; 72 72 73 - if let AlertReceived(alert) = e { 74 - return !matches!( 75 - alert, 76 - // these mean we did something wrong 77 - AlertDescription::BadCertificate 78 - | AlertDescription::CertificateUnknown 79 - | AlertDescription::CertificateRequired 80 - | AlertDescription::UnknownCA 81 - | AlertDescription::AccessDenied 82 - | AlertDescription::InsufficientSecurity 83 - | AlertDescription::UnknownPSKIdentity 84 - ); 85 - } 86 - 87 73 matches!( 88 74 *e, 89 75 InvalidCertificate(_) ··· 95 81 | NoCertificatesPresented 96 82 | UnsupportedNameType 97 83 | DecryptError 84 + | AlertReceived(_) 98 85 | PeerIncompatible(_) 99 86 | InvalidCertRevocationList(_) 100 87 | InvalidEncryptedClientHello(_)