···5151 let handle = TapHandle::spawn_default(config).await?;
52525353 // Subscribe to events
5454- let mut channel = handle.channel().await?;
5454+ let (mut receiver, mut ack_sender) = handle.channel().await?;
55555656- while let Ok(received) = channel.recv().await {
5757- match &received.event {
5656+ while let Ok((event, ack_id)) = receiver.recv().await {
5757+ match event {
5858 Event::Record(record) => {
5959 println!("[{:?}] {}/{}",
6060 record.action,
···6666 println!("Identity: {} -> {}", identity.did, identity.handle);
6767 }
6868 }
6969- // Event is auto-acknowledged when `received` is dropped
6969+ // Manual acknowledgment required
7070+ ack_sender.ack(ack_id).await?;
7071 }
71727273 Ok(())
···164165165166### Working with Events
166167167167-Events are automatically acknowledged when dropped:
168168+Events must be manually acknowledged:
168169169170```rust
170171use tapped::{Event, RecordAction};
171172172172-let mut channel = client.channel().await?;
173173+let (mut receiver, mut ack_sender) = client.channel().await?;
173174174174-while let Ok(received) = channel.recv().await {
175175- match &received.event {
175175+while let Ok((event, ack_id)) = receiver.recv().await {
176176+ match event {
176177 Event::Record(record) => {
177178 match record.action {
178179 RecordAction::Create => {
···193194 println!("{} is now @{}", identity.did, identity.handle);
194195 }
195196 }
196196- // Ack sent automatically here when `received` goes out of scope
197197+ // Ack must be sent manually
198198+ ack_sender.ack(ack_id).await?;
197199}
198200```
199201
+10-6
standard-site-sync/src/main.rs
···6767 client.health().await?;
6868 info!("Tap is healthy!");
69697070- let mut receiver = client.channel().await?;
7070+ let (mut receiver, mut ack_sender) = client.channel().await?;
7171 info!("Connected! Waiting for events...");
72727373 // In-memory cache - load from disk if available
···83838484 loop {
8585 match receiver.recv().await {
8686- Ok(received) => {
8787- if let Event::Record(ref record_event) = *received {
8686+ Ok((event, ack_id)) => {
8787+ if let Event::Record(ref record_event) = event {
8888 // Track live vs backfill
8989 if record_event.live {
9090 live_count += 1;
···9696 write_output_files(&cache)?;
97979898 // Periodically show event source breakdown
9999- if (live_count + backfill_count).is_multiple_of(10) {
9999+ if (live_count + backfill_count) % 10 == 0 {
100100 info!(
101101 "[Stats] Live events: {}, Backfill events: {}",
102102 live_count, backfill_count
103103 );
104104 }
105105- } else if let Event::Identity(ref identity_event) = *received {
105105+ } else if let Event::Identity(ref identity_event) = event {
106106 info!(
107107 "[IDENTITY] {} -> {} (active: {})",
108108 identity_event.did, identity_event.handle, identity_event.is_active
109109 );
110110 }
111111- // Event is automatically acked when `received` is dropped here
111111+112112+ if let Err(e) = ack_sender.ack(ack_id).await {
113113+ error!("Failed to ack event: {}", e);
114114+ break;
115115+ }
112116 }
113117 Err(e) => {
114118 eprintln!("Error receiving event: {}", e);
+55-133
tapped/src/channel.rs
···11//! WebSocket event channel and receiver.
2233use serde::Serialize;
44-use tokio::sync::mpsc;
54use sockudo_ws::{Message, Http1, Config, Stream as WsTransportStream, SplitWriter, SplitReader};
65use sockudo_ws::client::WebSocketClient;
77-use bytes::Bytes;
86use url::Url;
97108use crate::types::RawEvent;
···1311type WsSink = SplitWriter<WsTransportStream<Http1>>;
1412type WsSource = SplitReader<WsTransportStream<Http1>>;
15131414+/// Opaque identifier for an event to be acknowledged.
1515+#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
1616+pub struct AckId(u64);
1717+1818+/// Sender for event acknowledgments.
1919+pub struct AckSender {
2020+ write: WsSink,
2121+}
2222+2323+impl AckSender {
2424+ /// Send an acknowledgment for an event.
2525+ pub async fn ack(&mut self, id: AckId) -> Result<()> {
2626+ #[derive(Serialize)]
2727+ struct AckMessage {
2828+ #[serde(rename = "type")]
2929+ type_: &'static str,
3030+ id: u64,
3131+ }
3232+3333+ let msg = AckMessage { type_: "ack", id: id.0 };
3434+ let json = serde_json::to_string(&msg)?;
3535+3636+ self.write.send(Message::text(json)).await.map_err(|e| Error::WebSocket(Box::new(e)))
3737+ }
3838+}
3939+1640/// Receiver for events from a tap WebSocket channel.
1741///
1842/// Events are received via the [`recv`](EventReceiver::recv) method.
1919-/// Acknowledgments are sent automatically when events are dropped.
4343+/// Acknowledgments must be sent manually using the [`AckSender`] returned by [`TapClient::channel()`](crate::TapClient::channel).
2044///
2145/// This type does not implement auto-reconnection. If the connection
2246/// closes, `recv()` will return an error and you must create a new
2347/// `EventReceiver` via [`TapClient::channel()`](crate::TapClient::channel).
2448pub struct EventReceiver {
2525- event_rx: mpsc::Receiver<EventWithAck>,
2626- _ack_tx: mpsc::Sender<u64>,
2727-}
2828-2929-struct EventWithAck {
3030- event: Bytes,
3131- ack_tx: mpsc::Sender<u64>,
3232-}
3333-3434-struct AckGuard {
3535- id: u64,
3636- ack_tx: Option<mpsc::Sender<u64>>,
3737-}
3838-3939-impl Drop for AckGuard {
4040- fn drop(&mut self) {
4141- if let Some(tx) = self.ack_tx.take() {
4242- // Fire and forget - if the channel is closed, we can't ack anyway
4343- let id = self.id;
4444- tokio::spawn(async move {
4545- let _ = tx.send(id).await;
4646- });
4747- }
4848- }
4949-}
5050-5151-/// Wrapper around Event that includes the ack trigger.
5252-pub struct ReceivedEvent {
5353- pub event: Event,
5454- _ack_guard: AckGuard,
5555-}
5656-5757-impl std::ops::Deref for ReceivedEvent {
5858- type Target = Event;
5959-6060- fn deref(&self) -> &Self::Target {
6161- &self.event
6262- }
4949+ read: WsSource,
6350}
64516552impl EventReceiver {
6653 /// Connect to a tap WebSocket channel.
6767- pub(crate) async fn connect(base_url: &Url, admin_password: Option<&str>) -> Result<Self> {
5454+ pub(crate) async fn connect(base_url: &Url, admin_password: Option<&str>) -> Result<(Self, AckSender)> {
6855 let mut ws_url = base_url.clone();
6956 match ws_url.scheme() {
7057 "http" => ws_url.set_scheme("ws").unwrap(),
···122109123110 let (read, write) = ws_stream.split();
124111125125- // Buffer size increased to 2048 to prevent TCP window clamping during brief processing spikes
126126- let (event_tx, event_rx) = mpsc::channel(2048);
127127- let (ack_tx, ack_rx) = mpsc::channel(1000);
128128-129129- let ack_tx_clone = ack_tx.clone();
130130- tokio::spawn(async move {
131131- Self::writer_task(write, ack_rx).await;
132132- });
133133-134134- tokio::spawn(async move {
135135- Self::reader_task(read, event_tx, ack_tx_clone).await;
136136- });
137137-138138- Ok(Self {
139139- event_rx,
140140- _ack_tx: ack_tx,
141141- })
112112+ Ok((
113113+ Self { read },
114114+ AckSender { write },
115115+ ))
142116 }
143117144118 /// Receive the next event.
145119 ///
146146- /// Returns the event wrapped in a [`ReceivedEvent`] that automatically
147147- /// sends an acknowledgment when dropped.
120120+ /// Returns the event and an opaque ID that must be passed to [`AckSender::ack`]
121121+ /// to acknowledge the event.
148122 ///
149123 /// # Errors
150124 ///
151125 /// Returns [`Error::ChannelClosed`] if the WebSocket connection closes.
152152- pub async fn recv(&mut self) -> Result<ReceivedEvent> {
126126+ pub async fn recv(&mut self) -> Result<(Event, AckId)> {
153127 loop {
154154- match self.event_rx.recv().await {
155155- Some(event_with_ack) => {
156156- let json = event_with_ack.event;
157157- let json_str = std::str::from_utf8(&json).expect("must be utf8");
128128+ match self.read.next().await {
129129+ Some(Ok(Message::Text(event_bytes))) => {
130130+ let event_bytes = bytes::Bytes::from(event_bytes);
131131+ let json_str = match std::str::from_utf8(&event_bytes) {
132132+ Ok(s) => s,
133133+ Err(e) => {
134134+ tracing::warn!("Failed to parse event as utf8: {}", e);
135135+ continue;
136136+ }
137137+ };
138138+158139 let raw = match serde_json::from_str::<RawEvent>(json_str) {
159140 Ok(raw) => raw,
160141 Err(e) => {
161161- tracing::warn!("Failed to parse event: {}", e);
142142+ tracing::warn!("Failed to parse event json: {}", e);
162143 continue;
163144 }
164145 };
165146166166- if let Some(event) = raw.into_event(json.clone()) {
147147+ if let Some(event) = raw.into_event(event_bytes.clone()) {
167148 let id = event.id();
168168- break Ok(ReceivedEvent {
169169- event,
170170- _ack_guard: AckGuard {
171171- id,
172172- ack_tx: Some(event_with_ack.ack_tx),
173173- },
174174- });
175175- }
176176- }
177177- None => break Err(Error::ChannelClosed),
178178- }
179179- }
180180- }
181181-182182- /// Writer task: sends ack messages to the WebSocket.
183183- async fn writer_task(mut write: WsSink, mut ack_rx: mpsc::Receiver<u64>) {
184184- #[derive(Serialize)]
185185- struct AckMessage {
186186- #[serde(rename = "type")]
187187- type_: &'static str,
188188- id: u64,
189189- }
190190-191191- while let Some(id) = ack_rx.recv().await {
192192- let msg = AckMessage { type_: "ack", id };
193193- let json = match serde_json::to_string(&msg) {
194194- Ok(j) => j,
195195- Err(e) => {
196196- tracing::warn!("Failed to serialize ack: {}", e);
197197- continue;
198198- }
199199- };
200200-201201- if let Err(e) = write.send(Message::text(json)).await {
202202- tracing::warn!("Failed to send ack: {}", e);
203203- break;
204204- }
205205- }
206206- }
207207-208208- /// Reader task: reads events from WebSocket and sends to channel.
209209- async fn reader_task(
210210- mut read: WsSource,
211211- event_tx: mpsc::Sender<EventWithAck>,
212212- ack_tx: mpsc::Sender<u64>,
213213- ) {
214214- while let Some(msg_result) = read.next().await {
215215- match msg_result {
216216- Ok(Message::Text(event)) => {
217217- let event_with_ack = EventWithAck {
218218- event,
219219- ack_tx: ack_tx.clone(),
220220- };
221221- if event_tx.send(event_with_ack).await.is_err() {
222222- break;
149149+ return Ok((event, AckId(id)));
223150 }
224151 }
225225- Ok(Message::Close(_)) => {
226226- break;
227227- }
228228- Ok(_) => {
229229- // Ignore ping/pong/binary
230230- }
231231- Err(_) => {
232232- break;
233233- }
152152+ Some(Ok(Message::Close(_))) => return Err(Error::ChannelClosed),
153153+ Some(Ok(_)) => continue, // Ping/Pong/Binary
154154+ Some(Err(_)) => return Err(Error::ChannelClosed),
155155+ None => return Err(Error::ChannelClosed),
234156 }
235157 }
236158 }
237237-}
159159+}
+7-6
tapped/src/client.rs
···337337338338 /// Connect to the WebSocket event channel.
339339 ///
340340- /// Returns an [`EventReceiver`] for receiving events. Events are
341341- /// automatically acknowledged when dropped.
340340+ /// Returns an [`EventReceiver`] for receiving events and an [`AckSender`] for
341341+ /// acknowledging them. Events must be manually acknowledged using [`AckSender::ack`].
342342 ///
343343 /// # Example
344344 ///
···347347 /// use tapped::TapClient;
348348 ///
349349 /// let client = TapClient::new("http://localhost:2480")?;
350350- /// let mut receiver = client.channel().await?;
350350+ /// let (mut receiver, mut ack_sender) = client.channel().await?;
351351 ///
352352- /// while let Ok(event) = receiver.recv().await {
353353- /// // Event is automatically acknowledged when dropped
352352+ /// while let Ok((event, ack_id)) = receiver.recv().await {
353353+ /// // Process event...
354354+ /// ack_sender.ack(ack_id).await?;
354355 /// }
355356 /// # Ok(())
356357 /// # }
357358 /// ```
358358- pub async fn channel(&self) -> Result<EventReceiver> {
359359+ pub async fn channel(&self) -> Result<(EventReceiver, crate::channel::AckSender)> {
359360 EventReceiver::connect(&self.base_url, self.admin_password.as_deref()).await
360361 }
361362}
+3-2
tapped/src/handle.rs
···3232/// // Use the client methods directly on the handle
3333/// handle.health().await?;
3434///
3535-/// let mut channel = handle.channel().await?;
3636-/// while let Ok(event) = channel.recv().await {
3535+/// let (mut receiver, mut ack_sender) = handle.channel().await?;
3636+/// while let Ok((event, ack_id)) = receiver.recv().await {
3737/// // Handle event
3838+/// ack_sender.ack(ack_id).await?;
3839/// }
3940///
4041/// Ok(())
+7-6
tapped/src/lib.rs
···1010//!
1111//! - Connect to an existing tap instance or spawn one as a subprocess
1212//! - Strongly-typed configuration with builder pattern
1313-//! - Async event streaming with automatic acknowledgment
1313+//! - Async event streaming with manual acknowledgment
1414//! - Full HTTP API coverage for repo management and statistics
1515//!
1616//! ## Example
···3030//! client.add_repos(&["did:plc:example1234567890abc"]).await?;
3131//!
3232//! // Stream events
3333-//! let mut receiver = client.channel().await?;
3434-//! while let Ok(event) = receiver.recv().await {
3535-//! // Event is automatically acknowledged when dropped
3333+//! let (mut receiver, mut ack_sender) = client.channel().await?;
3434+//! while let Ok((event, ack_id)) = receiver.recv().await {
3535+//! // Process event...
3636+//! ack_sender.ack(ack_id).await?;
3637//! }
3738//!
3839//! Ok(())
···4748mod process;
4849mod types;
49505050-pub use channel::{EventReceiver, ReceivedEvent};
5151+pub use channel::{EventReceiver, AckSender, AckId};
5152pub use client::TapClient;
5253pub use config::{LogLevel, TapConfig, TapConfigBuilder};
5354pub use error::Error;
···5960};
60616162/// A specialised Result type for tapped operations.
6262-pub type Result<T> = std::result::Result<T, Error>;
6363+pub type Result<T> = std::result::Result<T, Error>;
+1-1
tapped/tests/integration.rs
···149149 .await
150150 .expect("Failed to spawn tap");
151151152152- let _channel = handle.channel().await.expect("channel connection failed");
152152+ let (_receiver, _ack_sender) = handle.channel().await.expect("channel connection failed");
153153}
154154155155#[tokio::test]