···11+[package]
22+authors = ["videah <videah@selfish.systems>", "phil <phil@bad-example.com>"]
33+name = "jetstream"
44+version = "0.1.1"
55+edition = "2021"
66+license = "MIT"
77+description = "Library for easily interacting with and consuming the Bluesky Jetstream service."
88+repository = "https://github.com/at-microcosm/links"
99+readme = "README.md"
1010+1111+[dependencies]
1212+async-trait = "0.1.83"
1313+atrium-api = { version = "0.24.7", default-features = false, features = [
1414+ "namespace-appbsky",
1515+] }
1616+tokio = { version = "1.41.1", features = ["full", "sync", "time"] }
1717+tokio-tungstenite = { version = "0.24.0", features = [
1818+ "connect",
1919+ "native-tls",
2020+ "url",
2121+] }
2222+futures-util = "0.3.31"
2323+url = "2.5.4"
2424+serde = { version = "1.0.215", features = ["derive"] }
2525+serde_json = "1.0.132"
2626+chrono = "0.4.38"
2727+zstd = "0.13.2"
2828+thiserror = "2.0.3"
2929+flume = "0.11.1"
3030+log = "0.4.22"
3131+tokio-util = "0.7.13"
3232+3333+[dev-dependencies]
3434+anyhow = "1.0.93"
3535+clap = { version = "4.5.20", features = ["derive"] }
+21
jetstream/LICENSE
···11+MIT License
22+33+Copyright (c) 2024 videah
44+55+Permission is hereby granted, free of charge, to any person obtaining a copy
66+of this software and associated documentation files (the "Software"), to deal
77+in the Software without restriction, including without limitation the rights
88+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
99+copies of the Software, and to permit persons to whom the Software is
1010+furnished to do so, subject to the following conditions:
1111+1212+The above copyright notice and this permission notice shall be included in all
1313+copies or substantial portions of the Software.
1414+1515+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
1616+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
1717+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
1818+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
1919+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
2020+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
2121+SOFTWARE.
+69
jetstream/README.md
···11+# jetstream-oxide

