···25252626use super::config::KnotConfiguration;27272828+#[derive(Clone)]2929+pub enum Event {3030+ RefUpdate(lexicon::sh::tangled::git::RefUpdate<'static>),3131+}3232+2833#[derive(Debug)]2934pub struct KnotState {3035 config: KnotConfiguration,···52475348 /// Thread pool for running synchronous tasks.5449 pool: ThreadPool,5050+5151+ events: tokio::sync::broadcast::Sender<Event>,55525653 /// Stores JWT claims to prevent re-use.5754 jwt_claims: Mutex<HashMap<Box<str>, jwt::Claims>>,···8174 .build()8275 .expect("Failed to build thread pool");83767777+ let (events, _) = tokio::sync::broadcast::channel(8);7878+8479 let inner = Arc::new(Self {8580 config,8681 public_http,···9081 jetstream,9182 store: database,9283 pool,8484+ events,9385 jwt_claims: Default::default(),9486 repo_cache: Default::default(),9587 push_seed: Default::default(),···139129 #[inline]140130 pub(crate) fn pool(&self) -> &ThreadPool {141131 &self.pool132132+ }133133+134134+ pub(crate) fn subscribe_events(&self) -> tokio::sync::broadcast::Receiver<Event> {135135+ self.events.subscribe()142136 }143137144138 /// Return a reference to the database shim.
+2
crates/knot/src/public.rs
···11+pub mod events;12pub mod git;23pub mod xrpc;34···98 .without_v07_checks()109 .nest("/xrpc", xrpc::router())1110 .nest("/{owner}/{name}", serve_git::router())1111+ .route("/events", axum::routing::get(events::handler))1212}
+51
crates/knot/src/public/events.rs
···11+use std::time::Duration;22+33+use axum::{44+ extract::{55+ State, WebSocketUpgrade,66+ ws::{Message, WebSocket},77+ },88+ response::IntoResponse,99+};1010+use tokio::time::Instant;1111+1212+use crate::model::{Knot, knot_state::Event};1313+1414+const KEEP_ALIVE: Duration = Duration::from_secs(45);1515+1616+pub async fn handler(State(state): State<Knot>, ws: WebSocketUpgrade) -> impl IntoResponse {1717+ ws.on_upgrade(|socket| handle_socket(state, socket))1818+}1919+2020+async fn handle_socket(state: Knot, mut socket: WebSocket) {2121+ let mut keep_alive = tokio::time::interval(KEEP_ALIVE);2222+ let mut events = state.subscribe_events();2323+ let start = Instant::now();2424+2525+ tracing::info!("new events subscriber");2626+ loop {2727+ let event = tokio::select! {2828+ now = keep_alive.tick() => {2929+ let bytes = (now.duration_since(start)).as_secs().to_string().into();3030+ if let Err(error) = socket.send(Message::Ping(bytes)).await {3131+ tracing::error!(?error, "failed to send ping");3232+ break;3333+ }3434+ continue;3535+ }3636+ Ok(event) = events.recv() => {3737+ event3838+ }3939+ };4040+4141+ match event {4242+ Event::RefUpdate(ref_update) => {4343+ let bytes = serde_json::to_string(&ref_update).unwrap();4444+ if let Err(error) = socket.send(Message::Text(bytes.into())).await {4545+ tracing::error!(?error, "failed to send ref update");4646+ break;4747+ }4848+ }4949+ }5050+ }5151+}