don't
5
fork

Configure Feed

Select the types of activity you want to include in your feed.

feat: replay events from db

Signed-off-by: tjh <did:plc:65gha4t3avpfpzmvpbwovss7>

+341 -48
+1 -1
crates/knot/Cargo.toml
··· 9 9 publish.workspace = true 10 10 11 11 [dependencies] 12 - atproto.workspace = true 12 + atproto = { workspace = true, features = ["time"] } 13 13 auth.workspace = true 14 14 identity.workspace = true 15 15 jetstream.workspace = true
+1
crates/knot/migrations/20251126224502_events.down.sql
··· 1 + DROP TABLE events;
+9
crates/knot/migrations/20251126224502_events.up.sql
··· 1 + CREATE TABLE events ( 2 + id serial PRIMARY KEY, 3 + collection text NOT NULL, 4 + rkey text NOT NULL, 5 + event jsonb NOT NULL, 6 + 7 + UNIQUE (collection, rkey), 8 + UNIQUE (event) 9 + );
+21 -8
crates/knot/src/model/knot_state.rs
··· 5 5 time::Duration, 6 6 }; 7 7 8 - use atproto::did::Did; 8 + use atproto::{did::Did, tid::Tid}; 9 9 use auth::{jwt, public_key}; 10 10 use bytes::Bytes; 11 11 use identity::{HttpClient, Resolver}; 12 12 use jetstream::JetstreamClient; 13 - use lexicon::com::atproto::repo::list_records; 13 + use lexicon::{com::atproto::repo::list_records, sh::tangled::git::RefUpdate}; 14 14 use rayon::{ThreadPool, ThreadPoolBuilder}; 15 + use serde::Serialize; 15 16 use time::OffsetDateTime; 16 17 use tokio_stream::StreamExt as _; 17 18 ··· 26 25 27 26 use super::config::KnotConfiguration; 28 27 29 - #[derive(Clone)] 28 + #[derive(Clone, Debug, Serialize)] 30 29 pub enum Event { 31 - RefUpdate(lexicon::sh::tangled::git::RefUpdate<'static>), 30 + RefUpdate(Arc<RefUpdate<'static>>), 31 + } 32 + 33 + impl Event { 34 + pub const fn collection(&self) -> &'static str { 35 + match self { 36 + Self::RefUpdate(_) => "sh.tangled.git.refUpdate", 37 + } 38 + } 32 39 } 33 40 34 41 #[derive(Debug)] ··· 62 53 /// Thread pool for running synchronous tasks. 63 54 pool: ThreadPool, 64 55 65 - events: tokio::sync::broadcast::Sender<Event>, 56 + events: tokio::sync::broadcast::Sender<(i32, Tid, Event)>, 66 57 67 58 /// Stores JWT claims to prevent re-use. 68 59 jwt_claims: Mutex<HashMap<Box<str>, jwt::Claims>>, ··· 90 81 .build() 91 82 .expect("Failed to build thread pool"); 92 83 93 - let (events, _) = tokio::sync::broadcast::channel(8); 84 + let (events, _) = tokio::sync::broadcast::channel(4); 94 85 95 86 let inner = Arc::new(Self { 96 87 config, ··· 150 141 &self.pool 151 142 } 152 143 153 - pub(crate) fn subscribe_events(&self) -> tokio::sync::broadcast::Receiver<Event> { 144 + pub(crate) fn subscribe_events(&self) -> tokio::sync::broadcast::Receiver<(i32, Tid, Event)> { 154 145 self.events.subscribe() 146 + } 147 + 148 + pub(crate) async fn send_event(&self, id: i32, rkey: Tid, event: Event) { 149 + self.events.send((id, rkey, event)).unwrap(); 155 150 } 156 151 157 152 /// Return a reference to the database shim. ··· 312 299 } 313 300 }; 314 301 315 - let rkey = self 302 + let (rkey, _) = self 316 303 .store() 317 304 .resolve_repository(&owner, repo.name()) 318 305 .await?
+68 -8
crates/knot/src/private.rs
··· 1 1 use core::fmt; 2 - use std::borrow::Cow; 2 + use std::{borrow::Cow, sync::Arc}; 3 3 4 - use atproto::Did; 4 + use atproto::{Did, tid::TidClock}; 5 5 use axum::{ 6 6 extract::{FromRequestParts, Path, State}, 7 7 http::{HeaderMap, StatusCode, request::Parts}, 8 8 response::IntoResponse, 9 9 }; 10 + use lexicon::sh::tangled::git::{Meta, RefUpdate}; 10 11 use serde::{Deserialize, Serialize}; 11 12 12 13 /// Environment variable containing one or more whitespace separated URLs for the internal API. ··· 34 33 pub const ENV_HEADER_PREFIX: &str = "X-Gordian"; 35 34 36 35 use crate::{ 37 - model::Knot, 36 + model::{Knot, errors}, 37 + public::xrpc::XrpcError, 38 38 types::{push_certificate::PushCertificate, repository_key::RepositoryKey}, 39 39 }; 40 + 41 + static TID_CLOCK: TidClock = TidClock::with_id(0); 40 42 41 43 /// Build a new router for the internal API. 42 44 #[rustfmt::skip] ··· 268 264 StatusCode::NO_CONTENT 269 265 } 270 266 271 - #[tracing::instrument(skip(body))] 267 + // #[tracing::instrument(skip(body))] 272 268 async fn hook_post_receive( 269 + State(state): State<Knot>, 273 270 Path(repo): Path<RepositoryKey>, 274 - headers: HeaderMap, 271 + GordianUserDid(user_did): GordianUserDid, 275 272 body: String, 276 - ) -> impl IntoResponse { 277 - tracing::debug!("{body}"); 278 - StatusCode::NO_CONTENT 273 + ) -> Result<impl IntoResponse, XrpcError> { 274 + let repository = state 275 + .open_repository_direct(&repo) 276 + .map_err(errors::RepoNotFound)? 277 + .to_thread_local(); 278 + 279 + let default_ref = repository 280 + .head()? 281 + .referent_name() 282 + .ok_or(errors::HeadDetached)? 283 + .to_string(); 284 + 285 + let (_, repo_name) = state 286 + .store() 287 + .resolve_repository(&repo.owner, repo.rkey()) 288 + .await 289 + .map_err(errors::RepoNotFound)? 290 + .ok_or(errors::RepoNotFound(""))?; 291 + 292 + for line in body.lines() { 293 + let mut parts = line.split_whitespace(); 294 + let (old_sha, new_sha, refname) = 295 + match (parts.next(), parts.next(), parts.next(), parts.next()) { 296 + (Some(old_sha), Some(new_sha), Some(refname), None) => (old_sha, new_sha, refname), 297 + _ => panic!(), 298 + }; 299 + 300 + let ref_update: RefUpdate<'static> = RefUpdate { 301 + r#ref: Cow::Owned(refname.into()), 302 + committer_did: Cow::Owned(user_did.as_ref().into()), 303 + repo_did: Cow::Owned(repo.owner.as_ref().into()), 304 + repo_name: Cow::Owned(repo_name.as_ref().into()), 305 + old_sha: Cow::Owned(old_sha.into()), 306 + new_sha: Cow::Owned(new_sha.into()), 307 + meta: Meta { 308 + is_default_ref: refname == default_ref, 309 + ..Default::default() 310 + }, 311 + }; 312 + 313 + let rkey = TID_CLOCK.next(); 314 + let id = state 315 + .store() 316 + .insert_event("sh.tangled.git.refUpdate", &rkey, &ref_update) 317 + .await 318 + .unwrap(); 319 + 320 + tracing::info!(?id); 321 + state 322 + .send_event( 323 + id, 324 + rkey, 325 + crate::model::knot_state::Event::RefUpdate(Arc::new(ref_update)), 326 + ) 327 + .await; 328 + } 329 + 330 + Ok(StatusCode::NO_CONTENT) 279 331 } 280 332 281 333 #[tracing::instrument(skip(body))]
+88 -18
crates/knot/src/public/events.rs
··· 2 2 3 3 use axum::{ 4 4 extract::{ 5 - State, WebSocketUpgrade, 5 + Query, State, WebSocketUpgrade, 6 6 ws::{Message, WebSocket}, 7 7 }, 8 + http::StatusCode, 8 9 response::IntoResponse, 9 10 }; 11 + use futures_util::{SinkExt as _, StreamExt as _, TryStreamExt as _}; 12 + use serde::{Deserialize, Serialize}; 13 + use time::OffsetDateTime; 10 14 use tokio::time::Instant; 11 15 12 - use crate::model::{Knot, knot_state::Event}; 16 + use crate::model::Knot; 17 + 18 + use super::xrpc::XrpcError; 13 19 14 20 const KEEP_ALIVE: Duration = Duration::from_secs(45); 15 21 16 - pub async fn handler(State(state): State<Knot>, ws: WebSocketUpgrade) -> impl IntoResponse { 17 - ws.on_upgrade(|socket| handle_socket(state, socket)) 22 + #[derive(Serialize)] 23 + struct EventWrapper<'a, T> { 24 + nsid: &'a str, 25 + rkey: &'a str, 26 + event: &'a T, 18 27 } 19 28 20 - async fn handle_socket(state: Knot, mut socket: WebSocket) { 29 + #[derive(Deserialize)] 30 + pub struct EventsParameters { 31 + /// Nanoseconds from UNIX epoch. 32 + pub cursor: Option<i64>, 33 + } 34 + 35 + pub async fn handler( 36 + State(state): State<Knot>, 37 + Query(parameters): Query<EventsParameters>, 38 + ws: WebSocketUpgrade, 39 + ) -> Result<impl IntoResponse, XrpcError> { 40 + let cursor = parameters 41 + .cursor 42 + .and_then(|nanos| match nanos { 43 + 0 => None, 44 + _ => Some( 45 + OffsetDateTime::from_unix_timestamp_nanos(i128::from(nanos)).map_err(|error| { 46 + XrpcError::new(StatusCode::BAD_REQUEST, "InvalidCursor", error.to_string()) 47 + }), 48 + ), 49 + }) 50 + .transpose()?; 51 + 52 + Ok(ws.on_upgrade(move |socket| handle_socket(state, cursor, socket))) 53 + } 54 + 55 + async fn handle_socket(state: Knot, start_ts: Option<OffsetDateTime>, socket: WebSocket) { 21 56 let mut keep_alive = tokio::time::interval(KEEP_ALIVE); 57 + keep_alive.tick().await; 58 + 22 59 let mut events = state.subscribe_events(); 23 60 let start = Instant::now(); 24 61 25 - tracing::info!("new events subscriber"); 62 + let mut cursor = 0; 63 + let start_ts = start_ts.unwrap_or(OffsetDateTime::now_utc()); 64 + tracing::debug!(?start_ts, "new events listener"); 65 + 66 + let (mut sender, mut receiver) = socket.split(); 67 + 68 + let mut past_events = state.store().get_events(start_ts); 69 + while let Some(Ok(db_event)) = past_events.next().await { 70 + cursor = db_event.id; 71 + let wrapper = EventWrapper { 72 + nsid: &db_event.collection, 73 + rkey: &db_event.rkey.to_string(), 74 + event: &db_event.event, 75 + }; 76 + 77 + let serialized = serde_json::to_string(&wrapper).unwrap(); 78 + if let Err(error) = sender.send(Message::text(serialized)).await { 79 + tracing::error!(?error, "failed to send event"); 80 + return; 81 + } 82 + } 83 + 26 84 loop { 27 - let event = tokio::select! { 85 + let (event_id, rkey, event) = tokio::select! { 28 86 now = keep_alive.tick() => { 29 87 let bytes = (now.duration_since(start)).as_secs().to_string().into(); 30 - if let Err(error) = socket.send(Message::Ping(bytes)).await { 88 + if let Err(error) = sender.send(Message::Ping(bytes)).await { 31 89 tracing::error!(?error, "failed to send ping"); 32 90 break; 33 91 } 34 92 continue; 35 93 } 36 - Ok(event) = events.recv() => { 37 - event 94 + Ok(Some(message)) = receiver.try_next() => { 95 + tracing::debug!(?message); 96 + continue; 38 97 } 98 + Ok(event) = events.recv() => event, 99 + else => break, 39 100 }; 40 101 41 - match event { 42 - Event::RefUpdate(ref_update) => { 43 - let bytes = serde_json::to_string(&ref_update).unwrap(); 44 - if let Err(error) = socket.send(Message::Text(bytes.into())).await { 45 - tracing::error!(?error, "failed to send ref update"); 46 - break; 47 - } 48 - } 102 + if event_id < cursor { 103 + tracing::debug!(?event_id, "skipping event, client has already seen"); 104 + continue; 49 105 } 106 + 107 + let wrapper = EventWrapper { 108 + nsid: event.collection(), 109 + rkey: &rkey.to_string(), 110 + event: &event, 111 + }; 112 + 113 + let serialized = serde_json::to_string(&wrapper).unwrap(); 114 + if let Err(error) = sender.send(Message::text(serialized)).await { 115 + tracing::error!(?error, "failed to send event"); 116 + return; 117 + } 118 + 119 + cursor = event_id; 50 120 } 51 121 }
+41 -6
crates/knot/src/services/database.rs
··· 1 1 mod pg_impl; 2 2 pub mod types; 3 3 4 - use atproto::Did; 4 + use atproto::{Did, tid::Tid}; 5 5 use core::fmt; 6 6 use futures_util::{FutureExt, StreamExt, TryStreamExt as _, future::BoxFuture, stream::BoxStream}; 7 7 use jetstream::{Commit, CommitEvent}; ··· 9 9 com::atproto::repo::list_records::Record, 10 10 sh::tangled::{PublicKey, repo::Repo}, 11 11 }; 12 + use serde::Serialize; 12 13 use std::sync::Arc; 13 14 use time::OffsetDateTime; 14 - use types::InsertRepositoryResult; 15 + use types::{EventRow, InsertRepositoryResult}; 15 16 16 17 pub use pg_impl::PgDatabase; 17 18 ··· 79 78 &'d self, 80 79 did: &'a Did, 81 80 name_or_rkey: &'a str, 82 - ) -> BoxFuture<'a, Result<Option<Box<str>>, Self::Error>>; 81 + ) -> BoxFuture<'a, Result<Option<(Box<str>, Box<str>)>, Self::Error>>; 83 82 84 83 fn repository_members<'d: 'a, 'a>( 85 84 &'d self, ··· 91 90 &'d self, 92 91 instance_name: &'a str, 93 92 ) -> BoxStream<'a, Result<Box<Did>, Self::Error>>; 93 + 94 + fn insert_event<'a: 'b, 'b>( 95 + &'a self, 96 + collection: &'b str, 97 + rkey: &'b Tid, 98 + event: &'b serde_json::Value, 99 + ) -> BoxFuture<'b, Result<i32, Self::Error>>; 100 + 101 + fn get_events<'a: 'b, 'b>( 102 + &'a self, 103 + from: OffsetDateTime, 104 + ) -> BoxStream<'b, Result<EventRow, Self::Error>>; 94 105 } 95 106 96 107 #[derive(Debug, thiserror::Error)] ··· 285 272 &self, 286 273 did: &Did, 287 274 name_or_rkey: &str, 288 - ) -> Result<Option<Box<str>>, DataStoreError> { 289 - let rkey = self.inner.resolve_repository(did, name_or_rkey).await?; 290 - Ok(rkey) 275 + ) -> Result<Option<(Box<str>, Box<str>)>, DataStoreError> { 276 + let result = self.inner.resolve_repository(did, name_or_rkey).await?; 277 + Ok(result) 291 278 } 292 279 293 280 pub async fn is_repository_member( ··· 307 294 .knot_members(instance_name) 308 295 .any(|member_did| async move { member_did.is_ok_and(|value| value == did) }) 309 296 .await 297 + } 298 + 299 + pub async fn insert_event<T>( 300 + &self, 301 + collection: &str, 302 + rkey: &Tid, 303 + event: &T, 304 + ) -> Result<i32, DataStoreError> 305 + where 306 + T: Serialize, 307 + { 308 + let serialized = serde_json::to_value(event).unwrap(); 309 + let id = self 310 + .inner 311 + .insert_event(collection, rkey, &serialized) 312 + .await?; 313 + 314 + Ok(id) 315 + } 316 + 317 + pub fn get_events(&self, from: OffsetDateTime) -> BoxStream<Result<EventRow, DataStoreError>> { 318 + self.inner.get_events(from) 310 319 } 311 320 }
+55 -5
crates/knot/src/services/database/pg_impl.rs
··· 1 1 use super::{ 2 2 DataStoreError, Database, 3 - types::{InsertRepositoryResult, PublicKeyRecord}, 3 + types::{EventRow, InsertRepositoryResult, PublicKeyRecord}, 4 4 }; 5 - use atproto::Did; 5 + use atproto::{Did, tid::Tid}; 6 6 use futures_util::{FutureExt as _, StreamExt, TryStreamExt, future::BoxFuture, stream::BoxStream}; 7 7 use jetstream::CommitEvent; 8 8 use lexicon::sh::tangled::repo::Repo; ··· 284 284 &'d self, 285 285 did: &'a Did, 286 286 name_or_rkey: &'a str, 287 - ) -> BoxFuture<'a, Result<Option<Box<str>>, Self::Error>> { 287 + ) -> BoxFuture<'a, Result<Option<(Box<str>, Box<str>)>, Self::Error>> { 288 288 async move { 289 289 #[derive(sqlx::FromRow)] 290 290 struct Record { 291 291 rkey: Box<str>, 292 + name: Box<str>, 292 293 } 293 294 294 295 let result: Option<Record> = sqlx::query_as!( 295 296 Record, 296 - "SELECT rkey FROM repository WHERE did = $1 AND (rkey = $2 OR name = $2)", 297 + "SELECT rkey, name FROM repository WHERE did = $1 AND (rkey = $2 OR name = $2)", 297 298 did.as_str(), 298 299 name_or_rkey 299 300 ) 300 301 .fetch_optional(&self.pool) 301 302 .await?; 302 303 303 - Ok(result.map(|record| record.rkey)) 304 + Ok(result.map(|record| (record.rkey, record.name))) 304 305 } 305 306 .boxed() 306 307 } ··· 348 347 .map(|record| { 349 348 let did = record?.member_did.parse()?; 350 349 Ok(did) 350 + }) 351 + .boxed() 352 + } 353 + 354 + fn insert_event<'a: 'b, 'b>( 355 + &'a self, 356 + collection: &'b str, 357 + rkey: &'b atproto::tid::Tid, 358 + event: &'b serde_json::Value, 359 + ) -> BoxFuture<'b, Result<i32, Self::Error>> { 360 + #[derive(sqlx::FromRow)] 361 + struct Row { 362 + id: i32, 363 + } 364 + 365 + async move { 366 + let Row { id } = sqlx::query_as!( 367 + Row, 368 + "INSERT INTO events (collection, rkey, event) VALUES ($1, $2, $3) RETURNING id", 369 + collection, 370 + rkey.to_string(), 371 + event 372 + ) 373 + .fetch_one(&self.pool) 374 + .await?; 375 + 376 + Ok(id) 377 + } 378 + .boxed() 379 + } 380 + 381 + fn get_events<'a: 'b, 'b>( 382 + &'a self, 383 + from: OffsetDateTime, 384 + ) -> BoxStream<'b, Result<EventRow, Self::Error>> { 385 + let rkey = Tid::from_datetime(from, 0); 386 + sqlx::query!( 387 + "SELECT id, collection, rkey, event FROM events WHERE rkey >= $1 ORDER BY id", 388 + rkey.to_string() 389 + ) 390 + .fetch(&self.pool) 391 + .map(|record| { 392 + let record = record?; 393 + Ok(EventRow { 394 + id: record.id, 395 + collection: record.collection, 396 + rkey: record.rkey.parse().unwrap(), 397 + event: record.event, 398 + }) 351 399 }) 352 400 .boxed() 353 401 }
+8
crates/knot/src/services/database/types.rs
··· 111 111 pub old_jetstream_at: Option<OffsetDateTime>, 112 112 pub new_jetstream_at: Option<OffsetDateTime>, 113 113 } 114 + 115 + #[derive(Debug)] 116 + pub struct EventRow { 117 + pub id: i32, 118 + pub collection: String, 119 + pub rkey: atproto::tid::Tid, 120 + pub event: serde_json::Value, 121 + }
+47
crates/lexicon/src/sh/tangled.rs
··· 10 10 pub mod spindle; 11 11 pub mod string; 12 12 13 + use atproto::Did; 13 14 use serde::{Deserialize, Serialize}; 14 15 use std::borrow::Cow; 15 16 use time::OffsetDateTime; ··· 46 45 /// Key upload timestamp 47 46 #[serde(alias = "created", with = "time::serde::rfc3339")] 48 47 pub created_at: OffsetDateTime, 48 + } 49 + 50 + #[derive(Debug, Hash, PartialEq, Eq, Deserialize, Serialize)] 51 + #[serde(rename_all = "camelCase")] 52 + pub struct Pipeline<'a> { 53 + trigger_metadata: TriggerMetadata<'a>, 54 + } 55 + 56 + #[derive(Debug, Hash, PartialEq, Eq, Deserialize, Serialize)] 57 + #[serde(rename_all = "camelCase")] 58 + pub struct TriggerMetadata<'a> { 59 + pub kind: Cow<'a, str>, 60 + } 61 + 62 + #[derive(Debug, Hash, PartialEq, Eq, Deserialize, Serialize)] 63 + #[serde(rename_all = "camelCase")] 64 + pub struct TriggerRepo<'a> { 65 + #[serde(borrow)] 66 + pub knot: Cow<'a, str>, 67 + 68 + pub did: Cow<'a, Did>, 69 + 70 + pub repo: Cow<'a, str>, 71 + 72 + pub default_branch: Cow<'a, str>, 73 + } 74 + 75 + #[derive(Debug, Hash, PartialEq, Eq, Deserialize, Serialize)] 76 + #[serde(rename_all = "camelCase")] 77 + pub struct Workflow<'a> { 78 + #[serde(borrow)] 79 + pub name: Cow<'a, str>, 80 + 81 + pub engine: Cow<'a, str>, 82 + 83 + pub clone: CloneOptions, 84 + 85 + pub raw: Cow<'a, str>, 86 + } 87 + 88 + #[derive(Debug, Hash, PartialEq, Eq, Deserialize, Serialize)] 89 + #[serde(rename_all = "camelCase")] 90 + pub struct CloneOptions { 91 + pub skip: bool, 92 + pub depth: u64, 93 + pub submodules: bool, 49 94 }
+2 -2
crates/lexicon/src/sh/tangled/git.rs
··· 45 45 #[derive(Clone, Debug, Default, PartialEq, Eq, Deserialize, Serialize)] 46 46 #[serde(rename_all = "camelCase")] 47 47 pub struct LanguageBreakdown<'a> { 48 - #[serde(borrow, default, skip_serializing_if = "Vec::is_empty")] 48 + #[serde(borrow, default)] 49 49 pub inputs: Vec<Language<'a>>, 50 50 } 51 51 ··· 60 60 #[derive(Clone, Debug, Default, PartialEq, Eq, Deserialize, Serialize)] 61 61 #[serde(rename_all = "camelCase")] 62 62 pub struct CommitCountBreakdown<'a> { 63 - #[serde(borrow, default, skip_serializing_if = "Vec::is_empty")] 63 + #[serde(borrow, default)] 64 64 pub by_email: Vec<CommitCount<'a>>, 65 65 } 66 66