don't
5
fork

Configure Feed

Select the types of activity you want to include in your feed.

refactor(jetstream): return future instead of join handle

Signed-off-by: tjh <x@tjh.dev>

tjh ac684096 749be5ff

+17 -23
+1 -12
crates/jetstream/src/client.rs
··· 8 8 use atproto::did::Did; 9 9 use bytes::Bytes; 10 10 use std::sync::{Arc, Mutex}; 11 - use tokio::{ 12 - sync::{Notify, oneshot}, 13 - task::JoinHandle, 14 - }; 11 + use tokio::sync::oneshot; 15 12 use tokio_util::sync::{CancellationToken, DropGuard}; 16 13 17 14 #[derive(Debug)] 18 15 pub struct JetstreamClient { 19 - handle: JoinHandle<()>, 20 16 client_tx: flume::Sender<ClientCommand>, 21 17 options: Arc<Mutex<SubscriberOptions>>, 22 18 metrics: Metrics, ··· 21 25 22 26 impl JetstreamClient { 23 27 pub(crate) fn new( 24 - handle: tokio::task::JoinHandle<()>, 25 28 client_tx: flume::Sender<ClientCommand>, 26 29 options: Arc<Mutex<SubscriberOptions>>, 27 30 metrics: Metrics, 28 31 shutdown: CancellationToken, 29 32 ) -> Self { 30 33 Self { 31 - handle, 32 34 client_tx, 33 35 options, 34 36 metrics, 35 37 shutdown: shutdown.drop_guard(), 36 38 } 37 - } 38 - 39 - pub async fn join(self) -> Result<(), tokio::task::JoinError> { 40 - drop(self.client_tx); 41 - self.handle.await 42 39 } 43 40 44 41 /// Add a DID to the Jetstream filters.
+7 -7
crates/jetstream/src/client_builder.rs
··· 13 13 } 14 14 15 15 impl JetstreamClientBuilder { 16 - pub fn build(self, instance: Url) -> (JetstreamClient, Receiver) { 16 + pub fn build(self, instance: Url) -> (JetstreamClient, Receiver, impl Future<Output = ()>) { 17 17 let (event_tx, event_rx) = flume::bounded(8); 18 18 let (client_tx, client_rx) = flume::bounded(8); 19 19 20 20 let options = Arc::new(Mutex::new(self.options)); 21 - 22 - let shutdown = CancellationToken::new(); 23 21 let metrics = Metrics::new(); 24 - let handle = tokio::task::spawn(crate::task::jetstream_subscriber( 22 + let shutdown = CancellationToken::new(); 23 + 24 + let task = crate::task::jetstream_subscriber( 25 25 event_tx, 26 26 client_rx, 27 27 metrics.clone(), ··· 29 29 Arc::clone(&options), 30 30 self.cursor, 31 31 shutdown.child_token(), 32 - )); 32 + ); 33 33 34 - let client = JetstreamClient::new(handle, client_tx, options, metrics, shutdown); 34 + let client = JetstreamClient::new(client_tx, options, metrics, shutdown); 35 35 let receiver = Receiver::new(event_rx); 36 36 37 - (client, receiver) 37 + (client, receiver, task) 38 38 } 39 39 40 40 pub fn add_collection(&mut self, nsid: Box<Nsid>) -> &mut Self {
+3 -1
crates/jetstream/src/lib.rs
··· 23 23 Default::default() 24 24 } 25 25 26 - pub fn subscribe(instance: impl AsRef<str>) -> (JetstreamClient, Receiver) { 26 + pub fn subscribe( 27 + instance: impl AsRef<str>, 28 + ) -> (JetstreamClient, Receiver, impl Future<Output = ()>) { 27 29 use url::Url; 28 30 builder().build(Url::parse(instance.as_ref()).expect("Instance should be a valid URL")) 29 31 }
+6 -3
crates/jetstream/src/main.rs
··· 80 80 } 81 81 82 82 #[cfg(not(feature = "lexicon"))] 83 - let (client, rx) = builder.build(arguments.instance); 83 + let (client, rx, task) = builder.build(arguments.instance); 84 84 85 85 #[cfg(feature = "lexicon")] 86 - let (client, rx) = builder.build(arguments.instance); 86 + let (client, rx, task) = builder.build(arguments.instance); 87 + 88 + // Spawn the client task. 89 + let handle = tokio::spawn(task); 87 90 88 91 while let Some(message) = rx.recv_async().await { 89 92 let msg = String::from_utf8_lossy(message.as_bytes()); ··· 100 97 } 101 98 } 102 99 103 - client.join().await.unwrap(); 100 + handle.await.unwrap(); 104 101 }