jj workspaces over the network
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

feat(server): add WebSocket events endpoint for web UI

+152
+152
crates/tandem-server/src/events.rs
··· 1 + use axum::{ 2 + extract::{ws::{Message, WebSocket, WebSocketUpgrade}, Path, State}, 3 + response::IntoResponse, 4 + }; 5 + use futures_util::{SinkExt, StreamExt}; 6 + use serde::Serialize; 7 + use tokio::sync::{broadcast, RwLock}; 8 + use std::collections::HashMap; 9 + use crate::AppState; 10 + 11 + /// Events sent to web UI clients 12 + #[derive(Debug, Clone, Serialize)] 13 + #[serde(tag = "type", rename_all = "snake_case")] 14 + pub enum Event { 15 + ChangeUpdated { 16 + change_id: String, 17 + record_id: String, 18 + }, 19 + BookmarkMoved { 20 + name: String, 21 + target: String, 22 + }, 23 + PresenceChanged { 24 + user_id: String, 25 + change_id: Option<String>, 26 + }, 27 + Connected { 28 + repo_id: String, 29 + }, 30 + } 31 + 32 + /// Manages event subscriptions for web UI clients 33 + pub struct EventManager { 34 + channels: RwLock<HashMap<String, broadcast::Sender<Event>>>, 35 + } 36 + 37 + impl EventManager { 38 + pub fn new() -> Self { 39 + Self { 40 + channels: RwLock::new(HashMap::new()), 41 + } 42 + } 43 + 44 + /// Subscribe to events for a repository 45 + pub async fn subscribe(&self, repo_id: &str) -> broadcast::Receiver<Event> { 46 + let mut channels = self.channels.write().await; 47 + 48 + if let Some(sender) = channels.get(repo_id) { 49 + sender.subscribe() 50 + } else { 51 + let (tx, rx) = broadcast::channel(100); 52 + channels.insert(repo_id.to_string(), tx); 53 + rx 54 + } 55 + } 56 + 57 + /// Broadcast an event to all subscribers of a repository 58 + pub async fn broadcast(&self, repo_id: &str, event: Event) { 59 + let channels = self.channels.read().await; 60 + if let Some(sender) = channels.get(repo_id) { 61 + let _ = sender.send(event); 62 + } 63 + } 64 + 65 + /// Emit change updated event 66 + pub async fn emit_change_updated(&self, repo_id: &str, change_id: &str, record_id: &str) { 67 + self.broadcast(repo_id, Event::ChangeUpdated { 68 + change_id: change_id.to_string(), 69 + record_id: record_id.to_string(), 70 + }).await; 71 + } 72 + 73 + /// Emit bookmark moved event 74 + pub async fn emit_bookmark_moved(&self, repo_id: &str, name: &str, target: &str) { 75 + self.broadcast(repo_id, Event::BookmarkMoved { 76 + name: name.to_string(), 77 + target: target.to_string(), 78 + }).await; 79 + } 80 + 81 + /// Emit presence changed event 82 + pub async fn emit_presence_changed(&self, repo_id: &str, user_id: &str, change_id: Option<&str>) { 83 + self.broadcast(repo_id, Event::PresenceChanged { 84 + user_id: user_id.to_string(), 85 + change_id: change_id.map(|s| s.to_string()), 86 + }).await; 87 + } 88 + } 89 + 90 + impl Default for EventManager { 91 + fn default() -> Self { 92 + Self::new() 93 + } 94 + } 95 + 96 + /// WebSocket handler for web UI events 97 + pub async fn events_handler( 98 + ws: WebSocketUpgrade, 99 + Path(repo_id): Path<String>, 100 + State(state): State<AppState>, 101 + ) -> impl IntoResponse { 102 + ws.on_upgrade(move |socket| handle_events(socket, repo_id, state)) 103 + } 104 + 105 + async fn handle_events(socket: WebSocket, repo_id: String, state: AppState) { 106 + let (mut sender, mut receiver) = socket.split(); 107 + 108 + // Subscribe to events for this repo 109 + let mut event_rx = state.events.subscribe(&repo_id).await; 110 + 111 + tracing::info!("Web UI client connected to events for repo {}", repo_id); 112 + 113 + // Send connected event 114 + let connected = Event::Connected { repo_id: repo_id.clone() }; 115 + if let Ok(json) = serde_json::to_string(&connected) { 116 + let _ = sender.send(Message::Text(json)).await; 117 + } 118 + 119 + loop { 120 + tokio::select! { 121 + // Forward events to client 122 + Ok(event) = event_rx.recv() => { 123 + if let Ok(json) = serde_json::to_string(&event) { 124 + if let Err(e) = sender.send(Message::Text(json)).await { 125 + tracing::error!("Failed to send event: {}", e); 126 + break; 127 + } 128 + } 129 + } 130 + // Handle incoming messages (ping/pong, close) 131 + Some(msg) = receiver.next() => { 132 + match msg { 133 + Ok(Message::Close(_)) => { 134 + tracing::info!("Web UI client disconnected from repo {}", repo_id); 135 + break; 136 + } 137 + Ok(Message::Ping(data)) => { 138 + if let Err(e) = sender.send(Message::Pong(data)).await { 139 + tracing::error!("Failed to send pong: {}", e); 140 + break; 141 + } 142 + } 143 + Err(e) => { 144 + tracing::error!("WebSocket error: {}", e); 145 + break; 146 + } 147 + _ => {} 148 + } 149 + } 150 + } 151 + } 152 + }