Very fast AT Protocol indexer with flexible filtering, XRPC queries, a cursor-backed event stream, and more, built on fjall
rust
fjall
at-protocol
atproto
indexer
1use std::sync::Arc;
2
3use tokio::sync::mpsc;
4use tracing::error;
5
6use crate::db::keys;
7use crate::state::AppState;
8use std::sync::atomic::Ordering;
9
10#[cfg(feature = "indexer_stream")]
11use {
12 super::Event,
13 crate::db,
14 crate::types::{BroadcastEvent, MarshallableEvt, RecordEvt, StoredData, StoredEvent},
15 jacquard_common::types::cid::{ATP_CID_HASH, IpldCid},
16 jacquard_common::types::nsid::Nsid,
17 jacquard_common::types::string::Rkey,
18 jacquard_common::{CowStr, IntoStatic, RawData},
19 jacquard_repo::DAG_CBOR_CID_CODEC,
20 sha2::{Digest, Sha256},
21};
22
#[cfg(feature = "indexer_stream")]
pub(super) fn event_stream_thread(
    state: Arc<AppState>,
    tx: mpsc::Sender<Event>,
    cursor: Option<u64>,
) {
    // Blocking worker that feeds one subscriber over `tx`: if a `cursor` is
    // given it first replays persisted record events from the db, then
    // forwards live broadcast events, dropping back to db catch-up whenever a
    // gap, lag, or hydration failure is detected. Exits when either side of
    // the channel pair is closed.
    let db = &state.db;
    // Subscribe BEFORE snapshotting `next_event_id` so no event published in
    // between can be missed.
    let mut event_rx = db.event_tx.subscribe();
    let ks = db.events.clone();
    // Last event id already delivered to this subscriber; `None` means
    // nothing delivered yet (next expected id is 0).
    let mut current_id = match cursor {
        Some(c) => c.checked_sub(1),
        None => db.next_event_id.load(Ordering::SeqCst).checked_sub(1),
    };
    let mut needs_catch_up = cursor.is_some();

    loop {
        if needs_catch_up {
            // catch up from db (record events only; ids are sparse due to ephemeral events)
            let start = current_id.map(|id| id.saturating_add(1)).unwrap_or(0);
            for item in ks.range(keys::event_key(start)..) {
                let (k, v) = match item.into_inner() {
                    Ok(kv) => kv,
                    Err(e) => {
                        error!(err = %e, "failed to read event from db");
                        break;
                    }
                };

                // Keys are big-endian u64 event ids.
                let id = match k.as_ref().try_into().map(u64::from_be_bytes) {
                    Ok(id) => id,
                    Err(_) => {
                        error!("failed to parse event id");
                        continue;
                    }
                };
                current_id = Some(id);

                let stored: StoredEvent = match rmp_serde::from_slice(&v) {
                    Ok(e) => e,
                    Err(e) => {
                        error!(err = %e, "failed to deserialize stored event");
                        continue;
                    }
                };

                // Events that fail hydration (e.g. missing block) are skipped.
                let Some(out_evt) = stored_to_event(&state, id, stored, None) else {
                    continue;
                };

                if tx.blocking_send(out_evt).is_err() {
                    return; // receiver dropped
                }
            }
            needs_catch_up = false;
        }

        // wait for live events
        match event_rx.blocking_recv() {
            // A record event was persisted; re-scan the db from current_id+1.
            Ok(BroadcastEvent::Persisted(_)) => needs_catch_up = true,
            Ok(BroadcastEvent::LiveRecord(evt)) => {
                let expected = current_id.map(|id| id.saturating_add(1)).unwrap_or(0);
                // Only forward a live record when it is exactly the next id;
                // otherwise fall back to db catch-up to preserve ordering.
                if needs_catch_up || evt.id != expected {
                    needs_catch_up = true;
                    continue;
                }

                let stored = evt.stored.clone();
                let Some(out_evt) =
                    stored_to_event(&state, evt.id, stored, evt.inline_block.clone())
                else {
                    // Hydration failed; retry this event via db catch-up.
                    needs_catch_up = true;
                    continue;
                };
                let out_id = out_evt.id;
                if tx.blocking_send(out_evt).is_err() {
                    return;
                }
                current_id = Some(out_id);
            }
            Ok(BroadcastEvent::Ephemeral(evt)) => {
                // Ephemeral events are forwarded as-is; they consume ids from
                // the shared sequence, which is why record ids are sparse.
                let evt_id = evt.id;
                if tx.blocking_send(*evt).is_err() {
                    return;
                }
                current_id = Some(current_id.unwrap_or(0).max(evt_id));
            }
            Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => needs_catch_up = true,
            Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
        }
    }
}
114
#[cfg(feature = "relay")]
pub(super) fn relay_stream_thread(
    state: Arc<AppState>,
    tx: mpsc::Sender<bytes::Bytes>,
    cursor: Option<u64>,
) {
    use crate::types::RelayBroadcast;

    // Blocking worker feeding one relay subscriber: replays persisted frames
    // starting after `cursor` (or starts at the tail when no cursor is given),
    // then forwards live frames, dropping back to db catch-up on any gap or
    // lag so strict seq ordering is preserved.
    //
    // Subscribe BEFORE reading the db head so any frame persisted afterwards
    // is guaranteed to arrive over the broadcast channel.
    let mut relay_rx = state.db.relay_broadcast_tx.subscribe();
    let ks = state.db.relay_events.clone();

    // Latest persisted seq in the keyspace (0 when empty).
    let db_head = ks
        .iter()
        .next_back()
        .and_then(|guard| {
            guard
                .key()
                .ok()
                .and_then(|k| k.as_ref().try_into().ok())
                .map(u64::from_be_bytes)
        })
        .unwrap_or(0);

    // `current_seq` is the last seq already delivered to this subscriber.
    let mut current_seq = match cursor {
        Some(c) => c.saturating_sub(1),
        None => db_head,
    };
    // BUGFIX: `head_seq` must reflect the actual db tail even when a cursor
    // is supplied. It was previously initialized to `cursor - 1`, which made
    // the initial catch-up loop stop after sending a single frame
    // (`current_seq >= head_seq` became true immediately), stalling the
    // subscriber until new live traffic bumped `head_seq`.
    let mut head_seq = db_head;
    let mut needs_catch_up = true;

    loop {
        if needs_catch_up {
            // catch up from db: send all stored frames from current_seq+1 onward
            for item in ks.range(crate::db::keys::relay_event_key(current_seq + 1)..) {
                let (k, v) = match item.into_inner() {
                    Ok(kv) => kv,
                    Err(e) => {
                        error!(err = %e, "relay stream: failed to read relay_events");
                        break;
                    }
                };
                let seq = match k.as_ref().try_into().map(u64::from_be_bytes) {
                    Ok(s) => s,
                    Err(_) => {
                        error!("relay stream: failed to parse relay event seq");
                        continue;
                    }
                };
                // Frames must be strictly sequential; a gap means the rest is
                // not yet persisted, so stop and wait for live notifications.
                if seq != current_seq + 1 {
                    break;
                }
                if tx.blocking_send(bytes::Bytes::copy_from_slice(&v)).is_err() {
                    return; // subscriber dropped
                }
                current_seq = seq;
                // Stop at the known head; anything beyond it will be
                // announced via the broadcast channel.
                if current_seq >= head_seq {
                    break;
                }
            }
            needs_catch_up = false;
        }

        // wait for live events
        match relay_rx.blocking_recv() {
            Ok(RelayBroadcast::Persisted(seq)) => {
                head_seq = head_seq.max(seq);
                needs_catch_up = current_seq < head_seq;
            }
            Ok(RelayBroadcast::Ephemeral(seq, frame)) => {
                head_seq = head_seq.max(seq);
                if seq != current_seq + 1 {
                    // out-of-order or gap: fall back to db catch-up to preserve ordering.
                    needs_catch_up = true;
                    continue;
                }
                if tx.blocking_send(frame).is_err() {
                    return;
                }
                current_seq = seq;
            }
            Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => needs_catch_up = true,
            Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
        }
    }
}
198
#[cfg(feature = "indexer_stream")]
fn stored_to_event(
    state: &AppState,
    id: u64,
    stored: StoredEvent<'_>,
    inline_block: Option<bytes::Bytes>,
) -> Option<Event> {
    // Hydrate a persisted `StoredEvent` into an outgoing record event:
    // resolves the record body from `inline_block`, the blocks keyspace, or
    // an inlined block (recomputing the CID in that case). Returns `None`
    // when the record body cannot be recovered.
    let StoredEvent {
        live,
        did,
        rev,
        collection,
        rkey,
        action,
        data,
    } = stored;

    // Shared DAG-CBOR -> JSON decoding path: logs and yields `None` on
    // malformed CBOR; a failed JSON conversion also yields `None`, silently.
    let decode = |raw: &[u8]| -> Option<serde_json::Value> {
        match serde_ipld_dagcbor::from_slice::<RawData>(raw) {
            Ok(val) => serde_json::to_value(val).ok(),
            Err(e) => {
                error!(err = %e, "cant parse block");
                None
            }
        }
    };

    let record = match data {
        StoredData::Ptr(cid) => {
            let value = if let Some(bytes) = inline_block {
                // Fast path: the broadcaster shipped the block alongside the event.
                decode(&bytes)?
            } else {
                // Fall back to the blocks keyspace, keyed by (collection, cid).
                let fetched = state
                    .db
                    .blocks
                    .get(&keys::block_key(collection.as_str(), &cid.to_bytes()));
                match fetched {
                    Ok(Some(bytes)) => decode(bytes.as_ref())?,
                    Ok(None) => {
                        error!("block not found, this is a bug");
                        return None;
                    }
                    Err(e) => {
                        error!(err = %e, "cant get block");
                        db::check_poisoned(&e);
                        return None;
                    }
                }
            };
            Some((cid, value))
        }
        StoredData::Block(block) => {
            // The block is stored inline, so its CID must be recomputed from
            // the sha-256 digest of the raw bytes.
            let digest = Sha256::digest(&block);
            let hash =
                cid::multihash::Multihash::wrap(ATP_CID_HASH, &digest).expect("valid sha256 hash");
            let cid = IpldCid::new_v1(DAG_CBOR_CID_CODEC, hash);
            Some((cid, decode(&block)?))
        }
        StoredData::Nothing => None,
    };

    // Split the optional (cid, record) pair into two independent options.
    let (cid, record) = match record {
        Some((c, r)) => (Some(c), Some(r)),
        None => (None, None),
    };

    Some(MarshallableEvt {
        id,
        kind: crate::types::EventType::Record,
        record: Some(RecordEvt {
            live,
            did: did.to_did(),
            rev: rev.to_tid(),
            collection: Nsid::new_cow(collection.clone().into_static())
                .expect("that collection is already validated"),
            rkey: Rkey::new_cow(CowStr::Owned(rkey.to_smolstr()))
                .expect("that rkey is already validated"),
            action: CowStr::Borrowed(action.as_str()),
            record,
            cid,
        }),
        identity: None,
        account: None,
    })
}
291}