···11-// Prevents additional console window on Windows in release, DO NOT REMOVE!!
11+//! Prevents additional console window on Windows in release, DO NOT REMOVE!!
22#![cfg_attr(not(debug_assertions), windows_subsystem = "windows")]
3344fn main() {
+7
src-tauri/src/migrations/006_app_settings.sql
···11+CREATE TABLE IF NOT EXISTS app_settings (
22+ key TEXT PRIMARY KEY,
33+ value TEXT NOT NULL
44+);
55+66+-- Embeddings are enabled by default (opt-out).
77+INSERT OR IGNORE INTO app_settings(key, value) VALUES ('embeddings_enabled', '1');
+770
src-tauri/src/search.rs
···11+use super::auth::LazuriteOAuthSession;
22+use super::error::{AppError, Result};
33+use super::state::AppState;
44+use fastembed::{EmbeddingModel, TextEmbedding, TextInitOptions};
55+use jacquard::api::app_bsky::actor::search_actors::SearchActors;
66+use jacquard::api::app_bsky::feed::get_actor_likes::GetActorLikes;
77+use jacquard::api::app_bsky::feed::search_posts::SearchPosts;
88+use jacquard::api::app_bsky::graph::search_starter_packs::SearchStarterPacks;
99+use jacquard::types::did::Did;
1010+use jacquard::types::ident::AtIdentifier;
1111+use jacquard::xrpc::XrpcClient;
1212+use rusqlite::{params, Connection, OptionalExtension};
1313+use serde::Serialize;
1414+use std::path::PathBuf;
1515+use std::sync::Arc;
1616+use tauri::{AppHandle, Manager};
1717+use tauri_plugin_log::log;
1818+1919+#[derive(Debug, Serialize)]
2020+#[serde(rename_all = "camelCase")]
2121+pub struct SyncStatus {
2222+ pub source: String,
2323+ pub post_count: i64,
2424+ pub cursor: Option<String>,
2525+ pub last_synced_at: Option<String>,
2626+}
2727+2828+fn validate_query(query: &str) -> Result<()> {
2929+ if query.trim().is_empty() {
3030+ return Err(AppError::validation("search query must not be empty"));
3131+ }
3232+ Ok(())
3333+}
3434+3535+fn validate_source(source: &str) -> Result<()> {
3636+ match source {
3737+ "like" | "bookmark" => Ok(()),
3838+ _ => Err(AppError::validation("source must be 'like' or 'bookmark'")),
3939+ }
4040+}
4141+4242+async fn get_session(state: &AppState) -> Result<Arc<LazuriteOAuthSession>> {
4343+ let did = state
4444+ .active_session
4545+ .read()
4646+ .map_err(|error| {
4747+ log::error!("active_session poisoned: {error}");
4848+ AppError::StatePoisoned("active_session")
4949+ })?
5050+ .as_ref()
5151+ .ok_or_else(|| {
5252+ log::error!("no active account");
5353+ AppError::Validation("no active account".into())
5454+ })?
5555+ .did
5656+ .clone();
5757+5858+ state
5959+ .sessions
6060+ .read()
6161+ .map_err(|error| {
6262+ log::error!("sessions poisoned: {error}");
6363+ AppError::StatePoisoned("sessions")
6464+ })?
6565+ .get(&did)
6666+ .cloned()
6767+ .ok_or_else(|| {
6868+ log::error!("session not found for active account");
6969+ AppError::Validation("session not found for active account".into())
7070+ })
7171+}
7272+7373+fn db_load_sync_cursor(conn: &Connection, did: &str, source: &str) -> Result<Option<String>> {
7474+ conn.query_row(
7575+ "SELECT cursor FROM sync_state WHERE did = ?1 AND source = ?2",
7676+ params![did, source],
7777+ |row| row.get::<_, Option<String>>(0),
7878+ )
7979+ .optional()
8080+ .map(|opt| opt.flatten())
8181+ .map_err(AppError::from)
8282+}
8383+8484+fn db_save_sync_state(conn: &Connection, did: &str, source: &str, cursor: Option<&str>) -> Result<()> {
8585+ conn.execute(
8686+ "INSERT INTO sync_state(did, source, cursor, last_synced_at)
8787+ VALUES(?1, ?2, ?3, CURRENT_TIMESTAMP)
8888+ ON CONFLICT(did, source) DO UPDATE SET
8989+ cursor = excluded.cursor,
9090+ last_synced_at = excluded.last_synced_at",
9191+ params![did, source, cursor],
9292+ )?;
9393+ Ok(())
9494+}
9595+9696+/// Upsert a single `FeedViewPost` JSON item into the `posts` table.
9797+/// On conflict (same uri) updates mutable fields but keeps indexed_at.
9898+fn db_upsert_post(conn: &Connection, feed_item: &serde_json::Value, source: &str) -> Result<()> {
9999+ let post = feed_item.get("post").unwrap_or(feed_item);
100100+101101+ let uri = post
102102+ .get("uri")
103103+ .and_then(|v| v.as_str())
104104+ .ok_or_else(|| AppError::validation("feed item missing post.uri"))?;
105105+ let cid = post
106106+ .get("cid")
107107+ .and_then(|v| v.as_str())
108108+ .ok_or_else(|| AppError::validation("feed item missing post.cid"))?;
109109+ let author = post
110110+ .get("author")
111111+ .ok_or_else(|| AppError::validation("feed item missing post.author"))?;
112112+ let author_did = author
113113+ .get("did")
114114+ .and_then(|v| v.as_str())
115115+ .ok_or_else(|| AppError::validation("feed item missing post.author.did"))?;
116116+ let author_handle = author.get("handle").and_then(|v| v.as_str());
117117+118118+ let record = post.get("record");
119119+ let text = record.and_then(|r| r.get("text")).and_then(|v| v.as_str());
120120+ let created_at = record.and_then(|r| r.get("createdAt")).and_then(|v| v.as_str());
121121+ let json_record = record.map(|r| r.to_string());
122122+123123+ conn.execute(
124124+ "INSERT INTO posts(uri, cid, author_did, author_handle, text, created_at, json_record, source)
125125+ VALUES(?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)
126126+ ON CONFLICT(uri) DO UPDATE SET
127127+ cid = excluded.cid,
128128+ author_handle = excluded.author_handle,
129129+ text = excluded.text,
130130+ json_record = excluded.json_record",
131131+ params![
132132+ uri,
133133+ cid,
134134+ author_did,
135135+ author_handle,
136136+ text,
137137+ created_at,
138138+ json_record,
139139+ source
140140+ ],
141141+ )?;
142142+ Ok(())
143143+}
144144+145145+fn db_post_count(conn: &Connection, source: &str) -> Result<i64> {
146146+ conn.query_row("SELECT COUNT(*) FROM posts WHERE source = ?1", params![source], |row| {
147147+ row.get(0)
148148+ })
149149+ .map_err(AppError::from)
150150+}
151151+152152+fn db_sync_status(conn: &Connection, source: &str) -> Result<SyncStatus> {
153153+ let post_count = db_post_count(conn, source)?;
154154+ let (cursor, last_synced_at) = conn
155155+ .query_row(
156156+ "SELECT cursor, last_synced_at FROM sync_state WHERE source = ?1",
157157+ params![source],
158158+ |row| Ok((row.get::<_, Option<String>>(0)?, row.get::<_, Option<String>>(1)?)),
159159+ )
160160+ .optional()?
161161+ .unwrap_or((None, None));
162162+163163+ Ok(SyncStatus { source: source.to_owned(), post_count, cursor, last_synced_at })
164164+}
165165+166166+pub async fn search_posts_network(
167167+ query: String, sort: Option<String>, limit: Option<u32>, cursor: Option<String>, state: &AppState,
168168+) -> Result<serde_json::Value> {
169169+ validate_query(&query)?;
170170+ let session = get_session(state).await?;
171171+172172+ let output = session
173173+ .send(
174174+ SearchPosts::new()
175175+ .sort(sort.as_deref().map(|s| s.into()))
176176+ .limit(limit.map(|l| l as i64))
177177+ .cursor(cursor.as_deref().map(|c| c.into()))
178178+ .q(query.as_str())
179179+ .build(),
180180+ )
181181+ .await
182182+ .map_err(|error| {
183183+ log::error!("searchPosts error: {error}");
184184+ AppError::validation("searchPosts error")
185185+ })?
186186+ .into_output()
187187+ .map_err(|error| {
188188+ log::error!("searchPosts output error: {error}");
189189+ AppError::validation("searchPosts output error")
190190+ })?;
191191+192192+ serde_json::to_value(&output).map_err(AppError::from)
193193+}
194194+195195+pub async fn search_actors(
196196+ query: String, limit: Option<u32>, cursor: Option<String>, state: &AppState,
197197+) -> Result<serde_json::Value> {
198198+ validate_query(&query)?;
199199+ let session = get_session(state).await?;
200200+201201+ let output = session
202202+ .send(
203203+ SearchActors::new()
204204+ .q(Some(query.as_str().into()))
205205+ .limit(limit.map(|l| l as i64))
206206+ .cursor(cursor.as_deref().map(|c| c.into()))
207207+ .build(),
208208+ )
209209+ .await
210210+ .map_err(|error| {
211211+ log::error!("searchActors error: {error}");
212212+ AppError::validation("searchActors error")
213213+ })?
214214+ .into_output()
215215+ .map_err(|error| {
216216+ log::error!("searchActors output error: {error}");
217217+ AppError::validation("searchActors output error")
218218+ })?;
219219+220220+ serde_json::to_value(&output).map_err(AppError::from)
221221+}
222222+223223+pub async fn search_starter_packs(
224224+ query: String, limit: Option<u32>, cursor: Option<String>, state: &AppState,
225225+) -> Result<serde_json::Value> {
226226+ validate_query(&query)?;
227227+ let session = get_session(state).await?;
228228+229229+ let output = session
230230+ .send(
231231+ SearchStarterPacks::new()
232232+ .limit(limit.map(|l| l as i64))
233233+ .cursor(cursor.as_deref().map(|c| c.into()))
234234+ .q(query.as_str())
235235+ .build(),
236236+ )
237237+ .await
238238+ .map_err(|error| {
239239+ log::error!("searchStarterPacks error: {error}");
240240+ AppError::validation("searchStarterPacks error")
241241+ })?
242242+ .into_output()
243243+ .map_err(|error| {
244244+ log::error!("searchStarterPacks output error: {error}");
245245+ AppError::validation("searchStarterPacks output error")
246246+ })?;
247247+248248+ serde_json::to_value(&output).map_err(AppError::from)
249249+}
250250+251251+/// Sync the authenticated user's likes (or bookmarks) into the local DB.
252252+///
253253+/// Resumes from the last stored cursor so interrupted syncs never re-fetch the full history.
254254+/// On completion the cursor is cleared, allowing subsequent calls to pick up new items from the top of the feed.
255255+pub async fn sync_posts(did: String, source: String, state: &AppState) -> Result<SyncStatus> {
256256+ validate_source(&source)?;
257257+258258+ if source == "bookmark" {
259259+ return Err(AppError::validation("bookmark sync is not yet supported"));
260260+ }
261261+262262+ let session = get_session(state).await?;
263263+264264+ let mut cursor: Option<String> = {
265265+ let conn = state.auth_store.lock_connection()?;
266266+ db_load_sync_cursor(&conn, &did, &source)?
267267+ };
268268+269269+ log::info!("starting {source} sync for {did}, resume cursor: {cursor:?}");
270270+271271+ loop {
272272+ let output = session
273273+ .send(
274274+ GetActorLikes::new()
275275+ .limit(Some(100i64))
276276+ .cursor(cursor.as_deref().map(|c| c.into()))
277277+ .actor(AtIdentifier::Did(Did::new(&did)?))
278278+ .build(),
279279+ )
280280+ .await
281281+ .map_err(|error| {
282282+ log::error!("getActorLikes error: {error}");
283283+ AppError::validation("getActorLikes error")
284284+ })?
285285+ .into_output()
286286+ .map_err(|error| {
287287+ log::error!("getActorLikes output error: {error}");
288288+ AppError::validation("getActorLikes output error")
289289+ })?;
290290+291291+ let output_json = serde_json::to_value(&output)?;
292292+293293+ let feed = output_json
294294+ .get("feed")
295295+ .and_then(|v| v.as_array())
296296+ .cloned()
297297+ .unwrap_or_default();
298298+299299+ if feed.is_empty() {
300300+ log::info!("{source} sync for {did}: empty page, stopping");
301301+ break;
302302+ }
303303+304304+ let next_cursor = output_json.get("cursor").and_then(|v| v.as_str()).map(str::to_owned);
305305+306306+ {
307307+ let conn = state.auth_store.lock_connection()?;
308308+ for item in &feed {
309309+ db_upsert_post(&conn, item, &source)?;
310310+ }
311311+ db_save_sync_state(&conn, &did, &source, next_cursor.as_deref())?;
312312+ }
313313+314314+ log::debug!(
315315+ "{source} sync for {did}: upserted {} posts, next cursor: {next_cursor:?}",
316316+ feed.len()
317317+ );
318318+319319+ match next_cursor {
320320+ None => {
321321+ log::info!("{source} sync for {did}: reached end of feed");
322322+ break;
323323+ }
324324+ Some(c) => cursor = Some(c),
325325+ }
326326+ }
327327+328328+ let conn = state.auth_store.lock_connection()?;
329329+ db_sync_status(&conn, &source)
330330+}
331331+332332+/// Returns sync status for all sources for the given DID.
333333+pub fn get_sync_status(did: &str, state: &AppState) -> Result<Vec<SyncStatus>> {
334334+ let conn = state.auth_store.lock_connection()?;
335335+ let mut stmt = conn.prepare(
336336+ "SELECT ss.source,
337337+ COUNT(p.uri) AS post_count,
338338+ ss.cursor,
339339+ ss.last_synced_at
340340+ FROM sync_state ss
341341+ LEFT JOIN posts p ON p.source = ss.source
342342+ WHERE ss.did = ?1
343343+ GROUP BY ss.source",
344344+ )?;
345345+346346+ let rows = stmt.query_map(params![did], |row| {
347347+ Ok(SyncStatus {
348348+ source: row.get(0)?,
349349+ post_count: row.get(1)?,
350350+ cursor: row.get(2)?,
351351+ last_synced_at: row.get(3)?,
352352+ })
353353+ })?;
354354+355355+ rows.collect::<rusqlite::Result<Vec<_>>>().map_err(AppError::from)
356356+}
357357+358358+const EMBED_BATCH_SIZE: usize = 32;
359359+360360+fn resolve_models_dir(app: &AppHandle) -> Result<PathBuf> {
361361+ let mut dir = app
362362+ .path()
363363+ .app_data_dir()
364364+ .map_err(|error| AppError::PathResolve(error.to_string()))?;
365365+ dir.push("models");
366366+ std::fs::create_dir_all(&dir)?;
367367+ Ok(dir)
368368+}
369369+370370+fn db_get_embeddings_enabled(conn: &Connection) -> Result<bool> {
371371+ let val: Option<String> = conn
372372+ .query_row(
373373+ "SELECT value FROM app_settings WHERE key = 'embeddings_enabled'",
374374+ [],
375375+ |row| row.get(0),
376376+ )
377377+ .optional()?;
378378+ Ok(val.map(|v| v != "0").unwrap_or(true))
379379+}
380380+381381+fn db_set_embeddings_enabled(conn: &Connection, enabled: bool) -> Result<()> {
382382+ conn.execute(
383383+ "INSERT INTO app_settings(key, value) VALUES('embeddings_enabled', ?1)
384384+ ON CONFLICT(key) DO UPDATE SET value = excluded.value",
385385+ params![if enabled { "1" } else { "0" }],
386386+ )?;
387387+ Ok(())
388388+}
389389+390390+/// Returns (uri, text) for posts that have no embedding yet.
391391+fn db_posts_without_embeddings(conn: &Connection) -> Result<Vec<(String, String)>> {
392392+ let mut stmt = conn.prepare(
393393+ "SELECT p.uri, p.text
394394+ FROM posts p
395395+ WHERE p.text IS NOT NULL
396396+ AND p.text != ''
397397+ AND p.uri NOT IN (SELECT uri FROM posts_vec)",
398398+ )?;
399399+400400+ let rows = stmt.query_map([], |row| Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?)))?;
401401+ rows.collect::<rusqlite::Result<Vec<_>>>().map_err(AppError::from)
402402+}
403403+404404+/// Returns (uri, text) for ALL posts that have non-empty text.
405405+fn db_all_posts_with_text(conn: &Connection) -> Result<Vec<(String, String)>> {
406406+ let mut stmt = conn.prepare("SELECT uri, text FROM posts WHERE text IS NOT NULL AND text != ''")?;
407407+408408+ let rows = stmt.query_map([], |row| Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?)))?;
409409+ rows.collect::<rusqlite::Result<Vec<_>>>().map_err(AppError::from)
410410+}
411411+412412+fn db_upsert_embedding(conn: &Connection, uri: &str, embedding: &[f32]) -> Result<()> {
413413+ let bytes: Vec<u8> = embedding.iter().flat_map(|f| f.to_le_bytes()).collect();
414414+ conn.execute(
415415+ "INSERT OR REPLACE INTO posts_vec(uri, embedding) VALUES(?1, ?2)",
416416+ params![uri, bytes.as_slice()],
417417+ )?;
418418+ Ok(())
419419+}
420420+421421+fn embed_posts(posts: &[(String, String)], models_dir: PathBuf, state: &AppState) -> Result<usize> {
422422+ if posts.is_empty() {
423423+ return Ok(0);
424424+ }
425425+426426+ let mut model = TextEmbedding::try_new(
427427+ TextInitOptions::new(EmbeddingModel::NomicEmbedTextV15)
428428+ .with_cache_dir(models_dir)
429429+ .with_show_download_progress(false),
430430+ )
431431+ .map_err(|error| AppError::validation(format!("failed to init embedding model: {error}")))?;
432432+433433+ let mut total = 0usize;
434434+435435+ for chunk in posts.chunks(EMBED_BATCH_SIZE) {
436436+ let texts: Vec<String> = chunk.iter().map(|(_, text)| text.clone()).collect();
437437+ let embeddings = model
438438+ .embed(texts, Some(EMBED_BATCH_SIZE))
439439+ .map_err(|error| AppError::validation(format!("embedding error: {error}")))?;
440440+441441+ let conn = state.auth_store.lock_connection()?;
442442+ for ((uri, _), embedding) in chunk.iter().zip(embeddings.iter()) {
443443+ db_upsert_embedding(&conn, uri, embedding)?;
444444+ }
445445+ total += chunk.len();
446446+ }
447447+448448+ Ok(total)
449449+}
450450+451451+/// Embed all posts that do not yet have an embedding. Skipped when embeddings are disabled.
452452+pub fn embed_pending_posts(app: &AppHandle, state: &AppState) -> Result<usize> {
453453+ let enabled = {
454454+ let conn = state.auth_store.lock_connection()?;
455455+ db_get_embeddings_enabled(&conn)?
456456+ };
457457+ if !enabled {
458458+ log::info!("embeddings disabled, skipping embed_pending_posts");
459459+ return Ok(0);
460460+ }
461461+462462+ let posts = {
463463+ let conn = state.auth_store.lock_connection()?;
464464+ db_posts_without_embeddings(&conn)?
465465+ };
466466+467467+ log::info!("embedding {} pending posts", posts.len());
468468+ let models_dir = resolve_models_dir(app)?;
469469+ embed_posts(&posts, models_dir, state)
470470+}
471471+472472+/// Clear all embeddings from `posts_vec` then re-embed every post.
473473+pub fn reindex_embeddings(app: &AppHandle, state: &AppState) -> Result<usize> {
474474+ {
475475+ let conn = state.auth_store.lock_connection()?;
476476+ conn.execute("DELETE FROM posts_vec", [])?;
477477+ }
478478+ log::info!("cleared posts_vec for reindex");
479479+480480+ let posts = {
481481+ let conn = state.auth_store.lock_connection()?;
482482+ db_all_posts_with_text(&conn)?
483483+ };
484484+485485+ log::info!("reindexing {} posts", posts.len());
486486+ let models_dir = resolve_models_dir(app)?;
487487+ embed_posts(&posts, models_dir, state)
488488+}
489489+490490+/// Persist the embeddings-enabled preference.
491491+pub fn set_embeddings_enabled(enabled: bool, state: &AppState) -> Result<()> {
492492+ let conn = state.auth_store.lock_connection()?;
493493+ db_set_embeddings_enabled(&conn, enabled)
494494+}
495495+496496+#[cfg(test)]
497497+mod tests {
498498+ use super::{
499499+ db_get_embeddings_enabled, db_load_sync_cursor, db_post_count, db_save_sync_state, db_set_embeddings_enabled,
500500+ db_upsert_post, validate_query, validate_source,
501501+ };
502502+ use rusqlite::Connection;
503503+504504+ /// Minimal schema for unit tests w/o FTS/vec tables.
505505+ fn test_db() -> Connection {
506506+ let conn = Connection::open_in_memory().expect("in-memory db should open");
507507+ conn.execute_batch(
508508+ "CREATE TABLE posts (
509509+ uri TEXT PRIMARY KEY,
510510+ cid TEXT NOT NULL,
511511+ author_did TEXT NOT NULL,
512512+ author_handle TEXT,
513513+ text TEXT,
514514+ created_at TEXT,
515515+ indexed_at TEXT DEFAULT CURRENT_TIMESTAMP,
516516+ json_record TEXT,
517517+ source TEXT NOT NULL
518518+ );
519519+ CREATE TABLE sync_state (
520520+ did TEXT NOT NULL,
521521+ source TEXT NOT NULL,
522522+ cursor TEXT,
523523+ last_synced_at TEXT,
524524+ PRIMARY KEY (did, source)
525525+ );
526526+ CREATE TABLE app_settings (
527527+ key TEXT PRIMARY KEY,
528528+ value TEXT NOT NULL
529529+ );",
530530+ )
531531+ .expect("test schema should apply");
532532+ conn
533533+ }
534534+535535+ fn feed_item(uri: &str, cid: &str, did: &str, handle: &str, text: &str) -> serde_json::Value {
536536+ serde_json::json!({
537537+ "post": {
538538+ "uri": uri,
539539+ "cid": cid,
540540+ "author": { "did": did, "handle": handle },
541541+ "record": { "$type": "app.bsky.feed.post", "text": text, "createdAt": "2024-01-01T00:00:00Z" }
542542+ }
543543+ })
544544+ }
545545+546546+ #[test]
547547+ fn empty_query_is_rejected() {
548548+ assert!(validate_query("").is_err());
549549+ }
550550+551551+ #[test]
552552+ fn whitespace_only_query_is_rejected() {
553553+ assert!(validate_query(" ").is_err());
554554+ }
555555+556556+ #[test]
557557+ fn valid_query_is_accepted() {
558558+ assert!(validate_query("rust programming").is_ok());
559559+ }
560560+561561+ #[test]
562562+ fn single_char_query_is_accepted() {
563563+ assert!(validate_query("a").is_ok());
564564+ }
565565+566566+ #[test]
567567+ fn from_handle_syntax_is_accepted() {
568568+ assert!(validate_query("from:alice.bsky.social hello").is_ok());
569569+ }
570570+571571+ #[test]
572572+ fn valid_sources_are_accepted() {
573573+ assert!(validate_source("like").is_ok());
574574+ assert!(validate_source("bookmark").is_ok());
575575+ }
576576+577577+ #[test]
578578+ fn unknown_source_is_rejected() {
579579+ assert!(validate_source("repost").is_err());
580580+ assert!(validate_source("").is_err());
581581+ }
582582+583583+ #[test]
584584+ fn cursor_is_none_when_no_sync_state_row_exists() {
585585+ let conn = test_db();
586586+ let cursor = db_load_sync_cursor(&conn, "did:plc:alice", "like").unwrap();
587587+ assert!(cursor.is_none());
588588+ }
589589+590590+ #[test]
591591+ fn save_and_load_cursor_roundtrips() {
592592+ let conn = test_db();
593593+ db_save_sync_state(&conn, "did:plc:alice", "like", Some("cursor-abc")).unwrap();
594594+ let loaded = db_load_sync_cursor(&conn, "did:plc:alice", "like").unwrap();
595595+ assert_eq!(loaded.as_deref(), Some("cursor-abc"));
596596+ }
597597+598598+ #[test]
599599+ fn saving_none_cursor_clears_stored_cursor() {
600600+ let conn = test_db();
601601+ db_save_sync_state(&conn, "did:plc:alice", "like", Some("cursor-abc")).unwrap();
602602+ db_save_sync_state(&conn, "did:plc:alice", "like", None).unwrap();
603603+ let loaded = db_load_sync_cursor(&conn, "did:plc:alice", "like").unwrap();
604604+ assert!(loaded.is_none());
605605+ }
606606+607607+ #[test]
608608+ fn cursor_is_per_did_and_source() {
609609+ let conn = test_db();
610610+ db_save_sync_state(&conn, "did:plc:alice", "like", Some("cursor-alice-like")).unwrap();
611611+ db_save_sync_state(&conn, "did:plc:alice", "bookmark", Some("cursor-alice-bm")).unwrap();
612612+ db_save_sync_state(&conn, "did:plc:bob", "like", Some("cursor-bob-like")).unwrap();
613613+614614+ assert_eq!(
615615+ db_load_sync_cursor(&conn, "did:plc:alice", "like").unwrap().as_deref(),
616616+ Some("cursor-alice-like")
617617+ );
618618+ assert_eq!(
619619+ db_load_sync_cursor(&conn, "did:plc:alice", "bookmark")
620620+ .unwrap()
621621+ .as_deref(),
622622+ Some("cursor-alice-bm")
623623+ );
624624+ assert_eq!(
625625+ db_load_sync_cursor(&conn, "did:plc:bob", "like").unwrap().as_deref(),
626626+ Some("cursor-bob-like")
627627+ );
628628+ }
629629+630630+ #[test]
631631+ fn upsert_inserts_new_post() {
632632+ let conn = test_db();
633633+ let item = feed_item(
634634+ "at://did:plc:a/app.bsky.feed.post/1",
635635+ "cid1",
636636+ "did:plc:a",
637637+ "alice",
638638+ "hello",
639639+ );
640640+ db_upsert_post(&conn, &item, "like").unwrap();
641641+ assert_eq!(db_post_count(&conn, "like").unwrap(), 1);
642642+ }
643643+644644+ #[test]
645645+ fn upsert_is_idempotent_for_same_uri() {
646646+ let conn = test_db();
647647+ let item = feed_item(
648648+ "at://did:plc:a/app.bsky.feed.post/1",
649649+ "cid1",
650650+ "did:plc:a",
651651+ "alice",
652652+ "hello",
653653+ );
654654+ db_upsert_post(&conn, &item, "like").unwrap();
655655+ db_upsert_post(&conn, &item, "like").unwrap();
656656+ assert_eq!(db_post_count(&conn, "like").unwrap(), 1);
657657+ }
658658+659659+ #[test]
660660+ fn upsert_updates_text_on_conflict() {
661661+ let conn = test_db();
662662+ let original = feed_item(
663663+ "at://did:plc:a/app.bsky.feed.post/1",
664664+ "cid1",
665665+ "did:plc:a",
666666+ "alice",
667667+ "original",
668668+ );
669669+ db_upsert_post(&conn, &original, "like").unwrap();
670670+671671+ let updated = feed_item(
672672+ "at://did:plc:a/app.bsky.feed.post/1",
673673+ "cid2",
674674+ "did:plc:a",
675675+ "alice",
676676+ "updated",
677677+ );
678678+ db_upsert_post(&conn, &updated, "like").unwrap();
679679+680680+ let text: String = conn
681681+ .query_row(
682682+ "SELECT text FROM posts WHERE uri = ?1",
683683+ ["at://did:plc:a/app.bsky.feed.post/1"],
684684+ |r| r.get(0),
685685+ )
686686+ .unwrap();
687687+ assert_eq!(text, "updated");
688688+ }
689689+690690+ #[test]
691691+ fn upsert_stores_source() {
692692+ let conn = test_db();
693693+ let item = feed_item(
694694+ "at://did:plc:a/app.bsky.feed.post/1",
695695+ "cid1",
696696+ "did:plc:a",
697697+ "alice",
698698+ "hi",
699699+ );
700700+ db_upsert_post(&conn, &item, "bookmark").unwrap();
701701+ let source: String = conn
702702+ .query_row(
703703+ "SELECT source FROM posts WHERE uri = ?1",
704704+ ["at://did:plc:a/app.bsky.feed.post/1"],
705705+ |r| r.get(0),
706706+ )
707707+ .unwrap();
708708+ assert_eq!(source, "bookmark");
709709+ }
710710+711711+ #[test]
712712+ fn upsert_rejects_item_missing_uri() {
713713+ let conn = test_db();
714714+ let bad = serde_json::json!({ "post": { "cid": "cid1", "author": { "did": "x" } } });
715715+ assert!(db_upsert_post(&conn, &bad, "like").is_err());
716716+ }
717717+718718+ #[test]
719719+ fn post_count_is_per_source() {
720720+ let conn = test_db();
721721+ db_upsert_post(
722722+ &conn,
723723+ &feed_item("at://a/app.bsky.feed.post/1", "c1", "did:plc:a", "a", "t"),
724724+ "like",
725725+ )
726726+ .unwrap();
727727+ db_upsert_post(
728728+ &conn,
729729+ &feed_item("at://a/app.bsky.feed.post/2", "c2", "did:plc:a", "a", "t"),
730730+ "bookmark",
731731+ )
732732+ .unwrap();
733733+ assert_eq!(db_post_count(&conn, "like").unwrap(), 1);
734734+ assert_eq!(db_post_count(&conn, "bookmark").unwrap(), 1);
735735+ }
736736+737737+ #[test]
738738+ fn embeddings_enabled_defaults_to_true_when_row_absent() {
739739+ let conn = test_db();
740740+ assert!(db_get_embeddings_enabled(&conn).unwrap());
741741+ }
742742+743743+ #[test]
744744+ fn set_embeddings_enabled_false_persists() {
745745+ let conn = test_db();
746746+ db_set_embeddings_enabled(&conn, false).unwrap();
747747+ assert!(!db_get_embeddings_enabled(&conn).unwrap());
748748+ }
749749+750750+ #[test]
751751+ fn set_embeddings_enabled_true_persists() {
752752+ let conn = test_db();
753753+ db_set_embeddings_enabled(&conn, false).unwrap();
754754+ db_set_embeddings_enabled(&conn, true).unwrap();
755755+ assert!(db_get_embeddings_enabled(&conn).unwrap());
756756+ }
757757+758758+ #[test]
759759+ fn embeddings_enabled_toggle_is_idempotent() {
760760+ let conn = test_db();
761761+ conn.execute(
762762+ "INSERT INTO app_settings(key, value) VALUES('embeddings_enabled', '1')",
763763+ [],
764764+ )
765765+ .unwrap();
766766+ db_set_embeddings_enabled(&conn, false).unwrap();
767767+ db_set_embeddings_enabled(&conn, false).unwrap();
768768+ assert!(!db_get_embeddings_enabled(&conn).unwrap());
769769+ }
770770+}