Hydrant should ping others! :P And also, we shouldn't let others pongtimeout, we should pong. Sorry I snuck in a little flake fix too.
+187
-29
Diff
round #0
+1
flake.nix
+1
flake.nix
+2
src/api/mod.rs
+2
src/api/mod.rs
+9
-15
src/api/stream.rs
+9
-15
src/api/stream.rs
···
6
6
response::IntoResponse,
7
7
};
8
8
use axum_tws::{Message, WebSocket, WebSocketUpgrade};
9
-
use futures::StreamExt;
10
9
use serde::Deserialize;
11
10
use tracing::error;
12
11
···
27
26
ws.on_upgrade(move |socket| handle_socket(socket, hydrant, query))
28
27
}
29
28
30
-
async fn handle_socket(mut socket: WebSocket, hydrant: Hydrant, query: StreamQuery) {
31
-
let mut stream = hydrant.subscribe(query.cursor);
32
-
33
-
while let Some(evt) = stream.next().await {
34
-
match serde_json::to_string(&evt) {
35
-
Ok(json) => {
36
-
if socket.send(Message::text(json)).await.is_err() {
37
-
break;
38
-
}
39
-
}
40
-
Err(e) => {
41
-
error!(err = %e, "failed to serialize event");
42
-
}
29
+
async fn handle_socket(socket: WebSocket, hydrant: Hydrant, query: StreamQuery) {
30
+
let events = hydrant.subscribe(query.cursor);
31
+
super::ws::run_socket(socket, events, |evt| match serde_json::to_string(&evt) {
32
+
Ok(json) => Some(Message::text(json)),
33
+
Err(e) => {
34
+
error!(err = %e, "failed to serialize event");
35
+
None
43
36
}
44
-
}
37
+
})
38
+
.await;
45
39
}
+62
src/api/ws.rs
+62
src/api/ws.rs
···
1
+
use std::time::Duration;
2
+
3
+
use axum_tws::{Message, WebSocket};
4
+
use bytes::Bytes;
5
+
use futures::{SinkExt, Stream, StreamExt};
6
+
use tokio::time::{MissedTickBehavior, interval};
7
+
use tracing::{debug, warn};
8
+
9
+
const PING_INTERVAL: Duration = Duration::from_secs(30);
10
+
11
+
pub(super) async fn run_socket<S, F>(socket: WebSocket, mut events: S, mut to_message: F)
12
+
where
13
+
S: Stream + Unpin,
14
+
F: FnMut(S::Item) -> Option<Message>,
15
+
{
16
+
let (mut sink, mut ws_recv) = socket.split();
17
+
18
+
let mut ping_timer = interval(PING_INTERVAL);
19
+
ping_timer.set_missed_tick_behavior(MissedTickBehavior::Delay);
20
+
ping_timer.tick().await;
21
+
22
+
loop {
23
+
tokio::select! {
24
+
inbound = ws_recv.next() => match inbound {
25
+
Some(Ok(m)) if m.is_close() => break,
26
+
Some(Ok(m)) if m.is_text() || m.is_binary() => {
27
+
debug!("client sent unsolicited data frame, closing");
28
+
break;
29
+
}
30
+
Some(Ok(m)) if m.is_ping() => {
31
+
if let Err(err) = sink.send(Message::pong(m.into_payload())).await {
32
+
warn!(err = %err, "ws pong send error");
33
+
break;
34
+
}
35
+
}
36
+
Some(Ok(_)) => {}
37
+
Some(Err(e)) => {
38
+
warn!(err = %e, "ws recv error");
39
+
break;
40
+
}
41
+
None => break,
42
+
},
43
+
evt = events.next() => match evt {
44
+
Some(item) => {
45
+
let Some(msg) = to_message(item) else { continue };
46
+
if let Err(err) = sink.send(msg).await {
47
+
warn!(err = %err, "ws send error");
48
+
break;
49
+
}
50
+
}
51
+
None => break,
52
+
},
53
+
_ = ping_timer.tick() => {
54
+
if let Err(err) = sink.send(Message::ping(Bytes::new())).await {
55
+
warn!(err = %err, "ws ping send error");
56
+
break;
57
+
}
58
+
}
59
+
}
60
+
}
61
+
let _ = sink.close().await;
62
+
}
+3
-9
src/api/xrpc/subscribe_repos.rs
+3
-9
src/api/xrpc/subscribe_repos.rs
···
3
3
response::IntoResponse,
4
4
};
5
5
use axum_tws::{Message, WebSocket, WebSocketUpgrade};
6
-
use futures::StreamExt;
7
6
use serde::Deserialize;
8
7
9
8
use crate::control::Hydrant;
···
21
20
ws.on_upgrade(move |socket| handle_socket(socket, hydrant, query))
22
21
}
23
22
24
-
async fn handle_socket(mut socket: WebSocket, hydrant: Hydrant, query: SubscribeReposQuery) {
25
-
let mut stream = hydrant.subscribe_repos(query.cursor);
26
-
27
-
while let Some(frame) = stream.next().await {
28
-
if socket.send(Message::binary(frame)).await.is_err() {
29
-
break;
30
-
}
31
-
}
23
+
async fn handle_socket(socket: WebSocket, hydrant: Hydrant, query: SubscribeReposQuery) {
24
+
let stream = hydrant.subscribe_repos(query.cursor);
25
+
crate::api::ws::run_socket(socket, stream, |frame| Some(Message::binary(frame))).await;
32
26
}
+1
-2
src/ingest/stream.rs
+1
-2
src/ingest/stream.rs
···
110
110
}
111
111
return Ok(bytes);
112
112
}
113
-
msg if msg.is_ping() => self.ws.send(WsMsg::pong(msg.into_payload())).await?,
114
113
// if ws closed treat it as an error, since why would a host close the stream??
115
114
// TODO: treat hosts that return these as offline ??????
116
115
msg if msg.is_close() => {
···
120
119
);
121
120
return Err(FirehoseError::StreamClosed { code, reason });
122
121
}
123
-
msg if msg.is_pong() => continue,
122
+
msg if msg.is_ping() || msg.is_pong() => continue,
124
123
x => {
125
124
trace!(msg = ?x, "host sent unexpected message");
126
125
continue;
+1
-1
tests/authenticated_stream.nu
+1
-1
tests/authenticated_stream.nu
···
26
26
let output_file = $"($db_path)/stream_output.txt"
27
27
print $"starting stream listener -> ($output_file)"
28
28
# use websocat to capture output.
29
-
let stream_pid = (bash -c $"websocat '($ws_url)' > '($output_file)' & echo $!" | str trim | into int)
29
+
let stream_pid = (bash -c $"websocat -n '($ws_url)' > '($output_file)' & echo $!" | str trim | into int)
30
30
print $"listener pid: ($stream_pid)"
31
31
32
32
# 4. add repo to hydrant (backfill trigger)
+2
-2
tests/stream.nu
+2
-2
tests/stream.nu
···
25
25
print $"starting stream listener -> ($live_output)"
26
26
27
27
# start websocat in background to capture live events (no cursor = live only)
28
-
let stream_pid = (bash -c $"websocat '($ws_url)' > '($live_output)' 2>&1 & echo $!" | str trim | into int)
28
+
let stream_pid = (bash -c $"websocat -n '($ws_url)' > '($live_output)' 2>&1 & echo $!" | str trim | into int)
29
29
print $"stream listener pid: ($stream_pid)"
30
30
sleep 1sec
31
31
···
77
77
# use same approach as test 1: background process with file output
78
78
# cursor=0 replays from the beginning (no cursor = live-tail only)
79
79
print "starting historical stream listener..."
80
-
let history_pid = (bash -c $"websocat '($ws_url)?cursor=0' > '($history_output)' 2>&1 & echo $!" | str trim | into int)
80
+
let history_pid = (bash -c $"websocat -n '($ws_url)?cursor=0' > '($history_output)' 2>&1 & echo $!" | str trim | into int)
81
81
print $"history listener pid: ($history_pid)"
82
82
83
83
# wait for events to be streamed (should be fast for historical replay)
+52
tests/stream_ping.nu
+52
tests/stream_ping.nu
···
1
+
#!/usr/bin/env nu
2
+
use common.nu *
3
+
4
+
def main [] {
5
+
let port = resolve-test-port 3010
6
+
let url = $"http://localhost:($port)"
7
+
let ws_url = $"ws://localhost:($port)/stream"
8
+
let db_path = (mktemp -d -t hydrant_stream_ping_test.XXXXXX)
9
+
10
+
print "testing ping/pong handling on /stream..."
11
+
print $"database path: ($db_path)"
12
+
13
+
let binary = build-hydrant
14
+
let instance = start-hydrant $binary $db_path $port
15
+
16
+
mut passed = false
17
+
18
+
if (wait-for-api $url) {
19
+
let log_file = $"($db_path)/ws.log"
20
+
let pid_file = $"($db_path)/ws.pid"
21
+
22
+
bash -c $"websocat -n --ping-interval 1 --ping-timeout 4 '($ws_url)' > '($log_file)' 2>&1 & echo $! > '($pid_file)'"
23
+
sleep 200ms
24
+
let ws_pid = (open $pid_file | str trim | into int)
25
+
print $"websocat pid: ($ws_pid)"
26
+
27
+
sleep 6sec
28
+
29
+
let alive = (do { ^kill -0 $ws_pid } | complete | get exit_code) == 0
30
+
if $alive {
31
+
print "ping/pong test PASSED: connection alive after 6s of pings"
32
+
$passed = true
33
+
try { kill $ws_pid }
34
+
} else {
35
+
print "ping/pong test FAILED: websocat exited, pong likely not received in time"
36
+
try { open $log_file | print }
37
+
}
38
+
} else {
39
+
print "api failed to start."
40
+
}
41
+
42
+
let hydrant_pid = $instance.pid
43
+
print $"stopping hydrant - pid: ($hydrant_pid)..."
44
+
try { kill $hydrant_pid }
45
+
46
+
if $passed {
47
+
print "=== TEST PASSED ==="
48
+
} else {
49
+
print "=== TEST FAILED ==="
50
+
exit 1
51
+
}
52
+
}
+54
tests/subscribe_repos_ping.nu
+54
tests/subscribe_repos_ping.nu
···
1
+
#!/usr/bin/env nu
2
+
use common.nu *
3
+
4
+
def main [] {
5
+
let port = resolve-test-port 3011
6
+
let url = $"http://localhost:($port)"
7
+
let ws_url = $"ws://localhost:($port)/xrpc/com.atproto.sync.subscribeRepos"
8
+
let db_path = (mktemp -d -t hydrant_subscribe_repos_ping_test.XXXXXX)
9
+
10
+
print "testing ping/pong handling on /xrpc/com.atproto.sync.subscribeRepos..."
11
+
print $"database path: ($db_path)"
12
+
13
+
let binary = build-hydrant-relay
14
+
let instance = (with-env { HYDRANT_RELAY: "true" } {
15
+
start-hydrant $binary $db_path $port
16
+
})
17
+
18
+
mut passed = false
19
+
20
+
if (wait-for-api $url) {
21
+
let log_file = $"($db_path)/ws.log"
22
+
let pid_file = $"($db_path)/ws.pid"
23
+
24
+
bash -c $"websocat -n --ping-interval 1 --ping-timeout 4 '($ws_url)' > '($log_file)' 2>&1 & echo $! > '($pid_file)'"
25
+
sleep 200ms
26
+
let ws_pid = (open $pid_file | str trim | into int)
27
+
print $"websocat pid: ($ws_pid)"
28
+
29
+
sleep 6sec
30
+
31
+
let alive = (do { ^kill -0 $ws_pid } | complete | get exit_code) == 0
32
+
if $alive {
33
+
print "ping/pong test PASSED: connection alive after 6s of pings"
34
+
$passed = true
35
+
try { kill $ws_pid }
36
+
} else {
37
+
print "ping/pong test FAILED: websocat exited, pong likely not received in time"
38
+
try { open $log_file | print }
39
+
}
40
+
} else {
41
+
print "api failed to start."
42
+
}
43
+
44
+
let hydrant_pid = $instance.pid
45
+
print $"stopping hydrant - pid: ($hydrant_pid)..."
46
+
try { kill $hydrant_pid }
47
+
48
+
if $passed {
49
+
print "=== TEST PASSED ==="
50
+
} else {
51
+
print "=== TEST FAILED ==="
52
+
exit 1
53
+
}
54
+
}
History
1 round
0 comments
oyster.cafe
submitted
#0
patch application failed: error: patch failed: flake.nix:66
error: flake.nix: patch does not apply
error: patch failed: src/api/mod.rs:17
error: src/api/mod.rs: patch does not apply
error: patch failed: src/api/stream.rs:6
error: src/api/stream.rs: patch does not apply
error: src/api/ws.rs: already exists in working directory
error: patch failed: src/api/xrpc/subscribe_repos.rs:3
error: src/api/xrpc/subscribe_repos.rs: patch does not apply
error: patch failed: src/ingest/stream.rs:110
error: src/ingest/stream.rs: patch does not apply
error: tests/authenticated_stream.nu: No such file or directory
error: tests/stream.nu: No such file or directory
error: tests/stream_ping.nu: already exists in working directory