very fast at protocol indexer with flexible filtering, xrpc queries, cursor-backed event stream, and more, built on fjall
rust fjall at-protocol atproto indexer
58
fork

Configure Feed

Select the types of activity you want to include in your feed.

[api] add /db/compact, move /train_dict to /db/train, document

dawn a868e313 403ec07d

+100 -42
+13
README.md
··· 74 74 - `GET /filter`: get the current filter configuration. 75 75 - `PATCH /filter`: update the filter configuration. 76 76 77 + #### ingestion control 78 + 79 + - `GET /ingestion`: get the current ingestion status. 80 + - returns `{ "crawler": bool, "firehose": bool }`. 81 + - `PATCH /ingestion`: enable or disable ingestion components at runtime without restarting. 82 + - body: `{ "crawler"?: bool, "firehose"?: bool }` — only provided fields are updated. 83 + - when disabled, the component pauses at the next idle point and resumes immediately when re-enabled. 84 + 85 + #### database operations 86 + 87 + - `POST /db/train`: train zstd compression dictionaries for the `repos`, `blocks`, and `events` keyspaces. dictionaries are written to disk; a restart is required to apply them. ingestion is paused for the duration and restored on completion. 88 + - `POST /db/compact`: trigger a full major compaction of all database keyspaces in parallel. ingestion is paused for the duration and restored on completion. 89 + 77 90 #### filter mode 78 91 79 92 the `mode` field controls what gets indexed:
+48
src/api/db.rs
··· 1 + use std::sync::Arc; 2 + 3 + use crate::state::AppState; 4 + use axum::{Router, extract::State, http::StatusCode, routing::post}; 5 + use futures::FutureExt; 6 + use miette::IntoDiagnostic; 7 + 8 + pub fn router() -> Router<Arc<AppState>> { 9 + Router::new() 10 + .route("/db/train", post(handle_train_dict)) 11 + .route("/db/compact", post(handle_compact)) 12 + } 13 + 14 + pub async fn handle_train_dict( 15 + State(state): State<Arc<AppState>>, 16 + ) -> Result<StatusCode, StatusCode> { 17 + state 18 + .with_ingestion_paused(async || { 19 + let train = |name: &'static str| { 20 + let db = state.db.clone(); 21 + tokio::task::spawn_blocking(move || db.train_dict(name)) 22 + .map(|res| res.into_diagnostic().flatten()) 23 + }; 24 + let repos = train("repos"); 25 + let blocks = train("blocks"); 26 + let events = train("events"); 27 + 28 + tokio::try_join!(repos, blocks, events) 29 + .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; 30 + 31 + Ok(StatusCode::OK) 32 + }) 33 + .await 34 + } 35 + 36 + pub async fn handle_compact(State(state): State<Arc<AppState>>) -> Result<StatusCode, StatusCode> { 37 + state 38 + .with_ingestion_paused(async || { 39 + state 40 + .db 41 + .compact() 42 + .await 43 + .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; 44 + 45 + Ok(StatusCode::OK) 46 + }) 47 + .await 48 + }
+2 -2
src/api/mod.rs
··· 4 4 use tower_http::cors::CorsLayer; 5 5 use tower_http::trace::TraceLayer; 6 6 7 + mod db; 7 8 mod debug; 8 9 mod filter; 9 10 mod ingestion; 10 11 mod repos; 11 12 mod stats; 12 13 mod stream; 13 - mod train_dict; 14 14 mod xrpc; 15 15 16 16 pub async fn serve(state: Arc<AppState>, port: u16) -> miette::Result<()> { ··· 22 22 .merge(filter::router()) 23 23 .merge(repos::router()) 24 24 .merge(ingestion::router()) 25 - .merge(train_dict::router()) 25 + .merge(db::router()) 26 26 .with_state(state) 27 27 .layer(TraceLayer::new_for_http()) 28 28 .layer(CorsLayer::permissive());
-27
src/api/train_dict.rs
··· 1 - use std::sync::Arc; 2 - 3 - use crate::state::AppState; 4 - use axum::{Router, extract::State, http::StatusCode, routing::post}; 5 - use futures::FutureExt; 6 - use miette::IntoDiagnostic; 7 - 8 - pub fn router() -> Router<Arc<AppState>> { 9 - Router::new().route("/train_dict", post(handle_train_dict)) 10 - } 11 - 12 - pub async fn handle_train_dict( 13 - State(state): State<Arc<AppState>>, 14 - ) -> Result<StatusCode, StatusCode> { 15 - let train = |name: &'static str| { 16 - let db = state.db.clone(); 17 - tokio::task::spawn_blocking(move || db.train_dict(name)) 18 - .map(|res| res.into_diagnostic().flatten()) 19 - }; 20 - let repos = train("repos"); 21 - let blocks = train("blocks"); 22 - let events = train("events"); 23 - 24 - tokio::try_join!(repos, blocks, events).map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; 25 - 26 - Ok(StatusCode::OK) 27 - }
+19 -12
src/db/mod.rs
··· 524 524 Ok(()) 525 525 } 526 526 527 - pub fn compact(&self) -> Result<()> { 528 - self.repos.major_compact().into_diagnostic()?; 529 - self.records.major_compact().into_diagnostic()?; 530 - self.blocks.major_compact().into_diagnostic()?; 531 - self.cursors.major_compact().into_diagnostic()?; 532 - self.pending.major_compact().into_diagnostic()?; 533 - self.resync.major_compact().into_diagnostic()?; 534 - self.resync_buffer.major_compact().into_diagnostic()?; 535 - self.events.major_compact().into_diagnostic()?; 536 - self.counts.major_compact().into_diagnostic()?; 537 - self.filter.major_compact().into_diagnostic()?; 538 - self.crawler.major_compact().into_diagnostic()?; 527 + pub async fn compact(&self) -> Result<()> { 528 + let compact = |ks: Keyspace| async move { 529 + tokio::task::spawn_blocking(move || ks.major_compact().into_diagnostic()) 530 + .await 531 + .into_diagnostic()? 532 + }; 533 + tokio::try_join!( 534 + compact(self.repos.clone()), 535 + compact(self.records.clone()), 536 + compact(self.blocks.clone()), 537 + compact(self.cursors.clone()), 538 + compact(self.pending.clone()), 539 + compact(self.resync.clone()), 540 + compact(self.resync_buffer.clone()), 541 + compact(self.events.clone()), 542 + compact(self.counts.clone()), 543 + compact(self.filter.clone()), 544 + compact(self.crawler.clone()), 545 + )?; 539 546 Ok(()) 540 547 } 541 548
+1 -1
src/main.rs
··· 33 33 34 34 if cfg.db_compact { 35 35 info!("compacting database..."); 36 - state.db.compact()?; 36 + state.db.compact().await?; 37 37 } 38 38 39 39 if cfg.full_network
+17
src/state.rs
··· 61 61 pub fn notify_backfill(&self) { 62 62 self.backfill_notify.notify_one(); 63 63 } 64 + 65 + /// pauses both crawler and firehose, runs `f`, then restores their prior state. 66 + /// the restore always happens, even if `f` returns an error. 67 + pub async fn with_ingestion_paused<F, Fut, T>(&self, f: F) -> T 68 + where 69 + F: FnOnce() -> Fut, 70 + Fut: Future<Output = T>, 71 + { 72 + let crawler_was = *self.crawler_enabled.borrow(); 73 + let firehose_was = *self.firehose_enabled.borrow(); 74 + self.crawler_enabled.send_replace(false); 75 + self.firehose_enabled.send_replace(false); 76 + let result = f().await; 77 + self.crawler_enabled.send_replace(crawler_was); 78 + self.firehose_enabled.send_replace(firehose_was); 79 + result 80 + } 64 81 }