very fast at protocol indexer with flexible filtering, xrpc queries, cursor-backed event stream, and more, built on fjall
rust fjall at-protocol atproto indexer
58
fork

Configure Feed

Select the types of activity you want to include in your feed.

[crawler] abort on panic in tasks

dawn dab3b719 9f8e65a0

+24 -9
+24 -9
src/crawler/mod.rs
··· 208 208 let crawler = Arc::new(self); 209 209 210 210 // stats ticker 211 - tokio::spawn({ 211 + let ticker = tokio::spawn({ 212 212 use std::time::Instant; 213 213 let crawler = crawler.clone(); 214 214 let mut last_time = Instant::now(); ··· 226 226 if is_throttled { 227 227 info!("throttled: pending queue full"); 228 228 } else { 229 - debug!("no repos crawled or processed in 60s"); 229 + info!("idle: no repos crawled or processed in 60s"); 230 230 } 231 231 continue; 232 232 } ··· 251 251 } 252 252 } 253 253 }); 254 + tokio::spawn(async move { 255 + let Err(e) = ticker.await; 256 + error!(err = ?e, "stats ticker panicked, aborting"); 257 + std::process::abort(); 258 + }); 254 259 255 260 // retry thread 256 261 std::thread::spawn({ ··· 261 266 262 267 let _g = handle.enter(); 263 268 264 - loop { 265 - match crawler.process_retry_queue() { 266 - Ok(Some(dur)) => sleep(dur.max(Duration::from_secs(1))), 267 - Ok(None) => sleep(Duration::from_secs(60)), 268 - Err(e) => { 269 - error!(err = %e, "retry loop failed"); 270 - sleep(Duration::from_secs(60)); 269 + let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| { 270 + loop { 271 + match crawler.process_retry_queue() { 272 + Ok(Some(dur)) => sleep(dur.max(Duration::from_secs(1))), 273 + Ok(None) => sleep(Duration::from_secs(60)), 274 + Err(e) => { 275 + error!(err = %e, "retry loop failed"); 276 + sleep(Duration::from_secs(60)); 277 + } 271 278 } 272 279 } 280 + })); 281 + if result.is_err() { 282 + error!("retry thread panicked, aborting"); 283 + std::process::abort(); 273 284 } 274 285 } 275 286 }); ··· 484 495 info!("reached end of list."); 485 496 cursor = Cursor::Done(c); 486 497 } 498 + info!("sleeping 1h before next enumeration pass"); 487 499 tokio::time::sleep(Duration::from_secs(3600)).await; 500 + info!("resuming after 1h sleep"); 488 501 continue; 489 502 } 490 503 Err(e) => return Err(e).wrap_err("error while crawling"), ··· 546 559 crawler.account_new_repos(valid_dids.len()).await; 547 560 548 561 if matches!(cursor, Cursor::Done(_)) { 562 + info!("enumeration complete, sleeping 1h before next pass"); 549 563 tokio::time::sleep(Duration::from_secs(3600)).await; 564 + info!("resuming after 1h sleep"); 550 565 } 551 566 } 552 567 }