a very fast AT Protocol (atproto) indexer with flexible filtering, XRPC queries, a cursor-backed event stream, and more, built on fjall
rust fjall at-protocol atproto indexer
59
fork

Configure Feed

Select the types of activity you want to include in your feed.

at main 78 lines 2.9 kB view raw
1use super::*; 2 3#[cfg(feature = "indexer_stream")] 4/// a stream of [`Event`]s. returned by [`Hydrant::subscribe`]. 5/// 6/// implements [`futures::Stream`] and can be used with `StreamExt::next`, 7/// `while let Some(evt) = stream.next().await`, `forward`, etc. 8/// the stream terminates when the underlying channel closes (i.e. hydrant shuts down). 9pub struct EventStream(mpsc::Receiver<Event>); 10 11#[cfg(feature = "indexer_stream")] 12impl Stream for EventStream { 13 type Item = Event; 14 15 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { 16 self.0.poll_recv(cx) 17 } 18} 19 20/// runtime control over the backfill worker component. 21/// 22/// the backfill worker fetches full repo CAR files from each repo's PDS for any 23/// repository in the pending queue, parses the MST, and inserts all matching records 24/// into the database. concurrency is bounded by `HYDRANT_BACKFILL_CONCURRENCY_LIMIT`. 25#[derive(Clone)] 26pub struct BackfillHandle(Arc<AppState>); 27 28impl BackfillHandle { 29 pub(crate) fn new(state: Arc<AppState>) -> Self { 30 Self(state) 31 } 32 33 /// enable the backfill worker, no-op if already enabled. 34 pub fn enable(&self) { 35 self.0.backfill_enabled.send_replace(true); 36 } 37 /// disable the backfill worker, in-flight repos complete before pausing. 38 pub fn disable(&self) { 39 self.0.backfill_enabled.send_replace(false); 40 } 41 /// returns the current enabled state of the backfill worker. 42 pub fn is_enabled(&self) -> bool { 43 *self.0.backfill_enabled.borrow() 44 } 45} 46 47#[cfg(feature = "indexer_stream")] 48impl Hydrant { 49 /// subscribe to the ordered event stream. 50 /// 51 /// returns an [`EventStream`] that implements [`futures::Stream`]. 52 /// 53 /// - if `cursor` is `None`, streaming starts from the current head (live tail only). 54 /// - if `cursor` is `Some(id)`, all persisted `record` events from that ID onward are 55 /// replayed first, then the stream will switch to live tailing. 
56 /// 57 /// `identity` and `account` events are ephemeral and are never replayed from a cursor, 58 /// only live ones are delivered. use [`ReposControl::info`] to fetch current state for 59 /// a specific repository. 60 /// 61 /// multiple concurrent subscribers each receive a full independent copy of the stream. 62 /// the stream ends when the `EventStream` is dropped. 63 pub fn subscribe(&self, cursor: Option<u64>) -> EventStream { 64 let (tx, rx) = mpsc::channel(500); 65 let state = self.state.clone(); 66 let runtime = tokio::runtime::Handle::current(); 67 68 std::thread::Builder::new() 69 .name("hydrant-stream".into()) 70 .spawn(move || { 71 let _g = runtime.enter(); 72 event_stream_thread(state, tx, cursor); 73 }) 74 .expect("failed to spawn stream thread"); 75 76 EventStream(rx) 77 } 78}