very fast at protocol indexer with flexible filtering, xrpc queries, cursor-backed event stream, and more, built on fjall
rust fjall at-protocol atproto indexer
58
fork

Configure Feed

Select the types of activity you want to include in your feed.

[firehose] use cancel token instead of aborting task, fix bug where upserted tasks would remove the new task from the map

dawn 147baa33 3f22754e

+28 -13
+28 -13
src/control/firehose.rs
··· 1 1 use std::sync::Arc; 2 + use std::sync::atomic::{AtomicUsize, Ordering}; 2 3 3 4 use miette::{IntoDiagnostic, Result}; 4 - use tokio::sync::watch; 5 + use tokio_util::sync::CancellationToken; 5 6 use tracing::{error, info}; 6 7 use url::Url; 7 8 ··· 11 12 use crate::state::AppState; 12 13 13 14 pub(super) struct FirehoseIngestorHandle { 14 - abort: tokio::task::AbortHandle, 15 + id: usize, 16 + cancel: CancellationToken, 15 17 pub(super) is_pds: bool, 16 18 } 17 19 18 20 impl Drop for FirehoseIngestorHandle { 19 21 fn drop(&mut self) { 20 - self.abort.abort(); 22 + self.cancel.cancel(); 21 23 } 22 24 } 23 25 ··· 46 48 pub(super) tasks: Arc<scc::HashMap<Url, FirehoseIngestorHandle>>, 47 49 /// set of urls persisted in the database (dynamically added sources). 48 50 pub(super) persisted: Arc<scc::HashSet<Url>>, 51 + /// ids assigned to spawned tasks 52 + next_task_id: Arc<AtomicUsize>, 49 53 } 50 54 51 55 impl FirehoseHandle { ··· 55 59 shared: Arc::new(std::sync::OnceLock::new()), 56 60 tasks: Arc::new(scc::HashMap::new()), 57 61 persisted: Arc::new(scc::HashSet::new()), 62 + next_task_id: Arc::new(AtomicUsize::new(0)), 58 63 } 59 64 } 60 65 ··· 87 92 ) 88 93 .await; 89 94 90 - let abort = tokio::spawn({ 95 + let id = self.next_task_id.fetch_add(1, Ordering::Relaxed); 96 + let cancel = CancellationToken::new(); 97 + 98 + tokio::spawn({ 91 99 let relay_url = source.url.clone(); 92 100 let tasks = self.tasks.clone(); 101 + let token = cancel.clone(); 93 102 async move { 94 - if let Err(e) = ingestor.run().await { 95 - error!(relay = %relay_url, err = %e, "firehose ingestor exited with error"); 96 - } else { 97 - // remove from tasks since we shutdown 98 - tasks.remove_async(&relay_url).await; 99 - info!(relay = %relay_url, "firehose shut down!"); 103 + tokio::select! { 104 + res = ingestor.run() => { 105 + // only remove our own entry because an upsert could replace us 106 + tasks.remove_if_async(&relay_url, |h| h.id == id).await; 107 + match res { 108 + Ok(()) => info!(relay = %relay_url, "firehose shut down!"), 109 + Err(e) => error!(relay = %relay_url, err = %e, "firehose ingestor exited with error"), 110 + } 111 + }, 112 + _ = token.cancelled() => { 113 + info!(relay = %relay_url, "firehose ingestor cancelled"); 114 + } 100 115 } 101 116 } 102 - }) 103 - .abort_handle(); 117 + }); 104 118 105 119 let handle = FirehoseIngestorHandle { 106 - abort, 120 + id, 121 + cancel, 107 122 is_pds: source.is_pds, 108 123 }; 109 124 self.tasks.upsert_async(source.url.clone(), handle).await;