Very fast AT Protocol indexer with flexible filtering, XRPC queries, a cursor-backed event stream, and more, built on fjall
rust fjall at-protocol atproto indexer
60
fork

Configure Feed

Select the types of activity you want to include in your feed.

[all] add indexer_stream feature to decouple the event stream from the indexer

dawn ffd155a9 03bfa99c

+236 -135
+2 -1
Cargo.toml
··· 4 4 edition = "2024" 5 5 6 6 [features] 7 - default = ["indexer"] 7 + default = ["indexer", "indexer_stream"] 8 8 __persist_sync_all = [] 9 9 backlinks = [] 10 10 relay = [] 11 11 indexer = [] 12 + indexer_stream = ["indexer"] 12 13 13 14 [dependencies] 14 15 tokio = { version = "1.0", features = ["full"] }
+1
README.md
··· 250 250 | feature | default | description | 251 251 | :--- | :--- | :--- | 252 252 | `indexer` | yes | makes hydrant act as an indexer. incompatible with the relay feature. | 253 + | `indexer_stream` | yes | enables the event stream for the indexer. requires indexer feature. | 253 254 | `relay` | no | makes hydrant act as a relay. incompatible with the indexer feature. | 254 255 | `backlinks` | no | enables the backlinks indexer and XRPC endpoints (`blue.microcosm.links.*`). requires indexer feature. | 255 256
+33 -14
src/api/debug.rs
··· 1 1 use crate::api::AppState; 2 2 use crate::db::keys; 3 - use crate::types::{RepoState, ResyncState, StoredEvent}; 3 + #[cfg(feature = "indexer_stream")] 4 + use crate::types::StoredEvent; 5 + use crate::types::{RepoState, ResyncState}; 4 6 use axum::routing::{get, post}; 5 7 use axum::{ 6 8 Json, ··· 32 34 let r = axum::Router::new() 33 35 .route("/debug/get", get(handle_debug_get)) 34 36 .route("/debug/iter", get(handle_debug_iter)) 35 - .route("/debug/compact", post(handle_debug_compact)) 37 + .route("/debug/compact", post(handle_debug_compact)); 38 + 39 + #[cfg(any(feature = "indexer_stream", feature = "relay"))] 40 + let r = r 36 41 .route( 37 42 "/debug/ephemeral_ttl_tick", 38 43 post(handle_debug_ephemeral_ttl_tick), ··· 100 105 return serde_json::to_value(state).unwrap_or(Value::Null); 101 106 } 102 107 } 108 + #[cfg(feature = "indexer_stream")] 103 109 "events" => { 104 110 if let Ok(event) = rmp_serde::from_slice::<StoredEvent>(value) { 105 111 return serde_json::to_value(event).unwrap_or(Value::Null); ··· 279 285 "pending" => Ok(db.pending.clone()), 280 286 #[cfg(feature = "indexer")] 281 287 "resync" => Ok(db.resync.clone()), 282 - #[cfg(feature = "indexer")] 288 + #[cfg(feature = "indexer_stream")] 283 289 "events" => Ok(db.events.clone()), 284 290 #[cfg(feature = "indexer")] 285 291 "records" => Ok(db.records.clone()), ··· 312 318 Ok(StatusCode::OK) 313 319 } 314 320 321 + #[cfg(any(feature = "indexer_stream", feature = "relay"))] 315 322 pub async fn handle_debug_ephemeral_ttl_tick( 316 323 State(state): State<Arc<AppState>>, 317 324 ) -> Result<StatusCode, StatusCode> { 318 - tokio::task::spawn_blocking(move || { 319 - #[cfg(feature = "indexer")] 320 - let res = crate::db::ephemeral::ephemeral_ttl_tick(&state.db, &state.ephemeral_ttl); 325 + tokio::task::spawn_blocking(move || -> miette::Result<()> { 326 + #[cfg(feature = "indexer_stream")] 327 + crate::db::ephemeral::ephemeral_ttl_tick(&state.db, &state.ephemeral_ttl)?; 321 328 #[cfg(feature = 
"relay")] 322 - let res = crate::db::ephemeral::relay_events_ttl_tick(&state.db, &state.ephemeral_ttl); 323 - res 329 + crate::db::ephemeral::relay_events_ttl_tick(&state.db, &state.ephemeral_ttl)?; 330 + Ok(()) 324 331 }) 325 332 .await 326 333 .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)? ··· 330 337 } 331 338 332 339 #[derive(Deserialize)] 340 + #[cfg(any(feature = "indexer_stream", feature = "relay"))] 333 341 pub struct DebugSeedWatermarkRequest { 334 342 /// unix timestamp (seconds) to write the watermark at 335 343 pub ts: u64, ··· 340 348 /// writes an event watermark entry directly to the cursors keyspace, using identical 341 349 /// key/value encoding to the real TTL worker. used in tests to plant a past watermark 342 350 /// so the real `ephemeral_ttl_tick` code path is exercised without waiting 3600 seconds. 351 + #[cfg(any(feature = "indexer_stream", feature = "relay"))] 343 352 pub async fn handle_debug_seed_watermark( 344 353 State(state): State<Arc<AppState>>, 345 354 Query(req): Query<DebugSeedWatermarkRequest>, 346 355 ) -> Result<StatusCode, StatusCode> { 347 - tokio::task::spawn_blocking(move || { 348 - #[cfg(feature = "indexer")] 349 - let key = crate::db::keys::event_watermark_key(req.ts); 356 + tokio::task::spawn_blocking(move || -> Result<(), StatusCode> { 357 + #[cfg(feature = "indexer_stream")] 358 + state 359 + .db 360 + .cursors 361 + .insert( 362 + crate::db::keys::event_watermark_key(req.ts), 363 + req.event_id.to_be_bytes(), 364 + ) 365 + .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; 350 366 #[cfg(feature = "relay")] 351 - let key = crate::db::keys::relay_event_watermark_key(req.ts); 352 367 state 353 368 .db 354 369 .cursors 355 - .insert(key, req.event_id.to_be_bytes()) 356 - .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR) 370 + .insert( 371 + crate::db::keys::relay_event_watermark_key(req.ts), 372 + req.event_id.to_be_bytes(), 373 + ) 374 + .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; 375 + Ok(()) 357 376 }) 358 377 
.await 359 378 .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)??;
+2 -2
src/api/mod.rs
··· 15 15 mod pds; 16 16 mod repos; 17 17 mod stats; 18 - #[cfg(feature = "indexer")] 18 + #[cfg(feature = "indexer_stream")] 19 19 mod stream; 20 20 mod xrpc; 21 21 ··· 39 39 .route("/health", get(async || "OK")) 40 40 .route("/_health", get(async || "OK")) 41 41 .route("/stats", get(stats::get_stats)); 42 - #[cfg(feature = "indexer")] 42 + #[cfg(feature = "indexer_stream")] 43 43 let app = app.nest("/stream", stream::router()); 44 44 let app = app 45 45 .merge(xrpc::router(blocks_available))
+50 -36
src/backfill/mod.rs
··· 4 4 use crate::ops; 5 5 use crate::resolver::ResolverError; 6 6 use crate::state::AppState; 7 - use crate::types::{ 8 - AccountEvt, BroadcastEvent, Commit, GaugeState, RepoState, RepoStatus, ResyncErrorKind, 9 - ResyncState, StoredData, StoredEvent, 10 - }; 7 + use crate::types::{Commit, GaugeState, RepoState, RepoStatus, ResyncErrorKind, ResyncState}; 11 8 12 9 use fjall::Slice; 13 10 use jacquard_api::com_atproto::sync::get_repo::{GetRepo, GetRepoError}; 11 + use jacquard_common::IntoStatic; 14 12 use jacquard_common::error::{ClientError, ClientErrorKind}; 15 13 use jacquard_common::types::cid::Cid; 16 14 use jacquard_common::types::did::Did; 17 15 use jacquard_common::xrpc::{XrpcError, XrpcExt}; 18 - use jacquard_common::{CowStr, IntoStatic}; 19 16 use jacquard_repo::mst::Mst; 20 17 use jacquard_repo::{BlockStore, MemoryBlockStore}; 21 18 use miette::{Diagnostic, IntoDiagnostic, Result}; ··· 23 20 use smol_str::{SmolStr, ToSmolStr}; 24 21 use std::collections::HashMap; 25 22 use std::sync::Arc; 26 - use std::sync::atomic::Ordering; 27 23 use std::time::{Duration, Instant}; 24 + 28 25 use thiserror::Error; 29 26 use tokio::sync::Semaphore; 30 27 use tracing::{Instrument, debug, error, info, trace, warn}; 28 + #[cfg(feature = "indexer_stream")] 29 + use { 30 + crate::types::{AccountEvt, BroadcastEvent, StoredData, StoredEvent}, 31 + jacquard_common::CowStr, 32 + std::sync::atomic::Ordering, 33 + }; 31 34 32 35 pub mod manager; 33 36 ··· 412 415 ); 413 416 state.update_from_doc(doc); 414 417 418 + #[cfg(feature = "indexer_stream")] 415 419 let emit_identity = |status: &RepoStatus, active: bool| { 416 420 let status = match status { 417 421 RepoStatus::Deactivated => "deactivated", ··· 463 467 if let Some(status) = inactive_status { 464 468 warn!(?status, "repo is inactive, stopping backfill"); 465 469 470 + #[cfg(feature = "indexer_stream")] 466 471 emit_identity(&status, false); 467 472 468 473 let resync_state = ResyncState::Gone { ··· 491 496 }; 492 497 493 
498 // emit identity event so any consumers know, but only if something changed 499 + #[cfg(feature = "indexer_stream")] 494 500 if state.active != previous_state.active 495 501 || state.status != previous_state.status 496 502 || previous_state.pds.is_none() ··· 555 561 let result = { 556 562 let app_state = app_state.clone(); 557 563 let did = did.clone(); 564 + #[cfg(feature = "indexer_stream")] 558 565 let rev = root_commit.rev; 559 566 560 567 tokio::task::spawn_blocking(move || { ··· 663 670 *collection_counts.entry(path.0.clone()).or_default() += 1; 664 671 } 665 672 666 - let event_id = app_state.db.next_event_id.fetch_add(1, Ordering::SeqCst); 667 - let evt = StoredEvent { 668 - live: false, 669 - did: TrimmedDid::from(&did), 670 - rev, 671 - collection: CowStr::Borrowed(collection), 672 - rkey, 673 - action, 674 - data: if ephemeral { 675 - StoredData::Block(val) 676 - } else if only_index_links { 677 - StoredData::Nothing 678 - } else { 679 - StoredData::Ptr(cid_obj.to_ipld().expect("valid cid")) 680 - }, 681 - }; 682 - let bytes = rmp_serde::to_vec(&evt).into_diagnostic()?; 683 - batch.insert(&app_state.db.events, keys::event_key(event_id), bytes); 673 + #[cfg(feature = "indexer_stream")] 674 + { 675 + let event_id = app_state.db.next_event_id.fetch_add(1, Ordering::SeqCst); 676 + let evt = StoredEvent { 677 + live: false, 678 + did: TrimmedDid::from(&did), 679 + rev, 680 + collection: CowStr::Borrowed(collection), 681 + rkey, 682 + action, 683 + data: if ephemeral { 684 + StoredData::Block(val) 685 + } else if only_index_links { 686 + StoredData::Nothing 687 + } else { 688 + StoredData::Ptr(cid_obj.to_ipld().expect("valid cid")) 689 + }, 690 + }; 691 + let bytes = rmp_serde::to_vec(&evt).into_diagnostic()?; 692 + batch.insert(&app_state.db.events, keys::event_key(event_id), bytes); 693 + } 684 694 685 695 count += 1; 686 696 } ··· 705 715 &rkey.to_smolstr(), 706 716 )?; 707 717 708 - let event_id = app_state.db.next_event_id.fetch_add(1, 
Ordering::SeqCst); 709 - let evt = StoredEvent { 710 - live: false, 711 - did: TrimmedDid::from(&did), 712 - rev, 713 - collection: CowStr::Borrowed(&collection), 714 - rkey, 715 - action: DbAction::Delete, 716 - data: StoredData::Nothing, 717 - }; 718 - let bytes = rmp_serde::to_vec(&evt).into_diagnostic()?; 719 - batch.insert(&app_state.db.events, keys::event_key(event_id), bytes); 718 + #[cfg(feature = "indexer_stream")] 719 + { 720 + let event_id = app_state.db.next_event_id.fetch_add(1, Ordering::SeqCst); 721 + let evt = StoredEvent { 722 + live: false, 723 + did: TrimmedDid::from(&did), 724 + rev, 725 + collection: CowStr::Borrowed(&collection), 726 + rkey, 727 + action: DbAction::Delete, 728 + data: StoredData::Nothing, 729 + }; 730 + let bytes = rmp_serde::to_vec(&evt).into_diagnostic()?; 731 + batch.insert(&app_state.db.events, keys::event_key(event_id), bytes); 732 + } 720 733 721 734 delta -= 1; 722 735 count += 1; ··· 812 825 "committed backfill batch" 813 826 ); 814 827 828 + #[cfg(feature = "indexer_stream")] 815 829 let _ = db.event_tx.send(BroadcastEvent::Persisted( 816 830 db.next_event_id.load(Ordering::SeqCst) - 1, 817 831 ));
+3
src/control/indexer.rs
··· 1 1 use super::*; 2 2 3 + #[cfg(feature = "indexer_stream")] 3 4 /// a stream of [`Event`]s. returned by [`Hydrant::subscribe`]. 4 5 /// 5 6 /// implements [`futures::Stream`] and can be used with `StreamExt::next`, ··· 7 8 /// the stream terminates when the underlying channel closes (i.e. hydrant shuts down). 8 9 pub struct EventStream(mpsc::Receiver<Event>); 9 10 11 + #[cfg(feature = "indexer_stream")] 10 12 impl Stream for EventStream { 11 13 type Item = Event; 12 14 ··· 42 44 } 43 45 } 44 46 47 + #[cfg(feature = "indexer_stream")] 45 48 impl Hydrant { 46 49 /// subscribe to the ordered event stream. 47 50 ///
+13 -6
src/control/mod.rs
··· 48 48 use crate::ingest::indexer::FirehoseWorker; 49 49 use crate::pds_meta::{PdsMeta, PdsMetaHandle}; 50 50 use crate::state::AppState; 51 + #[cfg(feature = "indexer_stream")] 51 52 use crate::types::MarshallableEvt; 52 - 53 53 use firehose::FirehoseShared; 54 - #[cfg(feature = "indexer")] 54 + #[cfg(feature = "indexer_stream")] 55 55 use stream::event_stream_thread; 56 56 #[cfg(feature = "relay")] 57 57 use stream::relay_stream_thread; ··· 77 77 /// - `"account"`: a repo's active/inactive status changed. carries an [`AccountEvt`]. ephemeral, not replayable. 78 78 /// 79 79 /// the `id` field is a monotonically increasing sequence number usable as a cursor for [`Hydrant::subscribe`]. 80 + #[cfg(feature = "indexer_stream")] 80 81 pub type Event = MarshallableEvt<'static>; 81 82 82 83 /// the top-level handle to a hydrant instance. ··· 294 295 } 295 296 296 297 // 7. ephemeral GC thread (not used in relay mode) 297 - #[cfg(feature = "indexer")] 298 + #[cfg(feature = "indexer_stream")] 298 299 if config.ephemeral { 299 300 let state = state.clone(); 300 301 std::thread::Builder::new() ··· 344 345 }); 345 346 346 347 // 9. 
events/sec stats ticker 348 + #[cfg(any(feature = "indexer_stream", feature = "relay"))] 347 349 tokio::spawn({ 348 350 let state = state.clone(); 349 351 let get_id = |state: &AppState| { 350 - #[cfg(feature = "indexer")] 352 + #[cfg(feature = "indexer_stream")] 351 353 let id = state.db.next_event_id.load(Ordering::Relaxed); 352 354 #[cfg(feature = "relay")] 353 355 let id = state.db.next_relay_seq.load(Ordering::Relaxed); ··· 681 683 count_keys.push("resync"); 682 684 } 683 685 686 + #[cfg_attr( 687 + not(any(feature = "indexer_stream", feature = "relay")), 688 + allow(unused_mut) 689 + )] 684 690 let mut counts: BTreeMap<&'static str, u64> = 685 691 futures::future::join_all(count_keys.into_iter().map(|name| { 686 692 let state = state.clone(); ··· 690 696 .into_iter() 691 697 .collect(); 692 698 693 - #[cfg(feature = "indexer")] 699 + #[cfg(feature = "indexer_stream")] 694 700 counts.insert("events", state.db.events.approximate_len() as u64); 695 701 696 702 #[cfg(feature = "relay")] ··· 714 720 s.insert("pending", state.db.pending.disk_space()); 715 721 s.insert("resync", state.db.resync.disk_space()); 716 722 s.insert("resync_buffer", state.db.resync_buffer.disk_space()); 717 - s.insert("events", state.db.events.disk_space()); 718 723 } 724 + #[cfg(feature = "indexer_stream")] 725 + s.insert("events", state.db.events.disk_space()); 719 726 720 727 #[cfg(feature = "relay")] 721 728 s.insert("relay_events", state.db.relay_events.disk_space());
+3 -3
src/control/stream.rs
··· 7 7 use crate::state::AppState; 8 8 use std::sync::atomic::Ordering; 9 9 10 - #[cfg(feature = "indexer")] 10 + #[cfg(feature = "indexer_stream")] 11 11 use { 12 12 super::Event, 13 13 crate::db, ··· 20 20 sha2::{Digest, Sha256}, 21 21 }; 22 22 23 - #[cfg(feature = "indexer")] 23 + #[cfg(feature = "indexer_stream")] 24 24 pub(super) fn event_stream_thread( 25 25 state: Arc<AppState>, 26 26 tx: mpsc::Sender<Event>, ··· 156 156 } 157 157 } 158 158 159 - #[cfg(feature = "indexer")] 159 + #[cfg(feature = "indexer_stream")] 160 160 fn stored_to_event(state: &AppState, id: u64, stored: StoredEvent<'_>) -> Option<Event> { 161 161 let StoredEvent { 162 162 live,
+13 -9
src/db/ephemeral.rs
··· 1 - use crate::db::{Db, keys}; 2 - use fjall::Keyspace; 3 - use miette::{IntoDiagnostic, WrapErr}; 4 - use std::sync::Arc; 5 - use std::sync::atomic::Ordering; 6 - use std::time::Duration; 7 - use tracing::{debug, error, info}; 1 + #[cfg(any(feature = "indexer_stream", feature = "relay"))] 2 + use { 3 + crate::db::{Db, keys}, 4 + fjall::Keyspace, 5 + miette::{IntoDiagnostic, WrapErr}, 6 + std::sync::Arc, 7 + std::sync::atomic::Ordering, 8 + std::time::Duration, 9 + tracing::{debug, error, info}, 10 + }; 8 11 9 - #[cfg(feature = "indexer")] 12 + #[cfg(feature = "indexer_stream")] 10 13 pub fn ephemeral_ttl_worker(state: Arc<crate::state::AppState>) { 11 14 info!("ephemeral TTL worker started"); 12 15 loop { ··· 28 31 } 29 32 } 30 33 31 - #[cfg(feature = "indexer")] 34 + #[cfg(feature = "indexer_stream")] 32 35 pub fn ephemeral_ttl_tick(db: &Db, ttl: &Duration) -> miette::Result<()> { 33 36 let current_seq = db.next_event_id.load(Ordering::SeqCst); 34 37 ttl_tick_inner( ··· 54 57 ) 55 58 } 56 59 60 + #[cfg(any(feature = "indexer_stream", feature = "relay"))] 57 61 fn ttl_tick_inner( 58 62 db: &Db, 59 63 ttl: &Duration,
+2
src/db/keys/indexer.rs
··· 4 4 use super::SEP; 5 5 use crate::db::types::{DbRkey, DbTid, TrimmedDid}; 6 6 7 + #[cfg(feature = "indexer_stream")] 7 8 pub const EVENT_WATERMARK_PREFIX: &[u8] = b"ewm|"; 8 9 9 10 pub fn pending_key(id: u64) -> [u8; 8] { 10 11 id.to_be_bytes() 11 12 } 12 13 14 + #[cfg(feature = "indexer_stream")] 13 15 pub fn event_watermark_key(timestamp_secs: u64) -> Vec<u8> { 14 16 let mut key = Vec::with_capacity(EVENT_WATERMARK_PREFIX.len() + 8); 15 17 key.extend_from_slice(EVENT_WATERMARK_PREFIX);
+1
src/db/keys/mod.rs
··· 51 51 } 52 52 53 53 // key format: {SEQ} 54 + #[cfg(any(feature = "indexer_stream", feature = "relay"))] 54 55 pub fn event_key(seq: u64) -> [u8; 8] { 55 56 seq.to_be_bytes() 56 57 }
+18 -16
src/db/mod.rs
··· 1 1 use crate::config::Compression; 2 2 use crate::db::compaction::DropPrefixFilterFactory; 3 - #[cfg(feature = "indexer")] 3 + use crate::types::{RepoMetadata, RepoState}; 4 + 5 + #[cfg(feature = "indexer_stream")] 4 6 use crate::types::BroadcastEvent; 5 7 #[cfg(feature = "relay")] 6 8 use crate::types::RelayBroadcast; 7 - use crate::types::{RepoMetadata, RepoState}; 8 9 9 10 use fjall::config::{BlockSizePolicy, CompressionPolicy, RestartIntervalPolicy}; 10 11 use fjall::{ ··· 18 19 use std::cell::RefCell; 19 20 use std::collections::HashSet; 20 21 use std::sync::Arc; 21 - use std::sync::atomic::AtomicU64; 22 - 23 22 use url::Url; 24 23 25 24 pub mod compaction; ··· 30 29 pub mod pds_meta; 31 30 pub mod types; 32 31 33 - use tokio::sync::broadcast; 34 32 use tracing::error; 35 33 34 + #[cfg(any(feature = "indexer_stream", feature = "relay"))] 35 + use {std::sync::atomic::AtomicU64, tokio::sync::broadcast}; 36 + 36 37 fn default_opts() -> KeyspaceCreateOptions { 37 38 KeyspaceCreateOptions::default() 38 39 } ··· 56 57 pub resync: Keyspace, 57 58 #[cfg(feature = "indexer")] 58 59 pub resync_buffer: Keyspace, 59 - #[cfg(feature = "indexer")] 60 + #[cfg(feature = "indexer_stream")] 60 61 pub events: Keyspace, 61 62 #[cfg(feature = "backlinks")] 62 63 pub backlinks: Keyspace, 63 - #[cfg(feature = "indexer")] 64 + #[cfg(feature = "indexer_stream")] 64 65 pub(crate) event_tx: broadcast::Sender<BroadcastEvent>, 65 - #[cfg(feature = "indexer")] 66 + #[cfg(feature = "indexer_stream")] 66 67 pub next_event_id: Arc<AtomicU64>, 67 68 #[cfg(feature = "relay")] 68 69 pub(crate) relay_events: Keyspace, ··· 323 324 .data_block_compression_policy(CompressionPolicy::disabled()) 324 325 .data_block_restart_interval_policy(RestartIntervalPolicy::all(16)), 325 326 )?; 326 - #[cfg(feature = "indexer")] 327 + #[cfg(feature = "indexer_stream")] 327 328 let events = open_ks( 328 329 "events", 329 330 opts() ··· 433 434 // when adding new keyspaces, make sure to add them to the /stats 
endpoint 434 435 // and also update any relevant /debug/* endpoints 435 436 436 - #[cfg(feature = "indexer")] 437 + #[cfg(feature = "indexer_stream")] 437 438 let (event_tx, _) = broadcast::channel(10000); 438 439 439 440 #[cfg(feature = "relay")] ··· 455 456 resync, 456 457 #[cfg(feature = "indexer")] 457 458 resync_buffer, 458 - #[cfg(feature = "indexer")] 459 + #[cfg(feature = "indexer_stream")] 459 460 events, 460 461 counts, 461 462 filter, 462 463 crawler, 463 464 #[cfg(feature = "backlinks")] 464 465 backlinks, 465 - #[cfg(feature = "indexer")] 466 + #[cfg(feature = "indexer_stream")] 466 467 event_tx, 467 468 counts_map: HashMap::new(), 468 - #[cfg(feature = "indexer")] 469 + #[cfg(feature = "indexer_stream")] 469 470 next_event_id: Arc::new(AtomicU64::new(0)), 470 471 #[cfg(feature = "relay")] 471 472 relay_events, ··· 493 494 .store(last_relay_seq + 1, std::sync::atomic::Ordering::Relaxed); 494 495 } 495 496 496 - #[cfg(feature = "indexer")] 497 + #[cfg(feature = "indexer_stream")] 497 498 { 498 499 let mut last_id = 0; 499 500 if let Some(guard) = this.events.iter().next_back() { ··· 529 530 let ks = match ks_name { 530 531 #[cfg(feature = "indexer")] 531 532 "blocks" => &self.blocks, 532 - #[cfg(feature = "indexer")] 533 + #[cfg(feature = "indexer_stream")] 533 534 "events" => &self.events, 534 535 "repos" => &self.repos, 535 536 #[cfg(feature = "backlinks")] ··· 647 648 tasks.push(compact(self.pending.clone())); 648 649 tasks.push(compact(self.resync.clone())); 649 650 tasks.push(compact(self.resync_buffer.clone())); 650 - tasks.push(compact(self.events.clone())); 651 651 } 652 + #[cfg(feature = "indexer_stream")] 653 + tasks.push(compact(self.events.clone())); 652 654 653 655 #[cfg(feature = "relay")] 654 656 tasks.push(compact(self.relay_events.clone()));
+27 -12
src/ingest/indexer.rs
··· 4 4 use crate::ingest::validation; 5 5 use crate::resolver::{NoSigningKeyError, ResolverError}; 6 6 use crate::state::AppState; 7 - use crate::types::{ 8 - AccountEvt, BroadcastEvent, GaugeState, IdentityEvt, RepoMetadata, RepoState, RepoStatus, 9 - }; 7 + use crate::types::{GaugeState, RepoMetadata, RepoState, RepoStatus}; 10 8 use crate::{ops, util}; 11 9 12 10 use fjall::OwnedWriteBatch; 13 11 14 12 use jacquard_common::IntoStatic; 15 - use jacquard_common::cowstr::ToCowStr; 16 13 use jacquard_common::types::did::Did; 14 + 17 15 use jacquard_repo::error::CommitError; 18 16 use miette::{Diagnostic, IntoDiagnostic, Result}; 19 17 use std::sync::Arc; ··· 22 20 use tokio::runtime::Handle as TokioHandle; 23 21 use tokio::sync::mpsc; 24 22 use tracing::{debug, error, info, warn}; 23 + #[cfg(feature = "indexer_stream")] 24 + use { 25 + crate::types::{AccountEvt, BroadcastEvent, IdentityEvt}, 26 + jacquard_common::cowstr::ToCowStr, 27 + }; 25 28 26 29 #[derive(Debug)] 27 30 pub struct IndexerCommitData { ··· 124 127 batch: OwnedWriteBatch, 125 128 added_blocks: &'a mut i64, 126 129 records_delta: &'a mut i64, 130 + #[cfg(feature = "indexer_stream")] 127 131 broadcast_events: &'a mut Vec<BroadcastEvent>, 128 132 } 129 133 ··· 194 198 let _guard = handle.enter(); 195 199 debug!(shard = id, "shard started"); 196 200 201 + #[cfg(feature = "indexer_stream")] 197 202 let mut broadcast_events = Vec::new(); 198 203 199 204 while let Some(msg) = rx.blocking_recv() { 200 205 let batch = state.db.inner.batch(); 206 + #[cfg(feature = "indexer_stream")] 201 207 broadcast_events.clear(); 202 208 203 209 let mut added_blocks = 0; ··· 208 214 batch, 209 215 added_blocks: &mut added_blocks, 210 216 records_delta: &mut records_delta, 217 + #[cfg(feature = "indexer_stream")] 211 218 broadcast_events: &mut broadcast_events, 212 219 }; 213 220 ··· 393 400 if records_delta != 0 { 394 401 state.db.update_count("records", records_delta); 395 402 } 403 + #[cfg(feature = "indexer_stream")] 
396 404 for evt in broadcast_events.drain(..) { 397 405 let _ = state.db.event_tx.send(evt); 398 406 } ··· 473 481 let repo_state = res.repo_state; 474 482 *ctx.added_blocks += res.blocks_count; 475 483 *ctx.records_delta += res.records_delta; 484 + #[cfg(feature = "indexer_stream")] 476 485 ctx.broadcast_events 477 486 .push(BroadcastEvent::Persisted(db.next_event_id.load(SeqCst) - 1)); 478 487 479 488 Ok(RepoProcessResult::Ok(repo_state)) 480 489 } 481 490 491 + #[cfg_attr(not(feature = "indexer_stream"), allow(unused_variables))] 482 492 fn handle_identity<'s>( 483 493 ctx: &mut WorkerContext, 484 494 repo_state: RepoState<'s>, 485 495 identity: &Identity<'_>, 486 496 changed: bool, 487 497 ) -> Result<RepoProcessResult<'s, 'static>, IngestError> { 488 - let db = &ctx.state.db; 489 - let did = &identity.did; 490 - if changed { 491 - let evt = IdentityEvt { 492 - did: did.clone().into_static(), 493 - handle: repo_state.handle.clone().map(IntoStatic::into_static), 494 - }; 495 - ctx.broadcast_events.push(ops::make_identity_event(db, evt)); 498 + #[cfg(feature = "indexer_stream")] 499 + { 500 + let db = &ctx.state.db; 501 + let did = &identity.did; 502 + if changed { 503 + let evt = IdentityEvt { 504 + did: did.clone().into_static(), 505 + handle: repo_state.handle.clone().map(IntoStatic::into_static), 506 + }; 507 + ctx.broadcast_events.push(ops::make_identity_event(db, evt)); 508 + } 496 509 } 497 510 498 511 Ok(RepoProcessResult::Ok(repo_state)) ··· 508 521 let db = &ctx.state.db; 509 522 let did = &account.did; 510 523 let is_inactive = !account.active; 524 + #[cfg(feature = "indexer_stream")] 511 525 let evt = AccountEvt { 512 526 did: did.clone().into_static(), 513 527 active: account.active, ··· 537 551 } 538 552 } 539 553 554 + #[cfg(feature = "indexer_stream")] 540 555 if changed { 541 556 ctx.broadcast_events.push(ops::make_account_event(db, evt)); 542 557 }
+9 -3
src/lib.rs
··· 14 14 pub use smol_str; 15 15 } 16 16 17 - #[cfg(all(feature = "relay", feature = "indexer"))] 17 + #[cfg(all( 18 + feature = "relay", 19 + any(feature = "indexer", feature = "indexer_stream", feature = "backlinks") 20 + ))] 18 21 compile_error!("can't be relay and indexer at the same time"); 19 - #[cfg(all(feature = "relay", feature = "backlinks"))] 20 - compile_error!("can't index backlinks while running as a relay"); 22 + #[cfg(all( 23 + not(feature = "indexer"), 24 + any(feature = "indexer_stream", feature = "backlinks") 25 + ))] 26 + compile_error!("indexer dependent features (stream, backlinks) without indexer can't be enabled"); 21 27 22 28 pub(crate) mod api; 23 29 #[cfg(feature = "indexer")]
+49 -32
src/ops.rs
··· 1 1 use fjall::OwnedWriteBatch; 2 2 use fjall::Slice; 3 3 4 - use jacquard_common::CowStr; 5 4 #[cfg(feature = "backlinks")] 6 5 use jacquard_common::Data; 7 6 use jacquard_common::types::did::Did; 8 7 use miette::{Context, IntoDiagnostic, Result}; 9 8 use std::collections::HashMap; 10 - use std::sync::atomic::Ordering; 11 9 use tracing::debug; 12 10 13 11 use crate::db::types::{DbAction, DbRkey, DbTid, TrimmedDid}; ··· 16 14 use crate::ingest::stream::Commit; 17 15 use crate::ingest::validation::ValidatedCommit; 18 16 use crate::state::AppState; 19 - use crate::types::StoredData; 20 - use crate::types::{ 21 - AccountEvt, BroadcastEvent, IdentityEvt, MarshallableEvt, RepoState, RepoStatus, ResyncState, 22 - StoredEvent, 17 + use crate::types::{RepoState, RepoStatus, ResyncState}; 18 + 19 + #[cfg(feature = "indexer_stream")] 20 + use { 21 + crate::types::{ 22 + AccountEvt, BroadcastEvent, IdentityEvt, MarshallableEvt, StoredData, StoredEvent, 23 + }, 24 + jacquard_common::CowStr, 25 + std::sync::atomic::Ordering, 23 26 }; 24 27 25 28 pub fn persist_to_resync_buffer(db: &Db, did: &Did, commit: &Commit) -> Result<()> { ··· 36 39 37 40 // emitting identity is ephemeral 38 41 // we dont replay these, consumers can just fetch identity themselves if they need it 42 + #[cfg(feature = "indexer_stream")] 39 43 pub fn make_identity_event(db: &Db, evt: IdentityEvt<'static>) -> BroadcastEvent { 40 44 let event_id = db.next_event_id.fetch_add(1, Ordering::SeqCst); 41 45 let marshallable = MarshallableEvt { ··· 48 52 BroadcastEvent::Ephemeral(Box::new(marshallable)) 49 53 } 50 54 55 + #[cfg(feature = "indexer_stream")] 51 56 pub fn make_account_event(db: &Db, evt: AccountEvt<'static>) -> BroadcastEvent { 52 57 let event_id = db.next_event_id.fetch_add(1, Ordering::SeqCst); 53 58 let marshallable = MarshallableEvt { ··· 237 242 let rkey = DbRkey::new(rkey); 238 243 let db_key = keys::record_key(did, collection, &rkey); 239 244 245 + #[cfg(feature = "indexer_stream")] 240 246 
let event_id = db.next_event_id.fetch_add(1, Ordering::SeqCst); 241 247 242 248 let action = DbAction::try_from(op.action.as_str())?; 243 - let block = match action { 249 + let block: Option<bytes::Bytes> = match action { 244 250 DbAction::Create | DbAction::Update => { 245 251 let Some(cid) = &op.cid else { 246 252 continue; ··· 281 287 )?; 282 288 } 283 289 None 284 - } else if action == DbAction::Create || action == DbAction::Update { 285 - Some(bytes.clone()) 286 290 } else { 287 - unreachable!("we tested if we are in create or update action") 291 + // in ephemeral mode, capture bytes inline for event emission 292 + #[cfg(feature = "indexer_stream")] 293 + { 294 + Some(bytes.clone()) 295 + } 296 + #[cfg(not(feature = "indexer_stream"))] 297 + { 298 + let _ = bytes; 299 + None 300 + } 288 301 } 289 302 } 290 303 DbAction::Delete => { ··· 309 322 } 310 323 }; 311 324 312 - let evt = StoredEvent { 313 - live: true, 314 - did: TrimmedDid::from(did), 315 - rev: DbTid::from(&commit.rev), 316 - collection: CowStr::Borrowed(collection), 317 - rkey, 318 - action, 319 - data: block 320 - .map(StoredData::Block) 321 - .or_else(|| { 322 - (!only_index_links).then(|| { 323 - op.cid 324 - .as_ref() 325 - .map(|c| c.to_ipld().expect("valid cid")) 326 - .map(StoredData::Ptr) 327 - })? 328 - }) 329 - .unwrap_or(StoredData::Nothing), 330 - }; 331 - 332 - let bytes = rmp_serde::to_vec(&evt).into_diagnostic()?; 333 - batch.insert(&db.events, keys::event_key(event_id), bytes); 325 + #[cfg(feature = "indexer_stream")] 326 + { 327 + let evt = StoredEvent { 328 + live: true, 329 + did: TrimmedDid::from(did), 330 + rev: DbTid::from(&commit.rev), 331 + collection: CowStr::Borrowed(collection), 332 + rkey, 333 + action, 334 + data: block 335 + .map(StoredData::Block) 336 + .or_else(|| { 337 + (!only_index_links).then(|| { 338 + op.cid 339 + .as_ref() 340 + .map(|c| c.to_ipld().expect("valid cid")) 341 + .map(StoredData::Ptr) 342 + })? 
343 + }) 344 + .unwrap_or(StoredData::Nothing), 345 + }; 346 + let bytes = rmp_serde::to_vec(&evt).into_diagnostic()?; 347 + batch.insert(&db.events, keys::event_key(event_id), bytes); 348 + } 349 + #[cfg(not(feature = "indexer_stream"))] 350 + drop(block); 334 351 } 335 352 336 353 // update counts
+10 -1
src/types.rs
··· 11 11 use serde_json::Value; 12 12 use smol_str::{SmolStr, ToSmolStr}; 13 13 14 - use crate::db::types::{DbAction, DbRkey, DbTid, DidKey, TrimmedDid}; 14 + use crate::db::types::{DbTid, DidKey}; 15 + 16 + #[cfg(feature = "indexer_stream")] 17 + use crate::db::types::{DbAction, DbRkey, TrimmedDid}; 15 18 use crate::resolver::MiniDoc; 16 19 17 20 pub(crate) mod v2 { ··· 200 203 } 201 204 } 202 205 206 + #[cfg(feature = "indexer_stream")] 203 207 #[derive(Clone, Debug)] 204 208 pub(crate) enum BroadcastEvent { 205 209 #[allow(dead_code)] ··· 367 371 pub status: Option<CowStr<'i>>, 368 372 } 369 373 374 + #[cfg(feature = "indexer_stream")] 370 375 #[derive(Serialize, Deserialize, Clone)] 371 376 pub(crate) enum StoredData { 372 377 Nothing, ··· 375 380 Block(Bytes), 376 381 } 377 382 383 + #[cfg(feature = "indexer_stream")] 378 384 impl StoredData { 379 385 pub fn is_nothing(&self) -> bool { 380 386 matches!(self, StoredData::Nothing) 381 387 } 382 388 } 383 389 390 + #[cfg(feature = "indexer_stream")] 384 391 impl Default for StoredData { 385 392 fn default() -> Self { 386 393 Self::Nothing 387 394 } 388 395 } 389 396 397 + #[cfg(feature = "indexer_stream")] 390 398 impl Debug for StoredData { 391 399 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { 392 400 match self { ··· 397 405 } 398 406 } 399 407 408 + #[cfg(feature = "indexer_stream")] 400 409 #[derive(Debug, Serialize, Deserialize, Clone)] 401 410 #[serde(bound(deserialize = "'i: 'de"))] 402 411 pub(crate) struct StoredEvent<'i> {