Tap drinker
2
fork

Configure Feed

Select the types of activity you want to include in your feed.

refactor: remove Ack wrapper and rename AckHandle

Signed-off-by: tjh <x@tjh.dev>

tjh d0ef3b04 2f68171d

+21 -39
+1
rust-toolchain.toml
··· 1 1 [toolchain] 2 2 channel = "stable" 3 3 profile = "default" 4 + components = ["rust-analyzer"]
+18 -34
src/tap/channel.rs
··· 1 - use core::fmt; 2 1 use std::{ 3 2 collections::HashSet, 4 3 time::{Duration, SystemTime}, ··· 27 26 #[error("Failed to enqueue acknowledgement for event #{0}")] 28 27 pub struct AckError(u64); 29 28 30 - impl From<mpsc::error::SendError<Ack>> for AckError { 31 - fn from(error: mpsc::error::SendError<Ack>) -> Self { 32 - Self(error.0.0) 29 + impl From<mpsc::error::SendError<u64>> for AckError { 30 + fn from(error: mpsc::error::SendError<u64>) -> Self { 31 + Self(error.0) 33 32 } 34 33 } 35 34 36 - pub struct AckHandle { 35 + pub struct Ack { 37 36 id: u64, 38 - tx: mpsc::Sender<Ack>, 37 + tx: mpsc::Sender<u64>, 39 38 } 40 39 41 - impl AckHandle { 40 + impl Ack { 42 41 /// Acknowledge receipt of the associated event. 43 42 /// 44 43 /// Success does *not* mean the Tap server has successfully received the 45 44 /// acknowledgement, only that the ack has be queued by the client. 46 45 pub async fn acknowledge(self) -> Result<(), AckError> { 47 - self.tx.send(Ack::new(self.id)).await?; 46 + self.tx.send(self.id).await?; 48 47 Ok(()) 49 48 } 50 49 } 51 50 52 - #[derive(Debug)] 53 - pub struct Ack(u64); 54 - 55 - impl fmt::Display for Ack { 56 - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { 57 - fmt::Display::fmt(&self.0, f) 58 - } 59 - } 60 - 61 - impl Ack { 62 - fn new(id: u64) -> Self { 63 - Self(id) 64 - } 65 - } 66 - 67 51 /// Messages that are serialized and sent to the Tap server. 68 52 #[derive(Debug, Serialize)] 69 53 #[serde(tag = "type", rename_all = "snake_case")] ··· 71 55 Ack { id: u64 }, 72 56 } 73 57 74 - impl From<&Ack> for ClientMessage { 75 - fn from(&Ack(id): &Ack) -> Self { 58 + impl From<u64> for ClientMessage { 59 + fn from(id: u64) -> Self { 76 60 Self::Ack { id } 77 61 } 78 62 } 79 63 80 64 #[derive(Debug)] 81 65 pub struct TapChannel { 82 - rx: mpsc::Receiver<(TapEvent, AckHandle)>, 66 + rx: mpsc::Receiver<(TapEvent, Ack)>, 83 67 shutdown: watch::Sender<bool>, 84 68 } 85 69 ··· 91 75 capacity: usize, 92 76 ) -> ( 93 77 Self, 94 - impl Future<Output = Result<Vec<Ack>, ChannelError>> + Send + 'static, 78 + impl Future<Output = Result<Vec<u64>, ChannelError>> + Send + 'static, 95 79 ) { 96 80 let mut url = tap.url().clone(); 97 81 url.set_path("/channel"); ··· 126 110 } 127 111 128 112 impl TapChannel { 129 - pub async fn recv(&mut self) -> Option<(TapEvent, AckHandle)> { 113 + pub async fn recv(&mut self) -> Option<(TapEvent, Ack)> { 130 114 self.rx.recv().await 131 115 } 132 116 } ··· 143 127 #[error("Client authorization failed")] 144 128 Authorization, 145 129 #[error("Failed to send pending Acks: {0:?}: {1}")] 146 - FailedAcks(Vec<Ack>, tokio_tungstenite::tungstenite::Error), 130 + FailedAcks(Vec<u64>, tokio_tungstenite::tungstenite::Error), 147 131 } 148 132 149 133 async fn channel_task( 150 134 request_builder: ClientRequestBuilder, 151 - event_tx: mpsc::Sender<(TapEvent, AckHandle)>, 135 + event_tx: mpsc::Sender<(TapEvent, Ack)>, 152 136 mut shutdown_rx: watch::Receiver<bool>, 153 137 capacity: usize, 154 - ) -> Result<Vec<Ack>, ChannelError> { 138 + ) -> Result<Vec<u64>, ChannelError> { 155 139 #[derive(Debug)] 156 140 enum Action { 157 141 Message(Message), ··· 215 199 } 216 200 }; 217 201 218 - let ack = AckHandle { 202 + let ack = Ack { 219 203 id: event.id(), 220 204 tx: ack_tx.clone(), 221 205 }; ··· 296 280 } 297 281 298 282 async fn send_acknowledgements( 299 - acks: &mut Vec<Ack>, 283 + acks: &mut Vec<u64>, 300 284 socket: &mut WebSocketStream<MaybeTlsStream<TcpStream>>, 301 285 ) -> Result<usize, tokio_tungstenite::tungstenite::Error> { 302 286 let mut count = 0; 303 287 for ack in acks.drain(..) { 304 288 tracing::debug!(?ack, "sending ack"); 305 - let message = serde_json::to_string(&ClientMessage::from(&ack)) 289 + let message = serde_json::to_string(&ClientMessage::from(ack)) 306 290 .expect("ClientMessage should be serializable"); 307 291 308 292 socket.send(Message::text(message)).await?;
+2 -5
src/tap/client.rs
··· 6 6 use serde_json::Value; 7 7 use url::Url; 8 8 9 - use crate::tap::{ 10 - Error, HttpClient, RepoInfo, TapChannel, 11 - channel::{Ack, ChannelError}, 12 - }; 9 + use crate::tap::{Error, HttpClient, RepoInfo, TapChannel, channel::ChannelError}; 13 10 14 11 pub type Result<T> = core::result::Result<T, Error>; 15 12 ··· 59 56 &self, 60 57 ) -> ( 61 58 TapChannel, 62 - impl Future<Output = core::result::Result<Vec<Ack>, ChannelError>> + Send + 'static, 59 + impl Future<Output = core::result::Result<Vec<u64>, ChannelError>> + Send + 'static, 63 60 ) { 64 61 TapChannel::new(self, TapChannel::DEFAULT_CAPACITY) 65 62 }