Very fast AT Protocol indexer with flexible filtering, XRPC queries, a cursor-backed event stream, and more, built on fjall
rust fjall at-protocol atproto indexer
59
fork

Configure Feed

Select the types of activity you want to include in your feed.

[ingest,stream] don't read from db, use live events

dawn c93a1abc 89ecab45

+249 -92
+103 -46
src/control/stream.rs
··· 30 30 let mut event_rx = db.event_tx.subscribe(); 31 31 let ks = db.events.clone(); 32 32 let mut current_id = match cursor { 33 - Some(c) => c.saturating_sub(1), 34 - None => db.next_event_id.load(Ordering::SeqCst).saturating_sub(1), 33 + Some(c) => c.checked_sub(1), 34 + None => db.next_event_id.load(Ordering::SeqCst).checked_sub(1), 35 35 }; 36 + let mut needs_catch_up = cursor.is_some(); 36 37 37 38 loop { 38 - // catch up from db 39 - loop { 40 - let mut found = false; 41 - for item in ks.range(keys::event_key(current_id + 1)..) { 39 + if needs_catch_up { 40 + // catch up from db (record events only; ids are sparse due to ephemeral events) 41 + let start = current_id.map(|id| id.saturating_add(1)).unwrap_or(0); 42 + for item in ks.range(keys::event_key(start)..) { 42 43 let (k, v) = match item.into_inner() { 43 44 Ok(kv) => kv, 44 45 Err(e) => { ··· 54 55 continue; 55 56 } 56 57 }; 57 - current_id = id; 58 + current_id = Some(id); 58 59 59 60 let stored: StoredEvent = match rmp_serde::from_slice(&v) { 60 61 Ok(e) => e, ··· 64 65 } 65 66 }; 66 67 67 - let Some(evt) = stored_to_event(&state, id, stored) else { 68 + let Some(out_evt) = stored_to_event(&state, id, stored, None) else { 68 69 continue; 69 70 }; 70 71 71 - if tx.blocking_send(evt).is_err() { 72 + if tx.blocking_send(out_evt).is_err() { 72 73 return; // receiver dropped 73 74 } 74 - found = true; 75 75 } 76 - if !found { 77 - break; 78 - } 76 + needs_catch_up = false; 79 77 } 80 78 81 79 // wait for live events 82 80 match event_rx.blocking_recv() { 83 - Ok(BroadcastEvent::Persisted(_)) => {} // re-run catch-up 81 + Ok(BroadcastEvent::Persisted(_)) => needs_catch_up = true, 82 + Ok(BroadcastEvent::LiveRecord(evt)) => { 83 + let expected = current_id.map(|id| id.saturating_add(1)).unwrap_or(0); 84 + if needs_catch_up || evt.id != expected { 85 + needs_catch_up = true; 86 + continue; 87 + } 88 + 89 + let stored = evt.stored.clone(); 90 + let Some(out_evt) = 91 + stored_to_event(&state, evt.id, 
stored, evt.inline_block.clone()) 92 + else { 93 + needs_catch_up = true; 94 + continue; 95 + }; 96 + let out_id = out_evt.id; 97 + if tx.blocking_send(out_evt).is_err() { 98 + return; 99 + } 100 + current_id = Some(out_id); 101 + } 84 102 Ok(BroadcastEvent::Ephemeral(evt)) => { 103 + let evt_id = evt.id; 85 104 if tx.blocking_send(*evt).is_err() { 86 105 return; 87 106 } 107 + current_id = Some(current_id.unwrap_or(0).max(evt_id)); 88 108 } 89 - Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => {} 109 + Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => needs_catch_up = true, 90 110 Err(tokio::sync::broadcast::error::RecvError::Closed) => break, 91 111 } 92 112 } ··· 105 125 let ks = state.db.relay_events.clone(); 106 126 let mut current_seq = match cursor { 107 127 Some(c) => c.saturating_sub(1), 108 - None => state 109 - .db 110 - .next_relay_seq 111 - .load(Ordering::Relaxed) 112 - .saturating_sub(1), 128 + None => ks 129 + .iter() 130 + .next_back() 131 + .and_then(|guard| { 132 + guard 133 + .key() 134 + .ok() 135 + .and_then(|k| k.as_ref().try_into().ok()) 136 + .map(u64::from_be_bytes) 137 + }) 138 + .unwrap_or(0), 113 139 }; 140 + let mut head_seq = current_seq; 141 + let mut needs_catch_up = true; 114 142 115 143 loop { 116 - // catch up from db: send all stored frames from current_seq+1 onward 117 - loop { 118 - let mut found = false; 144 + if needs_catch_up { 145 + // catch up from db: send all stored frames from current_seq+1 onward 119 146 for item in ks.range(crate::db::keys::relay_event_key(current_seq + 1)..) 
{ 120 147 let (k, v) = match item.into_inner() { 121 148 Ok(kv) => kv, ··· 131 158 continue; 132 159 } 133 160 }; 134 - current_seq = seq; 161 + if seq != current_seq + 1 { 162 + break; 163 + } 135 164 if tx.blocking_send(bytes::Bytes::copy_from_slice(&v)).is_err() { 136 165 return; // subscriber dropped 137 166 } 138 - found = true; 167 + current_seq = seq; 168 + if current_seq >= head_seq { 169 + break; 170 + } 139 171 } 140 - if !found { 141 - break; 142 - } 172 + needs_catch_up = false; 143 173 } 144 174 145 175 // wait for live events 146 176 match relay_rx.blocking_recv() { 147 - Ok(RelayBroadcast::Persisted(_)) => {} // re-run catch-up 148 - Ok(RelayBroadcast::Ephemeral(frame)) => { 177 + Ok(RelayBroadcast::Persisted(seq)) => { 178 + head_seq = head_seq.max(seq); 179 + needs_catch_up = current_seq < head_seq; 180 + } 181 + Ok(RelayBroadcast::Ephemeral(seq, frame)) => { 182 + head_seq = head_seq.max(seq); 183 + if seq != current_seq + 1 { 184 + // out-of-order or gap: fall back to db catch-up to preserve ordering. 
185 + needs_catch_up = true; 186 + continue; 187 + } 149 188 if tx.blocking_send(frame).is_err() { 150 189 return; 151 190 } 191 + current_seq = seq; 152 192 } 153 - Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => {} // re-run catch-up 193 + Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => needs_catch_up = true, 154 194 Err(tokio::sync::broadcast::error::RecvError::Closed) => break, 155 195 } 156 196 } 157 197 } 158 198 159 199 #[cfg(feature = "indexer_stream")] 160 - fn stored_to_event(state: &AppState, id: u64, stored: StoredEvent<'_>) -> Option<Event> { 200 + fn stored_to_event( 201 + state: &AppState, 202 + id: u64, 203 + stored: StoredEvent<'_>, 204 + inline_block: Option<bytes::Bytes>, 205 + ) -> Option<Event> { 161 206 let StoredEvent { 162 207 live, 163 208 did, ··· 170 215 171 216 let record = match data { 172 217 StoredData::Ptr(cid) => { 173 - let block = state 174 - .db 175 - .blocks 176 - .get(&keys::block_key(collection.as_str(), &cid.to_bytes())); 177 - match block { 178 - Ok(Some(bytes)) => match serde_ipld_dagcbor::from_slice::<RawData>(&bytes) { 218 + if let Some(bytes) = inline_block { 219 + match serde_ipld_dagcbor::from_slice::<RawData>(&bytes) { 179 220 Ok(val) => Some((cid, serde_json::to_value(val).ok()?)), 180 221 Err(e) => { 181 222 error!(err = %e, "cant parse block"); 182 223 return None; 183 224 } 184 - }, 185 - Ok(None) => { 186 - error!("block not found, this is a bug"); 187 - return None; 188 225 } 189 - Err(e) => { 190 - error!(err = %e, "cant get block"); 191 - db::check_poisoned(&e); 192 - return None; 226 + } else { 227 + let block = state 228 + .db 229 + .blocks 230 + .get(&keys::block_key(collection.as_str(), &cid.to_bytes())); 231 + match block { 232 + Ok(Some(bytes)) => { 233 + match serde_ipld_dagcbor::from_slice::<RawData>(bytes.as_ref()) { 234 + Ok(val) => Some((cid, serde_json::to_value(val).ok()?)), 235 + Err(e) => { 236 + error!(err = %e, "cant parse block"); 237 + return None; 238 + } 239 + } 240 
+ } 241 + Ok(None) => { 242 + error!("block not found, this is a bug"); 243 + return None; 244 + } 245 + Err(e) => { 246 + error!(err = %e, "cant get block"); 247 + db::check_poisoned(&e); 248 + return None; 249 + } 193 250 } 194 251 } 195 252 }
+13 -2
src/ingest/indexer.rs
··· 490 490 *ctx.added_blocks += res.blocks_count; 491 491 *ctx.records_delta += res.records_delta; 492 492 #[cfg(feature = "indexer_stream")] 493 - ctx.broadcast_events 494 - .push(BroadcastEvent::Persisted(db.next_event_id.load(SeqCst) - 1)); 493 + { 494 + use std::sync::Arc; 495 + 496 + let mut live_events = res.live_events; 497 + for evt in live_events.drain(..) { 498 + ctx.broadcast_events 499 + .push(BroadcastEvent::LiveRecord(Arc::new(evt))); 500 + } 501 + if let Some(last_id) = res.last_event_id { 502 + ctx.broadcast_events 503 + .push(BroadcastEvent::Persisted(last_id)); 504 + } 505 + } 495 506 496 507 Ok(RepoProcessResult::Ok(repo_state)) 497 508 }
+2
src/ingest/relay.rs
··· 921 921 let frame = make_frame(seq as i64)?; 922 922 self.batch 923 923 .insert(&db.relay_events, keys::relay_event_key(seq), frame.as_ref()); 924 + self.pending_broadcasts 925 + .push(RelayBroadcast::Ephemeral(seq, frame)); 924 926 self.pending_broadcasts.push(RelayBroadcast::Persisted(seq)); 925 927 Ok(()) 926 928 }
+74 -32
src/ops.rs
··· 19 19 #[cfg(feature = "indexer_stream")] 20 20 use { 21 21 crate::types::{ 22 - AccountEvt, BroadcastEvent, IdentityEvt, MarshallableEvt, StoredData, StoredEvent, 22 + AccountEvt, BroadcastEvent, IdentityEvt, LiveRecordEvent, MarshallableEvt, StoredData, 23 + StoredEvent, 23 24 }, 24 - jacquard_common::CowStr, 25 + jacquard_common::{CowStr, IntoStatic}, 25 26 std::sync::atomic::Ordering, 26 27 }; 27 28 ··· 207 208 pub repo_state: RepoState<'s>, 208 209 pub records_delta: i64, 209 210 pub blocks_count: i64, 211 + #[cfg(feature = "indexer_stream")] 212 + pub live_events: Vec<LiveRecordEvent>, 213 + #[cfg(feature = "indexer_stream")] 214 + pub last_event_id: Option<u64>, 210 215 } 211 216 212 217 pub fn apply_commit<'s>( ··· 231 236 let mut records_delta = 0; 232 237 let mut blocks_count = 0; 233 238 let mut collection_deltas: HashMap<&str, i64> = HashMap::new(); 239 + let rev = DbTid::from(&commit.rev); 240 + 241 + #[cfg(feature = "indexer_stream")] 242 + let should_broadcast_live = db.event_tx.receiver_count() > 0; 243 + #[cfg(feature = "indexer_stream")] 244 + let mut live_events = Vec::new(); 245 + #[cfg(feature = "indexer_stream")] 246 + let mut last_event_id = None; 234 247 235 248 for op in &commit.ops { 236 249 let (collection, rkey) = parse_path(&op.path)?; ··· 242 255 let rkey = DbRkey::new(rkey); 243 256 let db_key = keys::record_key(did, collection, &rkey); 244 257 258 + let action = DbAction::try_from(op.action.as_str())?; 259 + 245 260 #[cfg(feature = "indexer_stream")] 246 - let event_id = db.next_event_id.fetch_add(1, Ordering::SeqCst); 261 + let mut cid_for_event: Option<jacquard_common::types::cid::IpldCid> = None; 262 + #[cfg(feature = "indexer_stream")] 263 + let mut block_inline_for_event: Option<bytes::Bytes> = None; 264 + #[cfg(feature = "indexer_stream")] 265 + let mut inline_block: Option<bytes::Bytes> = None; 247 266 248 - let action = DbAction::try_from(op.action.as_str())?; 249 - let block: Option<bytes::Bytes> = match action { 267 + 
match action { 250 268 DbAction::Create | DbAction::Update => { 251 269 let Some(cid) = &op.cid else { 252 270 continue; ··· 255 273 .to_ipld() 256 274 .into_diagnostic() 257 275 .wrap_err("expected valid cid from relay")?; 276 + #[cfg(feature = "indexer_stream")] 277 + { 278 + cid_for_event = Some(cid_ipld.clone()); 279 + } 258 280 259 281 let Some(bytes) = parsed.blocks.get(&cid_ipld) else { 260 282 return Err(miette::miette!( ··· 286 308 &value, 287 309 )?; 288 310 } 289 - None 290 - } else { 291 - // in ephemeral mode, capture bytes inline for event emission 292 311 #[cfg(feature = "indexer_stream")] 293 - { 294 - Some(bytes.clone()) 312 + if should_broadcast_live && !only_index_links { 313 + // inline record bytes for live tailing so we don't have to load from blocks. 314 + inline_block = Some(bytes.clone()); 295 315 } 296 - #[cfg(not(feature = "indexer_stream"))] 316 + } else { 317 + #[cfg(feature = "indexer_stream")] 297 318 { 298 - let _ = bytes; 299 - None 319 + // in ephemeral mode, the event payload is the only place we persist the record. 
320 + block_inline_for_event = Some(bytes.clone()); 300 321 } 301 322 } 302 323 } ··· 317 338 &rkey.to_smolstr(), 318 339 )?; 319 340 } 320 - 321 - None 322 341 } 323 342 }; 324 343 325 344 #[cfg(feature = "indexer_stream")] 326 345 { 346 + let data = block_inline_for_event 347 + .clone() 348 + .map(StoredData::Block) 349 + .or_else(|| { 350 + (!only_index_links) 351 + .then(|| cid_for_event.clone().map(StoredData::Ptr)) 352 + .flatten() 353 + }) 354 + .unwrap_or(StoredData::Nothing); 355 + 356 + let event_id = db.next_event_id.fetch_add(1, Ordering::SeqCst); 357 + last_event_id = Some(event_id); 358 + let did_trimmed = TrimmedDid::from(did); 359 + let collection = CowStr::Borrowed(collection); 360 + 327 361 let evt = StoredEvent { 328 362 live: true, 329 - did: TrimmedDid::from(did), 330 - rev: DbTid::from(&commit.rev), 331 - collection: CowStr::Borrowed(collection), 332 - rkey, 363 + did: did_trimmed.clone(), 364 + rev, 365 + collection: collection.clone(), 366 + rkey: rkey.clone(), 333 367 action, 334 - data: block 335 - .map(StoredData::Block) 336 - .or_else(|| { 337 - (!only_index_links).then(|| { 338 - op.cid 339 - .as_ref() 340 - .map(|c| c.to_ipld().expect("valid cid")) 341 - .map(StoredData::Ptr) 342 - })? 
343 - }) 344 - .unwrap_or(StoredData::Nothing), 368 + data: data.clone(), 345 369 }; 346 370 let bytes = rmp_serde::to_vec(&evt).into_diagnostic()?; 347 371 batch.insert(&db.events, keys::event_key(event_id), bytes); 372 + 373 + if should_broadcast_live { 374 + live_events.push(LiveRecordEvent { 375 + id: event_id, 376 + stored: StoredEvent { 377 + live: evt.live, 378 + did: did_trimmed.into_static(), 379 + rev: evt.rev, 380 + collection: collection.into_static(), 381 + rkey, 382 + action: evt.action, 383 + data, 384 + }, 385 + inline_block, 386 + }); 387 + } 348 388 } 349 - #[cfg(not(feature = "indexer_stream"))] 350 - drop(block); 351 389 } 352 390 353 391 // update counts ··· 361 399 repo_state, 362 400 records_delta, 363 401 blocks_count, 402 + #[cfg(feature = "indexer_stream")] 403 + live_events, 404 + #[cfg(feature = "indexer_stream")] 405 + last_event_id, 364 406 }) 365 407 } 366 408
+18 -3
src/types.rs
··· 206 206 #[cfg(feature = "indexer_stream")] 207 207 #[derive(Clone, Debug)] 208 208 pub(crate) enum BroadcastEvent { 209 - #[allow(dead_code)] 210 - Persisted(u64), 209 + Persisted(#[allow(dead_code)] u64), 210 + /// a durable record event with optional inline block bytes for live tailing. 211 + /// 212 + /// used to avoid re-reading `events`/`blocks` from the database when tailing. 213 + LiveRecord(std::sync::Arc<super::LiveRecordEvent>), 211 214 Ephemeral(Box<MarshallableEvt<'static>>), 212 215 } 213 216 ··· 423 426 pub data: StoredData, 424 427 } 425 428 429 + /// a durable record event that is also emitted on the in-memory stream after commit. 430 + /// 431 + /// `inline_block` is only used for live tailing to avoid loading the record block from `blocks`. 432 + /// cursor replay continues to read from the database. 433 + #[cfg(feature = "indexer_stream")] 434 + #[derive(Debug, Clone)] 435 + pub(crate) struct LiveRecordEvent { 436 + pub id: u64, 437 + pub stored: StoredEvent<'static>, 438 + pub inline_block: Option<Bytes>, 439 + } 440 + 426 441 #[cfg(feature = "relay")] 427 442 #[derive(Clone)] 428 443 pub(crate) enum RelayBroadcast { 429 444 Persisted(#[allow(dead_code)] u64), 430 445 #[allow(dead_code)] 431 - Ephemeral(bytes::Bytes), 446 + Ephemeral(u64, bytes::Bytes), 432 447 }
+39 -9
tests/run_all.nu
··· 21 21 } 22 22 23 23 def run-test [] { 24 - let result = (with-env { 24 + let name = $in.name 25 + let base_env = { 25 26 HYDRANT_API_PORT: $in.api 26 27 HYDRANT_DEBUG_PORT: $in.debug 27 28 HYDRANT_TEST_MOCK_PORT: $in.mock 28 - HYDRANT_BINARY: "target/x86_64-unknown-linux-gnu/debug/hydrant" 29 - } { 30 - ^nu $"tests/($in.name).nu" | complete 29 + } 30 + let binary = $in.binary? 31 + let env_vars = if $binary == null { $base_env } else { $base_env | insert HYDRANT_BINARY $binary } 32 + 33 + let result = (with-env $env_vars { 34 + ^nu $"tests/($name).nu" | complete 31 35 }) 32 36 { 33 - name: $in.name 37 + name: $name 34 38 success: ($result.exit_code == 0) 35 39 output: $result.stdout 36 40 stderr: $result.stderr 41 + } 42 + } 43 + 44 + def test-needs-relay-binary [name: string] { 45 + # tests that build a relay-only binary must run last (and serially) to avoid racing on 46 + # `target/` artifacts while other tests are executing. 47 + try { 48 + open --raw $"tests/($name).nu" | str contains "build-hydrant-relay" 49 + } catch { 50 + false 37 51 } 38 52 } 39 53 ··· 64 78 } 65 79 let ports = get_free_ports (($tests | length) * 3) 66 80 81 + let relay_tests = $tests | where {|t| test-needs-relay-binary $t } 82 + 67 83 mut assigned = [] 68 84 for test in ($tests | enumerate) { 69 85 let p = {($test | get index) * 3 + $in} 86 + let name = ($test | get item) 87 + let binary = if ($relay_tests | any {$in == $name}) { null } else { "target/x86_64-unknown-linux-gnu/debug/hydrant" } 70 88 let entry = { 71 - name: ($test | get item), 89 + name: $name, 72 90 api: ($ports | get (0 | do $p)), 73 91 debug: ($ports | get (1 | do $p)), 74 - mock: ($ports | get (2 | do $p)) 92 + mock: ($ports | get (2 | do $p)), 93 + binary: $binary, 75 94 } 76 95 $assigned = ($assigned | append $entry) 77 96 } 97 + 98 + let relay_assigned = $assigned | where {|t| $t.binary == null } 99 + let parallel_assigned = $assigned | where {|t| $t.binary != null } 78 100 79 101 let groups = { 80 102 
"authenticated_stream": "event_dependent", 81 103 "count_tracking": "event_dependent", 82 104 "signal_filter": "event_dependent", 83 105 } 84 - let grouped = $assigned | group-by {|t| $groups | get -o $t.name | default $t.name} 106 + let grouped = $parallel_assigned | group-by { 107 + let name = $in.name 108 + $groups | get -o $name | default $name 109 + } 85 110 86 111 print $"running ($assigned | length) tests...\n" 112 + if not ($relay_assigned | is-empty) { 113 + print $"note: relay-binary tests will run last and not in parallel: (($relay_assigned | get name) | str join ', ')\n" 114 + } 87 115 88 116 let run_group = {each {timeit -o {run-test} | {time: $in.time, ...$in.output}}}; 89 - let results = $grouped | values | par-each {do $run_group} | flatten 117 + let parallel_results = $grouped | values | par-each {do $run_group} | flatten 118 + let relay_results = if ($relay_assigned | is-empty) { [] } else { $relay_assigned | do $run_group } 119 + let results = $parallel_results | append $relay_results 90 120 91 121 print "\n=== results ===\n" 92 122 for r in $results {