Tap drinker
2
fork

Configure Feed

Select the types of activity you want to include in your feed.

dispatch events with timeout

Signed-off-by: tjh <x@tjh.dev>

tjh ae586b81 4134abe4

+56 -28
+21 -16
src/main.rs
··· 12 12 use sqlx::PgPool; 13 13 use tokio::{sync::mpsc, task::JoinSet}; 14 14 use tokio_util::sync::CancellationToken; 15 - use tracing::level_filters::LevelFilter; 15 + use tracing::{Instrument as _, level_filters::LevelFilter}; 16 16 use tracing_subscriber::{EnvFilter, layer::SubscriberExt as _, util::SubscriberInitExt as _}; 17 17 use trap::tap::{IdentityEvent, RecordAction, RecordEvent, TapChannel, TapClient, TapEvent}; 18 18 ··· 100 100 tx: mpsc::UnboundedSender<(String, DidSource)>, 101 101 shutdown: CancellationToken, 102 102 ) -> anyhow::Result<()> { 103 - while let Some(Some((event, ack))) = shutdown.run_until_cancelled(channel.recv()).await { 104 - let mut transaction = pool.begin().await?; 105 - match event { 106 - TapEvent::Record(record) => { 107 - let (record, parsed_record) = handle_record(record, &mut transaction).await?; 103 + while let Some(Some((span, event, ack))) = shutdown.run_until_cancelled(channel.recv()).await { 104 + async { 105 + let mut transaction = pool.begin().await?; 106 + match event { 107 + TapEvent::Record(record) => { 108 + let (record, parsed_record) = handle_record(record, &mut transaction).await?; 108 109 109 - // Expand the network of tracked DIDs. 110 - let nsid = record.collection.into_boxed_str(); 111 - for did in extract_dids(&parsed_record) { 112 - tx.send((did, DidSource::Record(nsid.clone())))?; 110 + // Expand the network of tracked DIDs. 111 + let nsid = record.collection.into_boxed_str(); 112 + for did in extract_dids(&parsed_record) { 113 + tx.send((did, DidSource::Record(nsid.clone())))?; 114 + } 115 + } 116 + TapEvent::Identity(identity) => { 117 + handle_identity(identity, &mut transaction).await?; 113 118 } 114 119 } 115 - TapEvent::Identity(identity) => { 116 - handle_identity(identity, &mut transaction).await?; 117 - } 120 + 121 + transaction.commit().await?; 122 + ack.acknowledge().await?; 123 + Ok::<_, anyhow::Error>(()) 118 124 } 119 - 120 - transaction.commit().await?; 121 - ack.acknowledge().await?; 125 + .instrument(span) 126 + .await?; 122 127 } 123 128 124 129 tracing::info!("complete");
+35 -12
src/tap/channel.rs
··· 2 2 3 3 use futures_util::{SinkExt as _, StreamExt}; 4 4 use serde::Serialize; 5 - use tokio::{net::TcpStream, sync::mpsc}; 5 + use tokio::{ 6 + net::TcpStream, 7 + sync::mpsc::{self, error::SendTimeoutError}, 8 + }; 6 9 use tokio_tungstenite::{ 7 10 MaybeTlsStream, WebSocketStream, 8 11 tungstenite::{Bytes, ClientRequestBuilder, Message}, 9 12 }; 10 13 use tokio_util::sync::{CancellationToken, DropGuard}; 14 + use tracing::Span; 11 15 12 16 use crate::tap::{TapClient, TapEvent}; 13 17 14 18 const TIMEOUT: Duration = Duration::from_secs(10); 15 19 20 + const DISPATCH_TIMEOUT: Duration = Duration::from_secs(2); 21 + 16 22 #[derive(Debug, thiserror::Error)] 17 23 #[error("Failed to enqueue acknowledgement for event #{0}")] 18 24 pub struct AckError(u64); ··· 54 60 55 61 #[derive(Debug)] 56 62 pub struct TapChannel { 57 - rx: mpsc::Receiver<(TapEvent, Ack)>, 63 + rx: mpsc::Receiver<(Span, TapEvent, Ack)>, 58 64 #[allow(unused)] 59 65 shutdown: DropGuard, 60 66 } ··· 108 114 } 109 115 110 116 impl TapChannel { 111 - pub async fn recv(&mut self) -> Option<(TapEvent, Ack)> { 117 + pub async fn recv(&mut self) -> Option<(Span, TapEvent, Ack)> { 112 118 self.rx.recv().await 113 119 } 114 120 } ··· 123 129 124 130 async fn channel_task( 125 131 request_builder: ClientRequestBuilder, 126 - event_tx: mpsc::Sender<(TapEvent, Ack)>, 132 + event_tx: mpsc::Sender<(Span, TapEvent, Ack)>, 127 133 shutdown: CancellationToken, 128 134 capacity: usize, 129 135 ) -> Result<(), ChannelError> { ··· 139 145 140 146 'outer: while !shutdown.is_cancelled() { 141 147 let mut ping_inflight = false; 142 - let mut timeout = tokio::time::interval(TIMEOUT); 143 - timeout.tick().await; 148 + let mut recv_timeout = tokio::time::interval(TIMEOUT); 149 + recv_timeout.tick().await; 144 150 145 151 let request = request_builder.clone(); 146 152 let (mut socket, _) = match tokio_tungstenite::connect_async(request).await { ··· 164 170 let action = tokio::select! { 165 171 Some(Ok(message)) = socket.next() => Action::Message(message), 166 172 Some(ack) = ack_rx.recv() => Action::Ack(ack), 167 - _ = timeout.tick() => Action::Timeout, 173 + _ = recv_timeout.tick() => Action::Timeout, 168 174 _ = shutdown.cancelled() => Action::ClearAcks, 169 175 else => Action::ClearAcks, 170 176 }; 177 + 178 + recv_timeout.reset(); 171 179 172 180 match action { 173 181 Action::Message(message) => match message { ··· 180 188 } 181 189 }; 182 190 183 - tracing::info!(?event, "received event"); 191 + let span = tracing::info_span!("event", id = event.id()); 192 + span.in_scope(|| { 193 + tracing::info!(?event, "received event"); 194 + }); 184 195 185 196 let ack = Ack { 186 197 id: event.id(), 187 198 tx: ack_tx.clone(), 188 199 }; 189 200 190 - if event_tx.send((event, ack)).await.is_err() { 191 - tracing::error!("failed to dispatch event"); 192 - break 'outer; 201 + let message = (span, event, ack); 202 + match event_tx.send_timeout(message, DISPATCH_TIMEOUT).await { 203 + Err(SendTimeoutError::Timeout((span, event, _))) => { 204 + span.in_scope(|| { 205 + tracing::error!(?event, "channel consumer stalled"); 206 + }); 207 + break 'outer; 208 + } 209 + Err(SendTimeoutError::Closed((span, event, _))) => { 210 + span.in_scope(|| { 211 + tracing::error!(?event, "channel consumer closed"); 212 + }); 213 + break 'outer; 214 + } 215 + Ok(_) => {} 193 216 } 194 217 } 195 218 Message::Binary(_) | Message::Frame(_) => { ··· 262 285 socket: &mut WebSocketStream<MaybeTlsStream<TcpStream>>, 263 286 ack: u64, 264 287 ) -> Result<(), tokio_tungstenite::tungstenite::Error> { 265 - tracing::debug!(?ack, "sending ack"); 288 + tracing::info!(?ack, "sending ack"); 266 289 let message = serde_json::to_string(&ClientMessage::from(ack)) 267 290 .expect("ClientMessage should be serializable"); 268 291