aturi indexer with listRecords and countRecords endpoints
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

graceful-ish shutdown, drop events immediately

dawn fd819714 c8dab5d7

+114 -63
+1 -1
Cargo.lock
··· 54 54 "serde_json", 55 55 "tapped", 56 56 "tokio", 57 + "tokio-util", 57 58 "tracing", 58 59 "tracing-subscriber", 59 60 ] ··· 1750 1751 "bytes", 1751 1752 "libc", 1752 1753 "mio", 1753 - "parking_lot", 1754 1754 "pin-project-lite", 1755 1755 "signal-hook-registry", 1756 1756 "socket2 0.6.2",
+2 -1
Cargo.toml
··· 5 5 6 6 [dependencies] 7 7 tapped = { git = "https://tangled.sh/ptr.pet/tapped" } 8 - tokio = { version = "1", features = ["full"] } 8 + tokio = { version = "1", features = ["rt-multi-thread", "signal"] } 9 + tokio-util = { version = "0.7" } 9 10 axum = "0.7" 10 11 serde = { version = "1", features = ["derive"] } 11 12 serde_json = "1"
+3
flake.nix
··· 17 17 devShells = { 18 18 default = pkgs.mkShell { 19 19 nativeBuildInputs = with pkgs; [ 20 + rustPlatform.rustLibSrc 20 21 rust-analyzer 21 22 cargo 23 + rustc 24 + rustfmt 22 25 gemini-cli 23 26 go 24 27 cmake
+108 -61
src/main.rs
··· 3 3 routing::get, 4 4 Json, Router, 5 5 }; 6 + use fjall::{Database, Keyspace, KeyspaceCreateOptions}; 6 7 use serde::{Deserialize, Serialize}; 7 - use tracing::{info, error, warn}; 8 - use tapped::{TapClient, Event, RecordAction, RecordEvent}; 9 - use fjall::{Database, Keyspace, KeyspaceCreateOptions}; 10 - use std::sync::{Arc, atomic::{AtomicU64, Ordering}}; 11 - use std::collections::HashMap; 8 + use std::{ 9 + pin, 10 + sync::{ 11 + atomic::{AtomicU64, Ordering}, 12 + Arc, 13 + }, 14 + time::Duration, 15 + }; 16 + use tapped::{Event, RecordAction, RecordEvent, TapClient}; 17 + use tokio::signal::unix::SignalKind; 18 + use tokio_util::sync::CancellationToken; 19 + use tracing::{error, info, warn}; 12 20 13 21 #[global_allocator] 14 22 static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc; ··· 38 46 let num_consumers = std::env::var("TAP_CONCURRENCY") 39 47 .ok() 40 48 .and_then(|s| s.parse().ok()) 41 - .unwrap_or(100); 49 + .unwrap_or(20); 50 + let closed = CancellationToken::new(); 42 51 43 52 for i in 0..num_consumers { 44 53 let db_clone = state.db.clone(); 45 54 let counts_clone = state.counts.clone(); 46 55 let ops_count_clone = ops_count.clone(); 56 + let closed = closed.child_token(); 47 57 tokio::spawn(async move { 48 58 info!("starting consumer #{}", i); 49 - run_tap_consumer(db_clone, counts_clone, ops_count_clone).await; 59 + run_tap_consumer(db_clone, counts_clone, ops_count_clone, closed).await; 50 60 }); 51 61 } 52 62 ··· 57 67 let mut last_count = 0; 58 68 // The first tick completes immediately 59 69 interval.tick().await; 60 - 70 + 61 71 loop { 62 72 interval.tick().await; 63 73 let current_count = ops_count_stats.load(Ordering::Relaxed); 64 74 let delta = current_count - last_count; 65 75 let ops_sec = delta as f64 / 60.0; 66 - info!("stats: total_ops={} delta_ops={} ops_sec={:.2}", current_count, delta, ops_sec); 76 + info!( 77 + "stats: total_ops={} delta_ops={} ops_sec={:.2}", 78 + current_count, delta, ops_sec 79 + ); 67 80 last_count = current_count; 68 81 } 69 82 }); 70 83 71 84 let app = Router::new() 72 85 .route("/xrpc/systems.gaze.aturlist.listRecords", get(list_records)) 73 - .route("/xrpc/systems.gaze.aturlist.countRecords", get(count_records)) 86 + .route( 87 + "/xrpc/systems.gaze.aturlist.countRecords", 88 + get(count_records), 89 + ) 74 90 .with_state(state); 75 91 76 92 let listener = tokio::net::TcpListener::bind("0.0.0.0:7155").await?; 77 93 info!("listening on {}", listener.local_addr()?); 78 - axum::serve(listener, app).await?; 94 + 95 + let mut _sigterm = tokio::signal::unix::signal(SignalKind::terminate())?; 96 + let mut _sigint = tokio::signal::unix::signal(SignalKind::interrupt())?; 97 + let sigterm = pin::pin!(_sigterm.recv()); 98 + let sigint = pin::pin!(_sigint.recv()); 99 + let terminating = futures::future::select(sigterm, sigint); 100 + 101 + tokio::select! { 102 + res = axum::serve(listener, app) => res?, 103 + _ = terminating => { 104 + info!("shutting down!"); 105 + closed.cancel(); 106 + } 107 + } 108 + 109 + info!("waiting 10 seconds for cleanup..."); 110 + tokio::time::sleep(Duration::from_secs(10)).await; 111 + 112 + info!("byebye! (_ _*)Zzz"); 79 113 80 114 Ok(()) 81 115 } 82 116 83 - async fn run_tap_consumer(db: Database, counts: Keyspace, ops_count: Arc<AtomicU64>) { 117 + async fn run_tap_consumer( 118 + db: Database, 119 + counts: Keyspace, 120 + ops_count: Arc<AtomicU64>, 121 + closed: CancellationToken, 122 + ) { 84 123 let tap_url = "http://localhost:2480"; 85 - let mut keyspaces: HashMap<String, Keyspace> = HashMap::new(); 86 124 87 125 loop { 88 126 info!("connecting to tap at {}", tap_url); ··· 97 135 match client.channel().await { 98 136 Ok(mut receiver) => { 99 137 info!("connected to tap firehose"); 100 - while let Ok(event) = receiver.recv().await { 101 - ops_count.fetch_add(1, Ordering::Relaxed); 102 - 103 - // check if it's a record and get the keyspace, without moving event yet 104 - let ks = if let Event::Record(rec) = &*event { 105 - if let Some(k) = keyspaces.get(&rec.collection) { 106 - Some(k.clone()) 107 - } else { 108 - match db.keyspace(&rec.collection, KeyspaceCreateOptions::default) { 109 - Ok(k) => { 110 - keyspaces.insert(rec.collection.clone(), k.clone()); 111 - Some(k) 138 + loop { 139 + tokio::select! { 140 + ev = receiver.recv() => { 141 + match ev { 142 + Ok(event) => { 143 + let Event::Record(rec) = event.event else { 144 + continue; 145 + }; 146 + // we drop here and assume we wont crash in between here and fjall i suppose 147 + 148 + ops_count.fetch_add(1, Ordering::Relaxed); 149 + 150 + let ks = match db 151 + .keyspace(&rec.collection, KeyspaceCreateOptions::default) 152 + { 153 + Ok(ks) => ks, 154 + Err(err) => { 155 + error!( 156 + "failed to open keyspace for {}: {}", 157 + rec.collection, err 158 + ); 159 + continue; 160 + } 161 + }; 162 + 163 + let counts = counts.clone(); 164 + tokio::task::spawn_blocking(move || { 165 + if let Err(e) = handle_record(&counts, &ks, rec) { 166 + error!("error handling record: {}", e); 167 + } 168 + }); 112 169 } 113 - Err(e) => { 114 - error!("failed to open keyspace for {}: {}", rec.collection, e); 115 - None 170 + Err(err) => { 171 + warn!("tap channel closed: {err}"); 172 + break; 116 173 } 117 174 } 118 175 } 119 - } else { 120 - None 121 - }; 122 - 123 - if let Some(ks) = ks { 124 - let counts = counts.clone(); 125 - tokio::task::spawn_blocking(move || { 126 - if let Event::Record(rec) = &*event { 127 - if let Err(e) = handle_record(&counts, &ks, rec) { 128 - error!("error handling record: {}", e); 129 - } 130 - } 131 - }); 176 + _ = closed.cancelled() => break, 132 177 } 133 178 } 134 - warn!("tap channel closed"); 135 179 } 136 180 Err(e) => { 137 181 warn!("failed to subscribe to channel: {}", e); ··· 150 194 did.strip_prefix("did:").unwrap_or(did) 151 195 } 152 196 153 - fn handle_record( 154 - counts: &Keyspace, 155 - records: &Keyspace, 156 - rec: &RecordEvent, 157 - ) -> anyhow::Result<()> { 197 + fn handle_record(counts: &Keyspace, records: &Keyspace, rec: RecordEvent) -> anyhow::Result<()> { 158 198 // index everything, no filter. 159 199 // key: strip_did(did)|rkey 160 200 let key = make_key(strip_did_prefix(&rec.did), &rec.rkey); 161 - 201 + 162 202 // logic to maintain counts: 163 203 // create: insert and increment. 164 204 // update: insert (overwrite). no count change. ··· 181 221 } 182 222 _ => {} 183 223 } 224 + 184 225 Ok(()) 185 226 } 186 227 ··· 196 237 let key = make_count_key(repo_stripped, collection); 197 238 let mut current = 0u64; 198 239 if let Some(val) = counts.get(&key)? { 199 - if val.len() == 8 { 200 - current = u64::from_le_bytes(val[..].try_into().unwrap()); 201 - } 240 + if val.len() == 8 { 241 + current = u64::from_le_bytes(val[..].try_into().unwrap()); 242 + } 202 243 } 203 244 current += 1; 204 245 counts.insert(&key, current.to_le_bytes())?; ··· 210 251 let mut current = 0u64; 211 252 if let Some(val) = counts.get(&key)? { 212 253 if val.len() == 8 { 213 - current = u64::from_le_bytes(val[..].try_into().unwrap()); 254 + current = u64::from_le_bytes(val[..].try_into().unwrap()); 214 255 } 215 256 } 216 257 if current > 0 { ··· 244 285 State(state): State<AppState>, 245 286 Query(params): Query<ListRecordsParams>, 246 287 ) -> Json<ListRecordsResponse> { 247 - let records = match state.db.keyspace(&params.collection, || KeyspaceCreateOptions::default()) { 288 + let records = match state 289 + .db 290 + .keyspace(&params.collection, || KeyspaceCreateOptions::default()) 291 + { 248 292 Ok(p) => p, 249 293 Err(_) => { 250 294 return Json(ListRecordsResponse { ··· 258 302 let repo_stripped = strip_did_prefix(&params.repo); 259 303 let prefix_str = format!("{}|", repo_stripped); 260 304 let prefix = prefix_str.as_bytes(); 261 - 305 + 262 306 // default to descending (newest first) -> reverse=false means descending. 263 307 // reverse=true means ascending. 264 308 let ascending = params.reverse.unwrap_or(false); ··· 279 323 // descending 280 324 prefix.to_vec() 281 325 }; 282 - 326 + 283 327 let end_bound = if ascending { 284 328 let mut p = prefix.to_vec(); 285 329 p.push(0xFF); ··· 294 338 p 295 339 } 296 340 }; 297 - 341 + 298 342 let range = records.range(start_bound..end_bound); 299 - 343 + 300 344 let mut process_key = |k: &[u8]| { 301 345 let k_str = String::from_utf8_lossy(k); 302 346 let parts: Vec<&str> = k_str.split('|').collect(); 303 347 // key format: repo_stripped|rkey 304 348 if parts.len() == 2 { 305 349 let rkey = parts[1]; 306 - aturis.push(format!("at://{}/{}/{}", params.repo, params.collection, rkey)); 350 + aturis.push(format!( 351 + "at://{}/{}/{}", 352 + params.repo, params.collection, rkey 353 + )); 307 354 last_rkey = Some(rkey.to_string()); 308 355 } 309 356 }; ··· 326 373 327 374 Json(ListRecordsResponse { 328 375 aturis, 329 - count, 376 + count, 330 377 cursor: last_rkey, 331 378 }) 332 379 } ··· 351 398 let repo_stripped = strip_did_prefix(&params.repo); 352 399 let key = make_count_key(repo_stripped, &params.collection); 353 400 let mut count = 0u64; 354 - 401 + 355 402 if let Ok(Some(val)) = state.counts.get(&key) { 356 403 if val.len() == 8 { 357 404 count = u64::from_le_bytes(val[..].try_into().unwrap()); 358 405 } 359 406 } 360 - 407 + 361 408 Json(CountRecordsResponse { 362 409 repo: params.repo, 363 410 collection: params.collection,