A very fast AT Protocol indexer with flexible filtering, XRPC queries, a cursor-backed event stream, and more, built on fjall.
rust
fjall
at-protocol
atproto
indexer
1use super::*;
2
3/// the relay event stream produced by [`Hydrant::subscribe_repos`].
4pub struct RelayEventStream(mpsc::Receiver<bytes::Bytes>);
5
6impl futures::Stream for RelayEventStream {
7 type Item = bytes::Bytes;
8
9 fn poll_next(
10 mut self: std::pin::Pin<&mut Self>,
11 cx: &mut std::task::Context<'_>,
12 ) -> std::task::Poll<Option<Self::Item>> {
13 self.0.poll_recv(cx)
14 }
15}
16
17impl Hydrant {
18 /// subscribe to the relay's ordered `subscribeRepos` event stream.
19 ///
20 /// returns a [`RelayEventStream`] that yields pre-encoded CBOR binary frames
21 /// ready to forward directly to ATProto clients via WebSocket.
22 ///
23 /// - if `cursor` is `None`, streaming starts from the current head (live tail only).
24 /// - if `cursor` is `Some(seq)`, all persisted events from that seq onward are replayed first.
25 pub fn subscribe_repos(&self, cursor: Option<u64>) -> RelayEventStream {
26 let (tx, rx) = mpsc::channel(500);
27 let state = self.state.clone();
28 let runtime = tokio::runtime::Handle::current();
29
30 std::thread::Builder::new()
31 .name("hydrant-relay-stream".into())
32 .spawn(move || {
33 let _g = runtime.enter();
34 relay_stream_thread(state, tx, cursor);
35 })
36 .expect("failed to spawn relay stream thread");
37
38 RelayEventStream(rx)
39 }
40}