very fast AT Protocol indexer with flexible filtering, XRPC queries, a cursor-backed event stream, and more, built on fjall
rust fjall at-protocol atproto indexer
58
fork

Configure Feed

Select the types of activity you want to include in your feed.

[firehose,api] use tokio-websockets instead of tokio-tungstenite

dawn c9e93d07 206a720c

+111 -92
+57 -41
Cargo.lock
··· 164 164 dependencies = [ 165 165 "axum-core", 166 166 "axum-macros", 167 - "base64", 168 167 "bytes", 169 168 "form_urlencoded", 170 169 "futures-util", ··· 183 182 "serde_json", 184 183 "serde_path_to_error", 185 184 "serde_urlencoded", 186 - "sha1", 187 185 "sync_wrapper", 188 186 "tokio", 189 - "tokio-tungstenite 0.28.0", 190 187 "tower", 191 188 "tower-layer", 192 189 "tower-service", ··· 221 218 "proc-macro2", 222 219 "quote", 223 220 "syn", 221 + ] 222 + 223 + [[package]] 224 + name = "axum-tws" 225 + version = "0.6.0" 226 + source = "registry+https://github.com/rust-lang/crates.io-index" 227 + checksum = "08bf5c1e7d60af9632c55c23bbfa208d0fe08aa59fcda87216c9119e30a9d6d4" 228 + dependencies = [ 229 + "axum-core", 230 + "base64", 231 + "bytes", 232 + "futures-util", 233 + "http", 234 + "hyper", 235 + "hyper-util", 236 + "sha1_smol", 237 + "tokio", 238 + "tokio-websockets", 224 239 ] 225 240 226 241 [[package]] ··· 1486 1501 "ahash", 1487 1502 "arc-swap", 1488 1503 "axum", 1504 + "axum-tws", 1489 1505 "bytes", 1490 1506 "chrono", 1491 1507 "cid", ··· 1523 1539 "tempfile", 1524 1540 "thiserror 2.0.18", 1525 1541 "tokio", 1526 - "tokio-tungstenite 0.28.0", 1527 1542 "tokio-util", 1543 + "tokio-websockets", 1528 1544 "tower-http", 1529 1545 "tracing", 1530 1546 "tracing-subscriber", ··· 3303 3319 ] 3304 3320 3305 3321 [[package]] 3322 + name = "sha1_smol" 3323 + version = "1.0.1" 3324 + source = "registry+https://github.com/rust-lang/crates.io-index" 3325 + checksum = "bbfa15b3dddfee50a0fff136974b3e1bde555604ba463834a7eb7deb6417705d" 3326 + 3327 + [[package]] 3306 3328 name = "sha2" 3307 3329 version = "0.10.9" 3308 3330 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 3353 3375 version = "0.3.9" 3354 3376 source = "registry+https://github.com/rust-lang/crates.io-index" 3355 3377 checksum = "703d5c7ef118737c72f1af64ad2f6f8c5e1921f818cdcb97b8fe6fc69bf66214" 3378 + 3379 + [[package]] 3380 + name = "simdutf8" 3381 + version = "0.1.5" 3382 + 
source = "registry+https://github.com/rust-lang/crates.io-index" 3383 + checksum = "e3a9fe34e3e7a50316060351f37187a3f546bce95496156754b601a5fa71b76e" 3356 3384 3357 3385 [[package]] 3358 3386 name = "siphasher" ··· 3691 3719 "rustls-pki-types", 3692 3720 "tokio", 3693 3721 "tokio-rustls", 3694 - "tungstenite 0.24.0", 3695 - ] 3696 - 3697 - [[package]] 3698 - name = "tokio-tungstenite" 3699 - version = "0.28.0" 3700 - source = "registry+https://github.com/rust-lang/crates.io-index" 3701 - checksum = "d25a406cddcc431a75d3d9afc6a7c0f7428d4891dd973e4d54c56b46127bf857" 3702 - dependencies = [ 3703 - "futures-util", 3704 - "log", 3705 - "rustls", 3706 - "rustls-native-certs", 3707 - "rustls-pki-types", 3708 - "tokio", 3709 - "tokio-rustls", 3710 - "tungstenite 0.28.0", 3722 + "tungstenite", 3711 3723 ] 3712 3724 3713 3725 [[package]] ··· 3724 3736 "rustls", 3725 3737 "thiserror 1.0.69", 3726 3738 "tokio", 3727 - "tokio-tungstenite 0.24.0", 3739 + "tokio-tungstenite", 3728 3740 "wasm-bindgen", 3729 3741 "web-sys", 3730 3742 ] ··· 3741 3753 "futures-util", 3742 3754 "pin-project-lite", 3743 3755 "tokio", 3756 + ] 3757 + 3758 + [[package]] 3759 + name = "tokio-websockets" 3760 + version = "0.13.2" 3761 + source = "registry+https://github.com/rust-lang/crates.io-index" 3762 + checksum = "dad543404f98bfc969aeb71994105c592acfc6c43323fddcd016bb208d1c65cb" 3763 + dependencies = [ 3764 + "aws-lc-rs", 3765 + "base64", 3766 + "bytes", 3767 + "futures-core", 3768 + "futures-sink", 3769 + "http", 3770 + "httparse", 3771 + "rand 0.10.0", 3772 + "rustls-native-certs", 3773 + "rustls-pki-types", 3774 + "sha1_smol", 3775 + "simdutf8", 3776 + "tokio", 3777 + "tokio-rustls", 3778 + "tokio-util", 3744 3779 ] 3745 3780 3746 3781 [[package]] ··· 3891 3926 "rustls-pki-types", 3892 3927 "sha1", 3893 3928 "thiserror 1.0.69", 3894 - "utf-8", 3895 - ] 3896 - 3897 - [[package]] 3898 - name = "tungstenite" 3899 - version = "0.28.0" 3900 - source = 
"registry+https://github.com/rust-lang/crates.io-index" 3901 - checksum = "8628dcc84e5a09eb3d8423d6cb682965dea9133204e8fb3efee74c2a0c259442" 3902 - dependencies = [ 3903 - "bytes", 3904 - "data-encoding", 3905 - "http", 3906 - "httparse", 3907 - "log", 3908 - "rand 0.9.2", 3909 - "rustls", 3910 - "rustls-pki-types", 3911 - "sha1", 3912 - "thiserror 2.0.18", 3913 3929 "utf-8", 3914 3930 ] 3915 3931
+3 -2
Cargo.toml
··· 33 33 smol_str = "0.3" 34 34 futures = "0.3" 35 35 reqwest = { version = "0.12.8", features = ["json", "rustls-tls", "stream", "gzip", "brotli", "zstd", "http2"], default-features = false } 36 - axum = { version = "0.8.8", features = ["ws", "macros"] } 36 + axum = { version = "0.8.8", features = ["macros"] } 37 37 tower-http = { version = "0.6.6", features = ["cors", "trace"] } 38 + axum-tws = "0.6.0" 38 39 39 40 jacquard-common = { version = "0.11", default-features = false, features = ["tracing", "std", "crypto"] } 40 41 jacquard-api = { version = "0.11" } ··· 55 56 glob = "0.3" 56 57 arc-swap = "1.8.2" 57 58 rustls = { version = "0.23", features = ["aws-lc-rs"] } 58 - tokio-tungstenite = { version = "0.28.0", features = ["rustls-tls-native-roots"] } 59 + tokio-websockets = { version = "0.13.2", features = ["client", "server", "rustls-native-roots", "aws_lc_rs", "rand"] } 59 60 multibase = "0.9.2" 60 61 sha2 = "0.10.9" 61 62 parking_lot = "0.12.5"
+1 -1
README.md
··· 4 4 -> [vs tap](#vs-tap) | [stream](#stream-behavior) | [multi-relay](#multiple-relay-support) | [seeding](#firehose-seeding) | [crawler sources](#crawler-sources)</br> 5 5 -> [configuration](#configuration) | [build features](#build-features)</br> 6 6 -> [rest api](#rest-api) | [filter](#filter-management) | [ingestion](#ingestion-control) | [crawler](#crawler-management) | [firehose](#firehose-management) | [pds](#pds-management) | [repos](#repository-management)</br> 7 - -> [xrpc api](#data-access-xrpc) | [backlinks](#bluemicrocosmlinks) | [identity](#bluemicrocosmidentity) | [atproto](#comatproto) | [custom](#systemsgazehydrant) 7 + -> [xrpc api](#data-access-xrpc) | [atproto](#comatproto) | [backlinks](#bluemicrocosmlinks) | [identity](#bluemicrocosmidentity) | [custom](#systemsgazehydrant) 8 8 9 9 # hydrant 10 10
+3 -5
src/api/stream.rs
··· 2 2 use axum::Router; 3 3 use axum::routing::get; 4 4 use axum::{ 5 - extract::{ 6 - Query, State, 7 - ws::{Message, WebSocket, WebSocketUpgrade}, 8 - }, 5 + extract::{Query, State}, 9 6 response::IntoResponse, 10 7 }; 8 + use axum_tws::{Message, WebSocket, WebSocketUpgrade}; 11 9 use futures::StreamExt; 12 10 use serde::Deserialize; 13 11 use tracing::error; ··· 35 33 while let Some(evt) = stream.next().await { 36 34 match serde_json::to_string(&evt) { 37 35 Ok(json) => { 38 - if socket.send(Message::Text(json.into())).await.is_err() { 36 + if socket.send(Message::text(json)).await.is_err() { 39 37 break; 40 38 } 41 39 }
+3 -5
src/api/xrpc/subscribe_repos.rs
··· 1 1 use axum::{ 2 - extract::{ 3 - Query, State, 4 - ws::{Message, WebSocket, WebSocketUpgrade}, 5 - }, 2 + extract::{Query, State}, 6 3 response::IntoResponse, 7 4 }; 5 + use axum_tws::{Message, WebSocket, WebSocketUpgrade}; 8 6 use futures::StreamExt; 9 7 use serde::Deserialize; 10 8 ··· 27 25 let mut stream = hydrant.subscribe_repos(query.cursor); 28 26 29 27 while let Some(frame) = stream.next().await { 30 - if socket.send(Message::Binary(frame)).await.is_err() { 28 + if socket.send(Message::binary(frame)).await.is_err() { 31 29 break; 32 30 } 33 31 }
+29 -28
src/ingest/firehose.rs
··· 4 4 use crate::state::AppState; 5 5 use crate::util::throttle::ThrottleHandle; 6 6 use crate::util::{WatchEnabledExt, is_timeout, is_tls_cert_error}; 7 - use hyper::StatusCode; 8 7 use jacquard_common::IntoStatic; 9 8 use jacquard_common::types::did::Did; 10 9 use miette::{IntoDiagnostic, Result}; ··· 13 12 use std::sync::atomic::Ordering; 14 13 use std::time::Duration; 15 14 use tokio::sync::watch; 16 - use tokio_tungstenite::tungstenite::{Error as WsError, error::TlsError as WsTlsError}; 15 + use tokio_websockets::Error as WsError; 17 16 use tracing::{Span, debug, error, info, trace, warn}; 18 17 use url::Url; 19 18 ··· 24 23 return true; 25 24 } 26 25 27 - if let WsError::Tls(e) = e { 28 - match e { 29 - WsTlsError::Rustls(e) => { 30 - return matches!(e.as_ref(), rustls::Error::InvalidCertificate(_)); 26 + match e { 27 + WsError::Rustls(e) => { 28 + if matches!(e, rustls::Error::InvalidCertificate(_)) { 29 + return true; 31 30 } 32 - WsTlsError::InvalidDnsName => {} 33 - _ => {} 34 31 } 35 - } else { 36 - let mut src = e.source(); 37 - while let Some(s) = src { 38 - if let Some(io_err) = s.downcast_ref::<std::io::Error>() { 39 - if is_tls_cert_error(io_err) { 40 - return true; 41 - } 32 + WsError::Io(io_err) => { 33 + if is_tls_cert_error(io_err) { 34 + return true; 42 35 } 43 - src = s.source(); 36 + } 37 + WsError::Upgrade(tokio_websockets::upgrade::Error::DidNotSwitchProtocols(status)) => { 38 + return matches!( 39 + *status, 40 + 502 // BAD_GATEWAY 41 + | 503 // SERVICE_UNAVAILABLE 42 + | 504 // GATEWAY_TIMEOUT 43 + | 522 // CONNECTION_TIMEOUT 44 + | 530 // SITE_FROZEN 45 + ); 44 46 } 47 + _ => {} 45 48 } 46 49 47 - if let WsError::Http(resp) = e { 48 - return matches!( 49 - resp.status(), 50 - StatusCode::BAD_GATEWAY 51 - | StatusCode::SERVICE_UNAVAILABLE 52 - | StatusCode::GATEWAY_TIMEOUT 53 - | crate::util::CONNECTION_TIMEOUT 54 - | crate::util::SITE_FROZEN 55 - ); 50 + let mut src = e.source(); 51 + while let Some(s) = src { 52 + if let Some(io_err) 
= s.downcast_ref::<std::io::Error>() { 53 + if is_tls_cert_error(io_err) { 54 + return true; 55 + } 56 + } 57 + src = s.source(); 56 58 } 57 59 58 60 matches!( 59 61 e, 60 - WsError::AttackAttempt | WsError::Io(_) | WsError::Protocol(_) 62 + WsError::Io(_) | WsError::Protocol(_) | WsError::PayloadTooLong { .. } 61 63 ) 62 64 } 63 65 ··· 97 99 98 100 #[tracing::instrument(skip(self), fields(relay = %self.relay_host))] 99 101 pub async fn run(mut self) -> Result<()> { 100 - // extract host as owned String to avoid borrow conflicts with &self inside the loop 101 - let host = self.relay_host.host_str().unwrap_or("").to_string(); 102 + let host = self.relay_host.host_str().unwrap_or(""); 102 103 let count_key = crate::db::keys::pds_account_count_key(&host); 103 104 104 105 // this is not for connection throttling (thats handled by ThrottleHandle)
+13 -8
src/ingest/stream.rs
··· 1 1 use std::convert::Infallible; 2 - use std::option::Option; 3 2 3 + use axum::http::Uri; 4 4 use bytes::Bytes; 5 5 use futures::StreamExt; 6 6 use jacquard_common::error::DecodeError; ··· 15 15 use serde::{Deserialize, Serialize}; 16 16 use smol_str::format_smolstr; 17 17 use thiserror::Error; 18 - use tokio::net::TcpStream; 19 - use tokio_tungstenite::{MaybeTlsStream, WebSocketStream, connect_async, tungstenite::Message}; 18 + use tokio_websockets::{ClientBuilder, WebSocketStream}; 20 19 use tracing::trace; 21 20 use url::Url; 22 21 23 22 #[derive(Debug, Error, Diagnostic)] 24 23 pub enum FirehoseError { 25 24 #[error("websocket error: {0}")] 26 - WebSocket(#[from] tokio_tungstenite::tungstenite::Error), 25 + WebSocket(#[from] tokio_websockets::Error), 27 26 #[error("unknown scheme: {0}")] 28 27 UnknownScheme(String), 29 28 #[error("decode error: {0}")] ··· 52 51 } 53 52 54 53 pub struct FirehoseStream { 55 - ws: WebSocketStream<MaybeTlsStream<TcpStream>>, 54 + ws: WebSocketStream<tokio_websockets::MaybeTlsStream<tokio::net::TcpStream>>, 56 55 } 57 56 58 57 impl FirehoseStream { ··· 67 66 let cursor = cursor.map(|c| format_smolstr!("cursor={c}")); 68 67 relay.set_query(cursor.as_deref()); 69 68 70 - let (ws, _) = connect_async(relay.as_str()).await?; 69 + let uri: Uri = relay 70 + .as_str() 71 + .parse() 72 + .map_err(|e| FirehoseError::Cbor(format!("invalid uri: {e}")))?; 73 + 74 + let (ws, _) = ClientBuilder::from_uri(uri).connect().await?; 71 75 Ok(Self { ws }) 72 76 } 73 77 ··· 76 80 loop { 77 81 match self.ws.next().await? 
{ 78 82 Err(e) => return Some(Err(e.into())), 79 - Ok(Message::Binary(bytes)) => { 83 + Ok(msg) if msg.is_binary() => { 84 + let bytes: Bytes = msg.into_payload().into(); 80 85 if bytes.is_empty() { 81 86 return Some(Err(FirehoseError::EmptyFrame)); 82 87 } 83 88 return Some(Ok(bytes)); 84 89 } 85 - Ok(Message::Close(_)) => return None, 90 + Ok(msg) if msg.is_close() => return None, 86 91 Ok(x) => { 87 92 trace!(msg = ?x, "relay sent unexpected message"); 88 93 continue;
+2 -2
tests/common.nu
··· 56 56 } 57 57 58 58 export def resolve-pds [did: string] { 59 - let doc = (http get $"https://plc.gaze.systems/($did)" | from json) 59 + let doc = (http get $"https://plc.wtf/($did)" | from json) 60 60 ($doc.service | where type == "AtprotoPersonalDataServer" | first).serviceEndpoint 61 61 } 62 62 ··· 123 123 HYDRANT_API_PORT: ($port | into string), 124 124 HYDRANT_ENABLE_DEBUG: "true", 125 125 HYDRANT_DEBUG_PORT: (resolve-test-debug-port ($port + 1) | into string), 126 - HYDRANT_PLC_URL: "https://plc.gaze.systems", 126 + HYDRANT_PLC_URL: "https://plc.wtf", 127 127 RUST_LOG: "debug,hyper=error,tokio=error,h2=error,tower=error,rustls=error" 128 128 } | merge $hydrant_vars 129 129