don't
5
fork

Configure Feed

Select the types of activity you want to include in your feed.

fix(knot): update for new Jetstream API

Signed-off-by: tjh <x@tjh.dev>

tjh 8823a0e2 b8816560

+474 -255
+1 -1
Cargo.lock
··· 2647 2647 "atproto", 2648 2648 "bytes", 2649 2649 "clap", 2650 + "fastrand", 2650 2651 "flume 0.12.0", 2651 2652 "futures-util", 2652 - "lexicon", 2653 2653 "serde", 2654 2654 "serde_json", 2655 2655 "thiserror",
+2
Cargo.toml
··· 47 47 lto = "fat" 48 48 strip = true 49 49 50 + [profile.dev] 51 + panic = "abort"
+1 -1
crates/jetstream/Cargo.toml
··· 9 9 10 10 [dependencies] 11 11 atproto = { workspace = true, features = ["serde"] } 12 - lexicon = { workspace = true, optional = true } 13 12 14 13 serde.workspace = true 15 14 serde_json.workspace = true ··· 27 28 zstd = { version = "0.13.3", optional = true } 28 29 clap = { version = "4.5.50", features = ["derive"], optional = true } 29 30 thiserror.workspace = true 31 + fastrand = "2.3.0" 30 32 31 33 [features] 32 34 default = ["clap", "zstd"]
+33 -15
crates/jetstream/src/client.rs
··· 103 103 } 104 104 } 105 105 106 + pub type CommandResponse<T> = oneshot::Sender<Result<T, JetstreamTaskError>>; 107 + 106 108 pub enum ClientCommand { 107 - SubscriberOptionsUpdate(oneshot::Sender<Result<(), JetstreamTaskError>>), 108 - Shutdown(oneshot::Sender<Result<(), JetstreamTaskError>>), 109 + SubscriberOptionsUpdate(CommandResponse<()>), 110 + Shutdown(CommandResponse<()>), 109 111 } 110 112 111 113 impl ClientCommand { ··· 159 157 } 160 158 161 159 #[derive(Debug)] 162 - pub struct Receiver { 160 + pub struct JetstreamReceiver { 163 161 event_rx: flume::Receiver<Bytes>, 164 162 } 165 163 166 - impl Receiver { 164 + impl JetstreamReceiver { 167 165 pub(crate) fn new(event_rx: flume::Receiver<Bytes>) -> Self { 168 166 Self { event_rx } 169 167 } 170 168 171 - pub async fn recv_async(&self) -> Option<EventRef> { 169 + /// Asynchronously receive a Jetstream event. 170 + /// 171 + /// Returns [`None`] when the Jetstream client is shutdown. 172 + pub async fn recv_async(&self) -> Option<JetstreamEvent> { 172 173 let bytes = self.event_rx.recv_async().await.ok()?; 173 - Some(EventRef::new(bytes)) 174 + Some(JetstreamEvent::new(bytes)) 174 175 } 175 176 176 - pub fn recv(&self) -> Option<EventRef> { 177 + /// Synchronously receive a Jetstream event. 178 + /// 179 + /// Returns [`None`] when the Jetstream client is shutdown. 180 + pub fn recv(&self) -> Option<JetstreamEvent> { 177 181 let bytes = self.event_rx.recv().ok()?; 178 - Some(EventRef::new(bytes)) 182 + Some(JetstreamEvent::new(bytes)) 183 + } 184 + 185 + /// Consume the Jetstream receiver and return the wrapped flume channel receiver. 186 + pub fn to_inner(self) -> flume::Receiver<Bytes> { 187 + self.event_rx 179 188 } 180 189 } 181 190 182 191 #[derive(Debug)] 183 - pub struct EventRef { 192 + pub struct JetstreamEvent { 184 193 bytes: Bytes, 185 194 } 186 195 187 - impl EventRef { 188 - fn new(bytes: Bytes) -> Self { 189 - Self { bytes } 190 - } 191 - 196 + impl JetstreamEvent { 192 197 pub fn as_bytes(&self) -> &[u8] { 193 198 &self.bytes 194 199 } 200 + 201 + /// Consume the event, returning the internal [`Bytes`] buffer. 202 + pub fn to_inner(self) -> Bytes { 203 + self.bytes 204 + } 205 + 206 + fn new(bytes: Bytes) -> Self { 207 + Self { bytes } 208 + } 195 209 } 196 210 197 - impl<'a> EventRef { 211 + impl<'a> JetstreamEvent { 198 212 pub fn deserialize(&'a self) -> Result<Event<'a>, serde_json::Error> { 199 213 let value = serde_json::from_slice(&self.bytes)?; 200 214 Ok(value)
-81
crates/jetstream/src/client_builder.rs
··· 1 - use std::sync::{Arc, Mutex}; 2 - 3 - use super::JetstreamClient; 4 - use crate::{Receiver, metrics::Metrics, subscriber_options::SubscriberOptions}; 5 - use atproto::{Did, Nsid}; 6 - use tokio_util::sync::CancellationToken; 7 - use url::Url; 8 - 9 - #[derive(Default)] 10 - pub struct JetstreamClientBuilder { 11 - options: SubscriberOptions, 12 - } 13 - 14 - impl JetstreamClientBuilder { 15 - pub fn build(self, instance: Url) -> (JetstreamClient, Receiver, impl Future<Output = ()>) { 16 - let (event_tx, event_rx) = flume::bounded(8); 17 - let (client_tx, client_rx) = flume::bounded(8); 18 - 19 - let options = Arc::new(Mutex::new(self.options)); 20 - let metrics = Metrics::new(); 21 - let shutdown = CancellationToken::new(); 22 - 23 - let task = crate::task::jetstream_subscriber( 24 - event_tx, 25 - client_rx, 26 - metrics.clone(), 27 - instance, 28 - Arc::clone(&options), 29 - shutdown.child_token(), 30 - ); 31 - 32 - let client = JetstreamClient::new(client_tx, options, metrics, shutdown); 33 - let receiver = Receiver::new(event_rx); 34 - 35 - (client, receiver, task) 36 - } 37 - 38 - pub fn add_collection(&mut self, nsid: Box<Nsid>) -> &mut Self { 39 - self.options.add_collection(nsid); 40 - self 41 - } 42 - 43 - pub fn collection(mut self, nsid: Box<Nsid>) -> Self { 44 - self.add_collection(nsid); 45 - self 46 - } 47 - 48 - pub fn collections(mut self, nsids: impl IntoIterator<Item = Box<Nsid>>) -> Self { 49 - for nsid in nsids { 50 - self.add_collection(nsid); 51 - } 52 - self 53 - } 54 - 55 - pub fn add_did(&mut self, did: Box<Did>) -> &mut Self { 56 - self.options.add_did(did); 57 - self 58 - } 59 - 60 - pub fn did(mut self, did: Box<Did>) -> Self { 61 - self.add_did(did); 62 - self 63 - } 64 - 65 - pub fn dids(mut self, dids: impl IntoIterator<Item = Box<Did>>) -> Self { 66 - for did in dids { 67 - self.add_did(did); 68 - } 69 - self 70 - } 71 - 72 - pub fn max_message_size(mut self, bytes_len: i64) -> Self { 73 - self.options.max_message_size_bytes = bytes_len; 74 - self 75 - } 76 - 77 - pub fn cursor(mut self, cursor: Option<u128>) -> Self { 78 - self.options.cursor = cursor; 79 - self 80 - } 81 - }
+107
crates/jetstream/src/client_config.rs
··· 1 + use std::{ 2 + borrow::Cow, 3 + sync::{Arc, Mutex}, 4 + }; 5 + 6 + use futures_util::FutureExt as _; 7 + use tokio_util::sync::CancellationToken; 8 + 9 + use crate::{ 10 + JetstreamClient, JetstreamReceiver, PUBLIC_JETSTREAM_US_EAST1, PUBLIC_JETSTREAM_US_EAST2, 11 + PUBLIC_JETSTREAM_US_WEST1, PUBLIC_JETSTREAM_US_WEST2, client_options::ClientOptions, 12 + metrics::Metrics, subscriber_options::SubscriberOptions, task::JetstreamTask, 13 + }; 14 + 15 + #[derive(Clone, Debug, Default)] 16 + pub struct JetstreamConfig { 17 + pub client_options: ClientOptions, 18 + pub subscriber_options: SubscriberOptions, 19 + } 20 + 21 + impl JetstreamConfig { 22 + /// Create default a [`JetstreamConfig`] connecting to jetstream{1,2}.us-east.bsky.network 23 + /// instances. 24 + pub fn us_east() -> Self { 25 + Self { 26 + client_options: ClientOptions { 27 + instances: vec![ 28 + Cow::Borrowed(PUBLIC_JETSTREAM_US_EAST1), 29 + Cow::Borrowed(PUBLIC_JETSTREAM_US_EAST2), 30 + ], 31 + ..Default::default() 32 + }, 33 + ..Default::default() 34 + } 35 + } 36 + 37 + /// Create default a [`JetstreamConfig`] connecting to jetstream{1,2}.us-west.bsky.network 38 + /// instances. 39 + pub fn us_west() -> Self { 40 + Self { 41 + client_options: ClientOptions { 42 + instances: vec![ 43 + Cow::Borrowed(PUBLIC_JETSTREAM_US_WEST1), 44 + Cow::Borrowed(PUBLIC_JETSTREAM_US_WEST2), 45 + ], 46 + ..Default::default() 47 + }, 48 + ..Default::default() 49 + } 50 + } 51 + 52 + pub fn connect(self) -> (JetstreamClient, JetstreamReceiver, JetstreamTask) { 53 + let (event_tx, event_rx) = flume::bounded(8); 54 + let (client_tx, client_rx) = flume::bounded(8); 55 + 56 + let options = Arc::new(Mutex::new(self.subscriber_options)); 57 + let metrics = Metrics::new(); 58 + let shutdown = CancellationToken::new(); 59 + 60 + let task = JetstreamTask(Some( 61 + crate::task::jetstream_subscriber( 62 + event_tx, 63 + client_rx, 64 + metrics.clone(), 65 + self.client_options, 66 + Arc::clone(&options), 67 + shutdown.child_token(), 68 + ) 69 + .boxed(), 70 + )); 71 + 72 + let client = JetstreamClient::new(client_tx, options, metrics, shutdown); 73 + let receiver = JetstreamReceiver::new(event_rx); 74 + 75 + (client, receiver, task) 76 + } 77 + 78 + pub fn with_cursor(mut self, cursor: Option<u128>) -> Self { 79 + self.subscriber_options.cursor = cursor; 80 + self 81 + } 82 + } 83 + 84 + #[cfg(test)] 85 + mod tests { 86 + use super::JetstreamConfig; 87 + 88 + #[ignore] 89 + #[tokio::test] 90 + async fn connect_default() { 91 + let (client, receiver, task) = JetstreamConfig::default().connect(); 92 + let handle = tokio::spawn(task); 93 + 94 + let Some(event) = receiver.recv_async().await else { 95 + panic!("No messages received"); 96 + }; 97 + 98 + let Ok(event) = event.deserialize() else { 99 + panic!("Failed to deserialize message"); 100 + }; 101 + 102 + eprintln!("{event:?}"); 103 + 104 + drop(client); 105 + handle.await.unwrap(); 106 + } 107 + }
+31
crates/jetstream/src/client_options.rs
··· 1 + use std::{borrow::Cow, time::Duration}; 2 + 3 + #[derive(Clone, Debug)] 4 + pub struct ClientOptions { 5 + pub instances: Vec<Cow<'static, str>>, 6 + pub received_timeout: Duration, 7 + pub reconnect_backoff_min: Duration, 8 + pub reconnect_backoff_max: Duration, 9 + 10 + /// Max fraction of the reconnect delay to randomize by± 11 + pub reconnect_backoff_jitter: f32, 12 + 13 + pub rewind: Duration, 14 + } 15 + 16 + impl Default for ClientOptions { 17 + fn default() -> Self { 18 + Self { 19 + instances: crate::PUBLIC_JETSTREAM_INSTANCES 20 + .iter() 21 + .map(|&value| Cow::Borrowed(value)) 22 + .collect(), 23 + received_timeout: Duration::from_secs(30), 24 + reconnect_backoff_min: Duration::from_millis(150), 25 + reconnect_backoff_max: Duration::from_secs(300), 26 + // Randomize reconnect delay by ±10% 27 + reconnect_backoff_jitter: 0.1, 28 + rewind: Duration::from_secs(3), 29 + } 30 + } 31 + }
+15 -20
crates/jetstream/src/lib.rs
··· 1 1 mod client; 2 - mod client_builder; 3 2 mod de; 4 - pub mod metrics; 5 - pub mod subscriber_options; 6 3 mod task; 7 4 5 + pub mod client_config; 6 + pub mod client_options; 7 + pub mod metrics; 8 + pub mod subscriber_options; 9 + 8 10 pub use atproto::{Did, Nsid}; 9 - pub use client::{EventRef, JetstreamClient, JetstreamClientError, Receiver}; 10 - pub use client_builder::JetstreamClientBuilder; 11 + pub use client::{JetstreamClient, JetstreamClientError, JetstreamEvent, JetstreamReceiver}; 11 12 pub use de::{AccountStatus, Commit, CommitEvent, Delete, Event, Identity, InnerAccount}; 12 13 pub use serde_json::Value; 13 14 15 + pub const PUBLIC_JETSTREAM_US_EAST1: &str = "wss://jetstream1.us-east.bsky.network"; 16 + pub const PUBLIC_JETSTREAM_US_EAST2: &str = "wss://jetstream2.us-east.bsky.network"; 17 + pub const PUBLIC_JETSTREAM_US_WEST1: &str = "wss://jetstream1.us-west.bsky.network"; 18 + pub const PUBLIC_JETSTREAM_US_WEST2: &str = "wss://jetstream2.us-west.bsky.network"; 19 + 14 20 /// Official public Jetstream instances. 15 21 pub const PUBLIC_JETSTREAM_INSTANCES: &[&str] = &[ 16 - "wss://jetstream1.us-east.bsky.network", 17 - "wss://jetstream2.us-east.bsky.network", 18 - "wss://jetstream1.us-west.bsky.network", 19 - "wss://jetstream2.us-west.bsky.network", 22 + PUBLIC_JETSTREAM_US_EAST1, 23 + PUBLIC_JETSTREAM_US_EAST2, 24 + PUBLIC_JETSTREAM_US_WEST1, 25 + PUBLIC_JETSTREAM_US_WEST2, 20 26 ]; 21 - 22 - pub fn builder() -> JetstreamClientBuilder { 23 - Default::default() 24 - } 25 - 26 - pub fn subscribe( 27 - instance: impl AsRef<str>, 28 - ) -> (JetstreamClient, Receiver, impl Future<Output = ()>) { 29 - use url::Url; 30 - builder().build(Url::parse(instance.as_ref()).expect("Instance should be a valid URL")) 31 - }
+9 -18
crates/jetstream/src/main.rs
··· 2 2 use std::path::PathBuf; 3 3 4 4 use clap::Parser; 5 - use url::Url; 6 5 7 6 #[derive(Parser)] 8 7 pub struct Arguments { 9 - /// Jetstream instance 10 - #[arg( 11 - long, 12 - short = 'I', 13 - default_value = "wss://jetstream1.us-east.bsky.network" 14 - )] 15 - pub instance: Url, 16 - 17 8 /// Initial cursor in seconds. 18 9 #[arg(long, short = 'C', allow_negative_numbers = true)] 19 10 pub cursor: Option<i64>, ··· 35 44 time::{Duration, SystemTime, UNIX_EPOCH}, 36 45 }; 37 46 38 - use jetstream::Event; 47 + use jetstream::{Event, client_config::JetstreamConfig}; 39 48 40 49 #[tokio::main(flavor = "current_thread")] 41 50 async fn main() { ··· 58 67 None => None, 59 68 }; 60 69 61 - let mut builder = jetstream::builder().max_message_size(0).cursor(cursor); 70 + let mut config = JetstreamConfig::default().with_cursor(cursor); 62 71 for mut filter in arguments.filter.iter().cloned() { 63 72 if let Ok(did) = filter.parse() { 64 - builder.add_did(did); 73 + config.subscriber_options.add_did(did).unwrap(); 65 74 } else { 66 75 if filter.ends_with('.') { 67 76 filter.push('*'); 68 77 } 69 - builder.add_collection(filter.try_into().unwrap()); 78 + config 79 + .subscriber_options 80 + .add_collection(filter.try_into().unwrap()) 81 + .unwrap(); 70 82 } 71 83 } 72 84 73 - #[cfg(not(feature = "lexicon"))] 74 - let (client, rx, task) = builder.build(arguments.instance); 75 - 76 - #[cfg(feature = "lexicon")] 77 - let (client, rx, task) = builder.build(arguments.instance); 85 + tracing::debug!(?config); 86 + let (client, rx, task) = config.connect(); 78 87 79 88 // Spawn the client task. 80 89 let handle = tokio::spawn(task);
+132 -60
crates/jetstream/src/subscriber_options.rs
··· 1 1 use std::collections::HashSet; 2 2 3 - use serde::{Deserialize, Serialize, Serializer}; 3 + use serde::{Deserialize, Serialize}; 4 4 5 5 use crate::{Did, Nsid}; 6 6 ··· 11 11 // @TODO Review 12 12 pub const MAX_URL_LENGTH: usize = 4000; 13 13 14 - /// Subscriber sourced message. 15 - /// 16 - /// Ref: <https://github.com/bluesky-social/jetstream?tab=readme-ov-file#subscriber-sourced-messages> 17 - /// 18 - #[derive(Debug, Serialize)] 19 - #[serde(tag = "type", content = "payload", rename_all = "snake_case")] 20 - pub enum SubscriberSourcedMessage<'a> { 21 - OptionsUpdate(&'a SubscriberOptions), 22 - } 23 - 24 - impl<'a> SubscriberSourcedMessage<'a> { 25 - /// Serialize the [`SubscriberSourcedMessage`] to JSON. 26 - pub fn to_json(&self) -> String { 27 - serde_json::to_string(self).expect("SubscriberSourcedMessage should be serializable") 28 - } 29 - } 30 - 31 14 /// Jetstream subscription options. 32 15 /// 33 16 /// Can either be appended to the `/subscribe` URL on connection to the Jetstream instance ··· 18 35 /// 19 36 /// Ref: <https://github.com/bluesky-social/jetstream?tab=readme-ov-file#options-updates> 20 37 /// 21 - #[derive(Debug, Default, Deserialize, Serialize)] 38 + #[derive(Clone, Debug, Default, Deserialize, Serialize)] 22 39 #[serde(rename_all = "camelCase")] 23 40 pub struct SubscriberOptions { 24 41 /// Collection NSIDs to filter which records are received. 25 42 /// 26 43 /// Maximum: 100 27 - wanted_collections: HashSet<Box<Nsid>>, 44 + pub wanted_collections: HashSet<Box<Nsid>>, 28 45 29 46 /// Repository DIDs to filter which records are received. 30 47 /// 31 48 /// Maximum: 10_000 32 - wanted_dids: HashSet<Box<Did>>, 49 + pub wanted_dids: HashSet<Box<Did>>, 33 50 34 51 /// Maximum message size in bytes the subscriber wants to receive. 35 52 /// 36 53 /// Zero means no limit, negative values are treated as zero by Jetstream, and 37 54 /// will be normalized to zero when serialized. 38 - #[serde(serialize_with = "serialize_max_message_size")] 55 + #[serde(with = "max_message_size")] 39 56 pub max_message_size_bytes: i64, 40 57 41 - #[serde(skip)] 42 58 pub cursor: Option<u128>, 43 - } 44 - 45 - fn normalize_max_message_size(value: i64) -> i64 { 46 - value.abs() 47 - } 48 - 49 - fn serialize_max_message_size<S>(value: &i64, serializer: S) -> Result<S::Ok, S::Error> 50 - where 51 - S: Serializer, 52 - { 53 - serializer.serialize_i64(normalize_max_message_size(*value)) 54 59 } 55 60 56 61 impl SubscriberOptions { ··· 83 112 normalize_max_message_size(self.max_message_size_bytes) 84 113 } 85 114 86 - fn subscribe_url_len(&self, base: &url::Url) -> usize { 87 - const WANTED_DIDS_LEN: usize = "wantedDids=".len(); 88 - const WANTED_COLLECTIONS_LEN: usize = "wantedCollections=".len(); 89 - 90 - let (wanted_did_len, wanted_dids_count) = 91 - self.wanted_dids.iter().fold((0, 0), |(len, count), val| { 92 - (len + WANTED_DIDS_LEN + val.len(), count + 1) 93 - }); 94 - 95 - let (wanted_col_len, wanted_col_count) = self 96 - .wanted_collections 97 - .iter() 98 - .fold((0, 0), |(len, count), val| { 99 - (len + WANTED_COLLECTIONS_LEN + val.len(), count + 1) 100 - }); 101 - 102 - let (message_size_len, message_size_count) = match self.max_message_size() { 103 - 0 => (0, 0), 104 - n => (n.to_string().len() + "maxMessageSizeBytes=".len(), 1), 105 - }; 106 - 107 - let param_count = wanted_dids_count + wanted_col_count + message_size_count; 108 - base.as_str().len() + message_size_len + wanted_did_len + wanted_col_len + param_count 109 - } 110 - 111 115 /// Construct the Jetstream subscribe URL, returning a tuple of the URL and a boolean 112 116 /// indicating whether the client should send an options update message on connect. 113 117 pub fn subscribe_url(&self, url: &url::Url) -> (url::Url, bool) { ··· 119 173 120 174 (url, false) 121 175 } 176 + 177 + /// Present the SubscriberOptions as a [`SubscriberSourcedMessage`] for serialization. 178 + pub fn as_subscriber_sourced_message<'a>(&'a self) -> SubscriberSourcedMessage<'a> { 179 + SubscriberSourcedMessage::OptionsUpdate(self.into()) 180 + } 181 + 182 + fn subscribe_url_len(&self, base: &url::Url) -> usize { 183 + const WANTED_DIDS_LEN: usize = "wantedDids=".len(); 184 + const WANTED_COLLECTIONS_LEN: usize = "wantedCollections=".len(); 185 + 186 + let (wanted_did_len, wanted_dids_count) = 187 + self.wanted_dids.iter().fold((0, 0), |(len, count), val| { 188 + (len + WANTED_DIDS_LEN + val.len(), count + 1) 189 + }); 190 + 191 + let (wanted_col_len, wanted_col_count) = self 192 + .wanted_collections 193 + .iter() 194 + .fold((0, 0), |(len, count), val| { 195 + (len + WANTED_COLLECTIONS_LEN + val.len(), count + 1) 196 + }); 197 + 198 + let (message_size_len, message_size_count) = match self.max_message_size() { 199 + 0 => (0, 0), 200 + n => (n.to_string().len() + "maxMessageSizeBytes=".len(), 1), 201 + }; 202 + 203 + let param_count = wanted_dids_count + wanted_col_count + message_size_count; 204 + base.as_str().len() + message_size_len + wanted_did_len + wanted_col_len + param_count 205 + } 206 + } 207 + 208 + mod max_message_size { 209 + use serde::{Deserialize, Deserializer, Serializer}; 210 + 211 + pub fn deserialize<'de, D>(deserializer: D) -> Result<i64, D::Error> 212 + where 213 + D: Deserializer<'de>, 214 + { 215 + let value = <i64 as Deserialize>::deserialize(deserializer)?; 216 + Ok(super::normalize_max_message_size(value)) 217 + } 218 + 219 + pub fn serialize<S>(value: &i64, serializer: S) -> Result<S::Ok, S::Error> 220 + where 221 + S: Serializer, 222 + { 223 + serializer.serialize_i64(super::normalize_max_message_size(*value)) 224 + } 225 + } 226 + 227 + fn normalize_max_message_size(value: i64) -> i64 { 228 + value.abs() 229 + } 230 + 231 + /// Subscriber sourced message. 232 + /// 233 + /// Ref: <https://github.com/bluesky-social/jetstream?tab=readme-ov-file#subscriber-sourced-messages> 234 + /// 235 + #[derive(Debug, Serialize)] 236 + #[serde(tag = "type", content = "payload", rename_all = "snake_case")] 237 + pub enum SubscriberSourcedMessage<'a> { 238 + OptionsUpdate(OptionsUpdate<'a>), 239 + } 240 + 241 + impl<'a> SubscriberSourcedMessage<'a> { 242 + /// Serialize the [`SubscriberSourcedMessage`] to JSON. 243 + pub fn to_json(&self) -> String { 244 + serde_json::to_string(self).expect("SubscriberSourcedMessage should be serializable") 245 + } 246 + } 247 + 248 + #[derive(Debug, Serialize)] 249 + #[serde(rename_all = "camelCase")] 250 + pub struct OptionsUpdate<'a> { 251 + wanted_collections: &'a HashSet<Box<Nsid>>, 252 + wanted_dids: &'a HashSet<Box<Did>>, 253 + #[serde(with = "max_message_size")] 254 + max_message_size_bytes: &'a i64, 255 + } 256 + 257 + impl<'a> From<&'a SubscriberOptions> for OptionsUpdate<'a> { 258 + fn from(value: &'a SubscriberOptions) -> Self { 259 + let SubscriberOptions { 260 + wanted_collections, 261 + wanted_dids, 262 + max_message_size_bytes, 263 + cursor: _, 264 + } = value; 265 + Self { 266 + wanted_collections, 267 + wanted_dids, 268 + max_message_size_bytes, 269 + } 270 + } 122 271 } 123 272 124 273 #[cfg(test)] 125 274 mod tests { 126 - use atproto::Nsid; 275 + use std::collections::HashSet; 276 + 277 + use atproto::{Did, Nsid}; 127 278 128 279 use super::SubscriberOptions; 129 280 ··· 280 237 options.subscribe_url_len(&url), 281 238 "wss://example.url/subscribe?wantedCollections=sh.tangled.*&wantedCollections=app.bsky.*&maxMessageSizeBytes=1000000".len() 282 239 ); 240 + } 241 + 242 + #[test] 243 + fn serialize_default_options() { 244 + let options = SubscriberOptions::default(); 245 + let serialized = options.as_subscriber_sourced_message().to_json(); 246 + assert_eq!( 247 + serialized, 248 + r#"{"type":"options_update","payload":{"wantedCollections":[],"wantedDids":[],"maxMessageSizeBytes":0}}"# 249 + ); 250 + } 251 + 252 + #[test] 253 + fn serialize_example_options() { 254 + let options = SubscriberOptions { 255 + wanted_collections: HashSet::from_iter( 256 + [Nsid::from_static("app.bsky.feed.post").into()], 257 + ), 258 + wanted_dids: HashSet::from_iter([ 259 + Did::from_static("did:plc:q6gjnaw2blty4crticxkmujt").into() 260 + ]), 261 + max_message_size_bytes: 1000000, 262 + ..Default::default() 263 + }; 264 + let serialized = options.as_subscriber_sourced_message().to_json(); 265 + assert_eq!( 266 + serialized, 267 + r#"{"type":"options_update","payload":{"wantedCollections":["app.bsky.feed.post"],"wantedDids":["did:plc:q6gjnaw2blty4crticxkmujt"],"maxMessageSizeBytes":1000000}}"# 268 + ) 283 269 } 284 270 }
+73 -22
crates/jetstream/src/task.rs
··· 1 - use crate::{client::ClientCommand, metrics::Metrics, subscriber_options::SubscriberOptions}; 1 + use crate::{ 2 + client::ClientCommand, client_options::ClientOptions, metrics::Metrics, 3 + subscriber_options::SubscriberOptions, 4 + }; 2 5 use bytes::Bytes; 3 6 use futures_util::{SinkExt, StreamExt}; 4 7 use serde::Deserialize; 5 8 use std::{ 9 + pin::Pin, 6 10 sync::{Arc, Mutex}, 7 11 time::Duration, 8 12 }; ··· 16 12 tungstenite::{ClientRequestBuilder, Error as TungsteniteError, Message, http::Uri}, 17 13 }; 18 14 use tokio_util::sync::CancellationToken; 15 + use url::Url; 19 16 20 17 #[cfg(feature = "zstd")] 21 18 const ZSTD_DICTIONARY: &[u8] = include_bytes!("dictionary"); 22 - 23 - /// Duration to rewind the cursor by when reconnecting a borken subscription. 24 - const REWIND: Duration = Duration::from_secs(1); 25 19 26 20 const RECV_TIMEOUT: Duration = Duration::from_secs(45); 27 21 ··· 36 34 metrics: Metrics, 37 35 } 38 36 37 + pub struct JetstreamTask(pub(crate) Option<Pin<Box<dyn Future<Output = ()> + Send + 'static>>>); 38 + 39 + impl Future for JetstreamTask { 40 + type Output = (); 41 + 42 + fn poll( 43 + self: std::pin::Pin<&mut Self>, 44 + cx: &mut std::task::Context<'_>, 45 + ) -> std::task::Poll<Self::Output> { 46 + match self.get_mut().0 { 47 + Some(ref mut fut) => fut.as_mut().poll(cx), 48 + None => unreachable!(), 49 + } 50 + } 51 + } 52 + 53 + trait Jitter { 54 + fn jitter(&self, frac: f32) -> Self; 55 + } 56 + 57 + impl Jitter for f32 { 58 + fn jitter(&self, frac: f32) -> Self { 59 + self * (1.0 + 2.0 * frac * (fastrand::f32() - 0.5)) 60 + } 61 + } 62 + 63 + impl Jitter for Duration { 64 + fn jitter(&self, frac: f32) -> Self { 65 + Duration::from_secs_f32(self.as_secs_f32().jitter(frac).abs()) 66 + } 67 + } 68 + 39 69 pub async fn jetstream_subscriber( 40 70 event_tx: flume::Sender<Bytes>, 41 71 client_rx: flume::Receiver<ClientCommand>, 42 72 metrics: Metrics, 43 - instance: url::Url, 44 - options: Arc<Mutex<SubscriberOptions>>, 73 + client_options: ClientOptions, 74 + subscriber_options: Arc<Mutex<SubscriberOptions>>, 45 75 shutdown: CancellationToken, 46 76 ) { 47 77 let mut state = State { metrics }; 78 + let mut reconnect_backoff = client_options.reconnect_backoff_min; 79 + 80 + let mut instance_idx = fastrand::usize(0..client_options.instances.len()); 48 81 49 82 'outer: loop { 50 - let (subscribe_url, require_hello) = options.lock().unwrap().subscribe_url(&instance); 83 + let instance = Url::parse(&client_options.instances[instance_idx]).unwrap(); 84 + let (subscribe_url, require_hello) = 85 + subscriber_options.lock().unwrap().subscribe_url(&instance); 86 + 51 87 tracing::debug!(%subscribe_url, "connecting to jetstream"); 52 88 let uri: Uri = subscribe_url 53 89 .as_str() ··· 99 59 let request = ClientRequestBuilder::new(uri); 100 60 101 61 let (socket, _) = match shutdown.run_until_cancelled(connect_async(request)).await { 102 - Some(Ok(socket)) => socket, 62 + Some(Ok(socket)) => { 63 + reconnect_backoff = client_options.reconnect_backoff_min; 64 + socket 65 + } 103 66 Some(Err(error)) => { 104 67 match error { 105 68 tokio_tungstenite::tungstenite::Error::Http(response) => { ··· 112 69 } 113 70 error => tracing::error!(?error), 114 71 } 115 - tokio::time::sleep(Duration::from_secs(5)).await; 72 + 73 + let delay = reconnect_backoff.jitter(client_options.reconnect_backoff_jitter); 74 + tracing::debug!(?delay, "connection failed, waiting to reconnect"); 75 + tokio::time::sleep(delay).await; 76 + 77 + reconnect_backoff = 78 + std::cmp::min(reconnect_backoff * 2, client_options.reconnect_backoff_max); 79 + instance_idx = (instance_idx + 1).rem_euclid(client_options.instances.len()); 80 + 116 81 continue; 117 82 } 118 83 None => break, ··· 130 79 let (mut write, mut read) = socket.split(); 131 80 132 81 if require_hello 133 - && let Err(e) = send_options_update::<_, TungsteniteError>(&mut write, &options).await 82 + && let Err(error) = 83 + send_options_update::<_, TungsteniteError>(&mut write, &subscriber_options).await 134 84 { 135 - tracing::error!(?e, "failed to send subscibe options update"); 85 + tracing::error!(?error, "failed to send subscribe options update"); 136 86 continue; 137 87 } 138 88 ··· 153 101 ReadOutcome::Timeout => { 154 102 tracing::error!("time since last received message exceeds threshold"); 155 103 state.metrics.modify(|mut data| data.timeouts += 1); 156 - rewind_cursor(&options); 104 + rewind_cursor(&subscriber_options, client_options.rewind); 157 105 break; 158 106 159 107 }, ··· 166 114 Ok(command) = client_rx.recv_async() => { 167 115 match command { 168 116 ClientCommand::SubscriberOptionsUpdate(complete) => { 169 - let result = send_options_update::<_, TungsteniteError>(&mut write, &options) 117 + let result = send_options_update::<_, TungsteniteError>(&mut write, &subscriber_options) 170 118 .await 171 119 .map_err(JetstreamTaskError::OptionsUpdate); 172 120 ··· 220 168 } 221 169 222 170 // Update the cursor since the message has been dispatched. 223 - set_cursor(&options, new_cursor); 171 + set_cursor(&subscriber_options, new_cursor); 224 172 } 225 173 226 174 state.metrics.modify(|mut data| data.disconnects += 1); ··· 326 274 Ok(ReadOutcome::Closed) 327 275 } 328 276 329 - fn rewind_cursor(options: &Mutex<SubscriberOptions>) { 277 + fn rewind_cursor(options: &Mutex<SubscriberOptions>, amount: Duration) { 330 278 let mut options = options.lock().unwrap(); 331 279 if let Some(value) = &mut options.cursor { 332 - *value = value.saturating_sub(REWIND.as_micros()) 280 + *value = value.saturating_sub(amount.as_micros()) 333 281 } 334 282 } 335 283 ··· 346 294 S: SinkExt<Message> + Unpin, 347 295 E: From<S::Error>, 348 296 { 349 - use crate::subscriber_options::SubscriberSourcedMessage; 350 - 351 - let update = { 352 - let options = options.lock().unwrap(); 353 - SubscriberSourcedMessage::OptionsUpdate(&options).to_json() 354 - }; 297 + let update = options 298 + .lock() 299 + .unwrap() 300 + .as_subscriber_sourced_message() 301 + .to_json(); 355 302 356 303 tracing::debug!(%update, "sending options update"); 357 304 sink.send(Message::Text(update.into())).await?;
+44 -25
crates/knot/src/main.rs
··· 6 6 Router, 7 7 http::{HeaderName, Request, Response}, 8 8 }; 9 + use futures_util::StreamExt; 9 10 use identity::Resolver; 11 + use jetstream::client_config::JetstreamConfig; 10 12 use knot::{ 11 13 model::{Knot, KnotState, config::KnotConfiguration}, 12 14 services::database::DataStore, ··· 34 32 }; 35 33 use tracing::{Span, field::Empty}; 36 34 use tracing_subscriber::{EnvFilter, layer::SubscriberExt as _, util::SubscriberInitExt as _}; 37 - use url::Url; 38 35 39 36 fn main() { 40 37 use knot::private::Hook; ··· 196 195 .without_v07_checks() 197 196 .merge(knot::public::router()); 198 197 199 - let jetstream = { 198 + let jetstream_config = { 200 199 let cursor = db 201 200 .get_jetstream_cursor() 202 201 .await ··· 207 206 tracing::info!(?cursor, ?cursor_us, "found jetstream cursor"); 208 207 } 209 208 210 - jetstream::builder() 211 - .collection("sh.tangled.publicKey".try_into()?) 212 - .collection("sh.tangled.repo".try_into()?) 213 - .cursor(cursor.map(|(_, ts)| ts)) 214 - .build(Url::parse(jetstream::PUBLIC_JETSTREAM_INSTANCES[0])?) 209 + let mut config = JetstreamConfig::default().with_cursor(cursor.map(|(_, us)| us)); 210 + 211 + let mut records = sqlx::query!( 212 + "SELECT DISTINCT did FROM knot_member UNION SELECT DISTINCT did FROM repository_member" 213 + ) 214 + .fetch(&pool); 215 + while let Some(record) = records.next().await { 216 + let record = record.unwrap(); 217 + let did = record 218 + .did 219 + .parse() 220 + .expect("DID from database should be valid"); 221 + config.subscriber_options.add_did(did).unwrap(); 222 + } 223 + 224 + config 215 225 }; 216 226 217 227 let mut service = JoinSet::new(); ··· 248 236 .private_sockets(&private_addrs) 249 237 .build()?; 250 238 251 - let knot: Knot = KnotState::new(config, resolver, public_http, jetstream, db).into(); 239 + let (knot_state, knot_tasks) = 240 + KnotState::new(config, resolver, public_http, jetstream_config, db); 241 + let knot: Knot = knot_state.into(); 252 242 243 + let owner = resolved_owner.as_str(); 244 + match sqlx::query!( 245 + "INSERT INTO knot_member (did) VALUES (?) ON CONFLICT (did) DO NOTHING RETURNING did", 246 + owner 247 + ) 248 + .fetch_optional(&pool) 249 + .await 253 250 { 254 - let knot = knot.clone(); 255 - tokio::spawn(async move { 256 - let owner = resolved_owner.as_str(); 257 - match sqlx::query!( 258 - "INSERT INTO knot_member (did) VALUES (?) ON CONFLICT (did) DO NOTHING RETURNING did", 259 - owner 260 - ) 261 - .fetch_optional(&pool) 262 - .await { 263 - Ok(Some(_)) => { 264 - knot.backfill_public_keys().await.unwrap(); 265 - knot.backfill_repositories(&resolved_owner).await.unwrap(); 266 - }, 267 - Ok(None) => tracing::debug!("skipping public key backfill"), 268 - Err(error) => tracing::error!(?error) 269 - } 270 - }); 251 + Ok(Some(_)) => { 252 + tracing::info!("Appears to be first run, fetching records"); 253 + knot.backfill_public_keys().await.unwrap(); 254 + knot.backfill_repositories(&resolved_owner).await.unwrap(); 255 + } 256 + Ok(None) => {} 257 + Err(error) => tracing::error!(?error), 271 258 } 272 259 273 260 let router = router ··· 313 302 for socket in sockets { 314 303 serve(&mut service, socket, router.clone()).await; 315 304 } 305 + 306 + service.spawn(async move { 307 + tracing::debug!("starting knot tasks"); 308 + for _task in knot_tasks.join_all().await { 309 + // 310 + } 311 + Ok(()) 312 + }); 316 313 317 314 for task in service.join_all().await { 318 315 if let Err(error) = task {
+22 -10
crates/knot/src/model/knot_state.rs
··· 9 9 use auth::jwt; 10 10 use bytes::Bytes; 11 11 use identity::{HttpClient, Resolver}; 12 - use jetstream::JetstreamClient; 12 + use jetstream::{JetstreamClient, client_config::JetstreamConfig}; 13 13 use lexicon::sh::tangled::{git::RefUpdate, repo::Repo}; 14 14 use rayon::{ThreadPool, ThreadPoolBuilder}; 15 15 use serde::Serialize; 16 16 use time::OffsetDateTime; 17 + use tokio::task::JoinSet; 17 18 use tokio_stream::StreamExt as _; 18 19 19 20 use crate::{ ··· 87 86 config: KnotConfiguration, 88 87 resolver: Resolver, 89 88 public_http: HttpClient, 90 - (jetstream, jetstream_rx): (JetstreamClient, jetstream::Receiver), 89 + jetstream_config: JetstreamConfig, 91 90 database: DataStore, 92 - ) -> Arc<Self> { 91 + ) -> (Arc<Self>, JoinSet<()>) { 93 92 let pool = ThreadPoolBuilder::new() 94 93 .build() 95 94 .expect("Failed to build thread pool"); 96 95 97 96 let (events, _) = tokio::sync::broadcast::channel(16); 97 + 98 + let (jetstream, jetstream_rx, jetstream_task) = jetstream_config.connect(); 98 99 99 100 let inner = Arc::new(Self { 100 101 config, ··· 113 110 repo_handle_cache: Default::default(), 114 111 }); 115 112 116 - let state = Arc::clone(&inner); 117 - tokio::task::spawn(crate::services::jetstream::jetstream_task( 118 - Arc::clone(&state), 119 - jetstream_rx, 120 - )); 113 + let mut tasks = JoinSet::new(); 121 114 122 - tokio::task::spawn(async move { 115 + let _ = { 116 + let state = Arc::clone(&inner); 117 + tasks.spawn(async move { 118 + tokio::join!( 119 + crate::services::jetstream::jetstream_task(Arc::clone(&state), jetstream_rx,), 120 + jetstream_task 121 + ); 122 + 123 + panic!("jetstream consumer/task completed"); 124 + }) 125 + }; 126 + 127 + let state = Arc::clone(&inner); 128 + tasks.spawn(async move { 123 129 use tokio_stream::wrappers::IntervalStream; 124 130 125 131 const EXPIRY_SLOP: i64 = 60; ··· 150 138 } 151 139 }); 152 140 153 - inner 141 + (inner, tasks) 154 142 } 155 143 156 144 /// Return a reference to the identity resolver.
+4 -2
crates/knot/src/services/jetstream.rs
··· 7 7 use std::{sync::Arc, time::Duration}; 8 8 use tokio::time::Instant; 9 9 10 - pub async fn jetstream_task(state: Arc<KnotState>, jetstream_rx: jetstream::Receiver) { 10 + pub async fn jetstream_task(state: Arc<KnotState>, jetstream_rx: jetstream::JetstreamReceiver) { 11 11 let mut last_jetstream_sync: Option<Instant> = None; 12 12 13 13 while let Some(event) = jetstream_rx.recv_async().await { ··· 19 19 continue; 20 20 } 21 21 }; 22 + 23 + tracing::debug!(?event); 22 24 23 25 match &event { 24 26 Event::Commit(commit) => match commit.collection() { ··· 52 50 } 53 51 } 54 52 55 - tracing::warn!("jetstream consumer task completed"); 53 + panic!("jetstream consumer finished"); 56 54 } 57 55 58 56 async fn process_public_key<'db, 'c, 'k>(