personal activity index (bluesky, leaflet, substack)
pai.desertthunder.dev
rss
bluesky
1use pai_core::{Item, ListFilter, PaiError, Result, SourceKind, Storage};
2use rusqlite::{params, Connection, OptionalExtension};
3use std::path::Path;
4
/// Schema version stamped into `schema_version` when a database is first created.
///
/// Databases that record a lower version are rejected when opened.
const SCHEMA_VERSION: i32 = 1;

/// DDL executed every time a database is opened.
///
/// Every statement uses `IF NOT EXISTS`, so re-running it against an existing
/// database leaves it unchanged.
///
/// - `schema_version`: stores the schema version number.
/// - `items`: one row per item, keyed by `id`; timestamps are stored as TEXT.
/// - `idx_items_source_date`: index on `(source_kind, source_id, published_at DESC)`.
const INIT_SQL: &str = r#"
CREATE TABLE IF NOT EXISTS schema_version (
 version INTEGER PRIMARY KEY
);

CREATE TABLE IF NOT EXISTS items (
 id TEXT PRIMARY KEY,
 source_kind TEXT NOT NULL,
 source_id TEXT NOT NULL,
 author TEXT,
 title TEXT,
 summary TEXT,
 url TEXT NOT NULL,
 content_html TEXT,
 published_at TEXT NOT NULL,
 created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP
);

CREATE INDEX IF NOT EXISTS idx_items_source_date
 ON items (source_kind, source_id, published_at DESC);
