a love letter to tangled (android, iOS, and a search API)
19
fork

Configure Feed

Select the types of activity you want to include in your feed.

feat: search api

+539 -13
+10 -10
docs/api/tasks/phase-1-mvp.md
··· 215 215 216 216 ### Tasks 217 217 218 - - [ ] Set up HTTP server with net/http router 219 - - [ ] Implement `/healthz` (always 200) and `/readyz` (SELECT 1 against DB) 220 - - [ ] Implement search repository with FTS queries: 218 + - [x] Set up HTTP server with net/http router 219 + - [x] Implement `/healthz` (always 200) and `/readyz` (SELECT 1 against DB) 220 + - [x] Implement search repository with FTS queries: 221 221 222 222 ```sql 223 223 SELECT id, title, summary, repo_name, author_handle, collection, record_type, ··· 231 231 LIMIT ? OFFSET ?; 232 232 ``` 233 233 234 - - [ ] Implement request validation: 234 + - [x] Implement request validation: 235 235 - `q` required, non-empty 236 236 - `limit` 1–100, default 20 237 237 - `offset` >= 0, default 0 238 238 - Reject unknown parameters with 400 239 - - [ ] Implement filters (as WHERE clauses): 239 + - [x] Implement filters (as WHERE clauses): 240 240 - `collection` → `d.collection = ?` 241 241 - `type` → `d.record_type = ?` 242 242 - `author` → `d.author_handle = ?` or `d.did = ?` 243 243 - `repo` → `d.repo_name = ?` 244 - - [ ] Implement `/documents/{id}` — full document response 245 - - [ ] Implement stable JSON response contract (see spec 05-search.md) 246 - - [ ] Exclude tombstoned documents (`deleted_at IS NOT NULL`) by default 247 - - [ ] Add request logging middleware (method, path, status, duration) 248 - - [ ] Add CORS headers if needed 244 + - [x] Implement `/documents/{id}` — full document response 245 + - [x] Implement stable JSON response contract (see spec 05-search.md) 246 + - [x] Exclude tombstoned documents (`deleted_at IS NOT NULL`) by default 247 + - [x] Add request logging middleware (method, path, status, duration) 248 + - [x] Add CORS headers if needed 249 249 250 250 ### Verification 251 251
+321
packages/api/internal/api/api.go
··· 1 1 package api 2 + 3 + import ( 4 + "context" 5 + "encoding/json" 6 + "fmt" 7 + "log/slog" 8 + "net/http" 9 + "strconv" 10 + "time" 11 + 12 + "tangled.org/desertthunder.dev/twister/internal/config" 13 + "tangled.org/desertthunder.dev/twister/internal/search" 14 + "tangled.org/desertthunder.dev/twister/internal/store" 15 + ) 16 + 17 + // Server is the HTTP search API server. 18 + type Server struct { 19 + search *search.Repository 20 + store store.Store 21 + cfg *config.Config 22 + log *slog.Logger 23 + } 24 + 25 + // New creates a new API server. 26 + func New(searchRepo *search.Repository, st store.Store, cfg *config.Config, log *slog.Logger) *Server { 27 + return &Server{ 28 + search: searchRepo, 29 + store: st, 30 + cfg: cfg, 31 + log: log, 32 + } 33 + } 34 + 35 + // Handler returns the HTTP handler with all routes registered. 36 + func (s *Server) Handler() http.Handler { 37 + mux := http.NewServeMux() 38 + 39 + // Health 40 + mux.HandleFunc("GET /healthz", s.handleHealthz) 41 + mux.HandleFunc("GET /readyz", s.handleReadyz) 42 + 43 + // Search — M5 44 + mux.HandleFunc("GET /search", s.handleSearch) 45 + mux.HandleFunc("GET /search/keyword", s.handleSearchKeyword) 46 + 47 + // Search — placeholders (Phase 2/3) 48 + mux.HandleFunc("GET /search/semantic", s.handleNotImplemented) 49 + mux.HandleFunc("GET /search/hybrid", s.handleNotImplemented) 50 + 51 + // Documents 52 + mux.HandleFunc("GET /documents/{id}", s.handleGetDocument) 53 + 54 + // Admin — placeholders (M7) 55 + if s.cfg.EnableAdminEndpoints { 56 + mux.HandleFunc("POST /admin/reindex", s.handleNotImplemented) 57 + mux.HandleFunc("POST /admin/reembed", s.handleNotImplemented) 58 + } 59 + 60 + return s.withMiddleware(mux) 61 + } 62 + 63 + // Run starts the HTTP server and blocks until ctx is cancelled. 64 + func (s *Server) Run(ctx context.Context) error { 65 + srv := &http.Server{ 66 + Addr: s.cfg.HTTPBindAddr, 67 + Handler: s.Handler(), 68 + ReadHeaderTimeout: 10 * time.Second, 69 + IdleTimeout: 60 * time.Second, 70 + } 71 + 72 + errCh := make(chan error, 1) 73 + go func() { 74 + s.log.Info("listening", slog.String("addr", s.cfg.HTTPBindAddr)) 75 + errCh <- srv.ListenAndServe() 76 + }() 77 + 78 + select { 79 + case err := <-errCh: 80 + return err 81 + case <-ctx.Done(): 82 + shutdownCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second) 83 + defer cancel() 84 + return srv.Shutdown(shutdownCtx) 85 + } 86 + } 87 + 88 + // --- Middleware --- 89 + 90 + func (s *Server) withMiddleware(next http.Handler) http.Handler { 91 + return s.corsMiddleware(s.loggingMiddleware(next)) 92 + } 93 + 94 + func (s *Server) loggingMiddleware(next http.Handler) http.Handler { 95 + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { 96 + start := time.Now() 97 + rw := &responseWriter{ResponseWriter: w, status: 200} 98 + next.ServeHTTP(rw, r) 99 + s.log.Info("request", 100 + slog.String("method", r.Method), 101 + slog.String("path", r.URL.Path), 102 + slog.String("query", r.URL.RawQuery), 103 + slog.Int("status", rw.status), 104 + slog.Int64("duration_ms", time.Since(start).Milliseconds()), 105 + ) 106 + }) 107 + } 108 + 109 + func (s *Server) corsMiddleware(next http.Handler) http.Handler { 110 + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { 111 + w.Header().Set("Access-Control-Allow-Origin", "*") 112 + w.Header().Set("Access-Control-Allow-Methods", "GET, OPTIONS") 113 + w.Header().Set("Access-Control-Allow-Headers", "Content-Type") 114 + if r.Method == http.MethodOptions { 115 + w.WriteHeader(http.StatusNoContent) 116 + return 117 + } 118 + next.ServeHTTP(w, r) 119 + }) 120 + } 121 + 122 + type responseWriter struct { 123 + http.ResponseWriter 124 + status int 125 + } 126 + 127 + func (rw *responseWriter) WriteHeader(code int) { 128 + rw.status = code 129 + rw.ResponseWriter.WriteHeader(code) 130 + } 131 + 132 + // --- Health Handlers --- 133 + 134 + func (s *Server) handleHealthz(w http.ResponseWriter, _ *http.Request) { 135 + writeJSON(w, http.StatusOK, map[string]string{"status": "ok"}) 136 + } 137 + 138 + func (s *Server) handleReadyz(w http.ResponseWriter, r *http.Request) { 139 + if err := s.search.Ping(r.Context()); err != nil { 140 + s.log.Error("readyz: db unreachable", slog.String("error", err.Error())) 141 + writeJSON(w, http.StatusServiceUnavailable, errorBody("db_unreachable", "database is not reachable")) 142 + return 143 + } 144 + writeJSON(w, http.StatusOK, map[string]string{"status": "ready"}) 145 + } 146 + 147 + // --- Search Handlers --- 148 + 149 + // knownSearchParams is the whitelist of accepted query parameters for search endpoints. 150 + var knownSearchParams = map[string]bool{ 151 + "q": true, "mode": true, "limit": true, "offset": true, 152 + "collection": true, "type": true, "author": true, "repo": true, 153 + "language": true, "from": true, "to": true, "state": true, 154 + } 155 + 156 + func (s *Server) handleSearch(w http.ResponseWriter, r *http.Request) { 157 + mode := r.URL.Query().Get("mode") 158 + if mode == "" { 159 + mode = s.cfg.SearchDefaultMode 160 + } 161 + switch mode { 162 + case "keyword": 163 + s.handleSearchKeyword(w, r) 164 + case "semantic", "hybrid": 165 + s.handleNotImplemented(w, r) 166 + default: 167 + writeJSON(w, http.StatusBadRequest, errorBody("invalid_parameter", "mode must be keyword, semantic, or hybrid")) 168 + } 169 + } 170 + 171 + func (s *Server) handleSearchKeyword(w http.ResponseWriter, r *http.Request) { 172 + // Reject unknown parameters. 173 + for key := range r.URL.Query() { 174 + if !knownSearchParams[key] { 175 + writeJSON(w, http.StatusBadRequest, errorBody("unknown_parameter", fmt.Sprintf("unknown parameter: %s", key))) 176 + return 177 + } 178 + } 179 + 180 + q := r.URL.Query().Get("q") 181 + if q == "" { 182 + writeJSON(w, http.StatusBadRequest, errorBody("invalid_parameter", "q is required")) 183 + return 184 + } 185 + 186 + limit, err := intParam(r, "limit", s.cfg.SearchDefaultLimit) 187 + if err != nil || limit < 1 || limit > s.cfg.SearchMaxLimit { 188 + writeJSON(w, http.StatusBadRequest, errorBody("invalid_parameter", fmt.Sprintf("limit must be between 1 and %d", s.cfg.SearchMaxLimit))) 189 + return 190 + } 191 + 192 + offset, err := intParam(r, "offset", 0) 193 + if err != nil || offset < 0 { 194 + writeJSON(w, http.StatusBadRequest, errorBody("invalid_parameter", "offset must be >= 0")) 195 + return 196 + } 197 + 198 + params := search.Params{ 199 + Query: q, 200 + Limit: limit, 201 + Offset: offset, 202 + Collection: r.URL.Query().Get("collection"), 203 + Type: r.URL.Query().Get("type"), 204 + Author: r.URL.Query().Get("author"), 205 + Repo: r.URL.Query().Get("repo"), 206 + Language: r.URL.Query().Get("language"), 207 + From: r.URL.Query().Get("from"), 208 + To: r.URL.Query().Get("to"), 209 + State: r.URL.Query().Get("state"), 210 + } 211 + 212 + resp, err := s.search.Keyword(r.Context(), params) 213 + if err != nil { 214 + s.log.Error("search failed", slog.String("error", err.Error()), slog.String("query", q)) 215 + writeJSON(w, http.StatusInternalServerError, errorBody("search_error", "search failed")) 216 + return 217 + } 218 + 219 + writeJSON(w, http.StatusOK, resp) 220 + } 221 + 222 + // --- Document Handler --- 223 + 224 + func (s *Server) handleGetDocument(w http.ResponseWriter, r *http.Request) { 225 + id := r.PathValue("id") 226 + if id == "" { 227 + writeJSON(w, http.StatusBadRequest, errorBody("invalid_parameter", "document id is required")) 228 + return 229 + } 230 + 231 + // Path value may be URL-encoded with | separators. The mux already decodes it, 232 + // but callers may use pipe-encoded or slash-separated IDs; accept as-is. 233 + doc, err := s.store.GetDocument(r.Context(), id) 234 + if err != nil { 235 + s.log.Error("get document failed", slog.String("error", err.Error()), slog.String("id", id)) 236 + writeJSON(w, http.StatusInternalServerError, errorBody("db_error", "failed to fetch document")) 237 + return 238 + } 239 + if doc == nil { 240 + writeJSON(w, http.StatusNotFound, errorBody("not_found", "document not found")) 241 + return 242 + } 243 + if doc.DeletedAt != "" { 244 + writeJSON(w, http.StatusNotFound, errorBody("not_found", "document not found")) 245 + return 246 + } 247 + 248 + writeJSON(w, http.StatusOK, documentResponse(doc)) 249 + } 250 + 251 + // --- Placeholder --- 252 + 253 + func (s *Server) handleNotImplemented(w http.ResponseWriter, _ *http.Request) { 254 + writeJSON(w, http.StatusNotImplemented, errorBody("not_implemented", "this endpoint is not yet available")) 255 + } 256 + 257 + // --- Helpers --- 258 + 259 + func writeJSON(w http.ResponseWriter, status int, v any) { 260 + w.Header().Set("Content-Type", "application/json") 261 + w.WriteHeader(status) 262 + _ = json.NewEncoder(w).Encode(v) 263 + } 264 + 265 + func errorBody(code, message string) map[string]string { 266 + return map[string]string{"error": code, "message": message} 267 + } 268 + 269 + func intParam(r *http.Request, key string, def int) (int, error) { 270 + v := r.URL.Query().Get(key) 271 + if v == "" { 272 + return def, nil 273 + } 274 + n, err := strconv.Atoi(v) 275 + if err != nil { 276 + return 0, err 277 + } 278 + return n, nil 279 + } 280 + 281 + type documentJSON struct { 282 + ID string `json:"id"` 283 + DID string `json:"did"` 284 + Collection string `json:"collection"` 285 + RKey string `json:"rkey"` 286 + ATURI string `json:"at_uri"` 287 + CID string `json:"cid"` 288 + RecordType string `json:"record_type"` 289 + Title string `json:"title"` 290 + Body string `json:"body"` 291 + Summary string `json:"summary,omitempty"` 292 + RepoName string `json:"repo_name,omitempty"` 293 + AuthorHandle string `json:"author_handle,omitempty"` 294 + TagsJSON string `json:"tags_json,omitempty"` 295 + Language string `json:"language,omitempty"` 296 + CreatedAt string `json:"created_at,omitempty"` 297 + UpdatedAt string `json:"updated_at,omitempty"` 298 + IndexedAt string `json:"indexed_at"` 299 + } 300 + 301 + func documentResponse(doc *store.Document) documentJSON { 302 + return documentJSON{ 303 + ID: doc.ID, 304 + DID: doc.DID, 305 + Collection: doc.Collection, 306 + RKey: doc.RKey, 307 + ATURI: doc.ATURI, 308 + CID: doc.CID, 309 + RecordType: doc.RecordType, 310 + Title: doc.Title, 311 + Body: doc.Body, 312 + Summary: doc.Summary, 313 + RepoName: doc.RepoName, 314 + AuthorHandle: doc.AuthorHandle, 315 + TagsJSON: doc.TagsJSON, 316 + Language: doc.Language, 317 + CreatedAt: doc.CreatedAt, 318 + UpdatedAt: doc.UpdatedAt, 319 + IndexedAt: doc.IndexedAt, 320 + } 321 + } 322 +
+183
packages/api/internal/search/search.go
··· 1 1 package search 2 + 3 + import ( 4 + "context" 5 + "database/sql" 6 + "fmt" 7 + "strings" 8 + ) 9 + 10 + // Params holds validated search query parameters. 11 + type Params struct { 12 + Query string 13 + Limit int 14 + Offset int 15 + Collection string 16 + Type string 17 + Author string 18 + Repo string 19 + Language string 20 + From string 21 + To string 22 + State string 23 + } 24 + 25 + // Result is a single search hit. 26 + type Result struct { 27 + ID string `json:"id"` 28 + Collection string `json:"collection"` 29 + RecordType string `json:"record_type"` 30 + Title string `json:"title"` 31 + BodySnippet string `json:"body_snippet,omitempty"` 32 + Summary string `json:"summary,omitempty"` 33 + RepoName string `json:"repo_name,omitempty"` 34 + AuthorHandle string `json:"author_handle,omitempty"` 35 + DID string `json:"did"` 36 + ATURI string `json:"at_uri"` 37 + Score float64 `json:"score"` 38 + MatchedBy []string `json:"matched_by"` 39 + CreatedAt string `json:"created_at,omitempty"` 40 + UpdatedAt string `json:"updated_at,omitempty"` 41 + } 42 + 43 + // Response is the search API response envelope. 44 + type Response struct { 45 + Query string `json:"query"` 46 + Mode string `json:"mode"` 47 + Total int `json:"total"` 48 + Limit int `json:"limit"` 49 + Offset int `json:"offset"` 50 + Results []Result `json:"results"` 51 + } 52 + 53 + // Repository executes search queries against the database. 54 + type Repository struct { 55 + db *sql.DB 56 + } 57 + 58 + // NewRepository creates a search repository backed by the given database. 59 + func NewRepository(db *sql.DB) *Repository { 60 + return &Repository{db: db} 61 + } 62 + 63 + // Ping checks database connectivity. 64 + func (r *Repository) Ping(ctx context.Context) error { 65 + return r.db.PingContext(ctx) 66 + } 67 + 68 + // Keyword runs a full-text keyword search. 69 + func (r *Repository) Keyword(ctx context.Context, p Params) (*Response, error) { 70 + // Build filter conditions beyond the base FTS match. 71 + var filters []string 72 + var filterArgs []any 73 + 74 + if p.Collection != "" { 75 + filters = append(filters, "d.collection = ?") 76 + filterArgs = append(filterArgs, p.Collection) 77 + } 78 + if p.Type != "" { 79 + filters = append(filters, "d.record_type = ?") 80 + filterArgs = append(filterArgs, p.Type) 81 + } 82 + if p.Author != "" { 83 + filters = append(filters, "(d.author_handle = ? OR d.did = ?)") 84 + filterArgs = append(filterArgs, p.Author, p.Author) 85 + } 86 + if p.Repo != "" { 87 + filters = append(filters, "(d.repo_name = ? OR d.repo_did = ?)") 88 + filterArgs = append(filterArgs, p.Repo, p.Repo) 89 + } 90 + if p.Language != "" { 91 + filters = append(filters, "d.language = ?") 92 + filterArgs = append(filterArgs, p.Language) 93 + } 94 + if p.From != "" { 95 + filters = append(filters, "d.created_at >= ?") 96 + filterArgs = append(filterArgs, p.From) 97 + } 98 + if p.To != "" { 99 + filters = append(filters, "d.created_at <= ?") 100 + filterArgs = append(filterArgs, p.To) 101 + } 102 + 103 + // State filter requires a JOIN. 104 + var join string 105 + if p.State != "" { 106 + join = "JOIN record_state rs ON rs.subject_uri = d.at_uri" 107 + filters = append(filters, "rs.state = ?") 108 + filterArgs = append(filterArgs, p.State) 109 + } 110 + 111 + where := "fts_match(d.title, d.body, d.summary, d.repo_name, d.author_handle, d.tags_json, ?) AND d.deleted_at IS NULL" 112 + if len(filters) > 0 { 113 + where += " AND " + strings.Join(filters, " AND ") 114 + } 115 + 116 + // Count total matching documents. 117 + countSQL := fmt.Sprintf("SELECT COUNT(*) FROM documents d %s WHERE %s", join, where) 118 + countArgs := append([]any{p.Query}, filterArgs...) 119 + 120 + var total int 121 + if err := r.db.QueryRowContext(ctx, countSQL, countArgs...).Scan(&total); err != nil { 122 + return nil, fmt.Errorf("count: %w", err) 123 + } 124 + 125 + // Fetch results with score and snippet. 126 + resultsSQL := fmt.Sprintf(` 127 + SELECT d.id, d.title, d.summary, d.repo_name, d.author_handle, 128 + d.did, d.at_uri, d.collection, d.record_type, d.created_at, d.updated_at, 129 + fts_score(d.title, d.body, d.summary, d.repo_name, d.author_handle, d.tags_json, ?) AS score, 130 + fts_highlight(d.body, '<mark>', '</mark>', ?) AS body_snippet 131 + FROM documents d 132 + %s 133 + WHERE %s 134 + ORDER BY score DESC 135 + LIMIT ? OFFSET ?`, join, where) 136 + 137 + resultsArgs := make([]any, 0, 3+len(filterArgs)+2) 138 + resultsArgs = append(resultsArgs, p.Query, p.Query, p.Query) // score, highlight, match 139 + resultsArgs = append(resultsArgs, filterArgs...) 140 + resultsArgs = append(resultsArgs, p.Limit, p.Offset) 141 + 142 + rows, err := r.db.QueryContext(ctx, resultsSQL, resultsArgs...) 143 + if err != nil { 144 + return nil, fmt.Errorf("search: %w", err) 145 + } 146 + defer rows.Close() 147 + 148 + results := make([]Result, 0) 149 + for rows.Next() { 150 + var res Result 151 + var title, summary, repoName, authorHandle sql.NullString 152 + var createdAt, updatedAt sql.NullString 153 + var bodySnippet sql.NullString 154 + 155 + if err := rows.Scan( 156 + &res.ID, &title, &summary, &repoName, &authorHandle, 157 + &res.DID, &res.ATURI, &res.Collection, &res.RecordType, 158 + &createdAt, &updatedAt, &res.Score, &bodySnippet, 159 + ); err != nil { 160 + return nil, fmt.Errorf("scan: %w", err) 161 + } 162 + res.Title = title.String 163 + res.Summary = summary.String 164 + res.RepoName = repoName.String 165 + res.AuthorHandle = authorHandle.String 166 + res.BodySnippet = bodySnippet.String 167 + res.CreatedAt = createdAt.String 168 + res.UpdatedAt = updatedAt.String 169 + res.MatchedBy = []string{"keyword"} 170 + results = append(results, res) 171 + } 172 + if err := rows.Err(); err != nil { 173 + return nil, fmt.Errorf("rows: %w", err) 174 + } 175 + 176 + return &Response{ 177 + Query: p.Query, 178 + Mode: "keyword", 179 + Total: total, 180 + Limit: p.Limit, 181 + Offset: p.Offset, 182 + Results: results, 183 + }, nil 184 + }
+25 -3
packages/api/main.go
··· 10 10 "time" 11 11 12 12 "github.com/spf13/cobra" 13 + "tangled.org/desertthunder.dev/twister/internal/api" 13 14 "tangled.org/desertthunder.dev/twister/internal/backfill" 14 15 "tangled.org/desertthunder.dev/twister/internal/config" 15 16 "tangled.org/desertthunder.dev/twister/internal/ingest" 16 17 "tangled.org/desertthunder.dev/twister/internal/normalize" 17 18 "tangled.org/desertthunder.dev/twister/internal/observability" 19 + "tangled.org/desertthunder.dev/twister/internal/search" 18 20 "tangled.org/desertthunder.dev/twister/internal/store" 19 21 "tangled.org/desertthunder.dev/twister/internal/tapclient" 20 22 ) ··· 62 64 63 65 func newAPICmd() *cobra.Command { 64 66 return &cobra.Command{ 65 - Use: "api", 66 - Short: "Start the HTTP search API", 67 + Use: "api", 68 + Aliases: []string{"serve"}, 69 + Short: "Start the HTTP search API", 67 70 RunE: func(cmd *cobra.Command, args []string) error { 68 71 cfg, err := config.Load() 69 72 if err != nil { ··· 71 74 } 72 75 log := observability.NewLogger(cfg) 73 76 log.Info("starting api", slog.String("service", "api"), slog.String("version", version), slog.String("addr", cfg.HTTPBindAddr)) 77 + 78 + db, err := store.Open(cfg.TursoURL, cfg.TursoToken) 79 + if err != nil { 80 + return fmt.Errorf("open database: %w", err) 81 + } 82 + defer db.Close() 83 + 84 + if err := store.Migrate(db); err != nil { 85 + return fmt.Errorf("migrate database: %w", err) 86 + } 87 + 88 + st := store.New(db) 89 + searchRepo := search.NewRepository(db) 90 + srv := api.New(searchRepo, st, cfg, log) 91 + 74 92 ctx, cancel := baseContext() 75 93 defer cancel() 76 - <-ctx.Done() 94 + 95 + if err := srv.Run(ctx); err != nil { 96 + return fmt.Errorf("run api: %w", err) 97 + } 98 + 77 99 log.Info("shutting down api") 78 100 return nil 79 101 },