Hydrant should ping others! :P And also, we shouldn't let others pongtimeout, we should pong. Sorry I snuck in a little flake fix too.
+271
-95
Diff
round #0
+1
flake.nix
+1
flake.nix
+2
src/api/mod.rs
+2
src/api/mod.rs
+31
-48
src/api/stream.rs
+31
-48
src/api/stream.rs
···
6
6
response::IntoResponse,
7
7
};
8
8
use axum_tws::{Message, WebSocket, WebSocketUpgrade};
9
-
use futures::StreamExt;
10
9
use serde::Deserialize;
11
10
use tracing::error;
12
11
12
+
use super::ws::{WsAction, run_socket};
13
+
13
14
pub fn router() -> Router<Hydrant> {
14
15
Router::new().route("/", get(handle_stream))
15
16
}
···
27
28
ws.on_upgrade(move |socket| handle_socket(socket, hydrant, query))
28
29
}
29
30
30
-
async fn handle_socket(mut socket: WebSocket, hydrant: Hydrant, query: StreamQuery) {
31
+
async fn handle_socket(socket: WebSocket, hydrant: Hydrant, query: StreamQuery) {
31
32
let send_timeout = hydrant.stream_send_timeout();
32
-
let mut stream = hydrant.subscribe(query.cursor);
33
-
34
-
while let Some(item) = stream.next().await {
35
-
let evt = match item {
36
-
Ok(evt) => evt,
33
+
let events = hydrant.subscribe(query.cursor);
34
+
run_socket(
35
+
socket,
36
+
events,
37
+
|item| match item {
38
+
Ok(evt) => match serde_json::to_string(&evt) {
39
+
Ok(json) => WsAction::Send(Message::text(json)),
40
+
Err(e) => {
41
+
error!(err = %e, "failed to serialize event");
42
+
WsAction::Skip
43
+
}
44
+
},
37
45
Err(err) => {
38
46
let json = serde_json::json!({
39
47
"type": "error",
40
48
"error": err.code(),
41
49
"message": err.to_string(),
42
50
});
43
-
let _ = tokio::time::timeout(
44
-
send_timeout,
45
-
socket.send(Message::text(json.to_string())),
46
-
)
47
-
.await;
48
-
let _ =
49
-
tokio::time::timeout(std::time::Duration::from_secs(1), socket.close()).await;
50
-
break;
51
-
}
52
-
};
53
-
54
-
match serde_json::to_string(&evt) {
55
-
Ok(json) => {
56
-
match tokio::time::timeout(send_timeout, socket.send(Message::text(json))).await {
57
-
Ok(Ok(())) => {}
58
-
Ok(Err(_)) => break,
59
-
Err(_) => {
60
-
let err = serde_json::json!({
61
-
"type": "error",
62
-
"error": "ConsumerTooSlow",
63
-
"message": format!(
64
-
"stream socket send blocked for at least {} seconds",
65
-
send_timeout.as_secs()
66
-
),
67
-
});
68
-
let _ = tokio::time::timeout(
69
-
std::time::Duration::from_secs(1),
70
-
socket.send(Message::text(err.to_string())),
71
-
)
72
-
.await;
73
-
let _ =
74
-
tokio::time::timeout(std::time::Duration::from_secs(1), socket.close())
75
-
.await;
76
-
break;
77
-
}
78
-
}
79
-
}
80
-
Err(e) => {
81
-
error!(err = %e, "failed to serialize event");
51
+
WsAction::Close(Some(Message::text(json.to_string())))
82
52
}
83
-
}
84
-
}
53
+
},
54
+
send_timeout,
55
+
|timeout_dur| {
56
+
let json = serde_json::json!({
57
+
"type": "error",
58
+
"error": "ConsumerTooSlow",
59
+
"message": format!(
60
+
"stream socket send blocked for at least {} seconds",
61
+
timeout_dur.as_secs()
62
+
),
63
+
});
64
+
Some(Message::text(json.to_string()))
65
+
},
66
+
)
67
+
.await;
85
68
}
+94
src/api/ws.rs
+94
src/api/ws.rs
···
1
+
use std::time::Duration;
2
+
3
+
use axum_tws::{Message, WebSocket};
4
+
use bytes::Bytes;
5
+
use futures::{SinkExt, Stream, StreamExt};
6
+
use tokio::time::{MissedTickBehavior, interval, timeout};
7
+
use tracing::{debug, warn};
8
+
9
+
const PING_INTERVAL: Duration = Duration::from_secs(30);
10
+
const CLOSE_TIMEOUT: Duration = Duration::from_secs(1);
11
+
12
+
pub(super) enum WsAction {
13
+
Send(Message),
14
+
Skip,
15
+
Close(Option<Message>),
16
+
}
17
+
18
+
pub(super) async fn run_socket<S, F, G>(
19
+
socket: WebSocket,
20
+
mut events: S,
21
+
mut to_action: F,
22
+
send_timeout: Duration,
23
+
slow_consumer: G,
24
+
) where
25
+
S: Stream + Unpin,
26
+
F: FnMut(S::Item) -> WsAction,
27
+
G: FnOnce(Duration) -> Option<Message>,
28
+
{
29
+
let (mut sink, mut ws_recv) = socket.split();
30
+
31
+
let mut ping_timer = interval(PING_INTERVAL);
32
+
ping_timer.set_missed_tick_behavior(MissedTickBehavior::Delay);
33
+
ping_timer.tick().await;
34
+
35
+
let mut slow_consumer = Some(slow_consumer);
36
+
37
+
loop {
38
+
tokio::select! {
39
+
inbound = ws_recv.next() => match inbound {
40
+
Some(Ok(m)) if m.is_close() => break,
41
+
Some(Ok(m)) if m.is_text() || m.is_binary() => {
42
+
debug!("client sent unsolicited data frame, closing");
43
+
break;
44
+
}
45
+
Some(Ok(m)) if m.is_ping() => {
46
+
if let Err(err) = sink.send(Message::pong(m.into_payload())).await {
47
+
warn!(err = %err, "ws pong send error");
48
+
break;
49
+
}
50
+
}
51
+
Some(Ok(_)) => {}
52
+
Some(Err(e)) => {
53
+
warn!(err = %e, "ws recv error");
54
+
break;
55
+
}
56
+
None => break,
57
+
},
58
+
evt = events.next() => match evt {
59
+
Some(item) => match to_action(item) {
60
+
WsAction::Skip => {}
61
+
WsAction::Send(msg) => match timeout(send_timeout, sink.send(msg)).await {
62
+
Ok(Ok(())) => {}
63
+
Ok(Err(err)) => {
64
+
warn!(err = %err, "ws send error");
65
+
break;
66
+
}
67
+
Err(_) => {
68
+
if let Some(final_msg) =
69
+
slow_consumer.take().and_then(|f| f(send_timeout))
70
+
{
71
+
let _ = timeout(CLOSE_TIMEOUT, sink.send(final_msg)).await;
72
+
}
73
+
break;
74
+
}
75
+
},
76
+
WsAction::Close(final_msg) => {
77
+
if let Some(m) = final_msg {
78
+
let _ = timeout(send_timeout, sink.send(m)).await;
79
+
}
80
+
break;
81
+
}
82
+
},
83
+
None => break,
84
+
},
85
+
_ = ping_timer.tick() => {
86
+
if let Err(err) = sink.send(Message::ping(Bytes::new())).await {
87
+
warn!(err = %err, "ws ping send error");
88
+
break;
89
+
}
90
+
}
91
+
}
92
+
}
93
+
let _ = timeout(CLOSE_TIMEOUT, sink.close()).await;
94
+
}
+33
-43
src/api/xrpc/subscribe_repos.rs
+33
-43
src/api/xrpc/subscribe_repos.rs
···
3
3
response::IntoResponse,
4
4
};
5
5
use axum_tws::{Message, WebSocket, WebSocketUpgrade};
6
-
use futures::StreamExt;
7
6
use serde::Deserialize;
8
7
use tracing::error;
9
8
9
+
use crate::api::ws::{WsAction, run_socket};
10
10
use crate::control::{Hydrant, RelayStreamError};
11
11
use crate::ingest::stream::encode_error_frame;
12
12
···
23
23
ws.on_upgrade(move |socket| handle_socket(socket, hydrant, query))
24
24
}
25
25
26
-
async fn handle_socket(mut socket: WebSocket, hydrant: Hydrant, query: SubscribeReposQuery) {
26
+
async fn handle_socket(socket: WebSocket, hydrant: Hydrant, query: SubscribeReposQuery) {
27
27
let send_timeout = hydrant.stream_send_timeout();
28
-
let mut stream = hydrant.subscribe_repos(query.cursor);
29
-
30
-
while let Some(item) = stream.next().await {
31
-
let frame = match item {
32
-
Ok(frame) => frame,
33
-
Err(err) => {
34
-
send_error_frame(&mut socket, send_timeout, &err).await;
35
-
break;
36
-
}
37
-
};
38
-
39
-
match tokio::time::timeout(send_timeout, socket.send(Message::binary(frame))).await {
40
-
Ok(Ok(())) => {}
41
-
Ok(Err(_)) => break,
42
-
Err(_) => {
43
-
let err = RelayStreamError::ConsumerTooSlow {
44
-
reason: format!(
45
-
"relay stream socket send blocked for at least {} seconds",
46
-
send_timeout.as_secs()
47
-
),
48
-
};
49
-
send_error_frame(&mut socket, std::time::Duration::from_secs(1), &err).await;
50
-
break;
28
+
let stream = hydrant.subscribe_repos(query.cursor);
29
+
run_socket(
30
+
socket,
31
+
stream,
32
+
|item| match item {
33
+
Ok(frame) => WsAction::Send(Message::binary(frame)),
34
+
Err(err) => match encode_error_frame(err.code(), Some(&err.to_string())) {
35
+
Ok(frame) => WsAction::Close(Some(Message::binary(frame))),
36
+
Err(e) => {
37
+
error!(err = %e, "failed to encode relay stream error frame");
38
+
WsAction::Close(None)
39
+
}
40
+
},
41
+
},
42
+
send_timeout,
43
+
|timeout_dur| {
44
+
let err = RelayStreamError::ConsumerTooSlow {
45
+
reason: format!(
46
+
"relay stream socket send blocked for at least {} seconds",
47
+
timeout_dur.as_secs()
48
+
),
49
+
};
50
+
match encode_error_frame(err.code(), Some(&err.to_string())) {
51
+
Ok(frame) => Some(Message::binary(frame)),
52
+
Err(e) => {
53
+
error!(err = %e, "failed to encode relay stream error frame");
54
+
None
55
+
}
51
56
}
52
-
}
53
-
}
54
-
}
55
-
56
-
async fn send_error_frame(
57
-
socket: &mut WebSocket,
58
-
timeout: std::time::Duration,
59
-
err: &RelayStreamError,
60
-
) {
61
-
match encode_error_frame(err.code(), Some(&err.to_string())) {
62
-
Ok(frame) => {
63
-
let _ = tokio::time::timeout(timeout, socket.send(Message::binary(frame))).await;
64
-
}
65
-
Err(e) => {
66
-
error!(err = %e, "failed to encode relay stream error frame");
67
-
}
68
-
}
69
-
let _ = tokio::time::timeout(std::time::Duration::from_secs(1), socket.close()).await;
57
+
},
58
+
)
59
+
.await;
70
60
}
+1
-1
src/ingest/stream.rs
+1
-1
src/ingest/stream.rs
···
110
110
}
111
111
return Ok(bytes);
112
112
}
113
-
msg if msg.is_ping() => self.ws.send(WsMsg::pong(msg.into_payload())).await?,
114
113
// if ws closed treat it as an error, since why would a host close the stream??
115
114
// TODO: treat hosts that return these as offline ??????
116
115
msg if msg.is_close() => {
···
120
119
);
121
120
return Err(FirehoseError::StreamClosed { code, reason });
122
121
}
122
+
msg if msg.is_ping() => self.ws.send(WsMsg::pong(msg.into_payload())).await?,
123
123
msg if msg.is_pong() => continue,
124
124
x => {
125
125
trace!(msg = ?x, "host sent unexpected message");
+1
-1
tests/authenticated_stream.nu
+1
-1
tests/authenticated_stream.nu
···
26
26
let output_file = $"($db_path)/stream_output.txt"
27
27
print $"starting stream listener -> ($output_file)"
28
28
# use websocat to capture output.
29
-
let stream_pid = (bash -c $"websocat '($ws_url)' > '($output_file)' & echo $!" | str trim | into int)
29
+
let stream_pid = (bash -c $"websocat -n '($ws_url)' > '($output_file)' & echo $!" | str trim | into int)
30
30
print $"listener pid: ($stream_pid)"
31
31
32
32
# 4. add repo to hydrant (backfill trigger)
+2
-2
tests/stream.nu
+2
-2
tests/stream.nu
···
25
25
print $"starting stream listener -> ($live_output)"
26
26
27
27
# start websocat in background to capture live events (no cursor = live only)
28
-
let stream_pid = (bash -c $"websocat '($ws_url)' > '($live_output)' 2>&1 & echo $!" | str trim | into int)
28
+
let stream_pid = (bash -c $"websocat -n '($ws_url)' > '($live_output)' 2>&1 & echo $!" | str trim | into int)
29
29
print $"stream listener pid: ($stream_pid)"
30
30
sleep 1sec
31
31
···
77
77
# use same approach as test 1: background process with file output
78
78
# cursor=0 replays from the beginning (no cursor = live-tail only)
79
79
print "starting historical stream listener..."
80
-
let history_pid = (bash -c $"websocat '($ws_url)?cursor=0' > '($history_output)' 2>&1 & echo $!" | str trim | into int)
80
+
let history_pid = (bash -c $"websocat -n '($ws_url)?cursor=0' > '($history_output)' 2>&1 & echo $!" | str trim | into int)
81
81
print $"history listener pid: ($history_pid)"
82
82
83
83
# wait for events to be streamed (should be fast for historical replay)
+52
tests/stream_ping.nu
+52
tests/stream_ping.nu
···
1
+
#!/usr/bin/env nu
2
+
use common.nu *
3
+
4
+
def main [] {
5
+
let port = resolve-test-port 3010
6
+
let url = $"http://localhost:($port)"
7
+
let ws_url = $"ws://localhost:($port)/stream"
8
+
let db_path = (mktemp -d -t hydrant_stream_ping_test.XXXXXX)
9
+
10
+
print "testing ping/pong handling on /stream..."
11
+
print $"database path: ($db_path)"
12
+
13
+
let binary = build-hydrant
14
+
let instance = start-hydrant $binary $db_path $port
15
+
16
+
mut passed = false
17
+
18
+
if (wait-for-api $url) {
19
+
let log_file = $"($db_path)/ws.log"
20
+
let pid_file = $"($db_path)/ws.pid"
21
+
22
+
bash -c $"websocat -n --ping-interval 1 --ping-timeout 4 '($ws_url)' > '($log_file)' 2>&1 & echo $! > '($pid_file)'"
23
+
sleep 200ms
24
+
let ws_pid = (open $pid_file | str trim | into int)
25
+
print $"websocat pid: ($ws_pid)"
26
+
27
+
sleep 6sec
28
+
29
+
let alive = (do { ^kill -0 $ws_pid } | complete | get exit_code) == 0
30
+
if $alive {
31
+
print "ping/pong test PASSED: connection alive after 6s of pings"
32
+
$passed = true
33
+
try { kill $ws_pid }
34
+
} else {
35
+
print "ping/pong test FAILED: websocat exited, pong likely not received in time"
36
+
try { open $log_file | print }
37
+
}
38
+
} else {
39
+
print "api failed to start."
40
+
}
41
+
42
+
let hydrant_pid = $instance.pid
43
+
print $"stopping hydrant - pid: ($hydrant_pid)..."
44
+
try { kill $hydrant_pid }
45
+
46
+
if $passed {
47
+
print "=== TEST PASSED ==="
48
+
} else {
49
+
print "=== TEST FAILED ==="
50
+
exit 1
51
+
}
52
+
}
+54
tests/subscribe_repos_ping.nu
+54
tests/subscribe_repos_ping.nu
···
1
+
#!/usr/bin/env nu
2
+
use common.nu *
3
+
4
+
def main [] {
5
+
let port = resolve-test-port 3011
6
+
let url = $"http://localhost:($port)"
7
+
let ws_url = $"ws://localhost:($port)/xrpc/com.atproto.sync.subscribeRepos"
8
+
let db_path = (mktemp -d -t hydrant_subscribe_repos_ping_test.XXXXXX)
9
+
10
+
print "testing ping/pong handling on /xrpc/com.atproto.sync.subscribeRepos..."
11
+
print $"database path: ($db_path)"
12
+
13
+
let binary = build-hydrant-relay
14
+
let instance = (with-env { HYDRANT_RELAY: "true" } {
15
+
start-hydrant $binary $db_path $port
16
+
})
17
+
18
+
mut passed = false
19
+
20
+
if (wait-for-api $url) {
21
+
let log_file = $"($db_path)/ws.log"
22
+
let pid_file = $"($db_path)/ws.pid"
23
+
24
+
bash -c $"websocat -n --ping-interval 1 --ping-timeout 4 '($ws_url)' > '($log_file)' 2>&1 & echo $! > '($pid_file)'"
25
+
sleep 200ms
26
+
let ws_pid = (open $pid_file | str trim | into int)
27
+
print $"websocat pid: ($ws_pid)"
28
+
29
+
sleep 6sec
30
+
31
+
let alive = (do { ^kill -0 $ws_pid } | complete | get exit_code) == 0
32
+
if $alive {
33
+
print "ping/pong test PASSED: connection alive after 6s of pings"
34
+
$passed = true
35
+
try { kill $ws_pid }
36
+
} else {
37
+
print "ping/pong test FAILED: websocat exited, pong likely not received in time"
38
+
try { open $log_file | print }
39
+
}
40
+
} else {
41
+
print "api failed to start."
42
+
}
43
+
44
+
let hydrant_pid = $instance.pid
45
+
print $"stopping hydrant - pid: ($hydrant_pid)..."
46
+
try { kill $hydrant_pid }
47
+
48
+
if $passed {
49
+
print "=== TEST PASSED ==="
50
+
} else {
51
+
print "=== TEST FAILED ==="
52
+
exit 1
53
+
}
54
+
}
History
1 round
0 comments
oyster.cafe
submitted
#0
patch application failed: error: patch failed: flake.nix:66
error: flake.nix: patch does not apply
error: patch failed: src/api/mod.rs:19
error: src/api/mod.rs: patch does not apply
error: patch failed: src/api/stream.rs:6
error: src/api/stream.rs: patch does not apply
error: src/api/ws.rs: already exists in working directory
error: patch failed: src/api/xrpc/subscribe_repos.rs:3
error: src/api/xrpc/subscribe_repos.rs: patch does not apply
error: patch failed: src/ingest/stream.rs:110
error: src/ingest/stream.rs: patch does not apply
error: tests/authenticated_stream.nu: No such file or directory
error: tests/stream.nu: No such file or directory
error: tests/stream_ping.nu: already exists in working directory