a love letter to tangled (android, iOS, and a search API)

feat: reindex cmd

+316 -118
docs/api/tasks/phase-1-mvp.md (+7 −75)

```diff
@@
 - [x] Handle normalization failures: log, skip, advance cursor
 - [x] Handle DB failures: retry with backoff, do not advance cursor
 
-### Verification
-
-- [ ] Indexer connects to Tap via WebSocket in development
-- [ ] A newly created tracked record appears in `documents` table
-- [ ] An updated record changes the existing row (CID changes)
-- [ ] A delete event tombstones the row (`deleted_at` set)
-- [ ] Killing and restarting the indexer resumes from persisted cursor without duplication
-- [ ] Identity events update handle cache
-- [ ] Unsupported collections are silently skipped
-- [ ] Connection drops trigger automatic reconnection
-
 ### Exit Criteria
 
 The system continuously ingests and persists `sh.tangled.*` records from Tap.
@@
 - repeat run after expanding the seed list
 - dry-run before production mutation
 - Implemented: `packages/api/internal/backfill/doc.go`
-
-### Verification
-
-- [ ] A small seed file of known Tangled users produces a non-empty discovery graph
-- [ ] `--max-hops 1` limits discovery to direct neighbors
-- [ ] `--dry-run` does not call Tap mutation endpoints
-- [ ] Already-tracked DIDs are reported and not re-submitted unnecessarily
-- [ ] Re-running the same seeds is effectively idempotent
-- [ ] Newly submitted DIDs cause Tap to begin historical backfill
-- [ ] Search results become materially richer after bootstrap than they were under live-only ingestion
 
 ### Exit Criteria
@@
 - [x] Add request logging middleware (method, path, status, duration)
 - [x] Add CORS headers if needed
 
-### Verification
-
-- [ ] Searching by exact repo name returns the expected repo first
-- [ ] Searching by title term returns expected documents
-- [ ] Searching by author handle returns relevant docs
-- [ ] Tombstoned documents do not appear
-- [ ] Malformed query parameters return 400 with error JSON
-- [ ] DB outage causes `/readyz` to fail (503)
-- [ ] Pagination works: `offset=0&limit=5` then `offset=5&limit=5` returns different results
-- [ ] Filter by collection returns only matching docs
-
 ### Exit Criteria
 
 A user can search Tangled content reliably with keyword search.
@@
 - [x] Result card links open canonical Tangled URLs in new tab
 - [x] Verify total site weight under 50 KB (excluding fonts and Alpine CDN) — 21 KB total
 
-### Verification
-
-- [ ] `twister api` serves the search page at `http://localhost:8080/`
-- [ ] API endpoints (`/search`, `/healthz`, etc.) still work alongside the site
-- [ ] Searching a known repo name shows it in results
-- [ ] Filter by type restricts results to that type
-- [ ] "Load more" appends next page of results
-- [ ] API docs pages render correct endpoint signatures, parameter tables, and example JSON
-- [ ] Site works on mobile viewport (stacked layout at 640px)
-- [ ] Site works with API unavailable (error state shown, no crash)
-- [ ] All pages share consistent styling and navigation
-
 ### Exit Criteria
 
 A user can search Tangled content and read API docs from a public URL without installing anything.
@@
 - [x] Test graceful shutdown on redeploy (SIGTERM handling)
 - [x] Document deploy steps
 
-### Verification
-
-- [ ] API service becomes healthy and routable (public URL)
-- [ ] Indexer service starts and stays healthy
-- [ ] A new Tangled record ingested post-deploy becomes searchable
-- [ ] A redeploy preserves API availability
-- [ ] A restart does not lose sync position (cursor persisted)
-- [ ] Health checks correctly report status
-
 ### Exit Criteria
 
 The system runs as a deployed service with health-checked processes on Railway.
 
-## M7 — Reindex and Repair
+## M7 — Reindex and Repair ✅
 
 refs: [specs/05-search.md](../specs/05-search.md)
@@
 
 - `twister reindex` command with scoping options
 - Dry-run mode
-- Admin reindex endpoint (optional)
+- Admin reindex endpoint
 - Progress logging and error summary
 
 ### Tasks
 
-- [ ] Implement `reindex` subcommand with flags:
+- [x] Implement `reindex` subcommand with flags:
   - `--collection` — reindex one collection
   - `--did` — reindex one DID's documents
   - `--document` — reindex one document by ID
   - `--dry-run` — show intended work without writes
   - No flags → reindex all
-- [ ] Implement reindex logic:
+- [x] Implement reindex logic:
   1. Select documents matching scope
   2. For each document, re-run normalization from stored fields (or re-fetch if source available)
   3. Update FTS-relevant fields
   4. Upsert back to store
   5. Run `OPTIMIZE INDEX idx_documents_fts` after bulk reindex to merge Tantivy segments
   6. Log progress (N/total, errors)
-- [ ] Implement `POST /admin/reindex` endpoint (behind `ENABLE_ADMIN_ENDPOINTS` + `ADMIN_AUTH_TOKEN`)
-- [ ] Add error summary output on completion
-- [ ] Exit non-zero on unrecoverable failures
-
-### Verification
-
-- [ ] Reindexing one document updates its stored normalized text
-- [ ] Reindexing one collection repairs intentionally corrupted rows
-- [ ] Dry-run shows intended work without writes
-- [ ] Reindex command exits non-zero on failures
-- [ ] Admin endpoint triggers reindex when enabled
+- [x] Implement `POST /admin/reindex` endpoint (behind `ENABLE_ADMIN_ENDPOINTS` + `ADMIN_AUTH_TOKEN`)
+- [x] Add error summary output on completion
+- [x] Exit non-zero on unrecoverable failures
 
 ### Exit Criteria
@@
 - Reindex procedure
 - Backfill notes
 - Failure triage guide
-
-### Verification
-
-- [ ] A failed Tap decode surfaces enough context to debug (collection, DID, rkey, error class)
-- [ ] DB connectivity failures are visible in logs and readiness
-- [ ] Operator can follow the runbook to diagnose a broken indexer
-- [ ] Search latency is logged per request
 
 ### Exit Criteria
```
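The flag surface described in the tasks above maps to invocations along these lines (illustrative only; assumes a built `twister` binary and a configured database, and the collection/DID values are made-up examples):

```sh
# Preview a full reindex without writing anything
twister reindex --dry-run

# Scope to one collection, one author, or one document
twister reindex --collection sh.tangled.repo
twister reindex --did did:plc:exampleuser
twister reindex --document <document-id>

# No flags: reindex everything
twister reindex
```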
packages/api/internal/api/api.go (+44 −1)

```diff
@@
 	"log/slog"
 	"net/http"
 	"strconv"
+	"strings"
 	"time"
 
 	"tangled.org/desertthunder.dev/twister/internal/config"
+	"tangled.org/desertthunder.dev/twister/internal/reindex"
 	"tangled.org/desertthunder.dev/twister/internal/search"
 	"tangled.org/desertthunder.dev/twister/internal/store"
 	"tangled.org/desertthunder.dev/twister/internal/view"
@@
 	mux.HandleFunc("GET /documents/{id}", s.handleGetDocument)
 
 	if s.cfg.EnableAdminEndpoints {
-		mux.HandleFunc("POST /admin/reindex", s.handleNotImplemented)
+		mux.HandleFunc("POST /admin/reindex", s.handleAdminReindex)
 		mux.HandleFunc("POST /admin/reembed", s.handleNotImplemented)
 	}
@@
 	}
 
 	writeJSON(w, http.StatusOK, documentResponse(doc))
+}
+
+func (s *Server) handleAdminReindex(w http.ResponseWriter, r *http.Request) {
+	if s.cfg.AdminAuthToken != "" {
+		token := strings.TrimPrefix(r.Header.Get("Authorization"), "Bearer ")
+		if token != s.cfg.AdminAuthToken {
+			writeJSON(w, http.StatusUnauthorized, errorBody("unauthorized", "invalid admin token"))
+			return
+		}
+	}
+
+	opts := reindex.Options{
+		Collection: r.URL.Query().Get("collection"),
+		DID:        r.URL.Query().Get("did"),
+		DocumentID: r.URL.Query().Get("document"),
+	}
+
+	runner := reindex.New(s.store, s.log)
+	result, err := runner.Run(r.Context(), opts)
+	if err != nil {
+		s.log.Error("admin reindex failed", slog.String("error", err.Error()))
+		if result != nil {
+			writeJSON(w, http.StatusInternalServerError, map[string]any{
+				"error":   "reindex_error",
+				"message": err.Error(),
+				"total":   result.Total,
+				"updated": result.Updated,
+				"errors":  result.Errors,
+			})
+		} else {
+			writeJSON(w, http.StatusInternalServerError, errorBody("reindex_error", err.Error()))
+		}
+		return
+	}
+
+	writeJSON(w, http.StatusOK, map[string]any{
+		"status":  "ok",
+		"total":   result.Total,
+		"updated": result.Updated,
+		"errors":  result.Errors,
+	})
 }
 
 func (s *Server) handleNotImplemented(w http.ResponseWriter, _ *http.Request) {
```
packages/api/internal/backfill/backfill.go (−2)

```diff
@@
 		}
 
 		handle := res.profile.Handle
-		// Prefer the seed handle if the user was specified by handle in seeds.
 		if h, ok := seedHandles[res.did]; ok && h != "" {
 			handle = h
 		}
@@
 			}
 		}
 
-		// Only create a document if we got a profile record back.
 		if res.profile.Record == nil {
 			continue
 		}
```
packages/api/internal/backfill/backfill_test.go (−2)

```diff
@@
 		t.Fatalf("run backfill: %v", err)
 	}
 
-	// Identity handle should be persisted.
 	if st.identities["did:plc:seed"] != "alice.tangled.sh" {
 		t.Fatalf("expected identity handle for seed DID, got %#v", st.identities)
 	}
 
-	// Profile document should be created.
 	if len(st.documents) != 1 {
 		t.Fatalf("expected 1 profile document, got %d", len(st.documents))
 	}
```
packages/api/internal/backfill/profile.go (−1)

```diff
@@
 	defer resp.Body.Close()
 
 	if resp.StatusCode == http.StatusNotFound {
-		// No profile record — return handle only so identity can still be stored.
 		return &ProfileRecord{Handle: handle}, nil
 	}
 	if resp.StatusCode != http.StatusOK {
```
packages/api/internal/backfill/seed.go (−1)

```diff
@@
 		return parseSeedFile(input)
 	}
 
-	// Single inline DID/handle is supported for convenience.
 	return parseSeedList(input)
 }
```
packages/api/internal/config/config.go (+1 −1)

```diff
@@
 		if _, err := os.Stat(candidate); err != nil {
 			continue
 		}
-		// Load does not override existing process env vars.
+
 		_ = godotenv.Load(candidate)
 	}
 }
```
packages/api/internal/ingest/ingest_test.go (+12)

```diff
@@
 	return nil, nil
 }
 
+func (f *fakeStore) ListDocuments(_ context.Context, _ store.DocumentFilter) ([]*store.Document, error) {
+	docs := make([]*store.Document, 0, len(f.docs))
+	for _, d := range f.docs {
+		docs = append(docs, d)
+	}
+	return docs, nil
+}
+
+func (f *fakeStore) OptimizeFTS(_ context.Context) error {
+	return nil
+}
+
 func (f *fakeStore) CountDocuments(_ context.Context) (int64, error) {
 	return int64(len(f.docs)), nil
 }
```
packages/api/internal/normalize/normalize_test.go (+1 −5)

```diff
@@
 		t.Errorf("ATURI = %q", doc.ATURI)
 	}
 
-	// Searchable
 	if !adapter.Searchable(event.Record.Record) {
 		t.Error("Searchable returned false for a named repo")
 	}
@@
 		t.Errorf("TagsJSON = %q, want []", doc.TagsJSON)
 	}
 
-	// Deterministic output
 	doc2, _ := adapter.Normalize(event)
 	if doc.ID != doc2.ID {
 		t.Error("Normalize is not deterministic")
@@
 		t.Errorf("RecordType = %q", doc.RecordType)
 	}
 
-	// Searchable only when contents is non-empty
 	if !adapter.Searchable(event.Record.Record) {
 		t.Error("Searchable = false for non-empty contents")
 	}
@@
 	if doc.RecordType != "profile" {
 		t.Errorf("RecordType = %q", doc.RecordType)
 	}
-	// Title is intentionally empty (handle resolved separately via identity events)
+
 	if doc.Title != "" {
 		t.Errorf("Title = %q, want empty (handle resolved externally)", doc.Title)
 	}
 
-	// Searchable only when description is non-empty
 	if !adapter.Searchable(event.Record.Record) {
 		t.Error("Searchable = false for non-empty description")
 	}
```
packages/api/internal/normalize/profile.go (+4 −5)

```diff
@@
 		ATURI:      BuildATURI(r.DID, r.Collection, r.RKey),
 		CID:        r.CID,
 		RecordType: a.RecordType(),
-		// Title (handle) is resolved from DID by the indexer via identity events.
-		Title:   "",
-		Body:    description,
-		Summary: truncate(summary, 200),
-		TagsJSON: "[]",
+		Title:      "",
+		Body:       description,
+		Summary:    truncate(summary, 200),
+		TagsJSON:   "[]",
 	}, nil
 }
```
packages/api/internal/normalize/pull.go (−1)

```diff
@@
 	title := str(rec, "title")
 	body := str(rec, "body")
 
-	// repo DID is extracted from record.target.repo AT-URI
 	repoDID := ""
 	target := nestedMap(rec, "target")
 	if target != nil {
```
packages/api/internal/reindex/reindex.go (new file, +119)

```go
// Package reindex re-syncs documents to the FTS index from stored fields.
// It is used by the `twister reindex` CLI command and the POST /admin/reindex endpoint.
package reindex

import (
	"context"
	"fmt"
	"log/slog"

	"tangled.org/desertthunder.dev/twister/internal/store"
)

// Options controls which documents are reindexed.
type Options struct {
	Collection string // reindex documents in this collection only
	DID        string // reindex documents authored by this DID only
	DocumentID string // reindex a single document by stable ID
	DryRun     bool   // log intended work without writing
}

// Result summarises the outcome of a reindex run.
type Result struct {
	Total   int
	Updated int
	Errors  int
}

// Runner performs the reindex operation.
type Runner struct {
	store store.Store
	log   *slog.Logger
}

// New creates a Runner.
func New(st store.Store, log *slog.Logger) *Runner {
	return &Runner{store: st, log: log}
}

// Run reindexes documents matching opts.
// It re-upserts each document (which re-syncs the FTS virtual table) and then
// runs an FTS optimize pass to merge Tantivy/FTS5 segments.
func (r *Runner) Run(ctx context.Context, opts Options) (*Result, error) {
	filter := store.DocumentFilter{
		Collection: opts.Collection,
		DID:        opts.DID,
		DocumentID: opts.DocumentID,
	}

	docs, err := r.store.ListDocuments(ctx, filter)
	if err != nil {
		return nil, fmt.Errorf("list documents: %w", err)
	}

	result := &Result{Total: len(docs)}

	r.log.Info("reindex: starting",
		slog.Int("total", result.Total),
		slog.Bool("dry_run", opts.DryRun),
		slog.String("collection", opts.Collection),
		slog.String("did", opts.DID),
		slog.String("document_id", opts.DocumentID),
	)

	for i, doc := range docs {
		if ctx.Err() != nil {
			break
		}

		if opts.DryRun {
			r.log.Info("reindex: would upsert",
				slog.String("id", doc.ID),
				slog.String("collection", doc.Collection),
				slog.Int("progress", i+1),
				slog.Int("total", result.Total),
			)
			result.Updated++
			continue
		}

		if err := r.store.UpsertDocument(ctx, doc); err != nil {
			r.log.Error("reindex: upsert failed",
				slog.String("id", doc.ID),
				slog.String("collection", doc.Collection),
				slog.String("error", err.Error()),
			)
			result.Errors++
			continue
		}

		result.Updated++

		if (i+1)%100 == 0 || i+1 == result.Total {
			r.log.Info("reindex: progress",
				slog.Int("done", i+1),
				slog.Int("total", result.Total),
				slog.Int("errors", result.Errors),
			)
		}
	}

	if !opts.DryRun {
		r.log.Info("reindex: optimizing fts index")
		if err := r.store.OptimizeFTS(ctx); err != nil {
			r.log.Error("reindex: fts optimize failed", slog.String("error", err.Error()))
			result.Errors++
		}
	}

	r.log.Info("reindex: complete",
		slog.Int("total", result.Total),
		slog.Int("updated", result.Updated),
		slog.Int("errors", result.Errors),
	)

	if result.Errors > 0 {
		return result, fmt.Errorf("reindex completed with %d error(s)", result.Errors)
	}
	return result, nil
}
```
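One detail worth noting in `Run` above: when per-document upserts fail, it returns the partial `Result` together with a non-nil error, so callers can still report counts while propagating a non-zero exit. A reduced sketch of that contract (the local `Result` and `finish` here are stand-ins for illustration, not the package's actual API):

```go
package main

import "fmt"

// Result mirrors the shape of the reindex summary: totals plus an error count.
type Result struct {
	Total, Updated, Errors int
}

// finish models the tail of Runner.Run: the partial summary is returned even
// on failure, and Errors > 0 becomes a non-nil error for the caller to exit on.
func finish(r *Result) (*Result, error) {
	if r.Errors > 0 {
		return r, fmt.Errorf("reindex completed with %d error(s)", r.Errors)
	}
	return r, nil
}

func main() {
	res, err := finish(&Result{Total: 10, Updated: 8, Errors: 2})
	// Counts remain available even though err is non-nil.
	fmt.Println(res.Updated, err != nil)
}
```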
packages/api/internal/search/search.go (+15 −19; whitespace-only realignment of the `Result` struct plus four comment removals)

```diff
@@
 // Result is a single search hit.
 type Result struct {
-	ID              string   `json:"id"`
-	Collection      string   `json:"collection"`
-	RecordType      string   `json:"record_type"`
-	Title           string   `json:"title"`
-	BodySnippet     string   `json:"body_snippet,omitempty"`
-	Summary         string   `json:"summary,omitempty"`
-	RepoName        string   `json:"repo_name,omitempty"`
-	RepoOwnerHandle string   `json:"repo_owner_handle,omitempty"`
-	AuthorHandle    string   `json:"author_handle,omitempty"`
-	DID             string   `json:"did"`
-	ATURI           string   `json:"at_uri"`
-	Score           float64  `json:"score"`
-	MatchedBy       []string `json:"matched_by"`
-	CreatedAt       string   `json:"created_at,omitempty"`
-	UpdatedAt       string   `json:"updated_at,omitempty"`
+	ID              string   `json:"id"`
+	Collection      string   `json:"collection"`
+	RecordType      string   `json:"record_type"`
+	Title           string   `json:"title"`
+	BodySnippet     string   `json:"body_snippet,omitempty"`
+	Summary         string   `json:"summary,omitempty"`
+	RepoName        string   `json:"repo_name,omitempty"`
+	RepoOwnerHandle string   `json:"repo_owner_handle,omitempty"`
+	AuthorHandle    string   `json:"author_handle,omitempty"`
+	DID             string   `json:"did"`
+	ATURI           string   `json:"at_uri"`
+	Score           float64  `json:"score"`
+	MatchedBy       []string `json:"matched_by"`
+	CreatedAt       string   `json:"created_at,omitempty"`
+	UpdatedAt       string   `json:"updated_at,omitempty"`
 }
 
 // Response is the search API response envelope.
@@
 func (r *Repository) Keyword(ctx context.Context, p Params) (*Response, error) {
 	ftsQuery := toFTS5Query(p.Query)
 
-	// Build filter conditions beyond the base FTS match.
 	var filters []string
 	var filterArgs []any
@@
 		filterArgs = append(filterArgs, p.To)
 	}
 
-	// State filter requires a JOIN.
 	var join string
 	if p.State != "" {
 		join = "JOIN record_state rs ON rs.subject_uri = d.at_uri"
@@
 		where += " AND " + strings.Join(filters, " AND ")
 	}
 
-	// Count total matching documents.
 	countSQL := fmt.Sprintf("SELECT COUNT(*) FROM documents_fts JOIN documents d ON d.id = documents_fts.id %s WHERE %s", join, where)
 	countArgs := append([]any{ftsQuery}, filterArgs...)
@@
 		return nil, explainNativeFTSError("count", err)
 	}
 
-	// Fetch results with score and snippet.
 	resultsSQL := fmt.Sprintf(`
 		SELECT d.id, d.title, d.summary, d.repo_name, repo_owner.handle, d.author_handle,
 			d.did, d.at_uri, d.collection, d.record_type, d.created_at, d.updated_at,
```
packages/api/internal/store/sql_store.go (+67)

```diff
@@
 	return nil
 }
 
+func (s *SQLStore) ListDocuments(ctx context.Context, filter DocumentFilter) ([]*Document, error) {
+	query := `SELECT id, did, collection, rkey, at_uri, cid, record_type,
+		title, body, summary, repo_did, repo_name, author_handle,
+		tags_json, language, created_at, updated_at, indexed_at, deleted_at
+		FROM documents WHERE deleted_at IS NULL`
+	args := []any{}
+
+	if filter.DocumentID != "" {
+		query += " AND id = ?"
+		args = append(args, filter.DocumentID)
+	}
+	if filter.Collection != "" {
+		query += " AND collection = ?"
+		args = append(args, filter.Collection)
+	}
+	if filter.DID != "" {
+		query += " AND did = ?"
+		args = append(args, filter.DID)
+	}
+
+	rows, err := s.db.QueryContext(ctx, query, args...)
+	if err != nil {
+		return nil, fmt.Errorf("list documents: %w", err)
+	}
+	defer rows.Close()
+
+	var docs []*Document
+	for rows.Next() {
+		doc := &Document{}
+		var (
+			title, body, summary, repoDID, repoName, authorHandle sql.NullString
+			tagsJSON, language, createdAt, updatedAt, deletedAt   sql.NullString
+		)
+		if err := rows.Scan(
+			&doc.ID, &doc.DID, &doc.Collection, &doc.RKey, &doc.ATURI, &doc.CID, &doc.RecordType,
+			&title, &body, &summary, &repoDID, &repoName, &authorHandle,
+			&tagsJSON, &language, &createdAt, &updatedAt, &doc.IndexedAt, &deletedAt,
+		); err != nil {
+			return nil, fmt.Errorf("scan document: %w", err)
+		}
+		doc.Title = title.String
+		doc.Body = body.String
+		doc.Summary = summary.String
+		doc.RepoDID = repoDID.String
+		doc.RepoName = repoName.String
+		doc.AuthorHandle = authorHandle.String
+		doc.TagsJSON = tagsJSON.String
+		doc.Language = language.String
+		doc.CreatedAt = createdAt.String
+		doc.UpdatedAt = updatedAt.String
+		doc.DeletedAt = deletedAt.String
+		docs = append(docs, doc)
+	}
+	if err := rows.Err(); err != nil {
+		return nil, fmt.Errorf("iterate documents: %w", err)
+	}
+	return docs, nil
+}
+
+func (s *SQLStore) OptimizeFTS(ctx context.Context) error {
+	_, err := s.db.ExecContext(ctx, `INSERT INTO documents_fts(documents_fts) VALUES('optimize')`)
+	if err != nil {
+		return fmt.Errorf("optimize fts: %w", err)
+	}
+	return nil
+}
+
 func (s *SQLStore) GetDocument(ctx context.Context, id string) (*Document, error) {
 	row := s.db.QueryRowContext(ctx, `
 		SELECT id, did, collection, rkey, at_uri, cid, record_type,
```
packages/api/internal/store/store.go (+9)

```diff
@@
 	UpdatedAt string
 }
 
+// DocumentFilter scopes a ListDocuments query to a subset of documents.
+type DocumentFilter struct {
+	Collection string // filter by collection NSID
+	DID        string // filter by author DID
+	DocumentID string // filter to a single document by stable ID
+}
+
 // Store is the persistence interface for Twister.
 type Store interface {
 	UpsertDocument(ctx context.Context, doc *Document) error
 	GetDocument(ctx context.Context, id string) (*Document, error)
 	MarkDeleted(ctx context.Context, id string) error
+	ListDocuments(ctx context.Context, filter DocumentFilter) ([]*Document, error)
+	OptimizeFTS(ctx context.Context) error
 	GetSyncState(ctx context.Context, consumer string) (*SyncState, error)
 	SetSyncState(ctx context.Context, consumer string, cursor string) error
 	UpdateRecordState(ctx context.Context, subjectURI string, state string) error
```
packages/api/main.go (+37 −5)

```diff
@@
 	"tangled.org/desertthunder.dev/twister/internal/ingest"
 	"tangled.org/desertthunder.dev/twister/internal/normalize"
 	"tangled.org/desertthunder.dev/twister/internal/observability"
+	"tangled.org/desertthunder.dev/twister/internal/reindex"
 	"tangled.org/desertthunder.dev/twister/internal/search"
 	"tangled.org/desertthunder.dev/twister/internal/store"
 	"tangled.org/desertthunder.dev/twister/internal/tapclient"
@@
 	ctx, cancel := baseContext()
 	defer cancel()
 
-	// Start health server on separate port for Railway health checks.
 	healthMux := http.NewServeMux()
 	healthMux.HandleFunc("GET /health", func(w http.ResponseWriter, r *http.Request) {
 		if err := st.Ping(r.Context()); err != nil {
@@
 }
 
 func newReindexCmd(local *bool) *cobra.Command {
-	return &cobra.Command{
+	var opts reindex.Options
+
+	cmd := &cobra.Command{
 		Use:   "reindex",
-		Short: "Re-normalize and upsert all documents",
+		Short: "Re-normalize and upsert all documents into the FTS index",
 		RunE: func(cmd *cobra.Command, args []string) error {
 			cfg, err := config.Load(config.LoadOptions{Local: *local})
 			if err != nil {
 				return fmt.Errorf("config: %w", err)
 			}
 			log := observability.NewLogger(cfg)
-			log.Info("reindex: not yet implemented")
-			return nil
+			log.Info("starting reindex", slog.String("service", "reindex"), slog.String("version", version))
+
+			db, err := store.Open(cfg.TursoURL, cfg.TursoToken)
+			if err != nil {
+				return fmt.Errorf("open database: %w", err)
+			}
+			defer db.Close()
+
+			if err := store.Migrate(db, cfg.TursoURL); err != nil {
+				return fmt.Errorf("migrate database: %w", err)
+			}
+
+			ctx, cancel := baseContext()
+			defer cancel()
+
+			runner := reindex.New(store.New(db), log)
+			result, err := runner.Run(ctx, opts)
+			if result != nil {
+				log.Info("reindex finished",
+					slog.Int("total", result.Total),
+					slog.Int("updated", result.Updated),
+					slog.Int("errors", result.Errors),
+				)
+			}
+			return err
 		},
 	}
+
+	cmd.Flags().StringVar(&opts.Collection, "collection", "", "Reindex only documents in this collection")
+	cmd.Flags().StringVar(&opts.DID, "did", "", "Reindex only documents authored by this DID")
+	cmd.Flags().StringVar(&opts.DocumentID, "document", "", "Reindex a single document by stable ID")
+	cmd.Flags().BoolVar(&opts.DryRun, "dry-run", false, "Show intended work without writing")
+
+	return cmd
 }
 
 func newReembedCmd(local *bool) *cobra.Command {
```