Very fast AT Protocol indexer with flexible filtering, XRPC queries, a cursor-backed event stream, and more, built on fjall.
rust fjall at-protocol atproto indexer
58
fork

Configure Feed

Select the types of activity you want to include in your feed.

[api] websocket pong & better ping, round 2 #3

open opened by oyster.cafe targeting main

Hydrant should ping others! :P Also, we shouldn't let others hit their pong timeout; we should answer their pings with a pong. Sorry, I snuck in a little flake fix too.

Labels

None yet.

assignee

None yet.

Participants 1
AT URI
at://did:plc:3fwecdnvtcscjnrx2p4n7alz/sh.tangled.repo.pull/3mlang3tsq722
+271 -95
Diff #0
+1
flake.nix
··· 66 66 cmake 67 67 websocat 68 68 http-nu 69 + nushell 69 70 clang 70 71 wild 71 72 psmisc
+2
src/api/mod.rs
··· 19 19 mod stats; 20 20 #[cfg(feature = "indexer_stream")] 21 21 mod stream; 22 + #[cfg(any(feature = "relay", feature = "indexer_stream"))] 23 + mod ws; 22 24 mod xrpc; 23 25 24 26 pub async fn serve(hydrant: Hydrant, binds: ApiBinds) -> miette::Result<()> {
+31 -48
src/api/stream.rs
··· 6 6 response::IntoResponse, 7 7 }; 8 8 use axum_tws::{Message, WebSocket, WebSocketUpgrade}; 9 - use futures::StreamExt; 10 9 use serde::Deserialize; 11 10 use tracing::error; 12 11 12 + use super::ws::{WsAction, run_socket}; 13 + 13 14 pub fn router() -> Router<Hydrant> { 14 15 Router::new().route("/", get(handle_stream)) 15 16 } ··· 27 28 ws.on_upgrade(move |socket| handle_socket(socket, hydrant, query)) 28 29 } 29 30 30 - async fn handle_socket(mut socket: WebSocket, hydrant: Hydrant, query: StreamQuery) { 31 + async fn handle_socket(socket: WebSocket, hydrant: Hydrant, query: StreamQuery) { 31 32 let send_timeout = hydrant.stream_send_timeout(); 32 - let mut stream = hydrant.subscribe(query.cursor); 33 - 34 - while let Some(item) = stream.next().await { 35 - let evt = match item { 36 - Ok(evt) => evt, 33 + let events = hydrant.subscribe(query.cursor); 34 + run_socket( 35 + socket, 36 + events, 37 + |item| match item { 38 + Ok(evt) => match serde_json::to_string(&evt) { 39 + Ok(json) => WsAction::Send(Message::text(json)), 40 + Err(e) => { 41 + error!(err = %e, "failed to serialize event"); 42 + WsAction::Skip 43 + } 44 + }, 37 45 Err(err) => { 38 46 let json = serde_json::json!({ 39 47 "type": "error", 40 48 "error": err.code(), 41 49 "message": err.to_string(), 42 50 }); 43 - let _ = tokio::time::timeout( 44 - send_timeout, 45 - socket.send(Message::text(json.to_string())), 46 - ) 47 - .await; 48 - let _ = 49 - tokio::time::timeout(std::time::Duration::from_secs(1), socket.close()).await; 50 - break; 51 - } 52 - }; 53 - 54 - match serde_json::to_string(&evt) { 55 - Ok(json) => { 56 - match tokio::time::timeout(send_timeout, socket.send(Message::text(json))).await { 57 - Ok(Ok(())) => {} 58 - Ok(Err(_)) => break, 59 - Err(_) => { 60 - let err = serde_json::json!({ 61 - "type": "error", 62 - "error": "ConsumerTooSlow", 63 - "message": format!( 64 - "stream socket send blocked for at least {} seconds", 65 - send_timeout.as_secs() 66 - ), 67 - }); 68 - let _ = 
tokio::time::timeout( 69 - std::time::Duration::from_secs(1), 70 - socket.send(Message::text(err.to_string())), 71 - ) 72 - .await; 73 - let _ = 74 - tokio::time::timeout(std::time::Duration::from_secs(1), socket.close()) 75 - .await; 76 - break; 77 - } 78 - } 79 - } 80 - Err(e) => { 81 - error!(err = %e, "failed to serialize event"); 51 + WsAction::Close(Some(Message::text(json.to_string()))) 82 52 } 83 - } 84 - } 53 + }, 54 + send_timeout, 55 + |timeout_dur| { 56 + let json = serde_json::json!({ 57 + "type": "error", 58 + "error": "ConsumerTooSlow", 59 + "message": format!( 60 + "stream socket send blocked for at least {} seconds", 61 + timeout_dur.as_secs() 62 + ), 63 + }); 64 + Some(Message::text(json.to_string())) 65 + }, 66 + ) 67 + .await; 85 68 }
+94
src/api/ws.rs
··· 1 + use std::time::Duration; 2 + 3 + use axum_tws::{Message, WebSocket}; 4 + use bytes::Bytes; 5 + use futures::{SinkExt, Stream, StreamExt}; 6 + use tokio::time::{MissedTickBehavior, interval, timeout}; 7 + use tracing::{debug, warn}; 8 + 9 + const PING_INTERVAL: Duration = Duration::from_secs(30); 10 + const CLOSE_TIMEOUT: Duration = Duration::from_secs(1); 11 + 12 + pub(super) enum WsAction { 13 + Send(Message), 14 + Skip, 15 + Close(Option<Message>), 16 + } 17 + 18 + pub(super) async fn run_socket<S, F, G>( 19 + socket: WebSocket, 20 + mut events: S, 21 + mut to_action: F, 22 + send_timeout: Duration, 23 + slow_consumer: G, 24 + ) where 25 + S: Stream + Unpin, 26 + F: FnMut(S::Item) -> WsAction, 27 + G: FnOnce(Duration) -> Option<Message>, 28 + { 29 + let (mut sink, mut ws_recv) = socket.split(); 30 + 31 + let mut ping_timer = interval(PING_INTERVAL); 32 + ping_timer.set_missed_tick_behavior(MissedTickBehavior::Delay); 33 + ping_timer.tick().await; 34 + 35 + let mut slow_consumer = Some(slow_consumer); 36 + 37 + loop { 38 + tokio::select! 
{ 39 + inbound = ws_recv.next() => match inbound { 40 + Some(Ok(m)) if m.is_close() => break, 41 + Some(Ok(m)) if m.is_text() || m.is_binary() => { 42 + debug!("client sent unsolicited data frame, closing"); 43 + break; 44 + } 45 + Some(Ok(m)) if m.is_ping() => { 46 + if let Err(err) = sink.send(Message::pong(m.into_payload())).await { 47 + warn!(err = %err, "ws pong send error"); 48 + break; 49 + } 50 + } 51 + Some(Ok(_)) => {} 52 + Some(Err(e)) => { 53 + warn!(err = %e, "ws recv error"); 54 + break; 55 + } 56 + None => break, 57 + }, 58 + evt = events.next() => match evt { 59 + Some(item) => match to_action(item) { 60 + WsAction::Skip => {} 61 + WsAction::Send(msg) => match timeout(send_timeout, sink.send(msg)).await { 62 + Ok(Ok(())) => {} 63 + Ok(Err(err)) => { 64 + warn!(err = %err, "ws send error"); 65 + break; 66 + } 67 + Err(_) => { 68 + if let Some(final_msg) = 69 + slow_consumer.take().and_then(|f| f(send_timeout)) 70 + { 71 + let _ = timeout(CLOSE_TIMEOUT, sink.send(final_msg)).await; 72 + } 73 + break; 74 + } 75 + }, 76 + WsAction::Close(final_msg) => { 77 + if let Some(m) = final_msg { 78 + let _ = timeout(send_timeout, sink.send(m)).await; 79 + } 80 + break; 81 + } 82 + }, 83 + None => break, 84 + }, 85 + _ = ping_timer.tick() => { 86 + if let Err(err) = sink.send(Message::ping(Bytes::new())).await { 87 + warn!(err = %err, "ws ping send error"); 88 + break; 89 + } 90 + } 91 + } 92 + } 93 + let _ = timeout(CLOSE_TIMEOUT, sink.close()).await; 94 + }
+33 -43
src/api/xrpc/subscribe_repos.rs
··· 3 3 response::IntoResponse, 4 4 }; 5 5 use axum_tws::{Message, WebSocket, WebSocketUpgrade}; 6 - use futures::StreamExt; 7 6 use serde::Deserialize; 8 7 use tracing::error; 9 8 9 + use crate::api::ws::{WsAction, run_socket}; 10 10 use crate::control::{Hydrant, RelayStreamError}; 11 11 use crate::ingest::stream::encode_error_frame; 12 12 ··· 23 23 ws.on_upgrade(move |socket| handle_socket(socket, hydrant, query)) 24 24 } 25 25 26 - async fn handle_socket(mut socket: WebSocket, hydrant: Hydrant, query: SubscribeReposQuery) { 26 + async fn handle_socket(socket: WebSocket, hydrant: Hydrant, query: SubscribeReposQuery) { 27 27 let send_timeout = hydrant.stream_send_timeout(); 28 - let mut stream = hydrant.subscribe_repos(query.cursor); 29 - 30 - while let Some(item) = stream.next().await { 31 - let frame = match item { 32 - Ok(frame) => frame, 33 - Err(err) => { 34 - send_error_frame(&mut socket, send_timeout, &err).await; 35 - break; 36 - } 37 - }; 38 - 39 - match tokio::time::timeout(send_timeout, socket.send(Message::binary(frame))).await { 40 - Ok(Ok(())) => {} 41 - Ok(Err(_)) => break, 42 - Err(_) => { 43 - let err = RelayStreamError::ConsumerTooSlow { 44 - reason: format!( 45 - "relay stream socket send blocked for at least {} seconds", 46 - send_timeout.as_secs() 47 - ), 48 - }; 49 - send_error_frame(&mut socket, std::time::Duration::from_secs(1), &err).await; 50 - break; 28 + let stream = hydrant.subscribe_repos(query.cursor); 29 + run_socket( 30 + socket, 31 + stream, 32 + |item| match item { 33 + Ok(frame) => WsAction::Send(Message::binary(frame)), 34 + Err(err) => match encode_error_frame(err.code(), Some(&err.to_string())) { 35 + Ok(frame) => WsAction::Close(Some(Message::binary(frame))), 36 + Err(e) => { 37 + error!(err = %e, "failed to encode relay stream error frame"); 38 + WsAction::Close(None) 39 + } 40 + }, 41 + }, 42 + send_timeout, 43 + |timeout_dur| { 44 + let err = RelayStreamError::ConsumerTooSlow { 45 + reason: format!( 46 + "relay stream 
socket send blocked for at least {} seconds", 47 + timeout_dur.as_secs() 48 + ), 49 + }; 50 + match encode_error_frame(err.code(), Some(&err.to_string())) { 51 + Ok(frame) => Some(Message::binary(frame)), 52 + Err(e) => { 53 + error!(err = %e, "failed to encode relay stream error frame"); 54 + None 55 + } 51 56 } 52 - } 53 - } 54 - } 55 - 56 - async fn send_error_frame( 57 - socket: &mut WebSocket, 58 - timeout: std::time::Duration, 59 - err: &RelayStreamError, 60 - ) { 61 - match encode_error_frame(err.code(), Some(&err.to_string())) { 62 - Ok(frame) => { 63 - let _ = tokio::time::timeout(timeout, socket.send(Message::binary(frame))).await; 64 - } 65 - Err(e) => { 66 - error!(err = %e, "failed to encode relay stream error frame"); 67 - } 68 - } 69 - let _ = tokio::time::timeout(std::time::Duration::from_secs(1), socket.close()).await; 57 + }, 58 + ) 59 + .await; 70 60 }
+1 -1
src/ingest/stream.rs
··· 110 110 } 111 111 return Ok(bytes); 112 112 } 113 - msg if msg.is_ping() => self.ws.send(WsMsg::pong(msg.into_payload())).await?, 114 113 // if ws closed treat it as an error, since why would a host close the stream?? 115 114 // TODO: treat hosts that return these as offline ?????? 116 115 msg if msg.is_close() => { ··· 120 119 ); 121 120 return Err(FirehoseError::StreamClosed { code, reason }); 122 121 } 122 + msg if msg.is_ping() => self.ws.send(WsMsg::pong(msg.into_payload())).await?, 123 123 msg if msg.is_pong() => continue, 124 124 x => { 125 125 trace!(msg = ?x, "host sent unexpected message");
+1 -1
tests/authenticated_stream.nu
··· 26 26 let output_file = $"($db_path)/stream_output.txt" 27 27 print $"starting stream listener -> ($output_file)" 28 28 # use websocat to capture output. 29 - let stream_pid = (bash -c $"websocat '($ws_url)' > '($output_file)' & echo $!" | str trim | into int) 29 + let stream_pid = (bash -c $"websocat -n '($ws_url)' > '($output_file)' & echo $!" | str trim | into int) 30 30 print $"listener pid: ($stream_pid)" 31 31 32 32 # 4. add repo to hydrant (backfill trigger)
+2 -2
tests/stream.nu
··· 25 25 print $"starting stream listener -> ($live_output)" 26 26 27 27 # start websocat in background to capture live events (no cursor = live only) 28 - let stream_pid = (bash -c $"websocat '($ws_url)' > '($live_output)' 2>&1 & echo $!" | str trim | into int) 28 + let stream_pid = (bash -c $"websocat -n '($ws_url)' > '($live_output)' 2>&1 & echo $!" | str trim | into int) 29 29 print $"stream listener pid: ($stream_pid)" 30 30 sleep 1sec 31 31 ··· 77 77 # use same approach as test 1: background process with file output 78 78 # cursor=0 replays from the beginning (no cursor = live-tail only) 79 79 print "starting historical stream listener..." 80 - let history_pid = (bash -c $"websocat '($ws_url)?cursor=0' > '($history_output)' 2>&1 & echo $!" | str trim | into int) 80 + let history_pid = (bash -c $"websocat -n '($ws_url)?cursor=0' > '($history_output)' 2>&1 & echo $!" | str trim | into int) 81 81 print $"history listener pid: ($history_pid)" 82 82 83 83 # wait for events to be streamed (should be fast for historical replay)
+52
tests/stream_ping.nu
··· 1 + #!/usr/bin/env nu 2 + use common.nu * 3 + 4 + def main [] { 5 + let port = resolve-test-port 3010 6 + let url = $"http://localhost:($port)" 7 + let ws_url = $"ws://localhost:($port)/stream" 8 + let db_path = (mktemp -d -t hydrant_stream_ping_test.XXXXXX) 9 + 10 + print "testing ping/pong handling on /stream..." 11 + print $"database path: ($db_path)" 12 + 13 + let binary = build-hydrant 14 + let instance = start-hydrant $binary $db_path $port 15 + 16 + mut passed = false 17 + 18 + if (wait-for-api $url) { 19 + let log_file = $"($db_path)/ws.log" 20 + let pid_file = $"($db_path)/ws.pid" 21 + 22 + bash -c $"websocat -n --ping-interval 1 --ping-timeout 4 '($ws_url)' > '($log_file)' 2>&1 & echo $! > '($pid_file)'" 23 + sleep 200ms 24 + let ws_pid = (open $pid_file | str trim | into int) 25 + print $"websocat pid: ($ws_pid)" 26 + 27 + sleep 6sec 28 + 29 + let alive = (do { ^kill -0 $ws_pid } | complete | get exit_code) == 0 30 + if $alive { 31 + print "ping/pong test PASSED: connection alive after 6s of pings" 32 + $passed = true 33 + try { kill $ws_pid } 34 + } else { 35 + print "ping/pong test FAILED: websocat exited, pong likely not received in time" 36 + try { open $log_file | print } 37 + } 38 + } else { 39 + print "api failed to start." 40 + } 41 + 42 + let hydrant_pid = $instance.pid 43 + print $"stopping hydrant - pid: ($hydrant_pid)..." 44 + try { kill $hydrant_pid } 45 + 46 + if $passed { 47 + print "=== TEST PASSED ===" 48 + } else { 49 + print "=== TEST FAILED ===" 50 + exit 1 51 + } 52 + }
+54
tests/subscribe_repos_ping.nu
··· 1 + #!/usr/bin/env nu 2 + use common.nu * 3 + 4 + def main [] { 5 + let port = resolve-test-port 3011 6 + let url = $"http://localhost:($port)" 7 + let ws_url = $"ws://localhost:($port)/xrpc/com.atproto.sync.subscribeRepos" 8 + let db_path = (mktemp -d -t hydrant_subscribe_repos_ping_test.XXXXXX) 9 + 10 + print "testing ping/pong handling on /xrpc/com.atproto.sync.subscribeRepos..." 11 + print $"database path: ($db_path)" 12 + 13 + let binary = build-hydrant-relay 14 + let instance = (with-env { HYDRANT_RELAY: "true" } { 15 + start-hydrant $binary $db_path $port 16 + }) 17 + 18 + mut passed = false 19 + 20 + if (wait-for-api $url) { 21 + let log_file = $"($db_path)/ws.log" 22 + let pid_file = $"($db_path)/ws.pid" 23 + 24 + bash -c $"websocat -n --ping-interval 1 --ping-timeout 4 '($ws_url)' > '($log_file)' 2>&1 & echo $! > '($pid_file)'" 25 + sleep 200ms 26 + let ws_pid = (open $pid_file | str trim | into int) 27 + print $"websocat pid: ($ws_pid)" 28 + 29 + sleep 6sec 30 + 31 + let alive = (do { ^kill -0 $ws_pid } | complete | get exit_code) == 0 32 + if $alive { 33 + print "ping/pong test PASSED: connection alive after 6s of pings" 34 + $passed = true 35 + try { kill $ws_pid } 36 + } else { 37 + print "ping/pong test FAILED: websocat exited, pong likely not received in time" 38 + try { open $log_file | print } 39 + } 40 + } else { 41 + print "api failed to start." 42 + } 43 + 44 + let hydrant_pid = $instance.pid 45 + print $"stopping hydrant - pid: ($hydrant_pid)..." 46 + try { kill $hydrant_pid } 47 + 48 + if $passed { 49 + print "=== TEST PASSED ===" 50 + } else { 51 + print "=== TEST FAILED ===" 52 + exit 1 53 + } 54 + }

History

1 round 0 comments
sign up or login to add to the discussion
oyster.cafe submitted #0
patch application failed: error: patch failed: flake.nix:66 error: flake.nix: patch does not apply error: patch failed: src/api/mod.rs:19 error: src/api/mod.rs: patch does not apply error: patch failed: src/api/stream.rs:6 error: src/api/stream.rs: patch does not apply error: src/api/ws.rs: already exists in working directory error: patch failed: src/api/xrpc/subscribe_repos.rs:3 error: src/api/xrpc/subscribe_repos.rs: patch does not apply error: patch failed: src/ingest/stream.rs:110 error: src/ingest/stream.rs: patch does not apply error: tests/authenticated_stream.nu: No such file or directory error: tests/stream.nu: No such file or directory error: tests/stream_ping.nu: already exists in working directory
expand 0 comments