very fast at protocol indexer with flexible filtering, xrpc queries, cursor-backed event stream, and more, built on fjall
rust fjall at-protocol atproto indexer
60
fork

Configure Feed

Select the types of activity you want to include in your feed.

[ingest] refactor worker, use time from messages to prevent duplicate events

dawn 1aa36eda 117317c4

+440 -331
+1
CLAUDE.md
··· 1 + AGENTS.md
+7 -2
src/api/repos.rs
··· 8 8 response::{IntoResponse, Response}, 9 9 routing::{delete, get, put}, 10 10 }; 11 + use chrono::{DateTime, Utc}; 11 12 use jacquard_common::{IntoStatic, types::did::Did}; 12 13 use miette::IntoDiagnostic; 13 14 use rand::Rng; ··· 47 48 // this does not have the did:key: prefix 48 49 #[serde(skip_serializing_if = "Option::is_none")] 49 50 pub signing_key: Option<String>, 50 - pub last_updated_at: i64, 51 + #[serde(skip_serializing_if = "Option::is_none")] 52 + pub last_updated_at: Option<DateTime<Utc>>, 53 + #[serde(skip_serializing_if = "Option::is_none")] 54 + pub last_message_at: Option<DateTime<Utc>>, 51 55 } 52 56 53 57 #[derive(Deserialize)] ··· 352 356 handle: s.handle.map(|h| h.to_string()), 353 357 pds: s.pds.map(|p| p.to_string()), 354 358 signing_key: s.signing_key.map(|k| k.encode()), 355 - last_updated_at: s.last_updated_at, 359 + last_updated_at: DateTime::from_timestamp_secs(s.last_updated_at), 360 + last_message_at: s.last_message_time.and_then(DateTime::from_timestamp_secs), 356 361 } 357 362 } 358 363
+1 -1
src/backfill/mod.rs
··· 722 722 state.tracked = true; 723 723 state.rev = Some((&rev).into()); 724 724 state.data = Some(root_commit.data); 725 - state.last_updated_at = chrono::Utc::now().timestamp(); 725 + state.touch(); 726 726 727 727 batch.batch_mut().insert( 728 728 &app_state.db.repos,
+405 -322
src/ingest/worker.rs
··· 1 1 use crate::db::{self, keys}; 2 2 use crate::filter::FilterMode; 3 - use crate::ingest::stream::{Commit, SubscribeReposMessage}; 3 + use crate::ingest::stream::{Account, Commit, Identity, SubscribeReposMessage, Sync}; 4 4 use crate::ingest::{BufferRx, IngestMessage}; 5 5 use crate::ops; 6 6 use crate::resolver::{NoSigningKeyError, ResolverError}; ··· 48 48 } 49 49 } 50 50 51 + // gate returned by check_repo_state, tells the shard loop what to do with the message 52 + enum ProcessGate<'s, 'c> { 53 + // did not exist in db, newly queued for backfill, drop 54 + NewRepo, 55 + // explicitly untracked, backfilling, or in error, drop 56 + Drop, 57 + // inactive repo receiving a non-account message, buffer the commit if present, drop otherwise 58 + Buffer(Option<&'c Commit<'c>>), 59 + // ready to process with the latest state 60 + Ready(RepoState<'s>), 61 + } 62 + 63 + // result returned by a message handler after the gate has been resolved 51 64 #[derive(Debug)] 52 65 enum RepoProcessResult<'s, 'c> { 66 + // message processed successfully, here is the (possibly updated) state 67 + Ok(RepoState<'s>), 68 + // repo was deleted as part of processing 53 69 Deleted, 54 - Syncing(Option<&'c Commit<'c>>), 55 - Ok { 56 - state: RepoState<'s>, 57 - old_status: RepoStatus, 58 - }, 70 + // needs backfill; carries the triggering commit to buffer (None when already in the buffer) 71 + NeedsBackfill(Option<&'c Commit<'c>>), 59 72 } 60 73 61 74 pub struct FirehoseWorker { ··· 187 200 IngestMessage::BackfillFinished(did) => { 188 201 debug!(did = %did, "backfill finished, verifying state and draining buffer"); 189 202 190 - // load repo state to transition status and draining buffer 191 203 let repo_key = keys::repo_key(&did); 192 204 if let Ok(Some(state_bytes)) = state.db.repos.get(&repo_key).into_diagnostic() { 193 205 match crate::db::deser_repo_state(&state_bytes) { 194 206 Ok(repo_state) => { 195 207 let repo_state = repo_state.into_static(); 196 208 197 - let old_status = repo_state.status.clone(); 198 - match Self::drain_resync_buffer( 199 - &mut ctx, &did, repo_state, old_status, 200 - ) { 201 - Ok(res) => match res { 202 - RepoProcessResult::Ok { state: s, .. } => { 203 - // TODO: there might be a race condition here where we get a new commit 204 - // while the resync buffer is being drained, we should handle that probably 205 - // but also it should still be fine since we'll sync eventually anyway 206 - let res = ops::update_repo_status( 207 - ctx.batch.batch_mut(), 208 - &state.db, 209 - &did, 210 - s, 211 - RepoStatus::Synced, 212 - ); 213 - if let Err(e) = res { 214 - // this can only fail if serde retry fails which would be really weird 215 - error!(did = %did, err = %e, "failed to transition to synced"); 216 - } 209 + match Self::drain_resync_buffer(&mut ctx, &did, repo_state) { 210 + Ok(RepoProcessResult::Ok(s)) => { 211 + // TODO: there might be a race condition here where we get a new commit 212 + // while the resync buffer is being drained, we should handle that probably 213 + // but also it should still be fine since we'll sync eventually anyway 214 + let res = ops::update_repo_status( 215 + ctx.batch.batch_mut(), 216 + &state.db, 217 + &did, 218 + s, 219 + RepoStatus::Synced, 220 + ); 221 + if let Err(e) = res { 222 + // this can only fail if serde retry fails which would be really weird 223 + error!(did = %did, err = %e, "failed to transition to synced"); 217 224 } 218 - // we don't have to handle this since drain_resync_buffer doesn't delete 219 - // the commits from the resync buffer so they will get retried later 220 - RepoProcessResult::Syncing(_) => {} 221 - RepoProcessResult::Deleted => {} 222 - }, 225 + } 226 + // we don't have to handle this since drain_resync_buffer doesn't delete 227 + // the commits from the resync buffer so they will get retried later 228 + Ok(RepoProcessResult::NeedsBackfill(_)) => {} 229 + Ok(RepoProcessResult::Deleted) => {} 223 230 Err(e) => { 224 231 error!(did = %did, err = %e, "failed to drain resync buffer") 225 232 } ··· 238 245 _ => continue, 239 246 }; 240 247 241 - match Self::process_message(&mut ctx, &msg, did) { 242 - Ok(RepoProcessResult::Ok { .. }) => {} 243 - Ok(RepoProcessResult::Deleted) => {} 244 - Ok(RepoProcessResult::Syncing(Some(commit))) => { 245 - if let Err(e) = ops::persist_to_resync_buffer(&state.db, did, commit) { 246 - error!(did = %did, err = %e, "failed to persist commit to resync_buffer"); 248 + let gate = match Self::check_repo_state(&mut ctx, did, &msg) { 249 + Ok(g) => g, 250 + Err(e) => { 251 + if let IngestError::Generic(ref r) = e { 252 + db::check_poisoned_report(r); 247 253 } 254 + error!(did = %did, err = %e, "error in check_repo_state"); 255 + if let Some((_, cursor)) = state.relay_cursors.get(&relay_id) { 256 + cursor.store(seq, std::sync::atomic::Ordering::SeqCst); 257 + } 258 + continue; 248 259 } 249 - Ok(RepoProcessResult::Syncing(None)) => {} 250 - Err(e) => { 251 - if let IngestError::Generic(e) = &e { 252 - db::check_poisoned_report(e); 260 + }; 261 + 262 + match gate { 263 + ProcessGate::NewRepo | ProcessGate::Drop => {} 264 + ProcessGate::Buffer(commit) => { 265 + if let Some(commit) = commit { 266 + if let Err(e) = 267 + ops::persist_to_resync_buffer(&state.db, did, commit) 268 + { 269 + error!( 270 + did = %did, err = %e, 271 + "failed to persist commit to resync_buffer" 272 + ); 273 + } 253 274 } 254 - error!(did = %did, err = %e, "error processing message"); 255 - if Self::check_if_retriable_failure(&e) { 256 - if let SubscribeReposMessage::Commit(commit) = &msg { 275 + } 276 + ProcessGate::Ready(mut repo_state) => { 277 + let pre_status = repo_state.status.clone(); 278 + 279 + // if it was in deactivated/takendown/suspended state, we can mark it 280 + // as synced because we are receiving an active=true account event now. 281 + // we do this before dispatching so handle_account sees pre_status correctly 282 + if matches!( 283 + pre_status, 284 + RepoStatus::Deactivated 285 + | RepoStatus::Suspended 286 + | RepoStatus::Takendown 287 + ) { 288 + if let SubscribeReposMessage::Account(acc) = &msg { 289 + if acc.active { 290 + match ops::update_repo_status( 291 + ctx.batch.batch_mut(), 292 + &ctx.state.db, 293 + did, 294 + repo_state, 295 + RepoStatus::Synced, 296 + ) { 297 + Ok(rs) => { 298 + repo_state = rs; 299 + ctx.state.db.update_gauge_diff( 300 + &GaugeState::Resync(None), 301 + &GaugeState::Synced, 302 + ); 303 + } 304 + Err(e) => { 305 + error!( 306 + did = %did, err = %e, 307 + "failed to transition inactive repo to synced" 308 + ); 309 + if let Some((_, cursor)) = 310 + state.relay_cursors.get(&relay_id) 311 + { 312 + cursor.store( 313 + seq, 314 + std::sync::atomic::Ordering::SeqCst, 315 + ); 316 + } 317 + continue; 318 + } 319 + } 320 + } 321 + } 322 + } 323 + 324 + match Self::process_message(&mut ctx, &msg, did, repo_state, pre_status) 325 + { 326 + Ok(RepoProcessResult::Ok(_)) => {} 327 + Ok(RepoProcessResult::Deleted) => {} 328 + Ok(RepoProcessResult::NeedsBackfill(Some(commit))) => { 257 329 if let Err(e) = 258 330 ops::persist_to_resync_buffer(&state.db, did, commit) 259 331 { ··· 261 333 did = %did, err = %e, 262 334 "failed to persist commit to resync_buffer" 263 335 ); 336 + } 337 + } 338 + Ok(RepoProcessResult::NeedsBackfill(None)) => {} 339 + Err(e) => { 340 + if let IngestError::Generic(ref r) = e { 341 + db::check_poisoned_report(r); 342 + } 343 + error!(did = %did, err = %e, "error processing message"); 344 + if Self::check_if_retriable_failure(&e) { 345 + if let SubscribeReposMessage::Commit(commit) = &msg { 346 + if let Err(e) = ops::persist_to_resync_buffer( 347 + &state.db, did, commit, 348 + ) { 349 + error!( 350 + did = %did, err = %e, 351 + "failed to persist commit to resync_buffer" 352 + ); 353 + } 354 + } 264 355 } 265 356 } 266 357 } ··· 291 382 } 292 383 } 293 384 294 - // dont retry commit or sync on key fetch errors 385 + // don't retry commit or sync on key fetch errors 295 386 // since we'll just try again later if we get commit or sync again 296 387 fn check_if_retriable_failure(e: &IngestError) -> bool { 297 388 matches!( ··· 306 397 ctx: &mut WorkerContext, 307 398 msg: &'c SubscribeReposMessage<'static>, 308 399 did: &Did, 400 + repo_state: RepoState<'s>, 401 + pre_status: RepoStatus, 309 402 ) -> Result<RepoProcessResult<'s, 'c>, IngestError> { 310 - let check_repo_res = Self::check_repo_state(ctx, did, msg)?; 311 - let (mut repo_state, old_status) = match check_repo_res { 312 - RepoProcessResult::Syncing(_) | RepoProcessResult::Deleted => { 313 - return Ok(check_repo_res); 314 - } 315 - RepoProcessResult::Ok { state, old_status } => (state, old_status), 316 - }; 317 - 318 403 match msg { 319 404 SubscribeReposMessage::Commit(commit) => { 320 - trace!(did = %did, "processing buffered commit"); 321 - 322 - let res = Self::process_commit(ctx, did, repo_state, commit)?; 323 - return match res { 324 - RepoProcessResult::Ok { state, .. } => { 325 - Ok(RepoProcessResult::Ok { state, old_status }) 326 - } 327 - other => Ok(other), 328 - }; 405 + trace!(did = %did, "processing commit"); 406 + Self::handle_commit(ctx, did, repo_state, commit) 329 407 } 330 408 SubscribeReposMessage::Sync(sync) => { 331 - debug!(did = %did, "processing buffered sync"); 332 - 333 - Self::refresh_doc(ctx, &mut repo_state, did)?; 334 - 335 - match ops::verify_sync_event( 336 - sync.blocks.as_ref(), 337 - Self::fetch_key(ctx, did)?.as_ref(), 338 - ) { 339 - Ok((root, rev)) => { 340 - if let Some(current_data) = &repo_state.data { 341 - if current_data == &root.to_ipld().expect("valid cid") { 342 - debug!(did = %did, "skipping noop sync"); 343 - return Ok(RepoProcessResult::Ok { 344 - state: repo_state, 345 - old_status, 346 - }); 347 - } 348 - } 349 - 350 - if let Some(current_rev) = &repo_state.rev { 351 - if rev.as_str() <= current_rev.to_tid().as_str() { 352 - debug!(did = %did, "skipping replayed sync"); 353 - return Ok(RepoProcessResult::Ok { 354 - state: repo_state, 355 - old_status, 356 - }); 357 - } 358 - } 359 - 360 - warn!(did = %did, "sync event, triggering backfill"); 361 - let mut batch = ctx.state.db.inner.batch(); 362 - repo_state = ops::update_repo_status( 363 - &mut batch, 364 - &ctx.state.db, 365 - did, 366 - repo_state, 367 - RepoStatus::Backfilling, 368 - )?; 369 - batch.commit().into_diagnostic()?; 370 - ctx.state 371 - .db 372 - .update_gauge_diff(&GaugeState::Synced, &GaugeState::Pending); 373 - ctx.state.notify_backfill(); 374 - return Ok(RepoProcessResult::Ok { 375 - state: repo_state, 376 - old_status, 377 - }); 378 - } 379 - Err(e) => { 380 - error!(did = %did, err = %e, "failed to process sync event"); 381 - } 382 - } 409 + debug!(did = %did, "processing sync"); 410 + Self::handle_sync(ctx, did, repo_state, sync) 383 411 } 384 412 SubscribeReposMessage::Identity(identity) => { 385 - debug!(did = %did, "processing buffered identity"); 386 - 387 - let changed = if identity.handle.is_none() { 388 - // we invalidate only if no handle is sent since its like a 389 - // "invalidate your caches" message then basically 390 - ctx.state.resolver.invalidate_sync(did); 391 - let doc = ctx.handle.block_on(ctx.state.resolver.resolve_doc(did))?; 392 - repo_state.update_from_doc(doc) 393 - } else { 394 - let old_handle = repo_state.handle.clone(); 395 - repo_state.handle = identity.handle.clone().or(repo_state.handle); 396 - repo_state.handle != old_handle 397 - }; 398 - 399 - ctx.batch.batch_mut().insert( 400 - &ctx.state.db.repos, 401 - keys::repo_key(did), 402 - crate::db::ser_repo_state(&repo_state)?, 403 - ); 404 - 405 - if changed { 406 - let evt = IdentityEvt { 407 - did: did.clone().into_static(), 408 - handle: repo_state.handle.clone(), 409 - }; 410 - ctx.broadcast_events 411 - .push(ops::make_identity_event(&ctx.state.db, evt)); 412 - } 413 + debug!(did = %did, "processing identity"); 414 + Self::handle_identity(ctx, did, repo_state, identity) 413 415 } 414 416 SubscribeReposMessage::Account(account) => { 415 - debug!(did = %did, "processing buffered account"); 416 - let was_inactive = matches!( 417 - old_status, 418 - RepoStatus::Deactivated | RepoStatus::Takendown | RepoStatus::Suspended 419 - ); 420 - let is_inactive = !account.active; 421 - let evt = AccountEvt { 422 - did: did.clone().into_static(), 423 - active: account.active, 424 - status: account.status.as_ref().map(|s| s.to_cowstr().into_static()), 425 - }; 426 - 427 - Self::refresh_doc(ctx, &mut repo_state, did)?; 428 - 429 - if !account.active { 430 - use crate::ingest::stream::AccountStatus; 431 - match &account.status { 432 - Some(AccountStatus::Deleted) => { 433 - debug!(did = %did, "account deleted, wiping data"); 434 - crate::ops::delete_repo( 435 - &mut ctx.batch, 436 - &ctx.state.db, 437 - did, 438 - &repo_state, 439 - )?; 440 - return Ok(RepoProcessResult::Deleted); 441 - } 442 - status => { 443 - let target_status = match status { 444 - Some(status) => match status { 445 - AccountStatus::Deleted => { 446 - unreachable!("deleted account status is handled before") 447 - } 448 - AccountStatus::Takendown => RepoStatus::Takendown, 449 - AccountStatus::Suspended => RepoStatus::Suspended, 450 - AccountStatus::Deactivated => RepoStatus::Deactivated, 451 - AccountStatus::Throttled => { 452 - RepoStatus::Error("throttled".into()) 453 - } 454 - AccountStatus::Desynchronized => { 455 - RepoStatus::Error("desynchronized".into()) 456 - } 457 - AccountStatus::Other(s) => { 458 - warn!( 459 - did = %did, status = %s, 460 - "unknown account status, will put in error state" 461 - ); 462 - RepoStatus::Error(s.to_smolstr()) 463 - } 464 - }, 465 - None => { 466 - warn!(did = %did, "account inactive but no status provided"); 467 - RepoStatus::Error("unknown".into()) 468 - } 469 - }; 470 - 471 - if repo_state.status == target_status { 472 - debug!(did = %did, ?target_status, "account status unchanged"); 473 - return Ok(RepoProcessResult::Ok { 474 - state: repo_state, 475 - old_status, 476 - }); 477 - } 478 - 479 - repo_state = ops::update_repo_status( 480 - ctx.batch.batch_mut(), 481 - &ctx.state.db, 482 - did, 483 - repo_state, 484 - target_status, 485 - )?; 486 - ctx.state 487 - .db 488 - .update_gauge_diff(&GaugeState::Synced, &GaugeState::Resync(None)); 489 - } 490 - } 491 - } else { 492 - // normally we would initiate backfill here 493 - // but we don't have to do anything because: 494 - // 1. we handle changing repo status to Synced before this (in check repo state) 495 - // 2. initiating backfilling is also handled there 496 - } 497 - 498 - if was_inactive != is_inactive || repo_state.status != old_status { 499 - ctx.broadcast_events 500 - .push(ops::make_account_event(&ctx.state.db, evt)); 501 - } 417 + debug!(did = %did, "processing account"); 418 + Self::handle_account(ctx, did, repo_state, pre_status, account) 502 419 } 503 420 _ => { 504 421 warn!(did = %did, "unknown message type in buffer"); 422 + Ok(RepoProcessResult::Ok(repo_state)) 505 423 } 506 424 } 507 - 508 - Ok(RepoProcessResult::Ok { 509 - state: repo_state, 510 - old_status, 511 - }) 512 425 } 513 426 514 - fn process_commit<'c, 'ns, 's: 'ns>( 427 + fn handle_commit<'s, 'c>( 515 428 ctx: &mut WorkerContext, 516 429 did: &Did, 517 - repo_state: RepoState<'s>, 430 + mut repo_state: RepoState<'s>, 518 431 commit: &'c Commit<'c>, 519 - ) -> Result<RepoProcessResult<'ns, 'c>, IngestError> { 520 - // check for replayed events (already seen revision) 432 + ) -> Result<RepoProcessResult<'s, 'c>, IngestError> { 433 + repo_state.advance_message_time(commit.time.0.timestamp_millis()); 434 + 435 + // skip replayed events (already seen revision) 521 436 if matches!(repo_state.rev, Some(ref rev) if commit.rev.as_str() <= rev.to_tid().as_str()) { 522 437 debug!( 523 438 did = %did, ··· 525 440 state_rev = %repo_state.rev.as_ref().map(|r| r.to_tid()).expect("we checked in if"), 526 441 "skipping replayed event" 527 442 ); 528 - let old_status = repo_state.status.clone(); 529 - return Ok(RepoProcessResult::Ok { 530 - state: repo_state, 531 - old_status, 532 - }); 443 + return Ok(RepoProcessResult::Ok(repo_state)); 533 444 } 534 445 535 446 if let (Some(repo), Some(prev_commit)) = (&repo_state.data, &commit.prev_data) ··· 556 467 RepoStatus::Backfilling, 557 468 )?; 558 469 batch.commit().into_diagnostic()?; 559 - ctx.state.db.update_gauge_diff( 560 - &crate::types::GaugeState::Synced, 561 - &crate::types::GaugeState::Pending, 562 - ); 470 + ctx.state 471 + .db 472 + .update_gauge_diff(&GaugeState::Synced, &GaugeState::Pending); 563 473 ctx.state.notify_backfill(); 564 - return Ok(RepoProcessResult::Syncing(Some(commit))); 474 + return Ok(RepoProcessResult::NeedsBackfill(Some(commit))); 565 475 } 566 476 567 477 let signing_key = Self::fetch_key(ctx, did)?; ··· 569 479 &mut ctx.batch, 570 480 &ctx.state.db, 571 481 repo_state, 572 - &commit, 482 + commit, 573 483 signing_key.as_ref(), 574 484 &ctx.state.filter.load(), 575 485 ctx.ephemeral, ··· 585 495 - 1, 586 496 )); 587 497 588 - let old_status = repo_state.status.clone(); 589 - Ok(RepoProcessResult::Ok { 590 - state: repo_state, 591 - old_status, 592 - }) 498 + Ok(RepoProcessResult::Ok(repo_state)) 499 + } 500 + 501 + fn handle_sync<'s, 'c>( 502 + ctx: &mut WorkerContext, 503 + did: &Did, 504 + mut repo_state: RepoState<'s>, 505 + sync: &'c Sync<'c>, 506 + ) -> Result<RepoProcessResult<'s, 'c>, IngestError> { 507 + repo_state.advance_message_time(sync.time.0.timestamp_millis()); 508 + 509 + Self::refresh_doc(ctx, &mut repo_state, did)?; 510 + 511 + match ops::verify_sync_event(sync.blocks.as_ref(), Self::fetch_key(ctx, did)?.as_ref()) { 512 + Ok((root, rev)) => { 513 + if let Some(current_data) = &repo_state.data { 514 + if current_data == &root.to_ipld().expect("valid cid") { 515 + debug!(did = %did, "skipping noop sync"); 516 + return Ok(RepoProcessResult::Ok(repo_state)); 517 + } 518 + } 519 + 520 + if let Some(current_rev) = &repo_state.rev { 521 + if rev.as_str() <= current_rev.to_tid().as_str() { 522 + debug!(did = %did, "skipping replayed sync"); 523 + return Ok(RepoProcessResult::Ok(repo_state)); 524 + } 525 + } 526 + 527 + warn!(did = %did, "sync event, triggering backfill"); 528 + let mut batch = ctx.state.db.inner.batch(); 529 + repo_state = ops::update_repo_status( 530 + &mut batch, 531 + &ctx.state.db, 532 + did, 533 + repo_state, 534 + RepoStatus::Backfilling, 535 + )?; 536 + batch.commit().into_diagnostic()?; 537 + ctx.state 538 + .db 539 + .update_gauge_diff(&GaugeState::Synced, &GaugeState::Pending); 540 + ctx.state.notify_backfill(); 541 + Ok(RepoProcessResult::Ok(repo_state)) 542 + } 543 + Err(e) => { 544 + error!(did = %did, err = %e, "failed to process sync event"); 545 + Ok(RepoProcessResult::Ok(repo_state)) 546 + } 547 + } 548 + } 549 + 550 + fn handle_identity<'s>( 551 + ctx: &mut WorkerContext, 552 + did: &Did, 553 + mut repo_state: RepoState<'s>, 554 + identity: &Identity<'_>, 555 + ) -> Result<RepoProcessResult<'s, 'static>, IngestError> { 556 + let event_ms = identity.time.0.timestamp_millis(); 557 + if repo_state.last_message_time.is_some_and(|t| event_ms <= t) { 558 + debug!(did = %did, "skipping stale/duplicate identity event"); 559 + return Ok(RepoProcessResult::Ok(repo_state)); 560 + } 561 + repo_state.advance_message_time(event_ms); 562 + 563 + let changed = if identity.handle.is_none() { 564 + // no handle sent is basically "invalidate your caches" 565 + ctx.state.resolver.invalidate_sync(did); 566 + let doc = ctx.handle.block_on(ctx.state.resolver.resolve_doc(did))?; 567 + repo_state.update_from_doc(doc) 568 + } else { 569 + let old_handle = repo_state.handle.clone(); 570 + repo_state.handle = identity 571 + .handle 572 + .clone() 573 + .map(IntoStatic::into_static) 574 + .or(repo_state.handle); 575 + repo_state.handle != old_handle 576 + }; 577 + 578 + repo_state.touch(); 579 + ctx.batch.batch_mut().insert( 580 + &ctx.state.db.repos, 581 + keys::repo_key(did), 582 + crate::db::ser_repo_state(&repo_state)?, 583 + ); 584 + 585 + if changed { 586 + let evt = IdentityEvt { 587 + did: did.clone().into_static(), 588 + handle: repo_state.handle.clone().map(IntoStatic::into_static), 589 + }; 590 + ctx.broadcast_events 591 + .push(ops::make_identity_event(&ctx.state.db, evt)); 592 + } 593 + 594 + Ok(RepoProcessResult::Ok(repo_state)) 595 + } 596 + 597 + fn handle_account<'s, 'c>( 598 + ctx: &mut WorkerContext, 599 + did: &Did, 600 + mut repo_state: RepoState<'s>, 601 + pre_status: RepoStatus, 602 + account: &'c Account<'c>, 603 + ) -> Result<RepoProcessResult<'s, 'c>, IngestError> { 604 + let event_ms = account.time.0.timestamp_millis(); 605 + if repo_state.last_message_time.is_some_and(|t| event_ms <= t) { 606 + debug!(did = %did, "skipping stale/duplicate account event"); 607 + return Ok(RepoProcessResult::Ok(repo_state)); 608 + } 609 + repo_state.advance_message_time(event_ms); 610 + 611 + // get active before we do any mutations 612 + let was_inactive = matches!( 613 + pre_status, 614 + RepoStatus::Deactivated | RepoStatus::Takendown | RepoStatus::Suspended 615 + ); 616 + let is_inactive = !account.active; 617 + let evt = AccountEvt { 618 + did: did.clone().into_static(), 619 + active: account.active, 620 + status: account.status.as_ref().map(|s| s.to_cowstr().into_static()), 621 + }; 622 + 623 + Self::refresh_doc(ctx, &mut repo_state, did)?; 624 + 625 + if !account.active { 626 + use crate::ingest::stream::AccountStatus; 627 + match &account.status { 628 + Some(AccountStatus::Deleted) => { 629 + debug!(did = %did, "account deleted, wiping data"); 630 + crate::ops::delete_repo(&mut ctx.batch, &ctx.state.db, did, &repo_state)?; 631 + return Ok(RepoProcessResult::Deleted); 632 + } 633 + status => { 634 + let target_status = match status { 635 + Some(status) => match status { 636 + AccountStatus::Deleted => { 637 + unreachable!("deleted account status is handled before") 638 + } 639 + AccountStatus::Takendown => RepoStatus::Takendown, 640 + AccountStatus::Suspended => RepoStatus::Suspended, 641 + AccountStatus::Deactivated => RepoStatus::Deactivated, 642 + AccountStatus::Throttled => RepoStatus::Error("throttled".into()), 643 + AccountStatus::Desynchronized => { 644 + RepoStatus::Error("desynchronized".into()) 645 + } 646 + AccountStatus::Other(s) => { 647 + warn!( 648 + did = %did, status = %s, 649 + "unknown account status, will put in error state" 650 + ); 651 + RepoStatus::Error(s.to_smolstr()) 652 + } 653 + }, 654 + None => { 655 + warn!(did = %did, "account inactive but no status provided"); 656 + RepoStatus::Error("unknown".into()) 657 + } 658 + }; 659 + 660 + if repo_state.status == target_status { 661 + debug!(did = %did, ?target_status, "account status unchanged"); 662 + ctx.batch.batch_mut().insert( 663 + &ctx.state.db.repos, 664 + keys::repo_key(did), 665 + crate::db::ser_repo_state(&repo_state)?, 666 + ); 667 + return Ok(RepoProcessResult::Ok(repo_state)); 668 + } 669 + 670 + repo_state = ops::update_repo_status( 671 + ctx.batch.batch_mut(), 672 + &ctx.state.db, 673 + did, 674 + repo_state, 675 + target_status, 676 + )?; 677 + ctx.state 678 + .db 679 + .update_gauge_diff(&GaugeState::Synced, &GaugeState::Resync(None)); 680 + } 681 + } 682 + } else { 683 + // active=true: transition to synced is handled in the shard dispatch before calling this 684 + } 685 + 686 + if was_inactive != is_inactive || repo_state.status != pre_status { 687 + ctx.broadcast_events 688 + .push(ops::make_account_event(&ctx.state.db, evt)); 689 + } 690 + 691 + // persist last_message_time for paths that don't go through update_repo_status 692 + // (active=true and already synced). harmless double-write for the status-changed path 693 + ctx.batch.batch_mut().insert( 694 + &ctx.state.db.repos, 695 + keys::repo_key(did), 696 + crate::db::ser_repo_state(&repo_state)?, 697 + ); 698 + 699 + Ok(RepoProcessResult::Ok(repo_state)) 593 700 } 594 701 595 - // checks the current state of the repo in the database 702 + // checks the current state of the repo in the database and returns a gate 703 + // indicating what the shard loop should do with the message. 596 704 // if the repo is new, creates initial state and triggers backfill 597 - // handles transitions between states (backfilling -> synced, etc) 705 + // for synced repos with buffered commits, drains the buffer first 706 + // so events are applied in order 598 707 fn check_repo_state<'s, 'c>( 599 708 ctx: &mut WorkerContext, 600 709 did: &Did<'_>, 601 710 msg: &'c SubscribeReposMessage<'static>, 602 - ) -> Result<RepoProcessResult<'s, 'c>, IngestError> { 711 + ) -> Result<ProcessGate<'s, 'c>, IngestError> { 603 712 let repo_key = keys::repo_key(&did); 604 713 let Some(state_bytes) = ctx.state.db.repos.get(&repo_key).into_diagnostic()? else { 605 714 let filter = ctx.state.filter.load(); ··· 607 716 if filter.mode == FilterMode::Filter && !filter.signals.is_empty() { 608 717 let commit = match msg { 609 718 SubscribeReposMessage::Commit(c) => c, 610 - _ => return Ok(RepoProcessResult::Syncing(None)), 719 + _ => return Ok(ProcessGate::NewRepo), 611 720 }; 612 721 let touches_signal = commit.ops.iter().any(|op| { 613 722 op.path ··· 624 733 }); 625 734 if !touches_signal { 626 735 trace!(did = %did, "dropping commit, no signal-matching ops"); 627 - return Ok(RepoProcessResult::Syncing(None)); 736 + return Ok(ProcessGate::NewRepo); 628 737 } 629 738 } 630 739 ··· 645 754 batch.commit().into_diagnostic()?; 646 755 647 756 ctx.state.db.update_count("repos", 1); 648 - ctx.state.db.update_gauge_diff( 649 - &crate::types::GaugeState::Synced, 650 - &crate::types::GaugeState::Pending, 651 - ); 757 + ctx.state 758 + .db 759 + .update_gauge_diff(&GaugeState::Synced, &GaugeState::Pending); 652 760 653 761 ctx.state.notify_backfill(); 654 762 655 - return Ok(RepoProcessResult::Syncing(None)); 763 + return Ok(ProcessGate::NewRepo); 656 764 }; 657 - let mut repo_state = crate::db::deser_repo_state(&state_bytes)?.into_static(); 658 - let old_status = repo_state.status.clone(); 765 + 766 + let repo_state = crate::db::deser_repo_state(&state_bytes)?.into_static(); 659 767 660 768 if !repo_state.tracked && repo_state.status != RepoStatus::Backfilling { 661 - trace!(did = %did, "ignoring active status as it is explicitly untracked"); 662 - return Ok(RepoProcessResult::Syncing(None)); 769 + trace!(did = %did, "ignoring message, repo is explicitly untracked"); 770 + return Ok(ProcessGate::Drop); 663 771 } 664 772 665 - // if we are backfilling or it is new, DON'T mark it as synced yet 666 - // the backfill worker will do that when it finishes 667 773 match &repo_state.status { 668 774 RepoStatus::Synced => { 669 - // lazy drain: if there are buffered commits, drain them now 775 + // lazy drain: if there are buffered commits, drain them now before 776 + // applying the live message so events are applied in order 670 777 if ops::has_buffered_commits(&ctx.state.db, did) { 671 - Self::drain_resync_buffer(ctx, did, repo_state, old_status) 672 - } else { 673 - Ok(RepoProcessResult::Ok { 674 - state: repo_state, 675 - old_status, 676 - }) 778 + return match Self::drain_resync_buffer(ctx, did, repo_state)? { 779 + RepoProcessResult::Ok(rs) => Ok(ProcessGate::Ready(rs)), 780 + // gap triggered during drain, so drop the live message 781 + RepoProcessResult::NeedsBackfill(_) => Ok(ProcessGate::Drop), 782 + RepoProcessResult::Deleted => Ok(ProcessGate::Drop), 783 + }; 677 784 } 785 + Ok(ProcessGate::Ready(repo_state)) 678 786 } 679 787 RepoStatus::Backfilling | RepoStatus::Error(_) => { 680 788 debug!( 681 789 did = %did, status = ?repo_state.status, 682 - "ignoring active status" 790 + "ignoring message, repo is backfilling or in error state" 683 791 ); 684 - Ok(RepoProcessResult::Syncing(None)) 792 + Ok(ProcessGate::Drop) 685 793 } 686 794 RepoStatus::Deactivated | RepoStatus::Suspended | RepoStatus::Takendown => { 687 - // if it was in deactivated/takendown/suspended state, we can mark it as synced 688 - // because we are receiving live events now 689 - // UNLESS it is an account status event that keeps it deactivated 690 - if let SubscribeReposMessage::Account(acc) = msg { 691 - if !acc.active { 692 - return Ok(RepoProcessResult::Ok { 693 - state: repo_state, 694 - old_status, 695 - }); 696 - } 697 - } else { 698 - // buffer commits and drop everything else until we get an active=true message 699 - return match msg { 700 - SubscribeReposMessage::Commit(commit) => { 701 - Ok(RepoProcessResult::Syncing(Some(commit))) 702 - } 703 - _ => Ok(RepoProcessResult::Syncing(None)), 704 - }; 795 + // account events always pass through because the 796 + // shard dispatch handles the active=true transition 797 + if let SubscribeReposMessage::Account(_) = msg { 798 + return Ok(ProcessGate::Ready(repo_state)); 705 799 } 706 - repo_state = ops::update_repo_status( 707 - ctx.batch.batch_mut(), 708 - &ctx.state.db, 709 - did, 710 - repo_state, 711 - RepoStatus::Synced, 712 - )?; 713 - ctx.state.db.update_gauge_diff( 714 - &crate::types::GaugeState::Resync(None), 715 - &crate::types::GaugeState::Synced, 716 - ); 717 - Ok(RepoProcessResult::Ok { 718 - state: repo_state, 719 - old_status, 720 - }) 800 + // buffer commits and drop everything else until we get an active=true message 801 + let commit = match msg { 802 + SubscribeReposMessage::Commit(c) => Some(c.as_ref()), 803 + _ => None, 804 + }; 805 + Ok(ProcessGate::Buffer(commit)) 721 806 } 722 807 } 723 808 } ··· 726 811 ctx: &mut WorkerContext, 727 812 did: &Did, 728 813 mut repo_state: RepoState<'s>, 729 - old_status: RepoStatus, 730 814 ) -> Result<RepoProcessResult<'s, 'static>, IngestError> { 731 815 let prefix = keys::resync_buffer_prefix(did); 732 816 ··· 734 818 let (key, value) = guard.into_inner().into_diagnostic()?; 735 819 let commit: Commit = rmp_serde::from_slice(&value).into_diagnostic()?; 736 820 737 - let res = Self::process_commit(ctx, did, repo_state, &commit); 821 + let res = Self::handle_commit(ctx, did, repo_state, &commit); 738 822 let res = match res { 739 823 Ok(r) => r, 740 824 Err(e) => { ··· 747 831 } 748 832 }; 749 833 match res { 750 - RepoProcessResult::Ok { state: rs, .. } => { 834 + RepoProcessResult::Ok(rs) => { 751 835 ctx.batch 752 836 .batch_mut() 753 837 .remove(&ctx.state.db.resync_buffer, key); 754 838 repo_state = rs; 755 839 } 756 - RepoProcessResult::Syncing(_) => { 757 - return Ok(RepoProcessResult::Syncing(None)); 840 + RepoProcessResult::NeedsBackfill(_) => { 841 + // commit is already in the buffer, leave it there for the next backfill 842 + return Ok(RepoProcessResult::NeedsBackfill(None)); 758 843 } 759 844 RepoProcessResult::Deleted => { 760 845 ctx.batch ··· 765 850 } 766 851 } 767 852 768 - Ok(RepoProcessResult::Ok { 769 - state: repo_state, 770 - old_status, 771 - }) 853 + Ok(RepoProcessResult::Ok(repo_state)) 772 854 } 773 855 774 856 // refreshes the handle, pds url and signing key of a did ··· 780 862 ctx.state.resolver.invalidate_sync(did); 781 863 let doc = ctx.handle.block_on(ctx.state.resolver.resolve_doc(did))?; 782 864 repo_state.update_from_doc(doc); 865 + repo_state.touch(); 783 866 ctx.batch.batch_mut().insert( 784 867 &ctx.state.db.repos, 785 868 keys::repo_key(did),
+2 -2
src/ops.rs
··· 183 183 } 184 184 185 185 repo_state.status = new_status; 186 - repo_state.last_updated_at = chrono::Utc::now().timestamp(); 186 + repo_state.touch(); 187 187 188 188 batch.insert(&db.repos, &repo_key, ser_repo_state(&repo_state)?); 189 189 ··· 260 260 261 261 repo_state.rev = Some((&commit.rev).into()); 262 262 repo_state.data = Some(repo_commit.data); 263 - repo_state.last_updated_at = chrono::Utc::now().timestamp(); 263 + repo_state.touch(); 264 264 265 265 batch 266 266 .batch_mut()
+20
src/types.rs
··· 39 39 pub status: RepoStatus, 40 40 pub rev: Option<DbTid>, 41 41 pub data: Option<IpldCid>, 42 + // todo: is this actually valid? the spec says this is informal and intermadiate 43 + // services may change it. we should probably document it. if we cant use this 44 + // then how do we dedup account / identity ops? 45 + /// ms since epoch of the last firehose message we processed for this repo. 46 + /// used to deduplicate identity / account events that can arrive from multiple relays at 47 + /// different wall-clock times but represent the same underlying PDS event. 48 + #[serde(default)] 49 + pub last_message_time: Option<i64>, 42 50 /// this is when we *ingested* any last updates 43 51 pub last_updated_at: i64, // unix timestamp 44 52 /// whether we are ingesting events for this repo ··· 65 73 handle: None, 66 74 pds: None, 67 75 signing_key: None, 76 + last_message_time: None, 68 77 } 69 78 } 70 79 ··· 76 85 } 77 86 } 78 87 88 + // advances the high-water mark to event_ms if it's newer than what we've seen 89 + pub fn advance_message_time(&mut self, event_ms: i64) { 90 + self.last_message_time = Some(event_ms.max(self.last_message_time.unwrap_or(0))); 91 + } 92 + 93 + // updates last_updated_at to now 94 + pub fn touch(&mut self) { 95 + self.last_updated_at = chrono::Utc::now().timestamp(); 96 + } 97 + 79 98 pub fn update_from_doc(&mut self, doc: MiniDoc) -> bool { 80 99 let new_signing_key = doc.key.map(From::from); 81 100 let changed = self.pds.as_deref() != Some(doc.pds.as_str()) ··· 102 121 handle: self.handle.map(IntoStatic::into_static), 103 122 pds: self.pds.map(IntoStatic::into_static), 104 123 signing_key: self.signing_key.map(IntoStatic::into_static), 124 + last_message_time: self.last_message_time, 105 125 } 106 126 } 107 127 }
+4 -4
tests/common.nu
··· 60 60 export def start-hydrant [binary: string, db_path: string, port: int] { 61 61 let log_file = $"($db_path)/hydrant.log" 62 62 print $"starting hydrant - logs at ($log_file)..." 63 - 63 + 64 64 let hydrant_vars = ($env | transpose k v | where k =~ "HYDRANT_" | reduce -f {} { |it, acc| $acc | upsert $it.k $it.v }) 65 65 let env_vars = { 66 66 HYDRANT_DATABASE_PATH: ($db_path), ··· 68 68 HYDRANT_API_PORT: ($port | into string), 69 69 HYDRANT_ENABLE_DEBUG: "true", 70 70 HYDRANT_DEBUG_PORT: ($port + 1 | into string), 71 - HYDRANT_LOG_LEVEL: "debug" 71 + RUST_LOG: "debug,hyper=error,tokio=error,h2=error,tower=error,rustls=error" 72 72 } | merge $hydrant_vars 73 73 74 74 let pid = (with-env $env_vars { 75 75 sh -c $"($binary) >($log_file) 2>&1 & echo $!" | str trim | into int 76 76 }) 77 - 77 + 78 78 print $"hydrant started with pid: ($pid)" 79 79 { pid: $pid, log: $log_file } 80 80 } ··· 115 115 print "backfill complete." 116 116 return true 117 117 } 118 - 118 + 119 119 sleep 2sec 120 120 } 121 121 false