very fast at protocol indexer with flexible filtering, xrpc queries, cursor-backed event stream, and more, built on fjall
rust fjall at-protocol atproto indexer
58
fork

Configure Feed

Select the types of activity you want to include in your feed.

[ingest] handle sync messages, error and skip message if we cant fetch key properly for verification

dawn a4b764b4 41cb51ee

+174 -27
+131 -21
src/ingest/worker.rs
··· 1 1 use crate::db::{self, keys}; 2 2 use crate::ingest::BufferedMessage; 3 3 use crate::ops::{self, send_backfill_req}; 4 + use crate::resolver::NoSigningKeyError; 4 5 use crate::state::AppState; 5 6 use crate::types::{AccountEvt, IdentityEvt, RepoState, RepoStatus}; 6 7 use jacquard::api::com_atproto::sync::subscribe_repos::SubscribeReposMessage; ··· 11 12 use jacquard::types::did::Did; 12 13 use jacquard_common::IntoStatic; 13 14 use jacquard_common::types::crypto::PublicKey; 14 - use miette::{IntoDiagnostic, Result}; 15 + use jacquard_repo::error::CommitError; 16 + use miette::{Diagnostic, IntoDiagnostic, Result}; 15 17 use smol_str::ToSmolStr; 16 18 use std::collections::{HashMap, HashSet}; 17 19 use std::sync::Arc; ··· 19 21 use tokio::sync::mpsc; 20 22 use tracing::{debug, error, trace, warn}; 21 23 24 + #[derive(Debug)] 25 + struct KeyFetchError(miette::Report); 26 + 27 + impl std::fmt::Display for KeyFetchError { 28 + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { 29 + write!(f, "{}", self.0) 30 + } 31 + } 32 + 33 + impl std::error::Error for KeyFetchError { 34 + fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { 35 + self.0.source() 36 + } 37 + } 38 + 39 + impl Diagnostic for KeyFetchError { 40 + fn code<'a>(&'a self) -> Option<Box<dyn std::fmt::Display + 'a>> { 41 + self.0.code() 42 + } 43 + 44 + fn help<'a>(&'a self) -> Option<Box<dyn std::fmt::Display + 'a>> { 45 + self.0.help() 46 + } 47 + 48 + fn labels(&self) -> Option<Box<dyn Iterator<Item = miette::LabeledSpan> + '_>> { 49 + self.0.labels() 50 + } 51 + 52 + fn diagnostic_source(&self) -> Option<&dyn Diagnostic> { 53 + self.0.diagnostic_source() 54 + } 55 + 56 + fn related<'a>(&'a self) -> Option<Box<dyn Iterator<Item = &'a dyn Diagnostic> + 'a>> { 57 + self.0.related() 58 + } 59 + 60 + fn source_code(&self) -> Option<&dyn miette::SourceCode> { 61 + self.0.source_code() 62 + } 63 + 64 + fn severity(&self) -> Option<miette::Severity> { 65 + self.0.severity() 66 + } 67 + 68 + fn url<'a>(&'a self) -> Option<Box<dyn std::fmt::Display + 'a>> { 69 + self.0.url() 70 + } 71 + } 72 + 22 73 #[derive(Debug, Clone, Copy)] 23 74 enum ProcessResult { 24 75 Deleted, ··· 58 109 let mut batch = self.state.db.inner.batch(); 59 110 let mut deleted = HashSet::new(); 60 111 61 - // resolve signing keys for commits if verification is enabled 112 + // resolve signing keys for commits and syncs if verification is enabled 62 113 let keys = if self.verify_signatures { 63 114 let dids: HashSet<Did> = buf 64 115 .iter() 65 116 .filter_map(|msg| match msg { 66 117 SubscribeReposMessage::Commit(c) => Some(c.repo.clone()), 118 + SubscribeReposMessage::Sync(s) => Some(s.did.clone()), 67 119 _ => None, 68 120 }) 69 121 .collect(); 70 122 71 123 let futures = dids.into_iter().map(|did| async { 72 - match self.state.resolver.resolve_signing_key(&did).await { 73 - Ok(key) => Some((did, key)), 74 - Err(e) => { 75 - warn!("failed to resolve key for {did}: {e}"); 76 - None 77 - } 78 - } 124 + let res = self.state.resolver.resolve_signing_key(&did).await; 125 + (did, res) 79 126 }); 80 127 81 - handle 82 - .block_on(join_all(futures)) 83 - .into_iter() 84 - .flatten() 85 - .collect() 128 + handle.block_on(join_all(futures)).into_iter().collect() 86 129 } else { 87 130 HashMap::new() 88 131 }; ··· 104 147 continue; 105 148 } 106 149 107 - match Self::process_message(&self.state, &mut batch, &msg, did, &keys) { 150 + match self.process_message(&mut batch, &msg, did, &keys) { 108 151 Ok(ProcessResult::Ok) => {} 109 152 Ok(ProcessResult::Deleted) => { 110 153 deleted.insert(did.clone()); 111 154 } 112 155 Err(e) => { 113 - error!("failed to process buffered message for {did}: {e}"); 156 + error!("error processing message for {did}: {e}"); 114 157 db::check_poisoned_report(&e); 115 - failed.push(msg); 158 + // dont retry commit or sync on key fetch errors 159 + // since we'll just try again later if we get commit or sync again 160 + if e.downcast_ref::<KeyFetchError>().is_none() 161 + && e.downcast_ref::<CommitError>().is_none() 162 + && e.downcast_ref::<NoSigningKeyError>().is_none() 163 + { 164 + failed.push(msg); 165 + } 116 166 } 117 167 } 118 168 ··· 154 204 } 155 205 156 206 fn process_message( 157 - state: &Arc<AppState>, 207 + &self, 158 208 batch: &mut OwnedWriteBatch, 159 209 msg: &BufferedMessage, 160 210 did: &Did, 161 - keys: &HashMap<Did<'static>, PublicKey<'static>>, 211 + keys: &HashMap<Did<'static>, Result<PublicKey<'static>>>, 162 212 ) -> Result<ProcessResult> { 213 + let state = &self.state; 214 + let verify_signatures = self.verify_signatures; 215 + 163 216 let RepoCheckResult::Ok(repo_state) = Self::check_repo_state(batch, state, did)? else { 164 217 return Ok(ProcessResult::Ok); 165 218 }; 166 219 220 + let get_key = || { 221 + if verify_signatures { 222 + let key = keys.get(did).ok_or_else(|| { 223 + KeyFetchError(miette::miette!( 224 + "!!! THIS IS A BUG !!! missing pubkey for {did}" 225 + )) 226 + })?; 227 + match key { 228 + Ok(key) => Ok(Some(key)), 229 + Err(e) => { 230 + return Err(KeyFetchError(miette::miette!( 231 + "failed to get pubkey for {did}: {e}" 232 + ))); 233 + } 234 + } 235 + } else { 236 + Ok(None) 237 + } 238 + }; 239 + 167 240 match msg { 168 241 SubscribeReposMessage::Commit(commit) => { 169 242 trace!("processing buffered commit for {did}"); ··· 195 268 RepoStatus::Backfilling, 196 269 )?; 197 270 batch.commit().into_diagnostic()?; 198 - 199 271 send_backfill_req(state, did.clone().into_static())?; 200 272 201 273 return Ok(ProcessResult::Ok); 202 274 } 203 275 204 - ops::apply_commit(batch, &state.db, repo_state, &commit, keys.get(did))?(); 276 + ops::apply_commit(batch, &state.db, repo_state, &commit, get_key()?)?(); 277 + } 278 + SubscribeReposMessage::Sync(sync) => { 279 + debug!("processing buffered sync for {did}"); 280 + 281 + match ops::verify_sync_event(sync.blocks.as_ref(), get_key()?) { 282 + Ok((root, rev)) => { 283 + if let Some(current_data) = &repo_state.data { 284 + if current_data == &root { 285 + debug!("skipping noop sync for {did}"); 286 + return Ok(ProcessResult::Ok); 287 + } 288 + } 289 + 290 + if let Some(current_rev) = &repo_state.rev { 291 + if rev.as_str() <= current_rev.as_str() { 292 + debug!("skipping replayed sync for {did}"); 293 + return Ok(ProcessResult::Ok); 294 + } 295 + } 296 + 297 + warn!("sync event for {did}: triggering backfill"); 298 + let mut batch = state.db.inner.batch(); 299 + ops::update_repo_status( 300 + &mut batch, 301 + &state.db, 302 + did, 303 + repo_state, 304 + RepoStatus::Backfilling, 305 + )?; 306 + batch.commit().into_diagnostic()?; 307 + 308 + send_backfill_req(state, did.clone().into_static())?; 309 + return Ok(ProcessResult::Ok); 310 + } 311 + Err(e) => { 312 + error!("failed to process sync event for {did}: {e}"); 313 + } 314 + } 205 315 } 206 316 SubscribeReposMessage::Identity(identity) => { 207 317 debug!("processing buffered identity for {did}");
+26
src/ops.rs
··· 7 7 }; 8 8 use fjall::OwnedWriteBatch; 9 9 use jacquard::CowStr; 10 + use jacquard::IntoStatic; 10 11 use jacquard::cowstr::ToCowStr; 11 12 use jacquard::types::cid::Cid; 12 13 use jacquard_api::com_atproto::sync::subscribe_repos::Commit; ··· 142 143 batch.insert(&db.repos, &key, ser_repo_state(&repo_state)?); 143 144 144 145 Ok(repo_state) 146 + } 147 + pub fn verify_sync_event(blocks: &[u8], key: Option<&PublicKey>) -> Result<(Cid<'static>, String)> { 148 + let parsed = tokio::task::block_in_place(|| { 149 + tokio::runtime::Handle::current() 150 + .block_on(parse_car_bytes(blocks)) 151 + .into_diagnostic() 152 + })?; 153 + 154 + let root_bytes = parsed 155 + .blocks 156 + .get(&parsed.root) 157 + .ok_or_else(|| miette::miette!("root block missing from CAR"))?; 158 + 159 + let repo_commit = jacquard_repo::commit::Commit::from_cbor(root_bytes).into_diagnostic()?; 160 + 161 + if let Some(key) = key { 162 + repo_commit 163 + .verify(key) 164 + .map_err(|e| miette::miette!("signature verification failed: {e}"))?; 165 + } 166 + 167 + Ok(( 168 + Cid::ipld(repo_commit.data).into_static(), 169 + repo_commit.rev.to_string(), 170 + )) 145 171 } 146 172 147 173 pub fn apply_commit<'batch, 'db>(
+17 -6
src/resolver.rs
··· 1 + use std::fmt::Display; 1 2 use std::ops::Not; 2 3 use std::sync::Arc; 3 4 use std::time::Duration; ··· 78 79 Ok((pds, handle)) 79 80 } 80 81 81 - pub async fn resolve_signing_key(&self, did: &Did<'_>) -> Result<PublicKey<'static>> { 82 - let did_static = did.clone().into_static(); 83 - 84 - if let Some(entry) = self.inner.key_cache.get_async(&did_static).await { 82 + pub async fn resolve_signing_key(&self, did: &Did<'static>) -> Result<PublicKey<'static>> { 83 + if let Some(entry) = self.inner.key_cache.get_async(did).await { 85 84 return Ok(entry.get().clone()); 86 85 } 87 86 ··· 96 95 let key = doc 97 96 .atproto_public_key() 98 97 .into_diagnostic()? 99 - .ok_or_else(|| miette::miette!("no atproto signing key in DID doc for {did}"))?; 98 + .ok_or_else(|| NoSigningKeyError(did.clone())) 99 + .into_diagnostic()?; 100 100 101 101 let _ = self 102 102 .inner 103 103 .key_cache 104 - .put_async(did_static, key.clone()) 104 + .put_async(did.clone(), key.clone()) 105 105 .await; 106 106 107 107 Ok(key) 108 108 } 109 109 } 110 + 111 + #[derive(Debug)] 112 + pub struct NoSigningKeyError(Did<'static>); 113 + 114 + impl Display for NoSigningKeyError { 115 + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { 116 + write!(f, "no atproto signing key in DID doc for {}", self.0) 117 + } 118 + } 119 + 120 + impl std::error::Error for NoSigningKeyError {}