very fast at protocol indexer with flexible filtering, xrpc queries, cursor-backed event stream, and more, built on fjall
rust fjall at-protocol atproto indexer
59
fork

Configure Feed

Select the types of activity you want to include in your feed.

[db] improve ephemeral task, add test for the relay ttl to the gc test

dawn db9caf45 995c1c8b

+167 -5
+59 -2
src/api/debug.rs
··· 42 42 "/debug/ephemeral_ttl_tick", 43 43 post(handle_debug_ephemeral_ttl_tick), 44 44 ) 45 - .route("/debug/seed_watermark", post(handle_debug_seed_watermark)); 45 + .route("/debug/seed_watermark", post(handle_debug_seed_watermark)) 46 + .route("/debug/seed_events", post(handle_debug_seed_events)); 46 47 47 48 #[cfg(feature = "indexer")] 48 49 let r = r.route("/debug/count", get(handle_debug_count)); ··· 182 183 Query(req): Query<DebugIterRequest>, 183 184 ) -> Result<Json<DebugIterResponse>, StatusCode> { 184 185 let ks = get_keyspace_by_name(&state.db, &req.partition)?; 185 - let is_events = req.partition == "events"; 186 + let is_events = req.partition == "events" || req.partition == "relay_events"; 186 187 let partition = req.partition.clone(); 187 188 188 189 let parse_bound = |s: Option<String>| -> Result<Option<Vec<u8>>, StatusCode> { ··· 287 288 "resync" => Ok(db.resync.clone()), 288 289 #[cfg(feature = "indexer_stream")] 289 290 "events" => Ok(db.events.clone()), 291 + #[cfg(feature = "relay")] 292 + "relay_events" => Ok(db.relay_events.clone()), 290 293 #[cfg(feature = "indexer")] 291 294 "records" => Ok(db.records.clone()), 292 295 _ => Err(StatusCode::BAD_REQUEST), ··· 379 382 380 383 Ok(StatusCode::OK) 381 384 } 385 + 386 + #[derive(Deserialize)] 387 + #[cfg(any(feature = "indexer_stream", feature = "relay"))] 388 + pub struct DebugSeedEventsRequest { 389 + pub partition: String, 390 + pub count: u64, 391 + } 392 + 393 + #[cfg(any(feature = "indexer_stream", feature = "relay"))] 394 + pub async fn handle_debug_seed_events( 395 + State(state): State<Arc<AppState>>, 396 + Query(req): Query<DebugSeedEventsRequest>, 397 + ) -> Result<StatusCode, StatusCode> { 398 + tokio::task::spawn_blocking(move || -> Result<(), StatusCode> { 399 + let mut batch = state.db.inner.batch(); 400 + if req.partition == "events" { 401 + #[cfg(feature = "indexer_stream")] 402 + { 403 + for _ in 0..req.count { 404 + let seq = state 405 + .db 406 + .next_event_id 407 + .fetch_add(1, std::sync::atomic::Ordering::SeqCst); 408 + batch.insert(&state.db.events, crate::db::keys::event_key(seq), b"dummy"); 409 + } 410 + } 411 + } else if req.partition == "relay_events" { 412 + #[cfg(feature = "relay")] 413 + { 414 + for _ in 0..req.count { 415 + let seq = state 416 + .db 417 + .next_relay_seq 418 + .fetch_add(1, std::sync::atomic::Ordering::SeqCst); 419 + batch.insert( 420 + &state.db.relay_events, 421 + crate::db::keys::relay_event_key(seq), 422 + b"dummy", 423 + ); 424 + } 425 + } 426 + } else { 427 + return Err(StatusCode::BAD_REQUEST); 428 + } 429 + batch 430 + .commit() 431 + .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; 432 + Ok(()) 433 + }) 434 + .await 435 + .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)??; 436 + 437 + Ok(StatusCode::OK) 438 + }
+29 -3
src/db/ephemeral.rs
··· 76 76 77 77 // find the watermark entry closest to and <= cutoff_ts 78 78 let cutoff_key = watermark_key(cutoff_ts); 79 + let start: &[u8] = watermark_prefix; 80 + let end: &[u8] = cutoff_key.as_slice(); 81 + 79 82 let cutoff_seq = db 80 83 .cursors 81 - .range(..=cutoff_key.as_slice()) 84 + .range(start..=end) 82 85 .next_back() 83 86 .map(|g| g.into_inner().into_diagnostic()) 84 87 .transpose()? ··· 97 100 return Ok(()); 98 101 }; 99 102 103 + let mut pruned_key = Vec::with_capacity(7 + watermark_prefix.len()); 104 + pruned_key.extend_from_slice(b"pruned|"); 105 + pruned_key.extend_from_slice(watermark_prefix); 106 + 107 + let last_pruned_seq = db 108 + .cursors 109 + .get(&pruned_key) 110 + .into_diagnostic()? 111 + .map(|v| { 112 + v.as_ref() 113 + .try_into() 114 + .into_diagnostic() 115 + .wrap_err("expected last pruned seq to be u64") 116 + }) 117 + .transpose()? 118 + .map(u64::from_be_bytes) 119 + .unwrap_or(0); 120 + 121 + let start_key_events = keys::event_key(last_pruned_seq); 100 122 let cutoff_key_events = keys::event_key(cutoff_seq); 101 123 let mut batch = db.inner.batch(); 102 124 let mut pruned = 0usize; 103 125 104 - for guard in events_ks.range(..cutoff_key_events) { 126 + for guard in events_ks.range(start_key_events..cutoff_key_events) { 105 127 let k = guard.key().into_diagnostic()?; 106 128 batch.remove(events_ks, k); 107 129 pruned += 1; 108 130 } 131 + 132 + batch.insert(&db.cursors, pruned_key, cutoff_seq.to_be_bytes()); 109 133 110 134 // clean up consumed watermark entries (everything up to and including cutoff_ts) 111 - for guard in db.cursors.range(..=cutoff_key) { 135 + let start: &[u8] = watermark_prefix; 136 + let end: &[u8] = cutoff_key.as_slice(); 137 + for guard in db.cursors.range(start..=end) { 112 138 let k = guard.key().into_diagnostic()?; 113 139 if k.starts_with(watermark_prefix) { 114 140 batch.remove(&db.cursors, k);
+10
tests/common.nu
··· 122 122 parse-hydrant-executable $out.stdout 123 123 } 124 124 125 + # build the hydrant binary for relay only (no default features) 126 + export def build-hydrant-relay [] { 127 + if ($env | get --optional HYDRANT_BINARY | is-not-empty) { 128 + return $env.HYDRANT_BINARY 129 + } 130 + print "building hydrant for relay only..." 131 + let out = (^cargo build --no-default-features --features relay --message-format json err> /dev/null | complete) 132 + parse-hydrant-executable $out.stdout 133 + } 134 + 125 135 # start hydrant in the background 126 136 export def start-hydrant [binary: string, db_path: string, port: int] { 127 137 let log_file = $"($db_path)/hydrant.log"
+69
tests/ephemeral_gc.nu
··· 35 35 sleep 2sec 36 36 } 37 37 38 + # start hydrant in relay mode 39 + def run-relay-instance [name: string, port: int, scenario_closure: closure] { 40 + let debug_port = resolve-test-debug-port ($port + 1) 41 + let url = $"http://localhost:($port)" 42 + let debug_url = $"http://localhost:($debug_port)" 43 + let db_path = (mktemp -d -t hydrant_relay_gc_test.XXXXXX) 44 + 45 + print $"--- running scenario: ($name) ---" 46 + print $"database path: ($db_path)" 47 + 48 + let binary = build-hydrant-relay 49 + let instance = (with-env { HYDRANT_RELAY: "true", HYDRANT_EPHEMERAL_TTL: "60min" } { 50 + start-hydrant $binary $db_path $port 51 + }) 52 + 53 + try { 54 + if not (wait-for-api $url) { 55 + error make {msg: "api failed to start"} 56 + } 57 + 58 + do $scenario_closure $url $debug_url 59 + 60 + print $"PASSED: ($name)\n" 61 + } catch { |e| 62 + print $"test failed: ($e.msg)" 63 + try { kill --force $instance.pid } 64 + sleep 2sec 65 + exit 1 66 + } 67 + 68 + try { kill --force $instance.pid } 69 + sleep 2sec 70 + } 71 + 38 72 def trigger-ttl-tick [debug_url: string] { 39 73 print "triggering ephemeral TTL tick..." 40 74 let response = (http post -f -e -H [Content-Length 0] $"($debug_url)/debug/ephemeral_ttl_tick" "") ··· 109 143 error make {msg: $"FAILED: expected 0 events after TTL expiry, got ($remaining_events)"} 110 144 } 111 145 print "all events pruned" 146 + } 147 + 148 + run-relay-instance "Relay mode GC prunes relay_events" ($port + 100) { |url, debug_url| 149 + print "seeding 50 dummy relay events..." 150 + let seed_res = (http post -f -e -H [Content-Length 0] $"($debug_url)/debug/seed_events?partition=relay_events&count=50" "") 151 + if $seed_res.status != 200 { 152 + error make {msg: $"FAILED: seed_events returned ($seed_res.status)"} 153 + } 154 + 155 + let events = (http get -f -e $"($debug_url)/debug/iter?partition=relay_events&limit=1000").body.items 156 + let event_count = ($events | length) 157 + if $event_count != 50 { 158 + error make {msg: $"FAILED: expected 50 events, found ($event_count)"} 159 + } 160 + print $"found ($event_count) relay events" 161 + 162 + let max_event_id = ($events | each { |item| ($item | first | into int) } | math max) 163 + print $"max event id: ($max_event_id)" 164 + 165 + let past_ts = ((date now | into int) / 1_000_000_000 | into int) - 3601 166 + let cutoff_event_id = $max_event_id + 1 167 + print $"seeding watermark at ts=($past_ts) event_id=($cutoff_event_id)" 168 + let seed_response = (http post -f -e -H [Content-Length 0] 169 + $"($debug_url)/debug/seed_watermark?ts=($past_ts)&event_id=($cutoff_event_id)" "") 170 + if $seed_response.status != 200 { 171 + error make {msg: $"FAILED: seed_watermark returned ($seed_response.status)"} 172 + } 173 + 174 + trigger-ttl-tick $debug_url 175 + 176 + let remaining_events = ((http get -f -e $"($debug_url)/debug/iter?partition=relay_events&limit=1000").body.items | length) 177 + if $remaining_events != 0 { 178 + error make {msg: $"FAILED: expected 0 relay events after TTL expiry, got ($remaining_events)"} 179 + } 180 + print "all relay events pruned" 112 181 } 113 182 114 183 print "all ephemeral gc tests passed!"