Tap drinker
2
fork

Configure Feed

Select the types of activity you want to include in your feed.

feat: store raw events from Tap

Signed-off-by: tjh <x@tjh.dev>

tjh 918e1ed4 09150613

+80 -6
+1
migrations/20260112144404_event.down.sql
··· 1 + DROP TABLE event;
+6
migrations/20260112144404_event.up.sql
··· 1 + CREATE TABLE event ( 2 + id bigserial NOT NULL, 3 + data jsonb NOT NULL, 4 + 5 + PRIMARY KEY (id) 6 + );
+18
src/main.rs
··· 111 111 while let Some(Some((span, event, ack))) = shutdown.run_until_cancelled(channel.recv()).await { 112 112 async { 113 113 let mut transaction = pool.begin().await?; 114 + 115 + save_event(&event, &mut transaction).await?; 114 116 match event { 115 117 TapEvent::Record(record) => { 116 118 let (record, parsed_record) = handle_record(record, &mut transaction).await?; ··· 137 139 } 138 140 139 141 tracing::info!("complete"); 142 + Ok(()) 143 + } 144 + 145 + async fn save_event( 146 + event: &TapEvent, 147 + transaction: &mut sqlx::Transaction<'static, sqlx::Postgres>, 148 + ) -> anyhow::Result<()> { 149 + let json = serde_json::to_value(&event)?; 150 + sqlx::query!( 151 + "INSERT INTO event (id, data) VALUES ($1, $2)", 152 + i64::try_from(event.id()).expect("event ID should not exceed i64::MAX"), 153 + json 154 + ) 155 + .execute(&mut **transaction) 156 + .await?; 157 + 140 158 Ok(()) 141 159 } 142 160
+55 -6
src/tap/types.rs
··· 1 1 use serde::{Deserialize, Serialize}; 2 2 use serde_json::value::RawValue; 3 3 4 - #[derive(Debug, Hash, Deserialize, Serialize)] 4 + #[derive(Clone, Debug, Hash, Deserialize, Serialize)] 5 5 #[serde(rename_all = "lowercase")] 6 6 pub enum RecordAction { 7 7 Create, ··· 21 21 live: bool, 22 22 } 23 23 24 - #[derive(Debug, Deserialize, Serialize)] 24 + #[derive(Clone, Debug, Deserialize, Serialize)] 25 25 pub struct RecordEvent { 26 26 pub id: u64, 27 27 pub did: String, ··· 34 34 pub live: bool, 35 35 } 36 36 37 - #[derive(Debug, Deserialize, Serialize, sqlx::Type)] 37 + #[derive(Clone, Debug, Deserialize, Serialize, sqlx::Type)] 38 38 #[serde(rename_all = "lowercase")] 39 39 #[sqlx(type_name = "identity_status", rename_all = "lowercase")] 40 40 pub enum IdentityStatus { ··· 53 53 status: IdentityStatus, 54 54 } 55 55 56 - #[derive(Debug, Deserialize, Serialize)] 56 + #[derive(Clone, Debug, Deserialize, Serialize)] 57 57 pub struct IdentityEvent { 58 58 pub id: u64, 59 59 pub did: String, ··· 79 79 identity: Option<InnerIdentityEvent>, 80 80 } 81 81 82 - #[derive(Debug, Deserialize, Serialize)] 83 - #[serde(try_from = "InnerEvent")] 82 + #[derive(Clone, Debug, Deserialize, Serialize)] 83 + #[serde(try_from = "InnerEvent", into = "InnerEvent")] 84 84 pub enum TapEvent { 85 85 Record(RecordEvent), 86 86 Identity(IdentityEvent), ··· 154 154 status, 155 155 })) 156 156 } 157 + } 158 + } 159 + } 160 + 161 + impl From<TapEvent> for InnerEvent { 162 + fn from(value: TapEvent) -> Self { 163 + match value { 164 + TapEvent::Record(RecordEvent { 165 + id, 166 + did, 167 + rev, 168 + collection, 169 + rkey, 170 + action, 171 + record, 172 + cid, 173 + live, 174 + }) => Self { 175 + id, 176 + r#type: EventType::Record, 177 + record: Some(InnerRecordEvent { 178 + did, 179 + rev, 180 + collection, 181 + rkey, 182 + action, 183 + record, 184 + cid, 185 + live, 186 + }), 187 + identity: None, 188 + }, 189 + TapEvent::Identity(IdentityEvent { 190 + id, 191 + did, 192 + handle, 193 + is_active, 194 + status, 195 + }) => Self { 196 + id, 197 + r#type: EventType::Identity, 198 + record: None, 199 + identity: Some(InnerIdentityEvent { 200 + did, 201 + handle, 202 + is_active, 203 + status, 204 + }), 205 + }, 157 206 } 158 207 } 159 208 }