very fast at protocol indexer with flexible filtering, xrpc queries, cursor-backed event stream, and more, built on fjall
rust fjall at-protocol atproto indexer
59
fork

Configure Feed

Select the types of activity you want to include in your feed.

at main 40 lines 1.4 kB view raw
1use super::*; 2 3/// the relay event stream produced by [`Hydrant::subscribe_repos`]. 4pub struct RelayEventStream(mpsc::Receiver<bytes::Bytes>); 5 6impl futures::Stream for RelayEventStream { 7 type Item = bytes::Bytes; 8 9 fn poll_next( 10 mut self: std::pin::Pin<&mut Self>, 11 cx: &mut std::task::Context<'_>, 12 ) -> std::task::Poll<Option<Self::Item>> { 13 self.0.poll_recv(cx) 14 } 15} 16 17impl Hydrant { 18 /// subscribe to the relay's ordered `subscribeRepos` event stream. 19 /// 20 /// returns a [`RelayEventStream`] that yields pre-encoded CBOR binary frames 21 /// ready to forward directly to ATProto clients via WebSocket. 22 /// 23 /// - if `cursor` is `None`, streaming starts from the current head (live tail only). 24 /// - if `cursor` is `Some(seq)`, all persisted events from that seq onward are replayed first. 25 pub fn subscribe_repos(&self, cursor: Option<u64>) -> RelayEventStream { 26 let (tx, rx) = mpsc::channel(500); 27 let state = self.state.clone(); 28 let runtime = tokio::runtime::Handle::current(); 29 30 std::thread::Builder::new() 31 .name("hydrant-relay-stream".into()) 32 .spawn(move || { 33 let _g = runtime.enter(); 34 relay_stream_thread(state, tx, cursor); 35 }) 36 .expect("failed to spawn relay stream thread"); 37 38 RelayEventStream(rx) 39 } 40}