Tap drinker
2
fork

Configure Feed

Select the types of activity you want to include in your feed.

refactor: use cancellation tokens for shutdown

Signed-off-by: tjh <x@tjh.dev>

tjh 80375940 0d75ba44

+15 -17
+15 -17
src/tap/channel.rs
··· 5 5 6 6 use futures_util::{SinkExt as _, StreamExt}; 7 7 use serde::Serialize; 8 - use tokio::{ 9 - net::TcpStream, 10 - sync::{mpsc, watch}, 11 - }; 8 + use tokio::{net::TcpStream, sync::mpsc}; 12 9 use tokio_tungstenite::{ 13 10 MaybeTlsStream, WebSocketStream, 14 11 tungstenite::{Bytes, ClientRequestBuilder, Message}, 15 12 }; 13 + use tokio_util::sync::{CancellationToken, DropGuard}; 16 14 17 15 use crate::tap::{TapClient, TapEvent}; 18 16 ··· 64 62 #[derive(Debug)] 65 63 pub struct TapChannel { 66 64 rx: mpsc::Receiver<(TapEvent, Ack)>, 67 - shutdown: watch::Sender<bool>, 65 + #[allow(unused)] 66 + shutdown: DropGuard, 68 67 } 69 68 70 69 impl TapChannel { ··· 102 101 } 103 102 104 103 let (tx, rx) = mpsc::channel(capacity); 105 - let (shutdown, shutdown_rx) = watch::channel(false); 106 - let handle = channel_task(builder, tx, shutdown_rx, capacity); 104 + let shutdown = CancellationToken::new(); 105 + let handle = channel_task(builder, tx, shutdown.child_token(), capacity); 107 106 108 - (Self { rx, shutdown }, handle) 107 + ( 108 + Self { 109 + rx, 110 + shutdown: shutdown.drop_guard(), 111 + }, 112 + handle, 113 + ) 109 114 } 110 115 } 111 116 112 117 impl TapChannel { 113 118 pub async fn recv(&mut self) -> Option<(TapEvent, Ack)> { 114 119 self.rx.recv().await 115 - } 116 - } 117 - 118 - impl Drop for TapChannel { 119 - fn drop(&mut self) { 120 - self.rx.close(); 121 - let _ = self.shutdown.send(true); 122 120 } 123 121 } 124 122 ··· 133 131 async fn channel_task( 134 132 request_builder: ClientRequestBuilder, 135 133 event_tx: mpsc::Sender<(TapEvent, Ack)>, 136 - mut shutdown_rx: watch::Receiver<bool>, 134 + shutdown: CancellationToken, 137 135 capacity: usize, 138 136 ) -> Result<Vec<u64>, ChannelError> { 139 137 #[derive(Debug)] ··· 184 182 _ => Action::Ack, 185 183 }, 186 184 _ = timeout.tick() => Action::Timeout, 187 - _ = shutdown_rx.wait_for(|v| *v) => Action::ClearAcks, 185 + _ = shutdown.cancelled() => Action::ClearAcks, 188 186 else => Action::ClearAcks, 189 187 }; 190 188