very fast at protocol indexer with flexible filtering, xrpc queries, cursor-backed event stream, and more, built on fjall
rust fjall at-protocol atproto indexer
58
fork

Configure Feed

Select the types of activity you want to include in your feed.

[api] allow backfill component to be paused

dawn 6729188d 90f016e9

+28 -8
+5 -5
README.md
··· 76 76 #### ingestion control 77 77 78 78 - `GET /ingestion`: get the current ingestion status. 79 - - returns `{ "crawler": bool, "firehose": bool }`. 79 + - returns `{ "crawler": bool, "firehose": bool, "backfill": bool }`. 80 80 - `PATCH /ingestion`: enable or disable ingestion components at runtime without restarting. 81 - - body: `{ "crawler"?: bool, "firehose"?: bool }` — only provided fields are updated. 82 - - when disabled, the component pauses at the next idle point and resumes immediately when re-enabled. 81 + - body: `{ "crawler"?: bool, "firehose"?: bool, "backfill"?: bool }` — only provided fields are updated. 82 + - when disabled, each component finishes its current task before pausing (e.g. the backfill worker completes any in-flight repo syncs, the firehose finishes processing the current message). they resume immediately when re-enabled. 83 83 84 84 #### database operations 85 85 86 - - `POST /db/train`: train zstd compression dictionaries for the `repos`, `blocks`, and `events` keyspaces. dictionaries are written to disk; a restart is required to apply them. ingestion is paused for the duration and restored on completion. 87 - - `POST /db/compact`: trigger a full major compaction of all database keyspaces in parallel. ingestion is paused for the duration and restored on completion. 86 + - `POST /db/train`: train zstd compression dictionaries for the `repos`, `blocks`, and `events` keyspaces. dictionaries are written to disk; a restart is required to apply them. the crawler, firehose, and backfill worker are paused for the duration and restored on completion. 87 + - `POST /db/compact`: trigger a full major compaction of all database keyspaces in parallel. the crawler, firehose, and backfill worker are paused for the duration and restored on completion. 88 88 89 89 #### filter mode 90 90
+7
src/api/ingestion.rs
··· 19 19 pub struct IngestionStatus { 20 20 pub crawler: bool, 21 21 pub firehose: bool, 22 + pub backfill: bool, 22 23 } 23 24 24 25 pub async fn get_ingestion(State(state): State<Arc<AppState>>) -> Json<IngestionStatus> { 25 26 Json(IngestionStatus { 26 27 crawler: *state.crawler_enabled.borrow(), 27 28 firehose: *state.firehose_enabled.borrow(), 29 + backfill: *state.backfill_enabled.borrow(), 28 30 }) 29 31 } 30 32 ··· 34 36 pub crawler: Option<bool>, 35 37 #[serde(default)] 36 38 pub firehose: Option<bool>, 39 + #[serde(default)] 40 + pub backfill: Option<bool>, 37 41 } 38 42 39 43 pub async fn patch_ingestion( ··· 45 49 } 46 50 if let Some(firehose) = body.firehose { 47 51 state.firehose_enabled.send_replace(firehose); 52 + } 53 + if let Some(backfill) = body.backfill { 54 + state.backfill_enabled.send_replace(backfill); 48 55 } 49 56 StatusCode::OK 50 57 }
+6 -1
src/backfill/mod.rs
··· 32 32 pub mod manager; 33 33 34 34 use crate::ingest::{BufferTx, IngestMessage}; 35 + use crate::util::WatchEnabledExt; 35 36 36 37 pub struct BackfillWorker { 37 38 state: Arc<AppState>, ··· 41 42 verify_signatures: bool, 42 43 ephemeral: bool, 43 44 in_flight: Arc<scc::HashSet<Did<'static>>>, 45 + enabled: tokio::sync::watch::Receiver<bool>, 44 46 } 45 47 46 48 impl BackfillWorker { ··· 51 53 concurrency_limit: usize, 52 54 verify_signatures: bool, 53 55 ephemeral: bool, 56 + enabled: tokio::sync::watch::Receiver<bool>, 54 57 ) -> Self { 55 58 Self { 56 59 state, ··· 66 69 verify_signatures, 67 70 ephemeral, 68 71 in_flight: Arc::new(scc::HashSet::new()), 72 + enabled, 69 73 } 70 74 } 71 75 } ··· 82 86 } 83 87 84 88 impl BackfillWorker { 85 - pub async fn run(self) { 89 + pub async fn run(mut self) { 86 90 info!("backfill worker started"); 87 91 88 92 loop { 93 + self.enabled.wait_enabled("backfill").await; 89 94 let mut spawned = 0; 90 95 91 96 for guard in self.state.db.pending.iter() {
+2 -1
src/main.rs
··· 92 92 let state = state.clone(); 93 93 let timeout = cfg.repo_fetch_timeout; 94 94 BackfillWorker::new( 95 - state, 95 + state.clone(), 96 96 buffer_tx.clone(), 97 97 timeout, 98 98 cfg.backfill_concurrency_limit, ··· 101 101 SignatureVerification::Full | SignatureVerification::BackfillOnly 102 102 ), 103 103 cfg.ephemeral, 104 + state.backfill_enabled.subscribe(), 104 105 ) 105 106 .run() 106 107 });
+8 -1
src/state.rs
··· 22 22 pub crawler_enabled: watch::Sender<bool>, 23 23 /// Controls whether firehose ingestion is running. Receivers are held by ingestor tasks. 24 24 pub firehose_enabled: watch::Sender<bool>, 25 + /// Controls whether the backfill worker picks up new tasks. Receiver is held by the backfill worker. 26 + pub backfill_enabled: watch::Sender<bool>, 25 27 } 26 28 27 29 impl AppState { ··· 46 48 47 49 let (crawler_enabled, _) = watch::channel(crawler_default); 48 50 let (firehose_enabled, _) = watch::channel(config.enable_firehose); 51 + let (backfill_enabled, _) = watch::channel(true); 49 52 50 53 Ok(Self { 51 54 db, ··· 55 58 backfill_notify: Notify::new(), 56 59 crawler_enabled, 57 60 firehose_enabled, 61 + backfill_enabled, 58 62 }) 59 63 } 60 64 ··· 62 66 self.backfill_notify.notify_one(); 63 67 } 64 68 65 - /// pauses both crawler and firehose, runs `f`, then restores their prior state. 69 + /// pauses the crawler, firehose, and backfill worker, runs `f`, then restores their prior state. 66 70 /// the restore always happens, even if `f` returns an error. 67 71 pub async fn with_ingestion_paused<F, Fut, T>(&self, f: F) -> T 68 72 where ··· 71 75 { 72 76 let crawler_was = *self.crawler_enabled.borrow(); 73 77 let firehose_was = *self.firehose_enabled.borrow(); 78 + let backfill_was = *self.backfill_enabled.borrow(); 74 79 self.crawler_enabled.send_replace(false); 75 80 self.firehose_enabled.send_replace(false); 81 + self.backfill_enabled.send_replace(false); 76 82 let result = f().await; 77 83 self.crawler_enabled.send_replace(crawler_was); 78 84 self.firehose_enabled.send_replace(firehose_was); 85 + self.backfill_enabled.send_replace(backfill_was); 79 86 result 80 87 } 81 88 }