very fast at protocol indexer with flexible filtering, xrpc queries, cursor-backed event stream, and more, built on fjall
rust fjall at-protocol atproto indexer
58
fork

Configure Feed

Select the types of activity you want to include in your feed.

[api] websocket pong & better ping #2

open opened by oyster.cafe targeting main

Hydrant should ping others! :P And also, we shouldn't let others pongtimeout, we should pong. Sorry I snuck in a little flake fix too.

Labels

None yet.

assignee

None yet.

Participants 1
AT URI
at://did:plc:3fwecdnvtcscjnrx2p4n7alz/sh.tangled.repo.pull/3ml6yvtgmei22
+187 -29
Diff #0
+1
flake.nix
··· 66 66 cmake 67 67 websocat 68 68 http-nu 69 + nushell 69 70 clang 70 71 wild 71 72 psmisc
+2
src/api/mod.rs
··· 17 17 mod stats; 18 18 #[cfg(feature = "indexer_stream")] 19 19 mod stream; 20 + #[cfg(any(feature = "relay", feature = "indexer_stream"))] 21 + mod ws; 20 22 mod xrpc; 21 23 22 24 pub async fn serve(hydrant: Hydrant, port: u16) -> miette::Result<()> {
+9 -15
src/api/stream.rs
··· 6 6 response::IntoResponse, 7 7 }; 8 8 use axum_tws::{Message, WebSocket, WebSocketUpgrade}; 9 - use futures::StreamExt; 10 9 use serde::Deserialize; 11 10 use tracing::error; 12 11 ··· 27 26 ws.on_upgrade(move |socket| handle_socket(socket, hydrant, query)) 28 27 } 29 28 30 - async fn handle_socket(mut socket: WebSocket, hydrant: Hydrant, query: StreamQuery) { 31 - let mut stream = hydrant.subscribe(query.cursor); 32 - 33 - while let Some(evt) = stream.next().await { 34 - match serde_json::to_string(&evt) { 35 - Ok(json) => { 36 - if socket.send(Message::text(json)).await.is_err() { 37 - break; 38 - } 39 - } 40 - Err(e) => { 41 - error!(err = %e, "failed to serialize event"); 42 - } 29 + async fn handle_socket(socket: WebSocket, hydrant: Hydrant, query: StreamQuery) { 30 + let events = hydrant.subscribe(query.cursor); 31 + super::ws::run_socket(socket, events, |evt| match serde_json::to_string(&evt) { 32 + Ok(json) => Some(Message::text(json)), 33 + Err(e) => { 34 + error!(err = %e, "failed to serialize event"); 35 + None 43 36 } 44 - } 37 + }) 38 + .await; 45 39 }
+62
src/api/ws.rs
··· 1 + use std::time::Duration; 2 + 3 + use axum_tws::{Message, WebSocket}; 4 + use bytes::Bytes; 5 + use futures::{SinkExt, Stream, StreamExt}; 6 + use tokio::time::{MissedTickBehavior, interval}; 7 + use tracing::{debug, warn}; 8 + 9 + const PING_INTERVAL: Duration = Duration::from_secs(30); 10 + 11 + pub(super) async fn run_socket<S, F>(socket: WebSocket, mut events: S, mut to_message: F) 12 + where 13 + S: Stream + Unpin, 14 + F: FnMut(S::Item) -> Option<Message>, 15 + { 16 + let (mut sink, mut ws_recv) = socket.split(); 17 + 18 + let mut ping_timer = interval(PING_INTERVAL); 19 + ping_timer.set_missed_tick_behavior(MissedTickBehavior::Delay); 20 + ping_timer.tick().await; 21 + 22 + loop { 23 + tokio::select! { 24 + inbound = ws_recv.next() => match inbound { 25 + Some(Ok(m)) if m.is_close() => break, 26 + Some(Ok(m)) if m.is_text() || m.is_binary() => { 27 + debug!("client sent unsolicited data frame, closing"); 28 + break; 29 + } 30 + Some(Ok(m)) if m.is_ping() => { 31 + if let Err(err) = sink.send(Message::pong(m.into_payload())).await { 32 + warn!(err = %err, "ws pong send error"); 33 + break; 34 + } 35 + } 36 + Some(Ok(_)) => {} 37 + Some(Err(e)) => { 38 + warn!(err = %e, "ws recv error"); 39 + break; 40 + } 41 + None => break, 42 + }, 43 + evt = events.next() => match evt { 44 + Some(item) => { 45 + let Some(msg) = to_message(item) else { continue }; 46 + if let Err(err) = sink.send(msg).await { 47 + warn!(err = %err, "ws send error"); 48 + break; 49 + } 50 + } 51 + None => break, 52 + }, 53 + _ = ping_timer.tick() => { 54 + if let Err(err) = sink.send(Message::ping(Bytes::new())).await { 55 + warn!(err = %err, "ws ping send error"); 56 + break; 57 + } 58 + } 59 + } 60 + } 61 + let _ = sink.close().await; 62 + }
+3 -9
src/api/xrpc/subscribe_repos.rs
··· 3 3 response::IntoResponse, 4 4 }; 5 5 use axum_tws::{Message, WebSocket, WebSocketUpgrade}; 6 - use futures::StreamExt; 7 6 use serde::Deserialize; 8 7 9 8 use crate::control::Hydrant; ··· 21 20 ws.on_upgrade(move |socket| handle_socket(socket, hydrant, query)) 22 21 } 23 22 24 - async fn handle_socket(mut socket: WebSocket, hydrant: Hydrant, query: SubscribeReposQuery) { 25 - let mut stream = hydrant.subscribe_repos(query.cursor); 26 - 27 - while let Some(frame) = stream.next().await { 28 - if socket.send(Message::binary(frame)).await.is_err() { 29 - break; 30 - } 31 - } 23 + async fn handle_socket(socket: WebSocket, hydrant: Hydrant, query: SubscribeReposQuery) { 24 + let stream = hydrant.subscribe_repos(query.cursor); 25 + crate::api::ws::run_socket(socket, stream, |frame| Some(Message::binary(frame))).await; 32 26 }
+1 -2
src/ingest/stream.rs
··· 110 110 } 111 111 return Ok(bytes); 112 112 } 113 - msg if msg.is_ping() => self.ws.send(WsMsg::pong(msg.into_payload())).await?, 114 113 // if ws closed treat it as an error, since why would a host close the stream?? 115 114 // TODO: treat hosts that return these as offline ?????? 116 115 msg if msg.is_close() => { ··· 120 119 ); 121 120 return Err(FirehoseError::StreamClosed { code, reason }); 122 121 } 123 - msg if msg.is_pong() => continue, 122 + msg if msg.is_ping() || msg.is_pong() => continue, 124 123 x => { 125 124 trace!(msg = ?x, "host sent unexpected message"); 126 125 continue;
+1 -1
tests/authenticated_stream.nu
··· 26 26 let output_file = $"($db_path)/stream_output.txt" 27 27 print $"starting stream listener -> ($output_file)" 28 28 # use websocat to capture output. 29 - let stream_pid = (bash -c $"websocat '($ws_url)' > '($output_file)' & echo $!" | str trim | into int) 29 + let stream_pid = (bash -c $"websocat -n '($ws_url)' > '($output_file)' & echo $!" | str trim | into int) 30 30 print $"listener pid: ($stream_pid)" 31 31 32 32 # 4. add repo to hydrant (backfill trigger)
+2 -2
tests/stream.nu
··· 25 25 print $"starting stream listener -> ($live_output)" 26 26 27 27 # start websocat in background to capture live events (no cursor = live only) 28 - let stream_pid = (bash -c $"websocat '($ws_url)' > '($live_output)' 2>&1 & echo $!" | str trim | into int) 28 + let stream_pid = (bash -c $"websocat -n '($ws_url)' > '($live_output)' 2>&1 & echo $!" | str trim | into int) 29 29 print $"stream listener pid: ($stream_pid)" 30 30 sleep 1sec 31 31 ··· 77 77 # use same approach as test 1: background process with file output 78 78 # cursor=0 replays from the beginning (no cursor = live-tail only) 79 79 print "starting historical stream listener..." 80 - let history_pid = (bash -c $"websocat '($ws_url)?cursor=0' > '($history_output)' 2>&1 & echo $!" | str trim | into int) 80 + let history_pid = (bash -c $"websocat -n '($ws_url)?cursor=0' > '($history_output)' 2>&1 & echo $!" | str trim | into int) 81 81 print $"history listener pid: ($history_pid)" 82 82 83 83 # wait for events to be streamed (should be fast for historical replay)
+52
tests/stream_ping.nu
··· 1 + #!/usr/bin/env nu 2 + use common.nu * 3 + 4 + def main [] { 5 + let port = resolve-test-port 3010 6 + let url = $"http://localhost:($port)" 7 + let ws_url = $"ws://localhost:($port)/stream" 8 + let db_path = (mktemp -d -t hydrant_stream_ping_test.XXXXXX) 9 + 10 + print "testing ping/pong handling on /stream..." 11 + print $"database path: ($db_path)" 12 + 13 + let binary = build-hydrant 14 + let instance = start-hydrant $binary $db_path $port 15 + 16 + mut passed = false 17 + 18 + if (wait-for-api $url) { 19 + let log_file = $"($db_path)/ws.log" 20 + let pid_file = $"($db_path)/ws.pid" 21 + 22 + bash -c $"websocat -n --ping-interval 1 --ping-timeout 4 '($ws_url)' > '($log_file)' 2>&1 & echo $! > '($pid_file)'" 23 + sleep 200ms 24 + let ws_pid = (open $pid_file | str trim | into int) 25 + print $"websocat pid: ($ws_pid)" 26 + 27 + sleep 6sec 28 + 29 + let alive = (do { ^kill -0 $ws_pid } | complete | get exit_code) == 0 30 + if $alive { 31 + print "ping/pong test PASSED: connection alive after 6s of pings" 32 + $passed = true 33 + try { kill $ws_pid } 34 + } else { 35 + print "ping/pong test FAILED: websocat exited, pong likely not received in time" 36 + try { open $log_file | print } 37 + } 38 + } else { 39 + print "api failed to start." 40 + } 41 + 42 + let hydrant_pid = $instance.pid 43 + print $"stopping hydrant - pid: ($hydrant_pid)..." 44 + try { kill $hydrant_pid } 45 + 46 + if $passed { 47 + print "=== TEST PASSED ===" 48 + } else { 49 + print "=== TEST FAILED ===" 50 + exit 1 51 + } 52 + }
+54
tests/subscribe_repos_ping.nu
··· 1 + #!/usr/bin/env nu 2 + use common.nu * 3 + 4 + def main [] { 5 + let port = resolve-test-port 3011 6 + let url = $"http://localhost:($port)" 7 + let ws_url = $"ws://localhost:($port)/xrpc/com.atproto.sync.subscribeRepos" 8 + let db_path = (mktemp -d -t hydrant_subscribe_repos_ping_test.XXXXXX) 9 + 10 + print "testing ping/pong handling on /xrpc/com.atproto.sync.subscribeRepos..." 11 + print $"database path: ($db_path)" 12 + 13 + let binary = build-hydrant-relay 14 + let instance = (with-env { HYDRANT_RELAY: "true" } { 15 + start-hydrant $binary $db_path $port 16 + }) 17 + 18 + mut passed = false 19 + 20 + if (wait-for-api $url) { 21 + let log_file = $"($db_path)/ws.log" 22 + let pid_file = $"($db_path)/ws.pid" 23 + 24 + bash -c $"websocat -n --ping-interval 1 --ping-timeout 4 '($ws_url)' > '($log_file)' 2>&1 & echo $! > '($pid_file)'" 25 + sleep 200ms 26 + let ws_pid = (open $pid_file | str trim | into int) 27 + print $"websocat pid: ($ws_pid)" 28 + 29 + sleep 6sec 30 + 31 + let alive = (do { ^kill -0 $ws_pid } | complete | get exit_code) == 0 32 + if $alive { 33 + print "ping/pong test PASSED: connection alive after 6s of pings" 34 + $passed = true 35 + try { kill $ws_pid } 36 + } else { 37 + print "ping/pong test FAILED: websocat exited, pong likely not received in time" 38 + try { open $log_file | print } 39 + } 40 + } else { 41 + print "api failed to start." 42 + } 43 + 44 + let hydrant_pid = $instance.pid 45 + print $"stopping hydrant - pid: ($hydrant_pid)..." 46 + try { kill $hydrant_pid } 47 + 48 + if $passed { 49 + print "=== TEST PASSED ===" 50 + } else { 51 + print "=== TEST FAILED ===" 52 + exit 1 53 + } 54 + }

History

1 round 0 comments
sign up or login to add to the discussion
oyster.cafe submitted #0
patch application failed: error: patch failed: flake.nix:66 error: flake.nix: patch does not apply error: patch failed: src/api/mod.rs:17 error: src/api/mod.rs: patch does not apply error: patch failed: src/api/stream.rs:6 error: src/api/stream.rs: patch does not apply error: src/api/ws.rs: already exists in working directory error: patch failed: src/api/xrpc/subscribe_repos.rs:3 error: src/api/xrpc/subscribe_repos.rs: patch does not apply error: patch failed: src/ingest/stream.rs:110 error: src/ingest/stream.rs: patch does not apply error: tests/authenticated_stream.nu: No such file or directory error: tests/stream.nu: No such file or directory error: tests/stream_ping.nu: already exists in working directory
expand 0 comments