don't
5
fork

Configure Feed

Select the types of activity you want to include in your feed.

refactor(jetstream): move subscriber options as shared state

Simplifies handling of updates to the subscriber options set.

Signed-off-by: tjh <x@tjh.dev>

tjh e2158d1c a3e7ea0d

+120 -150
+1
Cargo.lock
··· 2652 2652 "lexicon", 2653 2653 "serde", 2654 2654 "serde_json", 2655 + "thiserror", 2655 2656 "time", 2656 2657 "tokio", 2657 2658 "tokio-tungstenite",
+1
crates/jetstream/Cargo.toml
··· 27 27 28 28 zstd = { version = "0.13.3", optional = true } 29 29 clap = { version = "4.5.50", features = ["derive"], optional = true } 30 + thiserror.workspace = true 30 31 31 32 [features] 32 33 default = ["clap", "zstd"]
+71 -80
crates/jetstream/src/client.rs
··· 2 2 Nsid, 3 3 de::Event, 4 4 metrics::{Metrics, MetricsData}, 5 + subscriber_options::SubscriberOptions, 6 + task::JetstreamTaskError, 5 7 }; 6 8 use atproto::did::Did; 7 9 use bytes::Bytes; 8 - use std::sync::Arc; 9 - use tokio::{sync::Notify, task::JoinHandle}; 10 + use std::sync::{Arc, Mutex}; 11 + use tokio::{sync::oneshot, task::JoinHandle}; 10 12 use tokio_util::sync::{CancellationToken, DropGuard}; 11 13 12 14 #[derive(Debug)] 13 15 pub struct JetstreamClient { 14 16 handle: JoinHandle<()>, 15 17 client_tx: flume::Sender<ClientCommand>, 18 + options: Arc<Mutex<SubscriberOptions>>, 16 19 metrics: Metrics, 17 20 shutdown: DropGuard, 18 21 } ··· 24 21 pub(crate) fn new( 25 22 handle: tokio::task::JoinHandle<()>, 26 23 client_tx: flume::Sender<ClientCommand>, 24 + options: Arc<Mutex<SubscriberOptions>>, 27 25 metrics: Metrics, 28 26 shutdown: CancellationToken, 29 27 ) -> Self { 30 28 Self { 31 29 handle, 32 30 client_tx, 31 + options, 33 32 metrics, 34 33 shutdown: shutdown.drop_guard(), 35 34 } ··· 44 39 45 40 /// Add a DID to the Jetstream filters. 46 41 pub async fn add_did(&self, did: impl Into<Box<Did>>) -> Result<(), JetstreamClientError> { 47 - let (command, complete) = ClientCommand::add_did(did.into()); 48 - self.client_tx.send_async(command).await?; 49 - complete.notified().await; 42 + if self.options.lock().unwrap().add_did(did.into())? { 43 + // The DID is new to the client, notify the task to update. 44 + self.update_task().await?; 45 + } 50 46 Ok(()) 51 47 } 52 48 53 49 /// Remove a DID from the Jetstream filters. 54 50 pub async fn remove_did(&self, did: impl Into<Box<Did>>) -> Result<(), JetstreamClientError> { 55 - let (command, complete) = ClientCommand::remove_did(did.into()); 56 - self.client_tx.send_async(command).await?; 57 - complete.notified().await; 51 + if self.options.lock().unwrap().remove_did(&did.into()) { 52 + self.update_task().await?; 53 + } 58 54 Ok(()) 59 55 } 60 56 ··· 64 58 &self, 65 59 collection: impl Into<Box<Nsid>>, 66 60 ) -> Result<(), JetstreamClientError> { 67 - let (command, complete) = ClientCommand::add_collection(collection.into()); 68 - self.client_tx.send_async(command).await?; 69 - complete.notified().await; 61 + if self 62 + .options 63 + .lock() 64 + .unwrap() 65 + .add_collection(collection.into())? 66 + { 67 + // The collection is new to the client, notify the task to update. 68 + self.update_task().await?; 69 + } 70 70 Ok(()) 71 71 } 72 72 ··· 81 69 &self, 82 70 collection: impl Into<Box<Nsid>>, 83 71 ) -> Result<(), JetstreamClientError> { 84 - let (command, complete) = ClientCommand::remove_collection(collection.into()); 85 - self.client_tx.send_async(command).await?; 86 - complete.notified().await; 72 + if self 73 + .options 74 + .lock() 75 + .unwrap() 76 + .remove_collection(&collection.into()) 77 + { 78 + self.update_task().await?; 79 + } 87 80 Ok(()) 88 81 } 89 82 90 83 pub fn metrics(&self) -> MetricsData { 91 84 self.metrics.export() 92 85 } 86 + 87 + async fn update_task(&self) -> Result<(), JetstreamClientError> { 88 + let (command, complete) = ClientCommand::subscriber_options_update(); 89 + self.client_tx.send(command)?; 90 + complete.await??; 91 + Ok(()) 92 + } 93 93 } 94 94 95 95 pub enum ClientCommand { 96 - AddDid { 97 - complete: Arc<Notify>, 98 - did: Box<Did>, 99 - }, 100 - RemoveDid { 101 - complete: Arc<Notify>, 102 - did: Box<Did>, 103 - }, 104 - AddCollection { 105 - complete: Arc<Notify>, 106 - collection: Box<Nsid>, 107 - }, 108 - RemoveCollection { 109 - complete: Arc<Notify>, 110 - collection: Box<Nsid>, 111 - }, 96 + SubscriberOptionsUpdate(oneshot::Sender<Result<(), JetstreamTaskError>>), 112 97 } 113 98 114 99 impl ClientCommand { 115 - fn add_did(did: Box<Did>) -> (Self, Arc<Notify>) { 116 - let complete = Arc::new(Notify::new()); 117 - ( 118 - Self::AddDid { 119 - complete: Arc::clone(&complete), 120 - did, 121 - }, 122 - complete, 123 - ) 124 - } 125 - 126 - fn remove_did(did: Box<Did>) -> (Self, Arc<Notify>) { 127 - let complete = Arc::new(Notify::new()); 128 - ( 129 - Self::RemoveDid { 130 - complete: Arc::clone(&complete), 131 - did, 132 - }, 133 - complete, 134 - ) 135 - } 136 - 137 - fn add_collection(collection: Box<Nsid>) -> (Self, Arc<Notify>) { 138 - let complete = Arc::new(Notify::new()); 139 - ( 140 - Self::AddCollection { 141 - complete: Arc::clone(&complete), 142 - collection, 143 - }, 144 - complete, 145 - ) 146 - } 147 - 148 - fn remove_collection(collection: Box<Nsid>) -> (Self, Arc<Notify>) { 149 - let complete = Arc::new(Notify::new()); 150 - ( 151 - Self::RemoveCollection { 152 - complete: Arc::clone(&complete), 153 - collection, 154 - }, 155 - complete, 156 - ) 100 + fn subscriber_options_update() -> (Self, oneshot::Receiver<Result<(), JetstreamTaskError>>) { 101 + let (tx, rx) = oneshot::channel(); 102 + (Self::SubscriberOptionsUpdate(tx), rx) 157 103 } 158 104 } 159 105 160 - #[derive(Debug)] 161 - pub struct JetstreamClientError; 162 - 163 - impl core::fmt::Display for JetstreamClientError { 164 - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { 165 - f.write_str("Failed to send command to client task") 166 - } 106 + #[derive(Debug, thiserror::Error)] 107 + pub enum JetstreamClientError { 108 + #[error("Client task shutdown")] 109 + TaskShutdown, 110 + #[error("Error in jetstream client task: {0}")] 111 + TaskError(#[from] JetstreamTaskError), 112 + #[error("DID filter exceeds maximum size")] 113 + TooManyDids(Box<Did>), 114 + #[error("Collection filter exceeds maximum size")] 115 + TooManyCollections(Box<Nsid>), 167 116 } 168 117 169 118 impl<T> From<flume::SendError<T>> for JetstreamClientError { 170 119 fn from(_: flume::SendError<T>) -> Self { 171 - Self 120 + Self::TaskShutdown 121 + } 122 + } 123 + 124 + impl From<oneshot::error::RecvError> for JetstreamClientError { 125 + fn from(_: oneshot::error::RecvError) -> Self { 126 + Self::TaskShutdown 127 + } 128 + } 129 + 130 + impl From<Box<Did>> for JetstreamClientError { 131 + fn from(value: Box<Did>) -> Self { 132 + Self::TooManyDids(value) 133 + } 134 + } 135 + 136 + impl From<Box<Nsid>> for JetstreamClientError { 137 + fn from(value: Box<Nsid>) -> Self { 138 + Self::TooManyCollections(value) 172 139 } 173 140 } 174 141
+6 -2
crates/jetstream/src/client_builder.rs
··· 1 + use std::sync::{Arc, Mutex}; 2 + 1 3 use super::JetstreamClient; 2 4 use crate::{Receiver, metrics::Metrics, subscriber_options::SubscriberOptions}; 3 5 use atproto::{Did, Nsid}; ··· 17 15 let (event_tx, event_rx) = flume::bounded(8); 18 16 let (client_tx, client_rx) = flume::bounded(8); 19 17 18 + let options = Arc::new(Mutex::new(self.options)); 19 + 20 20 let shutdown = CancellationToken::new(); 21 21 let metrics = Metrics::new(); 22 22 let handle = tokio::task::spawn(crate::task::jetstream_subscriber( ··· 26 22 client_rx, 27 23 metrics.clone(), 28 24 instance, 29 - self.options, 25 + Arc::clone(&options), 30 26 self.cursor, 31 27 shutdown.child_token(), 32 28 )); 33 29 34 - let client = JetstreamClient::new(handle, client_tx, metrics, shutdown); 30 + let client = JetstreamClient::new(handle, client_tx, options, metrics, shutdown); 35 31 let receiver = Receiver::new(event_rx); 36 32 37 33 (client, receiver)
+41 -68
crates/jetstream/src/task.rs
··· 6 6 use bytes::Bytes; 7 7 use futures_util::{SinkExt, StreamExt as _}; 8 8 use serde::Deserialize; 9 - use std::{ops::ControlFlow, sync::Arc, time::Duration}; 10 - use tokio::{sync::Notify, time::Instant}; 9 + use std::{ 10 + sync::{Arc, Mutex}, 11 + time::Duration, 12 + }; 13 + use tokio::time::Instant; 11 14 use tokio_tungstenite::{ 12 15 connect_async, 13 16 tungstenite::{ClientRequestBuilder, Error as TungsteniteError, Message, http::Uri}, ··· 26 23 /// Duration to rewind the cursor by when reconnecting a borken subscription. 27 24 const REWIND: Duration = Duration::from_secs(1); 28 25 26 + #[derive(Debug, thiserror::Error)] 27 + pub enum JetstreamTaskError { 28 + #[error("Failed to send subscriber options update: {0}")] 29 + OptionsUpdate(#[from] TungsteniteError), 30 + } 31 + 29 32 pub async fn jetstream_subscriber( 30 33 event_tx: flume::Sender<Bytes>, 31 34 client_rx: flume::Receiver<ClientCommand>, 32 35 metrics: Metrics, 33 36 instance: url::Url, 34 - mut options: SubscriberOptions, 37 + options: Arc<Mutex<SubscriberOptions>>, 35 38 initial_cursor: Option<u128>, 36 39 shutdown: CancellationToken, 37 40 ) { ··· 51 42 let mut timeout = tokio::time::interval(Duration::from_secs(5)); 52 43 53 44 'outer: loop { 54 - let (subscribe_url, require_hello) = match options.subscribe_url(&instance, &cursor) { 55 - SubscribeMethod::Query(url) => (url, false), 56 - SubscribeMethod::Hello(url) => (url, true), 57 - }; 45 + let (subscribe_url, require_hello) = 46 + match options.lock().unwrap().subscribe_url(&instance, &cursor) { 47 + SubscribeMethod::Query(url) => (url, false), 48 + SubscribeMethod::Hello(url) => (url, true), 49 + }; 58 50 59 51 tracing::debug!(%subscribe_url, "connecting to jetstream"); 60 52 let uri: Uri = subscribe_url ··· 89 79 metrics.modify(|mut data| data.connects += 1); 90 80 let (mut write, mut read) = socket.split(); 91 81 92 - if require_hello { 93 - if let Err(e) = send_options_update::<_, TungsteniteError>(&mut write, &options).await { 94 - tracing::error!(?e, "failed to send subscibe options update"); 95 - continue; 96 - } 82 + if require_hello 83 + && let Err(e) = send_options_update::<_, TungsteniteError>(&mut write, &options).await 84 + { 85 + tracing::error!(?e, "failed to send subscibe options update"); 86 + continue; 97 87 } 98 88 99 89 let mut last = Instant::now(); ··· 104 94 message 105 95 }, 106 96 Ok(command) = client_rx.recv_async() => { 107 - match process_command(command, &mut options) { 108 - ControlFlow::Continue((Outcome::NoAction, complete)) => { 109 - complete.notify_one(); 110 - }, 111 - ControlFlow::Continue((Outcome::UpdateOptions, complete)) => { 97 + match command { 98 + ClientCommand::SubscriberOptionsUpdate(complete) => { 112 99 if let Err(e) = send_options_update::<_, TungsteniteError>(&mut write, &options).await { 113 - tracing::error!(?e, "failed to send subscibe options update"); 100 + tracing::error!(?e, "failed to send subscribe options update"); 101 + if complete.send(Err(e.into())).is_err() { 102 + break 'outer; 103 + } 114 104 break; 115 105 } 116 - complete.notify_one(); 106 + 107 + if complete.send(Ok(())).is_err() { 108 + break 'outer; 109 + } 117 110 } 118 - ControlFlow::Break(_) => break 'outer, 119 111 } 120 112 continue; 121 113 } ··· 251 239 tracing::warn!("jetstream subscriber task ended"); 252 240 } 253 241 254 - async fn send_options_update<S, E>(sink: &mut S, options: &SubscriberOptions) -> Result<(), E> 242 + async fn send_options_update<S, E>( 243 + sink: &mut S, 244 + options: &Mutex<SubscriberOptions>, 245 + ) -> Result<(), E> 255 246 where 256 247 S: SinkExt<Message> + Unpin, 257 248 E: From<S::Error>, 258 249 { 259 250 use crate::subscriber_options::SubscriberSourcedMessage; 260 251 261 - let update = SubscriberSourcedMessage::OptionsUpdate(options).to_json(); 252 + let update = { 253 + let options = options.lock().unwrap(); 254 + SubscriberSourcedMessage::OptionsUpdate(&options).to_json() 255 + }; 256 + 262 257 tracing::debug!(%update, "sending options update"); 263 258 sink.send(Message::Text(update.into())).await?; 264 259 Ok(()) 265 - } 266 - 267 - enum Outcome { 268 - NoAction, 269 - UpdateOptions, 270 - } 271 - 272 - fn process_command( 273 - command: ClientCommand, 274 - options: &mut SubscriberOptions, 275 - ) -> ControlFlow<(), (Outcome, Arc<Notify>)> { 276 - match command { 277 - ClientCommand::AddDid { complete, did } => match options.add_did(did) { 278 - Ok(true) => ControlFlow::Continue((Outcome::UpdateOptions, complete)), 279 - Ok(false) => ControlFlow::Continue((Outcome::NoAction, complete)), 280 - Err(did) => { 281 - tracing::error!(?did, "too many DIDs encountered when adding DID"); 282 - ControlFlow::Break(()) 283 - } 284 - }, 285 - ClientCommand::RemoveDid { complete, did } => match options.remove_did(&did) { 286 - true => ControlFlow::Continue((Outcome::UpdateOptions, complete)), 287 - false => ControlFlow::Continue((Outcome::NoAction, complete)), 288 - }, 289 - ClientCommand::AddCollection { 290 - complete, 291 - collection, 292 - } => match options.add_collection(collection) { 293 - Ok(true) => ControlFlow::Continue((Outcome::UpdateOptions, complete)), 294 - Ok(false) => ControlFlow::Continue((Outcome::NoAction, complete)), 295 - Err(collection) => { 296 - tracing::error!( 297 - ?collection, 298 - "too many collections encountered when adding collection" 299 - ); 300 - ControlFlow::Break(()) 301 - } 302 - }, 303 - ClientCommand::RemoveCollection { 304 - complete, 305 - collection, 306 - } => match options.remove_collection(&collection) { 307 - true => ControlFlow::Continue((Outcome::UpdateOptions, complete)), 308 - false => ControlFlow::Continue((Outcome::NoAction, complete)), 309 - }, 310 - } 311 260 }