"#;
28
29/// SQLite implementation of the Storage trait
30///
31/// Manages persistent storage of items in a local SQLite database.
32/// Handles schema initialization and migrations automatically on first connection.
33pub struct SqliteStorage {
34 conn: Connection,
35}
36
37impl SqliteStorage {
38 /// Opens or creates a SQLite database at the given path
39 ///
40 /// Initializes the schema if the database is new or runs migrations if needed.
41 pub fn new<P: AsRef<Path>>(path: P) -> Result<Self> {
42 let path_ref = path.as_ref();
43
44 if let Some(parent) = path_ref.parent() {
45 std::fs::create_dir_all(parent)
46 .map_err(|e| PaiError::Storage(format!("Failed to create database directory: {e}")))?;
47 }
48
49 let conn = Connection::open(path).map_err(|e| PaiError::Storage(format!("Failed to open database: {e}")))?;
50
51 let mut storage = Self { conn };
52 storage.init_schema()?;
53 Ok(storage)
54 }
55
56 /// Initializes the database schema
57 ///
58 /// Creates tables and indexes if they don't exist, and sets up version tracking.
59 fn init_schema(&mut self) -> Result<()> {
60 self.conn
61 .execute_batch(INIT_SQL)
62 .map_err(|e| PaiError::Storage(format!("Failed to initialize schema: {e}")))?;
63
64 let version: Option<i32> = self
65 .conn
66 .query_row("SELECT version FROM schema_version LIMIT 1", [], |row| row.get(0))
67 .optional()
68 .map_err(|e| PaiError::Storage(format!("Failed to check schema version: {e}")))?;
69
70 match version {
71 None => {
72 self.conn
73 .execute(
74 "INSERT INTO schema_version (version) VALUES (?1)",
75 params![SCHEMA_VERSION],
76 )
77 .map_err(|e| PaiError::Storage(format!("Failed to set schema version: {e}")))?;
78 }
79 Some(v) if v < SCHEMA_VERSION => {
80 return Err(PaiError::Storage(format!(
81 "Database migration needed: current={v}, required={SCHEMA_VERSION}"
82 )));
83 }
84 _ => {}
85 }
86
87 Ok(())
88 }
89
90 /// Gets basic statistics about stored items
91 pub fn get_stats(&self) -> Result<Vec<(String, usize)>> {
92 let mut stmt = self
93 .conn
94 .prepare("SELECT source_kind, COUNT(*) FROM items GROUP BY source_kind ORDER BY source_kind")
95 .map_err(|e| PaiError::Storage(format!("Failed to prepare stats query: {e}")))?;
96
97 let stats = stmt
98 .query_map([], |row| Ok((row.get::<_, String>(0)?, row.get::<_, usize>(1)?)))
99 .map_err(|e| PaiError::Storage(format!("Failed to query stats: {e}")))?
100 .collect::<std::result::Result<Vec<_>, _>>()
101 .map_err(|e| PaiError::Storage(format!("Failed to collect stats: {e}")))?;
102
103 Ok(stats)
104 }
105
106 /// Gets total item count
107 pub fn count_items(&self) -> Result<usize> {
108 self.conn
109 .query_row("SELECT COUNT(*) FROM items", [], |row| row.get(0))
110 .map_err(|e| PaiError::Storage(format!("Failed to count items: {e}")))
111 }
112
113 /// Verifies schema integrity
114 ///
115 /// Checks that required tables and indexes exist.
116 pub fn verify_schema(&self) -> Result<()> {
117 let tables = vec!["schema_version", "items"];
118 for table in tables {
119 let exists: bool = self
120 .conn
121 .query_row(
122 "SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name=?1",
123 params![table],
124 |row| {
125 let count: i32 = row.get(0)?;
126 Ok(count > 0)
127 },
128 )
129 .map_err(|e| PaiError::Storage(format!("Failed to verify table {table}: {e}")))?;
130
131 if !exists {
132 return Err(PaiError::Storage(format!("Missing table: {table}")));
133 }
134 }
135
136 Ok(())
137 }
138}
139
140impl Storage for SqliteStorage {
141 fn insert_or_replace_item(&self, item: &Item) -> Result<()> {
142 self.conn
143 .execute(
144 "INSERT OR REPLACE INTO items
145 (id, source_kind, source_id, author, title, summary, url, content_html, published_at, created_at)
146 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10)",
147 params![
148 item.id,
149 item.source_kind.to_string(),
150 item.source_id,
151 item.author,
152 item.title,
153 item.summary,
154 item.url,
155 item.content_html,
156 item.published_at,
157 item.created_at,
158 ],
159 )
160 .map_err(|e| PaiError::Storage(format!("Failed to insert item: {e}")))?;
161
162 Ok(())
163 }
164
165 fn list_items(&self, filter: &ListFilter) -> Result<Vec<Item>> {
166 let mut sql = String::from("SELECT id, source_kind, source_id, author, title, summary, url, content_html, published_at, created_at FROM items WHERE 1=1");
167 let mut conditions = Vec::new();
168
169 if filter.source_kind.is_some() {
170 sql.push_str(" AND source_kind = ?");
171 conditions.push(filter.source_kind.unwrap().to_string());
172 }
173
174 if let Some(ref source_id) = filter.source_id {
175 sql.push_str(" AND source_id = ?");
176 conditions.push(source_id.clone());
177 }
178
179 if let Some(ref since) = filter.since {
180 sql.push_str(" AND published_at >= ?");
181 conditions.push(since.clone());
182 }
183
184 if let Some(ref query) = filter.query {
185 sql.push_str(" AND (title LIKE ? OR summary LIKE ?)");
186 let pattern = format!("%{query}%");
187 conditions.push(pattern.clone());
188 conditions.push(pattern);
189 }
190
191 sql.push_str(" ORDER BY published_at DESC");
192
193 if let Some(limit) = filter.limit {
194 sql.push_str(&format!(" LIMIT {limit}"));
195 }
196
197 let mut stmt = self
198 .conn
199 .prepare(&sql)
200 .map_err(|e| PaiError::Storage(format!("Failed to prepare query: {e}")))?;
201
202 let params_refs: Vec<&dyn rusqlite::ToSql> = conditions.iter().map(|s| s as &dyn rusqlite::ToSql).collect();
203
204 let items = stmt
205 .query_map(params_refs.as_slice(), |row| {
206 let source_kind_str: String = row.get(1)?;
207 let source_kind = source_kind_str.parse::<SourceKind>().map_err(|e| {
208 rusqlite::Error::FromSqlConversionFailure(1, rusqlite::types::Type::Text, Box::new(e))
209 })?;
210
211 Ok(Item {
212 id: row.get(0)?,
213 source_kind,
214 source_id: row.get(2)?,
215 author: row.get(3)?,
216 title: row.get(4)?,
217 summary: row.get(5)?,
218 url: row.get(6)?,
219 content_html: row.get(7)?,
220 published_at: row.get(8)?,
221 created_at: row.get(9)?,
222 })
223 })
224 .map_err(|e| PaiError::Storage(format!("Failed to query items: {e}")))?
225 .collect::<std::result::Result<Vec<_>, _>>()
226 .map_err(|e| PaiError::Storage(format!("Failed to collect items: {e}")))?;
227
228 Ok(items)
229 }
230}
231
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::Utc;

    fn create_test_storage() -> SqliteStorage {
        SqliteStorage::new(":memory:").expect("Failed to create in-memory database")
    }

    fn create_test_item(id: &str, source_kind: SourceKind, source_id: &str) -> Item {
        Item {
            id: id.to_string(),
            source_kind,
            source_id: source_id.to_string(),
            author: Some("Test Author".to_string()),
            title: Some("Test Title".to_string()),
            summary: Some("Test summary".to_string()),
            url: format!("https://example.com/{id}"),
            content_html: Some("<p>Test content</p>".to_string()),
            published_at: Utc::now().to_rfc3339(),
            created_at: Utc::now().to_rfc3339(),
        }
    }

    /// Builds a Substack test item with a fixed `published_at`, for deterministic ordering.
    fn create_test_item_at(id: &str, published_at: &str) -> Item {
        Item {
            published_at: published_at.to_string(),
            ..create_test_item(id, SourceKind::Substack, "test.substack.com")
        }
    }

    #[test]
    fn new_database_initializes_schema() {
        let storage = create_test_storage();
        assert!(storage.verify_schema().is_ok());
    }

    #[test]
    fn insert_and_retrieve_item() {
        let storage = create_test_storage();
        let item = create_test_item("test-1", SourceKind::Substack, "test.substack.com");

        storage.insert_or_replace_item(&item).expect("Failed to insert item");

        let filter = ListFilter::default();
        let items = storage.list_items(&filter).expect("Failed to list items");

        assert_eq!(items.len(), 1);
        assert_eq!(items[0].id, "test-1");
        assert_eq!(items[0].source_kind, SourceKind::Substack);
    }

    #[test]
    fn insert_replaces_existing_item() {
        let storage = create_test_storage();
        let mut item = create_test_item("test-1", SourceKind::Substack, "test.substack.com");

        storage.insert_or_replace_item(&item).expect("Failed to insert item");

        item.title = Some("Updated Title".to_string());
        storage.insert_or_replace_item(&item).expect("Failed to replace item");

        let filter = ListFilter::default();
        let items = storage.list_items(&filter).expect("Failed to list items");

        assert_eq!(items.len(), 1);
        assert_eq!(items[0].title, Some("Updated Title".to_string()));
    }

    #[test]
    fn filter_by_source_kind() {
        let storage = create_test_storage();

        storage
            .insert_or_replace_item(&create_test_item("test-1", SourceKind::Substack, "test.substack.com"))
            .expect("Failed to insert");
        storage
            .insert_or_replace_item(&create_test_item("test-2", SourceKind::Bluesky, "test.bsky.social"))
            .expect("Failed to insert");

        let filter = ListFilter { source_kind: Some(SourceKind::Substack), ..Default::default() };
        let items = storage.list_items(&filter).expect("Failed to list items");

        assert_eq!(items.len(), 1);
        assert_eq!(items[0].source_kind, SourceKind::Substack);
    }

    #[test]
    fn filter_by_source_id() {
        let storage = create_test_storage();

        storage
            .insert_or_replace_item(&create_test_item("test-1", SourceKind::Leaflet, "source1.leaflet.pub"))
            .expect("Failed to insert");
        storage
            .insert_or_replace_item(&create_test_item("test-2", SourceKind::Leaflet, "source2.leaflet.pub"))
            .expect("Failed to insert");

        let filter = ListFilter { source_id: Some("source1.leaflet.pub".to_string()), ..Default::default() };
        let items = storage.list_items(&filter).expect("Failed to list items");

        assert_eq!(items.len(), 1);
        assert_eq!(items[0].source_id, "source1.leaflet.pub");
    }

    #[test]
    fn filter_with_limit() {
        let storage = create_test_storage();

        for i in 0..5 {
            storage
                .insert_or_replace_item(&create_test_item(
                    &format!("test-{i}"),
                    SourceKind::Substack,
                    "test.substack.com",
                ))
                .expect("Failed to insert");
        }

        let filter = ListFilter { limit: Some(3), ..Default::default() };
        let items = storage.list_items(&filter).expect("Failed to list items");

        assert_eq!(items.len(), 3);
    }

    #[test]
    fn filter_by_query() {
        let storage = create_test_storage();

        let mut item1 = create_test_item("test-1", SourceKind::Substack, "test.substack.com");
        item1.title = Some("Rust Programming".to_string());
        storage.insert_or_replace_item(&item1).expect("Failed to insert");

        let mut item2 = create_test_item("test-2", SourceKind::Substack, "test.substack.com");
        item2.title = Some("Python Tutorial".to_string());
        storage.insert_or_replace_item(&item2).expect("Failed to insert");

        let filter = ListFilter { query: Some("Rust".to_string()), ..Default::default() };
        let items = storage.list_items(&filter).expect("Failed to list items");

        assert_eq!(items.len(), 1);
        assert_eq!(items[0].id, "test-1");
    }

    #[test]
    fn filter_by_since_keeps_items_at_or_after_cutoff() {
        let storage = create_test_storage();

        for (id, published_at) in [
            ("old", "2024-01-01T00:00:00+00:00"),
            ("boundary", "2024-02-01T00:00:00+00:00"),
            ("new", "2024-03-01T00:00:00+00:00"),
        ] {
            storage
                .insert_or_replace_item(&create_test_item_at(id, published_at))
                .expect("Failed to insert");
        }

        let filter = ListFilter { since: Some("2024-02-01T00:00:00+00:00".to_string()), ..Default::default() };
        let items = storage.list_items(&filter).expect("Failed to list items");

        let ids: Vec<&str> = items.iter().map(|item| item.id.as_str()).collect();
        assert_eq!(ids, vec!["new", "boundary"]);
    }

    #[test]
    fn list_items_orders_newest_first_and_limit_keeps_newest() {
        let storage = create_test_storage();

        // Insert out of chronological order so the ordering comes from the query.
        for (id, published_at) in [
            ("a", "2024-01-01T00:00:00+00:00"),
            ("c", "2024-03-01T00:00:00+00:00"),
            ("b", "2024-02-01T00:00:00+00:00"),
        ] {
            storage
                .insert_or_replace_item(&create_test_item_at(id, published_at))
                .expect("Failed to insert");
        }

        let all = storage.list_items(&ListFilter::default()).expect("Failed to list items");
        let all_ids: Vec<&str> = all.iter().map(|item| item.id.as_str()).collect();
        assert_eq!(all_ids, vec!["c", "b", "a"]);

        let limited = storage
            .list_items(&ListFilter { limit: Some(2), ..Default::default() })
            .expect("Failed to list items");
        let limited_ids: Vec<&str> = limited.iter().map(|item| item.id.as_str()).collect();
        assert_eq!(limited_ids, vec!["c", "b"]);
    }

    #[test]
    fn get_stats_returns_counts_by_source() {
        let storage = create_test_storage();

        storage
            .insert_or_replace_item(&create_test_item("test-1", SourceKind::Substack, "test.substack.com"))
            .expect("Failed to insert");
        storage
            .insert_or_replace_item(&create_test_item("test-2", SourceKind::Substack, "test.substack.com"))
            .expect("Failed to insert");
        storage
            .insert_or_replace_item(&create_test_item("test-3", SourceKind::Bluesky, "test.bsky.social"))
            .expect("Failed to insert");

        let stats = storage.get_stats().expect("Failed to get stats");

        assert_eq!(stats.len(), 2);
        assert!(stats.iter().any(|(k, v)| k == "bluesky" && *v == 1));
        assert!(stats.iter().any(|(k, v)| k == "substack" && *v == 2));
    }

    #[test]
    fn count_items_returns_total() {
        let storage = create_test_storage();

        for i in 0..3 {
            storage
                .insert_or_replace_item(&create_test_item(
                    &format!("test-{i}"),
                    SourceKind::Substack,
                    "test.substack.com",
                ))
                .expect("Failed to insert");
        }

        let count = storage.count_items().expect("Failed to count items");
        assert_eq!(count, 3);
    }
}