Tap drinker
2
fork

Configure Feed

Select the types of activity you want to include in your feed.

fix: don't buffer acknowledgements

Signed-off-by: tjh <x@tjh.dev>

tjh 4134abe4 5e4b30d5

+27 -40
+4 -4
src/main.rs
··· 3 3 4 4 use std::{ 5 5 collections::{HashSet, VecDeque}, 6 - io, 6 + io, process, 7 7 time::Duration, 8 8 }; 9 9 ··· 43 43 44 44 let mut tasks = JoinSet::new(); 45 45 tasks.spawn(async move { 46 - let unsent_acks = tap_task.await?; 47 - if !unsent_acks.is_empty() { 48 - tracing::error!(?unsent_acks, "failed to clear Acks"); 46 + if let Err(error) = tap_task.await { 47 + tracing::error!(?error); 48 + process::abort(); 49 49 } 50 50 Ok(()) 51 51 });
+22 -35
src/tap/channel.rs
··· 67 67 capacity: usize, 68 68 ) -> ( 69 69 Self, 70 - impl Future<Output = Result<Vec<u64>, ChannelError>> + Send + 'static, 70 + impl Future<Output = Result<(), ChannelError>> + Send + 'static, 71 71 ) { 72 72 let mut url = tap.url().clone(); 73 73 url.set_path("/channel"); ··· 118 118 #[error("Client authorization failed")] 119 119 Authorization, 120 120 #[error("Failed to send pending Acks: {0:?}: {1}")] 121 - FailedAcks(Vec<u64>, tokio_tungstenite::tungstenite::Error), 121 + FailedAck(u64, tokio_tungstenite::tungstenite::Error), 122 122 } 123 123 124 124 async fn channel_task( ··· 126 126 event_tx: mpsc::Sender<(TapEvent, Ack)>, 127 127 shutdown: CancellationToken, 128 128 capacity: usize, 129 - ) -> Result<Vec<u64>, ChannelError> { 129 + ) -> Result<(), ChannelError> { 130 130 #[derive(Debug)] 131 131 enum Action { 132 132 Message(Message), 133 133 Timeout, 134 - Ack, 134 + Ack(u64), 135 135 ClearAcks, 136 136 } 137 137 138 138 let (ack_tx, mut ack_rx) = mpsc::channel(capacity); 139 - let mut acks: Vec<_> = Default::default(); 140 139 141 140 'outer: while !shutdown.is_cancelled() { 142 141 let mut ping_inflight = false; ··· 160 159 continue 'outer; 161 160 } 162 161 }; 163 - 164 - // Send any pending Acks. 165 - if let Err(error) = send_acknowledgements(&mut acks, &mut socket).await { 166 - tracing::error!(?error, "failed to send Ack"); 167 - continue; 168 - } 169 162 170 163 loop { 171 164 let action = tokio::select! { 172 165 Some(Ok(message)) = socket.next() => Action::Message(message), 173 - count = ack_rx.recv_many(&mut acks, 64) => match count { 174 - 0 => Action::ClearAcks, 175 - _ => Action::Ack, 176 - }, 166 + Some(ack) = ack_rx.recv() => Action::Ack(ack), 177 167 _ = timeout.tick() => Action::Timeout, 178 168 _ = shutdown.cancelled() => Action::ClearAcks, 179 169 else => Action::ClearAcks, ··· 189 179 continue; 190 180 } 191 181 }; 182 + 183 + tracing::info!(?event, "received event"); 192 184 193 185 let ack = Ack { 194 186 id: event.id(), ··· 219 211 break; 220 212 } 221 213 }, 222 - Action::Ack => { 223 - if let Err(error) = send_acknowledgements(&mut acks, &mut socket).await { 214 + Action::Ack(ack) => { 215 + if let Err(error) = send_acknowledgement(&mut socket, ack).await { 224 216 tracing::error!(?error, "failed to send Ack"); 225 217 break; 226 218 } ··· 248 240 Action::ClearAcks => { 249 241 drop(ack_tx); 250 242 while let Some(ack) = ack_rx.recv().await { 251 - acks.push(ack); 252 - } 253 - 254 - if let Err(error) = send_acknowledgements(&mut acks, &mut socket).await { 255 - return Err(ChannelError::FailedAcks(acks, error)); 243 + if let Err(error) = send_acknowledgement(&mut socket, ack).await { 244 + tracing::error!(?error, "failed to send ack"); 245 + return Err(ChannelError::FailedAck(ack, error)); 246 + } 256 247 } 257 248 258 249 break 'outer; ··· 264 255 } 265 256 266 257 tracing::info!("complete"); 267 - Ok(acks) 258 + Ok(()) 268 259 } 269 260 270 - async fn send_acknowledgements( 271 - acks: &mut Vec<u64>, 261 + async fn send_acknowledgement( 272 262 socket: &mut WebSocketStream<MaybeTlsStream<TcpStream>>, 273 - ) -> Result<usize, tokio_tungstenite::tungstenite::Error> { 274 - let mut count = 0; 275 - for ack in acks.drain(..) { 276 - tracing::debug!(?ack, "sending ack"); 277 - let message = serde_json::to_string(&ClientMessage::from(ack)) 278 - .expect("ClientMessage should be serializable"); 263 + ack: u64, 264 + ) -> Result<(), tokio_tungstenite::tungstenite::Error> { 265 + tracing::debug!(?ack, "sending ack"); 266 + let message = serde_json::to_string(&ClientMessage::from(ack)) 267 + .expect("ClientMessage should be serializable"); 279 268 280 - socket.send(Message::text(message)).await?; 281 - count += 1; 282 - } 269 + socket.send(Message::text(message)).await?; 283 270 284 - Ok(count) 271 + Ok(()) 285 272 }
+1 -1
src/tap/client.rs
··· 56 56 &self, 57 57 ) -> ( 58 58 TapChannel, 59 - impl Future<Output = core::result::Result<Vec<u64>, ChannelError>> + Send + 'static, 59 + impl Future<Output = core::result::Result<(), ChannelError>> + Send + 'static, 60 60 ) { 61 61 TapChannel::new(self, TapChannel::DEFAULT_CAPACITY) 62 62 }