···3636 }37373838 /// Add a DID to the Jetstream filters.3939+ ///4040+ /// # Errors4141+ ///4242+ /// Returns an error if the Jetstream task is no longer active.4343+ ///4444+ /// # Panics4545+ ///4646+ /// Panics if the [`Mutex`] for the client options has been poisoned.4747+ ///3948 pub async fn add_did(&self, did: impl Into<OwnedDid>) -> Result<(), JetstreamClientError> {4049 if self.options.lock().unwrap().add_did(did.into())? {4150 // The DID is new to the client, notify the task to update.···5445 }55465647 /// Remove a DID from the Jetstream filters.4848+ ///4949+ /// # Errors5050+ ///5151+ /// Returns an error if the Jetstream task is no longer active.5252+ ///5353+ /// # Panics5454+ ///5555+ /// Panics if the [`Mutex`] for the client options has been poisoned.5656+ ///5757 pub async fn remove_did(&self, did: impl Into<OwnedDid>) -> Result<(), JetstreamClientError> {5858 if self.options.lock().unwrap().remove_did(&did.into()) {5959 self.update_task().await?;···7153 }72547355 /// Add a collection to the Jetstream filters.5656+ ///5757+ /// # Errors5858+ ///5959+ /// Returns an error if the Jetstream task is no longer active.6060+ ///6161+ /// # Panics6262+ ///6363+ /// Panics if the [`Mutex`] for the client options has been poisoned.6464+ ///7465 pub async fn add_collection(7566 &self,7667 collection: impl Into<Box<Nsid>>,···9770 }98719972 /// Remove a collection from the Jetstream filters.7373+ ///7474+ /// # Errors7575+ ///7676+ /// Returns an error if the Jetstream task is no longer active.7777+ ///7878+ /// # Panics7979+ ///8080+ /// Panics if the [`Mutex`] for the client options has been poisoned.8181+ ///10082 pub async fn remove_collection(10183 &self,10284 collection: impl Into<Box<Nsid>>,···12690 self.metrics.export()12791 }128929393+ /// Shutdown the Jetstream client.9494+ ///9595+ /// # Errors9696+ ///9797+ /// Returns an error if the Jetstream task is no longer active.9898+ ///12999 pub async fn shutdown(self) -> Result<(), JetstreamClientError> {130100 let (command, complete) = ClientCommand::shutdown();131101 self.client_tx.send(command)?;···257215}258216259217impl<'a> JetstreamEvent {218218+ /// Deserialize the event.219219+ ///220220+ /// # Errors221221+ ///222222+ /// Returns an error if the event cannot be deserialized from it JSON223223+ /// representation.224224+ ///260225 pub fn deserialize(&'a self) -> Result<Event<'a>, serde_json::Error> {261226 let value = serde_json::from_slice(&self.bytes)?;262227 Ok(value)
···4848 /// Returns an error if the maximum number of subscribed collections has been reached; `Ok(true)`4949 /// if the collection was newly added to the set, or `Ok(false)` if the colletion was already in the5050 /// the set.5151+ ///5252+ /// # Errors5353+ ///5454+ /// Returns an error if adding `collection` would cause [`SubscriberOptions`] to exceed the maximum5555+ /// number of subscribed collections.5656+ ///5157 pub fn add_collection(&mut self, collection: Box<Nsid>) -> Result<bool, Box<Nsid>> {5258 if self.wanted_collections.len() == MAX_WANTED_COLLECTIONS5359 && !self.wanted_collections.contains(&collection)···7367 /// Returns an error if the maximum number of subscribed DIDs has been reached; `Ok(true)`7468 /// if the DID was newly added to the set, or `Ok(false)` if the DID was already in the7569 /// the set.7070+ ///7171+ /// # Errors7272+ ///7373+ /// Returns an error if adding `did` would cause [`SubscriberOptions`] to exceed the maximum7474+ /// number of subscribed DIDs.7575+ ///7676 pub fn add_did(&mut self, did: OwnedDid) -> Result<bool, OwnedDid> {7777 if self.wanted_dids.len() == MAX_WANTED_DIDS && !self.wanted_dids.contains(&did) {7878 return Err(did);···93819482 /// Get the normalized maximum message size.9583 #[must_use]9696- pub fn max_message_size(&self) -> i64 {8484+ pub const fn max_message_size(&self) -> i64 {9785 normalize_max_message_size(self.max_message_size_bytes)9886 }9987···178166 Ok(super::normalize_max_message_size(value))179167 }180168169169+ #[allow(clippy::trivially_copy_pass_by_ref)]181170 pub fn serialize<S>(value: &i64, serializer: S) -> Result<S::Ok, S::Error>182171 where183172 S: Serializer,···203190204191impl SubscriberSourcedMessage<'_> {205192 /// Serialize the [`SubscriberSourcedMessage`] to JSON.193193+ ///194194+ /// # Panics195195+ ///196196+ /// Panics if [`SubscriberSourcedMessage`] cannot be serialized as JSON.197197+ ///206198 #[must_use]207199 pub fn to_json(&self) -> String {208200 serde_json::to_string(self).expect("SubscriberSourcedMessage should be serializable")
+24-16
crates/gordian-jetstream/src/task.rs
···4545 self: std::pin::Pin<&mut Self>,4646 cx: &mut std::task::Context<'_>,4747 ) -> std::task::Poll<Self::Output> {4848- match self.get_mut().0 {4949- Some(ref mut fut) => fut.as_mut().poll(cx),5050- None => unreachable!(),5151- }4848+ self.get_mut()4949+ .05050+ .as_mut()5151+ .map_or_else(|| unreachable!(), |fut| fut.as_mut().poll(cx))5252 }5353}5454···6868 }6969}70707171+#[derive(Deserialize)]7272+struct PartialEvent<'a> {7373+ time_us: u64,7474+ #[serde(borrow)]7575+ kind: &'a str,7676+}7777+7878+#[allow(clippy::too_many_lines)]7179pub async fn jetstream_subscriber(7280 event_tx: flume::Sender<Bytes>,7381 client_rx: flume::Receiver<ClientCommand>,···8476 subscriber_options: Arc<Mutex<SubscriberOptions>>,8577 shutdown: CancellationToken,8678) {8787- let mut state = State { metrics };7979+ let state = State { metrics };8880 let mut reconnect_backoff = client_options.reconnect_backoff_min;89819082 let mut instance_idx = fastrand::usize(0..client_options.instances.len());···10698 #[cfg(not(feature = "zstd"))]10799 let request = ClientRequestBuilder::new(uri);108100109109- let (socket, _) = match shutdown.run_until_cancelled(connect_async(request)).await {101101+ let (socket, _) = match shutdown102102+ .run_until_cancelled(Box::pin(connect_async(request)))103103+ .await104104+ {110105 Some(Ok(socket)) => {111106 reconnect_backoff = client_options.reconnect_backoff_min;112107 socket···150139151140 loop {152141 let message = tokio::select! {153153- Some(Ok(outcome)) = shutdown.run_until_cancelled(handle_read_socket(&mut read, &mut state)) => {142142+ Some(Ok(outcome)) = shutdown.run_until_cancelled(handle_read_socket(&mut read, &state)) => {154143 match outcome {155144 ReadOutcome::Event(message) => message,156145 ReadOutcome::Ping(payload) => {···197186 else => break,198187 };199188200200- #[derive(Deserialize)]201201- struct PartialEvent<'a> {202202- time_us: u64,203203- #[serde(borrow)]204204- kind: &'a str,205205- }206206-207189 // Deserialize just the event timestamp and event kind.208190 let new_cursor = match serde_json::from_slice::<PartialEvent>(&message) {209191 Ok(event) => {···216212 state.metrics.modify(|mut data| data.messages_received += 1);217213 if let Err(error) = event_tx.send_async(message).await {218214 let payload = error.into_inner();219219- if let Ok(payload) = std::str::from_utf8(&payload) { tracing::error!(%payload, "Failed to dispatch event to channel") } else { tracing::error!(?payload, "Failed to dispatch event to channel") }215215+ if let Ok(payload) = std::str::from_utf8(&payload) {216216+ tracing::error!(%payload, "Failed to dispatch event to channel");217217+ } else {218218+ tracing::error!(?payload, "Failed to dispatch event to channel");219219+ }220220 break 'outer;221221 }222222···243235244236async fn handle_read_socket<S>(245237 stream: &mut S,246246- state: &mut State,238238+ state: &State,247239) -> Result<ReadOutcome, TungsteniteError>248240where249241 S: StreamExt<Item = Result<Message, TungsteniteError>> + Unpin,