very fast at protocol indexer with flexible filtering, xrpc queries, cursor-backed event stream, and more, built on fjall
rust fjall at-protocol atproto indexer
58
fork

Configure Feed

Select the types of activity you want to include in your feed.

[firehose,crawler] add 525 to list of status codes that is hosts fault

dawn f9fea797 fdc452f7

+34 -26
+4 -12
src/crawler/list_repos.rs
··· 3 3 use crate::state::AppState; 4 4 use crate::util::throttle::{OrFailure, ThrottleHandle, Throttler}; 5 5 use crate::util::{ 6 - ErrorForStatus, RetryOutcome, RetryWithBackoff, WatchEnabledExt, is_tls_cert_error, 7 - parse_retry_after, 6 + ErrorForStatus, RetryOutcome, RetryWithBackoff, WatchEnabledExt, is_status_their_fault, 7 + is_tls_cert_error, parse_retry_after, 8 8 }; 9 9 use chrono::{DateTime, TimeDelta, Utc}; 10 10 use fjall::OwnedWriteBatch; ··· 121 121 src = s.source(); 122 122 } 123 123 124 - e.status().map_or(false, |s| { 125 - matches!( 126 - s, 127 - StatusCode::BAD_GATEWAY 128 - | StatusCode::SERVICE_UNAVAILABLE 129 - | StatusCode::GATEWAY_TIMEOUT 130 - | crate::util::CONNECTION_TIMEOUT 131 - | crate::util::SITE_FROZEN 132 - ) 133 - }) 124 + e.status() 125 + .map_or(false, |s| is_status_their_fault(s.as_u16())) 134 126 } 135 127 136 128 /// shared describeRepo signal-checking logic used by both relay and retry producers.
+9 -12
src/ingest/firehose.rs
··· 3 3 use crate::ingest::{BufferTx, IngestMessage}; 4 4 use crate::state::AppState; 5 5 use crate::util::throttle::ThrottleHandle; 6 - use crate::util::{WatchEnabledExt, is_timeout, is_tls_cert_error, is_tls_error_our_fault}; 6 + use crate::util::{ 7 + WatchEnabledExt, is_status_their_fault, is_timeout, is_tls_cert_error, is_tls_error_their_fault, 8 + }; 7 9 use jacquard_common::IntoStatic; 8 10 use jacquard_common::types::did::Did; 9 11 use miette::{IntoDiagnostic, Result}; ··· 13 15 use std::time::Duration; 14 16 use tokio::sync::watch; 15 17 use tokio_websockets::Error as WsError; 18 + use tokio_websockets::upgrade::Error as WsUpgradeError; 16 19 use tracing::{Span, debug, error, info, trace, warn}; 17 20 use url::Url; 18 21 ··· 24 27 } 25 28 26 29 match e { 27 - WsError::Rustls(e) if is_tls_error_our_fault(e) => return true, 30 + WsError::Rustls(e) if is_tls_error_their_fault(e) => return true, 28 31 WsError::Io(io_err) if is_tls_cert_error(io_err) => return true, 29 32 WsError::CannotResolveHost => return true, 30 - WsError::Upgrade(tokio_websockets::upgrade::Error::DidNotSwitchProtocols(status)) => { 31 - return matches!( 32 - *status, 33 - 502 // BAD_GATEWAY 34 - | 503 // SERVICE_UNAVAILABLE 35 - | 504 // GATEWAY_TIMEOUT 36 - | 522 // CONNECTION_TIMEOUT 37 - | 530 // SITE_FROZEN 38 - | 404 // NOT FOUND 39 - ); 33 + WsError::Upgrade(WsUpgradeError::DidNotSwitchProtocols(status)) 34 + if is_status_their_fault(*status) => 35 + { 36 + return true; 40 37 } 41 38 WsError::Protocol(_) | WsError::PayloadTooLong { .. } => return true, 42 39 _ => {}
+21 -2
src/util/mod.rs
··· 41 41 return false; 42 42 }; 43 43 if let Some(rustls_err) = inner.downcast_ref::<rustls::Error>() { 44 - return is_tls_error_our_fault(rustls_err); 44 + return is_tls_error_their_fault(rustls_err); 45 45 } 46 46 if let Some(nested_io) = inner.downcast_ref::<std::io::Error>() { 47 47 return is_tls_cert_error(nested_io); ··· 49 49 false 50 50 } 51 51 52 - pub fn is_tls_error_our_fault(e: &rustls::Error) -> bool { 52 + pub fn is_tls_error_their_fault(e: &rustls::Error) -> bool { 53 53 use rustls::Error::*; 54 54 matches!( 55 55 *e, ··· 69 69 | PeerSentOversizedRecord 70 70 | NoApplicationProtocol 71 71 ) 72 + } 73 + 74 + pub fn is_status_their_fault(status: u16) -> bool { 75 + return matches!( 76 + status, 77 + 502 // BAD_GATEWAY 78 + | 503 // SERVICE_UNAVAILABLE 79 + | 504 // GATEWAY_TIMEOUT 80 + | 522 // CONNECTION_TIMEOUT 81 + | 525 // SSL_HANDSHAKE_FAILURE 82 + | 530 // SITE_FROZEN 83 + | 404 // NOT FOUND: we know its not our fault because we use known xrpcs.. 84 + ); 72 85 } 73 86 74 87 /// outcome of [`RetryWithBackoff::retry`] when the operation does not succeed. ··· 186 199 }; 187 200 pub const SITE_FROZEN: StatusCode = unsafe { 188 201 match StatusCode::from_u16(530) { 202 + Ok(s) => s, 203 + _ => std::hint::unreachable_unchecked(), 204 + } 205 + }; 206 + pub const SSL_HANDSHAKE_FAILURE: StatusCode = unsafe { 207 + match StatusCode::from_u16(525) { 189 208 Ok(s) => s, 190 209 _ => std::hint::unreachable_unchecked(), 191 210 }