A very fast AT Protocol indexer with flexible filtering, XRPC queries, a cursor-backed event stream, and more, built on fjall.
rust fjall at-protocol atproto indexer
58
fork

Configure Feed

Select the types of activity you want to include in your feed.

[ingest] keep repo state cached for the duration of the batch, since we don't insert into the db until the batch is over

dawn 19362e3d 0b37289f

+62 -19
+6 -4
src/api/repo.rs
··· 1 1 use crate::api::AppState; 2 - use crate::db::{keys, ser_repo_state, Db}; 2 + use crate::db::{Db, keys, ser_repo_state}; 3 + use crate::ops::send_backfill_req; 3 4 use crate::types::RepoState; 4 - use axum::{extract::State, http::StatusCode, routing::post, Json, Router}; 5 - use jacquard::{types::did::Did, IntoStatic}; 5 + use axum::{Json, Router, extract::State, http::StatusCode, routing::post}; 6 + use jacquard::types::did::Did; 6 7 use serde::Deserialize; 7 8 use std::sync::Arc; 8 9 ··· 59 60 60 61 // trigger backfill 61 62 for did in to_backfill { 62 - let _ = state.backfill_tx.send(did.into_static()); 63 + send_backfill_req(&state, did) 64 + .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?; 63 65 } 64 66 } 65 67 Ok(StatusCode::OK)
+5 -2
src/api/stream.rs
··· 3 3 use crate::types::{BroadcastEvent, MarshallableEvt, RecordEvt, StoredEvent}; 4 4 use axum::{ 5 5 extract::{ 6 - ws::{Message, WebSocket, WebSocketUpgrade}, 7 6 Query, State, 7 + ws::{Message, WebSocket, WebSocketUpgrade}, 8 8 }, 9 9 response::IntoResponse, 10 10 }; ··· 97 97 rkey, 98 98 action, 99 99 record: record_val, 100 - cid, 100 + cid: cid.map(|c| match c { 101 + jacquard::types::cid::Cid::Ipld { s, .. } => s, 102 + jacquard::types::cid::Cid::Str(s) => s, 103 + }), 101 104 }), 102 105 identity: None, 103 106 account: None,
+42 -6
src/ingest/worker.rs
··· 106 106 let mut failed = Vec::<BufferedMessage>::new(); 107 107 108 108 let _g = handle.enter(); 109 + let mut repo_cache = HashMap::new(); 110 + let mut deleted = HashSet::new(); 109 111 110 112 loop { 111 113 let mut batch = self.state.db.inner.batch(); 112 - let mut deleted = HashSet::new(); 114 + repo_cache.clear(); 115 + deleted.clear(); 113 116 114 117 // resolve signing keys for commits and syncs if verification is enabled 115 118 let keys = if self.verify_signatures { ··· 149 152 continue; 150 153 } 151 154 152 - match self.process_message(&mut batch, &msg, did, &keys) { 155 + match self.process_message(&mut repo_cache, &mut batch, &msg, did, &keys) { 153 156 Ok(ProcessResult::Ok) => {} 154 157 Ok(ProcessResult::Deleted) => { 155 158 deleted.insert(did.clone()); ··· 210 213 211 214 fn process_message( 212 215 &self, 216 + repo_cache: &mut HashMap<Did<'static>, RepoState<'static>>, 213 217 batch: &mut OwnedWriteBatch, 214 218 msg: &BufferedMessage, 215 219 did: &Did, ··· 218 222 let state = &self.state; 219 223 let verify_signatures = self.verify_signatures; 220 224 221 - let RepoCheckResult::Ok(repo_state) = Self::check_repo_state(batch, state, did)? else { 225 + let RepoCheckResult::Ok(repo_state) = 226 + Self::check_repo_state(repo_cache, batch, state, did, msg)? 
227 + else { 222 228 return Ok(ProcessResult::Ok); 223 229 }; 224 230 ··· 278 284 return Ok(ProcessResult::Ok); 279 285 } 280 286 281 - ops::apply_commit(batch, &state.db, repo_state, &commit, get_key()?)?(); 287 + let (new_state, cb) = 288 + ops::apply_commit(batch, &state.db, repo_state, &commit, get_key()?)?; 289 + cb(); 290 + repo_cache.insert(did.clone().into_static(), new_state); 282 291 } 283 292 SubscribeReposMessage::Sync(sync) => { 284 293 debug!("processing buffered sync for {did}"); ··· 348 357 return Ok(ProcessResult::Deleted); 349 358 } 350 359 status => { 351 - let status = match status { 360 + let target_status = match status { 352 361 Some(status) => match status { 353 362 AccountStatus::Deleted => { 354 363 unreachable!("deleted account status is handled before") ··· 374 383 RepoStatus::Error("unknown".into()) 375 384 } 376 385 }; 377 - ops::update_repo_status(batch, &state.db, did, repo_state, status)?; 386 + 387 + if repo_state.status == target_status { 388 + debug!("account status unchanged for {did}: {target_status:?}"); 389 + return Ok(ProcessResult::Ok); 390 + } 391 + 392 + let new_state = ops::update_repo_status( 393 + batch, 394 + &state.db, 395 + did, 396 + repo_state, 397 + target_status, 398 + )?; 399 + repo_cache.insert(did.clone().into_static(), new_state); 378 400 } 379 401 } 380 402 } else { ··· 395 417 } 396 418 397 419 fn check_repo_state( 420 + repo_cache: &mut HashMap<Did<'static>, RepoState<'static>>, 398 421 batch: &mut OwnedWriteBatch, 399 422 state: &AppState, 400 423 did: &Did<'_>, 424 + msg: &BufferedMessage, 401 425 ) -> Result<RepoCheckResult> { 402 426 // check if we have this repo 427 + if let Some(state) = repo_cache.get(did) { 428 + return Ok(RepoCheckResult::Ok(state.clone())); 429 + } 430 + 403 431 let repo_key = keys::repo_key(&did); 404 432 let Some(state_bytes) = state.db.repos.get(&repo_key).into_diagnostic()? 
else { 405 433 // we don't know this repo, but we are receiving events for it ··· 444 472 RepoStatus::Deactivated | RepoStatus::Suspended | RepoStatus::Takendown => { 445 473 // if it was in deactivated/takendown/suspended state, we can mark it as synced 446 474 // because we are receiving live events now 475 + // UNLESS it is an account status event that keeps it deactivated 476 + if let SubscribeReposMessage::Account(acc) = msg { 477 + if !acc.active { 478 + return Ok(RepoCheckResult::Ok(repo_state)); 479 + } 480 + } 481 + 447 482 repo_state = ops::update_repo_status( 448 483 batch, 449 484 &state.db, ··· 451 486 repo_state, 452 487 RepoStatus::Synced, 453 488 )?; 489 + repo_cache.insert(did.clone().into_static(), repo_state.clone()); 454 490 Ok(RepoCheckResult::Ok(repo_state)) 455 491 } 456 492 }
+5 -5
src/ops.rs
··· 170 170 )) 171 171 } 172 172 173 - pub fn apply_commit<'batch, 'db>( 173 + pub fn apply_commit<'batch, 'db, 's>( 174 174 batch: &'batch mut OwnedWriteBatch, 175 175 db: &'db Db, 176 - mut repo_state: RepoState, 176 + mut repo_state: RepoState<'s>, 177 177 commit: &Commit<'_>, 178 178 signing_key: Option<&PublicKey>, 179 - ) -> Result<impl FnOnce() + use<'db>> { 179 + ) -> Result<(RepoState<'s>, impl FnOnce() + use<'db>)> { 180 180 let did = &commit.repo; 181 181 debug!("applying commit {} for {did}", &commit.commit); 182 182 ··· 281 281 db.next_event_id.load(Ordering::SeqCst) - 1, 282 282 )); 283 283 284 - Ok(move || { 284 + Ok((repo_state, move || { 285 285 if blocks_count > 0 { 286 286 db.update_count("blocks", blocks_count); 287 287 } ··· 291 291 if events_count > 0 { 292 292 db.update_count("events", events_count); 293 293 } 294 - }) 294 + })) 295 295 } 296 296 297 297 pub fn parse_path(path: &str) -> Result<(&str, &str)> {
+2 -2
src/types.rs
··· 1 1 use std::fmt::Display; 2 2 3 3 use jacquard::{ 4 - types::{cid::Cid, tid::Tid}, 5 4 CowStr, IntoStatic, 5 + types::{cid::Cid, tid::Tid}, 6 6 }; 7 7 use jacquard_common::types::string::Did; 8 8 use serde::{Deserialize, Serialize}; ··· 140 140 #[serde(skip_serializing_if = "Option::is_none")] 141 141 pub record: Option<Value>, 142 142 #[serde(skip_serializing_if = "Option::is_none")] 143 - pub cid: Option<Cid<'i>>, 143 + pub cid: Option<CowStr<'i>>, 144 144 } 145 145 146 146 #[derive(Debug, Serialize, Deserialize, Clone)]
+2
tests/authenticated_stream_test.nu
··· 105 105 } catch { 106 106 print "warning: failed to add repo (might already be tracked), continuing..." 107 107 } 108 + 109 + sleep 5sec 108 110 109 111 # 5. perform actions 110 112 let collection = "app.bsky.feed.post"