personal activity index (bluesky, leaflet, substack) pai.desertthunder.dev
rss bluesky
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

at a55bc23c35eeb8ac8cf652f1ebac1eb258132bd6 203 lines 6.0 kB view raw
1use crate::storage::SqliteStorage; 2use crate::{ensure_positive_limit, normalize_optional_string, normalize_since_input}; 3use axum::{ 4 extract::{Path, Query, State}, 5 http::StatusCode, 6 response::{IntoResponse, Response}, 7 routing::get, 8 Json, Router, 9}; 10use owo_colors::OwoColorize; 11use pai_core::{Item, ListFilter, PaiError, SourceKind}; 12use serde::{Deserialize, Serialize}; 13use std::{net::SocketAddr, path::PathBuf, sync::Arc}; 14use tokio::net::TcpListener; 15 16const DEFAULT_LIMIT: usize = 20; 17 18/// Launches the HTTP server using the provided SQLite database path and address. 19pub(crate) fn serve(db_path: PathBuf, address: String) -> Result<(), PaiError> { 20 let addr: SocketAddr = address 21 .parse() 22 .map_err(|e| PaiError::Config(format!("Invalid listen address '{address}': {e}")))?; 23 24 let runtime = tokio::runtime::Builder::new_multi_thread() 25 .enable_all() 26 .build() 27 .map_err(PaiError::Io)?; 28 29 runtime.block_on(async move { run_server(db_path, addr).await }) 30} 31 32async fn run_server(db_path: PathBuf, addr: SocketAddr) -> Result<(), PaiError> { 33 // Ensure the database exists and schema is ready before serving requests. 34 let storage = SqliteStorage::new(&db_path)?; 35 storage.verify_schema()?; 36 drop(storage); 37 38 let state = AppState { db_path: Arc::new(db_path) }; 39 40 let app = Router::new() 41 .route("/api/feed", get(feed_handler)) 42 .route("/api/item/:id", get(item_handler)) 43 .with_state(state); 44 45 let listener = TcpListener::bind(addr).await.map_err(PaiError::Io)?; 46 let local_addr = listener.local_addr().map_err(PaiError::Io)?; 47 println!("{} Listening on http://{}", "Info:".cyan(), local_addr); 48 49 axum::serve(listener, app.into_make_service()) 50 .with_graceful_shutdown(shutdown_signal()) 51 .await 52 .map_err(|err| PaiError::Io(std::io::Error::new(std::io::ErrorKind::Other, err))) 53} 54 55#[derive(Clone)] 56struct AppState { 57 db_path: Arc<PathBuf>, 58} 59 60impl AppState { 61 fn open_storage(&self) -> Result<SqliteStorage, PaiError> { 62 SqliteStorage::new(self.db_path.as_ref()) 63 } 64} 65 66#[derive(Debug, Default, Deserialize)] 67struct FeedQuery { 68 source_kind: Option<SourceKind>, 69 source_id: Option<String>, 70 limit: Option<usize>, 71 since: Option<String>, 72 q: Option<String>, 73} 74 75impl FeedQuery { 76 fn into_filter(self) -> Result<ListFilter, PaiError> { 77 let limit = match self.limit { 78 Some(value) => ensure_positive_limit(value)?, 79 None => DEFAULT_LIMIT, 80 }; 81 82 Ok(ListFilter { 83 source_kind: self.source_kind, 84 source_id: normalize_optional_string(self.source_id), 85 limit: Some(limit), 86 since: normalize_since_input(self.since)?, 87 query: normalize_optional_string(self.q), 88 }) 89 } 90} 91 92#[derive(Serialize)] 93struct FeedResponse { 94 count: usize, 95 items: Vec<Item>, 96} 97 98async fn feed_handler( 99 State(state): State<AppState>, Query(query): Query<FeedQuery>, 100) -> Result<Json<FeedResponse>, ApiError> { 101 let filter = query.into_filter()?; 102 let storage = state.open_storage()?; 103 let items = pai_core::Storage::list_items(&storage, &filter)?; 104 105 Ok(Json(FeedResponse { count: items.len(), items })) 106} 107 108async fn item_handler(State(state): State<AppState>, Path(id): Path<String>) -> Result<Json<Item>, ApiError> { 109 let storage = state.open_storage()?; 110 let item = storage 111 .get_item(&id)? 112 .ok_or_else(|| ApiError::not_found(format!("Item '{id}' not found")))?; 113 114 Ok(Json(item)) 115} 116 117struct ApiError { 118 status: StatusCode, 119 message: String, 120} 121 122impl ApiError { 123 fn bad_request(msg: impl Into<String>) -> Self { 124 Self { status: StatusCode::BAD_REQUEST, message: msg.into() } 125 } 126 127 fn not_found(msg: impl Into<String>) -> Self { 128 Self { status: StatusCode::NOT_FOUND, message: msg.into() } 129 } 130 131 fn internal(msg: impl Into<String>) -> Self { 132 Self { status: StatusCode::INTERNAL_SERVER_ERROR, message: msg.into() } 133 } 134} 135 136impl From<PaiError> for ApiError { 137 fn from(err: PaiError) -> Self { 138 match err { 139 PaiError::InvalidArgument(msg) => Self::bad_request(msg), 140 other => Self::internal(other.to_string()), 141 } 142 } 143} 144 145#[derive(Serialize)] 146struct ErrorBody { 147 error: String, 148} 149 150impl IntoResponse for ApiError { 151 fn into_response(self) -> Response { 152 (self.status, Json(ErrorBody { error: self.message })).into_response() 153 } 154} 155 156async fn shutdown_signal() { 157 let _ = tokio::signal::ctrl_c().await; 158} 159 160#[cfg(test)] 161mod tests { 162 use super::*; 163 164 #[test] 165 fn feed_query_defaults() { 166 let filter = FeedQuery::default().into_filter().unwrap(); 167 assert_eq!(filter.limit, Some(DEFAULT_LIMIT)); 168 assert!(filter.source_kind.is_none()); 169 assert!(filter.source_id.is_none()); 170 } 171 172 #[test] 173 fn feed_query_respects_parameters() { 174 let query = FeedQuery { 175 source_kind: Some(SourceKind::Bluesky), 176 source_id: Some(" desertthunder.dev ".to_string()), 177 limit: Some(5), 178 since: Some("2024-01-01T00:00:00Z".to_string()), 179 q: Some(" rust ".to_string()), 180 }; 181 182 let filter = query.into_filter().unwrap(); 183 assert_eq!(filter.limit, Some(5)); 184 assert_eq!(filter.source_kind, Some(SourceKind::Bluesky)); 185 assert_eq!(filter.source_id.unwrap(), "desertthunder.dev"); 186 assert_eq!(filter.query.unwrap(), "rust"); 187 assert_eq!(filter.since.unwrap(), "2024-01-01T00:00:00+00:00"); 188 } 189 190 #[test] 191 fn feed_query_rejects_zero_limit() { 192 let err = FeedQuery { limit: Some(0), ..Default::default() } 193 .into_filter() 194 .unwrap_err(); 195 assert!(matches!(err, PaiError::InvalidArgument(_))); 196 } 197 198 #[test] 199 fn api_error_into_response_sets_status() { 200 let resp = ApiError::bad_request("oops").into_response(); 201 assert_eq!(resp.status(), StatusCode::BAD_REQUEST); 202 } 203}