A simple to-do app focused on tasks that can be completed within a specific time span.
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

sse for tags

ToBinio 2b486091 8cb23d47

+119 -26
+1
Cargo.lock
··· 3845 3845 "futures-core", 3846 3846 "pin-project-lite", 3847 3847 "tokio", 3848 + "tokio-util", 3848 3849 ] 3849 3850 3850 3851 [[package]]
+1 -1
api/Cargo.toml
··· 26 26 serde = { version = "1.0.228", features = ["derive"] } 27 27 serde_json = "1.0.149" 28 28 tracing = "0.1.44" 29 + tokio-stream = { version = "0.1.18", features = ["sync"] } 29 30 30 31 [dev-dependencies] 31 32 serial_test = "3.4.0" 32 33 reqwest = { version = "0.13.2", features = ["json", "stream"] } 33 - tokio-stream = "0.1.18"
+8 -1
api/src/main.rs
··· 7 7 use tower_http::trace::{self, TraceLayer}; 8 8 use tracing::{Level, info}; 9 9 10 + use crate::services::events::EventService; 11 + 10 12 mod auth; 11 13 mod entities; 12 14 mod routes; 15 + mod services; 13 16 14 17 #[derive(Clone)] 15 18 struct AppState { 16 19 db_connection: DatabaseConnection, 20 + event_service: EventService, 17 21 } 18 22 19 23 #[tokio::main] ··· 27 31 28 32 let db_connection = init_db(&db_url).await?; 29 33 30 - let state = AppState { db_connection }; 34 + let state = AppState { 35 + db_connection, 36 + event_service: EventService::new(), 37 + }; 31 38 32 39 let app = Router::new() 33 40 .route("/reset_db", post(reset_db))
+81 -24
api/src/routes/tags.rs
··· 1 + use std::convert::Infallible; 2 + use std::time::Duration; 3 + 1 4 use axum::extract::Path; 5 + use axum::response::Sse; 6 + use axum::response::sse::Event; 2 7 use axum::routing::{delete, post}; 3 8 use axum::{Json, Router, extract::State, routing::get}; 9 + use futures::Stream; 4 10 use http::StatusCode; 5 11 use sea_orm::ActiveValue::Set; 6 12 use sea_orm::{ColumnTrait, EntityTrait, ModelTrait, QueryFilter}; 13 + use tokio_stream::StreamExt; 14 + use tokio_stream::wrappers::BroadcastStream; 7 15 use tracing::warn; 8 16 use types::{HexColor, Tag as TagModel}; 9 17 use uuid::Uuid; 10 18 19 + use crate::services::events::{self}; 11 20 use crate::{AppState, auth::User}; 12 21 13 22 use crate::entities::{prelude::*, tag}; ··· 17 26 .route("/", get(get_all_tags)) 18 27 .route("/", post(add_tag)) 19 28 .route("/{tag_uuid}", delete(delete_tag)) 29 + .route("/sse", get(sse_handler)) 20 30 .with_state(state) 21 31 } 22 32 ··· 50 60 (StatusCode::INTERNAL_SERVER_ERROR, "failed to add tag") 51 61 })?; 52 62 53 - TagModel { 54 - uuid: existing_tag.id, 55 - name: existing_tag.name, 56 - color: HexColor::from_str(&existing_tag.color).unwrap(), 57 - } 63 + existing_tag.into() 58 64 } 59 65 None => { 60 66 let new_tag = Tag::insert(tag) ··· 65 71 (StatusCode::INTERNAL_SERVER_ERROR, "failed to add tag") 66 72 })?; 67 73 68 - TagModel { 69 - uuid: new_tag.id, 70 - name: new_tag.name, 71 - color: HexColor::from_str(&new_tag.color).unwrap(), 72 - } 74 + new_tag.into() 73 75 } 74 76 }; 75 77 78 + state.event_service.broadcast(events::Event::Tag(user.uuid)); 76 79 Ok(Json(new_tag)) 77 80 } 78 81 ··· 89 92 (StatusCode::INTERNAL_SERVER_ERROR, "failed to fetch tags") 90 93 })?; 91 94 92 - Ok(Json( 93 - tags.into_iter() 94 - .map(|tag| TagModel { 95 - uuid: tag.id, 96 - name: tag.name, 97 - color: HexColor::from_str(&tag.color).unwrap(), 98 - }) 99 - .collect(), 100 - )) 95 + state.event_service.broadcast(events::Event::Tag(user.uuid)); 96 + Ok(Json(tags.into_iter().map(|tag| tag.into()).collect())) 101 97 } 102 98 103 99 async fn delete_tag( ··· 118 114 return Err((StatusCode::NOT_FOUND, "tag not found")); 119 115 }; 120 116 121 - let deleted_tag = TagModel { 122 - uuid: tag.id, 123 - name: tag.name.to_string(), 124 - color: HexColor::from_str(&tag.color).unwrap(), 125 - }; 117 + let deleted_tag = TagModel::from(&tag); 126 118 127 119 tag.delete(&state.db_connection) 128 120 .await 129 121 .map_err(|_| (StatusCode::INTERNAL_SERVER_ERROR, "failed to delete tag"))?; 130 122 123 + state.event_service.broadcast(events::Event::Tag(user.uuid)); 131 124 Ok(Json(deleted_tag)) 132 125 } 126 + 127 + async fn sse_handler( 128 + state: State<AppState>, 129 + user: User, 130 + ) -> Sse<impl Stream<Item = Result<Event, Infallible>>> { 131 + let stream = BroadcastStream::new(state.event_service.subscribe()) 132 + .filter(move |event| { 133 + let Ok(event) = event else { 134 + return false; 135 + }; 136 + 137 + match event { 138 + events::Event::Tag(user_uuid) => user_uuid == &user.uuid, 139 + } 140 + }) 141 + .then(move |_| { 142 + let db = state.db_connection.clone(); 143 + 144 + async move { 145 + Tag::find() 146 + .filter(tag::Column::OwnerId.eq(user.uuid)) 147 + .all(&db) 148 + .await 149 + .map_err(|e| { 150 + warn!("failed to fetch tag: {}", e); 151 + "failed to fetch tags" 152 + }) 153 + .map(|tags| { 154 + tags.into_iter() 155 + .map(|tag| TagModel::from(tag)) 156 + .collect::<Vec<_>>() 157 + }) 158 + .map(|tags| Event::default().data(&serde_json::to_string(&tags).unwrap())) 159 + .unwrap() 160 + } 161 + }) 162 + .map(Ok); 163 + 164 + Sse::new(stream).keep_alive( 165 + axum::response::sse::KeepAlive::new() 166 + .interval(Duration::from_secs(1)) 167 + .text("keep-alive-text"), 168 + ) 169 + } 170 + 171 + impl From<tag::Model> for TagModel { 172 + fn from(value: tag::Model) -> Self { 173 + TagModel { 174 + uuid: value.id, 175 + name: value.name, 176 + color: HexColor::from_str(&value.color).unwrap(), 177 + } 178 + } 179 + } 180 + 181 + impl From<&tag::Model> for TagModel { 182 + fn from(value: &tag::Model) -> Self { 183 + TagModel { 184 + uuid: value.id, 185 + name: value.name.to_string(), 186 + color: HexColor::from_str(&value.color).unwrap(), 187 + } 188 + } 189 + }
+27
api/src/services/events.rs
··· 1 + use tokio::sync::broadcast; 2 + use uuid::Uuid; 3 + 4 + #[derive(Clone)] 5 + pub enum Event { 6 + Tag(Uuid), 7 + } 8 + 9 + #[derive(Clone)] 10 + pub struct EventService { 11 + tx: broadcast::Sender<Event>, 12 + } 13 + 14 + impl EventService { 15 + pub fn new() -> Self { 16 + let tx = broadcast::Sender::new(64); 17 + Self { tx } 18 + } 19 + 20 + pub fn broadcast(&self, event: Event) { 21 + let _ = self.tx.send(event); 22 + } 23 + 24 + pub fn subscribe(&self) -> broadcast::Receiver<Event> { 25 + self.tx.subscribe() 26 + } 27 + }
+1
api/src/services/mod.rs
··· 1 + pub mod events;