a love letter to tangled (android, iOS, and a search API)
feat: read-through indexing job processing

- add job retry with exponential backoff
- update local SQLite configuration with PRAGMA settings for better concurrency
- add smoke checks for API endpoints using uv

+1049 -48
+5
.gitignore
···
 plugins
 www
 
+# SQLite
 *.db
+*.db-shm
+*.db-wal
+
 .env
+__pycache__/
+19 -14
README.md
···
 Start the Ionic/Vite app:
 
 ```bash
-pnpm dev
-# or: just dev
+pnpm dev # or: just dev
 ```
 
 That serves the client from `apps/twisted` with Vite.
 
-To run the Go API locally, make sure `packages/api/.env` has at least:
+To run the Go API locally for routine experimentation, no Turso credentials are required.
+
+Start the API in local file mode:
+
+```bash
+pnpm api:run:api # or: just api-dev
+```
 
-- `TURSO_DATABASE_URL`
-- `TURSO_AUTH_TOKEN`
+This serves the API and search site on `http://localhost:8080` using
+`packages/api/twister-dev.db`.
 
-Then start the API:
+To run the API against remote Turso instead:
 
 ```bash
-pnpm api:run:api
-# or: just api-dev
+just api-dev remote
 ```
 
-This serves the API and search site on `http://localhost:8080`.
+To run the indexer in local file mode as well:
 
-To run the indexer as well, `packages/api/.env` also needs:
+```bash
+pnpm api:run:indexer # or: just api-run-indexer
+```
+
+To run the indexer against remote Turso, `packages/api/.env` needs:
 
 - `TAP_URL`
 - `TAP_AUTH_PASSWORD`
 - `INDEXED_COLLECTIONS`
 
-Then start the indexer in a separate terminal:
-
 ```bash
-pnpm api:run:indexer
-# or: just api-run-indexer
+just api-run-indexer remote
 ```
 
 Typical local setup is three terminals:
+16 -16
docs/reference/api.md
···
 
 All configuration is via environment variables (with `.env` file support):
 
-| Variable                   | Default                 | Purpose                                        |
-| -------------------------- | ----------------------- | ---------------------------------------------- |
-| `TURSO_DATABASE_URL`       | —                       | Database connection (required)                 |
-| `TURSO_AUTH_TOKEN`         | —                       | Auth token (required for remote)               |
-| `TAP_URL`                  | —                       | Tap WebSocket URL                              |
-| `TAP_AUTH_PASSWORD`        | —                       | Tap admin password                             |
-| `INDEXED_COLLECTIONS`      | all                     | Collection allowlist (CSV, supports wildcards) |
-| `HTTP_BIND_ADDR`           | `:8080`                 | API server bind address                        |
-| `INDEXER_HEALTH_ADDR`      | `:9090`                 | Indexer health probe address                   |
-| `LOG_LEVEL`                | info                    | debug/info/warn/error                          |
-| `LOG_FORMAT`               | json                    | json or text                                   |
-| `ENABLE_ADMIN_ENDPOINTS`   | false                   | Enable admin routes                            |
-| `ADMIN_AUTH_TOKEN`         | —                       | Bearer token for admin                         |
-| `ENABLE_INGEST_ENRICHMENT` | true                    | XRPC enrichment at ingest time                 |
-| `PLC_DIRECTORY_URL`        | `https://plc.directory` | PLC Directory                                  |
-| `XRPC_TIMEOUT`             | 15s                     | XRPC HTTP timeout                              |
+| Variable                   | Default                 | Purpose                                         |
+| -------------------------- | ----------------------- | ----------------------------------------------- |
+| `TURSO_DATABASE_URL`       | —                       | Database connection (required unless `--local`) |
+| `TURSO_AUTH_TOKEN`         | —                       | Auth token (required for remote)                |
+| `TAP_URL`                  | —                       | Tap WebSocket URL                               |
+| `TAP_AUTH_PASSWORD`        | —                       | Tap admin password                              |
+| `INDEXED_COLLECTIONS`      | all                     | Collection allowlist (CSV, supports wildcards)  |
+| `HTTP_BIND_ADDR`           | `:8080`                 | API server bind address                         |
+| `INDEXER_HEALTH_ADDR`      | `:9090`                 | Indexer health probe address                    |
+| `LOG_LEVEL`                | info                    | debug/info/warn/error                           |
+| `LOG_FORMAT`               | json                    | json or text                                    |
+| `ENABLE_ADMIN_ENDPOINTS`   | false                   | Enable admin routes                             |
+| `ADMIN_AUTH_TOKEN`         | —                       | Bearer token for admin                          |
+| `ENABLE_INGEST_ENRICHMENT` | true                    | XRPC enrichment at ingest time                  |
+| `PLC_DIRECTORY_URL`        | `https://plc.directory` | PLC Directory                                   |
+| `XRPC_TIMEOUT`             | 15s                     | XRPC HTTP timeout                               |
 
 ## Deployment
 
+3 -3
docs/roadmap.md
···
 
 Highest priority. This work blocks further investment in semantic search, hybrid ranking, and broader discovery features.
 
-- [ ] Stabilize local development and experimentation around a local `file:` database
+- [x] Stabilize local development and experimentation around a local `file:` database
 - [x] Document backup, restore, and disk-growth procedures for the experimental local DB
 - [x] Research production backend options: PostgreSQL, Turso remote/libSQL, and Turso embedded replicas
 - [x] Write a production storage decision record with workload and operational tradeoffs, using `docs/adr/pg.md` and `docs/adr/turso.md`
 - [x] Define the migration path from the experimental local setup to the chosen production backend
-- [ ] Add cURL smoke tests for `healthz`, `readyz`, `search`, `documents`, indexing, and activity in `scripts/api/`
+- [x] Add a durable read-through indexing job queue for records fetched through the API
+- [x] Add API smoke tests for `healthz`, `readyz`, `search`, `documents`, indexing, and activity in `scripts/api/`
   - desertthunder.dev DID: `did:plc:xg2vq45muivyy3xwatcehspu`
   - Twisted AT URI: `at://did:plc:xg2vq45muivyy3xwatcehspu/sh.tangled.repo/3mho6hukiei22`
   - Profile AT URI: `at://did:plc:xg2vq45muivyy3xwatcehspu/sh.tangled.actor.profile/self`
   - Follow AT URI (desertthunder.dev follows npmx): `at://did:plc:xg2vq45muivyy3xwatcehspu/sh.tangled.graph.follow/3mhofstanru22`
   - Star AT URI (desertthunder.dev stars microcosm-rs): `at://did:plc:lulmyldiq4sb2ikags5sfb25/sh.tangled.repo/3lvsxzinfz222`
   - ~~Add `just` targets for smoke-test runs locally and against a remote base URL~~ directly invoking the scripts is fine.
-- [ ] Add a durable read-through indexing job queue for records fetched through the API
 - [ ] Reuse the existing normalization and upsert path for on-demand indexing jobs
 - [ ] Trigger indexing jobs from repo, issue, PR, profile, and similar fetch handlers
 - [ ] Add dedupe, retries, and observability for indexing jobs
+6 -4
justfile
···
 api-build:
     just --justfile packages/api/justfile build
 
-api-dev:
-    just --justfile packages/api/justfile run-api
+# Run API. Usage: just api-dev [mode], mode: local|remote (default local)
+api-dev mode="local":
+    just --justfile packages/api/justfile run-api {{mode}}
 
-api-run-indexer:
-    just --justfile packages/api/justfile run-indexer
+# Run indexer. Usage: just api-run-indexer [mode], mode: local|remote (default local)
+api-run-indexer mode="local":
+    just --justfile packages/api/justfile run-indexer {{mode}}
 
 api-test:
     just --justfile packages/api/justfile test
+2 -2
package.json
···
     "app:cap:run:android": "pnpm --dir apps/twisted exec cap run android",
     "api:build": "just --justfile packages/api/justfile build",
     "api:test": "just --justfile packages/api/justfile test",
-    "api:run:api": "just --justfile packages/api/justfile run-api",
-    "api:run:indexer": "just --justfile packages/api/justfile run-indexer"
+    "api:run:api": "just api-dev",
+    "api:run:indexer": "just api-run-indexer"
   }
 }
+18
packages/api/README.md
···
 
 The server listens on `:8080` by default. Logs are printed as text when `--local` is set.
 
+## API Smoke Tests
+
+Smoke checks for the API surface live in a uv-managed Python project at
+`scripts/api/`.
+
+From the repo root:
+
+```sh
+uv run --project scripts/api twister-api-smoke
+```
+
+Optional base URL override:
+
+```sh
+TWISTER_API_BASE_URL=http://localhost:8080 \
+uv run --project scripts/api twister-api-smoke
+```
+
 ## Experimental Local DB Operations
 
 The experimental local database lives at `packages/api/twister-dev.db` when you run Twister from `packages/api` with `--local`.
+16
packages/api/internal/api/actors.go
···
     if err != nil {
         return nil, fmt.Errorf("list repos for %s: %w", actor.DID, err)
     }
+    s.enqueueXRPCList(r.Context(), entries)
 
     for _, entry := range entries {
         name, _ := entry.Value["name"].(string)
···
         s.actorError(w, err)
         return
     }
+    s.enqueueXRPCRecord(r.Context(), rec.URI, rec.CID, rec.Value)
 
     var bsky *bskyProfileResponse
     if linked, _ := rec.Value["bluesky"].(bool); linked {
···
         s.actorError(w, err)
         return
     }
+    s.enqueueXRPCList(r.Context(), entries)
 
     records := make([]recordEntry, len(entries))
     for i, e := range entries {
···
         s.actorError(w, err)
         return
     }
+    s.enqueueXRPCRecord(r.Context(), rec.URI, rec.CID, rec.Value)
 
     writeJSON(w, http.StatusOK, map[string]any{
         "did": repo.DID,
···
         writeJSON(w, http.StatusBadGateway, errorBody("upstream_error", "failed to fetch issues"))
         return
     }
+    s.enqueueXRPCList(r.Context(), issues)
 
     var records []issueEntry
     for _, e := range issues {
···
         writeJSON(w, http.StatusBadGateway, errorBody("upstream_error", "failed to fetch pulls"))
         return
     }
+    s.enqueueXRPCList(r.Context(), pulls)
 
     var records []pullEntry
     for _, e := range pulls {
···
         writeJSON(w, http.StatusBadGateway, errorBody("upstream_error", "failed to fetch issues"))
         return
     }
+    s.enqueueXRPCList(r.Context(), issues)
 
     records := make([]issueEntry, len(issues))
     for i, e := range issues {
···
         writeJSON(w, http.StatusBadGateway, errorBody("upstream_error", "failed to fetch pulls"))
         return
     }
+    s.enqueueXRPCList(r.Context(), pulls)
 
     records := make([]pullEntry, len(pulls))
     for i, e := range pulls {
···
         writeJSON(w, http.StatusBadGateway, errorBody("upstream_error", "failed to fetch follows"))
         return
     }
+    s.enqueueXRPCList(r.Context(), entries)
 
     records := make([]recordEntry, len(entries))
     for i, e := range entries {
···
         writeJSON(w, http.StatusBadGateway, errorBody("upstream_error", "failed to fetch strings"))
         return
     }
+    s.enqueueXRPCList(r.Context(), entries)
 
     records := make([]recordEntry, len(entries))
     for i, e := range entries {
···
         s.actorError(w, err)
         return
     }
+    s.enqueueXRPCRecord(r.Context(), rec.URI, rec.CID, rec.Value)
 
     _, stateMap, err := s.fetchIssuesAndStates(r, actor.PDS, actor.DID)
     if err != nil {
···
         writeJSON(w, http.StatusBadGateway, errorBody("upstream_error", "failed to fetch comments"))
         return
     }
+    s.enqueueXRPCList(r.Context(), entries)
 
     var records []recordEntry
     for _, e := range entries {
···
         s.actorError(w, err)
         return
     }
+    s.enqueueXRPCRecord(r.Context(), rec.URI, rec.CID, rec.Value)
 
     _, statusMap, err := s.fetchPullsAndStatuses(r, actor.PDS, actor.DID)
     if err != nil {
···
         writeJSON(w, http.StatusBadGateway, errorBody("upstream_error", "failed to fetch comments"))
         return
     }
+    s.enqueueXRPCList(r.Context(), entries)
 
     var records []recordEntry
     for _, e := range entries {
···
     }
 
     stateMap := make(map[string]string, len(states))
+    s.enqueueXRPCList(r.Context(), states)
     for _, e := range states {
         issueURI, _ := e.Value["issue"].(string)
         state, _ := e.Value["state"].(string)
···
     }
 
     statusMap := make(map[string]string, len(statuses))
+    s.enqueueXRPCList(r.Context(), statuses)
     for _, e := range statuses {
         pullURI, _ := e.Value["pull"].(string)
         status, _ := e.Value["status"].(string)
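The enqueue helpers above pass `rec.URI` through `normalize.ParseATURI` before a job is queued. As an illustration of the split that call is assumed to perform (`parseATURI` here is a hypothetical standalone sketch, not the repo's actual helper), an `at://` URI decomposes into DID, collection NSID, and record key:

```go
package main

import (
	"fmt"
	"strings"
)

// parseATURI splits "at://<did>/<collection>/<rkey>" into its three parts.
// Sketch only: the real normalize.ParseATURI may validate more strictly.
func parseATURI(uri string) (did, collection, rkey string, err error) {
	rest, ok := strings.CutPrefix(uri, "at://")
	if !ok {
		return "", "", "", fmt.Errorf("not an at-uri: %s", uri)
	}
	parts := strings.SplitN(rest, "/", 3)
	if len(parts) != 3 || parts[0] == "" || parts[1] == "" || parts[2] == "" {
		return "", "", "", fmt.Errorf("malformed at-uri: %s", uri)
	}
	return parts[0], parts[1], parts[2], nil
}

func main() {
	// The Twisted repo URI from the roadmap doc, as a worked example.
	did, coll, rkey, err := parseATURI("at://did:plc:xg2vq45muivyy3xwatcehspu/sh.tangled.repo/3mho6hukiei22")
	fmt.Println(did, coll, rkey, err)
}
```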
+21
packages/api/internal/api/api.go
···
 package api
 
 import (
+    "bufio"
     "context"
     "encoding/json"
     "fmt"
     "log/slog"
+    "net"
     "net/http"
     "strconv"
     "strings"
···
 
     "tangled.org/desertthunder.dev/twister/internal/config"
     "tangled.org/desertthunder.dev/twister/internal/constellation"
+    "tangled.org/desertthunder.dev/twister/internal/normalize"
     "tangled.org/desertthunder.dev/twister/internal/reindex"
     "tangled.org/desertthunder.dev/twister/internal/search"
     "tangled.org/desertthunder.dev/twister/internal/store"
···
     log           *slog.Logger
     constellation *constellation.Client
     xrpc          *xrpc.Client
+    registry      *normalize.Registry
 }
 
 // New creates a new API server.
···
         log:           log,
         constellation: constellation,
         xrpc:          xrpcClient,
+        registry:      normalize.NewRegistry(),
     }
 }
···
     }
 
     errCh := make(chan error, 1)
+    go s.runReadThroughIndexer(ctx)
+
     go func() {
         s.log.Info("listening", slog.String("addr", s.cfg.HTTPBindAddr))
         errCh <- srv.ListenAndServe()
···
 func (rw *responseWriter) WriteHeader(code int) {
     rw.status = code
     rw.ResponseWriter.WriteHeader(code)
+}
+
+func (rw *responseWriter) Hijack() (net.Conn, *bufio.ReadWriter, error) {
+    h, ok := rw.ResponseWriter.(http.Hijacker)
+    if !ok {
+        return nil, nil, fmt.Errorf("response writer does not support hijacking")
+    }
+    return h.Hijack()
+}
+
+func (rw *responseWriter) Flush() {
+    if f, ok := rw.ResponseWriter.(http.Flusher); ok {
+        f.Flush()
+    }
 }
 
 func (s *Server) handleHealthz(w http.ResponseWriter, _ *http.Request) {
+202
packages/api/internal/api/readthrough.go
+package api
+
+import (
+    "context"
+    "encoding/json"
+    "fmt"
+    "log/slog"
+    "time"
+
+    "tangled.org/desertthunder.dev/twister/internal/normalize"
+    "tangled.org/desertthunder.dev/twister/internal/store"
+    "tangled.org/desertthunder.dev/twister/internal/xrpc"
+)
+
+const readThroughIdlePoll = 1 * time.Second
+
+func (s *Server) runReadThroughIndexer(ctx context.Context) {
+    ticker := time.NewTicker(readThroughIdlePoll)
+    defer ticker.Stop()
+
+    s.log.Info("read-through indexer worker started")
+    for {
+        if ctx.Err() != nil {
+            s.log.Info("read-through indexer worker stopped")
+            return
+        }
+
+        job, err := s.store.ClaimIndexingJob(ctx)
+        if err != nil {
+            s.log.Warn("read-through claim failed", slog.String("error", err.Error()))
+            select {
+            case <-ctx.Done():
+                return
+            case <-ticker.C:
+            }
+            continue
+        }
+        if job == nil {
+            select {
+            case <-ctx.Done():
+                return
+            case <-ticker.C:
+            }
+            continue
+        }
+
+        if err := s.processReadThroughJob(ctx, job); err != nil {
+            nextDelay := retryDelay(job.Attempts + 1)
+            nextAt := time.Now().UTC().Add(nextDelay).Format(time.RFC3339)
+            retryErr := s.store.RetryIndexingJob(ctx, job.DocumentID, nextAt, truncateErr(err))
+            if retryErr != nil {
+                s.log.Error("read-through retry update failed",
+                    slog.String("document_id", job.DocumentID),
+                    slog.String("error", retryErr.Error()),
+                )
+                continue
+            }
+            s.log.Warn("read-through job failed; scheduled retry",
+                slog.String("document_id", job.DocumentID),
+                slog.Int("attempt", job.Attempts+1),
+                slog.Duration("retry_in", nextDelay),
+                slog.String("error", err.Error()),
+            )
+            continue
+        }
+
+        if err := s.store.CompleteIndexingJob(ctx, job.DocumentID); err != nil {
+            s.log.Error("read-through complete failed",
+                slog.String("document_id", job.DocumentID),
+                slog.String("error", err.Error()),
+            )
+            continue
+        }
+    }
+}
+
+func (s *Server) processReadThroughJob(ctx context.Context, job *store.IndexingJob) error {
+    record := map[string]any{}
+    if err := json.Unmarshal([]byte(job.RecordJSON), &record); err != nil {
+        return fmt.Errorf("decode record json: %w", err)
+    }
+
+    event := normalize.TapRecordEvent{
+        Type: "record",
+        Record: &normalize.TapRecord{
+            DID:        job.DID,
+            Collection: job.Collection,
+            RKey:       job.RKey,
+            Action:     "create",
+            CID:        job.CID,
+            Record:     record,
+        },
+    }
+
+    if handler, ok := s.registry.StateHandler(job.Collection); ok {
+        update, err := handler.HandleState(event)
+        if err != nil {
+            return fmt.Errorf("state normalize: %w", err)
+        }
+        if err := s.store.UpdateRecordState(ctx, update.SubjectURI, update.State); err != nil {
+            return fmt.Errorf("update state: %w", err)
+        }
+        return nil
+    }
+
+    adapter, ok := s.registry.Adapter(job.Collection)
+    if !ok {
+        return nil
+    }
+
+    doc, err := adapter.Normalize(event)
+    if err != nil {
+        return fmt.Errorf("normalize record: %w", err)
+    }
+
+    handle, err := s.store.GetIdentityHandle(ctx, job.DID)
+    if err != nil {
+        return fmt.Errorf("lookup identity handle: %w", err)
+    }
+    if handle != "" {
+        doc.AuthorHandle = handle
+        if doc.RecordType == "profile" {
+            doc.Title = handle
+        }
+    }
+
+    if err := s.store.UpsertDocument(ctx, doc); err != nil {
+        return fmt.Errorf("upsert document: %w", err)
+    }
+
+    if adapter.Searchable(record) {
+        if err := s.store.EnqueueEmbeddingJob(ctx, doc.ID); err != nil {
+            s.log.Warn("read-through enqueue embedding failed",
+                slog.String("document_id", doc.ID),
+                slog.String("error", err.Error()),
+            )
+        }
+    }
+    return nil
+}
+
+func (s *Server) enqueueXRPCRecord(ctx context.Context, uri, cid string, value map[string]any) {
+    did, collection, rkey, err := normalize.ParseATURI(uri)
+    if err != nil {
+        s.log.Debug("read-through skip invalid at-uri", slog.String("uri", uri), slog.String("error", err.Error()))
+        return
+    }
+    payload, err := json.Marshal(value)
+    if err != nil {
+        s.log.Debug("read-through skip unmarshalable record", slog.String("uri", uri), slog.String("error", err.Error()))
+        return
+    }
+    input := store.IndexingJobInput{
+        DocumentID: normalize.StableID(did, collection, rkey),
+        DID:        did,
+        Collection: collection,
+        RKey:       rkey,
+        CID:        cid,
+        RecordJSON: string(payload),
+    }
+    if err := s.store.EnqueueIndexingJob(ctx, input); err != nil {
+        s.log.Warn("enqueue read-through indexing job failed",
+            slog.String("document_id", input.DocumentID),
+            slog.String("error", err.Error()),
+        )
+    }
+}
+
+func (s *Server) enqueueXRPCList(ctx context.Context, entries []xrpc.ListRecordEntry) {
+    for _, e := range entries {
+        s.enqueueXRPCRecord(ctx, e.URI, e.CID, e.Value)
+    }
+}
+
+func retryDelay(attempt int) time.Duration {
+    if attempt < 1 {
+        attempt = 1
+    }
+    base := time.Second * time.Duration(1<<minInt(attempt-1, 8))
+    if base > 5*time.Minute {
+        return 5 * time.Minute
+    }
+    return base
+}
+
+func truncateErr(err error) string {
+    if err == nil {
+        return ""
+    }
+    msg := err.Error()
+    if len(msg) > 500 {
+        return msg[:500]
+    }
+    return msg
+}
+
+func minInt(a, b int) int {
+    if a < b {
+        return a
+    }
+    return b
+}
+16
packages/api/internal/ingest/ingest_test.go
···
     return nil
 }
 
+func (f *fakeStore) EnqueueIndexingJob(_ context.Context, _ store.IndexingJobInput) error {
+    return nil
+}
+
+func (f *fakeStore) ClaimIndexingJob(_ context.Context) (*store.IndexingJob, error) {
+    return nil, nil
+}
+
+func (f *fakeStore) CompleteIndexingJob(_ context.Context, _ string) error {
+    return nil
+}
+
+func (f *fakeStore) RetryIndexingJob(_ context.Context, _, _, _ string) error {
+    return nil
+}
+
 func (f *fakeStore) GetFollowSubjects(_ context.Context, _ string) ([]string, error) {
     return nil, nil
 }
+27
packages/api/internal/store/db.go
···
     "log/slog"
     "sort"
     "strings"
+    "time"
 
     _ "github.com/tursodatabase/libsql-client-go/libsql"
     _ "modernc.org/sqlite"
···
     if err != nil {
         return nil, fmt.Errorf("open db: %w", err)
     }
+    if strings.HasPrefix(url, "file:") {
+        if err := configureLocalSQLite(db); err != nil {
+            db.Close()
+            return nil, err
+        }
+    }
     if err := db.Ping(); err != nil {
         db.Close()
         return nil, fmt.Errorf("ping db: %w", err)
     }
     return db, nil
+}
+
+func configureLocalSQLite(db *sql.DB) error {
+    // Busy timeout gives the writer a window to wait instead of failing fast with "database is locked".
+    if _, err := db.Exec(`PRAGMA busy_timeout = 5000`); err != nil {
+        return fmt.Errorf("configure sqlite busy_timeout: %w", err)
+    }
+    // WAL mode allows concurrent readers with a writer and is the default for multi-process local dev.
+    if _, err := db.Exec(`PRAGMA journal_mode = WAL`); err != nil {
+        return fmt.Errorf("configure sqlite wal mode: %w", err)
+    }
+    if _, err := db.Exec(`PRAGMA synchronous = NORMAL`); err != nil {
+        return fmt.Errorf("configure sqlite synchronous mode: %w", err)
+    }
+
+    db.SetMaxOpenConns(1)
+    db.SetMaxIdleConns(1)
+    db.SetConnMaxLifetime(0)
+    db.SetConnMaxIdleTime(5 * time.Minute)
+    return nil
 }
 
 // driverAndDSN returns the sql driver name and DSN for the given URL.
+30
packages/api/internal/store/db_test.go
···
 
 import (
     "database/sql"
+    "path/filepath"
     "strings"
     "testing"
···
         t.Fatalf("unexpected error: %v", err)
     }
 }
+
+func TestOpenLocalSQLiteAppliesPragmasAndPoolLimits(t *testing.T) {
+    path := filepath.Join(t.TempDir(), "local.db")
+    db, err := Open("file:"+path, "")
+    if err != nil {
+        t.Fatalf("open local sqlite: %v", err)
+    }
+    t.Cleanup(func() { _ = db.Close() })
+
+    if got := db.Stats().MaxOpenConnections; got != 1 {
+        t.Fatalf("max open conns: got %d, want 1", got)
+    }
+
+    var mode string
+    if err := db.QueryRow(`PRAGMA journal_mode`).Scan(&mode); err != nil {
+        t.Fatalf("pragma journal_mode: %v", err)
+    }
+    if !strings.EqualFold(mode, "wal") {
+        t.Fatalf("journal_mode: got %q, want wal", mode)
+    }
+
+    var timeout int
+    if err := db.QueryRow(`PRAGMA busy_timeout`).Scan(&timeout); err != nil {
+        t.Fatalf("pragma busy_timeout: %v", err)
+    }
+    if timeout != 5000 {
+        t.Fatalf("busy_timeout: got %d, want 5000", timeout)
+    }
+}
+16
packages/api/internal/store/migrations/005_indexing_jobs.sql
+CREATE TABLE IF NOT EXISTS indexing_jobs (
+    document_id TEXT PRIMARY KEY,
+    did TEXT NOT NULL,
+    collection TEXT NOT NULL,
+    rkey TEXT NOT NULL,
+    cid TEXT NOT NULL,
+    record_json TEXT NOT NULL,
+    status TEXT NOT NULL,
+    attempts INTEGER NOT NULL DEFAULT 0,
+    last_error TEXT,
+    scheduled_at TEXT NOT NULL,
+    updated_at TEXT NOT NULL
+);
+
+CREATE INDEX IF NOT EXISTS idx_indexing_jobs_status_scheduled
+    ON indexing_jobs(status, scheduled_at, updated_at);
+118 -2
packages/api/internal/store/sql_store.go
···
     for rows.Next() {
         doc := &Document{}
         var (
-            title, body, summary, repoDID, repoName, authorHandle sql.NullString
+            title, body, summary, repoDID, repoName, authorHandle       sql.NullString
             tagsJSON, language, createdAt, updatedAt, webURL, deletedAt sql.NullString
         )
         if err := rows.Scan(
···
     return nil
 }
 
+func (s *SQLStore) EnqueueIndexingJob(ctx context.Context, input IndexingJobInput) error {
+    now := time.Now().UTC().Format(time.RFC3339)
+    _, err := s.db.ExecContext(ctx, `
+        INSERT INTO indexing_jobs (
+            document_id, did, collection, rkey, cid, record_json,
+            status, attempts, last_error, scheduled_at, updated_at
+        ) VALUES (?, ?, ?, ?, ?, ?, 'pending', 0, NULL, ?, ?)
+        ON CONFLICT(document_id) DO UPDATE SET
+            did = excluded.did,
+            collection = excluded.collection,
+            rkey = excluded.rkey,
+            cid = excluded.cid,
+            record_json = excluded.record_json,
+            status = 'pending',
+            last_error = NULL,
+            scheduled_at = excluded.scheduled_at,
+            updated_at = excluded.updated_at`,
+        input.DocumentID, input.DID, input.Collection, input.RKey, input.CID, input.RecordJSON, now, now,
+    )
+    if err != nil {
+        return fmt.Errorf("enqueue indexing job: %w", err)
+    }
+    return nil
+}
+
+func (s *SQLStore) ClaimIndexingJob(ctx context.Context) (*IndexingJob, error) {
+    now := time.Now().UTC().Format(time.RFC3339)
+    staleCutoff := time.Now().UTC().Add(-5 * time.Minute).Format(time.RFC3339)
+
+    tx, err := s.db.BeginTx(ctx, nil)
+    if err != nil {
+        return nil, fmt.Errorf("begin claim indexing job tx: %w", err)
+    }
+    defer tx.Rollback()
+
+    var job IndexingJob
+    row := tx.QueryRowContext(ctx, `
+        SELECT document_id, did, collection, rkey, cid, record_json,
+               attempts, status, COALESCE(last_error, ''), scheduled_at, updated_at
+        FROM indexing_jobs
+        WHERE (status = 'pending' AND scheduled_at <= ?)
+           OR (status = 'processing' AND updated_at <= ?)
+        ORDER BY scheduled_at ASC, updated_at ASC
+        LIMIT 1`, now, staleCutoff)
+
+    err = row.Scan(
+        &job.DocumentID,
+        &job.DID,
+        &job.Collection,
+        &job.RKey,
+        &job.CID,
+        &job.RecordJSON,
+        &job.Attempts,
+        &job.Status,
+        &job.LastError,
+        &job.ScheduledAt,
+        &job.UpdatedAt,
+    )
+    if errors.Is(err, sql.ErrNoRows) {
+        return nil, nil
+    }
+    if err != nil {
+        return nil, fmt.Errorf("select indexing job: %w", err)
+    }
+
+    res, err := tx.ExecContext(ctx, `
+        UPDATE indexing_jobs
+        SET status = 'processing', updated_at = ?
+        WHERE document_id = ?`,
+        now, job.DocumentID,
+    )
+    if err != nil {
+        return nil, fmt.Errorf("mark indexing job processing: %w", err)
+    }
+    affected, err := res.RowsAffected()
+    if err != nil {
+        return nil, fmt.Errorf("rows affected claim indexing job: %w", err)
+    }
+    if affected == 0 {
+        return nil, nil
+    }
+
+    if err := tx.Commit(); err != nil {
+        return nil, fmt.Errorf("commit claim indexing job tx: %w", err)
+    }
+    job.Status = "processing"
+    job.UpdatedAt = now
+    return &job, nil
+}
+
+func (s *SQLStore) CompleteIndexingJob(ctx context.Context, documentID string) error {
+    _, err := s.db.ExecContext(ctx, `DELETE FROM indexing_jobs WHERE document_id = ?`, documentID)
+    if err != nil {
+        return fmt.Errorf("complete indexing job: %w", err)
+    }
+    return nil
+}
+
+func (s *SQLStore) RetryIndexingJob(ctx context.Context, documentID string, nextScheduledAt string, lastError string) error {
+    now := time.Now().UTC().Format(time.RFC3339)
+    _, err := s.db.ExecContext(ctx, `
+        UPDATE indexing_jobs
+        SET status = 'pending',
+            attempts = attempts + 1,
+            last_error = ?,
+            scheduled_at = ?,
+            updated_at = ?
+        WHERE document_id = ?`,
+        lastError, nextScheduledAt, now, documentID,
+    )
+    if err != nil {
+        return fmt.Errorf("retry indexing job: %w", err)
+    }
+    return nil
+}
+
 func (s *SQLStore) GetFollowSubjects(ctx context.Context, did string) ([]string, error) {
     rows, err := s.db.QueryContext(ctx, `
         SELECT DISTINCT repo_did
···
 func scanDocument(row *sql.Row) (*Document, error) {
     doc := &Document{}
     var (
-        title, body, summary, repoDID, repoName, authorHandle sql.NullString
+        title, body, summary, repoDID, repoName, authorHandle       sql.NullString
         tagsJSON, language, createdAt, updatedAt, webURL, deletedAt sql.NullString
     )
     err := row.Scan(
+35 -3
packages/api/internal/store/store.go
···
     UpdatedAt  string
 }
 
+// IndexingJob stores queued read-through indexing work fetched through the API.
+type IndexingJob struct {
+    DocumentID  string
+    DID         string
+    Collection  string
+    RKey        string
+    CID         string
+    RecordJSON  string
+    Attempts    int
+    Status      string
+    LastError   string
+    ScheduledAt string
+    UpdatedAt   string
+}
+
+// IndexingJobInput is the payload used to enqueue or refresh an indexing job.
+type IndexingJobInput struct {
+    DocumentID string
+    DID        string
+    Collection string
+    RKey       string
+    CID        string
+    RecordJSON string
+}
+
 // DocumentFilter scopes a ListDocuments query to a subset of documents.
 type DocumentFilter struct {
-    Collection string // filter by collection NSID
-    DID        string // filter by author DID
-    DocumentID string // filter to a single document by stable ID
+    // filter by collection NSID
+    Collection string
+    // filter by author DID
+    DID string
+    // filter to a single document by stable ID
+    DocumentID string
 }
 
 // Store is the persistence interface for Twister.
···
     UpsertIdentityHandle(ctx context.Context, did, handle string, isActive bool, status string) error
     GetIdentityHandle(ctx context.Context, did string) (string, error)
     EnqueueEmbeddingJob(ctx context.Context, documentID string) error
+    EnqueueIndexingJob(ctx context.Context, input IndexingJobInput) error
+    ClaimIndexingJob(ctx context.Context) (*IndexingJob, error)
+    CompleteIndexingJob(ctx context.Context, documentID string) error
+    RetryIndexingJob(ctx context.Context, documentID string, nextScheduledAt string, lastError string) error
     GetFollowSubjects(ctx context.Context, did string) ([]string, error)
     GetRepoCollaborators(ctx context.Context, repoOwnerDID string) ([]string, error)
     CountDocuments(ctx context.Context) (int64, error)
+64
packages/api/internal/store/store_test.go
···  315  315          t.Fatalf("collaborators: got %#v", collaborators)
     316  316        }
     317  317      })
          318  +
          319  +   t.Run("indexing jobs enqueue claim retry complete", func(t *testing.T) {
          320  +     job := store.IndexingJobInput{
          321  +       DocumentID: "did:plc:owner|sh.tangled.repo|repo1",
          322  +       DID:        "did:plc:owner",
          323  +       Collection: "sh.tangled.repo",
          324  +       RKey:       "repo1",
          325  +       CID:        "cid-repo1",
          326  +       RecordJSON: `{"name":"repo1"}`,
          327  +     }
          328  +     if err := st.EnqueueIndexingJob(ctx, job); err != nil {
          329  +       t.Fatalf("enqueue indexing job: %v", err)
          330  +     }
          331  +     if err := st.EnqueueIndexingJob(ctx, job); err != nil {
          332  +       t.Fatalf("enqueue indexing job second call: %v", err)
          333  +     }
          334  +
          335  +     claimed, err := st.ClaimIndexingJob(ctx)
          336  +     if err != nil {
          337  +       t.Fatalf("claim indexing job: %v", err)
          338  +     }
          339  +     if claimed == nil {
          340  +       t.Fatal("expected claimed indexing job")
          341  +     }
          342  +     if claimed.DocumentID != job.DocumentID {
          343  +       t.Fatalf("claimed document id: got %q want %q", claimed.DocumentID, job.DocumentID)
          344  +     }
          345  +
          346  +     if err := st.RetryIndexingJob(ctx, job.DocumentID, "9999-12-31T23:59:59Z", "boom"); err != nil {
          347  +       t.Fatalf("retry indexing job: %v", err)
          348  +     }
          349  +
          350  +     none, err := st.ClaimIndexingJob(ctx)
          351  +     if err != nil {
          352  +       t.Fatalf("claim delayed indexing job: %v", err)
          353  +     }
          354  +     if none != nil {
          355  +       t.Fatalf("expected no claim before schedule time, got %#v", none)
          356  +     }
          357  +
          358  +     if err := st.RetryIndexingJob(ctx, job.DocumentID, "1970-01-01T00:00:00Z", "retry-now"); err != nil {
          359  +       t.Fatalf("retry indexing job now: %v", err)
          360  +     }
          361  +
          362  +     claimed, err = st.ClaimIndexingJob(ctx)
          363  +     if err != nil {
          364  +       t.Fatalf("claim retried indexing job: %v", err)
          365  +     }
          366  +     if claimed == nil {
          367  +       t.Fatal("expected claimed retried indexing job")
          368  +     }
          369  +
          370  +     if err := st.CompleteIndexingJob(ctx, job.DocumentID); err != nil {
          371  +       t.Fatalf("complete indexing job: %v", err)
          372  +     }
          373  +
          374  +     claimed, err = st.ClaimIndexingJob(ctx)
          375  +     if err != nil {
          376  +       t.Fatalf("claim after complete: %v", err)
          377  +     }
          378  +     if claimed != nil {
          379  +       t.Fatalf("expected no job after complete, got %#v", claimed)
          380  +     }
          381  +   })
     318  382  }
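The test above drives retries by passing explicit next-run timestamps to `RetryIndexingJob`; the exponential backoff itself (mentioned in the PR description) is computed in the indexer. A rough Python sketch of such a schedule — the base delay, doubling factor, and cap are assumed constants, and `next_retry_at` is a hypothetical helper, not the PR's actual code:

```python
from datetime import datetime, timedelta, timezone


def backoff_delay_seconds(attempt: int, base: int = 5, cap: int = 300) -> int:
    """Delay before the next retry: base delay doubled per attempt, capped.

    The constants here are illustrative assumptions, not the indexer's values.
    """
    return min(base * (2 ** attempt), cap)


def next_retry_at(attempt: int) -> str:
    """RFC 3339 UTC timestamp of the kind the retry test passes explicitly."""
    when = datetime.now(timezone.utc) + timedelta(seconds=backoff_delay_seconds(attempt))
    return when.strftime("%Y-%m-%dT%H:%M:%SZ")
```

With these assumed constants, attempts 0, 1, 2, … wait 5s, 10s, 20s, … until the 300s ceiling; the store only needs to compare the stored timestamp against "now" when claiming, which is exactly what the delayed-claim assertions above exercise.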
+20 -4
packages/api/justfile
···  5   5   build:
     6   6       CGO_ENABLED=0 go build -ldflags "{{ldflags}}" -o twister ./main.go
     7   7
     8       - run-api:
     9       -     go run -ldflags "{{ldflags}}" ./main.go api
         8   + # Run the API server. Usage: just run-api [mode], mode: local|remote (default local)
         9   + run-api mode="local":
         10  +     if [ "{{mode}}" = "local" ]; then \
         11  +         go run -ldflags "{{ldflags}}" ./main.go api --local; \
         12  +     elif [ "{{mode}}" = "remote" ]; then \
         13  +         go run -ldflags "{{ldflags}}" ./main.go api; \
         14  +     else \
         15  +         echo "invalid mode '{{mode}}' (expected local or remote)" >&2; \
         16  +         exit 1; \
         17  +     fi
     10  18
     11      - run-indexer:
     12      -     go run -ldflags "{{ldflags}}" ./main.go indexer
         19  + # Run the indexer. Usage: just run-indexer [mode], mode: local|remote (default local)
         20  + run-indexer mode="local":
         21  +     if [ "{{mode}}" = "local" ]; then \
         22  +         go run -ldflags "{{ldflags}}" ./main.go indexer --local; \
         23  +     elif [ "{{mode}}" = "remote" ]; then \
         24  +         go run -ldflags "{{ldflags}}" ./main.go indexer; \
         25  +     else \
         26  +         echo "invalid mode '{{mode}}' (expected local or remote)" >&2; \
         27  +         exit 1; \
         28  +     fi
     13  29
     14  30  test:
     15  31      go test ./...
+24
scripts/api/README.md
···  1   + # Twister API Smoke Checks
     2   +
     3   + Python smoke checks for Twister API endpoints, managed with uv.
     4   +
     5   + ## Usage
     6   +
     7   + From the repo root:
     8   +
     9   + ```sh
     10  + # Run all
     11  + uv run --project scripts/api twister-api-smoke
     12  + # Run specific checks (healthz | readyz | search | documents | indexing | activity)
     13  + uv run --project scripts/api twister-api-smoke --check healthz
     14  + ```
     15  +
     16  + ## Options
     17  +
     18  + - `--verbose` for detailed output of API responses (JSON)
     19  + - `--base-url` (or env `TWISTER_API_BASE_URL`, default `http://localhost:8080`)
     20  + - `--query` for search check (default `twisted`)
     21  + - `--document-id` for documents check
     22  + - `--actor-handle` for indexing check (default `desertthunder.dev`)
     23  + - `--repo-at-uri` for repo fixture indexing/search checks
     24  + - `--profile-at-uri` for profile fixture indexing/search checks
+20
scripts/api/pyproject.toml
···  1   + [project]
     2   + name = "twister-api-smoke"
     3   + version = "0.1.0"
     4   + description = "Smoke checks for Twister API endpoints"
     5   + readme = "README.md"
     6   + requires-python = ">=3.11"
     7   + dependencies = []
     8   +
     9   + [project.scripts]
     10  + twister-api-smoke = "twister_api_smoke.cli:main"
     11  +
     12  + [build-system]
     13  + requires = ["hatchling>=1.24.0"]
     14  + build-backend = "hatchling.build"
     15  +
     16  + [tool.uv]
     17  + package = true
     18  +
     19  + [tool.ruff]
     20  + line-length = 100
+1
scripts/api/src/twister_api_smoke/__init__.py
··· 1 + """Twister API smoke checks."""
+362
scripts/api/src/twister_api_smoke/cli.py
···  1   + import argparse
     2   + import enum
     3   + import json
     4   + import os
     5   + import sys
     6   + import time
     7   + from dataclasses import dataclass
     8   + from http.client import HTTPConnection, HTTPSConnection
     9   + from typing import Any, NoReturn
     10  + from urllib.error import HTTPError, URLError
     11  + from urllib.parse import quote, urlencode, urljoin, urlparse
     12  + from urllib.request import Request, urlopen
     13  +
     14  +
     15  + class ANSI(enum.StrEnum):
     16  +     RED = "\033[31m"
     17  +     GREEN = "\033[32m"
     18  +     YELLOW = "\033[33m"
     19  +     CYAN = "\033[36m"
     20  +     MAGENTA = "\033[35m"
     21  +     RESET = "\033[0m"
     22  +
     23  +     def colorize(self, msg: str) -> str:
     24  +         return f"{self.value}{msg}{ANSI.RESET.value}"
     25  +
     26  +
     27  + def echo(msg: str) -> None:
     28  +     tag = ANSI.GREEN.colorize("[smoke]")
     29  +     print(f"{tag} {msg}")
     30  +
     31  +
     32  + def fail(msg: str) -> NoReturn:
     33  +     tag = ANSI.RED.colorize("[smoke] FAIL")
     34  +     print(f"{tag}: {msg}", file=sys.stderr)
     35  +     raise SystemExit(1)
     36  +
     37  +
     38  + @dataclass(frozen=True)
     39  + class Options:
     40  +     base_url: str
     41  +     check: str
     42  +     query: str
     43  +     document_id: str
     44  +     actor_handle: str
     45  +     repo_at_uri: str
     46  +     profile_at_uri: str
     47  +     verbose: bool
     48  +
     49  +
     50  + def http_get_status(url: str) -> int:
     51  +     req = Request(url, method="GET")
     52  +     try:
     53  +         with urlopen(req, timeout=10) as resp:
     54  +             return resp.status
     55  +     except HTTPError as err:
     56  +         return err.code
     57  +     except URLError as err:
     58  +         fail(f"request failed for {url}: {err}")
     59  +
     60  +
     61  + def http_get_json(url: str, params: dict[str, str] | None = None) -> Any:
     62  +     if params:
     63  +         query = urlencode(params)
     64  +         sep = "&" if "?" in url else "?"
     65  +         url = f"{url}{sep}{query}"
     66  +
     67  +     req = Request(url, method="GET")
     68  +     try:
     69  +         with urlopen(req, timeout=15) as resp:
     70  +             payload = resp.read().decode("utf-8")
     71  +         return json.loads(payload)
     72  +     except HTTPError as err:
     73  +         body = err.read().decode("utf-8", errors="replace")
     74  +         fail(f"{url} returned {err.code}: {body}")
     75  +     except URLError as err:
     76  +         fail(f"request failed for {url}: {err}")
     77  +     except json.JSONDecodeError as err:
     78  +         fail(f"invalid JSON from {url}: {err}")
     79  +
     80  +
     81  + def assert_status(url: str, expected: int) -> None:
     82  +     actual = http_get_status(url)
     83  +     if actual != expected:
     84  +         fail(f"{url} returned {actual} (expected {expected})")
     85  +
     86  +
     87  + def at_uri_to_document_id(at_uri: str) -> str:
     88  +     if not at_uri.startswith("at://"):
     89  +         fail(f"invalid at uri: {at_uri}")
     90  +     trimmed = at_uri[len("at://") :]
     91  +     parts = trimmed.split("/", 2)
     92  +     if len(parts) != 3 or not all(parts):
     93  +         fail(f"invalid at uri: {at_uri}")
     94  +     did, collection, rkey = parts
     95  +     return f"{did}|{collection}|{rkey}"
     96  +
     97  +
     98  + def encode_document_id(document_id: str) -> str:
     99  +     return quote(document_id, safe="")
     100 +
     101 +
     102 + def colorize_json(json_str: str, indent=2) -> str:
     103 +     """Recursively colorize JSON string for terminal output, retaining indentation.
     104 +
     105 +     CYAN: keys, GREEN: string values, YELLOW: numbers, MAGENTA: booleans/null
     106 +     """
     107 +
     108 +     def colorize_and_indent(value: Any, level: int = 0) -> str:
     109 +         indent_str = " " * (indent * level)
     110 +         if isinstance(value, dict):
     111 +             items = []
     112 +             for k, v in value.items():
     113 +                 colored_key = ANSI.CYAN.colorize(json.dumps(k))
     114 +                 colored_value = colorize_and_indent(v, level + 1)
     115 +                 items.append(f"{indent_str} {colored_key}: {colored_value}")
     116 +             return "{\n" + ",\n".join(items) + f"\n{indent_str}}}"
     117 +         elif isinstance(value, list):
     118 +             items = [colorize_and_indent(v, level + 1) for v in value]
     119 +             return "[\n" + ",\n".join(f"{indent_str} {i}" for i in items) + f"\n{indent_str}]"
     120 +         elif isinstance(value, str):
     121 +             return ANSI.GREEN.colorize(json.dumps(value))
     122 +         elif isinstance(value, bool) or value is None:
     123 +             return ANSI.MAGENTA.colorize(json.dumps(value))
     124 +         elif isinstance(value, (int, float)):
     125 +             return ANSI.YELLOW.colorize(str(value))
     126 +         else:
     127 +             return json.dumps(value)
     128 +
     129 +     try:
     130 +         parsed = json.loads(json_str)
     131 +         return colorize_and_indent(parsed)
     132 +     except json.JSONDecodeError:
     133 +         return json_str
     134 +
     135 +
     136 + def maybe_log_json(opts: Options, label: str, payload: Any) -> None:
     137 +     if not opts.verbose:
     138 +         return
     139 +     pretty = json.dumps(payload, indent=2, sort_keys=True)
     140 +     echo(f"{label} JSON:\n{colorize_json(pretty)}")
     141 +
     142 +
     143 + def check_healthz(opts: Options) -> None:
     144 +     echo("checking GET /healthz")
     145 +     payload = http_get_json(urljoin(opts.base_url, "/healthz"))
     146 +     maybe_log_json(opts, "healthz", payload)
     147 +     echo("healthz ok")
     148 +
     149 +
     150 + def check_readyz(opts: Options) -> None:
     151 +     echo("checking GET /readyz")
     152 +     payload = http_get_json(urljoin(opts.base_url, "/readyz"))
     153 +     maybe_log_json(opts, "readyz", payload)
     154 +     echo("readyz ok")
     155 +
     156 +
     157 + def check_search(opts: Options) -> None:
     158 +     repo_id = at_uri_to_document_id(opts.repo_at_uri)
     159 +     profile_id = at_uri_to_document_id(opts.profile_at_uri)
     160 +
     161 +     echo("checking search with indexed repo fixture")
     162 +     repo_payload = http_get_json(urljoin(opts.base_url, "/search"), {"q": "twisted"})
     163 +     if not isinstance(repo_payload, dict):
     164 +         fail("search response must be a JSON object")
     165 +     repo_results = repo_payload.get("results")
     166 +     if not isinstance(repo_results, list):
     167 +         fail("search response is missing expected fields")
     168 +     repo_ids = {
     169 +         r.get("id") for r in repo_results if isinstance(r, dict) and isinstance(r.get("id"), str)
     170 +     }
     171 +     if repo_id not in repo_ids:
     172 +         fail(f"repo fixture id not found in search results: {repo_id}")
     173 +     maybe_log_json(opts, "search-repo", repo_payload)
     174 +
     175 +     echo("checking search with indexed profile fixture")
     176 +     profile_payload = http_get_json(urljoin(opts.base_url, "/search"), {"q": opts.actor_handle})
     177 +     if not isinstance(profile_payload, dict):
     178 +         fail("profile search response must be a JSON object")
     179 +     profile_results = profile_payload.get("results")
     180 +     if not isinstance(profile_results, list):
     181 +         fail("profile search response is missing expected fields")
     182 +     profile_ids = {
     183 +         r.get("id") for r in profile_results if isinstance(r, dict) and isinstance(r.get("id"), str)
     184 +     }
     185 +     if profile_id not in profile_ids:
     186 +         fail(f"profile fixture id not found in search results: {profile_id}")
     187 +     maybe_log_json(opts, "search-profile", profile_payload)
     188 +     echo("search ok")
     189 +
     190 +
     191 + def check_documents(opts: Options) -> None:
     192 +     document_id = opts.document_id
     193 +     if not document_id:
     194 +         echo("no document id provided, deriving first result from search")
     195 +         payload = http_get_json(urljoin(opts.base_url, "/search"), {"q": opts.query})
     196 +         results = payload.get("results") if isinstance(payload, dict) else None
     197 +         if not isinstance(results, list) or not results:
     198 +             fail("document id required or search must return at least one result")
     199 +         first = results[0]
     200 +         if not isinstance(first, dict) or not isinstance(first.get("id"), str):
     201 +             fail("search result missing document id")
     202 +         document_id = first["id"]
     203 +
     204 +     encoded = encode_document_id(document_id)
     205 +     echo(f"checking GET /documents/{{id}} for {document_id}")
     206 +     payload = http_get_json(urljoin(opts.base_url, f"/documents/{encoded}"))
     207 +     maybe_log_json(opts, "documents", payload)
     208 +     echo("documents ok")
     209 +
     210 +
     211 + def check_indexing(opts: Options) -> None:
     212 +     repo_id = at_uri_to_document_id(opts.repo_at_uri)
     213 +     profile_id = at_uri_to_document_id(opts.profile_at_uri)
     214 +     repo_encoded = encode_document_id(repo_id)
     215 +     profile_encoded = encode_document_id(profile_id)
     216 +
     217 +     profile_url = urljoin(opts.base_url, f"/documents/{profile_encoded}")
     218 +     repo_url = urljoin(opts.base_url, f"/documents/{repo_encoded}")
     219 +
     220 +     echo(f"triggering read-through fetch via /actors/{opts.actor_handle}")
     221 +     assert_status(urljoin(opts.base_url, f"/actors/{opts.actor_handle}"), 200)
     222 +
     223 +     echo(f"triggering read-through fetch via /actors/{opts.actor_handle}/repos")
     224 +     assert_status(urljoin(opts.base_url, f"/actors/{opts.actor_handle}/repos"), 200)
     225 +
     226 +     echo(f"waiting for queued indexing of profile fixture {profile_id}")
     227 +     for _ in range(30):
     228 +         if http_get_status(profile_url) == 200:
     229 +             if opts.verbose:
     230 +                 payload = http_get_json(profile_url)
     231 +                 maybe_log_json(opts, "indexing-profile", payload)
     232 +             break
     233 +         time.sleep(1)
     234 +     else:
     235 +         fail(f"profile fixture did not become available at /documents/{profile_encoded} within 30s")
     236 +
     237 +     echo(f"waiting for queued indexing of repo fixture {repo_id}")
     238 +     for _ in range(30):
     239 +         if http_get_status(repo_url) == 200:
     240 +             if opts.verbose:
     241 +                 payload = http_get_json(repo_url)
     242 +                 maybe_log_json(opts, "indexing-repo", payload)
     243 +             echo("indexing ok")
     244 +             return
     245 +         time.sleep(1)
     246 +
     247 +     fail(f"repo fixture did not become available at /documents/{repo_encoded} within 30s")
     248 +
     249 +
     250 + def check_activity(opts: Options) -> None:
     251 +     echo("checking websocket handshake on /activity/stream")
     252 +     parsed = urlparse(opts.base_url)
     253 +     if parsed.scheme not in {"http", "https"}:
     254 +         fail(f"unsupported base url scheme: {parsed.scheme}")
     255 +
     256 +     host = parsed.hostname
     257 +     if host is None:
     258 +         fail(f"invalid base url: {opts.base_url}")
     259 +
     260 +     port = parsed.port
     261 +     if port is None:
     262 +         port = 443 if parsed.scheme == "https" else 80
     263 +
     264 +     path = parsed.path.rstrip("/") + "/activity/stream?wantedCollections=sh.tangled.repo"
     265 +
     266 +     conn = (HTTPSConnection if parsed.scheme == "https" else HTTPConnection)(host, port, timeout=10)
     267 +     try:
     268 +         conn.putrequest("GET", path)
     269 +         conn.putheader("Connection", "Upgrade")
     270 +         conn.putheader("Upgrade", "websocket")
     271 +         conn.putheader("Sec-WebSocket-Version", "13")
     272 +         conn.putheader("Sec-WebSocket-Key", "dGhlIHNhbXBsZSBub25jZQ==")
     273 +         conn.endheaders()
     274 +         resp = conn.getresponse()
     275 +         if resp.status != 101:
     276 +             fail(f"/activity/stream websocket handshake returned {resp.status} (expected 101)")
     277 +         echo("activity handshake ok")
     278 +     finally:
     279 +         conn.close()
     280 +
     281 +
     282 + CHECKS: dict[str, Any] = {
     283 +     "healthz": check_healthz,
     284 +     "readyz": check_readyz,
     285 +     "search": check_search,
     286 +     "documents": check_documents,
     287 +     "indexing": check_indexing,
     288 +     "activity": check_activity,
     289 + }
     290 +
     291 +
     292 + def parse_args(argv: list[str]) -> Options:
     293 +     parser = argparse.ArgumentParser(description="Twister API smoke checks")
     294 +     parser.add_argument(
     295 +         "--base-url",
     296 +         default=None,
     297 +         help="Twister API base URL (default env TWISTER_API_BASE_URL or http://localhost:8080)",
     298 +     )
     299 +     parser.add_argument(
     300 +         "--check",
     301 +         choices=["all", *CHECKS.keys()],
     302 +         default="all",
     303 +         help="Which check to run",
     304 +     )
     305 +     parser.add_argument(
     306 +         "--query", default="twisted", help="Search query for search/documents checks"
     307 +     )
     308 +     parser.add_argument("--document-id", default="", help="Document ID for documents check")
     309 +     parser.add_argument(
     310 +         "--actor-handle",
     311 +         default="desertthunder.dev",
     312 +         help="Actor handle used to trigger read-through indexing",
     313 +     )
     314 +     parser.add_argument(
     315 +         "--repo-at-uri",
     316 +         default="at://did:plc:xg2vq45muivyy3xwatcehspu/sh.tangled.repo/3mho6hukiei22",
     317 +         help="Repo AT URI expected to be fetched and indexed by smoke checks",
     318 +     )
     319 +     parser.add_argument(
     320 +         "--profile-at-uri",
     321 +         default="at://did:plc:xg2vq45muivyy3xwatcehspu/sh.tangled.actor.profile/self",
     322 +         help="Profile AT URI expected to be fetched and indexed by smoke checks",
     323 +     )
     324 +     parser.add_argument(
     325 +         "--verbose", action="store_true", help="Print JSON payloads returned by smoke endpoints"
     326 +     )
     327 +
     328 +     ns = parser.parse_args(argv)
     329 +     base_url = ns.base_url or os.environ.get("TWISTER_API_BASE_URL", "http://localhost:8080")
     330 +     return Options(
     331 +         base_url=base_url,
     332 +         check=ns.check,
     333 +         query=ns.query,
     334 +         document_id=ns.document_id,
     335 +         actor_handle=ns.actor_handle,
     336 +         repo_at_uri=ns.repo_at_uri,
     337 +         profile_at_uri=ns.profile_at_uri,
     338 +         verbose=ns.verbose,
     339 +     )
     340 +
     341 +
     342 + def main(argv: list[str] | None = None) -> int:
     343 +     opts = parse_args(sys.argv[1:] if argv is None else argv)
     344 +     if opts.check == "all":
     345 +         for name in (
     346 +             "healthz",
     347 +             "readyz",
     348 +             "indexing",
     349 +             "search",
     350 +             "documents",
     351 +             "activity",
     352 +         ):
     353 +             CHECKS[name](opts)
     354 +         echo("all API smoke checks passed")
     355 +         return 0
     356 +
     357 +     CHECKS[opts.check](opts)
     358 +     return 0
     359 +
     360 +
     361 + if __name__ == "__main__":
     362 +     raise SystemExit(main())
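The smoke CLI's document-ID handling can be exercised on its own; this sketch mirrors `at_uri_to_document_id` and `encode_document_id`, simplified to use `assert` instead of the CLI's `fail` helper:

```python
from urllib.parse import quote


def at_uri_to_document_id(at_uri: str) -> str:
    """Map at://<did>/<collection>/<rkey> to the '<did>|<collection>|<rkey>' form."""
    assert at_uri.startswith("at://"), at_uri
    did, collection, rkey = at_uri[len("at://"):].split("/", 2)
    return f"{did}|{collection}|{rkey}"


doc_id = at_uri_to_document_id("at://did:plc:owner/sh.tangled.repo/repo1")
print(doc_id)                  # did:plc:owner|sh.tangled.repo|repo1
print(quote(doc_id, safe=""))  # did%3Aplc%3Aowner%7Csh.tangled.repo%7Crepo1
```

`quote(..., safe="")` percent-encodes the `:` and `|` separators, which is why the checks build `/documents/{id}` URLs from the encoded form rather than the raw ID.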
+8
scripts/api/uv.lock
···  1   + version = 1
     2   + revision = 3
     3   + requires-python = ">=3.11"
     4   +
     5   + [[package]]
     6   + name = "twister-api-smoke"
     7   + version = "0.1.0"
     8   + source = { editable = "." }