a love letter to tangled (android, iOS, and a search API)
19
fork

Configure Feed

Select the types of activity you want to include in your feed.

build: switch to fts5

+429 -56
+37 -9
packages/api/internal/backfill/backfill.go
··· 84 84 85 85 alreadyTracked := 0 86 86 inProgress := 0 87 + statusFailures := 0 87 88 toSubmit := make([]string, 0, len(discovered)) 88 89 for _, user := range discovered { 89 90 status, err := r.tap.RepoStatus(ctx, user.DID) 90 91 if err != nil { 91 - return fmt.Errorf("tap info for %s: %w", user.DID, err) 92 + statusFailures++ 93 + r.log.Warn("tap classification failed", 94 + slog.String("did", user.DID), 95 + slog.String("error", err.Error()), 96 + ) 97 + continue 92 98 } 93 99 if status.Tracked && status.Backfilled { 94 100 alreadyTracked++ ··· 104 110 r.log.Info("tap classification complete", 105 111 slog.Int("already_tracked", alreadyTracked), 106 112 slog.Int("backfill_in_progress", inProgress), 113 + slog.Int("status_failures", statusFailures), 107 114 slog.Int("to_submit", len(toSubmit)), 108 115 ) 109 116 110 117 submitted := 0 118 + submitFailures := 0 111 119 for i := 0; i < len(toSubmit); i += opts.BatchSize { 112 120 end := i + opts.BatchSize 113 121 if end > len(toSubmit) { ··· 115 123 } 116 124 batch := toSubmit[i:end] 117 125 if err := r.tap.AddRepos(ctx, batch); err != nil { 118 - return fmt.Errorf("submit batch %d-%d: %w", i, end, err) 126 + r.log.Warn("tap batch submission failed", 127 + slog.Int("batch_start", i), 128 + slog.Int("batch_end", end), 129 + slog.Int("batch_size", len(batch)), 130 + slog.String("error", err.Error()), 131 + ) 132 + for _, did := range batch { 133 + if err := r.tap.AddRepos(ctx, []string{did}); err != nil { 134 + submitFailures++ 135 + r.log.Warn("tap repo submission failed", 136 + slog.String("did", did), 137 + slog.String("error", err.Error()), 138 + ) 139 + continue 140 + } 141 + submitted++ 142 + r.log.Info("submitted Tap repo", slog.String("did", did), slog.Int("submitted_total", submitted)) 143 + } 144 + } else { 145 + submitted += len(batch) 146 + r.log.Info("submitted Tap batch", 147 + slog.Int("batch_start", i), 148 + slog.Int("batch_end", end), 149 + slog.Int("batch_size", len(batch)), 150 + slog.Int("submitted_total", submitted), 151 + ) 119 152 } 120 - submitted += len(batch) 121 - r.log.Info("submitted Tap batch", 122 - slog.Int("batch_start", i), 123 - slog.Int("batch_end", end), 124 - slog.Int("batch_size", len(batch)), 125 - slog.Int("submitted_total", submitted), 126 - ) 127 153 if end < len(toSubmit) && opts.BatchDelay > 0 { 128 154 select { 129 155 case <-ctx.Done(): ··· 138 164 slog.Int("already_tracked", alreadyTracked), 139 165 slog.Int("backfill_in_progress", inProgress), 140 166 slog.Int("submitted", submitted), 167 + slog.Int("status_failures", statusFailures), 168 + slog.Int("submit_failures", submitFailures), 141 169 ) 142 170 return nil 143 171 }
+110 -2
packages/api/internal/backfill/backfill_test.go
··· 2 2 3 3 import ( 4 4 "context" 5 + "errors" 5 6 "io" 6 7 "log/slog" 7 8 "os" 8 9 "path/filepath" 10 + "strings" 9 11 "testing" 10 12 ) 11 13 ··· 26 28 } 27 29 28 30 type fakeTapAdmin struct { 29 - statuses map[string]RepoStatus 30 - added [][]string 31 + statuses map[string]RepoStatus 32 + statusErrs map[string]error 33 + added [][]string 34 + addReposError func(dids []string) error 31 35 } 32 36 33 37 func (f *fakeTapAdmin) RepoStatus(_ context.Context, did string) (RepoStatus, error) { 38 + if err, ok := f.statusErrs[did]; ok { 39 + return RepoStatus{}, err 40 + } 34 41 if status, ok := f.statuses[did]; ok { 35 42 return status, nil 36 43 } ··· 38 45 } 39 46 40 47 func (f *fakeTapAdmin) AddRepos(_ context.Context, dids []string) error { 48 + if f.addReposError != nil { 49 + if err := f.addReposError(dids); err != nil { 50 + return err 51 + } 52 + } 41 53 batch := make([]string, len(dids)) 42 54 copy(batch, dids) 43 55 f.added = append(f.added, batch) ··· 144 156 t.Fatalf("expected no submission for in-progress did, got %#v", tap.added) 145 157 } 146 158 } 159 + 160 + func TestRunner_ContinuesWhenRepoStatusFails(t *testing.T) { 161 + st := &fakeStore{ 162 + collaborators: map[string][]string{ 163 + "did:plc:seed": {"did:plc:good", "did:plc:bad"}, 164 + }, 165 + } 166 + follows := &fakeFollowFetcher{follows: map[string][]string{}} 167 + tap := &fakeTapAdmin{ 168 + statuses: map[string]RepoStatus{}, 169 + statusErrs: map[string]error{"did:plc:bad": errors.New("tap info request failed: status 502")}, 170 + } 171 + resolver := &fakeResolver{mapping: map[string]string{"alice.tangled.sh": "did:plc:seed"}} 172 + log := slog.New(slog.NewTextHandler(io.Discard, nil)) 173 + r := NewRunnerWithDeps(st, tap, resolver, follows, log) 174 + 175 + dir := t.TempDir() 176 + seedsPath := filepath.Join(dir, "seeds.txt") 177 + if err := os.WriteFile(seedsPath, []byte("alice.tangled.sh\n"), 0o644); err != nil { 178 + t.Fatalf("write seeds: %v", err) 179 + } 180 + 181 + err := r.Run(context.Background(), Options{ 182 + SeedsPath: seedsPath, 183 + MaxHops: 1, 184 + Concurrency: 1, 185 + BatchSize: 10, 186 + }) 187 + if err != nil { 188 + t.Fatalf("run backfill: %v", err) 189 + } 190 + 191 + if len(tap.added) != 1 { 192 + t.Fatalf("expected one submission batch, got %d", len(tap.added)) 193 + } 194 + if len(tap.added[0]) != 2 { 195 + t.Fatalf("expected seed and good DID submitted, got %#v", tap.added[0]) 196 + } 197 + for _, did := range tap.added[0] { 198 + if did == "did:plc:bad" { 199 + t.Fatalf("did with status error should have been skipped, got %#v", tap.added[0]) 200 + } 201 + } 202 + } 203 + 204 + func TestRunner_FallsBackToSingleRepoSubmissionOnBatchFailure(t *testing.T) { 205 + st := &fakeStore{ 206 + collaborators: map[string][]string{ 207 + "did:plc:seed": {"did:plc:good", "did:plc:bad"}, 208 + }, 209 + } 210 + follows := &fakeFollowFetcher{follows: map[string][]string{}} 211 + tap := &fakeTapAdmin{ 212 + statuses: map[string]RepoStatus{}, 213 + addReposError: func(dids []string) error { 214 + if len(dids) > 1 { 215 + return errors.New("repos add failed: status 502") 216 + } 217 + if len(dids) == 1 && strings.Contains(dids[0], "bad") { 218 + return errors.New("repos add failed: status 502") 219 + } 220 + return nil 221 + }, 222 + } 223 + resolver := &fakeResolver{mapping: map[string]string{"alice.tangled.sh": "did:plc:seed"}} 224 + log := slog.New(slog.NewTextHandler(io.Discard, nil)) 225 + r := NewRunnerWithDeps(st, tap, resolver, follows, log) 226 + 227 + dir := t.TempDir() 228 + seedsPath := filepath.Join(dir, "seeds.txt") 229 + if err := os.WriteFile(seedsPath, []byte("alice.tangled.sh\n"), 0o644); err != nil { 230 + t.Fatalf("write seeds: %v", err) 231 + } 232 + 233 + err := r.Run(context.Background(), Options{ 234 + SeedsPath: seedsPath, 235 + MaxHops: 1, 236 + Concurrency: 1, 237 + BatchSize: 10, 238 + }) 239 + if err != nil { 240 + t.Fatalf("run backfill: %v", err) 241 + } 242 + 243 + if len(tap.added) != 2 { 244 + t.Fatalf("expected successful individual fallbacks only, got %#v", tap.added) 245 + } 246 + for _, batch := range tap.added { 247 + if len(batch) != 1 { 248 + t.Fatalf("expected only single-DID successful submissions after batch fallback, got %#v", tap.added) 249 + } 250 + if batch[0] == "did:plc:bad" { 251 + t.Fatalf("bad DID should not have been successfully submitted, got %#v", tap.added) 252 + } 253 + } 254 + }
+49 -23
packages/api/internal/search/search.go
··· 24 24 25 25 // Result is a single search hit. 26 26 type Result struct { 27 - ID string `json:"id"` 28 - Collection string `json:"collection"` 29 - RecordType string `json:"record_type"` 30 - Title string `json:"title"` 31 - BodySnippet string `json:"body_snippet,omitempty"` 32 - Summary string `json:"summary,omitempty"` 33 - RepoName string `json:"repo_name,omitempty"` 34 - AuthorHandle string `json:"author_handle,omitempty"` 35 - DID string `json:"did"` 36 - ATURI string `json:"at_uri"` 37 - Score float64 `json:"score"` 27 + ID string `json:"id"` 28 + Collection string `json:"collection"` 29 + RecordType string `json:"record_type"` 30 + Title string `json:"title"` 31 + BodySnippet string `json:"body_snippet,omitempty"` 32 + Summary string `json:"summary,omitempty"` 33 + RepoName string `json:"repo_name,omitempty"` 34 + AuthorHandle string `json:"author_handle,omitempty"` 35 + DID string `json:"did"` 36 + ATURI string `json:"at_uri"` 37 + Score float64 `json:"score"` 38 38 MatchedBy []string `json:"matched_by"` 39 - CreatedAt string `json:"created_at,omitempty"` 40 - UpdatedAt string `json:"updated_at,omitempty"` 39 + CreatedAt string `json:"created_at,omitempty"` 40 + UpdatedAt string `json:"updated_at,omitempty"` 41 41 } 42 42 43 43 // Response is the search API response envelope. ··· 67 67 68 68 // Keyword runs a full-text keyword search. 69 69 func (r *Repository) Keyword(ctx context.Context, p Params) (*Response, error) { 70 + ftsQuery := toFTS5Query(p.Query) 71 + 70 72 // Build filter conditions beyond the base FTS match. 71 73 var filters []string 72 74 var filterArgs []any ··· 108 110 filterArgs = append(filterArgs, p.State) 109 111 } 110 112 111 - where := "fts_match(d.title, d.body, d.summary, d.repo_name, d.author_handle, d.tags_json, ?) AND d.deleted_at IS NULL" 113 + where := "documents_fts MATCH ? AND d.deleted_at IS NULL" 112 114 if len(filters) > 0 { 113 115 where += " AND " + strings.Join(filters, " AND ") 114 116 } 115 117 116 118 // Count total matching documents. 117 - countSQL := fmt.Sprintf("SELECT COUNT(*) FROM documents d %s WHERE %s", join, where) 118 - countArgs := append([]any{p.Query}, filterArgs...) 119 + countSQL := fmt.Sprintf("SELECT COUNT(*) FROM documents_fts JOIN documents d ON d.id = documents_fts.id %s WHERE %s", join, where) 120 + countArgs := append([]any{ftsQuery}, filterArgs...) 119 121 120 122 var total int 121 123 if err := r.db.QueryRowContext(ctx, countSQL, countArgs...).Scan(&total); err != nil { 122 - return nil, fmt.Errorf("count: %w", err) 124 + return nil, explainNativeFTSError("count", err) 123 125 } 124 126 125 127 // Fetch results with score and snippet. 126 128 resultsSQL := fmt.Sprintf(` 127 129 SELECT d.id, d.title, d.summary, d.repo_name, d.author_handle, 128 130 d.did, d.at_uri, d.collection, d.record_type, d.created_at, d.updated_at, 129 - fts_score(d.title, d.body, d.summary, d.repo_name, d.author_handle, d.tags_json, ?) AS score, 130 - fts_highlight(d.body, '<mark>', '</mark>', ?) AS body_snippet 131 - FROM documents d 131 + -bm25(documents_fts, 0.0, 3.0, 1.0, 1.5, 2.5, 2.0, 1.2) AS score, 132 + snippet(documents_fts, 2, '<mark>', '</mark>', '...', 20) AS body_snippet 133 + FROM documents_fts 134 + JOIN documents d ON d.id = documents_fts.id 132 135 %s 133 136 WHERE %s 134 137 ORDER BY score DESC 135 138 LIMIT ? OFFSET ?`, join, where) 136 139 137 - resultsArgs := make([]any, 0, 3+len(filterArgs)+2) 138 - resultsArgs = append(resultsArgs, p.Query, p.Query, p.Query) // score, highlight, match 140 + resultsArgs := make([]any, 0, 1+len(filterArgs)+2) 141 + resultsArgs = append(resultsArgs, ftsQuery) 139 142 resultsArgs = append(resultsArgs, filterArgs...) 140 143 resultsArgs = append(resultsArgs, p.Limit, p.Offset) 141 144 142 145 rows, err := r.db.QueryContext(ctx, resultsSQL, resultsArgs...) 143 146 if err != nil { 144 - return nil, fmt.Errorf("search: %w", err) 147 + return nil, explainNativeFTSError("search", err) 145 148 } 146 149 defer rows.Close() 147 150 ··· 182 185 Results: results, 183 186 }, nil 184 187 } 188 + 189 + func explainNativeFTSError(op string, err error) error { 190 + msg := err.Error() 191 + if strings.Contains(msg, "no such table: documents_fts") || 192 + strings.Contains(msg, "no such module: fts5") { 193 + return fmt.Errorf("%s: SQLite FTS5 is unavailable on this database; ensure the FTS5 migration succeeded and that Turso SQLite extensions are enabled for this database/group: %w", op, err) 194 + } 195 + return fmt.Errorf("%s: %w", op, err) 196 + } 197 + 198 + func toFTS5Query(raw string) string { 199 + parts := strings.Fields(raw) 200 + if len(parts) == 0 { 201 + return `""` 202 + } 203 + 204 + quoted := make([]string, 0, len(parts)) 205 + for _, part := range parts { 206 + part = strings.ReplaceAll(part, `"`, `""`) 207 + quoted = append(quoted, `"`+part+`"`) 208 + } 209 + return strings.Join(quoted, " OR ") 210 + }
+81
packages/api/internal/search/search_test.go
··· 1 + package search_test 2 + 3 + import ( 4 + "context" 5 + "os" 6 + "path/filepath" 7 + "testing" 8 + 9 + "tangled.org/desertthunder.dev/twister/internal/search" 10 + "tangled.org/desertthunder.dev/twister/internal/store" 11 + ) 12 + 13 + func TestKeywordSearchUsesFTS5Index(t *testing.T) { 14 + dir := t.TempDir() 15 + dbPath := filepath.Join(dir, "search.db") 16 + url := "file:" + dbPath 17 + 18 + db, err := store.Open(url, "") 19 + if err != nil { 20 + t.Fatalf("open: %v", err) 21 + } 22 + t.Cleanup(func() { 23 + _ = db.Close() 24 + _ = os.Remove(dbPath) 25 + }) 26 + 27 + if err := store.Migrate(db, url); err != nil { 28 + t.Fatalf("migrate: %v", err) 29 + } 30 + 31 + st := store.New(db) 32 + repo := search.NewRepository(db) 33 + ctx := context.Background() 34 + 35 + doc := &store.Document{ 36 + ID: "did:plc:abc|sh.tangled.repo|desert", 37 + DID: "did:plc:abc", 38 + Collection: "sh.tangled.repo", 39 + RKey: "desert", 40 + ATURI: "at://did:plc:abc/sh.tangled.repo/desert", 41 + CID: "bafyreidesert", 42 + RecordType: "repo", 43 + Title: "desert-runner", 44 + Body: "desert search repository", 45 + Summary: "index me", 46 + RepoName: "desert-runner", 47 + AuthorHandle: "owais.tangled.org", 48 + } 49 + if err := st.UpsertDocument(ctx, doc); err != nil { 50 + t.Fatalf("upsert doc: %v", err) 51 + } 52 + 53 + resp, err := repo.Keyword(ctx, search.Params{Query: "desert", Limit: 10}) 54 + if err != nil { 55 + t.Fatalf("keyword search: %v", err) 56 + } 57 + if resp.Total != 1 { 58 + t.Fatalf("total: got %d want 1", resp.Total) 59 + } 60 + if len(resp.Results) != 1 { 61 + t.Fatalf("results length: got %d want 1", len(resp.Results)) 62 + } 63 + if resp.Results[0].ID != doc.ID { 64 + t.Fatalf("result id: got %q want %q", resp.Results[0].ID, doc.ID) 65 + } 66 + if resp.Results[0].BodySnippet == "" { 67 + t.Fatal("expected body snippet") 68 + } 69 + 70 + if err := st.MarkDeleted(ctx, doc.ID); err != nil { 71 + t.Fatalf("mark deleted: %v", err) 72 + } 73 + 74 + resp, err = repo.Keyword(ctx, search.Params{Query: "desert", Limit: 10}) 75 + if err != nil { 76 + t.Fatalf("keyword search after delete: %v", err) 77 + } 78 + if resp.Total != 0 || len(resp.Results) != 0 { 79 + t.Fatalf("expected no results after delete, got total=%d len=%d", resp.Total, len(resp.Results)) 80 + } 81 + }
+32 -6
packages/api/internal/store/db.go
··· 17 17 18 18 var extensionMigrationNoticeLogged bool 19 19 20 + type migrationMode struct { 21 + allowTursoExtensionSkip bool 22 + targetDescription string 23 + } 24 + 20 25 // Open establishes a connection to the database. 21 26 // For remote Turso URLs (libsql:// or https://) it uses the libsql-client-go driver. 22 27 // For local file: URLs it uses the pure-Go SQLite driver (no CGo required). ··· 46 51 } 47 52 48 53 // Migrate runs all embedded SQL migration files in order. 49 - func Migrate(db *sql.DB) error { 54 + func Migrate(db *sql.DB, url string) error { 55 + mode := migrationMode{ 56 + allowTursoExtensionSkip: strings.HasPrefix(url, "file:"), 57 + targetDescription: migrationTargetDescription(url), 58 + } 50 59 entries, err := migrationsFS.ReadDir("migrations") 51 60 if err != nil { 52 61 return fmt.Errorf("read migrations dir: %w", err) ··· 62 71 if err != nil { 63 72 return fmt.Errorf("read migration %s: %w", entry.Name(), err) 64 73 } 65 - if err := execMigration(db, entry.Name(), string(data)); err != nil { 74 + if err := execMigration(db, entry.Name(), string(data), mode); err != nil { 66 75 return err 67 76 } 68 77 slog.Info("migration applied", "file", entry.Name()) ··· 70 79 return nil 71 80 } 72 81 73 - func execMigration(db *sql.DB, name, content string) error { 82 + func execMigration(db *sql.DB, name, content string, mode migrationMode) error { 74 83 for _, stmt := range splitStatements(content) { 75 84 if _, err := db.Exec(stmt); err != nil { 76 85 upper := strings.ToUpper(stmt) 77 - if strings.Contains(upper, "USING FTS") || strings.Contains(upper, "LIBSQL_VECTOR_IDX") { 86 + if strings.Contains(upper, "LIBSQL_VECTOR_IDX") { 78 87 if !extensionMigrationNoticeLogged { 79 88 extensionMigrationNoticeLogged = true 80 - slog.Info("migration: skipping Turso extension indexes in this environment", 89 + slog.Info("migration: skipping unsupported extension index", 81 90 "migration", name, 82 - "reason", "database engine does not support Turso-specific FTS/vector DDL", 91 + "reason", "database engine does not support vector index DDL in this environment", 83 92 ) 84 93 } 85 94 continue 86 95 } 96 + if strings.Contains(upper, "CREATE VIRTUAL TABLE") && strings.Contains(upper, "USING FTS5") { 97 + return fmt.Errorf( 98 + "migration %s: SQLite FTS5 statement failed on %s: %w\nstatement: %s\nhint: this app uses SQLite FTS5 on Turso Cloud. Enable SQLite extensions for the Turso group/database before rerunning the service", 99 + name, mode.targetDescription, err, stmt, 100 + ) 101 + } 87 102 return fmt.Errorf("migration %s: exec failed: %w\nstatement: %s", name, err, stmt) 88 103 } 89 104 } 90 105 return nil 106 + } 107 + 108 + func migrationTargetDescription(url string) string { 109 + switch { 110 + case strings.HasPrefix(url, "file:"): 111 + return "local SQLite" 112 + case strings.HasPrefix(url, "libsql://"), strings.HasPrefix(url, "https://"): 113 + return "remote Turso/libSQL" 114 + default: 115 + return "database" 116 + } 91 117 } 92 118 93 119 func splitStatements(content string) []string {
+44
packages/api/internal/store/db_test.go
··· 1 + package store 2 + 3 + import ( 4 + "database/sql" 5 + "strings" 6 + "testing" 7 + 8 + _ "modernc.org/sqlite" 9 + ) 10 + 11 + func TestExecMigrationSkipsTursoExtensionDDLForLocalSQLite(t *testing.T) { 12 + db, err := sql.Open("sqlite", ":memory:") 13 + if err != nil { 14 + t.Fatalf("open sqlite: %v", err) 15 + } 16 + t.Cleanup(func() { _ = db.Close() }) 17 + 18 + err = execMigration(db, "003_documents_fts5.sql", "CREATE VIRTUAL TABLE documents_fts USING fts5(title);", migrationMode{ 19 + allowTursoExtensionSkip: true, 20 + targetDescription: "local SQLite", 21 + }) 22 + if err != nil { 23 + t.Fatalf("expected local SQLite migration to create FTS5 table: %v", err) 24 + } 25 + } 26 + 27 + func TestExecMigrationFailsForRemoteWhenNativeFTSUnavailable(t *testing.T) { 28 + db, err := sql.Open("sqlite", ":memory:") 29 + if err != nil { 30 + t.Fatalf("open sqlite: %v", err) 31 + } 32 + t.Cleanup(func() { _ = db.Close() }) 33 + 34 + err = execMigration(db, "003_documents_fts5.sql", "CREATE VIRTUAL TABLE documents_fts USING fts5(", migrationMode{ 35 + allowTursoExtensionSkip: false, 36 + targetDescription: "remote Turso/libSQL", 37 + }) 38 + if err == nil { 39 + t.Fatal("expected remote migration to fail when FTS5 is unavailable") 40 + } 41 + if !strings.Contains(err.Error(), "uses SQLite FTS5 on Turso Cloud") { 42 + t.Fatalf("unexpected error: %v", err) 43 + } 44 + }
-9
packages/api/internal/store/migrations/001_initial.sql
··· 32 32 33 33 CREATE INDEX IF NOT EXISTS idx_documents_deleted_at ON documents(deleted_at); 34 34 35 - CREATE INDEX IF NOT EXISTS idx_documents_fts ON documents USING fts ( 36 - title WITH tokenizer=default, 37 - body WITH tokenizer=default, 38 - summary WITH tokenizer=default, 39 - repo_name WITH tokenizer=simple, 40 - author_handle WITH tokenizer=raw, 41 - tags_json WITH tokenizer=simple 42 - ) WITH (weights='title=3.0,repo_name=2.5,author_handle=2.0,summary=1.5,tags_json=1.2,body=1.0'); 43 - 44 35 CREATE TABLE IF NOT EXISTS sync_state ( 45 36 consumer_name TEXT PRIMARY KEY, 46 37 cursor TEXT NOT NULL,
+23
packages/api/internal/store/migrations/003_documents_fts5.sql
··· 1 + CREATE VIRTUAL TABLE IF NOT EXISTS documents_fts USING fts5 ( 2 + id UNINDEXED, 3 + title, 4 + body, 5 + summary, 6 + repo_name, 7 + author_handle, 8 + tags_json, 9 + tokenize = 'unicode61' 10 + ); 11 + 12 + DELETE FROM documents_fts; 13 + 14 + INSERT INTO documents_fts (id, title, body, summary, repo_name, author_handle, tags_json) 15 + SELECT d.id, 16 + COALESCE(d.title, ''), 17 + COALESCE(d.body, ''), 18 + COALESCE(d.summary, ''), 19 + COALESCE(d.repo_name, ''), 20 + COALESCE(d.author_handle, ''), 21 + COALESCE(d.tags_json, '') 22 + FROM documents d 23 + WHERE d.deleted_at IS NULL;
+48 -2
packages/api/internal/store/sql_store.go
··· 20 20 21 21 func (s *SQLStore) UpsertDocument(ctx context.Context, doc *Document) error { 22 22 doc.IndexedAt = time.Now().UTC().Format(time.RFC3339) 23 - _, err := s.db.ExecContext(ctx, ` 23 + tx, err := s.db.BeginTx(ctx, nil) 24 + if err != nil { 25 + return fmt.Errorf("begin upsert document tx: %w", err) 26 + } 27 + defer tx.Rollback() 28 + 29 + _, err = tx.ExecContext(ctx, ` 24 30 INSERT INTO documents ( 25 31 id, did, collection, rkey, at_uri, cid, record_type, 26 32 title, body, summary, repo_did, repo_name, author_handle, ··· 51 57 ) 52 58 if err != nil { 53 59 return fmt.Errorf("upsert document: %w", err) 60 + } 61 + if err := syncDocumentFTS(ctx, tx, doc); err != nil { 62 + return err 63 + } 64 + if err := tx.Commit(); err != nil { 65 + return fmt.Errorf("commit upsert document tx: %w", err) 54 66 } 55 67 return nil 56 68 } ··· 74 86 75 87 func (s *SQLStore) MarkDeleted(ctx context.Context, id string) error { 76 88 now := time.Now().UTC().Format(time.RFC3339) 77 - _, err := s.db.ExecContext(ctx, 89 + tx, err := s.db.BeginTx(ctx, nil) 90 + if err != nil { 91 + return fmt.Errorf("begin mark deleted tx: %w", err) 92 + } 93 + defer tx.Rollback() 94 + 95 + _, err = tx.ExecContext(ctx, 78 96 `UPDATE documents SET deleted_at = ? WHERE id = ?`, now, id) 79 97 if err != nil { 80 98 return fmt.Errorf("mark deleted: %w", err) 99 + } 100 + if _, err := tx.ExecContext(ctx, `DELETE FROM documents_fts WHERE id = ?`, id); err != nil { 101 + return fmt.Errorf("delete document from fts: %w", err) 102 + } 103 + if err := tx.Commit(); err != nil { 104 + return fmt.Errorf("commit mark deleted tx: %w", err) 81 105 } 82 106 return nil 83 107 } ··· 289 313 } 290 314 return s 291 315 } 316 + 317 + type execer interface { 318 + ExecContext(ctx context.Context, query string, args ...any) (sql.Result, error) 319 + } 320 + 321 + func syncDocumentFTS(ctx context.Context, db execer, doc *Document) error { 322 + if _, err := db.ExecContext(ctx, `DELETE FROM documents_fts WHERE id = ?`, doc.ID); err != nil { 323 + return fmt.Errorf("delete document from fts: %w", err) 324 + } 325 + if doc.DeletedAt != "" { 326 + return nil 327 + } 328 + _, err := db.ExecContext(ctx, ` 329 + INSERT INTO documents_fts (id, title, body, summary, repo_name, author_handle, tags_json) 330 + VALUES (?, ?, ?, ?, ?, ?, ?)`, 331 + doc.ID, doc.Title, doc.Body, doc.Summary, doc.RepoName, doc.AuthorHandle, doc.TagsJSON, 332 + ) 333 + if err != nil { 334 + return fmt.Errorf("insert document into fts: %w", err) 335 + } 336 + return nil 337 + }
+1 -1
packages/api/internal/store/store_test.go
··· 24 24 os.Remove(dbPath) 25 25 }) 26 26 27 - if err := store.Migrate(db); err != nil { 27 + if err := store.Migrate(db, url); err != nil { 28 28 t.Fatalf("migrate: %v", err) 29 29 } 30 30
+4 -4
packages/api/main.go
··· 4 4 "context" 5 5 "fmt" 6 6 "log/slog" 7 - "os" 8 7 "net/http" 8 + "os" 9 9 "os/signal" 10 10 "syscall" 11 11 "time" ··· 82 82 } 83 83 defer db.Close() 84 84 85 - if err := store.Migrate(db); err != nil { 85 + if err := store.Migrate(db, cfg.TursoURL); err != nil { 86 86 return fmt.Errorf("migrate database: %w", err) 87 87 } 88 88 ··· 125 125 } 126 126 defer db.Close() 127 127 128 - if err := store.Migrate(db); err != nil { 128 + if err := store.Migrate(db, cfg.TursoURL); err != nil { 129 129 return fmt.Errorf("migrate database: %w", err) 130 130 } 131 131 ··· 220 220 } 221 221 defer db.Close() 222 222 223 - if err := store.Migrate(db); err != nil { 223 + if err := store.Migrate(db, cfg.TursoURL); err != nil { 224 224 return fmt.Errorf("migrate database: %w", err) 225 225 } 226 226