lightweight com.atproto.sync.listReposByCollection

handle lockup from fjall poisoning

e.g., from too many open files

phil 3f84af7d ac036f67
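For orientation, a rough, self-contained sketch of where the poisoned state comes from, assuming fjall's documented Config/Keyspace/PartitionHandle API; the path, partition name, and record are illustrative only, and only the Poisoned/Unrecoverable variants are taken from the diffs below.

use fjall::{Config, PartitionCreateOptions, PersistMode};

fn main() -> Result<(), fjall::Error> {
    let keyspace = Config::new("./data").open()?;
    let repos = keyspace.open_partition("repos", PartitionCreateOptions::default())?;

    repos.insert("did:plc:example", "{}")?;

    // A journal/memtable flush is where an exhausted fd limit (EMFILE) tends
    // to bite; per the doc comment added in src/error.rs below, once that
    // happens fjall reports Poisoned and later writes can't be trusted.
    if let Err(e) = keyspace.persist(PersistMode::SyncAll) {
        if matches!(e, fjall::Error::Poisoned | fjall::Error::Unrecoverable) {
            eprintln!("fatal fjall error, exiting hard: {e}");
            std::process::exit(2);
        }
        return Err(e);
    }
    Ok(())
}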

+66 -7
+19 -0
src/error.rs
···
     Other(String),
 }
 
+impl Error {
+    /// True if this error indicates the database is in an unrecoverable state.
+    ///
+    /// `fjall::Error::Poisoned` is fjall's signal that a prior flush/commit
+    /// failed and subsequent writes can't be trusted — fjall's own docs say
+    /// to crash the application. `Unrecoverable` is a similar terminal state.
+    /// Callers should force-exit the process rather than attempt graceful
+    /// shutdown, since the blocking thread pool may be stuck on the same
+    /// underlying failure.
+    pub fn is_db_fatal(&self) -> bool {
+        matches!(
+            self,
+            Error::Storage(StorageError::Fjall(
+                fjall::Error::Poisoned | fjall::Error::Unrecoverable
+            ))
+        )
+    }
+}
+
 pub type Result<T> = std::result::Result<T, Error>;
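As a usage note (not part of the change): a caller holding a crate::error::Error can ask is_db_fatal() and escalate instead of retrying. The store_record wrapper below is hypothetical and assumes StorageError is reachable from crate::error; only the Error/StorageError variants and is_db_fatal itself come from this diff.

use crate::error::{Error, Result, StorageError};

// Hypothetical write path: fjall calls are synchronous, so they run on
// tokio's blocking pool, and any fjall error is wrapped into the crate's
// Error so the caller can classify it.
async fn store_record(part: fjall::PartitionHandle, key: Vec<u8>, val: Vec<u8>) -> Result<()> {
    tokio::task::spawn_blocking(move || {
        part.insert(key, val)
            .map_err(StorageError::Fjall)
            .map_err(Error::Storage)
    })
    .await
    .map_err(Error::TaskPanic)?
}

async fn caller(part: fjall::PartitionHandle) -> Result<()> {
    if let Err(e) = store_record(part, b"did:plc:example".to_vec(), b"{}".to_vec()).await {
        if e.is_db_fatal() {
            // Mirror the dispatcher change further down: bail out and let the
            // join handling in main.rs decide to force-exit.
            return Err(e);
        }
        tracing::warn!(error = %e, "write failed; continuing");
    }
    Ok(())
}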
+40 -6
src/main.rs
···
     max_deep_crawl_workers: usize,
 }
 
-#[tokio::main]
-async fn main() -> Result<()> {
+fn main() {
     rustls::crypto::aws_lc_rs::default_provider()
         .install_default()
         .expect("failed to install rustls crypto provider");
···
         .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
         .init();
 
+    let rt = tokio::runtime::Builder::new_multi_thread()
+        .enable_all()
+        .build()
+        .expect("failed to build tokio runtime");
+
+    let result = rt.block_on(run());
+
+    // Force-shutdown the runtime after a bounded wait. Without this, a
+    // `spawn_blocking` task genuinely stuck in fjall (e.g. after Poisoned)
+    // holds a blocking-pool thread, and since those threads are non-daemon
+    // they'd prevent process exit. `shutdown_timeout` detaches any remaining
+    // tasks after the deadline; the explicit `process::exit` below then
+    // guarantees we don't wait for detached blocking threads either.
+    rt.shutdown_timeout(Duration::from_secs(10));
+
+    match result {
+        Ok(()) => std::process::exit(0),
+        Err(e) => {
+            eprintln!("fatal: {e}");
+            std::process::exit(1);
+        }
+    }
+}
+
+async fn run() -> Result<()> {
     let args = Args::parse();
 
     let subscribe_host = args
···
 
 /// Flatten a task join result into an optional error.
 /// Panics (JoinError) are treated as errors.
+///
+/// If the error indicates an unrecoverable database state
+/// ([`Error::is_db_fatal`]), this immediately force-exits the process rather
+/// than returning. Graceful shutdown isn't safe in that state because other
+/// tasks may be stuck in blocking fjall calls that will never return.
 fn into_error(r: std::result::Result<Result<()>, tokio::task::JoinError>) -> Option<Error> {
-    match r {
-        Ok(Ok(())) => None,
-        Ok(Err(e)) => Some(e),
-        Err(e) => Some(Error::TaskPanic(e)),
+    let err = match r {
+        Ok(Ok(())) => return None,
+        Ok(Err(e)) => e,
+        Err(e) => Error::TaskPanic(e),
+    };
+    if err.is_db_fatal() {
+        eprintln!("FATAL: database poisoned, force-exiting: {err}");
+        std::process::exit(2);
     }
+    Some(err)
 }
 
 fn install_metrics(addr: SocketAddr) -> Result<()> {
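Why the manual runtime matters is easier to see in isolation. The following stand-alone sketch uses only tokio and std (nothing from this repo): a blocking task that never returns would leave #[tokio::main]'s implicit runtime drop waiting forever, whereas shutdown_timeout detaches it after a deadline so the explicit exit can run.

use std::time::Duration;

fn main() {
    let rt = tokio::runtime::Builder::new_multi_thread()
        .enable_all()
        .build()
        .expect("failed to build tokio runtime");

    let result = rt.block_on(async {
        // Stand-in for a fjall call wedged after poisoning: this blocking
        // task never completes, so a plain runtime drop would block on it.
        let _stuck = tokio::task::spawn_blocking(|| loop {
            std::thread::sleep(Duration::from_secs(60));
        });

        // Pretend the main workload detected the fatal error and bailed out.
        Err::<(), &str>("database poisoned")
    });

    // Detach whatever is still running after the deadline instead of joining.
    rt.shutdown_timeout(Duration::from_secs(2));

    match result {
        Ok(()) => std::process::exit(0),
        Err(e) => {
            eprintln!("fatal: {e}");
            std::process::exit(1);
        }
    }
}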
+7 -1
src/sync/resync/dispatcher.rs
···
         }
         Ok(Ok(None)) => break, // queue empty or all ready items busy
         Ok(Err(e)) => {
-            error!(error = %e, "error claiming resync job; pausing");
+            let wrapped = crate::error::Error::from(e);
+            if wrapped.is_db_fatal() {
+                error!(error = %wrapped,
+                    "claim_resync hit unrecoverable db state; exiting dispatcher");
+                return Err(wrapped);
+            }
+            error!(error = %wrapped, "error claiming resync job; pausing");
             break;
         }
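Taken together, the three files form one escalation path: the dispatcher stops treating a poisoned-state error as a transient claim failure and returns it instead of pausing; assuming the dispatcher is among the tasks whose join results main.rs feeds through into_error, is_db_fatal then matches there and the process exits with code 2; and because main now owns the runtime explicitly, shutdown_timeout plus the final process::exit keep a wedged blocking fjall call from holding the process open.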