very fast AT Protocol indexer with flexible filtering, XRPC queries, a cursor-backed event stream, and more, built on fjall
rust fjall at-protocol atproto indexer


[db] make count tracking more durable

dawn 6f2cce4f 53d93fb5

+688 -295
+20 -17
src/backfill/manager.rs
···
  use crate::db::types::TrimmedDid;
- use crate::db::{self, keys};
+ use crate::db::{self, CountDeltas, keys};
  use crate::state::AppState;
  use crate::types::{GaugeState, ResyncState};
  use miette::{IntoDiagnostic, Result};
···
  pub fn queue_gone_backfills(state: &Arc<AppState>) -> Result<()> {
      debug!("scanning for deactivated/takendown repos to retry...");
-     let mut transitions = Vec::new();
+     let mut transitions = 0usize;
+     let mut count_deltas = CountDeltas::default();

      let mut batch = state.db.inner.batch();
···
                  crate::db::ser_repo_meta(&metadata)?,
              );

-             transitions.push((GaugeState::Resync(None), GaugeState::Pending));
+             count_deltas.add_gauge_diff(&GaugeState::Resync(None), &GaugeState::Pending);
+             transitions += 1;
          }
      }
  }

- if transitions.is_empty() {
+ if transitions == 0 {
      return Ok(());
  }

+ let reservation = state.db.stage_count_deltas(&mut batch, &count_deltas);
  batch.commit().into_diagnostic()?;
-
- for (old_gauge, new_gauge) in &transitions {
-     state.db.update_gauge_diff(old_gauge, new_gauge);
- }
+ state.db.apply_count_deltas(&count_deltas);
+ drop(reservation);

  state.notify_backfill();

- info!(count = transitions.len(), "queued gone backfills");
+ info!(count = transitions, "queued gone backfills");
  Ok(())
}
···
      std::thread::sleep(Duration::from_secs(60));

      let now = chrono::Utc::now().timestamp();
-     let mut transitions = Vec::new();
+     let mut transitions = 0usize;
+     let mut count_deltas = CountDeltas::default();

      let mut batch = state.db.inner.batch();
···
              };
              batch.insert(&state.db.repo_metadata, &metadata_key, serialized_metadata);

-             transitions.push((GaugeState::Resync(Some(kind)), GaugeState::Pending));
+             count_deltas
+                 .add_gauge_diff(&GaugeState::Resync(Some(kind)), &GaugeState::Pending);
+             transitions += 1;
          }
      }
      Ok(_) => {
···
          }
      }

-     if transitions.is_empty() {
+     if transitions == 0 {
          continue;
      }

+     let reservation = state.db.stage_count_deltas(&mut batch, &count_deltas);
      if let Err(e) = batch.commit() {
          error!(err = %e, "failed to commit batch");
          db::check_poisoned(&e);
          continue;
      }
-
-     for (old_gauge, new_gauge) in &transitions {
-         state.db.update_gauge_diff(old_gauge, new_gauge);
-     }
+     state.db.apply_count_deltas(&count_deltas);
+     drop(reservation);
      state.notify_backfill();
-     info!(count = transitions.len(), "queued retries");
+     info!(count = transitions, "queued retries");
  }
}
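
Every call site in this commit follows the same four-step shape: accumulate deltas in memory, stage them as log rows in the same write batch as the data they describe, commit, then fold the deltas into the in-memory counters and release the reservation so the checkpointer knows those ids are settled. A minimal sketch of that ordering, with hypothetical MockDb/MockBatch/Reservation stand-ins rather than the real fjall-backed types:

use std::collections::{BTreeMap, BTreeSet};
use std::sync::{Arc, Mutex};

// hypothetical stand-ins for the fjall-backed Db and write batch
#[derive(Default)]
struct MockBatch(Vec<(String, i64)>);

#[derive(Default)]
struct MockDb {
    counters: Mutex<BTreeMap<String, i64>>, // in-memory counts_map equivalent
    in_flight: Arc<Mutex<BTreeSet<u64>>>,   // reservation set the checkpointer consults
}

struct Reservation {
    in_flight: Arc<Mutex<BTreeSet<u64>>>,
    start_id: u64,
}

impl Drop for Reservation {
    // dropping the guard marks these delta ids as settled
    fn drop(&mut self) {
        self.in_flight.lock().unwrap().remove(&self.start_id);
    }
}

impl MockDb {
    fn stage(&self, batch: &mut MockBatch, deltas: &BTreeMap<String, i64>) -> Reservation {
        let start_id = 1; // the real code allocates ids from an AtomicU64
        self.in_flight.lock().unwrap().insert(start_id);
        for (k, v) in deltas {
            batch.0.push((k.clone(), *v)); // delta rows ride in the data batch
        }
        Reservation { in_flight: self.in_flight.clone(), start_id }
    }

    fn apply(&self, deltas: &BTreeMap<String, i64>) {
        let mut counters = self.counters.lock().unwrap();
        for (k, v) in deltas {
            *counters.entry(k.clone()).or_insert(0) += v;
        }
    }
}

fn main() {
    let db = MockDb::default();
    let mut batch = MockBatch::default();
    let deltas = BTreeMap::from([("pending".to_string(), 1i64)]);

    let reservation = db.stage(&mut batch, &deltas); // 1. stage deltas into the batch
    // 2. batch.commit() would go here: data and delta rows land atomically
    db.apply(&deltas); // 3. only after a successful commit, bump in-memory counters
    drop(reservation); // 4. release the reservation so checkpointing can advance
    assert_eq!(db.counters.lock().unwrap()["pending"], 1);
}

Because the delta rows commit atomically with the data they describe, a crash between steps 2 and 3 loses only the in-memory view, which startup replay rebuilds from the log.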
+49 -44
src/backfill/mod.rs
···
  use crate::db::types::{DbAction, DbRkey, TrimmedDid};
- use crate::db::{self, Db, keys, ser_repo_state};
+ use crate::db::{self, CountDeltas, Db, keys, ser_repo_state};
  use crate::filter::FilterMode;
  use crate::ops;
  use crate::resolver::ResolverError;
···
      // determine old gauge state
      // if it was error/suspended etc, we need to know which error kind it was to decrement correctly.
      let mut batch = db.inner.batch();
+     let mut count_deltas = CountDeltas::default();
      // unconditionally remove from pending
      batch.remove(&db.pending, pending_key);
      // remove from resync, just in case
      batch.remove(&db.resync, &did_key);
+     count_deltas.add_gauge_diff(&GaugeState::Pending, &GaugeState::Synced);
+     let reservation = db.stage_count_deltas(&mut batch, &count_deltas);

      tokio::task::spawn_blocking(move || batch.commit().into_diagnostic())
          .await
          .into_diagnostic()??;
-
-     state
-         .db
-         .update_gauge_diff_async(&GaugeState::Pending, &GaugeState::Synced)
-         .await;
+     db.apply_count_deltas(&count_deltas);
+     drop(reservation);

      let state = state.clone();
      tokio::task::spawn_blocking(move || {
···
          }
          Ok(())
      }
-     Ok(None) => {
-         // signal mode: repo had no matching records, was cleaned up by process_did
-         state.db.update_count_async("repos", -1).await;
-         state.db.update_count_async("pending", -1).await;
-         Ok(())
-     }
+     Ok(None) => Ok(()),
      Err(BackfillError::Deleted) => {
          warn!("orphaned pending entry, cleaning up");
-         // orphaned pending entry, clean it up
-         Db::remove(db.pending.clone(), pending_key).await?;
-         state.db.update_count_async("pending", -1).await;
+         let mut batch = db.inner.batch();
+         let mut count_deltas = CountDeltas::default();
+         batch.remove(&db.pending, pending_key);
+         count_deltas.add("pending", -1);
+         let reservation = db.stage_count_deltas(&mut batch, &count_deltas);
+         tokio::task::spawn_blocking(move || batch.commit().into_diagnostic())
+             .await
+             .into_diagnostic()??;
+         db.apply_count_deltas(&count_deltas);
+         drop(reservation);
          Ok(())
      }
      Err(e) => {
···
          retry_count,
          next_retry,
      };
+     let old_gauge = prev_kind
+         .map(|k| GaugeState::Resync(Some(k)))
+         .unwrap_or(GaugeState::Pending);
+     let new_gauge = GaugeState::Resync(Some(error_kind.clone()));
+     let mut count_deltas = CountDeltas::default();
+     count_deltas.add_gauge_diff(&old_gauge, &new_gauge);

      let error_string = e.to_string();

      tokio::task::spawn_blocking({
          let state = state.clone();
          let did_key = did_key.into_static();
+         let count_deltas = count_deltas.clone();
          move || {
              // 3. save to resync
              let serialized_resync_state =
···
              if let Some(state_bytes) = serialized_repo_state {
                  batch.insert(&state.db.repos, &did_key, state_bytes);
              }
-             batch.commit().into_diagnostic()
+             let reservation = state.db.stage_count_deltas(&mut batch, &count_deltas);
+             batch.commit().into_diagnostic().inspect(|_| {
+                 state.db.apply_count_deltas(&count_deltas);
+                 drop(reservation);
+             })
          }
      })
      .await
      .into_diagnostic()??;
-
-     let old_gauge = prev_kind
-         .map(|k| GaugeState::Resync(Some(k)))
-         .unwrap_or(GaugeState::Pending);
-
-     let new_gauge = GaugeState::Resync(Some(error_kind));
-
-     state
-         .db
-         .update_gauge_diff_async(&old_gauge, &new_gauge)
-         .await;

      Err(e)
  }
···
          )?;
      }

+     let mut count_deltas = CountDeltas::default();
+     if delta != 0 {
+         count_deltas.add("records", delta);
+     }
+     if added_blocks > 0 {
+         count_deltas.add("blocks", added_blocks);
+     }
+     let reservation = app_state.db.stage_count_deltas(&mut batch, &count_deltas);
      batch.commit().into_diagnostic()?;
+     app_state.db.apply_count_deltas(&count_deltas);
+     drop(reservation);

-     Ok::<_, miette::Report>(Some((state, delta, added_blocks, count)))
+     Ok::<_, miette::Report>(Some(count))
  })
  .await
  .into_diagnostic()??
···
      .ok_or_else(|| miette::miette!("repo metadata not found for {}", did))?;
  let metadata = crate::db::deser_repo_meta(metadata_bytes.as_ref())?;

- let Some((_state, records_cnt_delta, added_blocks, count)) = result else {
+ let Some(count) = result else {
      // signal mode: no signal-matching records found, clean up the optimistically-added repo
      let did_key = keys::repo_key(did);
      let backfill_pending_key = keys::pending_key(metadata.index_id);
      let app_state = app_state.clone();
      tokio::task::spawn_blocking(move || {
          let mut batch = app_state.db.inner.batch();
+         let mut count_deltas = CountDeltas::default();
          batch.remove(&app_state.db.repos, &did_key);
          batch.remove(&app_state.db.repo_metadata, &metadata_key);
          batch.remove(&app_state.db.pending, backfill_pending_key);
-         batch.commit().into_diagnostic()
+         count_deltas.add("repos", -1);
+         count_deltas.add("pending", -1);
+         let reservation = app_state.db.stage_count_deltas(&mut batch, &count_deltas);
+         batch.commit().into_diagnostic().inspect(|_| {
+             app_state.db.apply_count_deltas(&count_deltas);
+             drop(reservation);
+         })
      })
      .await
      .into_diagnostic()??;
···
  };

  trace!(ops = count, elapsed = %start.elapsed().as_secs_f32(), "did ops");
-
- // do the counts
- if records_cnt_delta != 0 {
-     app_state
-         .db
-         .update_count_async("records", records_cnt_delta)
-         .await;
- }
- if added_blocks > 0 {
-     app_state
-         .db
-         .update_count_async("blocks", added_blocks)
-         .await;
- }
  trace!(
      elapsed = %start.elapsed().as_secs_f32(),
      "committed backfill batch"
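
Note the commit().into_diagnostic().inspect(|_| ...) shape inside the spawn_blocking closures above: Result::inspect runs its closure only on Ok, which keeps the in-memory apply coupled to a successful commit without an explicit match. A tiny illustration of the idiom; the atomic counter is a hypothetical stand-in for apply_count_deltas:

use std::sync::atomic::{AtomicI64, Ordering};

// hypothetical in-memory counter standing in for Db::apply_count_deltas
static PENDING: AtomicI64 = AtomicI64::new(0);

fn commit(fail: bool) -> Result<(), String> {
    if fail { Err("io error".into()) } else { Ok(()) }
}

fn commit_and_apply(fail: bool) -> Result<(), String> {
    // inspect runs only on Ok, so the counter moves only if the batch landed
    commit(fail).inspect(|_| {
        PENDING.fetch_add(1, Ordering::Relaxed);
    })
}

fn main() {
    assert!(commit_and_apply(true).is_err());
    assert_eq!(PENDING.load(Ordering::Relaxed), 0); // failed commit: nothing applied
    assert!(commit_and_apply(false).is_ok());
    assert_eq!(PENDING.load(Ordering::Relaxed), 1); // successful commit: applied
}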
+19 -4
src/control/mod.rs
···
              true
          });

-         if let Err(e) = db::persist_counts(&state.db) {
-             error!(err = %e, "failed to persist counts");
-             db::check_poisoned_report(&e);
-         }
+         let checkpoint_watermark = match state.db.checkpoint_count_deltas() {
+             Ok(watermark) => watermark,
+             Err(e) => {
+                 error!(err = %e, "failed to checkpoint count deltas");
+                 db::check_poisoned_report(&e);
+                 None
+             }
+         };

          if let Err(e) = state.db.persist() {
              error!(err = %e, "db persist failed");
              db::check_poisoned_report(&e);
+         } else {
+             let watermark = checkpoint_watermark
+                 .map(Ok)
+                 .unwrap_or_else(|| db::load_count_delta_watermark(&state.db));
+             match watermark {
+                 Ok(watermark) => state.db.mark_count_checkpoint_persisted(watermark),
+                 Err(e) => {
+                     error!(err = %e, "failed to load durable count checkpoint watermark");
+                     db::check_poisoned_report(&e);
+                 }
+             }
          }
      }
  });
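
This loop pins down an ordering: checkpoint_count_deltas folds settled deltas into the base counters, persist() makes that fold durable, and only then does mark_count_checkpoint_persisted advance the GC watermark so compaction may drop the folded rows. If persist fails, the watermark stays put and the rows remain replayable. A pure-logic sketch of the sequencing, all names hypothetical:

// hypothetical model of the checkpoint -> persist -> gc-watermark sequencing
struct Counts {
    durable_watermark: u64, // highest delta id folded into base counters
    gc_watermark: u64,      // highest delta id compaction may destroy
}

impl Counts {
    fn checkpoint(&mut self, newest_settled_id: u64) -> Option<u64> {
        if newest_settled_id <= self.durable_watermark {
            return None; // nothing new to fold
        }
        // fold deltas in (durable_watermark, newest_settled_id] into base counters
        self.durable_watermark = newest_settled_id;
        Some(newest_settled_id)
    }

    fn persist(&self) -> Result<(), &'static str> {
        Ok(()) // journal-sync stand-in; may fail in reality
    }

    fn tick(&mut self, newest_settled_id: u64) {
        let checkpointed = self.checkpoint(newest_settled_id);
        if self.persist().is_ok() {
            // only a durable checkpoint makes the delta rows safe to GC
            if let Some(w) = checkpointed {
                self.gc_watermark = w;
            }
        }
        // on persist failure: gc_watermark unchanged, deltas replay at startup
    }
}

fn main() {
    let mut c = Counts { durable_watermark: 0, gc_watermark: 0 };
    c.tick(42);
    assert_eq!((c.durable_watermark, c.gc_watermark), (42, 42));
}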
+27 -35
src/control/repos/indexer.rs
···
  use rand::Rng;

  use super::*;
+ use crate::db::CountDeltas;

  impl ReposControl {
      /// iterates through pending repositories, returning their state.
···
          db: &Db,
          did: &Did<'_>,
          batch: &mut fjall::OwnedWriteBatch,
-         transitions: &mut Vec<(GaugeState, GaugeState)>,
+         count_deltas: &mut CountDeltas,
      ) -> Result<bool> {
          let did_key = keys::repo_key(did);
          let metadata_key = keys::repo_metadata_key(did);
···
                  &metadata_key,
                  crate::db::ser_repo_meta(&metadata)?,
              );
-             transitions.push((old, GaugeState::Pending));
+             count_deltas.add_gauge_diff(&old, &GaugeState::Pending);
              return Ok(true);
          }
      }
···
      let dids: Vec<Did<'static>> = dids.into_iter().map(|d| d.into_static()).collect();
      let state = self.0.clone();

-     let (queued, transitions) = tokio::task::spawn_blocking(move || {
+     let queued = tokio::task::spawn_blocking(move || {
          let db = &state.db;
          let mut batch = db.inner.batch();
          let mut queued: Vec<Did<'static>> = Vec::new();
-         let mut transitions: Vec<(GaugeState, GaugeState)> = Vec::new();
+         let mut count_deltas = CountDeltas::default();

          for did in dids {
-             if Self::_resync(db, &did, &mut batch, &mut transitions)? {
+             if Self::_resync(db, &did, &mut batch, &mut count_deltas)? {
                  queued.push(did);
              }
          }

+         let reservation = db.stage_count_deltas(&mut batch, &count_deltas);
          batch.commit().into_diagnostic()?;
+         db.apply_count_deltas(&count_deltas);
+         drop(reservation);
          state.db.persist()?;
-         Ok::<_, miette::Report>((queued, transitions))
+         Ok::<_, miette::Report>(queued)
      })
      .await
      .into_diagnostic()??;
-
-     for (old, new) in transitions {
-         self.0.db.update_gauge_diff_async(&old, &new).await;
-     }
      if !queued.is_empty() {
          self.0.notify_backfill();
      }
···
      let dids: Vec<Did<'static>> = dids.into_iter().map(|d| d.into_static()).collect();
      let state = self.0.clone();

-     let (new_count, queued, transitions) = tokio::task::spawn_blocking(move || {
+     let queued = tokio::task::spawn_blocking(move || {
          let db = &state.db;
          let mut batch = db.inner.batch();
-         let mut added = 0i64;
          let mut queued: Vec<Did<'static>> = Vec::new();
-         let mut transitions: Vec<(GaugeState, GaugeState)> = Vec::new();
+         let mut count_deltas = CountDeltas::default();

          for did in dids {
              let did_key = keys::repo_key(&did);
···
                  .transpose()?;

              if let Some(metadata) = existing_metadata {
-                 if !metadata.tracked && Self::_resync(db, &did, &mut batch, &mut transitions)? {
+                 if !metadata.tracked && Self::_resync(db, &did, &mut batch, &mut count_deltas)?
+                 {
                      queued.push(did);
                  }
              } else {
···
                      crate::db::ser_repo_meta(&metadata)?,
                  );
                  batch.insert(&db.pending, keys::pending_key(metadata.index_id), &did_key);
-                 added += 1;
+                 count_deltas.add("repos", 1);
+                 count_deltas.add_gauge_diff(&GaugeState::Synced, &GaugeState::Pending);
                  queued.push(did);
-                 transitions.push((GaugeState::Synced, GaugeState::Pending));
              }
          }

+         let reservation = db.stage_count_deltas(&mut batch, &count_deltas);
          batch.commit().into_diagnostic()?;
+         db.apply_count_deltas(&count_deltas);
+         drop(reservation);
          state.db.persist()?;
-         Ok::<_, miette::Report>((added, queued, transitions))
+         Ok::<_, miette::Report>(queued)
      })
      .await
      .into_diagnostic()??;
-
-     if new_count > 0 {
-         self.0.db.update_count_async("repos", new_count).await;
-     }
-     for (old, new) in transitions {
-         self.0.db.update_gauge_diff_async(&old, &new).await;
-     }
      self.0.notify_backfill();
      Ok(queued)
  }
···
      let dids: Vec<Did<'static>> = dids.into_iter().map(|d| d.into_static()).collect();
      let state = self.0.clone();

-     let (untracked, gauge_decrements) = tokio::task::spawn_blocking(move || {
+     let untracked = tokio::task::spawn_blocking(move || {
          let db = &state.db;
          let mut batch = db.inner.batch();
          let mut untracked: Vec<Did<'static>> = Vec::new();
-         let mut gauge_decrements = Vec::new();
+         let mut count_deltas = CountDeltas::default();

          for did in dids {
              let did_key = keys::repo_key(&did);
···
                      batch.remove(&db.pending, keys::pending_key(metadata.index_id));
                      batch.remove(&db.resync, &did_key);
                      if old != GaugeState::Synced {
-                         gauge_decrements.push(old);
+                         count_deltas.add_gauge_diff(&old, &GaugeState::Synced);
                      }
                      untracked.push(did);
                  }
···
          }

+         let reservation = db.stage_count_deltas(&mut batch, &count_deltas);
          batch.commit().into_diagnostic()?;
+         db.apply_count_deltas(&count_deltas);
+         drop(reservation);
          state.db.persist()?;
-         Ok::<_, miette::Report>((untracked, gauge_decrements))
+         Ok::<_, miette::Report>(untracked)
      })
      .await
      .into_diagnostic()??;
-
-     for gauge in gauge_decrements {
-         self.0
-             .db
-             .update_gauge_diff_async(&gauge, &GaugeState::Synced)
-             .await;
-     }
      Ok(untracked)
  }
}
+19 -2
src/control/seed.rs
···
  use url::Url;

  use super::firehose::FirehoseHandle;
- use crate::db::{self, keys};
+ use crate::db::{self, CountDeltas, keys};
  use crate::state::AppState;

  const MAX_CONCURRENT_SEEDS: usize = 4;
···
      let count_key = keys::pds_account_count_key(host.hostname.as_ref());
      let current = state.db.get_count(&count_key).await;
      if current == 0 {
-         state.db.update_count_async(&count_key, count).await;
+         let state = state.clone();
+         let count_key = count_key.clone();
+         let result = tokio::task::spawn_blocking(move || -> miette::Result<()> {
+             let mut batch = state.db.inner.batch();
+             let mut count_deltas = CountDeltas::default();
+             count_deltas.add(&count_key, count);
+             let reservation = state.db.stage_count_deltas(&mut batch, &count_deltas);
+             batch.commit().into_diagnostic()?;
+             state.db.apply_count_deltas(&count_deltas);
+             drop(reservation);
+             Ok(())
+         })
+         .await
+         .into_diagnostic()
+         .flatten();
+         if let Err(e) = result {
+             warn!(hostname = %host.hostname, err = %e, "failed to seed host account count");
+         }
      }
  }
+8 -9
src/crawler/worker.rs
- use crate::db::{keys, ser_repo_state};
+ use crate::db::{CountDeltas, keys, ser_repo_state};
  use crate::state::AppState;
  use crate::types::{RepoMetadata, RepoState};
  use miette::{IntoDiagnostic, Result};
···
      let mut rng: SmallRng = rand::make_rng();
      let mut batch = app_state.db.inner.batch();
      let mut surviving = Vec::new();
+     let mut count_deltas = CountDeltas::default();
      for guard in guards {
          let did_key = keys::repo_key(&*guard);
          let metadata_key = keys::repo_metadata_key(&*guard);
···
          // clear any stale retry entry, this DID is confirmed and being enqueued
          batch.remove(&app_state.db.crawler, keys::crawler_retry_key(&*guard));
          trace!(did = %*guard, "enqueuing repo");
+         count_deltas.add("repos", 1);
+         #[cfg(feature = "indexer")]
+         count_deltas.add("pending", 1);
          surviving.push(guard);
      }
      if let Some(cursor) = cursor_update {
          batch.insert(&app_state.db.cursors, cursor.key, cursor.value);
      }
+     let reservation = app_state.db.stage_count_deltas(&mut batch, &count_deltas);
      // todo: repo state overwrites here are acceptable?
      batch.commit().into_diagnostic()?;
+     app_state.db.apply_count_deltas(&count_deltas);
+     drop(reservation);
      Ok(surviving)
  }),
)
···
  if count > 0 {
      self.stats.record_processed(count);
-     self.state
-         .db
-         .update_count_async("repos", count as i64)
-         .await;
-     self.state
-         .db
-         .update_count_async("pending", count as i64)
-         .await;
      #[cfg(feature = "indexer")]
      self.state.notify_backfill();
  }
+35 -29
src/db/compaction.rs
+ use crate::db::keys;
  use fjall::compaction::filter::Context;
  use lsm_tree::compaction::{CompactionFilter, Factory};
  use lsm_tree::compaction::{ItemAccessor, Verdict};
+ use std::sync::Arc;
+ use std::sync::atomic::{AtomicU64, Ordering};

- mod drop_prefix {
-     use super::*;
+ pub struct CountsGcFilterFactory {
+     pub drop_collection_counts: bool,
+     pub delta_gc_watermark: Arc<AtomicU64>,
+ }

-     pub struct DropPrefixFilter {
-         prefix: &'static [u8],
-     }
+ struct CountsGcFilter {
+     drop_collection_counts: bool,
+     delta_gc_watermark: Arc<AtomicU64>,
+ }

-     impl CompactionFilter for DropPrefixFilter {
-         fn filter_item(
-             &mut self,
-             item: ItemAccessor<'_>,
-             _: &Context,
-         ) -> lsm_tree::Result<Verdict> {
-             Ok(item
-                 .key()
-                 .starts_with(&self.prefix)
-                 .then_some(Verdict::Destroy)
-                 .unwrap_or(Verdict::Keep))
+ impl CompactionFilter for CountsGcFilter {
+     fn filter_item(&mut self, item: ItemAccessor<'_>, _: &Context) -> lsm_tree::Result<Verdict> {
+         let key = item.key();
+
+         if self.drop_collection_counts && key.starts_with(keys::COUNT_COLLECTION_PREFIX) {
+             return Ok(Verdict::Remove);
          }
-     }

-     pub struct DropPrefixFilterFactory {
-         pub prefix: &'static [u8],
+         if key.starts_with(keys::COUNT_DELTA_PREFIX)
+             && let Ok((id, _)) = keys::parse_count_delta_key(key)
+             && id <= self.delta_gc_watermark.load(Ordering::Relaxed)
+         {
+             return Ok(Verdict::Destroy);
+         }
+
+         Ok(Verdict::Keep)
      }
+ }

-     impl Factory for DropPrefixFilterFactory {
-         fn name(&self) -> &str {
-             "drop_prefix"
-         }
+ impl Factory for CountsGcFilterFactory {
+     fn name(&self) -> &str {
+         "counts_gc"
+     }

-         fn make_filter(&self, _: &Context) -> Box<dyn CompactionFilter> {
-             Box::new(DropPrefixFilter {
-                 prefix: self.prefix,
-             })
-         }
+     fn make_filter(&self, _: &Context) -> Box<dyn CompactionFilter> {
+         Box::new(CountsGcFilter {
+             drop_collection_counts: self.drop_collection_counts,
+             delta_gc_watermark: self.delta_gc_watermark.clone(),
+         })
      }
  }
- pub use drop_prefix::*;
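
The filter's policy reduces to a pure function over the key bytes and the GC watermark: collection counts are removed in ephemeral mode, and delta rows at or below the watermark are destroyed because a durable checkpoint has already absorbed them. A self-contained version of that decision, assuming a b'|' separator (the real keys::SEP value isn't shown in this diff) and a simplified Verdict enum in place of lsm-tree's:

const COUNT_COLLECTION_PREFIX: &[u8] = b"r|"; // assumes SEP == b'|'
const COUNT_DELTA_PREFIX: &[u8] = b"d|";

#[derive(Debug, PartialEq)]
enum Verdict { Keep, Remove, Destroy }

fn parse_delta_id(key: &[u8]) -> Option<u64> {
    let id = key.get(2..10)?; // 8 big-endian bytes after the "d|" prefix
    Some(u64::from_be_bytes(id.try_into().ok()?))
}

fn filter(key: &[u8], drop_collection_counts: bool, gc_watermark: u64) -> Verdict {
    if drop_collection_counts && key.starts_with(COUNT_COLLECTION_PREFIX) {
        return Verdict::Remove;
    }
    if key.starts_with(COUNT_DELTA_PREFIX)
        && parse_delta_id(key).is_some_and(|id| id <= gc_watermark)
    {
        // this delta is already folded into a durable checkpoint: drop it forever
        return Verdict::Destroy;
    }
    Verdict::Keep
}

fn main() {
    let mut settled = b"d|".to_vec();
    settled.extend_from_slice(&5u64.to_be_bytes());
    settled.extend_from_slice(b"|repos");
    assert_eq!(filter(&settled, false, 10), Verdict::Destroy); // id 5 <= watermark 10

    let mut live = b"d|".to_vec();
    live.extend_from_slice(&11u64.to_be_bytes());
    live.extend_from_slice(b"|repos");
    assert_eq!(filter(&live, false, 10), Verdict::Keep); // not yet checkpointed
}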
-8
src/db/indexer.rs
···
  use crate::types::RepoState;

  impl Db {
-     pub(crate) fn update_gauge_diff(&self, old: &GaugeState, new: &GaugeState) {
-         update_gauge_diff_impl!(self, old, new, update_count);
-     }
-
-     pub(crate) async fn update_gauge_diff_async(&self, old: &GaugeState, new: &GaugeState) {
-         update_gauge_diff_impl!(self, old, new, update_count_async, await);
-     }
-
      pub(crate) fn update_repo_state<F, T>(
          batch: &mut OwnedWriteBatch,
          repos: &Keyspace,
+42
src/db/keys/mod.rs
···
      key
  }

+ pub const COUNT_DELTA_PREFIX: &[u8] = &[b'd', SEP];
+
+ pub fn count_delta_key(id: u64, name: &str) -> Vec<u8> {
+     let mut key = Vec::with_capacity(COUNT_DELTA_PREFIX.len() + 8 + 1 + name.len());
+     key.extend_from_slice(COUNT_DELTA_PREFIX);
+     key.extend_from_slice(&id.to_be_bytes());
+     key.push(SEP);
+     key.extend_from_slice(name.as_bytes());
+     key
+ }
+
+ pub fn count_delta_start_key(id: u64) -> Vec<u8> {
+     let mut key = Vec::with_capacity(COUNT_DELTA_PREFIX.len() + 8);
+     key.extend_from_slice(COUNT_DELTA_PREFIX);
+     key.extend_from_slice(&id.to_be_bytes());
+     key
+ }
+
+ pub fn parse_count_delta_key(key: &[u8]) -> miette::Result<(u64, &str)> {
+     let min_len = COUNT_DELTA_PREFIX.len() + 8 + 1;
+     if key.len() < min_len || !key.starts_with(COUNT_DELTA_PREFIX) {
+         miette::bail!("invalid count delta key");
+     }
+
+     let id_start = COUNT_DELTA_PREFIX.len();
+     let id_end = id_start + 8;
+     let id = u64::from_be_bytes(
+         key[id_start..id_end]
+             .try_into()
+             .map_err(|e| miette::miette!("invalid count delta key id: {e}"))?,
+     );
+     if key[id_end] != SEP {
+         miette::bail!("invalid count delta key separator");
+     }
+
+     let name = std::str::from_utf8(&key[id_end + 1..])
+         .map_err(|e| miette::miette!("invalid count delta key name: {e}"))?;
+     Ok((id, name))
+ }
+
+ pub const COUNT_DELTA_WATERMARK_KEY: &[u8] = b"w|count_delta_watermark";
+
  pub const COUNT_COLLECTION_PREFIX: &[u8] = &[b'r', SEP];

  pub fn did_collection_prefix(did: &Did) -> Vec<u8> {
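
The layout is d, SEP, an 8-byte big-endian id, SEP, then the counter name, so delta rows sort by id and count_delta_start_key bounds a range scan. A quick roundtrip check of the encoding; the actual byte value of SEP isn't shown in this diff, so b'|' below is an assumption:

const SEP: u8 = b'|'; // assumption: the real keys::SEP is defined elsewhere
const COUNT_DELTA_PREFIX: &[u8] = &[b'd', SEP];

fn count_delta_key(id: u64, name: &str) -> Vec<u8> {
    let mut key = Vec::with_capacity(COUNT_DELTA_PREFIX.len() + 8 + 1 + name.len());
    key.extend_from_slice(COUNT_DELTA_PREFIX);
    key.extend_from_slice(&id.to_be_bytes()); // big-endian: ids sort numerically
    key.push(SEP);
    key.extend_from_slice(name.as_bytes());
    key
}

fn parse_count_delta_key(key: &[u8]) -> Option<(u64, &str)> {
    let rest = key.strip_prefix(COUNT_DELTA_PREFIX)?;
    let id = u64::from_be_bytes(rest.get(..8)?.try_into().ok()?);
    let name = std::str::from_utf8(rest.get(8..)?.strip_prefix(&[SEP])?).ok()?;
    Some((id, name))
}

fn main() {
    let key = count_delta_key(42, "repos");
    assert_eq!(parse_count_delta_key(&key), Some((42, "repos")));
    // lexicographic key order matches numeric id order thanks to big-endian ids
    assert!(count_delta_key(2, "a") < count_delta_key(10, "a"));
}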
+418 -78
src/db/mod.rs
···
  use crate::config::Compression;
- use crate::db::compaction::DropPrefixFilterFactory;
+ use crate::db::compaction::CountsGcFilterFactory;
  use crate::types::{RepoMetadata, RepoState};

  #[cfg(feature = "indexer_stream")]
···
  use smol_str::SmolStr;

  use std::cell::RefCell;
- use std::collections::HashSet;
- use std::sync::Arc;
+ use std::collections::{BTreeMap, BTreeSet, HashSet};
+ use std::sync::atomic::{AtomicU64, Ordering};
+ use std::sync::{Arc, Mutex};
  use url::Url;

  pub mod compaction;
···
  use tracing::error;

  #[cfg(any(feature = "indexer_stream", feature = "relay"))]
- use {std::sync::atomic::AtomicU64, tokio::sync::broadcast};
+ use tokio::sync::broadcast;

  fn default_opts() -> KeyspaceCreateOptions {
      KeyspaceCreateOptions::default()
···
      #[cfg(feature = "relay")]
      pub(crate) relay_broadcast_tx: broadcast::Sender<RelayBroadcast>,
      pub counts_map: HashMap<SmolStr, u64>,
+     next_count_delta_id: Arc<AtomicU64>,
+     count_delta_checkpoint_watermark: Arc<AtomicU64>,
+     count_delta_gc_watermark: Arc<AtomicU64>,
+     count_delta_in_flight: Arc<Mutex<BTreeSet<u64>>>,
  }

- #[cfg(feature = "indexer")]
- macro_rules! update_gauge_diff_impl {
-     ($self:ident, $old:ident, $new:ident, $update_method:ident $(, $await:tt)?) => {{
+ #[derive(Debug, Clone, Default)]
+ pub struct CountDeltas {
+     deltas: BTreeMap<SmolStr, i64>,
+ }
+
+ impl CountDeltas {
+     pub(crate) fn add(&mut self, key: &str, delta: i64) {
+         if delta == 0 {
+             return;
+         }
+
+         let entry = self.deltas.entry(SmolStr::new(key)).or_insert(0);
+         *entry += delta;
+         if *entry == 0 {
+             self.deltas.remove(key);
+         }
+     }
+
+     #[cfg(feature = "indexer")]
+     pub(crate) fn add_gauge_diff(
+         &mut self,
+         old: &crate::types::GaugeState,
+         new: &crate::types::GaugeState,
+     ) {
          use crate::types::GaugeState;

-         if $old == $new {
+         if old == new {
              return;
          }

-         // pending
-         match ($old, $new) {
+         match (old, new) {
              (GaugeState::Pending, GaugeState::Pending) => {}
-             (GaugeState::Pending, _) => {$self.$update_method("pending", -1) $(.$await)?;},
-             (_, GaugeState::Pending) => {$self.$update_method("pending", 1) $(.$await)?;},
+             (GaugeState::Pending, _) => self.add("pending", -1),
+             (_, GaugeState::Pending) => self.add("pending", 1),
              _ => {}
          }

-         // resync
-         let old_resync = $old.is_resync();
-         let new_resync = $new.is_resync();
-         match (old_resync, new_resync) {
-             (true, false) => {$self.$update_method("resync", -1) $(.$await)?;},
-             (false, true) => {$self.$update_method("resync", 1) $(.$await)?;},
+         match (old.is_resync(), new.is_resync()) {
+             (true, false) => self.add("resync", -1),
+             (false, true) => self.add("resync", 1),
              _ => {}
          }

-         // error kinds
-         if let GaugeState::Resync(Some(kind)) = $old {
-             let key = match kind {
-                 crate::types::ResyncErrorKind::Ratelimited => "error_ratelimited",
-                 crate::types::ResyncErrorKind::Transport => "error_transport",
-                 crate::types::ResyncErrorKind::Generic => "error_generic",
-             };
-             $self.$update_method(key, -1) $(.$await)?;
+         if let GaugeState::Resync(Some(kind)) = old {
+             self.add(
+                 match kind {
+                     crate::types::ResyncErrorKind::Ratelimited => "error_ratelimited",
+                     crate::types::ResyncErrorKind::Transport => "error_transport",
+                     crate::types::ResyncErrorKind::Generic => "error_generic",
+                 },
+                 -1,
+             );
          }

-         if let GaugeState::Resync(Some(kind)) = $new {
-             let key = match kind {
-                 crate::types::ResyncErrorKind::Ratelimited => "error_ratelimited",
-                 crate::types::ResyncErrorKind::Transport => "error_transport",
-                 crate::types::ResyncErrorKind::Generic => "error_generic",
-             };
-             $self.$update_method(key, 1) $(.$await)?;
+         if let GaugeState::Resync(Some(kind)) = new {
+             self.add(
+                 match kind {
+                     crate::types::ResyncErrorKind::Ratelimited => "error_ratelimited",
+                     crate::types::ResyncErrorKind::Transport => "error_transport",
+                     crate::types::ResyncErrorKind::Generic => "error_generic",
+                 },
+                 1,
+             );
          }
-     }};
+     }
+
+     pub(crate) fn is_empty(&self) -> bool {
+         self.deltas.is_empty()
+     }
+
+     pub(crate) fn len(&self) -> usize {
+         self.deltas.len()
+     }
+
+     pub(crate) fn get(&self, key: &str) -> i64 {
+         self.deltas.get(key).copied().unwrap_or(0)
+     }
+
+     pub(crate) fn projected_count(&self, db: &Db, key: &str) -> u64 {
+         apply_count_delta(db.get_count_sync(key), self.get(key))
+     }
+
+     pub(crate) fn iter(&self) -> impl Iterator<Item = (&SmolStr, &i64)> {
+         self.deltas.iter()
+     }
+ }
+
+ pub(crate) struct CountDeltaReservation {
+     in_flight: Arc<Mutex<BTreeSet<u64>>>,
+     start_id: u64,
+ }
+
+ impl Drop for CountDeltaReservation {
+     fn drop(&mut self) {
+         let Ok(mut in_flight) = self.in_flight.lock() else {
+             error!(
+                 start_id = self.start_id,
+                 "count delta reservations poisoned"
+             );
+             return;
+         };
+         in_flight.remove(&self.start_id);
+     }
  }

  const fn kb(v: u32) -> u32 {
···
      v * 1024 * 1024
  }

+ fn apply_count_delta(current: u64, delta: i64) -> u64 {
+     if delta >= 0 {
+         current.saturating_add(delta as u64)
+     } else {
+         current.saturating_sub(delta.unsigned_abs())
+     }
+ }
+
  impl Db {
      pub fn open(cfg: &crate::config::Config) -> Result<Self> {
+         let count_delta_gc_watermark = Arc::new(AtomicU64::new(0));
          let db = Database::builder(&cfg.database_path)
              .cache_size(cfg.cache_size * 2_u64.pow(20) / 2)
              .manual_journal_persist(true)
···
              .max_journaling_size(mb(cfg.db_max_journaling_size_mb))
              .with_compaction_filter_factories({
                  let ephemeral = cfg.ephemeral;
+                 let count_delta_gc_watermark = count_delta_gc_watermark.clone();
                  let f = move |ks: &str| match ks {
-                     "counts" => ephemeral.then(|| -> Arc<dyn Factory> {
-                         Arc::new(DropPrefixFilterFactory {
-                             prefix: keys::COUNT_COLLECTION_PREFIX,
-                         })
-                     }),
+                     "counts" => Some(Arc::new(CountsGcFilterFactory {
+                         drop_collection_counts: ephemeral,
+                         delta_gc_watermark: count_delta_gc_watermark.clone(),
+                     }) as Arc<dyn Factory>),
                      _ => None,
                  };
                  Arc::new(f)
···
              next_relay_seq: Arc::new(AtomicU64::new(0)),
              #[cfg(feature = "relay")]
              relay_broadcast_tx,
+             next_count_delta_id: Arc::new(AtomicU64::new(0)),
+             count_delta_checkpoint_watermark: Arc::new(AtomicU64::new(0)),
+             count_delta_gc_watermark,
+             count_delta_in_flight: Arc::new(Mutex::new(BTreeSet::new())),
          };

          migration::run(&this)?;
···
              let name = std::str::from_utf8(&k[keys::COUNT_KS_PREFIX.len()..])
                  .into_diagnostic()
                  .wrap_err("expected valid utf8 for ks count key")?;
-             let _ = this.counts_map.insert_sync(
-                 SmolStr::new(name),
-                 u64::from_be_bytes(v.as_ref().try_into().unwrap()),
-             );
+             let _ = this
+                 .counts_map
+                 .insert_sync(SmolStr::new(name), read_u64_counter(&v)?);
          }
+
+         let durable_watermark = load_count_delta_watermark(&this)?;
+         replay_count_deltas(&this, durable_watermark)?;
+         this.count_delta_checkpoint_watermark
+             .store(durable_watermark, Ordering::Relaxed);
+         this.count_delta_gc_watermark
+             .store(durable_watermark, Ordering::Relaxed);
+
+         let next_count_delta_id = this
+             .counts
+             .prefix(keys::COUNT_DELTA_PREFIX)
+             .next_back()
+             .map(|guard| -> Result<u64> {
+                 let key = guard.key().into_diagnostic()?;
+                 let (id, _) = keys::parse_count_delta_key(&key)?;
+                 Ok(id + 1)
+             })
+             .transpose()?
+             .unwrap_or(durable_watermark.saturating_add(1));
+         this.next_count_delta_id
+             .store(next_count_delta_id, Ordering::Relaxed);

          Ok(this)
      }
···
              .into_diagnostic()?
      }

+     pub(crate) fn stage_count_deltas(
+         &self,
+         batch: &mut OwnedWriteBatch,
+         deltas: &CountDeltas,
+     ) -> Option<CountDeltaReservation> {
+         if deltas.is_empty() {
+             return None;
+         }
+
+         let start_id = self
+             .next_count_delta_id
+             .fetch_add(deltas.len() as u64, Ordering::SeqCst);
+         self.count_delta_in_flight
+             .lock()
+             .expect("count delta reservations poisoned")
+             .insert(start_id);
+
+         for (offset, (key, delta)) in deltas.iter().enumerate() {
+             batch.insert(
+                 &self.counts,
+                 keys::count_delta_key(start_id + offset as u64, key),
+                 delta.to_be_bytes(),
+             );
+         }
+
+         Some(CountDeltaReservation {
+             in_flight: self.count_delta_in_flight.clone(),
+             start_id,
+         })
+     }
+
+     pub(crate) fn apply_count_deltas(&self, deltas: &CountDeltas) {
+         for (key, delta) in deltas.iter() {
+             self.update_count(key, *delta);
+         }
+     }
+
+     pub fn checkpoint_count_deltas(&self) -> Result<Option<u64>> {
+         let start = self
+             .count_delta_checkpoint_watermark
+             .load(Ordering::SeqCst)
+             .saturating_add(1);
+
+         let mut end = self
+             .next_count_delta_id
+             .load(Ordering::SeqCst)
+             .saturating_sub(1);
+
+         let lowest_in_flight = self
+             .count_delta_in_flight
+             .lock()
+             .expect("count delta reservations poisoned")
+             .first()
+             .copied();
+         if let Some(lowest_in_flight) = lowest_in_flight {
+             end = end.min(lowest_in_flight.saturating_sub(1));
+         }
+
+         if end < start {
+             return Ok(None);
+         }
+
+         let aggregated = load_count_delta_range(self, start, end)?;
+         let mut batch = self.inner.batch();
+
+         for (name, delta) in aggregated {
+             let current = get_persisted_ks_count(self, &name)?;
+             set_ks_count(&mut batch, self, &name, apply_count_delta(current, delta));
+         }
+
+         set_count_delta_watermark(&mut batch, self, end);
+         batch.commit().into_diagnostic()?;
+         self.count_delta_checkpoint_watermark
+             .store(end, Ordering::SeqCst);
+
+         Ok(Some(end))
+     }
+
+     pub fn mark_count_checkpoint_persisted(&self, watermark: u64) {
+         self.count_delta_gc_watermark
+             .store(watermark, Ordering::SeqCst);
+     }
+
      pub fn update_count(&self, key: &str, delta: i64) -> u64 {
          let mut entry = self.counts_map.entry_sync(SmolStr::new(key)).or_insert(0);
-         if delta >= 0 {
-             *entry = entry.saturating_add(delta as u64);
+         if delta < 0 && *entry < delta.unsigned_abs() {
+             error!(
+                 key,
+                 current = *entry,
+                 decrement = delta.unsigned_abs(),
+                 "count underflow !!! this is a bug"
+             );
+             *entry = 0;
          } else {
-             let decrement = delta.unsigned_abs();
-             if *entry < decrement {
-                 error!(
-                     key,
-                     current = *entry,
-                     decrement,
-                     "count underflow !!! this is a bug"
-                 );
-                 *entry = 0;
-             } else {
-                 *entry -= decrement;
-             }
+             *entry = apply_count_delta(*entry, delta);
          }
          *entry
      }
···
              .entry_async(SmolStr::new(key))
              .await
              .or_insert(0);
-         if delta >= 0 {
-             *entry = entry.saturating_add(delta as u64);
+         if delta < 0 && *entry < delta.unsigned_abs() {
+             error!(
+                 key,
+                 current = *entry,
+                 decrement = delta.unsigned_abs(),
+                 "count underflow !!! this is a bug"
+             );
+             *entry = 0;
          } else {
-             let decrement = delta.unsigned_abs();
-             if *entry < decrement {
-                 error!(
-                     key,
-                     current = *entry,
-                     decrement,
-                     "count underflow !!! this is a bug"
-                 );
-                 *entry = 0;
-             } else {
-                 *entry -= decrement;
-             }
+             *entry = apply_count_delta(*entry, delta);
          }
      }
···
      batch.insert(&db.counts, key, count.to_be_bytes());
  }

- pub fn persist_counts(db: &Db) -> Result<()> {
-     let mut batch = db.inner.batch();
-     db.counts_map.iter_sync(|k, v| {
-         set_ks_count(&mut batch, db, k, *v);
-         true
-     });
-     batch.commit().into_diagnostic()
+ pub fn set_count_delta_watermark(batch: &mut OwnedWriteBatch, db: &Db, watermark: u64) {
+     batch.insert(
+         &db.counts,
+         keys::COUNT_DELTA_WATERMARK_KEY,
+         watermark.to_be_bytes(),
+     );
+ }
+
+ pub fn load_count_delta_watermark(db: &Db) -> Result<u64> {
+     db.counts
+         .get(keys::COUNT_DELTA_WATERMARK_KEY)
+         .into_diagnostic()?
+         .map(|value| read_u64_counter(&value))
+         .transpose()
+         .map(|watermark| watermark.unwrap_or(0))
+ }
+
+ fn read_u64_counter(value: &[u8]) -> Result<u64> {
+     value
+         .try_into()
+         .into_diagnostic()
+         .wrap_err("counter value must be 8 bytes")
+         .map(u64::from_be_bytes)
+ }
+
+ fn read_i64_counter_delta(value: &[u8]) -> Result<i64> {
+     value
+         .try_into()
+         .into_diagnostic()
+         .wrap_err("counter delta must be 8 bytes")
+         .map(i64::from_be_bytes)
+ }
+
+ fn replay_count_deltas(db: &Db, watermark: u64) -> Result<()> {
+     let start = watermark.saturating_add(1);
+     for (name, delta) in load_count_delta_range(db, start, u64::MAX)? {
+         db.update_count(&name, delta);
+     }
+     Ok(())
+ }
+
+ fn load_count_delta_range(db: &Db, start: u64, end: u64) -> Result<BTreeMap<SmolStr, i64>> {
+     let mut aggregated = BTreeMap::new();
+     for guard in db.counts.range(keys::count_delta_start_key(start)..) {
+         let (key, value) = guard.into_inner().into_diagnostic()?;
+         if !key.starts_with(keys::COUNT_DELTA_PREFIX) {
+             break;
+         }
+
+         let (id, name) = keys::parse_count_delta_key(&key)?;
+         if id > end {
+             break;
+         }
+
+         *aggregated.entry(SmolStr::new(name)).or_insert(0) += read_i64_counter_delta(&value)?;
+     }
+     Ok(aggregated)
+ }
+
+ fn get_persisted_ks_count(db: &Db, name: &str) -> Result<u64> {
+     db.counts
+         .get(keys::count_keyspace_key(name))
+         .into_diagnostic()?
+         .map(|value| read_u64_counter(&value))
+         .transpose()
+         .map(|count| count.unwrap_or(0))
  }

  /// load the persisted (day, count) pair for the daily PDS add counter, if present.
···
      }
      Ok(sources)
  }
+
+ #[cfg(test)]
+ mod tests {
+     use super::*;
+
+     fn test_config(path: &std::path::Path) -> crate::config::Config {
+         crate::config::Config {
+             database_path: path.to_path_buf(),
+             ..Default::default()
+         }
+     }
+
+     #[test]
+     fn count_deltas_replay_and_checkpoint_across_restart() -> Result<()> {
+         let tmp = tempfile::tempdir().into_diagnostic()?;
+         let cfg = test_config(tmp.path());
+
+         {
+             let db = Db::open(&cfg)?;
+             let mut batch = db.inner.batch();
+             set_ks_count(&mut batch, &db, "repos", 10);
+             batch.commit().into_diagnostic()?;
+
+             let mut batch = db.inner.batch();
+             let mut deltas = CountDeltas::default();
+             deltas.add("repos", 2);
+             deltas.add("pending", 1);
+             let reservation = db
+                 .stage_count_deltas(&mut batch, &deltas)
+                 .expect("count deltas should reserve ids");
+             batch.commit().into_diagnostic()?;
+             db.apply_count_deltas(&deltas);
+             drop(reservation);
+             db.persist()?;
+         }
+
+         {
+             let db = Db::open(&cfg)?;
+             assert_eq!(db.get_count_sync("repos"), 12);
+             assert_eq!(db.get_count_sync("pending"), 1);
+             let checkpointed_watermark = db
+                 .checkpoint_count_deltas()?
+                 .expect("checkpoint should fold pending deltas");
+             db.persist()?;
+             db.mark_count_checkpoint_persisted(checkpointed_watermark);
+             assert_eq!(load_count_delta_watermark(&db)?, checkpointed_watermark);
+         }
+
+         {
+             let db = Db::open(&cfg)?;
+             assert_eq!(db.get_count_sync("repos"), 12);
+             assert_eq!(db.get_count_sync("pending"), 1);
+             assert!(load_count_delta_watermark(&db)? >= 1);
+         }
+
+         Ok(())
+     }
+
+     #[test]
+     fn checkpoint_skips_inflight_deltas() -> Result<()> {
+         let tmp = tempfile::tempdir().into_diagnostic()?;
+         let cfg = test_config(tmp.path());
+         let db = Db::open(&cfg)?;
+
+         let mut batch = db.inner.batch();
+         let mut deltas = CountDeltas::default();
+         deltas.add("repos", 1);
+         let reservation = db
+             .stage_count_deltas(&mut batch, &deltas)
+             .expect("count deltas should reserve ids");
+
+         assert!(db.checkpoint_count_deltas()?.is_none());
+
+         batch.commit().into_diagnostic()?;
+         db.apply_count_deltas(&deltas);
+         drop(reservation);
+
+         assert!(db.checkpoint_count_deltas()?.is_some());
+
+         Ok(())
+     }
+
+     #[test]
+     fn checkpointed_count_deltas_are_gcable() -> Result<()> {
+         let tmp = tempfile::tempdir().into_diagnostic()?;
+         let cfg = test_config(tmp.path());
+         let db = Db::open(&cfg)?;
+
+         let mut batch = db.inner.batch();
+         let mut deltas = CountDeltas::default();
+         deltas.add("repos", 1);
+         let reservation = db
+             .stage_count_deltas(&mut batch, &deltas)
+             .expect("count deltas should reserve ids");
+         batch.commit().into_diagnostic()?;
+         db.apply_count_deltas(&deltas);
+         drop(reservation);
+
+         let watermark = db
+             .checkpoint_count_deltas()?
+             .expect("checkpoint should fold pending deltas");
+         db.persist()?;
+         db.mark_count_checkpoint_persisted(watermark);
+
+         db.counts.rotate_memtable_and_wait().into_diagnostic()?;
+         db.counts.major_compact().into_diagnostic()?;
+
+         assert_eq!(db.counts.prefix(keys::COUNT_DELTA_PREFIX).count(), 0);
+
+         Ok(())
+     }
+ }
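
Recovery composes the two durable artifacts: the base counters are exact up to the watermark, and delta rows with higher ids are replayed on top, which is what replay_count_deltas does in open(). A compact model of that arithmetic in plain std, with the saturating semantics of apply_count_delta:

use std::collections::BTreeMap;

// rebuild in-memory counters from (base counters, watermark, delta log)
fn recover(
    base: &BTreeMap<&str, u64>,
    watermark: u64,
    delta_log: &[(u64, &str, i64)], // (id, counter name, delta)
) -> BTreeMap<String, u64> {
    let mut counters: BTreeMap<String, u64> =
        base.iter().map(|(k, v)| (k.to_string(), *v)).collect();
    for (id, name, delta) in delta_log {
        if *id <= watermark {
            continue; // already folded into the base counters by a checkpoint
        }
        let entry = counters.entry(name.to_string()).or_insert(0);
        *entry = if *delta >= 0 {
            entry.saturating_add(*delta as u64)
        } else {
            entry.saturating_sub(delta.unsigned_abs())
        };
    }
    counters
}

fn main() {
    let base = BTreeMap::from([("repos", 10u64)]);
    // delta id 3 was checkpointed (watermark 3); ids 4 and 5 still need replay
    let log: [(u64, &str, i64); 3] = [(3, "repos", 5), (4, "repos", 2), (5, "pending", 1)];
    let counters = recover(&base, 3, &log);
    assert_eq!(counters["repos"], 12);
    assert_eq!(counters["pending"], 1);
}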
+27 -11
src/ingest/indexer.rs
···
  use super::*;
- use crate::db::{self, keys, ser_repo_meta};
+ use crate::db::{self, CountDeltas, keys, ser_repo_meta};
  use crate::ingest::stream::{Account, Commit, Identity};
  use crate::ingest::validation;
  use crate::resolver::{NoSigningKeyError, ResolverError};
···
      batch: OwnedWriteBatch,
      added_blocks: &'a mut i64,
      records_delta: &'a mut i64,
+     count_deltas: &'a mut CountDeltas,
      #[cfg(feature = "indexer_stream")]
      broadcast_events: &'a mut Vec<BroadcastEvent>,
  }
···

      let mut added_blocks = 0;
      let mut records_delta = 0;
+     let mut count_deltas = CountDeltas::default();

      let mut ctx = WorkerContext {
          state: &state,
          batch,
          added_blocks: &mut added_blocks,
          records_delta: &mut records_delta,
+         count_deltas: &mut count_deltas,
          #[cfg(feature = "indexer_stream")]
          broadcast_events: &mut broadcast_events,
      };
···
      ) {
          Ok(RepoProcessResult::Ok(_)) => {}
          Ok(RepoProcessResult::Deleted) => {
-             state.db.update_count("repos", -1);
+             ctx.count_deltas.add("repos", -1);
          }
          Ok(RepoProcessResult::NeedsBackfill(Some(commit))) => {
              try_persist(commit);
···
          }
      }

-     if let Err(e) = ctx.batch.commit() {
-         error!(shard = id, err = %e, "failed to commit batch");
-     }
-
+     let mut batch = ctx.batch;
      if added_blocks > 0 {
-         state.db.update_count("blocks", added_blocks);
+         count_deltas.add("blocks", added_blocks);
      }
      if records_delta != 0 {
-         state.db.update_count("records", records_delta);
+         count_deltas.add("records", records_delta);
      }
+     let reservation = state.db.stage_count_deltas(&mut batch, &count_deltas);
+     if let Err(e) = batch.commit() {
+         error!(shard = id, err = %e, "failed to commit batch");
+         drop(reservation);
+         continue;
+     }
+     state.db.apply_count_deltas(&count_deltas);
+     drop(reservation);
      #[cfg(feature = "indexer_stream")]
      for evt in broadcast_events.drain(..) {
          let _ = state.db.event_tx.send(evt);
···
              // status update logic is now handled in RelayWorker;
              // FirehoseWorker just needs to update gauges if status changed.
              if changed && was_active {
-                 db.update_gauge_diff(&GaugeState::Synced, &GaugeState::Resync(None));
+                 ctx.count_deltas
+                     .add_gauge_diff(&GaugeState::Synced, &GaugeState::Resync(None));
              }
          }
      }
  } else {
      // if account became active, update gauges
      if !was_active {
-         db.update_gauge_diff(&GaugeState::Resync(None), &GaugeState::Synced);
+         ctx.count_deltas
+             .add_gauge_diff(&GaugeState::Resync(None), &GaugeState::Synced);
      }
  }
···
  ) -> Result<RepoState<'s>, IngestError> {
      let db = &ctx.state.db;
      let mut batch = db.inner.batch();
+     let mut count_deltas = CountDeltas::default();
      let repo_key = keys::repo_key(did);
      let meta_key = keys::repo_metadata_key(did);
···
      metadata.index_id = rand::random::<u64>();
      batch.insert(&db.pending, keys::pending_key(metadata.index_id), &repo_key);
      batch.insert(&db.repo_metadata, &meta_key, ser_repo_meta(&metadata)?);
+     if !was_pending {
+         count_deltas.add_gauge_diff(&old_gauge, &crate::types::GaugeState::Pending);
+     }
+     let reservation = db.stage_count_deltas(&mut batch, &count_deltas);
      batch.commit().into_diagnostic()?;
+     db.apply_count_deltas(&count_deltas);
+     drop(reservation);

      if !was_pending {
-         db.update_gauge_diff(&old_gauge, &crate::types::GaugeState::Pending);
          ctx.state.notify_backfill();
      }
+23 -14
src/ingest/relay.rs
···
  use url::Url;

  use crate::db::keys::pds_account_count_key;
- use crate::db::{self, keys};
+ use crate::db::{self, CountDeltas, keys};
  use crate::ingest::stream::AccountStatus;
  #[cfg(feature = "relay")]
  use crate::ingest::stream::encode_frame;
···
      state: &'a AppState,
      vctx: ValidationContext<'a>,
      batch: OwnedWriteBatch,
+     count_deltas: CountDeltas,
      #[cfg(feature = "relay")]
      pending_broadcasts: Vec<RelayBroadcast>,
      #[cfg(feature = "indexer")]
···
          opts: &validation_opts,
      },
      batch: state.db.inner.batch(),
+     count_deltas: CountDeltas::default(),
      #[cfg(feature = "relay")]
      pending_broadcasts: Vec::with_capacity(2),
      #[cfg(feature = "indexer")]
···
  };

  while let Some(msg) = rx.blocking_recv() {
+     ctx.count_deltas = CountDeltas::default();
      let (did, seq) = match &msg.msg {
          SubscribeReposMessage::Commit(c) => (c.repo.clone(), c.seq),
          SubscribeReposMessage::Identity(i) => (i.did.clone(), i.seq),
···
          error!(did = %did, err = %e, "relay shard: error processing message");
      }

-     let res = std::mem::replace(&mut ctx.batch, ctx.state.db.inner.batch()).commit();
+     let mut batch = std::mem::replace(&mut ctx.batch, ctx.state.db.inner.batch());
+     let reservation = ctx
+         .state
+         .db
+         .stage_count_deltas(&mut batch, &ctx.count_deltas);
+     let res = batch.commit();
      if let Err(e) = res {
          error!(shard = id, err = %e, "relay shard: failed to commit batch");
+         drop(reservation);
          continue;
      }
+     ctx.state.db.apply_count_deltas(&ctx.count_deltas);
+     drop(reservation);

      #[cfg(feature = "relay")]
      for broadcast in ctx.pending_broadcasts.drain(..) {
···
      if is_pds {
          if let Some(host) = firehose.host_str() {
              let count_key = pds_account_count_key(host);
-             let changed = if !was_active && repo_state.active {
-                 Some(ctx.state.db.update_count(&count_key, 1))
+             let delta = if !was_active && repo_state.active {
+                 1
              } else if was_active && !repo_state.active {
-                 Some(ctx.state.db.update_count(&count_key, -1))
+                 -1
              } else {
-                 None
+                 0
              };

-             if let Some(count) = changed {
-                 let mut batch_for_status = ctx.state.db.inner.batch();
+             if delta != 0 {
+                 ctx.count_deltas.add(&count_key, delta);
+                 let count = ctx.count_deltas.projected_count(&ctx.state.db, &count_key);
                  ctx.state
-                     .apply_host_limit_status(&mut batch_for_status, host, count);
-                 if let Err(e) = batch_for_status.commit() {
-                     error!(%host, err = %e, "failed to commit host status update");
-                 }
+                     .apply_host_limit_status(&mut ctx.batch, host, count);
              }
          }
      }
···
      ));
  }

- db.update_count("repos", 1);
+ self.count_deltas.add("repos", 1);

  // track initial active state for per-PDS rate limiting
  if msg.is_pds && repo_state.active {
      if let Some(host) = msg.firehose.host_str() {
-         db.update_count(&pds_account_count_key(host), 1);
+         self.count_deltas.add(&pds_account_count_key(host), 1);
      }
  }
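
projected_count earns its keep here: the host-limit decision runs before the batch commits, while the delta is still only staged, so the check needs the value the counter will hold once the pending deltas apply. A small model of that, reusing the saturating apply_count_delta semantics from the diff:

use std::collections::BTreeMap;

fn apply_count_delta(current: u64, delta: i64) -> u64 {
    if delta >= 0 {
        current.saturating_add(delta as u64)
    } else {
        current.saturating_sub(delta.unsigned_abs())
    }
}

struct StagedDeltas(BTreeMap<String, i64>);

impl StagedDeltas {
    // what the counter will read once these staged deltas are applied
    fn projected_count(&self, committed: u64, key: &str) -> u64 {
        apply_count_delta(committed, self.0.get(key).copied().unwrap_or(0))
    }
}

fn main() {
    let mut staged = StagedDeltas(BTreeMap::new());
    staged.0.insert("pds_account_count:example.com".into(), 1);
    // the committed counter still reads 99, but the limit check should see 100
    assert_eq!(staged.projected_count(99, "pds_account_count:example.com"), 100);
    // saturating: decrements clamp at zero instead of wrapping
    assert_eq!(apply_count_delta(0, -5), 0);
}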
-43
tests/api.nu
···
      print "pds tier persistence test passed!"
  }

- # verify that HYDRANT_TRUSTED_HOSTS pre-assigns hosts to the trusted tier at startup.
- def test-pds-trusted-hosts [binary: string, db_path: string, port: int] {
-     print "=== test: HYDRANT_TRUSTED_HOSTS pre-assigns tier at startup ==="
-
-     let url = $"http://localhost:($port)"
-     let host_a = "alpha.example.com"
-     let host_b = "beta.example.com"
-
-     let instance = (with-env {
-         HYDRANT_CRAWLER_URLS: "",
-         HYDRANT_RELAY_HOSTS: "",
-         HYDRANT_TRUSTED_HOSTS: $"($host_a),($host_b)"
-     } {
-         start-hydrant $binary $db_path $port
-     })
-     if not (wait-for-api $url) {
-         fail "hydrant did not start"
-     }
-
-     print "  checking pre-assigned trusted hosts..."
-     let tiers = (http get $"($url)/pds/tiers")
-     let assignments = $tiers.assignments
-
-     for host in [$host_a, $host_b] {
-         if not ($host in $assignments) {
-             fail $"expected assignment for ($host) from HYDRANT_TRUSTED_HOSTS" $instance.pid
-         }
-         if ($assignments | get $host) != "trusted" {
-             fail $"expected tier=trusted for ($host)" $instance.pid
-         }
-     }
-     print $"  ok: ($host_a) and ($host_b) pre-assigned to trusted tier"
-
-     kill $instance.pid
-     print "trusted hosts startup test passed!"
- }
-
  # verify that a custom tier defined via HYDRANT_RATE_TIERS is visible and assignable.
  def test-pds-custom-rate-tier [binary: string, db_path: string, port: int] {
      print "=== test: custom rate tier via HYDRANT_RATE_TIERS ==="
···
      let db_pds_persist = (mktemp -d -t hydrant_api.XXXXXX)
      print $"db: ($db_pds_persist)"
      test-pds-tier-persistence $binary $db_pds_persist $port
-
-     sleep 1sec
-
-     let db_pds_trusted = (mktemp -d -t hydrant_api.XXXXXX)
-     print $"db: ($db_pds_trusted)"
-     test-pds-trusted-hosts $binary $db_pds_trusted $port

      sleep 1sec
+1 -1
tests/run_all.nu
···
      HYDRANT_API_PORT: $in.api
      HYDRANT_DEBUG_PORT: $in.debug
      HYDRANT_TEST_MOCK_PORT: $in.mock
-     HYDRANT_BINARY: "target/debug/hydrant"
+     HYDRANT_BINARY: "target/x86_64-unknown-linux-gnu/debug/hydrant"
  } {
      ^nu $"tests/($in.name).nu" | complete
  })