Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm
75
fork

Configure Feed

Select the types of activity you want to include in your feed.

Merge pull request #4 from cyypherus/fix

Fix web socket error handling loop

authored by

videah and committed by
GitHub
66602988 38bd24e3

+167 -87
+8 -1
.gitignore
··· 1 - /target 1 + target/ 2 + pkg/ 3 + **/*.rs.bk 4 + dist/ 5 + traces/ 6 + *.DS_Store 7 + .cargo/ 8 + .env
+14
Cargo.lock
··· 767 767 "thiserror 2.0.3", 768 768 "tokio", 769 769 "tokio-tungstenite", 770 + "tokio-util", 770 771 "url", 771 772 "zstd", 772 773 ] ··· 1463 1464 "tokio", 1464 1465 "tokio-native-tls", 1465 1466 "tungstenite", 1467 + ] 1468 + 1469 + [[package]] 1470 + name = "tokio-util" 1471 + version = "0.7.13" 1472 + source = "registry+https://github.com/rust-lang/crates.io-index" 1473 + checksum = "d7fcaa8d55a2bdd6b83ace262b016eca0d79ee02818c5c1bcdf0305114081078" 1474 + dependencies = [ 1475 + "bytes", 1476 + "futures-core", 1477 + "futures-sink", 1478 + "pin-project-lite", 1479 + "tokio", 1466 1480 ] 1467 1481 1468 1482 [[package]]
+11 -4
Cargo.toml
··· 10 10 11 11 [dependencies] 12 12 async-trait = "0.1.83" 13 - atrium-api = { version = "0.24.7", default-features = false, features = ["namespace-appbsky"] } 14 - tokio = { version = "1.41.1", features = ["full", "sync"] } 15 - tokio-tungstenite = { version = "0.24.0", features = ["connect", "native-tls-vendored", "url"] } 13 + atrium-api = { version = "0.24.7", default-features = false, features = [ 14 + "namespace-appbsky", 15 + ] } 16 + tokio = { version = "1.41.1", features = ["full", "sync", "time"] } 17 + tokio-tungstenite = { version = "0.24.0", features = [ 18 + "connect", 19 + "native-tls-vendored", 20 + "url", 21 + ] } 16 22 futures-util = "0.3.31" 17 23 url = "2.5.3" 18 24 serde = { version = "1.0.215", features = ["derive"] } ··· 22 28 thiserror = "2.0.3" 23 29 flume = "0.11.1" 24 30 log = "0.4.22" 31 + tokio-util = "0.7.13" 25 32 26 33 [dev-dependencies] 27 34 anyhow = "1.0.93" 28 - clap = { version = "4.5.20", features = ["derive"] } 35 + clap = { version = "4.5.20", features = ["derive"] }
+4 -13
examples/basic.rs
··· 1 1 //! A very basic example of how to listen for create/delete events on a specific DID and NSID. 2 2 3 - use atrium_api::{ 4 - record::KnownRecord::AppBskyFeedPost, 5 - types::string, 6 - }; 3 + use atrium_api::{record::KnownRecord::AppBskyFeedPost, types::string}; 7 4 use clap::Parser; 8 5 use jetstream_oxide::{ 9 - events::{ 10 - commit::CommitEvent, 11 - JetstreamEvent::Commit, 12 - }, 13 - DefaultJetstreamEndpoints, 14 - JetstreamCompression, 15 - JetstreamConfig, 16 - JetstreamConnector, 6 + events::{commit::CommitEvent, JetstreamEvent::Commit}, 7 + DefaultJetstreamEndpoints, JetstreamCompression, JetstreamConfig, JetstreamConnector, 17 8 }; 18 9 19 10 #[derive(Parser, Debug)] ··· 41 32 }; 42 33 43 34 let jetstream = JetstreamConnector::new(config)?; 44 - let (receiver, _) = jetstream.connect().await?; 35 + let receiver = jetstream.connect().await?; 45 36 46 37 println!( 47 38 "Listening for '{}' events on DIDs: {:?}",
+1 -2
src/error.rs
··· 1 - ///! Various error types. 1 + //! Various error types. 2 2 use std::io; 3 - 4 3 5 4 use thiserror::Error; 6 5
+129 -67
src/lib.rs
··· 2 2 pub mod events; 3 3 pub mod exports; 4 4 5 - use std::io::{ 6 - Cursor, 7 - Read, 5 + use std::{ 6 + io::{Cursor, Read}, 7 + sync::Arc, 8 + time::Duration, 8 9 }; 9 10 10 11 use chrono::Utc; 11 - use futures_util::stream::StreamExt; 12 - use tokio::{ 13 - net::TcpStream, 14 - task::JoinHandle, 15 - }; 16 - use tokio_tungstenite::{ 17 - connect_async, 18 - tungstenite::Message, 19 - MaybeTlsStream, 20 - WebSocketStream, 21 - }; 12 + use futures_util::{stream::StreamExt, SinkExt}; 13 + use tokio::{net::TcpStream, sync::Mutex}; 14 + use tokio_tungstenite::{connect_async, tungstenite::Message, MaybeTlsStream, WebSocketStream}; 15 + use tokio_util::sync::CancellationToken; 22 16 use url::Url; 23 17 use zstd::dict::DecoderDictionary; 24 18 25 19 use crate::{ 26 - error::{ 27 - ConfigValidationError, 28 - ConnectionError, 29 - JetstreamEventError, 30 - }, 20 + error::{ConfigValidationError, ConnectionError, JetstreamEventError}, 31 21 events::JetstreamEvent, 32 22 }; 33 23 ··· 146 136 147 137 impl JetstreamConfig { 148 138 /// Constructs a new endpoint URL with the given [JetstreamConfig] applied. 149 - pub fn construct_endpoint(&self, endpoint: &String) -> Result<Url, url::ParseError> { 139 + pub fn construct_endpoint(&self, endpoint: &str) -> Result<Url, url::ParseError> { 150 140 let did_search_query = self 151 141 .wanted_dids 152 142 .iter() ··· 172 162 let params = did_search_query 173 163 .chain(collection_search_query) 174 164 .chain(std::iter::once(compression)) 175 - .chain(cursor.into_iter()) 165 + .chain(cursor) 176 166 .collect::<Vec<(&str, String)>>(); 177 167 178 - Url::parse_with_params(&endpoint, params) 168 + Url::parse_with_params(endpoint, params) 179 169 } 180 170 181 171 /// Validates the configuration to make sure it is within the limits of the Jetstream API. ··· 215 205 /// 216 206 /// A [JetstreamReceiver] is returned which can be used to respond to events. When all instances 217 207 /// of this receiver are dropped, the connection and task are automatically closed. 218 - pub async fn connect( 219 - &self, 220 - ) -> Result< 221 - ( 222 - JetstreamReceiver, 223 - JoinHandle<Result<(), JetstreamEventError>>, 224 - ), 225 - ConnectionError, 226 - > { 208 + pub async fn connect(&self) -> Result<JetstreamReceiver, ConnectionError> { 227 209 // We validate the config again for good measure. Probably not necessary but it can't hurt. 228 210 self.config 229 211 .validate() ··· 237 219 .construct_endpoint(&self.config.endpoint) 238 220 .map_err(ConnectionError::InvalidEndpoint)?; 239 221 240 - let (ws_stream, _) = connect_async(&configured_endpoint) 241 - .await 242 - .map_err(ConnectionError::WebSocketFailure)?; 222 + tokio::task::spawn(async move { 223 + let max_retries = 10; 224 + let base_delay_ms = 1_000; // 1 second 225 + let max_delay_ms = 30_000; // 30 seconds 243 226 244 - let dict = DecoderDictionary::copy(JETSTREAM_ZSTD_DICTIONARY); 227 + for retry_attempt in 0..max_retries { 228 + let dict = DecoderDictionary::copy(JETSTREAM_ZSTD_DICTIONARY); 229 + 230 + if let Ok((ws_stream, _)) = connect_async(&configured_endpoint).await { 231 + let _ = websocket_task(dict, ws_stream, send_channel.clone()).await; 232 + } 245 233 246 - // TODO: Internally creating and returning a tokio task might not be the best idea(?) 247 - let handle = tokio::task::spawn(websocket_task(dict, ws_stream, send_channel)); 234 + // Exponential backoff 235 + let delay_ms = base_delay_ms * (2_u64.pow(retry_attempt)); 236 + 237 + log::error!("Connection failed, retrying in {delay_ms}ms..."); 238 + tokio::time::sleep(Duration::from_millis(delay_ms.min(max_delay_ms))).await; 239 + log::info!("Attempting to reconnect...") 240 + } 241 + log::error!("Connection retries exhausted. Jetstream is disconnected."); 242 + }); 248 243 249 - Ok((receive_channel, handle)) 244 + Ok(receive_channel) 250 245 } 251 246 } 252 247 ··· 258 253 send_channel: JetstreamSender, 259 254 ) -> Result<(), JetstreamEventError> { 260 255 // TODO: Use the write half to allow the user to change configuration settings on the fly. 261 - let (_, mut read) = ws.split(); 256 + let (socket_write, mut socket_read) = ws.split(); 257 + let shared_socket_write = Arc::new(Mutex::new(socket_write)); 258 + 259 + let ping_cancellation_token = CancellationToken::new(); 260 + let mut ping_interval = tokio::time::interval(Duration::from_secs(30)); 261 + let ping_cancelled = ping_cancellation_token.clone(); 262 + let ping_shared_socket_write = shared_socket_write.clone(); 263 + tokio::spawn(async move { 264 + loop { 265 + ping_interval.tick().await; 266 + let false = ping_cancelled.is_cancelled() else { 267 + break; 268 + }; 269 + log::trace!("Sending ping"); 270 + match ping_shared_socket_write 271 + .lock() 272 + .await 273 + .send(Message::Ping("ping".as_bytes().to_vec())) 274 + .await 275 + { 276 + Ok(_) => (), 277 + Err(error) => { 278 + log::error!("Ping failed: {error}"); 279 + break; 280 + } 281 + } 282 + } 283 + }); 284 + 285 + let mut closing_connection = false; 262 286 loop { 263 - if let Some(Ok(message)) = read.next().await { 264 - match message { 265 - Message::Text(json) => { 266 - let event = serde_json::from_str::<JetstreamEvent>(&json) 267 - .map_err(JetstreamEventError::ReceivedMalformedJSON)?; 287 + match socket_read.next().await { 288 + Some(Ok(message)) => { 289 + match message { 290 + Message::Text(json) => { 291 + let event = serde_json::from_str::<JetstreamEvent>(&json) 292 + .map_err(JetstreamEventError::ReceivedMalformedJSON)?; 268 293 269 - if let Err(_) = send_channel.send(event) { 270 - // We can assume that all receivers have been dropped, so we can close the 271 - // connection and exit the task. 272 - log::info!( 294 + if send_channel.send(event).is_err() { 295 + // We can assume that all receivers have been dropped, so we can close the 296 + // connection and exit the task. 297 + log::info!( 273 298 "All receivers for the Jetstream connection have been dropped, closing connection." 274 299 ); 275 - return Ok(()); 300 + closing_connection = true; 301 + } 276 302 } 277 - } 278 - Message::Binary(zstd_json) => { 279 - let mut cursor = Cursor::new(zstd_json); 280 - let mut decoder = 281 - zstd::stream::Decoder::with_prepared_dictionary(&mut cursor, &dictionary) 282 - .map_err(JetstreamEventError::CompressionDictionaryError)?; 303 + Message::Binary(zstd_json) => { 304 + let mut cursor = Cursor::new(zstd_json); 305 + let mut decoder = zstd::stream::Decoder::with_prepared_dictionary( 306 + &mut cursor, 307 + &dictionary, 308 + ) 309 + .map_err(JetstreamEventError::CompressionDictionaryError)?; 283 310 284 - let mut json = String::new(); 285 - decoder 286 - .read_to_string(&mut json) 287 - .map_err(JetstreamEventError::CompressionDecoderError)?; 311 + let mut json = String::new(); 312 + decoder 313 + .read_to_string(&mut json) 314 + .map_err(JetstreamEventError::CompressionDecoderError)?; 288 315 289 - let event = serde_json::from_str::<JetstreamEvent>(&json) 290 - .map_err(JetstreamEventError::ReceivedMalformedJSON)?; 316 + let event = serde_json::from_str::<JetstreamEvent>(&json) 317 + .map_err(JetstreamEventError::ReceivedMalformedJSON)?; 291 318 292 - if let Err(_) = send_channel.send(event) { 293 - // We can assume that all receivers have been dropped, so we can close the 294 - // connection and exit the task. 295 - log::info!( 319 + if send_channel.send(event).is_err() { 320 + // We can assume that all receivers have been dropped, so we can close the 321 + // connection and exit the task. 322 + log::info!( 296 323 "All receivers for the Jetstream connection have been dropped, closing connection..." 297 324 ); 298 - return Ok(()); 325 + closing_connection = true; 326 + } 327 + } 328 + Message::Ping(vec) => { 329 + log::trace!("Ping recieved, responding"); 330 + _ = shared_socket_write 331 + .lock() 332 + .await 333 + .send(Message::Pong(vec)) 334 + .await; 335 + } 336 + Message::Close(close_frame) => { 337 + if let Some(close_frame) = close_frame { 338 + let reason = close_frame.reason; 339 + let code = close_frame.code; 340 + log::trace!("Connection closed. Reason: {reason}, Code: {code}"); 341 + } 299 342 } 343 + Message::Pong(pong) => { 344 + let pong_payload = 345 + String::from_utf8(pong).unwrap_or("Invalid payload".to_string()); 346 + log::trace!("Pong recieved. Payload: {pong_payload}"); 347 + } 348 + Message::Frame(_) => (), 300 349 } 301 - _ => {} 302 350 } 351 + Some(Err(error)) => { 352 + log::error!("Web socket error: {error}"); 353 + ping_cancellation_token.cancel(); 354 + closing_connection = true; 355 + } 356 + None => { 357 + log::error!("No web socket result"); 358 + ping_cancellation_token.cancel(); 359 + closing_connection = true; 360 + } 361 + } 362 + if closing_connection { 363 + _ = shared_socket_write.lock().await.close().await; 364 + return Ok(()); 303 365 } 304 366 } 305 367 }