[crates.io](https://crates.io/crates/jetstream-oxide)
[docs.rs](https://docs.rs/jetstream-oxide/latest/jetstream_oxide)
55+66+A typed Rust library for easily interacting with and consuming the
77+Bluesky [Jetstream](https://github.com/bluesky-social/jetstream)
88+service.
99+1010+```rust
1111+let config = JetstreamConfig {
1212+ endpoint: DefaultJetstreamEndpoints::USEastOne.into(),
1313+ compression: JetstreamCompression::Zstd,
1414+ ..Default::default()
1515+};
1616+1717+let jetstream = JetstreamConnector::new(config).unwrap();
1818+let receiver = jetstream.connect().await?;
1919+2020+while let Ok(event) = receiver.recv_async().await {
2121+ if let Commit(commit) = event {
2222+ match commit {
2323+ CommitEvent::Create { info, commit } => {
2424+ println!("Received create event: {:#?}", info);
2525+ }
2626+ CommitEvent::Update { info, commit } => {
2727+ println!("Received update event: {:#?}", info);
2828+ }
2929+ CommitEvent::Delete { info, commit } => {
3030+ println!("Received delete event: {:#?}", info);
3131+ }
3232+ }
3333+ }
3434+}
3535+```
3636+3737+## Example
3838+3939+A small example CLI utility to show how to use this crate can be found in the `examples` directory. To run it, use the
4040+following command:
4141+4242+```sh
4343+cargo run --example basic -- --nsid "app.bsky.feed.post"
4444+```
4545+4646+This will display a real-time feed of every single post that is being made or deleted in the entire Bluesky network,
4747+right in your terminal!
4848+4949+You can filter it down to just specific accounts like this:
5050+5151+```sh
5252+cargo run --example basic -- \
5353+--nsid "app.bsky.feed.post" \
5454+--did "did:plc:inze6wrmsm7pjl7yta3oig77"
5555+```

This listens for posts that *I personally make*. You can substitute your own DID and make a few test posts yourself
if you'd like, of course!
6060+6161+6262+## Running `rustfmt` (requires nightly)
6363+6464+```bash
6565+# get nightly set up
6666+rustup toolchain install nightly --allow-downgrade -c rustfmt
6767+# run the nightly version of fmt
6868+cargo +nightly fmt
6969+```
+53
jetstream/examples/arbitrary_record.rs
//! An example of how to listen for create/delete events on a specific DID and a potentially
//! unknown NSID.
33+44+use atrium_api::types::string;
55+use clap::Parser;
66+use jetstream::{
77+ events::{
88+ commit::CommitEvent,
99+ JetstreamEvent::Commit,
1010+ },
1111+ DefaultJetstreamEndpoints,
1212+ JetstreamCompression,
1313+ JetstreamConfig,
1414+ JetstreamConnector,
1515+};
// Command-line arguments for this example, parsed with clap's derive API.
//
// NOTE: the `///` comments on the fields are also used by clap as the generated `--help` text,
// so they are user-facing strings and should be edited with that in mind.
#[derive(Parser, Debug)]
#[command(version, about, long_about = None)]
struct Args {
    /// The DIDs to listen for events on, if not provided we will listen for all DIDs.
    #[arg(short, long)]
    did: Option<Vec<string::Did>>,
    /// The NSID for the collection to listen for (e.g. `blue.flashes.feed.post`).
    #[arg(short, long)]
    nsid: string::Nsid,
}
2727+2828+#[tokio::main]
2929+async fn main() -> anyhow::Result<()> {
3030+ let args = Args::parse();
3131+3232+ let dids = args.did.unwrap_or_default();
3333+ let config: JetstreamConfig<serde_json::Value> = JetstreamConfig {
3434+ endpoint: DefaultJetstreamEndpoints::USEastOne.into(),
3535+ wanted_collections: vec![args.nsid.clone()],
3636+ wanted_dids: dids.clone(),
3737+ compression: JetstreamCompression::Zstd,
3838+ ..Default::default()
3939+ };
4040+4141+ let jetstream: JetstreamConnector<serde_json::Value> = JetstreamConnector::new(config)?;
4242+ let receiver = jetstream.connect().await?;
4343+4444+ println!("Listening for '{}' events on DIDs: {:?}", &*args.nsid, dids);
4545+4646+ while let Ok(event) = receiver.recv_async().await {
4747+ if let Commit(CommitEvent::Create { commit, .. }) = event {
4848+ println!("got record: {:?}", commit.record);
4949+ }
5050+ }
5151+5252+ Ok(())
5353+}
+68
jetstream/examples/basic.rs
···11+//! A very basic example of how to listen for create/delete events on a specific DID and NSID.
22+33+use atrium_api::{
44+ record::KnownRecord::AppBskyFeedPost,
55+ types::string,
66+};
77+use clap::Parser;
88+use jetstream::{
99+ events::{
1010+ commit::CommitEvent,
1111+ JetstreamEvent::Commit,
1212+ },
1313+ DefaultJetstreamEndpoints,
1414+ JetstreamCompression,
1515+ JetstreamConfig,
1616+ JetstreamConnector,
1717+};
// Command-line arguments for this example, parsed with clap's derive API.
//
// NOTE: the `///` comments on the fields are also used by clap as the generated `--help` text,
// so they are user-facing strings and should be edited with that in mind.
#[derive(Parser, Debug)]
#[command(version, about, long_about = None)]
struct Args {
    /// The DIDs to listen for events on, if not provided we will listen for all DIDs.
    #[arg(short, long)]
    did: Option<Vec<string::Did>>,
    /// The NSID for the collection to listen for (e.g. `app.bsky.feed.post`).
    #[arg(short, long)]
    nsid: string::Nsid,
}
2929+3030+#[tokio::main]
3131+async fn main() -> anyhow::Result<()> {
3232+ let args = Args::parse();
3333+3434+ let dids = args.did.unwrap_or_default();
3535+ let config = JetstreamConfig {
3636+ endpoint: DefaultJetstreamEndpoints::USEastOne.into(),
3737+ wanted_collections: vec![args.nsid.clone()],
3838+ wanted_dids: dids.clone(),
3939+ compression: JetstreamCompression::Zstd,
4040+ ..Default::default()
4141+ };
4242+4343+ let jetstream = JetstreamConnector::new(config)?;
4444+ let receiver = jetstream.connect().await?;
4545+4646+ println!("Listening for '{}' events on DIDs: {:?}", &*args.nsid, dids);
4747+4848+ while let Ok(event) = receiver.recv_async().await {
4949+ if let Commit(commit) = event {
5050+ match commit {
5151+ CommitEvent::Create { info: _, commit } => {
5252+ if let AppBskyFeedPost(record) = commit.record {
5353+ println!(
5454+ "New post created! ({})\n\n'{}'",
5555+ commit.info.rkey, record.text
5656+ );
5757+ }
5858+ }
5959+ CommitEvent::Delete { info: _, commit } => {
6060+ println!("A post has been deleted. ({})", commit.rkey);
6161+ }
6262+ _ => {}
6363+ }
6464+ }
6565+ }
6666+6767+ Ok(())
6868+}
···11+//! Various error types.
22+use std::io;
33+44+use thiserror::Error;
/// Possible errors that can occur when a [JetstreamConfig](crate::JetstreamConfig) that is passed
/// to a [JetstreamConnector](crate::JetstreamConnector) is invalid.
///
/// Each variant carries the offending count so the message can report it next to the limit.
#[derive(Error, Debug)]
pub enum ConfigValidationError {
    /// More collections were requested than the limit quoted in the message (100).
    #[error("too many wanted collections: {0} > 100")]
    TooManyWantedCollections(usize),
    /// More DIDs were requested than the limit quoted in the message (10,000).
    #[error("too many wanted DIDs: {0} > 10,000")]
    TooManyDids(usize),
}
/// Possible errors that can occur in the process of connecting to a Jetstream instance over
/// WebSockets.
///
/// See [JetstreamConnector::connect](crate::JetstreamConnector::connect).
#[derive(Error, Debug)]
pub enum ConnectionError {
    /// The endpoint (with its query parameters applied) could not be parsed as a URL.
    #[error("invalid endpoint: {0}")]
    InvalidEndpoint(#[from] url::ParseError),
    /// A WebSocket-level failure reported by `tokio-tungstenite`.
    #[error("failed to connect to Jetstream instance: {0}")]
    WebSocketFailure(#[from] tokio_tungstenite::tungstenite::Error),
    /// The configuration failed validation at connect time, even though it is also validated
    /// when the connector is created.
    #[error("the Jetstream config is invalid (this really should not happen here): {0}")]
    InvalidConfig(#[from] ConfigValidationError),
}
/// Possible errors that can occur when receiving events from a Jetstream instance over WebSockets.
///
/// See [websocket_task](crate::websocket_task).
#[derive(Error, Debug)]
pub enum JetstreamEventError {
    /// A message (plain text, or text obtained after decompression) was not valid JSON for the
    /// expected event type.
    #[error("received websocket message that could not be deserialized as JSON: {0}")]
    ReceivedMalformedJSON(#[from] serde_json::Error),
    /// The zstd decoder could not be created with the built-in dictionary.
    #[error("failed to load built-in zstd dictionary for decoding: {0}")]
    CompressionDictionaryError(io::Error),
    /// Reading the decompressed payload from the zstd decoder failed.
    #[error("failed to decode zstd-compressed message: {0}")]
    CompressionDecoderError(io::Error),
    /// The connection could not be closed cleanly after all receivers went away.
    // NOTE(review): no code visible in this crate constructs this variant — confirm whether it
    // is still needed.
    #[error("all receivers were dropped but the websocket connection failed to close cleanly")]
    WebSocketCloseFailure,
}
+40
jetstream/src/events/account.rs
···11+use chrono::Utc;
22+use serde::Deserialize;
33+44+use crate::{
55+ events::EventInfo,
66+ exports,
77+};
/// An event representing a change to an account.
///
/// The common [EventInfo] fields are read from the same JSON object as `account` (they are
/// flattened into this struct rather than nested).
#[derive(Deserialize, Debug)]
pub struct AccountEvent {
    /// Basic metadata included with every event.
    #[serde(flatten)]
    pub info: EventInfo,
    /// Account specific data bundled with this event.
    pub account: AccountData,
}
/// Account specific data bundled with an account event.
#[derive(Deserialize, Debug)]
pub struct AccountData {
    /// Whether the account is currently active.
    pub active: bool,
    /// The DID of the account.
    pub did: exports::Did,
    /// The `seq` value delivered with the account data.
    // NOTE(review): presumably a sequence number assigned upstream — confirm its exact meaning.
    pub seq: u64,
    /// The `time` value delivered with the account data, parsed as a UTC timestamp.
    pub time: chrono::DateTime<Utc>,
    /// If `active` is `false` this will be present to explain why the account is inactive.
    pub status: Option<AccountStatus>,
}
/// The possible reasons an account might be listed as inactive.
///
/// Variants are matched against their all-lowercase names, so `TakenDown` is read from the string
/// `"takendown"`. Any other status string fails to deserialize.
#[derive(Deserialize, Debug)]
#[serde(rename_all = "lowercase")]
pub enum AccountStatus {
    /// Read from `"deactivated"`.
    Deactivated,
    /// Read from `"deleted"`.
    Deleted,
    /// Read from `"suspended"`.
    Suspended,
    /// Read from `"takendown"`.
    TakenDown,
}
+60
jetstream/src/events/commit.rs
···11+use serde::Deserialize;
22+33+use crate::{
44+ events::EventInfo,
55+ exports,
66+};
/// An event representing a repo commit, which can be a `create`, `update`, or `delete` operation.
///
/// `R` is the type the record payload is deserialized into.
///
/// NOTE(review): this enum is `#[serde(untagged)]`, so serde picks the first variant whose shape
/// matches the input instead of looking at `commit.operation`. `Create` and `Update` have
/// identical shapes, so an `update` payload looks like it will always be returned as `Create` —
/// confirm, and consider dispatching on the operation instead. `rename_all` also appears to have
/// no effect on an untagged enum.
#[derive(Deserialize, Debug)]
#[serde(untagged, rename_all = "snake_case")]
pub enum CommitEvent<R> {
    /// A commit that carries a record and its CID.
    Create {
        /// Basic metadata included with every event.
        #[serde(flatten)]
        info: EventInfo,
        /// Commit details including the record.
        commit: CommitData<R>,
    },
    /// Same shape as `Create` (see the review note on the enum).
    Update {
        /// Basic metadata included with every event.
        #[serde(flatten)]
        info: EventInfo,
        /// Commit details including the record.
        commit: CommitData<R>,
    },
    /// A commit that carries only the basic commit info (no CID, no record).
    Delete {
        /// Basic metadata included with every event.
        #[serde(flatten)]
        info: EventInfo,
        /// Basic commit info; no record data accompanies a delete.
        commit: CommitInfo,
    },
}
/// The type of commit operation that was performed.
///
/// Variants are read from their snake_case names: `"create"`, `"update"` and `"delete"`.
#[derive(Deserialize, Debug)]
#[serde(rename_all = "snake_case")]
pub enum CommitType {
    /// Read from `"create"`.
    Create,
    /// Read from `"update"`.
    Update,
    /// Read from `"delete"`.
    Delete,
}
/// Basic commit specific info bundled with every event, also the only data included with a `delete`
/// operation.
#[derive(Deserialize, Debug)]
pub struct CommitInfo {
    /// The type of commit operation that was performed.
    pub operation: CommitType,
    /// The `rev` value delivered with the commit.
    // NOTE(review): presumably the repo revision identifier — confirm.
    pub rev: String,
    /// The `rkey` value delivered with the commit.
    // NOTE(review): presumably the record key within the collection — confirm.
    pub rkey: String,
    /// The NSID of the record type that this commit is associated with.
    pub collection: exports::Nsid,
}
/// Detailed data bundled with a commit event. This data is only included when the event is
/// `create` or `update`.
///
/// `R` is the type the `record` field is deserialized into.
#[derive(Deserialize, Debug)]
pub struct CommitData<R> {
    /// The basic commit info, read from the same JSON object as `cid` and `record`.
    #[serde(flatten)]
    pub info: CommitInfo,
    /// The CID of the record that was operated on.
    pub cid: exports::Cid,
    /// The record that was operated on.
    pub record: R,
}
+28
jetstream/src/events/identity.rs
···11+use chrono::Utc;
22+use serde::Deserialize;
33+44+use crate::{
55+ events::EventInfo,
66+ exports,
77+};
/// An event representing a change to an identity.
///
/// The common [EventInfo] fields are read from the same JSON object as `identity` (they are
/// flattened into this struct rather than nested).
#[derive(Deserialize, Debug)]
pub struct IdentityEvent {
    /// Basic metadata included with every event.
    #[serde(flatten)]
    pub info: EventInfo,
    /// Identity specific data bundled with this event.
    pub identity: IdentityData,
}
/// Identity specific data bundled with an identity event.
#[derive(Deserialize, Debug)]
pub struct IdentityData {
    /// The DID of the identity.
    pub did: exports::Did,
    /// The handle associated with the identity. `None` when the payload has no handle.
    pub handle: Option<exports::Handle>,
    /// The `seq` value delivered with the identity data.
    // NOTE(review): presumably a sequence number assigned upstream — confirm its exact meaning.
    pub seq: u64,
    /// The `time` value delivered with the identity data, parsed as a UTC timestamp.
    pub time: chrono::DateTime<Utc>,
}
+31
jetstream/src/events/mod.rs
···11+pub mod account;
22+pub mod commit;
33+pub mod identity;
44+55+use serde::Deserialize;
66+77+use crate::exports;
/// Basic data that is included with every event.
#[derive(Deserialize, Debug)]
pub struct EventInfo {
    /// The DID delivered with the event.
    pub did: exports::Did,
    /// The `time_us` value delivered with the event.
    // NOTE(review): presumably a Unix timestamp in microseconds, going by the name — confirm.
    pub time_us: u64,
    /// Which kind of event this is.
    pub kind: EventKind,
}
/// Any event that can be received from a Jetstream connection.
///
/// `R` is the type commit records are deserialized into.
///
/// The enum is `#[serde(untagged)]`: the variants are tried in declaration order and the first
/// one whose shape matches the incoming JSON is used.
#[derive(Deserialize, Debug)]
#[serde(untagged)]
pub enum JetstreamEvent<R> {
    /// A repo commit event.
    Commit(commit::CommitEvent<R>),
    /// An identity event.
    Identity(identity::IdentityEvent),
    /// An account event.
    Account(account::AccountEvent),
}
/// The kind of an event, as carried in [EventInfo::kind].
///
/// Variants are read from their snake_case names: `"commit"`, `"identity"` and `"account"`.
#[derive(Deserialize, Debug)]
#[serde(rename_all = "snake_case")]
pub enum EventKind {
    /// Read from `"commit"`.
    Commit,
    /// Read from `"identity"`.
    Identity,
    /// Read from `"account"`.
    Account,
}
+8
jetstream/src/exports.rs
···11+//! Useful exports for third-party crates used by this project.
22+33+pub use atrium_api::types::string::{
44+ Cid,
55+ Did,
66+ Handle,
77+ Nsid,
88+};
+415
jetstream/src/lib.rs
···11+pub mod error;
22+pub mod events;
33+pub mod exports;
44+55+use std::{
66+ io::{
77+ Cursor,
88+ Read,
99+ },
1010+ marker::PhantomData,
1111+ sync::Arc,
1212+ time::{
1313+ Duration,
1414+ Instant,
1515+ },
1616+};
1717+1818+use atrium_api::record::KnownRecord;
1919+use chrono::Utc;
2020+use futures_util::{
2121+ stream::StreamExt,
2222+ SinkExt,
2323+};
2424+use serde::de::DeserializeOwned;
2525+use tokio::{
2626+ net::TcpStream,
2727+ sync::Mutex,
2828+};
2929+use tokio_tungstenite::{
3030+ connect_async,
3131+ tungstenite::Message,
3232+ MaybeTlsStream,
3333+ WebSocketStream,
3434+};
3535+use tokio_util::sync::CancellationToken;
3636+use url::Url;
3737+use zstd::dict::DecoderDictionary;
3838+3939+use crate::{
4040+ error::{
4141+ ConfigValidationError,
4242+ ConnectionError,
4343+ JetstreamEventError,
4444+ },
4545+ events::JetstreamEvent,
4646+};
/// The public Jetstream instances operated by Bluesky.
///
/// Nothing guarantees that these hosts stay available; running your own Jetstream instance is
/// always an option.
pub enum DefaultJetstreamEndpoints {
    /// `jetstream1.us-east.bsky.network`
    USEastOne,
    /// `jetstream2.us-east.bsky.network`
    USEastTwo,
    /// `jetstream1.us-west.bsky.network`
    USWestOne,
    /// `jetstream2.us-west.bsky.network`
    USWestTwo,
}

impl From<DefaultJetstreamEndpoints> for String {
    /// Turns an endpoint into its full `wss://…/subscribe` URL.
    fn from(endpoint: DefaultJetstreamEndpoints) -> Self {
        let url: &'static str = match endpoint {
            DefaultJetstreamEndpoints::USEastOne => {
                "wss://jetstream1.us-east.bsky.network/subscribe"
            }
            DefaultJetstreamEndpoints::USEastTwo => {
                "wss://jetstream2.us-east.bsky.network/subscribe"
            }
            DefaultJetstreamEndpoints::USWestOne => {
                "wss://jetstream1.us-west.bsky.network/subscribe"
            }
            DefaultJetstreamEndpoints::USWestTwo => {
                "wss://jetstream2.us-west.bsky.network/subscribe"
            }
        };
        url.to_owned()
    }
}
/// The maximum number of wanted collections that can be requested on a single Jetstream connection.
///
/// Enforced by `JetstreamConfig::validate`.
const MAX_WANTED_COLLECTIONS: usize = 100;
/// The maximum number of wanted DIDs that can be requested on a single Jetstream connection.
///
/// Enforced by `JetstreamConfig::validate`.
const MAX_WANTED_DIDS: usize = 10_000;

/// The custom `zstd` dictionary used for decoding compressed Jetstream messages.
///
/// The bytes are embedded at compile time from `../zstd/dictionary` (relative to this file).
///
/// Sourced from the [official Bluesky Jetstream repo.](https://github.com/bluesky-social/jetstream/tree/main/pkg/models)
const JETSTREAM_ZSTD_DICTIONARY: &[u8] = include_bytes!("../zstd/dictionary");

/// A receiver channel for consuming Jetstream events.
///
/// `R` is the type commit records are deserialized into.
pub type JetstreamReceiver<R> = flume::Receiver<JetstreamEvent<R>>;

/// An internal sender channel for sending Jetstream events to [JetstreamReceiver]'s.
type JetstreamSender<R> = flume::Sender<JetstreamEvent<R>>;
/// A wrapper connector type for working with a WebSocket connection to a Jetstream instance to
/// receive and consume events. See [JetstreamConnector::connect] for more info.
///
/// `R` is the type commit records are deserialized into.
pub struct JetstreamConnector<R: DeserializeOwned> {
    /// The configuration for the Jetstream connection.
    config: JetstreamConfig<R>,
}
/// The compression mode requested for messages on a Jetstream connection.
pub enum JetstreamCompression {
    /// No compression, just raw plaintext JSON.
    None,
    /// Use the `zstd` compression algorithm, which can result in ~56% smaller messages on
    /// average. See [here](https://github.com/bluesky-social/jetstream?tab=readme-ov-file#compression) for more info.
    Zstd,
}

impl From<JetstreamCompression> for bool {
    /// `true` when compression is requested, `false` otherwise.
    fn from(compression: JetstreamCompression) -> Self {
        matches!(compression, JetstreamCompression::Zstd)
    }
}
/// Settings for a Jetstream connection: where to connect, what to filter for, and how messages
/// are delivered.
///
/// `R` is the type commit records are deserialized into; it defaults to `KnownRecord`.
pub struct JetstreamConfig<R: DeserializeOwned = KnownRecord> {
    /// A Jetstream endpoint to connect to with a WebSocket Scheme i.e.
    /// `wss://jetstream1.us-east.bsky.network/subscribe`.
    pub endpoint: String,
    /// A list of collection [NSIDs](https://atproto.com/specs/nsid) to filter events for.
    ///
    /// An empty list will receive events for *all* collections.
    ///
    /// Regardless of desired collections, all subscribers receive
    /// [AccountEvent](events::account::AccountEvent) and
    /// [IdentityEvent](events::identity::IdentityEvent) events.
    pub wanted_collections: Vec<exports::Nsid>,
    /// A list of repo [DIDs](https://atproto.com/specs/did) to filter events for.
    ///
    /// An empty list will receive events for *all* repos, which is a lot of events!
    pub wanted_dids: Vec<exports::Did>,
    /// The compression algorithm to request and use for the WebSocket connection (if any).
    pub compression: JetstreamCompression,
    /// An optional timestamp to begin playback from.
    ///
    /// An absent cursor or a cursor from the future will result in live-tail operation.
    ///
    /// When reconnecting, use the `time_us` from your most recently processed event and maybe
    /// provide a negative buffer (i.e. subtract a few seconds) to ensure gapless playback.
    pub cursor: Option<chrono::DateTime<Utc>>,
    /// Marker for record deserializable type.
    ///
    /// See `examples/arbitrary_record.rs` for an example using `serde_json::Value`.
    ///
    /// You can omit this if you construct `JetstreamConfig { a: b, ..Default::default() }`.
    /// If you have to specify it, use `std::marker::PhantomData` with no type parameters.
    pub record_type: PhantomData<R>,
}
155155+156156+impl<R: DeserializeOwned> Default for JetstreamConfig<R> {
157157+ fn default() -> Self {
158158+ JetstreamConfig {
159159+ endpoint: DefaultJetstreamEndpoints::USEastOne.into(),
160160+ wanted_collections: Vec::new(),
161161+ wanted_dids: Vec::new(),
162162+ compression: JetstreamCompression::None,
163163+ cursor: None,
164164+ record_type: PhantomData,
165165+ }
166166+ }
167167+}
168168+169169+impl<R: DeserializeOwned> JetstreamConfig<R> {
170170+ /// Constructs a new endpoint URL with the given [JetstreamConfig] applied.
171171+ pub fn construct_endpoint(&self, endpoint: &str) -> Result<Url, url::ParseError> {
172172+ let did_search_query = self
173173+ .wanted_dids
174174+ .iter()
175175+ .map(|s| ("wantedDids", s.to_string()));
176176+177177+ let collection_search_query = self
178178+ .wanted_collections
179179+ .iter()
180180+ .map(|s| ("wantedCollections", s.to_string()));
181181+182182+ let compression = (
183183+ "compress",
184184+ match self.compression {
185185+ JetstreamCompression::None => "false".to_owned(),
186186+ JetstreamCompression::Zstd => "true".to_owned(),
187187+ },
188188+ );
189189+190190+ let cursor = self
191191+ .cursor
192192+ .map(|c| ("cursor", c.timestamp_micros().to_string()));
193193+194194+ let params = did_search_query
195195+ .chain(collection_search_query)
196196+ .chain(std::iter::once(compression))
197197+ .chain(cursor)
198198+ .collect::<Vec<(&str, String)>>();
199199+200200+ Url::parse_with_params(endpoint, params)
201201+ }
202202+203203+ /// Validates the configuration to make sure it is within the limits of the Jetstream API.
204204+ ///
205205+ /// # Constants
206206+ /// The following constants are used to validate the configuration and should only be changed
207207+ /// if the Jetstream API has itself changed.
208208+ /// - [MAX_WANTED_COLLECTIONS]
209209+ /// - [MAX_WANTED_DIDS]
210210+ pub fn validate(&self) -> Result<(), ConfigValidationError> {
211211+ let collections = self.wanted_collections.len();
212212+ let dids = self.wanted_dids.len();
213213+214214+ if collections > MAX_WANTED_COLLECTIONS {
215215+ return Err(ConfigValidationError::TooManyWantedCollections(collections));
216216+ }
217217+218218+ if dids > MAX_WANTED_DIDS {
219219+ return Err(ConfigValidationError::TooManyDids(dids));
220220+ }
221221+222222+ Ok(())
223223+ }
224224+}
225225+226226+impl<R: DeserializeOwned + Send + 'static> JetstreamConnector<R> {
227227+ /// Create a Jetstream connector with a valid [JetstreamConfig].
228228+ ///
229229+ /// After creation, you can call [connect] to connect to the provided Jetstream instance.
230230+ pub fn new(config: JetstreamConfig<R>) -> Result<Self, ConfigValidationError> {
231231+ // We validate the configuration here so any issues are caught early.
232232+ config.validate()?;
233233+ Ok(JetstreamConnector { config })
234234+ }
235235+236236+ /// Connects to a Jetstream instance as defined in the [JetstreamConfig].
237237+ ///
238238+ /// A [JetstreamReceiver] is returned which can be used to respond to events. When all instances
239239+ /// of this receiver are dropped, the connection and task are automatically closed.
240240+ pub async fn connect(&self) -> Result<JetstreamReceiver<R>, ConnectionError> {
241241+ // We validate the config again for good measure. Probably not necessary but it can't hurt.
242242+ self.config
243243+ .validate()
244244+ .map_err(ConnectionError::InvalidConfig)?;
245245+246246+ // TODO: Run some benchmarks and look into using a bounded channel instead.
247247+ let (send_channel, receive_channel) = flume::unbounded();
248248+249249+ let configured_endpoint = self
250250+ .config
251251+ .construct_endpoint(&self.config.endpoint)
252252+ .map_err(ConnectionError::InvalidEndpoint)?;
253253+254254+ tokio::task::spawn(async move {
255255+ let max_retries = 30;
256256+ let base_delay_ms = 1_000; // 1 second
257257+ let max_delay_ms = 30_000; // 30 seconds
258258+ let success_threshold_s = 15; // 15 seconds, retry count is reset if we were connected at least this long
259259+260260+ let mut retry_attempt = 0;
261261+ loop {
262262+ let dict = DecoderDictionary::copy(JETSTREAM_ZSTD_DICTIONARY);
263263+264264+ retry_attempt += 1;
265265+ if let Ok((ws_stream, _)) = connect_async(&configured_endpoint).await {
266266+ let t_connected = Instant::now();
267267+ if let Err(e) = websocket_task(dict, ws_stream, send_channel.clone()).await {
268268+ log::error!("Jetstream closed after encountering error: {e:?}");
269269+ } else {
270270+ log::error!("Jetstream connection closed cleanly");
271271+ }
272272+ if t_connected.elapsed() > Duration::from_secs(success_threshold_s) {
273273+ retry_attempt = 0;
274274+ continue;
275275+ }
276276+ }
277277+278278+ if retry_attempt >= max_retries {
279279+ break;
280280+ }
281281+282282+ // Exponential backoff
283283+ let delay_ms = base_delay_ms * (2_u64.pow(retry_attempt));
284284+285285+ log::error!("Connection failed, retrying in {delay_ms}ms...");
286286+ tokio::time::sleep(Duration::from_millis(delay_ms.min(max_delay_ms))).await;
287287+ log::info!("Attempting to reconnect...")
288288+ }
289289+ log::error!("Connection retries exhausted. Jetstream is disconnected.");
290290+ });
291291+292292+ Ok(receive_channel)
293293+ }
294294+}
295295+296296+/// The main task that handles the WebSocket connection and sends [JetstreamEvent]'s to any
297297+/// receivers that are listening for them.
298298+async fn websocket_task<R: DeserializeOwned>(
299299+ dictionary: DecoderDictionary<'_>,
300300+ ws: WebSocketStream<MaybeTlsStream<TcpStream>>,
301301+ send_channel: JetstreamSender<R>,
302302+) -> Result<(), JetstreamEventError> {
303303+ // TODO: Use the write half to allow the user to change configuration settings on the fly.
304304+ let (socket_write, mut socket_read) = ws.split();
305305+ let shared_socket_write = Arc::new(Mutex::new(socket_write));
306306+307307+ let ping_cancellation_token = CancellationToken::new();
308308+ let mut ping_interval = tokio::time::interval(Duration::from_secs(30));
309309+ let ping_cancelled = ping_cancellation_token.clone();
310310+ let ping_shared_socket_write = shared_socket_write.clone();
311311+ tokio::spawn(async move {
312312+ loop {
313313+ ping_interval.tick().await;
314314+ let false = ping_cancelled.is_cancelled() else {
315315+ break;
316316+ };
317317+ log::trace!("Sending ping");
318318+ match ping_shared_socket_write
319319+ .lock()
320320+ .await
321321+ .send(Message::Ping("ping".as_bytes().to_vec()))
322322+ .await
323323+ {
324324+ Ok(_) => (),
325325+ Err(error) => {
326326+ log::error!("Ping failed: {error}");
327327+ break;
328328+ }
329329+ }
330330+ }
331331+ });
332332+333333+ let mut closing_connection = false;
334334+ loop {
335335+ match socket_read.next().await {
336336+ Some(Ok(message)) => {
337337+ match message {
338338+ Message::Text(json) => {
339339+ let event = serde_json::from_str(&json)
340340+ .map_err(JetstreamEventError::ReceivedMalformedJSON)?;
341341+342342+ if send_channel.send(event).is_err() {
343343+ // We can assume that all receivers have been dropped, so we can close
344344+ // the connection and exit the task.
345345+ log::info!(
346346+ "All receivers for the Jetstream connection have been dropped, closing connection."
347347+ );
348348+ closing_connection = true;
349349+ }
350350+ }
351351+ Message::Binary(zstd_json) => {
352352+ let mut cursor = Cursor::new(zstd_json);
353353+ let mut decoder = zstd::stream::Decoder::with_prepared_dictionary(
354354+ &mut cursor,
355355+ &dictionary,
356356+ )
357357+ .map_err(JetstreamEventError::CompressionDictionaryError)?;
358358+359359+ let mut json = String::new();
360360+ decoder
361361+ .read_to_string(&mut json)
362362+ .map_err(JetstreamEventError::CompressionDecoderError)?;
363363+364364+ let event = serde_json::from_str(&json)
365365+ .map_err(JetstreamEventError::ReceivedMalformedJSON)?;
366366+367367+ if send_channel.send(event).is_err() {
368368+ // We can assume that all receivers have been dropped, so we can close
369369+ // the connection and exit the task.
370370+ log::info!(
371371+ "All receivers for the Jetstream connection have been dropped, closing connection..."
372372+ );
373373+ closing_connection = true;
374374+ }
375375+ }
376376+ Message::Ping(vec) => {
377377+ log::trace!("Ping recieved, responding");
378378+ _ = shared_socket_write
379379+ .lock()
380380+ .await
381381+ .send(Message::Pong(vec))
382382+ .await;
383383+ }
384384+ Message::Close(close_frame) => {
385385+ if let Some(close_frame) = close_frame {
386386+ let reason = close_frame.reason;
387387+ let code = close_frame.code;
388388+ log::trace!("Connection closed. Reason: {reason}, Code: {code}");
389389+ }
390390+ }
391391+ Message::Pong(pong) => {
392392+ let pong_payload =
393393+ String::from_utf8(pong).unwrap_or("Invalid payload".to_string());
394394+ log::trace!("Pong recieved. Payload: {pong_payload}");
395395+ }
396396+ Message::Frame(_) => (),
397397+ }
398398+ }
399399+ Some(Err(error)) => {
400400+ log::error!("Web socket error: {error}");
401401+ ping_cancellation_token.cancel();
402402+ closing_connection = true;
403403+ }
404404+ None => {
405405+ log::error!("No web socket result");
406406+ ping_cancellation_token.cancel();
407407+ closing_connection = true;
408408+ }
409409+ }
410410+ if closing_connection {
411411+ _ = shared_socket_write.lock().await.close().await;
412412+ return Ok(());
413413+ }
414414+ }
415415+}