auth dns over atproto
17
fork

Configure Feed

Select the types of activity you want to include in your feed.

at main 160 lines 5.3 kB view raw
1use anyhow::{Context, Result}; 2use futures_util::{SinkExt, StreamExt}; 3use serde::Deserialize; 4use tokio_tungstenite::{connect_async, tungstenite::Message}; 5 6/// A single event received from the TAP firehose websocket. 7#[derive(Debug, Deserialize)] 8pub struct TapEvent { 9 /// Monotonically increasing event sequence number. 10 pub id: u64, 11 /// Event kind, e.g. `"record"` or `"identity"`. 12 #[serde(rename = "type")] 13 pub event_type: String, 14 /// Present when event_type == "record". 15 pub record: Option<TapRecordEvent>, 16 /// Present when event_type == "identity". 17 pub identity: Option<TapIdentityEvent>, 18} 19 20/// Payload for a TAP record event (create, update, or delete). 21#[derive(Debug, Deserialize)] 22pub struct TapRecordEvent { 23 /// Whether this event comes from the live stream (true) or backfill (false). 24 pub live: bool, 25 /// DID of the repository that owns this record. 26 pub did: String, 27 /// Repository revision (commit CID) that produced this event. 28 pub rev: String, 29 /// Lexicon collection NSID, e.g. `"systems.kiri.zone"`. 30 pub collection: String, 31 /// Record key within the collection. 32 pub rkey: String, 33 /// The action performed on the record. 34 pub action: RecordAction, 35 /// The full record value; present on create and update, absent on delete. 36 pub record: Option<serde_json::Value>, 37 /// Content hash (CID) of the record; present on create and update. 38 pub cid: Option<String>, 39} 40 41/// The type of mutation performed on a record. 42#[derive(Debug, Deserialize, PartialEq)] 43#[serde(rename_all = "lowercase")] 44pub enum RecordAction { 45 /// A new record was created. 46 Create, 47 /// An existing record was updated. 48 Update, 49 /// A record was deleted. 50 Delete, 51} 52 53/// Payload for a TAP identity event (handle or status change). 54#[derive(Debug, Deserialize)] 55pub struct TapIdentityEvent { 56 /// DID of the identity that changed. 57 pub did: String, 58 /// Current handle for this identity. 59 pub handle: String, 60 /// Whether the account is currently active. 61 pub is_active: bool, 62 /// Account status string, e.g. `"active"`, `"deactivated"`. 63 pub status: String, 64} 65 66/// Persistent WebSocket consumer for the TAP firehose. 67/// 68/// Connects to the relay, deserializes events, dispatches them to a handler, 69/// and sends acknowledgements. Automatically reconnects on failure. 70pub struct TapConsumer { 71 /// WebSocket URL of the TAP firehose endpoint. 72 url: String, 73 /// Whether to send ack messages after processing each event. 74 acks_enabled: bool, 75 /// Delay before reconnecting after a connection error. 76 reconnect_delay: std::time::Duration, 77} 78 79impl TapConsumer { 80 pub fn new(url: String, acks_enabled: bool, reconnect_delay_secs: u64) -> Self { 81 Self { 82 url, 83 acks_enabled, 84 reconnect_delay: std::time::Duration::from_secs(reconnect_delay_secs), 85 } 86 } 87 88 pub async fn run<F, Fut>(&self, handler: F) -> Result<()> 89 where 90 F: Fn(TapEvent) -> Fut + Send + Sync, 91 Fut: std::future::Future<Output = Result<()>> + Send, 92 { 93 loop { 94 match self.connect_and_consume(&handler).await { 95 Ok(()) => { 96 tracing::info!("tap connection closed cleanly, reconnecting..."); 97 } 98 Err(e) => { 99 tracing::error!( 100 delay_secs = self.reconnect_delay.as_secs(), 101 "tap connection error: {e:#}, reconnecting..." 102 ); 103 tokio::time::sleep(self.reconnect_delay).await; 104 } 105 } 106 } 107 } 108 109 async fn connect_and_consume<F, Fut>(&self, handler: &F) -> Result<()> 110 where 111 F: Fn(TapEvent) -> Fut + Send + Sync, 112 Fut: std::future::Future<Output = Result<()>> + Send, 113 { 114 let (ws, _) = connect_async(&self.url) 115 .await 116 .context("failed to connect to tap")?; 117 118 tracing::info!("connected to tap at {}", self.url); 119 120 let (mut sink, mut stream) = ws.split(); 121 122 while let Some(msg) = stream.next().await { 123 let msg = msg.context("websocket read error")?; 124 125 let text = match msg { 126 Message::Text(t) => t, 127 Message::Ping(_) => continue, 128 Message::Pong(_) => continue, 129 Message::Close(_) => { 130 tracing::info!("tap sent close frame"); 131 break; 132 } 133 _ => continue, 134 }; 135 136 let event: TapEvent = match serde_json::from_str(&text) { 137 Ok(e) => e, 138 Err(e) => { 139 tracing::warn!("failed to deserialize tap event: {e}, raw: {text}"); 140 continue; 141 } 142 }; 143 144 let event_id = event.id; 145 146 if let Err(e) = handler(event).await { 147 tracing::error!("error handling tap event {event_id}: {e:#}"); 148 } 149 150 if self.acks_enabled { 151 let ack = serde_json::json!({ "type": "ack", "id": event_id }); 152 sink.send(Message::Text(ack.to_string().into())) 153 .await 154 .context("failed to send ack")?; 155 } 156 } 157 158 Ok(()) 159 } 160}