// auth dns over atproto
1use anyhow::{Context, Result};
2use futures_util::{SinkExt, StreamExt};
3use serde::Deserialize;
4use tokio_tungstenite::{connect_async, tungstenite::Message};
5
6/// A single event received from the TAP firehose websocket.
7#[derive(Debug, Deserialize)]
8pub struct TapEvent {
9 /// Monotonically increasing event sequence number.
10 pub id: u64,
11 /// Event kind, e.g. `"record"` or `"identity"`.
12 #[serde(rename = "type")]
13 pub event_type: String,
14 /// Present when event_type == "record".
15 pub record: Option<TapRecordEvent>,
16 /// Present when event_type == "identity".
17 pub identity: Option<TapIdentityEvent>,
18}
19
20/// Payload for a TAP record event (create, update, or delete).
21#[derive(Debug, Deserialize)]
22pub struct TapRecordEvent {
23 /// Whether this event comes from the live stream (true) or backfill (false).
24 pub live: bool,
25 /// DID of the repository that owns this record.
26 pub did: String,
27 /// Repository revision (commit CID) that produced this event.
28 pub rev: String,
29 /// Lexicon collection NSID, e.g. `"systems.kiri.zone"`.
30 pub collection: String,
31 /// Record key within the collection.
32 pub rkey: String,
33 /// The action performed on the record.
34 pub action: RecordAction,
35 /// The full record value; present on create and update, absent on delete.
36 pub record: Option<serde_json::Value>,
37 /// Content hash (CID) of the record; present on create and update.
38 pub cid: Option<String>,
39}
40
41/// The type of mutation performed on a record.
42#[derive(Debug, Deserialize, PartialEq)]
43#[serde(rename_all = "lowercase")]
44pub enum RecordAction {
45 /// A new record was created.
46 Create,
47 /// An existing record was updated.
48 Update,
49 /// A record was deleted.
50 Delete,
51}
52
53/// Payload for a TAP identity event (handle or status change).
54#[derive(Debug, Deserialize)]
55pub struct TapIdentityEvent {
56 /// DID of the identity that changed.
57 pub did: String,
58 /// Current handle for this identity.
59 pub handle: String,
60 /// Whether the account is currently active.
61 pub is_active: bool,
62 /// Account status string, e.g. `"active"`, `"deactivated"`.
63 pub status: String,
64}
65
/// Long-running WebSocket client for the TAP firehose.
///
/// It opens a connection to the configured endpoint, decodes each incoming
/// event, passes it to a caller-supplied handler, and optionally sends an
/// acknowledgement. When the connection ends it connects again.
pub struct TapConsumer {
    /// WebSocket URL of the TAP firehose endpoint.
    url: String,
    /// When `true`, an ack message is sent after each event is processed.
    acks_enabled: bool,
    /// How long to wait before reconnecting.
    reconnect_delay: std::time::Duration,
}
78
79impl TapConsumer {
80 pub fn new(url: String, acks_enabled: bool, reconnect_delay_secs: u64) -> Self {
81 Self {
82 url,
83 acks_enabled,
84 reconnect_delay: std::time::Duration::from_secs(reconnect_delay_secs),
85 }
86 }
87
88 pub async fn run<F, Fut>(&self, handler: F) -> Result<()>
89 where
90 F: Fn(TapEvent) -> Fut + Send + Sync,
91 Fut: std::future::Future<Output = Result<()>> + Send,
92 {
93 loop {
94 match self.connect_and_consume(&handler).await {
95 Ok(()) => {
96 tracing::info!("tap connection closed cleanly, reconnecting...");
97 }
98 Err(e) => {
99 tracing::error!(
100 delay_secs = self.reconnect_delay.as_secs(),
101 "tap connection error: {e:#}, reconnecting..."
102 );
103 tokio::time::sleep(self.reconnect_delay).await;
104 }
105 }
106 }
107 }
108
109 async fn connect_and_consume<F, Fut>(&self, handler: &F) -> Result<()>
110 where
111 F: Fn(TapEvent) -> Fut + Send + Sync,
112 Fut: std::future::Future<Output = Result<()>> + Send,
113 {
114 let (ws, _) = connect_async(&self.url)
115 .await
116 .context("failed to connect to tap")?;
117
118 tracing::info!("connected to tap at {}", self.url);
119
120 let (mut sink, mut stream) = ws.split();
121
122 while let Some(msg) = stream.next().await {
123 let msg = msg.context("websocket read error")?;
124
125 let text = match msg {
126 Message::Text(t) => t,
127 Message::Ping(_) => continue,
128 Message::Pong(_) => continue,
129 Message::Close(_) => {
130 tracing::info!("tap sent close frame");
131 break;
132 }
133 _ => continue,
134 };
135
136 let event: TapEvent = match serde_json::from_str(&text) {
137 Ok(e) => e,
138 Err(e) => {
139 tracing::warn!("failed to deserialize tap event: {e}, raw: {text}");
140 continue;
141 }
142 };
143
144 let event_id = event.id;
145
146 if let Err(e) = handler(event).await {
147 tracing::error!("error handling tap event {event_id}: {e:#}");
148 }
149
150 if self.acks_enabled {
151 let ack = serde_json::json!({ "type": "ack", "id": event_id });
152 sink.send(Message::Text(ack.to_string().into()))
153 .await
154 .context("failed to send ack")?;
155 }
156 }
157
158 Ok(())
159 }
160}