very fast at protocol indexer with flexible filtering, xrpc queries, cursor-backed event stream, and more, built on fjall
rust fjall at-protocol atproto indexer
58
fork

Configure Feed

Select the types of activity you want to include in your feed.

[db] store the whole root commit in repo state (prev and sig)

dawn b4880fe5 6d695694

+169 -85
+3 -3
src/api/xrpc/get_latest_commit.rs
··· 41 41 } 42 42 43 43 // return whatever we last recorded; if we haven't synced at all yet, we have nothing to give 44 - let (Some(data), Some(rev_db)) = (state.data, state.rev) else { 44 + let Some(commit) = state.root else { 45 45 return Err(XrpcErrorResponse { 46 46 status: StatusCode::NOT_FOUND, 47 47 error: XrpcError::Xrpc(GetLatestCommitError::RepoNotFound(Some(CowStr::Borrowed( ··· 51 51 }; 52 52 53 53 Ok(Json(GetLatestCommitOutput { 54 - cid: Cid::from(data), 55 - rev: rev_db.to_tid(), 54 + cid: Cid::from(commit.data), 55 + rev: commit.rev.to_tid(), 56 56 extra_data: None, 57 57 })) 58 58 }
+1 -1
src/api/xrpc/get_repo_status.rs
··· 32 32 let (active, status) = repo_status_to_api(state.status); 33 33 34 34 // rev is only meaningful when the repo is active and has been synced at least once 35 - let rev = active.then(|| state.rev.map(|r| r.to_tid())).flatten(); 35 + let rev = active.then(|| state.root.map(|c| c.rev.to_tid())).flatten(); 36 36 37 37 Ok(Json(GetRepoStatusOutput { 38 38 active,
+3 -3
src/api/xrpc/list_repos.rs
··· 34 34 let (did, state) = item?; 35 35 36 36 // skip repos that haven't been synced at least once 37 - let (Some(data), Some(rev_db)) = (state.data, state.rev) else { 37 + let Some(commit) = state.root else { 38 38 continue; 39 39 }; 40 40 ··· 42 42 repos.push(Repo { 43 43 active: Some(active), 44 44 did: did.clone(), 45 - head: Cid::from(data), 46 - rev: rev_db.to_tid(), 45 + head: Cid::from(commit.data), 46 + rev: commit.rev.to_tid(), 47 47 status, 48 48 extra_data: None, 49 49 });
+8 -7
src/backfill/mod.rs
··· 1 - use crate::db::types::{DbAction, DbRkey, DbTid, TrimmedDid}; 1 + use crate::db::types::{DbAction, DbRkey, TrimmedDid}; 2 2 use crate::db::{self, Db, keys, ser_repo_state}; 3 3 use crate::filter::FilterMode; 4 4 use crate::ops; 5 5 use crate::resolver::ResolverError; 6 6 use crate::state::AppState; 7 7 use crate::types::{ 8 - AccountEvt, BroadcastEvent, GaugeState, RepoState, RepoStatus, ResyncErrorKind, ResyncState, 9 - StoredData, StoredEvent, 8 + AccountEvt, BroadcastEvent, Commit, GaugeState, RepoState, RepoStatus, ResyncErrorKind, 9 + ResyncState, StoredData, StoredEvent, 10 10 }; 11 11 12 12 use fjall::Slice; ··· 545 545 trace!("signature verified"); 546 546 } 547 547 548 + let root_commit = Commit::from(root_commit); 549 + 548 550 // 5. walk mst 549 551 let start = Instant::now(); 550 552 let mst: Mst<MemoryBlockStore> = Mst::load(store, root_commit.data, None); ··· 664 666 let evt = StoredEvent { 665 667 live: false, 666 668 did: TrimmedDid::from(&did), 667 - rev: DbTid::from(&rev), 669 + rev, 668 670 collection: CowStr::Borrowed(collection), 669 671 rkey, 670 672 action, ··· 700 702 let evt = StoredEvent { 701 703 live: false, 702 704 did: TrimmedDid::from(&did), 703 - rev: DbTid::from(&rev), 705 + rev, 704 706 collection: CowStr::Borrowed(&collection), 705 707 rkey, 706 708 action: DbAction::Delete, ··· 720 722 721 723 // 6. update data, status is updated in worker shard 722 724 state.tracked = true; 723 - state.rev = Some((&rev).into()); 724 - state.data = Some(root_commit.data); 725 + state.root = Some(root_commit); 725 726 state.touch(); 726 727 727 728 batch.insert(
+6 -2
src/control/repos.rs
··· 389 389 } 390 390 391 391 pub(crate) fn repo_state_to_info(did: Did<'static>, s: RepoState<'_>) -> RepoInfo { 392 + let (rev, data) = s 393 + .root 394 + .map(|c| (Some(c.rev.to_tid()), Some(c.data))) 395 + .unwrap_or_default(); 392 396 RepoInfo { 393 397 did, 394 398 status: s.status, 395 399 tracked: s.tracked, 396 - rev: s.rev.map(|r| r.to_tid()), 397 - data: s.data, 400 + rev, 401 + data, 398 402 handle: s.handle.map(|h| h.into_static()), 399 403 pds: s.pds.and_then(|p| p.parse().ok()), 400 404 signing_key: s.signing_key.map(|k| k.into_static()),
+5 -2
src/db/migration/mod.rs
··· 5 5 use crate::db::keys::VERSIONING_KEY; 6 6 7 7 mod v1; 8 + mod v2; 8 9 9 10 type MigrationFn = fn(&Db, &mut OwnedWriteBatch) -> Result<()>; 10 11 11 12 /// ordered list of migrations. migration at index `i` upgrades the schema from version `i` to `i+1`. 12 - const MIGRATIONS: &[(&str, MigrationFn)] = 13 - &[("stable_firehose_cursors", v1::stable_firehose_cursors)]; 13 + const MIGRATIONS: &[(&str, MigrationFn)] = &[ 14 + ("stable_firehose_cursors", v1::stable_firehose_cursors), 15 + ("repo_state_root_commit", v2::repo_state_root_commit), 16 + ]; 14 17 15 18 fn read_version(db: &Db) -> Result<u64> { 16 19 db.counts
+68
src/db/migration/v2.rs
··· 1 + use bytes::Bytes; 2 + use cid::Cid as IpldCid; 3 + use fjall::OwnedWriteBatch; 4 + use jacquard_common::{CowStr, types::string::Handle}; 5 + use miette::{Context, IntoDiagnostic, Result}; 6 + use serde::{Deserialize, Serialize}; 7 + 8 + use crate::db::{ 9 + Db, 10 + types::{DbTid, DidKey}, 11 + }; 12 + use crate::types::v2::*; 13 + 14 + #[derive(Debug, Clone, Serialize, Deserialize)] 15 + #[serde(bound(deserialize = "'i: 'de"))] 16 + pub(crate) struct OldRepoState<'i> { 17 + pub status: RepoStatus, // from v2, old is same as new 18 + pub rev: Option<DbTid>, 19 + pub data: Option<IpldCid>, 20 + pub last_message_time: Option<i64>, 21 + pub last_updated_at: i64, 22 + pub tracked: bool, 23 + pub index_id: u64, 24 + #[serde(borrow)] 25 + pub signing_key: Option<DidKey<'i>>, 26 + #[serde(borrow)] 27 + pub pds: Option<CowStr<'i>>, 28 + #[serde(borrow)] 29 + pub handle: Option<Handle<'i>>, 30 + } 31 + 32 + pub(super) fn repo_state_root_commit(db: &Db, batch: &mut OwnedWriteBatch) -> Result<()> { 33 + for item in db.repos.iter() { 34 + let (k, v) = item.into_inner().into_diagnostic()?; 35 + let old: OldRepoState = rmp_serde::from_slice(&v) 36 + .into_diagnostic() 37 + .wrap_err("invalid old repo state")?; 38 + let new = RepoState { 39 + root: match (old.rev, old.data) { 40 + (Some(rev), Some(data)) => Some(Commit { 41 + version: -1, 42 + rev, 43 + data, 44 + prev: None, 45 + sig: Bytes::new(), 46 + }), 47 + _ => None, 48 + }, 49 + status: old.status, 50 + handle: old.handle, 51 + index_id: old.index_id, 52 + last_message_time: old.last_message_time, 53 + last_updated_at: old.last_updated_at, 54 + pds: old.pds, 55 + signing_key: old.signing_key, 56 + tracked: old.tracked, 57 + }; 58 + batch.insert( 59 + &db.repos, 60 + k, 61 + rmp_serde::to_vec(&new) 62 + .into_diagnostic() 63 + .wrap_err("cant serialize new repo state")?, 64 + ); 65 + } 66 + 67 + Ok(()) 68 + }
+10 -11
src/ingest/worker.rs
··· 435 435 repo_state.advance_message_time(commit.time.0.timestamp_millis()); 436 436 437 437 // skip replayed events (already seen revision) 438 - if matches!(repo_state.rev, Some(ref rev) if commit.rev.as_str() <= rev.to_tid().as_str()) { 438 + if matches!(repo_state.root, Some(ref root) if commit.rev.as_str() <= root.rev.to_tid().as_str()) 439 + { 439 440 debug!( 440 441 did = %did, 441 442 commit_rev = %commit.rev, 442 - state_rev = %repo_state.rev.as_ref().map(|r| r.to_tid()).expect("we checked in if"), 443 + state_rev = %repo_state.root.as_ref().map(|c| c.rev.to_tid()).expect("we checked in if"), 443 444 "skipping replayed event" 444 445 ); 445 446 return Ok(RepoProcessResult::Ok(repo_state)); 446 447 } 447 448 448 - if let (Some(repo), Some(prev_commit)) = (&repo_state.data, &commit.prev_data) 449 - && repo 450 - != &prev_commit 449 + if let (Some(repo_commit), Some(prev_commit)) = (&repo_state.root, &commit.prev_data) 450 + && repo_commit.data 451 + != prev_commit 451 452 .0 452 453 .to_ipld() 453 454 .into_diagnostic() ··· 455 456 { 456 457 warn!( 457 458 did = %did, 458 - repo = %repo, 459 + repo = %repo_commit.data, 459 460 prev_commit = %prev_commit.0, 460 461 "gap detected, triggering backfill" 461 462 ); ··· 508 509 509 510 match ops::verify_sync_event(sync.blocks.as_ref(), Self::fetch_key(ctx, did)?.as_ref()) { 510 511 Ok((root, rev)) => { 511 - if let Some(current_data) = &repo_state.data { 512 - if current_data == &root.to_ipld().expect("valid cid") { 512 + if let Some(current_commit) = &repo_state.root { 513 + if current_commit.data == root.to_ipld().expect("valid cid") { 513 514 debug!(did = %did, "skipping noop sync"); 514 515 return Ok(RepoProcessResult::Ok(repo_state)); 515 516 } 516 - } 517 517 518 - if let Some(current_rev) = &repo_state.rev { 519 - if rev.as_str() <= current_rev.to_tid().as_str() { 518 + if rev.as_str() <= current_commit.rev.to_tid().as_str() { 520 519 debug!(did = %did, "skipping replayed sync"); 521 520 return Ok(RepoProcessResult::Ok(repo_state)); 522 521 }
+3 -4
src/ops.rs
··· 256 256 .get(&parsed.root) 257 257 .ok_or_else(|| miette::miette!("root block missing from CAR"))?; 258 258 259 - let repo_commit = jacquard_repo::commit::Commit::from_cbor(root_bytes).into_diagnostic()?; 259 + let root_commit = jacquard_repo::commit::Commit::from_cbor(root_bytes).into_diagnostic()?; 260 260 261 261 if let Some(key) = signing_key { 262 - repo_commit 262 + root_commit 263 263 .verify(key) 264 264 .map_err(|e| miette::miette!("signature verification failed for {did}: {e}"))?; 265 265 trace!(did = %did, "signature verified"); 266 266 } 267 267 268 - repo_state.rev = Some((&commit.rev).into()); 269 - repo_state.data = Some(repo_commit.data); 268 + repo_state.root = Some(root_commit.into()); 270 269 repo_state.touch(); 271 270 272 271 batch.insert(&db.repos, keys::repo_key(did), ser_repo_state(&repo_state)?);
+62 -52
src/types.rs
··· 12 12 use crate::db::types::{DbAction, DbRkey, DbTid, DidKey, TrimmedDid}; 13 13 use crate::resolver::MiniDoc; 14 14 15 - #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] 16 - pub enum RepoStatus { 17 - Backfilling, 18 - Synced, 19 - Error(SmolStr), 20 - Deactivated, 21 - Takendown, 22 - Suspended, 15 + pub(crate) mod v2 { 16 + use super::*; 17 + 18 + #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] 19 + pub enum RepoStatus { 20 + Backfilling, 21 + Synced, 22 + Error(SmolStr), 23 + Deactivated, 24 + Takendown, 25 + Suspended, 26 + } 27 + 28 + #[derive(Debug, Clone, Serialize, Deserialize)] 29 + pub(crate) struct Commit { 30 + pub version: i64, 31 + pub rev: DbTid, 32 + pub data: IpldCid, 33 + pub prev: Option<IpldCid>, 34 + #[serde(with = "jacquard_common::serde_bytes_helper")] 35 + pub sig: Bytes, 36 + } 37 + 38 + #[derive(Debug, Clone, Serialize, Deserialize)] 39 + #[serde(bound(deserialize = "'i: 'de"))] 40 + pub(crate) struct RepoState<'i> { 41 + pub status: RepoStatus, 42 + pub root: Option<Commit>, 43 + // todo: is this actually valid? the spec says this is informal and intermadiate 44 + // services may change it. we should probably document it. if we cant use this 45 + // then how do we dedup account / identity ops? 46 + /// ms since epoch of the last firehose message we processed for this repo. 47 + /// used to deduplicate identity / account events that can arrive from multiple relays at 48 + /// different wall-clock times but represent the same underlying PDS event. 49 + pub last_message_time: Option<i64>, 50 + /// this is when we *ingested* any last updates 51 + pub last_updated_at: i64, // unix timestamp 52 + /// whether we are ingesting events for this repo 53 + pub tracked: bool, 54 + /// index id in pending keyspace 55 + pub index_id: u64, 56 + #[serde(borrow)] 57 + pub signing_key: Option<DidKey<'i>>, 58 + #[serde(borrow)] 59 + pub pds: Option<CowStr<'i>>, 60 + #[serde(borrow)] 61 + pub handle: Option<Handle<'i>>, 62 + } 23 63 } 64 + pub(crate) use v2::*; 24 65 25 66 impl Display for RepoStatus { 26 67 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { ··· 35 76 } 36 77 } 37 78 38 - #[derive(Debug, Clone, Serialize, Deserialize)] 39 - #[serde(bound(deserialize = "'i: 'de"))] 40 - pub(crate) struct RepoState<'i> { 41 - pub status: RepoStatus, 42 - pub rev: Option<DbTid>, 43 - pub data: Option<IpldCid>, 44 - // todo: is this actually valid? the spec says this is informal and intermadiate 45 - // services may change it. we should probably document it. if we cant use this 46 - // then how do we dedup account / identity ops? 47 - /// ms since epoch of the last firehose message we processed for this repo. 48 - /// used to deduplicate identity / account events that can arrive from multiple relays at 49 - /// different wall-clock times but represent the same underlying PDS event. 50 - #[serde(default)] 51 - pub last_message_time: Option<i64>, 52 - /// this is when we *ingested* any last updates 53 - pub last_updated_at: i64, // unix timestamp 54 - /// whether we are ingesting events for this repo 55 - pub tracked: bool, 56 - /// index id in pending keyspace 57 - pub index_id: u64, 58 - #[serde(borrow)] 59 - pub signing_key: Option<DidKey<'i>>, 60 - #[serde(borrow)] 61 - pub pds: Option<CowStr<'i>>, 62 - #[serde(borrow)] 63 - pub handle: Option<Handle<'i>>, 79 + impl<'c> From<jacquard_repo::commit::Commit<'c>> for Commit { 80 + fn from(value: jacquard_repo::commit::Commit<'c>) -> Self { 81 + Self { 82 + data: value.data, 83 + prev: value.prev, 84 + rev: DbTid::from(&value.rev), 85 + sig: value.sig, 86 + version: value.version, 87 + } 88 + } 64 89 } 65 90 66 91 impl<'i> RepoState<'i> { 67 92 pub fn backfilling(index_id: u64) -> Self { 68 93 Self { 69 94 status: RepoStatus::Backfilling, 70 - rev: None, 71 - data: None, 95 + root: None, 72 96 last_updated_at: chrono::Utc::now().timestamp(), 73 97 index_id, 74 98 tracked: true, ··· 115 139 fn into_static(self) -> Self::Output { 116 140 RepoState { 117 141 status: self.status, 118 - rev: self.rev, 119 - data: self.data, 142 + root: self.root, 120 143 last_updated_at: self.last_updated_at, 121 144 index_id: self.index_id, 122 145 tracked: self.tracked, ··· 247 270 pub(crate) enum StoredData { 248 271 Nothing, 249 272 Ptr(IpldCid), 250 - #[serde(with = "serde_bytes_squared")] 273 + #[serde(with = "jacquard_common::serde_bytes_helper")] 251 274 Block(Bytes), 252 275 } 253 276 ··· 288 311 #[serde(default)] 289 312 #[serde(skip_serializing_if = "StoredData::is_nothing")] 290 313 pub data: StoredData, 291 - } 292 - 293 - mod serde_bytes_squared { 294 - use bytes::Bytes; 295 - use serde::{Deserialize, Deserializer, Serializer}; 296 - 297 - pub fn serialize<S: Serializer>(v: impl AsRef<[u8]>, s: S) -> Result<S::Ok, S::Error> { 298 - s.serialize_bytes(serde_bytes::Bytes::new(v.as_ref())) 299 - } 300 - 301 - pub fn deserialize<'de, D: Deserializer<'de>>(d: D) -> Result<Bytes, D::Error> { 302 - serde_bytes::ByteBuf::deserialize(d).map(|b| b.into_vec().into()) 303 - } 304 314 } 305 315 306 316 #[derive(Debug, PartialEq, Eq, Clone, Copy)]