Very fast AT Protocol indexer with flexible filtering, XRPC queries, a cursor-backed event stream, and more, built on fjall
rust fjall at-protocol atproto indexer
59
fork

Configure Feed

Select the types of activity you want to include in your feed.

at main 291 lines 10 kB view raw
1use std::sync::Arc; 2 3use tokio::sync::mpsc; 4use tracing::error; 5 6use crate::db::keys; 7use crate::state::AppState; 8use std::sync::atomic::Ordering; 9 10#[cfg(feature = "indexer_stream")] 11use { 12 super::Event, 13 crate::db, 14 crate::types::{BroadcastEvent, MarshallableEvt, RecordEvt, StoredData, StoredEvent}, 15 jacquard_common::types::cid::{ATP_CID_HASH, IpldCid}, 16 jacquard_common::types::nsid::Nsid, 17 jacquard_common::types::string::Rkey, 18 jacquard_common::{CowStr, IntoStatic, RawData}, 19 jacquard_repo::DAG_CBOR_CID_CODEC, 20 sha2::{Digest, Sha256}, 21}; 22 23#[cfg(feature = "indexer_stream")] 24pub(super) fn event_stream_thread( 25 state: Arc<AppState>, 26 tx: mpsc::Sender<Event>, 27 cursor: Option<u64>, 28) { 29 let db = &state.db; 30 let mut event_rx = db.event_tx.subscribe(); 31 let ks = db.events.clone(); 32 let mut current_id = match cursor { 33 Some(c) => c.checked_sub(1), 34 None => db.next_event_id.load(Ordering::SeqCst).checked_sub(1), 35 }; 36 let mut needs_catch_up = cursor.is_some(); 37 38 loop { 39 if needs_catch_up { 40 // catch up from db (record events only; ids are sparse due to ephemeral events) 41 let start = current_id.map(|id| id.saturating_add(1)).unwrap_or(0); 42 for item in ks.range(keys::event_key(start)..) 
{ 43 let (k, v) = match item.into_inner() { 44 Ok(kv) => kv, 45 Err(e) => { 46 error!(err = %e, "failed to read event from db"); 47 break; 48 } 49 }; 50 51 let id = match k.as_ref().try_into().map(u64::from_be_bytes) { 52 Ok(id) => id, 53 Err(_) => { 54 error!("failed to parse event id"); 55 continue; 56 } 57 }; 58 current_id = Some(id); 59 60 let stored: StoredEvent = match rmp_serde::from_slice(&v) { 61 Ok(e) => e, 62 Err(e) => { 63 error!(err = %e, "failed to deserialize stored event"); 64 continue; 65 } 66 }; 67 68 let Some(out_evt) = stored_to_event(&state, id, stored, None) else { 69 continue; 70 }; 71 72 if tx.blocking_send(out_evt).is_err() { 73 return; // receiver dropped 74 } 75 } 76 needs_catch_up = false; 77 } 78 79 // wait for live events 80 match event_rx.blocking_recv() { 81 Ok(BroadcastEvent::Persisted(_)) => needs_catch_up = true, 82 Ok(BroadcastEvent::LiveRecord(evt)) => { 83 let expected = current_id.map(|id| id.saturating_add(1)).unwrap_or(0); 84 if needs_catch_up || evt.id != expected { 85 needs_catch_up = true; 86 continue; 87 } 88 89 let stored = evt.stored.clone(); 90 let Some(out_evt) = 91 stored_to_event(&state, evt.id, stored, evt.inline_block.clone()) 92 else { 93 needs_catch_up = true; 94 continue; 95 }; 96 let out_id = out_evt.id; 97 if tx.blocking_send(out_evt).is_err() { 98 return; 99 } 100 current_id = Some(out_id); 101 } 102 Ok(BroadcastEvent::Ephemeral(evt)) => { 103 let evt_id = evt.id; 104 if tx.blocking_send(*evt).is_err() { 105 return; 106 } 107 current_id = Some(current_id.unwrap_or(0).max(evt_id)); 108 } 109 Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => needs_catch_up = true, 110 Err(tokio::sync::broadcast::error::RecvError::Closed) => break, 111 } 112 } 113} 114 115#[cfg(feature = "relay")] 116pub(super) fn relay_stream_thread( 117 state: Arc<AppState>, 118 tx: mpsc::Sender<bytes::Bytes>, 119 cursor: Option<u64>, 120) { 121 use crate::types::RelayBroadcast; 122 use std::sync::atomic::Ordering; 123 124 let mut 
relay_rx = state.db.relay_broadcast_tx.subscribe(); 125 let ks = state.db.relay_events.clone(); 126 let mut current_seq = match cursor { 127 Some(c) => c.saturating_sub(1), 128 None => ks 129 .iter() 130 .next_back() 131 .and_then(|guard| { 132 guard 133 .key() 134 .ok() 135 .and_then(|k| k.as_ref().try_into().ok()) 136 .map(u64::from_be_bytes) 137 }) 138 .unwrap_or(0), 139 }; 140 let mut head_seq = current_seq; 141 let mut needs_catch_up = true; 142 143 loop { 144 if needs_catch_up { 145 // catch up from db: send all stored frames from current_seq+1 onward 146 for item in ks.range(crate::db::keys::relay_event_key(current_seq + 1)..) { 147 let (k, v) = match item.into_inner() { 148 Ok(kv) => kv, 149 Err(e) => { 150 error!(err = %e, "relay stream: failed to read relay_events"); 151 break; 152 } 153 }; 154 let seq = match k.as_ref().try_into().map(u64::from_be_bytes) { 155 Ok(s) => s, 156 Err(_) => { 157 error!("relay stream: failed to parse relay event seq"); 158 continue; 159 } 160 }; 161 if seq != current_seq + 1 { 162 break; 163 } 164 if tx.blocking_send(bytes::Bytes::copy_from_slice(&v)).is_err() { 165 return; // subscriber dropped 166 } 167 current_seq = seq; 168 if current_seq >= head_seq { 169 break; 170 } 171 } 172 needs_catch_up = false; 173 } 174 175 // wait for live events 176 match relay_rx.blocking_recv() { 177 Ok(RelayBroadcast::Persisted(seq)) => { 178 head_seq = head_seq.max(seq); 179 needs_catch_up = current_seq < head_seq; 180 } 181 Ok(RelayBroadcast::Ephemeral(seq, frame)) => { 182 head_seq = head_seq.max(seq); 183 if seq != current_seq + 1 { 184 // out-of-order or gap: fall back to db catch-up to preserve ordering. 
185 needs_catch_up = true; 186 continue; 187 } 188 if tx.blocking_send(frame).is_err() { 189 return; 190 } 191 current_seq = seq; 192 } 193 Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => needs_catch_up = true, 194 Err(tokio::sync::broadcast::error::RecvError::Closed) => break, 195 } 196 } 197} 198 199#[cfg(feature = "indexer_stream")] 200fn stored_to_event( 201 state: &AppState, 202 id: u64, 203 stored: StoredEvent<'_>, 204 inline_block: Option<bytes::Bytes>, 205) -> Option<Event> { 206 let StoredEvent { 207 live, 208 did, 209 rev, 210 collection, 211 rkey, 212 action, 213 data, 214 } = stored; 215 216 let record = match data { 217 StoredData::Ptr(cid) => { 218 if let Some(bytes) = inline_block { 219 match serde_ipld_dagcbor::from_slice::<RawData>(&bytes) { 220 Ok(val) => Some((cid, serde_json::to_value(val).ok()?)), 221 Err(e) => { 222 error!(err = %e, "cant parse block"); 223 return None; 224 } 225 } 226 } else { 227 let block = state 228 .db 229 .blocks 230 .get(&keys::block_key(collection.as_str(), &cid.to_bytes())); 231 match block { 232 Ok(Some(bytes)) => { 233 match serde_ipld_dagcbor::from_slice::<RawData>(bytes.as_ref()) { 234 Ok(val) => Some((cid, serde_json::to_value(val).ok()?)), 235 Err(e) => { 236 error!(err = %e, "cant parse block"); 237 return None; 238 } 239 } 240 } 241 Ok(None) => { 242 error!("block not found, this is a bug"); 243 return None; 244 } 245 Err(e) => { 246 error!(err = %e, "cant get block"); 247 db::check_poisoned(&e); 248 return None; 249 } 250 } 251 } 252 } 253 StoredData::Block(block) => { 254 let digest = Sha256::digest(&block); 255 let hash = 256 cid::multihash::Multihash::wrap(ATP_CID_HASH, &digest).expect("valid sha256 hash"); 257 let cid = IpldCid::new_v1(DAG_CBOR_CID_CODEC, hash); 258 match serde_ipld_dagcbor::from_slice::<RawData>(&block) { 259 Ok(val) => Some((cid, serde_json::to_value(val).ok()?)), 260 Err(e) => { 261 error!(err = %e, "cant parse block"); 262 return None; 263 } 264 } 265 } 266 
StoredData::Nothing => None, 267 }; 268 269 let (cid, record) = record 270 .map(|(c, r)| (Some(c), Some(r))) 271 .unwrap_or((None, None)); 272 273 Some(MarshallableEvt { 274 id, 275 kind: crate::types::EventType::Record, 276 record: Some(RecordEvt { 277 live, 278 did: did.to_did(), 279 rev: rev.to_tid(), 280 collection: Nsid::new_cow(collection.clone().into_static()) 281 .expect("that collection is already validated"), 282 rkey: Rkey::new_cow(CowStr::Owned(rkey.to_smolstr())) 283 .expect("that rkey is already validated"), 284 action: CowStr::Borrowed(action.as_str()), 285 record, 286 cid, 287 }), 288 identity: None, 289 account: None, 290 }) 291}