don't
5
fork

Configure Feed

Select the types of activity you want to include in your feed.

feat(jetstream): add explicit shutdown command

Signed-off-by: tjh <x@tjh.dev>

tjh 749be5ff e2158d1c

+38 -12
+17 -1
crates/jetstream/src/client.rs
··· 8 8 use atproto::did::Did; 9 9 use bytes::Bytes; 10 10 use std::sync::{Arc, Mutex}; 11 - use tokio::{sync::oneshot, task::JoinHandle}; 11 + use tokio::{ 12 + sync::{Notify, oneshot}, 13 + task::JoinHandle, 14 + }; 12 15 use tokio_util::sync::{CancellationToken, DropGuard}; 13 16 14 17 #[derive(Debug)] ··· 99 96 self.metrics.export() 100 97 } 101 98 99 + pub async fn shutdown(self) -> Result<(), JetstreamClientError> { 100 + let (command, complete) = ClientCommand::shutdown(); 101 + self.client_tx.send(command)?; 102 + complete.await??; 103 + Ok(()) 104 + } 105 + 102 106 async fn update_task(&self) -> Result<(), JetstreamClientError> { 103 107 let (command, complete) = ClientCommand::subscriber_options_update(); 104 108 self.client_tx.send(command)?; ··· 116 106 117 107 pub enum ClientCommand { 118 108 SubscriberOptionsUpdate(oneshot::Sender<Result<(), JetstreamTaskError>>), 109 + Shutdown(oneshot::Sender<Result<(), JetstreamTaskError>>), 119 110 } 120 111 121 112 impl ClientCommand { 122 113 fn subscriber_options_update() -> (Self, oneshot::Receiver<Result<(), JetstreamTaskError>>) { 123 114 let (tx, rx) = oneshot::channel(); 124 115 (Self::SubscriberOptionsUpdate(tx), rx) 116 + } 117 + 118 + fn shutdown() -> (Self, oneshot::Receiver<Result<(), JetstreamTaskError>>) { 119 + let (tx, rx) = oneshot::channel(); 120 + (Self::Shutdown(tx), rx) 125 121 } 126 122 } 127 123
+21 -11
crates/jetstream/src/task.rs
··· 29 29 #[derive(Debug, thiserror::Error)] 30 30 pub enum JetstreamTaskError { 31 31 #[error("Failed to send subscriber options update: {0}")] 32 - OptionsUpdate(#[from] TungsteniteError), 32 + OptionsUpdate(TungsteniteError), 33 + #[error("Failed to send close message: {0}")] 34 + Close(TungsteniteError), 33 35 } 34 36 35 37 pub async fn jetstream_subscriber( ··· 108 106 Ok(command) = client_rx.recv_async() => { 109 107 match command { 110 108 ClientCommand::SubscriberOptionsUpdate(complete) => { 111 - if let Err(e) = send_options_update::<_, TungsteniteError>(&mut write, &options).await { 112 - tracing::error!(?e, "failed to send subscribe options update"); 113 - if complete.send(Err(e.into())).is_err() { 114 - break 'outer; 115 - } 116 - break; 117 - } 109 + let result = send_options_update::<_, TungsteniteError>(&mut write, &options) 110 + .await 111 + .map_err(JetstreamTaskError::OptionsUpdate); 118 112 119 - if complete.send(Ok(())).is_err() { 120 - break 'outer; 113 + match (result.is_err(), complete.send(result)) { 114 + (_, Err(_)) => { 115 + // Client is broken. 116 + break 'outer; 117 + }, 118 + (true, _) => { 119 + // Reconnect websocket. 120 + break; 121 + } 122 + (false, _) => continue, 121 123 } 122 124 } 125 + ClientCommand::Shutdown(complete) => { 126 + let result = write.send(Message::Close(None)).await.map_err(JetstreamTaskError::Close); 127 + let _ = complete.send(result); 128 + break 'outer; 129 + } 123 130 } 124 - continue; 125 131 } 126 132 now = timeout.tick() => { 127 133 if now.duration_since(last) > THRESHOLD {