a very fast AT Protocol indexer with flexible filtering, XRPC queries, a cursor-backed event stream, and more, built on fjall
rust
fjall
at-protocol
atproto
indexer
1use super::*;
2
/// an asynchronous stream of [`Event`]s, handed out by [`Hydrant::subscribe`].
///
/// wraps the receiving half of an `mpsc` channel and implements
/// [`futures::Stream`], so it works with the usual `StreamExt` helpers:
/// `next`, `forward`, `while let Some(evt) = stream.next().await`, and so on.
/// the stream finishes once the underlying channel closes (i.e. hydrant shuts down).
#[cfg(feature = "indexer_stream")]
pub struct EventStream(mpsc::Receiver<Event>);
10
#[cfg(feature = "indexer_stream")]
impl Stream for EventStream {
    type Item = Event;

    /// polls the wrapped channel receiver for the next [`Event`].
    ///
    /// this is a straight delegation to the receiver's `poll_recv`; whatever it
    /// reports (pending, an event, or end of stream) is returned unchanged.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // the receiver is the only field, so unpin `self` and borrow it directly.
        let Self(receiver) = self.get_mut();
        receiver.poll_recv(cx)
    }
}
19
20/// runtime control over the backfill worker component.
21///
22/// the backfill worker fetches full repo CAR files from each repo's PDS for any
23/// repository in the pending queue, parses the MST, and inserts all matching records
24/// into the database. concurrency is bounded by `HYDRANT_BACKFILL_CONCURRENCY_LIMIT`.
25#[derive(Clone)]
26pub struct BackfillHandle(Arc<AppState>);
27
28impl BackfillHandle {
29 pub(crate) fn new(state: Arc<AppState>) -> Self {
30 Self(state)
31 }
32
33 /// enable the backfill worker, no-op if already enabled.
34 pub fn enable(&self) {
35 self.0.backfill_enabled.send_replace(true);
36 }
37 /// disable the backfill worker, in-flight repos complete before pausing.
38 pub fn disable(&self) {
39 self.0.backfill_enabled.send_replace(false);
40 }
41 /// returns the current enabled state of the backfill worker.
42 pub fn is_enabled(&self) -> bool {
43 *self.0.backfill_enabled.borrow()
44 }
45}
46
#[cfg(feature = "indexer_stream")]
impl Hydrant {
    /// open a subscription to the ordered event stream.
    ///
    /// the returned [`EventStream`] implements [`futures::Stream`].
    ///
    /// - with `cursor == None`, only live events are delivered, starting at the
    ///   current head.
    /// - with `cursor == Some(id)`, every persisted `record` event from that ID
    ///   onward is replayed first, after which the stream switches to live tailing.
    ///
    /// `identity` and `account` events are ephemeral: they are never replayed from
    /// a cursor and are only delivered live. use [`ReposControl::info`] to look up
    /// the current state of a specific repository.
    ///
    /// each call spawns its own `hydrant-stream` OS thread feeding a bounded
    /// channel (capacity 500), so concurrent subscribers each receive a full,
    /// independent copy of the stream. the stream ends when the `EventStream`
    /// is dropped.
    ///
    /// # Panics
    ///
    /// panics if called outside the context of a tokio runtime, or if the
    /// stream thread cannot be spawned.
    pub fn subscribe(&self, cursor: Option<u64>) -> EventStream {
        // captured here so the worker thread can enter the caller's runtime.
        let handle = tokio::runtime::Handle::current();
        let state = self.state.clone();
        let (sender, receiver) = mpsc::channel(500);

        let worker = move || {
            // keep the runtime context entered for the whole lifetime of the thread.
            let _guard = handle.enter();
            event_stream_thread(state, sender, cursor);
        };

        std::thread::Builder::new()
            .name(String::from("hydrant-stream"))
            .spawn(worker)
            .expect("failed to spawn stream thread");

        EventStream(receiver)
    }
}