don't
5
fork

Configure Feed

Select the types of activity you want to include in your feed.

refactor(jetstream): move cursor in SubscriberOptions

Signed-off-by: tjh <x@tjh.dev>

tjh b8816560 7cdc24a9

+94 -90
+1 -3
crates/jetstream/src/client_builder.rs
··· 8 8 9 9 #[derive(Default)] 10 10 pub struct JetstreamClientBuilder { 11 - cursor: Option<u128>, 12 11 options: SubscriberOptions, 13 12 } 14 13 ··· 26 27 metrics.clone(), 27 28 instance, 28 29 Arc::clone(&options), 29 - self.cursor, 30 30 shutdown.child_token(), 31 31 ); 32 32 ··· 75 77 } 76 78 77 79 pub fn cursor(mut self, cursor: Option<u128>) -> Self { 78 - self.cursor = cursor; 80 + self.options.cursor = cursor; 79 81 self 80 82 } 81 83 }
+72 -51
crates/jetstream/src/subscriber_options.rs
··· 1 - use core::{error, fmt}; 2 1 use std::collections::HashSet; 3 2 4 3 use serde::{Deserialize, Serialize, Serializer}; ··· 54 55 /// will be normalized to zero when serialized. 55 56 #[serde(serialize_with = "serialize_max_message_size")] 56 57 pub max_message_size_bytes: i64, 58 + 59 + #[serde(skip)] 60 + pub cursor: Option<u128>, 57 61 } 58 62 59 63 fn normalize_max_message_size(value: i64) -> i64 { ··· 112 110 normalize_max_message_size(self.max_message_size_bytes) 113 111 } 114 112 115 - /// Compute the length, in bytes, of the query string required to express 116 - /// this set of options in a Jetstream `/subscribe` request. 117 - pub fn subscribe_url_len(&self, base: &url::Url) -> usize { 113 + fn subscribe_url_len(&self, base: &url::Url) -> usize { 118 114 const WANTED_DIDS_LEN: usize = "wantedDids=".len(); 119 115 const WANTED_COLLECTIONS_LEN: usize = "wantedCollections=".len(); 120 116 121 - let (wdl, wdc): (_, usize) = self.wanted_dids.iter().fold((0, 0), |(sl, sc), v| { 122 - (sl + WANTED_DIDS_LEN + v.len(), sc + 1) 123 - }); 117 + let (wanted_did_len, wanted_dids_count) = 118 + self.wanted_dids.iter().fold((0, 0), |(len, count), val| { 119 + (len + WANTED_DIDS_LEN + val.len(), count + 1) 120 + }); 124 121 125 - let (wcl, wcc): (_, usize) = self.wanted_collections.iter().fold((0, 0), |(sl, sc), v| { 126 - (sl + WANTED_COLLECTIONS_LEN + v.len(), sc + 1) 127 - }); 122 + let (wanted_col_len, wanted_col_count) = self 123 + .wanted_collections 124 + .iter() 125 + .fold((0, 0), |(len, count), val| { 126 + (len + WANTED_COLLECTIONS_LEN + val.len(), count + 1) 127 + }); 128 128 129 - let mml = match self.max_message_size() { 130 - 0 => 0, 131 - n if wdc > 0 || wcc > 0 => 1 + n.to_string().len() + "maxMessageSizeBytes=".len(), 132 - n => n.to_string().len() + "maxMessageSizeBytes=".len(), 129 + let (message_size_len, message_size_count) = match self.max_message_size() { 130 + 0 => (0, 0), 131 + n => (n.to_string().len() + "maxMessageSizeBytes=".len(), 1), 133 132 }; 134 133 135 - mml + (wdl + wdc.saturating_sub(1)) + wcl + wcc.saturating_sub(1) + base.as_str().len() 134 + let param_count = wanted_dids_count + wanted_col_count + message_size_count; 135 + base.as_str().len() + message_size_len + wanted_did_len + wanted_col_len + param_count 136 136 } 137 137 138 - /// Appends the subscription options to the specified URL. 139 - pub fn subscribe_url(&self, url: &url::Url, cursor: &Option<u128>) -> SubscribeMethod { 138 + /// Construct the Jetstream subscribe URL, returning a tuple of the URL and a boolean 139 + /// indicating whether the client should send an options update message on connect. 140 + pub fn subscribe_url(&self, url: &url::Url) -> (url::Url, bool) { 140 141 let mut url = url.to_owned(); 141 142 url.set_path("/subscribe"); 142 - url.query_pairs_mut().clear(); 143 + url.set_query(None); 143 144 144 - if let Some(cursor) = cursor { 145 + if let Some(cursor) = self.cursor { 145 146 url.query_pairs_mut() 146 147 .append_pair("cursor", &cursor.to_string()); 147 148 } 148 149 149 150 if self.subscribe_url_len(&url) > MAX_URL_LENGTH { 150 151 url.query_pairs_mut().append_pair("requireHello", "true"); 151 - return SubscribeMethod::Hello(url); 152 + return (url, true); 152 153 } 153 154 154 - { 155 + if !self.wanted_dids.is_empty() || !self.wanted_collections.is_empty() { 155 156 let mut query = url.query_pairs_mut(); 156 157 for collection in &self.wanted_collections { 157 158 query.append_pair("wantedCollections", collection); ··· 162 157 for did in &self.wanted_dids { 163 158 query.append_pair("wantedDids", did.as_str()); 164 159 } 165 - if self.max_message_size() > 0 { 166 - query.append_pair( 167 - "maxMessageSizeBytes", 168 - &self.max_message_size_bytes.to_string(), 169 - ); 170 - } 171 160 } 172 161 173 - SubscribeMethod::Query(url) 162 + if self.max_message_size() > 0 { 163 + url.query_pairs_mut().append_pair( 164 + "maxMessageSizeBytes", 165 + &self.max_message_size_bytes.to_string(), 166 + ); 167 + } 168 + 169 + (url, false) 174 170 } 175 - } 176 - 177 - #[derive(Debug)] 178 - pub struct UrlTooLong; 179 - 180 - impl fmt::Display for UrlTooLong { 181 - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { 182 - f.write_str("URL too long") 183 - } 184 - } 185 - 186 - impl error::Error for UrlTooLong {} 187 - 188 - pub enum SubscribeMethod { 189 - Query(url::Url), 190 - Hello(url::Url), 191 171 } 192 172 193 173 #[cfg(test)] ··· 182 192 use super::SubscriberOptions; 183 193 184 194 #[test] 185 - fn query_len() { 195 + fn default() { 196 + let base = "wss://jetstream1.us-east.bsky.network".parse().unwrap(); 197 + let options = SubscriberOptions::default(); 198 + let (url, _) = options.subscribe_url(&base); 199 + assert_eq!( 200 + url.as_str(), 201 + "wss://jetstream1.us-east.bsky.network/subscribe" 202 + ); 203 + } 204 + 205 + #[test] 206 + fn one_collection() { 207 + let base = "wss://jetstream1.us-east.bsky.network".parse().unwrap(); 186 208 let mut options = SubscriberOptions::default(); 209 + options 210 + .add_collection(Nsid::from_static("app.bsky.feed.like").into()) 211 + .unwrap(); 212 + let (url, _) = options.subscribe_url(&base); 213 + assert_eq!( 214 + url.as_str(), 215 + "wss://jetstream1.us-east.bsky.network/subscribe?wantedCollections=app.bsky.feed.like" 216 + ); 217 + } 218 + 219 + #[test] 220 + fn query_len() { 221 + let url: url::Url = "wss://example.url/subscribe".parse().unwrap(); 222 + let mut options = SubscriberOptions::default(); 223 + assert_eq!( 224 + options.subscribe_url_len(&url), 225 + "wss://example.url/subscribe".len() 226 + ); 227 + 187 228 options 188 229 .add_collection(Nsid::from_static("sh.tangled.*").into()) 189 230 .unwrap(); 190 231 191 232 assert_eq!( 192 - options.subscribe_url_len(&"https://example.url/subscribe".parse().unwrap()), 193 - "wantedCollections=sh.tangled.*".len() 233 + options.subscribe_url_len(&url), 234 + "wss://example.url/subscribe?wantedCollections=sh.tangled.*".len() 194 235 ); 195 236 196 237 options 197 238 .add_collection(Nsid::from_static("app.bsky.*").into()) 198 239 .unwrap(); 199 240 assert_eq!( 200 - options.subscribe_url_len(&"https://example.url/subscribe".parse().unwrap()), 201 - "wantedCollections=sh.tangled.*&wantedCollections=app.bsky.*".len() 241 + options.subscribe_url_len(&url), 242 + "wss://example.url/subscribe?wantedCollections=sh.tangled.*&wantedCollections=app.bsky.*".len() 202 243 ); 203 244 204 245 options.max_message_size_bytes = 1_000_000; 205 246 assert_eq!( 206 - options.subscribe_url_len(&"https://example.url/subscribe".parse().unwrap()), 207 - "wantedCollections=sh.tangled.*&wantedCollections=app.bsky.*&maxMessageSizeBytes=1000000".len() 247 + options.subscribe_url_len(&url), 248 + "wss://example.url/subscribe?wantedCollections=sh.tangled.*&wantedCollections=app.bsky.*&maxMessageSizeBytes=1000000".len() 208 249 ); 209 250 } 210 251 }
+21 -36
crates/jetstream/src/task.rs
··· 1 - use crate::{ 2 - client::ClientCommand, 3 - metrics::Metrics, 4 - subscriber_options::{SubscribeMethod, SubscriberOptions}, 5 - }; 1 + use crate::{client::ClientCommand, metrics::Metrics, subscriber_options::SubscriberOptions}; 6 2 use bytes::Bytes; 7 3 use futures_util::{SinkExt, StreamExt}; 8 4 use serde::Deserialize; ··· 31 35 32 36 #[derive(Default)] 33 37 struct State { 34 - cursor: Option<u128>, 35 38 metrics: Metrics, 36 39 } 37 40 ··· 40 45 metrics: Metrics, 41 46 instance: url::Url, 42 47 options: Arc<Mutex<SubscriberOptions>>, 43 - initial_cursor: Option<u128>, 44 48 shutdown: CancellationToken, 45 49 ) { 46 - let mut state = State { 47 - cursor: initial_cursor, 48 - metrics, 49 - }; 50 + let mut state = State { metrics }; 50 51 51 52 'outer: loop { 52 - let (subscribe_url, require_hello) = match options 53 - .lock() 54 - .unwrap() 55 - .subscribe_url(&instance, &state.cursor) 56 - { 57 - SubscribeMethod::Query(url) => (url, false), 58 - SubscribeMethod::Hello(url) => (url, true), 59 - }; 60 - 53 + let (subscribe_url, require_hello) = options.lock().unwrap().subscribe_url(&instance); 61 54 tracing::debug!(%subscribe_url, "connecting to jetstream"); 62 55 let uri: Uri = subscribe_url 63 56 .as_str() ··· 101 118 ReadOutcome::Timeout => { 102 119 tracing::error!("time since last received message exceeds threshold"); 103 120 state.metrics.modify(|mut data| data.timeouts += 1); 104 - state.cursor = rewind_cursor(state.cursor); 121 + rewind_cursor(&options); 105 122 break; 106 123 107 124 }, ··· 141 158 kind: &'a str, 142 159 } 143 160 144 - let mut new_cursor = state.cursor; 145 - 146 161 // Deserialize just the event timestamp and event kind. 147 - match serde_json::from_slice::<PartialEvent>(&message) { 162 + let new_cursor = match serde_json::from_slice::<PartialEvent>(&message) { 148 163 Ok(event) => { 149 164 state.metrics.increment_message_kind(event.kind); 150 - new_cursor.replace(event.time_us.into()); 165 + event.time_us.into() 151 166 } 152 167 Err(error) => { 153 - match std::str::from_utf8(&message) { 154 - Ok(payload) => { 155 - tracing::error!(?error, ?payload, "failed to deserialize event") 156 - } 157 - Err(_) => tracing::error!(?error, ?message, "failed to deserialize event"), 158 - } 168 + tracing::error!( 169 + ?error, 170 + message = %String::from_utf8_lossy(&message), 171 + "failed to deserialize event" 172 + ); 159 173 break; 160 174 } 161 - } 175 + }; 162 176 163 177 state.metrics.modify(|mut data| data.messages_received += 1); 164 178 if let Err(error) = event_tx.send_async(message).await { ··· 168 188 } 169 189 170 190 // Update the cursor since the message has been dispatched. 171 - state.cursor = new_cursor; 191 + set_cursor(&options, new_cursor); 172 192 } 173 193 174 194 state.metrics.modify(|mut data| data.disconnects += 1); ··· 274 294 Ok(ReadOutcome::Closed) 275 295 } 276 296 277 - fn rewind_cursor(mut cursor: Option<u128>) -> Option<u128> { 278 - if let Some(value) = &mut cursor { 297 + fn rewind_cursor(options: &Mutex<SubscriberOptions>) { 298 + let mut options = options.lock().unwrap(); 299 + if let Some(value) = &mut options.cursor { 279 300 *value = value.saturating_sub(REWIND.as_micros()) 280 301 } 281 - cursor 302 + } 303 + 304 + fn set_cursor(options: &Mutex<SubscriberOptions>, value: u128) { 305 + let mut options = options.lock().unwrap(); 306 + options.cursor = Some(value); 282 307 } 283 308 284 309 async fn send_options_update<S, E>(