personal activity index (bluesky, leaflet, substack) pai.desertthunder.dev
rss bluesky
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

at 2eb2bd05f4bf674ebd5dd767f64a40906453bc74 407 lines 15 kB view raw
1use pai_core::{Item, ListFilter, PaiError, Result, SourceKind, Storage}; 2use rusqlite::{params, Connection, OptionalExtension}; 3use std::path::Path; 4 5const SCHEMA_VERSION: i32 = 1; 6 7const INIT_SQL: &str = r#" 8CREATE TABLE IF NOT EXISTS schema_version ( 9 version INTEGER PRIMARY KEY 10); 11 12CREATE TABLE IF NOT EXISTS items ( 13 id TEXT PRIMARY KEY, 14 source_kind TEXT NOT NULL, 15 source_id TEXT NOT NULL, 16 author TEXT, 17 title TEXT, 18 summary TEXT, 19 url TEXT NOT NULL, 20 content_html TEXT, 21 published_at TEXT NOT NULL, 22 created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP 23); 24 25CREATE INDEX IF NOT EXISTS idx_items_source_date 26 ON items (source_kind, source_id, published_at DESC); 27"#; 28 29/// SQLite implementation of the Storage trait 30/// 31/// Manages persistent storage of items in a local SQLite database. 32/// Handles schema initialization and migrations automatically on first connection. 33pub struct SqliteStorage { 34 conn: Connection, 35} 36 37impl SqliteStorage { 38 /// Opens or creates a SQLite database at the given path 39 /// 40 /// Initializes the schema if the database is new or runs migrations if needed. 41 pub fn new<P: AsRef<Path>>(path: P) -> Result<Self> { 42 let path_ref = path.as_ref(); 43 44 if let Some(parent) = path_ref.parent() { 45 std::fs::create_dir_all(parent) 46 .map_err(|e| PaiError::Storage(format!("Failed to create database directory: {e}")))?; 47 } 48 49 let conn = Connection::open(path).map_err(|e| PaiError::Storage(format!("Failed to open database: {e}")))?; 50 51 let mut storage = Self { conn }; 52 storage.init_schema()?; 53 Ok(storage) 54 } 55 56 /// Initializes the database schema 57 /// 58 /// Creates tables and indexes if they don't exist, and sets up version tracking. 59 fn init_schema(&mut self) -> Result<()> { 60 self.conn 61 .execute_batch(INIT_SQL) 62 .map_err(|e| PaiError::Storage(format!("Failed to initialize schema: {e}")))?; 63 64 let version: Option<i32> = self 65 .conn 66 .query_row("SELECT version FROM schema_version LIMIT 1", [], |row| row.get(0)) 67 .optional() 68 .map_err(|e| PaiError::Storage(format!("Failed to check schema version: {e}")))?; 69 70 match version { 71 None => { 72 self.conn 73 .execute( 74 "INSERT INTO schema_version (version) VALUES (?1)", 75 params![SCHEMA_VERSION], 76 ) 77 .map_err(|e| PaiError::Storage(format!("Failed to set schema version: {e}")))?; 78 } 79 Some(v) if v < SCHEMA_VERSION => { 80 return Err(PaiError::Storage(format!( 81 "Database migration needed: current={v}, required={SCHEMA_VERSION}" 82 ))); 83 } 84 _ => {} 85 } 86 87 Ok(()) 88 } 89 90 /// Gets basic statistics about stored items 91 pub fn get_stats(&self) -> Result<Vec<(String, usize)>> { 92 let mut stmt = self 93 .conn 94 .prepare("SELECT source_kind, COUNT(*) FROM items GROUP BY source_kind ORDER BY source_kind") 95 .map_err(|e| PaiError::Storage(format!("Failed to prepare stats query: {e}")))?; 96 97 let stats = stmt 98 .query_map([], |row| Ok((row.get::<_, String>(0)?, row.get::<_, usize>(1)?))) 99 .map_err(|e| PaiError::Storage(format!("Failed to query stats: {e}")))? 100 .collect::<std::result::Result<Vec<_>, _>>() 101 .map_err(|e| PaiError::Storage(format!("Failed to collect stats: {e}")))?; 102 103 Ok(stats) 104 } 105 106 /// Gets total item count 107 pub fn count_items(&self) -> Result<usize> { 108 self.conn 109 .query_row("SELECT COUNT(*) FROM items", [], |row| row.get(0)) 110 .map_err(|e| PaiError::Storage(format!("Failed to count items: {e}"))) 111 } 112 113 /// Verifies schema integrity 114 /// 115 /// Checks that required tables and indexes exist. 116 pub fn verify_schema(&self) -> Result<()> { 117 let tables = vec!["schema_version", "items"]; 118 for table in tables { 119 let exists: bool = self 120 .conn 121 .query_row( 122 "SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name=?1", 123 params![table], 124 |row| { 125 let count: i32 = row.get(0)?; 126 Ok(count > 0) 127 }, 128 ) 129 .map_err(|e| PaiError::Storage(format!("Failed to verify table {table}: {e}")))?; 130 131 if !exists { 132 return Err(PaiError::Storage(format!("Missing table: {table}"))); 133 } 134 } 135 136 Ok(()) 137 } 138} 139 140impl Storage for SqliteStorage { 141 fn insert_or_replace_item(&self, item: &Item) -> Result<()> { 142 self.conn 143 .execute( 144 "INSERT OR REPLACE INTO items 145 (id, source_kind, source_id, author, title, summary, url, content_html, published_at, created_at) 146 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10)", 147 params![ 148 item.id, 149 item.source_kind.to_string(), 150 item.source_id, 151 item.author, 152 item.title, 153 item.summary, 154 item.url, 155 item.content_html, 156 item.published_at, 157 item.created_at, 158 ], 159 ) 160 .map_err(|e| PaiError::Storage(format!("Failed to insert item: {e}")))?; 161 162 Ok(()) 163 } 164 165 fn list_items(&self, filter: &ListFilter) -> Result<Vec<Item>> { 166 let mut sql = String::from("SELECT id, source_kind, source_id, author, title, summary, url, content_html, published_at, created_at FROM items WHERE 1=1"); 167 let mut conditions = Vec::new(); 168 169 if filter.source_kind.is_some() { 170 sql.push_str(" AND source_kind = ?"); 171 conditions.push(filter.source_kind.unwrap().to_string()); 172 } 173 174 if let Some(ref source_id) = filter.source_id { 175 sql.push_str(" AND source_id = ?"); 176 conditions.push(source_id.clone()); 177 } 178 179 if let Some(ref since) = filter.since { 180 sql.push_str(" AND published_at >= ?"); 181 conditions.push(since.clone()); 182 } 183 184 if let Some(ref query) = filter.query { 185 sql.push_str(" AND (title LIKE ? OR summary LIKE ?)"); 186 let pattern = format!("%{query}%"); 187 conditions.push(pattern.clone()); 188 conditions.push(pattern); 189 } 190 191 sql.push_str(" ORDER BY published_at DESC"); 192 193 if let Some(limit) = filter.limit { 194 sql.push_str(&format!(" LIMIT {limit}")); 195 } 196 197 let mut stmt = self 198 .conn 199 .prepare(&sql) 200 .map_err(|e| PaiError::Storage(format!("Failed to prepare query: {e}")))?; 201 202 let params_refs: Vec<&dyn rusqlite::ToSql> = conditions.iter().map(|s| s as &dyn rusqlite::ToSql).collect(); 203 204 let items = stmt 205 .query_map(params_refs.as_slice(), |row| { 206 let source_kind_str: String = row.get(1)?; 207 let source_kind = source_kind_str.parse::<SourceKind>().map_err(|e| { 208 rusqlite::Error::FromSqlConversionFailure(1, rusqlite::types::Type::Text, Box::new(e)) 209 })?; 210 211 Ok(Item { 212 id: row.get(0)?, 213 source_kind, 214 source_id: row.get(2)?, 215 author: row.get(3)?, 216 title: row.get(4)?, 217 summary: row.get(5)?, 218 url: row.get(6)?, 219 content_html: row.get(7)?, 220 published_at: row.get(8)?, 221 created_at: row.get(9)?, 222 }) 223 }) 224 .map_err(|e| PaiError::Storage(format!("Failed to query items: {e}")))? 225 .collect::<std::result::Result<Vec<_>, _>>() 226 .map_err(|e| PaiError::Storage(format!("Failed to collect items: {e}")))?; 227 228 Ok(items) 229 } 230} 231 232#[cfg(test)] 233mod tests { 234 use super::*; 235 use chrono::Utc; 236 237 fn create_test_storage() -> SqliteStorage { 238 SqliteStorage::new(":memory:").expect("Failed to create in-memory database") 239 } 240 241 fn create_test_item(id: &str, source_kind: SourceKind, source_id: &str) -> Item { 242 Item { 243 id: id.to_string(), 244 source_kind, 245 source_id: source_id.to_string(), 246 author: Some("Test Author".to_string()), 247 title: Some("Test Title".to_string()), 248 summary: Some("Test summary".to_string()), 249 url: format!("https://example.com/{id}"), 250 content_html: Some("<p>Test content</p>".to_string()), 251 published_at: Utc::now().to_rfc3339(), 252 created_at: Utc::now().to_rfc3339(), 253 } 254 } 255 256 #[test] 257 fn new_database_initializes_schema() { 258 let storage = create_test_storage(); 259 assert!(storage.verify_schema().is_ok()); 260 } 261 262 #[test] 263 fn insert_and_retrieve_item() { 264 let storage = create_test_storage(); 265 let item = create_test_item("test-1", SourceKind::Substack, "test.substack.com"); 266 267 storage.insert_or_replace_item(&item).expect("Failed to insert item"); 268 269 let filter = ListFilter::default(); 270 let items = storage.list_items(&filter).expect("Failed to list items"); 271 272 assert_eq!(items.len(), 1); 273 assert_eq!(items[0].id, "test-1"); 274 assert_eq!(items[0].source_kind, SourceKind::Substack); 275 } 276 277 #[test] 278 fn insert_replaces_existing_item() { 279 let storage = create_test_storage(); 280 let mut item = create_test_item("test-1", SourceKind::Substack, "test.substack.com"); 281 282 storage.insert_or_replace_item(&item).expect("Failed to insert item"); 283 284 item.title = Some("Updated Title".to_string()); 285 storage.insert_or_replace_item(&item).expect("Failed to replace item"); 286 287 let filter = ListFilter::default(); 288 let items = storage.list_items(&filter).expect("Failed to list items"); 289 290 assert_eq!(items.len(), 1); 291 assert_eq!(items[0].title, Some("Updated Title".to_string())); 292 } 293 294 #[test] 295 fn filter_by_source_kind() { 296 let storage = create_test_storage(); 297 298 storage 299 .insert_or_replace_item(&create_test_item("test-1", SourceKind::Substack, "test.substack.com")) 300 .expect("Failed to insert"); 301 storage 302 .insert_or_replace_item(&create_test_item("test-2", SourceKind::Bluesky, "test.bsky.social")) 303 .expect("Failed to insert"); 304 305 let filter = ListFilter { source_kind: Some(SourceKind::Substack), ..Default::default() }; 306 let items = storage.list_items(&filter).expect("Failed to list items"); 307 308 assert_eq!(items.len(), 1); 309 assert_eq!(items[0].source_kind, SourceKind::Substack); 310 } 311 312 #[test] 313 fn filter_by_source_id() { 314 let storage = create_test_storage(); 315 316 storage 317 .insert_or_replace_item(&create_test_item("test-1", SourceKind::Leaflet, "source1.leaflet.pub")) 318 .expect("Failed to insert"); 319 storage 320 .insert_or_replace_item(&create_test_item("test-2", SourceKind::Leaflet, "source2.leaflet.pub")) 321 .expect("Failed to insert"); 322 323 let filter = ListFilter { source_id: Some("source1.leaflet.pub".to_string()), ..Default::default() }; 324 let items = storage.list_items(&filter).expect("Failed to list items"); 325 326 assert_eq!(items.len(), 1); 327 assert_eq!(items[0].source_id, "source1.leaflet.pub"); 328 } 329 330 #[test] 331 fn filter_with_limit() { 332 let storage = create_test_storage(); 333 334 for i in 0..5 { 335 storage 336 .insert_or_replace_item(&create_test_item( 337 &format!("test-{i}"), 338 SourceKind::Substack, 339 "test.substack.com", 340 )) 341 .expect("Failed to insert"); 342 } 343 344 let filter = ListFilter { limit: Some(3), ..Default::default() }; 345 let items = storage.list_items(&filter).expect("Failed to list items"); 346 347 assert_eq!(items.len(), 3); 348 } 349 350 #[test] 351 fn filter_by_query() { 352 let storage = create_test_storage(); 353 354 let mut item1 = create_test_item("test-1", SourceKind::Substack, "test.substack.com"); 355 item1.title = Some("Rust Programming".to_string()); 356 storage.insert_or_replace_item(&item1).expect("Failed to insert"); 357 358 let mut item2 = create_test_item("test-2", SourceKind::Substack, "test.substack.com"); 359 item2.title = Some("Python Tutorial".to_string()); 360 storage.insert_or_replace_item(&item2).expect("Failed to insert"); 361 362 let filter = ListFilter { query: Some("Rust".to_string()), ..Default::default() }; 363 let items = storage.list_items(&filter).expect("Failed to list items"); 364 365 assert_eq!(items.len(), 1); 366 assert_eq!(items[0].id, "test-1"); 367 } 368 369 #[test] 370 fn get_stats_returns_counts_by_source() { 371 let storage = create_test_storage(); 372 373 storage 374 .insert_or_replace_item(&create_test_item("test-1", SourceKind::Substack, "test.substack.com")) 375 .expect("Failed to insert"); 376 storage 377 .insert_or_replace_item(&create_test_item("test-2", SourceKind::Substack, "test.substack.com")) 378 .expect("Failed to insert"); 379 storage 380 .insert_or_replace_item(&create_test_item("test-3", SourceKind::Bluesky, "test.bsky.social")) 381 .expect("Failed to insert"); 382 383 let stats = storage.get_stats().expect("Failed to get stats"); 384 385 assert_eq!(stats.len(), 2); 386 assert!(stats.iter().any(|(k, v)| k == "bluesky" && *v == 1)); 387 assert!(stats.iter().any(|(k, v)| k == "substack" && *v == 2)); 388 } 389 390 #[test] 391 fn count_items_returns_total() { 392 let storage = create_test_storage(); 393 394 for i in 0..3 { 395 storage 396 .insert_or_replace_item(&create_test_item( 397 &format!("test-{i}"), 398 SourceKind::Substack, 399 "test.substack.com", 400 )) 401 .expect("Failed to insert"); 402 } 403 404 let count = storage.count_items().expect("Failed to count items"); 405 assert_eq!(count, 3); 406 } 407}